// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#include "rpc-twoparty.h"
#include "test-util.h"
Kenton Varda's avatar
Kenton Varda committed
24
#include <capnp/rpc.capnp.h>
25 26 27
#include <kj/async-unix.h>
#include <kj/debug.h>
#include <kj/thread.h>
28 29 30 31 32 33 34 35 36 37
#include <kj/compat/gtest.h>

// TODO(cleanup): Auto-generate stringification functions for union discriminants.
namespace capnp {
namespace rpc {

// Stringifies a Message union discriminant as its raw numeric value. Tests in this file
// compare Message::Which values with EXPECT_EQ, which presumably needs the operands to be
// stringifiable in order to report a mismatch.
inline kj::String KJ_STRINGIFY(Message::Which which) {
  const auto discriminant = static_cast<uint16_t>(which);
  return kj::str(discriminant);
}

}  // namespace rpc
}  // namespace capnp
38 39 40 41 42 43 44

namespace capnp {
namespace _ {
namespace {

class TestRestorer final: public SturdyRefRestorer<test::TestSturdyRefObjectId> {
  // Restorer used by the test server: maps the tag of a TestSturdyRefObjectId to a newly
  // constructed test capability.

public:
  TestRestorer(int& callCount, int& handleCount)
      : callCount(callCount), handleCount(handleCount) {}
  // Both counters are stored by reference (no copy is made), so the referenced ints must
  // outlive this restorer.

  Capability::Client restore(test::TestSturdyRefObjectId::Reader objectId) override {
    // Returns the capability selected by objectId's tag. Each implementation is constructed
    // with `callCount`; TestMoreStuffImpl additionally receives `handleCount`.
    switch (objectId.getTag()) {
      case test::TestSturdyRefObjectId::Tag::TEST_INTERFACE:
        return kj::heap<TestInterfaceImpl>(callCount);
      case test::TestSturdyRefObjectId::Tag::TEST_EXTENDS:
        // No implementation is provided for this tag; hand back a broken capability.
        return Capability::Client(newBrokenCap("No TestExtends implemented."));
      case test::TestSturdyRefObjectId::Tag::TEST_PIPELINE:
        return kj::heap<TestPipelineImpl>(callCount);
      case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLEE:
        return kj::heap<TestTailCalleeImpl>(callCount);
      case test::TestSturdyRefObjectId::Tag::TEST_TAIL_CALLER:
        return kj::heap<TestTailCallerImpl>(callCount);
      case test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF:
        return kj::heap<TestMoreStuffImpl>(callCount, handleCount);
    }
    // Only reached if the tag matched none of the cases above.
    KJ_UNREACHABLE;
  }

private:
  int& callCount;
  int& handleCount;
};

71 72
// Starts the test RPC server on a new pipe thread (AsyncIoProvider::newPipeThread) and returns
// the resulting PipeThread. Inside the thread, a TwoPartyVatNetwork is set up on the thread's
// stream as the SERVER side, an RPC server backed by a TestRestorer is created, and the thread
// then blocks until the network reports a disconnect.
//
// `callCount` and `handleCount` are captured by reference by the thread's lambda, so they must
// stay alive for as long as the server thread runs.
kj::AsyncIoProvider::PipeThread runServer(kj::AsyncIoProvider& ioProvider,
                                          int& callCount, int& handleCount) {
  return ioProvider.newPipeThread(
      [&callCount, &handleCount](
       kj::AsyncIoProvider& ioProvider, kj::AsyncIoStream& stream, kj::WaitScope& waitScope) {
    TwoPartyVatNetwork network(stream, rpc::twoparty::Side::SERVER);
    TestRestorer restorer(callCount, handleCount);
    auto server = makeRpcServer(network, restorer);

    // Keep the server objects above alive until the connection goes away.
    network.onDisconnect().wait(waitScope);
  });
}

83
// Restores a capability through `client`.
//
// Builds a VatId message naming `side` and a TestSturdyRefObjectId message carrying `tag`,
// then returns the result of client.restore() for that pair. The returned client is untyped;
// callers in this file convert it with castAs<>().
Capability::Client getPersistentCap(RpcSystem<rpc::twoparty::VatId>& client,
                                    rpc::twoparty::Side side,
                                    test::TestSturdyRefObjectId::Tag tag) {
  // Create the VatId.
  MallocMessageBuilder hostIdMessage(8);
  auto hostId = hostIdMessage.initRoot<rpc::twoparty::VatId>();
  hostId.setSide(side);

  // Create the SturdyRefObjectId.
  MallocMessageBuilder objectIdMessage(8);
  objectIdMessage.initRoot<test::TestSturdyRefObjectId>().setTag(tag);

  // Connect to the remote capability.
  return client.restore(hostId, objectIdMessage.getRoot<AnyPointer>());
}

TEST(TwoPartyNetwork, Basic) {
  // End-to-end check over a two-party connection: issue foo(), baz() and bar() against a
  // TestInterface restored from the server thread, and verify that foo() returns "foo",
  // baz() completes, and bar() fails.

  auto ioContext = kj::setupAsyncIo();
  int callCount = 0;
  int handleCount = 0;

  auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
  TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
  auto rpcClient = makeRpcClient(network);

  // Request the particular capability from the server.
  auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
      test::TestSturdyRefObjectId::Tag::TEST_INTERFACE).castAs<test::TestInterface>();

  // Use the capability.
  auto request1 = client.fooRequest();
  request1.setI(123);
  request1.setJ(true);
  auto promise1 = request1.send();

  auto request2 = client.bazRequest();
  initTestMessage(request2.initS());
  auto promise2 = request2.send();

  // bar() is expected to fail: the success continuation records a test failure and the error
  // continuation sets the flag checked at the end.
  bool barFailed = false;
  auto request3 = client.barRequest();
  auto promise3 = request3.send().then(
      [](Response<test::TestInterface::BarResults>&& response) {
        ADD_FAILURE() << "Expected bar() call to fail.";
      }, [&](kj::Exception&& e) {
        barFailed = true;
      });

  // Nothing has been waited on yet, so no call is expected to have been counted.
  EXPECT_EQ(0, callCount);

  auto response1 = promise1.wait(ioContext.waitScope);

  EXPECT_EQ("foo", response1.getX());

  auto response2 = promise2.wait(ioContext.waitScope);

  promise3.wait(ioContext.waitScope);

  // Two calls are expected to have been counted (presumably foo() and baz(), with the failing
  // bar() not counted).
  EXPECT_EQ(2, callCount);
  EXPECT_TRUE(barFailed);
}

145
TEST(TwoPartyNetwork, Pipelining) {
  // Exercises promise pipelining over a two-party connection, then shuts down the client's
  // write end and verifies that the disconnect is observed and that further calls on the same
  // capability throw without changing the call counters.

  auto ioContext = kj::setupAsyncIo();
  int callCount = 0;
  int handleCount = 0;
  int reverseCallCount = 0;  // Calls back from server to client.

  auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
  TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
  auto rpcClient = makeRpcClient(network);

  bool disconnected = false;
  kj::Promise<void> disconnectPromise = network.onDisconnect().then([&]() { disconnected = true; });

  {
    // Request the particular capability from the server.
    auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
        test::TestSturdyRefObjectId::Tag::TEST_PIPELINE).castAs<test::TestPipeline>();

    {
      // Use the capability.
      auto request = client.getCapRequest();
      request.setN(234);
      request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount));

      auto promise = request.send();

      // Issue two calls on the not-yet-returned result of getCap().
      auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
      pipelineRequest.setI(321);
      auto pipelinePromise = pipelineRequest.send();

      auto pipelineRequest2 = promise.getOutBox().getCap()
          .castAs<test::TestExtends>().graultRequest();
      auto pipelinePromise2 = pipelineRequest2.send();

      promise = nullptr;  // Just to be annoying, drop the original promise.

      // Nothing has been waited on yet, so neither counter should have moved.
      EXPECT_EQ(0, callCount);
      EXPECT_EQ(0, reverseCallCount);

      auto response = pipelinePromise.wait(ioContext.waitScope);
      EXPECT_EQ("bar", response.getX());

      auto response2 = pipelinePromise2.wait(ioContext.waitScope);
      checkTestMessage(response2);

      EXPECT_EQ(3, callCount);
      EXPECT_EQ(1, reverseCallCount);
    }

    EXPECT_FALSE(disconnected);

    // What if we disconnect?
    serverThread.pipe->shutdownWrite();

    // The other side should also disconnect.
    disconnectPromise.wait(ioContext.waitScope);

    {
      // Use the now-broken capability.
      auto request = client.getCapRequest();
      request.setN(234);
      request.setInCap(kj::heap<TestInterfaceImpl>(reverseCallCount));

      auto promise = request.send();

      auto pipelineRequest = promise.getOutBox().getCap().fooRequest();
      pipelineRequest.setI(321);
      auto pipelinePromise = pipelineRequest.send();

      auto pipelineRequest2 = promise.getOutBox().getCap()
          .castAs<test::TestExtends>().graultRequest();
      auto pipelinePromise2 = pipelineRequest2.send();

      EXPECT_ANY_THROW(pipelinePromise.wait(ioContext.waitScope));
      EXPECT_ANY_THROW(pipelinePromise2.wait(ioContext.waitScope));

      // The counters must be unchanged from before the disconnect.
      EXPECT_EQ(3, callCount);
      EXPECT_EQ(1, reverseCallCount);
    }
  }
}

227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
TEST(TwoPartyNetwork, Release) {
  // Verifies that dropping client-side handle references causes handleCount to fall on the
  // server side, without the client making any further calls.

  auto ioContext = kj::setupAsyncIo();
  int callCount = 0;
  int handleCount = 0;

  auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
  TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);
  auto rpcClient = makeRpcClient(network);

  // Request the particular capability from the server.
  auto client = getPersistentCap(rpcClient, rpc::twoparty::Side::SERVER,
      test::TestSturdyRefObjectId::Tag::TEST_MORE_STUFF).castAs<test::TestMoreStuff>();

  // Obtain two handles. For the second one, keep the call's promise in a variable so that it
  // can be dropped separately from the handle later on.
  auto handle1 = client.getHandleRequest().send().wait(ioContext.waitScope).getHandle();
  auto promise = client.getHandleRequest().send();
  auto handle2 = promise.wait(ioContext.waitScope).getHandle();

  EXPECT_EQ(2, handleCount);

  handle1 = nullptr;

  // There once was a bug where the last outgoing message (and any capabilities attached) would
  // not get cleaned up (until a new message was sent). This appeared to be a bug in Release,
  // because if a client received a message and then released a capability from it but then did
  // not make any further calls, then the capability would not be released because the message
  // introducing it remained the last server -> client message (because a "Release" message has
  // no reply). Here we are explicitly trying to catch this bug. This proves tricky, because when
  // we drop a reference on the client side, there's no particular way to wait for the release
  // message to reach the server except to make a subsequent call and wait for the return -- but
  // that would mask the bug. So, we spin, waiting for handleCount to change.

  // Spin budget shared by both wait loops below; the test asserts if it is exhausted.
  uint maxSpins = 1000;

  while (handleCount > 1) {
    ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
    KJ_ASSERT(--maxSpins > 0);
  }
  EXPECT_EQ(1, handleCount);

  handle2 = nullptr;

  // Dropping handle2 alone is expected to leave the count unchanged while `promise` is still
  // held; wait a little and check that it stays at 1.
  ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
  EXPECT_EQ(1, handleCount);

  promise = nullptr;

  while (handleCount > 0) {
    ioContext.provider->getTimer().afterDelay(10 * kj::MILLISECONDS).wait(ioContext.waitScope);
    KJ_ASSERT(--maxSpins > 0);
  }
  EXPECT_EQ(0, handleCount);
}

Kenton Varda's avatar
Kenton Varda committed
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
TEST(TwoPartyNetwork, Abort) {
  // Checks that the client receives an Abort message, followed by end-of-stream, after it
  // sends the server a message the server should reject.

  auto ioContext = kj::setupAsyncIo();
  int callCount = 0;
  int handleCount = 0;

  auto serverThread = runServer(*ioContext.provider, callCount, handleCount);
  TwoPartyVatNetwork network(*serverThread.pipe, rpc::twoparty::Side::CLIENT);

  // Talk to the server through a raw connection rather than an RpcSystem, so that arbitrary
  // messages can be written.
  MallocMessageBuilder vatIdMessage(128);
  auto serverVatId = vatIdMessage.initRoot<rpc::twoparty::VatId>();
  serverVatId.setSide(rpc::twoparty::Side::SERVER);

  auto connection = KJ_ASSERT_NONNULL(network.connect(serverVatId));

  {
    // Send an invalid message (Return to non-existent question).
    auto outgoing = connection->newOutgoingMessage(128);
    auto bogusReturn = outgoing->getBody().initAs<rpc::Message>().initReturn();
    bogusReturn.setAnswerId(1234);
    bogusReturn.setCanceled();
    outgoing->send();
  }

  // The first message coming back must be an Abort.
  auto reply = KJ_ASSERT_NONNULL(
      connection->receiveIncomingMessage().wait(ioContext.waitScope));
  auto replyKind = reply->getBody().getAs<rpc::Message>().which();
  EXPECT_EQ(rpc::Message::ABORT, replyKind);

  // After the Abort, the next receive must yield null, i.e. no further message.
  auto afterAbort = connection->receiveIncomingMessage().wait(ioContext.waitScope);
  EXPECT_TRUE(afterAbort == nullptr);
}

311 312 313
}  // namespace
}  // namespace _
}  // namespace capnp