Commit 26914900 authored by Kenton Varda's avatar Kenton Varda

Fix obscure bug where the last outgoing message (and any capabilities therein)…

Fix obscure bug where the last outgoing message (and any capabilities therein) would not get released (until a new message was sent, replacing it as the last).

This took hours to track down, because it initially looked like "Release" messages weren't being honored in some cases (when they happened to be releasing a capability from the last message, and no subsequent messages were sent). Initial attempts to capture this in a unit test failed because the test of course used a subsequent call to detect if the capability had been released, which succeeded.
parent 78283ba3
...@@ -189,8 +189,9 @@ TEST(Capability, AsyncCancelation) { ...@@ -189,8 +189,9 @@ TEST(Capability, AsyncCancelation) {
auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr); auto destructionPromise = paf.promise.then([&]() { destroyed = true; }).eagerlyEvaluate(nullptr);
int callCount = 0; int callCount = 0;
int handleCount = 0;
test::TestMoreStuff::Client client(kj::heap<TestMoreStuffImpl>(callCount)); test::TestMoreStuff::Client client(kj::heap<TestMoreStuffImpl>(callCount, handleCount));
kj::Promise<void> promise = nullptr; kj::Promise<void> promise = nullptr;
......
...@@ -404,6 +404,7 @@ TestNetworkAdapter& TestNetwork::add(kj::StringPtr name) { ...@@ -404,6 +404,7 @@ TestNetworkAdapter& TestNetwork::add(kj::StringPtr name) {
class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> { class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
public: public:
int callCount = 0; int callCount = 0;
int handleCount = 0;
Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override { Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
switch (objectId.getTag()) { switch (objectId.getTag()) {
...@@ -418,7 +419,7 @@ public: ...@@ -418,7 +419,7 @@ public:
case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER: case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER:
return kj::heap<TestTailCallerImpl>(callCount); return kj::heap<TestTailCallerImpl>(callCount);
case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF: case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF:
return kj::heap<TestMoreStuffImpl>(callCount); return kj::heap<TestMoreStuffImpl>(callCount, handleCount);
} }
KJ_UNREACHABLE; KJ_UNREACHABLE;
} }
...@@ -529,6 +530,34 @@ TEST(Rpc, Pipelining) { ...@@ -529,6 +530,34 @@ TEST(Rpc, Pipelining) {
EXPECT_EQ(1, chainedCallCount); EXPECT_EQ(1, chainedCallCount);
} }
TEST(Rpc, Release) {
TestContext context;
auto client = context.connect(test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF)
.castAs<test::TestMoreStuff>();
auto handle1 = client.getHandleRequest().send().wait(context.waitScope).getHandle();
auto promise = client.getHandleRequest().send();
auto handle2 = promise.wait(context.waitScope).getHandle();
EXPECT_EQ(2, context.restorer.handleCount);
handle1 = nullptr;
for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope);
EXPECT_EQ(1, context.restorer.handleCount);
handle2 = nullptr;
for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope);
EXPECT_EQ(1, context.restorer.handleCount);
promise = nullptr;
for (uint i = 0; i < 16; i++) kj::evalLater([]() {}).wait(context.waitScope);
EXPECT_EQ(0, context.restorer.handleCount);
}
TEST(Rpc, TailCall) { TEST(Rpc, TailCall) {
TestContext context; TestContext context;
......
...@@ -32,7 +32,8 @@ namespace { ...@@ -32,7 +32,8 @@ namespace {
class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> { class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
public: public:
TestRestorer(int& callCount): callCount(callCount) {} TestRestorer(int& callCount, int& handleCount)
: callCount(callCount), handleCount(handleCount) {}
Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override { Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
switch (objectId.getTag()) { switch (objectId.getTag()) {
...@@ -47,21 +48,23 @@ public: ...@@ -47,21 +48,23 @@ public:
case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER: case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER:
return kj::heap<TestTailCallerImpl>(callCount); return kj::heap<TestTailCallerImpl>(callCount);
case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF: case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF:
return kj::heap<TestMoreStuffImpl>(callCount); return kj::heap<TestMoreStuffImpl>(callCount, handleCount);
} }
KJ_UNREACHABLE; KJ_UNREACHABLE;
} }
private: private:
int& callCount; int& callCount;
int& handleCount;
}; };
kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider, int& callCount) { kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider,
int& callCount, int& handleCount) {
return ioProvider.newPipeThread( return ioProvider.newPipeThread(
[&callCount](kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, [&callCount, &handleCount](
kj::WaitScope& waitScope) { kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) {
TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER); TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER);
TestRestorer restorer(callCount); TestRestorer restorer(callCount, handleCount);
auto server = makeRpcServer(network, restorer); auto server = makeRpcServer(network, restorer);
network.onDisconnect().wait(waitScope); network.onDisconnect().wait(waitScope);
}); });
...@@ -86,8 +89,9 @@ Capability::Client getPersistentCap(RpcSystem<rpc::twoparty::SturdyRefHostId>& c ...@@ -86,8 +89,9 @@ Capability::Client getPersistentCap(RpcSystem<rpc::twoparty::SturdyRefHostId>& c
TEST(TwoPartyNetwork, Basic) { TEST(TwoPartyNetwork, Basic) {
auto ioContext = kj::setupAsyncIo(); auto ioContext = kj::setupAsyncIo();
int callCount = 0; int callCount = 0;
int handleCount = 0;
auto serverThread = runServer(*ioContext.provider, callCount); auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT); TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
auto rpcClient = makeRpcClient(network); auto rpcClient = makeRpcClient(network);
...@@ -131,9 +135,10 @@ TEST(TwoPartyNetwork, Basic) { ...@@ -131,9 +135,10 @@ TEST(TwoPartyNetwork, Basic) {
TEST(TwoPartyNetwork, Pipelining) { TEST(TwoPartyNetwork, Pipelining) {
auto ioContext = kj::setupAsyncIo(); auto ioContext = kj::setupAsyncIo();
int callCount = 0; int callCount = 0;
int handleCount = 0;
int reverseCallCount = 0; // Calls back from server to client. int reverseCallCount = 0; // Calls back from server to client.
auto serverThread = runServer(*ioContext.provider, callCount); auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT); TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
auto rpcClient = makeRpcClient(network); auto rpcClient = makeRpcClient(network);
...@@ -209,6 +214,51 @@ TEST(TwoPartyNetwork, Pipelining) { ...@@ -209,6 +214,51 @@ TEST(TwoPartyNetwork, Pipelining) {
} }
} }
TEST(TwoPartyNetwork, Release) {
auto ioContext = kj::setupAsyncIo();
int callCount = 0;
int handleCount = 0;
auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
auto rpcClient = makeRpcClient(network);
// Request the particular capability from the server.
auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>();
auto handle1 = client.getHandleRequest().send().wait(ioContext.waitScope).getHandle();
auto promise = client.getHandleRequest().send();
auto handle2 = promise.wait(ioContext.waitScope).getHandle();
EXPECT_EQ(2, handleCount);
handle1 = nullptr;
// There once was a bug where the last outgoing message (and any capabilities attached) would
// not get cleaned up (until a new message was sent). This appeared to be a bug in Release,
// becaues if a client received a message and then released a capability from it but then did
// not make any further calls, then the capability would not be released because the message
// introducing it remained the last server -> client message (because a "Release" message has
// no reply). Here we are explicitly trying to catch this bug. This proves tricky, because when
// we drop a reference on the client side, there's no particular way to wait for the release
// message to reach the server except to make a subsequent call and wait for the return -- but
// that would mask the bug. So we wait 10ms...
ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
EXPECT_EQ(1, handleCount);
handle2 = nullptr;
ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
EXPECT_EQ(1, handleCount);
promise = nullptr;
ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
EXPECT_EQ(0, handleCount);
}
} // namespace } // namespace
} // namespace _ } // namespace _
} // namespace capnp } // namespace capnp
...@@ -86,9 +86,12 @@ public: ...@@ -86,9 +86,12 @@ public:
// Note that if the write fails, all further writes will be skipped due to the exception. // Note that if the write fails, all further writes will be skipped due to the exception.
// We never actually handle this exception because we assume the read end will fail as well // We never actually handle this exception because we assume the read end will fail as well
// and it's cleaner to handle the failure there. // and it's cleaner to handle the failure there.
auto promise = writeMessage(network.stream, message).eagerlyEvaluate(nullptr); return writeMessage(network.stream, message);
return kj::mv(promise); }).attach(kj::addRef(*this))
}).attach(kj::addRef(*this)); // Note that it's important that the eagerlyEvaluate() come *after* the attach() because
// otherwise the message (and any capabilities in it) will not be released until a new
// message is written! (Kenton once spent all afternoon tracking this down...)
.eagerlyEvaluate(nullptr);
} }
private: private:
......
...@@ -959,7 +959,8 @@ kj::Promise<void> TestTailCalleeImpl::foo(FooContext context) { ...@@ -959,7 +959,8 @@ kj::Promise<void> TestTailCalleeImpl::foo(FooContext context) {
return kj::READY_NOW; return kj::READY_NOW;
} }
TestMoreStuffImpl::TestMoreStuffImpl(int& callCount): callCount(callCount) {} TestMoreStuffImpl::TestMoreStuffImpl(int& callCount, int& handleCount)
: callCount(callCount), handleCount(handleCount) {}
kj::Promise<void> TestMoreStuffImpl::getCallSequence(GetCallSequenceContext context) { kj::Promise<void> TestMoreStuffImpl::getCallSequence(GetCallSequenceContext context) {
auto result = context.getResults(); auto result = context.getResults();
...@@ -1071,5 +1072,19 @@ kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Clien ...@@ -1071,5 +1072,19 @@ kj::Promise<void> TestMoreStuffImpl::loop(uint depth, test::TestInterface::Clien
} }
} }
class HandleImpl final: public test::TestHandle::Server {
public:
HandleImpl(int& count): count(count) { ++count; }
~HandleImpl() { --count; }
private:
int& count;
};
kj::Promise<void> TestMoreStuffImpl::getHandle(GetHandleContext context) {
context.getResults().setHandle(kj::heap<HandleImpl>(handleCount));
return kj::READY_NOW;
}
} // namespace _ (private) } // namespace _ (private)
} // namespace capnp } // namespace capnp
...@@ -220,7 +220,7 @@ private: ...@@ -220,7 +220,7 @@ private:
class TestMoreStuffImpl final: public test::TestMoreStuff::Server { class TestMoreStuffImpl final: public test::TestMoreStuff::Server {
public: public:
TestMoreStuffImpl(int& callCount); TestMoreStuffImpl(int& callCount, int& handleCount);
kj::Promise<void> getCallSequence(GetCallSequenceContext context) override; kj::Promise<void> getCallSequence(GetCallSequenceContext context) override;
...@@ -240,8 +240,11 @@ public: ...@@ -240,8 +240,11 @@ public:
kj::Promise<void> expectCancel(ExpectCancelContext context) override; kj::Promise<void> expectCancel(ExpectCancelContext context) override;
kj::Promise<void> getHandle(GetHandleContext context) override;
private: private:
int& callCount; int& callCount;
int& handleCount;
test::TestInterface::Client clientToHold = nullptr; test::TestInterface::Client clientToHold = nullptr;
kj::Promise<void> loop(uint depth, test::TestInterface::Client cap, ExpectCancelContext context); kj::Promise<void> loop(uint depth, test::TestInterface::Client cap, ExpectCancelContext context);
......
...@@ -629,6 +629,8 @@ interface TestTailCaller { ...@@ -629,6 +629,8 @@ interface TestTailCaller {
foo @0 (i :Int32, callee :TestTailCallee) -> TestTailCallee.TailResult; foo @0 (i :Int32, callee :TestTailCallee) -> TestTailCallee.TailResult;
} }
interface TestHandle {}
interface TestMoreStuff extends(TestCallOrder) { interface TestMoreStuff extends(TestCallOrder) {
# Catch-all type that contains lots of testing methods. # Catch-all type that contains lots of testing methods.
...@@ -657,6 +659,10 @@ interface TestMoreStuff extends(TestCallOrder) { ...@@ -657,6 +659,10 @@ interface TestMoreStuff extends(TestCallOrder) {
# evalLater()-loops forever, holding `cap`. Must be canceled. # evalLater()-loops forever, holding `cap`. Must be canceled.
methodWithDefaults @8 (a :Text, b :UInt32 = 123, c :Text = "foo") -> (d :Text, e :Text = "bar"); methodWithDefaults @8 (a :Text, b :UInt32 = 123, c :Text = "foo") -> (d :Text, e :Text = "bar");
getHandle @9 () -> (handle :TestHandle);
# Get a new handle. Tests have an out-of-band way to check the current number of live handles, so
# this can be used to test garbage collection.
} }
interface TestKeywordMethods { interface TestKeywordMethods {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment