Commit d26e07f6 authored by Kenton Varda's avatar Kenton Varda

Don't assume a short non-blocking read() means the socket buffer is empty.

According to David Klempner, this turns out not to be a safe assumption:

https://twitter.com/CaptainSegfault/status/1112622245531144194

Interestingly, it turns out this optimization was also blocking the delivery of certain errors, because we would skip the read() where that error would otherwise be delivered. (Hence the unit test update.)
parent e9c46ac1
...@@ -2203,9 +2203,11 @@ KJ_TEST("OS TwoWayPipe whenWriteDisconnected()") { ...@@ -2203,9 +2203,11 @@ KJ_TEST("OS TwoWayPipe whenWriteDisconnected()") {
abortedPromise.wait(io.waitScope); abortedPromise.wait(io.waitScope);
char buffer[4]; char buffer[4];
KJ_ASSERT(pipe.ends[0]->tryRead(&buffer, sizeof(buffer), sizeof(buffer)).wait(io.waitScope) == 3); KJ_ASSERT(pipe.ends[0]->tryRead(&buffer, 3, 3).wait(io.waitScope) == 3);
buffer[3] = '\0'; buffer[3] = '\0';
KJ_EXPECT(buffer == "bar"_kj); KJ_EXPECT(buffer == "bar"_kj);
// Note: Reading any further in pipe.ends[0] would throw "connection reset".
} }
KJ_TEST("import socket FD that's already broken") { KJ_TEST("import socket FD that's already broken") {
......
...@@ -161,8 +161,8 @@ public: ...@@ -161,8 +161,8 @@ public:
} }
Promise<void> write(const void* buffer, size_t size) override { Promise<void> write(const void* buffer, size_t size) override {
ssize_t writeResult; ssize_t n;
KJ_NONBLOCKING_SYSCALL(writeResult = ::write(fd, buffer, size)) { KJ_NONBLOCKING_SYSCALL(n = ::write(fd, buffer, size)) {
// Error. // Error.
// We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
...@@ -176,21 +176,23 @@ public: ...@@ -176,21 +176,23 @@ public:
return kj::READY_NOW; return kj::READY_NOW;
} }
// A negative result means EAGAIN, which we can treat the same as having written zero bytes. if (n < 0) {
size_t n = writeResult < 0 ? 0 : writeResult; // EAGAIN -- need to wait for writability and try again.
return observer.whenBecomesWritable().then([=]() {
if (n == size) { return write(buffer, size);
});
} else if (n == size) {
// All done.
return READY_NOW; return READY_NOW;
} } else {
// Fewer than `size` bytes were written, but we CANNOT assume we're out of buffer space, as
// Fewer than `size` bytes were written, therefore we must be out of buffer space. Wait until // Linux is known to return partial reads/writes when interrupted by a signal -- yes, even
// the fd becomes writable again. // for non-blocking operations. So, we'll need to write() again now, even though it will
buffer = reinterpret_cast<const byte*>(buffer) + n; // almost certainly fail with EAGAIN. See comments in the read path for more info.
size -= n; buffer = reinterpret_cast<const byte*>(buffer) + n;
size -= n;
return observer.whenBecomesWritable().then([=]() {
return write(buffer, size); return write(buffer, size);
}); }
} }
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override { Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
...@@ -411,29 +413,17 @@ private: ...@@ -411,29 +413,17 @@ private:
maxBytes -= n; maxBytes -= n;
alreadyRead.byteCount += n; alreadyRead.byteCount += n;
KJ_IF_MAYBE(atEnd, observer.atEndHint()) { // According to David Klempner, who works on Stubby at Google, we sadly CANNOT assume that
if (*atEnd) { // we've consumed the whole read buffer here. If a signal is delivered in the middle of a
// We've already received an indication that the next read() will return EOF, so there's // read() -- yes, even a non-blocking read -- it can cause the kernel to return a partial
// nothing to wait for. // result, with data still in the buffer.
return alreadyRead; // https://bugzilla.kernel.org/show_bug.cgi?id=199131
} else { // https://twitter.com/CaptainSegfault/status/1112622245531144194
// As of the last time the event queue was checked, the kernel reported that we were //
// *not* at the end of the stream. It's unlikely that this has changed in the short time // Unfortunately, we have no choice but to issue more read()s until it either tells us EOF
// it took to handle the event, therefore calling read() now will almost certainly fail // or EAGAIN. We used to have an optimization here using observer.atEndHint() (when it is
// with EAGAIN. Moreover, since EOF had not been received as of the last check, we know // non-null) to avoid a redundant call to read(). Alas...
// that even if it was received since then, whenBecomesReadable() will catch that. So, return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
// let's go ahead and skip calling read() here and instead go straight to waiting for
// more input.
return observer.whenBecomesReadable().then([=]() {
return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
});
}
} else {
// The kernel has not indicated one way or the other whether we are likely to be at EOF.
// In this case we *must* keep calling read() until we either get a return of zero or
// EAGAIN.
return tryReadInternal(buffer, minBytes, maxBytes, fdBuffer, maxFds, alreadyRead);
}
} }
} }
...@@ -543,9 +533,9 @@ private: ...@@ -543,9 +533,9 @@ private:
return writeInternal(firstPiece, morePieces, nullptr); return writeInternal(firstPiece, morePieces, nullptr);
} }
return observer.whenBecomesWritable().then([=]() { // As with read(), we cannot assume that a short write() really means the write buffer is
return writeInternal(firstPiece, morePieces, nullptr); // full (see comments in the read path above). We have to write again.
}); return writeInternal(firstPiece, morePieces, nullptr);
} else if (morePieces.size() == 0) { } else if (morePieces.size() == 0) {
// First piece was fully-consumed and there are no more pieces, so we're done. // First piece was fully-consumed and there are no more pieces, so we're done.
KJ_DASSERT(n == firstPiece.size(), n); KJ_DASSERT(n == firstPiece.size(), n);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment