Add blocking connection with timeout, and initial poll functionality.

This commit is contained in:
Christoffer Lerno
2023-09-03 19:03:00 +02:00
parent 29cc9ad8b1
commit d1bb9c55ee
11 changed files with 203 additions and 15 deletions

View File

@@ -25,6 +25,7 @@ fault IoError
FILE_NOT_FOUND,
FILE_NOT_VALID,
GENERAL_ERROR,
ILLEGAL_ARGUMENT,
INCOMPLETE_WRITE,
INTERRUPTED,
INVALID_POSITION,

View File

@@ -46,7 +46,7 @@ const AIFamily PLATFORM_AF_UTUN = 38;
const AIFamily PLATFORM_AF_VSOCK = 40;
const AIFamily PLATFORM_AF_MAX = 41;
const int PLATFORM_O_NONBLOCK = 0x30;
const int PLATFORM_O_NONBLOCK = 0x04;
// https://opensource.apple.com/source/xnu/xnu-4570.41.2/bsd/sys/socket.h.auto.html
const int SOL_SOCKET = 0xffff;
@@ -82,4 +82,15 @@ const int SO_NOSIGPIPE = 0x1022; // Apple: No SIGPIPE on EPIPE
const int SO_NOADDRERR = 0x1023; // Apple: Returns EADDRNOTAVAIL when src is not available anymore
const int SO_NWRITE = 0x1024; // Apple: Get number of bytes currently in send socket buffer
const int SO_REUSESHAREUID = 0x1025; // Apple: Allow reuse of port/socket by different userids
const int SO_LINGER_SEC = 0x1080; // linger on close if data present (in seconds)
const int SO_LINGER_SEC = 0x1080; // linger on close if data present (in seconds)
const CShort POLLRDNORM = 0x0040;
const CShort POLLRDBAND = 0x0080;
const CShort POLLWRNORM = POLLOUT;
const CShort POLLWRBAND = 0x0100;
const CShort POLLEXTEND = 0x0200; // file may have been extended
const CShort POLLATTRIB = 0x0400; // file attributes may have changed
const CShort POLLNLINK = 0x0800; // (un)link/rename may have happened
const CShort POLLWRITE = 0x1000; // file's contents may have changed

View File

@@ -77,4 +77,14 @@ const int SO_RESERVE_MEM = 73;
const int SO_TXREHASH = 74;
const int SO_RCVMARK = 75;
const int SO_PASSPIDFD = 76;
const int SO_PEERPIDFD = 77;
const int SO_PEERPIDFD = 77;
const CUShort POLLRDNORM = 0x0040;
const CUShort POLLRDBAND = 0x0080;
const CUShort POLLWRNORM = 0x0100;
const CUShort POLLWRBAND = 0x0200;
const CUShort POLLMSG = 0x0400;
const CUShort POLLREMOVE = 0x1000;
const CUShort POLLRDHUP = 0x2000;
const CUShort POLLFREE = 0x4000;
const CUShort POLL_BUSY_LOOP = 0x8000;

View File

@@ -6,6 +6,14 @@ const int F_SETFL = 4;
def NativeSocket = distinct inline Fd;
struct Posix_pollfd
{
CInt fd;
CUShort events;
CUShort revents;
}
def Posix_nfds_t = CUInt;
extern fn CInt connect(NativeSocket socket, SockAddrPtr address, Socklen_t address_len);
extern fn NativeSocket socket(AIFamily af, AISockType type, AIProtocol protocol) @extern("socket");
@@ -13,10 +21,18 @@ extern fn int fcntl(NativeSocket socket, int cmd, ...) @extern("fcntl");
extern fn CInt bind(NativeSocket socket, SockAddrPtr address, Socklen_t address_len) @extern("bind");
extern fn CInt listen(NativeSocket socket, CInt backlog) @extern("listen");
extern fn NativeSocket accept(NativeSocket socket, SockAddrPtr address, Socklen_t* address_len) @extern("accept");
extern fn CInt poll(Posix_pollfd* fds, Posix_nfds_t nfds, CInt timeout);
fn anyfault socket_error()
const CUShort POLLIN = 0x0001;
const CUShort POLLPRI = 0x0002;
const CUShort POLLOUT = 0x0004;
const CUShort POLLERR = 0x0008;
const CUShort POLLHUP = 0x0010;
const CUShort POLLNVAL = 0x0020;
fn anyfault convert_error(Errno error)
{
switch (libc::errno())
switch (error)
{
case errno::EACCES: return IoError.NO_PERMISSION;
case errno::EADDRINUSE: return NetError.ADDRESS_IN_USE;
@@ -34,6 +50,11 @@ fn anyfault socket_error()
}
}
fn anyfault socket_error()
{
return convert_error(libc::errno());
}
macro void! NativeSocket.close(self)
{
if (libc::close(self))
@@ -65,5 +86,5 @@ macro void! NativeSocket.set_non_blocking(self, bool non_blocking)
macro bool NativeSocket.is_non_blocking(self)
{
return fcntl(self, F_GETFL, 0) & O_NONBLOCK == O_NONBLOCK;
return fcntl(self, F_GETFL, 0) & O_NONBLOCK != 0;
}

View File

@@ -9,9 +9,13 @@ const AIFamily PLATFORM_AF_INET6 = 23;
const AIFamily PLATFORM_AF_IRDA = 26;
const AIFamily PLATFORM_AF_BTH = 32;
const int FIONREAD = 1074030207;
const int FIONBIO = -2147195266;
const int FIOASYNC = -2147195267;
def NativeSocket = distinct uptr;
extern fn int ioctlsocket(NativeSocket, long cmd, ulong *argp);
extern fn CInt ioctlsocket(NativeSocket, CLong cmd, CULong *argp);
extern fn WSAError closesocket(NativeSocket);
extern fn NativeSocket socket(AIFamily af, AISockType type, AIProtocol protocol);
extern fn int connect(NativeSocket, SockAddrPtr address, Socklen_t address_len);
@@ -21,7 +25,7 @@ extern fn NativeSocket accept(NativeSocket, SockAddrPtr address, Socklen_t* addr
fn void! NativeSocket.set_non_blocking(self, bool non_blocking)
{
if (ioctlsocket(self, win32::FIONBIO, &&(ulong)non_blocking))
if (ioctlsocket(self, win32::FIONBIO, &&(CULong)non_blocking))
{
return socket_error()?;
}
@@ -76,7 +80,19 @@ fn anyfault convert_error(WSAError error)
default: return IoError.GENERAL_ERROR;
}
}
fn anyfault socket_error()
{
return convert_error(win32_WSAGetLastError());
}
const CUShort POLLIN = win32::POLLIN;
const CUShort POLLPRI = win32::POLLPRI;
const CUShort POLLOUT = win32::POLLOUT;
const CUShort POLLERR = win32::POLLERR;
const CUShort POLLHUP = win32::POLLHUP;
const CUShort POLLNVAL = win32::POLLNVAL;
const CUShort POLLRDNORM = win32::POLLRDNORM;
const CUShort POLLRDBAND = win32::POLLRDBAND;
const CUShort POLLWRNORM = win32::POLLWRNORM;
const CUShort POLLWRBAND = win32::POLLWRBAND;

View File

@@ -25,6 +25,53 @@ macro void @loop_over_ai(AddrInfo* ai; @body(NativeSocket fd, AddrInfo* ai))
}
}
const Duration POLL_FOREVER = -1;
def PollSubscribes = distinct ushort;
def PollEvents = distinct ushort;
const PollSubscribes SUBSCRIBE_ANY_READ = os::POLLIN;
const PollSubscribes SUBSCRIBE_PRIO_READ = os::POLLPRI;
const PollSubscribes SUBSCRIBE_OOB_READ = os::POLLRDBAND;
const PollSubscribes SUBSCRIBE_READ = os::POLLRDNORM;
const PollSubscribes SUBSCRIBE_ANY_WRITE = os::POLLOUT;
const PollSubscribes SUBSCRIBE_OOB_WRITE = os::POLLWRBAND;
const PollSubscribes SUBSCRIBE_WRITE = os::POLLWRNORM;
const PollEvents POLL_EVENT_READ_PRIO = os::POLLPRI;
const PollEvents POLL_EVENT_READ_OOB = os::POLLRDBAND;
const PollEvents POLL_EVENT_READ = os::POLLRDNORM;
const PollEvents POLL_EVENT_WRITE_OOB = os::POLLWRBAND;
const PollEvents POLL_EVENT_WRITE = os::POLLWRNORM;
const PollEvents POLL_EVENT_DISCONNECT = os::POLLHUP;
const PollEvents POLL_EVENT_ERROR = os::POLLERR;
const PollEvents POLL_EVENT_INVALID = os::POLLNVAL;
struct Poll
{
NativeSocket socket;
PollSubscribes events;
PollEvents revents;
}
/**
* @param [inout] polls
* @param timeout "duration to poll."
**/
fn ulong! poll(Poll[] polls, Duration timeout)
{
long time_ms = timeout.to_ms();
if (time_ms > CInt.max) time_ms = CInt.max;
$if env::WIN32:
CInt result = win32_WSAPoll((Win32_LPWSAPOLLFD)polls.ptr, (Win32_ULONG)polls.len, (CInt)time_ms);
$else
CInt result = os::poll((Posix_pollfd*)polls.ptr, (Posix_nfds_t)polls.len, (CInt)time_ms);
$endif
if (result == 0) return 0;
if (result > 0) return (ulong)result;
return os::socket_error()?;
}
macro Socket new_socket(fd, ai)
{
Socket sock = { .stream.fns = &SOCKETSTREAM_INTERFACE, .sock = fd, .ai_addrlen = ai.ai_addrlen };

View File

@@ -1,5 +1,6 @@
module std::net @if(os::SUPPORTS_INET);
import libc;
macro apply_sockoptions(sockfd, options) @private
{
Socket sock = { .sock = sockfd };
@@ -8,7 +9,6 @@ macro apply_sockoptions(sockfd, options) @private
fn Socket! connect_from_addrinfo(AddrInfo* addrinfo, SocketOption[] options) @private
{
@loop_over_ai(addrinfo; NativeSocket sockfd, AddrInfo* ai)
{
apply_sockoptions(sockfd, options)!;
@@ -34,6 +34,52 @@ fn bool last_error_is_delayed_connect()
return err == errno::EINPROGRESS || err == errno::EAGAIN || err == errno::EWOULDBLOCK;
$endswitch
}
fn Socket! connect_with_timeout_from_addrinfo(AddrInfo* addrinfo, SocketOption[] options, Duration timeout) @private
{
Clock c = 0;
@loop_over_ai(addrinfo; NativeSocket sockfd, AddrInfo* ai)
{
apply_sockoptions(sockfd, options)!;
sockfd.set_non_blocking(true)!;
int errcode = os::connect(sockfd, ai.ai_addr, ai.ai_addrlen);
if (!errcode)
{
// It worked, restore blocking.
sockfd.set_non_blocking(false)!;
return new_socket(sockfd, ai);
}
if (last_error_is_delayed_connect())
{
Duration timeout_left = timeout;
if (c)
{
Duration to_remove = c.to_now().to_duration();
if (to_remove >= timeout_left)
{
return NetError.CONNECTION_TIMED_OUT?;
}
timeout_left -= to_remove;
}
else
{
c = clock::now();
}
Poll poll_request = { sockfd, SUBSCRIBE_ANY_WRITE, 0 };
if (!poll((&poll_request)[:1], timeout_left)!)
{
return NetError.CONNECTION_TIMED_OUT?;
}
if (poll_request.revents & POLL_EVENT_WRITE)
{
sockfd.set_non_blocking(false)!;
return new_socket(sockfd, ai);
}
}
};
return os::socket_error()?;
}
fn Socket! connect_async_from_addrinfo(AddrInfo* addrinfo, SocketOption[] options) @private
{
@loop_over_ai(addrinfo; NativeSocket sockfd, AddrInfo* ai)
@@ -48,7 +94,6 @@ fn Socket! connect_async_from_addrinfo(AddrInfo* addrinfo, SocketOption[] option
}
};
return os::socket_error()?;
}
macro @network_loop_over_ai(network, host, port; @body(fd, ai)) @private

View File

@@ -5,16 +5,20 @@ import libc;
def TcpSocket = distinct inline Socket;
def TcpServerSocket = distinct inline Socket;
fn TcpSocket! connect(String host, uint port, SocketOption... options, IpProtocol protocol = UNSPECIFIED)
fn TcpSocket! connect(String host, uint port, Duration timeout = 0, SocketOption... options, IpProtocol protocol = UNSPECIFIED)
{
AddrInfo* ai = net::addrinfo(host, port, protocol.ai_family, os::SOCK_STREAM)!;
defer os::freeaddrinfo(ai);
return connect_to(ai, ...options);
if (timeout > 0)
{
return connect_to(ai, ...options);
}
return (TcpSocket)net::connect_with_timeout_from_addrinfo(ai, options, timeout)!;
}
fn TcpSocket! connect_async(String host, uint port, SocketOption... options, IpProtocol protocol = UNSPECIFIED)
{
AddrInfo* ai = net::addrinfo(host, port, protocol.ai_family, os::SOCK_STREAM)!;
AddrInfo* ai = net::addrinfo(host, port, protocol.ai_family, os::SOCK_STREAM)!;
defer os::freeaddrinfo(ai);
return connect_async_to(ai, ...options);
}

View File

@@ -146,6 +146,7 @@ def Win32_SC_LOCK = Win32_LPVOID;
def Win32_SERVICE_STATUS_HANDLE = Win32_HANDLE;
def Win32_SHORT = short;
def Win32_SIZE_T = usz;
def Win32_SOCKET = Win32_HANDLE;
def Win32_SSIZE_T = isz;
def Win32_TBYTE = Win32_WCHAR;
def Win32_TCHAR = Win32_WCHAR;

View File

@@ -1,11 +1,37 @@
module std::os::win32 @if(env::WIN32);
def WSAError = distinct int;
// See https://github.com/wine-mirror/wine/blob/master/include/winsock2.h
def WSAError = distinct int;
struct Win32_pollfd
{
Win32_SOCKET fd;
Win32_SHORT events;
Win32_SHORT revents;
}
def Win32_WSAPOLLFD = Win32_pollfd;
def Win32_PWSAPOLLFD = Win32_WSAPOLLFD*;
def Win32_LPWSAPOLLFD = Win32_WSAPOLLFD*;
const Win32_SHORT POLLERR = 0x0001;
const Win32_SHORT POLLHUP = 0x0002;
const Win32_SHORT POLLNVAL = 0x0004;
const Win32_SHORT POLLWRNORM = 0x0010;
const Win32_SHORT POLLWRBAND = 0x0020;
const Win32_SHORT POLLRDNORM = 0x0100;
const Win32_SHORT POLLRDBAND = 0x0200;
const Win32_SHORT POLLPRI = 0x0400;
const Win32_SHORT POLLIN = POLLRDNORM | POLLRDBAND;
const Win32_SHORT POLLOUT = POLLWRNORM;
const SD_RECEIVE = 0x00;
const SD_SEND = 0x01;
const SD_BOTH = 0x02;
extern fn CInt win32_WSAPoll(Win32_LPWSAPOLLFD fdArray, Win32_ULONG fds, Win32_INT timeout) @extern("WSAPoll") @builtin;
extern fn WSAError win32_WSAGetLastError() @extern("WSAGetLastError") @builtin;
extern fn void win32_WSASetLastError(WSAError error) @extern("WSASetLastError") @builtin;
extern fn CInt win32_WSAStartup(Win32_WORD, void*) @extern("WSAStartup") @builtin;
const int FIONBIO = -2147195266;
const int FIONREAD = 1074030207;
const int SIOCATMARK = 1074033415;

View File

@@ -5,6 +5,11 @@ def Duration = distinct long @inline;
def Clock = distinct ulong @inline;
def NanoDuration = distinct long @inline;
fn Duration ms(long l) @inline => (Duration)l * 1000;
fn Duration sec(long l) @inline => (Duration)l * MICROSECONDS_PER_SECOND;
fn Duration min(long l) @inline => (Duration)l * MICROSECONDS_PER_MINUTE;
fn Duration hour(long l) @inline => (Duration)l * MICROSECONDS_PER_HOUR;
const Duration MICROSECONDS_PER_SECOND = 1_000_000;
const Duration MICROSECONDS_PER_MINUTE = MICROSECONDS_PER_SECOND * 60;
const Duration MICROSECONDS_PER_HOUR = MICROSECONDS_PER_MINUTE * 60;
@@ -85,6 +90,7 @@ fn double NanoDuration.to_sec(nd) => (double)nd / 1_000_000_000.0;
fn long NanoDuration.to_ms(nd) => (long)nd / 1_000_000;
fn Duration NanoDuration.to_duration(nd) => (Duration)nd / 1_000;
fn NanoDuration Duration.to_nano(td) => (NanoDuration)td * 1_000;
fn long Duration.to_ms(td) => (long)td / 1_000;
fn usz! NanoDuration.to_format(&self, Formatter* formatter) @dynamic
{