Commit 3c7ae27a authored by Kenton Varda's avatar Kenton Varda

Merge pull request #239 from mologie/feature-pollpri

Add observing for urgent data to UnixEventPort::FdObserver
parents fd8ed198 92f428d1
...@@ -12,6 +12,7 @@ Philip Quinn <p@partylemon.com>: cmake build and other assorted bits ...@@ -12,6 +12,7 @@ Philip Quinn <p@partylemon.com>: cmake build and other assorted bits
Brian Taylor <el.wubo@gmail.com>: emacs syntax highlighting Brian Taylor <el.wubo@gmail.com>: emacs syntax highlighting
Ben Laurie <ben@links.org>: discovered and responsibly disclosed security bugs Ben Laurie <ben@links.org>: discovered and responsibly disclosed security bugs
Kamal Marhubi <kamal@marhubi.com>: JSON parser Kamal Marhubi <kamal@marhubi.com>: JSON parser
Oliver Kuckertz <oliver.kuckertz@mologie.de>: FdObserver POLLPRI support
This file does not list people who maintain their own Cap'n Proto This file does not list people who maintain their own Cap'n Proto
implementations as separate projects. Those people are awesome too! :) implementations as separate projects. Those people are awesome too! :)
...@@ -26,7 +26,9 @@ ...@@ -26,7 +26,9 @@
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <netinet/in.h>
#include <kj/compat/gtest.h> #include <kj/compat/gtest.h>
#include <pthread.h> #include <pthread.h>
#include <algorithm> #include <algorithm>
...@@ -453,6 +455,71 @@ TEST(AsyncUnixTest, WriteObserver) { ...@@ -453,6 +455,71 @@ TEST(AsyncUnixTest, WriteObserver) {
EXPECT_TRUE(writable); EXPECT_TRUE(writable);
} }
TEST(AsyncUnixTest, UrgentObserver) {
// Verify that FdObserver correctly detects availability of out-of-band data.
// Availability of out-of-band data is implementation-specific.
// Linux's and OS X's TCP/IP stack supports out-of-band messages for TCP sockets, which is used
// for this test.
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
int tmpFd;
char c;
// Spawn a TCP server
KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
kj::AutoCloseFd serverFd(tmpFd);
sockaddr_in saddr = {0};
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
KJ_SYSCALL(bind(serverFd, reinterpret_cast<sockaddr*>(&saddr), sizeof(saddr)));
socklen_t saddrLen = sizeof(saddr);
KJ_SYSCALL(getsockname(serverFd, reinterpret_cast<sockaddr*>(&saddr), &saddrLen));
KJ_SYSCALL(listen(serverFd, 1));
// Accept one connection, send in-band and OOB byte, wait for a quit message
Thread thread([&]() {
int tmpFd;
char c;
sockaddr_in caddr;
socklen_t caddrLen = sizeof(caddr);
KJ_SYSCALL(tmpFd = accept(serverFd, reinterpret_cast<sockaddr*>(&caddr), &caddrLen));
kj::AutoCloseFd clientFd(tmpFd);
delay();
// Workaround: OS X won't signal POLLPRI without POLLIN. Also enqueue some in-band data.
c = 'i';
KJ_SYSCALL(send(clientFd, &c, 1, 0));
c = 'o';
KJ_SYSCALL(send(clientFd, &c, 1, MSG_OOB));
KJ_SYSCALL(recv(clientFd, &c, 1, 0));
EXPECT_EQ('q', c);
});
KJ_DEFER({ shutdown(serverFd, SHUT_RDWR); serverFd = nullptr; });
KJ_SYSCALL(tmpFd = socket(AF_INET, SOCK_STREAM, 0));
kj::AutoCloseFd clientFd(tmpFd);
KJ_SYSCALL(connect(clientFd, reinterpret_cast<sockaddr*>(&saddr), saddrLen));
UnixEventPort::FdObserver observer(port, clientFd,
UnixEventPort::FdObserver::OBSERVE_READ | UnixEventPort::FdObserver::OBSERVE_URGENT);
// Attempt to read the urgent byte prior to reading the in-band byte.
observer.whenUrgentDataAvailable().wait(waitScope);
KJ_SYSCALL(recv(clientFd, &c, 1, MSG_OOB));
EXPECT_EQ('o', c);
KJ_SYSCALL(recv(clientFd, &c, 1, 0));
EXPECT_EQ('i', c);
// Allow server thread to let its clientFd go out of scope.
c = 'q';
KJ_SYSCALL(send(clientFd, &c, 1, 0));
KJ_SYSCALL(shutdown(clientFd, SHUT_RDWR));
}
TEST(AsyncUnixTest, SteadyTimers) { TEST(AsyncUnixTest, SteadyTimers) {
captureSignals(); captureSignals();
UnixEventPort port; UnixEventPort port;
......
...@@ -288,6 +288,9 @@ UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint fla ...@@ -288,6 +288,9 @@ UnixEventPort::FdObserver::FdObserver(UnixEventPort& eventPort, int fd, uint fla
if (flags & OBSERVE_WRITE) { if (flags & OBSERVE_WRITE) {
event.events |= EPOLLOUT; event.events |= EPOLLOUT;
} }
if (flags & OBSERVE_URGENT) {
event.events |= EPOLLPRI;
}
event.events |= EPOLLET; // Set edge-triggered mode. event.events |= EPOLLET; // Set edge-triggered mode.
event.data.ptr = this; event.data.ptr = this;
...@@ -320,6 +323,13 @@ void UnixEventPort::FdObserver::fire(short events) { ...@@ -320,6 +323,13 @@ void UnixEventPort::FdObserver::fire(short events) {
writeFulfiller = nullptr; writeFulfiller = nullptr;
} }
} }
if (events & EPOLLPRI) {
KJ_IF_MAYBE(f, urgentFulfiller) {
f->get()->fulfill();
urgentFulfiller = nullptr;
}
}
} }
Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() { Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
...@@ -338,6 +348,15 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() { ...@@ -338,6 +348,15 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
return kj::mv(paf.promise); return kj::mv(paf.promise);
} }
Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
KJ_REQUIRE(flags & OBSERVE_URGENT,
"FdObserver was not set to observe availability of urgent data.");
auto paf = newPromiseAndFulfiller<void>();
urgentFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
bool UnixEventPort::wait() { bool UnixEventPort::wait() {
// epoll_wait()'s timeout is an `int` count of milliseconds, so truncate to that. // epoll_wait()'s timeout is an `int` count of milliseconds, so truncate to that.
// Also, make sure that we aren't within a millisecond of overflowing a `Duration` since that // Also, make sure that we aren't within a millisecond of overflowing a `Duration` since that
...@@ -597,7 +616,14 @@ void UnixEventPort::FdObserver::fire(short events) { ...@@ -597,7 +616,14 @@ void UnixEventPort::FdObserver::fire(short events) {
} }
} }
if (readFulfiller == nullptr && writeFulfiller == nullptr) { if (events & POLLPRI) {
KJ_IF_MAYBE(f, urgentFulfiller) {
f->get()->fulfill();
urgentFulfiller = nullptr;
}
}
if (readFulfiller == nullptr && writeFulfiller == nullptr && urgentFulfiller == nullptr) {
// Remove from list. // Remove from list.
if (next == nullptr) { if (next == nullptr) {
eventPort.observersTail = prev; eventPort.observersTail = prev;
...@@ -612,7 +638,8 @@ void UnixEventPort::FdObserver::fire(short events) { ...@@ -612,7 +638,8 @@ void UnixEventPort::FdObserver::fire(short events) {
short UnixEventPort::FdObserver::getEventMask() { short UnixEventPort::FdObserver::getEventMask() {
return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) | return (readFulfiller == nullptr ? 0 : (POLLIN | POLLRDHUP)) |
(writeFulfiller == nullptr ? 0 : POLLOUT); (writeFulfiller == nullptr ? 0 : POLLOUT) |
(urgentFulfiller == nullptr ? 0 : POLLPRI);
} }
Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() { Promise<void> UnixEventPort::FdObserver::whenBecomesReadable() {
...@@ -645,6 +672,22 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() { ...@@ -645,6 +672,22 @@ Promise<void> UnixEventPort::FdObserver::whenBecomesWritable() {
return kj::mv(paf.promise); return kj::mv(paf.promise);
} }
Promise<void> UnixEventPort::FdObserver::whenUrgentDataAvailable() {
KJ_REQUIRE(flags & OBSERVE_URGENT,
"FdObserver was not set to observe availability of urgent data.");
if (prev == nullptr) {
KJ_DASSERT(next == nullptr);
prev = eventPort.observersTail;
*prev = this;
eventPort.observersTail = &next;
}
auto paf = newPromiseAndFulfiller<void>();
urgentFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
class UnixEventPort::PollContext { class UnixEventPort::PollContext {
public: public:
PollContext(FdObserver* ptr) { PollContext(FdObserver* ptr) {
......
...@@ -158,6 +158,7 @@ public: ...@@ -158,6 +158,7 @@ public:
enum Flags { enum Flags {
OBSERVE_READ = 1, OBSERVE_READ = 1,
OBSERVE_WRITE = 2, OBSERVE_WRITE = 2,
OBSERVE_URGENT = 4,
OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE OBSERVE_READ_WRITE = OBSERVE_READ | OBSERVE_WRITE
}; };
...@@ -228,6 +229,15 @@ public: ...@@ -228,6 +229,15 @@ public:
// It is an error to call `whenBecomesWritable()` again when the promise returned previously // It is an error to call `whenBecomesWritable()` again when the promise returned previously
// has not yet resolved. If you do this, the previous promise may throw an exception. // has not yet resolved. If you do this, the previous promise may throw an exception.
Promise<void> whenUrgentDataAvailable();
// Resolves the next time the file descriptor's read buffer contains "urgent" data.
//
// The conditions for availability of urgent data are specific to the file descriptor's
// underlying implementation.
//
// It is an error to call `whenUrgentDataAvailable()` again when the promise returned previously
// has not yet resolved. If you do this, the previous promise may throw an exception.
private: private:
UnixEventPort& eventPort; UnixEventPort& eventPort;
int fd; int fd;
...@@ -235,6 +245,7 @@ private: ...@@ -235,6 +245,7 @@ private:
kj::Maybe<Own<PromiseFulfiller<void>>> readFulfiller; kj::Maybe<Own<PromiseFulfiller<void>>> readFulfiller;
kj::Maybe<Own<PromiseFulfiller<void>>> writeFulfiller; kj::Maybe<Own<PromiseFulfiller<void>>> writeFulfiller;
kj::Maybe<Own<PromiseFulfiller<void>>> urgentFulfiller;
// Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to // Replaced each time `whenBecomesReadable()` or `whenBecomesWritable()` is called. Reverted to
// null every time an event is fired. // null every time an event is fired.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment