// Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors // Licensed under the MIT License: // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #include "http.h" #include <kj/debug.h> #include <kj/test.h> #include <map> namespace kj { namespace { KJ_TEST("HttpMethod parse / stringify") { #define TRY(name) \ KJ_EXPECT(kj::str(HttpMethod::name) == #name); \ KJ_IF_MAYBE(parsed, tryParseHttpMethod(#name)) { \ KJ_EXPECT(*parsed == HttpMethod::name); \ } else { \ KJ_FAIL_EXPECT("couldn't parse \"" #name "\" as HttpMethod"); \ } KJ_HTTP_FOR_EACH_METHOD(TRY) #undef TRY KJ_EXPECT(tryParseHttpMethod("FOO") == nullptr); KJ_EXPECT(tryParseHttpMethod("") == nullptr); KJ_EXPECT(tryParseHttpMethod("G") == nullptr); KJ_EXPECT(tryParseHttpMethod("GE") == nullptr); KJ_EXPECT(tryParseHttpMethod("GET ") == nullptr); KJ_EXPECT(tryParseHttpMethod("get") == nullptr); } KJ_TEST("HttpHeaderTable") { HttpHeaderTable::Builder builder; auto host = builder.add("Host"); auto host2 = builder.add("hOsT"); auto fooBar = builder.add("Foo-Bar"); auto bazQux = builder.add("baz-qux"); auto bazQux2 = builder.add("Baz-Qux"); auto table = builder.build(); uint builtinHeaderCount = 0; #define INCREMENT(id, name) ++builtinHeaderCount; KJ_HTTP_FOR_EACH_BUILTIN_HEADER(INCREMENT) #undef INCREMENT KJ_EXPECT(table->idCount() == builtinHeaderCount + 2); KJ_EXPECT(host == HttpHeaderId::HOST); KJ_EXPECT(host != HttpHeaderId::DATE); KJ_EXPECT(host2 == host); KJ_EXPECT(host != fooBar); KJ_EXPECT(host != bazQux); KJ_EXPECT(fooBar != bazQux); KJ_EXPECT(bazQux == bazQux2); KJ_EXPECT(kj::str(host) == "Host"); KJ_EXPECT(kj::str(host2) == "Host"); KJ_EXPECT(kj::str(fooBar) == "Foo-Bar"); KJ_EXPECT(kj::str(bazQux) == "baz-qux"); KJ_EXPECT(kj::str(HttpHeaderId::HOST) == "Host"); KJ_EXPECT(table->idToString(HttpHeaderId::DATE) == "Date"); KJ_EXPECT(table->idToString(fooBar) == "Foo-Bar"); KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Date")) == HttpHeaderId::DATE); KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("dATE")) == HttpHeaderId::DATE); KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Foo-Bar")) == fooBar); KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("foo-BAR")) == fooBar); KJ_EXPECT(table->stringToId("foobar") == nullptr); KJ_EXPECT(table->stringToId("barfoo") == nullptr); } KJ_TEST("HttpHeaders::parseRequest") { HttpHeaderTable::Builder builder; auto fooBar = builder.add("Foo-Bar"); auto bazQux = builder.add("baz-qux"); auto table = builder.build(); HttpHeaders headers(*table); auto text = kj::heapString( "POST /some/path \t HTTP/1.1\r\n" "Foo-BaR: Baz\r\n" "Host: example.com\r\n" "Content-Length: 123\r\n" "DATE: early\r\n" "other-Header: yep\r\n" "\r\n"); auto result = KJ_ASSERT_NONNULL(headers.tryParseRequest(text.asArray())); KJ_EXPECT(result.method == HttpMethod::POST); KJ_EXPECT(result.url == "/some/path"); KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com"); KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early"); KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz"); KJ_EXPECT(headers.get(bazQux) == nullptr); KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr); KJ_EXPECT(result.connectionHeaders.contentLength == "123"); KJ_EXPECT(result.connectionHeaders.transferEncoding == nullptr); std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders; headers.forEach([&](kj::StringPtr name, kj::StringPtr value) { KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second); }); KJ_EXPECT(unpackedHeaders.size() == 4); KJ_EXPECT(unpackedHeaders["Host"] == "example.com"); KJ_EXPECT(unpackedHeaders["Date"] == "early"); KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz"); KJ_EXPECT(unpackedHeaders["other-Header"] == "yep"); KJ_EXPECT(headers.serializeRequest(result.method, result.url, result.connectionHeaders) == "POST /some/path HTTP/1.1\r\n" "Content-Length: 123\r\n" "Host: example.com\r\n" "Date: early\r\n" "Foo-Bar: Baz\r\n" "other-Header: yep\r\n" "\r\n"); } KJ_TEST("HttpHeaders::parseResponse") { HttpHeaderTable::Builder builder; auto fooBar = builder.add("Foo-Bar"); auto bazQux = builder.add("baz-qux"); auto table = builder.build(); HttpHeaders headers(*table); auto text = kj::heapString( "HTTP/1.1\t\t 418\t I'm a teapot\r\n" "Foo-BaR: Baz\r\n" "Host: example.com\r\n" "Content-Length: 123\r\n" "DATE: early\r\n" "other-Header: yep\r\n" "\r\n"); auto result = KJ_ASSERT_NONNULL(headers.tryParseResponse(text.asArray())); KJ_EXPECT(result.statusCode == 418); KJ_EXPECT(result.statusText == "I'm a teapot"); KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com"); KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early"); KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz"); KJ_EXPECT(headers.get(bazQux) == nullptr); KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr); KJ_EXPECT(result.connectionHeaders.contentLength == "123"); KJ_EXPECT(result.connectionHeaders.transferEncoding == nullptr); std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders; headers.forEach([&](kj::StringPtr name, kj::StringPtr value) { KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second); }); KJ_EXPECT(unpackedHeaders.size() == 4); KJ_EXPECT(unpackedHeaders["Host"] == "example.com"); KJ_EXPECT(unpackedHeaders["Date"] == "early"); KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz"); KJ_EXPECT(unpackedHeaders["other-Header"] == "yep"); KJ_EXPECT(headers.serializeResponse( result.statusCode, result.statusText, result.connectionHeaders) == "HTTP/1.1 418 I'm a teapot\r\n" "Content-Length: 123\r\n" "Host: example.com\r\n" "Date: early\r\n" "Foo-Bar: Baz\r\n" "other-Header: yep\r\n" "\r\n"); } KJ_TEST("HttpHeaders parse invalid") { auto table = HttpHeaderTable::Builder().build(); HttpHeaders headers(*table); // NUL byte in request. KJ_EXPECT(headers.tryParseRequest(kj::heapString( "POST \0 /some/path \t HTTP/1.1\r\n" "Foo-BaR: Baz\r\n" "Host: example.com\r\n" "DATE: early\r\n" "other-Header: yep\r\n" "\r\n")) == nullptr); // Control character in header name. KJ_EXPECT(headers.tryParseRequest(kj::heapString( "POST /some/path \t HTTP/1.1\r\n" "Foo-BaR: Baz\r\n" "Cont\001ent-Length: 123\r\n" "DATE: early\r\n" "other-Header: yep\r\n" "\r\n")) == nullptr); // Separator character in header name. KJ_EXPECT(headers.tryParseRequest(kj::heapString( "POST /some/path \t HTTP/1.1\r\n" "Foo-BaR: Baz\r\n" "Host: example.com\r\n" "DATE/: early\r\n" "other-Header: yep\r\n" "\r\n")) == nullptr); // Response status code not numeric. KJ_EXPECT(headers.tryParseResponse(kj::heapString( "HTTP/1.1\t\t abc\t I'm a teapot\r\n" "Foo-BaR: Baz\r\n" "Host: example.com\r\n" "DATE: early\r\n" "other-Header: yep\r\n" "\r\n")) == nullptr); } // ======================================================================================= class ReadFragmenter final: public kj::AsyncIoStream { public: ReadFragmenter(AsyncIoStream& inner, size_t limit): inner(inner), limit(limit) {} Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { return inner.read(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes))); } Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { return inner.tryRead(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes))); } Maybe<uint64_t> tryGetLength() override { return inner.tryGetLength(); } Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override { return inner.pumpTo(output, amount); } Promise<void> write(const void* buffer, size_t size) override { return inner.write(buffer, size); } Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override { return inner.write(pieces); } Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override { return inner.tryPumpFrom(input, amount); } void shutdownWrite() override { return inner.shutdownWrite(); } void abortRead() override { return inner.abortRead(); } void getsockopt(int level, int option, void* value, uint* length) override { return inner.getsockopt(level, option, value, length); } void setsockopt(int level, int option, const void* value, uint length) override { return inner.setsockopt(level, option, value, length); } void getsockname(struct sockaddr* addr, uint* length) override { return inner.getsockname(addr, length); } void getpeername(struct sockaddr* addr, uint* length) override { return inner.getsockname(addr, length); } private: kj::AsyncIoStream& inner; size_t limit; }; template <typename T> class InitializeableArray: public Array<T> { public: InitializeableArray(std::initializer_list<T> init) : Array<T>(kj::heapArray(init)) {} }; enum Side { BOTH, CLIENT_ONLY, SERVER_ONLY }; struct HeaderTestCase { HttpHeaderId id; kj::StringPtr value; }; struct HttpRequestTestCase { kj::StringPtr raw; HttpMethod method; kj::StringPtr path; InitializeableArray<HeaderTestCase> requestHeaders; kj::Maybe<uint64_t> requestBodySize; InitializeableArray<kj::StringPtr> requestBodyParts; Side side = BOTH; // TODO(cleanup): Delete this constructor if/when we move to C++14. HttpRequestTestCase(kj::StringPtr raw, HttpMethod method, kj::StringPtr path, InitializeableArray<HeaderTestCase> requestHeaders, kj::Maybe<uint64_t> requestBodySize, InitializeableArray<kj::StringPtr> requestBodyParts, Side side = BOTH) : raw(raw), method(method), path(path), requestHeaders(kj::mv(requestHeaders)), requestBodySize(requestBodySize), requestBodyParts(kj::mv(requestBodyParts)), side(side) {} }; struct HttpResponseTestCase { kj::StringPtr raw; uint64_t statusCode; kj::StringPtr statusText; InitializeableArray<HeaderTestCase> responseHeaders; kj::Maybe<uint64_t> responseBodySize; InitializeableArray<kj::StringPtr> responseBodyParts; HttpMethod method = HttpMethod::GET; Side side = BOTH; // TODO(cleanup): Delete this constructor if/when we move to C++14. HttpResponseTestCase(kj::StringPtr raw, uint64_t statusCode, kj::StringPtr statusText, InitializeableArray<HeaderTestCase> responseHeaders, kj::Maybe<uint64_t> responseBodySize, InitializeableArray<kj::StringPtr> responseBodyParts, HttpMethod method = HttpMethod::GET, Side side = BOTH) : raw(raw), statusCode(statusCode), statusText(statusText), responseHeaders(kj::mv(responseHeaders)), responseBodySize(responseBodySize), responseBodyParts(kj::mv(responseBodyParts)), method(method), side(side) {} }; struct HttpTestCase { HttpRequestTestCase request; HttpResponseTestCase response; }; kj::Promise<void> writeEach(kj::AsyncOutputStream& out, kj::ArrayPtr<const kj::StringPtr> parts) { if (parts.size() == 0) return kj::READY_NOW; return out.write(parts[0].begin(), parts[0].size()) .then([&out,parts]() { return writeEach(out, parts.slice(1, parts.size())); }); } kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) { if (expected.size() == 0) return kj::READY_NOW; auto buffer = kj::heapArray<char>(expected.size()); auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) { if (amount == 0) { KJ_FAIL_ASSERT("expected data never sent", expected); } auto actual = buffer.slice(0, amount); if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); } return expectRead(in, expected.slice(amount)); })); } kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::ArrayPtr<const byte> expected) { if (expected.size() == 0) return kj::READY_NOW; auto buffer = kj::heapArray<byte>(expected.size()); auto promise = in.tryRead(buffer.begin(), 1, buffer.size()); return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<byte> buffer, size_t amount) { if (amount == 0) { KJ_FAIL_ASSERT("expected data never sent", expected); } auto actual = buffer.slice(0, amount); if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) { KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual); } return expectRead(in, expected.slice(amount, expected.size())); })); } void testHttpClientRequest(kj::AsyncIoContext& io, const HttpRequestTestCase& testCase) { auto pipe = io.provider->newTwoWayPipe(); auto serverTask = expectRead(*pipe.ends[1], testCase.raw).then([&]() { static const char SIMPLE_RESPONSE[] = "HTTP/1.1 200 OK\r\n" "Content-Length: 0\r\n" "\r\n"; return pipe.ends[1]->write(SIMPLE_RESPONSE, strlen(SIMPLE_RESPONSE)); }).then([&]() -> kj::Promise<void> { return kj::NEVER_DONE; }); HttpHeaderTable table; auto client = newHttpClient(table, *pipe.ends[0]); HttpHeaders headers(table); for (auto& header: testCase.requestHeaders) { headers.set(header.id, header.value); } auto request = client->request(testCase.method, testCase.path, headers, testCase.requestBodySize); if (testCase.requestBodyParts.size() > 0) { writeEach(*request.body, testCase.requestBodyParts).wait(io.waitScope); } request.body = nullptr; auto clientTask = request.response .then([&](HttpClient::Response&& response) { auto promise = response.body->readAllText(); return promise.attach(kj::mv(response.body)); }).ignoreResult(); serverTask.exclusiveJoin(kj::mv(clientTask)).wait(io.waitScope); // Verify no more data written by client. client = nullptr; pipe.ends[0]->shutdownWrite(); KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == ""); } void testHttpClientResponse(kj::AsyncIoContext& io, const HttpResponseTestCase& testCase, size_t readFragmentSize) { auto pipe = io.provider->newTwoWayPipe(); ReadFragmenter fragmenter(*pipe.ends[0], readFragmentSize); auto expectedReqText = testCase.method == HttpMethod::GET || testCase.method == HttpMethod::HEAD ? kj::str(testCase.method, " / HTTP/1.1\r\n\r\n") : kj::str(testCase.method, " / HTTP/1.1\r\nContent-Length: 0\r\n"); auto serverTask = expectRead(*pipe.ends[1], expectedReqText).then([&]() { return pipe.ends[1]->write(testCase.raw.begin(), testCase.raw.size()); }).then([&]() -> kj::Promise<void> { pipe.ends[1]->shutdownWrite(); return kj::NEVER_DONE; }); HttpHeaderTable table; auto client = newHttpClient(table, fragmenter); HttpHeaders headers(table); auto request = client->request(testCase.method, "/", headers, uint64_t(0)); request.body = nullptr; auto clientTask = request.response .then([&](HttpClient::Response&& response) { KJ_EXPECT(response.statusCode == testCase.statusCode); KJ_EXPECT(response.statusText == testCase.statusText); for (auto& header: testCase.responseHeaders) { KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(header.id)) == header.value); } auto promise = response.body->readAllText(); return promise.attach(kj::mv(response.body)); }).then([&](kj::String body) { KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""), body); }); serverTask.exclusiveJoin(kj::mv(clientTask)).wait(io.waitScope); // Verify no more data written by client. client = nullptr; pipe.ends[0]->shutdownWrite(); KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == ""); } class TestHttpService final: public HttpService { public: TestHttpService(const HttpRequestTestCase& expectedRequest, const HttpResponseTestCase& response, HttpHeaderTable& table) : singleExpectedRequest(&expectedRequest), singleResponse(&response), responseHeaders(table) {} TestHttpService(kj::ArrayPtr<const HttpTestCase> testCases, HttpHeaderTable& table) : singleExpectedRequest(nullptr), singleResponse(nullptr), testCases(testCases), responseHeaders(table) {} uint getRequestCount() { return requestCount; } kj::Promise<void> request( HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& responseSender) override { auto& expectedRequest = testCases == nullptr ? *singleExpectedRequest : testCases[requestCount % testCases.size()].request; auto& response = testCases == nullptr ? *singleResponse : testCases[requestCount % testCases.size()].response; ++requestCount; KJ_EXPECT(method == expectedRequest.method, method); KJ_EXPECT(url == expectedRequest.path, url); for (auto& header: expectedRequest.requestHeaders) { KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(header.id)) == header.value); } auto size = requestBody.tryGetLength(); KJ_IF_MAYBE(expectedSize, expectedRequest.requestBodySize) { KJ_IF_MAYBE(s, size) { KJ_EXPECT(*s == *expectedSize, *s); } else { KJ_FAIL_EXPECT("tryGetLength() returned nullptr; expected known size"); } } else { KJ_EXPECT(size == nullptr); } return requestBody.readAllText() .then([this,&expectedRequest,&response,&responseSender](kj::String text) { KJ_EXPECT(text == kj::strArray(expectedRequest.requestBodyParts, ""), text); responseHeaders.clear(); for (auto& header: response.responseHeaders) { responseHeaders.set(header.id, header.value); } auto stream = responseSender.send(response.statusCode, response.statusText, responseHeaders, response.responseBodySize); auto promise = writeEach(*stream, response.responseBodyParts); return promise.attach(kj::mv(stream)); }); } private: const HttpRequestTestCase* singleExpectedRequest; const HttpResponseTestCase* singleResponse; kj::ArrayPtr<const HttpTestCase> testCases; HttpHeaders responseHeaders; uint requestCount = 0; }; void testHttpServerRequest(kj::AsyncIoContext& io, const HttpRequestTestCase& requestCase, const HttpResponseTestCase& responseCase) { auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; TestHttpService service(requestCase, responseCase, table); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(io.waitScope); pipe.ends[1]->shutdownWrite(); expectRead(*pipe.ends[1], responseCase.raw).wait(io.waitScope); listenTask.wait(io.waitScope); KJ_EXPECT(service.getRequestCount() == 1); } kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() { static const auto HUGE_STRING = kj::strArray(kj::repeat("abcdefgh", 4096), ""); static const auto HUGE_REQUEST = kj::str( "GET / HTTP/1.1\r\n" "Host: ", HUGE_STRING, "\r\n" "\r\n"); static const HttpRequestTestCase REQUEST_TEST_CASES[] { { "GET /foo/bar HTTP/1.1\r\n" "Host: example.com\r\n" "\r\n", HttpMethod::GET, "/foo/bar", {{HttpHeaderId::HOST, "example.com"}}, uint64_t(0), {}, }, { "HEAD /foo/bar HTTP/1.1\r\n" "Host: example.com\r\n" "\r\n", HttpMethod::HEAD, "/foo/bar", {{HttpHeaderId::HOST, "example.com"}}, uint64_t(0), {}, }, { "POST / HTTP/1.1\r\n" "Content-Length: 9\r\n" "Host: example.com\r\n" "Content-Type: text/plain\r\n" "\r\n" "foobarbaz", HttpMethod::POST, "/", { {HttpHeaderId::HOST, "example.com"}, {HttpHeaderId::CONTENT_TYPE, "text/plain"}, }, 9, { "foo", "bar", "baz" }, }, { "POST / HTTP/1.1\r\n" "Transfer-Encoding: chunked\r\n" "Host: example.com\r\n" "Content-Type: text/plain\r\n" "\r\n" "3\r\n" "foo\r\n" "6\r\n" "barbaz\r\n" "0\r\n" "\r\n", HttpMethod::POST, "/", { {HttpHeaderId::HOST, "example.com"}, {HttpHeaderId::CONTENT_TYPE, "text/plain"}, }, nullptr, { "foo", "barbaz" }, }, { "POST / HTTP/1.1\r\n" "Transfer-Encoding: chunked\r\n" "Host: example.com\r\n" "Content-Type: text/plain\r\n" "\r\n" "1d\r\n" "0123456789abcdef0123456789abc\r\n" "0\r\n" "\r\n", HttpMethod::POST, "/", { {HttpHeaderId::HOST, "example.com"}, {HttpHeaderId::CONTENT_TYPE, "text/plain"}, }, nullptr, { "0123456789abcdef0123456789abc" }, }, { HUGE_REQUEST, HttpMethod::GET, "/", {{HttpHeaderId::HOST, HUGE_STRING}}, uint64_t(0), {} }, }; // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents REQUEST_TEST_CASES from implicitly // casting to our return type. return kj::arrayPtr(REQUEST_TEST_CASES, kj::size(REQUEST_TEST_CASES)); } kj::ArrayPtr<const HttpResponseTestCase> responseTestCases() { static const HttpResponseTestCase RESPONSE_TEST_CASES[] { { "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n" "\r\n" "baz qux", 200, "OK", {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, nullptr, {"baz qux"}, HttpMethod::GET, CLIENT_ONLY, // Server never sends connection: close }, { "HTTP/1.1 200 OK\r\n" "Content-Length: 123\r\n" "Content-Type: text/plain\r\n" "\r\n", 200, "OK", {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 123, {}, HttpMethod::HEAD, }, { "HTTP/1.1 200 OK\r\n" "Content-Length: 8\r\n" "Content-Type: text/plain\r\n" "\r\n" "quxcorge", 200, "OK", {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, 8, { "qux", "corge" } }, { "HTTP/1.1 200 OK\r\n" "Transfer-Encoding: chunked\r\n" "Content-Type: text/plain\r\n" "\r\n" "3\r\n" "qux\r\n" "5\r\n" "corge\r\n" "0\r\n" "\r\n", 200, "OK", {{HttpHeaderId::CONTENT_TYPE, "text/plain"}}, nullptr, { "qux", "corge" } }, }; // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly // casting to our return type. return kj::arrayPtr(RESPONSE_TEST_CASES, kj::size(RESPONSE_TEST_CASES)); } KJ_TEST("HttpClient requests") { auto io = kj::setupAsyncIo(); for (auto& testCase: requestTestCases()) { if (testCase.side == SERVER_ONLY) continue; KJ_CONTEXT(testCase.raw); testHttpClientRequest(io, testCase); } } KJ_TEST("HttpClient responses") { auto io = kj::setupAsyncIo(); size_t FRAGMENT_SIZES[] = { 1, 2, 3, 4, 5, 6, 7, 8, 16, 31, kj::maxValue }; for (auto& testCase: responseTestCases()) { if (testCase.side == SERVER_ONLY) continue; for (size_t fragmentSize: FRAGMENT_SIZES) { KJ_CONTEXT(testCase.raw, fragmentSize); testHttpClientResponse(io, testCase, fragmentSize); } } } KJ_TEST("HttpServer requests") { HttpResponseTestCase RESPONSE = { "HTTP/1.1 200 OK\r\n" "Content-Length: 3\r\n" "\r\n" "foo", 200, "OK", {}, 3, {"foo"} }; HttpResponseTestCase HEAD_RESPONSE = { "HTTP/1.1 200 OK\r\n" "Content-Length: 3\r\n" "\r\n", 200, "OK", {}, 3, {"foo"} }; auto io = kj::setupAsyncIo(); for (auto& testCase: requestTestCases()) { if (testCase.side == CLIENT_ONLY) continue; KJ_CONTEXT(testCase.raw); testHttpServerRequest(io, testCase, testCase.method == HttpMethod::HEAD ? HEAD_RESPONSE : RESPONSE); } } KJ_TEST("HttpServer responses") { HttpRequestTestCase REQUEST = { "GET / HTTP/1.1\r\n" "\r\n", HttpMethod::GET, "/", {}, uint64_t(0), {}, }; HttpRequestTestCase HEAD_REQUEST = { "HEAD / HTTP/1.1\r\n" "\r\n", HttpMethod::HEAD, "/", {}, uint64_t(0), {}, }; auto io = kj::setupAsyncIo(); for (auto& testCase: responseTestCases()) { if (testCase.side == CLIENT_ONLY) continue; KJ_CONTEXT(testCase.raw); testHttpServerRequest(io, testCase.method == HttpMethod::HEAD ? HEAD_REQUEST : REQUEST, testCase); } } // ----------------------------------------------------------------------------- kj::ArrayPtr<const HttpTestCase> pipelineTestCases() { static const HttpTestCase PIPELINE_TESTS[] = { { { "GET / HTTP/1.1\r\n" "\r\n", HttpMethod::GET, "/", {}, uint64_t(0), {}, }, { "HTTP/1.1 200 OK\r\n" "Content-Length: 7\r\n" "\r\n" "foo bar", 200, "OK", {}, 7, { "foo bar" } }, }, { { "POST /foo HTTP/1.1\r\n" "Content-Length: 6\r\n" "\r\n" "grault", HttpMethod::POST, "/foo", {}, 6, { "grault" }, }, { "HTTP/1.1 404 Not Found\r\n" "Content-Length: 13\r\n" "\r\n" "baz qux corge", 404, "Not Found", {}, 13, { "baz qux corge" } }, }, // Throw a zero-size request/response into the pipeline to check for a bug that existed with // them previously. { { "POST /foo HTTP/1.1\r\n" "Content-Length: 0\r\n" "\r\n", HttpMethod::POST, "/foo", {}, uint64_t(0), {}, }, { "HTTP/1.1 200 OK\r\n" "Content-Length: 0\r\n" "\r\n", 200, "OK", {}, uint64_t(0), {} }, }, // Also a zero-size chunked request/response. { { "POST /foo HTTP/1.1\r\n" "Transfer-Encoding: chunked\r\n" "\r\n" "0\r\n" "\r\n", HttpMethod::POST, "/foo", {}, nullptr, {}, }, { "HTTP/1.1 200 OK\r\n" "Transfer-Encoding: chunked\r\n" "\r\n" "0\r\n" "\r\n", 200, "OK", {}, nullptr, {} }, }, { { "POST /bar HTTP/1.1\r\n" "Transfer-Encoding: chunked\r\n" "\r\n" "6\r\n" "garply\r\n" "5\r\n" "waldo\r\n" "0\r\n" "\r\n", HttpMethod::POST, "/bar", {}, nullptr, { "garply", "waldo" }, }, { "HTTP/1.1 200 OK\r\n" "Transfer-Encoding: chunked\r\n" "\r\n" "4\r\n" "fred\r\n" "5\r\n" "plugh\r\n" "0\r\n" "\r\n", 200, "OK", {}, nullptr, { "fred", "plugh" } }, }, { { "HEAD / HTTP/1.1\r\n" "\r\n", HttpMethod::HEAD, "/", {}, uint64_t(0), {}, }, { "HTTP/1.1 200 OK\r\n" "Content-Length: 7\r\n" "\r\n", 200, "OK", {}, 7, { "foo bar" } }, }, }; // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly // casting to our return type. return kj::arrayPtr(PIPELINE_TESTS, kj::size(PIPELINE_TESTS)); } KJ_TEST("HttpClient pipeline") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); kj::Promise<void> writeResponsesPromise = kj::READY_NOW; for (auto& testCase: PIPELINE_TESTS) { writeResponsesPromise = writeResponsesPromise .then([&]() { return expectRead(*pipe.ends[1], testCase.request.raw); }).then([&]() { return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size()); }); } HttpHeaderTable table; auto client = newHttpClient(table, *pipe.ends[0]); for (auto& testCase: PIPELINE_TESTS) { KJ_CONTEXT(testCase.request.raw, testCase.response.raw); HttpHeaders headers(table); for (auto& header: testCase.request.requestHeaders) { headers.set(header.id, header.value); } auto request = client->request( testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize); for (auto& part: testCase.request.requestBodyParts) { request.body->write(part.begin(), part.size()).wait(io.waitScope); } request.body = nullptr; auto response = request.response.wait(io.waitScope); KJ_EXPECT(response.statusCode == testCase.response.statusCode); auto body = response.body->readAllText().wait(io.waitScope); if (testCase.request.method == HttpMethod::HEAD) { KJ_EXPECT(body == ""); } else { KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body); } } client = nullptr; pipe.ends[0]->shutdownWrite(); writeResponsesPromise.wait(io.waitScope); } KJ_TEST("HttpClient parallel pipeline") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); kj::Promise<void> writeResponsesPromise = kj::READY_NOW; for (auto& testCase: PIPELINE_TESTS) { writeResponsesPromise = writeResponsesPromise .then([&]() { return expectRead(*pipe.ends[1], testCase.request.raw); }).then([&]() { return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size()); }); } HttpHeaderTable table; auto client = newHttpClient(table, *pipe.ends[0]); auto responsePromises = KJ_MAP(testCase, PIPELINE_TESTS) { KJ_CONTEXT(testCase.request.raw, testCase.response.raw); HttpHeaders headers(table); for (auto& header: testCase.request.requestHeaders) { headers.set(header.id, header.value); } auto request = client->request( testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize); for (auto& part: testCase.request.requestBodyParts) { request.body->write(part.begin(), part.size()).wait(io.waitScope); } return kj::mv(request.response); }; for (auto i: kj::indices(PIPELINE_TESTS)) { auto& testCase = PIPELINE_TESTS[i]; auto response = responsePromises[i].wait(io.waitScope); KJ_EXPECT(response.statusCode == testCase.response.statusCode); auto body = response.body->readAllText().wait(io.waitScope); if (testCase.request.method == HttpMethod::HEAD) { KJ_EXPECT(body == ""); } else { KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body); } } client = nullptr; pipe.ends[0]->shutdownWrite(); writeResponsesPromise.wait(io.waitScope); } KJ_TEST("HttpServer pipeline") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; TestHttpService service(PIPELINE_TESTS, table); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); for (auto& testCase: PIPELINE_TESTS) { KJ_CONTEXT(testCase.request.raw, testCase.response.raw); pipe.ends[1]->write(testCase.request.raw.begin(), testCase.request.raw.size()) .wait(io.waitScope); expectRead(*pipe.ends[1], testCase.response.raw).wait(io.waitScope); } pipe.ends[1]->shutdownWrite(); listenTask.wait(io.waitScope); KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS)); } KJ_TEST("HttpServer parallel pipeline") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto allRequestText = kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, ""); auto allResponseText = kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, ""); HttpHeaderTable table; TestHttpService service(PIPELINE_TESTS, table); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(io.waitScope); pipe.ends[1]->shutdownWrite(); auto rawResponse = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(rawResponse == allResponseText, rawResponse); listenTask.wait(io.waitScope); KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS)); } KJ_TEST("HttpClient <-> HttpServer") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; TestHttpService service(PIPELINE_TESTS, table); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[1])); auto client = newHttpClient(table, *pipe.ends[0]); for (auto& testCase: PIPELINE_TESTS) { KJ_CONTEXT(testCase.request.raw, testCase.response.raw); HttpHeaders headers(table); for (auto& header: testCase.request.requestHeaders) { headers.set(header.id, header.value); } auto request = client->request( testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize); for (auto& part: testCase.request.requestBodyParts) { request.body->write(part.begin(), part.size()).wait(io.waitScope); } request.body = nullptr; auto response = request.response.wait(io.waitScope); KJ_EXPECT(response.statusCode == testCase.response.statusCode); auto body = response.body->readAllText().wait(io.waitScope); if (testCase.request.method == HttpMethod::HEAD) { KJ_EXPECT(body == ""); } else { KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body); } } client = nullptr; pipe.ends[0]->shutdownWrite(); listenTask.wait(io.waitScope); KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS)); } // ----------------------------------------------------------------------------- KJ_TEST("WebSocket core protocol") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto client = newWebSocket(kj::mv(pipe.ends[0]), nullptr); auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); auto mediumString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 30), ""); auto bigString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 10000), ""); auto clientTask = client->send(kj::StringPtr("hello")) .then([&]() { return client->send(mediumString); }) .then([&]() { return client->send(bigString); }) .then([&]() { return client->send(kj::StringPtr("world").asBytes()); }) .then([&]() { return client->close(1234, "bored"); }); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "hello"); } { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == mediumString); } { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == bigString); } { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::Array<byte>>()); KJ_EXPECT(kj::str(message.get<kj::Array<byte>>().asChars()) == "world"); } { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<WebSocket::Close>()); KJ_EXPECT(message.get<WebSocket::Close>().code == 1234); KJ_EXPECT(message.get<WebSocket::Close>().reason == "bored"); } auto serverTask = server->close(4321, "whatever"); { auto message = client->receive().wait(io.waitScope); KJ_ASSERT(message.is<WebSocket::Close>()); KJ_EXPECT(message.get<WebSocket::Close>().code == 4321); KJ_EXPECT(message.get<WebSocket::Close>().reason == "whatever"); } clientTask.wait(io.waitScope); serverTask.wait(io.waitScope); } KJ_TEST("WebSocket fragmented") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto client = kj::mv(pipe.ends[0]); auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); byte DATA[] = { 0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ', 0x00, 0x03, 'w', 'o', 'r', 0x80, 0x02, 'l', 'd', }; auto clientTask = client->write(DATA, sizeof(DATA)); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "hello world"); } clientTask.wait(io.waitScope); } class FakeEntropySource final: public EntropySource { public: void generate(kj::ArrayPtr<byte> buffer) override { static constexpr byte DUMMY[4] = { 12, 34, 56, 78 }; for (auto i: kj::indices(buffer)) { buffer[i] = DUMMY[i % sizeof(DUMMY)]; } } }; KJ_TEST("WebSocket masked") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); FakeEntropySource maskGenerator; auto client = kj::mv(pipe.ends[0]); auto server = newWebSocket(kj::mv(pipe.ends[1]), maskGenerator); byte DATA[] = { 0x81, 0x86, 12, 34, 56, 78, 'h' ^ 12, 'e' ^ 34, 'l' ^ 56, 'l' ^ 78, 'o' ^ 12, ' ' ^ 34, }; auto clientTask = client->write(DATA, sizeof(DATA)); auto serverTask = server->send(kj::StringPtr("hello ")); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "hello "); } expectRead(*client, DATA).wait(io.waitScope); clientTask.wait(io.waitScope); serverTask.wait(io.waitScope); } KJ_TEST("WebSocket unsolicited pong") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto client = kj::mv(pipe.ends[0]); auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); byte DATA[] = { 0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ', 0x8A, 0x03, 'f', 'o', 'o', 0x80, 0x05, 'w', 'o', 'r', 'l', 'd', }; auto clientTask = client->write(DATA, sizeof(DATA)); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "hello world"); } clientTask.wait(io.waitScope); } KJ_TEST("WebSocket ping") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto client = kj::mv(pipe.ends[0]); auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); // Be extra-annoying by having the ping arrive between fragments. byte DATA[] = { 0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ', 0x89, 0x03, 'f', 'o', 'o', 0x80, 0x05, 'w', 'o', 'r', 'l', 'd', }; auto clientTask = client->write(DATA, sizeof(DATA)); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "hello world"); } auto serverTask = server->send(kj::StringPtr("bar")); byte EXPECTED[] = { 0x8A, 0x03, 'f', 'o', 'o', // pong 0x81, 0x03, 'b', 'a', 'r', // message }; expectRead(*client, EXPECTED).wait(io.waitScope); clientTask.wait(io.waitScope); serverTask.wait(io.waitScope); } KJ_TEST("WebSocket ping mid-send") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto client = kj::mv(pipe.ends[0]); auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), ""); auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr); byte DATA[] = { 0x89, 0x03, 'f', 'o', 'o', // ping 0x81, 0x03, 'b', 'a', 'r', // some other message }; auto clientTask = client->write(DATA, sizeof(DATA)); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "bar"); } byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; expectRead(*client, EXPECTED1).wait(io.waitScope); expectRead(*client, bigString).wait(io.waitScope); byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' }; expectRead(*client, EXPECTED2).wait(io.waitScope); clientTask.wait(io.waitScope); serverTask.wait(io.waitScope); } class UnbufferedPipe final: public AsyncIoStream { // An in-memory one-way pipe with no internal buffer. read() blocks waiting for write()s and // write() blocks waiting for read()s. // // TODO(cleanup): This is probably broadly useful. Put it in a utility library somewhere. // NOTE: Must implement handling of cancellation first! public: kj::Promise<void> write(const void* buffer, size_t size) override { KJ_SWITCH_ONEOF(current) { KJ_CASE_ONEOF(w, CurrentWrite) { KJ_FAIL_REQUIRE("can only call write() once at a time"); } KJ_CASE_ONEOF(r, CurrentRead) { if (size < r.minBytes) { // Write does not complete the current read. memcpy(r.buffer.begin(), buffer, size); r.minBytes -= size; r.alreadyRead += size; r.buffer = r.buffer.slice(size, r.buffer.size()); return kj::READY_NOW; } else if (size <= r.buffer.size()) { // Write satisfies the current read, and read satisfies the write. memcpy(r.buffer.begin(), buffer, size); r.fulfiller->fulfill(r.alreadyRead + size); current = None(); return kj::READY_NOW; } else { // Write satisfies the read and still has more data leftover to write. size_t amount = r.buffer.size(); memcpy(r.buffer.begin(), buffer, amount); r.fulfiller->fulfill(amount + r.alreadyRead); auto paf = kj::newPromiseAndFulfiller<void>(); current = CurrentWrite { kj::arrayPtr(reinterpret_cast<const byte*>(buffer) + amount, size - amount), kj::mv(paf.fulfiller) }; return kj::mv(paf.promise); } } KJ_CASE_ONEOF(e, Eof) { KJ_FAIL_REQUIRE("write after EOF"); } KJ_CASE_ONEOF(n, None) { auto paf = kj::newPromiseAndFulfiller<void>(); current = CurrentWrite { kj::arrayPtr(reinterpret_cast<const byte*>(buffer), size), kj::mv(paf.fulfiller) }; return kj::mv(paf.promise); } } KJ_UNREACHABLE; } kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { KJ_SWITCH_ONEOF(current) { KJ_CASE_ONEOF(w, CurrentWrite) { if (maxBytes < w.buffer.size()) { // Entire read satisfied by write, write is still pending. memcpy(buffer, w.buffer.begin(), maxBytes); w.buffer = w.buffer.slice(maxBytes, w.buffer.size()); return maxBytes; } else if (minBytes <= w.buffer.size()) { // Read is satisfied by write and consumes entire write. size_t result = w.buffer.size(); memcpy(buffer, w.buffer.begin(), result); w.fulfiller->fulfill(); current = None(); return result; } else { // Read consumes entire write and is not satisfied. size_t alreadyRead = w.buffer.size(); memcpy(buffer, w.buffer.begin(), alreadyRead); w.fulfiller->fulfill(); auto paf = kj::newPromiseAndFulfiller<size_t>(); current = CurrentRead { kj::arrayPtr(reinterpret_cast<byte*>(buffer) + alreadyRead, maxBytes - alreadyRead), minBytes - alreadyRead, alreadyRead, kj::mv(paf.fulfiller) }; return kj::mv(paf.promise); } } KJ_CASE_ONEOF(r, CurrentRead) { KJ_FAIL_REQUIRE("can only call read() once at a time"); } KJ_CASE_ONEOF(e, Eof) { return size_t(0); } KJ_CASE_ONEOF(n, None) { auto paf = kj::newPromiseAndFulfiller<size_t>(); current = CurrentRead { kj::arrayPtr(reinterpret_cast<byte*>(buffer), maxBytes), minBytes, 0, kj::mv(paf.fulfiller) }; return kj::mv(paf.promise); } } KJ_UNREACHABLE; } kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { // TODO(cleanup): Should this be the defalut implementation of this method? if (pieces.size() == 0) return kj::READY_NOW; return write(pieces[0].begin(), pieces[0].size()) .then([this, pieces]() { return write(pieces.slice(1, pieces.size())); }); } void shutdownWrite() override { KJ_SWITCH_ONEOF(current) { KJ_CASE_ONEOF(w, CurrentWrite) { KJ_FAIL_REQUIRE("can't call shutdownWrite() during a write()"); } KJ_CASE_ONEOF(r, CurrentRead) { r.fulfiller->fulfill(kj::mv(r.alreadyRead)); } KJ_CASE_ONEOF(e, Eof) { // ignore } KJ_CASE_ONEOF(n, None) { // ignore } } current = Eof(); } private: struct CurrentWrite { kj::ArrayPtr<const byte> buffer; kj::Own<kj::PromiseFulfiller<void>> fulfiller; }; struct CurrentRead { kj::ArrayPtr<byte> buffer; size_t minBytes; size_t alreadyRead; kj::Own<kj::PromiseFulfiller<size_t>> fulfiller; }; struct Eof {}; struct None {}; kj::OneOf<CurrentWrite, CurrentRead, Eof, None> current = None(); }; class InputOutputPair final: public kj::AsyncIoStream { // Creates an AsyncIoStream out of an AsyncInputStream and an AsyncOutputStream. public: InputOutputPair(kj::AsyncInputStream& in, kj::AsyncIoStream& out) : in(in), out(out) {} kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override { return in.read(buffer, minBytes, maxBytes); } kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { return in.tryRead(buffer, minBytes, maxBytes); } Maybe<uint64_t> tryGetLength() override { return in.tryGetLength(); } Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount = kj::maxValue) override { return in.pumpTo(output, amount); } kj::Promise<void> write(const void* buffer, size_t size) override { return out.write(buffer, size); } kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override { return out.write(pieces); } kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom( kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override { return out.tryPumpFrom(input, amount); } void shutdownWrite() override { return out.shutdownWrite(); } private: kj::AsyncInputStream& in; kj::AsyncIoStream& out; }; KJ_TEST("WebSocket double-ping mid-send") { auto io = kj::setupAsyncIo(); UnbufferedPipe upPipe; UnbufferedPipe downPipe; InputOutputPair client(downPipe, upPipe); auto server = newWebSocket(kj::heap<InputOutputPair>(upPipe, downPipe), nullptr); auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), ""); auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr); byte DATA[] = { 0x89, 0x03, 'f', 'o', 'o', // ping 0x89, 0x03, 'q', 'u', 'x', // ping2 0x81, 0x03, 'b', 'a', 'r', // some other message }; auto clientTask = client.write(DATA, sizeof(DATA)); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "bar"); } byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; expectRead(client, EXPECTED1).wait(io.waitScope); expectRead(client, bigString).wait(io.waitScope); byte EXPECTED2[] = { 0x8A, 0x03, 'q', 'u', 'x' }; expectRead(client, EXPECTED2).wait(io.waitScope); clientTask.wait(io.waitScope); serverTask.wait(io.waitScope); } KJ_TEST("WebSocket ping received during pong send") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto client = kj::mv(pipe.ends[0]); auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr); // Send a very large ping so that sending the pong takes a while. Then send a second ping // immediately after. byte PREFIX[] = { 0x89, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), ""); byte POSTFIX[] = { 0x89, 0x03, 'f', 'o', 'o', 0x81, 0x03, 'b', 'a', 'r', }; kj::ArrayPtr<const byte> parts[] = {PREFIX, bigString.asBytes(), POSTFIX}; auto clientTask = client->write(parts); { auto message = server->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "bar"); } byte EXPECTED1[] = { 0x8A, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 }; expectRead(*client, EXPECTED1).wait(io.waitScope); expectRead(*client, bigString).wait(io.waitScope); byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' }; expectRead(*client, EXPECTED2).wait(io.waitScope); clientTask.wait(io.waitScope); } class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler { public: TestWebSocketService(HttpHeaderTable& headerTable, HttpHeaderId hMyHeader) : headerTable(headerTable), hMyHeader(hMyHeader), tasks(*this) {} kj::Promise<void> request( HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& response) override { KJ_FAIL_ASSERT("can't get here"); } kj::Promise<void> openWebSocket( kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response) override { HttpHeaders responseHeaders(headerTable); KJ_IF_MAYBE(h, headers.get(hMyHeader)) { responseHeaders.set(hMyHeader, kj::str("respond-", *h)); } if (url == "/return-error") { response.send(404, "Not Found", responseHeaders, uint64_t(0)); return kj::READY_NOW; } else if (url == "/ws-inline") { auto ws = response.acceptWebSocket(responseHeaders); return doWebSocket(*ws, "start-inline").attach(kj::mv(ws)); } else if (url == "/ws-detached") { auto ws = response.acceptWebSocket(responseHeaders); tasks.add(doWebSocket(*ws, "start-detached").attach(kj::mv(ws))); return kj::READY_NOW; } else { KJ_FAIL_ASSERT("unexpected path", url); } } private: HttpHeaderTable& headerTable; HttpHeaderId hMyHeader; kj::TaskSet tasks; void taskFailed(kj::Exception&& exception) override { KJ_LOG(ERROR, exception); } static kj::Promise<void> doWebSocket(WebSocket& ws, kj::StringPtr message) { auto copy = kj::str(message); return ws.send(copy).attach(kj::mv(copy)) .then([&ws]() { return ws.receive(); }).then([&ws](WebSocket::Message&& message) { KJ_SWITCH_ONEOF(message) { KJ_CASE_ONEOF(str, kj::String) { return doWebSocket(ws, kj::str("reply:", str)); } KJ_CASE_ONEOF(data, kj::Array<byte>) { return doWebSocket(ws, kj::str("reply:", data)); } KJ_CASE_ONEOF(close, WebSocket::Close) { auto reason = kj::str("close-reply:", close.reason); return ws.close(close.code + 1, reason).attach(kj::mv(reason)); } } KJ_UNREACHABLE; }); } }; const char WEBSOCKET_REQUEST_HANDSHAKE[] = " HTTP/1.1\r\n" "Connection: Upgrade\r\n" "Upgrade: websocket\r\n" "Sec-WebSocket-Key: DCI4TgwiOE4MIjhODCI4Tg==\r\n" "Sec-WebSocket-Version: 13\r\n" "My-Header: foo\r\n" "\r\n"; const char WEBSOCKET_RESPONSE_HANDSHAKE[] = "HTTP/1.1 101 Switching Protocols\r\n" "Connection: Upgrade\r\n" "Upgrade: websocket\r\n" "Sec-WebSocket-Accept: pShtIFKT0s8RYZvnWY/CrjQD8CM=\r\n" "My-Header: respond-foo\r\n" "\r\n"; const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] = "HTTP/1.1 404 Not Found\r\n" "Content-Length: 0\r\n" "My-Header: respond-foo\r\n" "\r\n"; const byte WEBSOCKET_FIRST_MESSAGE_INLINE[] = { 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' }; const byte WEBSOCKET_FIRST_MESSAGE_DETACHED[] = { 0x81, 0x0e, 's','t','a','r','t','-','d','e','t','a','c','h','e','d' }; const byte WEBSOCKET_SEND_MESSAGE[] = { 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 }; const byte WEBSOCKET_REPLY_MESSAGE[] = { 0x81, 0x09, 'r','e','p','l','y',':','b','a','r' }; const byte WEBSOCKET_SEND_CLOSE[] = { 0x88, 0x85, 12, 34, 56, 78, 0x12^12, 0x34^34, 'q'^56, 'u'^78, 'x'^12 }; const byte WEBSOCKET_REPLY_CLOSE[] = { 0x88, 0x11, 0x12, 0x35, 'c','l','o','s','e','-','r','e','p','l','y',':','q','u','x' }; template <size_t s> kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) { return kj::ArrayPtr<const char>(chars, s - 1).asBytes(); } KJ_TEST("HttpClient WebSocket handshake") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); auto serverTask = expectRead(*pipe.ends[1], request) .then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); }) .then([&]() { return pipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); }) .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); }) .then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); }) .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); }) .then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); }) .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); HttpHeaderTable::Builder tableBuilder; HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); auto headerTable = tableBuilder.build(); FakeEntropySource entropySource; auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource); kj::HttpHeaders headers(*headerTable); headers.set(hMyHeader, "foo"); auto response = client->openWebSocket("/websocket", headers).wait(io.waitScope); KJ_EXPECT(response.statusCode == 101); KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText); KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo"); KJ_ASSERT(response.webSocketOrBody.is<kj::Own<WebSocket>>()); auto ws = kj::mv(response.webSocketOrBody.get<kj::Own<WebSocket>>()); { auto message = ws->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "start-inline"); } ws->send(kj::StringPtr("bar")).wait(io.waitScope); { auto message = ws->receive().wait(io.waitScope); KJ_ASSERT(message.is<kj::String>()); KJ_EXPECT(message.get<kj::String>() == "reply:bar"); } ws->close(0x1234, "qux").wait(io.waitScope); { auto message = ws->receive().wait(io.waitScope); KJ_ASSERT(message.is<WebSocket::Close>()); KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235); KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux"); } serverTask.wait(io.waitScope); } KJ_TEST("HttpClient WebSocket error") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); auto serverTask = expectRead(*pipe.ends[1], request) .then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)}); }) .then([&]() { return expectRead(*pipe.ends[1], request); }) .then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)}); }) .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); HttpHeaderTable::Builder tableBuilder; HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); auto headerTable = tableBuilder.build(); FakeEntropySource entropySource; auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource); kj::HttpHeaders headers(*headerTable); headers.set(hMyHeader, "foo"); { auto response = client->openWebSocket("/websocket", headers).wait(io.waitScope); KJ_EXPECT(response.statusCode == 404); KJ_EXPECT(response.statusText == "Not Found", response.statusText); KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo"); KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>()); } { auto response = client->openWebSocket("/websocket", headers).wait(io.waitScope); KJ_EXPECT(response.statusCode == 404); KJ_EXPECT(response.statusText == "Not Found", response.statusText); KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo"); KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>()); } serverTask.wait(io.waitScope); } KJ_TEST("HttpServer WebSocket handshake") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable::Builder tableBuilder; HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); auto headerTable = tableBuilder.build(); TestWebSocketService service(*headerTable, hMyHeader); HttpServer server(io.provider->getTimer(), *headerTable, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); auto request = kj::str("GET /ws-inline", WEBSOCKET_REQUEST_HANDSHAKE); pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope); pipe.ends[1]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope); pipe.ends[1]->write({WEBSOCKET_SEND_CLOSE}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope); listenTask.wait(io.waitScope); } KJ_TEST("HttpServer WebSocket handshake detached") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable::Builder tableBuilder; HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); auto headerTable = tableBuilder.build(); TestWebSocketService service(*headerTable, hMyHeader); HttpServer server(io.provider->getTimer(), *headerTable, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); auto request = kj::str("GET /ws-detached", WEBSOCKET_REQUEST_HANDSHAKE); pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope); listenTask.wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_DETACHED).wait(io.waitScope); pipe.ends[1]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope); pipe.ends[1]->write({WEBSOCKET_SEND_CLOSE}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope); } KJ_TEST("HttpServer WebSocket handshake error") { auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable::Builder tableBuilder; HttpHeaderId hMyHeader = tableBuilder.add("My-Header"); auto headerTable = tableBuilder.build(); TestWebSocketService service(*headerTable, hMyHeader); HttpServer server(io.provider->getTimer(), *headerTable, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); auto request = kj::str("GET /return-error", WEBSOCKET_REQUEST_HANDSHAKE); pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(io.waitScope); // Can send more requests! pipe.ends[1]->write({request.asBytes()}).wait(io.waitScope); expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(io.waitScope); pipe.ends[1]->shutdownWrite(); listenTask.wait(io.waitScope); } // ----------------------------------------------------------------------------- KJ_TEST("HttpServer request timeout") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; TestHttpService service(PIPELINE_TESTS, table); HttpServerSettings settings; settings.headerTimeout = 1 * kj::MILLISECONDS; HttpServer server(io.provider->getTimer(), table, service, settings); // Shouldn't hang! Should time out. server.listenHttp(kj::mv(pipe.ends[0])).wait(io.waitScope); // Closes the connection without sending anything. KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == ""); } KJ_TEST("HttpServer pipeline timeout") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; TestHttpService service(PIPELINE_TESTS, table); HttpServerSettings settings; settings.pipelineTimeout = 1 * kj::MILLISECONDS; HttpServer server(io.provider->getTimer(), table, service, settings); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(io.waitScope); // Listen task should time out even though we didn't shutdown the socket. listenTask.wait(io.waitScope); // In this case, no data is sent back. KJ_EXPECT(pipe.ends[1]->readAllText().wait(io.waitScope) == ""); } class BrokenHttpService final: public HttpService { // HttpService that doesn't send a response. public: BrokenHttpService() = default; explicit BrokenHttpService(kj::Exception&& exception): exception(kj::mv(exception)) {} kj::Promise<void> request( HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& responseSender) override { return requestBody.readAllBytes().then([this](kj::Array<byte>&&) -> kj::Promise<void> { KJ_IF_MAYBE(e, exception) { return kj::cp(*e); } else { return kj::READY_NOW; } }); } private: kj::Maybe<kj::Exception> exception; }; KJ_TEST("HttpServer no response") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; BrokenHttpService service; HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); auto text = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(text == "HTTP/1.1 500 Internal Server Error\r\n" "Connection: close\r\n" "Content-Length: 51\r\n" "Content-Type: text/plain\r\n" "\r\n" "ERROR: The HttpService did not generate a response.", text); } KJ_TEST("HttpServer disconnected") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; BrokenHttpService service(KJ_EXCEPTION(DISCONNECTED, "disconnected")); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); auto text = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(text == "", text); } KJ_TEST("HttpServer overloaded") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; BrokenHttpService service(KJ_EXCEPTION(OVERLOADED, "overloaded")); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); auto text = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(text.startsWith("HTTP/1.1 503 Service Unavailable"), text); } KJ_TEST("HttpServer unimplemented") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; BrokenHttpService service(KJ_EXCEPTION(UNIMPLEMENTED, "unimplemented")); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); auto text = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(text.startsWith("HTTP/1.1 501 Not Implemented"), text); } KJ_TEST("HttpServer threw exception") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed")); HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); auto text = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(text.startsWith("HTTP/1.1 500 Internal Server Error"), text); } class PartialResponseService final: public HttpService { // HttpService that sends a partial response then throws. public: kj::Promise<void> request( HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& response) override { return requestBody.readAllBytes() .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> { HttpHeaders headers(table); auto body = response.send(200, "OK", headers, 32); auto promise = body->write("foo", 3); return promise.attach(kj::mv(body)).then([]() -> kj::Promise<void> { return KJ_EXCEPTION(FAILED, "failed"); }); }); } private: kj::Maybe<kj::Exception> exception; HttpHeaderTable table; }; KJ_TEST("HttpServer threw exception after starting response") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; PartialResponseService service; HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); KJ_EXPECT_LOG(ERROR, "HttpService threw exception after generating a partial response"); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); auto text = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(text == "HTTP/1.1 200 OK\r\n" "Content-Length: 32\r\n" "\r\n" "foo", text); } class SimpleInputStream final: public kj::AsyncInputStream { // An InputStream that returns bytes out of a static string. public: SimpleInputStream(kj::StringPtr text) : unread(text.asBytes()) {} kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { size_t amount = kj::min(maxBytes, unread.size()); memcpy(buffer, unread.begin(), amount); unread = unread.slice(amount, unread.size()); return amount; } private: kj::ArrayPtr<const byte> unread; }; class PumpResponseService final: public HttpService { // HttpService that uses pumpTo() to write a response, without carefully specifying how much to // pump, but the stream happens to be the right size. public: kj::Promise<void> request( HttpMethod method, kj::StringPtr url, const HttpHeaders& headers, kj::AsyncInputStream& requestBody, Response& response) override { return requestBody.readAllBytes() .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> { HttpHeaders headers(table); kj::StringPtr text = "Hello, World!"; auto body = response.send(200, "OK", headers, text.size()); auto stream = kj::heap<SimpleInputStream>(text); auto promise = stream->pumpTo(*body); return promise.attach(kj::mv(body), kj::mv(stream)) .then([text](uint64_t amount) { KJ_EXPECT(amount == text.size()); }); }); } private: kj::Maybe<kj::Exception> exception; HttpHeaderTable table; }; KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto pipe = io.provider->newTwoWayPipe(); HttpHeaderTable table; PumpResponseService service; HttpServer server(io.provider->getTimer(), table, service); auto listenTask = server.listenHttp(kj::mv(pipe.ends[0])); // Do one request. pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size()) .wait(io.waitScope); pipe.ends[1]->shutdownWrite(); auto text = pipe.ends[1]->readAllText().wait(io.waitScope); KJ_EXPECT(text == "HTTP/1.1 200 OK\r\n" "Content-Length: 13\r\n" "\r\n" "Hello, World!", text); } // ----------------------------------------------------------------------------- KJ_TEST("newHttpService from HttpClient") { auto PIPELINE_TESTS = pipelineTestCases(); auto io = kj::setupAsyncIo(); auto frontPipe = io.provider->newTwoWayPipe(); auto backPipe = io.provider->newTwoWayPipe(); kj::Promise<void> writeResponsesPromise = kj::READY_NOW; for (auto& testCase: PIPELINE_TESTS) { writeResponsesPromise = writeResponsesPromise .then([&]() { return expectRead(*backPipe.ends[1], testCase.request.raw); }).then([&]() { return backPipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size()); }); } { HttpHeaderTable table; auto backClient = newHttpClient(table, *backPipe.ends[0]); auto frontService = newHttpService(*backClient); HttpServer frontServer(io.provider->getTimer(), table, *frontService); auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1])); for (auto& testCase: PIPELINE_TESTS) { KJ_CONTEXT(testCase.request.raw, testCase.response.raw); frontPipe.ends[0]->write(testCase.request.raw.begin(), testCase.request.raw.size()) .wait(io.waitScope); expectRead(*frontPipe.ends[0], testCase.response.raw).wait(io.waitScope); } frontPipe.ends[0]->shutdownWrite(); listenTask.wait(io.waitScope); } backPipe.ends[0]->shutdownWrite(); writeResponsesPromise.wait(io.waitScope); } KJ_TEST("newHttpService from HttpClient WebSockets") { auto io = kj::setupAsyncIo(); auto frontPipe = io.provider->newTwoWayPipe(); auto backPipe = io.provider->newTwoWayPipe(); auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); auto writeResponsesPromise = expectRead(*backPipe.ends[1], request) .then([&]() { return backPipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); }) .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); }) .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); }) .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); }) .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); }) .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); }) // expect EOF .then([&]() { return backPipe.ends[1]->readAllBytes(); }) .then([&](kj::ArrayPtr<byte> content) { KJ_EXPECT(content.size() == 0); // Send EOF. backPipe.ends[1]->shutdownWrite(); }) .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); { HttpHeaderTable table; FakeEntropySource entropySource; auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource); auto frontService = newHttpService(*backClient); HttpServer frontServer(io.provider->getTimer(), table, *frontService); auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1])); frontPipe.ends[0]->write({request.asBytes()}).wait(io.waitScope); expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope); expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope); frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope); expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(io.waitScope); frontPipe.ends[0]->write({WEBSOCKET_SEND_CLOSE}).wait(io.waitScope); expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(io.waitScope); frontPipe.ends[0]->shutdownWrite(); listenTask.wait(io.waitScope); } writeResponsesPromise.wait(io.waitScope); } KJ_TEST("newHttpService from HttpClient WebSockets disconnect") { auto io = kj::setupAsyncIo(); auto frontPipe = io.provider->newTwoWayPipe(); auto backPipe = io.provider->newTwoWayPipe(); auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE); auto writeResponsesPromise = expectRead(*backPipe.ends[1], request) .then([&]() { return backPipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); }) .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); }) .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); }) .then([&]() { backPipe.ends[1]->shutdownWrite(); }) .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); }); { HttpHeaderTable table; FakeEntropySource entropySource; auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource); auto frontService = newHttpService(*backClient); HttpServer frontServer(io.provider->getTimer(), table, *frontService); auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1])); frontPipe.ends[0]->write({request.asBytes()}).wait(io.waitScope); expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(io.waitScope); expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(io.waitScope); frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(io.waitScope); KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(io.waitScope) == ""); frontPipe.ends[0]->shutdownWrite(); listenTask.wait(io.waitScope); } writeResponsesPromise.wait(io.waitScope); } // ----------------------------------------------------------------------------- KJ_TEST("HttpClient to capnproto.org") { auto io = kj::setupAsyncIo(); auto maybeConn = io.provider->getNetwork().parseAddress("capnproto.org", 80) .then([](kj::Own<kj::NetworkAddress> addr) { auto promise = addr->connect(); return promise.attach(kj::mv(addr)); }).then([](kj::Own<kj::AsyncIoStream>&& connection) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> { return kj::mv(connection); }, [](kj::Exception&& e) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> { KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org"); return nullptr; }).wait(io.waitScope); KJ_IF_MAYBE(conn, maybeConn) { // Successfully connected to capnproto.org. Try doing GET /. We expect to get a redirect to // HTTPS, because what kind of horrible web site would serve in plaintext, really? HttpHeaderTable table; auto client = newHttpClient(table, **conn); HttpHeaders headers(table); headers.set(HttpHeaderId::HOST, "capnproto.org"); auto response = client->request(HttpMethod::GET, "/", headers).response.wait(io.waitScope); KJ_EXPECT(response.statusCode / 100 == 3); auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION)); KJ_EXPECT(location == "https://capnproto.org/"); auto body = response.body->readAllText().wait(io.waitScope); } } } // namespace } // namespace kj