rpc.c++ 98.7 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22

#include "rpc.h"
23
#include "message.h"
24 25 26
#include <kj/debug.h>
#include <kj/vector.h>
#include <kj/async.h>
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/one-of.h>
Kenton Varda's avatar
Kenton Varda committed
28
#include <kj/function.h>
29
#include <unordered_map>
Kenton Varda's avatar
Kenton Varda committed
30
#include <map>
31 32 33 34 35 36 37 38
#include <queue>
#include <capnp/rpc.capnp.h>

namespace capnp {
namespace _ {  // private

namespace {

Kenton Varda's avatar
Kenton Varda committed
39 40 41 42 43 44 45 46 47
template <typename T>
inline constexpr uint messageSizeHint() {
  return 1 + sizeInWords<rpc::Message>() + sizeInWords<T>();
}
template <>
inline constexpr uint messageSizeHint<void>() {
  return 1 + sizeInWords<rpc::Message>();
}

48 49 50
constexpr const uint MESSAGE_TARGET_SIZE_HINT = sizeInWords<rpc::MessageTarget>() +
    sizeInWords<rpc::PromisedAnswer>() + 16;  // +16 for ops; hope that's enough

51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
constexpr const uint CAP_DESCRIPTOR_SIZE_HINT = sizeInWords<rpc::CapDescriptor>() +
    sizeInWords<rpc::PromisedAnswer>();

constexpr const uint64_t MAX_SIZE_HINT = 1 << 20;

uint copySizeHint(MessageSize size) {
  uint64_t sizeHint = size.wordCount + size.capCount * CAP_DESCRIPTOR_SIZE_HINT;
  return kj::min(MAX_SIZE_HINT, sizeHint);
}

uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) {
  KJ_IF_MAYBE(s, sizeHint) {
    return copySizeHint(*s) + additional;
  } else {
    return 0;
  }
}

Kenton Varda's avatar
Kenton Varda committed
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) {
  auto result = kj::heapArrayBuilder<PipelineOp>(ops.size());
  for (auto opReader: ops) {
    PipelineOp op;
    switch (opReader.which()) {
      case rpc::PromisedAnswer::Op::NOOP:
        op.type = PipelineOp::NOOP;
        break;
      case rpc::PromisedAnswer::Op::GET_POINTER_FIELD:
        op.type = PipelineOp::GET_POINTER_FIELD;
        op.pointerIndex = opReader.getGetPointerField();
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) {
          return nullptr;
        }
    }
86
    result.add(op);
Kenton Varda's avatar
Kenton Varda committed
87 88 89 90 91
  }
  return result.finish();
}

Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps(
Kenton Varda's avatar
Kenton Varda committed
92
    Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) {
Kenton Varda's avatar
Kenton Varda committed
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
  auto result = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size());
  auto builder = result.get();
  for (uint i: kj::indices(ops)) {
    rpc::PromisedAnswer::Op::Builder opBuilder = builder[i];
    switch (ops[i].type) {
      case PipelineOp::NOOP:
        opBuilder.setNoop();
        break;
      case PipelineOp::GET_POINTER_FIELD:
        opBuilder.setGetPointerField(ops[i].pointerIndex);
        break;
    }
  }
  return result;
}

Kenton Varda's avatar
Kenton Varda committed
109
kj::Exception toException(const rpc::Exception::Reader& exception) {
110 111
  return kj::Exception(static_cast<kj::Exception::Type>(exception.getType()),
      "(remote)", 0, kj::str("remote exception: ", exception.getReason()));
Kenton Varda's avatar
Kenton Varda committed
112 113 114
}

void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
Kenton Varda's avatar
Kenton Varda committed
115 116 117
  // TODO(someday):  Indicate the remote server name as part of the stack trace.  Maybe even
  //   transmit stack traces?
  builder.setReason(exception.getDescription());
118
  builder.setType(static_cast<rpc::Exception::Type>(exception.getType()));
Kenton Varda's avatar
Kenton Varda committed
119 120
}

Kenton Varda's avatar
Kenton Varda committed
121
uint exceptionSizeHint(const kj::Exception& exception) {
Kenton Varda's avatar
Kenton Varda committed
122
  return sizeInWords<rpc::Exception>() + exception.getDescription().size() / sizeof(word) + 1;
Kenton Varda's avatar
Kenton Varda committed
123 124
}

Kenton Varda's avatar
Kenton Varda committed
125
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
126

127 128 129 130 131 132 133 134 135 136 137 138 139
template <typename Id, typename T>
class ExportTable {
  // Table mapping integers to T, where the integers are chosen locally.

public:
  kj::Maybe<T&> find(Id id) {
    if (id < slots.size() && slots[id] != nullptr) {
      return slots[id];
    } else {
      return nullptr;
    }
  }

140
  T erase(Id id, T& entry) {
Kenton Varda's avatar
Kenton Varda committed
141 142
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
143 144 145 146 147 148 149 150
    // `entry` is a reference to the entry being released -- we require this in order to prove
    // that the caller has already done a find() to check that this entry exists.  We can't check
    // ourselves because the caller may have nullified the entry in the meantime.
    KJ_DREQUIRE(&entry == &slots[id]);
    T toRelease = kj::mv(slots[id]);
    slots[id] = T();
    freeIds.push(id);
    return toRelease;
151 152 153 154 155 156 157 158 159 160 161 162 163
  }

  T& next(Id& id) {
    if (freeIds.empty()) {
      id = slots.size();
      return slots.add();
    } else {
      id = freeIds.top();
      freeIds.pop();
      return slots[id];
    }
  }

164 165 166 167 168 169 170 171 172
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i = 0; i < slots.size(); i++) {
      if (slots[i] != nullptr) {
        func(i, slots[i]);
      }
    }
  }

173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
private:
  kj::Vector<T> slots;
  std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds;
};

template <typename Id, typename T>
class ImportTable {
  // Table mapping integers to T, where the integers are chosen remotely.

public:
  T& operator[](Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      return high[id];
    }
  }

Kenton Varda's avatar
Kenton Varda committed
191 192 193 194 195
  kj::Maybe<T&> find(Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      auto iter = high.find(id);
Kenton Varda's avatar
Kenton Varda committed
196
      if (iter == high.end()) {
Kenton Varda's avatar
Kenton Varda committed
197 198 199 200 201 202 203
        return nullptr;
      } else {
        return iter->second;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
204 205 206
  T erase(Id id) {
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
207
    if (id < kj::size(low)) {
Kenton Varda's avatar
Kenton Varda committed
208
      T toRelease = kj::mv(low[id]);
209
      low[id] = T();
Kenton Varda's avatar
Kenton Varda committed
210
      return toRelease;
211
    } else {
Kenton Varda's avatar
Kenton Varda committed
212
      T toRelease = kj::mv(high[id]);
213
      high.erase(id);
Kenton Varda's avatar
Kenton Varda committed
214
      return toRelease;
215 216 217
    }
  }

218 219 220 221 222 223 224 225 226 227
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i: kj::indices(low)) {
      func(i, low[i]);
    }
    for (auto& entry: high) {
      func(entry.first, entry.second);
    }
  }

228 229 230 231 232
private:
  T low[16];
  std::unordered_map<Id, T> high;
};

Kenton Varda's avatar
Kenton Varda committed
233 234
// =======================================================================================

235
class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted {
236
public:
Kenton Varda's avatar
Kenton Varda committed
237 238 239 240 241
  struct DisconnectInfo {
    kj::Promise<void> shutdownPromise;
    // Task which is working on sending an abort message and cleanly ending the connection.
  };

242
  RpcConnectionState(kj::Maybe<Capability::Client> bootstrapInterface,
243
                     kj::Maybe<RealmGateway<>::Client> gateway,
244
                     kj::Maybe<SturdyRefRestorerBase&> restorer,
245
                     kj::Own<VatNetworkBase::Connection>&& connectionParam,
Kenton Varda's avatar
Kenton Varda committed
246
                     kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller)
247 248
      : bootstrapInterface(kj::mv(bootstrapInterface)), gateway(kj::mv(gateway)),
        restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), tasks(*this) {
249
    connection.init<Connected>(kj::mv(connectionParam));
250 251 252
    tasks.add(messageLoop());
  }

253
  kj::Own<ClientHook> restore(AnyPointer::Reader objectId) {
254 255 256 257
    if (connection.is<Disconnected>()) {
      return newBrokenCap(kj::cp(connection.get<Disconnected>()));
    }

258
    QuestionId questionId;
259
    auto& question = questions.next(questionId);
260

261
    question.isAwaitingReturn = true;
262

263
    auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
Kenton Varda's avatar
Kenton Varda committed
264

265 266
    auto questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller));
    question.selfRef = *questionRef;
Kenton Varda's avatar
Kenton Varda committed
267

268
    paf.promise = paf.promise.attach(kj::addRef(*questionRef));
269 270

    {
271
      auto message = connection.get<Connected>()->newOutgoingMessage(
272
          objectId.targetSize().wordCount + messageSizeHint<rpc::Bootstrap>());
273

274
      auto builder = message->getBody().initAs<rpc::Message>().initBootstrap();
275
      builder.setQuestionId(questionId);
276
      builder.getDeprecatedObjectId().set(objectId);
277 278 279 280

      message->send();
    }

281
    auto pipeline = kj::refcounted<RpcPipeline>(*this, kj::mv(questionRef), kj::mv(paf.promise));
282

283
    return pipeline->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
Kenton Varda's avatar
Kenton Varda committed
284 285
  }

286
  void taskFailed(kj::Exception&& exception) override {
287 288 289 290
    disconnect(kj::mv(exception));
  }

  void disconnect(kj::Exception&& exception) {
291 292 293 294 295
    if (!connection.is<Connected>()) {
      // Already disconnected.
      return;
    }

296 297
    kj::Exception networkException(kj::Exception::Type::DISCONNECTED,
        exception.getFile(), exception.getLine(), kj::heapString(exception.getDescription()));
298 299

    KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
300 301
      // Carefully pull all the objects out of the tables prior to releasing them because their
      // destructors could come back and mess with the tables.
302 303 304 305
      kj::Vector<kj::Own<PipelineHook>> pipelinesToRelease;
      kj::Vector<kj::Own<ClientHook>> clientsToRelease;
      kj::Vector<kj::Promise<kj::Own<RpcResponse>>> tailCallsToRelease;
      kj::Vector<kj::Promise<void>> resolveOpsToRelease;
306 307

      // All current questions complete with exceptions.
308
      questions.forEach([&](QuestionId id, Question& question) {
Kenton Varda's avatar
Kenton Varda committed
309
        KJ_IF_MAYBE(questionRef, question.selfRef) {
310 311
          // QuestionRef still present.
          questionRef->reject(kj::cp(networkException));
Kenton Varda's avatar
Kenton Varda committed
312
        }
313 314
      });

315
      answers.forEach([&](AnswerId id, Answer& answer) {
316 317 318 319
        KJ_IF_MAYBE(p, answer.pipeline) {
          pipelinesToRelease.add(kj::mv(*p));
        }

320
        KJ_IF_MAYBE(promise, answer.redirectedResults) {
321
          tailCallsToRelease.add(kj::mv(*promise));
322 323
        }

324 325 326 327 328
        KJ_IF_MAYBE(context, answer.callContext) {
          context->requestCancel();
        }
      });

329
      exports.forEach([&](ExportId id, Export& exp) {
330
        clientsToRelease.add(kj::mv(exp.clientHook));
331
        resolveOpsToRelease.add(kj::mv(exp.resolveOp));
332 333 334
        exp = Export();
      });

335
      imports.forEach([&](ImportId id, Import& import) {
336 337
        KJ_IF_MAYBE(f, import.promiseFulfiller) {
          f->get()->reject(kj::cp(networkException));
338 339 340
        }
      });

341
      embargoes.forEach([&](EmbargoId id, Embargo& embargo) {
342 343 344 345
        KJ_IF_MAYBE(f, embargo.fulfiller) {
          f->get()->reject(kj::cp(networkException));
        }
      });
346 347 348 349 350
    })) {
      // Some destructor must have thrown an exception.  There is no appropriate place to report
      // these errors.
      KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
             *newException);
351 352
    }

353 354 355
    // Send an abort message, but ignore failure.
    kj::runCatchingExceptions([&]() {
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
356
          messageSizeHint<void>() + exceptionSizeHint(exception));
357 358
      fromException(exception, message->getBody().getAs<rpc::Message>().initAbort());
      message->send();
359
    });
360 361

    // Indicate disconnect.
362 363 364 365 366 367 368 369 370 371 372
    auto shutdownPromise = connection.get<Connected>()->shutdown()
        .attach(kj::mv(connection.get<Connected>()))
        .then([]() -> kj::Promise<void> { return kj::READY_NOW; },
              [](kj::Exception&& e) -> kj::Promise<void> {
          // Don't report disconnects as an error.
          if (e.getType() != kj::Exception::Type::DISCONNECTED) {
            return kj::mv(e);
          }
          return kj::READY_NOW;
        });
    disconnectFulfiller->fulfill(DisconnectInfo { kj::mv(shutdownPromise) });
373
    connection.init<Disconnected>(kj::mv(networkException));
374 375
  }

376
private:
377
  class RpcClient;
Kenton Varda's avatar
Kenton Varda committed
378
  class ImportClient;
379
  class PromiseClient;
Kenton Varda's avatar
Kenton Varda committed
380
  class QuestionRef;
Kenton Varda's avatar
Kenton Varda committed
381
  class RpcPipeline;
Kenton Varda's avatar
Kenton Varda committed
382
  class RpcCallContext;
Kenton Varda's avatar
Kenton Varda committed
383
  class RpcResponse;
Kenton Varda's avatar
Kenton Varda committed
384

385 386 387 388 389 390
  // =======================================================================================
  // The Four Tables entry types
  //
  // We have to define these before we can define the class's fields.

  typedef uint32_t QuestionId;
391
  typedef QuestionId AnswerId;
392
  typedef uint32_t ExportId;
393 394 395 396 397 398 399 400 401 402
  typedef ExportId ImportId;
  // See equivalent definitions in rpc.capnp.
  //
  // We always use the type that refers to the local table of the same name.  So e.g. although
  // QuestionId and AnswerId are the same type, we use QuestionId when referring to an entry in
  // the local question table (which corresponds to the peer's answer table) and use AnswerId
  // to refer to an entry in our answer table (which corresponds to the peer's question table).
  // Since all messages in the RPC protocol are defined from the sender's point of view, this
  // means that any time we read an ID from a received message, its type should invert.
  // TODO(cleanup):  Perhaps we could enforce that in a type-safe way?  Hmm...
403 404

  struct Question {
405 406 407
    kj::Array<ExportId> paramExports;
    // List of exports that were sent in the request.  If the response has `releaseParamCaps` these
    // will need to be released.
408

Kenton Varda's avatar
Kenton Varda committed
409 410 411
    kj::Maybe<QuestionRef&> selfRef;
    // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    // sent.
412

413 414 415
    bool isAwaitingReturn = false;
    // True from when `Call` is sent until `Return` is received.

416 417 418
    bool isTailCall = false;
    // Is this a tail call?  If so, we don't expect to receive results in the `Return`.

Kenton Varda's avatar
Kenton Varda committed
419
    inline bool operator==(decltype(nullptr)) const {
420
      return !isAwaitingReturn && selfRef == nullptr;
Kenton Varda's avatar
Kenton Varda committed
421 422
    }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
423 424 425
  };

  struct Answer {
426 427 428 429 430 431
    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

432 433 434 435
    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

436
    kj::Maybe<kj::Own<PipelineHook>> pipeline;
437 438
    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.

439
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
440 441
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.
442

443
    kj::Maybe<RpcCallContext&> callContext;
444 445
    // The call context, if it's still active.  Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.
446

447 448 449
    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
    // will need to be released.
450 451 452 453 454 455
  };

  struct Export {
    uint refcount = 0;
    // When this reaches 0, drop `clientHook` and free this export.

456
    kj::Own<ClientHook> clientHook;
457

458 459 460 461
    kj::Promise<void> resolveOp = nullptr;
    // If this export is a promise (not a settled capability), the `resolveOp` represents the
    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.

462 463 464 465 466 467 468 469 470 471 472
    inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
  };

  struct Import {
    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

473
    kj::Maybe<ImportClient&> importClient;
474 475
    // Becomes null when the import is destroyed.

476 477 478 479 480
    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

481
    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
482 483 484
    // If non-null, the import is a promise.
  };

Kenton Varda's avatar
Kenton Varda committed
485 486 487 488 489 490 491 492 493 494 495 496
  typedef uint32_t EmbargoId;

  struct Embargo {
    // For handling the `Disembargo` message when looping back to self.

    kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    // Fulfill this when the Disembargo arrives.

    inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    inline bool operator!=(decltype(nullptr)) const { return fulfiller != nullptr; }
  };

497 498 499
  // =======================================================================================
  // OK, now we can define RpcConnectionState's member data.

500
  kj::Maybe<Capability::Client> bootstrapInterface;
501
  kj::Maybe<RealmGateway<>::Client> gateway;
502
  kj::Maybe<SturdyRefRestorerBase&> restorer;
503 504 505 506 507 508 509

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

Kenton Varda's avatar
Kenton Varda committed
510
  kj::Own<kj::PromiseFulfiller<DisconnectInfo>> disconnectFulfiller;
511

512 513
  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
514 515
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
516 517 518 519 520 521 522 523 524
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables.  This definitely isn't a fifth table.  I don't know what you're
  // talking about.
525 526 527 528

  kj::TaskSet tasks;

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
529
  // ClientHook implementations
530

Kenton Varda's avatar
Kenton Varda committed
531
  class RpcClient: public ClientHook, public kj::Refcounted {
532
  public:
533
    RpcClient(RpcConnectionState& connectionState)
534
        : connectionState(kj::addRef(connectionState)) {}
535

536 537 538 539
    virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) = 0;
    // Writes a CapDescriptor referencing this client.  The CapDescriptor must be sent as part of
    // the very next message sent on the connection, as it may become invalid if other things
    // happen.
Kenton Varda's avatar
Kenton Varda committed
540 541 542 543
    //
    // If writing the descriptor adds a new export to the export table, or increments the refcount
    // on an existing one, then the ID is returned and the caller is responsible for removing it
    // later.
Kenton Varda's avatar
Kenton Varda committed
544

545 546
    virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) = 0;
Kenton Varda's avatar
Kenton Varda committed
547
    // Writes the appropriate call target for calls to this capability and returns null.
Kenton Varda's avatar
Kenton Varda committed
548
    //
Kenton Varda's avatar
Kenton Varda committed
549 550 551 552
    // - OR -
    //
    // If calls have been redirected to some other local ClientHook, returns that hook instead.
    // This can happen if the capability represents a promise that has been resolved.
Kenton Varda's avatar
Kenton Varda committed
553

554
    virtual kj::Own<ClientHook> getInnermostClient() = 0;
Kenton Varda's avatar
Kenton Varda committed
555 556 557 558
    // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    // that other client -- return a reference to the other client, transitively.  Otherwise,
    // return a new reference to *this.

Kenton Varda's avatar
Kenton Varda committed
559 560
    // implements ClientHook -----------------------------------------

561
    Request<AnyPointer, AnyPointer> newCall(
562
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
563 564 565 566 567 568 569 570 571 572 573 574
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          //
          // We pull a neat trick here: We actually end up returning a RequestHook for an import
          // request on the gateway cap, but with the "root" of the request actually pointing
          // to the "params" field of the real request.

          sizeHint = sizeHint.map([](MessageSize hint) {
            ++hint.capCount;
            hint.wordCount += sizeInWords<RealmGateway<>::ImportParams>();
575
            return hint;
576 577 578
          });

          auto request = g->importRequest(sizeHint);
579
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
580

581 582 583 584 585 586 587 588 589 590 591
          // Awkwardly, request.initParams() would return a SaveParams struct, but to construct
          // the Request<AnyPointer, AnyPointer> to return we need an AnyPointer::Builder, and you
          // can't go backwards from a struct builder to an AnyPointer builder. So instead we
          // manually get at the pointer by converting the outer request to AnyStruct and then
          // pulling the pointer from the pointer section.
          auto pointers = toAny(request).getPointerSection();
          KJ_ASSERT(pointers.size() >= 2);
          auto paramsPtr = pointers[1];
          KJ_ASSERT(paramsPtr.isNull());

          return Request<AnyPointer, AnyPointer>(paramsPtr, RequestHook::from(kj::mv(request)));
592 593 594
        }
      }

595 596 597 598 599
      return newCallNoIntercept(interfaceId, methodId, sizeHint);
    }

    Request<AnyPointer, AnyPointer> newCallNoIntercept(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) {
600 601 602 603
      if (!connectionState->connection.is<Connected>()) {
        return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
      }

604
      auto request = kj::heap<RpcRequest>(
605 606
          *connectionState, *connectionState->connection.get<Connected>(),
          sizeHint, kj::addRef(*this));
607 608 609 610 611 612
      auto callBuilder = request->getCall();

      callBuilder.setInterfaceId(interfaceId);
      callBuilder.setMethodId(methodId);

      auto root = request->getRoot();
613
      return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
614 615
    }

Kenton Varda's avatar
Kenton Varda committed
616
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
617
                                kj::Own<CallContextHook>&& context) override {
618 619 620 621 622 623 624 625 626 627 628
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          auto params = context->getParams().getAs<Persistent<>::SaveParams>();

          auto requestSize = params.totalSize();
          ++requestSize.capCount;
          requestSize.wordCount += sizeInWords<RealmGateway<>::ImportParams>();

          auto request = g->importRequest(requestSize);
629
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
630 631 632 633 634 635 636 637
          request.setParams(params);

          context->allowCancellation();
          context->releaseParams();
          return context->directTailCall(RequestHook::from(kj::mv(request)));
        }
      }

638 639 640 641 642 643 644
      return callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    VoidPromiseAndPipeline callNoIntercept(uint64_t interfaceId, uint16_t methodId,
                                           kj::Own<CallContextHook>&& context) {
      // Implement call() by copying params and results messages.

Kenton Varda's avatar
Kenton Varda committed
645
      auto params = context->getParams();
646
      auto request = newCall(interfaceId, methodId, params.targetSize());
Kenton Varda's avatar
Kenton Varda committed
647

648
      request.set(params);
Kenton Varda's avatar
Kenton Varda committed
649 650
      context->releaseParams();

651
      // We can and should propagate cancellation.
652
      context->allowCancellation();
653

654
      return context->directTailCall(RequestHook::from(kj::mv(request)));
Kenton Varda's avatar
Kenton Varda committed
655 656
    }

657
    kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
658 659
      return kj::addRef(*this);
    }
660
    const void* getBrand() override {
661
      return connectionState.get();
Kenton Varda's avatar
Kenton Varda committed
662 663
    }

664
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
665 666
  };

667 668 669 670
  class ImportClient final: public RpcClient {
    // A ClientHook that wraps an entry in the import table.

  public:
671
    ImportClient(RpcConnectionState& connectionState, ImportId importId)
Kenton Varda's avatar
Kenton Varda committed
672 673
        : RpcClient(connectionState), importId(importId) {}

Kenton Varda's avatar
Kenton Varda committed
674
    ~ImportClient() noexcept(false) {
675
      unwindDetector.catchExceptionsIfUnwinding([&]() {
676
        // Remove self from the import table, if the table is still pointing at us.
677 678 679 680 681
        KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
          KJ_IF_MAYBE(i, import->importClient) {
            if (i == this) {
              connectionState->imports.erase(importId);
            }
682 683
          }
        }
Kenton Varda's avatar
Kenton Varda committed
684

685
        // Send a message releasing our remote references.
686 687
        if (remoteRefcount > 0 && connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
688 689 690 691 692 693 694
              messageSizeHint<rpc::Release>());
          rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
          builder.setId(importId);
          builder.setReferenceCount(remoteRefcount);
          message->send();
        }
      });
695 696
    }

697 698 699
    void addRemoteRef() {
      // Add a new RemoteRef and return a new ref to this client representing it.
      ++remoteRefcount;
Kenton Varda's avatar
Kenton Varda committed
700
    }
701

702
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
703
      descriptor.setReceiverHosted(importId);
Kenton Varda's avatar
Kenton Varda committed
704
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
705 706
    }

707 708
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
709
      target.setImportedCap(importId);
Kenton Varda's avatar
Kenton Varda committed
710
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
711 712
    }

713
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
714 715 716
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
717
    // implements ClientHook -----------------------------------------
718

719
    kj::Maybe<ClientHook&> getResolved() override {
720 721 722
      return nullptr;
    }

723
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
724 725 726
      return nullptr;
    }

Kenton Varda's avatar
Kenton Varda committed
727
  private:
728
    ImportId importId;
Kenton Varda's avatar
Kenton Varda committed
729 730 731

    uint remoteRefcount = 0;
    // Number of times we've received this import from the peer.
732 733

    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
734 735
  };

736 737 738
  class PipelineClient final: public RpcClient {
    // A ClientHook representing a pipelined promise.  Always wrapped in PromiseClient.

Kenton Varda's avatar
Kenton Varda committed
739
  public:
740 741
    PipelineClient(RpcConnectionState& connectionState,
                   kj::Own<QuestionRef>&& questionRef,
742
                   kj::Array<PipelineOp>&& ops)
Kenton Varda's avatar
Kenton Varda committed
743
        : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}
Kenton Varda's avatar
Kenton Varda committed
744

745
   kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
746 747 748 749 750
      auto promisedAnswer = descriptor.initReceiverAnswer();
      promisedAnswer.setQuestionId(questionRef->getId());
      promisedAnswer.adoptTransform(fromPipelineOps(
          Orphanage::getForMessageContaining(descriptor), ops));
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
751 752
    }

753 754
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
Kenton Varda's avatar
Kenton Varda committed
755 756 757 758
      auto builder = target.initPromisedAnswer();
      builder.setQuestionId(questionRef->getId());
      builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
      return nullptr;
759 760
    }

761
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
762 763 764
      return kj::addRef(*this);
    }

765
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
766

767
    kj::Maybe<ClientHook&> getResolved() override {
768 769 770
      return nullptr;
    }

771
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
772
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
773 774 775
    }

  private:
776
    kj::Own<QuestionRef> questionRef;
777
    kj::Array<PipelineOp> ops;
Kenton Varda's avatar
Kenton Varda committed
778 779
  };

780 781 782 783
  class PromiseClient final: public RpcClient {
    // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    // PipelineClient) and then, later on, redirects to some other client.

Kenton Varda's avatar
Kenton Varda committed
784
  public:
785 786 787
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<ClientHook> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
788
                  kj::Maybe<ImportId> importId)
789
        : RpcClient(connectionState),
790 791
          isResolved(false),
          cap(kj::mv(initial)),
792
          importId(importId),
793 794 795
          fork(eventual.fork()),
          resolveSelfPromise(fork.addBranch().then(
              [this](kj::Own<ClientHook>&& resolution) {
796
                resolve(kj::mv(resolution), false);
797
              }, [this](kj::Exception&& exception) {
798
                resolve(newBrokenCap(kj::mv(exception)), true);
799 800 801 802
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
803
              })) {
804 805 806 807 808 809
      // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
      // resolves, will forward there instead.  In addition, `whenMoreResolved()` will return a fork
      // of `eventual`.  Note that this means the application could hold on to `eventual` even after
      // the `PromiseClient` is destroyed; `eventual` must therefore make sure to hold references to
      // anything that needs to stay alive in order to resolve it correctly (such as making sure the
      // import ID is not released).
Kenton Varda's avatar
Kenton Varda committed
810
    }
Kenton Varda's avatar
Kenton Varda committed
811

812 813 814 815 816 817
    ~PromiseClient() noexcept(false) {
      KJ_IF_MAYBE(id, importId) {
        // This object is representing an import promise.  That means the import table may still
        // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
        // the import still exists and the pointer still points back to this object because this
        // object may actually outlive the import.
818
        KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
819 820 821 822 823 824 825 826 827
          KJ_IF_MAYBE(c, import->appClient) {
            if (c == this) {
              import->appClient = nullptr;
            }
          }
        }
      }
    }

828
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
829
      receivedCall = true;
830
      return connectionState->writeDescriptor(*cap, descriptor);
Kenton Varda's avatar
Kenton Varda committed
831
    }
Kenton Varda's avatar
Kenton Varda committed
832

833 834
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
835
      receivedCall = true;
836
      return connectionState->writeTarget(*cap, target);
Kenton Varda's avatar
Kenton Varda committed
837 838
    }

839
    kj::Own<ClientHook> getInnermostClient() override {
840
      receivedCall = true;
841
      return connectionState->getInnermostClient(*cap);
Kenton Varda's avatar
Kenton Varda committed
842 843
    }

Kenton Varda's avatar
Kenton Varda committed
844
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
845

846
    Request<AnyPointer, AnyPointer> newCall(
847
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
848
      receivedCall = true;
849
      return cap->newCall(interfaceId, methodId, sizeHint);
850 851
    }

852
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
853
                                kj::Own<CallContextHook>&& context) override {
854
      receivedCall = true;
855
      return cap->call(interfaceId, methodId, kj::mv(context));
856 857
    }

858
    kj::Maybe<ClientHook&> getResolved() override {
859 860
      if (isResolved) {
        return *cap;
861 862 863
      } else {
        return nullptr;
      }
Kenton Varda's avatar
Kenton Varda committed
864 865
    }

866
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
867
      return fork.addBranch();
Kenton Varda's avatar
Kenton Varda committed
868
    }
Kenton Varda's avatar
Kenton Varda committed
869 870

  private:
871 872
    bool isResolved;
    kj::Own<ClientHook> cap;
873

874
    kj::Maybe<ImportId> importId;
875
    kj::ForkedPromise<kj::Own<ClientHook>> fork;
Kenton Varda's avatar
Kenton Varda committed
876 877 878 879 880

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

881
    bool receivedCall = false;
Kenton Varda's avatar
Kenton Varda committed
882

883
    void resolve(kj::Own<ClientHook> replacement, bool isError) {
884 885
      if (replacement->getBrand() != connectionState.get() && receivedCall && !isError &&
          connectionState->connection.is<Connected>()) {
886 887 888 889 890
        // The new capability is hosted locally, not on the remote machine.  And, we had made calls
        // to the promise.  We need to make sure those calls echo back to us before we allow new
        // calls to go directly to the local capability, so we need to set a local embargo and send
        // a `Disembargo` to echo through the peer.

891
        auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
892
            messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
Kenton Varda's avatar
Kenton Varda committed
893

Kenton Varda's avatar
Kenton Varda committed
894
        auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();
Kenton Varda's avatar
Kenton Varda committed
895 896

        {
897
          auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
Kenton Varda's avatar
Kenton Varda committed
898 899 900 901 902
          KJ_ASSERT(redirect == nullptr,
                    "Original promise target should always be from this RPC connection.");
        }

        EmbargoId embargoId;
903
        Embargo& embargo = connectionState->embargoes.next(embargoId);
Kenton Varda's avatar
Kenton Varda committed
904 905 906 907 908 909 910

        disembargo.getContext().setSenderLoopback(embargoId);

        auto paf = kj::newPromiseAndFulfiller<void>();
        embargo.fulfiller = kj::mv(paf.fulfiller);

        // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
911 912
        auto embargoPromise = paf.promise.then(
            kj::mvCapture(replacement, [this](kj::Own<ClientHook>&& replacement) {
Kenton Varda's avatar
Kenton Varda committed
913 914 915 916 917
              return kj::mv(replacement);
            }));

        // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
        // client instead.
918
        replacement = newLocalPromiseClient(kj::mv(embargoPromise));
Kenton Varda's avatar
Kenton Varda committed
919 920 921 922 923

        // Send the `Disembargo`.
        message->send();
      }

924 925
      cap = replacement->addRef();
      isResolved = true;
Kenton Varda's avatar
Kenton Varda committed
926
    }
Kenton Varda's avatar
Kenton Varda committed
927
  };
928

929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
  class NoInterceptClient final: public RpcClient {
    // A wrapper around an RpcClient which bypasses special handling of "save" requests. When we
    // intercept a "save" request and invoke a RealmGateway, we give it a version of the capability
    // with intercepting disabled, since usually the first thing the RealmGateway will do is turn
    // around and call save() again.
    //
    // This is admittedly sort of backwards: the interception of "save" ought to be the part
    // implemented by a wrapper. However, that would require placing a wrapper around every
    // RpcClient we create whereas NoInterceptClient only needs to be injected after a save()
    // request occurs and is intercepted.

  public:
    NoInterceptClient(RpcClient& inner)
        : RpcClient(*inner.connectionState),
          inner(kj::addRef(inner)) {}

    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
      return inner->writeDescriptor(descriptor);
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(rpc::MessageTarget::Builder target) override {
      return inner->writeTarget(target);
    }

    kj::Own<ClientHook> getInnermostClient() override {
      return inner->getInnermostClient();
    }

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
      return inner->newCallNoIntercept(interfaceId, methodId, sizeHint);
    }
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                                kj::Own<CallContextHook>&& context) override {
      return inner->callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    kj::Maybe<ClientHook&> getResolved() override {
      return nullptr;
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return nullptr;
    }

  private:
    kj::Own<RpcClient> inner;
  };

978
  kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor) {
979
    // Write a descriptor for the given capability.
Kenton Varda's avatar
Kenton Varda committed
980

981
    // Find the innermost wrapped capability.
982
    ClientHook* inner = &cap;
983 984 985 986 987 988 989 990 991
    for (;;) {
      KJ_IF_MAYBE(resolved, inner->getResolved()) {
        inner = resolved;
      } else {
        break;
      }
    }

    if (inner->getBrand() == this) {
992
      return kj::downcast<RpcClient>(*inner).writeDescriptor(descriptor);
Kenton Varda's avatar
Kenton Varda committed
993
    } else {
994 995
      auto iter = exportsByCap.find(inner);
      if (iter != exportsByCap.end()) {
996
        // We've already seen and exported this capability before.  Just up the refcount.
997
        auto& exp = KJ_ASSERT_NONNULL(exports.find(iter->second));
Kenton Varda's avatar
Kenton Varda committed
998 999 1000 1001
        ++exp.refcount;
        descriptor.setSenderHosted(iter->second);
        return iter->second;
      } else {
1002
        // This is the first time we've seen this capability.
Kenton Varda's avatar
Kenton Varda committed
1003
        ExportId exportId;
1004 1005
        auto& exp = exports.next(exportId);
        exportsByCap[inner] = exportId;
Kenton Varda's avatar
Kenton Varda committed
1006
        exp.refcount = 1;
1007 1008 1009 1010
        exp.clientHook = inner->addRef();

        KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
          // This is a promise.  Arrange for the `Resolve` message to be sent later.
Kenton Varda's avatar
Kenton Varda committed
1011
          exp.resolveOp = resolveExportedPromise(exportId, kj::mv(*wrapped));
1012 1013 1014
          descriptor.setSenderPromise(exportId);
        } else {
          descriptor.setSenderHosted(exportId);
1015 1016
        }

Kenton Varda's avatar
Kenton Varda committed
1017 1018
        return exportId;
      }
Kenton Varda's avatar
Kenton Varda committed
1019 1020 1021
    }
  }

1022
  kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
1023 1024 1025 1026
                                       rpc::Payload::Builder payload) {
    auto capTableBuilder = payload.initCapTable(capTable.size());
    kj::Vector<ExportId> exports(capTable.size());
    for (uint i: kj::indices(capTable)) {
1027 1028 1029 1030 1031 1032
      KJ_IF_MAYBE(cap, capTable[i]) {
        KJ_IF_MAYBE(exportId, writeDescriptor(**cap, capTableBuilder[i])) {
          exports.add(*exportId);
        }
      } else {
        capTableBuilder[i].setNone();
1033 1034 1035 1036 1037
      }
    }
    return exports.releaseAsArray();
  }

1038
  kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
Kenton Varda's avatar
Kenton Varda committed
1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049
    // If calls to the given capability should pass over this connection, fill in `target`
    // appropriately for such a call and return nullptr.  Otherwise, return a `ClientHook` to which
    // the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
    //
    // The main case where this ends up returning non-null is if `cap` is a promise that has
    // recently resolved.  The application might have started building a request before the promise
    // resolved, and so the request may have been built on the assumption that it would be sent over
    // this network connection, but then the promise resolved to point somewhere else before the
    // request was sent.  Now the request has to be redirected to the new target instead.

    if (cap.getBrand() == this) {
1050
      return kj::downcast<RpcClient>(cap).writeTarget(target);
Kenton Varda's avatar
Kenton Varda committed
1051 1052 1053 1054 1055
    } else {
      return cap.addRef();
    }
  }

1056 1057
  kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
    ClientHook* ptr = &client;
Kenton Varda's avatar
Kenton Varda committed
1058 1059 1060 1061 1062 1063 1064 1065 1066
    for (;;) {
      KJ_IF_MAYBE(inner, ptr->getResolved()) {
        ptr = inner;
      } else {
        break;
      }
    }

    if (ptr->getBrand() == this) {
1067
      return kj::downcast<RpcClient>(*ptr).getInnermostClient();
Kenton Varda's avatar
Kenton Varda committed
1068 1069 1070 1071 1072 1073
    } else {
      return ptr->addRef();
    }
  }

  kj::Promise<void> resolveExportedPromise(
1074
      ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
Kenton Varda's avatar
Kenton Varda committed
1075 1076 1077 1078
    // Implements exporting of a promise.  The promise has been exported under the given ID, and is
    // to eventually resolve to the ClientHook produced by `promise`.  This method waits for that
    // resolve to happen and then sends the appropriate `Resolve` message to the peer.

1079
    return promise.then(
1080
        [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
Kenton Varda's avatar
Kenton Varda committed
1081 1082
      // Successful resolution.

1083 1084 1085 1086 1087
      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return kj::READY_NOW;
      }

Kenton Varda's avatar
Kenton Varda committed
1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098
      // Get the innermost ClientHook backing the resolved client.  This includes traversing
      // PromiseClients that haven't resolved yet to their underlying ImportClient or
      // PipelineClient, so that we get a remote promise that might resolve later.  This is
      // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
      // correctly instead of going to the result of some future resolution.  See the documentation
      // for `Disembargo` in `rpc.capnp`.
      resolution = getInnermostClient(*resolution);

      // Update the export table to point at this object instead.  We know that our entry in the
      // export table is still live because when it is destroyed the asynchronous resolution task
      // (i.e. this code) is canceled.
1099 1100
      auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
      exportsByCap.erase(exp.clientHook);
Kenton Varda's avatar
Kenton Varda committed
1101 1102
      exp.clientHook = kj::mv(resolution);

1103
      if (exp.clientHook->getBrand() != this) {
Kenton Varda's avatar
Kenton Varda committed
1104 1105 1106
        // We're resolving to a local capability.  If we're resolving to a promise, we might be
        // able to reuse our export table entry and avoid sending a message.

1107
        KJ_IF_MAYBE(promise, exp.clientHook->whenMoreResolved()) {
Kenton Varda's avatar
Kenton Varda committed
1108 1109 1110 1111
          // We're replacing a promise with another local promise.  In this case, we might actually
          // be able to just reuse the existing export table entry to represent the new promise --
          // unless it already has an entry.  Let's check.

1112
          auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));
Kenton Varda's avatar
Kenton Varda committed
1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123

          if (insertResult.second) {
            // The new promise was not already in the table, therefore the existing export table
            // entry has now been repurposed to represent it.  There is no need to send a resolve
            // message at all.  We do, however, have to start resolving the next promise.
            return resolveExportedPromise(exportId, kj::mv(*promise));
          }
        }
      }

      // OK, we have to send a `Resolve` message.
1124
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1125 1126 1127
          messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
1128
      writeDescriptor(*exp.clientHook, resolve.initCap());
Kenton Varda's avatar
Kenton Varda committed
1129 1130 1131 1132 1133
      message->send();

      return kj::READY_NOW;
    }, [this,exportId](kj::Exception&& exception) {
      // send error resolution
1134
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1135 1136 1137 1138 1139
          messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      fromException(exception, resolve.initException());
      message->send();
1140 1141 1142
    }).eagerlyEvaluate([this](kj::Exception&& exception) {
      // Put the exception on the TaskSet which will cause the connection to be terminated.
      tasks.add(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1143 1144 1145
    });
  }

Kenton Varda's avatar
Kenton Varda committed
1146
  // =====================================================================================
1147
  // Interpreting CapDescriptor
Kenton Varda's avatar
Kenton Varda committed
1148

1149
  kj::Own<ClientHook> import(ImportId importId, bool isPromise) {
1150
    // Receive a new import.
Kenton Varda's avatar
Kenton Varda committed
1151

1152 1153
    auto& import = imports[importId];
    kj::Own<ImportClient> importClient;
Kenton Varda's avatar
Kenton Varda committed
1154

1155 1156 1157 1158 1159 1160
    // Create the ImportClient, or if one already exists, use it.
    KJ_IF_MAYBE(c, import.importClient) {
      importClient = kj::addRef(*c);
    } else {
      importClient = kj::refcounted<ImportClient>(*this, importId);
      import.importClient = *importClient;
Kenton Varda's avatar
Kenton Varda committed
1161
    }
1162

1163 1164
    // We just received a copy of this import ID, so the remote refcount has gone up.
    importClient->addRemoteRef();
Kenton Varda's avatar
Kenton Varda committed
1165

1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183
    if (isPromise) {
      // We need to construct a PromiseClient around this import, if we haven't already.
      KJ_IF_MAYBE(c, import.appClient) {
        // Use the existing one.
        return kj::addRef(*c);
      } else {
        // Create a promise for this import's resolution.
        auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
        import.promiseFulfiller = kj::mv(paf.fulfiller);

        // Make sure the import is not destroyed while this promise exists.
        paf.promise = paf.promise.attach(kj::addRef(*importClient));

        // Create a PromiseClient around it and return it.
        auto result = kj::refcounted<PromiseClient>(
            *this, kj::mv(importClient), kj::mv(paf.promise), importId);
        import.appClient = *result;
        return kj::mv(result);
1184
      }
1185 1186 1187
    } else {
      import.appClient = *importClient;
      return kj::mv(importClient);
1188
    }
1189
  }
1190

1191
  kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor) {
1192 1193
    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
1194
        return nullptr;
1195

1196 1197 1198 1199
      case rpc::CapDescriptor::SENDER_HOSTED:
        return import(descriptor.getSenderHosted(), false);
      case rpc::CapDescriptor::SENDER_PROMISE:
        return import(descriptor.getSenderPromise(), true);
1200

1201 1202 1203 1204
      case rpc::CapDescriptor::RECEIVER_HOSTED:
        KJ_IF_MAYBE(exp, exports.find(descriptor.getReceiverHosted())) {
          return exp->clientHook->addRef();
        } else {
1205 1206 1207
          return newBrokenCap("invalid 'receiverHosted' export ID");
        }

1208 1209
      case rpc::CapDescriptor::RECEIVER_ANSWER: {
        auto promisedAnswer = descriptor.getReceiverAnswer();
1210

1211 1212 1213 1214 1215 1216 1217
        KJ_IF_MAYBE(answer, answers.find(promisedAnswer.getQuestionId())) {
          if (answer->active) {
            KJ_IF_MAYBE(pipeline, answer->pipeline) {
              KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
                return pipeline->get()->getPipelinedCap(*ops);
              } else {
                return newBrokenCap("unrecognized pipeline ops");
Kenton Varda's avatar
Kenton Varda committed
1218 1219
              }
            }
1220
          }
1221 1222
        }

1223
        return newBrokenCap("invalid 'receiverAnswer'");
1224 1225
      }

1226 1227 1228
      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        // We don't support third-party caps, so use the vine instead.
        return import(descriptor.getThirdPartyHosted().getVineId(), false);
Kenton Varda's avatar
Kenton Varda committed
1229

1230 1231 1232
      default:
        KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
        return newBrokenCap("unknown CapDescriptor type");
Kenton Varda's avatar
Kenton Varda committed
1233
    }
1234
  }
1235

1236 1237
  kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
    auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
1238 1239
    for (auto cap: capTable) {
      result.add(receiveCap(cap));
Kenton Varda's avatar
Kenton Varda committed
1240
    }
1241 1242
    return result.finish();
  }
1243 1244

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
1245
  // RequestHook/PipelineHook/ResponseHook implementations
1246

Kenton Varda's avatar
Kenton Varda committed
1247 1248 1249 1250
  class QuestionRef: public kj::Refcounted {
    // A reference to an entry on the question table.  Used to detect when the `Finish` message
    // can be sent.

Kenton Varda's avatar
Kenton Varda committed
1251
  public:
1252
    inline QuestionRef(
1253 1254 1255
        RpcConnectionState& connectionState, QuestionId id,
        kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
        : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1256 1257

    ~QuestionRef() {
1258
      unwindDetector.catchExceptionsIfUnwinding([&]() {
1259 1260 1261
        auto& question = KJ_ASSERT_NONNULL(
            connectionState->questions.find(id), "Question ID no longer on table?");

1262
        // Send the "Finish" message (if the connection is not already broken).
1263 1264
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1265
              messageSizeHint<rpc::Finish>());
1266 1267
          auto builder = message->getBody().getAs<rpc::Message>().initFinish();
          builder.setQuestionId(id);
1268 1269 1270 1271 1272
          // If we're still awaiting a return, then this request is being canceled, and we're going
          // to ignore any capabilities in the return message, so set releaseResultCaps true. If we
          // already received the return, then we've already built local proxies for the caps and
          // will send Release messages when those are destoryed.
          builder.setReleaseResultCaps(question.isAwaitingReturn);
1273
          message->send();
1274
        }
Kenton Varda's avatar
Kenton Varda committed
1275

1276 1277 1278
        // Check if the question has returned and, if so, remove it from the table.
        // Remove question ID from the table.  Must do this *after* sending `Finish` to ensure that
        // the ID is not re-allocated before the `Finish` message can be sent.
1279 1280
        if (question.isAwaitingReturn) {
          // Still waiting for return, so just remove the QuestionRef pointer from the table.
1281
          question.selfRef = nullptr;
1282 1283 1284
        } else {
          // Call has already returned, so we can now remove it from the table.
          connectionState->questions.erase(id, question);
1285 1286
        }
      });
Kenton Varda's avatar
Kenton Varda committed
1287 1288 1289 1290
    }

    inline QuestionId getId() const { return id; }

1291
    void fulfill(kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1292 1293 1294
      fulfiller->fulfill(kj::mv(response));
    }

1295
    void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
1296 1297 1298
      fulfiller->fulfill(kj::mv(promise));
    }

Kenton Varda's avatar
Kenton Varda committed
1299 1300
    void reject(kj::Exception&& exception) {
      fulfiller->reject(kj::mv(exception));
1301
    }
Kenton Varda's avatar
Kenton Varda committed
1302 1303

  private:
1304
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1305
    QuestionId id;
1306
    kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
1307
    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
1308 1309
  };

Kenton Varda's avatar
Kenton Varda committed
1310
  class RpcRequest final: public RequestHook {
1311
  public:
1312 1313
    RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
               kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
1314
        : connectionState(kj::addRef(connectionState)),
Kenton Varda's avatar
Kenton Varda committed
1315
          target(kj::mv(target)),
1316
          message(connection.newOutgoingMessage(
1317 1318
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
                  sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
Kenton Varda's avatar
Kenton Varda committed
1319
          callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
1320
          paramsBuilder(callBuilder.getParams().getContent()) {}
Kenton Varda's avatar
Kenton Varda committed
1321

1322
    inline AnyPointer::Builder getRoot() {
Kenton Varda's avatar
Kenton Varda committed
1323 1324
      return paramsBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1325 1326 1327
    inline rpc::Call::Builder getCall() {
      return callBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1328

1329
    RemotePromise<AnyPointer> send() override {
1330
      if (!connectionState->connection.is<Connected>()) {
1331
        // Connection is broken.
1332
        const kj::Exception& e = connectionState->connection.get<Disconnected>();
1333
        return RemotePromise<AnyPointer>(
1334 1335
            kj::Promise<Response<AnyPointer>>(kj::cp(e)),
            AnyPointer::Pipeline(newBrokenPipeline(kj::cp(e))));
1336
      }
Kenton Varda's avatar
Kenton Varda committed
1337

1338 1339 1340
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy.  Ick.
Kenton Varda's avatar
Kenton Varda committed
1341

1342
        auto replacement = redirect->get()->newCall(
1343
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
1344 1345 1346
        replacement.set(paramsBuilder);
        return replacement.send();
      } else {
1347
        auto sendResult = sendInternal(false);
Kenton Varda's avatar
Kenton Varda committed
1348

1349
        auto forkedPromise = sendResult.promise.fork();
1350

1351 1352 1353
        // The pipeline must get notified of resolution before the app does to maintain ordering.
        auto pipeline = kj::refcounted<RpcPipeline>(
            *connectionState, kj::mv(sendResult.questionRef), forkedPromise.addBranch());
Kenton Varda's avatar
Kenton Varda committed
1354

1355 1356 1357 1358 1359
        auto appPromise = forkedPromise.addBranch().then(
            [=](kj::Own<RpcResponse>&& response) {
              auto reader = response->getResults();
              return Response<AnyPointer>(reader, kj::mv(response));
            });
Kenton Varda's avatar
Kenton Varda committed
1360

1361 1362 1363 1364
        return RemotePromise<AnyPointer>(
            kj::mv(appPromise),
            AnyPointer::Pipeline(kj::mv(pipeline)));
      }
Kenton Varda's avatar
Kenton Varda committed
1365 1366
    }

1367 1368 1369
    struct TailInfo {
      QuestionId questionId;
      kj::Promise<void> promise;
1370
      kj::Own<PipelineHook> pipeline;
1371 1372 1373 1374 1375 1376 1377 1378 1379 1380
    };

    kj::Maybe<TailInfo> tailSend() {
      // Send the request as a tail call.
      //
      // Returns null if for some reason a tail call is not possible and the caller should fall
      // back to using send() and copying the response.

      SendInternalResult sendResult;

1381
      if (!connectionState->connection.is<Connected>()) {
1382 1383 1384
        // Disconnected; fall back to a regular send() which will fail appropriately.
        return nullptr;
      }
1385

1386 1387 1388 1389 1390 1391
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // Fall back to regular send().
        return nullptr;
      } else {
        sendResult = sendInternal(true);
1392 1393
      }

1394
      auto promise = sendResult.promise.then([](kj::Own<RpcResponse>&& response) {
1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405
        // Response should be null if `Return` handling code is correct.
        KJ_ASSERT(!response) { break; }
      });

      QuestionId questionId = sendResult.questionRef->getId();

      auto pipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sendResult.questionRef));

      return TailInfo { questionId, kj::mv(promise), kj::mv(pipeline) };
    }

1406
    const void* getBrand() override {
1407 1408 1409
      return connectionState.get();
    }

Kenton Varda's avatar
Kenton Varda committed
1410
  private:
1411
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1412

1413
    kj::Own<RpcClient> target;
Kenton Varda's avatar
Kenton Varda committed
1414 1415
    kj::Own<OutgoingRpcMessage> message;
    rpc::Call::Builder callBuilder;
1416
    AnyPointer::Builder paramsBuilder;
1417 1418 1419

    struct SendInternalResult {
      kj::Own<QuestionRef> questionRef;
1420
      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
1421 1422
    };

1423
    SendInternalResult sendInternal(bool isTailCall) {
1424 1425
      // Build the cap table.
      auto exports = connectionState->writeDescriptors(
1426
          message->getCapTable(), callBuilder.getParams());
1427

1428
      // Init the question table.  Do this after writing descriptors to avoid interference.
1429
      QuestionId questionId;
1430
      auto& question = connectionState->questions.next(questionId);
1431 1432 1433
      question.isAwaitingReturn = true;
      question.paramExports = kj::mv(exports);
      question.isTailCall = isTailCall;
1434

1435
      // Finish and send.
1436 1437 1438 1439
      callBuilder.setQuestionId(questionId);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
1440
      message->send();
1441

1442
      // Make the result promise.
1443
      SendInternalResult result;
1444
      auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
1445
      result.questionRef = kj::refcounted<QuestionRef>(
1446
          *connectionState, questionId, kj::mv(paf.fulfiller));
1447
      question.selfRef = *result.questionRef;
1448
      result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
1449

1450
      // Send and return.
1451 1452
      return kj::mv(result);
    }
Kenton Varda's avatar
Kenton Varda committed
1453 1454
  };

Kenton Varda's avatar
Kenton Varda committed
1455
  class RpcPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1456
  public:
1457 1458
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef,
                kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam)
1459
        : connectionState(kj::addRef(connectionState)),
1460 1461 1462
          redirectLater(redirectLaterParam.fork()),
          resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then(
              [this](kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1463
                resolve(kj::mv(response));
1464
              }, [this](kj::Exception&& exception) {
Kenton Varda's avatar
Kenton Varda committed
1465
                resolve(kj::mv(exception));
1466 1467 1468 1469
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
1470 1471 1472
              })) {
      // Construct a new RpcPipeline.

1473
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1474
    }
Kenton Varda's avatar
Kenton Varda committed
1475

1476
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef)
1477 1478 1479 1480
        : connectionState(kj::addRef(connectionState)),
          resolveSelfPromise(nullptr) {
      // Construct a new RpcPipeline that is never expected to resolve.

1481
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1482
    }
Kenton Varda's avatar
Kenton Varda committed
1483 1484 1485

    // implements PipelineHook ---------------------------------------

1486
    kj::Own<PipelineHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1487 1488 1489
      return kj::addRef(*this);
    }

1490
    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
Kenton Varda's avatar
Kenton Varda committed
1491 1492 1493 1494 1495 1496 1497
      auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
      for (auto& op: ops) {
        copy.add(op);
      }
      return getPipelinedCap(copy.finish());
    }

1498
    kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override {
1499
      if (state.is<Waiting>()) {
1500 1501
        // Wrap a PipelineClient in a PromiseClient.
        auto pipelineClient = kj::refcounted<PipelineClient>(
1502
            *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr()));
1503

1504
        KJ_IF_MAYBE(r, redirectLater) {
1505 1506 1507 1508
          auto resolutionPromise = r->addBranch().then(kj::mvCapture(ops,
              [](kj::Array<PipelineOp> ops, kj::Own<RpcResponse>&& response) {
                return response->getResults().getPipelinedCap(ops);
              }));
1509

1510 1511 1512 1513 1514 1515
          return kj::refcounted<PromiseClient>(
              *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr);
        } else {
          // Oh, this pipeline will never get redirected, so just return the PipelineClient.
          return kj::mv(pipelineClient);
        }
1516 1517
      } else if (state.is<Resolved>()) {
        return state.get<Resolved>()->getResults().getPipelinedCap(ops);
Kenton Varda's avatar
Kenton Varda committed
1518
      } else {
1519
        return newBrokenCap(kj::cp(state.get<Broken>()));
Kenton Varda's avatar
Kenton Varda committed
1520
      }
Kenton Varda's avatar
Kenton Varda committed
1521 1522 1523
    }

  private:
1524 1525
    kj::Own<RpcConnectionState> connectionState;
    kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater;
Kenton Varda's avatar
Kenton Varda committed
1526

1527 1528
    typedef kj::Own<QuestionRef> Waiting;
    typedef kj::Own<RpcResponse> Resolved;
Kenton Varda's avatar
Kenton Varda committed
1529
    typedef kj::Exception Broken;
1530
    kj::OneOf<Waiting, Resolved, Broken> state;
Kenton Varda's avatar
Kenton Varda committed
1531 1532 1533 1534 1535

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

1536
    void resolve(kj::Own<RpcResponse>&& response) {
1537 1538
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Resolved>(kj::mv(response));
Kenton Varda's avatar
Kenton Varda committed
1539 1540 1541
    }

    void resolve(const kj::Exception&& exception) {
1542 1543
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Broken>(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1544
    }
Kenton Varda's avatar
Kenton Varda committed
1545 1546
  };

1547 1548
  class RpcResponse: public ResponseHook {
  public:
1549
    virtual AnyPointer::Reader getResults() = 0;
1550
    virtual kj::Own<RpcResponse> addRef() = 0;
1551 1552 1553
  };

  class RpcResponseImpl final: public RpcResponse, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1554
  public:
1555
    RpcResponseImpl(RpcConnectionState& connectionState,
1556 1557
                    kj::Own<QuestionRef>&& questionRef,
                    kj::Own<IncomingRpcMessage>&& message,
1558
                    AnyPointer::Reader results)
1559 1560
        : connectionState(kj::addRef(connectionState)),
          message(kj::mv(message)),
1561
          reader(results),
Kenton Varda's avatar
Kenton Varda committed
1562
          questionRef(kj::mv(questionRef)) {}
Kenton Varda's avatar
Kenton Varda committed
1563

1564
    AnyPointer::Reader getResults() override {
Kenton Varda's avatar
Kenton Varda committed
1565 1566 1567
      return reader;
    }

1568
    kj::Own<RpcResponse> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1569 1570 1571
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
1572
  private:
1573
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1574
    kj::Own<IncomingRpcMessage> message;
1575
    AnyPointer::Reader reader;
1576
    kj::Own<QuestionRef> questionRef;
Kenton Varda's avatar
Kenton Varda committed
1577 1578 1579 1580 1581 1582
  };

  // =====================================================================================
  // CallContextHook implementation

  class RpcServerResponse {
Kenton Varda's avatar
Kenton Varda committed
1583
  public:
1584
    virtual AnyPointer::Builder getResultsBuilder() = 0;
1585 1586 1587 1588
  };

  class RpcServerResponseImpl final: public RpcServerResponse {
  public:
1589
    RpcServerResponseImpl(RpcConnectionState& connectionState,
1590
                          kj::Own<OutgoingRpcMessage>&& message,
1591 1592 1593
                          rpc::Payload::Builder payload)
        : connectionState(connectionState),
          message(kj::mv(message)),
1594
          payload(payload) {}
Kenton Varda's avatar
Kenton Varda committed
1595

1596
    AnyPointer::Builder getResultsBuilder() override {
1597
      return payload.getContent();
Kenton Varda's avatar
Kenton Varda committed
1598 1599
    }

1600 1601 1602 1603 1604
    kj::Maybe<kj::Array<ExportId>> send() {
      // Send the response and return the export list.  Returns nullptr if there were no caps.
      // (Could return a non-null empty array if there were caps but none of them were exports.)

      // Build the cap table.
1605
      auto capTable = message->getCapTable();
1606 1607
      auto exports = connectionState.writeDescriptors(capTable, payload);

1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
      // Capabilities that we are returning are subject to embargos. See `Disembargo` in rpc.capnp.
      // As explained there, in order to deal with the Tribble 4-way race condition, we need to
      // make sure that if we're returning any remote promises, that we ignore any subsequent
      // resolution of those promises for the purpose of pipelined requests on this answer. Luckily,
      // we can modify the cap table in-place.
      for (auto& slot: capTable) {
        KJ_IF_MAYBE(cap, slot) {
          slot = connectionState.getInnermostClient(**cap);
        }
      }

Kenton Varda's avatar
Kenton Varda committed
1619
      message->send();
1620 1621 1622 1623 1624
      if (capTable.size() == 0) {
        return nullptr;
      } else {
        return kj::mv(exports);
      }
Kenton Varda's avatar
Kenton Varda committed
1625 1626 1627
    }

  private:
1628
    RpcConnectionState& connectionState;
Kenton Varda's avatar
Kenton Varda committed
1629
    kj::Own<OutgoingRpcMessage> message;
1630
    rpc::Payload::Builder payload;
Kenton Varda's avatar
Kenton Varda committed
1631 1632
  };

1633 1634 1635
  class LocallyRedirectedRpcResponse final
      : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
  public:
1636
    LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
1637 1638
        : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
                          .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
1639

1640
    AnyPointer::Builder getResultsBuilder() override {
1641
      return message.getRoot<AnyPointer>();
1642 1643
    }

1644
    AnyPointer::Reader getResults() override {
1645
      return message.getRoot<AnyPointer>();
1646 1647
    }

1648
    kj::Own<RpcResponse> addRef() override {
1649 1650 1651 1652
      return kj::addRef(*this);
    }

  private:
1653
    MallocMessageBuilder message;
1654 1655
  };

Kenton Varda's avatar
Kenton Varda committed
1656 1657
  class RpcCallContext final: public CallContextHook, public kj::Refcounted {
  public:
1658
    RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
1659
                   kj::Own<IncomingRpcMessage>&& request, const AnyPointer::Reader& params,
1660
                   bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
1661
        : connectionState(kj::addRef(connectionState)),
1662
          answerId(answerId),
Kenton Varda's avatar
Kenton Varda committed
1663
          request(kj::mv(request)),
1664
          params(params),
1665
          returnMessage(nullptr),
Kenton Varda's avatar
Kenton Varda committed
1666 1667
          redirectResults(redirectResults),
          cancelFulfiller(kj::mv(cancelFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1668

1669 1670 1671 1672
    ~RpcCallContext() noexcept(false) {
      if (isFirstResponder()) {
        // We haven't sent a return yet, so we must have been canceled.  Send a cancellation return.
        unwindDetector.catchExceptionsIfUnwinding([&]() {
Kenton Varda's avatar
Kenton Varda committed
1673
          // Don't send anything if the connection is broken.
1674 1675
          if (connectionState->connection.is<Connected>()) {
            auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1676
                messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>());
Kenton Varda's avatar
Kenton Varda committed
1677
            auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1678

1679
            builder.setAnswerId(answerId);
1680
            builder.setReleaseParamCaps(false);
1681

Kenton Varda's avatar
Kenton Varda committed
1682 1683 1684 1685 1686 1687 1688 1689 1690
            if (redirectResults) {
              // The reason we haven't sent a return is because the results were sent somewhere
              // else.
              builder.setResultsSentElsewhere();
            } else {
              builder.setCanceled();
            }

            message->send();
1691
          }
1692

1693
          cleanupAnswerTable(nullptr, true);
1694 1695 1696 1697
        });
      }
    }

1698
    kj::Own<RpcResponse> consumeRedirectedResponse() {
1699 1700
      KJ_ASSERT(redirectResults);

1701
      if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
1702 1703 1704 1705 1706 1707

      // Note that the context needs to keep its own reference to the response so that it doesn't
      // get GC'd until the PipelineHook drops its reference to the context.
      return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef();
    }

Kenton Varda's avatar
Kenton Varda committed
1708
    void sendReturn() {
1709
      KJ_ASSERT(!redirectResults);
1710 1711 1712 1713

      // Avoid sending results if canceled so that we don't have to figure out whether or not
      // `releaseResultCaps` was set in the already-received `Finish`.
      if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) {
1714 1715 1716 1717 1718
        KJ_ASSERT(connectionState->connection.is<Connected>(),
                  "Cancellation should have been requested on disconnect.") {
          return;
        }

1719
        if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
Kenton Varda's avatar
Kenton Varda committed
1720

1721
        returnMessage.setAnswerId(answerId);
1722
        returnMessage.setReleaseParamCaps(false);
Kenton Varda's avatar
Kenton Varda committed
1723

1724 1725 1726 1727 1728 1729 1730 1731
        auto exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
        KJ_IF_MAYBE(e, exports) {
          // Caps were returned, so we can't free the pipeline yet.
          cleanupAnswerTable(kj::mv(*e), false);
        } else {
          // No caps in the results, therefore the pipeline is irrelevant.
          cleanupAnswerTable(nullptr, true);
        }
Kenton Varda's avatar
Kenton Varda committed
1732 1733 1734
      }
    }
    void sendErrorReturn(kj::Exception&& exception) {
1735
      KJ_ASSERT(!redirectResults);
Kenton Varda's avatar
Kenton Varda committed
1736
      if (isFirstResponder()) {
1737 1738 1739 1740
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Return>() + exceptionSizeHint(exception));
          auto builder = message->getBody().initAs<rpc::Message>().initReturn();
Kenton Varda's avatar
Kenton Varda committed
1741

1742 1743 1744
          builder.setAnswerId(answerId);
          builder.setReleaseParamCaps(false);
          fromException(exception, builder.initException());
Kenton Varda's avatar
Kenton Varda committed
1745

1746 1747
          message->send();
        }
1748 1749 1750 1751

        // Do not allow releasing the pipeline because we want pipelined calls to propagate the
        // exception rather than fail with a "no such field" exception.
        cleanupAnswerTable(nullptr, false);
Kenton Varda's avatar
Kenton Varda committed
1752 1753 1754
      }
    }

1755
    void requestCancel() {
Kenton Varda's avatar
Kenton Varda committed
1756 1757 1758 1759 1760 1761
      // Hints that the caller wishes to cancel this call.  At the next time when cancellation is
      // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
      // safe, the RpcCallContext will send a normal return when the call completes.  Either way
      // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
      // a Finish message was already received.

1762 1763 1764 1765
      bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED;
      cancellationFlags |= CANCEL_REQUESTED;

      if (previouslyAllowedButNotRequested) {
Kenton Varda's avatar
Kenton Varda committed
1766
        // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously.  Initiate
Kenton Varda's avatar
Kenton Varda committed
1767
        // the cancellation.
Kenton Varda's avatar
Kenton Varda committed
1768
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1769
      }
Kenton Varda's avatar
Kenton Varda committed
1770
    }
1771 1772 1773

    // implements CallContextHook ------------------------------------

1774
    AnyPointer::Reader getParams() override {
1775 1776 1777 1778 1779 1780
      KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
      return params;
    }
    void releaseParams() override {
      request = nullptr;
    }
1781
    AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
1782
      KJ_IF_MAYBE(r, response) {
1783
        return r->get()->getResultsBuilder();
1784
      } else {
1785 1786
        kj::Own<RpcServerResponse> response;

1787
        if (redirectResults || !connectionState->connection.is<Connected>()) {
1788
          response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
1789
        } else {
1790
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1791 1792
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
                               sizeInWords<rpc::Payload>()));
1793 1794 1795 1796 1797 1798
          returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
          response = kj::heap<RpcServerResponseImpl>(
              *connectionState, kj::mv(message), returnMessage.getResults());
        }

        auto results = response->getResultsBuilder();
Kenton Varda's avatar
Kenton Varda committed
1799 1800
        this->response = kj::mv(response);
        return results;
1801 1802
      }
    }
1803 1804 1805
    kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
      auto result = directTailCall(kj::mv(request));
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
1806
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
1807 1808 1809 1810
      }
      return kj::mv(result.promise);
    }
    ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
1811 1812 1813
      KJ_REQUIRE(response == nullptr,
                 "Can't call tailCall() after initializing the results struct.");

1814
      if (request->getBrand() == connectionState.get() && !redirectResults) {
1815 1816 1817 1818 1819
        // The tail call is headed towards the peer that called us in the first place, so we can
        // optimize out the return trip.

        KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
          if (isFirstResponder()) {
1820 1821 1822 1823
            if (connectionState->connection.is<Connected>()) {
              auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
                  messageSizeHint<rpc::Return>());
              auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1824

1825 1826 1827
              builder.setAnswerId(answerId);
              builder.setReleaseParamCaps(false);
              builder.setTakeFromOtherQuestion(tailInfo->questionId);
1828

1829 1830
              message->send();
            }
1831 1832 1833

            // There are no caps in our return message, but of course the tail results could have
            // caps, so we must continue to honor pipeline calls (and just bounce them back).
1834
            cleanupAnswerTable(nullptr, false);
1835
          }
1836
          return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
1837 1838 1839 1840 1841 1842 1843
        }
      }

      // Just forwarding to another local call.
      auto promise = request->send();

      // Wait for response.
1844
      auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
1845 1846 1847
        // Copy the response.
        // TODO(perf):  It would be nice if we could somehow make the response get built in-place
        //   but requires some refactoring.
1848
        getResults(tailResponse.targetSize()).set(tailResponse);
1849
      });
1850 1851

      return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
1852
    }
1853 1854
    kj::Promise<AnyPointer::Pipeline> onTailCall() override {
      auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
1855 1856 1857
      tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
      return kj::mv(paf.promise);
    }
1858
    void allowCancellation() override {
1859 1860 1861 1862
      bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
      cancellationFlags |= CANCEL_ALLOWED;

      if (previouslyRequestedButNotAllowed) {
Kenton Varda's avatar
Kenton Varda committed
1863 1864 1865
        // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
        // the cancellation.
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1866
      }
1867 1868 1869 1870 1871 1872
    }
    kj::Own<CallContextHook> addRef() override {
      return kj::addRef(*this);
    }

  private:
1873
    kj::Own<RpcConnectionState> connectionState;
1874
    AnswerId answerId;
1875

Kenton Varda's avatar
Kenton Varda committed
1876 1877 1878
    // Request ---------------------------------------------

    kj::Maybe<kj::Own<IncomingRpcMessage>> request;
1879
    AnyPointer::Reader params;
Kenton Varda's avatar
Kenton Varda committed
1880

Kenton Varda's avatar
Kenton Varda committed
1881 1882 1883
    // Response --------------------------------------------

    kj::Maybe<kj::Own<RpcServerResponse>> response;
Kenton Varda's avatar
Kenton Varda committed
1884
    rpc::Return::Builder returnMessage;
1885
    bool redirectResults = false;
Kenton Varda's avatar
Kenton Varda committed
1886
    bool responseSent = false;
1887
    kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1888 1889 1890 1891 1892 1893 1894 1895

    // Cancellation state ----------------------------------

    enum CancellationFlags {
      CANCEL_REQUESTED = 1,
      CANCEL_ALLOWED = 2
    };

1896
    uint8_t cancellationFlags = 0;
1897
    // When both flags are set, the cancellation process will begin.
Kenton Varda's avatar
Kenton Varda committed
1898

1899
    kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1900 1901 1902
    // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise is
    // exclusive-joined with the outermost promise waiting on the call return, so fulfilling it
    // cancels that promise.
Kenton Varda's avatar
Kenton Varda committed
1903

1904 1905
    kj::UnwindDetector unwindDetector;

Kenton Varda's avatar
Kenton Varda committed
1906 1907 1908 1909 1910 1911 1912
    // -----------------------------------------------------

    bool isFirstResponder() {
      if (responseSent) {
        return false;
      } else {
        responseSent = true;
1913 1914 1915
        return true;
      }
    }
Kenton Varda's avatar
Kenton Varda committed
1916

1917
    void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
1918 1919 1920
      // We need to remove the `callContext` pointer -- which points back to us -- from the
      // answer table.  Or we might even be responsible for removing the entire answer table
      // entry.
Kenton Varda's avatar
Kenton Varda committed
1921

1922
      if (cancellationFlags & CANCEL_REQUESTED) {
1923 1924 1925
        // Already received `Finish` so it's our job to erase the table entry. We shouldn't have
        // sent results if canceled, so we shouldn't have an export list to deal with.
        KJ_ASSERT(resultExports.size() == 0);
1926
        connectionState->answers.erase(answerId);
1927
      } else {
1928
        // We just have to null out callContext and set the exports.
1929
        auto& answer = connectionState->answers[answerId];
1930
        answer.callContext = nullptr;
1931 1932 1933 1934 1935 1936
        answer.resultExports = kj::mv(resultExports);

        if (shouldFreePipeline) {
          // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
          // because there are no caps in the result to receive pipeline requests).
          KJ_ASSERT(resultExports.size() == 0);
Kenton Varda's avatar
Kenton Varda committed
1937
          answer.pipeline = nullptr;
Kenton Varda's avatar
Kenton Varda committed
1938 1939 1940
        }
      }
    }
1941 1942
  };

Kenton Varda's avatar
Kenton Varda committed
1943 1944 1945
  // =====================================================================================
  // Message handling

1946
  kj::Promise<void> messageLoop() {
1947 1948 1949 1950 1951
    if (!connection.is<Connected>()) {
      return kj::READY_NOW;
    }

    return connection.get<Connected>()->receiveIncomingMessage().then(
1952
        [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) {
1953 1954
      KJ_IF_MAYBE(m, message) {
        handleMessage(kj::mv(*m));
1955
        return true;
1956
      } else {
1957
        disconnect(KJ_EXCEPTION(DISCONNECTED, "Peer disconnected."));
1958
        return false;
1959
      }
1960
    }).then([this](bool keepGoing) {
1961 1962 1963 1964
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
1965
      if (keepGoing) tasks.add(messageLoop());
1966
    });
1967 1968 1969 1970
  }

  void handleMessage(kj::Own<IncomingRpcMessage> message) {
    auto reader = message->getBody().getAs<rpc::Message>();
1971

1972 1973
    switch (reader.which()) {
      case rpc::Message::UNIMPLEMENTED:
Kenton Varda's avatar
Kenton Varda committed
1974
        handleUnimplemented(reader.getUnimplemented());
1975 1976 1977
        break;

      case rpc::Message::ABORT:
Kenton Varda's avatar
Kenton Varda committed
1978
        handleAbort(reader.getAbort());
1979 1980
        break;

1981 1982 1983 1984
      case rpc::Message::BOOTSTRAP:
        handleBootstrap(kj::mv(message), reader.getBootstrap());
        break;

1985
      case rpc::Message::CALL:
Kenton Varda's avatar
Kenton Varda committed
1986
        handleCall(kj::mv(message), reader.getCall());
1987 1988
        break;

Kenton Varda's avatar
Kenton Varda committed
1989
      case rpc::Message::RETURN:
Kenton Varda's avatar
Kenton Varda committed
1990
        handleReturn(kj::mv(message), reader.getReturn());
Kenton Varda's avatar
Kenton Varda committed
1991 1992 1993
        break;

      case rpc::Message::FINISH:
Kenton Varda's avatar
Kenton Varda committed
1994
        handleFinish(reader.getFinish());
Kenton Varda's avatar
Kenton Varda committed
1995 1996
        break;

Kenton Varda's avatar
Kenton Varda committed
1997
      case rpc::Message::RESOLVE:
1998
        handleResolve(reader.getResolve());
Kenton Varda's avatar
Kenton Varda committed
1999 2000 2001
        break;

      case rpc::Message::RELEASE:
2002
        handleRelease(reader.getRelease());
Kenton Varda's avatar
Kenton Varda committed
2003 2004
        break;

2005
      case rpc::Message::DISEMBARGO:
Kenton Varda's avatar
Kenton Varda committed
2006
        handleDisembargo(reader.getDisembargo());
2007 2008
        break;

2009
      default: {
2010 2011 2012 2013 2014 2015
        if (connection.is<Connected>()) {
          auto message = connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(reader.totalSize(), messageSizeHint<void>()));
          message->getBody().initAs<rpc::Message>().setUnimplemented(reader);
          message->send();
        }
2016 2017 2018 2019 2020
        break;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2021
  void handleUnimplemented(const rpc::Message::Reader& message) {
Kenton Varda's avatar
Kenton Varda committed
2022
    switch (message.which()) {
2023 2024 2025
      case rpc::Message::RESOLVE: {
        auto cap = message.getResolve().getCap();
        switch (cap.which()) {
2026 2027 2028
          case rpc::CapDescriptor::NONE:
            // Nothing to do (but this ought never to happen).
            break;
2029
          case rpc::CapDescriptor::SENDER_HOSTED:
2030
            releaseExport(cap.getSenderHosted(), 1);
2031 2032
            break;
          case rpc::CapDescriptor::SENDER_PROMISE:
2033
            releaseExport(cap.getSenderPromise(), 1);
2034 2035 2036 2037 2038 2039
            break;
          case rpc::CapDescriptor::RECEIVER_ANSWER:
          case rpc::CapDescriptor::RECEIVER_HOSTED:
            // Nothing to do.
            break;
          case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
2040
            releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
2041 2042
            break;
        }
Kenton Varda's avatar
Kenton Varda committed
2043
        break;
2044
      }
Kenton Varda's avatar
Kenton Varda committed
2045 2046 2047 2048 2049

      default:
        KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
        break;
    }
2050 2051
  }

Kenton Varda's avatar
Kenton Varda committed
2052
  void handleAbort(const rpc::Exception::Reader& exception) {
2053 2054 2055
    kj::throwRecoverableException(toException(exception));
  }

2056 2057 2058
  // ---------------------------------------------------------------------------
  // Level 0

2059 2060 2061 2062 2063 2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141
  class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
  public:
    SingleCapPipeline(kj::Own<ClientHook>&& cap)
        : cap(kj::mv(cap)) {}

    kj::Own<PipelineHook> addRef() override {
      return kj::addRef(*this);
    }

    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
      if (ops.size() == 0) {
        return cap->addRef();
      } else {
        return newBrokenCap("Invalid pipeline transform.");
      }
    }

  private:
    kj::Own<ClientHook> cap;
  };

  void handleBootstrap(kj::Own<IncomingRpcMessage>&& message,
                       const rpc::Bootstrap::Reader& bootstrap) {
    AnswerId answerId = bootstrap.getQuestionId();

    if (!connection.is<Connected>()) {
      // Disconnected; ignore.
      return;
    }

    auto response = connection.get<Connected>()->newOutgoingMessage(
        messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);

    rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
    ret.setAnswerId(answerId);

    kj::Own<ClientHook> capHook;
    kj::Array<ExportId> resultExports;
    KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong

    // Call the restorer and initialize the answer.
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      Capability::Client cap = nullptr;
      KJ_IF_MAYBE(r, restorer) {
        cap = r->baseRestore(bootstrap.getDeprecatedObjectId());
      } else KJ_IF_MAYBE(b, bootstrapInterface) {
        if (bootstrap.hasDeprecatedObjectId()) {
          KJ_FAIL_REQUIRE("This vat only supports a bootstrap interface, not the old "
                          "Cap'n-Proto-0.4-style named exports.") { return; }
        } else {
          cap = *b;
        }
      } else {
        KJ_FAIL_REQUIRE("This vat does not expose any public/bootstrap interfaces.") { return; }
      }

      auto payload = ret.initResults();
      payload.getContent().setAs<Capability>(kj::mv(cap));

      auto capTable = response->getCapTable();
      KJ_DASSERT(capTable.size() == 1);
      resultExports = writeDescriptors(capTable, payload);
      capHook = KJ_ASSERT_NONNULL(capTable[0])->addRef();
    })) {
      fromException(*exception, ret.initException());
      capHook = newBrokenCap(kj::mv(*exception));
    }

    message = nullptr;

    // Add the answer to the answer table for pipelining and send the response.
    auto& answer = answers[answerId];
    KJ_REQUIRE(!answer.active, "questionId is already in use") {
      return;
    }

    answer.resultExports = kj::mv(resultExports);
    answer.active = true;
    answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));

    response->send();
  }

Kenton Varda's avatar
Kenton Varda committed
2142
  void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
2143
    kj::Own<ClientHook> capability;
2144

Kenton Varda's avatar
Kenton Varda committed
2145 2146 2147 2148 2149
    KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
      capability = kj::mv(*t);
    } else {
      // Exception already reported.
      return;
2150 2151
    }

2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163
    bool redirectResults;
    switch (call.getSendResultsTo().which()) {
      case rpc::Call::SendResultsTo::CALLER:
        redirectResults = false;
        break;
      case rpc::Call::SendResultsTo::YOURSELF:
        redirectResults = true;
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
    }

2164
    auto payload = call.getParams();
2165
    message->initCapTable(receiveCaps(payload.getCapTable()));
Kenton Varda's avatar
Kenton Varda committed
2166 2167
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

2168
    AnswerId answerId = call.getQuestionId();
Kenton Varda's avatar
Kenton Varda committed
2169

2170
    auto context = kj::refcounted<RpcCallContext>(
2171
        *this, answerId, kj::mv(message), payload.getContent(),
2172
        redirectResults, kj::mv(cancelPaf.fulfiller));
2173

2174
    // No more using `call` after this point, as it now belongs to the context.
2175 2176

    {
2177
      auto& answer = answers[answerId];
2178 2179 2180 2181 2182 2183

      KJ_REQUIRE(!answer.active, "questionId is already in use") {
        return;
      }

      answer.active = true;
Kenton Varda's avatar
Kenton Varda committed
2184
      answer.callContext = *context;
Kenton Varda's avatar
Kenton Varda committed
2185 2186
    }

2187 2188
    auto promiseAndPipeline = startCall(
        call.getInterfaceId(), call.getMethodId(), kj::mv(capability), context->addRef());
Kenton Varda's avatar
Kenton Varda committed
2189

2190
    // Things may have changed -- in particular if startCall() immediately called
Kenton Varda's avatar
Kenton Varda committed
2191 2192 2193
    // context->directTailCall().

    {
2194
      auto& answer = answers[answerId];
Kenton Varda's avatar
Kenton Varda committed
2195

2196 2197
      answer.pipeline = kj::mv(promiseAndPipeline.pipeline);

2198
      if (redirectResults) {
Kenton Varda's avatar
Kenton Varda committed
2199
        auto resultsPromise = promiseAndPipeline.promise.then(
2200 2201 2202
            kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
              return context->consumeRedirectedResponse();
            }));
Kenton Varda's avatar
Kenton Varda committed
2203 2204

        // If the call that later picks up `redirectedResults` decides to discard it, we need to
2205
        // make sure our call is not itself canceled unless it has called allowCancellation().
Kenton Varda's avatar
Kenton Varda committed
2206 2207
        // So we fork the promise and join one branch with the cancellation promise, in order to
        // hold on to it.
2208
        auto forked = resultsPromise.fork();
Kenton Varda's avatar
Kenton Varda committed
2209 2210
        answer.redirectedResults = forked.addBranch();

2211 2212 2213
        cancelPaf.promise
            .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
            .detach([](kj::Exception&&) {});
2214 2215 2216 2217 2218
      } else {
        // Hack:  Both the success and error continuations need to use the context.  We could
        //   refcount, but both will be destroyed at the same time anyway.
        RpcCallContext* contextPtr = context;

2219
        promiseAndPipeline.promise.then(
2220 2221 2222 2223 2224 2225 2226
            [contextPtr]() {
              contextPtr->sendReturn();
            }, [contextPtr](kj::Exception&& exception) {
              contextPtr->sendErrorReturn(kj::mv(exception));
            }).then([]() {}, [&](kj::Exception&& exception) {
              // Handle exceptions that occur in sendReturn()/sendErrorReturn().
              taskFailed(kj::mv(exception));
2227 2228 2229
            }).attach(kj::mv(context))
            .exclusiveJoin(kj::mv(cancelPaf.promise))
            .detach([](kj::Exception&&) {});
2230
      }
2231 2232 2233
    }
  }

2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260
  ClientHook::VoidPromiseAndPipeline startCall(
      uint64_t interfaceId, uint64_t methodId,
      kj::Own<ClientHook>&& capability, kj::Own<CallContextHook>&& context) {
    if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
      KJ_IF_MAYBE(g, gateway) {
        // Wait, this is a call to Persistent.save() and we need to translate it through our
        // gateway.

        auto params = context->getParams().getAs<Persistent<>::SaveParams>();

        auto requestSize = params.totalSize();
        ++requestSize.capCount;
        requestSize.wordCount += sizeInWords<RealmGateway<>::ExportParams>();

        auto request = g->exportRequest(requestSize);
        request.setCap(Persistent<>::Client(capability->addRef()));
        request.setParams(params);

        context->allowCancellation();
        context->releaseParams();
        return context->directTailCall(RequestHook::from(kj::mv(request)));
      }
    }

    return capability->call(interfaceId, methodId, context->addRef());
  }

2261
  kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
Kenton Varda's avatar
Kenton Varda committed
2262
    switch (target.which()) {
2263 2264
      case rpc::MessageTarget::IMPORTED_CAP: {
        KJ_IF_MAYBE(exp, exports.find(target.getImportedCap())) {
Kenton Varda's avatar
Kenton Varda committed
2265 2266 2267 2268 2269 2270 2271 2272 2273 2274 2275
          return exp->clientHook->addRef();
        } else {
          KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
            return nullptr;
          }
        }
        break;
      }

      case rpc::MessageTarget::PROMISED_ANSWER: {
        auto promisedAnswer = target.getPromisedAnswer();
2276
        kj::Own<PipelineHook> pipeline;
Kenton Varda's avatar
Kenton Varda committed
2277

2278 2279 2280 2281 2282 2283 2284 2285 2286
        auto& base = answers[promisedAnswer.getQuestionId()];
        KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
          return nullptr;
        }
        KJ_IF_MAYBE(p, base.pipeline) {
          pipeline = p->get()->addRef();
        } else {
          KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished or contained no "
                          "capabilities.") {
Kenton Varda's avatar
Kenton Varda committed
2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303
            return nullptr;
          }
        }

        KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
          return pipeline->getPipelinedCap(*ops);
        } else {
          // Exception already thrown.
          return nullptr;
        }
      }

      default:
        KJ_FAIL_REQUIRE("Unknown message target type.", target) {
          return nullptr;
        }
    }
2304 2305

    KJ_UNREACHABLE;
Kenton Varda's avatar
Kenton Varda committed
2306 2307
  }

Kenton Varda's avatar
Kenton Varda committed
2308
  void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
Kenton Varda's avatar
Kenton Varda committed
2309 2310
    // Transitive destructors can end up manipulating the question table and invalidating our
    // pointer into it, so make sure these destructors run later.
2311 2312
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2313
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;
2314

2315
    KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
2316 2317
      KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
      question->isAwaitingReturn = false;
Kenton Varda's avatar
Kenton Varda committed
2318

2319 2320
      if (ret.getReleaseParamCaps()) {
        exportsToRelease = kj::mv(question->paramExports);
Kenton Varda's avatar
Kenton Varda committed
2321
      } else {
2322
        question->paramExports = nullptr;
Kenton Varda's avatar
Kenton Varda committed
2323
      }
2324

2325 2326
      KJ_IF_MAYBE(questionRef, question->selfRef) {
        switch (ret.which()) {
2327
          case rpc::Return::RESULTS: {
2328 2329 2330
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2331
            }
Kenton Varda's avatar
Kenton Varda committed
2332

2333
            auto payload = ret.getResults();
2334
            message->initCapTable(receiveCaps(payload.getCapTable()));
2335
            questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
2336
                *this, kj::addRef(*questionRef), kj::mv(message), payload.getContent()));
2337
            break;
2338
          }
2339

2340 2341 2342 2343
          case rpc::Return::EXCEPTION:
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2344
            }
2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356

            questionRef->reject(toException(ret.getException()));
            break;

          case rpc::Return::CANCELED:
            KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
            break;

          case rpc::Return::RESULTS_SENT_ELSEWHERE:
            KJ_REQUIRE(question->isTailCall,
                "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
              return;
2357 2358
            }

2359 2360 2361 2362
            // Tail calls are fulfilled with a null pointer.
            questionRef->fulfill(kj::Own<RpcResponse>());
            break;

2363 2364
          case rpc::Return::TAKE_FROM_OTHER_QUESTION:
            KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2365 2366 2367 2368 2369
              KJ_IF_MAYBE(response, answer->redirectedResults) {
                questionRef->fulfill(kj::mv(*response));
              } else {
                KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` referenced a call that did not "
                                "use `sendResultsTo.yourself`.") { return; }
2370 2371
              }
            } else {
2372
              KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` had invalid answer ID.") { return; }
2373 2374
            }

2375
            break;
2376

2377 2378 2379 2380
          default:
            KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
        }
      } else {
2381
        if (ret.isTakeFromOtherQuestion()) {
2382
          // Be sure to release the tail call's promise.
2383
          KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2384 2385 2386
            promiseToRelease = kj::mv(answer->redirectedResults);
          }
        }
Kenton Varda's avatar
Kenton Varda committed
2387

2388 2389
        // Looks like this question was canceled earlier, so `Finish` was already sent, with
        // `releaseResultCaps` set true so that we don't have to release them here.  We can go
2390
        // ahead and delete it from the table.
2391
        questions.erase(ret.getAnswerId(), *question);
Kenton Varda's avatar
Kenton Varda committed
2392
      }
Kenton Varda's avatar
Kenton Varda committed
2393

Kenton Varda's avatar
Kenton Varda committed
2394 2395 2396 2397 2398
    } else {
      KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2399
  void handleFinish(const rpc::Finish::Reader& finish) {
Kenton Varda's avatar
Kenton Varda committed
2400 2401
    // Delay release of these things until return so that transitive destructors don't accidentally
    // modify the answer table and invalidate our pointer into it.
2402 2403
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2404
    Answer answerToRelease;
2405
    kj::Maybe<kj::Own<PipelineHook>> pipelineToRelease;
Kenton Varda's avatar
Kenton Varda committed
2406

2407
    KJ_IF_MAYBE(answer, answers.find(finish.getQuestionId())) {
2408
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2409

2410 2411 2412 2413
      if (finish.getReleaseResultCaps()) {
        exportsToRelease = kj::mv(answer->resultExports);
      } else {
        answer->resultExports = nullptr;
2414
      }
Kenton Varda's avatar
Kenton Varda committed
2415

2416 2417
      pipelineToRelease = kj::mv(answer->pipeline);

2418 2419 2420 2421 2422
      // If the call isn't actually done yet, cancel it.  Otherwise, we can go ahead and erase the
      // question from the table.
      KJ_IF_MAYBE(context, answer->callContext) {
        context->requestCancel();
      } else {
Kenton Varda's avatar
Kenton Varda committed
2423
        answerToRelease = answers.erase(finish.getQuestionId());
2424
      }
Kenton Varda's avatar
Kenton Varda committed
2425
    } else {
2426
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2427
    }
2428 2429
  }

2430 2431 2432
  // ---------------------------------------------------------------------------
  // Level 1

2433
  void handleResolve(const rpc::Resolve::Reader& resolve) {
2434
    kj::Own<ClientHook> replacement;
2435
    kj::Maybe<kj::Exception> exception;
2436 2437 2438 2439

    // Extract the replacement capability.
    switch (resolve.which()) {
      case rpc::Resolve::CAP:
2440 2441 2442 2443 2444
        KJ_IF_MAYBE(cap, receiveCap(resolve.getCap())) {
          replacement = kj::mv(*cap);
        } else {
          KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
        }
2445 2446 2447
        break;

      case rpc::Resolve::EXCEPTION:
2448 2449 2450 2451
        // We can't set `replacement` to a new broken cap here because this will confuse
        // PromiseClient::Resolve() into thinking that the remote promise resolved to a local
        // capability and therefore a Disembargo is needed. We must actually reject the promise.
        exception = toException(resolve.getException());
2452 2453 2454 2455 2456 2457 2458
        break;

      default:
        KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
    }

    // If the import is on the table, fulfill it.
2459
    KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
2460 2461
      KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
        // OK, this is in fact an unfulfilled promise!
2462 2463 2464 2465 2466
        KJ_IF_MAYBE(e, exception) {
          fulfiller->get()->reject(kj::mv(*e));
        } else {
          fulfiller->get()->fulfill(kj::mv(replacement));
        }
2467 2468 2469 2470 2471 2472 2473 2474
      } else if (import->importClient != nullptr) {
        // It appears this is a valid entry on the import table, but was not expected to be a
        // promise.
        KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
      }
    }
  }

2475
  void handleRelease(const rpc::Release::Reader& release) {
Kenton Varda's avatar
Kenton Varda committed
2476
    releaseExport(release.getId(), release.getReferenceCount());
2477 2478
  }

Kenton Varda's avatar
Kenton Varda committed
2479
  void releaseExport(ExportId id, uint refcount) {
2480
    KJ_IF_MAYBE(exp, exports.find(id)) {
2481
      KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
Kenton Varda's avatar
Kenton Varda committed
2482
        return;
2483 2484 2485 2486
      }

      exp->refcount -= refcount;
      if (exp->refcount == 0) {
2487
        exportsByCap.erase(exp->clientHook);
2488
        exports.erase(id, *exp);
2489 2490 2491
      }
    } else {
      KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
Kenton Varda's avatar
Kenton Varda committed
2492
        return;
2493 2494 2495 2496
      }
    }
  }

2497 2498 2499 2500 2501 2502
  void releaseExports(kj::ArrayPtr<ExportId> exports) {
    for (auto exportId: exports) {
      releaseExport(exportId, 1);
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2503 2504 2505 2506
  void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
    auto context = disembargo.getContext();
    switch (context.which()) {
      case rpc::Disembargo::Context::SENDER_LOOPBACK: {
2507
        kj::Own<ClientHook> target;
Kenton Varda's avatar
Kenton Varda committed
2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526 2527 2528 2529

        KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
          target = kj::mv(*t);
        } else {
          // Exception already reported.
          return;
        }

        for (;;) {
          KJ_IF_MAYBE(r, target->getResolved()) {
            target = r->addRef();
          } else {
            break;
          }
        }

        KJ_REQUIRE(target->getBrand() == this,
                   "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
                   "back to the sender.") {
          return;
        }

Kenton Varda's avatar
Kenton Varda committed
2530 2531 2532 2533
        EmbargoId embargoId = context.getSenderLoopback();

        // We need to insert an evalLater() here to make sure that any pending calls towards this
        // cap have had time to find their way through the event loop.
2534 2535
        tasks.add(kj::evalLater(kj::mvCapture(
            target, [this,embargoId](kj::Own<ClientHook>&& target) {
2536 2537 2538 2539
          if (!connection.is<Connected>()) {
            return;
          }

2540
          RpcClient& downcasted = kj::downcast<RpcClient>(*target);
Kenton Varda's avatar
Kenton Varda committed
2541

2542
          auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
2543 2544 2545 2546 2547 2548
              messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
          auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();

          {
            auto redirect = downcasted.writeTarget(builder.initTarget());

2549
            // Disembargoes should only be sent to capabilities that were previously the subject of
Kenton Varda's avatar
Kenton Varda committed
2550
            // a `Resolve` message.  But `writeTarget` only ever returns non-null when called on
2551 2552 2553
            // a PromiseClient.  The code which sends `Resolve` and `Return` should have replaced
            // any promise with a direct node in order to solve the Tribble 4-way race condition.
            // See the documentation of Disembargo in rpc.capnp for more.
Kenton Varda's avatar
Kenton Varda committed
2554 2555
            KJ_REQUIRE(redirect == nullptr,
                       "'Disembargo' of type 'senderLoopback' sent to an object that does not "
2556
                       "appear to have been the subject of a previous 'Resolve' message.") {
Kenton Varda's avatar
Kenton Varda committed
2557 2558
              return;
            }
Kenton Varda's avatar
Kenton Varda committed
2559 2560
          }

Kenton Varda's avatar
Kenton Varda committed
2561
          builder.getContext().setReceiverLoopback(embargoId);
Kenton Varda's avatar
Kenton Varda committed
2562

Kenton Varda's avatar
Kenton Varda committed
2563 2564
          message->send();
        })));
Kenton Varda's avatar
Kenton Varda committed
2565 2566 2567 2568

        break;
      }

Kenton Varda's avatar
Kenton Varda committed
2569
      case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
2570
        KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
Kenton Varda's avatar
Kenton Varda committed
2571
          KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
2572
          embargoes.erase(context.getReceiverLoopback(), *embargo);
Kenton Varda's avatar
Kenton Varda committed
2573 2574 2575 2576 2577 2578
        } else {
          KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
            return;
          }
        }
        break;
Kenton Varda's avatar
Kenton Varda committed
2579
      }
Kenton Varda's avatar
Kenton Varda committed
2580 2581 2582 2583 2584 2585

      default:
        KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
    }
  }

2586 2587
  // ---------------------------------------------------------------------------
  // Level 2
2588 2589 2590 2591
};

}  // namespace

2592
class RpcSystemBase::Impl final: public kj::TaskSet::ErrorHandler {
2593
public:
2594 2595 2596 2597
  Impl(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface,
       kj::Maybe<RealmGateway<>::Client> gateway)
      : network(network), bootstrapInterface(kj::mv(bootstrapInterface)),
        gateway(kj::mv(gateway)), tasks(*this) {
2598 2599 2600
    tasks.add(acceptLoop());
  }
  Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2601
      : network(network), restorer(restorer), tasks(*this) {
2602 2603 2604 2605
    tasks.add(acceptLoop());
  }

  ~Impl() noexcept(false) {
2606 2607 2608 2609 2610
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // std::unordered_map doesn't like it when elements' destructors throw, so carefully
      // disassemble it.
      if (!connections.empty()) {
        kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
2611
        kj::Exception shutdownException = KJ_EXCEPTION(FAILED, "RpcSystem was destroyed.");
2612 2613 2614 2615
        for (auto& entry: connections) {
          entry.second->disconnect(kj::cp(shutdownException));
          deleteMe.add(kj::mv(entry.second));
        }
2616
      }
2617
    });
2618
  }
2619

2620 2621 2622 2623 2624 2625 2626 2627
  Capability::Client bootstrap(_::StructReader vatId) {
    // For now we delegate to restore() since it's equivalent, but eventually we'll remove restore()
    // and implement bootstrap() directly.
    return restore(vatId, AnyPointer::Reader());
  }

  Capability::Client restore(_::StructReader vatId, AnyPointer::Reader objectId) {
    KJ_IF_MAYBE(connection, network.baseConnect(vatId)) {
2628
      auto& state = getConnectionState(kj::mv(*connection));
2629 2630 2631 2632 2633 2634 2635
      return Capability::Client(state.restore(objectId));
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(objectId);
    } else {
      return Capability::Client(newBrokenCap(
          "SturdyRef referred to a local object but there is no local SturdyRef restorer."));
    }
2636 2637 2638
  }

  void taskFailed(kj::Exception&& exception) override {
2639
    KJ_LOG(ERROR, exception);
Kenton Varda's avatar
Kenton Varda committed
2640 2641
  }

2642 2643
private:
  VatNetworkBase& network;
2644
  kj::Maybe<Capability::Client> bootstrapInterface;
2645
  kj::Maybe<RealmGateway<>::Client> gateway;
2646 2647 2648 2649 2650
  kj::Maybe<SturdyRefRestorerBase&> restorer;
  kj::TaskSet tasks;

  typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
      ConnectionMap;
2651
  ConnectionMap connections;
Kenton Varda's avatar
Kenton Varda committed
2652

2653 2654
  kj::UnwindDetector unwindDetector;

2655 2656 2657
  RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
    auto iter = connections.find(connection);
    if (iter == connections.end()) {
2658
      VatNetworkBase::Connection* connectionPtr = connection;
Kenton Varda's avatar
Kenton Varda committed
2659 2660 2661
      auto onDisconnect = kj::newPromiseAndFulfiller<RpcConnectionState::DisconnectInfo>();
      tasks.add(onDisconnect.promise
          .then([this,connectionPtr](RpcConnectionState::DisconnectInfo info) {
2662
        connections.erase(connectionPtr);
Kenton Varda's avatar
Kenton Varda committed
2663
        tasks.add(kj::mv(info.shutdownPromise));
2664 2665
      }));
      auto newState = kj::refcounted<RpcConnectionState>(
2666 2667
          bootstrapInterface, gateway, restorer, kj::mv(connection),
          kj::mv(onDisconnect.fulfiller));
2668
      RpcConnectionState& result = *newState;
2669
      connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
2670 2671 2672 2673 2674 2675 2676
      return result;
    } else {
      return *iter->second;
    }
  }

  kj::Promise<void> acceptLoop() {
2677
    auto receive = network.baseAccept().then(
2678
        [this](kj::Own<VatNetworkBase::Connection>&& connection) {
2679
      getConnectionState(kj::mv(connection));
2680 2681 2682 2683 2684 2685 2686 2687
    });
    return receive.then([this]() {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      tasks.add(acceptLoop());
    });
2688
  }
2689 2690
};

2691
RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
2692 2693 2694
                             kj::Maybe<Capability::Client> bootstrapInterface,
                             kj::Maybe<RealmGateway<>::Client> gateway)
    : impl(kj::heap<Impl>(network, kj::mv(bootstrapInterface), kj::mv(gateway))) {}
2695
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2696
    : impl(kj::heap<Impl>(network, restorer)) {}
2697
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
2698 2699
RpcSystemBase::~RpcSystemBase() noexcept(false) {}

2700 2701 2702 2703
Capability::Client RpcSystemBase::baseBootstrap(_::StructReader vatId) {
  return impl->bootstrap(vatId);
}

2704
Capability::Client RpcSystemBase::baseRestore(
2705
    _::StructReader hostId, AnyPointer::Reader objectId) {
2706
  return impl->restore(hostId, objectId);
2707 2708 2709 2710
}

}  // namespace _ (private)
}  // namespace capnp