Commit 559044bf authored by Kenton Varda's avatar Kenton Varda

Make wrapConnectingSocketFd() have same signature on Unix as Windows.

parent f044b0a6
...@@ -416,27 +416,6 @@ public: ...@@ -416,27 +416,6 @@ public:
KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString()); KJ_SYSCALL(::bind(sockfd, &addr.generic, addrlen), toString());
} }
void connect(int sockfd) const {
// Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
// non-blocking using EINPROGRESS.
for (;;) {
if (::connect(sockfd, &addr.generic, addrlen) < 0) {
int error = errno;
if (error == EINPROGRESS) {
return;
} else if (error != EINTR) {
KJ_FAIL_SYSCALL("connect()", error, toString()) {
// Recover by returning, since reads/writes will simply fail.
return;
}
}
} else {
// no error
return;
}
}
}
uint getPort() const { uint getPort() const {
switch (addr.generic.sa_family) { switch (addr.generic.sa_family) {
case AF_INET: return ntohs(addr.inet4.sin_port); case AF_INET: return ntohs(addr.inet4.sin_port);
...@@ -901,7 +880,26 @@ public: ...@@ -901,7 +880,26 @@ public:
Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override { Own<AsyncIoStream> wrapSocketFd(int fd, uint flags = 0) override {
return heap<AsyncStreamFd>(eventPort, fd, flags); return heap<AsyncStreamFd>(eventPort, fd, flags);
} }
Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) override { Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
int fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) override {
// Unfortunately connect() doesn't fit the mold of KJ_NONBLOCKING_SYSCALL, since it indicates
// non-blocking using EINPROGRESS.
for (;;) {
if (::connect(fd, addr, addrlen) < 0) {
int error = errno;
if (error == EINPROGRESS) {
// Fine.
break;
} else if (error != EINTR) {
KJ_FAIL_SYSCALL("connect()", error) { break; }
return Own<AsyncIoStream>();
}
} else {
// no error
break;
}
}
auto result = heap<AsyncStreamFd>(eventPort, fd, flags); auto result = heap<AsyncStreamFd>(eventPort, fd, flags);
auto connected = result->waitConnected(); auto connected = result->waitConnected();
...@@ -940,7 +938,9 @@ public: ...@@ -940,7 +938,9 @@ public:
: lowLevel(lowLevel), addrs(kj::mv(addrs)) {} : lowLevel(lowLevel), addrs(kj::mv(addrs)) {}
Promise<Own<AsyncIoStream>> connect() override { Promise<Own<AsyncIoStream>> connect() override {
return connectImpl(0); auto addrsCopy = heapArray(addrs.asPtr());
auto promise = connectImpl(lowLevel, addrsCopy);
return promise.attach(kj::mv(addrsCopy));
} }
Own<ConnectionReceiver> listen() override { Own<ConnectionReceiver> listen() override {
...@@ -1010,34 +1010,23 @@ private: ...@@ -1010,34 +1010,23 @@ private:
Array<SocketAddress> addrs; Array<SocketAddress> addrs;
uint counter = 0; uint counter = 0;
Promise<Own<AsyncIoStream>> connectImpl(uint index) { static Promise<Own<AsyncIoStream>> connectImpl(
KJ_ASSERT(index < addrs.size()); LowLevelAsyncIoProvider& lowLevel, ArrayPtr<SocketAddress> addrs) {
KJ_ASSERT(addrs.size() > 0);
int fd = addrs[index].socket(SOCK_STREAM);
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { int fd = addrs[0].socket(SOCK_STREAM);
addrs[index].connect(fd);
})) {
// Connect failed.
close(fd);
if (index + 1 < addrs.size()) {
// Try the next address instead.
return connectImpl(index + 1);
} else {
// No more addresses to try, so propagate the exception.
return kj::mv(*exception);
}
}
return lowLevel.wrapConnectingSocketFd(fd, NEW_FD_FLAGS).then( return kj::evalNow([&]() {
[](Own<AsyncIoStream>&& stream) -> Promise<Own<AsyncIoStream>> { return lowLevel.wrapConnectingSocketFd(
fd, addrs[0].getRaw(), addrs[0].getRawSize(), NEW_FD_FLAGS);
}).then([](Own<AsyncIoStream>&& stream) -> Promise<Own<AsyncIoStream>> {
// Success, pass along. // Success, pass along.
return kj::mv(stream); return kj::mv(stream);
}, [this,index](Exception&& exception) -> Promise<Own<AsyncIoStream>> { }, [&lowLevel,addrs](Exception&& exception) mutable -> Promise<Own<AsyncIoStream>> {
// Connect failed. // Connect failed.
if (index + 1 < addrs.size()) { if (addrs.size() > 1) {
// Try the next address instead. // Try the next address instead.
return connectImpl(index + 1); return connectImpl(lowLevel, addrs.slice(1, addrs.size()));
} else { } else {
// No more addresses to try, so propagate the exception. // No more addresses to try, so propagate the exception.
return kj::mv(exception); return kj::mv(exception);
......
...@@ -425,20 +425,12 @@ public: ...@@ -425,20 +425,12 @@ public:
// //
// `flags` is a bitwise-OR of the values of the `Flags` enum. // `flags` is a bitwise-OR of the values of the `Flags` enum.
#if _WIN32
virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd( virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(
Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0; Fd fd, const struct sockaddr* addr, uint addrlen, uint flags = 0) = 0;
#else // Create an AsyncIoStream wrapping a socket and initiate a connection to the given address.
virtual Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(Fd fd, uint flags = 0) = 0; // The returned promise does not resolve until connection has completed.
#endif
// Create an AsyncIoStream wrapping a socket that is in the process of connecting. The returned
// promise should not resolve until connection has completed -- traditionally indicated by the
// descriptor becoming writable.
// //
// `flags` is a bitwise-OR of the values of the `Flags` enum. // `flags` is a bitwise-OR of the values of the `Flags` enum.
//
// On Windows, the callee initiates connect rather than the caller.
// TODO(now): Maybe on all systems?
virtual Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) = 0; virtual Own<ConnectionReceiver> wrapListenSocketFd(Fd fd, uint flags = 0) = 0;
// Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already // Create an AsyncIoStream wrapping a listen socket file descriptor. This socket should already
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment