Commit 50b9a529 authored by Kenton Varda's avatar Kenton Varda

Refactor UnixEventPort interface to be edge-triggering-friendly.

parent d4dc32d6
...@@ -38,11 +38,7 @@ ...@@ -38,11 +38,7 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netdb.h> #include <netdb.h>
#include <set> #include <set>
#include <poll.h>
#ifndef POLLRDHUP
// Linux-only optimization. If not available, define to 0, as this will make it a no-op.
#define POLLRDHUP 0
#endif
namespace kj { namespace kj {
...@@ -113,7 +109,9 @@ private: ...@@ -113,7 +109,9 @@ private:
class AsyncStreamFd: public OwnedFileDescriptor, public AsyncIoStream { class AsyncStreamFd: public OwnedFileDescriptor, public AsyncIoStream {
public: public:
AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags) AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags)
: OwnedFileDescriptor(fd, flags), eventPort(eventPort) {} : OwnedFileDescriptor(fd, flags),
readObserver(eventPort, fd),
writeObserver(eventPort, fd) {}
virtual ~AsyncStreamFd() noexcept(false) {} virtual ~AsyncStreamFd() noexcept(false) {}
Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
...@@ -134,7 +132,17 @@ public: ...@@ -134,7 +132,17 @@ public:
Promise<void> write(const void* buffer, size_t size) override { Promise<void> write(const void* buffer, size_t size) override {
ssize_t writeResult; ssize_t writeResult;
KJ_NONBLOCKING_SYSCALL(writeResult = ::write(fd, buffer, size)) { KJ_NONBLOCKING_SYSCALL(writeResult = ::write(fd, buffer, size)) {
return READY_NOW; // Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
// a bug that exists in both Clang and GCC:
// http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
// http://llvm.org/bugs/show_bug.cgi?id=12286
goto error;
}
if (false) {
error:
return kj::READY_NOW;
} }
// A negative result means EAGAIN, which we can treat the same as having written zero bytes. // A negative result means EAGAIN, which we can treat the same as having written zero bytes.
...@@ -142,12 +150,14 @@ public: ...@@ -142,12 +150,14 @@ public:
if (n == size) { if (n == size) {
return READY_NOW; return READY_NOW;
} else {
buffer = reinterpret_cast<const byte*>(buffer) + n;
size -= n;
} }
return eventPort.onFdEvent(fd, POLLOUT).then([=](short) { // Fewer than `size` bytes were written, therefore we must be out of buffer space. Wait until
// the fd becomes writable again.
buffer = reinterpret_cast<const byte*>(buffer) + n;
size -= n;
return writeObserver.whenBecomesWritable().then([=]() {
return write(buffer, size); return write(buffer, size);
}); });
} }
...@@ -166,9 +176,32 @@ public: ...@@ -166,9 +176,32 @@ public:
KJ_SYSCALL(shutdown(fd, SHUT_WR)); KJ_SYSCALL(shutdown(fd, SHUT_WR));
} }
Promise<void> waitConnected() {
// Wait until initial connection has completed. This actually just waits until it is writable.
// Can't just go directly to writeObserver.whenBecomesWritable() because of edge triggering. We
// need to explicitly check if the socket is already connected.
struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = fd;
pollfd.events = POLLOUT;
int pollResult;
KJ_SYSCALL(pollResult = poll(&pollfd, 1, 0));
if (pollResult == 0) {
// Not ready yet. We can safely use the edge-triggered observer.
return readObserver.whenBecomesReadable();
} else {
// Ready now.
return kj::READY_NOW;
}
}
private: private:
UnixEventPort& eventPort; UnixEventPort::ReadObserver readObserver;
bool gotHup = false; UnixEventPort::WriteObserver writeObserver;
Promise<size_t> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes, Promise<size_t> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
size_t alreadyRead) { size_t alreadyRead) {
...@@ -178,44 +211,61 @@ private: ...@@ -178,44 +211,61 @@ private:
ssize_t n; ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) { KJ_NONBLOCKING_SYSCALL(n = ::read(fd, buffer, maxBytes)) {
// Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
// a bug that exists in both Clang and GCC:
// http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
// http://llvm.org/bugs/show_bug.cgi?id=12286
goto error;
}
if (false) {
error:
return alreadyRead; return alreadyRead;
} }
if (n < 0) { if (n < 0) {
// Read would block. // Read would block.
return eventPort.onFdEvent(fd, POLLIN | POLLRDHUP).then([=](short events) { return readObserver.whenBecomesReadable().then([=]() {
gotHup = events & (POLLHUP | POLLRDHUP);
return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead); return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead);
}); });
} else if (n == 0) { } else if (n == 0) {
// EOF -OR- maxBytes == 0. // EOF -OR- maxBytes == 0.
return alreadyRead; return alreadyRead;
} else if (implicitCast<size_t>(n) < minBytes) { } else if (implicitCast<size_t>(n) >= minBytes) {
// We read enough to stop here.
return alreadyRead + n;
} else {
// The kernel returned fewer bytes than we asked for (and fewer than we need). // The kernel returned fewer bytes than we asked for (and fewer than we need).
if (gotHup) {
// We've already received an indication that the next read() will return EOF, so there's buffer = reinterpret_cast<byte*>(buffer) + n;
// nothing to wait for. minBytes -= n;
return alreadyRead + n; maxBytes -= n;
alreadyRead += n;
KJ_IF_MAYBE(atEnd, readObserver.atEndHint()) {
if (*atEnd) {
// We've already received an indication that the next read() will return EOF, so there's
// nothing to wait for.
return alreadyRead;
} else {
// As of the last time the event queue was checked, the kernel reported that we were
// *not* at the end of the stream. It's unlikely that this has changed in the short time
// it took to handle the event, therefore calling read() now will almost certainly fail
// with EAGAIN. Moreover, since EOF had not been received as of the last check, we know
// that even if it was received since then, whenBecomesReadable() will catch that. So,
// let's go ahead and skip calling read() here and instead go straight to waiting for
// more input.
return readObserver.whenBecomesReadable().then([=]() {
return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead);
});
}
} else { } else {
// We know that calling read() again will simply fail with EAGAIN (unless a new packet just // The kernel has not indicated one way or the other whether we are likely to be at EOF.
// arrived, which is unlikely), so let's not bother to call read() again but instead just // In this case we *must* keep calling read() until we either get a return of zero or
// go strait to polling. // EAGAIN.
// return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead);
// Note: Actually, if we haven't done any polls yet, then we haven't had a chance to
// receive POLLRDHUP yet, so it's possible we're at EOF. But that seems like a
// sufficiently unusual case that we're better off skipping straight to polling here.
buffer = reinterpret_cast<byte*>(buffer) + n;
minBytes -= n;
maxBytes -= n;
alreadyRead += n;
return eventPort.onFdEvent(fd, POLLIN | POLLRDHUP).then([=](short events) {
gotHup = events & (POLLHUP | POLLRDHUP);
return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead);
});
} }
} else {
// We read enough to stop here.
return alreadyRead + n;
} }
} }
...@@ -252,9 +302,9 @@ private: ...@@ -252,9 +302,9 @@ private:
// Discard all data that was written, then issue a new write for what's left (if any). // Discard all data that was written, then issue a new write for what's left (if any).
for (;;) { for (;;) {
if (n < firstPiece.size()) { if (n < firstPiece.size()) {
// Only part of the first piece was consumed. Wait for POLLOUT and then write again. // Only part of the first piece was consumed. Wait for buffer space and then write again.
firstPiece = firstPiece.slice(n, firstPiece.size()); firstPiece = firstPiece.slice(n, firstPiece.size());
return eventPort.onFdEvent(fd, POLLOUT).then([=](short) { return writeObserver.whenBecomesWritable().then([=]() {
return writeInternal(firstPiece, morePieces); return writeInternal(firstPiece, morePieces);
}); });
} else if (morePieces.size() == 0) { } else if (morePieces.size() == 0) {
...@@ -681,7 +731,7 @@ Promise<Array<SocketAddress>> SocketAddress::lookupHost( ...@@ -681,7 +731,7 @@ Promise<Array<SocketAddress>> SocketAddress::lookupHost(
class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor { class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
public: public:
FdConnectionReceiver(UnixEventPort& eventPort, int fd, uint flags) FdConnectionReceiver(UnixEventPort& eventPort, int fd, uint flags)
: OwnedFileDescriptor(fd, flags), eventPort(eventPort) {} : OwnedFileDescriptor(fd, flags), eventPort(eventPort), observer(eventPort, fd) {}
Promise<Own<AsyncIoStream>> accept() override { Promise<Own<AsyncIoStream>> accept() override {
int newFd; int newFd;
...@@ -704,7 +754,7 @@ public: ...@@ -704,7 +754,7 @@ public:
case EWOULDBLOCK: case EWOULDBLOCK:
#endif #endif
// Not ready yet. // Not ready yet.
return eventPort.onFdEvent(fd, POLLIN).then([this](short) { return observer.whenBecomesReadable().then([this]() {
return accept(); return accept();
}); });
...@@ -735,6 +785,7 @@ public: ...@@ -735,6 +785,7 @@ public:
public: public:
UnixEventPort& eventPort; UnixEventPort& eventPort;
UnixEventPort::ReadObserver observer;
}; };
class TimerImpl final: public Timer { class TimerImpl final: public Timer {
...@@ -773,16 +824,17 @@ public: ...@@ -773,16 +824,17 @@ public:
} }
Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) override { Promise<Own<AsyncIoStream>> wrapConnectingSocketFd(int fd, uint flags = 0) override {
auto result = heap<AsyncStreamFd>(eventPort, fd, flags); auto result = heap<AsyncStreamFd>(eventPort, fd, flags);
return eventPort.onFdEvent(fd, POLLOUT).then(kj::mvCapture(result,
[fd](Own<AsyncIoStream>&& stream, short events) { auto connected = result->waitConnected();
int err; return connected.then(kj::mvCapture(result, [fd](Own<AsyncIoStream>&& stream) {
socklen_t errlen = sizeof(err); int err;
KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen)); socklen_t errlen = sizeof(err);
if (err != 0) { KJ_SYSCALL(getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen));
KJ_FAIL_SYSCALL("connect()", err) { break; } if (err != 0) {
} KJ_FAIL_SYSCALL("connect()", err) { break; }
return kj::mv(stream); }
})); return kj::mv(stream);
}));
} }
Own<ConnectionReceiver> wrapListenSocketFd(int fd, uint flags = 0) override { Own<ConnectionReceiver> wrapListenSocketFd(int fd, uint flags = 0) override {
return heap<FdConnectionReceiver>(eventPort, fd, flags); return heap<FdConnectionReceiver>(eventPort, fd, flags);
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "async-unix.h" #include "async-unix.h"
#include "thread.h" #include "thread.h"
#include "debug.h" #include "debug.h"
#include "io.h"
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -197,29 +198,49 @@ TEST_F(AsyncUnixTest, SignalsNoWait) { ...@@ -197,29 +198,49 @@ TEST_F(AsyncUnixTest, SignalsNoWait) {
#endif // !__CYGWIN32__ #endif // !__CYGWIN32__
TEST_F(AsyncUnixTest, Poll) { TEST_F(AsyncUnixTest, ReadObserver) {
UnixEventPort port; UnixEventPort port;
EventLoop loop(port); EventLoop loop(port);
WaitScope waitScope(loop); WaitScope waitScope(loop);
int pipefds[2]; int pipefds[2];
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
KJ_SYSCALL(pipe(pipefds)); KJ_SYSCALL(pipe(pipefds));
KJ_SYSCALL(write(pipefds[1], "foo", 3)); kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
UnixEventPort::ReadObserver readObserver(port, infd);
KJ_SYSCALL(write(outfd, "foo", 3));
readObserver.whenBecomesReadable().wait(waitScope);
#if __linux__ // platform known to support POLLRDHUP
EXPECT_FALSE(KJ_ASSERT_NONNULL(readObserver.atEndHint()));
EXPECT_EQ(POLLIN, port.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait(waitScope)); char buffer[4096];
ssize_t n;
KJ_SYSCALL(n = read(infd, &buffer, sizeof(buffer)));
EXPECT_EQ(3, n);
KJ_SYSCALL(write(outfd, "bar", 3));
outfd = nullptr;
readObserver.whenBecomesReadable().wait(waitScope);
EXPECT_TRUE(KJ_ASSERT_NONNULL(readObserver.atEndHint()));
#endif
} }
TEST_F(AsyncUnixTest, PollMultiListen) { TEST_F(AsyncUnixTest, ReadObserverMultiListen) {
UnixEventPort port; UnixEventPort port;
EventLoop loop(port); EventLoop loop(port);
WaitScope waitScope(loop); WaitScope waitScope(loop);
int bogusPipefds[2]; int bogusPipefds[2];
KJ_SYSCALL(pipe(bogusPipefds)); KJ_SYSCALL(pipe(bogusPipefds));
UnixEventPort::ReadObserver bogusReadObserver(port, bogusPipefds[0]);
KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); }); KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
port.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).then([](short s) { bogusReadObserver.whenBecomesReadable().then([]() {
ADD_FAILURE() << "Received wrong poll."; ADD_FAILURE() << "Received wrong poll.";
}).detach([](kj::Exception&& exception) { }).detach([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr(); ADD_FAILURE() << kj::str(exception).cStr();
...@@ -228,12 +249,14 @@ TEST_F(AsyncUnixTest, PollMultiListen) { ...@@ -228,12 +249,14 @@ TEST_F(AsyncUnixTest, PollMultiListen) {
int pipefds[2]; int pipefds[2];
KJ_SYSCALL(pipe(pipefds)); KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); }); KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
KJ_SYSCALL(write(pipefds[1], "foo", 3)); KJ_SYSCALL(write(pipefds[1], "foo", 3));
EXPECT_EQ(POLLIN, port.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait(waitScope)); readObserver.whenBecomesReadable().wait(waitScope);
} }
TEST_F(AsyncUnixTest, PollMultiReceive) { TEST_F(AsyncUnixTest, ReadObserverMultiReceive) {
UnixEventPort port; UnixEventPort port;
EventLoop loop(port); EventLoop loop(port);
WaitScope waitScope(loop); WaitScope waitScope(loop);
...@@ -241,36 +264,42 @@ TEST_F(AsyncUnixTest, PollMultiReceive) { ...@@ -241,36 +264,42 @@ TEST_F(AsyncUnixTest, PollMultiReceive) {
int pipefds[2]; int pipefds[2];
KJ_SYSCALL(pipe(pipefds)); KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); }); KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
KJ_SYSCALL(write(pipefds[1], "foo", 3)); KJ_SYSCALL(write(pipefds[1], "foo", 3));
int pipefds2[2]; int pipefds2[2];
KJ_SYSCALL(pipe(pipefds2)); KJ_SYSCALL(pipe(pipefds2));
KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); }); KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
UnixEventPort::ReadObserver readObserver2(port, pipefds2[0]);
KJ_SYSCALL(write(pipefds2[1], "bar", 3)); KJ_SYSCALL(write(pipefds2[1], "bar", 3));
EXPECT_EQ(POLLIN, port.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait(waitScope)); readObserver.whenBecomesReadable().wait(waitScope);
EXPECT_EQ(POLLIN, port.onFdEvent(pipefds2[0], POLLIN | POLLPRI).wait(waitScope)); readObserver2.whenBecomesReadable().wait(waitScope);
} }
TEST_F(AsyncUnixTest, PollAsync) { TEST_F(AsyncUnixTest, ReadObserverAsync) {
UnixEventPort port; UnixEventPort port;
EventLoop loop(port); EventLoop loop(port);
WaitScope waitScope(loop); WaitScope waitScope(loop);
// Make a pipe and wait on its read end while another thread writes to it. // Make a pipe and wait on its read end while another thread writes to it.
int pipefds[2]; int pipefds[2];
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
KJ_SYSCALL(pipe(pipefds)); KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
Thread thread([&]() { Thread thread([&]() {
delay(); delay();
KJ_SYSCALL(write(pipefds[1], "foo", 3)); KJ_SYSCALL(write(pipefds[1], "foo", 3));
}); });
// Wait for the event in this thread. // Wait for the event in this thread.
EXPECT_EQ(POLLIN, port.onFdEvent(pipefds[0], POLLIN | POLLPRI).wait(waitScope)); readObserver.whenBecomesReadable().wait(waitScope);
} }
TEST_F(AsyncUnixTest, PollNoWait) { TEST_F(AsyncUnixTest, ReadObserverNoWait) {
// Verify that UnixEventPort::poll() correctly receives pending FD events. // Verify that UnixEventPort::poll() correctly receives pending FD events.
UnixEventPort port; UnixEventPort port;
...@@ -280,19 +309,19 @@ TEST_F(AsyncUnixTest, PollNoWait) { ...@@ -280,19 +309,19 @@ TEST_F(AsyncUnixTest, PollNoWait) {
int pipefds[2]; int pipefds[2];
KJ_SYSCALL(pipe(pipefds)); KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); }); KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
int pipefds2[2]; int pipefds2[2];
KJ_SYSCALL(pipe(pipefds2)); KJ_SYSCALL(pipe(pipefds2));
KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); }); KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
UnixEventPort::ReadObserver readObserver2(port, pipefds2[0]);
int receivedCount = 0; int receivedCount = 0;
port.onFdEvent(pipefds[0], POLLIN | POLLPRI).then([&](short&& events) { readObserver.whenBecomesReadable().then([&]() {
receivedCount++; receivedCount++;
EXPECT_EQ(POLLIN, events);
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); }); }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
port.onFdEvent(pipefds2[0], POLLIN | POLLPRI).then([&](short&& events) { readObserver2.whenBecomesReadable().then([&]() {
receivedCount++; receivedCount++;
EXPECT_EQ(POLLIN, events);
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); }); }).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
KJ_SYSCALL(write(pipefds[1], "foo", 3)); KJ_SYSCALL(write(pipefds[1], "foo", 3));
...@@ -313,6 +342,52 @@ TEST_F(AsyncUnixTest, PollNoWait) { ...@@ -313,6 +342,52 @@ TEST_F(AsyncUnixTest, PollNoWait) {
EXPECT_EQ(2, receivedCount); EXPECT_EQ(2, receivedCount);
} }
static void setNonblocking(int fd) {
int flags;
KJ_SYSCALL(flags = fcntl(fd, F_GETFL));
if ((flags & O_NONBLOCK) == 0) {
KJ_SYSCALL(fcntl(fd, F_SETFL, flags | O_NONBLOCK));
}
}
TEST_F(AsyncUnixTest, WriteObserver) {
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
setNonblocking(outfd);
UnixEventPort::WriteObserver writeObserver(port, outfd);
// Fill buffer.
ssize_t n;
do {
KJ_NONBLOCKING_SYSCALL(n = write(outfd, "foo", 3));
} while (n >= 0);
bool writable = false;
auto promise = writeObserver.whenBecomesWritable()
.then([&]() { writable = true; }).eagerlyEvaluate(nullptr);
loop.run();
port.poll();
loop.run();
EXPECT_FALSE(writable);
char buffer[4096];
KJ_SYSCALL(read(infd, &buffer, sizeof(buffer)));
loop.run();
port.poll();
loop.run();
EXPECT_TRUE(writable);
}
TEST_F(AsyncUnixTest, SteadyTimers) { TEST_F(AsyncUnixTest, SteadyTimers) {
UnixEventPort port; UnixEventPort port;
EventLoop loop(port); EventLoop loop(port);
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include <limits> #include <limits>
#include <set> #include <set>
#include <chrono> #include <chrono>
#include <poll.h>
namespace kj { namespace kj {
...@@ -135,9 +136,11 @@ public: ...@@ -135,9 +136,11 @@ public:
class UnixEventPort::PollPromiseAdapter { class UnixEventPort::PollPromiseAdapter {
public: public:
inline PollPromiseAdapter(PromiseFulfiller<short>& fulfiller, inline PollPromiseAdapter(PromiseFulfiller<void>& fulfiller,
UnixEventPort& loop, int fd, short eventMask) UnixEventPort& loop, int fd, short eventMask,
: loop(loop), fd(fd), eventMask(eventMask), fulfiller(fulfiller) { Maybe<UnixEventPort::ReadObserver&> readObserver)
: loop(loop), fd(fd), eventMask(eventMask),
fulfiller(fulfiller), readObserver(readObserver) {
prev = loop.pollTail; prev = loop.pollTail;
*loop.pollTail = this; *loop.pollTail = this;
loop.pollTail = &next; loop.pollTail = &next;
...@@ -154,6 +157,24 @@ public: ...@@ -154,6 +157,24 @@ public:
} }
} }
void fire(short events) {
KJ_IF_MAYBE(ro, readObserver) {
#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif
if (events & (POLLHUP | POLLRDHUP)) {
ro->atEnd = true;
#if POLLRDHUP
} else {
// Since POLLRDHUP exists on this platform, and we didn't receive it, we know that we're not
// at the end.
ro->atEnd = false;
#endif
}
}
fulfiller.fulfill();
}
void removeFromList() { void removeFromList() {
if (next == nullptr) { if (next == nullptr) {
loop.pollTail = prev; loop.pollTail = prev;
...@@ -168,7 +189,8 @@ public: ...@@ -168,7 +189,8 @@ public:
UnixEventPort& loop; UnixEventPort& loop;
int fd; int fd;
short eventMask; short eventMask;
PromiseFulfiller<short>& fulfiller; PromiseFulfiller<void>& fulfiller;
Maybe<UnixEventPort::ReadObserver&> readObserver; // to fill in atEnd hint.
PollPromiseAdapter* next = nullptr; PollPromiseAdapter* next = nullptr;
PollPromiseAdapter** prev = nullptr; PollPromiseAdapter** prev = nullptr;
}; };
...@@ -211,8 +233,12 @@ UnixEventPort::UnixEventPort() ...@@ -211,8 +233,12 @@ UnixEventPort::UnixEventPort()
UnixEventPort::~UnixEventPort() noexcept(false) {} UnixEventPort::~UnixEventPort() noexcept(false) {}
Promise<short> UnixEventPort::onFdEvent(int fd, short eventMask) { Promise<void> UnixEventPort::ReadObserver::whenBecomesReadable() {
return newAdaptedPromise<short, PollPromiseAdapter>(*this, fd, eventMask); return newAdaptedPromise<void, PollPromiseAdapter>(eventPort, fd, POLLIN | POLLRDHUP, *this);
}
Promise<void> UnixEventPort::WriteObserver::whenBecomesWritable() {
return newAdaptedPromise<void, PollPromiseAdapter>(eventPort, fd, POLLOUT, nullptr);
} }
Promise<siginfo_t> UnixEventPort::onSignal(int signum) { Promise<siginfo_t> UnixEventPort::onSignal(int signum) {
...@@ -273,7 +299,7 @@ public: ...@@ -273,7 +299,7 @@ public:
for (auto i: indices(pollfds)) { for (auto i: indices(pollfds)) {
if (pollfds[i].revents != 0) { if (pollfds[i].revents != 0) {
pollEvents[i]->fulfiller.fulfill(kj::mv(pollfds[i].revents)); pollEvents[i]->fire(pollfds[i].revents);
pollEvents[i]->removeFromList(); pollEvents[i]->removeFromList();
if (--pollResult <= 0) { if (--pollResult <= 0) {
break; break;
......
...@@ -30,14 +30,11 @@ ...@@ -30,14 +30,11 @@
#include "time.h" #include "time.h"
#include "vector.h" #include "vector.h"
#include <signal.h> #include <signal.h>
#include <poll.h>
#include <pthread.h> #include <pthread.h>
namespace kj { namespace kj {
class UnixEventPort: public EventPort { class UnixEventPort: public EventPort {
// THIS INTERFACE IS LIKELY TO CHANGE; consider using only what is defined in async-io.h instead.
//
// An EventPort implementation which can wait for events on file descriptors as well as signals. // An EventPort implementation which can wait for events on file descriptors as well as signals.
// This API only makes sense on Unix. // This API only makes sense on Unix.
// //
...@@ -54,9 +51,9 @@ public: ...@@ -54,9 +51,9 @@ public:
UnixEventPort(); UnixEventPort();
~UnixEventPort() noexcept(false); ~UnixEventPort() noexcept(false);
Promise<short> onFdEvent(int fd, short eventMask); class ReadObserver;
// `eventMask` is a bitwise-OR of poll events (e.g. `POLLIN`, `POLLOUT`, etc.). The next time class WriteObserver;
// one or more of the given events occurs on `fd`, the set of events that occurred are returned. // Classes that watch an fd for readability or writability. See definitions below.
Promise<siginfo_t> onSignal(int signum); Promise<siginfo_t> onSignal(int signum);
// When the given signal is delivered to this thread, return the corresponding siginfo_t. // When the given signal is delivered to this thread, return the corresponding siginfo_t.
...@@ -94,6 +91,9 @@ public: ...@@ -94,6 +91,9 @@ public:
void poll() override; void poll() override;
private: private:
#if KJ_USE_EPOLL
// TODO(soon): epoll implementation
#else
class PollPromiseAdapter; class PollPromiseAdapter;
class SignalPromiseAdapter; class SignalPromiseAdapter;
class TimerPromiseAdapter; class TimerPromiseAdapter;
...@@ -114,6 +114,132 @@ private: ...@@ -114,6 +114,132 @@ private:
void processTimers(); void processTimers();
friend class TimerPromiseAdapter; friend class TimerPromiseAdapter;
#endif
};
class UnixEventPort::ReadObserver {
// Object which watches a file descriptor to determine when it is readable.
//
// For listen sockets, "readable" means that there is a connection to accept(). For everything
// else, it means that read() (or recv()) will return data.
//
// The presence of out-of-band data should NOT fire this event. However, the event may
// occasionally fire spurriously (when there is actually no data to read), and one thing that can
// cause such spurrious events is the arrival of OOB data on certain platforms whose event
// interfaces fail to distinguish between regular and OOB data (e.g. Mac OSX).
//
// WARNING: The exact behavior of this class differs across systems, since event interfaces
// vary wildly. Be sure to read the documentation carefully and avoid depending on unspecified
// behavior. If at all possible, use the higher-level AsyncInputStream interface instead.
public:
ReadObserver(UnixEventPort& eventPort, int fd): eventPort(eventPort), fd(fd) {}
// Begin watching the given file descriptor for readability. Only one ReadObserver may exist
// for a given file descriptor at a time.
KJ_DISALLOW_COPY(ReadObserver);
Promise<void> whenBecomesReadable();
// Resolves the next time the file descriptor transitions from having no data to read to having
// some data to read.
//
// KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error
// to call this method when there is already data in the read buffer which has been there since
// prior to the last turn of the event loop or prior to creation FdWatcher. In this case, it is
// unspecified whether the promise will ever resolve -- it depends on the underlying event
// mechanism being used.
//
// In order to avoid this problem, make sure that you only call `whenBecomesReadable()`
// only at times when you know the buffer is empty. You know this for sure when one of the
// following happens:
// * read() or recv() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode
// enabled on the fd!)
// * The file descriptor is a regular byte-oriented object (like a socket or pipe),
// read() or recv() returns fewer than the number of bytes requested, and `atEndHint()`
// returns false. This can only happen if the buffer is empty but EOF is not reached. (Note,
// though, that for record-oriented file descriptors like Linux's inotify interface, this
// rule does not hold, because it could simply be that the next record did not fit into the
// space available.)
//
// It is an error to call `whenBecomesReadable()` again when the promise returned previously
// has not yet resolved. If you do this, the previous promise may throw an exception.
inline Maybe<bool> atEndHint() { return atEnd; }
// Returns true if the event system has indicated that EOF has been received. There may still
// be data in the read buffer, but once that is gone, there's nothing left.
//
// Returns false if the event system has indicated that EOF had NOT been received as of the
// last turn of the event loop.
//
// Returns nullptr if the event system does not know whether EOF has been reached. In this
// case, the only way to know for sure is to call read() or recv() and check if it returns
// zero.
//
// This hint may be useful as an optimization to avoid an unnecessary system call.
private:
UnixEventPort& eventPort;
int fd;
#if KJ_USE_EPOLL
// TODO(soon): epoll implementation
Own<PromiseFulfiller<void>> fulfiller;
// Replaced each time `whenBecomesReadable()` is called.
#else
#endif
Maybe<bool> atEnd;
friend class UnixEventPort;
};
class UnixEventPort::WriteObserver {
// Object which watches a file descriptor to determine when it is writable.
//
// WARNING: The exact behavior of this class differs across systems, since event interfaces
// vary wildly. Be sure to read the documentation carefully and avoid depending on unspecified
// behavior. If at all possible, use the higher-level AsyncOutputStream interface instead.
public:
WriteObserver(UnixEventPort& eventPort, int fd): eventPort(eventPort), fd(fd) {}
// Begin watching the given file descriptor for writability. Only one WriteObserver may exist
// for a given file descriptor at a time.
KJ_DISALLOW_COPY(WriteObserver);
Promise<void> whenBecomesWritable();
// Resolves the next time the file descriptor transitions from having no space available in the
// write buffer to having some space available.
//
// KJ uses "edge-triggered" event notification whenever possible. As a result, it is an error
// to call this method when there is already space in the write buffer which has been there
// since prior to the last turn of the event loop or prior to creation FdWatcher. In this case,
// it is unspecified whether the promise will ever resolve -- it depends on the underlying
// event mechanism being used.
//
// In order to avoid this problem, make sure that you only call `whenBecomesWritable()`
// only at times when you know the buffer is full. You know this for sure when one of the
// following happens:
// * write() or send() fails with EAGAIN or EWOULDBLOCK. (You MUST have non-blocking mode
// enabled on the fd!)
// * write() or send() succeeds but accepts fewer than the number of bytes provided. This can
// only happen if the buffer is full.
//
// It is an error to call `whenBecomesWritable()` again when the promise returned previously
// has not yet resolved. If you do this, the previous promise may throw an exception.
private:
UnixEventPort& eventPort;
int fd;
#if KJ_USE_EPOLL
Own<PromiseFulfiller<void>> fulfiller;
// Replaced each time `whenBecomesReadable()` is called.
#else
#endif
friend class UnixEventPort;
}; };
} // namespace kj } // namespace kj
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment