Commit 1e542ad1 authored by Kenton Varda's avatar Kenton Varda

Use new userspace pipes in http-test.

This speeds up the test somewhat, but more importantly, it tests the pipe implementation across a variety of usage patterns.

This actually uncovered a bug in the HTTP implementation: An HttpClient could inadvertently issue overlapping reads in cases where multiple concurrent (pipelined) requests are made.
parent 7117175d
This diff is collapsed.
...@@ -2492,8 +2492,10 @@ public: ...@@ -2492,8 +2492,10 @@ public:
bodyStream = heap<HttpChunkedEntityWriter>(httpOutput); bodyStream = heap<HttpChunkedEntityWriter>(httpOutput);
} }
auto responsePromise = httpInput.readResponseHeaders() auto id = ++counter;
.then([this,method](kj::Maybe<HttpHeaders::Response>&& response) -> HttpClient::Response {
auto responsePromise = httpInput.readResponseHeaders().then(
[this,method,id](kj::Maybe<HttpHeaders::Response>&& response) -> HttpClient::Response {
KJ_IF_MAYBE(r, response) { KJ_IF_MAYBE(r, response) {
auto& headers = httpInput.getHeaders(); auto& headers = httpInput.getHeaders();
HttpClient::Response result { HttpClient::Response result {
...@@ -2506,8 +2508,11 @@ public: ...@@ -2506,8 +2508,11 @@ public:
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>( if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
headers.get(HttpHeaderId::CONNECTION).orDefault(nullptr).cStr())) { headers.get(HttpHeaderId::CONNECTION).orDefault(nullptr).cStr())) {
closed = true; closed = true;
} else { } else if (counter == id) {
watchForClose(); watchForClose();
} else {
// Anothe request was already queued after this one, so we don't want to watch for
// stream closure because we're fully expecting another response.
} }
return result; return result;
} else { } else {
...@@ -2550,9 +2555,11 @@ public: ...@@ -2550,9 +2555,11 @@ public:
// No entity-body. // No entity-body.
httpOutput.finishBody(); httpOutput.finishBody();
auto id = ++counter;
return httpInput.readResponseHeaders() return httpInput.readResponseHeaders()
.then(kj::mvCapture(keyBase64, .then(kj::mvCapture(keyBase64,
[this](kj::StringPtr keyBase64, kj::Maybe<HttpHeaders::Response>&& response) [this,id](kj::StringPtr keyBase64, kj::Maybe<HttpHeaders::Response>&& response)
-> HttpClient::WebSocketResponse { -> HttpClient::WebSocketResponse {
KJ_IF_MAYBE(r, response) { KJ_IF_MAYBE(r, response) {
auto& headers = httpInput.getHeaders(); auto& headers = httpInput.getHeaders();
...@@ -2593,8 +2600,11 @@ public: ...@@ -2593,8 +2600,11 @@ public:
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>( if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(
headers.get(HttpHeaderId::CONNECTION).orDefault(nullptr).cStr())) { headers.get(HttpHeaderId::CONNECTION).orDefault(nullptr).cStr())) {
closed = true; closed = true;
} else { } else if (counter == id) {
watchForClose(); watchForClose();
} else {
// Anothe request was already queued after this one, so we don't want to watch for
// stream closure because we're fully expecting another response.
} }
return result; return result;
} }
...@@ -2614,6 +2624,10 @@ private: ...@@ -2614,6 +2624,10 @@ private:
bool upgraded = false; bool upgraded = false;
bool closed = false; bool closed = false;
uint counter = 0;
// Counts requests for the sole purpose of detecting if more requests have been made after some
// point in history.
void watchForClose() { void watchForClose() {
closeWatcherTask = httpInput.awaitNextMessage().then([this](bool hasData) { closeWatcherTask = httpInput.awaitNextMessage().then([this](bool hasData) {
if (hasData) { if (hasData) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment