// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#if !_WIN32

#include "async-unix.h"
#include "debug.h"
#include "threadlocal.h"
#include <setjmp.h>
#include <errno.h>
#include <inttypes.h>
#include <limits>
#include <chrono>
#include <pthread.h>
#include <map>
#include <sys/wait.h>

#if KJ_USE_EPOLL
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <sys/eventfd.h>
#else
#include <poll.h>
#endif

namespace kj {

// =======================================================================================
// Timer code common to multiple implementations

TimePoint UnixEventPort::readClock() {
  return origin<TimePoint>() + std::chrono::duration_cast<std::chrono::nanoseconds>(
      std::chrono::steady_clock::now().time_since_epoch()).count() * NANOSECONDS;
}

// =======================================================================================
// Signal code common to multiple implementations

namespace {

int reservedSignal = SIGUSR1;
bool tooLateToSetReserved = false;
bool capturedChildExit = false;
bool threadClaimedChildExits = false;

struct SignalCapture {
  sigjmp_buf jumpTo;
  siginfo_t siginfo;
};

#if !KJ_USE_EPOLL  // on Linux we'll use signalfd
KJ_THREADLOCAL_PTR(SignalCapture) threadCapture = nullptr;

void signalHandler(int, siginfo_t* siginfo, void*) {
  SignalCapture* capture = threadCapture;
  if (capture != nullptr) {
    capture->siginfo = *siginfo;
    siglongjmp(capture->jumpTo, 1);
  }
}
#endif

void registerSignalHandler(int signum) {
  tooLateToSetReserved = true;

  sigset_t mask;
  KJ_SYSCALL(sigemptyset(&mask));
  KJ_SYSCALL(sigaddset(&mask, signum));
  KJ_SYSCALL(sigprocmask(SIG_BLOCK, &mask, nullptr));

#if !KJ_USE_EPOLL  // on Linux we'll use signalfd
  struct sigaction action;
  memset(&action, 0, sizeof(action));
  action.sa_sigaction = &signalHandler;
  KJ_SYSCALL(sigfillset(&action.sa_mask));
  action.sa_flags = SA_SIGINFO;
  KJ_SYSCALL(sigaction(signum, &action, nullptr));
#endif
}
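// Illustrative only: a minimal sketch of how these handlers end up being exercised by
// callers, assuming the usual kj setup of an event loop driven by this port. The names
// `port`, `loop`, and `waitScope` are hypothetical locals, not part of this file.
//
//   kj::UnixEventPort::captureSignal(SIGINT);  // installs the handler / blocks the signal
//   kj::UnixEventPort port;
//   kj::EventLoop loop(port);
//   kj::WaitScope waitScope(loop);
//   siginfo_t info = port.onSignal(SIGINT).wait(waitScope);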
void registerReservedSignal() {
  registerSignalHandler(reservedSignal);

  // We also disable SIGPIPE because users of UnixEventPort almost certainly don't want it.
  while (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
    int error = errno;
    if (error != EINTR) {
      KJ_FAIL_SYSCALL("signal(SIGPIPE, SIG_IGN)", error);
    }
  }
}

pthread_once_t registerReservedSignalOnce = PTHREAD_ONCE_INIT;

}  // namespace

struct UnixEventPort::ChildSet {
  std::map<pid_t, ChildExitPromiseAdapter*> waiters;

  void checkExits();
};

class UnixEventPort::ChildExitPromiseAdapter {
public:
  inline ChildExitPromiseAdapter(PromiseFulfiller<int>& fulfiller,
                                 ChildSet& childSet, Maybe<pid_t>& pidRef)
      : childSet(childSet),
        pid(KJ_REQUIRE_NONNULL(pidRef,
            "`pid` must be non-null at the time `onChildExit()` is called")),
        pidRef(pidRef), fulfiller(fulfiller) {
    KJ_REQUIRE(childSet.waiters.insert(std::make_pair(pid, this)).second,
        "already called onChildExit() for this pid");
  }

  ~ChildExitPromiseAdapter() noexcept(false) {
    childSet.waiters.erase(pid);
  }

  ChildSet& childSet;
  pid_t pid;
  Maybe<pid_t>& pidRef;
  PromiseFulfiller<int>& fulfiller;
};

void UnixEventPort::ChildSet::checkExits() {
  for (;;) {
    int status;
    pid_t pid;
    KJ_SYSCALL_HANDLE_ERRORS(pid = waitpid(-1, &status, WNOHANG)) {
      case ECHILD:
        return;
      default:
        KJ_FAIL_SYSCALL("waitpid()", error);
    }
    if (pid == 0) break;

    auto iter = waiters.find(pid);
    if (iter != waiters.end()) {
      iter->second->pidRef = nullptr;
      iter->second->fulfiller.fulfill(kj::cp(status));
    }
  }
}

Promise<int> UnixEventPort::onChildExit(Maybe<pid_t>& pid) {
  KJ_REQUIRE(capturedChildExit,
      "must call UnixEventPort::captureChildExit() to use onChildExit().");

  ChildSet* cs;
  KJ_IF_MAYBE(c, childSet) {
    cs = *c;
  } else {
    // In theory we should do an atomic compare-and-swap on threadClaimedChildExits, but this
    // is for debug purposes only so it's not a big deal.
    KJ_REQUIRE(!threadClaimedChildExits,
        "only one UnixEventPort per process may listen for child exits");
    threadClaimedChildExits = true;

    auto newChildSet = kj::heap<ChildSet>();
    cs = newChildSet;
    childSet = kj::mv(newChildSet);
  }

  return kj::newAdaptedPromise<int, ChildExitPromiseAdapter>(*cs, pid);
}

void UnixEventPort::captureChildExit() {
  captureSignal(SIGCHLD);
  capturedChildExit = true;
}

class UnixEventPort::SignalPromiseAdapter {
public:
  inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
                              UnixEventPort& loop, int signum)
      : loop(loop), signum(signum), fulfiller(fulfiller) {
    prev = loop.signalTail;
    *loop.signalTail = this;
    loop.signalTail = &next;
  }

  ~SignalPromiseAdapter() noexcept(false) {
    if (prev != nullptr) {
      if (next == nullptr) {
        loop.signalTail = prev;
      } else {
        next->prev = prev;
      }
      *prev = next;
    }
  }

  SignalPromiseAdapter* removeFromList() {
    auto result = next;
    if (next == nullptr) {
      loop.signalTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
    next = nullptr;
    prev = nullptr;
    return result;
  }

  UnixEventPort& loop;
  int signum;
  PromiseFulfiller<siginfo_t>& fulfiller;
  SignalPromiseAdapter* next = nullptr;
  SignalPromiseAdapter** prev = nullptr;
};

Promise<siginfo_t> UnixEventPort::onSignal(int signum) {
  KJ_REQUIRE(signum != SIGCHLD || !capturedChildExit,
      "can't call onSignal(SIGCHLD) when kj::UnixEventPort::captureChildExit() has been called");

  return newAdaptedPromise<siginfo_t, SignalPromiseAdapter>(*this, signum);
}
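// Illustrative only: a hedged sketch of the intended onChildExit() flow. The fork()/exec()
// details are the caller's responsibility and are not part of this file; `port` and
// `waitScope` are hypothetical.
//
//   kj::UnixEventPort::captureChildExit();   // must happen before using onChildExit()
//   pid_t child = fork();
//   if (child == 0) { /* ... exec() in the child ... */ }
//   kj::Maybe<pid_t> pid = child;
//   int status = port.onChildExit(pid).wait(waitScope);
//   // `pid` is nulled out once the child has been reaped (see checkExits() above), so the
//   // caller must not waitpid() on it separately while the promise is outstanding.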
void UnixEventPort::captureSignal(int signum) {
  if (reservedSignal == SIGUSR1) {
    KJ_REQUIRE(signum != SIGUSR1,
               "Sorry, SIGUSR1 is reserved by the UnixEventPort implementation. You may call "
               "UnixEventPort::setReservedSignal() to reserve a different signal.");
  } else {
    KJ_REQUIRE(signum != reservedSignal,
               "Can't capture signal reserved using setReservedSignal().", signum);
  }
  registerSignalHandler(signum);
}

void UnixEventPort::setReservedSignal(int signum) {
  KJ_REQUIRE(!tooLateToSetReserved,
             "setReservedSignal() must be called before any calls to `captureSignal()` and "
             "before any `UnixEventPort` is constructed.");
  if (reservedSignal != SIGUSR1 && reservedSignal != signum) {
    KJ_FAIL_REQUIRE("Detected multiple conflicting calls to setReservedSignal(). Please only "
                    "call this once, or always call it with the same signal number.");
  }
  reservedSignal = signum;
}

void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
  // If onChildExit() has been called and this is SIGCHLD, check for child exits.
  KJ_IF_MAYBE(cs, childSet) {
    if (siginfo.si_signo == SIGCHLD) {
      cs->get()->checkExits();
      return;
    }
  }

  // Fire any events waiting on this signal.
  auto ptr = signalHead;
  while (ptr != nullptr) {
    if (ptr->signum == siginfo.si_signo) {
      ptr->fulfiller.fulfill(kj::cp(siginfo));
      ptr = ptr->removeFromList();
    } else {
      ptr = ptr->next;
    }
  }
}

#if KJ_USE_EPOLL
// =======================================================================================
// epoll FdObserver implementation

UnixEventPort::UnixEventPort()
    : timerImpl(readClock()),
      epollFd(-1),
      signalFd(-1),
      eventFd(-1) {
  pthread_once(&registerReservedSignalOnce, &registerReservedSignal);

  int fd;
  KJ_SYSCALL(fd = epoll_create1(EPOLL_CLOEXEC));
  epollFd = AutoCloseFd(fd);

  KJ_SYSCALL(sigemptyset(&signalFdSigset));
  KJ_SYSCALL(fd = signalfd(-1, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
  signalFd = AutoCloseFd(fd);

  KJ_SYSCALL(fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
  eventFd = AutoCloseFd(fd);

  struct epoll_event event;
  memset(&event, 0, sizeof(event));
  event.events = EPOLLIN;
  event.data.u64 = 0;
  KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, signalFd, &event));
  event.data.u64 = 1;
  KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event));
}

UnixEventPort::~UnixEventPort() noexcept(false) {
  if (childSet != nullptr) {
    // We had claimed the exclusive right to call onChildExit(). Release that right.
    threadClaimedChildExits = false;
  }
}

UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
    : eventPort(eventPort), fd(fd), flags(flags) {
  struct epoll_event event;
  memset(&event, 0, sizeof(event));

  if (flags & OBSERVE_READ) {
    event.events |= EPOLLIN | EPOLLRDHUP;
  }
  if (flags & OBSERVE_WRITE) {
    event.events |= EPOLLOUT;
  }
  if (flags & OBSERVE_URGENT) {
    event.events |= EPOLLPRI;
  }
  event.events |= EPOLLET;  // Set edge-triggered mode.

  event.data.ptr = this;

  KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_ADD, fd, &event));
}

UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
  KJ_SYSCALL(epoll_ctl(eventPort.epollFd, EPOLL_CTL_DEL, fd, nullptr)) { break; }
}
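// Because the registration above uses EPOLLET (edge-triggered mode), an observer only learns
// about *transitions* to readiness. Illustrative only: a hedged sketch of the drain loop a
// caller is expected to run before waiting again (`observer`, `fd`, `buf`, and `waitScope`
// are hypothetical):
//
//   for (;;) {
//     ssize_t n = read(fd, buf, sizeof(buf));
//     if (n < 0 && errno == EAGAIN) break;  // drained; safe to wait for the next edge
//     // ... handle data or EOF ...
//   }
//   observer.whenBecomesReadable().wait(waitScope);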
void UnixEventPort::FdObserver::fire(short events) {
  if (events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR)) {
    if (events & (EPOLLHUP | EPOLLRDHUP)) {
      atEnd = true;
    } else {
      // Since we didn't receive EPOLLRDHUP, we know that we're not at the end.
      atEnd = false;
    }

    KJ_IF_MAYBE(f, readFulfiller) {
      f->get()->fulfill();
      readFulfiller = nullptr;
    }
  }

  if (events & (EPOLLOUT | EPOLLHUP | EPOLLERR)) {
    KJ_IF_MAYBE(f, writeFulfiller) {
      f->get()->fulfill();
      writeFulfiller = nullptr;
    }
  }

  if (events & (EPOLLHUP | EPOLLERR)) {
    KJ_IF_MAYBE(f, hupFulfiller) {
      f->get()->fulfill();
      hupFulfiller = nullptr;
    }
  }

  if (events & EPOLLPRI) {
    KJ_IF_MAYBE(f, urgentFulfiller) {
      f->get()->fulfill();
      urgentFulfiller = nullptr;
    }
  }
}

Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
  KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");

  auto paf = newPromiseAndFulfiller<void>();
  readFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
  KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");

  auto paf = newPromiseAndFulfiller<void>();
  writeFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
  KJ_REQUIRE(flags & OBSERVE_URGENT,
      "FdObserver was not set to observe availability of urgent data.");

  auto paf = newPromiseAndFulfiller<void>();
  urgentFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
  auto paf = newPromiseAndFulfiller<void>();
  hupFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

bool UnixEventPort::wait() {
  return doEpollWait(
      timerImpl.timeoutToNextEvent(readClock(), MILLISECONDS, int(maxValue))
          .map([](uint64_t t) -> int { return t; })
          .orDefault(-1));
}

bool UnixEventPort::poll() {
  return doEpollWait(0);
}

void UnixEventPort::wake() const {
  uint64_t one = 1;
  ssize_t n;
  KJ_NONBLOCKING_SYSCALL(n = write(eventFd, &one, sizeof(one)));
  KJ_ASSERT(n < 0 || n == sizeof(one));
}
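// Illustrative only: wake() above relies on eventfd semantics -- each write() adds to a
// 64-bit kernel counter and makes the fd readable, so concurrent wake() calls from several
// threads safely coalesce into a single wakeup that doEpollWait() consumes with one read().
// A hedged sketch of the cross-thread handoff (`port` is hypothetical):
//
//   // Thread B:                 // Thread A (event loop):
//   port.wake();                 //   port.wait() returns true once the eventfd
//                                //   event is consumed in doEpollWait().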
static siginfo_t toRegularSiginfo(const struct signalfd_siginfo& siginfo) {
  // Unfortunately, siginfo_t is mostly a big union and the correct set of fields to fill in
  // depends on the type of signal. OTOH, signalfd_siginfo is a flat struct that expands all
  // siginfo_t's union fields out to be non-overlapping. We can't just copy all the fields
  // over because of the unions; we have to carefully figure out which fields are appropriate
  // to fill in for this signal. Ick.

  siginfo_t result;
  memset(&result, 0, sizeof(result));
  result.si_signo = siginfo.ssi_signo;
  result.si_errno = siginfo.ssi_errno;
  result.si_code = siginfo.ssi_code;

  if (siginfo.ssi_code > 0) {
    // Signal originated from the kernel. The structure of the siginfo depends primarily on
    // the signal number.
    switch (siginfo.ssi_signo) {
      case SIGCHLD:
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;
        result.si_status = siginfo.ssi_status;
        result.si_utime = siginfo.ssi_utime;
        result.si_stime = siginfo.ssi_stime;
        break;

      case SIGILL:
      case SIGFPE:
      case SIGSEGV:
      case SIGBUS:
      case SIGTRAP:
        result.si_addr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_addr));
#ifdef si_trapno
        result.si_trapno = siginfo.ssi_trapno;
#endif
#ifdef si_addr_lsb
        // ssi_addr_lsb is defined as coming immediately after ssi_addr in the kernel headers
        // but apparently the userspace headers were never updated. So we do a pointer
        // hack. :(
        result.si_addr_lsb = *reinterpret_cast<const uint16_t*>(&siginfo.ssi_addr + 1);
#endif
        break;

      case SIGIO:
        static_assert(SIGIO == SIGPOLL, "SIGIO != SIGPOLL?");

        // Note: Technically, code can arrange for SIGIO signals to be delivered with a signal
        // number other than SIGIO. AFAICT there is no way for us to detect this in the
        // siginfo. Luckily SIGIO is totally obsoleted by epoll so it shouldn't come up.

        result.si_band = siginfo.ssi_band;
        result.si_fd = siginfo.ssi_fd;
        break;

      case SIGSYS:
        // Apparently SIGSYS's fields are not available in signalfd_siginfo?
        break;
    }
  } else {
    // Signal originated from userspace. The sender could specify whatever signal number they
    // wanted. The structure of the signal is determined by the API they used, which is
    // identified by SI_CODE.
    switch (siginfo.ssi_code) {
      case SI_USER:
      case SI_TKILL:
        // kill(), tkill(), or tgkill().
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;
        break;

      case SI_QUEUE:
      case SI_MESGQ:
      case SI_ASYNCIO:
      default:
        result.si_pid = siginfo.ssi_pid;
        result.si_uid = siginfo.ssi_uid;

        // This is awkward. In siginfo_t, si_ptr and si_int are in a union together. In
        // signalfd_siginfo, they are not. We don't really know whether the app intended to
        // send an int or a pointer. Presumably since the pointer is always larger than the
        // int, if we write the pointer, we'll end up with the right value for the int?
        // Presumably the two fields of signalfd_siginfo are actually extracted from one of
        // these unions originally, so actually contain redundant data? Better write some
        // tests...
        //
        // Making matters even stranger, siginfo.ssi_ptr is 64-bit even on 32-bit systems,
        // and it appears that instead of doing the obvious thing by casting the pointer
        // value to 64 bits, the kernel actually memcpy()s the 32-bit value into the 64-bit
        // space. As a result, on big-endian 32-bit systems, the original pointer value ends
        // up in the *upper* 32 bits of siginfo.ssi_ptr, which is totally weird. We play
        // along and use a memcpy() on our end too, to get the right result on all platforms.
        memcpy(&result.si_ptr, &siginfo.ssi_ptr, sizeof(result.si_ptr));
        break;

      case SI_TIMER:
        result.si_timerid = siginfo.ssi_tid;
        result.si_overrun = siginfo.ssi_overrun;

        // Again with this weirdness...
        result.si_ptr = reinterpret_cast<void*>(static_cast<uintptr_t>(siginfo.ssi_ptr));
        break;
    }
  }

  return result;
}
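// Illustrative only: the si_ptr/si_int union handling above can be exercised with
// sigqueue(). A hedged sketch, not a test shipped in this file:
//
//   union sigval value;
//   value.sival_int = 42;
//   sigqueue(getpid(), SIGUSR2, value);
//   // The siginfo_t delivered via onSignal(SIGUSR2) should then carry
//   // si_code == SI_QUEUE and si_value.sival_int == 42, including on big-endian
//   // 32-bit platforms, thanks to the memcpy() above.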
bool UnixEventPort::doEpollWait(int timeout) {
  sigset_t newMask;
  sigemptyset(&newMask);

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      sigaddset(&newMask, ptr->signum);
      ptr = ptr->next;
    }
    if (childSet != nullptr) {
      sigaddset(&newMask, SIGCHLD);
    }
  }

  if (memcmp(&newMask, &signalFdSigset, sizeof(newMask)) != 0) {
    // Apparently we're not waiting on the same signals as last time. Need to update the
    // signal FD's mask.
    signalFdSigset = newMask;
    KJ_SYSCALL(signalfd(signalFd, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
  }

  struct epoll_event events[16];
  int n = epoll_wait(epollFd, events, kj::size(events), timeout);
  if (n < 0) {
    int error = errno;
    if (error == EINTR) {
      // We can't simply restart the epoll call because we need to recompute the timeout.
      // Instead, we pretend epoll_wait() returned zero events. This will cause the event
      // loop to spin once, decide it has nothing to do, recompute timeouts, then return to
      // waiting.
      n = 0;
    } else {
      KJ_FAIL_SYSCALL("epoll_wait()", error);
    }
  }

  bool woken = false;

  for (int i = 0; i < n; i++) {
    if (events[i].data.u64 == 0) {
      for (;;) {
        struct signalfd_siginfo siginfo;
        ssize_t n;
        KJ_NONBLOCKING_SYSCALL(n = read(signalFd, &siginfo, sizeof(siginfo)));
        if (n < 0) break;  // no more signals
        KJ_ASSERT(n == sizeof(siginfo));
        gotSignal(toRegularSiginfo(siginfo));
      }
    } else if (events[i].data.u64 == 1) {
      // Someone called wake() from another thread. Consume the event.
      uint64_t value;
      ssize_t n;
      KJ_NONBLOCKING_SYSCALL(n = read(eventFd, &value, sizeof(value)));
      KJ_ASSERT(n < 0 || n == sizeof(value));

      // We were woken. Need to return true.
      woken = true;
    } else {
      FdObserver* observer = reinterpret_cast<FdObserver*>(events[i].data.ptr);
      observer->fire(events[i].events);
    }
  }

  timerImpl.advanceTo(readClock());

  return woken;
}

#else  // KJ_USE_EPOLL
// =======================================================================================
// Traditional poll() FdObserver implementation.

#ifndef POLLRDHUP
#define POLLRDHUP 0
#endif

UnixEventPort::UnixEventPort()
    : timerImpl(readClock()) {
  static_assert(sizeof(threadId) >= sizeof(pthread_t),
                "pthread_t is larger than a long long on your platform. Please port.");
  *reinterpret_cast<pthread_t*>(&threadId) = pthread_self();

  pthread_once(&registerReservedSignalOnce, &registerReservedSignal);
}

UnixEventPort::~UnixEventPort() noexcept(false) {}

UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
    : eventPort(eventPort), fd(fd), flags(flags), next(nullptr), prev(nullptr) {}

UnixEventPort::FdObserver::~FdObserver() noexcept(false) {
  if (prev != nullptr) {
    if (next == nullptr) {
      eventPort.observersTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
  }
}

void UnixEventPort::FdObserver::fire(short events) {
  if (events & (POLLIN | POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) {
    if (events & (POLLHUP | POLLRDHUP)) {
      atEnd = true;
#if POLLRDHUP
    } else {
      // Since POLLRDHUP exists on this platform, and we didn't receive it, we know that
      // we're not at the end.
      atEnd = false;
#endif
    }

    KJ_IF_MAYBE(f, readFulfiller) {
      f->get()->fulfill();
      readFulfiller = nullptr;
    }
  }

  if (events & (POLLOUT | POLLHUP | POLLERR | POLLNVAL)) {
    KJ_IF_MAYBE(f, writeFulfiller) {
      f->get()->fulfill();
      writeFulfiller = nullptr;
    }
  }

  if (events & (POLLHUP | POLLERR | POLLNVAL)) {
    KJ_IF_MAYBE(f, hupFulfiller) {
      f->get()->fulfill();
      hupFulfiller = nullptr;
    }
  }

  if (events & POLLPRI) {
    KJ_IF_MAYBE(f, urgentFulfiller) {
      f->get()->fulfill();
      urgentFulfiller = nullptr;
    }
  }

  if (readFulfiller == nullptr && writeFulfiller == nullptr && urgentFulfiller == nullptr &&
      hupFulfiller == nullptr) {
    // Remove from list.
    if (next == nullptr) {
      eventPort.observersTail = prev;
    } else {
      next->prev = prev;
    }
    *prev = next;
    next = nullptr;
    prev = nullptr;
  }
}
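// Note the invariant maintained by fire() above: an FdObserver sits in the event port's
// linked list only while at least one fulfiller is outstanding, and the when*() methods
// below re-insert it lazily on demand. This keeps the pollfd array built by PollContext
// proportional to the number of *waiting* observers rather than all observers that exist.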
short UnixEventPort::FdObserver::getEventMask() {
  return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) |
         (writeFulfiller == nullptr ? 0 : POLLOUT) |
         (urgentFulfiller == nullptr ? 0 : POLLPRI) |
         // The POSIX standard says POLLHUP and POLLERR will be reported even if not
         // requested. But on MacOS, if `events` is 0, then POLLHUP apparently will not be
         // reported: https://openradar.appspot.com/37537852
         // It seems that by setting any non-zero value -- even one documented as ignored --
         // we cause POLLHUP to be reported. Both POLLHUP and POLLERR are documented as being
         // ignored. So, we'll go ahead and set them. This has no effect on non-broken OSs,
         // causes MacOS to do the right thing, and sort of looks as if we're explicitly
         // requesting notification of these two conditions, which we do after all want to
         // know about.
         POLLHUP | POLLERR;
}

Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
  KJ_REQUIRE(flags & OBSERVE_READ, "FdObserver was not set to observe reads.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  readFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
  KJ_REQUIRE(flags & OBSERVE_WRITE, "FdObserver was not set to observe writes.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  writeFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
  KJ_REQUIRE(flags & OBSERVE_URGENT,
      "FdObserver was not set to observe availability of urgent data.");

  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  urgentFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
  if (prev == nullptr) {
    KJ_DASSERT(next == nullptr);
    prev = eventPort.observersTail;
    *prev = this;
    eventPort.observersTail = &next;
  }

  auto paf = newPromiseAndFulfiller<void>();
  hupFulfiller = kj::mv(paf.fulfiller);
  return kj::mv(paf.promise);
}

class UnixEventPort::PollContext {
public:
  PollContext(FdObserver* ptr) {
    while (ptr != nullptr) {
      struct pollfd pollfd;
      memset(&pollfd, 0, sizeof(pollfd));
      pollfd.fd = ptr->fd;
      pollfd.events = ptr->getEventMask();
      pollfds.add(pollfd);
      pollEvents.add(ptr);
      ptr = ptr->next;
    }
  }

  void run(int timeout) {
    pollResult = ::poll(pollfds.begin(), pollfds.size(), timeout);
    pollError = pollResult < 0 ? errno : 0;

    if (pollError == EINTR) {
      // We can't simply restart the poll call because we need to recompute the timeout.
      // Instead, we pretend poll() returned zero events. This will cause the event loop to
      // spin once, decide it has nothing to do, recompute timeouts, then return to waiting.
      pollResult = 0;
      pollError = 0;
    }
  }

  void processResults() {
    if (pollResult < 0) {
      KJ_FAIL_SYSCALL("poll()", pollError);
    }

    for (auto i: indices(pollfds)) {
      if (pollfds[i].revents != 0) {
        pollEvents[i]->fire(pollfds[i].revents);
        if (--pollResult <= 0) {
          break;
        }
      }
    }
  }

private:
  kj::Vector<struct pollfd> pollfds;
  kj::Vector<FdObserver*> pollEvents;
  int pollResult = 0;
  int pollError = 0;
};
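// Illustrative only: wait() below avoids the classic unblock-then-poll race with a
// sigsetjmp()/siglongjmp() dance rather than ppoll(), which is unavailable on some of the
// platforms this path targets. The sequence, as a hedged sketch:
//
//   sigsetjmp(capture.jumpTo, true);   // 1. save a resume point
//   sigprocmask(SIG_UNBLOCK, ...);     // 2. open the race window deliberately
//   ::poll(...);                       // 3. any captured signal longjmp()s out
//   sigprocmask(SIG_SETMASK, ...);     // 4. close the window again
//
// A signal arriving anywhere in steps 2-4 runs signalHandler(), which siglongjmp()s back to
// step 1 with the siginfo in hand, so no delivery can be lost between the mask change and
// the poll.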
bool UnixEventPort::wait() {
  sigset_t newMask;
  sigemptyset(&newMask);
  sigaddset(&newMask, reservedSignal);

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      sigaddset(&newMask, ptr->signum);
      ptr = ptr->next;
    }
    if (childSet != nullptr) {
      sigaddset(&newMask, SIGCHLD);
    }
  }

  PollContext pollContext(observersHead);

  // Capture signals.
  SignalCapture capture;

  if (sigsetjmp(capture.jumpTo, true)) {
    // We received a signal and longjmp'd back out of the signal handler.
    threadCapture = nullptr;

    if (capture.siginfo.si_signo == reservedSignal) {
      return true;
    } else {
      gotSignal(capture.siginfo);
      return false;
    }
  }

  // Enable signals, run the poll, then mask them again.
  sigset_t origMask;
  threadCapture = &capture;
  sigprocmask(SIG_UNBLOCK, &newMask, &origMask);

  pollContext.run(
      timerImpl.timeoutToNextEvent(readClock(), MILLISECONDS, int(maxValue))
          .map([](uint64_t t) -> int { return t; })
          .orDefault(-1));

  sigprocmask(SIG_SETMASK, &origMask, nullptr);
  threadCapture = nullptr;

  // Queue events.
  pollContext.processResults();
  timerImpl.advanceTo(readClock());

  return false;
}

bool UnixEventPort::poll() {
  // volatile so that longjmp() doesn't clobber it.
  volatile bool woken = false;

  sigset_t pending;
  sigset_t waitMask;
  sigemptyset(&pending);
  sigfillset(&waitMask);

  // Count how many signals that we care about are pending.
  KJ_SYSCALL(sigpending(&pending));
  uint signalCount = 0;

  if (sigismember(&pending, reservedSignal)) {
    ++signalCount;
    sigdelset(&pending, reservedSignal);
    sigdelset(&waitMask, reservedSignal);
  }

  {
    auto ptr = signalHead;
    while (ptr != nullptr) {
      if (sigismember(&pending, ptr->signum)) {
        ++signalCount;
        sigdelset(&pending, ptr->signum);
        sigdelset(&waitMask, ptr->signum);
      }
      ptr = ptr->next;
    }
  }

  // Wait for each pending signal. It would be nice to use sigtimedwait() here but it is not
  // available on OSX. :( Instead, we call sigsuspend() once per expected signal.
  while (signalCount-- > 0) {
    SignalCapture capture;
    threadCapture = &capture;
    if (sigsetjmp(capture.jumpTo, true)) {
      // We received a signal and longjmp'd back out of the signal handler.
      sigdelset(&waitMask, capture.siginfo.si_signo);
      if (capture.siginfo.si_signo == reservedSignal) {
        woken = true;
      } else {
        gotSignal(capture.siginfo);
      }
    } else {
      sigsuspend(&waitMask);
      KJ_FAIL_ASSERT("sigsuspend() shouldn't return because the signal handler should "
                     "have siglongjmp()ed.");
    }
    threadCapture = nullptr;
  }

  {
    PollContext pollContext(observersHead);
    pollContext.run(0);
    pollContext.processResults();
  }
  timerImpl.advanceTo(readClock());

  return woken;
}

void UnixEventPort::wake() const {
  int error = pthread_kill(*reinterpret_cast<const pthread_t*>(&threadId), reservedSignal);
  if (error != 0) {
    KJ_FAIL_SYSCALL("pthread_kill", error);
  }
}

#endif  // KJ_USE_EPOLL, else

}  // namespace kj

#endif  // !_WIN32