Commit 0c482f58 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #557 from capnproto/http-client

Implement HttpClient that automatically manages connections. 
parents 99858df2 480e1f48
......@@ -136,8 +136,12 @@ class PromiseNode {
// internal implementation details.
public:
virtual void onReady(Event& event) noexcept = 0;
virtual void onReady(Event* event) noexcept = 0;
// Arms the given event when ready.
//
// May be called multiple times. If called again before the event was armed, the old event will
// never be armed, only the new one. If called again after the event was armed, the new event
// will be armed immediately. Can be called with nullptr to un-register the existing event.
virtual void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept;
// Tells the node that `selfPtr` is the pointer that owns this node, and will continue to own
......@@ -159,12 +163,11 @@ protected:
// Helper class for implementing onReady().
public:
void init(Event& newEvent);
// Returns true if arm() was already called.
void init(Event* newEvent);
void arm();
// Arms the event if init() has already been called and makes future calls to init() return
// true.
// Arms the event if init() has already been called and makes future calls to init()
// automatically arm the event.
private:
Event* event = nullptr;
......@@ -178,7 +181,7 @@ public:
ImmediatePromiseNodeBase();
~ImmediatePromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
};
template <typename T>
......@@ -212,7 +215,7 @@ class AttachmentPromiseNodeBase: public PromiseNode {
public:
AttachmentPromiseNodeBase(Own<PromiseNode>&& dependency);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -338,7 +341,7 @@ class TransformPromiseNodeBase: public PromiseNode {
public:
TransformPromiseNodeBase(Own<PromiseNode>&& dependency, void* continuationTracePtr);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -410,7 +413,7 @@ public:
// Called by the hub to indicate that it is ready.
// implements PromiseNode ------------------------------------------
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
PromiseNode* getInnerForTrace() override;
protected:
......@@ -545,7 +548,7 @@ public:
explicit ChainPromiseNode(Own<PromiseNode> inner);
~ChainPromiseNode() noexcept(false);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void setSelfPointer(Own<PromiseNode>* selfPtr) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -585,7 +588,7 @@ public:
ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right);
~ExclusiveJoinPromiseNode() noexcept(false);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
void get(ExceptionOrValue& output) noexcept override;
PromiseNode* getInnerForTrace() override;
......@@ -619,7 +622,7 @@ public:
ExceptionOrValue* resultParts, size_t partSize);
~ArrayJoinPromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override final;
void onReady(Event* event) noexcept override final;
void get(ExceptionOrValue& output) noexcept override final;
PromiseNode* getInnerForTrace() override final;
......@@ -698,7 +701,7 @@ class EagerPromiseNodeBase: public PromiseNode, protected Event {
public:
EagerPromiseNodeBase(Own<PromiseNode>&& dependency, ExceptionOrValue& resultRef);
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
PromiseNode* getInnerForTrace() override;
private:
......@@ -735,7 +738,7 @@ Own<PromiseNode> spark(Own<PromiseNode>&& node) {
class AdapterPromiseNodeBase: public PromiseNode {
public:
void onReady(Event& event) noexcept override;
void onReady(Event* event) noexcept override;
protected:
inline void setReady() {
......@@ -854,7 +857,7 @@ template <typename T>
T Promise<T>::wait(WaitScope& waitScope) {
_::ExceptionOr<_::FixVoid<T>> result;
waitImpl(kj::mv(node), result, waitScope);
_::waitImpl(kj::mv(node), result, waitScope);
KJ_IF_MAYBE(value, result.value) {
KJ_IF_MAYBE(exception, result.exception) {
......@@ -875,7 +878,7 @@ inline void Promise<void>::wait(WaitScope& waitScope) {
_::ExceptionOr<_::Void> result;
waitImpl(kj::mv(node), result, waitScope);
_::waitImpl(kj::mv(node), result, waitScope);
if (result.value != nullptr) {
KJ_IF_MAYBE(exception, result.exception) {
......@@ -889,6 +892,11 @@ inline void Promise<void>::wait(WaitScope& waitScope) {
}
}
template <typename T>
bool Promise<T>::poll(WaitScope& waitScope) {
return _::pollImpl(*node, waitScope);
}
template <typename T>
ForkedPromise<T> Promise<T>::fork() {
return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node)));
......
......@@ -199,6 +199,7 @@ private:
void detach(kj::Promise<void>&& promise);
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope);
bool pollImpl(_::PromiseNode& node, WaitScope& waitScope);
Promise<void> yield();
Own<PromiseNode> neverDone();
......
......@@ -748,5 +748,16 @@ TEST(Async, SetRunnable) {
}
}
TEST(Async, Poll) {
EventLoop loop;
WaitScope waitScope(loop);
auto paf = newPromiseAndFulfiller<void>();
KJ_ASSERT(!paf.promise.poll(waitScope));
paf.fulfiller->fulfill();
KJ_ASSERT(paf.promise.poll(waitScope));
paf.promise.wait(waitScope);
}
} // namespace
} // namespace kj
......@@ -66,8 +66,8 @@ public:
class YieldPromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event& event) noexcept override {
event.armBreadthFirst();
void onReady(_::Event* event) noexcept override {
if (event) event->armBreadthFirst();
}
void get(_::ExceptionOrValue& output) noexcept override {
output.as<_::Void>() = _::Void();
......@@ -76,7 +76,7 @@ public:
class NeverDonePromiseNode final: public _::PromiseNode {
public:
void onReady(_::Event& event) noexcept override {
void onReady(_::Event* event) noexcept override {
// ignore
}
void get(_::ExceptionOrValue& output) noexcept override {
......@@ -108,7 +108,7 @@ public:
Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam)
: taskSet(taskSet), node(kj::mv(nodeParam)) {
node->setSelfPointer(&node);
node->onReady(*this);
node->onReady(this);
}
protected:
......@@ -312,6 +312,26 @@ void EventLoop::leaveScope() {
threadLocalEventLoop = nullptr;
}
void WaitScope::poll() {
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
loop.running = true;
KJ_DEFER(loop.running = false);
for (;;) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.port.poll();
if (!loop.isRunnable()) {
// Still no events in the queue. We're done.
return;
}
}
}
}
namespace _ { // private
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope) {
......@@ -321,7 +341,7 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
BoolEvent doneEvent;
node->setSelfPointer(&node);
node->onReady(doneEvent);
node->onReady(&doneEvent);
loop.running = true;
KJ_DEFER(loop.running = false);
......@@ -343,6 +363,35 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
}
}
bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) {
EventLoop& loop = waitScope.loop;
KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
KJ_REQUIRE(!loop.running, "poll() is not allowed from within event callbacks.");
BoolEvent doneEvent;
node.onReady(&doneEvent);
loop.running = true;
KJ_DEFER(loop.running = false);
while (!doneEvent.fired) {
if (!loop.turn()) {
// No events in the queue. Poll for I/O.
loop.port.poll();
if (!doneEvent.fired && !loop.isRunnable()) {
// No progress. Give up.
node.onReady(nullptr);
loop.setRunnable(false);
return false;
}
}
}
loop.setRunnable(loop.isRunnable());
return true;
}
Promise<void> yield() {
return Promise<void>(false, kj::heap<YieldPromiseNode>());
}
......@@ -498,26 +547,28 @@ void PromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {}
PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; }
void PromiseNode::OnReadyEvent::init(Event& newEvent) {
void PromiseNode::OnReadyEvent::init(Event* newEvent) {
if (event == _kJ_ALREADY_READY) {
// A new continuation was added to a promise that was already ready. In this case, we schedule
// breadth-first, to make it difficult for applications to accidentally starve the event loop
// by repeatedly waiting on immediate promises.
newEvent.armBreadthFirst();
if (newEvent) newEvent->armBreadthFirst();
} else {
event = &newEvent;
event = newEvent;
}
}
void PromiseNode::OnReadyEvent::arm() {
if (event == nullptr) {
event = _kJ_ALREADY_READY;
} else {
KJ_ASSERT(event != _kJ_ALREADY_READY, "arm() should only be called once");
if (event != nullptr) {
// A promise resolved and an event is already waiting on it. In this case, arm in depth-first
// order so that the event runs immediately after the current one. This way, chained promises
// execute together for better cache locality and lower latency.
event->armDepthFirst();
}
event = _kJ_ALREADY_READY;
}
// -------------------------------------------------------------------
......@@ -525,8 +576,8 @@ void PromiseNode::OnReadyEvent::arm() {
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}
void ImmediatePromiseNodeBase::onReady(Event& event) noexcept {
event.armBreadthFirst();
void ImmediatePromiseNodeBase::onReady(Event* event) noexcept {
if (event) event->armBreadthFirst();
}
ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
......@@ -543,7 +594,7 @@ AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependen
dependency->setSelfPointer(&dependency);
}
void AttachmentPromiseNodeBase::onReady(Event& event) noexcept {
void AttachmentPromiseNodeBase::onReady(Event* event) noexcept {
dependency->onReady(event);
}
......@@ -567,7 +618,7 @@ TransformPromiseNodeBase::TransformPromiseNodeBase(
dependency->setSelfPointer(&dependency);
}
void TransformPromiseNodeBase::onReady(Event& event) noexcept {
void TransformPromiseNodeBase::onReady(Event* event) noexcept {
dependency->onReady(event);
}
......@@ -635,7 +686,7 @@ void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
}
}
void ForkBranchBase::onReady(Event& event) noexcept {
void ForkBranchBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -648,7 +699,7 @@ PromiseNode* ForkBranchBase::getInnerForTrace() {
ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef)
: inner(kj::mv(innerParam)), resultRef(resultRef) {
inner->setSelfPointer(&inner);
inner->onReady(*this);
inner->onReady(this);
}
Maybe<Own<Event>> ForkHubBase::fire() {
......@@ -682,16 +733,16 @@ _::PromiseNode* ForkHubBase::getInnerForTrace() {
ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam)
: state(STEP1), inner(kj::mv(innerParam)) {
inner->setSelfPointer(&inner);
inner->onReady(*this);
inner->onReady(this);
}
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
void ChainPromiseNode::onReady(Event& event) noexcept {
void ChainPromiseNode::onReady(Event* event) noexcept {
switch (state) {
case STEP1:
KJ_REQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
onReadyEvent = &event;
onReadyEvent = event;
return;
case STEP2:
inner->onReady(event);
......@@ -755,7 +806,7 @@ Maybe<Own<Event>> ChainPromiseNode::fire() {
*selfPtr = kj::mv(inner);
selfPtr->get()->setSelfPointer(selfPtr);
if (onReadyEvent != nullptr) {
selfPtr->get()->onReady(*onReadyEvent);
selfPtr->get()->onReady(onReadyEvent);
}
// Return our self-pointer so that the caller takes care of deleting it.
......@@ -763,7 +814,7 @@ Maybe<Own<Event>> ChainPromiseNode::fire() {
} else {
inner->setSelfPointer(&inner);
if (onReadyEvent != nullptr) {
inner->onReady(*onReadyEvent);
inner->onReady(onReadyEvent);
}
return nullptr;
......@@ -777,7 +828,7 @@ ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<Pr
ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}
void ExclusiveJoinPromiseNode::onReady(Event& event) noexcept {
void ExclusiveJoinPromiseNode::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -797,7 +848,7 @@ ExclusiveJoinPromiseNode::Branch::Branch(
ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam)
: joinNode(joinNode), dependency(kj::mv(dependencyParam)) {
dependency->setSelfPointer(&dependency);
dependency->onReady(*this);
dependency->onReady(this);
}
ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {}
......@@ -847,7 +898,7 @@ ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase(
}
ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}
void ArrayJoinPromiseNodeBase::onReady(Event& event) noexcept {
void ArrayJoinPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -873,7 +924,7 @@ ArrayJoinPromiseNodeBase::Branch::Branch(
ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output)
: joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
dependency->setSelfPointer(&dependency);
dependency->onReady(*this);
dependency->onReady(this);
}
ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}
......@@ -921,10 +972,10 @@ EagerPromiseNodeBase::EagerPromiseNodeBase(
Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
: dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
dependency->setSelfPointer(&dependency);
dependency->onReady(*this);
dependency->onReady(this);
}
void EagerPromiseNodeBase::onReady(Event& event) noexcept {
void EagerPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......@@ -946,7 +997,7 @@ Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
// -------------------------------------------------------------------
void AdapterPromiseNodeBase::onReady(Event& event) noexcept {
void AdapterPromiseNodeBase::onReady(Event* event) noexcept {
onReadyEvent.init(event);
}
......
......@@ -235,6 +235,19 @@ public:
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event.
bool poll(WaitScope& waitScope);
// Returns true if a call to wait() would complete without blocking, false if it would block.
//
// If the promise is not yet resolved, poll() will pump the event loop and poll for I/O in an
// attempt to resolve it. Only when there is nothing left to do will it return false.
//
// Generally, poll() is most useful in tests. Often, you may want to verify that a promise does
// not resolve until some specific event occurs. To do so, poll() the promise before the event to
// verify it isn't resolved, then trigger the event, then poll() again to verify that it resolves.
// The first poll() verifies that the promise doesn't resolve early, which would otherwise be
// hard to do deterministically. The second poll() allows you to check that the promise has
// resolved and avoid a wait() that might deadlock in the case that it hasn't.
ForkedPromise<T> fork() KJ_WARN_UNUSED_RESULT;
// Forks the promise, so that multiple different clients can independently wait on the result.
// `T` must be copy-constructable for this to work. Or, in the special case where `T` is
......@@ -650,6 +663,7 @@ private:
friend void _::detach(kj::Promise<void>&& promise);
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope);
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
friend class _::Event;
friend class WaitScope;
};
......@@ -668,11 +682,15 @@ public:
inline ~WaitScope() { loop.leaveScope(); }
KJ_DISALLOW_COPY(WaitScope);
void poll();
// Pumps the event queue and polls for I/O until there's nothing left to do (without blocking).
private:
EventLoop& loop;
friend class EventLoop;
friend void _::waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result,
WaitScope& waitScope);
friend bool _::pollImpl(_::PromiseNode& node, WaitScope& waitScope);
};
} // namespace kj
......
......@@ -2355,6 +2355,340 @@ KJ_TEST("newHttpService from HttpClient WebSockets disconnect") {
// -----------------------------------------------------------------------------
class CountingIoStream final: public kj::AsyncIoStream {
// An AsyncIoStream wrapper which decrements a counter when destroyed (allowing us to count how
// many connections are open).
public:
CountingIoStream(kj::Own<kj::AsyncIoStream> inner, uint& count)
: inner(kj::mv(inner)), count(count) {}
~CountingIoStream() noexcept(false) {
--count;
}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();;
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return inner->pumpTo(output, amount);
}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return inner->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return inner->tryPumpFrom(input, amount);
}
void shutdownWrite() override {
return inner->shutdownWrite();
}
void abortRead() override {
return inner->abortRead();
}
public:
kj::Own<AsyncIoStream> inner;
uint& count;
};
class CountingNetworkAddress final: public kj::NetworkAddress {
public:
CountingNetworkAddress(kj::NetworkAddress& inner, uint& count)
: inner(inner), count(count), addrCount(ownAddrCount) {}
CountingNetworkAddress(kj::Own<kj::NetworkAddress> inner, uint& count, uint& addrCount)
: inner(*inner), ownInner(kj::mv(inner)), count(count), addrCount(addrCount) {}
~CountingNetworkAddress() noexcept(false) {
--addrCount;
}
kj::Promise<kj::Own<kj::AsyncIoStream>> connect() override {
++count;
return inner.connect()
.then([this](kj::Own<kj::AsyncIoStream> stream) -> kj::Own<kj::AsyncIoStream> {
return kj::heap<CountingIoStream>(kj::mv(stream), count);
});
}
kj::Own<kj::ConnectionReceiver> listen() override { KJ_UNIMPLEMENTED("test"); }
kj::Own<kj::NetworkAddress> clone() override { KJ_UNIMPLEMENTED("test"); }
kj::String toString() override { KJ_UNIMPLEMENTED("test"); }
private:
kj::NetworkAddress& inner;
kj::Own<kj::NetworkAddress> ownInner;
uint& count;
uint ownAddrCount = 1;
uint& addrCount;
};
class ConnectionCountingNetwork final: public kj::Network {
public:
ConnectionCountingNetwork(kj::Network& inner, uint& count, uint& addrCount)
: inner(inner), count(count), addrCount(addrCount) {}
Promise<Own<NetworkAddress>> parseAddress(StringPtr addr, uint portHint = 0) override {
++addrCount;
return inner.parseAddress(addr, portHint)
.then([this](Own<NetworkAddress>&& addr) -> Own<NetworkAddress> {
return kj::heap<CountingNetworkAddress>(kj::mv(addr), count, addrCount);
});
}
Own<NetworkAddress> getSockaddr(const void* sockaddr, uint len) override {
KJ_UNIMPLEMENTED("test");
}
Own<Network> restrictPeers(
kj::ArrayPtr<const kj::StringPtr> allow,
kj::ArrayPtr<const kj::StringPtr> deny = nullptr) override {
KJ_UNIMPLEMENTED("test");
}
private:
kj::Network& inner;
uint& count;
uint& addrCount;
};
class DummyService final: public HttpService {
public:
DummyService(HttpHeaderTable& headerTable): headerTable(headerTable) {}
kj::Promise<void> request(
HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::AsyncInputStream& requestBody, Response& response) override {
KJ_ASSERT(url != "/throw");
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto stream = response.send(200, "OK", HttpHeaders(headerTable), body.size());
auto promise = stream->write(body.begin(), body.size());
return promise.attach(kj::mv(stream), kj::mv(body));
}
kj::Promise<void> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers, WebSocketResponse& response) override {
auto ws = response.acceptWebSocket(HttpHeaders(headerTable));
auto body = kj::str(headers.get(HttpHeaderId::HOST).orDefault("null"), ":", url);
auto sendPromise = ws->send(body);
auto promises = kj::heapArrayBuilder<kj::Promise<void>>(2);
promises.add(sendPromise.attach(kj::mv(body)));
promises.add(ws->receive().ignoreResult());
return kj::joinPromises(promises.finish()).attach(kj::mv(ws));
}
private:
HttpHeaderTable& headerTable;
};
KJ_TEST("HttpClient connection management") {
auto io = kj::setupAsyncIo();
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
auto listener = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
DummyService service(headerTable);
HttpServerSettings serverSettings;
HttpServer server(serverTimer, headerTable, service, serverSettings);
auto listenTask = server.listenHttp(*listener);
auto addr = io.provider->getNetwork().parseAddress("localhost", listener->getPort())
.wait(io.waitScope);
uint count = 0;
CountingNetworkAddress countingAddr(*addr, count);
FakeEntropySource entropySource;
HttpClientSettings clientSettings;
clientSettings.entropySource = entropySource;
auto client = newHttpClient(clientTimer, headerTable, countingAddr, clientSettings);
KJ_EXPECT(count == 0);
uint i = 0;
auto doRequest = [&]() {
uint n = i++;
return client->request(HttpMethod::GET, kj::str("/", n), HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n](kj::String body) {
KJ_EXPECT(body == kj::str("null:/", n));
});
};
// We can do several requests in a row and only have one connection.
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
// But if we do two in parallel, we'll end up with two connections.
auto req1 = doRequest();
auto req2 = doRequest();
req1.wait(io.waitScope);
req2.wait(io.waitScope);
KJ_EXPECT(count == 2);
// Advance time for half the timeout, then exercise one of the connections.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
doRequest().wait(io.waitScope);
doRequest().wait(io.waitScope);
io.waitScope.poll();
KJ_EXPECT(count == 2);
// Advance time past when the other connection should time out. It should be dropped.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 3 / 4);
io.waitScope.poll();
KJ_EXPECT(count == 1);
// Wait for the other to drop.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout / 2);
io.waitScope.poll();
KJ_EXPECT(count == 0);
// New request creates a new connection again.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
// WebSocket connections are not reused.
client->openWebSocket(kj::str("/websocket"), HttpHeaders(headerTable))
.wait(io.waitScope);
KJ_EXPECT(count == 0);
// Errored connections are not reused.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
client->request(HttpMethod::GET, kj::str("/throw"), HttpHeaders(headerTable)).response
.wait(io.waitScope);
KJ_EXPECT(count == 0);
#if !_WIN32 // TODO(soon): Figure out why this doesn't work on Windows. Probably a bug in
// Win32IocpEventPort::poll().
// If the server times out the connection, we figure it out on the client.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
serverTimer.advanceTo(serverTimer.now() + serverSettings.pipelineTimeout * 2);
io.waitScope.poll();
KJ_EXPECT(count == 0);
#endif
// Can still make requests.
doRequest().wait(io.waitScope);
KJ_EXPECT(count == 1);
}
KJ_TEST("HttpClient multi host") {
auto io = kj::setupAsyncIo();
kj::TimerImpl serverTimer(kj::origin<kj::TimePoint>());
kj::TimerImpl clientTimer(kj::origin<kj::TimePoint>());
HttpHeaderTable headerTable;
auto listener1 = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
auto listener2 = io.provider->getNetwork().parseAddress("localhost", 0)
.wait(io.waitScope)->listen();
DummyService service(headerTable);
HttpServer server(serverTimer, headerTable, service);
auto listenTask1 = server.listenHttp(*listener1);
auto listenTask2 = server.listenHttp(*listener2);
uint count = 0, addrCount = 0;
uint tlsCount = 0, tlsAddrCount = 0;
ConnectionCountingNetwork countingNetwork(io.provider->getNetwork(), count, addrCount);
ConnectionCountingNetwork countingTlsNetwork(io.provider->getNetwork(), tlsCount, tlsAddrCount);
HttpClientSettings clientSettings;
auto client = newHttpClient(clientTimer, headerTable,
countingNetwork, countingTlsNetwork, clientSettings);
KJ_EXPECT(count == 0);
uint i = 0;
auto doRequest = [&](bool tls, uint port) {
uint n = i++;
return client->request(HttpMethod::GET,
kj::str((tls ? "https://localhost:" : "http://localhost:"), port, '/', n),
HttpHeaders(headerTable)).response
.then([](HttpClient::Response&& response) {
auto promise = response.body->readAllText();
return promise.attach(kj::mv(response.body));
}).then([n, port](kj::String body) {
KJ_EXPECT(body == kj::str("localhost:", port, ":/", n), body, port, n);
});
};
uint port1 = listener1->getPort();
uint port2 = listener2->getPort();
// We can do several requests in a row to the same host and only have one connection.
doRequest(false, port1).wait(io.waitScope);
doRequest(false, port1).wait(io.waitScope);
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 1);
KJ_EXPECT(tlsAddrCount == 0);
// Request a different host, and now we have two connections.
doRequest(false, port2).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 0);
// Try TLS.
doRequest(true, port1).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Try first host again, no change in connection count.
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 2);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Multiple requests in parallel forces more connections to that host.
auto promise1 = doRequest(false, port1);
auto promise2 = doRequest(false, port1);
promise1.wait(io.waitScope);
promise2.wait(io.waitScope);
KJ_EXPECT(count == 3);
KJ_EXPECT(tlsCount == 1);
KJ_EXPECT(addrCount == 2);
KJ_EXPECT(tlsAddrCount == 1);
// Let everything expire.
clientTimer.advanceTo(clientTimer.now() + clientSettings.idleTimout * 2);
io.waitScope.poll();
KJ_EXPECT(count == 0);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 0);
KJ_EXPECT(tlsAddrCount == 0);
// We can still request those hosts again.
doRequest(false, port1).wait(io.waitScope);
KJ_EXPECT(count == 1);
KJ_EXPECT(tlsCount == 0);
KJ_EXPECT(addrCount == 1);
KJ_EXPECT(tlsAddrCount == 0);
}
// -----------------------------------------------------------------------------
KJ_TEST("HttpClient to capnproto.org") {
auto io = kj::setupAsyncIo();
......
......@@ -20,11 +20,14 @@
// THE SOFTWARE.
#include "http.h"
#include "url.h"
#include <kj/debug.h>
#include <kj/parse/char.h>
#include <unordered_map>
#include <stdlib.h>
#include <kj/encoding.h>
#include <deque>
#include <map>
namespace kj {
......@@ -1046,8 +1049,21 @@ public:
// ---------------------------------------------------------------------------
kj::Promise<bool> awaitNextMessage() {
// Waits until more data is available, but doesn't consume it. Only meant for server-side use,
// after a request is handled, to check for pipelined requests. Returns false on EOF.
// Waits until more data is available, but doesn't consume it. Returns false on EOF.
//
// Used on the server after a request is handled, to check for pipelined requests.
//
// Used on the client to detect when idle connections are closed from the server end. (In this
// case, the promise always returns false or is canceled.)
if (onMessageDone != nullptr) {
// We're still working on reading the previous body.
auto fork = messageReadQueue.fork();
messageReadQueue = fork.addBranch();
return fork.addBranch().then([this]() {
return awaitNextMessage();
});
}
// Slightly-crappy code to snarf the expected line break. This will actually eat the leading
// regex /\r*\n?/.
......@@ -2408,17 +2424,26 @@ namespace {
class HttpClientImpl final: public HttpClient {
public:
HttpClientImpl(HttpHeaderTable& responseHeaderTable, kj::Own<kj::AsyncIoStream> rawStream,
kj::Maybe<EntropySource&> entropySource)
HttpClientSettings settings)
: httpInput(*rawStream, responseHeaderTable),
httpOutput(*rawStream),
ownStream(kj::mv(rawStream)),
entropySource(entropySource) {}
settings(kj::mv(settings)) {}
bool canReuse() {
// Returns true if
return !upgraded && !closed;
}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
KJ_REQUIRE(!upgraded,
"can't make further requests on this HttpClient because it has been or is in the process "
"of being upgraded");
KJ_REQUIRE(!closed,
"this HttpClient's connection has been closed by the server or due to an error");
closeWatcherTask = nullptr;
HttpHeaders::ConnectionHeaders connectionHeaders;
kj::String lengthStr;
......@@ -2448,14 +2473,21 @@ public:
auto responsePromise = httpInput.readResponseHeaders()
.then([this,method](kj::Maybe<HttpHeaders::Response>&& response) -> HttpClient::Response {
KJ_IF_MAYBE(r, response) {
return {
HttpClient::Response result {
r->statusCode,
r->statusText,
&httpInput.getHeaders(),
httpInput.getEntityBody(HttpInputStream::RESPONSE, method, r->statusCode,
r->connectionHeaders)
};
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(r->connectionHeaders.connection.cStr())) {
closed = true;
} else {
watchForClose();
}
return result;
} else {
closed = true;
KJ_FAIL_REQUIRE("received invalid HTTP response") { break; }
return HttpClient::Response();
}
......@@ -2469,13 +2501,16 @@ public:
KJ_REQUIRE(!upgraded,
"can't make further requests on this HttpClient because it has been or is in the process "
"of being upgraded");
KJ_REQUIRE(!closed,
"this HttpClient's connection has been closed by the server or due to an error");
closeWatcherTask = nullptr;
// Mark upgraded for now, even though the upgrade could fail, because we can't allow pipelined
// requests in the meantime.
upgraded = true;
byte keyBytes[16];
KJ_ASSERT_NONNULL(this->entropySource,
KJ_ASSERT_NONNULL(settings.entropySource,
"can't use openWebSocket() because no EntropySource was provided when creating the "
"HttpClient").generate(keyBytes);
auto keyBase64 = kj::encodeBase64(keyBytes);
......@@ -2515,17 +2550,23 @@ public:
r->statusCode,
r->statusText,
&httpInput.getHeaders(),
upgradeToWebSocket(kj::mv(ownStream), httpInput, httpOutput, entropySource),
upgradeToWebSocket(kj::mv(ownStream), httpInput, httpOutput, settings.entropySource),
};
} else {
upgraded = false;
return {
HttpClient::WebSocketResponse result {
r->statusCode,
r->statusText,
&httpInput.getHeaders(),
httpInput.getEntityBody(HttpInputStream::RESPONSE, HttpMethod::GET, r->statusCode,
r->connectionHeaders)
};
if (fastCaseCmp<'c', 'l', 'o', 's', 'e'>(r->connectionHeaders.connection.cStr())) {
closed = true;
} else {
watchForClose();
}
return result;
}
} else {
KJ_FAIL_REQUIRE("received invalid HTTP response") { break; }
......@@ -2538,8 +2579,29 @@ private:
HttpInputStream httpInput;
HttpOutputStream httpOutput;
kj::Own<AsyncIoStream> ownStream;
kj::Maybe<EntropySource&> entropySource;
HttpClientSettings settings;
kj::Maybe<kj::Promise<void>> closeWatcherTask;
bool upgraded = false;
bool closed = false;
void watchForClose() {
closeWatcherTask = httpInput.awaitNextMessage().then([this](bool hasData) {
if (hasData) {
// Uhh... The server sent some data before we asked for anything. Perhaps due to properties
// of this application, the server somehow already knows what the next request will be, and
// it is trying to optimize. Or maybe this is some sort of test and the server is just
// replaying a script. In any case, we will humor it -- leave the data in the buffer and
// let it become the response to the next request.
} else {
// EOF -- server disconnected.
// Proactively free up the socket.
ownStream = nullptr;
closed = true;
}
}).eagerlyEvaluate(nullptr);
}
};
} // namespace
......@@ -2566,10 +2628,579 @@ kj::Promise<kj::Own<kj::AsyncIoStream>> HttpClient::connect(kj::StringPtr host)
kj::Own<HttpClient> newHttpClient(
HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource) {
HttpClientSettings settings) {
return kj::heap<HttpClientImpl>(responseHeaderTable,
kj::Own<kj::AsyncIoStream>(&stream, kj::NullDisposer::instance),
entropySource);
kj::mv(settings));
}
// =======================================================================================
namespace {
class PromiseIoStream final: public kj::AsyncIoStream, private kj::TaskSet::ErrorHandler {
// An AsyncIoStream which waits for a promise to resolve then forwards all calls to the promised
// stream.
//
// TODO(cleanup): Make this more broadly available.
public:
PromiseIoStream(kj::Promise<kj::Own<AsyncIoStream>> promise)
: promise(promise.then([this](kj::Own<AsyncIoStream> result) {
stream = kj::mv(result);
}).fork()),
tasks(*this) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->read(buffer, minBytes, maxBytes);
} else {
return promise.addBranch().then([this,buffer,minBytes,maxBytes]() {
return KJ_ASSERT_NONNULL(stream)->read(buffer, minBytes, maxBytes);
});
}
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryRead(buffer, minBytes, maxBytes);
} else {
return promise.addBranch().then([this,buffer,minBytes,maxBytes]() {
return KJ_ASSERT_NONNULL(stream)->tryRead(buffer, minBytes, maxBytes);
});
}
}
kj::Maybe<uint64_t> tryGetLength() override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryGetLength();
} else {
return nullptr;
}
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->pumpTo(output, amount);
} else {
return promise.addBranch().then([this,&output,amount]() {
return KJ_ASSERT_NONNULL(stream)->pumpTo(output, amount);
});
}
}
kj::Promise<void> write(const void* buffer, size_t size) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(buffer, size);
} else {
return promise.addBranch().then([this,buffer,size]() {
return KJ_ASSERT_NONNULL(stream)->write(buffer, size);
});
}
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(pieces);
} else {
return promise.addBranch().then([this,pieces]() {
return KJ_ASSERT_NONNULL(stream)->write(pieces);
});
}
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryPumpFrom(input, amount);
} else {
return promise.addBranch().then([this,&input,amount]() {
// Call input.pumpTo() on the resolved stream instead.
return input.pumpTo(*KJ_ASSERT_NONNULL(stream), amount);
});
}
}
void shutdownWrite() override {
KJ_IF_MAYBE(s, stream) {
return s->get()->shutdownWrite();
} else {
tasks.add(promise.addBranch().then([this]() {
return KJ_ASSERT_NONNULL(stream)->shutdownWrite();
}));
}
}
void abortRead() override {
KJ_IF_MAYBE(s, stream) {
return s->get()->abortRead();
} else {
tasks.add(promise.addBranch().then([this]() {
return KJ_ASSERT_NONNULL(stream)->abortRead();
}));
}
}
public:
kj::ForkedPromise<void> promise;
kj::Maybe<kj::Own<AsyncIoStream>> stream;
kj::TaskSet tasks;
void taskFailed(kj::Exception&& exception) override {
KJ_LOG(ERROR, exception);
}
};
class PromiseOutputStream final: public kj::AsyncOutputStream {
// An AsyncOutputStream which waits for a promise to resolve then forwards all calls to the
// promised stream.
//
// TODO(cleanup): Make this more broadly available.
// TODO(cleanup): Can this share implementation with PromiseIoStream? Seems hard.
public:
PromiseOutputStream(kj::Promise<kj::Own<AsyncOutputStream>> promise)
: promise(promise.then([this](kj::Own<AsyncOutputStream> result) {
stream = kj::mv(result);
}).fork()) {}
kj::Promise<void> write(const void* buffer, size_t size) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(buffer, size);
} else {
return promise.addBranch().then([this,buffer,size]() {
return KJ_ASSERT_NONNULL(stream)->write(buffer, size);
});
}
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->write(pieces);
} else {
return promise.addBranch().then([this,pieces]() {
return KJ_ASSERT_NONNULL(stream)->write(pieces);
});
}
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
KJ_IF_MAYBE(s, stream) {
return s->get()->tryPumpFrom(input, amount);
} else {
return promise.addBranch().then([this,&input,amount]() {
// Call input.pumpTo() on the resolved stream instead.
return input.pumpTo(*KJ_ASSERT_NONNULL(stream), amount);
});
}
}
public:
kj::ForkedPromise<void> promise;
kj::Maybe<kj::Own<AsyncOutputStream>> stream;
};
class AttachmentOutputStream final: public kj::AsyncOutputStream {
// An AsyncOutputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentOutputStream(kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return inner->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return input.pumpTo(*inner, amount);
}
private:
kj::Own<kj::AsyncOutputStream> inner;
kj::Own<kj::Refcounted> attachment;
};
class AttachmentInputStream final: public kj::AsyncInputStream {
// An AsyncInputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentInputStream(kj::Own<kj::AsyncInputStream> inner, kj::Own<kj::Refcounted> attachment)
: inner(kj::mv(inner)), attachment(kj::mv(attachment)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return inner->pumpTo(output, amount);
}
private:
kj::Own<kj::AsyncInputStream> inner;
kj::Own<kj::Refcounted> attachment;
};
class NetworkAddressHttpClient final: public HttpClient {
public:
NetworkAddressHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Own<kj::NetworkAddress> address, HttpClientSettings settings)
: timer(timer),
responseHeaderTable(responseHeaderTable),
address(kj::mv(address)),
settings(kj::mv(settings)) {}
bool isDrained() {
// Returns true if there are no open connections.
return activeConnectionCount == 0 && availableClients.empty();
}
kj::Promise<void> onDrained() {
// Returns a promise which resolves the next time isDrained() transitions from false to true.
auto paf = kj::newPromiseAndFulfiller<void>();
drainedFulfiller = kj::mv(paf.fulfiller);
return kj::mv(paf.promise);
}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
auto refcounted = getClient();
auto result = refcounted->client->request(method, url, headers, expectedBodySize);
result.body = kj::heap<AttachmentOutputStream>(kj::mv(result.body), kj::addRef(*refcounted));
result.response = result.response.then(kj::mvCapture(refcounted,
[](kj::Own<RefcountedClient>&& refcounted, Response&& response) {
response.body = kj::heap<AttachmentInputStream>(kj::mv(response.body), kj::mv(refcounted));
return kj::mv(response);
}));
return result;
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
auto refcounted = getClient();
auto result = refcounted->client->openWebSocket(url, headers);
return result.then(kj::mvCapture(refcounted,
[](kj::Own<RefcountedClient>&& refcounted, WebSocketResponse&& response) {
KJ_SWITCH_ONEOF(response.webSocketOrBody) {
KJ_CASE_ONEOF(body, kj::Own<kj::AsyncInputStream>) {
response.webSocketOrBody.init<kj::Own<kj::AsyncInputStream>>(
kj::heap<AttachmentInputStream>(kj::mv(body), kj::mv(refcounted)));
}
KJ_CASE_ONEOF(ws, kj::Own<WebSocket>) {
// We actually don't need to attach the HttpClient to the WebSocket -- our HttpClient
// implementation transfers ownership of the connection into the WebSocket.
}
}
return kj::mv(response);
}));
}
private:
kj::Timer& timer;
HttpHeaderTable& responseHeaderTable;
kj::Own<kj::NetworkAddress> address;
HttpClientSettings settings;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> drainedFulfiller;
uint activeConnectionCount = 0;
bool timeoutsScheduled = false;
kj::Promise<void> timeoutTask = nullptr;
struct AvailableClient {
kj::Own<HttpClientImpl> client;
kj::TimePoint expires;
};
std::deque<AvailableClient> availableClients;
struct RefcountedClient final: public kj::Refcounted {
RefcountedClient(NetworkAddressHttpClient& parent, kj::Own<HttpClientImpl> client)
: parent(parent), client(kj::mv(client)) {
++parent.activeConnectionCount;
}
~RefcountedClient() noexcept(false) {
--parent.activeConnectionCount;
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
parent.returnClientToAvailable(kj::mv(client));
})) {
KJ_LOG(ERROR, *exception);
}
}
NetworkAddressHttpClient& parent;
kj::Own<HttpClientImpl> client;
};
kj::Own<RefcountedClient> getClient() {
for (;;) {
if (availableClients.empty()) {
auto stream = kj::heap<PromiseIoStream>(address->connect());
return kj::refcounted<RefcountedClient>(*this,
kj::heap<HttpClientImpl>(responseHeaderTable, kj::mv(stream), settings));
} else {
auto client = kj::mv(availableClients.back().client);
availableClients.pop_back();
if (client->canReuse()) {
return kj::refcounted<RefcountedClient>(*this, kj::mv(client));
}
// Whoops, this client's connection was closed by the server at some point. Discard.
}
}
}
void returnClientToAvailable(kj::Own<HttpClientImpl> client) {
// Only return the connection to the pool if it is reusable.
if (client->canReuse()) {
availableClients.push_back(AvailableClient {
kj::mv(client), timer.now() + settings.idleTimout
});
}
// Call this either way because it also signals onDrained().
if (!timeoutsScheduled) {
timeoutsScheduled = true;
timeoutTask = applyTimeouts();
}
}
kj::Promise<void> applyTimeouts() {
if (availableClients.empty()) {
timeoutsScheduled = false;
if (activeConnectionCount == 0) {
KJ_IF_MAYBE(f, drainedFulfiller) {
f->get()->fulfill();
drainedFulfiller = nullptr;
}
}
return kj::READY_NOW;
} else {
auto time = availableClients.front().expires;
return timer.atTime(time).then([this,time]() {
while (!availableClients.empty() && availableClients.front().expires <= time) {
availableClients.pop_front();
}
return applyTimeouts();
});
}
}
};
class PromiseNetworkAddressHttpClient final: public HttpClient {
// An HttpClient which waits for a promise to resolve then forwards all calls to the promised
// client.
public:
PromiseNetworkAddressHttpClient(kj::Promise<kj::Own<NetworkAddressHttpClient>> promise)
: promise(promise.then([this](kj::Own<NetworkAddressHttpClient>&& client) {
this->client = kj::mv(client);
}).fork()) {}
bool isDrained() {
KJ_IF_MAYBE(c, client) {
return c->get()->isDrained();
} else {
return false;
}
}
kj::Promise<void> onDrained() {
KJ_IF_MAYBE(c, client) {
return c->get()->onDrained();
} else {
return promise.addBranch().then([this]() {
return KJ_ASSERT_NONNULL(client)->onDrained();
}, [](kj::Exception&& e) {
// Connecting failed. Treat as immediately drained.
return kj::READY_NOW;
});
}
}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
KJ_IF_MAYBE(c, client) {
return c->get()->request(method, url, headers, expectedBodySize);
} else {
// This gets complicated since request() returns a pair of a stream and a promise.
auto urlCopy = kj::str(url);
auto headersCopy = headers.clone();
auto combined = promise.addBranch().then(kj::mvCapture(urlCopy, kj::mvCapture(headersCopy,
[this,method,expectedBodySize](HttpHeaders&& headers, kj::String&& url)
-> kj::Tuple<kj::Own<kj::AsyncOutputStream>, kj::Promise<Response>> {
auto req = KJ_ASSERT_NONNULL(client)->request(method, url, headers, expectedBodySize);
return kj::tuple(kj::mv(req.body), kj::mv(req.response));
})));
auto split = combined.split();
return {
kj::heap<PromiseOutputStream>(kj::mv(kj::get<0>(split))),
kj::mv(kj::get<1>(split))
};
}
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
KJ_IF_MAYBE(c, client) {
return c->get()->openWebSocket(url, headers);
} else {
auto urlCopy = kj::str(url);
auto headersCopy = headers.clone();
return promise.addBranch().then(kj::mvCapture(urlCopy, kj::mvCapture(headersCopy,
[this](HttpHeaders&& headers, kj::String&& url) {
return KJ_ASSERT_NONNULL(client)->openWebSocket(url, headers);
})));
}
}
private:
kj::ForkedPromise<void> promise;
kj::Maybe<kj::Own<NetworkAddressHttpClient>> client;
};
class NetworkHttpClient final: public HttpClient, private kj::TaskSet::ErrorHandler {
public:
NetworkHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Network& network, kj::Maybe<kj::Network&> tlsNetwork,
HttpClientSettings settings)
: timer(timer),
responseHeaderTable(responseHeaderTable),
network(network),
tlsNetwork(tlsNetwork),
settings(kj::mv(settings)),
tasks(*this) {}
Request request(HttpMethod method, kj::StringPtr url, const HttpHeaders& headers,
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
auto parsed = Url::parse(url, Url::HTTP_PROXY_REQUEST);
auto path = parsed.toString(Url::HTTP_REQUEST);
auto headersCopy = headers.clone();
headersCopy.set(HttpHeaderId::HOST, parsed.host);
return getClient(parsed).request(method, path, headersCopy, expectedBodySize);
}
kj::Promise<WebSocketResponse> openWebSocket(
kj::StringPtr url, const HttpHeaders& headers) override {
auto parsed = Url::parse(url, Url::HTTP_PROXY_REQUEST);
auto path = parsed.toString(Url::HTTP_REQUEST);
auto headersCopy = headers.clone();
headersCopy.set(HttpHeaderId::HOST, parsed.host);
return getClient(parsed).openWebSocket(path, headersCopy);
}
private:
kj::Timer& timer;
HttpHeaderTable& responseHeaderTable;
kj::Network& network;
kj::Maybe<kj::Network&> tlsNetwork;
HttpClientSettings settings;
struct Host {
kj::String name; // including port, if non-default
kj::Own<PromiseNetworkAddressHttpClient> client;
};
std::map<kj::StringPtr, Host> httpHosts;
std::map<kj::StringPtr, Host> httpsHosts;
struct RequestInfo {
HttpMethod method;
kj::String hostname;
kj::String path;
HttpHeaders headers;
kj::Maybe<uint64_t> expectedBodySize;
};
kj::TaskSet tasks;
HttpClient& getClient(kj::Url& parsed) {
bool isHttps = parsed.scheme == "https";
bool isHttp = parsed.scheme == "http";
KJ_REQUIRE(isHttp || isHttps);
auto& hosts = isHttps ? httpsHosts : httpHosts;
// Look for a cached client for this host.
// TODO(perf): It would be nice to recognize when different hosts have the same address and
// reuse the same connection pool, but:
// - We'd need a reliable way to compare NetworkAddresses, e.g. .equals() and .hashCode().
// It's very Java... ick.
// - Correctly handling TLS would be tricky: we'd need to verify that the new hostname is
// on the certificate. When SNI is in use we might have to request an additional
// certificate (is that possible?).
auto iter = hosts.find(parsed.host);
if (iter == hosts.end()) {
// Need to open a new connection.
kj::Network* networkToUse = &network;
if (isHttps) {
networkToUse = &KJ_REQUIRE_NONNULL(tlsNetwork, "this HttpClient doesn't support HTTPS");
}
auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
auto promise = networkToUse->parseAddress(parsed.host, isHttps ? 443 : 80)
.then([this](kj::Own<kj::NetworkAddress> addr) {
return kj::heap<NetworkAddressHttpClient>(
timer, responseHeaderTable, kj::mv(addr), settings);
});
Host host {
kj::mv(parsed.host),
kj::heap<PromiseNetworkAddressHttpClient>(kj::mv(promise))
};
kj::StringPtr nameRef = host.name;
auto insertResult = hosts.insert(std::make_pair(nameRef, kj::mv(host)));
KJ_ASSERT(insertResult.second);
iter = insertResult.first;
tasks.add(handleCleanup(hosts, iter));
}
return *iter->second.client;
}
kj::Promise<void> handleCleanup(std::map<kj::StringPtr, Host>& hosts,
std::map<kj::StringPtr, Host>::iterator iter) {
return iter->second.client->onDrained()
.then([this,&hosts,iter]() -> kj::Promise<void> {
// Double-check that it's really drained to avoid race conditions.
if (iter->second.client->isDrained()) {
hosts.erase(iter);
return kj::READY_NOW;
} else {
return handleCleanup(hosts, iter);
}
});
}
void taskFailed(kj::Exception&& exception) override {
KJ_LOG(ERROR, exception);
}
};
} // namespace
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::NetworkAddress& addr, HttpClientSettings settings) {
return kj::heap<NetworkAddressHttpClient>(timer, responseHeaderTable,
kj::Own<kj::NetworkAddress>(&addr, kj::NullDisposer::instance), kj::mv(settings));
}
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Network& network, kj::Maybe<kj::Network&> tlsNetwork,
HttpClientSettings settings) {
return kj::heap<NetworkHttpClient>(
timer, responseHeaderTable, network, tlsNetwork, kj::mv(settings));
}
// =======================================================================================
......
......@@ -562,26 +562,53 @@ public:
// UNIMPLEMENTED.
};
kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::Network& network,
kj::Maybe<kj::Network&> tlsNetwork = nullptr,
kj::Maybe<EntropySource&> entropySource = nullptr);
// Creates a proxy HttpClient that connects to hosts over the given network.
struct HttpClientSettings {
kj::Duration idleTimout = 5 * kj::SECONDS;
// For clients which automatically create new connections, any connection idle for at least this
// long will be closed.
kj::Maybe<EntropySource&> entropySource = nullptr;
// Must be provided in order to use `openWebSocket`. If you don't need WebSockets, this can be
// omitted. The WebSocket protocol uses random values to avoid triggering flaws (including
// security flaws) in certain HTTP proxy software. Specifically, entropy is used to generate the
// `Sec-WebSocket-Key` header and to generate frame masks. If you know that there are no broken
// or vulnerable proxies between you and the server, you can provide a dummy entropy source that
// doesn't generate real entropy (e.g. returning the same value every time). Otherwise, you must
// provide a cryptographically-random entropy source.
};
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::Network& network, kj::Maybe<kj::Network&> tlsNetwork,
HttpClientSettings settings = HttpClientSettings());
// Creates a proxy HttpClient that connects to hosts over the given network. The URL must always
// be an absolute URL; the host is parsed from the URL. This implementation will automatically
// add an appropriate Host header (and convert the URL to just a path) once it has connected.
//
// Note that if you wish to route traffic through an HTTP proxy server rather than connect to
// remote hosts directly, you should use the form of newHttpClient() that takes a NetworkAddress,
// and supply the proxy's address.
//
// `responseHeaderTable` is used when parsing HTTP responses. Requests can use any header table.
//
// `tlsNetwork` is required to support HTTPS destination URLs. Otherwise, only HTTP URLs can be
// `tlsNetwork` is required to support HTTPS destination URLs. If null, only HTTP URLs can be
// fetched.
kj::Own<HttpClient> newHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
kj::NetworkAddress& addr,
HttpClientSettings settings = HttpClientSettings());
// Creates an HttpClient that always connects to the given address no matter what URL is requested.
// The client will open and close connections as needed. It will attempt to reuse connections for
// multiple requests but will not send a new request before the previous response on the same
// connection has completed, as doing so can result in head-of-line blocking issues. The client may
// be used as a proxy client or a host client depending on whether the peer is operating as
// a proxy. (Hint: This is the best kind of client to use when routing traffic through an HTTP
// proxy. `addr` should be the address of the proxy, and the proxy itself will resolve remote hosts
// based on the URLs passed to it.)
//
// `entropySource` must be provided in order to use `openWebSocket`. If you don't need WebSockets,
// `entropySource` can be omitted. The WebSocket protocol uses random values to avoid triggering
// flaws (including security flaws) in certain HTTP proxy software. Specifically, entropy is used
// to generate the `Sec-WebSocket-Key` header and to generate frame masks. If you know that there
// are no broken or vulnerable proxies between you and the server, you can provide an dummy entropy
// source that doesn't generate real entropy (e.g. returning the same value every time). Otherwise,
// you must provide a cryptographically-random entropy source.
// `responseHeaderTable` is used when parsing HTTP responses. Requests can use any header table.
kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource = nullptr);
HttpClientSettings settings = HttpClientSettings());
// Creates an HttpClient that speaks over the given pre-established connection. The client may
// be used as a proxy client or a host client depending on whether the peer is operating as
// a proxy.
......@@ -591,14 +618,12 @@ kj::Own<HttpClient> newHttpClient(HttpHeaderTable& responseHeaderTable, kj::Asyn
// fail as well. If the destination server chooses to close the connection after a response,
// subsequent requests will fail. If a response takes a long time, it blocks subsequent responses.
// If a WebSocket is opened successfully, all subsequent requests fail.
//
// `entropySource` must be provided in order to use `openWebSocket`. If you don't need WebSockets,
// `entropySource` can be omitted. The WebSocket protocol uses random values to avoid triggering
// flaws (including security flaws) in certain HTTP proxy software. Specifically, entropy is used
// to generate the `Sec-WebSocket-Key` header and to generate frame masks. If you know that there
// are no broken or vulnerable proxies between you and the server, you can provide an dummy entropy
// source that doesn't generate real entropy (e.g. returning the same value every time). Otherwise,
// you must provide a cryptographically-random entropy source.
kj::Own<HttpClient> newHttpClient(
HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource) KJ_DEPRECATED("use HttpClientSettings");
// Temporary for backwards-compatibilty.
// TODO(soon): Remove this before next release.
kj::Own<HttpClient> newHttpClient(HttpService& service);
kj::Own<HttpService> newHttpService(HttpClient& client);
......@@ -726,6 +751,14 @@ inline void HttpHeaders::forEach(Func&& func) const {
}
}
inline kj::Own<HttpClient> newHttpClient(
HttpHeaderTable& responseHeaderTable, kj::AsyncIoStream& stream,
kj::Maybe<EntropySource&> entropySource) {
HttpClientSettings settings;
settings.entropySource = entropySource;
return newHttpClient(responseHeaderTable, stream, kj::mv(settings));
}
} // namespace kj
#endif // KJ_COMPAT_HTTP_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment