Commit b8b51f58 authored by Kenton Varda's avatar Kenton Varda

Extend UnixEventPort with the ability to listen for subprocess exit.

(Later, we should add a nice subprocess API around this, maybe even one that can work on Windows too...)
parent d54b005b
......@@ -34,6 +34,7 @@
#include <kj/compat/gtest.h>
#include <pthread.h>
#include <algorithm>
#include <sys/wait.h>
namespace kj {
namespace {
......@@ -57,6 +58,8 @@ void captureSignals() {
// use SIGUSR1 because it is reserved by UnixEventPort and SIGUSR2 is used by Valgrind on OSX.
UnixEventPort::captureSignal(SIGURG);
UnixEventPort::captureSignal(SIGIO);
UnixEventPort::captureChildExit();
}
}
......@@ -602,6 +605,94 @@ TEST(AsyncUnixTest, Wake) {
EXPECT_TRUE(port.wait());
}
// Exit status that a forked test child reports when it receives SIGTERM. TestChild's constructor
// sets this in the child process right after fork(), before installing the handler below.
int exitCodeForSignal = 0;
// Signal handler installed in the forked child: ends the process immediately via _exit() using
// the status stored in exitCodeForSignal. The signal number argument is ignored.
void exitSignalHandler(int) {
_exit(exitCodeForSignal);
}
// Test helper: forks a child process that idles until it is signaled, and registers with the
// given event port to be notified when that child exits.
struct TestChild {
// PID of the forked child. Passed by reference to onChildExit(), which documents that it nulls
// this out once the child has been reaped.
kj::Maybe<pid_t> pid;
// Resolves to the child's wait status; assigned in the constructor.
kj::Promise<int> promise = nullptr;
// Forks the child and registers for its exit on `port`. `exitCode` is the status the child
// passes to _exit() when it receives SIGTERM.
TestChild(UnixEventPort& port, int exitCode) {
pid_t p;
KJ_SYSCALL(p = fork());
if (p == 0) {
// Arrange for SIGTERM to cause the process to exit normally.
exitCodeForSignal = exitCode;
signal(SIGTERM, &exitSignalHandler);
sigset_t sigs;
sigemptyset(&sigs);
sigaddset(&sigs, SIGTERM);
// Un-block SIGTERM in the child so that the handler installed above can actually run.
sigprocmask(SIG_UNBLOCK, &sigs, nullptr);
// Idle until a signal ends the process; the child never leaves this loop.
for (;;) pause();
}
// Parent side: remember the pid and ask the event port to report the child's exit.
pid = p;
promise = port.onChildExit(pid);
}
// If the child has not been reaped yet (pid is still non-null), sends it SIGKILL and then waits
// for it synchronously with waitpid().
~TestChild() noexcept(false) {
KJ_IF_MAYBE(p, pid) {
KJ_SYSCALL(::kill(*p, SIGKILL)) { return; }
int status;
KJ_SYSCALL(waitpid(*p, &status, 0)) { return; }
}
}
// Sends `signo` to the child. Requires that the child has not been reaped yet (pid non-null).
void kill(int signo) {
KJ_SYSCALL(::kill(KJ_REQUIRE_NONNULL(pid), signo));
}
KJ_DISALLOW_COPY(TestChild);
};
// Exercises UnixEventPort::onChildExit() with three forked children: one that exits normally in
// response to SIGTERM, one that is killed with SIGKILL, and one that is still running when the
// test ends.
TEST(AsyncUnixTest, ChildProcess) {
captureSignals();
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
// Block SIGTERM so that we can carefully un-block it in children.
sigset_t sigs, oldsigs;
KJ_SYSCALL(sigemptyset(&sigs));
KJ_SYSCALL(sigaddset(&sigs, SIGTERM));
KJ_SYSCALL(sigprocmask(SIG_BLOCK, &sigs, &oldsigs));
// Restore the original signal mask when the test scope exits.
KJ_DEFER(KJ_SYSCALL(sigprocmask(SIG_SETMASK, &oldsigs, nullptr)) { break; });
TestChild child1(port, 123);
TestChild child2(port, 234);
TestChild child3(port, 345);
// Nothing has been signaled yet, so none of the exit promises should be ready.
KJ_EXPECT(!child1.promise.poll(waitScope));
KJ_EXPECT(!child2.promise.poll(waitScope));
KJ_EXPECT(!child3.promise.poll(waitScope));
// SIGTERM makes child1 run its handler and exit with the status it was constructed with.
child1.kill(SIGTERM);
{
int status = child1.promise.wait(waitScope);
KJ_EXPECT(WIFEXITED(status));
KJ_EXPECT(WEXITSTATUS(status) == 123);
}
// Reaping child1 must not have resolved the other children's promises.
KJ_EXPECT(!child2.promise.poll(waitScope));
KJ_EXPECT(!child3.promise.poll(waitScope));
// SIGKILL terminates child2 by signal, so the status must report a signal, not a normal exit.
child2.kill(SIGKILL);
{
int status = child2.promise.wait(waitScope);
KJ_EXPECT(!WIFEXITED(status));
KJ_EXPECT(WIFSIGNALED(status));
KJ_EXPECT(WTERMSIG(status) == SIGKILL);
}
KJ_EXPECT(!child3.promise.poll(waitScope));
// child3 will be killed and synchronously waited on the way out.
}
} // namespace
} // namespace kj
......
......@@ -30,6 +30,8 @@
#include <limits>
#include <chrono>
#include <pthread.h>
#include <map>
#include <errno.h>
#include <sys/wait.h>
#if KJ_USE_EPOLL
#include <unistd.h>
......@@ -57,6 +59,8 @@ namespace {
int reservedSignal = SIGUSR1;
bool tooLateToSetReserved = false;
bool capturedChildExit = false;
bool threadClaimedChildExits = false;
struct SignalCapture {
sigjmp_buf jumpTo;
......@@ -109,6 +113,73 @@ pthread_once_t registerReservedSignalOnce = PTHREAD_ONCE_INIT;
} // namespace
// Bookkeeping for child-exit notifications: tracks the promise adapters currently registered via
// onChildExit(), keyed by the pid each one is waiting for.
struct UnixEventPort::ChildSet {
// Adapters insert themselves here on construction and erase themselves on destruction.
std::map<pid_t, ChildExitPromiseAdapter*> waiters;
// Reaps exited children with waitpid() and fulfills the waiter registered for each one, if any.
void checkExits();
};
// Promise adapter backing onChildExit(). It registers itself in the ChildSet under the child's
// pid for as long as the adapter exists.
class UnixEventPort::ChildExitPromiseAdapter {
public:
// Requires `pidRef` to be non-null and that no other adapter is already registered for the same
// pid; on success the adapter is inserted into `childSet.waiters`.
inline ChildExitPromiseAdapter(PromiseFulfiller<int>& fulfiller,
ChildSet& childSet, Maybe<pid_t>& pidRef)
: childSet(childSet),
pid(KJ_REQUIRE_NONNULL(pidRef,
"`pid` must be non-null at the time `onChildExit()` is called")),
pidRef(pidRef), fulfiller(fulfiller) {
KJ_REQUIRE(childSet.waiters.insert(std::make_pair(pid, this)).second,
"already called onChildExit() for this pid");
}
// Unregisters this adapter from the ChildSet.
~ChildExitPromiseAdapter() noexcept(false) {
childSet.waiters.erase(pid);
}
// The set this adapter is registered in.
ChildSet& childSet;
// Copy of the pid taken at construction. It is the map key, so it remains usable for erase()
// even after `pidRef` has been nulled out.
pid_t pid;
// The caller's Maybe<pid_t>; checkExits() sets it to nullptr when the child is reaped.
Maybe<pid_t>& pidRef;
// Fulfilled by checkExits() with the wait status reported by waitpid().
PromiseFulfiller<int>& fulfiller;
};
void UnixEventPort::ChildSet::checkExits() {
for (;;) {
int status;
pid_t pid;
KJ_SYSCALL(pid = waitpid(-1, &status, WNOHANG));
if (pid == 0) break;
auto iter = waiters.find(pid);
if (iter != waiters.end()) {
iter->second->pidRef = nullptr;
iter->second->fulfiller.fulfill(kj::cp(status));
}
}
}
// Returns a promise that resolves to the wait status of the child identified by `pid` once that
// child has been reaped by this event port.
//
// `pid` must be non-null when this is called, and it is nulled out when the child is reaped.
// captureChildExit() must have been called beforehand; otherwise SIGCHLD is never observed and
// the promise could never resolve, so that case is rejected up front.
//
// The first call on a port lazily creates its ChildSet and claims the process-wide right to
// listen for child exits; a second port attempting the same fails the requirement below.
Promise<int> UnixEventPort::onChildExit(Maybe<pid_t>& pid) {
  KJ_REQUIRE(capturedChildExit,
      "must call kj::UnixEventPort::captureChildExit() before using onChildExit()");

  ChildSet* cs;
  KJ_IF_MAYBE(c, childSet) {
    cs = *c;
  } else {
    // In theory we should do an atomic compare-and-swap on threadClaimedChildExits, but this is
    // for debug purposes only so it's not a big deal.
    KJ_REQUIRE(!threadClaimedChildExits,
        "only one UnixEventPort per process may listen for child exits");
    threadClaimedChildExits = true;

    auto newChildSet = kj::heap<ChildSet>();
    cs = newChildSet;
    childSet = kj::mv(newChildSet);
  }
  return kj::newAdaptedPromise<int, ChildExitPromiseAdapter>(*cs, pid);
}
// Captures SIGCHLD via captureSignal() and records that child-exit capturing is enabled. Once
// the flag is set, onSignal(SIGCHLD) fails its precondition check.
void UnixEventPort::captureChildExit() {
captureSignal(SIGCHLD);
capturedChildExit = true;
}
class UnixEventPort::SignalPromiseAdapter {
public:
inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
......@@ -151,6 +222,8 @@ public:
};
// Returns a promise that resolves with the siginfo_t of the next delivery of `signum`.
//
// SIGCHLD cannot be requested here once captureChildExit() has been called, because the event
// port then consumes SIGCHLD itself in order to implement onChildExit().
Promise<siginfo_t> UnixEventPort::onSignal(int signum) {
  KJ_REQUIRE(signum != SIGCHLD || !capturedChildExit,
      "can't call onSignal(SIGCHLD) when kj::UnixEventPort::captureChildExit() has been called");
  return newAdaptedPromise<siginfo_t, SignalPromiseAdapter>(*this, signum);
}
......@@ -178,6 +251,14 @@ void UnixEventPort::setReservedSignal(int signum) {
}
void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
// If onChildExit() has been called and this is SIGCHLD, check for child exits.
KJ_IF_MAYBE(cs, childSet) {
if (siginfo.si_signo == SIGCHLD) {
cs->get()->checkExits();
return;
}
}
// Fire any events waiting on this signal.
auto ptr = signalHead;
while (ptr != nullptr) {
......@@ -222,7 +303,12 @@ UnixEventPort::UnixEventPort()
KJ_SYSCALL(epoll_ctl(epollFd, EPOLL_CTL_ADD, eventFd, &event));
}
UnixEventPort::~UnixEventPort() noexcept(false) {}
// Releases this port's claim on child-exit listening, if it ever made one.
UnixEventPort::~UnixEventPort() noexcept(false) {
  // A port that never called onChildExit() has no ChildSet and therefore nothing to release.
  if (childSet == nullptr) return;

  // This port held the exclusive right to call onChildExit(); give it back so that another port
  // may claim it later.
  threadClaimedChildExits = false;
}
UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint flags)
: eventPort(eventPort), fd(fd), flags(flags) {
......@@ -440,6 +526,9 @@ bool UnixEventPort::doEpollWait(int timeout) {
sigaddset(&newMask, ptr->signum);
ptr = ptr->next;
}
if (childSet != nullptr) {
sigaddset(&newMask, SIGCHLD);
}
}
if (memcmp(&newMask, &signalFdSigset, sizeof(newMask)) != 0) {
......@@ -674,6 +763,9 @@ bool UnixEventPort::wait() {
sigaddset(&newMask, ptr->signum);
ptr = ptr->next;
}
if (childSet != nullptr) {
sigaddset(&newMask, SIGCHLD);
}
}
PollContext pollContext(observersHead);
......
......@@ -99,15 +99,49 @@ public:
Timer& getTimer() { return timerImpl; }
Promise<int> onChildExit(Maybe<pid_t>& pid);
// When the given child process exits, resolves to its wait status, as returned by wait(2). You
// will need to use the WIFEXITED() etc. macros to interpret the status code.
//
// You must call onChildExit() immediately after the child is created, before returning to the
// event loop. Otherwise, you may miss the child exit event.
//
// `pid` is a reference to a Maybe<pid_t> which must be non-null at the time of the call. When
// wait() is invoked (and indicates this pid has finished), `pid` will be nulled out. This is
// necessary to avoid a race condition: as soon as the child has been wait()ed, the PID table
// entry is freed and can then be reused. So, if you ever want safely to call `kill()` on the
// PID, it's necessary to know whether it has been wait()ed already. Since the promise's
// .then() continuation may not run immediately, we need a more precise way, hence we null out
// the Maybe.
//
// You must call `kj::UnixEventPort::captureChildExit()` early in your program if you want to use
// `onChildExit()`.
//
// WARNING: Only one UnixEventPort per process is allowed to use onChildExit(). This is because
// child exit is signaled to the process via SIGCHLD, and Unix does not allow the program to
// control which thread receives the signal. (We may fix this in the future by automatically
// coordinating between threads when multiple threads are expecting child exits.)
// WARNING 2: If any UnixEventPort in the process is currently waiting for onChildExit(), then
// *only* that port's thread can safely wait on child processes, even synchronously. This is
// because the thread which used onChildExit() uses wait() to reap children, without specifying
// which child, and therefore it may inadvertently reap children created by other threads.
static void captureChildExit();
// Arranges for child process exit to be captured and handled via UnixEventPort, so that you may
// call `onChildExit()`. Much like `captureSignal()`, this static method must be called early on
// in program startup.
//
// This method may capture the `SIGCHLD` signal. You must not use `captureSignal(SIGCHLD)` nor
// `onSignal(SIGCHLD)` in your own code if you use `captureChildExit()`.
// implements EventPort ------------------------------------------------------
bool wait() override;
bool poll() override;
void wake() const override;
private:
struct TimerSet; // Defined in source file to avoid STL include.
class TimerPromiseAdapter;
class SignalPromiseAdapter;
class ChildExitPromiseAdapter;
TimerImpl timerImpl;
......@@ -138,6 +172,9 @@ private:
unsigned long long threadId; // actually pthread_t
#endif
struct ChildSet;
Maybe<Own<ChildSet>> childSet;
};
class UnixEventPort::FdObserver {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment