Commit c50dd602 authored by Kenton Varda's avatar Kenton Varda

Use edge-triggered epoll on Linux.

parent 50b9a529
......@@ -110,8 +110,7 @@ class AsyncStreamFd: public OwnedFileDescriptor, public AsyncIoStream {
public:
AsyncStreamFd(UnixEventPort& eventPort, int fd, uint flags)
: OwnedFileDescriptor(fd, flags),
readObserver(eventPort, fd),
writeObserver(eventPort, fd) {}
observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ_WRITE) {}
virtual ~AsyncStreamFd() noexcept(false) {}
Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
......@@ -157,7 +156,7 @@ public:
buffer = reinterpret_cast<const byte*>(buffer) + n;
size -= n;
return writeObserver.whenBecomesWritable().then([=]() {
return observer.whenBecomesWritable().then([=]() {
return write(buffer, size);
});
}
......@@ -192,7 +191,7 @@ public:
if (pollResult == 0) {
// Not ready yet. We can safely use the edge-triggered observer.
return readObserver.whenBecomesReadable();
return observer.whenBecomesReadable();
} else {
// Ready now.
return kj::READY_NOW;
......@@ -200,8 +199,7 @@ public:
}
private:
UnixEventPort::ReadObserver readObserver;
UnixEventPort::WriteObserver writeObserver;
UnixEventPort::FdObserver observer;
Promise<size_t> tryReadInternal(void* buffer, size_t minBytes, size_t maxBytes,
size_t alreadyRead) {
......@@ -226,7 +224,7 @@ private:
if (n < 0) {
// Read would block.
return readObserver.whenBecomesReadable().then([=]() {
return observer.whenBecomesReadable().then([=]() {
return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead);
});
} else if (n == 0) {
......@@ -243,7 +241,7 @@ private:
maxBytes -= n;
alreadyRead += n;
KJ_IF_MAYBE(atEnd, readObserver.atEndHint()) {
KJ_IF_MAYBE(atEnd, observer.atEndHint()) {
if (*atEnd) {
// We've already received an indication that the next read() will return EOF, so there's
// nothing to wait for.
......@@ -256,7 +254,7 @@ private:
// that even if it was received since then, whenBecomesReadable() will catch that. So,
// let's go ahead and skip calling read() here and instead go straight to waiting for
// more input.
return readObserver.whenBecomesReadable().then([=]() {
return observer.whenBecomesReadable().then([=]() {
return tryReadInternal(buffer, minBytes, maxBytes, alreadyRead);
});
}
......@@ -304,7 +302,7 @@ private:
if (n < firstPiece.size()) {
// Only part of the first piece was consumed. Wait for buffer space and then write again.
firstPiece = firstPiece.slice(n, firstPiece.size());
return writeObserver.whenBecomesWritable().then([=]() {
return observer.whenBecomesWritable().then([=]() {
return writeInternal(firstPiece, morePieces);
});
} else if (morePieces.size() == 0) {
......@@ -731,7 +729,8 @@ Promise<Array<SocketAddress>> SocketAddress::lookupHost(
class FdConnectionReceiver final: public ConnectionReceiver, public OwnedFileDescriptor {
public:
FdConnectionReceiver(UnixEventPort& eventPort, int fd, uint flags)
: OwnedFileDescriptor(fd, flags), eventPort(eventPort), observer(eventPort, fd) {}
: OwnedFileDescriptor(fd, flags), eventPort(eventPort),
observer(eventPort, fd, UnixEventPort::FdObserver::OBSERVE_READ) {}
Promise<Own<AsyncIoStream>> accept() override {
int newFd;
......@@ -785,7 +784,7 @@ public:
public:
UnixEventPort& eventPort;
UnixEventPort::ReadObserver observer;
UnixEventPort::FdObserver observer;
};
class TimerImpl final: public Timer {
......
......@@ -86,6 +86,27 @@ TEST_F(AsyncUnixTest, SignalWithValue) {
EXPECT_SI_CODE(SI_QUEUE, info.si_code);
EXPECT_EQ(123, info.si_value.sival_int);
}
TEST_F(AsyncUnixTest, SignalWithPointerValue) {
// This tests that if we use sigqueue() to attach a value to the signal, that value is received
// correctly. Note that this only works on platforms that support real-time signals -- even
// though the signal we're sending is SIGURG, the sigqueue() system call is introduced by RT
// signals. Hence this test won't run on e.g. Mac OSX.
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
union sigval value;
memset(&value, 0, sizeof(value));
value.sival_ptr = &port;
sigqueue(getpid(), SIGURG, value);
siginfo_t info = port.onSignal(SIGURG).wait(waitScope);
EXPECT_EQ(SIGURG, info.si_signo);
EXPECT_SI_CODE(SI_QUEUE, info.si_code);
EXPECT_EQ(&port, info.si_value.sival_ptr);
}
#endif
TEST_F(AsyncUnixTest, SignalsMultiListen) {
......@@ -207,14 +228,14 @@ TEST_F(AsyncUnixTest, ReadObserver) {
KJ_SYSCALL(pipe(pipefds));
kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
UnixEventPort::ReadObserver readObserver(port, infd);
UnixEventPort::FdObserver observer(port, infd, UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(outfd, "foo", 3));
readObserver.whenBecomesReadable().wait(waitScope);
observer.whenBecomesReadable().wait(waitScope);
#if __linux__ // platform known to support POLLRDHUP
EXPECT_FALSE(KJ_ASSERT_NONNULL(readObserver.atEndHint()));
EXPECT_FALSE(KJ_ASSERT_NONNULL(observer.atEndHint()));
char buffer[4096];
ssize_t n;
......@@ -224,9 +245,9 @@ TEST_F(AsyncUnixTest, ReadObserver) {
KJ_SYSCALL(write(outfd, "bar", 3));
outfd = nullptr;
readObserver.whenBecomesReadable().wait(waitScope);
observer.whenBecomesReadable().wait(waitScope);
EXPECT_TRUE(KJ_ASSERT_NONNULL(readObserver.atEndHint()));
EXPECT_TRUE(KJ_ASSERT_NONNULL(observer.atEndHint()));
#endif
}
......@@ -237,10 +258,12 @@ TEST_F(AsyncUnixTest, ReadObserverMultiListen) {
int bogusPipefds[2];
KJ_SYSCALL(pipe(bogusPipefds));
UnixEventPort::ReadObserver bogusReadObserver(port, bogusPipefds[0]);
KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
bogusReadObserver.whenBecomesReadable().then([]() {
UnixEventPort::FdObserver bogusObserver(port, bogusPipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
bogusObserver.whenBecomesReadable().then([]() {
ADD_FAILURE() << "Received wrong poll.";
}).detach([](kj::Exception&& exception) {
ADD_FAILURE() << kj::str(exception).cStr();
......@@ -250,10 +273,11 @@ TEST_F(AsyncUnixTest, ReadObserverMultiListen) {
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(pipefds[1], "foo", 3));
readObserver.whenBecomesReadable().wait(waitScope);
observer.whenBecomesReadable().wait(waitScope);
}
TEST_F(AsyncUnixTest, ReadObserverMultiReceive) {
......@@ -265,18 +289,22 @@ TEST_F(AsyncUnixTest, ReadObserverMultiReceive) {
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(pipefds[1], "foo", 3));
int pipefds2[2];
KJ_SYSCALL(pipe(pipefds2));
KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
UnixEventPort::ReadObserver readObserver2(port, pipefds2[0]);
UnixEventPort::FdObserver observer2(port, pipefds2[0],
UnixEventPort::FdObserver::OBSERVE_READ);
KJ_SYSCALL(write(pipefds2[1], "bar", 3));
readObserver.whenBecomesReadable().wait(waitScope);
readObserver2.whenBecomesReadable().wait(waitScope);
auto promise1 = observer.whenBecomesReadable();
auto promise2 = observer2.whenBecomesReadable();
promise1.wait(waitScope);
promise2.wait(waitScope);
}
TEST_F(AsyncUnixTest, ReadObserverAsync) {
......@@ -288,7 +316,8 @@ TEST_F(AsyncUnixTest, ReadObserverAsync) {
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
Thread thread([&]() {
delay();
......@@ -296,7 +325,7 @@ TEST_F(AsyncUnixTest, ReadObserverAsync) {
});
// Wait for the event in this thread.
readObserver.whenBecomesReadable().wait(waitScope);
observer.whenBecomesReadable().wait(waitScope);
}
TEST_F(AsyncUnixTest, ReadObserverNoWait) {
......@@ -309,18 +338,20 @@ TEST_F(AsyncUnixTest, ReadObserverNoWait) {
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
UnixEventPort::ReadObserver readObserver(port, pipefds[0]);
UnixEventPort::FdObserver observer(port, pipefds[0],
UnixEventPort::FdObserver::OBSERVE_READ);
int pipefds2[2];
KJ_SYSCALL(pipe(pipefds2));
KJ_DEFER({ close(pipefds2[1]); close(pipefds2[0]); });
UnixEventPort::ReadObserver readObserver2(port, pipefds2[0]);
UnixEventPort::FdObserver observer2(port, pipefds2[0],
UnixEventPort::FdObserver::OBSERVE_READ);
int receivedCount = 0;
readObserver.whenBecomesReadable().then([&]() {
observer.whenBecomesReadable().then([&]() {
receivedCount++;
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
readObserver2.whenBecomesReadable().then([&]() {
observer2.whenBecomesReadable().then([&]() {
receivedCount++;
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
......@@ -360,7 +391,7 @@ TEST_F(AsyncUnixTest, WriteObserver) {
kj::AutoCloseFd infd(pipefds[0]), outfd(pipefds[1]);
setNonblocking(outfd);
UnixEventPort::WriteObserver writeObserver(port, outfd);
UnixEventPort::FdObserver observer(port, outfd, UnixEventPort::FdObserver::OBSERVE_WRITE);
// Fill buffer.
ssize_t n;
......@@ -369,7 +400,7 @@ TEST_F(AsyncUnixTest, WriteObserver) {
} while (n >= 0);
bool writable = false;
auto promise = writeObserver.whenBecomesWritable()
auto promise = observer.whenBecomesWritable()
.then([&]() { writable = true; }).eagerlyEvaluate(nullptr);
loop.run();
......
......@@ -28,11 +28,82 @@
#include <limits>
#include <set>
#include <chrono>
#include <pthread.h>
#if KJ_USE_EPOLL
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/eventfd.h>
#else
#include <poll.h>
#endif
namespace kj {
// =======================================================================================
// Timer code common to multiple implementations
struct UnixEventPort::TimerSet {
struct TimerBefore {
bool operator()(TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs);
};
using Timers = std::multiset<TimerPromiseAdapter*, TimerBefore>;
Timers timers;
};
class UnixEventPort::TimerPromiseAdapter {
public:
TimerPromiseAdapter(PromiseFulfiller<void>& fulfiller, UnixEventPort& port, TimePoint time)
: time(time), fulfiller(fulfiller), port(port) {
pos = port.timers->timers.insert(this);
}
~TimerPromiseAdapter() {
if (pos != port.timers->timers.end()) {
port.timers->timers.erase(pos);
}
}
void fulfill() {
fulfiller.fulfill();
port.timers->timers.erase(pos);
pos = port.timers->timers.end();
}
const TimePoint time;
PromiseFulfiller<void>& fulfiller;
UnixEventPort& port;
TimerSet::Timers::const_iterator pos;
};
bool UnixEventPort::TimerSet::TimerBefore::operator()(
TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs) {
return lhs->time < rhs->time;
}
Promise<void> UnixEventPort::atSteadyTime(TimePoint time) {
return newAdaptedPromise<void, TimerPromiseAdapter>(*this, time);
}
TimePoint UnixEventPort::currentSteadyTime() {
return origin<TimePoint>() + std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count() * NANOSECONDS;
}
void UnixEventPort::processTimers() {
frozenSteadyTime = currentSteadyTime();
for (;;) {
auto front = timers->timers.begin();
if (front == timers->timers.end() || (*front)->time > frozenSteadyTime) {
break;
}
(*front)->fulfill();
}
}
// =======================================================================================
// Signal code common to multiple implementations
namespace {
......@@ -58,41 +129,31 @@ void registerSignalHandler(int signum) {
tooLateToSetReserved = true;
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, signum);
sigprocmask(SIG_BLOCK, &mask, nullptr);
KJ_SYSCALL(sigemptyset(&mask));
KJ_SYSCALL(sigaddset(&mask, signum));
KJ_SYSCALL(sigprocmask(SIG_BLOCK, &mask, nullptr));
#if !KJ_USE_EPOLL // on Linux we'll use signalfd
struct sigaction action;
memset(&action, 0, sizeof(action));
action.sa_sigaction = &signalHandler;
sigfillset(&action.sa_mask);
KJ_SYSCALL(sigfillset(&action.sa_mask));
action.sa_flags = SA_SIGINFO;
sigaction(signum, &action, nullptr);
KJ_SYSCALL(sigaction(signum, &action, nullptr));
#endif
}
void registerReservedSignal() {
registerSignalHandler(reservedSignal);
// We also disable SIGPIPE because users of UnixEventLoop almost certainly don't want it.
signal(SIGPIPE, SIG_IGN);
KJ_SYSCALL(signal(SIGPIPE, SIG_IGN));
}
pthread_once_t registerReservedSignalOnce = PTHREAD_ONCE_INIT;
} // namespace
// =======================================================================================
struct UnixEventPort::TimerSet {
struct TimerBefore {
bool operator()(TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs);
};
using Timers = std::multiset<TimerPromiseAdapter*, TimerBefore>;
Timers timers;
};
// =======================================================================================
class UnixEventPort::SignalPromiseAdapter {
public:
inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
......@@ -134,97 +195,327 @@ public:
SignalPromiseAdapter** prev = nullptr;
};
class UnixEventPort::PollPromiseAdapter {
public:
inline PollPromiseAdapter(PromiseFulfiller<void>& fulfiller,
UnixEventPort& loop, int fd, short eventMask,
Maybe<UnixEventPort::ReadObserver&> readObserver)
: loop(loop), fd(fd), eventMask(eventMask),
fulfiller(fulfiller), readObserver(readObserver) {
prev = loop.pollTail;
*loop.pollTail = this;
loop.pollTail = &next;
Promise<siginfo_t> UnixEventPort::onSignal(int signum) {
return newAdaptedPromise<siginfo_t, SignalPromiseAdapter>(*this, signum);
}
void UnixEventPort::captureSignal(int signum) {
if (reservedSignal == SIGUSR1) {
KJ_REQUIRE(signum != SIGUSR1,
"Sorry, SIGUSR1 is reserved by the UnixEventPort implementation. You may call "
"UnixEventPort::setReservedSignal() to reserve a different signal.");
} else {
KJ_REQUIRE(signum != reservedSignal,
"Can't capture signal reserved using setReservedSignal().", signum);
}
registerSignalHandler(signum);
}
~PollPromiseAdapter() noexcept(false) {
if (prev != nullptr) {
if (next == nullptr) {
loop.pollTail = prev;
} else {
next->prev = prev;
}
*prev = next;
void UnixEventPort::setReservedSignal(int signum) {
KJ_REQUIRE(!tooLateToSetReserved,
"setReservedSignal() must be called before any calls to `captureSignal()` and "
"before any `UnixEventPort` is constructed.");
if (reservedSignal != SIGUSR1 && reservedSignal != signum) {
KJ_FAIL_REQUIRE("Detected multiple conflicting calls to setReservedSignal(). Please only "
"call this once, or always call it with the same signal number.");
}
reservedSignal = signum;
}
void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
// Fire any events waiting on this signal.
auto ptr = signalHead;
while (ptr != nullptr) {
if (ptr->signum == siginfo.si_signo) {
ptr->fulfiller.fulfill(kj::cp(siginfo));
ptr = ptr->removeFromList();
} else {
ptr = ptr->next;
}
}
}
void fire(short events) {
KJ_IF_MAYBE(ro, readObserver) {
#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif
if (events & (POLLHUP | POLLRDHUP)) {
ro->atEnd = true;
#if POLLRDHUP
} else {
// Since POLLRDHUP exists on this platform, and we didn't receive it, we know that we're not
// at the end.
ro->atEnd = false;
#endif
}
#if KJ_USE_EPOLL
// =======================================================================================
// epoll FdObserver implementation
UnixEventPort::UnixEventPort()
: timers(kj::heap<TimerSet>()),
frozenSteadyTime(currentSteadyTime()),
epollFd(-1),
signalFd(-1),
eventFd(-1) {
pthread_once(&registerReservedSignalOnce, &registerReservedSignal);
int fd;
KJ_SYSCALL(fd = epoll_create1(EPOLL_CLOEXEC));
epollFd = AutoCloseFd(fd);
KJ_SYSCALL(sigemptyset(&signalFdSigset));
KJ_SYSCALL(fd = signalfd(-1, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
signalFd = AutoCloseFd(fd);
KJ_SYSCALL(fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
eventFd = AutoCloseFd(fd);
struct epoll_event event;
memset(&event, 0, sizeof(event));
event.events = EPOLLIN;
event.data.u64 = 0;
KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, signalFd, &event));
event.data.u64 = 1;
KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event));
}
UnixEventPort::~UnixEventPort() noexcept(false) {}
UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
: eventPort(eventPort), fd(fd), flags(flags) {
struct epoll_event event;
memset(&event, 0, sizeof(event));
if (flags & OBSERVE_READ) {
event.events |= EPOLLIN | EPOLLRDHUP;
}
if (flags & OBSERVE_WRITE) {
event.events |= EPOLLOUT;
}
event.events |= EPOLLET; // Set edge-triggered mode.
event.data.ptr = this;
KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_ADD, fd, &event));
}
UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_DEL, fd, nullptr));
}
void UnixEventPort::FdObserver::fire(short events) {
if (events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR)) {
if (events & (EPOLLHUP | EPOLLRDHUP)) {
atEnd = true;
} else {
// Since we didn't receive EPOLLRDHUP, we know that we're not at the end.
atEnd = false;
}
KJ_IF_MAYBE(f, readFulfiller) {
f->get()->fulfill();
readFulfiller = nullptr;
}
fulfiller.fulfill();
}
void removeFromList() {
if (next == nullptr) {
loop.pollTail = prev;
if (events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) {
KJ_IF_MAYBE(f, writeFulfiller) {
f->get()->fulfill();
writeFulfiller = nullptr;
}
}
}
Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");
auto paf = newPromiseAndFulfiller<void>();
readFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");
auto paf = newPromiseAndFulfiller<void>();
writeFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
void UnixEventPort::wait() {
// epoll_wait()'s timeout is an `int` count of milliseconds, so truncate to that.
// Also, make sure that we aren't within a millisecond of overflowing a `Duration` since that
// will break the math below.
constexpr Duration MAX_TIMEOUT =
min(int(maxValue) * MILLISECONDS, Duration(maxValue) - MILLISECONDS);
int epollTimeout = -1;
auto timer = timers->timers.begin();
if (timer != timers->timers.end()) {
Duration timeout = (*timer)->time - currentSteadyTime();
if (timeout < 0 * SECONDS) {
epollTimeout = 0;
} else if (timeout < MAX_TIMEOUT) {
// Round up to the next millisecond
epollTimeout = (timeout + 1 * MILLISECONDS - unit<Duration>()) / MILLISECONDS;
} else {
next->prev = prev;
epollTimeout = MAX_TIMEOUT / MILLISECONDS;
}
*prev = next;
next = nullptr;
prev = nullptr;
}
UnixEventPort& loop;
int fd;
short eventMask;
PromiseFulfiller<void>& fulfiller;
Maybe<UnixEventPort::ReadObserver&> readObserver; // to fill in atEnd hint.
PollPromiseAdapter* next = nullptr;
PollPromiseAdapter** prev = nullptr;
};
doEpollWait(epollTimeout);
}
class UnixEventPort::TimerPromiseAdapter {
public:
TimerPromiseAdapter(PromiseFulfiller<void>& fulfiller, UnixEventPort& port, TimePoint time)
: time(time), fulfiller(fulfiller), port(port) {
pos = port.timers->timers.insert(this);
void UnixEventPort::poll() {
doEpollWait(0);
}
static siginfo_t toRegularSiginfo(const struct signalfd_siginfo& siginfo) {
// Unfortunately, siginfo_t is mostly a big union and the correct set of fields to fill in
// depends on the type of signal. OTOH, signalfd_siginfo is a flat struct that expands all
// siginfo_t's union fields out to be non-overlapping. We can't just copy all the fields over
// because of the unions; we have to carefully figure out which fields are appropriate to fill
// in for this signal. Ick.
siginfo_t result;
memset(&result, 0, sizeof(result));
result.si_signo = siginfo.ssi_signo;
result.si_errno = siginfo.ssi_errno;
result.si_code = siginfo.ssi_code;
if (siginfo.ssi_code > 0) {
// Signal originated from the kernel. The structure of the siginfo depends primarily on the
// signal number.
switch (siginfo.ssi_signo) {
case SIGCHLD:
result.si_pid = siginfo.ssi_pid;
result.si_uid = siginfo.ssi_uid;
result.si_status = siginfo.ssi_status;
result.si_utime = siginfo.ssi_utime;
result.si_stime = siginfo.ssi_stime;
break;
case SIGILL:
case SIGFPE:
case SIGSEGV:
case SIGBUS:
case SIGTRAP:
result.si_addr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_addr));
#ifdef si_trapno
result.si_trapno = siginfo.ssi_trapno;
#endif
// ssi_addr_lsb is defined as coming immediately after ssi_addr in the kernel headers but
// apparently the userspace headers were never updated. So we do a pointer hack. :(
result.si_addr_lsb = *reinterpret_cast<const uint16_t*>(&siginfo.ssi_addr + 1);
break;
case SIGIO:
static_assert(SIGIO == SIGPOLL, "SIGIO != SIGPOLL?");
// Note: Technically, code can arrange for SIGIO signals to be delivered with a signal number
// other than SIGIO. AFAICT there is no way for us to detect this in the siginfo. Luckily
// SIGIO is totally obsoleted by epoll so it shouldn't come up.
result.si_band = siginfo.ssi_band;
result.si_fd = siginfo.ssi_fd;
break;
case SIGSYS:
// Apparently SIGSYS's fields are not available in signalfd_siginfo?
break;
}
} else {
// Signal originated from userspace. The sender could specify whatever signal number they
// wanted. The structure of the signal is determined by the API they used, which is identified
// by SI_CODE.
switch (siginfo.ssi_code) {
case SI_USER:
case SI_TKILL:
// kill(), tkill(), or tgkill().
result.si_pid = siginfo.ssi_pid;
result.si_uid = siginfo.ssi_uid;
break;
case SI_QUEUE:
case SI_MESGQ:
case SI_ASYNCIO:
default:
result.si_pid = siginfo.ssi_pid;
result.si_uid = siginfo.ssi_uid;
// This is awkward. In siginfo_t, si_ptr and si_int are in a union together. In
// signalfd_siginfo, they are not. We don't really know whether the app intended to send
// an int or a pointer. Presumably since the pointer is always larger than the int, if
// we write the pointer, we'll end up with the right value for the int? Presumably the
// two fields of signalfd_siginfo are actually extracted from one of these unions
// originally, so actually contain redundant data? Better write some tests...
result.si_ptr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_ptr));
break;
case SI_TIMER:
result.si_timerid = siginfo.ssi_tid;
result.si_overrun = siginfo.ssi_overrun;
// Again with this weirdness...
result.si_ptr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_ptr));
break;
}
}
~TimerPromiseAdapter() {
if (pos != port.timers->timers.end()) {
port.timers->timers.erase(pos);
return result;
}
void UnixEventPort::doEpollWait(int timeout) {
sigset_t newMask;
sigemptyset(&newMask);
{
auto ptr = signalHead;
while (ptr != nullptr) {
sigaddset(&newMask, ptr->signum);
ptr = ptr->next;
}
}
void fulfill() {
fulfiller.fulfill();
port.timers->timers.erase(pos);
pos = port.timers->timers.end();
if (memcmp(&newMask, &signalFdSigset, sizeof(newMask)) != 0) {
// Apparently we're not waiting on the same signals as last time. Need to update the signal
// FD's mask.
signalFdSigset = newMask;
KJ_SYSCALL(signalfd(signalFd, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
}
const TimePoint time;
PromiseFulfiller<void>& fulfiller;
UnixEventPort& port;
TimerSet::Timers::const_iterator pos;
};
struct epoll_event events[16];
int n;
KJ_SYSCALL(n = epoll_wait(epollFd, events, kj::size(events), timeout));
bool UnixEventPort::TimerSet::TimerBefore::operator()(
TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs) {
return lhs->time < rhs->time;
for (int i = 0; i < n; i++) {
if (events[i].data.u64 == 0) {
for (;;) {
struct signalfd_siginfo siginfo;
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = read(signalFd, &siginfo, sizeof(siginfo)));
if (n < 0) break; // no more signals
KJ_ASSERT(n == sizeof(siginfo));
gotSignal(toRegularSiginfo(siginfo));
}
} else if (events[i].data.u64 == 1) {
// Someone wanted to wake up this thread. Read and discard the event.
uint64_t value;
ssize_t n;
KJ_NONBLOCKING_SYSCALL(n = read(eventFd, &value, sizeof(value)));
KJ_ASSERT(n < 0 || n == sizeof(value));
} else {
FdObserver* observer = reinterpret_cast<FdObserver*>(events[i].data.ptr);
observer->fire(events[i].events);
}
}
processTimers();
}
#else // KJ_USE_EPOLL
// =======================================================================================
// Traditional poll() FdObserver implementation.
#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif
UnixEventPort::UnixEventPort()
: timers(kj::heap<TimerSet>()),
frozenSteadyTime(currentSteadyTime()) {
......@@ -233,49 +524,101 @@ UnixEventPort::UnixEventPort()
UnixEventPort::~UnixEventPort() noexcept(false) {}
Promise<void> UnixEventPort::ReadObserver::whenBecomesReadable() {
return newAdaptedPromise<void, PollPromiseAdapter>(eventPort, fd, POLLIN | POLLRDHUP, *this);
UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
: eventPort(eventPort), fd(fd), flags(flags), next(nullptr), prev(nullptr) {}
UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
if (prev != nullptr) {
if (next == nullptr) {
eventPort.observersTail = prev;
} else {
next->prev = prev;
}
*prev = next;
}
}
Promise<void> UnixEventPort::WriteObserver::whenBecomesWritable() {
return newAdaptedPromise<void, PollPromiseAdapter>(eventPort, fd, POLLOUT, nullptr);
void UnixEventPort::FdObserver::fire(short events) {
if (events & (POLLIN | POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) {
if (events & (POLLHUP | POLLRDHUP)) {
atEnd = true;
#if POLLRDHUP
} else {
// Since POLLRDHUP exists on this platform, and we didn't receive it, we know that we're not
// at the end.
atEnd = false;
#endif
}
KJ_IF_MAYBE(f, readFulfiller) {
f->get()->fulfill();
readFulfiller = nullptr;
}
}
if (events & (POLLOUT | POLLHUP | POLLERR | POLLNVAL)) {
KJ_IF_MAYBE(f, writeFulfiller) {
f->get()->fulfill();
writeFulfiller = nullptr;
}
}
if (readFulfiller == nullptr && writeFulfiller == nullptr) {
// Remove from list.
if (next == nullptr) {
eventPort.observersTail = prev;
} else {
next->prev = prev;
}
*prev = next;
next = nullptr;
prev = nullptr;
}
}
Promise<siginfo_t> UnixEventPort::onSignal(int signum) {
return newAdaptedPromise<siginfo_t, SignalPromiseAdapter>(*this, signum);
short UnixEventPort::FdObserver::getEventMask() {
return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) |
(writeFulfiller == nullptr ? 0 : POLLOUT);
}
void UnixEventPort::captureSignal(int signum) {
if (reservedSignal == SIGUSR1) {
KJ_REQUIRE(signum != SIGUSR1,
"Sorry, SIGUSR1 is reserved by the UnixEventPort implementation. You may call "
"UnixEventPort::setReservedSignal() to reserve a different signal.");
} else {
KJ_REQUIRE(signum != reservedSignal,
"Can't capture signal reserved using setReservedSignal().", signum);
Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");
if (prev == nullptr) {
KJ_DASSERT(next == nullptr);
prev = eventPort.observersTail;
*prev = this;
eventPort.observersTail = &next;
}
registerSignalHandler(signum);
auto paf = newPromiseAndFulfiller<void>();
readFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
void UnixEventPort::setReservedSignal(int signum) {
KJ_REQUIRE(!tooLateToSetReserved,
"setReservedSignal() must be called before any calls to `captureSignal()` and "
"before any `UnixEventPort` is constructed.");
if (reservedSignal != SIGUSR1 && reservedSignal != signum) {
KJ_FAIL_REQUIRE("Detected multiple conflicting calls to setReservedSignal(). Please only "
"call this once, or always call it with the same signal number.");
Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");
if (prev == nullptr) {
KJ_DASSERT(next == nullptr);
prev = eventPort.observersTail;
*prev = this;
eventPort.observersTail = &next;
}
reservedSignal = signum;
auto paf = newPromiseAndFulfiller<void>();
writeFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
class UnixEventPort::PollContext {
public:
PollContext(PollPromiseAdapter* ptr) {
PollContext(FdObserver* ptr) {
while (ptr != nullptr) {
struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = ptr->fd;
pollfd.events = ptr->eventMask;
pollfd.events = ptr->getEventMask();
pollfds.add(pollfd);
pollEvents.add(ptr);
ptr = ptr->next;
......@@ -300,7 +643,6 @@ public:
for (auto i: indices(pollfds)) {
if (pollfds[i].revents != 0) {
pollEvents[i]->fire(pollfds[i].revents);
pollEvents[i]->removeFromList();
if (--pollResult <= 0) {
break;
}
......@@ -310,15 +652,11 @@ public:
private:
kj::Vector<struct pollfd> pollfds;
kj::Vector<PollPromiseAdapter*> pollEvents;
kj::Vector<FdObserver*> pollEvents;
int pollResult = 0;
int pollError = 0;
};
Promise<void> UnixEventPort::atSteadyTime(TimePoint time) {
return newAdaptedPromise<void, TimerPromiseAdapter>(*this, time);
}
void UnixEventPort::wait() {
sigset_t newMask;
sigemptyset(&newMask);
......@@ -332,7 +670,7 @@ void UnixEventPort::wait() {
}
}
PollContext pollContext(pollHead);
PollContext pollContext(observersHead);
// Capture signals.
SignalCapture capture;
......@@ -422,40 +760,13 @@ void UnixEventPort::poll() {
}
{
PollContext pollContext(pollHead);
PollContext pollContext(observersHead);
pollContext.run(0);
pollContext.processResults();
}
processTimers();
}
void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
// Fire any events waiting on this signal.
auto ptr = signalHead;
while (ptr != nullptr) {
if (ptr->signum == siginfo.si_signo) {
ptr->fulfiller.fulfill(kj::cp(siginfo));
ptr = ptr->removeFromList();
} else {
ptr = ptr->next;
}
}
}
TimePoint UnixEventPort::currentSteadyTime() {
return origin<TimePoint>() + std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count() * NANOSECONDS;
}
void UnixEventPort::processTimers() {
frozenSteadyTime = currentSteadyTime();
for (;;) {
auto front = timers->timers.begin();
if (front == timers->timers.end() || (*front)->time > frozenSteadyTime) {
break;
}
(*front)->fulfill();
}
}
#endif // KJ_USE_EPOLL, else
} // namespace kj
......@@ -29,8 +29,12 @@
#include "async.h"
#include "time.h"
#include "vector.h"
#include "io.h"
#include <signal.h>
#include <pthread.h>
#if __linux__ && !defined(KJ_USE_EPOLL)
#define KJ_USE_EPOLL 1
#endif
namespace kj {
......@@ -51,9 +55,8 @@ public:
UnixEventPort();
~UnixEventPort() noexcept(false);
class ReadObserver;
class WriteObserver;
// Classes that watch an fd for readability or writability. See definitions below.
class FdObserver;
// Class that watches an fd for readability or writability. See definition below.
Promise<siginfo_t> onSignal(int signum);
// When the given signal is delivered to this thread, return the corresponding siginfo_t.
......@@ -91,34 +94,43 @@ public:
void poll() override;
private:
#if KJ_USE_EPOLL
// TODO(soon): epoll implementation
#else
class PollPromiseAdapter;
class SignalPromiseAdapter;
class TimerPromiseAdapter;
class PollContext;
struct TimerSet; // Defined in source file to avoid STL include.
class TimerPromiseAdapter;
class SignalPromiseAdapter;
PollPromiseAdapter* pollHead = nullptr;
PollPromiseAdapter** pollTail = &pollHead;
SignalPromiseAdapter* signalHead = nullptr;
SignalPromiseAdapter** signalTail = &signalHead;
Own<TimerSet> timers;
TimePoint frozenSteadyTime;
void gotSignal(const siginfo_t& siginfo);
SignalPromiseAdapter* signalHead = nullptr;
SignalPromiseAdapter** signalTail = &signalHead;
TimePoint currentSteadyTime();
void processTimers();
void gotSignal(const siginfo_t& siginfo);
friend class TimerPromiseAdapter;
#if KJ_USE_EPOLL
AutoCloseFd epollFd;
AutoCloseFd signalFd;
AutoCloseFd eventFd; // Used for cross-thread wakeups.
sigset_t signalFdSigset;
// Signal mask as currently set on the signalFd. Tracked so we can detect whether or not it
// needs updating.
void doEpollWait(int timeout);
#else
class PollContext;
FdObserver* observersHead = nullptr;
FdObserver** observersTail = &observersHead;
#endif
};
class UnixEventPort::ReadObserver {
// Object which watches a file descriptor to determine when it is readable.
class UnixEventPort::FdObserver {
// Object which watches a file descriptor to determine when it is readable or writable.
//
// For listen sockets, "readable" means that there is a connection to accept(). For everything
// else, it means that read() (or recv()) will return data.
......@@ -133,11 +145,19 @@ class UnixEventPort::ReadObserver {
// behavior. If at all possible, use the higher-level AsyncInputStream interface instead.
public:
ReadObserver(UnixEventPort& eventPort, int fd): eventPort(eventPort), fd(fd) {}
enum Flags {
OBSERVE_READ = 1,
OBSERVE_WRITE = 2,
OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE
};
FdObserver(UnixEventPort& eventPort, int fd, uint flags);
// Begin watching the given file descriptor for readability. Only one ReadObserver may exist
// for a given file descriptor at a time.
KJ_DISALLOW_COPY(ReadObserver);
~FdObserver() noexcept(false);
KJ_DISALLOW_COPY(FdObserver);
Promise<void> whenBecomesReadable();
// Resolves the next time the file descriptor transitions from having no data to read to having
......@@ -177,37 +197,6 @@ public:
//
// This hint may be useful as an optimization to avoid an unnecessary system call.
private:
UnixEventPort& eventPort;
int fd;
#if KJ_USE_EPOLL
// TODO(soon): epoll implementation
Own<PromiseFulfiller<void>> fulfiller;
// Replaced each time `whenBecomesReadable()` is called.
#else
#endif
Maybe<bool> atEnd;
friend class UnixEventPort;
};
class UnixEventPort::WriteObserver {
// Object which watches a file descriptor to determine when it is writable.
//
// WARNING: The exact behavior of this class differs across systems, since event interfaces
// vary wildly. Be sure to read the documentation carefully and avoid depending on unspecified
// behavior. If at all possible, use the higher-level AsyncOutputStream interface instead.
public:
WriteObserver(UnixEventPort& eventPort, int fd): eventPort(eventPort), fd(fd) {}
// Begin watching the given file descriptor for writability. Only one WriteObserver may exist
// for a given file descriptor at a time.
KJ_DISALLOW_COPY(WriteObserver);
Promise<void> whenBecomesWritable();
// Resolves the next time the file descriptor transitions from having no space available in the
// write buffer to having some space available.
......@@ -232,11 +221,24 @@ public:
private:
UnixEventPort& eventPort;
int fd;
uint flags;
#if KJ_USE_EPOLL
Own<PromiseFulfiller<void>> fulfiller;
// Replaced each time `whenBecomesReadable()` is called.
#else
kj::Maybe<Own<PromiseFulfiller<void>>> readFulfiller;
kj::Maybe<Own<PromiseFulfiller<void>>> writeFulfiller;
// Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to
// null every time an event is fired.
Maybe<bool> atEnd;
void fire(short events);
#if !KJ_USE_EPOLL
FdObserver* next;
FdObserver** prev;
// Linked list of observers which currently have a non-null readFulfiller or writeFulfiller.
// If `prev` is null then the observer is not currently in the list.
short getEventMask();
#endif
friend class UnixEventPort;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment