Commit 2751b577 authored by Kenton Varda's avatar Kenton Varda

Implement HttpClient that automatically manages connections.

There are actually two new client types:
- One which always connects to a given NetworkAddress, but will automatically manage a pool of reusable connections.
- One which looks up the remote address based on the URL it is given, and manages a pool of connections for each host.

The latter of these two is a "true HTTP client library".
parent ece2a1aa
......@@ -2355,6 +2355,344 @@ KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
// -----------------------------------------------------------------------------
// TODO(now): Test NetworkAddressHttpClient:
// - Serial requests open only one connection.
// - Parallel requests open multiple connections.
// - Connections time out.
class CountingIoStream final: public kj::AsyncIoStream {
// An AsyncIoStream which waits for a promise to resolve then forwards all calls to the promised
// stream.
//
// TODO(cleanup): Make this more broadly available.
public:
CountingIoStream(kj::Own<kj::AsyncIoStream> inner, uint& count)
: inner(kj::mv(inner)), count(count) {}
~CountingIoStream() noexcept(false) {
--count;
}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();;
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return inner->pumpTo(output, amount);
}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return inner->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return inner->tryPumpFrom(input, amount);
}
void shutdownWrite() override {
return inner->shutdownWrite();
}
void abortRead() override {
return inner->abortRead();
}
public:
kj::Own<AsyncIoStream> inner;
uint& count;
};
class CountingNetworkAddress final: public kj::NetworkAddress {
public:
CountingNetworkAddress(kj::NetworkAddress& inner, uint& count)
: inner(inner), count(count), addrCount(ownAddrCount) {}
CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount)
: inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount) {}
~CountingNetworkAddress() noexcept(false) {
--addrCount;
}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
++count;
return inner.connect()
.then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> {
return kj::heap<CountingIoStream>(kj::mv(stream), count);
});
}
kj::Own<kj::ConnectionReceiver> listen() override { KJ_UNIMPLEMENTED("test"); }
kj::Own<kj::NetworkAddress> clone() override { KJ_UNIMPLEMENTED("test"); }
kj::String toString() override { KJ_UNIMPLEMENTED("test"); }
private:
kj::NetworkAddress& inner;
kj::Own<kj::NetworkAddress> ownInner;
uint& count;
uint ownAddrCount = 1;
uint& addrCount;
};
class ConnectionCountingNetwork final: public kj::Network {
public:
ConnectionCountingNetwork(kj::Network& inner, uint& count, uint& addrCount)
: inner(inner), count(count), addrCount(addrCount) {}
Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
++addrCount;
return inner.parseAddress(addr, portHint)
.then([this](Own<NetworkAddress>&& addr) -> Own<NetworkAddress> {
return kj::heap<CountingNetworkAddress>(kj::mv(addr), count, addrCount);
});
}
Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
KJ_UNIMPLEMENTED("test");
}
Own<Network> restrictPeers(
kj::ArrayPtr<const kj::StringPtr> allow,
kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
KJ_UNIMPLEMENTED("test");
}
private:
kj::Network& inner;
uint& count;
uint& addrCount;
};
class DummyService final: public HttpService {
public:
DummyService(HttpHeaderTable& headerTable): headerTable(headerTable) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
KJ_ASSERT(url != "/throw");
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size());
auto promise = stream->write(body.begin(), body.size());
return promise.attach(kj::mv(stream), kj::mv(body));
}
kj::Promise<void> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response) override {
auto ws = response.acceptWebSocket(HttpHeaders(headerTable));
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto sendPromise = ws->send(body);
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(sendPromise.attach(kj::mv(body)));
promises.add(ws->receive().ignoreResult());
return kj::joinPromises(promises.finish()).attach(kj::mv(ws));
}
private:
HttpHeaderTable& headerTable;
};
KJ_TEST("HttpClient connection management") {
auto io = kj::setupAsyncIo();
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
auto listener = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
DummyService service(headerTable);
HttpServerSettings serverSettings;
HttpServer server(serverTimer, headerTable, service, serverSettings);
auto listenTask = server.listenHttp(*listener);
auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort())
.wait(io.waitScope);
uint count = 0;
CountingNetworkAddress countingAddr(*addr, count);
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);
KJ_EXPECT(count == 0);
uint i = 0;
auto doRequest = [&]() {
uint n = i++;
return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n](kj::String body) {
KJ_EXPECT(body == kj::str("null:/", n));
});
};
// We can do several requests in a row and only have one connection.
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
// But if we do two in parallel, we'll end up with two connections.
auto req1 = doRequest();
auto req2 = doRequest();
req1.wait(io.waitScope);
req2.wait(io.waitScope);
KJ_EXPECT(count == 2);
// Advance time for half the timeout, then exercise one of the connections.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
io.waitScope.poll();
KJ_EXPECT(count == 2);
// Advance time past when the other connection should time out. It should be dropped.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 3 / 4);
io.waitScope.poll();
KJ_EXPECT(count == 1);
// Wait for the other to drop.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
io.waitScope.poll();
KJ_EXPECT(count == 0);
// New request creates a new connection again.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
// WebSocket connections are not reused.
client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))
.wait(io.waitScope);
KJ_EXPECT(count == 0);
// Errored connections are not reused.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
.wait(io.waitScope);
KJ_EXPECT(count == 0);
// If the server times out the connection, we figure it out on the client.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2);
io.waitScope.poll();
KJ_EXPECT(count == 0);
// Can still make requests.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
}
KJ_TEST("HttpClient multi host") {
auto io = kj::setupAsyncIo();
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
auto listener1 = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
auto listener2 = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
DummyService service(headerTable);
HttpServer server(serverTimer, headerTable, service);
auto listenTask1 = server.listenHttp(*listener1);
auto listenTask2 = server.listenHttp(*listener2);
uint count = 0, addrCount = 0;
uint tlsCount = 0, tlsAddrCount = 0;
ConnectionCountingNetwork countingNetwork(io.provider->getNetwork(), count, addrCount);
ConnectionCountingNetwork countingTlsNetwork(io.provider->getNetwork(), tlsCount, tlsAddrCount);
HttpClientSettings clientSettings;
auto client = newHttpClient(clientTimer, headerTable,
countingNetwork, countingTlsNetwork, clientSettings);
KJ_EXPECT(count == 0);
uint i = 0;
auto doRequest = [&](bool tls, uint port) {
uint n = i++;
return client->request(HttpMethod::GET,
kj::str((tls ? "https://localhost:" : "http://localhost:"), port, '/', n),
HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n, port](kj::String body) {
KJ_EXPECT(body == kj::str("localhost:", port, ":/", n), body, port, n);
});
};
uint port1 = listener1->getPort();
uint port2 = listener2->getPort();
// We can do several requests in a row to the same host and only have one connection.
doRequest(false, port1).wait(io.waitScope);
doRequest(false, port1).wait(io.waitScope);
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 1);
KJ_EXPECT(tlsAddrCount == 0);
// Request a different host, and now we have two connections.
doRequest(false, port2).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 0);
// Try TLS.
doRequest(true, port1).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Try first host again, no change in connection count.
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Multipre requests in parallel forces more connections to that host.
auto promise1 = doRequest(false, port1);
auto promise2 = doRequest(false, port1);
promise1.wait(io.waitScope);
promise2.wait(io.waitScope);
KJ_EXPECT(count == 3);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Let everything expire.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 2);
kj::Promise<void>(kj::NEVER_DONE).poll(io.waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 0);
KJ_EXPECT(tlsAddrCount == 0);
// We can still request those hosts again.
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 1);
KJ_EXPECT(tlsAddrCount == 0);
}
// -----------------------------------------------------------------------------
KJ_TEST("HttpClient to capnproto.org") {
auto io = kj::setupAsyncIo();
......
......@@ -20,11 +20,14 @@
// THE SOFTWARE.
#include "http.h"
#include "url.h"
#include <kj/debug.h>
#include <kj/parse/char.h>
#include <unordered_map>
#include <stdlib.h>
#include <kj/encoding.h>
#include <deque>
#include <map>
namespace kj {
......@@ -1046,8 +1049,21 @@ public:
// ---------------------------------------------------------------------------
kj::Promise<bool> awaitNextMessage() {
// Waits until more data is available, but doesn't consume it. Only meant for server-side use,
// after a request is handled, to check for pipelined requests. Returns false on EOF.
// Waits until more data is available, but doesn't consume it. Returns false on EOF.
//
// Used on the server after a request is handled, to check for pipelined requests.
//
// Used on the client to detect when idle connections are closed from the server end. (In this
// case, the promise always returns false or is canceled.)
if (onMessageDone != nullptr) {
// We're still working on reading the previous body.
auto fork = messageReadQueue.fork();
messageReadQueue = fork.addBranch();
return fork.addBranch().then([this]() {
return awaitNextMessage();
});
}
// Slightly-crappy code to snarf the expected line break. This will actually eat the leading
// regex /\r*\n?/.
......@@ -2408,17 +2424,26 @@ namespace {
class HttpClientImpl final: public HttpClient {
public:
HttpClientImpl(HttpHeaderTable& responseHeaderTable, kj::Own<kj::AsyncIoStream> rawStream,
kj::Maybe<EntropySource&> entropySource)
HttpClientSettings settings)
: httpInput(*rawStream, responseHeaderTable),
httpOutput(*rawStream),
ownStream(kj::mv(rawStream)),
entropySource(entropySource) {}
settings(kj::mv(settings)) {}
bool canReuse() {
// Returns true if
return !upgraded && !closed;
}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
KJ_REQUIRE(!upgraded,
"can't make further requests on this HttpClient because it has been or is in the process "
"of being upgraded");
KJ_REQUIRE(!closed,
"this HttpClient's connection has been closed by the server or due to an error");
closeWatcherTask = nullptr;
HttpHeaders::ConnectionHeaders connectionHeaders;
kj::String lengthStr;
......@@ -2448,14 +2473,21 @@ public:
auto responsePromise = httpInput.readResponseHeaders()
.then([this,method](kj::Maybe<HttpHeaders::Response>&& response) -> HttpClient::Response {
KJ_IF_MAYBE(r, response) {
return {
HttpClient::Response result {
r->statusCode,
r->statusText,
&httpInput.getHeaders(),
httpInput.getEntityBody(HttpInputStream::RESPONSE, method, r->statusCode,
r->connectionHeaders)
};
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(r->connectionHeaders.connection.cStr())) {
closed = true;
} else {
watchForClose();
}
return result;
} else {
closed = true;
KJ_FAIL_REQUIRE("received invalid HTTP response") { break; }
return HttpClient::Response();
}
......@@ -2469,13 +2501,16 @@ public:
KJ_REQUIRE(!upgraded,
"can't make further requests on this HttpClient because it has been or is in the process "
"of being upgraded");
KJ_REQUIRE(!closed,
"this HttpClient's connection has been closed by the server or due to an error");
closeWatcherTask = nullptr;
// Mark upgraded for now, even though the upgrade could fail, because we can't allow pipelined
// requests in the meantime.
upgraded = true;
byte keyBytes[16];
KJ_ASSERT_NONNULL(this->entropySource,
KJ_ASSERT_NONNULL(settings.entropySource,
"can't use openWebSocket() because no EntropySource was provided when creating the "
"HttpClient").generate(keyBytes);
auto keyBase64 = kj::encodeBase64(keyBytes);
......@@ -2515,17 +2550,23 @@ public:
r->statusCode,
r->statusText,
&httpInput.getHeaders(),
upgradeToWebSocket(kj::mv(ownStream), httpInput, httpOutput, entropySource),
upgradeToWebSocket(kj::mv(ownStream), httpInput, httpOutput, settings.entropySource),
};
} else {
upgraded = false;
return {
HttpClient::WebSocketResponse result {
r->statusCode,
r->statusText,
&httpInput.getHeaders(),
httpInput.getEntityBody(HttpInputStream::RESPONSE, HttpMethod::GET, r->statusCode,
r->connectionHeaders)
};
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(r->connectionHeaders.connection.cStr())) {
closed = true;
} else {
watchForClose();
}
return result;
}
} else {
KJ_FAIL_REQUIRE("received invalid HTTP response") { break; }
......@@ -2538,8 +2579,29 @@ private:
HttpInputStream httpInput;
HttpOutputStream httpOutput;
kj::Own<AsyncIoStream> ownStream;
kj::Maybe<EntropySource&> entropySource;
HttpClientSettings settings;
kj::Maybe<kj::Promise<void>> closeWatcherTask;
bool upgraded = false;
bool closed = false;
void watchForClose() {
closeWatcherTask = httpInput.awaitNextMessage().then([this](bool hasData) {
if (hasData) {
// Uhh... The server sent some data before we asked for anything. Perhaps due to properties
// of this application, the server somehow already knows what the next request will be, and
// it is trying to optimize. Or maybe this is some sort of test and the server is just
// replaying a script. In any case, we will humor it -- leave the data in the buffer and
// let it become the response to the next request.
} else {
// EOF -- server disconnected.
// Proactively free up the socket.
ownStream = nullptr;
closed = true;
}
}).eagerlyEvaluate(nullptr);
}
};
} // namespace
......@@ -2566,10 +2628,579 @@ kj::Promise<kj::Own<kj::AsyncIoStream>> HttpClient::connect(kj::StringPtr host)
kj::Own<HttpClient> newHttpClient(
HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource) {
HttpClientSettings settings) {
return kj::heap<HttpClientImpl>(responseHeaderTable,
kj::Own<kj::AsyncIoStream>(&stream, kj::NullDisposer::instance),
entropySource);
kj::mv(settings));
}
// =======================================================================================
namespace {
class PromiseIoStream final: public kj::AsyncIoStream, private kj::TaskSet::ErrorHandler {
// An AsyncIoStream which waits for a promise to resolve then forwards all calls to the promised
// stream.
//
// TODO(cleanup): Make this more broadly available.
public:
PromiseIoStream(kj::Promise<kj::Own<AsyncIoStream>> promise)
: promise(promise.then([this](kj::Own<AsyncIoStream> result) {
stream = kj::mv(result);
}).fork()),
tasks(*this) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->read(buffer, minBytes, maxBytes);
} else {
return promise.addBranch().then([this,buffer,minBytes,maxBytes]() {
return KJ_ASSERT_NONNULL(stream)->read(buffer, minBytes, maxBytes);
});
}
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryRead(buffer, minBytes, maxBytes);
} else {
return promise.addBranch().then([this,buffer,minBytes,maxBytes]() {
return KJ_ASSERT_NONNULL(stream)->tryRead(buffer, minBytes, maxBytes);
});
}
}
kj::Maybe<uint64_t> tryGetLength() override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryGetLength();
} else {
return nullptr;
}
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->pumpTo(output, amount);
} else {
return promise.addBranch().then([this,&output,amount]() {
return KJ_ASSERT_NONNULL(stream)->pumpTo(output, amount);
});
}
}
kj::Promise<void> write(const void* buffer, size_t size) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(buffer, size);
} else {
return promise.addBranch().then([this,buffer,size]() {
return KJ_ASSERT_NONNULL(stream)->write(buffer, size);
});
}
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(pieces);
} else {
return promise.addBranch().then([this,pieces]() {
return KJ_ASSERT_NONNULL(stream)->write(pieces);
});
}
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryPumpFrom(input, amount);
} else {
return promise.addBranch().then([this,&input,amount]() {
// Call input.pumpTo() on the resolved stream instead.
return input.pumpTo(*KJ_ASSERT_NONNULL(stream), amount);
});
}
}
void shutdownWrite() override {
KJ_IF_MAYBE(s, stream) {
return s->get()->shutdownWrite();
} else {
tasks.add(promise.addBranch().then([this]() {
return KJ_ASSERT_NONNULL(stream)->shutdownWrite();
}));
}
}
void abortRead() override {
KJ_IF_MAYBE(s, stream) {
return s->get()->abortRead();
} else {
tasks.add(promise.addBranch().then([this]() {
return KJ_ASSERT_NONNULL(stream)->abortRead();
}));
}
}
public:
kj::ForkedPromise<void> promise;
kj::Maybe<kj::Own<AsyncIoStream>> stream;
kj::TaskSet tasks;
void taskFailed(kj::Exception&& exception) override {
KJ_LOG(ERROR, exception);
}
};
class PromiseOutputStream final: public kj::AsyncOutputStream {
// An AsyncOutputStream which waits for a promise to resolve then forwards all calls to the
// promised stream.
//
// TODO(cleanup): Make this more broadly available.
// TODO(cleanup): Can this share implementation with PromiseIoStream? Seems hard.
public:
PromiseOutputStream(kj::Promise<kj::Own<AsyncOutputStream>> promise)
: promise(promise.then([this](kj::Own<AsyncOutputStream> result) {
stream = kj::mv(result);
}).fork()) {}
kj::Promise<void> write(const void* buffer, size_t size) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(buffer, size);
} else {
return promise.addBranch().then([this,buffer,size]() {
return KJ_ASSERT_NONNULL(stream)->write(buffer, size);
});
}
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(pieces);
} else {
return promise.addBranch().then([this,pieces]() {
return KJ_ASSERT_NONNULL(stream)->write(pieces);
});
}
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryPumpFrom(input, amount);
} else {
return promise.addBranch().then([this,&input,amount]() {
// Call input.pumpTo() on the resolved stream instead.
return input.pumpTo(*KJ_ASSERT_NONNULL(stream), amount);
});
}
}
public:
kj::ForkedPromise<void> promise;
kj::Maybe<kj::Own<AsyncOutputStream>> stream;
};
class AttachmentOutputStream: public kj::AsyncOutputStream {
// An AsyncOutputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentOutputStream(kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return inner->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return input.pumpTo(*inner, amount);
}
private:
kj::Own<kj::AsyncOutputStream> inner;
kj::Own<kj::Refcounted> attachment;
};
class AttachmentInputStream: public kj::AsyncInputStream {
// An AsyncInputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentInputStream(kj::Own<kj::AsyncInputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return inner->pumpTo(output, amount);
}
private:
kj::Own<kj::AsyncInputStream> inner;
kj::Own<kj::Refcounted> attachment;
};
class NetworkAddressHttpClient final: public HttpClient {
public:
NetworkAddressHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Own<kj::NetworkAddress> address, HttpClientSettings settings)
: timer(timer),
responseHeaderTable(responseHeaderTable),
address(kj::mv(address)),
settings(kj::mv(settings)) {}
bool isDrained() {
// Returns true if there are no open connections.
return activeConnectionCount == 0 && availableClients.empty();
}
kj::Promise<void> onDrained() {
// Returns a promise which resolves the next time isDrained() transitions from false to true.
auto paf = kj::newPromiseAndFulfiller<void>();
drainedFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
auto refcounted = getClient();
auto result = refcounted->client->request(method, url, headers, expectedBodySize);
result.body = kj::heap<AttachmentOutputStream>(kj::mv(result.body), kj::addRef(*refcounted));
result.response = result.response.then(kj::mvCapture(refcounted,
[](kj::Own<RefcountedClient>&& refcounted, Response&& response) {
response.body = kj::heap<AttachmentInputStream>(kj::mv(response.body), kj::mv(refcounted));
return kj::mv(response);
}));
return result;
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
auto refcounted = getClient();
auto result = refcounted->client->openWebSocket(url, headers);
return result.then(kj::mvCapture(refcounted,
[](kj::Own<RefcountedClient>&& refcounted, WebSocketResponse&& response) {
KJ_SWITCH_ONEOF(response.webSocketOrBody) {
KJ_CASE_ONEOF(body, kj::Own<kj::AsyncInputStream>) {
response.webSocketOrBody.init<kj::Own<kj::AsyncInputStream>>(
kj::heap<AttachmentInputStream>(kj::mv(body), kj::mv(refcounted)));
}
KJ_CASE_ONEOF(ws, kj::Own<WebSocket>) {
// We actually don't need to attach the HttpClient to the WebSocket -- our HttpClient
// implementation transfers ownership of the connection into the WebSocket.
}
}
return kj::mv(response);
}));
}
private:
kj::Timer& timer;
HttpHeaderTable& responseHeaderTable;
kj::Own<kj::NetworkAddress> address;
HttpClientSettings settings;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> drainedFulfiller;
uint activeConnectionCount = 0;
bool timeoutsScheduled = false;
kj::Promise<void> timeoutTask = nullptr;
struct AvailableClient {
kj::Own<HttpClientImpl> client;
kj::TimePoint expires;
};
std::deque<AvailableClient> availableClients;
struct RefcountedClient: public kj::Refcounted {
RefcountedClient(NetworkAddressHttpClient& parent, kj::Own<HttpClientImpl> client)
: parent(parent), client(kj::mv(client)) {
++parent.activeConnectionCount;
}
~RefcountedClient() noexcept(false) {
--parent.activeConnectionCount;
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
// Only return the connection to the pool if it is reusable.
if (client->canReuse()) {
parent.availableClients.push_back(AvailableClient {
kj::mv(client), parent.timer.now() + parent.settings.idleTimout
});
}
// Call this either way because it also signals onDrained().
parent.ensureTimetoutsScheduled();
})) {
KJ_LOG(ERROR, *exception);
}
}
NetworkAddressHttpClient& parent;
kj::Own<HttpClientImpl> client;
};
kj::Own<RefcountedClient> getClient() {
for (;;) {
if (availableClients.empty()) {
auto stream = kj::heap<PromiseIoStream>(address->connect());
return kj::refcounted<RefcountedClient>(*this,
kj::heap<HttpClientImpl>(responseHeaderTable, kj::mv(stream), settings));
} else {
auto client = kj::mv(availableClients.back().client);
availableClients.pop_back();
if (client->canReuse()) {
return kj::refcounted<RefcountedClient>(*this, kj::mv(client));
}
// Whoops, this client's connection was closed by the server at some point. Discard.
}
}
}
void ensureTimetoutsScheduled() {
if (!timeoutsScheduled) {
timeoutsScheduled = true;
timeoutTask = applyTimeouts();
}
}
kj::Promise<void> applyTimeouts() {
if (availableClients.empty()) {
timeoutsScheduled = false;
if (activeConnectionCount == 0) {
KJ_IF_MAYBE(f, drainedFulfiller) {
f->get()->fulfill();
drainedFulfiller = nullptr;
}
}
return kj::READY_NOW;
} else {
auto time = availableClients.front().expires;
return timer.atTime(time).then([this,time]() {
while (!availableClients.empty() && availableClients.front().expires <= time) {
availableClients.pop_front();
}
return applyTimeouts();
});
}
}
};
class PromiseNetworkAddressHttpClient final: public HttpClient {
// An HttpClient which waits for a promise to resolve then forwards all calls to the promised
// client.
public:
PromiseNetworkAddressHttpClient(kj::Promise<kj::Own<NetworkAddressHttpClient>> promise)
: promise(promise.then([this](kj::Own<NetworkAddressHttpClient>&& client) {
this->client = kj::mv(client);
}).fork()) {}
bool isDrained() {
KJ_IF_MAYBE(c, client) {
return c->get()->isDrained();
} else {
return false;
}
}
kj::Promise<void> onDrained() {
KJ_IF_MAYBE(c, client) {
return c->get()->onDrained();
} else {
return promise.addBranch().then([this]() {
return KJ_ASSERT_NONNULL(client)->onDrained();
}, [](kj::Exception&& e) {
// Connecting failed. Treat as immediately drained.
return kj::READY_NOW;
});
}
}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
KJ_IF_MAYBE(c, client) {
return c->get()->request(method, url, headers, expectedBodySize);
} else {
// This gets complicated since request() returns a pair of a stream and a promise.
auto urlCopy = kj::str(url);
auto headersCopy = headers.clone();
auto combined = promise.addBranch().then(kj::mvCapture(urlCopy, kj::mvCapture(headersCopy,
[this,method,expectedBodySize](HttpHeaders&& headers, kj::String&& url)
-> kj::Tuple<kj::Own<kj::AsyncOutputStream>, kj::Promise<Response>> {
auto req = KJ_ASSERT_NONNULL(client)->request(method, url, headers, expectedBodySize);
return kj::tuple(kj::mv(req.body), kj::mv(req.response));
})));
auto split = combined.split();
return {
kj::heap<PromiseOutputStream>(kj::mv(kj::get<0>(split))),
kj::mv(kj::get<1>(split))
};
}
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
KJ_IF_MAYBE(c, client) {
return c->get()->openWebSocket(url, headers);
} else {
auto urlCopy = kj::str(url);
auto headersCopy = headers.clone();
return promise.addBranch().then(kj::mvCapture(urlCopy, kj::mvCapture(headersCopy,
[this](HttpHeaders&& headers, kj::String&& url) {
return KJ_ASSERT_NONNULL(client)->openWebSocket(url, headers);
})));
}
}
private:
kj::ForkedPromise<void> promise;
kj::Maybe<kj::Own<NetworkAddressHttpClient>> client;
};
class NetworkHttpClient final: public HttpClient, private kj::TaskSet::ErrorHandler {
public:
NetworkHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Network& network, kj::Maybe<kj::Network&> tlsNetwork,
HttpClientSettings settings)
: timer(timer),
responseHeaderTable(responseHeaderTable),
network(network),
tlsNetwork(tlsNetwork),
settings(kj::mv(settings)),
tasks(*this) {}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
auto parsed = Url::parse(url, Url::HTTP_PROXY_REQUEST);
auto path = parsed.toString(Url::HTTP_REQUEST);
auto headersCopy = headers.clone();
headersCopy.set(HttpHeaderId::HOST, parsed.host);
return getClient(parsed).request(method, path, headersCopy, expectedBodySize);
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
auto parsed = Url::parse(url, Url::HTTP_PROXY_REQUEST);
auto path = parsed.toString(Url::HTTP_REQUEST);
auto headersCopy = headers.clone();
headersCopy.set(HttpHeaderId::HOST, parsed.host);
return getClient(parsed).openWebSocket(path, headersCopy);
}
private:
kj::Timer& timer;
HttpHeaderTable& responseHeaderTable;
kj::Network& network;
kj::Maybe<kj::Network&> tlsNetwork;
HttpClientSettings settings;
struct Host {
kj::String name; // including port, if non-default
kj::Own<PromiseNetworkAddressHttpClient> client;
};
std::map<kj::StringPtr, Host> httpHosts;
std::map<kj::StringPtr, Host> httpsHosts;
struct RequestInfo {
HttpMethod method;
kj::String hostname;
kj::String path;
HttpHeaders headers;
kj::Maybe<uint64_t> expectedBodySize;
};
kj::TaskSet tasks;
HttpClient& getClient(kj::Url& parsed) {
bool isHttps = parsed.scheme == "https";
bool isHttp = parsed.scheme == "http";
KJ_REQUIRE(isHttp || isHttps);
auto& hosts = isHttps ? httpsHosts : httpHosts;
// Look for a cached client for this host.
// TODO(perf): It would be nice to recognize when different hosts have the same address and
// reuse the same connection pool, but:
// - We'd need a reliable way to compare NetworkAddresses, e.g. .equals() and .hashCode().
// It's very Java... ick.
// - Correctly handling TLS would be tricky: we'd need to verify that the new hostname is
// on the certificate. When SNI is in use we might have to request an additional
// certificate (is that possible?).
auto iter = hosts.find(parsed.host);
if (iter == hosts.end()) {
// Need to open a new connection.
kj::Network* networkToUse = &network;
if (isHttps) {
networkToUse = &KJ_REQUIRE_NONNULL(tlsNetwork, "this HttpClient doesn't support HTTPS");
}
auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
auto promise = networkToUse->parseAddress(parsed.host, isHttps ? 443 : 80)
.then([this](kj::Own<kj::NetworkAddress> addr) {
return kj::heap<NetworkAddressHttpClient>(
timer, responseHeaderTable, kj::mv(addr), settings);
});
Host host {
kj::mv(parsed.host),
kj::heap<PromiseNetworkAddressHttpClient>(kj::mv(promise))
};
kj::StringPtr nameRef = host.name;
auto insertResult = hosts.insert(std::make_pair(nameRef, kj::mv(host)));
KJ_ASSERT(insertResult.second);
iter = insertResult.first;
tasks.add(handleCleanup(hosts, iter));
}
return *iter->second.client;
}
kj::Promise<void> handleCleanup(std::map<kj::StringPtr, Host>& hosts,
std::map<kj::StringPtr, Host>::iterator iter) {
return iter->second.client->onDrained()
.then([this,&hosts,iter]() -> kj::Promise<void> {
// Double-check that it's really drained to avoid race conditions.
if (iter->second.client->isDrained()) {
hosts.erase(iter);
return kj::READY_NOW;
} else {
return handleCleanup(hosts, iter);
}
});
}
void taskFailed(kj::Exception&& exception) override {
KJ_LOG(ERROR, exception);
}
};
} // namespace
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::NetworkAddress& addr, HttpClientSettings settings) {
return kj::heap<NetworkAddressHttpClient>(timer, responseHeaderTable,
kj::Own<kj::NetworkAddress>(&addr, kj::NullDisposer::instance), kj::mv(settings));
}
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Network& network, kj::Maybe<kj::Network&> tlsNetwork,
HttpClientSettings settings) {
return kj::heap<NetworkHttpClient>(
timer, responseHeaderTable, network, tlsNetwork, kj::mv(settings));
}
// =======================================================================================
......
......@@ -562,26 +562,53 @@ public:
// UNIMPLEMENTED.
};
kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::Network& network,
kj::Maybe<kj::Network&> tlsNetwork = nullptr,
kj::Maybe<EntropySource&> entropySource = nullptr);
// Creates a proxy HttpClient that connects to hosts over the given network.
struct HttpClientSettings {
kj::Duration idleTimout = 5 * kj::SECONDS;
// For clients which automatically create new connections, any connection idle for at least this
// long will be closed.
kj::Maybe<EntropySource&> entropySource = nullptr;
// Must be provided in order to use `openWebSocket`. If you don't need WebSockets, this can be
// omitted. The WebSocket protocol uses random values to avoid triggering flaws (including
// security flaws) in certain HTTP proxy software. Specifically, entropy is used to generate the
// `Sec-WebSocket-Key` header and to generate frame masks. If you know that there are no broken
// or vulnerable proxies between you and the server, you can provide an dummy entropy source that
// doesn't generate real entropy (e.g. returning the same value every time). Otherwise, you must
// provide a cryptographically-random entropy source.
};
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Network& network, kj::Maybe<kj::Network&> tlsNetwork,
HttpClientSettings settings = HttpClientSettings());
// Creates a proxy HttpClient that connects to hosts over the given network. The URL must always
// be an absolute URL; the host is parsed from the URL. This implementation will automatically
// add an appropriate Host header (and convert the URL to just a path) once it has connected.
//
// Note that if you wish to route traffic through an HTTP proxy server rather than connect to
// remote hosts directly, you should use the form of newHttpClient() that takes a NetworkAddress,
// and supply the proxy's address.
//
// `responseHeaderTable` is used when parsing HTTP responses. Requests can use any header table.
//
// `tlsNetwork` is required to support HTTPS destination URLs. Otherwise, only HTTP URLs can be
// `tlsNetwork` is required to support HTTPS destination URLs. If null, only HTTP URLs can be
// fetched.
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::NetworkAddress& addr,
HttpClientSettings settings = HttpClientSettings());
// Creates an HttpClient that always connects to the given address no matter what URL is requested.
// The client will open and close connections as needed. It will attempt to reuse connections for
// multiple requests but will not send a new request before the previous response on the same
// connection has completed, as doing so can result in head-of-line blocking issues. The client may
// be used as a proxy client or a host client depending on whether the peer is operating as
// a proxy. (Hint: This is the best kind of client to use when routing traffic through an HTTP
// proxy. `addr` should be the address of the proxy, and the proxy itself will resolve remote hosts
// based on the URLs passed to it.)
//
// `entropySource` must be provided in order to use `openWebSocket`. If you don't need WebSockets,
// `entropySource` can be omitted. The WebSocket protocol uses random values to avoid triggering
// flaws (including security flaws) in certain HTTP proxy software. Specifically, entropy is used
// to generate the `Sec-WebSocket-Key` header and to generate frame masks. If you know that there
// are no broken or vulnerable proxies between you and the server, you can provide an dummy entropy
// source that doesn't generate real entropy (e.g. returning the same value every time). Otherwise,
// you must provide a cryptographically-random entropy source.
// `responseHeaderTable` is used when parsing HTTP responses. Requests can use any header table.
kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource = nullptr);
HttpClientSettings settings = HttpClientSettings());
// Creates an HttpClient that speaks over the given pre-established connection. The client may
// be used as a proxy client or a host client depending on whether the peer is operating as
// a proxy.
......@@ -591,14 +618,12 @@ kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::Asyn
// fail as well. If the destination server chooses to close the connection after a response,
// subsequent requests will fail. If a response takes a long time, it blocks subsequent responses.
// If a WebSocket is opened successfully, all subsequent requests fail.
//
// `entropySource` must be provided in order to use `openWebSocket`. If you don't need WebSockets,
// `entropySource` can be omitted. The WebSocket protocol uses random values to avoid triggering
// flaws (including security flaws) in certain HTTP proxy software. Specifically, entropy is used
// to generate the `Sec-WebSocket-Key` header and to generate frame masks. If you know that there
// are no broken or vulnerable proxies between you and the server, you can provide an dummy entropy
// source that doesn't generate real entropy (e.g. returning the same value every time). Otherwise,
// you must provide a cryptographically-random entropy source.
kj::Own<HttpClient> newHttpClient(
HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource) KJ_DEPRECATED("use HttpClientSettings");
// Temporary for backwards-compatibilty.
// TODO(soon): Remove this before next release.
kj::Own<HttpClient> newHttpClient(HttpService& service);
kj::Own<HttpService> newHttpService(HttpClient& client);
......@@ -726,6 +751,14 @@ inline void HttpHeaders::forEach(Func&& func) const {
}
}
inline kj::Own<HttpClient> newHttpClient(
HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource) {
HttpClientSettings settings;
settings.entropySource = entropySource;
return newHttpClient(responseHeaderTable, stream, kj::mv(settings));
}
} // namespace kj
#endif // KJ_COMPAT_HTTP_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment