Commit 02a6edae authored by Kenton Varda's avatar Kenton Varda

Implement newHttpService(HttpClient&) adapter.

This uncovered some bugs and revealed that there was no way to read the Content-Length of a HEAD response. Fixed.
parent 490a36e3
...@@ -571,7 +571,7 @@ kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() { ...@@ -571,7 +571,7 @@ kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() {
HttpMethod::GET, HttpMethod::GET,
"/foo/bar", "/foo/bar",
{{HttpHeaderId::HOST, "example.com"}}, {{HttpHeaderId::HOST, "example.com"}},
nullptr, {}, uint64_t(0), {},
}, },
{ {
...@@ -582,7 +582,7 @@ kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() { ...@@ -582,7 +582,7 @@ kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() {
HttpMethod::HEAD, HttpMethod::HEAD,
"/foo/bar", "/foo/bar",
{{HttpHeaderId::HOST, "example.com"}}, {{HttpHeaderId::HOST, "example.com"}},
nullptr, {}, uint64_t(0), {},
}, },
{ {
...@@ -650,7 +650,7 @@ kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() { ...@@ -650,7 +650,7 @@ kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() {
HttpMethod::GET, HttpMethod::GET,
"/", "/",
{{HttpHeaderId::HOST, HUGE_STRING}}, {{HttpHeaderId::HOST, HUGE_STRING}},
nullptr, {} uint64_t(0), {}
}, },
}; };
...@@ -787,7 +787,7 @@ KJ_TEST("HttpServer responses") { ...@@ -787,7 +787,7 @@ KJ_TEST("HttpServer responses") {
HttpMethod::GET, HttpMethod::GET,
"/", "/",
{}, {},
nullptr, {}, uint64_t(0), {},
}; };
HttpRequestTestCase HEAD_REQUEST = { HttpRequestTestCase HEAD_REQUEST = {
...@@ -797,7 +797,7 @@ KJ_TEST("HttpServer responses") { ...@@ -797,7 +797,7 @@ KJ_TEST("HttpServer responses") {
HttpMethod::HEAD, HttpMethod::HEAD,
"/", "/",
{}, {},
nullptr, {}, uint64_t(0), {},
}; };
auto io = kj::setupAsyncIo(); auto io = kj::setupAsyncIo();
...@@ -819,7 +819,7 @@ kj::ArrayPtr<const HttpTestCase> pipelineTestCases() { ...@@ -819,7 +819,7 @@ kj::ArrayPtr<const HttpTestCase> pipelineTestCases() {
"GET / HTTP/1.1\r\n" "GET / HTTP/1.1\r\n"
"\r\n", "\r\n",
HttpMethod::GET, "/", {}, nullptr, {}, HttpMethod::GET, "/", {}, uint64_t(0), {},
}, },
{ {
"HTTP/1.1 200 OK\r\n" "HTTP/1.1 200 OK\r\n"
...@@ -925,7 +925,7 @@ kj::ArrayPtr<const HttpTestCase> pipelineTestCases() { ...@@ -925,7 +925,7 @@ kj::ArrayPtr<const HttpTestCase> pipelineTestCases() {
"HEAD / HTTP/1.1\r\n" "HEAD / HTTP/1.1\r\n"
"\r\n", "\r\n",
HttpMethod::HEAD, "/", {}, nullptr, {}, HttpMethod::HEAD, "/", {}, uint64_t(0), {},
}, },
{ {
"HTTP/1.1 200 OK\r\n" "HTTP/1.1 200 OK\r\n"
...@@ -1444,6 +1444,49 @@ KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") { ...@@ -1444,6 +1444,49 @@ KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
KJ_TEST("newHttpService from HttpClient") {
auto PIPELINE_TESTS = pipelineTestCases();
auto io = kj::setupAsyncIo();
auto frontPipe = io.provider->newTwoWayPipe();
auto backPipe = io.provider->newTwoWayPipe();
kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
for (auto& testCase: PIPELINE_TESTS) {
writeResponsesPromise = writeResponsesPromise
.then([&]() {
return expectRead(*backPipe.ends[1], testCase.request.raw);
}).then([&]() {
return backPipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
});
}
{
HttpHeaderTable table;
auto backClient = newHttpClient(table, *backPipe.ends[0]);
auto frontService = newHttpService(*backClient);
HttpServer frontServer(io.provider->getTimer(), table, *frontService);
auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));
for (auto& testCase: PIPELINE_TESTS) {
KJ_CONTEXT(testCase.request.raw, testCase.response.raw);
frontPipe.ends[0]->write(testCase.request.raw.begin(), testCase.request.raw.size())
.wait(io.waitScope);
expectRead(*frontPipe.ends[0], testCase.response.raw).wait(io.waitScope);
}
frontPipe.ends[0]->shutdownWrite();
listenTask.wait(io.waitScope);
}
backPipe.ends[0]->shutdownWrite();
writeResponsesPromise.wait(io.waitScope);
}
// -----------------------------------------------------------------------------
KJ_TEST("HttpClient to capnproto.org") { KJ_TEST("HttpClient to capnproto.org") {
auto io = kj::setupAsyncIo(); auto io = kj::setupAsyncIo();
......
...@@ -1066,14 +1066,23 @@ class HttpNullEntityReader final: public HttpEntityBodyReader { ...@@ -1066,14 +1066,23 @@ class HttpNullEntityReader final: public HttpEntityBodyReader {
// Stream which reads until EOF. // Stream which reads until EOF.
public: public:
HttpNullEntityReader(HttpInputStream& inner) HttpNullEntityReader(HttpInputStream& inner, kj::Maybe<uint64_t> length)
: HttpEntityBodyReader(inner) { : HttpEntityBodyReader(inner), length(length) {
// `length` is what to return from tryGetLength(). For a response to a HEAD request, this may
// be non-zero.
doneReading(); doneReading();
} }
Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return size_t(0); return size_t(0);
} }
Maybe<uint64_t> tryGetLength() override {
return length;
}
private:
kj::Maybe<uint64_t> length;
}; };
class HttpConnectionCloseEntityReader final: public HttpEntityBodyReader { class HttpConnectionCloseEntityReader final: public HttpEntityBodyReader {
...@@ -1217,10 +1226,18 @@ static_assert(!fastCaseCmp<'f','O','o','B','1','a'>("FooB1"), ""); ...@@ -1217,10 +1226,18 @@ static_assert(!fastCaseCmp<'f','O','o','B','1','a'>("FooB1"), "");
kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody( kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody(
RequestOrResponse type, HttpMethod method, uint statusCode, RequestOrResponse type, HttpMethod method, uint statusCode,
HttpHeaders::ConnectionHeaders& connectionHeaders) { HttpHeaders::ConnectionHeaders& connectionHeaders) {
if (type == RESPONSE && (method == HttpMethod::HEAD || if (type == RESPONSE) {
statusCode == 204 || statusCode == 205 || statusCode == 304)) { if (method == HttpMethod::HEAD) {
// No body. // Body elided.
return kj::heap<HttpNullEntityReader>(*this); kj::Maybe<uint64_t> length;
if (connectionHeaders.contentLength != nullptr) {
length = strtoull(connectionHeaders.contentLength.cStr(), nullptr, 10);
}
return kj::heap<HttpNullEntityReader>(*this, length);
} else if (statusCode == 204 || statusCode == 205 || statusCode == 304) {
// No body.
return kj::heap<HttpNullEntityReader>(*this, uint64_t(0));
}
} }
if (connectionHeaders.transferEncoding != nullptr) { if (connectionHeaders.transferEncoding != nullptr) {
...@@ -1240,7 +1257,7 @@ kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody( ...@@ -1240,7 +1257,7 @@ kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody(
if (type == REQUEST) { if (type == REQUEST) {
// Lack of a Content-Length or Transfer-Encoding means no body for requests. // Lack of a Content-Length or Transfer-Encoding means no body for requests.
return kj::heap<HttpNullEntityReader>(*this); return kj::heap<HttpNullEntityReader>(*this, uint64_t(0));
} }
if (connectionHeaders.connection != nullptr) { if (connectionHeaders.connection != nullptr) {
...@@ -1252,7 +1269,7 @@ kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody( ...@@ -1252,7 +1269,7 @@ kj::Own<kj::AsyncInputStream> HttpInputStream::getEntityBody(
} }
KJ_FAIL_REQUIRE("don't know how HTTP body is delimited", headers); KJ_FAIL_REQUIRE("don't know how HTTP body is delimited", headers);
return kj::heap<HttpNullEntityReader>(*this); return kj::heap<HttpNullEntityReader>(*this, uint64_t(0));
} }
// ======================================================================================= // =======================================================================================
...@@ -1414,7 +1431,9 @@ public: ...@@ -1414,7 +1431,9 @@ public:
amount = kj::min(amount, length); amount = kj::min(amount, length);
length -= amount; length -= amount;
auto promise = inner.pumpBodyFrom(input, amount).then([this,amount](uint64_t actual) { auto promise = amount == 0
? kj::Promise<uint64_t>(amount)
: inner.pumpBodyFrom(input, amount).then([this,amount](uint64_t actual) {
// Adjust for bytes not written. // Adjust for bytes not written.
length += amount - actual; length += amount - actual;
if (length == 0) inner.finishBody(); if (length == 0) inner.finishBody();
...@@ -1602,7 +1621,7 @@ kj::Promise<HttpClient::WebSocketResponse> HttpClient::openWebSocket( ...@@ -1602,7 +1621,7 @@ kj::Promise<HttpClient::WebSocketResponse> HttpClient::openWebSocket(
}); });
} }
kj::Promise<kj::Own<kj::AsyncIoStream>> HttpClient::connect(kj::String host) { kj::Promise<kj::Own<kj::AsyncIoStream>> HttpClient::connect(kj::StringPtr host) {
KJ_UNIMPLEMENTED("CONNECT is not implemented by this HttpClient"); KJ_UNIMPLEMENTED("CONNECT is not implemented by this HttpClient");
} }
...@@ -1613,6 +1632,51 @@ kj::Own<HttpClient> newHttpClient( ...@@ -1613,6 +1632,51 @@ kj::Own<HttpClient> newHttpClient(
// ======================================================================================= // =======================================================================================
namespace {
class HttpServiceAdapter: public HttpService {
public:
HttpServiceAdapter(HttpClient& client): client(client) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
auto innerReq = client.request(method, url, headers, requestBody.tryGetLength());
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(requestBody.pumpTo(*innerReq.body).ignoreResult()
.attach(kj::mv(innerReq.body)).eagerlyEvaluate(nullptr));
promises.add(innerReq.response
.then([&response](HttpClient::Response&& innerResponse) {
auto out = response.send(
innerResponse.statusCode, innerResponse.statusText, *innerResponse.headers,
innerResponse.body->tryGetLength());
auto promise = innerResponse.body->pumpTo(*out);
return promise.ignoreResult().attach(kj::mv(out), kj::mv(innerResponse.body));
}));
return kj::joinPromises(promises.finish());
}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host) override {
return client.connect(kj::mv(host));
}
// TODO(soon): other methods
private:
HttpClient& client;
};
} // namespace
kj::Own<HttpService> newHttpService(HttpClient& client) {
return kj::heap<HttpServiceAdapter>(client);
}
// =======================================================================================
kj::Promise<void> HttpService::openWebSocket( kj::Promise<void> HttpService::openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response) { kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response) {
class EmptyStream final: public kj::AsyncInputStream { class EmptyStream final: public kj::AsyncInputStream {
...@@ -1627,7 +1691,7 @@ kj::Promise<void> HttpService::openWebSocket( ...@@ -1627,7 +1691,7 @@ kj::Promise<void> HttpService::openWebSocket(
return promise.attach(kj::mv(requestBody)); return promise.attach(kj::mv(requestBody));
} }
kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::String host) { kj::Promise<kj::Own<kj::AsyncIoStream>> HttpService::connect(kj::StringPtr host) {
KJ_UNIMPLEMENTED("CONNECT is not implemented by this HttpService"); KJ_UNIMPLEMENTED("CONNECT is not implemented by this HttpService");
} }
...@@ -1759,8 +1823,13 @@ public: ...@@ -1759,8 +1823,13 @@ public:
if (currentMethod == nullptr) { if (currentMethod == nullptr) {
// Dang, already sent a partial response. Can't do anything else. // Dang, already sent a partial response. Can't do anything else.
KJ_LOG(ERROR, "HttpService threw exception after generating a partial response", //
"too late to report error to client", e); // If it's a DISCONNECTED exception, it's probably that the client disconnected, which is
// not really worth logging.
if (e.getType() != kj::Exception::Type::DISCONNECTED) {
KJ_LOG(ERROR, "HttpService threw exception after generating a partial response",
"too late to report error to client", e);
}
return kj::READY_NOW; return kj::READY_NOW;
} }
......
...@@ -433,7 +433,7 @@ public: ...@@ -433,7 +433,7 @@ public:
// //
// `url` and `headers` are invalidated when the returned promise resolves. // `url` and `headers` are invalidated when the returned promise resolves.
virtual kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::String host); virtual kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host);
// Handles CONNECT requests. Only relevant for proxy clients. Default implementation throws // Handles CONNECT requests. Only relevant for proxy clients. Default implementation throws
// UNIMPLEMENTED. // UNIMPLEMENTED.
}; };
...@@ -494,7 +494,7 @@ public: ...@@ -494,7 +494,7 @@ public:
// //
// `url` and `headers` are invalidated when the returned promise resolves. // `url` and `headers` are invalidated when the returned promise resolves.
virtual kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::String host); virtual kj::Promise<kj::Own<kj::AsyncIoStream>> connect(kj::StringPtr host);
// Handles CONNECT requests. Only relevant for proxy services. Default implementation throws // Handles CONNECT requests. Only relevant for proxy services. Default implementation throws
// UNIMPLEMENTED. // UNIMPLEMENTED.
}; };
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment