Commit 49afd825 authored by Kenton Varda's avatar Kenton Varda

Don't reuse HTTP client connections if the previous request didn't read the entire response.

(Or if it didn't send the entire request body.)
parent d95c12e2
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE. // THE SOFTWARE.
#define KJ_TESTING_KJ 1
#include "http.h" #include "http.h"
#include <kj/debug.h> #include <kj/debug.h>
#include <kj/test.h> #include <kj/test.h>
...@@ -2468,8 +2470,10 @@ public: ...@@ -2468,8 +2470,10 @@ public:
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url); auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size()); auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size());
auto promise = stream->write(body.begin(), body.size()); auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
return promise.attach(kj::mv(stream), kj::mv(body)); promises.add(stream->write(body.begin(), body.size()));
promises.add(requestBody.readAllBytes().ignoreResult());
return kj::joinPromises(promises.finish()).attach(kj::mv(stream), kj::mv(body));
} }
kj::Promise<void> openWebSocket( kj::Promise<void> openWebSocket(
...@@ -2539,6 +2543,17 @@ KJ_TEST("HttpClient connection management") { ...@@ -2539,6 +2543,17 @@ KJ_TEST("HttpClient connection management") {
req2.wait(io.waitScope); req2.wait(io.waitScope);
KJ_EXPECT(count == 2); KJ_EXPECT(count == 2);
// We can reuse after a POST, provided we write the whole POST body properly.
{
auto req = client->request(
HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6));
req.body->write("foobar", 6).wait(io.waitScope);
req.response.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
}
KJ_EXPECT(count == 2);
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 2);
// Advance time for half the timeout, then exercise one of the connections. // Advance time for half the timeout, then exercise one of the connections.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2); clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
...@@ -2569,9 +2584,23 @@ KJ_TEST("HttpClient connection management") { ...@@ -2569,9 +2584,23 @@ KJ_TEST("HttpClient connection management") {
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
KJ_EXPECT(count == 0);
// Connections where we failed to read the full response body are not reused.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response
.wait(io.waitScope); .wait(io.waitScope);
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
// Connections where we failed to write the full request body are not reused.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response
.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
KJ_EXPECT(count == 0);
#if !_WIN32 // TODO(soon): Figure out why this doesn't work on Windows. Probably a bug in #if !_WIN32 // TODO(soon): Figure out why this doesn't work on Windows. Probably a bug in
// Win32IocpEventPort::poll(). // Win32IocpEventPort::poll().
// If the server times out the connection, we figure it out on the client. // If the server times out the connection, we figure it out on the client.
......
...@@ -1026,6 +1026,10 @@ public: ...@@ -1026,6 +1026,10 @@ public:
: inner(inner), headerBuffer(kj::heapArray<char>(MIN_BUFFER)), headers(table) { : inner(inner), headerBuffer(kj::heapArray<char>(MIN_BUFFER)), headers(table) {
} }
bool canReuse() {
return !broken;
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Stream locking: While an entity-body is being read, the body stream "locks" the underlying // Stream locking: While an entity-body is being read, the body stream "locks" the underlying
// HTTP stream. Once the entity-body is complete, we can read the next pipelined message. // HTTP stream. Once the entity-body is complete, we can read the next pipelined message.
...@@ -1041,9 +1045,10 @@ public: ...@@ -1041,9 +1045,10 @@ public:
// Called when a body input stream was destroyed without reading to the end. // Called when a body input stream was destroyed without reading to the end.
KJ_REQUIRE_NONNULL(onMessageDone)->reject(KJ_EXCEPTION(FAILED, KJ_REQUIRE_NONNULL(onMessageDone)->reject(KJ_EXCEPTION(FAILED,
"client did not finish reading previous HTTP response body", "application did not finish reading previous HTTP response body",
"can't read next pipelined response")); "can't read next pipelined request/response"));
onMessageDone = nullptr; onMessageDone = nullptr;
broken = true;
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
...@@ -1218,6 +1223,9 @@ private: ...@@ -1218,6 +1223,9 @@ private:
// as a side-effect of HTTP chunked encoding, where such a newline is added to the end of each // as a side-effect of HTTP chunked encoding, where such a newline is added to the end of each
// chunk, for no good reason. // chunk, for no good reason.
bool broken = false;
// Becomes true if the caller failed to read the whole entity-body before closing the stream.
kj::Promise<void> messageReadQueue = kj::READY_NOW; kj::Promise<void> messageReadQueue = kj::READY_NOW;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onMessageDone; kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onMessageDone;
...@@ -1612,6 +1620,10 @@ class HttpOutputStream { ...@@ -1612,6 +1620,10 @@ class HttpOutputStream {
public: public:
HttpOutputStream(AsyncOutputStream& inner): inner(inner) {} HttpOutputStream(AsyncOutputStream& inner): inner(inner) {}
bool canReuse() {
return !inBody && !broken;
}
void writeHeaders(String content) { void writeHeaders(String content) {
// Writes some header content and begins a new entity body. // Writes some header content and begins a new entity body.
...@@ -1671,6 +1683,7 @@ public: ...@@ -1671,6 +1683,7 @@ public:
// Called if the application failed to write all expected body bytes. // Called if the application failed to write all expected body bytes.
KJ_REQUIRE(inBody) { return; } KJ_REQUIRE(inBody) { return; }
inBody = false; inBody = false;
broken = true;
writeQueue = writeQueue.then([]() -> kj::Promise<void> { writeQueue = writeQueue.then([]() -> kj::Promise<void> {
return KJ_EXCEPTION(FAILED, return KJ_EXCEPTION(FAILED,
...@@ -1688,6 +1701,7 @@ private: ...@@ -1688,6 +1701,7 @@ private:
AsyncOutputStream& inner; AsyncOutputStream& inner;
kj::Promise<void> writeQueue = kj::READY_NOW; kj::Promise<void> writeQueue = kj::READY_NOW;
bool inBody = false; bool inBody = false;
bool broken = false;
void queueWrite(kj::String content) { void queueWrite(kj::String content) {
writeQueue = writeQueue.then(kj::mvCapture(content, [this](kj::String&& content) { writeQueue = writeQueue.then(kj::mvCapture(content, [this](kj::String&& content) {
...@@ -2431,9 +2445,9 @@ public: ...@@ -2431,9 +2445,9 @@ public:
settings(kj::mv(settings)) {} settings(kj::mv(settings)) {}
bool canReuse() { bool canReuse() {
// Returns true if // Returns true if we can reuse this HttpClient for another request.
return !upgraded && !closed; return !upgraded && !closed && httpInput.canReuse() && httpOutput.canReuse();
} }
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
...@@ -2803,7 +2817,7 @@ class AttachmentOutputStream final: public kj::AsyncOutputStream { ...@@ -2803,7 +2817,7 @@ class AttachmentOutputStream final: public kj::AsyncOutputStream {
public: public:
AttachmentOutputStream(kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::Refcounted> attachment) AttachmentOutputStream(kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {} : attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<void> write(const void* buffer, size_t size) override { kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size); return inner->write(buffer, size);
...@@ -2817,8 +2831,10 @@ public: ...@@ -2817,8 +2831,10 @@ public:
} }
private: private:
kj::Own<kj::AsyncOutputStream> inner; // Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment; kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncOutputStream> inner;
}; };
class AttachmentInputStream final: public kj::AsyncInputStream { class AttachmentInputStream final: public kj::AsyncInputStream {
...@@ -2826,7 +2842,7 @@ class AttachmentInputStream final: public kj::AsyncInputStream { ...@@ -2826,7 +2842,7 @@ class AttachmentInputStream final: public kj::AsyncInputStream {
public: public:
AttachmentInputStream(kj::Own<kj::AsyncInputStream> inner, kj::Own<kj::Refcounted> attachment) AttachmentInputStream(kj::Own<kj::AsyncInputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {} : attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes); return inner->read(buffer, minBytes, maxBytes);
...@@ -2844,8 +2860,10 @@ public: ...@@ -2844,8 +2860,10 @@ public:
} }
private: private:
kj::Own<kj::AsyncInputStream> inner; // Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment; kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncInputStream> inner;
}; };
class NetworkAddressHttpClient final: public HttpClient { class NetworkAddressHttpClient final: public HttpClient {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment