Commit a0e2a45f authored by Kenton Varda's avatar Kenton Varda

UnixEventLoop supports polling fds and catching signals.

parent e7d27780
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "async-unix.h"
#include "thread.h"
#include "debug.h"
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <gtest/gtest.h>
namespace kj {
inline void delay() { usleep(10000); }
class DummyErrorHandler: public TaskSet::ErrorHandler {
public:
void taskFailed(kj::Exception&& exception) override {
kj::throwRecoverableException(kj::mv(exception));
}
};
class AsyncUnixTest: public testing::Test {
public:
static void SetUpTestCase() {
UnixEventLoop::captureSignal(SIGUSR2);
UnixEventLoop::captureSignal(SIGIO);
}
};
TEST_F(AsyncUnixTest, Signals) {
UnixEventLoop loop;
union sigval value;
value.sival_int = 123;
sigqueue(getpid(), SIGUSR2, value);
siginfo_t info = loop.wait(loop.onSignal(SIGUSR2));
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_EQ(SI_QUEUE, info.si_code);
EXPECT_EQ(123, info.si_value.sival_int);
}
TEST_F(AsyncUnixTest, SignalsMulti) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
TaskSet tasks(loop, dummyHandler);
tasks.add(loop.onSignal(SIGIO).thenInAnyThread([](siginfo_t&&) {
ADD_FAILURE() << "Received wrong signal.";
}));
union sigval value;
value.sival_int = 123;
sigqueue(getpid(), SIGUSR2, value);
siginfo_t info = loop.wait(loop.onSignal(SIGUSR2));
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_EQ(SI_QUEUE, info.si_code);
EXPECT_EQ(123, info.si_value.sival_int);
}
TEST_F(AsyncUnixTest, SignalsAsync) {
// Arrange for another thread to wait on a UnixEventLoop...
auto exitThread = newPromiseAndFulfiller<void>();
UnixEventLoop unixLoop;
Thread thread([&]() {
unixLoop.wait(kj::mv(exitThread.promise));
});
KJ_DEFER(exitThread.fulfiller->fulfill());
// Arrange to catch a signal in the other thread. But we haven't sent one yet.
bool received = false;
Promise<void> promise = unixLoop.there(unixLoop.onSignal(SIGUSR2),
[&](siginfo_t&& info) {
received = true;
EXPECT_EQ(SIGUSR2, info.si_signo);
EXPECT_EQ(SI_QUEUE, info.si_code);
EXPECT_EQ(123, info.si_value.sival_int);
});
delay();
EXPECT_FALSE(received);
union sigval value;
value.sival_int = 123;
sigqueue(getpid(), SIGUSR2, value);
SimpleEventLoop mainLoop;
mainLoop.wait(kj::mv(promise));
EXPECT_TRUE(received);
}
TEST_F(AsyncUnixTest, Poll) {
UnixEventLoop loop;
int pipefds[2];
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
KJ_SYSCALL(pipe(pipefds));
KJ_SYSCALL(write(pipefds[1], "foo", 3));
EXPECT_EQ(POLLIN, loop.wait(loop.onFdEvent(pipefds[0], POLLIN | POLLPRI)));
}
TEST_F(AsyncUnixTest, PollMulti) {
UnixEventLoop loop;
DummyErrorHandler dummyHandler;
int bogusPipefds[2];
KJ_SYSCALL(pipe(bogusPipefds));
KJ_DEFER({ close(bogusPipefds[1]); close(bogusPipefds[0]); });
TaskSet tasks(loop, dummyHandler);
tasks.add(loop.onFdEvent(bogusPipefds[0], POLLIN | POLLPRI).thenInAnyThread([](short s) {
KJ_DBG(s);
ADD_FAILURE() << "Received wrong poll.";
}));
int pipefds[2];
KJ_SYSCALL(pipe(pipefds));
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
KJ_SYSCALL(write(pipefds[1], "foo", 3));
EXPECT_EQ(POLLIN, loop.wait(loop.onFdEvent(pipefds[0], POLLIN | POLLPRI)));
}
TEST_F(AsyncUnixTest, PollAsync) {
// Arrange for another thread to wait on a UnixEventLoop...
auto exitThread = newPromiseAndFulfiller<void>();
UnixEventLoop unixLoop;
Thread thread([&]() {
unixLoop.wait(kj::mv(exitThread.promise));
});
KJ_DEFER(exitThread.fulfiller->fulfill());
// Make a pipe and wait on its read end in another thread. But don't write to it yet.
int pipefds[2];
KJ_DEFER({ close(pipefds[1]); close(pipefds[0]); });
KJ_SYSCALL(pipe(pipefds));
bool received = false;
Promise<void> promise = unixLoop.there(unixLoop.onFdEvent(pipefds[0], POLLIN | POLLPRI),
[&](short events) {
received = true;
EXPECT_EQ(POLLIN, events);
});
delay();
EXPECT_FALSE(received);
KJ_SYSCALL(write(pipefds[1], "foo", 3));
SimpleEventLoop mainLoop;
mainLoop.wait(kj::mv(promise));
EXPECT_TRUE(received);
}
} // namespace kj
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "async-unix.h"
#include "debug.h"
#include <setjmp.h>
#include <errno.h>
#include <limits>
namespace kj {
class NewWorkCallback {
public:
virtual void receivedNewWork() const = 0;
};
template <typename Item>
class WorkQueue {
// Item has two methods: fire() and cancel(). Exactly one of these methods will be called,
// exactly once. cancel() is called if the application discards the Own<Item> returned by add()
// before fire() gets a chance to be called.
//
// TODO(cleanup): Make this reusable, use it in EventLoop and TaskSet. The terminology, e.g.
// "item", could be improved.
public:
class ItemWrapper final: private Disposer {
public:
const Item& get() const {
return item;
}
Maybe<const ItemWrapper&> getNext() const {
// Get the next item in the queue. Returns nullptr if there are no more items.
ItemWrapper* nextCopy = __atomic_load_n(&next, __ATOMIC_ACQUIRE);
if (nextCopy == nullptr) {
return nullptr;
} else {
return nextCopy->skipDead();
}
}
template <typename... Params>
void fire(Params&&... params) const {
// If the listener has not yet dropped its pointer to the Item, invokes the item's `fire()`
// method with the given parameters.
doOnce([&]() {
const_cast<Item&>(item).fire(kj::fwd<Params>(params)...);
});
}
private:
Item item;
mutable _::Once once;
uint refcount = 2;
ItemWrapper* next = nullptr;
// The ItemWrapper cannot be destroyed until this pointer becomes non-null.
friend class WorkQueue;
template <typename... Params>
ItemWrapper(Params&&... params): item(kj::fwd<Params>(params)...) {}
void disposeImpl(void* pointer) const override {
// Called when the listener discards its Own<Item>.
//
// If a fire is in-progress in another thread, blocks until it completes.
//
// Once both fire() and done() have been called, the item wrapper is deleted.
//
// `drop()` is also responsible for calling the item's destructor.
if (!once.isInitialized()) {
doOnce([this]() { const_cast<Item&>(item).cancel(); });
}
removeRef();
}
void removeRef() const {
if (__atomic_sub_fetch(&refcount, 1, __ATOMIC_ACQ_REL) == 0) {
delete this;
}
}
template <typename Func>
void doOnce(Func&& func) const {
// Execute the given function once.
//
// TODO(cleanup): Perhaps there should be an easier way to do this, without the extra
// overhead of Lazy<T>.
class InitializerImpl: public _::Once::Initializer {
public:
InitializerImpl(Func& func): func(func) {}
void run() override {
func();
}
private:
Func& func;
};
InitializerImpl init(func);
once.runOnce(init);
}
Maybe<const ItemWrapper&> skipDead() const {
// Find the first ItemWrapper in the list (starting from this one) which isn't already
// fired.
const ItemWrapper* current = this;
while (current->once.isInitialized()) {
current = __atomic_load_n(&current->next, __ATOMIC_ACQUIRE);
if (current == nullptr) {
return nullptr;
}
}
return *current;
}
};
Maybe<const ItemWrapper&> peek(kj::Maybe<NewWorkCallback&> callback) {
// Get the first item in the list, or null if the list is empty.
//
// `callback` will be invoked in the future, if and when new work is added, to let the caller
// know that it's necessary to peek again. The callback will be called at most once. False
// alarms are possible; you must actually peek() again to find out if there is new work. If
// you are sure that you don't need to be notified, pass null for the callback.
KJ_IF_MAYBE(c, callback) {
__atomic_store_n(&newWorkCallback, c, __ATOMIC_RELEASE);
} else {
__atomic_store_n(&newWorkCallback, nullptr, __ATOMIC_RELEASE);
}
ItemWrapper* headCopy = __atomic_load_n(&head, __ATOMIC_ACQUIRE);
if (headCopy == nullptr) {
return nullptr;
} else {
return headCopy->skipDead();
}
}
void cleanup(uint count = std::numeric_limits<uint>::max()) {
// Goes through the list (up to a max of `count` items) and removes any that are no longer
// relevant.
ItemWrapper** ptr = &head;
while (count-- > 0) {
ItemWrapper* item = __atomic_load_n(ptr, __ATOMIC_ACQUIRE);
if (item == nullptr) {
return;
} else if (item->once.isInitialized()) {
// This item is no longer useful to us.
ItemWrapper* next = __atomic_load_n(&item->next, __ATOMIC_ACQUIRE);
if (next == nullptr) {
// We can't delete this item yet because another thread may be modifying its `next`
// pointer. We're at the end of the list, so we might as well return.
return;
} else {
// Unlink this item from the list.
// Note that since *ptr was already determined to be non-null, it now belongs to this
// thread, therefore we don't need to modify it atomically.
*ptr = next;
// If the tail pointer points to this item's next pointer (because a race in add() left
// it behind), be sure to get it updated. (This is unusual.)
if (__atomic_load_n(&tail, __ATOMIC_RELAXED) == &item->next) {
__atomic_store_n(&tail, &next->next, __ATOMIC_RELAXED);
}
// Now we can remove our ref to the item. This may delete it.
item->removeRef();
}
} else {
// Move on to the next item.
ptr = &item->next;
}
}
}
template <typename... Params>
Own<const Item> add(Params&&... params) const {
// Adds an item to the list, passing the given parameters to its constructor.
ItemWrapper* item = new ItemWrapper(kj::fwd<Params>(params)...);
Own<const Item> result(&item->item, *item);
ItemWrapper** tailCopy = __atomic_load_n(&tail, __ATOMIC_ACQUIRE);
ItemWrapper* expected = nullptr;
while (!__atomic_compare_exchange_n(tailCopy, &expected, item, false,
__ATOMIC_RELEASE, __ATOMIC_ACQUIRE)) {
// Oops, the tail pointer was out-of-date. Follow it looking for the real tail.
tailCopy = &expected->next;
expected = nullptr;
}
// Update tail to point at the end. Note that this is sloppy: it's possible that another
// thread has added another item concurrently and we're now moving the tail pointer backwards,
// but that's OK because we'll correct for it being behind the next time we use it.
__atomic_store_n(&tail, &item->next, __ATOMIC_RELEASE);
if (NewWorkCallback* callback = __atomic_load_n(&newWorkCallback, __ATOMIC_ACQUIRE)) {
__atomic_store_n(&newWorkCallback, nullptr, __ATOMIC_RELAXED);
callback->receivedNewWork();
}
return kj::mv(result);
}
private:
ItemWrapper* head = nullptr;
// Pointer to the first item.
mutable ItemWrapper** tail = &head;
// Usually points to the last `next` pointer in the chain (which should be null, since it's the
// last one). However, because `tail` cannot be atomically updated at the same time that `*tail`
// becomes non-null, `tail` may be behind by a step or two. In fact, we are sloppy about
// updating this pointer, so there's a chance it will remain behind indefinitely (if two threads
// adding items at the same time race to update `tail`), but it should not be too far behind.
NewWorkCallback* newWorkCallback = nullptr;
};
// =======================================================================================
namespace {
struct SignalCapture {
sigjmp_buf jumpTo;
siginfo_t siginfo;
};
__thread SignalCapture* threadCapture = nullptr;
void signalHandler(int, siginfo_t* siginfo, void*) {
SignalCapture* capture = threadCapture;
if (capture != nullptr) {
capture->siginfo = *siginfo;
siglongjmp(capture->jumpTo, 1);
}
}
void registerSignalHandler(int signum) {
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, signum);
sigprocmask(SIG_BLOCK, &mask, nullptr);
struct sigaction action;
memset(&action, 0, sizeof(action));
action.sa_sigaction = &signalHandler;
sigfillset(&action.sa_mask);
action.sa_flags = SA_SIGINFO;
sigaction(signum, &action, nullptr);
}
void registerSigusr1() {
registerSignalHandler(SIGUSR1);
}
pthread_once_t registerSigusr1Once = PTHREAD_ONCE_INIT;
} // namespace
// =======================================================================================
struct UnixEventLoop::Impl final: public NewWorkCallback {
Impl(UnixEventLoop& loop): loop(loop) {}
UnixEventLoop& loop;
WorkQueue<PollItem> pollQueue;
WorkQueue<SignalItem> signalQueue;
void receivedNewWork() const override {
loop.wake();
}
};
class UnixEventLoop::SignalItem {
public:
inline SignalItem(PromiseFulfiller<siginfo_t>& fulfiller, int signum)
: fulfiller(fulfiller), signum(signum) {}
inline void cancel() {}
void fire(const siginfo_t& siginfo) {
fulfiller.fulfill(kj::cp(siginfo));
}
inline int getSignum() const { return signum; }
private:
PromiseFulfiller<siginfo_t>& fulfiller;
int signum;
};
class UnixEventLoop::SignalPromiseAdapter {
public:
inline SignalPromiseAdapter(PromiseFulfiller<siginfo_t>& fulfiller,
const WorkQueue<SignalItem>& signalQueue,
int signum)
: item(signalQueue.add(fulfiller, signum)) {}
private:
Own<const SignalItem> item;
};
class UnixEventLoop::PollItem {
public:
inline PollItem(PromiseFulfiller<short>& fulfiller, int fd, short eventMask)
: fulfiller(fulfiller), fd(fd), eventMask(eventMask) {}
inline void cancel() {}
void fire(const struct pollfd& pollfd) {
fulfiller.fulfill(kj::cp(pollfd.revents));
}
inline void prepare(struct pollfd& pollfd) const {
pollfd.fd = fd;
pollfd.events = eventMask;
pollfd.revents = 0;
}
private:
PromiseFulfiller<short>& fulfiller;
int fd;
short eventMask;
};
class UnixEventLoop::PollPromiseAdapter {
public:
inline PollPromiseAdapter(PromiseFulfiller<short>& fulfiller,
const WorkQueue<PollItem>& pollQueue,
int fd, short eventMask)
: item(pollQueue.add(fulfiller, fd, eventMask)) {}
private:
Own<const PollItem> item;
};
UnixEventLoop::UnixEventLoop(): impl(heap<Impl>(*this)) {
pthread_once(&registerSigusr1Once, &registerSigusr1);
}
UnixEventLoop::~UnixEventLoop() {
}
Promise<short> UnixEventLoop::onFdEvent(int fd, short eventMask) const {
return newAdaptedPromise<short, PollPromiseAdapter>(impl->pollQueue, fd, eventMask);
}
Promise<siginfo_t> UnixEventLoop::onSignal(int signum) const {
return newAdaptedPromise<siginfo_t, SignalPromiseAdapter>(impl->signalQueue, signum);
}
void UnixEventLoop::captureSignal(int signum) {
KJ_REQUIRE(signum != SIGUSR1, "Sorry, SIGUSR1 is reserved by the UnixEventLoop implementation.");
registerSignalHandler(signum);
}
void UnixEventLoop::prepareToSleep() noexcept {
waitThread = pthread_self();
__atomic_store_n(&isSleeping, true, __ATOMIC_RELEASE);
}
void UnixEventLoop::sleep() {
SignalCapture capture;
threadCapture = &capture;
if (sigsetjmp(capture.jumpTo, true)) {
// We received a signal and longjmp'd back out of the signal handler.
threadCapture = nullptr;
__atomic_store_n(&isSleeping, false, __ATOMIC_RELAXED);
if (capture.siginfo.si_signo != SIGUSR1) {
// Fire any events waiting on this signal.
auto item = impl->signalQueue.peek(nullptr);
for (;;) {
KJ_IF_MAYBE(i, item) {
if (i->get().getSignum() == capture.siginfo.si_signo) {
i->fire(capture.siginfo);
}
item = i->getNext();
} else {
break;
}
}
}
return;
}
// Make sure we don't wait for any events that are no longer relevant, either because we fired
// them last time around or because the application has discarded the corresponding promises.
impl->signalQueue.cleanup();
impl->pollQueue.cleanup();
sigset_t newMask;
sigemptyset(&newMask);
sigaddset(&newMask, SIGUSR1);
{
auto item = impl->signalQueue.peek(*impl);
for (;;) {
KJ_IF_MAYBE(i, item) {
sigaddset(&newMask, i->get().getSignum());
item = i->getNext();
} else {
break;
}
}
}
kj::Vector<struct pollfd> pollfds;
kj::Vector<const WorkQueue<PollItem>::ItemWrapper*> pollItems;
{
auto item = impl->pollQueue.peek(*impl);
for (;;) {
KJ_IF_MAYBE(i, item) {
struct pollfd pollfd;
memset(&pollfd, 0, sizeof(pollfd));
i->get().prepare(pollfd);
pollfds.add(pollfd);
pollItems.add(i);
item = i->getNext();
} else {
break;
}
}
}
sigset_t origMask;
sigprocmask(SIG_UNBLOCK, &newMask, &origMask);
int pollResult = poll(pollfds.begin(), pollfds.size(), -1);
int error = pollResult < 0 ? errno : 0;
sigprocmask(SIG_SETMASK, &origMask, nullptr);
threadCapture = nullptr;
__atomic_store_n(&isSleeping, false, __ATOMIC_RELAXED);
if (pollResult < 0) {
KJ_FAIL_SYSCALL("poll()", error);
}
for (auto i: indices(pollfds)) {
if (pollfds[i].revents != 0) {
pollItems[i]->fire(pollfds[i]);
if (--pollResult <= 0) {
break;
}
}
}
}
void UnixEventLoop::wake() const {
// The first load is a fast-path check -- if false, we can avoid a barrier. If true, then we
// follow up with an exchange to set it false. If it turns out we were in fact the one thread
// to transition the value from true to false, then we go ahead and raise SIGUSR1 on the target
// thread to wake it up.
if (__atomic_load_n(&isSleeping, __ATOMIC_RELAXED) &&
__atomic_exchange_n(&isSleeping, false, __ATOMIC_ACQUIRE)) {
pthread_kill(waitThread, SIGUSR1);
}
}
} // namespace kj
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef KJ_ASYNC_UNIX_H_
#define KJ_ASYNC_UNIX_H_
#include "async.h"
#include "vector.h"
#include <signal.h>
#include <poll.h>
#include <pthread.h>
namespace kj {
class UnixEventLoop: public EventLoop {
// An EventLoop implementation which can wait for events on file descriptors as well as signals.
// This API only makes sense on Unix.
//
// The implementation uses `poll()` or possibly a platform-specific API (e.g. epoll, kqueue).
// To also wait on signals without race conditions, the implementation may block signals until
// just before `poll()` while using a signal handler which `siglongjmp()`s back to just before
// the signal was unblocked, or it may use a nicer platform-specific API like signalfd.
//
// The implementation uses SIGUSR1. The application must avoid using this signal for its own
// purposes.
public:
UnixEventLoop();
~UnixEventLoop();
Promise<short> onFdEvent(int fd, short eventMask) const;
// `eventMask` is a bitwise-OR of poll events (e.g. `POLLIN`, `POLLOUT`, etc.). The next time
// one or more of the given events occurs on `fd`, the set of events that occurred are returned.
//
// The result of waiting on the same FD twice at once is undefined.
Promise<siginfo_t> onSignal(int signum) const;
// When the given signal is delivered to this thread, return the corresponding siginfo_t.
// The signal must have been captured using `captureSignal()`.
//
// If `onSignal()` has not been called, the signal will remain blocked in this thread.
// Therefore, a signal which arrives before `onSignal()` was called will not be "missed" -- the
// next call to 'onSignal()' will receive it. Also, you can control which thread receives a
// process-wide signal by only calling `onSignal()` on that thread's event loop.
//
// The result of waiting on the same signal twice at once is undefined.
static void captureSignal(int signum);
// Arranges for the given signal to be captured and handled via UnixEventLoop, so that you may
// then pass it to `onSignal()`. This method is static because it registers a signal handler
// which applies process-wide. If any other threads exist in the process when `captureSignal()`
// is called, you *must* set the signal mask in those threads to block this signal, otherwise
// terrible things will happen if the signal happens to be delivered to those threads. If at
// all possible, call `captureSignal()` *before* creating threads, so that threads you create in
// the future will inherit the proper signal mask.
//
// To un-capture a signal, simply install a different signal handler and then un-block it from
// the signal mask.
protected:
void prepareToSleep() noexcept override;
void sleep() override;
void wake() const override;
private:
class PollItem;
class PollPromiseAdapter;
class SignalItem;
class SignalPromiseAdapter;
struct Impl;
Own<Impl> impl;
pthread_t waitThread;
bool isSleeping = false;
};
} // namespace kj
#endif // KJ_ASYNC_UNIX_H_
......@@ -1481,8 +1481,8 @@ private:
template <typename T, typename Adapter, typename... Params>
Promise<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
return Promise<T>(Own<_::PromiseNode>(heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
kj::fwd<Params>(adapterConstructorParams)...)));
return Promise<T>(false, heap<_::AdapterPromiseNode<_::FixVoid<T>, Adapter>>(
kj::fwd<Params>(adapterConstructorParams)...));
}
template <typename T>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment