Commit 86276457 authored by Kenton Varda's avatar Kenton Varda

Fix RealmGateway to avoid double-transforms on loopback.

Previously, in the presence of promise capabilities, a save() request could cross over the same gateway twice (or more!). Some gateways (e.g. Sandstorm's) implement one-way transformations, so this could cause the request to fail when it shouldn't.
parent 03f39afe
...@@ -545,6 +545,10 @@ kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& pro ...@@ -545,6 +545,10 @@ kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& pro
return kj::refcounted<QueuedClient>(kj::mv(promise)); return kj::refcounted<QueuedClient>(kj::mv(promise));
} }
kj::Own<PipelineHook> newLocalPromisePipeline(kj::Promise<kj::Own<PipelineHook>>&& promise) {
return kj::refcounted<QueuedPipeline>(kj::mv(promise));
}
// ======================================================================================= // =======================================================================================
namespace { namespace {
......
...@@ -595,6 +595,10 @@ kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& pro ...@@ -595,6 +595,10 @@ kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& pro
// the new client. This hook's `getResolved()` and `whenMoreResolved()` methods will reflect the // the new client. This hook's `getResolved()` and `whenMoreResolved()` methods will reflect the
// redirection to the eventual replacement client. // redirection to the eventual replacement client.
kj::Own<PipelineHook> newLocalPromisePipeline(kj::Promise<kj::Own<PipelineHook>>&& promise);
// Returns a PipelineHook that queues up calls until `promise` resolves, then forwards them to
// the new pipeline.
kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason); kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason);
kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason); kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason);
// Helper function that creates a capability which simply throws exceptions when called. // Helper function that creates a capability which simply throws exceptions when called.
......
...@@ -1147,6 +1147,114 @@ TEST(Rpc, RealmGatewayExport) { ...@@ -1147,6 +1147,114 @@ TEST(Rpc, RealmGatewayExport) {
EXPECT_EQ("exported-foo", response.getSturdyRef()); EXPECT_EQ("exported-foo", response.getSturdyRef());
} }
TEST(Rpc, RealmGatewayImportExport) {
// Test that a save request which leaves the realm, bounces through a promise capability, and
// then comes back into the realm, does not actually get translated both ways.
TestRealmGateway::Client gateway = kj::heap<TestGateway>();
Persistent<test::TestSturdyRef>::Client bootstrap = kj::heap<TestPersistent>("foo");
MallocMessageBuilder serverHostIdBuilder;
auto serverHostId = serverHostIdBuilder.getRoot<test::TestSturdyRefHostId>();
serverHostId.setHost("server");
MallocMessageBuilder clientHostIdBuilder;
auto clientHostId = clientHostIdBuilder.getRoot<test::TestSturdyRefHostId>();
clientHostId.setHost("client");
kj::EventLoop loop;
kj::WaitScope waitScope(loop);
TestNetwork network;
TestRestorer restorer;
TestNetworkAdapter& clientNetwork = network.add("client");
TestNetworkAdapter& serverNetwork = network.add("server");
RpcSystem<test::TestSturdyRefHostId> rpcClient =
makeRpcServer(clientNetwork, bootstrap, gateway);
auto paf = kj::newPromiseAndFulfiller<Capability::Client>();
RpcSystem<test::TestSturdyRefHostId> rpcServer =
makeRpcServer(serverNetwork, kj::mv(paf.promise));
auto client = rpcClient.bootstrap(serverHostId).castAs<Persistent<test::TestSturdyRef>>();
bool responseReady = false;
auto responsePromise = client.saveRequest().send()
.then([&](Response<Persistent<test::TestSturdyRef>::SaveResults>&& response) {
responseReady = true;
return kj::mv(response);
}).eagerlyEvaluate(nullptr);
// Crank the event loop to give the message time to reach the server and block on the promise
// resolution.
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
EXPECT_FALSE(responseReady);
paf.fulfiller->fulfill(rpcServer.bootstrap(clientHostId));
auto response = responsePromise.wait(waitScope);
// Should have the original value. If it went through export and re-import, though, then this
// will be "imported-exported-foo", which is wrong.
EXPECT_EQ("foo", response.getSturdyRef().getObjectId().getAs<Text>());
}
TEST(Rpc, RealmGatewayImportExport) {
// Test that a save request which enters the realm, bounces through a promise capability, and
// then goes back out of the realm, does not actually get translated both ways.
TestRealmGateway::Client gateway = kj::heap<TestGateway>();
Persistent<Text>::Client bootstrap = kj::heap<TestPersistentText>("foo");
MallocMessageBuilder serverHostIdBuilder;
auto serverHostId = serverHostIdBuilder.getRoot<test::TestSturdyRefHostId>();
serverHostId.setHost("server");
MallocMessageBuilder clientHostIdBuilder;
auto clientHostId = clientHostIdBuilder.getRoot<test::TestSturdyRefHostId>();
clientHostId.setHost("client");
kj::EventLoop loop;
kj::WaitScope waitScope(loop);
TestNetwork network;
TestRestorer restorer;
TestNetworkAdapter& clientNetwork = network.add("client");
TestNetworkAdapter& serverNetwork = network.add("server");
RpcSystem<test::TestSturdyRefHostId> rpcClient =
makeRpcServer(clientNetwork, bootstrap);
auto paf = kj::newPromiseAndFulfiller<Capability::Client>();
RpcSystem<test::TestSturdyRefHostId> rpcServer =
makeRpcServer(serverNetwork, kj::mv(paf.promise), gateway);
auto client = rpcClient.bootstrap(serverHostId).castAs<Persistent<Text>>();
bool responseReady = false;
auto responsePromise = client.saveRequest().send()
.then([&](Response<Persistent<Text>::SaveResults>&& response) {
responseReady = true;
return kj::mv(response);
}).eagerlyEvaluate(nullptr);
// Crank the event loop to give the message time to reach the server and block on the promise
// resolution.
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
kj::evalLater([]() {}).wait(waitScope);
EXPECT_FALSE(responseReady);
paf.fulfiller->fulfill(rpcServer.bootstrap(clientHostId));
auto response = responsePromise.wait(waitScope);
// Should have the original value. If it went through import and re-export, though, then this
// will be "exported-imported-foo", which is wrong.
EXPECT_EQ("foo", response.getSturdyRef());
}
} // namespace } // namespace
} // namespace _ (private) } // namespace _ (private)
} // namespace capnp } // namespace capnp
...@@ -663,7 +663,7 @@ private: ...@@ -663,7 +663,7 @@ private:
// Implement call() by copying params and results messages. // Implement call() by copying params and results messages.
auto params = context->getParams(); auto params = context->getParams();
auto request = newCall(interfaceId, methodId, params.targetSize()); auto request = newCallNoIntercept(interfaceId, methodId, params.targetSize());
request.set(params); request.set(params);
context->releaseParams(); context->releaseParams();
...@@ -865,12 +865,42 @@ private: ...@@ -865,12 +865,42 @@ private:
Request<AnyPointer, AnyPointer> newCall( Request<AnyPointer, AnyPointer> newCall(
uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override { uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
if (!isResolved && interfaceId == typeId<Persistent<>>() && methodId == 0 &&
connectionState->gateway != nullptr) {
// This is a call to Persistent.save(), and we're not resolved yet, and the underlying
// remote capability will perform a gateway translation. This isn't right if the promise
// ultimately resolves to a local capability. Instead, we'll need to queue the call until
// the promise resolves.
return newLocalPromiseClient(fork.addBranch())
->newCall(interfaceId, methodId, sizeHint);
}
receivedCall = true; receivedCall = true;
return cap->newCall(interfaceId, methodId, sizeHint); return cap->newCall(interfaceId, methodId, sizeHint);
} }
VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId, VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
kj::Own<CallContextHook>&& context) override { kj::Own<CallContextHook>&& context) override {
if (!isResolved && interfaceId == typeId<Persistent<>>() && methodId == 0 &&
connectionState->gateway != nullptr) {
// This is a call to Persistent.save(), and we're not resolved yet, and the underlying
// remote capability will perform a gateway translation. This isn't right if the promise
// ultimately resolves to a local capability. Instead, we'll need to queue the call until
// the promise resolves.
auto vpapPromises = fork.addBranch().then(kj::mvCapture(context,
[interfaceId,methodId](kj::Own<CallContextHook>&& context,
kj::Own<ClientHook> resolvedCap) {
auto vpap = resolvedCap->call(interfaceId, methodId, kj::mv(context));
return kj::tuple(kj::mv(vpap.promise), kj::mv(vpap.pipeline));
})).split();
return {
kj::mv(kj::get<0>(vpapPromises)),
newLocalPromisePipeline(kj::mv(kj::get<1>(vpapPromises))),
};
}
receivedCall = true; receivedCall = true;
return cap->call(interfaceId, methodId, kj::mv(context)); return cap->call(interfaceId, methodId, kj::mv(context));
} }
...@@ -2305,6 +2335,31 @@ private: ...@@ -2305,6 +2335,31 @@ private:
// Wait, this is a call to Persistent.save() and we need to translate it through our // Wait, this is a call to Persistent.save() and we need to translate it through our
// gateway. // gateway.
KJ_IF_MAYBE(resolvedPromise, capability->whenMoreResolved()) {
// The plot thickens: We're looking at a promise capability. It could end up resolving
// to a capability outside the gateway, in which case we don't want to translate at all.
auto promises = resolvedPromise->then(kj::mvCapture(context,
[this,interfaceId,methodId](kj::Own<CallContextHook>&& context,
kj::Own<ClientHook> resolvedCap) {
auto vpap = startCall(interfaceId, methodId, kj::mv(resolvedCap), kj::mv(context));
return kj::tuple(kj::mv(vpap.promise), kj::mv(vpap.pipeline));
})).attach(addRef(*this)).split();
return {
kj::mv(kj::get<0>(promises)),
newLocalPromisePipeline(kj::mv(kj::get<1>(promises))),
};
}
if (capability->getBrand() == this) {
// This capability is one of our own, pointing back out over the network. That means
// that it would be inappropriate to apply the gateway transformation. We just want to
// reflect the call back.
return kj::downcast<RpcClient>(*capability)
.callNoIntercept(interfaceId, methodId, kj::mv(context));
}
auto params = context->getParams().getAs<Persistent<>::SaveParams>(); auto params = context->getParams().getAs<Persistent<>::SaveParams>();
auto requestSize = params.totalSize(); auto requestSize = params.totalSize();
...@@ -2321,7 +2376,7 @@ private: ...@@ -2321,7 +2376,7 @@ private:
} }
} }
return capability->call(interfaceId, methodId, context->addRef()); return capability->call(interfaceId, methodId, kj::mv(context));
} }
kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) { kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment