Commit cde77c06 authored by Kenton Varda's avatar Kenton Varda

Work around signals being totally busted on Cygwin.

See my latest report: https://cygwin.com/ml/cygwin/2019-07/msg00052.html

We can't use signals for cross-thread wakeups on Cygwin because the same signal cannot be pending on two different threads at the same time.

So I broke down and made an implementation that uses pipes. Ugh.
parent 53b20b7a
...@@ -31,14 +31,15 @@ ...@@ -31,14 +31,15 @@
#include <pthread.h> #include <pthread.h>
#include <map> #include <map>
#include <sys/wait.h> #include <sys/wait.h>
#include <unistd.h>
#if KJ_USE_EPOLL #if KJ_USE_EPOLL
#include <unistd.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/signalfd.h> #include <sys/signalfd.h>
#include <sys/eventfd.h> #include <sys/eventfd.h>
#else #else
#include <poll.h> #include <poll.h>
#include <fcntl.h>
#endif #endif
namespace kj { namespace kj {
...@@ -108,10 +109,14 @@ void registerSignalHandler(int signum) { ...@@ -108,10 +109,14 @@ void registerSignalHandler(int signum) {
#endif #endif
} }
#if !KJ_USE_EPOLL && !KJ_USE_PIPE_FOR_WAKEUP
void registerReservedSignal() { void registerReservedSignal() {
registerSignalHandler(reservedSignal); registerSignalHandler(reservedSignal);
}
#endif
// We also disable SIGPIPE because users of UnixEventPort almost certainly don't want it. void ignoreSigpipe() {
// We disable SIGPIPE because users of UnixEventPort almost certainly don't want it.
while (signal(SIGPIPE, SIG_IGN) == SIG_ERR) { while (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
int error = errno; int error = errno;
if (error != EINTR) { if (error != EINTR) {
...@@ -298,10 +303,7 @@ UnixEventPort::UnixEventPort() ...@@ -298,10 +303,7 @@ UnixEventPort::UnixEventPort()
epollFd(-1), epollFd(-1),
signalFd(-1), signalFd(-1),
eventFd(-1) { eventFd(-1) {
// TODO(cleanup): We don't use the reserved signal on Linux; we use an eventfd instead. Should we ignoreSigpipe();
// skip registering it? Note that registerReservedSignal() also takes care of blocking SIGPIPE
// which is important.
registerReservedSignal();
int fd; int fd;
KJ_SYSCALL(fd = epoll_create1(EPOLL_CLOEXEC)); KJ_SYSCALL(fd = epoll_create1(EPOLL_CLOEXEC));
...@@ -631,6 +633,15 @@ bool UnixEventPort::doEpollWait(int timeout) { ...@@ -631,6 +633,15 @@ bool UnixEventPort::doEpollWait(int timeout) {
UnixEventPort::UnixEventPort() UnixEventPort::UnixEventPort()
: clock(systemPreciseMonotonicClock()), : clock(systemPreciseMonotonicClock()),
timerImpl(clock.now()) { timerImpl(clock.now()) {
#if KJ_USE_PIPE_FOR_WAKEUP
// Allocate a pipe to which we'll write a byte in order to wake this thread.
int fds[2];
KJ_SYSCALL(pipe(fds));
wakePipeIn = kj::AutoCloseFd(fds[0]);
wakePipeOut = kj::AutoCloseFd(fds[1]);
KJ_SYSCALL(fcntl(wakePipeIn, F_SETFD, FD_CLOEXEC));
KJ_SYSCALL(fcntl(wakePipeOut, F_SETFD, FD_CLOEXEC));
#else
static_assert(sizeof(threadId) >= sizeof(pthread_t), static_assert(sizeof(threadId) >= sizeof(pthread_t),
"pthread_t is larger than a long long on your platform. Please port."); "pthread_t is larger than a long long on your platform. Please port.");
*reinterpret_cast<pthread_t*>(&threadId) = pthread_self(); *reinterpret_cast<pthread_t*>(&threadId) = pthread_self();
...@@ -644,6 +655,9 @@ UnixEventPort::UnixEventPort() ...@@ -644,6 +655,9 @@ UnixEventPort::UnixEventPort()
// by the pthread_once and the mask update happened in every thread, but registering a signal // by the pthread_once and the mask update happened in every thread, but registering a signal
// handler is not an expensive operation, so whatever... we'll do it in every thread. // handler is not an expensive operation, so whatever... we'll do it in every thread.
registerReservedSignal(); registerReservedSignal();
#endif
ignoreSigpipe();
} }
UnixEventPort::~UnixEventPort() noexcept(false) {} UnixEventPort::~UnixEventPort() noexcept(false) {}
...@@ -791,16 +805,25 @@ Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() { ...@@ -791,16 +805,25 @@ Promise<void> UnixEventPort::FdObserver::whenWriteDisconnected() {
class UnixEventPort::PollContext { class UnixEventPort::PollContext {
public: public:
PollContext(FdObserver* ptr) { PollContext(UnixEventPort& port) {
while (ptr != nullptr) { for (FdObserver* ptr = port.observersHead; ptr != nullptr; ptr = ptr->next) {
struct pollfd pollfd; struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd)); memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = ptr->fd; pollfd.fd = ptr->fd;
pollfd.events = ptr->getEventMask(); pollfd.events = ptr->getEventMask();
pollfds.add(pollfd); pollfds.add(pollfd);
pollEvents.add(ptr); pollEvents.add(ptr);
ptr = ptr->next;
} }
#if KJ_USE_PIPE_FOR_WAKEUP
{
struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = port.wakePipeIn;
pollfd.events = POLLIN;
pollfds.add(pollfd);
}
#endif
} }
void run(int timeout) { void run(int timeout) {
...@@ -816,19 +839,36 @@ public: ...@@ -816,19 +839,36 @@ public:
} }
} }
void processResults() { bool processResults() {
if (pollResult < 0) { if (pollResult < 0) {
KJ_FAIL_SYSCALL("poll()", pollError); KJ_FAIL_SYSCALL("poll()", pollError);
} }
bool woken = false;
for (auto i: indices(pollfds)) { for (auto i: indices(pollfds)) {
if (pollfds[i].revents != 0) { if (pollfds[i].revents != 0) {
pollEvents[i]->fire(pollfds[i].revents); #if KJ_USE_PIPE_FOR_WAKEUP
if (i == pollEvents.size()) {
// The last pollfd is our cross-thread wake pipe.
woken = true;
// Discard junk in the wake pipe.
char junk[256];
ssize_t n;
do {
KJ_NONBLOCKING_SYSCALL(n = read(pollfds[i].fd, junk, sizeof(junk)));
} while (n >= 256);
} else {
#endif
pollEvents[i]->fire(pollfds[i].revents);
#if KJ_USE_PIPE_FOR_WAKEUP
}
#endif
if (--pollResult <= 0) { if (--pollResult <= 0) {
break; break;
} }
} }
} }
return woken;
} }
private: private:
...@@ -841,7 +881,10 @@ private: ...@@ -841,7 +881,10 @@ private:
bool UnixEventPort::wait() { bool UnixEventPort::wait() {
sigset_t newMask; sigset_t newMask;
sigemptyset(&newMask); sigemptyset(&newMask);
#if !KJ_USE_PIPE_FOR_WAKEUP
sigaddset(&newMask, reservedSignal); sigaddset(&newMask, reservedSignal);
#endif
{ {
auto ptr = signalHead; auto ptr = signalHead;
...@@ -854,7 +897,7 @@ bool UnixEventPort::wait() { ...@@ -854,7 +897,7 @@ bool UnixEventPort::wait() {
} }
} }
PollContext pollContext(observersHead); PollContext pollContext(*this);
// Capture signals. // Capture signals.
SignalCapture capture; SignalCapture capture;
...@@ -863,12 +906,16 @@ bool UnixEventPort::wait() { ...@@ -863,12 +906,16 @@ bool UnixEventPort::wait() {
// We received a signal and longjmp'd back out of the signal handler. // We received a signal and longjmp'd back out of the signal handler.
threadCapture = nullptr; threadCapture = nullptr;
#if !KJ_USE_PIPE_FOR_WAKEUP
if (capture.siginfo.si_signo == reservedSignal) { if (capture.siginfo.si_signo == reservedSignal) {
return true; return true;
} else { } else {
#endif
gotSignal(capture.siginfo); gotSignal(capture.siginfo);
return false; return false;
#if !KJ_USE_PIPE_FOR_WAKEUP
} }
#endif
} }
// Enable signals, run the poll, then mask them again. // Enable signals, run the poll, then mask them again.
...@@ -884,10 +931,10 @@ bool UnixEventPort::wait() { ...@@ -884,10 +931,10 @@ bool UnixEventPort::wait() {
threadCapture = nullptr; threadCapture = nullptr;
// Queue events. // Queue events.
pollContext.processResults(); bool result = pollContext.processResults();
timerImpl.advanceTo(clock.now()); timerImpl.advanceTo(clock.now());
return false; return result;
} }
bool UnixEventPort::poll() { bool UnixEventPort::poll() {
...@@ -903,11 +950,13 @@ bool UnixEventPort::poll() { ...@@ -903,11 +950,13 @@ bool UnixEventPort::poll() {
KJ_SYSCALL(sigpending(&pending)); KJ_SYSCALL(sigpending(&pending));
uint signalCount = 0; uint signalCount = 0;
#if !KJ_USE_PIPE_FOR_WAKEUP
if (sigismember(&pending, reservedSignal)) { if (sigismember(&pending, reservedSignal)) {
++signalCount; ++signalCount;
sigdelset(&pending, reservedSignal); sigdelset(&pending, reservedSignal);
sigdelset(&waitMask, reservedSignal); sigdelset(&waitMask, reservedSignal);
} }
#endif
{ {
auto ptr = signalHead; auto ptr = signalHead;
...@@ -932,11 +981,15 @@ bool UnixEventPort::poll() { ...@@ -932,11 +981,15 @@ bool UnixEventPort::poll() {
if (sigsetjmp(capture.jumpTo, false)) { if (sigsetjmp(capture.jumpTo, false)) {
// We received a signal and longjmp'd back out of the signal handler. // We received a signal and longjmp'd back out of the signal handler.
sigdelset(&waitMask, capture.siginfo.si_signo); sigdelset(&waitMask, capture.siginfo.si_signo);
#if !KJ_USE_PIPE_FOR_WAKEUP
if (capture.siginfo.si_signo == reservedSignal) { if (capture.siginfo.si_signo == reservedSignal) {
woken = true; woken = true;
} else { } else {
#endif
gotSignal(capture.siginfo); gotSignal(capture.siginfo);
#if !KJ_USE_PIPE_FOR_WAKEUP
} }
#endif
} else { } else {
#if __CYGWIN__ #if __CYGWIN__
// Cygwin's sigpending() incorrectly reports signals pending for any thread, not just our // Cygwin's sigpending() incorrectly reports signals pending for any thread, not just our
...@@ -961,9 +1014,11 @@ bool UnixEventPort::poll() { ...@@ -961,9 +1014,11 @@ bool UnixEventPort::poll() {
} }
{ {
PollContext pollContext(observersHead); PollContext pollContext(*this);
pollContext.run(0); pollContext.run(0);
pollContext.processResults(); if (pollContext.processResults()) {
woken = true;
}
} }
timerImpl.advanceTo(clock.now()); timerImpl.advanceTo(clock.now());
...@@ -971,10 +1026,20 @@ bool UnixEventPort::poll() { ...@@ -971,10 +1026,20 @@ bool UnixEventPort::poll() {
} }
void UnixEventPort::wake() const { void UnixEventPort::wake() const {
#if KJ_USE_PIPE_FOR_WAKEUP
// We're going to write() a single byte to our wake pipe in order to cause poll() to complete in
// the target thread.
//
// If this write() fails with EWOULDBLOCK, we don't care, because the target thread is already
// scheduled to wake up.
char c = 0;
KJ_NONBLOCKING_SYSCALL(write(wakePipeOut, &c, 1));
#else
int error = pthread_kill(*reinterpret_cast<const pthread_t*>(&threadId), reservedSignal); int error = pthread_kill(*reinterpret_cast<const pthread_t*>(&threadId), reservedSignal);
if (error != 0) { if (error != 0) {
KJ_FAIL_SYSCALL("pthread_kill", error); KJ_FAIL_SYSCALL("pthread_kill", error);
} }
#endif
} }
#endif // KJ_USE_EPOLL, else #endif // KJ_USE_EPOLL, else
......
...@@ -40,6 +40,14 @@ ...@@ -40,6 +40,14 @@
#define KJ_USE_EPOLL 1 #define KJ_USE_EPOLL 1
#endif #endif
#if __CYGWIN__ && !defined(KJ_USE_PIPE_FOR_WAKEUP)
// Cygwin has serious issues with the intersection of signals and threads, reported here:
// https://cygwin.com/ml/cygwin/2019-07/msg00052.html
// On Cygwin, therefore, we do not use signals to wake threads. Instead, each thread allocates a
// pipe, and we write a byte to the pipe to wake the thread... ick.
#define KJ_USE_PIPE_FOR_WAKEUP 1
#endif
namespace kj { namespace kj {
class UnixEventPort: public EventPort { class UnixEventPort: public EventPort {
...@@ -169,7 +177,12 @@ private: ...@@ -169,7 +177,12 @@ private:
FdObserver* observersHead = nullptr; FdObserver* observersHead = nullptr;
FdObserver** observersTail = &observersHead; FdObserver** observersTail = &observersHead;
#if KJ_USE_PIPE_FOR_WAKEUP
AutoCloseFd wakePipeIn;
AutoCloseFd wakePipeOut;
#else
unsigned long long threadId; // actually pthread_t unsigned long long threadId; // actually pthread_t
#endif
#endif #endif
struct ChildSet; struct ChildSet;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment