Commit c84d57a3 authored by Kenton Varda's avatar Kenton Varda

Fix handling of queued RT signals.

For regular (non-RT) POSIX signals, the process can only have at most one instance of each signal queued for delivery at a time. If another copy of the signal arrives before the first is delivered, the new signal is ignored. The idea was that signals are only meant to wake the process up to check some input; the signal itself is not the input.

POSIX RT signals are different. Multiple copies of the same signal can be queued, and each is delivered separately. Each signal may contain some additional information that needs to be processed. The signals themselves are input.

UnixEventPort's `onSignal()` method returns a Promise that resolves the next time the signal is delivered. When the Promise is resolved, the signal is also supposed to be blocked until `onSignal()` can be called again, so that the app cannot miss signals delivered in between.

However, the epoll/signalfd implementation had a bug where it would pull _all_ queued signals off the `signalfd` at once, only delivering the first instance of each signal number and dropping subsequent instances on the floor. That's fine for regular signals, but not RT signals.

This change fixes the bug and adds a test. Incidentally, the poll()-based implementation has been correct all along.
parent 44c6b461
...@@ -62,6 +62,10 @@ void captureSignals() { ...@@ -62,6 +62,10 @@ void captureSignals() {
UnixEventPort::captureSignal(SIGURG); UnixEventPort::captureSignal(SIGURG);
UnixEventPort::captureSignal(SIGIO); UnixEventPort::captureSignal(SIGIO);
#ifdef SIGRTMIN
UnixEventPort::captureSignal(SIGRTMIN);
#endif
UnixEventPort::captureChildExit(); UnixEventPort::captureChildExit();
} }
} }
...@@ -878,6 +882,45 @@ KJ_TEST("UnixEventPort poll for signals") { ...@@ -878,6 +882,45 @@ KJ_TEST("UnixEventPort poll for signals") {
promise2.wait(waitScope); promise2.wait(waitScope);
} }
#ifdef SIGRTMIN
void testRtSignals(UnixEventPort& port, WaitScope& waitScope, bool doPoll) {
union sigval value;
memset(&value, 0, sizeof(value));
// Queue three copies of the signal upfront.
for (uint i = 0; i < 3; i++) {
value.sival_int = 123 + i;
KJ_SYSCALL(sigqueue(getpid(), SIGRTMIN, value));
}
// Now wait for them.
for (uint i = 0; i < 3; i++) {
auto promise = port.onSignal(SIGRTMIN);
if (doPoll) {
KJ_ASSERT(promise.poll(waitScope));
}
auto info = promise.wait(waitScope);
KJ_EXPECT(info.si_value.sival_int == 123 + i);
}
KJ_EXPECT(!port.onSignal(SIGRTMIN).poll(waitScope));
}
KJ_TEST("UnixEventPort can receive multiple queued instances of an RT signal") {
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
testRtSignals(port, waitScope, true);
// Test again, but don't poll() the promises. This may test a different code path, if poll() and
// wait() are very different in how they read signals. (For the poll(2)-based implementation of
// UnixEventPort, they are indeed pretty different.)
testRtSignals(port, waitScope, false);
}
#endif
} // namespace } // namespace
} // namespace kj } // namespace kj
......
...@@ -601,6 +601,19 @@ bool UnixEventPort::doEpollWait(int timeout) { ...@@ -601,6 +601,19 @@ bool UnixEventPort::doEpollWait(int timeout) {
KJ_ASSERT(n == sizeof(siginfo)); KJ_ASSERT(n == sizeof(siginfo));
gotSignal(toRegularSiginfo(siginfo)); gotSignal(toRegularSiginfo(siginfo));
#ifdef SIGRTMIN
if (siginfo.ssi_signo >= SIGRTMIN) {
// This is an RT signal. There could be multiple copies queued. We need to remove it from
// the signalfd's signal mask before we continue, to avoid accidentally reading and
// discarding the extra copies.
// TODO(perf): If high throughput of RT signals is desired then perhaps we should read
// them all into userspace and queue them here. Maybe we even need a better interface
// than onSignal() for receiving high-volume RT signals.
KJ_SYSCALL(sigdelset(&signalFdSigset, siginfo.ssi_signo));
KJ_SYSCALL(signalfd(signalFd, &signalFdSigset, SFD_NONBLOCK | SFD_CLOEXEC));
}
#endif
} }
} else if (events[i].data.u64 == 1) { } else if (events[i].data.u64 == 1) {
// Someone called wake() from another thread. Consume the event. // Someone called wake() from another thread. Consume the event.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment