Commit 0a84a3a7 authored by Kenton Varda's avatar Kenton Varda

Make TwoPartyVatNetwork::onDrained() work correctly on the server side. …

Make TwoPartyVatNetwork::onDrained() work correctly on the server side.  Previously only worked on the client side, where it's actually not very useful.  Fixes #71.  There's a deeper problem, though, that simply holding a capability received from the client will hold the dead connection open.
parent 7405f5cc
...@@ -66,6 +66,7 @@ kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider, int& ...@@ -66,6 +66,7 @@ kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider, int&
TestRestorer restorer(callCount); TestRestorer restorer(callCount);
auto server = makeRpcServer(network, restorer); auto server = makeRpcServer(network, restorer);
network.onDisconnect().wait(waitScope); network.onDisconnect().wait(waitScope);
network.onDrained().wait(waitScope);
}); });
} }
......
...@@ -32,22 +32,43 @@ TwoPartyVatNetwork::TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty: ...@@ -32,22 +32,43 @@ TwoPartyVatNetwork::TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty:
: stream(stream), side(side), receiveOptions(receiveOptions), previousWrite(kj::READY_NOW) { : stream(stream), side(side), receiveOptions(receiveOptions), previousWrite(kj::READY_NOW) {
{ {
auto paf = kj::newPromiseAndFulfiller<void>(); auto paf = kj::newPromiseAndFulfiller<void>();
disconnectPromise = paf.promise.fork(); drainedPromise = paf.promise.fork();
disconnectFulfiller = kj::mv(paf.fulfiller); drainedFulfiller.fulfiller = kj::mv(paf.fulfiller);
} }
{ {
auto paf = kj::newPromiseAndFulfiller<void>(); auto paf = kj::newPromiseAndFulfiller<void>();
drainedPromise = paf.promise.fork();
drainedFulfiller.fulfiller = kj::mv(paf.fulfiller); // If the RPC system on this side drops the connection, thus firing onDrained() before
// onDisconnected(), we also want to consider ourselves disconnected. Otherwise, we might
// not detect actual disconnect because the RPC system won't attempt to send or receive any
// more messages on the connection. So, we exclusive-join the disconnect promise with the
// first branch of drainedPromise.
disconnectPromise = paf.promise.exclusiveJoin(drainedPromise.addBranch()).fork();
disconnectFulfiller = kj::mv(paf.fulfiller);
} }
} }
void TwoPartyVatNetwork::FulfillerDisposer::disposeImpl(void* pointer) const {
KJ_DBG("deref", this, refcount);
if (--refcount == 0) {
fulfiller->fulfill();
}
}
kj::Own<TwoPartyVatNetworkBase::Connection> TwoPartyVatNetwork::asConnection() {
KJ_DBG("ref", &drainedFulfiller, drainedFulfiller.refcount);
++drainedFulfiller.refcount;
return kj::Own<TwoPartyVatNetworkBase::Connection>(this, drainedFulfiller);
}
kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> TwoPartyVatNetwork::connectToRefHost( kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> TwoPartyVatNetwork::connectToRefHost(
rpc::twoparty::SturdyRefHostId::Reader ref) { rpc::twoparty::SturdyRefHostId::Reader ref) {
if (ref.getSide() == side) { if (ref.getSide() == side) {
return nullptr; return nullptr;
} else { } else {
return kj::Own<TwoPartyVatNetworkBase::Connection>(this, drainedFulfiller); return asConnection();
} }
} }
...@@ -55,8 +76,7 @@ kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>> ...@@ -55,8 +76,7 @@ kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>>
TwoPartyVatNetwork::acceptConnectionAsRefHost() { TwoPartyVatNetwork::acceptConnectionAsRefHost() {
if (side == rpc::twoparty::Side::SERVER && !accepted) { if (side == rpc::twoparty::Side::SERVER && !accepted) {
accepted = true; accepted = true;
return kj::Own<TwoPartyVatNetworkBase::Connection>(this, return asConnection();
kj::DestructorOnlyDisposer<TwoPartyVatNetworkBase::Connection>::instance);
} else { } else {
// Create a promise that will never be fulfilled. // Create a promise that will never be fulfilled.
auto paf = kj::newPromiseAndFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>(); auto paf = kj::newPromiseAndFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>();
......
...@@ -49,12 +49,19 @@ public: ...@@ -49,12 +49,19 @@ public:
kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); } kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); }
// Returns a promise that resolves when the peer disconnects. // Returns a promise that resolves when the peer disconnects.
//
// TODO(soon): Currently this fires when the underlying physical connection breaks. It should
// fire after the RPC system has detected EOF itself and dropped its connection reference, so
// that it has a chance to reply to connections ended cleanly.
kj::Promise<void> onDrained() { return drainedPromise.addBranch(); } kj::Promise<void> onDrained() { return drainedPromise.addBranch(); }
// Returns a promise that resolves once the peer has disconnected *and* all local objects // Returns a promise that resolves once the peer has disconnected *and* all local objects
// referencing this connection have been destroyed. A caller might use this to decide when it // referencing this connection have been destroyed. A caller might use this to decide when it
// is safe to destroy the RpcSystem, if it isn't able to reliably destroy all objects using it // is safe to destroy the RpcSystem, if it isn't able to reliably destroy all objects using it
// directly. // directly.
//
// TODO(soon): This is not quite designed right. Those local objects should simply be disabled.
// Their existence should not prevent the RpcSystem from being destroyed.
// implements VatNetwork ----------------------------------------------------- // implements VatNetwork -----------------------------------------------------
...@@ -83,13 +90,22 @@ private: ...@@ -83,13 +90,22 @@ private:
kj::ForkedPromise<void> drainedPromise = nullptr; kj::ForkedPromise<void> drainedPromise = nullptr;
class FulfillerDisposer: public kj::Disposer { class FulfillerDisposer: public kj::Disposer {
// Hack: TwoPartyVatNetwork is both a VatNetwork and a VatNetwork::Connection. When all
// references to the Connection have been dropped, then we want onDrained() to fire. So we
// hand out Own<Connection>s with this disposer attached, so that we can detect when they are
// dropped.
public: public:
mutable kj::Own<kj::PromiseFulfiller<void>> fulfiller; mutable kj::Own<kj::PromiseFulfiller<void>> fulfiller;
mutable uint refcount = 0;
void disposeImpl(void* pointer) const override { fulfiller->fulfill(); } void disposeImpl(void* pointer) const override;
}; };
FulfillerDisposer drainedFulfiller; FulfillerDisposer drainedFulfiller;
kj::Own<TwoPartyVatNetworkBase::Connection> asConnection();
// Returns a pointer to this with the disposer set to drainedFulfiller.
// implements Connection ----------------------------------------------------- // implements Connection -----------------------------------------------------
kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override; kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment