rpc-twoparty.c++ 7.51 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22 23 24 25 26 27

#include "rpc-twoparty.h"
#include "serialize-async.h"
#include <kj/debug.h>

namespace capnp {

28 29
// Sets up one end of a two-party connection running over `stream`.
//
// `side` is the role this end plays. `receiveOptions` is stored for use when reading incoming
// messages. The write chain (`previousWrite`) starts out as an already-resolved promise.
TwoPartyVatNetwork::TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty::Side side,
                                       ReaderOptions receiveOptions)
    : stream(stream),
      side(side),
      peerVatId(4),  // NOTE(review): 4 is presumably the first-segment size in words -- confirm.
      receiveOptions(receiveOptions),
      previousWrite(kj::READY_NOW) {
  // The peer's VatId always names the side we are not.
  const rpc::twoparty::Side peerSide = side == rpc::twoparty::Side::CLIENT
      ? rpc::twoparty::Side::SERVER
      : rpc::twoparty::Side::CLIENT;
  auto peerId = peerVatId.initRoot<rpc::twoparty::VatId>();
  peerId.setSide(peerSide);

  // The promise half is forked into disconnectPromise; the fulfiller half is handed to
  // disconnectFulfiller, which fulfills it once its refcount drops back to zero.
  auto disconnectPaf = kj::newPromiseAndFulfiller<void>();
  disconnectPromise = disconnectPaf.promise.fork();
  disconnectFulfiller.fulfiller = kj::mv(disconnectPaf.fulfiller);
}
40

41 42 43 44 45 46 47
// Disposer hook for the kj::Own handles created by asConnection().
//
// Nothing is freed here and `pointer` is not used: each call only drops one reference, and the
// call that drops the last one fulfills the disconnect fulfiller.
void TwoPartyVatNetwork::FulfillerDisposer::disposeImpl(void* pointer) const {
  if (--refcount != 0) {
    // Other connection handles are still outstanding.
    return;
  }
  fulfiller->fulfill();
}

// Returns a handle to this object viewed as a Connection.
//
// The handle uses disconnectFulfiller as its disposer, so the reference counted here is the one
// FulfillerDisposer::disposeImpl() releases when the handle is dropped.
kj::Own<TwoPartyVatNetworkBase::Connection> TwoPartyVatNetwork::asConnection() {
  using ConnectionHandle = kj::Own<TwoPartyVatNetworkBase::Connection>;

  ++disconnectFulfiller.refcount;
  ConnectionHandle handle(this, disconnectFulfiller);
  return handle;
}

52 53
// Connects to the vat identified by `ref`.
//
// Returns null when `ref` names our own side; otherwise returns the single connection this
// network represents.
kj::Maybe<kj::Own<TwoPartyVatNetworkBase::Connection>> TwoPartyVatNetwork::connect(
    rpc::twoparty::VatId::Reader ref) {
  if (ref.getSide() != side) {
    return asConnection();
  }
  return nullptr;
}

61
// Accepts an incoming connection.
//
// Only the first call on the SERVER side yields a connection. Every other call returns a promise
// whose fulfiller is stored in `acceptFulfiller`; nothing in this file ever fulfills it.
kj::Promise<kj::Own<TwoPartyVatNetworkBase::Connection>> TwoPartyVatNetwork::accept() {
  const bool canAcceptNow = side == rpc::twoparty::Side::SERVER && !accepted;

  if (!canAcceptNow) {
    // Create a promise that will never be fulfilled.
    auto pending = kj::newPromiseAndFulfiller<kj::Own<TwoPartyVatNetworkBase::Connection>>();
    acceptFulfiller = kj::mv(pending.fulfiller);
    return kj::mv(pending.promise);
  }

  accepted = true;
  return asConnection();
}

// An outgoing RPC message: owns the message builder and queues itself on the owning network's
// write chain when sent. It is refcounted so that a pending write can keep it alive.
class TwoPartyVatNetwork::OutgoingMessageImpl final
    : public OutgoingRpcMessage, public kj::Refcounted {
public:
  // A `firstSegmentWordSize` of zero selects SUGGESTED_FIRST_SEGMENT_WORDS.
  OutgoingMessageImpl(TwoPartyVatNetwork& network, uint firstSegmentWordSize)
      : network(network),
        message(firstSegmentWordSize != 0 ? firstSegmentWordSize : SUGGESTED_FIRST_SEGMENT_WORDS) {}

  // Returns the message root as an untyped pointer for the caller to fill in.
  AnyPointer::Builder getBody() override {
    return message.getRoot<AnyPointer>();
  }

  // Appends this message to the network's write chain. Asserts if the network was already shut
  // down (previousWrite is null).
  void send() override {
    auto& priorWrite = KJ_ASSERT_NONNULL(network.previousWrite, "already shut down");

    // If an earlier write failed, this continuation is skipped because the exception propagates
    // down the chain. The exception is deliberately never handled here: the read end is assumed
    // to fail as well, and that is the cleaner place to deal with it.
    auto thisWrite = priorWrite.then([this]() {
      return writeMessage(network.stream, message);
    });

    // Keep this message alive until its write has finished.
    auto pinnedWrite = thisWrite.attach(kj::addRef(*this));

    // eagerlyEvaluate() must be applied *after* attach(). In the other order the message (and any
    // capabilities in it) would not be released until a later message is written.
    network.previousWrite = pinnedWrite.eagerlyEvaluate(nullptr);
  }

private:
  TwoPartyVatNetwork& network;
  MallocMessageBuilder message;
};

// An incoming RPC message that owns the MessageReader it was received into.
class TwoPartyVatNetwork::IncomingMessageImpl final: public IncomingRpcMessage {
public:
  // Takes ownership of the reader holding the received message.
  IncomingMessageImpl(kj::Own<MessageReader> reader): message(kj::mv(reader)) {}

  // Returns the message root as an untyped pointer.
  AnyPointer::Reader getBody() override {
    return message->getRoot<AnyPointer>();
  }

private:
  kj::Own<MessageReader> message;
};

115 116 117 118
// Returns a reader over the peer VatId that the constructor filled in.
rpc::twoparty::VatId::Reader TwoPartyVatNetwork::getPeerVatId() {
  rpc::twoparty::VatId::Reader peer = peerVatId.getRoot<rpc::twoparty::VatId>();
  return peer;
}

119
// Allocates a new refcounted outgoing message bound to this network. `firstSegmentWordSize` is
// forwarded unchanged to the message's constructor.
kj::Own<OutgoingRpcMessage> TwoPartyVatNetwork::newOutgoingMessage(uint firstSegmentWordSize) {
  kj::Own<OutgoingRpcMessage> outgoing =
      kj::refcounted<OutgoingMessageImpl>(*this, firstSegmentWordSize);
  return outgoing;
}

123
// Reads the next message from the stream using the stored receive options.
//
// The read is started from inside a kj::evalLater() callback rather than synchronously in this
// call. A null result from tryReadMessage() (presumably end of stream -- confirm) is passed
// through as null; otherwise the reader is wrapped in an IncomingMessageImpl.
kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> TwoPartyVatNetwork::receiveIncomingMessage() {
  return kj::evalLater([this]() {
    auto reading = tryReadMessage(stream, receiveOptions);
    return reading.then([](kj::Maybe<kj::Own<MessageReader>>&& maybeReader)
                            -> kj::Maybe<kj::Own<IncomingRpcMessage>> {
      KJ_IF_MAYBE(reader, maybeReader) {
        kj::Own<IncomingRpcMessage> wrapped = kj::heap<IncomingMessageImpl>(kj::mv(*reader));
        return kj::mv(wrapped);
      }
      return nullptr;
    });
  });
}

Kenton Varda's avatar
Kenton Varda committed
137 138 139 140 141 142 143 144
// Shuts down the write end of the stream once every queued write has completed.
//
// Asserts if called twice: previousWrite is cleared here, which also makes any later
// OutgoingMessageImpl::send() fail its own "already shut down" assertion.
kj::Promise<void> TwoPartyVatNetwork::shutdown() {
  auto& queuedWrites = KJ_ASSERT_NONNULL(previousWrite, "already shut down");
  auto finished = queuedWrites.then([this]() {
    stream.shutdownWrite();
  });

  // `queuedWrites` must not be touched past this point; it refers into previousWrite.
  previousWrite = nullptr;
  return finished;
}

145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
// =======================================================================================

// Creates a server that offers `bootstrapInterface` on every connection it accepts. The task set
// is constructed with this object, which provides taskFailed().
TwoPartyServer::TwoPartyServer(Capability::Client bootstrapInterface)
    : bootstrapInterface(kj::mv(bootstrapInterface)),
      tasks(*this) {
}

// Everything that has to stay alive for one accepted connection.
//
// Member order matters: C++ initializes members in declaration order and destroys them in
// reverse, and `network` is built on `*connection` while `rpcSystem` is built on `network`.
struct TwoPartyServer::AcceptedConnection {
  kj::Own<kj::AsyncIoStream> connection;      // the underlying byte stream
  TwoPartyVatNetwork network;                 // SERVER-side vat network over `connection`
  RpcSystem<rpc::twoparty::VatId> rpcSystem;  // RPC server offering the bootstrap interface

  // Takes ownership of `stream` and serves `bootstrapInterface` over it.
  explicit AcceptedConnection(Capability::Client bootstrapInterface,
                              kj::Own<kj::AsyncIoStream>&& stream)
      : connection(kj::mv(stream)),
        network(*connection, rpc::twoparty::Side::SERVER),
        rpcSystem(makeRpcServer(network, kj::mv(bootstrapInterface))) {}
};

Kenton Varda's avatar
Kenton Varda committed
162
// Starts serving the bootstrap interface on `connection` and keeps the per-connection state alive
// until the network reports that it has disconnected.
void TwoPartyServer::accept(kj::Own<kj::AsyncIoStream>&& connection) {
  auto state = kj::heap<AcceptedConnection>(bootstrapInterface, kj::mv(connection));

  // Fetch the disconnect promise before `state` is moved into the attachment below.
  auto disconnected = state->network.onDisconnect();

  // Attaching the state to the promise ties its lifetime to the task held in `tasks`.
  auto connectionTask = disconnected.attach(kj::mv(state));
  tasks.add(kj::mv(connectionTask));
}

170 171 172
// Accepts connections from `listener` in a loop: each accepted stream is handed to accept(), then
// listen() is called again for the next one. `listener` is captured by reference.
kj::Promise<void> TwoPartyServer::listen(kj::ConnectionReceiver& listener) {
  auto nextConnection = listener.accept();
  return nextConnection.then([this, &listener](kj::Own<kj::AsyncIoStream>&& stream) {
    accept(kj::mv(stream));
    return listen(listener);
  });
}

// Logs `exception` at ERROR level and otherwise drops it.
// NOTE(review): presumably invoked by `tasks` (constructed with *this) when one of the
// per-connection tasks fails -- confirm against the task set's error-handler interface.
void TwoPartyServer::taskFailed(kj::Exception&& exception) {
  KJ_LOG(ERROR, exception);
}

// Creates a pure client on `connection`: the network takes the CLIENT side and the RPC system is
// built with makeRpcClient(), so no bootstrap interface is passed in.
TwoPartyClient::TwoPartyClient(kj::AsyncIoStream& connection)
    : network(connection, rpc::twoparty::Side::CLIENT),
      rpcSystem(makeRpcClient(network)) {
}

186 187

// Creates a client on `connection` that also offers `bootstrapInterface` to the peer.
//
// `side` is the role this end plays on the connection. `bootstrapInterface` is taken by value and
// moved into the RPC system, which avoids a redundant copy of the capability and matches how
// TwoPartyServer and AcceptedConnection forward theirs.
TwoPartyClient::TwoPartyClient(kj::AsyncIoStream& connection,
                               Capability::Client bootstrapInterface,
                               rpc::twoparty::Side side)
    : network(connection, side),
      rpcSystem(network, kj::mv(bootstrapInterface)) {}

193 194 195
// Returns the peer's bootstrap capability.
//
// Builds a temporary VatId naming the side opposite to ours and asks the RPC system for that
// vat's bootstrap interface.
Capability::Client TwoPartyClient::bootstrap() {
  MallocMessageBuilder scratch(4);
  auto peerId = scratch.getRoot<rpc::twoparty::VatId>();

  if (network.getSide() == rpc::twoparty::Side::CLIENT) {
    peerId.setSide(rpc::twoparty::Side::SERVER);
  } else {
    peerId.setSide(rpc::twoparty::Side::CLIENT);
  }

  return rpcSystem.bootstrap(peerId);
}

202
}  // namespace capnp