Commit 36bbb49c authored by Kenton Varda's avatar Kenton Varda

Refactor: Remove NullEventPort in favor of a maybe'd EventPort.

This will be important in subsequent commits adding cross-thread events to EventLoop. We'll only want wait() to throw if we know no cross-thread events will ever come, but the EventPort won't know that, only the EventLoop will.
parent f81a83ac
...@@ -268,25 +268,6 @@ public: ...@@ -268,25 +268,6 @@ public:
LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler(); LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler();
class NullEventPort: public EventPort {
public:
bool wait() override {
KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
}
bool poll() override { return false; }
void wake() const override {
// TODO(someday): Implement using condvar.
kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED,
"Cross-thread events are not yet implemented for EventLoops with no EventPort."));
}
static NullEventPort instance;
};
NullEventPort NullEventPort::instance = NullEventPort();
} // namespace _ (private) } // namespace _ (private)
// ======================================================================================= // =======================================================================================
...@@ -299,8 +280,7 @@ void EventPort::wake() const { ...@@ -299,8 +280,7 @@ void EventPort::wake() const {
} }
EventLoop::EventLoop() EventLoop::EventLoop()
: port(_::NullEventPort::instance), : daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
daemons(kj::heap<TaskSet>(_::LoggingErrorHandler::instance)) {}
EventLoop::EventLoop(EventPort& port) EventLoop::EventLoop(EventPort& port)
: port(port), : port(port),
...@@ -384,7 +364,9 @@ bool EventLoop::isRunnable() { ...@@ -384,7 +364,9 @@ bool EventLoop::isRunnable() {
void EventLoop::setRunnable(bool runnable) { void EventLoop::setRunnable(bool runnable) {
if (runnable != lastRunnableState) { if (runnable != lastRunnableState) {
port.setRunnable(runnable); KJ_IF_MAYBE(p, port) {
p->setRunnable(runnable);
}
lastRunnableState = runnable; lastRunnableState = runnable;
} }
} }
...@@ -412,7 +394,9 @@ void WaitScope::poll() { ...@@ -412,7 +394,9 @@ void WaitScope::poll() {
for (;;) { for (;;) {
if (!loop.turn()) { if (!loop.turn()) {
// No events in the queue. Poll for I/O. // No events in the queue. Poll for I/O.
loop.port.poll(); KJ_IF_MAYBE(p, loop.port) {
p->poll();
}
if (!loop.isRunnable()) { if (!loop.isRunnable()) {
// Still no events in the queue. We're done. // Still no events in the queue. We're done.
...@@ -441,11 +425,17 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope ...@@ -441,11 +425,17 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
if (!loop.turn()) { if (!loop.turn()) {
// No events in the queue. Wait for callback. // No events in the queue. Wait for callback.
counter = 0; counter = 0;
loop.port.wait(); KJ_IF_MAYBE(p, loop.port) {
p->wait();
} else {
KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
}
} else if (++counter > waitScope.busyPollInterval) { } else if (++counter > waitScope.busyPollInterval) {
// Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll. // Note: It's intentional that if busyPollInterval is kj::maxValue, we never poll.
counter = 0; counter = 0;
loop.port.poll(); KJ_IF_MAYBE(p, loop.port) {
p->poll();
}
} }
} }
...@@ -473,7 +463,9 @@ bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) { ...@@ -473,7 +463,9 @@ bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) {
while (!doneEvent.fired) { while (!doneEvent.fired) {
if (!loop.turn()) { if (!loop.turn()) {
// No events in the queue. Poll for I/O. // No events in the queue. Poll for I/O.
loop.port.poll(); KJ_IF_MAYBE(p, loop.port) {
p->poll();
}
if (!doneEvent.fired && !loop.isRunnable()) { if (!doneEvent.fired && !loop.isRunnable()) {
// No progress. Give up. // No progress. Give up.
......
...@@ -754,7 +754,9 @@ public: ...@@ -754,7 +754,9 @@ public:
// Returns true if run() would currently do anything, or false if the queue is empty. // Returns true if run() would currently do anything, or false if the queue is empty.
private: private:
EventPort& port; kj::Maybe<EventPort&> port;
// If null, this thread doesn't receive I/O events from the OS. It can potentially receive
// events from other threads via the Executor.
bool running = false; bool running = false;
// True while looping -- wait() is then not allowed. // True while looping -- wait() is then not allowed.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment