Commit 2bf1dbf8 authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #565 from capnproto/own-attach

Add Own<T>::attach() which is much like Promise<T>::attach()
parents bacf9744 3ff5d10e
......@@ -2812,90 +2812,6 @@ public:
kj::Maybe<kj::Own<AsyncOutputStream>> stream;
};
class AttachmentOutputStream final: public kj::AsyncOutputStream {
// An AsyncOutputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentOutputStream(kj::Own<kj::AsyncOutputStream> inner, kj::Own<kj::Refcounted> attachment)
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<void> write(const void* buffer, size_t size) override {
return inner->write(buffer, size);
}
kj::Promise<void> write(kj::ArrayPtr<const kj::ArrayPtr<const byte>> pieces) override {
return inner->write(pieces);
}
kj::Maybe<kj::Promise<uint64_t>> tryPumpFrom(
kj::AsyncInputStream& input, uint64_t amount = kj::maxValue) override {
return input.pumpTo(*inner, amount);
}
private:
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncOutputStream> inner;
};
class AttachmentInputStream final: public kj::AsyncInputStream {
// An AsyncInputStream which also owns some separate object, released when the stream is freed.
public:
AttachmentInputStream(kj::Own<kj::AsyncInputStream> inner, kj::Own<kj::Refcounted> attachment)
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<size_t> read(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->read(buffer, minBytes, maxBytes);
}
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
return inner->tryRead(buffer, minBytes, maxBytes);
}
kj::Maybe<uint64_t> tryGetLength() override {
return inner->tryGetLength();
}
kj::Promise<uint64_t> pumpTo(kj::AsyncOutputStream& output, uint64_t amount) override {
return inner->pumpTo(output, amount);
}
private:
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<kj::AsyncInputStream> inner;
};
class AttachmentWebSocket final: public WebSocket {
// An WebSocket which also owns some separate object, released when the stream is freed.
public:
AttachmentWebSocket(kj::Own<WebSocket> inner, kj::Own<kj::Refcounted> attachment)
: attachment(kj::mv(attachment)), inner(kj::mv(inner)) {}
kj::Promise<void> send(kj::ArrayPtr<const byte> message) override {
return inner->send(message);
}
kj::Promise<void> send(kj::ArrayPtr<const char> message) override {
return inner->send(message);
}
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
return inner->close(code, reason);
}
kj::Promise<void> disconnect() override {
return inner->disconnect();
}
kj::Promise<Message> receive() override {
return inner->receive();
}
private:
// Note that it's important that `inner` be destroyed first since it typically depends on
// `attachment`.
kj::Own<kj::Refcounted> attachment;
kj::Own<WebSocket> inner;
};
class NetworkAddressHttpClient final: public HttpClient {
public:
NetworkAddressHttpClient(kj::Timer& timer, HttpHeaderTable& responseHeaderTable,
......@@ -2921,10 +2837,10 @@ public:
kj::Maybe<uint64_t> expectedBodySize = nullptr) override {
auto refcounted = getClient();
auto result = refcounted->client->request(method, url, headers, expectedBodySize);
result.body = kj::heap<AttachmentOutputStream>(kj::mv(result.body), kj::addRef(*refcounted));
result.body = result.body.attach(kj::addRef(*refcounted));
result.response = result.response.then(kj::mvCapture(refcounted,
[](kj::Own<RefcountedClient>&& refcounted, Response&& response) {
response.body = kj::heap<AttachmentInputStream>(kj::mv(response.body), kj::mv(refcounted));
response.body = response.body.attach(kj::mv(refcounted));
return kj::mv(response);
}));
return result;
......@@ -2938,8 +2854,7 @@ public:
[](kj::Own<RefcountedClient>&& refcounted, WebSocketResponse&& response) {
KJ_SWITCH_ONEOF(response.webSocketOrBody) {
KJ_CASE_ONEOF(body, kj::Own<kj::AsyncInputStream>) {
response.webSocketOrBody.init<kj::Own<kj::AsyncInputStream>>(
kj::heap<AttachmentInputStream>(kj::mv(body), kj::mv(refcounted)));
response.webSocketOrBody = body.attach(kj::mv(refcounted));
}
KJ_CASE_ONEOF(ws, kj::Own<WebSocket>) {
// The only reason we need to attach the client to the WebSocket is because otherwise
......@@ -2947,8 +2862,7 @@ public:
// ownership of the connection.
//
// TODO(perf): Maybe we could transfer ownership of the response headers specifically?
response.webSocketOrBody.init<kj::Own<WebSocket>>(
kj::heap<AttachmentWebSocket>(kj::mv(ws), kj::mv(refcounted)));
response.webSocketOrBody = ws.attach(kj::mv(refcounted));
}
}
return kj::mv(response);
......
......@@ -65,6 +65,77 @@ TEST(Memory, AssignNested) {
EXPECT_TRUE(destroyed1 && destroyed2);
}
struct DestructionOrderRecorder {
DestructionOrderRecorder(uint& counter, uint& recordTo)
: counter(counter), recordTo(recordTo) {}
~DestructionOrderRecorder() {
recordTo = ++counter;
}
uint& counter;
uint& recordTo;
};
TEST(Memory, Attach) {
uint counter = 0;
uint destroyed1 = 0;
uint destroyed2 = 0;
uint destroyed3 = 0;
auto obj1 = kj::heap<DestructionOrderRecorder>(counter, destroyed1);
auto obj2 = kj::heap<DestructionOrderRecorder>(counter, destroyed2);
auto obj3 = kj::heap<DestructionOrderRecorder>(counter, destroyed3);
auto ptr = obj1.get();
Own<DestructionOrderRecorder> combined = obj1.attach(kj::mv(obj2), kj::mv(obj3));
KJ_EXPECT(combined.get() == ptr);
KJ_EXPECT(obj1.get() == nullptr);
KJ_EXPECT(obj2.get() == nullptr);
KJ_EXPECT(obj3.get() == nullptr);
KJ_EXPECT(destroyed1 == 0);
KJ_EXPECT(destroyed2 == 0);
KJ_EXPECT(destroyed3 == 0);
combined = nullptr;
KJ_EXPECT(destroyed1 == 1, destroyed1);
KJ_EXPECT(destroyed2 == 2, destroyed2);
KJ_EXPECT(destroyed3 == 3, destroyed3);
}
TEST(Memory, AttachNested) {
uint counter = 0;
uint destroyed1 = 0;
uint destroyed2 = 0;
uint destroyed3 = 0;
auto obj1 = kj::heap<DestructionOrderRecorder>(counter, destroyed1);
auto obj2 = kj::heap<DestructionOrderRecorder>(counter, destroyed2);
auto obj3 = kj::heap<DestructionOrderRecorder>(counter, destroyed3);
auto ptr = obj1.get();
Own<DestructionOrderRecorder> combined = obj1.attach(kj::mv(obj2)).attach(kj::mv(obj3));
KJ_EXPECT(combined.get() == ptr);
KJ_EXPECT(obj1.get() == nullptr);
KJ_EXPECT(obj2.get() == nullptr);
KJ_EXPECT(obj3.get() == nullptr);
KJ_EXPECT(destroyed1 == 0);
KJ_EXPECT(destroyed2 == 0);
KJ_EXPECT(destroyed3 == 0);
combined = nullptr;
KJ_EXPECT(destroyed1 == 1, destroyed1);
KJ_EXPECT(destroyed2 == 2, destroyed2);
KJ_EXPECT(destroyed3 == 3, destroyed3);
}
// TODO(test): More tests.
} // namespace
......
......@@ -150,6 +150,15 @@ public:
return *this;
}
template <typename... Attachments>
Own<T> attach(Attachments&&... attachments);
// Returns an Own<T> which points to the same object but which also ensures that all values
// passed to `attachments` remain alive until after this object is destroyed. Normally
// `attachments` are other Own<?>s pointing to objects that this one depends on.
//
// Note that attachments will eventually be destroyed in the order they are listed. Hence,
// foo.attach(bar, baz) is equivalent to (but more efficient than) foo.attach(bar).attach(baz).
template <typename U>
Own<U> downcast() {
// Downcast the pointer to Own<U>, destroying the original pointer. If this pointer does not
......@@ -401,6 +410,50 @@ void Disposer::dispose(T* object) const {
Dispose_<T>::dispose(object, *this);
}
namespace _ { // private
template <typename... T>
struct OwnedBundle;
template <>
struct OwnedBundle<> {};
template <typename First, typename... Rest>
struct OwnedBundle<First, Rest...>: public OwnedBundle<Rest...> {
OwnedBundle(First&& first, Rest&&... rest)
: OwnedBundle<Rest...>(kj::fwd<Rest>(rest)...), first(kj::fwd<First>(first)) {}
// Note that it's intentional that `first` is destroyed before `rest`. This way, doing
// ptr.attach(foo, bar, baz) is equivalent to ptr.attach(foo).attach(bar).attach(baz) in terms
// of destruction order (although the former does fewer allocations).
Decay<First> first;
};
template <typename... T>
struct DisposableOwnedBundle final: public Disposer, public OwnedBundle<T...> {
DisposableOwnedBundle(T&&... values): OwnedBundle<T...>(kj::fwd<T>(values)...) {}
void disposeImpl(void* pointer) const override { delete this; }
};
} // namespace _ (private)
template <typename T>
template <typename... Attachments>
Own<T> Own<T>::attach(Attachments&&... attachments) {
T* ptrCopy = ptr;
KJ_IREQUIRE(ptrCopy != nullptr, "cannot attach to null pointer");
// HACK: If someone accidentally calls .attach() on a null pointer in opt mode, try our best to
// accomplish reasonable behavior: We turn the pointer non-null but still invalid, so that the
// disposer will still be called when the pointer goes out of scope.
if (ptrCopy == nullptr) ptrCopy = reinterpret_cast<T*>(1);
auto bundle = new _::DisposableOwnedBundle<Own<T>, Attachments...>(
kj::mv(*this), kj::fwd<Attachments>(attachments)...);
return Own<T>(ptrCopy, *bundle);
}
} // namespace kj
#endif // KJ_MEMORY_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment