Commit f06816aa authored by Oliver Kuckertz's avatar Oliver Kuckertz

Add option to listen for out-of-band data to FdObserver

parent 21e7b91e
...@@ -288,6 +288,9 @@ UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint fla ...@@ -288,6 +288,9 @@ UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint fla
if (flags & OBSERVE_WRITE) { if (flags & OBSERVE_WRITE) {
event.events |= EPOLLOUT; event.events |= EPOLLOUT;
} }
if (flags & OBSERVE_URGENT) {
event.events |= EPOLLPRI;
}
event.events |= EPOLLET; // Set edge-triggered mode. event.events |= EPOLLET; // Set edge-triggered mode.
event.data.ptr = this; event.data.ptr = this;
...@@ -320,6 +323,13 @@ void UnixEventPort::FdObserver::fire(short events) { ...@@ -320,6 +323,13 @@ void UnixEventPort::FdObserver::fire(short events) {
writeFulfiller = nullptr; writeFulfiller = nullptr;
} }
} }
if (events & EPOLLPRI) {
KJ_IF_MAYBE(f, urgentFulfiller) {
f->get()->fulfill();
urgentFulfiller = nullptr;
}
}
} }
Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() { Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
...@@ -338,6 +348,15 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() { ...@@ -338,6 +348,15 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
return kj::mv(paf.promise); return kj::mv(paf.promise);
} }
Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
KJ_REQUIRE(flags & OBSERVE_URGENT,
"FdObserver was not set to observe availability of urgent data.");
auto paf = newPromiseAndFulfiller<void>();
urgentFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
bool UnixEventPort::wait() { bool UnixEventPort::wait() {
// epoll_wait()'s timeout is an `int` count of milliseconds, so truncate to that. // epoll_wait()'s timeout is an `int` count of milliseconds, so truncate to that.
// Also, make sure that we aren't within a millisecond of overflowing a `Duration` since that // Also, make sure that we aren't within a millisecond of overflowing a `Duration` since that
...@@ -590,7 +609,14 @@ void UnixEventPort::FdObserver::fire(short events) { ...@@ -590,7 +609,14 @@ void UnixEventPort::FdObserver::fire(short events) {
} }
} }
if (readFulfiller == nullptr && writeFulfiller == nullptr) { if (events & POLLPRI) {
KJ_IF_MAYBE(f, urgentFulfiller) {
f->get()->fulfill();
urgentFulfiller = nullptr;
}
}
if (readFulfiller == nullptr && writeFulfiller == nullptr && urgentFulfiller == nullptr) {
// Remove from list. // Remove from list.
if (next == nullptr) { if (next == nullptr) {
eventPort.observersTail = prev; eventPort.observersTail = prev;
...@@ -605,7 +631,8 @@ void UnixEventPort::FdObserver::fire(short events) { ...@@ -605,7 +631,8 @@ void UnixEventPort::FdObserver::fire(short events) {
short UnixEventPort::FdObserver::getEventMask() { short UnixEventPort::FdObserver::getEventMask() {
return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) | return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) |
(writeFulfiller == nullptr ? 0 : POLLOUT); (writeFulfiller == nullptr ? 0 : POLLOUT) |
(urgentFulfiller == nullptr ? 0 : POLLPRI);
} }
Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() { Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
...@@ -638,6 +665,22 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() { ...@@ -638,6 +665,22 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
return kj::mv(paf.promise); return kj::mv(paf.promise);
} }
Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
KJ_REQUIRE(flags & OBSERVE_URGENT,
"FdObserver was not set to observe availability of urgent data.");
if (prev == nullptr) {
KJ_DASSERT(next == nullptr);
prev = eventPort.observersTail;
*prev = this;
eventPort.observersTail = &next;
}
auto paf = newPromiseAndFulfiller<void>();
urgentFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
class UnixEventPort::PollContext { class UnixEventPort::PollContext {
public: public:
PollContext(FdObserver* ptr) { PollContext(FdObserver* ptr) {
......
...@@ -158,6 +158,7 @@ public: ...@@ -158,6 +158,7 @@ public:
enum Flags { enum Flags {
OBSERVE_READ = 1, OBSERVE_READ = 1,
OBSERVE_WRITE = 2, OBSERVE_WRITE = 2,
OBSERVE_URGENT = 4,
OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE
}; };
...@@ -228,6 +229,15 @@ public: ...@@ -228,6 +229,15 @@ public:
// It is an error to call `whenBecomesWritable()` again when the promise returned previously // It is an error to call `whenBecomesWritable()` again when the promise returned previously
// has not yet resolved. If you do this, the previous promise may throw an exception. // has not yet resolved. If you do this, the previous promise may throw an exception.
Promise<void> whenUrgentDataAvailable();
// Resolves the next time the file descriptor's read buffer contains "urgent" data.
//
// The conditions for availability of urgent data are specific to the file descriptor's
// underlying implementation.
//
// It is an error to call `whenUrgentDataAvailable()` again when the promise returned previously
// has not yet resolved. If you do this, the previous promise may throw an exception.
private: private:
UnixEventPort& eventPort; UnixEventPort& eventPort;
int fd; int fd;
...@@ -235,6 +245,7 @@ private: ...@@ -235,6 +245,7 @@ private:
kj::Maybe<Own<PromiseFulfiller<void>>> readFulfiller; kj::Maybe<Own<PromiseFulfiller<void>>> readFulfiller;
kj::Maybe<Own<PromiseFulfiller<void>>> writeFulfiller; kj::Maybe<Own<PromiseFulfiller<void>>> writeFulfiller;
kj::Maybe<Own<PromiseFulfiller<void>>> urgentFulfiller;
// Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to // Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to
// null every time an event is fired. // null every time an event is fired.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment