// Copyright (c) 2017 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#define KJ_TESTING_KJ 1

#include "http.h"
#include <kj/debug.h>
#include <kj/test.h>
#include <map>

namespace kj {
namespace {

KJ_TEST("HttpMethod parse / stringify") {
#define TRY(name) \
  KJ_EXPECT(kj::str(HttpMethod::name) == #name); \
  KJ_IF_MAYBE(parsed, tryParseHttpMethod(#name)) { \
    KJ_EXPECT(*parsed == HttpMethod::name); \
  } else { \
    KJ_FAIL_EXPECT("couldn't parse \"" #name "\" as HttpMethod"); \
  }

  KJ_HTTP_FOR_EACH_METHOD(TRY)
#undef TRY

  KJ_EXPECT(tryParseHttpMethod("FOO") == nullptr);
  KJ_EXPECT(tryParseHttpMethod("") == nullptr);
  KJ_EXPECT(tryParseHttpMethod("G") == nullptr);
  KJ_EXPECT(tryParseHttpMethod("GE") == nullptr);
  KJ_EXPECT(tryParseHttpMethod("GET ") == nullptr);
  KJ_EXPECT(tryParseHttpMethod("get") == nullptr);
}

KJ_TEST("HttpHeaderTable") {
  HttpHeaderTable::Builder builder;

  auto host = builder.add("Host");
  auto host2 = builder.add("hOsT");
  auto fooBar = builder.add("Foo-Bar");
  auto bazQux = builder.add("baz-qux");
  auto bazQux2 = builder.add("Baz-Qux");

  auto table = builder.build();

  uint builtinHeaderCount = 0;
#define INCREMENT(id, name) ++builtinHeaderCount;
  KJ_HTTP_FOR_EACH_BUILTIN_HEADER(INCREMENT)
#undef INCREMENT

  KJ_EXPECT(table->idCount() == builtinHeaderCount + 2);

  KJ_EXPECT(host == HttpHeaderId::HOST);
  KJ_EXPECT(host != HttpHeaderId::DATE);
  KJ_EXPECT(host2 == host);

  KJ_EXPECT(host != fooBar);
  KJ_EXPECT(host != bazQux);
  KJ_EXPECT(fooBar != bazQux);
  KJ_EXPECT(bazQux == bazQux2);

  KJ_EXPECT(kj::str(host) == "Host");
  KJ_EXPECT(kj::str(host2) == "Host");
  KJ_EXPECT(kj::str(fooBar) == "Foo-Bar");
  KJ_EXPECT(kj::str(bazQux) == "baz-qux");
  KJ_EXPECT(kj::str(HttpHeaderId::HOST) == "Host");

  KJ_EXPECT(table->idToString(HttpHeaderId::DATE) == "Date");
  KJ_EXPECT(table->idToString(fooBar) == "Foo-Bar");

  KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Date")) == HttpHeaderId::DATE);
  KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("dATE")) == HttpHeaderId::DATE);
  KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("Foo-Bar")) == fooBar);
  KJ_EXPECT(KJ_ASSERT_NONNULL(table->stringToId("foo-BAR")) == fooBar);
  KJ_EXPECT(table->stringToId("foobar") == nullptr);
  KJ_EXPECT(table->stringToId("barfoo") == nullptr);
}

KJ_TEST("HttpHeaders::parseRequest") {
  HttpHeaderTable::Builder builder;

  auto fooBar = builder.add("Foo-Bar");
  auto bazQux = builder.add("baz-qux");

  auto table = builder.build();

  HttpHeaders headers(*table);
  auto text = kj::heapString(
      "POST   /some/path \t   HTTP/1.1\r\n"
      "Foo-BaR: Baz\r\n"
      "Host: example.com\r\n"
      "Content-Length: 123\r\n"
      "DATE:     early\r\n"
      "other-Header: yep\r\n"
      "\r\n");
  auto result = KJ_ASSERT_NONNULL(headers.tryParseRequest(text.asArray()));

  KJ_EXPECT(result.method == HttpMethod::POST);
  KJ_EXPECT(result.url == "/some/path");
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
  KJ_EXPECT(headers.get(bazQux) == nullptr);
  KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123");
  KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr);

  std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
  headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
    KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
  });
  KJ_EXPECT(unpackedHeaders.size() == 5);
  KJ_EXPECT(unpackedHeaders["Content-Length"] == "123");
  KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
  KJ_EXPECT(unpackedHeaders["Date"] == "early");
  KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
  KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");

  KJ_EXPECT(headers.serializeRequest(result.method, result.url) ==
      "POST /some/path HTTP/1.1\r\n"
      "Content-Length: 123\r\n"
      "Host: example.com\r\n"
      "Date: early\r\n"
      "Foo-Bar: Baz\r\n"
      "other-Header: yep\r\n"
      "\r\n");
}

KJ_TEST("HttpHeaders::parseResponse") {
  HttpHeaderTable::Builder builder;

  auto fooBar = builder.add("Foo-Bar");
  auto bazQux = builder.add("baz-qux");

  auto table = builder.build();

  HttpHeaders headers(*table);
  auto text = kj::heapString(
      "HTTP/1.1\t\t  418\t    I'm a teapot\r\n"
      "Foo-BaR: Baz\r\n"
      "Host: example.com\r\n"
      "Content-Length: 123\r\n"
      "DATE:     early\r\n"
      "other-Header: yep\r\n"
      "\r\n");
  auto result = KJ_ASSERT_NONNULL(headers.tryParseResponse(text.asArray()));

  KJ_EXPECT(result.statusCode == 418);
  KJ_EXPECT(result.statusText == "I'm a teapot");
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::HOST)) == "example.com");
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::DATE)) == "early");
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(fooBar)) == "Baz");
  KJ_EXPECT(headers.get(bazQux) == nullptr);
  KJ_EXPECT(headers.get(HttpHeaderId::CONTENT_TYPE) == nullptr);
  KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(HttpHeaderId::CONTENT_LENGTH)) == "123");
  KJ_EXPECT(headers.get(HttpHeaderId::TRANSFER_ENCODING) == nullptr);

  std::map<kj::StringPtr, kj::StringPtr> unpackedHeaders;
  headers.forEach([&](kj::StringPtr name, kj::StringPtr value) {
    KJ_EXPECT(unpackedHeaders.insert(std::make_pair(name, value)).second);
  });
  KJ_EXPECT(unpackedHeaders.size() == 5);
  KJ_EXPECT(unpackedHeaders["Content-Length"] == "123");
  KJ_EXPECT(unpackedHeaders["Host"] == "example.com");
  KJ_EXPECT(unpackedHeaders["Date"] == "early");
  KJ_EXPECT(unpackedHeaders["Foo-Bar"] == "Baz");
  KJ_EXPECT(unpackedHeaders["other-Header"] == "yep");

  KJ_EXPECT(headers.serializeResponse(
        result.statusCode, result.statusText) ==
      "HTTP/1.1 418 I'm a teapot\r\n"
      "Content-Length: 123\r\n"
      "Host: example.com\r\n"
      "Date: early\r\n"
      "Foo-Bar: Baz\r\n"
      "other-Header: yep\r\n"
      "\r\n");
}

KJ_TEST("HttpHeaders parse invalid") {
  auto table = HttpHeaderTable::Builder().build();
  HttpHeaders headers(*table);

  // NUL byte in request.
  KJ_EXPECT(headers.tryParseRequest(kj::heapString(
      "POST  \0 /some/path \t   HTTP/1.1\r\n"
      "Foo-BaR: Baz\r\n"
      "Host: example.com\r\n"
      "DATE:     early\r\n"
      "other-Header: yep\r\n"
      "\r\n")) == nullptr);

  // Control character in header name.
  KJ_EXPECT(headers.tryParseRequest(kj::heapString(
      "POST   /some/path \t   HTTP/1.1\r\n"
      "Foo-BaR: Baz\r\n"
      "Cont\001ent-Length: 123\r\n"
      "DATE:     early\r\n"
      "other-Header: yep\r\n"
      "\r\n")) == nullptr);

  // Separator character in header name.
  KJ_EXPECT(headers.tryParseRequest(kj::heapString(
      "POST   /some/path \t   HTTP/1.1\r\n"
      "Foo-BaR: Baz\r\n"
      "Host: example.com\r\n"
      "DATE/:     early\r\n"
      "other-Header: yep\r\n"
      "\r\n")) == nullptr);

  // Response status code not numeric.
  KJ_EXPECT(headers.tryParseResponse(kj::heapString(
      "HTTP/1.1\t\t  abc\t    I'm a teapot\r\n"
      "Foo-BaR: Baz\r\n"
      "Host: example.com\r\n"
      "DATE:     early\r\n"
      "other-Header: yep\r\n"
      "\r\n")) == nullptr);
}

KJ_TEST("HttpHeaders validation") {
  auto table = HttpHeaderTable::Builder().build();
  HttpHeaders headers(*table);

  headers.add("Valid-Name", "valid value");

  // The HTTP RFC prohibits control characters, but browsers only prohibit \0, \r, and \n. KJ goes
  // with the browsers for compatibility.
  headers.add("Valid-Name", "valid\x01value");

  // The HTTP RFC does not permit non-ASCII values.
  // KJ chooses to interpret them as UTF-8, to avoid the need for any expensive conversion.
  // Browsers apparently interpret them as LATIN-1. Applications can reinterpet these strings as
  // LATIN-1 easily enough if they really need to.
  headers.add("Valid-Name", u8"valid€value");

  KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid Name", "value"));
  KJ_EXPECT_THROW_MESSAGE("invalid header name", headers.add("Invalid@Name", "value"));

  KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.set(HttpHeaderId::HOST, "in\nvalid"));
  KJ_EXPECT_THROW_MESSAGE("invalid header value", headers.add("Valid-Name", "in\nvalid"));
}

// =======================================================================================

class ReadFragmenter final: public kj::AsyncIoStream {
public:
  ReadFragmenter(AsyncIoStream& inner, size_t limit): inner(inner), limit(limit) {}

  Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
    return inner.read(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
  }
  Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    return inner.tryRead(buffer, minBytes, kj::max(minBytes, kj::min(limit, maxBytes)));
  }

  Maybe<uint64_t> tryGetLength() override { return inner.tryGetLength(); }

  Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount) override {
    return inner.pumpTo(output, amount);
  }

  Promise<void> write(const void* buffer, size_t size) override {
    return inner.write(buffer, size);
  }
  Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
    return inner.write(pieces);
  }

  Maybe<Promise<uint64_t>> tryPumpFrom(AsyncInputStream& input, uint64_t amount) override {
    return inner.tryPumpFrom(input, amount);
  }

  void shutdownWrite() override {
    return inner.shutdownWrite();
  }

  void abortRead() override { return inner.abortRead(); }

  void getsockopt(int level, int option, void* value, uint* length) override {
    return inner.getsockopt(level, option, value, length);
  }
  void setsockopt(int level, int option, const void* value, uint length) override {
    return inner.setsockopt(level, option, value, length);
  }

  void getsockname(struct sockaddr* addr, uint* length) override {
    return inner.getsockname(addr, length);
  }
  void getpeername(struct sockaddr* addr, uint* length) override {
    return inner.getsockname(addr, length);
  }

private:
  kj::AsyncIoStream& inner;
  size_t limit;
};

template <typename T>
class InitializeableArray: public Array<T> {
public:
  InitializeableArray(std::initializer_list<T> init)
      : Array<T>(kj::heapArray(init)) {}
};

enum Side { BOTH, CLIENT_ONLY, SERVER_ONLY };

struct HeaderTestCase {
  HttpHeaderId id;
  kj::StringPtr value;
};

struct HttpRequestTestCase {
  kj::StringPtr raw;

  HttpMethod method;
  kj::StringPtr path;
  InitializeableArray<HeaderTestCase> requestHeaders;
  kj::Maybe<uint64_t> requestBodySize;
  InitializeableArray<kj::StringPtr> requestBodyParts;

  Side side = BOTH;

  // TODO(cleanup): Delete this constructor if/when we move to C++14.
  HttpRequestTestCase(kj::StringPtr raw, HttpMethod method, kj::StringPtr path,
                      InitializeableArray<HeaderTestCase> requestHeaders,
                      kj::Maybe<uint64_t> requestBodySize,
                      InitializeableArray<kj::StringPtr> requestBodyParts,
                      Side side = BOTH)
      : raw(raw), method(method), path(path), requestHeaders(kj::mv(requestHeaders)),
        requestBodySize(requestBodySize), requestBodyParts(kj::mv(requestBodyParts)),
        side(side) {}
};

struct HttpResponseTestCase {
  kj::StringPtr raw;

  uint64_t statusCode;
  kj::StringPtr statusText;
  InitializeableArray<HeaderTestCase> responseHeaders;
  kj::Maybe<uint64_t> responseBodySize;
  InitializeableArray<kj::StringPtr> responseBodyParts;

  HttpMethod method = HttpMethod::GET;

  Side side = BOTH;

  // TODO(cleanup): Delete this constructor if/when we move to C++14.
  HttpResponseTestCase(kj::StringPtr raw, uint64_t statusCode, kj::StringPtr statusText,
                       InitializeableArray<HeaderTestCase> responseHeaders,
                       kj::Maybe<uint64_t> responseBodySize,
                       InitializeableArray<kj::StringPtr> responseBodyParts,
                       HttpMethod method = HttpMethod::GET,
                       Side side = BOTH)
      : raw(raw), statusCode(statusCode), statusText(statusText),
        responseHeaders(kj::mv(responseHeaders)), responseBodySize(responseBodySize),
        responseBodyParts(kj::mv(responseBodyParts)), method(method), side(side) {}
};

struct HttpTestCase {
  HttpRequestTestCase request;
  HttpResponseTestCase response;
};

kj::Promise<void> writeEach(kj::AsyncOutputStream& out, kj::ArrayPtr<const kj::StringPtr> parts) {
  if (parts.size() == 0) return kj::READY_NOW;

  return out.write(parts[0].begin(), parts[0].size())
      .then([&out,parts]() {
    return writeEach(out, parts.slice(1, parts.size()));
  });
}

kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::StringPtr expected) {
  if (expected.size() == 0) return kj::READY_NOW;

  auto buffer = kj::heapArray<char>(expected.size());

  auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
  return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<char> buffer, size_t amount) {
    if (amount == 0) {
      KJ_FAIL_ASSERT("expected data never sent", expected);
    }

    auto actual = buffer.slice(0, amount);
    if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
      KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
    }

    return expectRead(in, expected.slice(amount));
  }));
}

kj::Promise<void> expectRead(kj::AsyncInputStream& in, kj::ArrayPtr<const byte> expected) {
  if (expected.size() == 0) return kj::READY_NOW;

  auto buffer = kj::heapArray<byte>(expected.size());

  auto promise = in.tryRead(buffer.begin(), 1, buffer.size());
  return promise.then(kj::mvCapture(buffer, [&in,expected](kj::Array<byte> buffer, size_t amount) {
    if (amount == 0) {
      KJ_FAIL_ASSERT("expected data never sent", expected);
    }

    auto actual = buffer.slice(0, amount);
    if (memcmp(actual.begin(), expected.begin(), actual.size()) != 0) {
      KJ_FAIL_ASSERT("data from stream doesn't match expected", expected, actual);
    }

    return expectRead(in, expected.slice(amount, expected.size()));
  }));
}

void testHttpClientRequest(kj::WaitScope& waitScope, const HttpRequestTestCase& testCase) {
  auto pipe = kj::newTwoWayPipe();

  auto serverTask = expectRead(*pipe.ends[1], testCase.raw).then([&]() {
    static const char SIMPLE_RESPONSE[] =
        "HTTP/1.1 200 OK\r\n"
        "Content-Length: 0\r\n"
        "\r\n";
    return pipe.ends[1]->write(SIMPLE_RESPONSE, strlen(SIMPLE_RESPONSE));
  }).then([&]() -> kj::Promise<void> {
    return kj::NEVER_DONE;
  });

  HttpHeaderTable table;
  auto client = newHttpClient(table, *pipe.ends[0]);

  HttpHeaders headers(table);
  for (auto& header: testCase.requestHeaders) {
    headers.set(header.id, header.value);
  }

  auto request = client->request(testCase.method, testCase.path, headers, testCase.requestBodySize);
  if (testCase.requestBodyParts.size() > 0) {
    writeEach(*request.body, testCase.requestBodyParts).wait(waitScope);
  }
  request.body = nullptr;
  auto clientTask = request.response
      .then([&](HttpClient::Response&& response) {
    auto promise = response.body->readAllText();
    return promise.attach(kj::mv(response.body));
  }).ignoreResult();

  serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);

  // Verify no more data written by client.
  client = nullptr;
  pipe.ends[0]->shutdownWrite();
  KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}

void testHttpClientResponse(kj::WaitScope& waitScope, const HttpResponseTestCase& testCase,
                            size_t readFragmentSize) {
  auto pipe = kj::newTwoWayPipe();
  ReadFragmenter fragmenter(*pipe.ends[0], readFragmentSize);

  auto expectedReqText = testCase.method == HttpMethod::GET || testCase.method == HttpMethod::HEAD
      ? kj::str(testCase.method, " / HTTP/1.1\r\n\r\n")
      : kj::str(testCase.method, " / HTTP/1.1\r\nContent-Length: 0\r\n");

  auto serverTask = expectRead(*pipe.ends[1], expectedReqText).then([&]() {
    return pipe.ends[1]->write(testCase.raw.begin(), testCase.raw.size());
  }).then([&]() -> kj::Promise<void> {
    pipe.ends[1]->shutdownWrite();
    return kj::NEVER_DONE;
  });

  HttpHeaderTable table;
  auto client = newHttpClient(table, fragmenter);

  HttpHeaders headers(table);
  auto request = client->request(testCase.method, "/", headers, uint64_t(0));
  request.body = nullptr;
  auto clientTask = request.response
      .then([&](HttpClient::Response&& response) {
    KJ_EXPECT(response.statusCode == testCase.statusCode);
    KJ_EXPECT(response.statusText == testCase.statusText);

    for (auto& header: testCase.responseHeaders) {
      KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(header.id)) == header.value);
    }
    auto promise = response.body->readAllText();
    return promise.attach(kj::mv(response.body));
  }).then([&](kj::String body) {
    KJ_EXPECT(body == kj::strArray(testCase.responseBodyParts, ""), body);
  });

  serverTask.exclusiveJoin(kj::mv(clientTask)).wait(waitScope);

  // Verify no more data written by client.
  client = nullptr;
  pipe.ends[0]->shutdownWrite();
  KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}

void testHttpClient(kj::WaitScope& waitScope, HttpHeaderTable& table,
                    HttpClient& client, const HttpTestCase& testCase) {
  KJ_CONTEXT(testCase.request.raw, testCase.response.raw);

  HttpHeaders headers(table);
  for (auto& header: testCase.request.requestHeaders) {
    headers.set(header.id, header.value);
  }

  auto request = client.request(
      testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
  for (auto& part: testCase.request.requestBodyParts) {
    request.body->write(part.begin(), part.size()).wait(waitScope);
  }
  request.body = nullptr;

  auto response = request.response.wait(waitScope);

  KJ_EXPECT(response.statusCode == testCase.response.statusCode);
  auto body = response.body->readAllText().wait(waitScope);
  if (testCase.request.method == HttpMethod::HEAD) {
    KJ_EXPECT(body == "");
  } else {
    KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
  }
}

class TestHttpService final: public HttpService {
public:
  TestHttpService(const HttpRequestTestCase& expectedRequest,
                  const HttpResponseTestCase& response,
                  HttpHeaderTable& table)
      : singleExpectedRequest(&expectedRequest),
        singleResponse(&response),
        responseHeaders(table) {}
  TestHttpService(kj::ArrayPtr<const HttpTestCase> testCases,
                  HttpHeaderTable& table)
      : singleExpectedRequest(nullptr),
        singleResponse(nullptr),
        testCases(testCases),
        responseHeaders(table) {}

  uint getRequestCount() { return requestCount; }

  kj::Promise<void> request(
      HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
      kj::AsyncInputStream& requestBody, Response& responseSender) override {
    auto& expectedRequest = testCases == nullptr ? *singleExpectedRequest :
        testCases[requestCount % testCases.size()].request;
    auto& response = testCases == nullptr ? *singleResponse :
        testCases[requestCount % testCases.size()].response;

    ++requestCount;

    KJ_EXPECT(method == expectedRequest.method, method);
    KJ_EXPECT(url == expectedRequest.path, url);

    for (auto& header: expectedRequest.requestHeaders) {
      KJ_EXPECT(KJ_ASSERT_NONNULL(headers.get(header.id)) == header.value);
    }

    auto size = requestBody.tryGetLength();
    KJ_IF_MAYBE(expectedSize, expectedRequest.requestBodySize) {
      KJ_IF_MAYBE(s, size) {
        KJ_EXPECT(*s == *expectedSize, *s);
      } else {
        KJ_FAIL_EXPECT("tryGetLength() returned nullptr; expected known size");
      }
    } else {
      KJ_EXPECT(size == nullptr);
    }

    return requestBody.readAllText()
        .then([this,&expectedRequest,&response,&responseSender](kj::String text) {
      KJ_EXPECT(text == kj::strArray(expectedRequest.requestBodyParts, ""), text);

      responseHeaders.clear();
      for (auto& header: response.responseHeaders) {
        responseHeaders.set(header.id, header.value);
      }

      auto stream = responseSender.send(response.statusCode, response.statusText,
                                        responseHeaders, response.responseBodySize);
      auto promise = writeEach(*stream, response.responseBodyParts);
      return promise.attach(kj::mv(stream));
    });
  }

private:
  const HttpRequestTestCase* singleExpectedRequest;
  const HttpResponseTestCase* singleResponse;
  kj::ArrayPtr<const HttpTestCase> testCases;
  HttpHeaders responseHeaders;
  uint requestCount = 0;
};

void testHttpServerRequest(kj::WaitScope& waitScope, kj::Timer& timer,
                           const HttpRequestTestCase& requestCase,
                           const HttpResponseTestCase& responseCase) {
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  TestHttpService service(requestCase, responseCase, table);
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  pipe.ends[1]->write(requestCase.raw.begin(), requestCase.raw.size()).wait(waitScope);
  pipe.ends[1]->shutdownWrite();

  expectRead(*pipe.ends[1], responseCase.raw).wait(waitScope);

  listenTask.wait(waitScope);

  KJ_EXPECT(service.getRequestCount() == 1);
}

kj::ArrayPtr<const HttpRequestTestCase> requestTestCases() {
  static const auto HUGE_STRING = kj::strArray(kj::repeat("abcdefgh", 4096), "");
  static const auto HUGE_REQUEST = kj::str(
      "GET / HTTP/1.1\r\n"
      "Host: ", HUGE_STRING, "\r\n"
      "\r\n");

  static const HttpRequestTestCase REQUEST_TEST_CASES[] {
    {
      "GET /foo/bar HTTP/1.1\r\n"
      "Host: example.com\r\n"
      "\r\n",

      HttpMethod::GET,
      "/foo/bar",
      {{HttpHeaderId::HOST, "example.com"}},
      uint64_t(0), {},
    },

    {
      "HEAD /foo/bar HTTP/1.1\r\n"
      "Host: example.com\r\n"
      "\r\n",

      HttpMethod::HEAD,
      "/foo/bar",
      {{HttpHeaderId::HOST, "example.com"}},
      uint64_t(0), {},
    },

    {
      "POST / HTTP/1.1\r\n"
      "Content-Length: 9\r\n"
      "Host: example.com\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n"
      "foobarbaz",

      HttpMethod::POST,
      "/",
      {
        {HttpHeaderId::HOST, "example.com"},
        {HttpHeaderId::CONTENT_TYPE, "text/plain"},
      },
      9, { "foo", "bar", "baz" },
    },

    {
      "POST / HTTP/1.1\r\n"
      "Transfer-Encoding: chunked\r\n"
      "Host: example.com\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n"
      "3\r\n"
      "foo\r\n"
      "6\r\n"
      "barbaz\r\n"
      "0\r\n"
      "\r\n",

      HttpMethod::POST,
      "/",
      {
        {HttpHeaderId::HOST, "example.com"},
        {HttpHeaderId::CONTENT_TYPE, "text/plain"},
      },
      nullptr, { "foo", "barbaz" },
    },

    {
      "POST / HTTP/1.1\r\n"
      "Transfer-Encoding: chunked\r\n"
      "Host: example.com\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n"
      "1d\r\n"
      "0123456789abcdef0123456789abc\r\n"
      "0\r\n"
      "\r\n",

      HttpMethod::POST,
      "/",
      {
        {HttpHeaderId::HOST, "example.com"},
        {HttpHeaderId::CONTENT_TYPE, "text/plain"},
      },
      nullptr, { "0123456789abcdef0123456789abc" },
    },

    {
      HUGE_REQUEST,

      HttpMethod::GET,
      "/",
      {{HttpHeaderId::HOST, HUGE_STRING}},
      uint64_t(0), {}
    },

    {
      "GET /foo/bar HTTP/1.1\r\n"
      "Content-Length: 6\r\n"
      "Host: example.com\r\n"
      "\r\n"
      "foobar",

      HttpMethod::GET,
      "/foo/bar",
      {{HttpHeaderId::HOST, "example.com"}},
      uint64_t(6), { "foobar" },
    },

    {
      "GET /foo/bar HTTP/1.1\r\n"
      "Transfer-Encoding: chunked\r\n"
      "Host: example.com\r\n"
      "\r\n"
      "3\r\n"
      "foo\r\n"
      "3\r\n"
      "bar\r\n"
      "0\r\n"
      "\r\n",

      HttpMethod::GET,
      "/foo/bar",
      {{HttpHeaderId::HOST, "example.com"},
       {HttpHeaderId::TRANSFER_ENCODING, "chunked"}},
      nullptr, { "foo", "bar" },
    }
  };

  // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents REQUEST_TEST_CASES from implicitly
  //   casting to our return type.
  return kj::arrayPtr(REQUEST_TEST_CASES, kj::size(REQUEST_TEST_CASES));
}

kj::ArrayPtr<const HttpResponseTestCase> responseTestCases() {
  static const HttpResponseTestCase RESPONSE_TEST_CASES[] {
    {
      "HTTP/1.1 200 OK\r\n"
      "Content-Type: text/plain\r\n"
      "Connection: close\r\n"
      "\r\n"
      "baz qux",

      200, "OK",
      {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
      nullptr, {"baz qux"},

      HttpMethod::GET,
      CLIENT_ONLY,   // Server never sends connection: close
    },

    {
      "HTTP/1.1 200 OK\r\n"
      "Content-Length: 123\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n",

      200, "OK",
      {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
      123, {},

      HttpMethod::HEAD,
    },

    {
      "HTTP/1.1 200 OK\r\n"
      "Content-Length: foobar\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n",

      200, "OK",
      {{HttpHeaderId::CONTENT_TYPE, "text/plain"},
       {HttpHeaderId::CONTENT_LENGTH, "foobar"}},
      123, {},

      HttpMethod::HEAD,
    },

    {
      "HTTP/1.1 200 OK\r\n"
      "Content-Length: 8\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n"
      "quxcorge",

      200, "OK",
      {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
      8, { "qux", "corge" }
    },

    {
      "HTTP/1.1 200 OK\r\n"
      "Transfer-Encoding: chunked\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n"
      "3\r\n"
      "qux\r\n"
      "5\r\n"
      "corge\r\n"
      "0\r\n"
      "\r\n",

      200, "OK",
      {{HttpHeaderId::CONTENT_TYPE, "text/plain"}},
      nullptr, { "qux", "corge" }
    },
  };

  // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly
  //   casting to our return type.
  return kj::arrayPtr(RESPONSE_TEST_CASES, kj::size(RESPONSE_TEST_CASES));
}

KJ_TEST("HttpClient requests") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);

  for (auto& testCase: requestTestCases()) {
    if (testCase.side == SERVER_ONLY) continue;
    KJ_CONTEXT(testCase.raw);
    testHttpClientRequest(waitScope, testCase);
  }
}

KJ_TEST("HttpClient responses") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  size_t FRAGMENT_SIZES[] = { 1, 2, 3, 4, 5, 6, 7, 8, 16, 31, kj::maxValue };

  for (auto& testCase: responseTestCases()) {
    if (testCase.side == SERVER_ONLY) continue;
    for (size_t fragmentSize: FRAGMENT_SIZES) {
      KJ_CONTEXT(testCase.raw, fragmentSize);
      testHttpClientResponse(waitScope, testCase, fragmentSize);
    }
  }
}

KJ_TEST("HttpClient canceled write") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);

  auto pipe = kj::newTwoWayPipe();

  auto serverPromise = pipe.ends[1]->readAllText();

  {
    HttpHeaderTable table;
    auto client = newHttpClient(table, *pipe.ends[0]);

    auto body = kj::heapArray<byte>(4096);
    memset(body.begin(), 0xcf, body.size());

    auto req = client->request(HttpMethod::POST, "/", HttpHeaders(table), uint64_t(4096));

    // Start a write and immediately cancel it.
    (void)req.body->write(body.begin(), body.size());

    KJ_EXPECT_THROW_MESSAGE("overwrote", req.body->write("foo", 3).wait(waitScope));
    req.body = nullptr;

    KJ_EXPECT(!serverPromise.poll(waitScope));

    KJ_EXPECT_THROW_MESSAGE("can't start new request until previous request body",
        client->request(HttpMethod::GET, "/", HttpHeaders(table)).response.wait(waitScope));
  }

  pipe.ends[0]->shutdownWrite();
  auto text = serverPromise.wait(waitScope);
  KJ_EXPECT(text == "POST / HTTP/1.1\r\nContent-Length: 4096\r\n\r\n", text);
}

KJ_TEST("HttpServer requests") {
  HttpResponseTestCase RESPONSE = {
    "HTTP/1.1 200 OK\r\n"
    "Content-Length: 3\r\n"
    "\r\n"
    "foo",

    200, "OK",
    {},
    3, {"foo"}
  };

  HttpResponseTestCase HEAD_RESPONSE = {
    "HTTP/1.1 200 OK\r\n"
    "Content-Length: 3\r\n"
    "\r\n",

    200, "OK",
    {},
    3, {"foo"}
  };

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());

  for (auto& testCase: requestTestCases()) {
    if (testCase.side == CLIENT_ONLY) continue;
    KJ_CONTEXT(testCase.raw);
    testHttpServerRequest(waitScope, timer, testCase,
        testCase.method == HttpMethod::HEAD ? HEAD_RESPONSE : RESPONSE);
  }
}

KJ_TEST("HttpServer responses") {
  HttpRequestTestCase REQUEST = {
    "GET / HTTP/1.1\r\n"
    "\r\n",

    HttpMethod::GET,
    "/",
    {},
    uint64_t(0), {},
  };

  HttpRequestTestCase HEAD_REQUEST = {
    "HEAD / HTTP/1.1\r\n"
    "\r\n",

    HttpMethod::HEAD,
    "/",
    {},
    uint64_t(0), {},
  };

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());

  for (auto& testCase: responseTestCases()) {
    if (testCase.side == CLIENT_ONLY) continue;
    KJ_CONTEXT(testCase.raw);
    testHttpServerRequest(waitScope, timer,
        testCase.method == HttpMethod::HEAD ? HEAD_REQUEST : REQUEST, testCase);
  }
}

// -----------------------------------------------------------------------------

kj::ArrayPtr<const HttpTestCase> pipelineTestCases() {
  static const HttpTestCase PIPELINE_TESTS[] = {
    {
      {
        "GET / HTTP/1.1\r\n"
        "\r\n",

        HttpMethod::GET, "/", {}, uint64_t(0), {},
      },
      {
        "HTTP/1.1 200 OK\r\n"
        "Content-Length: 7\r\n"
        "\r\n"
        "foo bar",

        200, "OK", {}, 7, { "foo bar" }
      },
    },

    {
      {
        "POST /foo HTTP/1.1\r\n"
        "Content-Length: 6\r\n"
        "\r\n"
        "grault",

        HttpMethod::POST, "/foo", {}, 6, { "grault" },
      },
      {
        "HTTP/1.1 404 Not Found\r\n"
        "Content-Length: 13\r\n"
        "\r\n"
        "baz qux corge",

        404, "Not Found", {}, 13, { "baz qux corge" }
      },
    },

    // Throw a zero-size request/response into the pipeline to check for a bug that existed with
    // them previously.
    {
      {
        "POST /foo HTTP/1.1\r\n"
        "Content-Length: 0\r\n"
        "\r\n",

        HttpMethod::POST, "/foo", {}, uint64_t(0), {},
      },
      {
        "HTTP/1.1 200 OK\r\n"
        "Content-Length: 0\r\n"
        "\r\n",

        200, "OK", {}, uint64_t(0), {}
      },
    },

    // Also a zero-size chunked request/response.
    {
      {
        "POST /foo HTTP/1.1\r\n"
        "Transfer-Encoding: chunked\r\n"
        "\r\n"
        "0\r\n"
        "\r\n",

        HttpMethod::POST, "/foo", {}, nullptr, {},
      },
      {
        "HTTP/1.1 200 OK\r\n"
        "Transfer-Encoding: chunked\r\n"
        "\r\n"
        "0\r\n"
        "\r\n",

        200, "OK", {}, nullptr, {}
      },
    },

    {
      {
        "POST /bar HTTP/1.1\r\n"
        "Transfer-Encoding: chunked\r\n"
        "\r\n"
        "6\r\n"
        "garply\r\n"
        "5\r\n"
        "waldo\r\n"
        "0\r\n"
        "\r\n",

        HttpMethod::POST, "/bar", {}, nullptr, { "garply", "waldo" },
      },
      {
        "HTTP/1.1 200 OK\r\n"
        "Transfer-Encoding: chunked\r\n"
        "\r\n"
        "4\r\n"
        "fred\r\n"
        "5\r\n"
        "plugh\r\n"
        "0\r\n"
        "\r\n",

        200, "OK", {}, nullptr, { "fred", "plugh" }
      },
    },

    {
      {
        "HEAD / HTTP/1.1\r\n"
        "\r\n",

        HttpMethod::HEAD, "/", {}, uint64_t(0), {},
      },
      {
        "HTTP/1.1 200 OK\r\n"
        "Content-Length: 7\r\n"
        "\r\n",

        200, "OK", {}, 7, { "foo bar" }
      },
    },
  };

  // TODO(cleanup): A bug in GCC 4.8, fixed in 4.9, prevents RESPONSE_TEST_CASES from implicitly
  //   casting to our return type.
  return kj::arrayPtr(PIPELINE_TESTS, kj::size(PIPELINE_TESTS));
}

KJ_TEST("HttpClient pipeline") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
  for (auto& testCase: PIPELINE_TESTS) {
    writeResponsesPromise = writeResponsesPromise
        .then([&]() {
      return expectRead(*pipe.ends[1], testCase.request.raw);
    }).then([&]() {
      return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
    });
  }

  HttpHeaderTable table;
  auto client = newHttpClient(table, *pipe.ends[0]);

  for (auto& testCase: PIPELINE_TESTS) {
    testHttpClient(waitScope, table, *client, testCase);
  }

  client = nullptr;
  pipe.ends[0]->shutdownWrite();

  writeResponsesPromise.wait(waitScope);
}

KJ_TEST("HttpClient parallel pipeline") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  kj::Promise<void> readRequestsPromise = kj::READY_NOW;
  kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
  for (auto& testCase: PIPELINE_TESTS) {
    auto forked = readRequestsPromise
        .then([&]() {
      return expectRead(*pipe.ends[1], testCase.request.raw);
    }).fork();
    readRequestsPromise = forked.addBranch();

    // Don't write each response until the corresponding request is received.
    auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
    promises.add(forked.addBranch());
    promises.add(kj::mv(writeResponsesPromise));
    writeResponsesPromise = kj::joinPromises(promises.finish()).then([&]() {
      return pipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
    });
  }

  HttpHeaderTable table;
  auto client = newHttpClient(table, *pipe.ends[0]);

  auto responsePromises = KJ_MAP(testCase, PIPELINE_TESTS) {
    KJ_CONTEXT(testCase.request.raw, testCase.response.raw);

    HttpHeaders headers(table);
    for (auto& header: testCase.request.requestHeaders) {
      headers.set(header.id, header.value);
    }

    auto request = client->request(
        testCase.request.method, testCase.request.path, headers, testCase.request.requestBodySize);
    for (auto& part: testCase.request.requestBodyParts) {
      request.body->write(part.begin(), part.size()).wait(waitScope);
    }

    return kj::mv(request.response);
  };

  for (auto i: kj::indices(PIPELINE_TESTS)) {
    auto& testCase = PIPELINE_TESTS[i];
    auto response = responsePromises[i].wait(waitScope);

    KJ_EXPECT(response.statusCode == testCase.response.statusCode);
    auto body = response.body->readAllText().wait(waitScope);
    if (testCase.request.method == HttpMethod::HEAD) {
      KJ_EXPECT(body == "");
    } else {
      KJ_EXPECT(body == kj::strArray(testCase.response.responseBodyParts, ""), body);
    }
  }

  client = nullptr;
  pipe.ends[0]->shutdownWrite();

  writeResponsesPromise.wait(waitScope);
}

KJ_TEST("HttpServer pipeline") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  TestHttpService service(PIPELINE_TESTS, table);
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  for (auto& testCase: PIPELINE_TESTS) {
    KJ_CONTEXT(testCase.request.raw, testCase.response.raw);

    pipe.ends[1]->write(testCase.request.raw.begin(), testCase.request.raw.size())
        .wait(waitScope);

    expectRead(*pipe.ends[1], testCase.response.raw).wait(waitScope);
  }

  pipe.ends[1]->shutdownWrite();
  listenTask.wait(waitScope);

  KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}

KJ_TEST("HttpServer parallel pipeline") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  auto allRequestText =
      kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.request.raw; }, "");
  auto allResponseText =
      kj::strArray(KJ_MAP(testCase, PIPELINE_TESTS) { return testCase.response.raw; }, "");

  HttpHeaderTable table;
  TestHttpService service(PIPELINE_TESTS, table);
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  pipe.ends[1]->write(allRequestText.begin(), allRequestText.size()).wait(waitScope);
  pipe.ends[1]->shutdownWrite();

  auto rawResponse = pipe.ends[1]->readAllText().wait(waitScope);
  KJ_EXPECT(rawResponse == allResponseText, rawResponse);

  listenTask.wait(waitScope);

  KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}

KJ_TEST("HttpClient <-> HttpServer") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  TestHttpService service(PIPELINE_TESTS, table);
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[1]));
  auto client = newHttpClient(table, *pipe.ends[0]);

  for (auto& testCase: PIPELINE_TESTS) {
    testHttpClient(waitScope, table, *client, testCase);
  }

  client = nullptr;
  pipe.ends[0]->shutdownWrite();
  listenTask.wait(waitScope);
  KJ_EXPECT(service.getRequestCount() == kj::size(PIPELINE_TESTS));
}

// -----------------------------------------------------------------------------

KJ_TEST("WebSocket core protocol") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto client = newWebSocket(kj::mv(pipe.ends[0]), nullptr);
  auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);

  auto mediumString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 30), "");
  auto bigString = kj::strArray(kj::repeat(kj::StringPtr("123456789"), 10000), "");

  auto clientTask = client->send(kj::StringPtr("hello"))
      .then([&]() { return client->send(mediumString); })
      .then([&]() { return client->send(bigString); })
      .then([&]() { return client->send(kj::StringPtr("world").asBytes()); })
      .then([&]() { return client->close(1234, "bored"); });

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "hello");
  }

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == mediumString);
  }

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == bigString);
  }

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::Array<byte>>());
    KJ_EXPECT(kj::str(message.get<kj::Array<byte>>().asChars()) == "world");
  }

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<WebSocket::Close>());
    KJ_EXPECT(message.get<WebSocket::Close>().code == 1234);
    KJ_EXPECT(message.get<WebSocket::Close>().reason == "bored");
  }

  auto serverTask = server->close(4321, "whatever");

  {
    auto message = client->receive().wait(waitScope);
    KJ_ASSERT(message.is<WebSocket::Close>());
    KJ_EXPECT(message.get<WebSocket::Close>().code == 4321);
    KJ_EXPECT(message.get<WebSocket::Close>().reason == "whatever");
  }

  clientTask.wait(waitScope);
  serverTask.wait(waitScope);
}

KJ_TEST("WebSocket fragmented") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto client = kj::mv(pipe.ends[0]);
  auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);

  byte DATA[] = {
    0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',

    0x00, 0x03, 'w', 'o', 'r',

    0x80, 0x02, 'l', 'd',
  };

  auto clientTask = client->write(DATA, sizeof(DATA));

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "hello world");
  }

  clientTask.wait(waitScope);
}

class FakeEntropySource final: public EntropySource {
public:
  void generate(kj::ArrayPtr<byte> buffer) override {
    static constexpr byte DUMMY[4] = { 12, 34, 56, 78 };

    for (auto i: kj::indices(buffer)) {
      buffer[i] = DUMMY[i % sizeof(DUMMY)];
    }
  }
};

KJ_TEST("WebSocket masked") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();
  FakeEntropySource maskGenerator;

  auto client = kj::mv(pipe.ends[0]);
  auto server = newWebSocket(kj::mv(pipe.ends[1]), maskGenerator);

  byte DATA[] = {
    0x81, 0x86, 12, 34, 56, 78, 'h' ^ 12, 'e' ^ 34, 'l' ^ 56, 'l' ^ 78, 'o' ^ 12, ' ' ^ 34,
  };

  auto clientTask = client->write(DATA, sizeof(DATA));
  auto serverTask = server->send(kj::StringPtr("hello "));

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "hello ");
  }

  expectRead(*client, DATA).wait(waitScope);

  clientTask.wait(waitScope);
  serverTask.wait(waitScope);
}

KJ_TEST("WebSocket unsolicited pong") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto client = kj::mv(pipe.ends[0]);
  auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);

  byte DATA[] = {
    0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',

    0x8A, 0x03, 'f', 'o', 'o',

    0x80, 0x05, 'w', 'o', 'r', 'l', 'd',
  };

  auto clientTask = client->write(DATA, sizeof(DATA));

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "hello world");
  }

  clientTask.wait(waitScope);
}

KJ_TEST("WebSocket ping") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto client = kj::mv(pipe.ends[0]);
  auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);

  // Be extra-annoying by having the ping arrive between fragments.
  byte DATA[] = {
    0x01, 0x06, 'h', 'e', 'l', 'l', 'o', ' ',

    0x89, 0x03, 'f', 'o', 'o',

    0x80, 0x05, 'w', 'o', 'r', 'l', 'd',
  };

  auto clientTask = client->write(DATA, sizeof(DATA));

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "hello world");
  }

  auto serverTask = server->send(kj::StringPtr("bar"));

  byte EXPECTED[] = {
    0x8A, 0x03, 'f', 'o', 'o',  // pong
    0x81, 0x03, 'b', 'a', 'r',  // message
  };

  expectRead(*client, EXPECTED).wait(waitScope);

  clientTask.wait(waitScope);
  serverTask.wait(waitScope);
}

KJ_TEST("WebSocket ping mid-send") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto client = kj::mv(pipe.ends[0]);
  auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);

  auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
  auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);

  byte DATA[] = {
    0x89, 0x03, 'f', 'o', 'o',  // ping
    0x81, 0x03, 'b', 'a', 'r',  // some other message
  };

  auto clientTask = client->write(DATA, sizeof(DATA));

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "bar");
  }

  byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
  expectRead(*client, EXPECTED1).wait(waitScope);
  expectRead(*client, bigString).wait(waitScope);

  byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
  expectRead(*client, EXPECTED2).wait(waitScope);

  clientTask.wait(waitScope);
  serverTask.wait(waitScope);
}

class InputOutputPair final: public kj::AsyncIoStream {
  // Creates an AsyncIoStream out of an AsyncInputStream and an AsyncOutputStream.

public:
  InputOutputPair(kj::Own<kj::AsyncInputStream> in, kj::Own<kj::AsyncOutputStream> out)
      : in(kj::mv(in)), out(kj::mv(out)) {}

  kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
    return in->read(buffer, minBytes, maxBytes);
  }
  kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    return in->tryRead(buffer, minBytes, maxBytes);
  }

  Maybe<uint64_t> tryGetLength() override {
    return in->tryGetLength();
  }

  Promise<uint64_t> pumpTo(AsyncOutputStream& output, uint64_t amount = kj::maxValue) override {
    return in->pumpTo(output, amount);
  }

  kj::Promise<void> write(const void* buffer, size_t size) override {
    return out->write(buffer, size);
  }

  kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
    return out->write(pieces);
  }

  kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
      kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
    return out->tryPumpFrom(input, amount);
  }

  void shutdownWrite() override {
    out = nullptr;
  }

private:
  kj::Own<kj::AsyncInputStream> in;
  kj::Own<kj::AsyncOutputStream> out;
};

KJ_TEST("WebSocket double-ping mid-send") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);

  auto upPipe = newOneWayPipe();
  auto downPipe = newOneWayPipe();
  InputOutputPair client(kj::mv(downPipe.in), kj::mv(upPipe.out));
  auto server = newWebSocket(kj::heap<InputOutputPair>(kj::mv(upPipe.in), kj::mv(downPipe.out)),
                             nullptr);

  auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
  auto serverTask = server->send(bigString).eagerlyEvaluate(nullptr);

  byte DATA[] = {
    0x89, 0x03, 'f', 'o', 'o',  // ping
    0x89, 0x03, 'q', 'u', 'x',  // ping2
    0x81, 0x03, 'b', 'a', 'r',  // some other message
  };

  auto clientTask = client.write(DATA, sizeof(DATA));

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "bar");
  }

  byte EXPECTED1[] = { 0x81, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
  expectRead(client, EXPECTED1).wait(waitScope);
  expectRead(client, bigString).wait(waitScope);

  byte EXPECTED2[] = { 0x8A, 0x03, 'q', 'u', 'x' };
  expectRead(client, EXPECTED2).wait(waitScope);

  clientTask.wait(waitScope);
  serverTask.wait(waitScope);
}

KJ_TEST("WebSocket ping received during pong send") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto client = kj::mv(pipe.ends[0]);
  auto server = newWebSocket(kj::mv(pipe.ends[1]), nullptr);

  // Send a very large ping so that sending the pong takes a while. Then send a second ping
  // immediately after.
  byte PREFIX[] = { 0x89, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
  auto bigString = kj::strArray(kj::repeat(kj::StringPtr("12345678"), 65536), "");
  byte POSTFIX[] = {
    0x89, 0x03, 'f', 'o', 'o',
    0x81, 0x03, 'b', 'a', 'r',
  };

  kj::ArrayPtr<const byte> parts[] = {PREFIX, bigString.asBytes(), POSTFIX};
  auto clientTask = client->write(parts);

  {
    auto message = server->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "bar");
  }

  byte EXPECTED1[] = { 0x8A, 0x7f, 0, 0, 0, 0, 0, 8, 0, 0 };
  expectRead(*client, EXPECTED1).wait(waitScope);
  expectRead(*client, bigString).wait(waitScope);

  byte EXPECTED2[] = { 0x8A, 0x03, 'f', 'o', 'o' };
  expectRead(*client, EXPECTED2).wait(waitScope);

  clientTask.wait(waitScope);
}

class TestWebSocketService final: public HttpService, private kj::TaskSet::ErrorHandler {
public:
  TestWebSocketService(HttpHeaderTable& headerTable, HttpHeaderId hMyHeader)
      : headerTable(headerTable), hMyHeader(hMyHeader), tasks(*this) {}

  kj::Promise<void> request(
      HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
      kj::AsyncInputStream& requestBody, Response& response) override {
    KJ_ASSERT(headers.isWebSocket());

    HttpHeaders responseHeaders(headerTable);
    KJ_IF_MAYBE(h, headers.get(hMyHeader)) {
      responseHeaders.set(hMyHeader, kj::str("respond-", *h));
    }

    if (url == "/return-error") {
      response.send(404, "Not Found", responseHeaders, uint64_t(0));
      return kj::READY_NOW;
    } else if (url == "/websocket") {
      auto ws = response.acceptWebSocket(responseHeaders);
      return doWebSocket(*ws, "start-inline").attach(kj::mv(ws));
    } else {
      KJ_FAIL_ASSERT("unexpected path", url);
    }
  }

private:
  HttpHeaderTable& headerTable;
  HttpHeaderId hMyHeader;
  kj::TaskSet tasks;

  void taskFailed(kj::Exception&& exception) override {
    KJ_LOG(ERROR, exception);
  }

  static kj::Promise<void> doWebSocket(WebSocket& ws, kj::StringPtr message) {
    auto copy = kj::str(message);
    return ws.send(copy).attach(kj::mv(copy))
        .then([&ws]() {
      return ws.receive();
    }).then([&ws](WebSocket::Message&& message) {
      KJ_SWITCH_ONEOF(message) {
        KJ_CASE_ONEOF(str, kj::String) {
          return doWebSocket(ws, kj::str("reply:", str));
        }
        KJ_CASE_ONEOF(data, kj::Array<byte>) {
          return doWebSocket(ws, kj::str("reply:", data));
        }
        KJ_CASE_ONEOF(close, WebSocket::Close) {
          auto reason = kj::str("close-reply:", close.reason);
          return ws.close(close.code + 1, reason).attach(kj::mv(reason));
        }
      }
      KJ_UNREACHABLE;
    });
  }
};

const char WEBSOCKET_REQUEST_HANDSHAKE[] =
    " HTTP/1.1\r\n"
    "Connection: Upgrade\r\n"
    "Upgrade: websocket\r\n"
    "Sec-WebSocket-Key: DCI4TgwiOE4MIjhODCI4Tg==\r\n"
    "Sec-WebSocket-Version: 13\r\n"
    "My-Header: foo\r\n"
    "\r\n";
const char WEBSOCKET_RESPONSE_HANDSHAKE[] =
    "HTTP/1.1 101 Switching Protocols\r\n"
    "Connection: Upgrade\r\n"
    "Upgrade: websocket\r\n"
    "Sec-WebSocket-Accept: pShtIFKT0s8RYZvnWY/CrjQD8CM=\r\n"
    "My-Header: respond-foo\r\n"
    "\r\n";
const char WEBSOCKET_RESPONSE_HANDSHAKE_ERROR[] =
    "HTTP/1.1 404 Not Found\r\n"
    "Content-Length: 0\r\n"
    "My-Header: respond-foo\r\n"
    "\r\n";
const byte WEBSOCKET_FIRST_MESSAGE_INLINE[] =
    { 0x81, 0x0c, 's','t','a','r','t','-','i','n','l','i','n','e' };
const byte WEBSOCKET_SEND_MESSAGE[] =
    { 0x81, 0x83, 12, 34, 56, 78, 'b'^12, 'a'^34, 'r'^56 };
const byte WEBSOCKET_REPLY_MESSAGE[] =
    { 0x81, 0x09, 'r','e','p','l','y',':','b','a','r' };
const byte WEBSOCKET_SEND_CLOSE[] =
    { 0x88, 0x85, 12, 34, 56, 78, 0x12^12, 0x34^34, 'q'^56, 'u'^78, 'x'^12 };
const byte WEBSOCKET_REPLY_CLOSE[] =
    { 0x88, 0x11, 0x12, 0x35, 'c','l','o','s','e','-','r','e','p','l','y',':','q','u','x' };

template <size_t s>
kj::ArrayPtr<const byte> asBytes(const char (&chars)[s]) {
  return kj::ArrayPtr<const char>(chars, s - 1).asBytes();
}

void testWebSocketClient(kj::WaitScope& waitScope, HttpHeaderTable& headerTable,
                         kj::HttpHeaderId hMyHeader, HttpClient& client) {
  kj::HttpHeaders headers(headerTable);
  headers.set(hMyHeader, "foo");
  auto response = client.openWebSocket("/websocket", headers).wait(waitScope);

  KJ_EXPECT(response.statusCode == 101);
  KJ_EXPECT(response.statusText == "Switching Protocols", response.statusText);
  KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
  KJ_ASSERT(response.webSocketOrBody.is<kj::Own<WebSocket>>());
  auto ws = kj::mv(response.webSocketOrBody.get<kj::Own<WebSocket>>());

  {
    auto message = ws->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "start-inline");
  }

  ws->send(kj::StringPtr("bar")).wait(waitScope);
  {
    auto message = ws->receive().wait(waitScope);
    KJ_ASSERT(message.is<kj::String>());
    KJ_EXPECT(message.get<kj::String>() == "reply:bar");
  }

  ws->close(0x1234, "qux").wait(waitScope);
  {
    auto message = ws->receive().wait(waitScope);
    KJ_ASSERT(message.is<WebSocket::Close>());
    KJ_EXPECT(message.get<WebSocket::Close>().code == 0x1235);
    KJ_EXPECT(message.get<WebSocket::Close>().reason == "close-reply:qux");
  }
}

KJ_TEST("HttpClient WebSocket handshake") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);

  auto serverTask = expectRead(*pipe.ends[1], request)
      .then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
      .then([&]() { return pipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
      .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
      .then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
      .then([&]() { return expectRead(*pipe.ends[1], WEBSOCKET_SEND_CLOSE); })
      .then([&]() { return pipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
      .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });

  HttpHeaderTable::Builder tableBuilder;
  HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
  auto headerTable = tableBuilder.build();

  FakeEntropySource entropySource;

  auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource);

  testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);

  serverTask.wait(waitScope);
}

KJ_TEST("HttpClient WebSocket error") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  auto pipe = kj::newTwoWayPipe();

  auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);

  auto serverTask = expectRead(*pipe.ends[1], request)
      .then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)}); })
      .then([&]() { return expectRead(*pipe.ends[1], request); })
      .then([&]() { return pipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE_ERROR)}); })
      .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });

  HttpHeaderTable::Builder tableBuilder;
  HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
  auto headerTable = tableBuilder.build();

  FakeEntropySource entropySource;

  auto client = newHttpClient(*headerTable, *pipe.ends[0], entropySource);

  kj::HttpHeaders headers(*headerTable);
  headers.set(hMyHeader, "foo");

  {
    auto response = client->openWebSocket("/websocket", headers).wait(waitScope);

    KJ_EXPECT(response.statusCode == 404);
    KJ_EXPECT(response.statusText == "Not Found", response.statusText);
    KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
    KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>());
  }

  {
    auto response = client->openWebSocket("/websocket", headers).wait(waitScope);

    KJ_EXPECT(response.statusCode == 404);
    KJ_EXPECT(response.statusText == "Not Found", response.statusText);
    KJ_EXPECT(KJ_ASSERT_NONNULL(response.headers->get(hMyHeader)) == "respond-foo");
    KJ_ASSERT(response.webSocketOrBody.is<kj::Own<AsyncInputStream>>());
  }

  serverTask.wait(waitScope);
}

KJ_TEST("HttpServer WebSocket handshake") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable::Builder tableBuilder;
  HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
  auto headerTable = tableBuilder.build();
  TestWebSocketService service(*headerTable, hMyHeader);
  HttpServer server(timer, *headerTable, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
  pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
  expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);

  expectRead(*pipe.ends[1], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
  pipe.ends[1]->write({WEBSOCKET_SEND_MESSAGE}).wait(waitScope);
  expectRead(*pipe.ends[1], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
  pipe.ends[1]->write({WEBSOCKET_SEND_CLOSE}).wait(waitScope);
  expectRead(*pipe.ends[1], WEBSOCKET_REPLY_CLOSE).wait(waitScope);

  listenTask.wait(waitScope);
}

KJ_TEST("HttpServer WebSocket handshake error") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable::Builder tableBuilder;
  HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
  auto headerTable = tableBuilder.build();
  TestWebSocketService service(*headerTable, hMyHeader);
  HttpServer server(timer, *headerTable, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  auto request = kj::str("GET /return-error", WEBSOCKET_REQUEST_HANDSHAKE);
  pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
  expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);

  // Can send more requests!
  pipe.ends[1]->write({request.asBytes()}).wait(waitScope);
  expectRead(*pipe.ends[1], WEBSOCKET_RESPONSE_HANDSHAKE_ERROR).wait(waitScope);

  pipe.ends[1]->shutdownWrite();

  listenTask.wait(waitScope);
}

// -----------------------------------------------------------------------------

KJ_TEST("HttpServer request timeout") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  TestHttpService service(PIPELINE_TESTS, table);
  HttpServerSettings settings;
  settings.headerTimeout = 1 * kj::MILLISECONDS;
  HttpServer server(timer, table, service, settings);

  // Shouldn't hang! Should time out.
  auto promise = server.listenHttp(kj::mv(pipe.ends[0]));
  KJ_EXPECT(!promise.poll(waitScope));
  timer.advanceTo(timer.now() + settings.headerTimeout / 2);
  KJ_EXPECT(!promise.poll(waitScope));
  timer.advanceTo(timer.now() + settings.headerTimeout);
  promise.wait(waitScope);

  // Closes the connection without sending anything.
  KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}

KJ_TEST("HttpServer pipeline timeout") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  TestHttpService service(PIPELINE_TESTS, table);
  HttpServerSettings settings;
  settings.pipelineTimeout = 1 * kj::MILLISECONDS;
  HttpServer server(timer, table, service, settings);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  expectRead(*pipe.ends[1], PIPELINE_TESTS[0].response.raw).wait(waitScope);

  // Listen task should time out even though we didn't shutdown the socket.
  KJ_EXPECT(!listenTask.poll(waitScope));
  timer.advanceTo(timer.now() + settings.pipelineTimeout / 2);
  KJ_EXPECT(!listenTask.poll(waitScope));
  timer.advanceTo(timer.now() + settings.pipelineTimeout);
  listenTask.wait(waitScope);

  // In this case, no data is sent back.
  KJ_EXPECT(pipe.ends[1]->readAllText().wait(waitScope) == "");
}

class BrokenHttpService final: public HttpService {
  // HttpService that doesn't send a response.
public:
  BrokenHttpService() = default;
  explicit BrokenHttpService(kj::Exception&& exception): exception(kj::mv(exception)) {}

  kj::Promise<void> request(
      HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
      kj::AsyncInputStream& requestBody, Response& responseSender) override {
    return requestBody.readAllBytes().then([this](kj::Array<byte>&&) -> kj::Promise<void> {
      KJ_IF_MAYBE(e, exception) {
        return kj::cp(*e);
      } else {
        return kj::READY_NOW;
      }
    });
  }

private:
  kj::Maybe<kj::Exception> exception;
};

KJ_TEST("HttpServer no response") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  BrokenHttpService service;
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text ==
      "HTTP/1.1 500 Internal Server Error\r\n"
      "Connection: close\r\n"
      "Content-Length: 51\r\n"
      "Content-Type: text/plain\r\n"
      "\r\n"
      "ERROR: The HttpService did not generate a response.", text);
}

KJ_TEST("HttpServer disconnected") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  BrokenHttpService service(KJ_EXCEPTION(DISCONNECTED, "disconnected"));
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text == "", text);
}

KJ_TEST("HttpServer overloaded") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  BrokenHttpService service(KJ_EXCEPTION(OVERLOADED, "overloaded"));
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text.startsWith("HTTP/1.1 503 Service Unavailable"), text);
}

KJ_TEST("HttpServer unimplemented") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  BrokenHttpService service(KJ_EXCEPTION(UNIMPLEMENTED, "unimplemented"));
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text.startsWith("HTTP/1.1 501 Not Implemented"), text);
}

KJ_TEST("HttpServer threw exception") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  BrokenHttpService service(KJ_EXCEPTION(FAILED, "failed"));
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text.startsWith("HTTP/1.1 500 Internal Server Error"), text);
}

class PartialResponseService final: public HttpService {
  // HttpService that sends a partial response then throws.
public:
  kj::Promise<void> request(
      HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
      kj::AsyncInputStream& requestBody, Response& response) override {
    return requestBody.readAllBytes()
        .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
      HttpHeaders headers(table);
      auto body = response.send(200, "OK", headers, 32);
      auto promise = body->write("foo", 3);
      return promise.attach(kj::mv(body)).then([]() -> kj::Promise<void> {
        return KJ_EXCEPTION(FAILED, "failed");
      });
    });
  }

private:
  kj::Maybe<kj::Exception> exception;
  HttpHeaderTable table;
};

KJ_TEST("HttpServer threw exception after starting response") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  PartialResponseService service;
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  KJ_EXPECT_LOG(ERROR, "HttpService threw exception after generating a partial response");

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text ==
      "HTTP/1.1 200 OK\r\n"
      "Content-Length: 32\r\n"
      "\r\n"
      "foo", text);
}

class PartialResponseNoThrowService final: public HttpService {
  // HttpService that sends a partial response then returns without throwing.
public:
  kj::Promise<void> request(
      HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
      kj::AsyncInputStream& requestBody, Response& response) override {
    return requestBody.readAllBytes()
        .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
      HttpHeaders headers(table);
      auto body = response.send(200, "OK", headers, 32);
      auto promise = body->write("foo", 3);
      return promise.attach(kj::mv(body));
    });
  }

private:
  kj::Maybe<kj::Exception> exception;
  HttpHeaderTable table;
};

KJ_TEST("HttpServer failed to write complete response but didn't throw") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  PartialResponseNoThrowService service;
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text ==
      "HTTP/1.1 200 OK\r\n"
      "Content-Length: 32\r\n"
      "\r\n"
      "foo", text);
}

class SimpleInputStream final: public kj::AsyncInputStream {
  // An InputStream that returns bytes out of a static string.

public:
  SimpleInputStream(kj::StringPtr text)
      : unread(text.asBytes()) {}

  kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    size_t amount = kj::min(maxBytes, unread.size());
    memcpy(buffer, unread.begin(), amount);
    unread = unread.slice(amount, unread.size());
    return amount;
  }

private:
  kj::ArrayPtr<const byte> unread;
};

class PumpResponseService final: public HttpService {
  // HttpService that uses pumpTo() to write a response, without carefully specifying how much to
  // pump, but the stream happens to be the right size.
public:
  kj::Promise<void> request(
      HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
      kj::AsyncInputStream& requestBody, Response& response) override {
    return requestBody.readAllBytes()
        .then([this,&response](kj::Array<byte>&&) -> kj::Promise<void> {
      HttpHeaders headers(table);
      kj::StringPtr text = "Hello, World!";
      auto body = response.send(200, "OK", headers, text.size());

      auto stream = kj::heap<SimpleInputStream>(text);
      auto promise = stream->pumpTo(*body);
      return promise.attach(kj::mv(body), kj::mv(stream))
          .then([text](uint64_t amount) {
        KJ_EXPECT(amount == text.size());
      });
    });
  }

private:
  kj::Maybe<kj::Exception> exception;
  HttpHeaderTable table;
};

KJ_TEST("HttpFixedLengthEntityWriter correctly implements tryPumpFrom") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable table;
  PumpResponseService service;
  HttpServer server(timer, table, service);

  auto listenTask = server.listenHttp(kj::mv(pipe.ends[0]));

  // Do one request.
  pipe.ends[1]->write(PIPELINE_TESTS[0].request.raw.begin(), PIPELINE_TESTS[0].request.raw.size())
      .wait(waitScope);
  pipe.ends[1]->shutdownWrite();
  auto text = pipe.ends[1]->readAllText().wait(waitScope);

  KJ_EXPECT(text ==
      "HTTP/1.1 200 OK\r\n"
      "Content-Length: 13\r\n"
      "\r\n"
      "Hello, World!", text);
}

// -----------------------------------------------------------------------------

KJ_TEST("newHttpService from HttpClient") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto frontPipe = kj::newTwoWayPipe();
  auto backPipe = kj::newTwoWayPipe();

  kj::Promise<void> writeResponsesPromise = kj::READY_NOW;
  for (auto& testCase: PIPELINE_TESTS) {
    writeResponsesPromise = writeResponsesPromise
        .then([&]() {
      return expectRead(*backPipe.ends[1], testCase.request.raw);
    }).then([&]() {
      return backPipe.ends[1]->write(testCase.response.raw.begin(), testCase.response.raw.size());
    });
  }

  {
    HttpHeaderTable table;
    auto backClient = newHttpClient(table, *backPipe.ends[0]);
    auto frontService = newHttpService(*backClient);
    HttpServer frontServer(timer, table, *frontService);
    auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));

    for (auto& testCase: PIPELINE_TESTS) {
      KJ_CONTEXT(testCase.request.raw, testCase.response.raw);

      frontPipe.ends[0]->write(testCase.request.raw.begin(), testCase.request.raw.size())
               .wait(waitScope);

      expectRead(*frontPipe.ends[0], testCase.response.raw).wait(waitScope);
    }

    frontPipe.ends[0]->shutdownWrite();
    listenTask.wait(waitScope);
  }

  backPipe.ends[0]->shutdownWrite();
  writeResponsesPromise.wait(waitScope);
}

KJ_TEST("newHttpService from HttpClient WebSockets") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto frontPipe = kj::newTwoWayPipe();
  auto backPipe = kj::newTwoWayPipe();

  auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
  auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
      .then([&]() { return backPipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
      .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
      .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
      .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_MESSAGE}); })
      .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_CLOSE); })
      .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_REPLY_CLOSE}); })
      // expect EOF
      .then([&]() { return backPipe.ends[1]->readAllBytes(); })
      .then([&](kj::ArrayPtr<byte> content) {
        KJ_EXPECT(content.size() == 0);
        // Send EOF.
        backPipe.ends[1]->shutdownWrite();
      })
      .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });

  {
    HttpHeaderTable table;
    FakeEntropySource entropySource;
    auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource);
    auto frontService = newHttpService(*backClient);
    HttpServer frontServer(timer, table, *frontService);
    auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));

    frontPipe.ends[0]->write({request.asBytes()}).wait(waitScope);
    expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);

    expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
    frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(waitScope);
    expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_MESSAGE).wait(waitScope);
    frontPipe.ends[0]->write({WEBSOCKET_SEND_CLOSE}).wait(waitScope);
    expectRead(*frontPipe.ends[0], WEBSOCKET_REPLY_CLOSE).wait(waitScope);

    frontPipe.ends[0]->shutdownWrite();
    listenTask.wait(waitScope);
  }

  writeResponsesPromise.wait(waitScope);
}

KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto frontPipe = kj::newTwoWayPipe();
  auto backPipe = kj::newTwoWayPipe();

  auto request = kj::str("GET /websocket", WEBSOCKET_REQUEST_HANDSHAKE);
  auto writeResponsesPromise = expectRead(*backPipe.ends[1], request)
      .then([&]() { return backPipe.ends[1]->write({asBytes(WEBSOCKET_RESPONSE_HANDSHAKE)}); })
      .then([&]() { return backPipe.ends[1]->write({WEBSOCKET_FIRST_MESSAGE_INLINE}); })
      .then([&]() { return expectRead(*backPipe.ends[1], WEBSOCKET_SEND_MESSAGE); })
      .then([&]() { backPipe.ends[1]->shutdownWrite(); })
      .eagerlyEvaluate([](kj::Exception&& e) { KJ_LOG(ERROR, e); });

  {
    HttpHeaderTable table;
    FakeEntropySource entropySource;
    auto backClient = newHttpClient(table, *backPipe.ends[0], entropySource);
    auto frontService = newHttpService(*backClient);
    HttpServer frontServer(timer, table, *frontService);
    auto listenTask = frontServer.listenHttp(kj::mv(frontPipe.ends[1]));

    frontPipe.ends[0]->write({request.asBytes()}).wait(waitScope);
    expectRead(*frontPipe.ends[0], WEBSOCKET_RESPONSE_HANDSHAKE).wait(waitScope);

    expectRead(*frontPipe.ends[0], WEBSOCKET_FIRST_MESSAGE_INLINE).wait(waitScope);
    frontPipe.ends[0]->write({WEBSOCKET_SEND_MESSAGE}).wait(waitScope);

    KJ_EXPECT(frontPipe.ends[0]->readAllText().wait(waitScope) == "");

    frontPipe.ends[0]->shutdownWrite();
    listenTask.wait(waitScope);
  }

  writeResponsesPromise.wait(waitScope);
}

// -----------------------------------------------------------------------------

KJ_TEST("newHttpClient from HttpService") {
  auto PIPELINE_TESTS = pipelineTestCases();

  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());

  HttpHeaderTable table;
  TestHttpService service(PIPELINE_TESTS, table);
  auto client = newHttpClient(service);

  for (auto& testCase: PIPELINE_TESTS) {
    testHttpClient(waitScope, table, *client, testCase);
  }
}

KJ_TEST("newHttpClient from HttpService WebSockets") {
  kj::EventLoop eventLoop;
  kj::WaitScope waitScope(eventLoop);
  kj::TimerImpl timer(kj::origin<kj::TimePoint>());
  auto pipe = kj::newTwoWayPipe();

  HttpHeaderTable::Builder tableBuilder;
  HttpHeaderId hMyHeader = tableBuilder.add("My-Header");
  auto headerTable = tableBuilder.build();
  TestWebSocketService service(*headerTable, hMyHeader);
  auto client = newHttpClient(service);

  testWebSocketClient(waitScope, *headerTable, hMyHeader, *client);
}

// -----------------------------------------------------------------------------

class CountingIoStream final: public kj::AsyncIoStream {
  // An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how
  // many connections are open).

public:
  CountingIoStream(kj::Own<kj::AsyncIoStream> inner, uint& count)
      : inner(kj::mv(inner)), count(count) {}
  ~CountingIoStream() noexcept(false) {
    --count;
  }

  kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
    return inner->read(buffer, minBytes, maxBytes);
  }
  kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
    return inner->tryRead(buffer, minBytes, maxBytes);
  }
  kj::Maybe<uint64_t> tryGetLength() override {
    return inner->tryGetLength();;
  }
  kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
    return inner->pumpTo(output, amount);
  }
  kj::Promise<void> write(const void* buffer, size_t size) override {
    return inner->write(buffer, size);
  }
  kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
    return inner->write(pieces);
  }
  kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
      kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
    return inner->tryPumpFrom(input, amount);
  }
  void shutdownWrite() override {
    return inner->shutdownWrite();
  }
  void abortRead() override {
    return inner->abortRead();
  }

public:
  kj::Own<AsyncIoStream> inner;
  uint& count;
};

class CountingNetworkAddress final: public kj::NetworkAddress {
public:
  CountingNetworkAddress(kj::NetworkAddress& inner, uint& count)
      : inner(inner), count(count), addrCount(ownAddrCount) {}
  CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount)
      : inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount) {}
  ~CountingNetworkAddress() noexcept(false) {
    --addrCount;
  }

  kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
    ++count;
    return inner.connect()
        .then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> {
      return kj::heap<CountingIoStream>(kj::mv(stream), count);
    });
  }

  kj::Own<kj::ConnectionReceiver> listen() override { KJ_UNIMPLEMENTED("test"); }
  kj::Own<kj::NetworkAddress> clone() override { KJ_UNIMPLEMENTED("test"); }
  kj::String toString() override { KJ_UNIMPLEMENTED("test"); }

private:
  kj::NetworkAddress& inner;
  kj::Own<kj::NetworkAddress> ownInner;
  uint& count;
  uint ownAddrCount = 1;
  uint& addrCount;
};

class ConnectionCountingNetwork final: public kj::Network {
public:
  ConnectionCountingNetwork(kj::Network& inner, uint& count, uint& addrCount)
      : inner(inner), count(count), addrCount(addrCount) {}

  Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
    ++addrCount;
    return inner.parseAddress(addr, portHint)
        .then([this](Own<NetworkAddress>&& addr) -> Own<NetworkAddress> {
      return kj::heap<CountingNetworkAddress>(kj::mv(addr), count, addrCount);
    });
  }
  Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
    KJ_UNIMPLEMENTED("test");
  }
  Own<Network> restrictPeers(
      kj::ArrayPtr<const kj::StringPtr> allow,
      kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
    KJ_UNIMPLEMENTED("test");
  }

private:
  kj::Network& inner;
  uint& count;
  uint& addrCount;
};

class DummyService final: public HttpService {
public:
  DummyService(HttpHeaderTable& headerTable): headerTable(headerTable) {}

  kj::Promise<void> request(
      HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
      kj::AsyncInputStream& requestBody, Response& response) override {
    if (!headers.isWebSocket()) {
      KJ_ASSERT(url != "/throw");

      auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
      auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size());
      auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
      promises.add(stream->write(body.begin(), body.size()));
      promises.add(requestBody.readAllBytes().ignoreResult());
      return kj::joinPromises(promises.finish()).attach(kj::mv(stream), kj::mv(body));
    } else {
      auto ws = response.acceptWebSocket(HttpHeaders(headerTable));
      auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
      auto sendPromise = ws->send(body);

      auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
      promises.add(sendPromise.attach(kj::mv(body)));
      promises.add(ws->receive().ignoreResult());
      return kj::joinPromises(promises.finish()).attach(kj::mv(ws));
    }
  }

private:
  HttpHeaderTable& headerTable;
};

KJ_TEST("HttpClient connection management") {
  auto io = kj::setupAsyncIo();

  kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
  kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
  HttpHeaderTable headerTable;

  auto listener = io.provider->getNetwork().parseAddress("localhost", 0)
      .wait(io.waitScope)->listen();
  DummyService service(headerTable);
  HttpServerSettings serverSettings;
  HttpServer server(serverTimer, headerTable, service, serverSettings);
  auto listenTask = server.listenHttp(*listener);

  auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort())
      .wait(io.waitScope);
  uint count = 0;
  CountingNetworkAddress countingAddr(*addr, count);

  FakeEntropySource entropySource;
  HttpClientSettings clientSettings;
  clientSettings.entropySource = entropySource;
  auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);

  KJ_EXPECT(count == 0);

  uint i = 0;
  auto doRequest = [&]() {
    uint n = i++;
    return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
        .then([](HttpClient::Response&& response) {
      auto promise = response.body->readAllText();
      return promise.attach(kj::mv(response.body));
    }).then([n](kj::String body) {
      KJ_EXPECT(body == kj::str("null:/", n));
    });
  };

  // We can do several requests in a row and only have one connection.
  doRequest().wait(io.waitScope);
  doRequest().wait(io.waitScope);
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);

  // But if we do two in parallel, we'll end up with two connections.
  auto req1 = doRequest();
  auto req2 = doRequest();
  req1.wait(io.waitScope);
  req2.wait(io.waitScope);
  KJ_EXPECT(count == 2);

  // We can reuse after a POST, provided we write the whole POST body properly.
  {
    auto req = client->request(
        HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6));
    req.body->write("foobar", 6).wait(io.waitScope);
    req.response.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
  }
  KJ_EXPECT(count == 2);
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 2);

  // Advance time for half the timeout, then exercise one of the connections.
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
  doRequest().wait(io.waitScope);
  doRequest().wait(io.waitScope);
  io.waitScope.poll();
  KJ_EXPECT(count == 2);

  // Advance time past when the other connection should time out. It should be dropped.
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 3 / 4);
  io.waitScope.poll();
  KJ_EXPECT(count == 1);

  // Wait for the other to drop.
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
  io.waitScope.poll();
  KJ_EXPECT(count == 0);

  // New request creates a new connection again.
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);

  // WebSocket connections are not reused.
  client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))
      .wait(io.waitScope);
  KJ_EXPECT(count == 0);

  // Errored connections are not reused.
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);
  client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
      .wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
  KJ_EXPECT(count == 0);

  // Connections where we failed to read the full response body are not reused.
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);
  client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response
      .wait(io.waitScope);
  KJ_EXPECT(count == 0);

  // Connections where we didn't even wait for the response headers are not reused.
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);
  client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable));
  KJ_EXPECT(count == 0);

  // Connections where we failed to write the full request body are not reused.
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);
  client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response
      .wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
  KJ_EXPECT(count == 0);

#if __linux__
  // TODO(soon): Figure out why this doesn't work on Windows and is flakey on Mac. My guess is that
  //   the closing of the TCP connection propagates synchronously on Linux so that by the time we
  //   poll() the EventPort it reports the client end of the connection has reached EOF, whereas on
  //   Mac and Windows this propagation probably involves some concurrent process which may or may
  //   not complete before we poll(). A solution in this case would be to use a dummy in-memory
  //   ConnectionReceiver that returns in-memory pipes (see UnbufferedPipe earlier in this file),
  //   so that we don't rely on any non-local behavior. Another solution would be to pause for
  //   a short time, maybe.

  // If the server times out the connection, we figure it out on the client.
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);
  serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2);
  io.waitScope.poll();
  KJ_EXPECT(count == 0);
#endif

  // Can still make requests.
  doRequest().wait(io.waitScope);
  KJ_EXPECT(count == 1);
}

KJ_TEST("HttpClient multi host") {
  auto io = kj::setupAsyncIo();

  kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
  kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
  HttpHeaderTable headerTable;

  auto listener1 = io.provider->getNetwork().parseAddress("localhost", 0)
      .wait(io.waitScope)->listen();
  auto listener2 = io.provider->getNetwork().parseAddress("localhost", 0)
      .wait(io.waitScope)->listen();
  DummyService service(headerTable);
  HttpServer server(serverTimer, headerTable, service);
  auto listenTask1 = server.listenHttp(*listener1);
  auto listenTask2 = server.listenHttp(*listener2);

  uint count = 0, addrCount = 0;
  uint tlsCount = 0, tlsAddrCount = 0;
  ConnectionCountingNetwork countingNetwork(io.provider->getNetwork(), count, addrCount);
  ConnectionCountingNetwork countingTlsNetwork(io.provider->getNetwork(), tlsCount, tlsAddrCount);

  HttpClientSettings clientSettings;
  auto client = newHttpClient(clientTimer, headerTable,
      countingNetwork, countingTlsNetwork, clientSettings);

  KJ_EXPECT(count == 0);

  uint i = 0;
  auto doRequest = [&](bool tls, uint port) {
    uint n = i++;
    return client->request(HttpMethod::GET,
        kj::str((tls ? "https://localhost:" : "http://localhost:"), port, '/', n),
                HttpHeaders(headerTable)).response
        .then([](HttpClient::Response&& response) {
      auto promise = response.body->readAllText();
      return promise.attach(kj::mv(response.body));
    }).then([n, port](kj::String body) {
      KJ_EXPECT(body == kj::str("localhost:", port, ":/", n), body, port, n);
    });
  };

  uint port1 = listener1->getPort();
  uint port2 = listener2->getPort();

  // We can do several requests in a row to the same host and only have one connection.
  doRequest(false, port1).wait(io.waitScope);
  doRequest(false, port1).wait(io.waitScope);
  doRequest(false, port1).wait(io.waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 1);
  KJ_EXPECT(tlsAddrCount == 0);

  // Request a different host, and now we have two connections.
  doRequest(false, port2).wait(io.waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 0);

  // Try TLS.
  doRequest(true, port1).wait(io.waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(tlsCount == 1);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 1);

  // Try first host again, no change in connection count.
  doRequest(false, port1).wait(io.waitScope);
  KJ_EXPECT(count == 2);
  KJ_EXPECT(tlsCount == 1);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 1);

  // Multiple requests in parallel forces more connections to that host.
  auto promise1 = doRequest(false, port1);
  auto promise2 = doRequest(false, port1);
  promise1.wait(io.waitScope);
  promise2.wait(io.waitScope);
  KJ_EXPECT(count == 3);
  KJ_EXPECT(tlsCount == 1);
  KJ_EXPECT(addrCount == 2);
  KJ_EXPECT(tlsAddrCount == 1);

  // Let everything expire.
  clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 2);
  io.waitScope.poll();
  KJ_EXPECT(count == 0);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 0);
  KJ_EXPECT(tlsAddrCount == 0);

  // We can still request those hosts again.
  doRequest(false, port1).wait(io.waitScope);
  KJ_EXPECT(count == 1);
  KJ_EXPECT(tlsCount == 0);
  KJ_EXPECT(addrCount == 1);
  KJ_EXPECT(tlsAddrCount == 0);
}

// -----------------------------------------------------------------------------

KJ_TEST("HttpClient to capnproto.org") {
  auto io = kj::setupAsyncIo();

  auto maybeConn = io.provider->getNetwork().parseAddress("capnproto.org", 80)
      .then([](kj::Own<kj::NetworkAddress> addr) {
    auto promise = addr->connect();
    return promise.attach(kj::mv(addr));
  }).then([](kj::Own<kj::AsyncIoStream>&& connection) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
    return kj::mv(connection);
  }, [](kj::Exception&& e) -> kj::Maybe<kj::Own<kj::AsyncIoStream>> {
    KJ_LOG(WARNING, "skipping test because couldn't connect to capnproto.org");
    return nullptr;
  }).wait(io.waitScope);

  KJ_IF_MAYBE(conn, maybeConn) {
    // Successfully connected to capnproto.org. Try doing GET /. We expect to get a redirect to
    // HTTPS, because what kind of horrible web site would serve in plaintext, really?

    HttpHeaderTable table;
    auto client = newHttpClient(table, **conn);

    HttpHeaders headers(table);
    headers.set(HttpHeaderId::HOST, "capnproto.org");

    auto response = client->request(HttpMethod::GET, "/", headers).response.wait(io.waitScope);
    KJ_EXPECT(response.statusCode / 100 == 3);
    auto location = KJ_ASSERT_NONNULL(response.headers->get(HttpHeaderId::LOCATION));
    KJ_EXPECT(location == "https://capnproto.org/");

    auto body = response.body->readAllText().wait(io.waitScope);
  }
}

}  // namespace
}  // namespace kj