rpc.h 17.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#ifndef CAPNP_RPC_H_
#define CAPNP_RPC_H_

#include "capability.h"

namespace capnp {

// =======================================================================================
// ***************************************************************************************
// This section contains various internal stuff that needs to be declared upfront.
34
// Scroll down to `class VatNetwork` or `class RpcSystem` for the public interfaces.
35 36 37
// ***************************************************************************************
// =======================================================================================

38 39
// TODO(cleanup):  Put these in rpc-internal.h?

40 41 42
class OutgoingRpcMessage;
class IncomingRpcMessage;

43
template <typename SturdyRefHostId>
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
class RpcSystem;

namespace _ {  // private

class VatNetworkBase {
  // Non-template version of VatNetwork.  Ignore this class; see VatNetwork, below.

public:
  class Connection;

  struct ConnectionAndProvisionId {
    kj::Own<Connection> connection;
    kj::Own<OutgoingRpcMessage> firstMessage;
    Orphan<ObjectPointer> provisionId;
  };

  class Connection {
  public:
    virtual kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) const = 0;
63
    virtual kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() = 0;
64 65 66 67 68 69 70 71
    virtual void baseIntroduceTo(Connection& recipient,
        ObjectPointer::Builder sendToRecipient,
        ObjectPointer::Builder sendToTarget) = 0;
    virtual ConnectionAndProvisionId baseConnectToIntroduced(
        ObjectPointer::Reader capId) = 0;
    virtual kj::Own<Connection> baseAcceptIntroducedConnection(
        ObjectPointer::Reader recipientId) = 0;
  };
72
  virtual kj::Maybe<kj::Own<Connection>> baseConnectToRefHost(_::StructReader hostId) = 0;
73 74 75 76 77 78 79 80 81 82
  virtual kj::Promise<kj::Own<Connection>> baseAcceptConnectionAsRefHost() = 0;
};

class SturdyRefRestorerBase {
public:
  virtual Capability::Client baseRestore(ObjectPointer::Reader ref) = 0;
};

class RpcSystemBase {
public:
83
  RpcSystemBase(VatNetworkBase& network, kj::Maybe<SturdyRefRestorerBase&> restorer,
84
                const kj::EventLoop& eventLoop);
85
  RpcSystemBase(RpcSystemBase&& other) noexcept;
86 87 88 89 90 91
  ~RpcSystemBase() noexcept(false);

private:
  class Impl;
  kj::Own<Impl> impl;

92
  Capability::Client baseRestore(_::StructReader hostId, ObjectPointer::Reader objectId);
93 94 95
  // TODO(someday):  Maybe define a public API called `TypelessStruct` so we don't have to rely
  // on `_::StructReader` here?

96
  template <typename>
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
  friend class capnp::RpcSystem;
};

}  // namespace _ (private)

// =======================================================================================
// ***************************************************************************************
// User-relevant interfaces start here.
// ***************************************************************************************
// =======================================================================================

class OutgoingRpcMessage {
public:
  virtual ObjectPointer::Builder getBody() = 0;
  // Get the message body, which the caller may fill in any way it wants.  (The standard RPC
  // implementation initializes it as a Message as defined in rpc.capnp.)

  virtual void send() = 0;
  // Send the message, or at least put it in a queue to be sent later.  Note that the builder
  // returned by `getBody()` remains valid at least until the `OutgoingRpcMessage` is destroyed.
};

class IncomingRpcMessage {
public:
  virtual ObjectPointer::Reader getBody() = 0;
  // Get the message body, to be interpreted by the caller.  (The standard RPC implementation
  // interprets it as a Message as defined in rpc.capnp.)
};

126
template <typename SturdyRefHostId, typename ProvisionId, typename RecipientId,
127
          typename ThirdPartyCapId, typename JoinResult>
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
class VatNetwork: public _::VatNetworkBase {
public:
  class Connection;

  struct ConnectionAndProvisionId {
    // Result of connecting to a vat introduced by another vat.

    kj::Own<Connection> connection;
    // Connection to the new vat.

    kj::Own<OutgoingRpcMessage> firstMessage;
    // An already-allocated `OutgoingRpcMessage` associated with `connection`.  The RPC system will
    // construct this as an `Accept` message and send it.

    Orphan<ProvisionId> provisionId;
    // A `ProvisionId` already allocated inside `firstMessage`, which the RPC system will use to
    // build the `Accept` message.
  };

  class Connection: public _::VatNetworkBase::Connection {
    // A two-way RPC connection.
    //
    // This object may represent a connection that doesn't exist yet, but is expected to exist
    // in the future.  In this case, sent messages will automatically be queued and sent once the
    // connection is ready, so that the caller doesn't need to know the difference.

  public:
    // Level 0 features ----------------------------------------------

    virtual kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) const = 0;
    // Allocate a new message to be sent on this connection.
    //
    // If `firstSegmentWordSize` is non-zero, it should be treated as a hint suggesting how large
    // to make the first segment.  This is entirely a hint and the connection may adjust it up or
    // down.  If it is zero, the connection should choose the size itself.
    //
    // Notice that this may be called from any thread.

166 167 168
    virtual kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() = 0;
    // Wait for a message to be received and return it.  If the read stream cleanly terminates,
    // return null.  If any other problem occurs, throw an exception.
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193

    // Level 3 features ----------------------------------------------

    virtual void introduceTo(Connection& recipient,
        typename ThirdPartyCapId::Builder sendToRecipient,
        typename RecipientId::Builder sendToTarget) = 0;
    // Call before starting a three-way introduction, assuming a `Provide` message is to be sent on
    // this connection and a `ThirdPartyCapId` is to be sent to `recipient`.  `sendToRecipient` and
    // `sendToTarget` are filled in with the identifiers that need to be sent to the recipient
    // (in a `CapDescriptor`) and on this connection (in the `Provide` message), respectively.
    //
    // `recipient` must be from the same `VatNetwork` as this connection.

    virtual ConnectionAndProvisionId connectToIntroduced(
        typename ThirdPartyCapId::Reader capId) = 0;
    // Given a ThirdPartyCapId received over this connection, connect to the third party.  The
    // caller should then send an `Accept` message over the new connection.

    virtual kj::Own<Connection> acceptIntroducedConnection(
        typename RecipientId::Reader recipientId) = 0;
    // Given a `RecipientId` received in a `Provide` message on this `Connection`, wait for the
    // recipient to connect, and return the connection formed.  Usually, the first message received
    // on the new connection will be an `Accept` message.

  private:
194
    void baseIntroduceTo(VatNetworkBase::Connection& recipient,
195 196 197 198 199 200 201 202 203 204
        ObjectPointer::Builder sendToRecipient,
        ObjectPointer::Builder sendToTarget) override final;
    _::VatNetworkBase::ConnectionAndProvisionId baseConnectToIntroduced(
        ObjectPointer::Reader capId) override final;
    kj::Own<_::VatNetworkBase::Connection> baseAcceptIntroducedConnection(
        ObjectPointer::Reader recipientId) override final;
  };

  // Level 0 features ------------------------------------------------

205 206 207 208 209 210 211 212 213 214 215
  virtual kj::Maybe<kj::Own<Connection>> connectToRefHost(
      typename SturdyRefHostId::Reader hostId) = 0;
  // Connect to a SturdyRef host.  Note that this method immediately returns a `Connection`, even
  // if the network connection has not yet been established.  Messages can be queued to this
  // connection and will be delivered once it is open.  The caller must attempt to read from the
  // connection to verify that it actually succeeded; the read will fail if the connection
  // couldn't be opened.  Some network implementations may actually start sending messages before
  // hearing back from the server at all, to avoid a round trip.
  //
  // Once connected, the caller should start by sending a `Restore` message for the associated
  // SturdyRefObjectId.
216
  //
217
  // Returns nullptr if `hostId` refers to the local host.
218 219 220

  virtual kj::Promise<kj::Own<Connection>> acceptConnectionAsRefHost() = 0;
  // Wait for the next incoming connection and return it.  Only connections formed by
221
  // connectToRefHost() are returned by this method.
222 223 224 225 226 227 228
  //
  // Once connected, the first received message will usually be a `Restore`.

  // Level 4 features ------------------------------------------------
  // TODO(someday)

private:
229 230
  kj::Maybe<kj::Own<_::VatNetworkBase::Connection>>
      baseConnectToRefHost(_::StructReader hostId) override final;
231 232 233 234
  kj::Promise<kj::Own<_::VatNetworkBase::Connection>>
      baseAcceptConnectionAsRefHost() override final;
};

235
template <typename SturdyRefObjectId>
236 237 238 239 240
class SturdyRefRestorer: public _::SturdyRefRestorerBase {
  // Applications that can restore SturdyRefs must implement this interface and provide it to the
  // RpcSystem.

public:
241 242
  virtual Capability::Client restore(typename SturdyRefObjectId::Reader ref) = 0;
  // Restore the given object, returning a capability representing it.  This is guaranteed only
243
  // to be called on the RpcSystem's EventLoop's thread.
244 245 246 247 248

private:
  Capability::Client baseRestore(ObjectPointer::Reader ref) override final;
};

249
template <typename SturdyRefHostId>
250 251 252
class RpcSystem: public _::RpcSystemBase {
public:
  template <typename ProvisionId, typename RecipientId,
253
            typename ThirdPartyCapId, typename JoinResult,
254
            typename LocalSturdyRefObjectId>
255
  RpcSystem(
256
      VatNetwork<SturdyRefHostId, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>& network,
257 258
      kj::Maybe<SturdyRefRestorer<LocalSturdyRefObjectId>&> restorer,
      const kj::EventLoop& eventLoop);
259
  RpcSystem(RpcSystem&& other) = default;
260

261
  Capability::Client restore(typename SturdyRefHostId::Reader hostId,
262
                             ObjectPointer::Reader objectId);
263 264 265
  // Restore the given SturdyRef from the network and return the capability representing it.
};

266
template <typename SturdyRefHostId, typename LocalSturdyRefObjectId,
267
          typename ProvisionId, typename RecipientId, typename ThirdPartyCapId, typename JoinResult>
268
RpcSystem<SturdyRefHostId> makeRpcServer(
269
    VatNetwork<SturdyRefHostId, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>& network,
270
    SturdyRefRestorer<LocalSturdyRefObjectId>& restorer,
271 272 273 274 275 276 277
    const kj::EventLoop& eventLoop = kj::EventLoop::current());
// Make an RPC server.  Typical usage (e.g. in a main() function):
//
//    MyEventLoop eventLoop;
//    MyNetwork network(eventLoop);
//    MyRestorer restorer;
//    auto server = makeRpcServer(network, restorer, eventLoop);
278
//    eventLoop.wait(...);  // (e.g. wait on a promise that never returns)
279

280
template <typename SturdyRefHostId, typename ProvisionId,
281
          typename RecipientId, typename ThirdPartyCapId, typename JoinResult>
282
RpcSystem<SturdyRefHostId> makeRpcClient(
283
    VatNetwork<SturdyRefHostId, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>& network,
284
    const kj::EventLoop& eventLoop = kj::EventLoop::current());
285
// Make an RPC client.  Typical usage (e.g. in a main() function):
286 287 288 289 290
//
//    MyEventLoop eventLoop;
//    MyNetwork network(eventLoop);
//    MyRestorer restorer;
//    auto client = makeRpcClient(network, restorer);
291
//    MyCapability::Client cap = client.restore(hostId, objId).castAs<MyCapability>();
292 293 294 295 296 297 298 299 300 301
//    auto response = eventLoop.wait(cap.fooRequest().send());
//    handleMyResponse(response);

// =======================================================================================
// ***************************************************************************************
// Inline implementation details start here
// ***************************************************************************************
// =======================================================================================

template <typename SturdyRef, typename ProvisionId, typename RecipientId,
302 303
          typename ThirdPartyCapId, typename JoinResult>
void VatNetwork<SturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>::
304
    Connection::baseIntroduceTo(VatNetworkBase::Connection& recipient,
305 306
                                ObjectPointer::Builder sendToRecipient,
                                ObjectPointer::Builder sendToTarget) {
307
  introduceTo(kj::downcast<Connection>(recipient), sendToRecipient.initAs<ThirdPartyCapId>(),
308 309 310 311
              sendToTarget.initAs<RecipientId>());
}

template <typename SturdyRef, typename ProvisionId, typename RecipientId,
312
          typename ThirdPartyCapId, typename JoinResult>
313
_::VatNetworkBase::ConnectionAndProvisionId
314
    VatNetwork<SturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>::
315 316 317 318 319 320
    Connection::baseConnectToIntroduced(ObjectPointer::Reader capId) {
  auto result = connectToIntroduced(capId.getAs<ThirdPartyCapId>());
  return { kj::mv(result.connection), kj::mv(result.firstMessage), kj::mv(result.provisionId) };
}

template <typename SturdyRef, typename ProvisionId, typename RecipientId,
321
          typename ThirdPartyCapId, typename JoinResult>
322
kj::Own<_::VatNetworkBase::Connection>
323
    VatNetwork<SturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>::
324 325 326 327 328
    Connection::baseAcceptIntroducedConnection(ObjectPointer::Reader recipientId) {
  return acceptIntroducedConnection(recipientId.getAs<RecipientId>());
}

template <typename SturdyRef, typename ProvisionId, typename RecipientId,
329
          typename ThirdPartyCapId, typename JoinResult>
330
kj::Maybe<kj::Own<_::VatNetworkBase::Connection>>
331
    VatNetwork<SturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>::
332 333 334 335 336
    baseConnectToRefHost(_::StructReader ref) {
  auto maybe = connectToRefHost(typename SturdyRef::Reader(ref));
  return maybe.map([](kj::Own<Connection>& conn) -> kj::Own<_::VatNetworkBase::Connection> {
    return kj::mv(conn);
  });
337 338 339
}

template <typename SturdyRef, typename ProvisionId, typename RecipientId,
340
          typename ThirdPartyCapId, typename JoinResult>
341
kj::Promise<kj::Own<_::VatNetworkBase::Connection>>
342
    VatNetwork<SturdyRef, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>::
343 344 345 346 347 348 349 350 351 352 353 354
    baseAcceptConnectionAsRefHost() {
  return acceptConnectionAsRefHost().thenInAnyThread(
      [](kj::Own<Connection>&& connection) -> kj::Own<_::VatNetworkBase::Connection> {
    return kj::mv(connection);
  });
}

template <typename SturdyRef>
Capability::Client SturdyRefRestorer<SturdyRef>::baseRestore(ObjectPointer::Reader ref) {
  return restore(ref.getAs<SturdyRef>());
}

355
template <typename SturdyRefHostId>
356
template <typename ProvisionId, typename RecipientId,
357
          typename ThirdPartyCapId, typename JoinResult,
358 359
          typename LocalSturdyRefObjectId>
RpcSystem<SturdyRefHostId>::RpcSystem(
360
      VatNetwork<SturdyRefHostId, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>& network,
361 362
      kj::Maybe<SturdyRefRestorer<LocalSturdyRefObjectId>&> restorer,
      const kj::EventLoop& eventLoop)
363 364
    : _::RpcSystemBase(network, restorer, eventLoop) {}

365
template <typename SturdyRefHostId>
366
Capability::Client RpcSystem<SturdyRefHostId>::restore(
367
    typename SturdyRefHostId::Reader hostId, ObjectPointer::Reader objectId) {
368
  return baseRestore(_::PointerHelpers<SturdyRefHostId>::getInternalReader(hostId), objectId);
369 370
}

371
template <typename SturdyRefHostId, typename LocalSturdyRefObjectId,
372
          typename ProvisionId, typename RecipientId, typename ThirdPartyCapId, typename JoinResult>
373
RpcSystem<SturdyRefHostId> makeRpcServer(
374
    VatNetwork<SturdyRefHostId, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>& network,
375 376 377
    SturdyRefRestorer<LocalSturdyRefObjectId>& restorer, const kj::EventLoop& eventLoop) {
  return RpcSystem<SturdyRefHostId>(network,
      kj::Maybe<SturdyRefRestorer<LocalSturdyRefObjectId>&>(restorer), eventLoop);
378 379
}

380
template <typename SturdyRefHostId, typename ProvisionId,
381
          typename RecipientId, typename ThirdPartyCapId, typename JoinResult>
382
RpcSystem<SturdyRefHostId> makeRpcClient(
383
    VatNetwork<SturdyRefHostId, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult>& network,
384
    const kj::EventLoop& eventLoop) {
385 386
  return RpcSystem<SturdyRefHostId>(network,
      kj::Maybe<SturdyRefRestorer<ObjectPointer>&>(nullptr), eventLoop);
387 388
}

389 390 391
}  // namespace capnp

#endif  // CAPNP_RPC_H_