// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#include "rpc.h"
23
#include "message.h"
24 25 26
#include <kj/debug.h>
#include <kj/vector.h>
#include <kj/async.h>
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/one-of.h>
Kenton Varda's avatar
Kenton Varda committed
28
#include <kj/function.h>
29
#include <unordered_map>
Kenton Varda's avatar
Kenton Varda committed
30
#include <map>
31 32 33 34 35 36 37 38
#include <queue>
#include <capnp/rpc.capnp.h>

namespace capnp {
namespace _ {  // private

namespace {

Kenton Varda's avatar
Kenton Varda committed
39 40 41 42 43 44 45 46 47
template <typename T>
inline constexpr uint messageSizeHint() {
  // Size hint, in words, for an RPC message whose body is a struct of type T: the payload
  // struct, the rpc::Message envelope, and one additional word.
  // NOTE(review): the additional word is presumably the message's root pointer -- confirm.
  return sizeInWords<T>() + sizeInWords<rpc::Message>() + 1;
}
template <>
inline constexpr uint messageSizeHint<void>() {
  // Variant for messages without a payload struct: just the envelope plus the one extra word.
  return sizeInWords<rpc::Message>() + 1;
}

48 49 50
// Size hint, in words, for an rpc::MessageTarget together with an rpc::PromisedAnswer.
constexpr uint MESSAGE_TARGET_SIZE_HINT =
    sizeInWords<rpc::MessageTarget>() + sizeInWords<rpc::PromisedAnswer>() +
    16;  // +16 for ops; hope that's enough

// Size hint, in words, budgeted per capability: an rpc::CapDescriptor together with an
// rpc::PromisedAnswer.
constexpr uint CAP_DESCRIPTOR_SIZE_HINT =
    sizeInWords<rpc::CapDescriptor>() + sizeInWords<rpc::PromisedAnswer>();

// Largest value copySizeHint() will ever return (2^20 words).
constexpr uint64_t MAX_SIZE_HINT = uint64_t(1) << 20;

uint copySizeHint(MessageSize size) {
  // Converts a MessageSize into a word-count hint for allocating a message that will hold a copy
  // of the measured content: the content's own words plus CAP_DESCRIPTOR_SIZE_HINT words for
  // every capability it carries.  The result is capped at MAX_SIZE_HINT.
  //
  // The capability count is widened to 64 bits *before* multiplying.  CAP_DESCRIPTOR_SIZE_HINT
  // is a `uint`, so if `capCount` is also narrower than 64 bits the product would otherwise be
  // computed at that narrower width and could wrap around before being added to `wordCount`.
  // NOTE(review): assumes `capCount` is an unsigned integer no wider than 64 bits -- confirm
  //   against the MessageSize declaration.
  uint64_t capWords = static_cast<uint64_t>(size.capCount) * CAP_DESCRIPTOR_SIZE_HINT;
  uint64_t sizeHint = size.wordCount + capWords;
  return kj::min(MAX_SIZE_HINT, sizeHint);
}

uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) {
  // Chooses a first-segment size, in words, for an outgoing message.  When the caller supplied
  // a size hint, the result is copySizeHint() of it plus `additional` words; when no hint was
  // supplied, the result is 0.
  KJ_IF_MAYBE(hint, sizeHint) {
    return copySizeHint(*hint) + additional;
  }
  return 0;
}

Kenton Varda's avatar
Kenton Varda committed
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) {
  // Translates a wire-format list of PromisedAnswer ops into an array of PipelineOp.
  //
  // Only NOOP and GET_POINTER_FIELD are understood.  Any other op reports a KJ_FAIL_REQUIRE
  // failure whose recovery path returns nullptr.
  auto converted = kj::heapArrayBuilder<PipelineOp>(ops.size());
  for (auto opReader: ops) {
    PipelineOp op;
    auto kind = opReader.which();
    if (kind == rpc::PromisedAnswer::Op::NOOP) {
      op.type = PipelineOp::NOOP;
    } else if (kind == rpc::PromisedAnswer::Op::GET_POINTER_FIELD) {
      op.type = PipelineOp::GET_POINTER_FIELD;
      op.pointerIndex = opReader.getGetPointerField();
    } else {
      KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) {
        return nullptr;
      }
    }
    converted.add(op);
  }
  return converted.finish();
}

Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps(
    Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) {
  // Inverse of toPipelineOps(): allocates, from `orphanage`, a wire-format op list with one
  // element per entry of `ops` and fills each element in.  Entries whose type is neither NOOP
  // nor GET_POINTER_FIELD leave their list element untouched.
  auto orphan = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size());
  auto list = orphan.get();
  for (uint index = 0; index < ops.size(); index++) {
    const PipelineOp& source = ops[index];
    rpc::PromisedAnswer::Op::Builder target = list[index];
    if (source.type == PipelineOp::NOOP) {
      target.setNoop();
    } else if (source.type == PipelineOp::GET_POINTER_FIELD) {
      target.setGetPointerField(source.pointerIndex);
    }
  }
  return orphan;
}

Kenton Varda's avatar
Kenton Varda committed
109
kj::Exception toException(const rpc::Exception::Reader& exception) {
  // Builds a local kj::Exception from an exception received over the wire.  The type is carried
  // over by a direct cast, the source location is reported as "(remote)" line 0, and the
  // description is the remote reason prefixed with "remote exception: ".
  auto type = static_cast<kj::Exception::Type>(exception.getType());
  auto description = kj::str("remote exception: ", exception.getReason());
  return kj::Exception(type, "(remote)", 0, kj::mv(description));
}

void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
  // Fills in a wire-format exception from a local kj::Exception: the description becomes the
  // reason and the type is carried over by a direct cast.
  //
  // TODO(someday):  Indicate the remote server name as part of the stack trace.  Maybe even
  //   transmit stack traces?
  auto description = exception.getDescription();
  builder.setReason(description);
  builder.setType(static_cast<rpc::Exception::Type>(exception.getType()));

  // Log FAILED exceptions, except those whose description already carries the
  // "remote exception:" prefix that toException() adds.
  bool isFailure = exception.getType() == kj::Exception::Type::FAILED;
  if (isFailure && !description.startsWith("remote exception:")) {
    KJ_LOG(INFO, "returning failure over rpc", exception);
  }
}

Kenton Varda's avatar
Kenton Varda committed
126
uint exceptionSizeHint(const kj::Exception& exception) {
  // Size hint, in words, for encoding `exception` as an rpc::Exception: the struct itself plus
  // enough words for the description text (length divided by the word size, plus one).
  auto descriptionWords = exception.getDescription().size() / sizeof(word) + 1;
  return sizeInWords<rpc::Exception>() + descriptionWords;
}

Kenton Varda's avatar
Kenton Varda committed
130
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
131

132 133 134 135 136 137 138 139 140 141 142 143 144
template <typename Id, typename T>
class ExportTable {
  // Table mapping integers to T, where the integers are chosen locally.

public:
  kj::Maybe<T&> find(Id id) {
    if (id < slots.size() && slots[id] != nullptr) {
      return slots[id];
    } else {
      return nullptr;
    }
  }

145
  T erase(Id id, T& entry) {
Kenton Varda's avatar
Kenton Varda committed
146 147
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
148 149 150 151 152 153 154 155
    // `entry` is a reference to the entry being released -- we require this in order to prove
    // that the caller has already done a find() to check that this entry exists.  We can't check
    // ourselves because the caller may have nullified the entry in the meantime.
    KJ_DREQUIRE(&entry == &slots[id]);
    T toRelease = kj::mv(slots[id]);
    slots[id] = T();
    freeIds.push(id);
    return toRelease;
156 157 158 159 160 161 162 163 164 165 166 167 168
  }

  T& next(Id& id) {
    if (freeIds.empty()) {
      id = slots.size();
      return slots.add();
    } else {
      id = freeIds.top();
      freeIds.pop();
      return slots[id];
    }
  }

169 170 171 172 173 174 175 176 177
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i = 0; i < slots.size(); i++) {
      if (slots[i] != nullptr) {
        func(i, slots[i]);
      }
    }
  }

178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
private:
  kj::Vector<T> slots;
  std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds;
};

template <typename Id, typename T>
class ImportTable {
  // Table mapping integers to T, where the integers are chosen remotely.

public:
  T& operator[](Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      return high[id];
    }
  }

Kenton Varda's avatar
Kenton Varda committed
196 197 198 199 200
  kj::Maybe<T&> find(Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      auto iter = high.find(id);
Kenton Varda's avatar
Kenton Varda committed
201
      if (iter == high.end()) {
Kenton Varda's avatar
Kenton Varda committed
202 203 204 205 206 207 208
        return nullptr;
      } else {
        return iter->second;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
209 210 211
  T erase(Id id) {
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
212
    if (id < kj::size(low)) {
Kenton Varda's avatar
Kenton Varda committed
213
      T toRelease = kj::mv(low[id]);
214
      low[id] = T();
Kenton Varda's avatar
Kenton Varda committed
215
      return toRelease;
216
    } else {
Kenton Varda's avatar
Kenton Varda committed
217
      T toRelease = kj::mv(high[id]);
218
      high.erase(id);
Kenton Varda's avatar
Kenton Varda committed
219
      return toRelease;
220 221 222
    }
  }

223 224 225 226 227 228 229 230 231 232
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i: kj::indices(low)) {
      func(i, low[i]);
    }
    for (auto& entry: high) {
      func(entry.first, entry.second);
    }
  }

233 234 235 236 237
private:
  T low[16];
  std::unordered_map<Id, T> high;
};

Kenton Varda's avatar
Kenton Varda committed
238 239
// =======================================================================================

240
class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted {
241
public:
Kenton Varda's avatar
Kenton Varda committed
242 243 244 245 246
  struct DisconnectInfo {
    // Value delivered through `disconnectFulfiller` when disconnect() tears the connection down.

    kj::Promise<void> shutdownPromise;
    // Task which is working on sending an abort message and cleanly ending the connection.
  };

247
  RpcConnectionState(kj::Maybe<Capability::Client> bootstrapInterface,
                     kj::Maybe<RealmGateway<>::Client> gateway,
                     kj::Maybe<SturdyRefRestorerBase&> restorer,
                     kj::Own<VatNetworkBase::Connection>&& connectionParam,
                     kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller)
      : bootstrapInterface(kj::mv(bootstrapInterface)),
        gateway(kj::mv(gateway)),
        restorer(restorer),
        disconnectFulfiller(kj::mv(disconnectFulfiller)),
        tasks(*this) {
    // Takes ownership of the connection, entering the Connected state, and then adds the
    // message loop to the task set.  This object is the task set's error handler, so a failed
    // task ends up in taskFailed().
    connection.init<Connected>(kj::mv(connectionParam));
    tasks.add(messageLoop());
  }

258
  kj::Own<ClientHook> restore(AnyPointer::Reader objectId) {
    // Sends a `Bootstrap` message carrying `objectId` and returns a capability that pipelines on
    // its eventual answer.  If the connection is already gone, returns a broken capability
    // carrying the disconnect exception instead.
    if (connection.is<Disconnected>()) {
      return newBrokenCap(kj::cp(connection.get<Disconnected>()));
    }

    // Reserve a question table entry for the request.
    QuestionId questionId;
    auto& entry = questions.next(questionId);
    entry.isAwaitingReturn = true;

    auto responsePaf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();

    auto ref = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(responsePaf.fulfiller));
    entry.selfRef = *ref;

    // The response promise holds its own reference to the QuestionRef.
    responsePaf.promise = responsePaf.promise.attach(kj::addRef(*ref));

    {
      auto outgoing = connection.get<Connected>()->newOutgoingMessage(
          objectId.targetSize().wordCount + messageSizeHint<rpc::Bootstrap>());

      auto bootstrap = outgoing->getBody().initAs<rpc::Message>().initBootstrap();
      bootstrap.setQuestionId(questionId);
      bootstrap.getDeprecatedObjectId().set(objectId);

      outgoing->send();
    }

    auto pipelineHook =
        kj::refcounted<RpcPipeline>(*this, kj::mv(ref), kj::mv(responsePaf.promise));

    // An empty op list selects the answer itself.
    return pipelineHook->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
  }

291
  void taskFailed(kj::Exception&& exception) override {
    // kj::TaskSet::ErrorHandler callback: a failed task takes the whole connection down with
    // that task's exception.
    disconnect(kj::mv(exception));
  }

  void disconnect(kj::Exception&& exception) {
    // Tears the connection down because of `exception`:
    //   1. fails or releases everything in the tables,
    //   2. attempts to send an `Abort` message describing `exception`,
    //   3. starts shutting down the underlying connection and hands the resulting promise to
    //      `disconnectFulfiller`,
    //   4. switches `connection` to the Disconnected state.
    // Does nothing if the connection is no longer in the Connected state.
    if (!connection.is<Connected>()) {
      // Already disconnected.
      return;
    }

    // Copy of the error re-typed as DISCONNECTED; this is what pending operations are rejected
    // with and what the Disconnected state stores.
    kj::Exception networkException(kj::Exception::Type::DISCONNECTED,
        exception.getFile(), exception.getLine(), kj::heapString(exception.getDescription()));

    KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
      // Objects are first moved out of the tables into these holders and only destroyed when
      // the holders go out of scope, because their destructors could come back and mess with
      // the tables.
      kj::Vector<kj::Own<PipelineHook>> droppedPipelines;
      kj::Vector<kj::Own<ClientHook>> droppedClients;
      kj::Vector<kj::Promise<kj::Own<RpcResponse>>> droppedTailCalls;
      kj::Vector<kj::Promise<void>> droppedResolveOps;

      // Questions: reject every one whose QuestionRef is still present.
      questions.forEach([&](QuestionId, Question& question) {
        KJ_IF_MAYBE(questionRef, question.selfRef) {
          questionRef->reject(kj::cp(networkException));
        }
      });

      // Answers: collect pipelines and redirected results, and request cancellation of calls
      // that still have a context.
      answers.forEach([&](AnswerId, Answer& answer) {
        KJ_IF_MAYBE(hook, answer.pipeline) {
          droppedPipelines.add(kj::mv(*hook));
        }

        KJ_IF_MAYBE(redirected, answer.redirectedResults) {
          droppedTailCalls.add(kj::mv(*redirected));
        }

        KJ_IF_MAYBE(callContext, answer.callContext) {
          callContext->requestCancel();
        }
      });

      // Exports: collect the hooks and resolve operations, then blank the entry.
      exports.forEach([&](ExportId, Export& exp) {
        droppedClients.add(kj::mv(exp.clientHook));
        droppedResolveOps.add(kj::mv(exp.resolveOp));
        exp = Export();
      });

      // Imports: reject the fulfiller of every import that is a promise.
      imports.forEach([&](ImportId, Import& import) {
        KJ_IF_MAYBE(fulfiller, import.promiseFulfiller) {
          (*fulfiller)->reject(kj::cp(networkException));
        }
      });

      // Embargoes: reject every outstanding fulfiller.
      embargoes.forEach([&](EmbargoId, Embargo& embargo) {
        KJ_IF_MAYBE(fulfiller, embargo.fulfiller) {
          (*fulfiller)->reject(kj::cp(networkException));
        }
      });
    })) {
      // Some destructor must have thrown an exception.  There is no appropriate place to report
      // these errors.
      KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
             *newException);
    }

    // Best-effort `Abort` message carrying the original exception; failures are ignored.
    kj::runCatchingExceptions([&]() {
      auto abortMessage = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<void>() + exceptionSizeHint(exception));
      fromException(exception, abortMessage->getBody().getAs<rpc::Message>().initAbort());
      abortMessage->send();
    });

    // Begin shutting down the connection.  The connection object itself is attached to the
    // shutdown promise.
    auto shutdownPromise = connection.get<Connected>()->shutdown()
        .attach(kj::mv(connection.get<Connected>()))
        .then([]() -> kj::Promise<void> { return kj::READY_NOW; },
              [](kj::Exception&& e) -> kj::Promise<void> {
          // A DISCONNECTED failure during shutdown is not reported as an error; anything else
          // is passed through.
          if (e.getType() == kj::Exception::Type::DISCONNECTED) {
            return kj::READY_NOW;
          }
          return kj::mv(e);
        });
    disconnectFulfiller->fulfill(DisconnectInfo { kj::mv(shutdownPromise) });

    // From here on the connection is represented by the exception.
    connection.init<Disconnected>(kj::mv(networkException));
  }

381
private:
382
  // Forward declarations of nested classes that the table entry types refer to.
  class RpcClient;
  class ImportClient;
  class PromiseClient;
  class QuestionRef;
  class RpcPipeline;
  class RpcCallContext;
  class RpcResponse;
Kenton Varda's avatar
Kenton Varda committed
389

390 391 392 393 394 395
  // =======================================================================================
  // The Four Tables entry types
  //
  // We have to define these before we can define the class's fields.

  using QuestionId = uint32_t;
  using AnswerId = QuestionId;
  using ExportId = uint32_t;
  using ImportId = ExportId;
  // See equivalent definitions in rpc.capnp.
  //
  // The alias used always names the *local* table being indexed.  QuestionId and AnswerId are
  // the same underlying type, but QuestionId refers to an entry in the local question table
  // (which corresponds to the peer's answer table), while AnswerId refers to an entry in our
  // answer table (which corresponds to the peer's question table).  Because every message in
  // the RPC protocol is defined from the sender's point of view, any ID read from a received
  // message should have its type inverted.
  // TODO(cleanup):  Perhaps we could enforce that in a type-safe way?  Hmm...
408 409

  struct Question {
410 411 412
    kj::Array<ExportId> paramExports;
    // List of exports that were sent in the request.  If the response has `releaseParamCaps` these
    // will need to be released.
413

Kenton Varda's avatar
Kenton Varda committed
414 415 416
    kj::Maybe<QuestionRef&> selfRef;
    // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    // sent.
417

418 419 420
    bool isAwaitingReturn = false;
    // True from when `Call` is sent until `Return` is received.

421 422 423
    bool isTailCall = false;
    // Is this a tail call?  If so, we don't expect to receive results in the `Return`.

Kenton Varda's avatar
Kenton Varda committed
424
    inline bool operator==(decltype(nullptr)) const {
425
      return !isAwaitingReturn && selfRef == nullptr;
Kenton Varda's avatar
Kenton Varda committed
426 427
    }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
428 429 430
  };

  struct Answer {
431 432 433 434 435 436
    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

437 438 439 440
    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

441
    kj::Maybe<kj::Own<PipelineHook>> pipeline;
442 443
    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.

444
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
445 446
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.
447

448
    kj::Maybe<RpcCallContext&> callContext;
449 450
    // The call context, if it's still active.  Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.
451

452 453 454
    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
    // will need to be released.
455 456 457 458 459 460
  };

  struct Export {
    uint refcount = 0;
    // When this reaches 0, drop `clientHook` and free this export.

461
    kj::Own<ClientHook> clientHook;
462

463 464 465 466
    kj::Promise<void> resolveOp = nullptr;
    // If this export is a promise (not a settled capability), the `resolveOp` represents the
    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.

467 468 469 470 471 472 473 474 475 476 477
    inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
  };

  struct Import {
    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

478
    kj::Maybe<ImportClient&> importClient;
479 480
    // Becomes null when the import is destroyed.

481 482 483 484 485
    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

486
    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
487 488 489
    // If non-null, the import is a promise.
  };

Kenton Varda's avatar
Kenton Varda committed
490 491 492 493 494 495 496 497 498 499 500 501
  using EmbargoId = uint32_t;

  struct Embargo {
    // For handling the `Disembargo` message when looping back to self.

    kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    // Fulfill this when the Disembargo arrives.

    // An embargo entry counts as empty exactly when it has no fulfiller.
    inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
  };

502 503 504
  // =======================================================================================
  // OK, now we can define RpcConnectionState's member data.

505
  kj::Maybe<Capability::Client> bootstrapInterface;
506
  kj::Maybe<RealmGateway<>::Client> gateway;
507
  kj::Maybe<SturdyRefRestorerBase&> restorer;
508 509 510 511 512 513 514

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

Kenton Varda's avatar
Kenton Varda committed
515
  kj::Own<kj::PromiseFulfiller<DisconnectInfo>> disconnectFulfiller;
516

517 518
  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
519 520
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
521 522 523 524 525 526 527 528 529
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables.  This definitely isn't a fifth table.  I don't know what you're
  // talking about.
530 531 532 533

  kj::TaskSet tasks;

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
534
  // ClientHook implementations
535

Kenton Varda's avatar
Kenton Varda committed
536
  class RpcClient: public ClientHook, public kj::Refcounted {
537
  public:
538
    RpcClient(RpcConnectionState& connectionState)
539
        : connectionState(kj::addRef(connectionState)) {}
540

541 542 543 544
    virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) = 0;
    // Writes a CapDescriptor referencing this client.  The CapDescriptor must be sent as part of
    // the very next message sent on the connection, as it may become invalid if other things
    // happen.
Kenton Varda's avatar
Kenton Varda committed
545 546 547 548
    //
    // If writing the descriptor adds a new export to the export table, or increments the refcount
    // on an existing one, then the ID is returned and the caller is responsible for removing it
    // later.
Kenton Varda's avatar
Kenton Varda committed
549

550 551
    virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) = 0;
Kenton Varda's avatar
Kenton Varda committed
552
    // Writes the appropriate call target for calls to this capability and returns null.
Kenton Varda's avatar
Kenton Varda committed
553
    //
Kenton Varda's avatar
Kenton Varda committed
554 555 556 557
    // - OR -
    //
    // If calls have been redirected to some other local ClientHook, returns that hook instead.
    // This can happen if the capability represents a promise that has been resolved.
Kenton Varda's avatar
Kenton Varda committed
558

559
    virtual kj::Own<ClientHook> getInnermostClient() = 0;
Kenton Varda's avatar
Kenton Varda committed
560 561 562 563
    // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    // that other client -- return a reference to the other client, transitively.  Otherwise,
    // return a new reference to *this.

Kenton Varda's avatar
Kenton Varda committed
564 565
    // implements ClientHook -----------------------------------------

566
    Request<AnyPointer, AnyPointer> newCall(
567
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
568 569 570 571 572 573 574 575 576 577 578 579
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          //
          // We pull a neat trick here: We actually end up returning a RequestHook for an import
          // request on the gateway cap, but with the "root" of the request actually pointing
          // to the "params" field of the real request.

          sizeHint = sizeHint.map([](MessageSize hint) {
            ++hint.capCount;
            hint.wordCount += sizeInWords<RealmGateway<>::ImportParams>();
580
            return hint;
581 582 583
          });

          auto request = g->importRequest(sizeHint);
584
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
585

586 587 588 589 590 591 592 593 594 595 596
          // Awkwardly, request.initParams() would return a SaveParams struct, but to construct
          // the Request<AnyPointer, AnyPointer> to return we need an AnyPointer::Builder, and you
          // can't go backwards from a struct builder to an AnyPointer builder. So instead we
          // manually get at the pointer by converting the outer request to AnyStruct and then
          // pulling the pointer from the pointer section.
          auto pointers = toAny(request).getPointerSection();
          KJ_ASSERT(pointers.size() >= 2);
          auto paramsPtr = pointers[1];
          KJ_ASSERT(paramsPtr.isNull());

          return Request<AnyPointer, AnyPointer>(paramsPtr, RequestHook::from(kj::mv(request)));
597 598 599
        }
      }

600 601 602 603 604
      return newCallNoIntercept(interfaceId, methodId, sizeHint);
    }

    Request<AnyPointer, AnyPointer> newCallNoIntercept(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) {
605 606 607 608
      if (!connectionState->connection.is<Connected>()) {
        return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
      }

609
      auto request = kj::heap<RpcRequest>(
610 611
          *connectionState, *connectionState->connection.get<Connected>(),
          sizeHint, kj::addRef(*this));
612 613 614 615 616 617
      auto callBuilder = request->getCall();

      callBuilder.setInterfaceId(interfaceId);
      callBuilder.setMethodId(methodId);

      auto root = request->getRoot();
618
      return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
619 620
    }

Kenton Varda's avatar
Kenton Varda committed
621
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
622
                                kj::Own<CallContextHook>&& context) override {
623 624 625 626 627 628 629 630 631 632 633
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          auto params = context->getParams().getAs<Persistent<>::SaveParams>();

          auto requestSize = params.totalSize();
          ++requestSize.capCount;
          requestSize.wordCount += sizeInWords<RealmGateway<>::ImportParams>();

          auto request = g->importRequest(requestSize);
634
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
635 636 637 638 639 640 641 642
          request.setParams(params);

          context->allowCancellation();
          context->releaseParams();
          return context->directTailCall(RequestHook::from(kj::mv(request)));
        }
      }

643 644 645 646 647 648 649
      return callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    VoidPromiseAndPipeline callNoIntercept(uint64_t interfaceId, uint16_t methodId,
                                           kj::Own<CallContextHook>&& context) {
      // Implement call() by copying params and results messages.

Kenton Varda's avatar
Kenton Varda committed
650
      auto params = context->getParams();
651
      auto request = newCall(interfaceId, methodId, params.targetSize());
Kenton Varda's avatar
Kenton Varda committed
652

653
      request.set(params);
Kenton Varda's avatar
Kenton Varda committed
654 655
      context->releaseParams();

656
      // We can and should propagate cancellation.
657
      context->allowCancellation();
658

659
      return context->directTailCall(RequestHook::from(kj::mv(request)));
Kenton Varda's avatar
Kenton Varda committed
660 661
    }

662
    kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
663 664
      return kj::addRef(*this);
    }
665
    const void* getBrand() override {
666
      return connectionState.get();
Kenton Varda's avatar
Kenton Varda committed
667 668
    }

669
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
670 671
  };

672 673 674 675
  class ImportClient final: public RpcClient {
    // A ClientHook that wraps an entry in the import table.

  public:
676
    ImportClient(RpcConnectionState& connectionState, ImportId importId)
Kenton Varda's avatar
Kenton Varda committed
677 678
        : RpcClient(connectionState), importId(importId) {}

Kenton Varda's avatar
Kenton Varda committed
679
    ~ImportClient() noexcept(false) {
680
      unwindDetector.catchExceptionsIfUnwinding([&]() {
681
        // Remove self from the import table, if the table is still pointing at us.
682 683 684 685 686
        KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
          KJ_IF_MAYBE(i, import->importClient) {
            if (i == this) {
              connectionState->imports.erase(importId);
            }
687 688
          }
        }
Kenton Varda's avatar
Kenton Varda committed
689

690
        // Send a message releasing our remote references.
691 692
        if (remoteRefcount > 0 && connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
693 694 695 696 697 698 699
              messageSizeHint<rpc::Release>());
          rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
          builder.setId(importId);
          builder.setReferenceCount(remoteRefcount);
          message->send();
        }
      });
700 701
    }

702 703 704
    void addRemoteRef() {
      // Add a new RemoteRef and return a new ref to this client representing it.
      ++remoteRefcount;
Kenton Varda's avatar
Kenton Varda committed
705
    }
706

707
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
708
      descriptor.setReceiverHosted(importId);
Kenton Varda's avatar
Kenton Varda committed
709
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
710 711
    }

712 713
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
714
      target.setImportedCap(importId);
Kenton Varda's avatar
Kenton Varda committed
715
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
716 717
    }

718
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
719 720 721
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
722
    // implements ClientHook -----------------------------------------
723

724
    kj::Maybe<ClientHook&> getResolved() override {
725 726 727
      return nullptr;
    }

728
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
729 730 731
      return nullptr;
    }

Kenton Varda's avatar
Kenton Varda committed
732
  private:
733
    ImportId importId;
Kenton Varda's avatar
Kenton Varda committed
734 735 736

    uint remoteRefcount = 0;
    // Number of times we've received this import from the peer.
737 738

    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
739 740
  };

741 742 743
  class PipelineClient final: public RpcClient {
    // A ClientHook representing a pipelined promise.  Always wrapped in PromiseClient.

Kenton Varda's avatar
Kenton Varda committed
744
  public:
745 746
    PipelineClient(RpcConnectionState& connectionState,
                   kj::Own<QuestionRef>&& questionRef,
747
                   kj::Array<PipelineOp>&& ops)
Kenton Varda's avatar
Kenton Varda committed
748
        : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}
Kenton Varda's avatar
Kenton Varda committed
749

750
   kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
751 752 753 754 755
      auto promisedAnswer = descriptor.initReceiverAnswer();
      promisedAnswer.setQuestionId(questionRef->getId());
      promisedAnswer.adoptTransform(fromPipelineOps(
          Orphanage::getForMessageContaining(descriptor), ops));
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
756 757
    }

758 759
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
Kenton Varda's avatar
Kenton Varda committed
760 761 762 763
      auto builder = target.initPromisedAnswer();
      builder.setQuestionId(questionRef->getId());
      builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
      return nullptr;
764 765
    }

766
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
767 768 769
      return kj::addRef(*this);
    }

770
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
771

772
    kj::Maybe<ClientHook&> getResolved() override {
773 774 775
      return nullptr;
    }

776
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
777
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
778 779 780
    }

  private:
781
    kj::Own<QuestionRef> questionRef;
782
    kj::Array<PipelineOp> ops;
Kenton Varda's avatar
Kenton Varda committed
783 784
  };

785 786 787 788
  class PromiseClient final: public RpcClient {
    // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    // PipelineClient) and then, later on, redirects to some other client.

Kenton Varda's avatar
Kenton Varda committed
789
  public:
790 791 792
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<ClientHook> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
793
                  kj::Maybe<ImportId> importId)
794
        : RpcClient(connectionState),
795 796
          isResolved(false),
          cap(kj::mv(initial)),
797
          importId(importId),
798 799 800
          fork(eventual.fork()),
          resolveSelfPromise(fork.addBranch().then(
              [this](kj::Own<ClientHook>&& resolution) {
801
                resolve(kj::mv(resolution), false);
802
              }, [this](kj::Exception&& exception) {
803
                resolve(newBrokenCap(kj::mv(exception)), true);
804 805 806 807
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
808
              })) {
809 810 811 812 813 814
      // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
      // resolves, will forward there instead.  In addition, `whenMoreResolved()` will return a fork
      // of `eventual`.  Note that this means the application could hold on to `eventual` even after
      // the `PromiseClient` is destroyed; `eventual` must therefore make sure to hold references to
      // anything that needs to stay alive in order to resolve it correctly (such as making sure the
      // import ID is not released).
Kenton Varda's avatar
Kenton Varda committed
815
    }
Kenton Varda's avatar
Kenton Varda committed
816

817 818 819 820 821 822
    ~PromiseClient() noexcept(false) {
      KJ_IF_MAYBE(id, importId) {
        // This object is representing an import promise.  That means the import table may still
        // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
        // the import still exists and the pointer still points back to this object because this
        // object may actually outlive the import.
823
        KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
824 825 826 827 828 829 830 831 832
          KJ_IF_MAYBE(c, import->appClient) {
            if (c == this) {
              import->appClient = nullptr;
            }
          }
        }
      }
    }

833
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
834
      receivedCall = true;
835
      return connectionState->writeDescriptor(*cap, descriptor);
Kenton Varda's avatar
Kenton Varda committed
836
    }
Kenton Varda's avatar
Kenton Varda committed
837

838 839
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
840
      receivedCall = true;
841
      return connectionState->writeTarget(*cap, target);
Kenton Varda's avatar
Kenton Varda committed
842 843
    }

844
    kj::Own<ClientHook> getInnermostClient() override {
845
      receivedCall = true;
846
      return connectionState->getInnermostClient(*cap);
Kenton Varda's avatar
Kenton Varda committed
847 848
    }

Kenton Varda's avatar
Kenton Varda committed
849
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
850

851
    Request<AnyPointer, AnyPointer> newCall(
852
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
853
      receivedCall = true;
854
      return cap->newCall(interfaceId, methodId, sizeHint);
855 856
    }

857
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
858
                                kj::Own<CallContextHook>&& context) override {
859
      receivedCall = true;
860
      return cap->call(interfaceId, methodId, kj::mv(context));
861 862
    }

863
    kj::Maybe<ClientHook&> getResolved() override {
864 865
      if (isResolved) {
        return *cap;
866 867 868
      } else {
        return nullptr;
      }
Kenton Varda's avatar
Kenton Varda committed
869 870
    }

871
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
872
      return fork.addBranch();
Kenton Varda's avatar
Kenton Varda committed
873
    }
Kenton Varda's avatar
Kenton Varda committed
874 875

  private:
876 877
    bool isResolved;
    kj::Own<ClientHook> cap;
878

879
    kj::Maybe<ImportId> importId;
880
    kj::ForkedPromise<kj::Own<ClientHook>> fork;
Kenton Varda's avatar
Kenton Varda committed
881 882 883 884 885

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

886
    bool receivedCall = false;
Kenton Varda's avatar
Kenton Varda committed
887

888
    void resolve(kj::Own<ClientHook> replacement, bool isError) {
889 890
      if (replacement->getBrand() != connectionState.get() && receivedCall && !isError &&
          connectionState->connection.is<Connected>()) {
891 892 893 894 895
        // The new capability is hosted locally, not on the remote machine.  And, we had made calls
        // to the promise.  We need to make sure those calls echo back to us before we allow new
        // calls to go directly to the local capability, so we need to set a local embargo and send
        // a `Disembargo` to echo through the peer.

896
        auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
897
            messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
Kenton Varda's avatar
Kenton Varda committed
898

Kenton Varda's avatar
Kenton Varda committed
899
        auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();
Kenton Varda's avatar
Kenton Varda committed
900 901

        {
902
          auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
Kenton Varda's avatar
Kenton Varda committed
903 904 905 906 907
          KJ_ASSERT(redirect == nullptr,
                    "Original promise target should always be from this RPC connection.");
        }

        EmbargoId embargoId;
908
        Embargo& embargo = connectionState->embargoes.next(embargoId);
Kenton Varda's avatar
Kenton Varda committed
909 910 911 912 913 914 915

        disembargo.getContext().setSenderLoopback(embargoId);

        auto paf = kj::newPromiseAndFulfiller<void>();
        embargo.fulfiller = kj::mv(paf.fulfiller);

        // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
916 917
        auto embargoPromise = paf.promise.then(
            kj::mvCapture(replacement, [this](kj::Own<ClientHook>&& replacement) {
Kenton Varda's avatar
Kenton Varda committed
918 919 920 921 922
              return kj::mv(replacement);
            }));

        // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
        // client instead.
923
        replacement = newLocalPromiseClient(kj::mv(embargoPromise));
Kenton Varda's avatar
Kenton Varda committed
924 925 926 927 928

        // Send the `Disembargo`.
        message->send();
      }

929 930
      cap = replacement->addRef();
      isResolved = true;
Kenton Varda's avatar
Kenton Varda committed
931
    }
Kenton Varda's avatar
Kenton Varda committed
932
  };
933

934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982
  class NoInterceptClient final: public RpcClient {
    // Wraps another RpcClient and forwards everything to it, except that calls go through the
    // inner client's `newCallNoIntercept()` / `callNoIntercept()` entry points and therefore
    // skip the special handling of "save" requests.
    //
    // When a "save" request is intercepted and handed to a RealmGateway, the gateway receives
    // one of these rather than the original capability, since usually the first thing the
    // RealmGateway will do is turn around and call save() again.
    //
    // This is admittedly sort of backwards: the interception of "save" ought to be the part
    // implemented by a wrapper. However, that would require placing a wrapper around every
    // RpcClient we create whereas NoInterceptClient only needs to be injected after a save()
    // request occurs and is intercepted.

  public:
    NoInterceptClient(RpcClient& inner)
        : RpcClient(*inner.connectionState), inner(kj::addRef(inner)) {}

    // Descriptor and target writing are pure pass-throughs -----------

    kj::Maybe<ExportId> writeDescriptor(
        rpc::CapDescriptor::Builder descriptor) override {
      return inner->writeDescriptor(descriptor);
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
      return inner->writeTarget(target);
    }

    kj::Own<ClientHook> getInnermostClient() override {
      return inner->getInnermostClient();
    }

    // implements ClientHook -----------------------------------------

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId,
        kj::Maybe<MessageSize> sizeHint) override {
      return inner->newCallNoIntercept(interfaceId, methodId, sizeHint);
    }

    VoidPromiseAndPipeline call(
        uint64_t interfaceId, uint16_t methodId,
        kj::Own<CallContextHook>&& context) override {
      return inner->callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    kj::Maybe<ClientHook&> getResolved() override {
      return nullptr;
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return nullptr;
    }

  private:
    kj::Own<RpcClient> inner;
    // The client all operations are delegated to.
  };

983
  kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor) {
    // Fills in `descriptor` so that it describes `cap`.  Returns the export ID whose refcount
    // was taken on behalf of the descriptor, or whatever the RpcClient's own writeDescriptor()
    // returns when the capability belongs to this connection.

    // Follow the chain of already-resolved wrappers down to the innermost capability.
    ClientHook* innermost = &cap;
    while (true) {
      KJ_IF_MAYBE(next, innermost->getResolved()) {
        innermost = next;
        continue;
      }
      break;
    }

    // Capabilities belonging to this connection know how to describe themselves.
    if (innermost->getBrand() == this) {
      return kj::downcast<RpcClient>(*innermost).writeDescriptor(descriptor);
    }

    // Already exported?  Then reuse the existing table entry and bump its refcount.
    auto known = exportsByCap.find(innermost);
    if (known != exportsByCap.end()) {
      auto& entry = KJ_ASSERT_NONNULL(exports.find(known->second));
      ++entry.refcount;
      descriptor.setSenderHosted(known->second);
      return known->second;
    }

    // First time we've seen this capability: allocate a fresh export table entry.
    ExportId freshId;
    auto& entry = exports.next(freshId);
    exportsByCap[innermost] = freshId;
    entry.refcount = 1;
    entry.clientHook = innermost->addRef();

    KJ_IF_MAYBE(pending, innermost->whenMoreResolved()) {
      // It's a promise, so also arrange for a `Resolve` message to be sent later.
      entry.resolveOp = resolveExportedPromise(freshId, kj::mv(*pending));
      descriptor.setSenderPromise(freshId);
    } else {
      descriptor.setSenderHosted(freshId);
    }

    return freshId;
  }

1027
  kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
                                       rpc::Payload::Builder payload) {
    // Writes one descriptor per `capTable` slot into `payload`'s cap table (null slots become
    // `none`) and returns the export IDs reported by writeDescriptor() along the way.

    auto descriptors = payload.initCapTable(capTable.size());
    kj::Vector<ExportId> exported(capTable.size());

    for (uint slot = 0; slot < capTable.size(); slot++) {
      KJ_IF_MAYBE(hook, capTable[slot]) {
        KJ_IF_MAYBE(id, writeDescriptor(**hook, descriptors[slot])) {
          exported.add(*id);
        }
      } else {
        descriptors[slot].setNone();
      }
    }

    return exported.releaseAsArray();
  }

1043
  kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
    // Decides how a call on `cap` should be addressed.
    //
    // When the call should travel over this connection, `target` is filled in accordingly and
    // nullptr is returned.  Otherwise the result is a `ClientHook` to which the caller should
    // delegate the call instead.
    //
    // A non-null result mainly shows up when `cap` is a promise that resolved only recently:
    // the application may have started building its request while the promise was still
    // pending, on the assumption that the request would go out over this connection, and then
    // the promise resolved to something living elsewhere before the request was sent.  In that
    // case the request has to be redirected to the new target.

    if (cap.getBrand() != this) {
      // Not one of ours; hand back a reference for the caller to forward to.
      return cap.addRef();
    }

    return kj::downcast<RpcClient>(cap).writeTarget(target);
  }

1061 1062
  kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
    // Walks through every already-resolved wrapper around `client` and returns a reference to
    // what lies underneath.  If that turns out to be an RpcClient of this connection, it gets
    // to pick its own innermost client.

    ClientHook* current = &client;
    while (true) {
      KJ_IF_MAYBE(next, current->getResolved()) {
        current = next;
        continue;
      }
      break;
    }

    if (current->getBrand() != this) {
      return current->addRef();
    }

    return kj::downcast<RpcClient>(*current).getInnermostClient();
  }

  kj::Promise<void> resolveExportedPromise(
      ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
    // Implements exporting of a promise.  The promise has been exported under the given ID, and is
    // to eventually resolve to the ClientHook produced by `promise`.  This method waits for that
    // resolve to happen and then sends the appropriate `Resolve` message to the peer.
    //
    // The returned promise is eagerly evaluated; any exception escaping either continuation is
    // added to `tasks`.

    return promise.then(
        [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
      // Successful resolution.

      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return kj::READY_NOW;
      }

      // Get the innermost ClientHook backing the resolved client.  This includes traversing
      // PromiseClients that haven't resolved yet to their underlying ImportClient or
      // PipelineClient, so that we get a remote promise that might resolve later.  This is
      // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
      // correctly instead of going to the result of some future resolution.  See the documentation
      // for `Disembargo` in `rpc.capnp`.
      resolution = getInnermostClient(*resolution);

      // Update the export table to point at this object instead.  We know that our entry in the
      // export table is still live because when it is destroyed the asynchronous resolution task
      // (i.e. this code) is canceled.
      auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
      exportsByCap.erase(exp.clientHook);
      exp.clientHook = kj::mv(resolution);

      if (exp.clientHook->getBrand() != this) {
        // We're resolving to a local capability.  If we're resolving to a promise, we might be
        // able to reuse our export table entry and avoid sending a message.

        KJ_IF_MAYBE(nextPromise, exp.clientHook->whenMoreResolved()) {
          // We're replacing a promise with another local promise.  In this case, we might actually
          // be able to just reuse the existing export table entry to represent the new promise --
          // unless it already has an entry.  Let's check.

          auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));

          if (insertResult.second) {
            // The new promise was not already in the table, therefore the existing export table
            // entry has now been repurposed to represent it.  There is no need to send a resolve
            // message at all.  We do, however, have to start resolving the next promise.
            return resolveExportedPromise(exportId, kj::mv(*nextPromise));
          }
        }
      }

      // OK, we have to send a `Resolve` message.
      auto message = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      writeDescriptor(*exp.clientHook, resolve.initCap());
      message->send();

      return kj::READY_NOW;
    }, [this,exportId](kj::Exception&& exception) {
      // The promise was rejected, so the peer gets a `Resolve` carrying the exception.

      // Same invariant as on the success path: without a live connection there is nothing to
      // send the message on, and `connection.get<Connected>()` must not be called.
      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return;
      }

      auto message = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      fromException(exception, resolve.initException());
      message->send();
    }).eagerlyEvaluate([this](kj::Exception&& exception) {
      // Put the exception on the TaskSet which will cause the connection to be terminated.
      tasks.add(kj::mv(exception));
    });
  }

Kenton Varda's avatar
Kenton Varda committed
1151
  // =====================================================================================
1152
  // Interpreting CapDescriptor
Kenton Varda's avatar
Kenton Varda committed
1153

1154
  kj::Own<ClientHook> import(ImportId importId, bool isPromise) {
    // Handles receipt of one reference to `importId` from the peer and returns the client the
    // application should use for it.  With `isPromise` set, the result is a PromiseClient
    // wrapped around the import; otherwise it is the ImportClient itself.

    auto& slot = imports[importId];

    // Reuse the ImportClient already registered for this ID, or create and register one.
    kj::Own<ImportClient> importClient;
    KJ_IF_MAYBE(existing, slot.importClient) {
      importClient = kj::addRef(*existing);
    } else {
      importClient = kj::refcounted<ImportClient>(*this, importId);
      slot.importClient = *importClient;
    }

    // We just received a copy of this import ID, so the remote refcount has gone up.
    importClient->addRemoteRef();

    if (!isPromise) {
      slot.appClient = *importClient;
      return kj::mv(importClient);
    }

    // A promise needs a PromiseClient around the import.  Hand out the existing one, if any.
    KJ_IF_MAYBE(existingApp, slot.appClient) {
      return kj::addRef(*existingApp);
    }

    // Otherwise set up a promise for this import's resolution; its fulfiller is kept in the
    // import table entry.
    auto resolution = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
    slot.promiseFulfiller = kj::mv(resolution.fulfiller);

    // Make sure the import is not destroyed while this promise exists.
    resolution.promise = resolution.promise.attach(kj::addRef(*importClient));

    // Wrap it all in a PromiseClient and register that as the app-facing client.
    auto promiseClient = kj::refcounted<PromiseClient>(
        *this, kj::mv(importClient), kj::mv(resolution.promise), importId);
    slot.appClient = *promiseClient;
    return kj::mv(promiseClient);
  }
1195

1196
  kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor) {
    // Turns one received CapDescriptor into a ClientHook.  Returns null for `none`.  Descriptors
    // that cannot be honored yield a broken capability carrying an explanatory message.

    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
        return nullptr;

      case rpc::CapDescriptor::SENDER_HOSTED:
        return import(descriptor.getSenderHosted(), false);

      case rpc::CapDescriptor::SENDER_PROMISE:
        return import(descriptor.getSenderPromise(), true);

      case rpc::CapDescriptor::RECEIVER_HOSTED: {
        // The peer is pointing back at an entry of our own export table.
        KJ_IF_MAYBE(exported, exports.find(descriptor.getReceiverHosted())) {
          return exported->clientHook->addRef();
        }
        return newBrokenCap("invalid 'receiverHosted' export ID");
      }

      case rpc::CapDescriptor::RECEIVER_ANSWER: {
        // The peer is pointing at a pipelined result of a call it made to us.  This only works
        // for an answer that is still active and has a pipeline.
        auto answerRef = descriptor.getReceiverAnswer();

        KJ_IF_MAYBE(entry, answers.find(answerRef.getQuestionId())) {
          if (entry->active) {
            KJ_IF_MAYBE(hook, entry->pipeline) {
              KJ_IF_MAYBE(pipelineOps, toPipelineOps(answerRef.getTransform())) {
                return hook->get()->getPipelinedCap(*pipelineOps);
              }
              return newBrokenCap("unrecognized pipeline ops");
            }
          }
        }

        return newBrokenCap("invalid 'receiverAnswer'");
      }

      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        // We don't support third-party caps, so use the vine instead.
        return import(descriptor.getThirdPartyHosted().getVineId(), false);

      default:
        KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
        return newBrokenCap("unknown CapDescriptor type");
    }
  }
1240

1241 1242
  kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
    // Runs receiveCap() over every descriptor in `capTable`, preserving order.

    auto hooks = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
    for (uint index = 0; index < capTable.size(); index++) {
      hooks.add(receiveCap(capTable[index]));
    }
    return hooks.finish();
  }
1248 1249

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
1250
  // RequestHook/PipelineHook/ResponseHook implementations
1251

Kenton Varda's avatar
Kenton Varda committed
1252 1253 1254 1255
  class QuestionRef: public kj::Refcounted {
    // A reference to an entry on the question table.  Used to detect when the `Finish` message
    // can be sent.

Kenton Varda's avatar
Kenton Varda committed
1256
  public:
1257
    inline QuestionRef(
1258 1259 1260
        RpcConnectionState& connectionState, QuestionId id,
        kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
        : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1261 1262

    ~QuestionRef() {
1263
      unwindDetector.catchExceptionsIfUnwinding([&]() {
1264 1265 1266
        auto& question = KJ_ASSERT_NONNULL(
            connectionState->questions.find(id), "Question ID no longer on table?");

1267
        // Send the "Finish" message (if the connection is not already broken).
1268 1269
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1270
              messageSizeHint<rpc::Finish>());
1271 1272
          auto builder = message->getBody().getAs<rpc::Message>().initFinish();
          builder.setQuestionId(id);
1273 1274 1275 1276 1277
          // If we're still awaiting a return, then this request is being canceled, and we're going
          // to ignore any capabilities in the return message, so set releaseResultCaps true. If we
          // already received the return, then we've already built local proxies for the caps and
          // will send Release messages when those are destoryed.
          builder.setReleaseResultCaps(question.isAwaitingReturn);
1278
          message->send();
1279
        }
Kenton Varda's avatar
Kenton Varda committed
1280

1281 1282 1283
        // Check if the question has returned and, if so, remove it from the table.
        // Remove question ID from the table.  Must do this *after* sending `Finish` to ensure that
        // the ID is not re-allocated before the `Finish` message can be sent.
1284 1285
        if (question.isAwaitingReturn) {
          // Still waiting for return, so just remove the QuestionRef pointer from the table.
1286
          question.selfRef = nullptr;
1287 1288 1289
        } else {
          // Call has already returned, so we can now remove it from the table.
          connectionState->questions.erase(id, question);
1290 1291
        }
      });
Kenton Varda's avatar
Kenton Varda committed
1292 1293 1294 1295
    }

    inline QuestionId getId() const { return id; }

1296
    void fulfill(kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1297 1298 1299
      fulfiller->fulfill(kj::mv(response));
    }

1300
    void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
1301 1302 1303
      fulfiller->fulfill(kj::mv(promise));
    }

Kenton Varda's avatar
Kenton Varda committed
1304 1305
    void reject(kj::Exception&& exception) {
      fulfiller->reject(kj::mv(exception));
1306
    }
Kenton Varda's avatar
Kenton Varda committed
1307 1308

  private:
1309
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1310
    QuestionId id;
1311
    kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
1312
    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
1313 1314
  };

Kenton Varda's avatar
Kenton Varda committed
1315
  class RpcRequest final: public RequestHook {
1316
  public:
1317 1318
    RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
               kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
1319
        : connectionState(kj::addRef(connectionState)),
Kenton Varda's avatar
Kenton Varda committed
1320
          target(kj::mv(target)),
1321
          message(connection.newOutgoingMessage(
1322 1323
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
                  sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
Kenton Varda's avatar
Kenton Varda committed
1324
          callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
1325
          paramsBuilder(callBuilder.getParams().getContent()) {}
Kenton Varda's avatar
Kenton Varda committed
1326

1327
    inline AnyPointer::Builder getRoot() {
Kenton Varda's avatar
Kenton Varda committed
1328 1329
      return paramsBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1330 1331 1332
    inline rpc::Call::Builder getCall() {
      return callBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1333

1334
    RemotePromise<AnyPointer> send() override {
1335
      if (!connectionState->connection.is<Connected>()) {
1336
        // Connection is broken.
1337
        const kj::Exception& e = connectionState->connection.get<Disconnected>();
1338
        return RemotePromise<AnyPointer>(
1339 1340
            kj::Promise<Response<AnyPointer>>(kj::cp(e)),
            AnyPointer::Pipeline(newBrokenPipeline(kj::cp(e))));
1341
      }
Kenton Varda's avatar
Kenton Varda committed
1342

1343 1344 1345
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy.  Ick.
Kenton Varda's avatar
Kenton Varda committed
1346

1347
        auto replacement = redirect->get()->newCall(
1348
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
1349 1350 1351
        replacement.set(paramsBuilder);
        return replacement.send();
      } else {
1352
        auto sendResult = sendInternal(false);
Kenton Varda's avatar
Kenton Varda committed
1353

1354
        auto forkedPromise = sendResult.promise.fork();
1355

1356 1357 1358
        // The pipeline must get notified of resolution before the app does to maintain ordering.
        auto pipeline = kj::refcounted<RpcPipeline>(
            *connectionState, kj::mv(sendResult.questionRef), forkedPromise.addBranch());
Kenton Varda's avatar
Kenton Varda committed
1359

1360 1361 1362 1363 1364
        auto appPromise = forkedPromise.addBranch().then(
            [=](kj::Own<RpcResponse>&& response) {
              auto reader = response->getResults();
              return Response<AnyPointer>(reader, kj::mv(response));
            });
Kenton Varda's avatar
Kenton Varda committed
1365

1366 1367 1368 1369
        return RemotePromise<AnyPointer>(
            kj::mv(appPromise),
            AnyPointer::Pipeline(kj::mv(pipeline)));
      }
Kenton Varda's avatar
Kenton Varda committed
1370 1371
    }

1372 1373 1374
    struct TailInfo {
      QuestionId questionId;
      kj::Promise<void> promise;
1375
      kj::Own<PipelineHook> pipeline;
1376 1377 1378 1379 1380 1381 1382 1383 1384 1385
    };

    kj::Maybe<TailInfo> tailSend() {
      // Send the request as a tail call.
      //
      // Returns null if for some reason a tail call is not possible and the caller should fall
      // back to using send() and copying the response.

      SendInternalResult sendResult;

1386
      if (!connectionState->connection.is<Connected>()) {
1387 1388 1389
        // Disconnected; fall back to a regular send() which will fail appropriately.
        return nullptr;
      }
1390

1391 1392 1393 1394 1395 1396
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // Fall back to regular send().
        return nullptr;
      } else {
        sendResult = sendInternal(true);
1397 1398
      }

1399
      auto promise = sendResult.promise.then([](kj::Own<RpcResponse>&& response) {
1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
        // Response should be null if `Return` handling code is correct.
        KJ_ASSERT(!response) { break; }
      });

      QuestionId questionId = sendResult.questionRef->getId();

      auto pipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sendResult.questionRef));

      return TailInfo { questionId, kj::mv(promise), kj::mv(pipeline) };
    }

1411
    const void* getBrand() override {
1412 1413 1414
      return connectionState.get();
    }

Kenton Varda's avatar
Kenton Varda committed
1415
  private:
1416
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1417

1418
    kj::Own<RpcClient> target;
Kenton Varda's avatar
Kenton Varda committed
1419 1420
    kj::Own<OutgoingRpcMessage> message;
    rpc::Call::Builder callBuilder;
1421
    AnyPointer::Builder paramsBuilder;
1422 1423 1424

    struct SendInternalResult {
      kj::Own<QuestionRef> questionRef;
1425
      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
1426 1427
    };

1428
    SendInternalResult sendInternal(bool isTailCall) {
1429 1430
      // Build the cap table.
      auto exports = connectionState->writeDescriptors(
1431
          message->getCapTable(), callBuilder.getParams());
1432

1433
      // Init the question table.  Do this after writing descriptors to avoid interference.
1434
      QuestionId questionId;
1435
      auto& question = connectionState->questions.next(questionId);
1436 1437 1438
      question.isAwaitingReturn = true;
      question.paramExports = kj::mv(exports);
      question.isTailCall = isTailCall;
1439

1440
      // Finish and send.
1441 1442 1443 1444
      callBuilder.setQuestionId(questionId);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
1445
      message->send();
1446

1447
      // Make the result promise.
1448
      SendInternalResult result;
1449
      auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
1450
      result.questionRef = kj::refcounted<QuestionRef>(
1451
          *connectionState, questionId, kj::mv(paf.fulfiller));
1452
      question.selfRef = *result.questionRef;
1453
      result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
1454

1455
      // Send and return.
1456 1457
      return kj::mv(result);
    }
Kenton Varda's avatar
Kenton Varda committed
1458 1459
  };

Kenton Varda's avatar
Kenton Varda committed
1460
  class RpcPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1461
  public:
1462 1463
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef,
                kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam)
1464
        : connectionState(kj::addRef(connectionState)),
1465 1466 1467
          redirectLater(redirectLaterParam.fork()),
          resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then(
              [this](kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1468
                resolve(kj::mv(response));
1469
              }, [this](kj::Exception&& exception) {
Kenton Varda's avatar
Kenton Varda committed
1470
                resolve(kj::mv(exception));
1471 1472 1473 1474
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
1475 1476 1477
              })) {
      // Construct a new RpcPipeline.

1478
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1479
    }
Kenton Varda's avatar
Kenton Varda committed
1480

1481
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef)
1482 1483 1484 1485
        : connectionState(kj::addRef(connectionState)),
          resolveSelfPromise(nullptr) {
      // Construct a new RpcPipeline that is never expected to resolve.

1486
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1487
    }
Kenton Varda's avatar
Kenton Varda committed
1488 1489 1490

    // implements PipelineHook ---------------------------------------

1491
    kj::Own<PipelineHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1492 1493 1494
      return kj::addRef(*this);
    }

1495
    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
Kenton Varda's avatar
Kenton Varda committed
1496 1497 1498 1499 1500 1501 1502
      auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
      for (auto& op: ops) {
        copy.add(op);
      }
      return getPipelinedCap(copy.finish());
    }

1503
    kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override {
1504
      if (state.is<Waiting>()) {
1505 1506
        // Wrap a PipelineClient in a PromiseClient.
        auto pipelineClient = kj::refcounted<PipelineClient>(
1507
            *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr()));
1508

1509
        KJ_IF_MAYBE(r, redirectLater) {
1510 1511 1512 1513
          auto resolutionPromise = r->addBranch().then(kj::mvCapture(ops,
              [](kj::Array<PipelineOp> ops, kj::Own<RpcResponse>&& response) {
                return response->getResults().getPipelinedCap(ops);
              }));
1514

1515 1516 1517 1518 1519 1520
          return kj::refcounted<PromiseClient>(
              *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr);
        } else {
          // Oh, this pipeline will never get redirected, so just return the PipelineClient.
          return kj::mv(pipelineClient);
        }
1521 1522
      } else if (state.is<Resolved>()) {
        return state.get<Resolved>()->getResults().getPipelinedCap(ops);
Kenton Varda's avatar
Kenton Varda committed
1523
      } else {
1524
        return newBrokenCap(kj::cp(state.get<Broken>()));
Kenton Varda's avatar
Kenton Varda committed
1525
      }
Kenton Varda's avatar
Kenton Varda committed
1526 1527 1528
    }

  private:
1529 1530
    kj::Own<RpcConnectionState> connectionState;
    kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater;
Kenton Varda's avatar
Kenton Varda committed
1531

1532 1533
    typedef kj::Own<QuestionRef> Waiting;
    typedef kj::Own<RpcResponse> Resolved;
Kenton Varda's avatar
Kenton Varda committed
1534
    typedef kj::Exception Broken;
1535
    kj::OneOf<Waiting, Resolved, Broken> state;
Kenton Varda's avatar
Kenton Varda committed
1536 1537 1538 1539 1540

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

1541
    void resolve(kj::Own<RpcResponse>&& response) {
1542 1543
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Resolved>(kj::mv(response));
Kenton Varda's avatar
Kenton Varda committed
1544 1545 1546
    }

    void resolve(const kj::Exception&& exception) {
1547 1548
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Broken>(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1549
    }
Kenton Varda's avatar
Kenton Varda committed
1550 1551
  };

1552 1553
  class RpcResponse: public ResponseHook {
  public:
1554
    virtual AnyPointer::Reader getResults() = 0;
1555
    virtual kj::Own<RpcResponse> addRef() = 0;
1556 1557 1558
  };

  class RpcResponseImpl final: public RpcResponse, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1559
  public:
1560
    RpcResponseImpl(RpcConnectionState& connectionState,
1561 1562
                    kj::Own<QuestionRef>&& questionRef,
                    kj::Own<IncomingRpcMessage>&& message,
1563
                    AnyPointer::Reader results)
1564 1565
        : connectionState(kj::addRef(connectionState)),
          message(kj::mv(message)),
1566
          reader(results),
Kenton Varda's avatar
Kenton Varda committed
1567
          questionRef(kj::mv(questionRef)) {}
Kenton Varda's avatar
Kenton Varda committed
1568

1569
    AnyPointer::Reader getResults() override {
Kenton Varda's avatar
Kenton Varda committed
1570 1571 1572
      return reader;
    }

1573
    kj::Own<RpcResponse> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1574 1575 1576
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
1577
  private:
1578
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1579
    kj::Own<IncomingRpcMessage> message;
1580
    AnyPointer::Reader reader;
1581
    kj::Own<QuestionRef> questionRef;
Kenton Varda's avatar
Kenton Varda committed
1582 1583 1584 1585 1586 1587
  };

  // =====================================================================================
  // CallContextHook implementation

  class RpcServerResponse {
Kenton Varda's avatar
Kenton Varda committed
1588
  public:
1589
    virtual AnyPointer::Builder getResultsBuilder() = 0;
1590 1591 1592 1593
  };

  class RpcServerResponseImpl final: public RpcServerResponse {
  public:
1594
    RpcServerResponseImpl(RpcConnectionState& connectionState,
1595
                          kj::Own<OutgoingRpcMessage>&& message,
1596 1597 1598
                          rpc::Payload::Builder payload)
        : connectionState(connectionState),
          message(kj::mv(message)),
1599
          payload(payload) {}
Kenton Varda's avatar
Kenton Varda committed
1600

1601
    AnyPointer::Builder getResultsBuilder() override {
1602
      return payload.getContent();
Kenton Varda's avatar
Kenton Varda committed
1603 1604
    }

1605 1606 1607 1608 1609
    kj::Maybe<kj::Array<ExportId>> send() {
      // Send the response and return the export list.  Returns nullptr if there were no caps.
      // (Could return a non-null empty array if there were caps but none of them were exports.)

      // Build the cap table.
1610
      auto capTable = message->getCapTable();
1611 1612
      auto exports = connectionState.writeDescriptors(capTable, payload);

1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623
      // Capabilities that we are returning are subject to embargos. See `Disembargo` in rpc.capnp.
      // As explained there, in order to deal with the Tribble 4-way race condition, we need to
      // make sure that if we're returning any remote promises, that we ignore any subsequent
      // resolution of those promises for the purpose of pipelined requests on this answer. Luckily,
      // we can modify the cap table in-place.
      for (auto& slot: capTable) {
        KJ_IF_MAYBE(cap, slot) {
          slot = connectionState.getInnermostClient(**cap);
        }
      }

Kenton Varda's avatar
Kenton Varda committed
1624
      message->send();
1625 1626 1627 1628 1629
      if (capTable.size() == 0) {
        return nullptr;
      } else {
        return kj::mv(exports);
      }
Kenton Varda's avatar
Kenton Varda committed
1630 1631 1632
    }

  private:
1633
    RpcConnectionState& connectionState;
Kenton Varda's avatar
Kenton Varda committed
1634
    kj::Own<OutgoingRpcMessage> message;
1635
    rpc::Payload::Builder payload;
Kenton Varda's avatar
Kenton Varda committed
1636 1637
  };

1638 1639 1640
  class LocallyRedirectedRpcResponse final
      : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
  public:
1641
    LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
1642 1643
        : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
                          .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
1644

1645
    AnyPointer::Builder getResultsBuilder() override {
1646
      return message.getRoot<AnyPointer>();
1647 1648
    }

1649
    AnyPointer::Reader getResults() override {
1650
      return message.getRoot<AnyPointer>();
1651 1652
    }

1653
    kj::Own<RpcResponse> addRef() override {
1654 1655 1656 1657
      return kj::addRef(*this);
    }

  private:
1658
    MallocMessageBuilder message;
1659 1660
  };

Kenton Varda's avatar
Kenton Varda committed
1661 1662
  class RpcCallContext final: public CallContextHook, public kj::Refcounted {
  public:
1663
    RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
1664
                   kj::Own<IncomingRpcMessage>&& request, const AnyPointer::Reader& params,
1665
                   bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
1666
        : connectionState(kj::addRef(connectionState)),
1667
          answerId(answerId),
Kenton Varda's avatar
Kenton Varda committed
1668
          request(kj::mv(request)),
1669
          params(params),
1670
          returnMessage(nullptr),
Kenton Varda's avatar
Kenton Varda committed
1671 1672
          redirectResults(redirectResults),
          cancelFulfiller(kj::mv(cancelFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1673

1674 1675 1676 1677
    ~RpcCallContext() noexcept(false) {
      if (isFirstResponder()) {
        // We haven't sent a return yet, so we must have been canceled.  Send a cancellation return.
        unwindDetector.catchExceptionsIfUnwinding([&]() {
Kenton Varda's avatar
Kenton Varda committed
1678
          // Don't send anything if the connection is broken.
1679 1680
          if (connectionState->connection.is<Connected>()) {
            auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1681
                messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>());
Kenton Varda's avatar
Kenton Varda committed
1682
            auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1683

1684
            builder.setAnswerId(answerId);
1685
            builder.setReleaseParamCaps(false);
1686

Kenton Varda's avatar
Kenton Varda committed
1687 1688 1689 1690 1691 1692 1693 1694 1695
            if (redirectResults) {
              // The reason we haven't sent a return is because the results were sent somewhere
              // else.
              builder.setResultsSentElsewhere();
            } else {
              builder.setCanceled();
            }

            message->send();
1696
          }
1697

1698
          cleanupAnswerTable(nullptr, true);
1699 1700 1701 1702
        });
      }
    }

1703
    kj::Own<RpcResponse> consumeRedirectedResponse() {
1704 1705
      KJ_ASSERT(redirectResults);

1706
      if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
1707 1708 1709 1710 1711 1712

      // Note that the context needs to keep its own reference to the response so that it doesn't
      // get GC'd until the PipelineHook drops its reference to the context.
      return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef();
    }

Kenton Varda's avatar
Kenton Varda committed
1713
    void sendReturn() {
1714
      KJ_ASSERT(!redirectResults);
1715 1716 1717 1718

      // Avoid sending results if canceled so that we don't have to figure out whether or not
      // `releaseResultCaps` was set in the already-received `Finish`.
      if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) {
1719 1720 1721 1722 1723
        KJ_ASSERT(connectionState->connection.is<Connected>(),
                  "Cancellation should have been requested on disconnect.") {
          return;
        }

1724
        if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
Kenton Varda's avatar
Kenton Varda committed
1725

1726
        returnMessage.setAnswerId(answerId);
1727
        returnMessage.setReleaseParamCaps(false);
Kenton Varda's avatar
Kenton Varda committed
1728

1729 1730 1731 1732 1733 1734 1735 1736
        auto exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
        KJ_IF_MAYBE(e, exports) {
          // Caps were returned, so we can't free the pipeline yet.
          cleanupAnswerTable(kj::mv(*e), false);
        } else {
          // No caps in the results, therefore the pipeline is irrelevant.
          cleanupAnswerTable(nullptr, true);
        }
Kenton Varda's avatar
Kenton Varda committed
1737 1738 1739
      }
    }
    void sendErrorReturn(kj::Exception&& exception) {
1740
      KJ_ASSERT(!redirectResults);
Kenton Varda's avatar
Kenton Varda committed
1741
      if (isFirstResponder()) {
1742 1743 1744 1745
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Return>() + exceptionSizeHint(exception));
          auto builder = message->getBody().initAs<rpc::Message>().initReturn();
Kenton Varda's avatar
Kenton Varda committed
1746

1747 1748 1749
          builder.setAnswerId(answerId);
          builder.setReleaseParamCaps(false);
          fromException(exception, builder.initException());
Kenton Varda's avatar
Kenton Varda committed
1750

1751 1752
          message->send();
        }
1753 1754 1755 1756

        // Do not allow releasing the pipeline because we want pipelined calls to propagate the
        // exception rather than fail with a "no such field" exception.
        cleanupAnswerTable(nullptr, false);
Kenton Varda's avatar
Kenton Varda committed
1757 1758 1759
      }
    }

1760
    void requestCancel() {
Kenton Varda's avatar
Kenton Varda committed
1761 1762 1763 1764 1765 1766
      // Hints that the caller wishes to cancel this call.  At the next time when cancellation is
      // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
      // safe, the RpcCallContext will send a normal return when the call completes.  Either way
      // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
      // a Finish message was already received.

1767 1768 1769 1770
      bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED;
      cancellationFlags |= CANCEL_REQUESTED;

      if (previouslyAllowedButNotRequested) {
Kenton Varda's avatar
Kenton Varda committed
1771
        // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously.  Initiate
Kenton Varda's avatar
Kenton Varda committed
1772
        // the cancellation.
Kenton Varda's avatar
Kenton Varda committed
1773
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1774
      }
Kenton Varda's avatar
Kenton Varda committed
1775
    }
1776 1777 1778

    // implements CallContextHook ------------------------------------

1779
    AnyPointer::Reader getParams() override {
1780 1781 1782 1783 1784 1785
      KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
      return params;
    }
    void releaseParams() override {
      request = nullptr;
    }
1786
    AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
1787
      KJ_IF_MAYBE(r, response) {
1788
        return r->get()->getResultsBuilder();
1789
      } else {
1790 1791
        kj::Own<RpcServerResponse> response;

1792
        if (redirectResults || !connectionState->connection.is<Connected>()) {
1793
          response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
1794
        } else {
1795
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1796 1797
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
                               sizeInWords<rpc::Payload>()));
1798 1799 1800 1801 1802 1803
          returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
          response = kj::heap<RpcServerResponseImpl>(
              *connectionState, kj::mv(message), returnMessage.getResults());
        }

        auto results = response->getResultsBuilder();
Kenton Varda's avatar
Kenton Varda committed
1804 1805
        this->response = kj::mv(response);
        return results;
1806 1807
      }
    }
1808 1809 1810
    kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
      auto result = directTailCall(kj::mv(request));
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
1811
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
1812 1813 1814 1815
      }
      return kj::mv(result.promise);
    }
    ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
1816 1817 1818
      KJ_REQUIRE(response == nullptr,
                 "Can't call tailCall() after initializing the results struct.");

1819
      if (request->getBrand() == connectionState.get() && !redirectResults) {
1820 1821 1822 1823 1824
        // The tail call is headed towards the peer that called us in the first place, so we can
        // optimize out the return trip.

        KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
          if (isFirstResponder()) {
1825 1826 1827 1828
            if (connectionState->connection.is<Connected>()) {
              auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
                  messageSizeHint<rpc::Return>());
              auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1829

1830 1831 1832
              builder.setAnswerId(answerId);
              builder.setReleaseParamCaps(false);
              builder.setTakeFromOtherQuestion(tailInfo->questionId);
1833

1834 1835
              message->send();
            }
1836 1837 1838

            // There are no caps in our return message, but of course the tail results could have
            // caps, so we must continue to honor pipeline calls (and just bounce them back).
1839
            cleanupAnswerTable(nullptr, false);
1840
          }
1841
          return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
1842 1843 1844 1845 1846 1847 1848
        }
      }

      // Just forwarding to another local call.
      auto promise = request->send();

      // Wait for response.
1849
      auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
1850 1851 1852
        // Copy the response.
        // TODO(perf):  It would be nice if we could somehow make the response get built in-place
        //   but requires some refactoring.
1853
        getResults(tailResponse.targetSize()).set(tailResponse);
1854
      });
1855 1856

      return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
1857
    }
1858 1859
    kj::Promise<AnyPointer::Pipeline> onTailCall() override {
      auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
1860 1861 1862
      tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
      return kj::mv(paf.promise);
    }
1863
    void allowCancellation() override {
1864 1865 1866 1867
      bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
      cancellationFlags |= CANCEL_ALLOWED;

      if (previouslyRequestedButNotAllowed) {
Kenton Varda's avatar
Kenton Varda committed
1868 1869 1870
        // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
        // the cancellation.
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1871
      }
1872 1873 1874 1875 1876 1877
    }
    kj::Own<CallContextHook> addRef() override {
      return kj::addRef(*this);
    }

  private:
1878
    kj::Own<RpcConnectionState> connectionState;
1879
    AnswerId answerId;
1880

Kenton Varda's avatar
Kenton Varda committed
1881 1882 1883
    // Request ---------------------------------------------

    kj::Maybe<kj::Own<IncomingRpcMessage>> request;
1884
    AnyPointer::Reader params;
Kenton Varda's avatar
Kenton Varda committed
1885

Kenton Varda's avatar
Kenton Varda committed
1886 1887 1888
    // Response --------------------------------------------

    kj::Maybe<kj::Own<RpcServerResponse>> response;
Kenton Varda's avatar
Kenton Varda committed
1889
    rpc::Return::Builder returnMessage;
1890
    bool redirectResults = false;
Kenton Varda's avatar
Kenton Varda committed
1891
    bool responseSent = false;
1892
    kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1893 1894 1895 1896 1897 1898 1899 1900

    // Cancellation state ----------------------------------

    enum CancellationFlags {
      CANCEL_REQUESTED = 1,
      CANCEL_ALLOWED = 2
    };

1901
    uint8_t cancellationFlags = 0;
1902
    // When both flags are set, the cancellation process will begin.
Kenton Varda's avatar
Kenton Varda committed
1903

1904
    kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1905 1906 1907
    // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise is
    // exclusive-joined with the outermost promise waiting on the call return, so fulfilling it
    // cancels that promise.
Kenton Varda's avatar
Kenton Varda committed
1908

1909 1910
    kj::UnwindDetector unwindDetector;

Kenton Varda's avatar
Kenton Varda committed
1911 1912 1913 1914 1915 1916 1917
    // -----------------------------------------------------

    bool isFirstResponder() {
      if (responseSent) {
        return false;
      } else {
        responseSent = true;
1918 1919 1920
        return true;
      }
    }
Kenton Varda's avatar
Kenton Varda committed
1921

1922
    void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
1923 1924 1925
      // We need to remove the `callContext` pointer -- which points back to us -- from the
      // answer table.  Or we might even be responsible for removing the entire answer table
      // entry.
Kenton Varda's avatar
Kenton Varda committed
1926

1927
      if (cancellationFlags & CANCEL_REQUESTED) {
1928 1929 1930
        // Already received `Finish` so it's our job to erase the table entry. We shouldn't have
        // sent results if canceled, so we shouldn't have an export list to deal with.
        KJ_ASSERT(resultExports.size() == 0);
1931
        connectionState->answers.erase(answerId);
1932
      } else {
1933
        // We just have to null out callContext and set the exports.
1934
        auto& answer = connectionState->answers[answerId];
1935
        answer.callContext = nullptr;
1936 1937 1938 1939 1940 1941
        answer.resultExports = kj::mv(resultExports);

        if (shouldFreePipeline) {
          // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
          // because there are no caps in the result to receive pipeline requests).
          KJ_ASSERT(resultExports.size() == 0);
Kenton Varda's avatar
Kenton Varda committed
1942
          answer.pipeline = nullptr;
Kenton Varda's avatar
Kenton Varda committed
1943 1944 1945
        }
      }
    }
1946 1947
  };

Kenton Varda's avatar
Kenton Varda committed
1948 1949 1950
  // =====================================================================================
  // Message handling

1951
  kj::Promise<void> messageLoop() {
    // Receives one incoming message and handles it; then, if the peer has not disconnected,
    // adds another iteration of itself to `tasks`.  Completes immediately when the connection
    // is not in the Connected state.

    if (!connection.is<Connected>()) {
      return kj::READY_NOW;
    }

    auto incoming = connection.get<Connected>()->receiveIncomingMessage();

    return incoming.then(
        [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) -> bool {
      KJ_IF_MAYBE(m, message) {
        handleMessage(kj::mv(*m));
        return true;
      } else {
        // A null message means the peer has gone away.
        disconnect(KJ_EXCEPTION(DISCONNECTED, "Peer disconnected."));
        return false;
      }
    }).then([this](bool keepGoing) {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      if (keepGoing) {
        tasks.add(messageLoop());
      }
    });
  }

  void handleMessage(kj::Own<IncomingRpcMessage> message) {
    // Dispatches one incoming message to the handler for its type.  Handlers for bootstrap,
    // call and return messages take ownership of the message; the others only get a reader.
    // Any other message type is echoed back to the peer as `unimplemented`.

    auto body = message->getBody().getAs<rpc::Message>();

    switch (body.which()) {
      case rpc::Message::UNIMPLEMENTED:
        handleUnimplemented(body.getUnimplemented());
        break;

      case rpc::Message::ABORT:
        handleAbort(body.getAbort());
        break;

      case rpc::Message::BOOTSTRAP:
        handleBootstrap(kj::mv(message), body.getBootstrap());
        break;

      case rpc::Message::CALL:
        handleCall(kj::mv(message), body.getCall());
        break;

      case rpc::Message::RETURN:
        handleReturn(kj::mv(message), body.getReturn());
        break;

      case rpc::Message::FINISH:
        handleFinish(body.getFinish());
        break;

      case rpc::Message::RESOLVE:
        handleResolve(body.getResolve());
        break;

      case rpc::Message::RELEASE:
        handleRelease(body.getRelease());
        break;

      case rpc::Message::DISEMBARGO:
        handleDisembargo(body.getDisembargo());
        break;

      default: {
        // Not a message type handled above: send it back wrapped in `unimplemented`, provided
        // we are still connected.
        if (connection.is<Connected>()) {
          auto reply = connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(body.totalSize(), messageSizeHint<void>()));
          reply->getBody().initAs<rpc::Message>().setUnimplemented(body);
          reply->send();
        }
        break;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2026
  void handleUnimplemented(const rpc::Message::Reader& message) {
    // Handles the peer echoing back one of our messages as `unimplemented`.  Only an echoed
    // `Resolve` is tolerated, in which case the export reference carried by its capability
    // descriptor is released; any other echoed message type is a fatal assertion failure.

    if (message.which() != rpc::Message::RESOLVE) {
      KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
    }

    auto descriptor = message.getResolve().getCap();
    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
        // Nothing to do (but this ought never to happen).
        break;

      case rpc::CapDescriptor::SENDER_HOSTED:
        releaseExport(descriptor.getSenderHosted(), 1);
        break;

      case rpc::CapDescriptor::SENDER_PROMISE:
        releaseExport(descriptor.getSenderPromise(), 1);
        break;

      case rpc::CapDescriptor::RECEIVER_ANSWER:
      case rpc::CapDescriptor::RECEIVER_HOSTED:
        // These descriptors carry no export reference, so there is nothing to release.
        break;

      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        releaseExport(descriptor.getThirdPartyHosted().getVineId(), 1);
        break;
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2057
  void handleAbort(const rpc::Exception::Reader& exception) {
    // Converts the `Abort` payload into a kj::Exception and raises it as a recoverable
    // exception.
    auto abortReason = toException(exception);
    kj::throwRecoverableException(kj::mv(abortReason));
  }

2061 2062 2063
  // ---------------------------------------------------------------------------
  // Level 0

2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146
  class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
    // A PipelineHook wrapping exactly one capability.  Only the empty transform is valid; it
    // yields a new reference to that capability.  Any other transform yields a broken cap.

  public:
    SingleCapPipeline(kj::Own<ClientHook>&& cap)
        : cap(kj::mv(cap)) {}

    kj::Own<PipelineHook> addRef() override {
      return kj::addRef(*this);
    }

    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
      if (ops.size() != 0) {
        // There is no struct to traverse, so any non-empty path is invalid.
        return newBrokenCap("Invalid pipeline transform.");
      }
      return cap->addRef();
    }

  private:
    kj::Own<ClientHook> cap;
  };

  void handleBootstrap(kj::Own<IncomingRpcMessage>&& message,
                       const rpc::Bootstrap::Reader& bootstrap) {
    // Handles an incoming `Bootstrap` message:  obtains the capability to hand out (from the
    // legacy restorer if one is configured, otherwise from the bootstrap interface), writes it
    // into a `Return` message, registers an answer-table entry under the request's question ID
    // so that the result can be pipelined on, and sends the `Return`.
    //
    // If obtaining the capability throws, the `Return` carries the exception instead and the
    // answer's pipeline resolves to a broken capability.

    AnswerId answerId = bootstrap.getQuestionId();

    if (!connection.is<Connected>()) {
      // Disconnected; ignore.
      return;
    }

    auto response = connection.get<Connected>()->newOutgoingMessage(
        messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);

    rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
    ret.setAnswerId(answerId);

    kj::Own<ClientHook> capHook;
    kj::Array<ExportId> resultExports;
    // `resultExports` is only moved into the answer table once everything has succeeded (see
    // below); on any earlier exit this deferred call releases whatever was exported so far.
    KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong

    // Call the restorer and initialize the answer.
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      Capability::Client cap = nullptr;
      KJ_IF_MAYBE(r, restorer) {
        // A restorer takes precedence over a bootstrap interface.
        cap = r->baseRestore(bootstrap.getDeprecatedObjectId());
      } else KJ_IF_MAYBE(b, bootstrapInterface) {
        if (bootstrap.hasDeprecatedObjectId()) {
          KJ_FAIL_REQUIRE("This vat only supports a bootstrap interface, not the old "
                          "Cap'n-Proto-0.4-style named exports.") { return; }
        } else {
          cap = *b;
        }
      } else {
        KJ_FAIL_REQUIRE("This vat does not expose any public/bootstrap interfaces.") { return; }
      }

      auto payload = ret.initResults();
      payload.getContent().setAs<Capability>(kj::mv(cap));

      // The payload content is a single capability pointer, so exactly one cap table entry is
      // expected.
      auto capTable = response->getCapTable();
      KJ_DASSERT(capTable.size() == 1);
      resultExports = writeDescriptors(capTable, payload);
      capHook = KJ_ASSERT_NONNULL(capTable[0])->addRef();
    })) {
      // Report the failure to the peer and make pipelined calls on this answer fail too.
      fromException(*exception, ret.initException());
      capHook = newBrokenCap(kj::mv(*exception));
    }

    // Drop the incoming message; `bootstrap` is not used past this point.
    message = nullptr;

    // Add the answer to the answer table for pipelining and send the response.
    auto& answer = answers[answerId];
    KJ_REQUIRE(!answer.active, "questionId is already in use") {
      return;
    }

    answer.resultExports = kj::mv(resultExports);
    answer.active = true;
    answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));

    response->send();
  }

Kenton Varda's avatar
Kenton Varda committed
2147
  void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
    // Handles an incoming `Call`:  looks up the target capability, builds an RpcCallContext that
    // takes ownership of the message, registers the call in the answer table, starts the call,
    // and arranges for the result to be either sent back or held for a later
    // `takeFromOtherQuestion`, depending on `sendResultsTo`.

    kj::Own<ClientHook> capability;
    KJ_IF_MAYBE(resolved, getMessageTarget(call.getTarget())) {
      capability = kj::mv(*resolved);
    } else {
      // getMessageTarget() already reported the problem.
      return;
    }

    bool redirectResults;
    switch (call.getSendResultsTo().which()) {
      case rpc::Call::SendResultsTo::CALLER:
        redirectResults = false;
        break;
      case rpc::Call::SendResultsTo::YOURSELF:
        redirectResults = true;
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
    }

    auto params = call.getParams();
    message->initCapTable(receiveCaps(params.getCapTable()));

    auto cancelPaf = kj::newPromiseAndFulfiller<void>();
    AnswerId answerId = call.getQuestionId();

    // From here on the context owns the message, and with it the storage behind `call`.
    auto context = kj::refcounted<RpcCallContext>(
        *this, answerId, kj::mv(message), params.getContent(),
        redirectResults, kj::mv(cancelPaf.fulfiller));

    {
      auto& answer = answers[answerId];

      KJ_REQUIRE(!answer.active, "questionId is already in use") {
        return;
      }

      answer.active = true;
      answer.callContext = *context;
    }

    auto promiseAndPipeline = startCall(
        call.getInterfaceId(), call.getMethodId(), kj::mv(capability), context->addRef());

    // startCall() may already have changed things -- in particular it may have invoked
    // context->directTailCall() right away -- so look the answer up afresh.
    {
      auto& answer = answers[answerId];

      answer.pipeline = kj::mv(promiseAndPipeline.pipeline);

      if (!redirectResults) {
        // Both continuations below need the context.  Rather than taking two references, they
        // share a raw pointer and the context itself is attached to the promise, so all of them
        // are destroyed together.
        RpcCallContext* contextPtr = context;

        auto returnSent = promiseAndPipeline.promise.then(
            [contextPtr]() {
              contextPtr->sendReturn();
            }, [contextPtr](kj::Exception&& exception) {
              contextPtr->sendErrorReturn(kj::mv(exception));
            });

        returnSent.then([]() {}, [this](kj::Exception&& exception) {
              // Failures inside sendReturn()/sendErrorReturn() themselves end up here.
              taskFailed(kj::mv(exception));
            }).attach(kj::mv(context))
            .exclusiveJoin(kj::mv(cancelPaf.promise))
            .detach([](kj::Exception&&) {});
      } else {
        auto resultsPromise = promiseAndPipeline.promise.then(
            kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
              return context->consumeRedirectedResponse();
            }));

        // Whoever later picks up `redirectedResults` might simply discard it.  That must not
        // cancel this call unless it has called allowCancellation().  So the promise is forked:
        // one branch is stored for pickup, the other is joined with the cancellation promise
        // and detached, which keeps the call alive.
        auto forked = resultsPromise.fork();
        answer.redirectedResults = forked.addBranch();

        cancelPaf.promise
            .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
            .detach([](kj::Exception&&) {});
      }
    }
  }

2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265
  ClientHook::VoidPromiseAndPipeline startCall(
      uint64_t interfaceId, uint64_t methodId,
      kj::Own<ClientHook>&& capability, kj::Own<CallContextHook>&& context) {
    // Starts the given method call on `capability`.  The one special case is method 0 of
    // Persistent<> (i.e. save()) when a realm gateway is configured:  that call is rewritten
    // into an export() request on the gateway and performed as a direct tail call.

    const bool isPersistentSave = interfaceId == typeId<Persistent<>>() && methodId == 0;

    if (isPersistentSave) {
      KJ_IF_MAYBE(g, gateway) {
        auto saveParams = context->getParams().getAs<Persistent<>::SaveParams>();

        // Size hint:  the original params, plus the export() wrapper struct, plus one extra
        // capability (the target itself).
        auto sizeHint = saveParams.totalSize();
        sizeHint.wordCount += sizeInWords<RealmGateway<>::ExportParams>();
        ++sizeHint.capCount;

        auto exportRequest = g->exportRequest(sizeHint);
        exportRequest.setCap(Persistent<>::Client(capability->addRef()));
        exportRequest.setParams(saveParams);

        context->allowCancellation();
        context->releaseParams();
        return context->directTailCall(RequestHook::from(kj::mv(exportRequest)));
      }
    }

    // Ordinary call.
    return capability->call(interfaceId, methodId, context->addRef());
  }

2266
  kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
    // Translates a `MessageTarget` into a reference to the capability it designates:  either an
    // entry in our export table, or a capability pipelined from one of our answers.  When the
    // target is invalid the problem is reported via KJ_REQUIRE and null is returned.

    switch (target.which()) {
      case rpc::MessageTarget::IMPORTED_CAP: {
        KJ_IF_MAYBE(entry, exports.find(target.getImportedCap())) {
          return entry->clientHook->addRef();
        } else {
          KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
            return nullptr;
          }
        }
        break;
      }

      case rpc::MessageTarget::PROMISED_ANSWER: {
        auto promisedAnswer = target.getPromisedAnswer();

        auto& base = answers[promisedAnswer.getQuestionId()];
        KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
          return nullptr;
        }

        // Take our own reference to the answer's pipeline, if it still has one.
        kj::Own<PipelineHook> pipeline;
        KJ_IF_MAYBE(hook, base.pipeline) {
          pipeline = hook->get()->addRef();
        } else {
          KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished or contained no "
                          "capabilities.") {
            return nullptr;
          }
        }

        KJ_IF_MAYBE(path, toPipelineOps(promisedAnswer.getTransform())) {
          return pipeline->getPipelinedCap(*path);
        } else {
          // toPipelineOps() already reported the problem.
          return nullptr;
        }
      }

      default:
        KJ_FAIL_REQUIRE("Unknown message target type.", target) {
          return nullptr;
        }
    }

    KJ_UNREACHABLE;
  }

Kenton Varda's avatar
Kenton Varda committed
2313
  void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
    // Handles an incoming `Return` for one of our outstanding questions.
    //
    // If the question is still referenced locally (`selfRef` is non-null), it is fulfilled or
    // rejected according to the `Return` variant.  Otherwise the question was already canceled,
    // so the table entry is simply erased.
    //
    // Protocol violations by the peer are reported through KJ_REQUIRE / KJ_FAIL_REQUIRE.

    // Transitive destructors can end up manipulating the question table and invalidating our
    // pointer into it, so make sure these destructors run later (i.e. when this function
    // returns, after we are done with `question`).
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;

    KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
      KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
      question->isAwaitingReturn = false;

      if (ret.getReleaseParamCaps()) {
        // The peer asks us to release the capabilities we exported in the call's params.
        exportsToRelease = kj::mv(question->paramExports);
      } else {
        // Just forget the list without releasing the exports.
        question->paramExports = nullptr;
      }

      KJ_IF_MAYBE(questionRef, question->selfRef) {
        switch (ret.which()) {
          case rpc::Return::RESULTS: {
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
              return;
            }

            auto payload = ret.getResults();
            message->initCapTable(receiveCaps(payload.getCapTable()));
            // The response object takes ownership of the message that backs `payload`.
            questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
                *this, kj::addRef(*questionRef), kj::mv(message), payload.getContent()));
            break;
          }

          case rpc::Return::EXCEPTION:
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
              return;
            }

            questionRef->reject(toException(ret.getException()));
            break;

          case rpc::Return::CANCELED:
            // We still hold a reference to this question, so we never asked for cancellation.
            KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
            break;

          case rpc::Return::RESULTS_SENT_ELSEWHERE:
            KJ_REQUIRE(question->isTailCall,
                "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
              return;
            }

            // Tail calls are fulfilled with a null pointer.
            questionRef->fulfill(kj::Own<RpcResponse>());
            break;

          case rpc::Return::TAKE_FROM_OTHER_QUESTION:
            KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
              KJ_IF_MAYBE(response, answer->redirectedResults) {
                questionRef->fulfill(kj::mv(*response));

                // The redirected results can only be consumed once.  Clear the slot so that a
                // further `Return` naming the same answer is rejected below instead of being
                // handed a moved-from promise.
                answer->redirectedResults = nullptr;
              } else {
                KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` referenced a call that did not "
                                "use `sendResultsTo.yourself`.") { return; }
              }
            } else {
              KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` had invalid answer ID.") { return; }
            }

            break;

          default:
            KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
        }
      } else {
        if (ret.isTakeFromOtherQuestion()) {
          // Be sure to release the tail call's promise.  It is moved into a local so that it is
          // destroyed only when this function returns.
          KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
            promiseToRelease = kj::mv(answer->redirectedResults);
          }
        }

        // Looks like this question was canceled earlier, so `Finish` was already sent, with
        // `releaseResultCaps` set true so that we don't have to release them here.  We can go
        // ahead and delete it from the table.
        questions.erase(ret.getAnswerId(), *question);
      }

    } else {
      KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2404
  void handleFinish(const rpc::Finish::Reader& finish) {
    // Handles an incoming `Finish`:  the peer no longer needs the answer with the given
    // question ID.  Releases (or forgets) the result exports, drops the pipeline, and either
    // requests cancellation of the still-running call or erases the answer table entry.
    //
    // A `Finish` naming an unknown or inactive question ID is a protocol error reported via
    // KJ_REQUIRE / KJ_FAIL_REQUIRE.

    // Delay release of these things until return so that transitive destructors don't accidentally
    // modify the answer table and invalidate our pointer into it.
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
    Answer answerToRelease;
    kj::Maybe<kj::Own<PipelineHook>> pipelineToRelease;

    KJ_IF_MAYBE(answer, answers.find(finish.getQuestionId())) {
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }

      if (finish.getReleaseResultCaps()) {
        exportsToRelease = kj::mv(answer->resultExports);
      } else {
        // Just forget the list without releasing the exports.
        answer->resultExports = nullptr;
      }

      pipelineToRelease = kj::mv(answer->pipeline);

      // If the call isn't actually done yet, cancel it.  Otherwise, we can go ahead and erase the
      // question from the table.
      KJ_IF_MAYBE(context, answer->callContext) {
        context->requestCancel();
      } else {
        answerToRelease = answers.erase(finish.getQuestionId());
      }
    } else {
      // No table entry at all.  `answer` is a null pointer in this branch and must not be
      // dereferenced, so fail unconditionally.
      KJ_FAIL_REQUIRE("'Finish' for invalid question ID.") { return; }
    }
  }

2435 2436 2437
  // ---------------------------------------------------------------------------
  // Level 1

2438
  void handleResolve(const rpc::Resolve::Reader& resolve) {
    // Handles an incoming `Resolve`:  works out what the promise resolved to (a capability or
    // an exception) and, if the promise ID names an import that is still an unfulfilled
    // promise, fulfills or rejects it.

    kj::Own<ClientHook> resolvedCap;
    kj::Maybe<kj::Exception> rejection;

    // Step 1:  decode the resolution.
    switch (resolve.which()) {
      case rpc::Resolve::CAP:
        KJ_IF_MAYBE(cap, receiveCap(resolve.getCap())) {
          resolvedCap = kj::mv(*cap);
        } else {
          KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
        }
        break;

      case rpc::Resolve::EXCEPTION:
        // Do not substitute a broken cap here:  PromiseClient::Resolve() would take that to
        // mean the remote promise resolved to a local capability, and would conclude that a
        // Disembargo is needed.  The promise has to be genuinely rejected instead.
        rejection = toException(resolve.getException());
        break;

      default:
        KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
    }

    // Step 2:  deliver it, if the import is on the table.
    KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
      KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
        // The import really is a promise that has not been fulfilled yet.
        KJ_IF_MAYBE(e, rejection) {
          fulfiller->get()->reject(kj::mv(*e));
        } else {
          fulfiller->get()->fulfill(kj::mv(resolvedCap));
        }
      } else if (import->importClient != nullptr) {
        // A live import table entry, but one that was never expected to be a promise.
        KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
      }
    }
  }

2480
  void handleRelease(const rpc::Release::Reader& release) {
    // Handles an incoming `Release` by dropping the stated number of references to the export.
    auto exportId = release.getId();
    auto droppedRefs = release.getReferenceCount();
    releaseExport(exportId, droppedRefs);
  }

Kenton Varda's avatar
Kenton Varda committed
2484
  void releaseExport(ExportId id, uint refcount) {
    // Subtracts `refcount` references from export `id`.  Once the count reaches zero the export
    // is removed from both `exportsByCap` and `exports`.  Unknown IDs and attempts to drop more
    // references than exist are reported via KJ_REQUIRE / KJ_FAIL_REQUIRE.

    KJ_IF_MAYBE(exp, exports.find(id)) {
      KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
        return;
      }

      exp->refcount -= refcount;
      if (exp->refcount != 0) {
        // Still referenced by the peer.
        return;
      }

      // Last reference gone:  remove the reverse mapping first, then the entry itself.
      exportsByCap.erase(exp->clientHook);
      exports.erase(id, *exp);
    } else {
      KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
        return;
      }
    }
  }

2502 2503 2504 2505 2506 2507
  void releaseExports(kj::ArrayPtr<ExportId> exports) {
    // Drops one reference from each export ID in the list, in order.
    for (size_t i = 0; i < exports.size(); i++) {
      releaseExport(exports[i], 1);
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2508 2509 2510 2511
  void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
    // Handles an incoming `Disembargo`.
    //
    // - senderLoopback:  the target must ultimately point back at this connection.  The
    //   disembargo is echoed back to the peer as `receiverLoopback`, after one turn of the
    //   event loop.
    // - receiverLoopback:  the echo of a disembargo we sent earlier has arrived.  The matching
    //   embargo is fulfilled and removed from the table.

    auto context = disembargo.getContext();
    switch (context.which()) {
      case rpc::Disembargo::Context::SENDER_LOOPBACK: {
        kj::Own<ClientHook> target;

        KJ_IF_MAYBE(found, getMessageTarget(disembargo.getTarget())) {
          target = kj::mv(*found);
        } else {
          // getMessageTarget() already reported the problem.
          return;
        }

        // Follow the chain of resolutions until we reach a cap that has not resolved further.
        bool resolvedFurther = true;
        while (resolvedFurther) {
          KJ_IF_MAYBE(inner, target->getResolved()) {
            target = inner->addRef();
          } else {
            resolvedFurther = false;
          }
        }

        KJ_REQUIRE(target->getBrand() == this,
                   "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
                   "back to the sender.") {
          return;
        }

        EmbargoId embargoId = context.getSenderLoopback();

        // The reply is deferred with evalLater() so that any calls already headed for this cap
        // get a chance to make their way through the event loop first.
        tasks.add(kj::evalLater(kj::mvCapture(
            target, [this,embargoId](kj::Own<ClientHook>&& target) {
          if (!connection.is<Connected>()) {
            return;
          }

          RpcClient& rpcTarget = kj::downcast<RpcClient>(*target);

          auto reply = connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
          auto echo = reply->getBody().initAs<rpc::Message>().initDisembargo();

          {
            auto redirect = rpcTarget.writeTarget(echo.initTarget());

            // A disembargo is only supposed to be addressed to a capability that was earlier
            // the subject of a `Resolve` message.  `writeTarget` returns non-null only for a
            // PromiseClient, and the code sending `Resolve` and `Return` should have replaced
            // every promise with a direct node to solve the Tribble 4-way race condition.  So
            // a non-null redirect means the peer misbehaved.  See the documentation of
            // Disembargo in rpc.capnp for more.
            KJ_REQUIRE(redirect == nullptr,
                       "'Disembargo' of type 'senderLoopback' sent to an object that does not "
                       "appear to have been the subject of a previous 'Resolve' message.") {
              return;
            }
          }

          echo.getContext().setReceiverLoopback(embargoId);

          reply->send();
        })));

        break;
      }

      case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
        KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
          // Lift the embargo, then retire its table entry.
          KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
          embargoes.erase(context.getReceiverLoopback(), *embargo);
        } else {
          KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
            return;
          }
        }
        break;
      }

      default:
        KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
    }
  }

2591 2592
  // ---------------------------------------------------------------------------
  // Level 2
2593 2594 2595 2596
};

}  // namespace

2597
class RpcSystemBase::Impl final: public kj::TaskSet::ErrorHandler {
2598
public:
2599 2600 2601 2602
  Impl(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface,
       kj::Maybe<RealmGateway<>::Client> gateway)
      : network(network), bootstrapInterface(kj::mv(bootstrapInterface)),
        gateway(kj::mv(gateway)), tasks(*this) {
2603 2604 2605
    tasks.add(acceptLoop());
  }
  Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2606
      : network(network), restorer(restorer), tasks(*this) {
2607 2608 2609 2610
    tasks.add(acceptLoop());
  }

  ~Impl() noexcept(false) {
2611 2612 2613 2614 2615
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // std::unordered_map doesn't like it when elements' destructors throw, so carefully
      // disassemble it.
      if (!connections.empty()) {
        kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
2616
        kj::Exception shutdownException = KJ_EXCEPTION(FAILED, "RpcSystem was destroyed.");
2617 2618 2619 2620
        for (auto& entry: connections) {
          entry.second->disconnect(kj::cp(shutdownException));
          deleteMe.add(kj::mv(entry.second));
        }
2621
      }
2622
    });
2623
  }
2624

2625 2626 2627 2628 2629 2630 2631 2632
  Capability::Client bootstrap(_::StructReader vatId) {
    // For now we delegate to restore() since it's equivalent, but eventually we'll remove restore()
    // and implement bootstrap() directly.
    return restore(vatId, AnyPointer::Reader());
  }

  Capability::Client restore(_::StructReader vatId, AnyPointer::Reader objectId) {
    KJ_IF_MAYBE(connection, network.baseConnect(vatId)) {
2633
      auto& state = getConnectionState(kj::mv(*connection));
2634 2635 2636 2637 2638 2639 2640
      return Capability::Client(state.restore(objectId));
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(objectId);
    } else {
      return Capability::Client(newBrokenCap(
          "SturdyRef referred to a local object but there is no local SturdyRef restorer."));
    }
2641 2642 2643
  }

  void taskFailed(kj::Exception&& exception) override {
2644
    KJ_LOG(ERROR, exception);
Kenton Varda's avatar
Kenton Varda committed
2645 2646
  }

2647 2648
private:
  VatNetworkBase& network;
2649
  kj::Maybe<Capability::Client> bootstrapInterface;
2650
  kj::Maybe<RealmGateway<>::Client> gateway;
2651 2652 2653 2654 2655
  kj::Maybe<SturdyRefRestorerBase&> restorer;
  kj::TaskSet tasks;

  typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
      ConnectionMap;
2656
  ConnectionMap connections;
Kenton Varda's avatar
Kenton Varda committed
2657

2658 2659
  kj::UnwindDetector unwindDetector;

2660 2661 2662
  RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
    auto iter = connections.find(connection);
    if (iter == connections.end()) {
2663
      VatNetworkBase::Connection* connectionPtr = connection;
Kenton Varda's avatar
Kenton Varda committed
2664 2665 2666
      auto onDisconnect = kj::newPromiseAndFulfiller<RpcConnectionState::DisconnectInfo>();
      tasks.add(onDisconnect.promise
          .then([this,connectionPtr](RpcConnectionState::DisconnectInfo info) {
2667
        connections.erase(connectionPtr);
Kenton Varda's avatar
Kenton Varda committed
2668
        tasks.add(kj::mv(info.shutdownPromise));
2669 2670
      }));
      auto newState = kj::refcounted<RpcConnectionState>(
2671 2672
          bootstrapInterface, gateway, restorer, kj::mv(connection),
          kj::mv(onDisconnect.fulfiller));
2673
      RpcConnectionState& result = *newState;
2674
      connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
2675 2676 2677 2678 2679 2680 2681
      return result;
    } else {
      return *iter->second;
    }
  }

  kj::Promise<void> acceptLoop() {
2682
    auto receive = network.baseAccept().then(
2683
        [this](kj::Own<VatNetworkBase::Connection>&& connection) {
2684
      getConnectionState(kj::mv(connection));
2685 2686 2687 2688 2689 2690 2691 2692
    });
    return receive.then([this]() {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      tasks.add(acceptLoop());
    });
2693
  }
2694 2695
};

2696
RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
2697 2698 2699
                             kj::Maybe<Capability::Client> bootstrapInterface,
                             kj::Maybe<RealmGateway<>::Client> gateway)
    : impl(kj::heap<Impl>(network, kj::mv(bootstrapInterface), kj::mv(gateway))) {}
2700
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2701
    : impl(kj::heap<Impl>(network, restorer)) {}
2702
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
2703 2704
RpcSystemBase::~RpcSystemBase() noexcept(false) {}

2705 2706 2707 2708
Capability::Client RpcSystemBase::baseBootstrap(_::StructReader vatId) {
  return impl->bootstrap(vatId);
}

2709
Capability::Client RpcSystemBase::baseRestore(
2710
    _::StructReader hostId, AnyPointer::Reader objectId) {
2711
  return impl->restore(hostId, objectId);
2712 2713 2714 2715
}

}  // namespace _ (private)
}  // namespace capnp