Unverified Commit 147c0e54 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #739 from capnproto/disable-connection-reuse

Make it possible to configure HttpClient to not reuse connections at all.
parents ab359776 d3bc948e
...@@ -2604,16 +2604,18 @@ public: ...@@ -2604,16 +2604,18 @@ public:
class CountingNetworkAddress final: public kj::NetworkAddress { class CountingNetworkAddress final: public kj::NetworkAddress {
public: public:
CountingNetworkAddress(kj::NetworkAddress& inner, uint& count) CountingNetworkAddress(kj::NetworkAddress& inner, uint& count, uint& cumulative)
: inner(inner), count(count), addrCount(ownAddrCount) {} : inner(inner), count(count), addrCount(ownAddrCount), cumulative(cumulative) {}
CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount) CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount)
: inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount) {} : inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount),
cumulative(ownCumulative) {}
~CountingNetworkAddress() noexcept(false) { ~CountingNetworkAddress() noexcept(false) {
--addrCount; --addrCount;
} }
kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override { kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
++count; ++count;
++cumulative;
return inner.connect() return inner.connect()
.then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> { .then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> {
return kj::heap<CountingIoStream>(kj::mv(stream), count); return kj::heap<CountingIoStream>(kj::mv(stream), count);
...@@ -2630,6 +2632,8 @@ private: ...@@ -2630,6 +2632,8 @@ private:
uint& count; uint& count;
uint ownAddrCount = 1; uint ownAddrCount = 1;
uint& addrCount; uint& addrCount;
uint ownCumulative = 0;
uint& cumulative;
}; };
class ConnectionCountingNetwork final: public kj::Network { class ConnectionCountingNetwork final: public kj::Network {
...@@ -2710,7 +2714,8 @@ KJ_TEST("HttpClient connection management") { ...@@ -2710,7 +2714,8 @@ KJ_TEST("HttpClient connection management") {
auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort()) auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort())
.wait(io.waitScope); .wait(io.waitScope);
uint count = 0; uint count = 0;
CountingNetworkAddress countingAddr(*addr, count); uint cumulative = 0;
CountingNetworkAddress countingAddr(*addr, count, cumulative);
FakeEntropySource entropySource; FakeEntropySource entropySource;
HttpClientSettings clientSettings; HttpClientSettings clientSettings;
...@@ -2718,6 +2723,7 @@ KJ_TEST("HttpClient connection management") { ...@@ -2718,6 +2723,7 @@ KJ_TEST("HttpClient connection management") {
auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings); auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 0);
uint i = 0; uint i = 0;
auto doRequest = [&]() { auto doRequest = [&]() {
...@@ -2736,6 +2742,7 @@ KJ_TEST("HttpClient connection management") { ...@@ -2736,6 +2742,7 @@ KJ_TEST("HttpClient connection management") {
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 1);
// But if we do two in parallel, we'll end up with two connections. // But if we do two in parallel, we'll end up with two connections.
auto req1 = doRequest(); auto req1 = doRequest();
...@@ -2743,6 +2750,7 @@ KJ_TEST("HttpClient connection management") { ...@@ -2743,6 +2750,7 @@ KJ_TEST("HttpClient connection management") {
req1.wait(io.waitScope); req1.wait(io.waitScope);
req2.wait(io.waitScope); req2.wait(io.waitScope);
KJ_EXPECT(count == 2); KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
// We can reuse after a POST, provided we write the whole POST body properly. // We can reuse after a POST, provided we write the whole POST body properly.
{ {
...@@ -2752,8 +2760,10 @@ KJ_TEST("HttpClient connection management") { ...@@ -2752,8 +2760,10 @@ KJ_TEST("HttpClient connection management") {
req.response.wait(io.waitScope).body->readAllBytes().wait(io.waitScope); req.response.wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
} }
KJ_EXPECT(count == 2); KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 2); KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
// Advance time for half the timeout, then exercise one of the connections. // Advance time for half the timeout, then exercise one of the connections.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2); clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
...@@ -2761,52 +2771,65 @@ KJ_TEST("HttpClient connection management") { ...@@ -2761,52 +2771,65 @@ KJ_TEST("HttpClient connection management") {
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
io.waitScope.poll(); io.waitScope.poll();
KJ_EXPECT(count == 2); KJ_EXPECT(count == 2);
KJ_EXPECT(cumulative == 2);
// Advance time past when the other connection should time out. It should be dropped. // Advance time past when the other connection should time out. It should be dropped.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 3 / 4); clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 3 / 4);
io.waitScope.poll(); io.waitScope.poll();
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 2);
// Wait for the other to drop. // Wait for the other to drop.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2); clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
io.waitScope.poll(); io.waitScope.poll();
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 2);
// New request creates a new connection again. // New request creates a new connection again.
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 3);
// WebSocket connections are not reused. // WebSocket connections are not reused.
client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable)) client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))
.wait(io.waitScope); .wait(io.waitScope);
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 3);
// Errored connections are not reused. // Errored connections are not reused.
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 4);
client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
.wait(io.waitScope).body->readAllBytes().wait(io.waitScope); .wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 4);
// Connections where we failed to read the full response body are not reused. // Connections where we failed to read the full response body are not reused.
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 5);
client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)).response
.wait(io.waitScope); .wait(io.waitScope);
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 5);
// Connections where we didn't even wait for the response headers are not reused. // Connections where we didn't even wait for the response headers are not reused.
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 6);
client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable)); client->request(HttpMethod::GET, kj::str("/foo"), HttpHeaders(headerTable));
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 6);
// Connections where we failed to write the full request body are not reused. // Connections where we failed to write the full request body are not reused.
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 7);
client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response client->request(HttpMethod::POST, kj::str("/foo"), HttpHeaders(headerTable), size_t(6)).response
.wait(io.waitScope).body->readAllBytes().wait(io.waitScope); .wait(io.waitScope).body->readAllBytes().wait(io.waitScope);
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 7);
#if __linux__ #if __linux__
// TODO(someday): Figure out why this doesn't work on Windows and is flakey on Mac. My guess is // TODO(someday): Figure out why this doesn't work on Windows and is flakey on Mac. My guess is
...@@ -2821,14 +2844,76 @@ KJ_TEST("HttpClient connection management") { ...@@ -2821,14 +2844,76 @@ KJ_TEST("HttpClient connection management") {
// If the server times out the connection, we figure it out on the client. // If the server times out the connection, we figure it out on the client.
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 8);
serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2); serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2);
io.waitScope.poll(); io.waitScope.poll();
KJ_EXPECT(count == 0); KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 8);
#else
++cumulative; // hack
#endif #endif
// Can still make requests. // Can still make requests.
doRequest().wait(io.waitScope); doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1); KJ_EXPECT(count == 1);
KJ_EXPECT(cumulative == 9);
}
KJ_TEST("HttpClient disable connection reuse") {
auto io = kj::setupAsyncIo();
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
auto listener = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
DummyService service(headerTable);
HttpServerSettings serverSettings;
HttpServer server(serverTimer, headerTable, service, serverSettings);
auto listenTask = server.listenHttp(*listener);
auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort())
.wait(io.waitScope);
uint count = 0;
uint cumulative = 0;
CountingNetworkAddress countingAddr(*addr, count, cumulative);
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
clientSettings.idleTimout = 0 * kj::SECONDS;
auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 0);
uint i = 0;
auto doRequest = [&]() {
uint n = i++;
return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n](kj::String body) {
KJ_EXPECT(body == kj::str("null:/", n));
});
};
// We can do several requests in a row and only have one connection.
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 3);
// But if we do two in parallel, we'll end up with two connections.
auto req1 = doRequest();
auto req2 = doRequest();
req1.wait(io.waitScope);
req2.wait(io.waitScope);
KJ_EXPECT(count == 0);
KJ_EXPECT(cumulative == 5);
} }
KJ_TEST("HttpClient multi host") { KJ_TEST("HttpClient multi host") {
......
...@@ -3516,8 +3516,9 @@ private: ...@@ -3516,8 +3516,9 @@ private:
} }
void returnClientToAvailable(kj::Own<HttpClientImpl> client) { void returnClientToAvailable(kj::Own<HttpClientImpl> client) {
// Only return the connection to the pool if it is reusable. // Only return the connection to the pool if it is reusable and if our settings indicate we
if (client->canReuse()) { // should reuse connections.
if (client->canReuse() && settings.idleTimout > 0 * kj::SECONDS) {
availableClients.push_back(AvailableClient { availableClients.push_back(AvailableClient {
kj::mv(client), timer.now() + settings.idleTimout kj::mv(client), timer.now() + settings.idleTimout
}); });
......
...@@ -583,7 +583,7 @@ public: ...@@ -583,7 +583,7 @@ public:
struct HttpClientSettings { struct HttpClientSettings {
kj::Duration idleTimout = 5 * kj::SECONDS; kj::Duration idleTimout = 5 * kj::SECONDS;
// For clients which automatically create new connections, any connection idle for at least this // For clients which automatically create new connections, any connection idle for at least this
// long will be closed. // long will be closed. Set this to 0 to prevent connection reuse entirely.
kj::Maybe<EntropySource&> entropySource = nullptr; kj::Maybe<EntropySource&> entropySource = nullptr;
// Must be provided in order to use `openWebSocket`. If you don't need WebSockets, this can be // Must be provided in order to use `openWebSocket`. If you don't need WebSockets, this can be
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment