Commit 87c7462d authored by Kenton Varda's avatar Kenton Varda

Use Own::attach() to clean up code in http.c++.

parent 58070f85
...@@ -2812,90 +2812,6 @@ public: ...@@ -2812,90 +2812,6 @@ public:
kj::Maybe<kj::Own<AsyncOutputStream>> stream; kj::Maybe<kj::Own<AsyncOutputStream>> stream;
}; };
class AttachmentOutputStream final: public kj::AsyncOutputStream {
// An AsyncOutputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentOutputStream(kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::Refcounted> attachment)
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return inner->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return input.pumpTo(*inner, amount);
}
private:
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncOutputStream> inner;
};
class AttachmentInputStream final: public kj::AsyncInputStream {
// An AsyncInputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentInputStream(kj::Own<kj::AsyncInputStream> inner, kj::Own<kj::Refcounted> attachment)
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return inner->pumpTo(output, amount);
}
private:
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncInputStream> inner;
};
class AttachmentWebSocket final: public WebSocket {
// An WebSocket which also owns some separate object, released when the stream is freed.
public:
AttachmentWebSocket(kj::Own<WebSocket> inner, kj::Own<kj::Refcounted> attachment)
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
return inner->send(message);
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
return inner->send(message);
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
return inner->close(code, reason);
}
kj::Promise<void> disconnect() override {
return inner->disconnect();
}
kj::Promise<Message> receive() override {
return inner->receive();
}
private:
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<WebSocket> inner;
};
class NetworkAddressHttpClient final: public HttpClient { class NetworkAddressHttpClient final: public HttpClient {
public: public:
NetworkAddressHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable, NetworkAddressHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
...@@ -2921,10 +2837,10 @@ public: ...@@ -2921,10 +2837,10 @@ public:
kj::Maybe<uint64_t> expectedBodySize = nullptr) override { kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
auto refcounted = getClient(); auto refcounted = getClient();
auto result = refcounted->client->request(method, url, headers, expectedBodySize); auto result = refcounted->client->request(method, url, headers, expectedBodySize);
result.body = kj::heap<AttachmentOutputStream>(kj::mv(result.body), kj::addRef(*refcounted)); result.body = result.body.attach(kj::addRef(*refcounted));
result.response = result.response.then(kj::mvCapture(refcounted, result.response = result.response.then(kj::mvCapture(refcounted,
[](kj::Own<RefcountedClient>&& refcounted, Response&& response) { [](kj::Own<RefcountedClient>&& refcounted, Response&& response) {
response.body = kj::heap<AttachmentInputStream>(kj::mv(response.body), kj::mv(refcounted)); response.body = response.body.attach(kj::mv(refcounted));
return kj::mv(response); return kj::mv(response);
})); }));
return result; return result;
...@@ -2938,8 +2854,7 @@ public: ...@@ -2938,8 +2854,7 @@ public:
[](kj::Own<RefcountedClient>&& refcounted, WebSocketResponse&& response) { [](kj::Own<RefcountedClient>&& refcounted, WebSocketResponse&& response) {
KJ_SWITCH_ONEOF(response.webSocketOrBody) { KJ_SWITCH_ONEOF(response.webSocketOrBody) {
KJ_CASE_ONEOF(body, kj::Own<kj::AsyncInputStream>) { KJ_CASE_ONEOF(body, kj::Own<kj::AsyncInputStream>) {
response.webSocketOrBody.init<kj::Own<kj::AsyncInputStream>>( response.webSocketOrBody = body.attach(kj::mv(refcounted));
kj::heap<AttachmentInputStream>(kj::mv(body), kj::mv(refcounted)));
} }
KJ_CASE_ONEOF(ws, kj::Own<WebSocket>) { KJ_CASE_ONEOF(ws, kj::Own<WebSocket>) {
// The only reason we need to attach the client to the WebSocket is because otherwise // The only reason we need to attach the client to the WebSocket is because otherwise
...@@ -2947,8 +2862,7 @@ public: ...@@ -2947,8 +2862,7 @@ public:
// ownership of the connection. // ownership of the connection.
// //
// TODO(perf): Maybe we could transfer ownership of the response headers specifically? // TODO(perf): Maybe we could transfer ownership of the response headers specifically?
response.webSocketOrBody.init<kj::Own<WebSocket>>( response.webSocketOrBody = ws.attach(kj::mv(refcounted));
kj::heap<AttachmentWebSocket>(kj::mv(ws), kj::mv(refcounted)));
} }
} }
return kj::mv(response); return kj::mv(response);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment