Commit fd7a63ca authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #564 from capnproto/fix-http-client

Don't reuse HTTP client connections if the previous request didn't read the entire response.
parents d95c12e2 d823f23a
......@@ -19,6 +19,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#define KJ_TESTING_KJ 1
#include "http.h"
#include <kj/debug.h>
#include <kj/test.h>
......@@ -2468,8 +2470,10 @@ public:
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size());
auto promise = stream->write(body.begin(), body.size());
return promise.attach(kj::mv(stream), kj::mv(body));
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(stream->write(body.begin(), body.size()));
promises.add(requestBody.readAllBytes().ignoreResult());
return kj::joinPromises(promises.finish()).attach(kj::mv(stream), kj::mv(body));
}
kj::Promise<void> openWebSocket(
......@@ -2539,6 +2543,17 @@ KJ_TEST("HttpClient connection management") {
req2.wait(io.waitScope);
KJ_EXPECT(count == 2);
// We can reuse after a POST, provided we write the whole POST body properly.
{
auto req = client->request(
HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6));
req.body->write("foobar", 6).wait(io.waitScope);
req.response.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
}
KJ_EXPECT(count == 2);
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 2);
// Advance time for half the timeout, then exercise one of the connections.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
doRequest().wait(io.waitScope);
......@@ -2569,9 +2584,23 @@ KJ_TEST("HttpClient connection management") {
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
KJ_EXPECT(count == 0);
// Connections where we failed to read the full response body are not reused.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response
.wait(io.waitScope);
KJ_EXPECT(count == 0);
// Connections where we failed to write the full request body are not reused.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response
.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
KJ_EXPECT(count == 0);
#if !_WIN32 // TODO(soon): Figure out why this doesn't work on Windows. Probably a bug in
// Win32IocpEventPort::poll().
// If the server times out the connection, we figure it out on the client.
......
......@@ -1026,6 +1026,10 @@ public:
: inner(inner), headerBuffer(kj::heapArray<char>(MIN_BUFFER)), headers(table) {
}
bool canReuse() {
return !broken;
}
// ---------------------------------------------------------------------------
// Stream locking: While an entity-body is being read, the body stream "locks" the underlying
// HTTP stream. Once the entity-body is complete, we can read the next pipelined message.
......@@ -1041,9 +1045,10 @@ public:
// Called when a body input stream was destroyed without reading to the end.
KJ_REQUIRE_NONNULL(onMessageDone)->reject(KJ_EXCEPTION(FAILED,
"client did not finish reading previous HTTP response body",
"can't read next pipelined response"));
"application did not finish reading previous HTTP response body",
"can't read next pipelined request/response"));
onMessageDone = nullptr;
broken = true;
}
// ---------------------------------------------------------------------------
......@@ -1218,6 +1223,9 @@ private:
// as a side-effect of HTTP chunked encoding, where such a newline is added to the end of each
// chunk, for no good reason.
bool broken = false;
// Becomes true if the caller failed to read the whole entity-body before closing the stream.
kj::Promise<void> messageReadQueue = kj::READY_NOW;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> onMessageDone;
......@@ -1612,6 +1620,10 @@ class HttpOutputStream {
public:
HttpOutputStream(AsyncOutputStream& inner): inner(inner) {}
bool canReuse() {
return !inBody && !broken;
}
void writeHeaders(String content) {
// Writes some header content and begins a new entity body.
......@@ -1671,6 +1683,7 @@ public:
// Called if the application failed to write all expected body bytes.
KJ_REQUIRE(inBody) { return; }
inBody = false;
broken = true;
writeQueue = writeQueue.then([]() -> kj::Promise<void> {
return KJ_EXCEPTION(FAILED,
......@@ -1688,6 +1701,7 @@ private:
AsyncOutputStream& inner;
kj::Promise<void> writeQueue = kj::READY_NOW;
bool inBody = false;
bool broken = false;
void queueWrite(kj::String content) {
writeQueue = writeQueue.then(kj::mvCapture(content, [this](kj::String&& content) {
......@@ -2431,9 +2445,9 @@ public:
settings(kj::mv(settings)) {}
bool canReuse() {
// Returns true if
// Returns true if we can reuse this HttpClient for another request.
return !upgraded && !closed;
return !upgraded && !closed && httpInput.canReuse() && httpOutput.canReuse();
}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
......@@ -2803,7 +2817,7 @@ class AttachmentOutputStream final: public kj::AsyncOutputStream {
public:
AttachmentOutputStream(kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {}
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
......@@ -2817,8 +2831,10 @@ public:
}
private:
kj::Own<kj::AsyncOutputStream> inner;
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncOutputStream> inner;
};
class AttachmentInputStream final: public kj::AsyncInputStream {
......@@ -2826,7 +2842,7 @@ class AttachmentInputStream final: public kj::AsyncInputStream {
public:
AttachmentInputStream(kj::Own<kj::AsyncInputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {}
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
......@@ -2844,8 +2860,10 @@ public:
}
private:
kj::Own<kj::AsyncInputStream> inner;
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncInputStream> inner;
};
class NetworkAddressHttpClient final: public HttpClient {
......@@ -3145,8 +3163,6 @@ private:
networkToUse = &KJ_REQUIRE_NONNULL(tlsNetwork, "this HttpClient doesn't support HTTPS");
}
auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
auto promise = networkToUse->parseAddress(parsed.host, isHttps ? 443 : 80)
.then([this](kj::Own<kj::NetworkAddress> addr) {
return kj::heap<NetworkAddressHttpClient>(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment