rpc-twoparty.h 6.11 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22 23 24

#ifndef CAPNP_RPC_TWOPARTY_H_
#define CAPNP_RPC_TWOPARTY_H_

25
#if defined(__GNUC__) && !defined(CAPNP_HEADER_WARNINGS)
26 27 28
#pragma GCC system_header
#endif

29 30 31 32 33 34 35
#include "rpc.h"
#include "message.h"
#include <kj/async-io.h>
#include <capnp/rpc-twoparty.capnp.h>

namespace capnp {

36 37 38 39 40 41 42
namespace rpc {
  namespace twoparty {
    typedef VatId SturdyRefHostId;  // For backwards-compatibility with version 0.4.
  }
}

typedef VatNetwork<rpc::twoparty::VatId, rpc::twoparty::ProvisionId,
43
    rpc::twoparty::RecipientId, rpc::twoparty::ThirdPartyCapId, rpc::twoparty::JoinResult>
44 45 46 47
    TwoPartyVatNetworkBase;

class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
                          private TwoPartyVatNetworkBase::Connection {
48 49 50 51 52 53
  // A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte
  // stream.  This is used to implement the common case of a client/server network.
  //
  // See `ez-rpc.h` for a simple interface for setting up two-party clients and servers.
  // Use `TwoPartyVatNetwork` only if you need the advanced features.

54
public:
55 56
  TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side,
                     ReaderOptions receiveOptions = ReaderOptions());
57
  KJ_DISALLOW_COPY(TwoPartyVatNetwork);
58

59 60 61
  kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); }
  // Returns a promise that resolves when the peer disconnects.

62 63
  rpc::twoparty::Side getSide() { return side; }

64 65
  // implements VatNetwork -----------------------------------------------------

66 67 68
  kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> connect(
      rpc::twoparty::VatId::Reader ref) override;
  kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>> accept() override;
69 70 71 72 73 74 75

private:
  class OutgoingMessageImpl;
  class IncomingMessageImpl;

  kj::AsyncIoStream& stream;
  rpc::twoparty::Side side;
76
  MallocMessageBuilder peerVatId;
77 78 79
  ReaderOptions receiveOptions;
  bool accepted = false;

Kenton Varda's avatar
Kenton Varda committed
80
  kj::Maybe<kj::Promise<void>> previousWrite;
81
  // Resolves when the previous write completes.  This effectively serves as the write queue.
Kenton Varda's avatar
Kenton Varda committed
82
  // Becomes null when shutdown() is called.
83 84 85 86 87

  kj::Own<kj::PromiseFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>> acceptFulfiller;
  // Fulfiller for the promise returned by acceptConnectionAsRefHost() on the client side, or the
  // second call on the server side.  Never fulfilled, because there is only one connection.

88 89 90
  kj::ForkedPromise<void> disconnectPromise = nullptr;

  class FulfillerDisposer: public kj::Disposer {
91
    // Hack:  TwoPartyVatNetwork is both a VatNetwork and a VatNetwork::Connection.  When the RPC
92
    //   system detects (or initiates) a disconnection, it drops its reference to the Connection.
David Renshaw's avatar
David Renshaw committed
93 94 95
    //   When all references have been dropped, then we want disconnectPromise to be fulfilled.
    //   So we hand out Own<Connection>s with this disposer attached, so that we can detect when
    //   they are dropped.
96

97
  public:
Kenton Varda's avatar
Kenton Varda committed
98
    mutable kj::Own<kj::PromiseFulfiller<void>> fulfiller;
99
    mutable uint refcount = 0;
100

101
    void disposeImpl(void* pointer) const override;
102
  };
103
  FulfillerDisposer disconnectFulfiller;
104

105
  kj::Own<TwoPartyVatNetworkBase::Connection> asConnection();
David Renshaw's avatar
David Renshaw committed
106
  // Returns a pointer to this with the disposer set to disconnectFulfiller.
107

108 109
  // implements Connection -----------------------------------------------------

110
  rpc::twoparty::VatId::Reader getPeerVatId() override;
111
  kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override;
112
  kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override;
Kenton Varda's avatar
Kenton Varda committed
113
  kj::Promise<void> shutdown() override;
114 115
};

116 117 118 119 120 121 122
class TwoPartyServer: private kj::TaskSet::ErrorHandler {
  // Convenience class which implements a simple server which accepts connections on a listener
  // socket and serices them as two-party connections.

public:
  explicit TwoPartyServer(Capability::Client bootstrapInterface);

123 124 125
  void accept(kj::Own<kj::AsyncIoStream>&& connection);
  // Accepts the connection for servicing.

126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
  kj::Promise<void> listen(kj::ConnectionReceiver& listener);
  // Listens for connections on the given listener. The returned promise never resolves unless an
  // exception is thrown while trying to accept. You may discard the returned promise to cancel
  // listening.

private:
  Capability::Client bootstrapInterface;
  kj::TaskSet tasks;

  struct AcceptedConnection;

  void taskFailed(kj::Exception&& exception) override;
};

class TwoPartyClient {
  // Convenience class which implements a simple client.

public:
  explicit TwoPartyClient(kj::AsyncIoStream& connection);
145 146
  TwoPartyClient(kj::AsyncIoStream& connection, Capability::Client bootstrapInterface,
                 rpc::twoparty::Side side = rpc::twoparty::Side::CLIENT);
147 148 149 150

  Capability::Client bootstrap();
  // Get the server's bootstrap interface.

151 152
  inline kj::Promise<void> onDisconnect() { return network.onDisconnect(); }

153 154 155 156 157
private:
  TwoPartyVatNetwork network;
  RpcSystem<rpc::twoparty::VatId> rpcSystem;
};

158 159 160
}  // namespace capnp

#endif  // CAPNP_RPC_TWOPARTY_H_