// rpc-twoparty.h
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#ifndef CAPNP_RPC_TWOPARTY_H_
#define CAPNP_RPC_TWOPARTY_H_

#include "rpc.h"
#include "message.h"
#include <kj/async-io.h>
#include <capnp/rpc-twoparty.capnp.h>

namespace capnp {

// The `VatNetwork` instantiation used for two-party networks, parameterized with the
// `rpc::twoparty` types (SturdyRefHostId, ProvisionId, RecipientId, ThirdPartyCapId, JoinResult).
typedef VatNetwork<rpc::twoparty::SturdyRefHostId, rpc::twoparty::ProvisionId,
    rpc::twoparty::RecipientId, rpc::twoparty::ThirdPartyCapId, rpc::twoparty::JoinResult>
    TwoPartyVatNetworkBase;

class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
                          private TwoPartyVatNetworkBase::Connection {
40 41 42 43 44 45
  // A `VatNetwork` that consists of exactly two parties communicating over an arbitrary byte
  // stream.  This is used to implement the common case of a client/server network.
  //
  // See `ez-rpc.h` for a simple interface for setting up two-party clients and servers.
  // Use `TwoPartyVatNetwork` only if you need the advanced features.

46
public:
47 48
  TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side,
                     ReaderOptions receiveOptions = ReaderOptions());
49

50 51 52 53 54 55 56 57 58
  kj::Promise<void> onDisconnect() { return disconnectPromise.addBranch(); }
  // Returns a promise that resolves when the peer disconnects.

  kj::Promise<void> onDrained() { return drainedPromise.addBranch(); }
  // Returns a promise that resolves once the peer has disconnected *and* all local objects
  // referencing this connection have been destroyed.  A caller might use this to decide when it
  // is safe to destroy the RpcSystem, if it isn't able to reliably destroy all objects using it
  // directly.

59 60
  // implements VatNetwork -----------------------------------------------------

61
  kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> connectToRefHost(
62
      rpc::twoparty::SturdyRefHostId::Reader ref) override;
63
  kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>> acceptConnectionAsRefHost() override;
64 65 66 67 68 69 70 71 72 73

private:
  class OutgoingMessageImpl;
  class IncomingMessageImpl;

  kj::AsyncIoStream& stream;
  rpc::twoparty::Side side;
  ReaderOptions receiveOptions;
  bool accepted = false;

Kenton Varda's avatar
Kenton Varda committed
74
  kj::Promise<void> previousWrite;
75 76 77 78 79 80
  // Resolves when the previous write completes.  This effectively serves as the write queue.

  kj::Own<kj::PromiseFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>> acceptFulfiller;
  // Fulfiller for the promise returned by acceptConnectionAsRefHost() on the client side, or the
  // second call on the server side.  Never fulfilled, because there is only one connection.

81
  kj::ForkedPromise<void> disconnectPromise = nullptr;
Kenton Varda's avatar
Kenton Varda committed
82
  kj::Own<kj::PromiseFulfiller<void>> disconnectFulfiller;
83 84 85 86
  kj::ForkedPromise<void> drainedPromise = nullptr;

  class FulfillerDisposer: public kj::Disposer {
  public:
Kenton Varda's avatar
Kenton Varda committed
87
    mutable kj::Own<kj::PromiseFulfiller<void>> fulfiller;
88

Kenton Varda's avatar
Kenton Varda committed
89
    void disposeImpl(void* pointer) const override { fulfiller->fulfill(); }
90 91 92
  };
  FulfillerDisposer drainedFulfiller;

93 94
  // implements Connection -----------------------------------------------------

95
  kj::Own<OutgoingRpcMessage> newOutgoingMessage(uint firstSegmentWordSize) override;
96
  kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> receiveIncomingMessage() override;
97
  void introduceTo(TwoPartyVatNetworkBase::Connection& recipient,
98 99 100 101
      rpc::twoparty::ThirdPartyCapId::Builder sendToRecipient,
      rpc::twoparty::RecipientId::Builder sendToTarget) override;
  ConnectionAndProvisionId connectToIntroduced(
      rpc::twoparty::ThirdPartyCapId::Reader capId) override;
102
  kj::Own<TwoPartyVatNetworkBase::Connection> acceptIntroducedConnection(
103 104 105 106 107 108
      rpc::twoparty::RecipientId::Reader recipientId) override;
};

}  // namespace capnp

#endif  // CAPNP_RPC_TWOPARTY_H_