rpc.c++ 107 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22

#include "rpc.h"
23
#include "message.h"
24 25 26
#include <kj/debug.h>
#include <kj/vector.h>
#include <kj/async.h>
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/one-of.h>
Kenton Varda's avatar
Kenton Varda committed
28
#include <kj/function.h>
29
#include <functional>  // std::greater
30
#include <unordered_map>
Kenton Varda's avatar
Kenton Varda committed
31
#include <map>
32 33 34 35 36 37 38 39
#include <queue>
#include <capnp/rpc.capnp.h>

namespace capnp {
namespace _ {  // private

namespace {

Kenton Varda's avatar
Kenton Varda committed
40 41 42 43 44 45 46 47 48
template <typename T>
inline constexpr uint messageSizeHint() {
  return 1 + sizeInWords<rpc::Message>() + sizeInWords<T>();
}
template <>
inline constexpr uint messageSizeHint<void>() {
  return 1 + sizeInWords<rpc::Message>();
}

49 50 51
constexpr const uint MESSAGE_TARGET_SIZE_HINT = sizeInWords<rpc::MessageTarget>() +
    sizeInWords<rpc::PromisedAnswer>() + 16;  // +16 for ops; hope that's enough

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
constexpr const uint CAP_DESCRIPTOR_SIZE_HINT = sizeInWords<rpc::CapDescriptor>() +
    sizeInWords<rpc::PromisedAnswer>();

constexpr const uint64_t MAX_SIZE_HINT = 1 << 20;

uint copySizeHint(MessageSize size) {
  uint64_t sizeHint = size.wordCount + size.capCount * CAP_DESCRIPTOR_SIZE_HINT;
  return kj::min(MAX_SIZE_HINT, sizeHint);
}

uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) {
  KJ_IF_MAYBE(s, sizeHint) {
    return copySizeHint(*s) + additional;
  } else {
    return 0;
  }
}

Kenton Varda's avatar
Kenton Varda committed
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) {
  auto result = kj::heapArrayBuilder<PipelineOp>(ops.size());
  for (auto opReader: ops) {
    PipelineOp op;
    switch (opReader.which()) {
      case rpc::PromisedAnswer::Op::NOOP:
        op.type = PipelineOp::NOOP;
        break;
      case rpc::PromisedAnswer::Op::GET_POINTER_FIELD:
        op.type = PipelineOp::GET_POINTER_FIELD;
        op.pointerIndex = opReader.getGetPointerField();
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) {
          return nullptr;
        }
    }
87
    result.add(op);
Kenton Varda's avatar
Kenton Varda committed
88 89 90 91 92
  }
  return result.finish();
}

Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps(
Kenton Varda's avatar
Kenton Varda committed
93
    Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) {
Kenton Varda's avatar
Kenton Varda committed
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
  auto result = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size());
  auto builder = result.get();
  for (uint i: kj::indices(ops)) {
    rpc::PromisedAnswer::Op::Builder opBuilder = builder[i];
    switch (ops[i].type) {
      case PipelineOp::NOOP:
        opBuilder.setNoop();
        break;
      case PipelineOp::GET_POINTER_FIELD:
        opBuilder.setGetPointerField(ops[i].pointerIndex);
        break;
    }
  }
  return result;
}

Kenton Varda's avatar
Kenton Varda committed
110
kj::Exception toException(const rpc::Exception::Reader& exception) {
111 112
  return kj::Exception(static_cast<kj::Exception::Type>(exception.getType()),
      "(remote)", 0, kj::str("remote exception: ", exception.getReason()));
Kenton Varda's avatar
Kenton Varda committed
113 114 115
}

void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
Kenton Varda's avatar
Kenton Varda committed
116 117
  // TODO(someday):  Indicate the remote server name as part of the stack trace.  Maybe even
  //   transmit stack traces?
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137

  kj::StringPtr description = exception.getDescription();

  // Include context, if any.
  kj::Vector<kj::String> contextLines;
  for (auto context = exception.getContext();;) {
    KJ_IF_MAYBE(c, context) {
      contextLines.add(kj::str("context: ", c->file, ": ", c->line, ": ", c->description));
      context = c->next;
    } else {
      break;
    }
  }
  kj::String scratch;
  if (contextLines.size() > 0) {
    scratch = kj::str(description, '\n', kj::strArray(contextLines, "\n"));
    description = scratch;
  }

  builder.setReason(description);
138
  builder.setType(static_cast<rpc::Exception::Type>(exception.getType()));
Kenton Varda's avatar
Kenton Varda committed
139 140 141 142 143

  if (exception.getType() == kj::Exception::Type::FAILED &&
      !exception.getDescription().startsWith("remote exception:")) {
    KJ_LOG(INFO, "returning failure over rpc", exception);
  }
Kenton Varda's avatar
Kenton Varda committed
144 145
}

Kenton Varda's avatar
Kenton Varda committed
146
uint exceptionSizeHint(const kj::Exception& exception) {
Kenton Varda's avatar
Kenton Varda committed
147
  return sizeInWords<rpc::Exception>() + exception.getDescription().size() / sizeof(word) + 1;
Kenton Varda's avatar
Kenton Varda committed
148 149
}

Kenton Varda's avatar
Kenton Varda committed
150
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
151

152 153 154 155 156 157 158 159 160 161 162 163 164
template <typename Id, typename T>
class ExportTable {
  // Table mapping integers to T, where the integers are chosen locally.

public:
  kj::Maybe<T&> find(Id id) {
    if (id < slots.size() && slots[id] != nullptr) {
      return slots[id];
    } else {
      return nullptr;
    }
  }

165
  T erase(Id id, T& entry) {
Kenton Varda's avatar
Kenton Varda committed
166 167
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
168 169 170 171 172 173 174 175
    // `entry` is a reference to the entry being released -- we require this in order to prove
    // that the caller has already done a find() to check that this entry exists.  We can't check
    // ourselves because the caller may have nullified the entry in the meantime.
    KJ_DREQUIRE(&entry == &slots[id]);
    T toRelease = kj::mv(slots[id]);
    slots[id] = T();
    freeIds.push(id);
    return toRelease;
176 177 178 179 180 181 182 183 184 185 186 187 188
  }

  T& next(Id& id) {
    if (freeIds.empty()) {
      id = slots.size();
      return slots.add();
    } else {
      id = freeIds.top();
      freeIds.pop();
      return slots[id];
    }
  }

189 190 191 192 193 194 195 196 197
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i = 0; i < slots.size(); i++) {
      if (slots[i] != nullptr) {
        func(i, slots[i]);
      }
    }
  }

198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
private:
  kj::Vector<T> slots;
  std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds;
};

template <typename Id, typename T>
class ImportTable {
  // Table mapping integers to T, where the integers are chosen remotely.

public:
  T& operator[](Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      return high[id];
    }
  }

Kenton Varda's avatar
Kenton Varda committed
216 217 218 219 220
  kj::Maybe<T&> find(Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      auto iter = high.find(id);
Kenton Varda's avatar
Kenton Varda committed
221
      if (iter == high.end()) {
Kenton Varda's avatar
Kenton Varda committed
222 223 224 225 226 227 228
        return nullptr;
      } else {
        return iter->second;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
229 230 231
  T erase(Id id) {
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
232
    if (id < kj::size(low)) {
Kenton Varda's avatar
Kenton Varda committed
233
      T toRelease = kj::mv(low[id]);
234
      low[id] = T();
Kenton Varda's avatar
Kenton Varda committed
235
      return toRelease;
236
    } else {
Kenton Varda's avatar
Kenton Varda committed
237
      T toRelease = kj::mv(high[id]);
238
      high.erase(id);
Kenton Varda's avatar
Kenton Varda committed
239
      return toRelease;
240 241 242
    }
  }

243 244 245 246 247 248 249 250 251 252
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i: kj::indices(low)) {
      func(i, low[i]);
    }
    for (auto& entry: high) {
      func(entry.first, entry.second);
    }
  }

253 254 255 256 257
private:
  T low[16];
  std::unordered_map<Id, T> high;
};

Kenton Varda's avatar
Kenton Varda committed
258 259
// =======================================================================================

260
class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted {
261
public:
Kenton Varda's avatar
Kenton Varda committed
262 263 264 265 266
  struct DisconnectInfo {
    kj::Promise<void> shutdownPromise;
    // Task which is working on sending an abort message and cleanly ending the connection.
  };

267
  RpcConnectionState(BootstrapFactoryBase& bootstrapFactory,
268
                     kj::Maybe<RealmGateway<>::Client> gateway,
269
                     kj::Maybe<SturdyRefRestorerBase&> restorer,
270
                     kj::Own<VatNetworkBase::Connection>&& connectionParam,
271 272
                     kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller,
                     size_t flowLimit)
273
      : bootstrapFactory(bootstrapFactory), gateway(kj::mv(gateway)),
274 275
        restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), flowLimit(flowLimit),
        tasks(*this) {
276
    connection.init<Connected>(kj::mv(connectionParam));
277 278 279
    tasks.add(messageLoop());
  }

280
  kj::Own<ClientHook> restore(AnyPointer::Reader objectId) {
281 282 283 284
    if (connection.is<Disconnected>()) {
      return newBrokenCap(kj::cp(connection.get<Disconnected>()));
    }

285
    QuestionId questionId;
286
    auto& question = questions.next(questionId);
287

288
    question.isAwaitingReturn = true;
289

290
    auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
Kenton Varda's avatar
Kenton Varda committed
291

292 293
    auto questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller));
    question.selfRef = *questionRef;
Kenton Varda's avatar
Kenton Varda committed
294

295
    paf.promise = paf.promise.attach(kj::addRef(*questionRef));
296 297

    {
298
      auto message = connection.get<Connected>()->newOutgoingMessage(
299
          objectId.targetSize().wordCount + messageSizeHint<rpc::Bootstrap>());
300

301
      auto builder = message->getBody().initAs<rpc::Message>().initBootstrap();
302
      builder.setQuestionId(questionId);
303
      builder.getDeprecatedObjectId().set(objectId);
304 305 306 307

      message->send();
    }

308
    auto pipeline = kj::refcounted<RpcPipeline>(*this, kj::mv(questionRef), kj::mv(paf.promise));
309

310
    return pipeline->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
Kenton Varda's avatar
Kenton Varda committed
311 312
  }

313
  void taskFailed(kj::Exception&& exception) override {
314 315 316 317
    disconnect(kj::mv(exception));
  }

  void disconnect(kj::Exception&& exception) {
318 319 320 321 322
    if (!connection.is<Connected>()) {
      // Already disconnected.
      return;
    }

323 324
    kj::Exception networkException(kj::Exception::Type::DISCONNECTED,
        exception.getFile(), exception.getLine(), kj::heapString(exception.getDescription()));
325 326

    KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
327 328
      // Carefully pull all the objects out of the tables prior to releasing them because their
      // destructors could come back and mess with the tables.
329 330 331 332
      kj::Vector<kj::Own<PipelineHook>> pipelinesToRelease;
      kj::Vector<kj::Own<ClientHook>> clientsToRelease;
      kj::Vector<kj::Promise<kj::Own<RpcResponse>>> tailCallsToRelease;
      kj::Vector<kj::Promise<void>> resolveOpsToRelease;
333 334

      // All current questions complete with exceptions.
335
      questions.forEach([&](QuestionId id, Question& question) {
Kenton Varda's avatar
Kenton Varda committed
336
        KJ_IF_MAYBE(questionRef, question.selfRef) {
337 338
          // QuestionRef still present.
          questionRef->reject(kj::cp(networkException));
Kenton Varda's avatar
Kenton Varda committed
339
        }
340 341
      });

342
      answers.forEach([&](AnswerId id, Answer& answer) {
343 344 345 346
        KJ_IF_MAYBE(p, answer.pipeline) {
          pipelinesToRelease.add(kj::mv(*p));
        }

347
        KJ_IF_MAYBE(promise, answer.redirectedResults) {
348
          tailCallsToRelease.add(kj::mv(*promise));
349 350
        }

351 352 353 354 355
        KJ_IF_MAYBE(context, answer.callContext) {
          context->requestCancel();
        }
      });

356
      exports.forEach([&](ExportId id, Export& exp) {
357
        clientsToRelease.add(kj::mv(exp.clientHook));
358
        resolveOpsToRelease.add(kj::mv(exp.resolveOp));
359 360 361
        exp = Export();
      });

362
      imports.forEach([&](ImportId id, Import& import) {
363 364
        KJ_IF_MAYBE(f, import.promiseFulfiller) {
          f->get()->reject(kj::cp(networkException));
365 366 367
        }
      });

368
      embargoes.forEach([&](EmbargoId id, Embargo& embargo) {
369 370 371 372
        KJ_IF_MAYBE(f, embargo.fulfiller) {
          f->get()->reject(kj::cp(networkException));
        }
      });
373 374 375 376 377
    })) {
      // Some destructor must have thrown an exception.  There is no appropriate place to report
      // these errors.
      KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
             *newException);
378 379
    }

380 381 382
    // Send an abort message, but ignore failure.
    kj::runCatchingExceptions([&]() {
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
383
          messageSizeHint<void>() + exceptionSizeHint(exception));
384 385
      fromException(exception, message->getBody().getAs<rpc::Message>().initAbort());
      message->send();
386
    });
387 388

    // Indicate disconnect.
389 390 391 392 393 394 395 396 397 398 399
    auto shutdownPromise = connection.get<Connected>()->shutdown()
        .attach(kj::mv(connection.get<Connected>()))
        .then([]() -> kj::Promise<void> { return kj::READY_NOW; },
              [](kj::Exception&& e) -> kj::Promise<void> {
          // Don't report disconnects as an error.
          if (e.getType() != kj::Exception::Type::DISCONNECTED) {
            return kj::mv(e);
          }
          return kj::READY_NOW;
        });
    disconnectFulfiller->fulfill(DisconnectInfo { kj::mv(shutdownPromise) });
400
    connection.init<Disconnected>(kj::mv(networkException));
401 402
  }

403 404 405 406 407
  void setFlowLimit(size_t words) {
    flowLimit = words;
    maybeUnblockFlow();
  }

408
private:
409
  class RpcClient;
Kenton Varda's avatar
Kenton Varda committed
410
  class ImportClient;
411
  class PromiseClient;
Kenton Varda's avatar
Kenton Varda committed
412
  class QuestionRef;
Kenton Varda's avatar
Kenton Varda committed
413
  class RpcPipeline;
Kenton Varda's avatar
Kenton Varda committed
414
  class RpcCallContext;
Kenton Varda's avatar
Kenton Varda committed
415
  class RpcResponse;
Kenton Varda's avatar
Kenton Varda committed
416

417 418 419 420 421 422
  // =======================================================================================
  // The Four Tables entry types
  //
  // We have to define these before we can define the class's fields.

  typedef uint32_t QuestionId;
423
  typedef QuestionId AnswerId;
424
  typedef uint32_t ExportId;
425 426 427 428 429 430 431 432 433 434
  typedef ExportId ImportId;
  // See equivalent definitions in rpc.capnp.
  //
  // We always use the type that refers to the local table of the same name.  So e.g. although
  // QuestionId and AnswerId are the same type, we use QuestionId when referring to an entry in
  // the local question table (which corresponds to the peer's answer table) and use AnswerId
  // to refer to an entry in our answer table (which corresponds to the peer's question table).
  // Since all messages in the RPC protocol are defined from the sender's point of view, this
  // means that any time we read an ID from a received message, its type should invert.
  // TODO(cleanup):  Perhaps we could enforce that in a type-safe way?  Hmm...
435 436

  struct Question {
437 438 439
    kj::Array<ExportId> paramExports;
    // List of exports that were sent in the request.  If the response has `releaseParamCaps` these
    // will need to be released.
440

Kenton Varda's avatar
Kenton Varda committed
441 442 443
    kj::Maybe<QuestionRef&> selfRef;
    // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    // sent.
444

445 446 447
    bool isAwaitingReturn = false;
    // True from when `Call` is sent until `Return` is received.

448 449 450
    bool isTailCall = false;
    // Is this a tail call?  If so, we don't expect to receive results in the `Return`.

451 452 453
    bool skipFinish = false;
    // If true, don't send a Finish message.

Kenton Varda's avatar
Kenton Varda committed
454
    inline bool operator==(decltype(nullptr)) const {
455
      return !isAwaitingReturn && selfRef == nullptr;
Kenton Varda's avatar
Kenton Varda committed
456 457
    }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
458 459 460
  };

  struct Answer {
461 462 463 464 465 466
    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

467 468 469 470
    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

471
    kj::Maybe<kj::Own<PipelineHook>> pipeline;
472 473
    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.

474
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
475 476
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.
477

478
    kj::Maybe<RpcCallContext&> callContext;
479 480
    // The call context, if it's still active.  Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.
481

482 483 484
    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
    // will need to be released.
485 486 487 488 489 490
  };

  struct Export {
    uint refcount = 0;
    // When this reaches 0, drop `clientHook` and free this export.

491
    kj::Own<ClientHook> clientHook;
492

493 494 495 496
    kj::Promise<void> resolveOp = nullptr;
    // If this export is a promise (not a settled capability), the `resolveOp` represents the
    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.

497 498 499 500 501 502 503 504 505 506 507
    inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
  };

  struct Import {
    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

508
    kj::Maybe<ImportClient&> importClient;
509 510
    // Becomes null when the import is destroyed.

511 512 513 514 515
    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

516
    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
517 518 519
    // If non-null, the import is a promise.
  };

Kenton Varda's avatar
Kenton Varda committed
520 521 522 523 524 525 526 527 528 529 530 531
  typedef uint32_t EmbargoId;

  struct Embargo {
    // For handling the `Disembargo` message when looping back to self.

    kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    // Fulfill this when the Disembargo arrives.

    inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    inline bool operator!=(decltype(nullptr)) const { return fulfiller != nullptr; }
  };

532 533 534
  // =======================================================================================
  // OK, now we can define RpcConnectionState's member data.

535
  BootstrapFactoryBase& bootstrapFactory;
536
  kj::Maybe<RealmGateway<>::Client> gateway;
537
  kj::Maybe<SturdyRefRestorerBase&> restorer;
538 539 540 541 542 543 544

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

Kenton Varda's avatar
Kenton Varda committed
545
  kj::Own<kj::PromiseFulfiller<DisconnectInfo>> disconnectFulfiller;
546

547 548
  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
549 550
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
551 552 553 554 555 556 557 558 559
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables.  This definitely isn't a fifth table.  I don't know what you're
  // talking about.
560

561 562 563 564 565 566 567
  size_t flowLimit;
  size_t callWordsInFlight = 0;

  kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> flowWaiter;
  // If non-null, we're currently blocking incoming messages waiting for callWordsInFlight to drop
  // below flowLimit. Fulfill this to un-block.

568 569 570
  kj::TaskSet tasks;

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
571
  // ClientHook implementations
572

Kenton Varda's avatar
Kenton Varda committed
573
  class RpcClient: public ClientHook, public kj::Refcounted {
574
  public:
575
    RpcClient(RpcConnectionState& connectionState)
576
        : connectionState(kj::addRef(connectionState)) {}
577

578 579 580 581
    virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) = 0;
    // Writes a CapDescriptor referencing this client.  The CapDescriptor must be sent as part of
    // the very next message sent on the connection, as it may become invalid if other things
    // happen.
Kenton Varda's avatar
Kenton Varda committed
582 583 584 585
    //
    // If writing the descriptor adds a new export to the export table, or increments the refcount
    // on an existing one, then the ID is returned and the caller is responsible for removing it
    // later.
Kenton Varda's avatar
Kenton Varda committed
586

587 588
    virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) = 0;
Kenton Varda's avatar
Kenton Varda committed
589
    // Writes the appropriate call target for calls to this capability and returns null.
Kenton Varda's avatar
Kenton Varda committed
590
    //
Kenton Varda's avatar
Kenton Varda committed
591 592 593 594
    // - OR -
    //
    // If calls have been redirected to some other local ClientHook, returns that hook instead.
    // This can happen if the capability represents a promise that has been resolved.
Kenton Varda's avatar
Kenton Varda committed
595

596
    virtual kj::Own<ClientHook> getInnermostClient() = 0;
Kenton Varda's avatar
Kenton Varda committed
597 598 599 600
    // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    // that other client -- return a reference to the other client, transitively.  Otherwise,
    // return a new reference to *this.

Kenton Varda's avatar
Kenton Varda committed
601 602
    // implements ClientHook -----------------------------------------

603
    Request<AnyPointer, AnyPointer> newCall(
604
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
605 606 607 608 609 610 611 612 613 614 615 616
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          //
          // We pull a neat trick here: We actually end up returning a RequestHook for an import
          // request on the gateway cap, but with the "root" of the request actually pointing
          // to the "params" field of the real request.

          sizeHint = sizeHint.map([](MessageSize hint) {
            ++hint.capCount;
            hint.wordCount += sizeInWords<RealmGateway<>::ImportParams>();
617
            return hint;
618 619 620
          });

          auto request = g->importRequest(sizeHint);
621
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
622

623 624 625 626 627 628 629 630 631 632 633
          // Awkwardly, request.initParams() would return a SaveParams struct, but to construct
          // the Request<AnyPointer, AnyPointer> to return we need an AnyPointer::Builder, and you
          // can't go backwards from a struct builder to an AnyPointer builder. So instead we
          // manually get at the pointer by converting the outer request to AnyStruct and then
          // pulling the pointer from the pointer section.
          auto pointers = toAny(request).getPointerSection();
          KJ_ASSERT(pointers.size() >= 2);
          auto paramsPtr = pointers[1];
          KJ_ASSERT(paramsPtr.isNull());

          return Request<AnyPointer, AnyPointer>(paramsPtr, RequestHook::from(kj::mv(request)));
634 635 636
        }
      }

637 638 639 640 641
      return newCallNoIntercept(interfaceId, methodId, sizeHint);
    }

    Request<AnyPointer, AnyPointer> newCallNoIntercept(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) {
642 643 644 645
      if (!connectionState->connection.is<Connected>()) {
        return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
      }

646
      auto request = kj::heap<RpcRequest>(
647 648
          *connectionState, *connectionState->connection.get<Connected>(),
          sizeHint, kj::addRef(*this));
649 650 651 652 653 654
      auto callBuilder = request->getCall();

      callBuilder.setInterfaceId(interfaceId);
      callBuilder.setMethodId(methodId);

      auto root = request->getRoot();
655
      return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
656 657
    }

Kenton Varda's avatar
Kenton Varda committed
658
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
659
                                kj::Own<CallContextHook>&& context) override {
660 661 662 663 664 665 666 667 668 669 670
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          auto params = context->getParams().getAs<Persistent<>::SaveParams>();

          auto requestSize = params.totalSize();
          ++requestSize.capCount;
          requestSize.wordCount += sizeInWords<RealmGateway<>::ImportParams>();

          auto request = g->importRequest(requestSize);
671
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
672 673 674 675 676 677 678 679
          request.setParams(params);

          context->allowCancellation();
          context->releaseParams();
          return context->directTailCall(RequestHook::from(kj::mv(request)));
        }
      }

680 681 682 683 684 685 686
      return callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    VoidPromiseAndPipeline callNoIntercept(uint64_t interfaceId, uint16_t methodId,
                                           kj::Own<CallContextHook>&& context) {
      // Implement call() by copying params and results messages.

Kenton Varda's avatar
Kenton Varda committed
687
      auto params = context->getParams();
688
      auto request = newCallNoIntercept(interfaceId, methodId, params.targetSize());
Kenton Varda's avatar
Kenton Varda committed
689

690
      request.set(params);
Kenton Varda's avatar
Kenton Varda committed
691 692
      context->releaseParams();

693
      // We can and should propagate cancellation.
694
      context->allowCancellation();
695

696
      return context->directTailCall(RequestHook::from(kj::mv(request)));
Kenton Varda's avatar
Kenton Varda committed
697 698
    }

699
    kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
700 701
      return kj::addRef(*this);
    }
702
    const void* getBrand() override {
703
      return connectionState.get();
Kenton Varda's avatar
Kenton Varda committed
704 705
    }

706
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
707 708
  };

709 710 711 712
  class ImportClient final: public RpcClient {
    // A ClientHook that wraps an entry in the import table.

  public:
713
    ImportClient(RpcConnectionState& connectionState, ImportId importId)
Kenton Varda's avatar
Kenton Varda committed
714 715
        : RpcClient(connectionState), importId(importId) {}

Kenton Varda's avatar
Kenton Varda committed
716
    ~ImportClient() noexcept(false) {
717
      unwindDetector.catchExceptionsIfUnwinding([&]() {
718
        // Remove self from the import table, if the table is still pointing at us.
719 720 721 722 723
        KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
          KJ_IF_MAYBE(i, import->importClient) {
            if (i == this) {
              connectionState->imports.erase(importId);
            }
724 725
          }
        }
Kenton Varda's avatar
Kenton Varda committed
726

727
        // Send a message releasing our remote references.
728 729
        if (remoteRefcount > 0 && connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
730 731 732 733 734 735 736
              messageSizeHint<rpc::Release>());
          rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
          builder.setId(importId);
          builder.setReferenceCount(remoteRefcount);
          message->send();
        }
      });
737 738
    }

739 740 741
    void addRemoteRef() {
      // Add a new RemoteRef and return a new ref to this client representing it.
      ++remoteRefcount;
Kenton Varda's avatar
Kenton Varda committed
742
    }
743

744
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
745
      descriptor.setReceiverHosted(importId);
Kenton Varda's avatar
Kenton Varda committed
746
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
747 748
    }

749 750
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
751
      target.setImportedCap(importId);
Kenton Varda's avatar
Kenton Varda committed
752
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
753 754
    }

755
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
756 757 758
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
759
    // implements ClientHook -----------------------------------------
760

761
    kj::Maybe<ClientHook&> getResolved() override {
762 763 764
      return nullptr;
    }

765
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
766 767 768
      return nullptr;
    }

Kenton Varda's avatar
Kenton Varda committed
769
  private:
770
    ImportId importId;
Kenton Varda's avatar
Kenton Varda committed
771 772 773

    uint remoteRefcount = 0;
    // Number of times we've received this import from the peer.
774 775

    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
776 777
  };

778 779 780
  class PipelineClient final: public RpcClient {
    // A ClientHook representing a pipelined promise.  Always wrapped in PromiseClient.

Kenton Varda's avatar
Kenton Varda committed
781
  public:
782 783
    PipelineClient(RpcConnectionState& connectionState,
                   kj::Own<QuestionRef>&& questionRef,
784
                   kj::Array<PipelineOp>&& ops)
Kenton Varda's avatar
Kenton Varda committed
785
        : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}
Kenton Varda's avatar
Kenton Varda committed
786

787
   kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
788 789 790 791 792
      auto promisedAnswer = descriptor.initReceiverAnswer();
      promisedAnswer.setQuestionId(questionRef->getId());
      promisedAnswer.adoptTransform(fromPipelineOps(
          Orphanage::getForMessageContaining(descriptor), ops));
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
793 794
    }

795 796
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
Kenton Varda's avatar
Kenton Varda committed
797 798 799 800
      auto builder = target.initPromisedAnswer();
      builder.setQuestionId(questionRef->getId());
      builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
      return nullptr;
801 802
    }

803
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
804 805 806
      return kj::addRef(*this);
    }

807
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
808

809
    kj::Maybe<ClientHook&> getResolved() override {
810 811 812
      return nullptr;
    }

813
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
814
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
815 816 817
    }

  private:
818
    kj::Own<QuestionRef> questionRef;
819
    kj::Array<PipelineOp> ops;
Kenton Varda's avatar
Kenton Varda committed
820 821
  };

822 823 824 825
  class PromiseClient final: public RpcClient {
    // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    // PipelineClient) and then, later on, redirects to some other client.

Kenton Varda's avatar
Kenton Varda committed
826
  public:
827 828 829
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<ClientHook> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
830
                  kj::Maybe<ImportId> importId)
831
        : RpcClient(connectionState),
832 833
          isResolved(false),
          cap(kj::mv(initial)),
834
          importId(importId),
835 836 837
          fork(eventual.fork()),
          resolveSelfPromise(fork.addBranch().then(
              [this](kj::Own<ClientHook>&& resolution) {
838
                resolve(kj::mv(resolution), false);
839
              }, [this](kj::Exception&& exception) {
840
                resolve(newBrokenCap(kj::mv(exception)), true);
841 842 843 844
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
845
              })) {
846 847 848 849 850 851
      // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
      // resolves, will forward there instead.  In addition, `whenMoreResolved()` will return a fork
      // of `eventual`.  Note that this means the application could hold on to `eventual` even after
      // the `PromiseClient` is destroyed; `eventual` must therefore make sure to hold references to
      // anything that needs to stay alive in order to resolve it correctly (such as making sure the
      // import ID is not released).
Kenton Varda's avatar
Kenton Varda committed
852
    }
Kenton Varda's avatar
Kenton Varda committed
853

854 855 856 857 858 859
    ~PromiseClient() noexcept(false) {
      KJ_IF_MAYBE(id, importId) {
        // This object is representing an import promise.  That means the import table may still
        // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
        // the import still exists and the pointer still points back to this object because this
        // object may actually outlive the import.
860
        KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
861 862 863 864 865 866 867 868 869
          KJ_IF_MAYBE(c, import->appClient) {
            if (c == this) {
              import->appClient = nullptr;
            }
          }
        }
      }
    }

870
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
871
      receivedCall = true;
872
      return connectionState->writeDescriptor(*cap, descriptor);
Kenton Varda's avatar
Kenton Varda committed
873
    }
Kenton Varda's avatar
Kenton Varda committed
874

875 876
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
877
      receivedCall = true;
878
      return connectionState->writeTarget(*cap, target);
Kenton Varda's avatar
Kenton Varda committed
879 880
    }

881
    kj::Own<ClientHook> getInnermostClient() override {
882
      receivedCall = true;
883
      return connectionState->getInnermostClient(*cap);
Kenton Varda's avatar
Kenton Varda committed
884 885
    }

Kenton Varda's avatar
Kenton Varda committed
886
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
887

888
    Request<AnyPointer, AnyPointer> newCall(
889
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
890 891 892 893 894 895 896 897 898 899
      if (!isResolved && interfaceId == typeId<Persistent<>>() && methodId == 0 &&
          connectionState->gateway != nullptr) {
        // This is a call to Persistent.save(), and we're not resolved yet, and the underlying
        // remote capability will perform a gateway translation. This isn't right if the promise
        // ultimately resolves to a local capability. Instead, we'll need to queue the call until
        // the promise resolves.
        return newLocalPromiseClient(fork.addBranch())
            ->newCall(interfaceId, methodId, sizeHint);
      }

900
      receivedCall = true;
901
      return cap->newCall(interfaceId, methodId, sizeHint);
902 903
    }

904
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
905
                                kj::Own<CallContextHook>&& context) override {
906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
      if (!isResolved && interfaceId == typeId<Persistent<>>() && methodId == 0 &&
          connectionState->gateway != nullptr) {
        // This is a call to Persistent.save(), and we're not resolved yet, and the underlying
        // remote capability will perform a gateway translation. This isn't right if the promise
        // ultimately resolves to a local capability. Instead, we'll need to queue the call until
        // the promise resolves.

        auto vpapPromises = fork.addBranch().then(kj::mvCapture(context,
            [interfaceId,methodId](kj::Own<CallContextHook>&& context,
                                   kj::Own<ClientHook> resolvedCap) {
          auto vpap = resolvedCap->call(interfaceId, methodId, kj::mv(context));
          return kj::tuple(kj::mv(vpap.promise), kj::mv(vpap.pipeline));
        })).split();

        return {
          kj::mv(kj::get<0>(vpapPromises)),
          newLocalPromisePipeline(kj::mv(kj::get<1>(vpapPromises))),
        };
      }

926
      receivedCall = true;
927
      return cap->call(interfaceId, methodId, kj::mv(context));
928 929
    }

930
    kj::Maybe<ClientHook&> getResolved() override {
931 932
      if (isResolved) {
        return *cap;
933 934 935
      } else {
        return nullptr;
      }
Kenton Varda's avatar
Kenton Varda committed
936 937
    }

938
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
939
      return fork.addBranch();
Kenton Varda's avatar
Kenton Varda committed
940
    }
Kenton Varda's avatar
Kenton Varda committed
941 942

  private:
943 944
    bool isResolved;
    kj::Own<ClientHook> cap;
945

946
    kj::Maybe<ImportId> importId;
947
    kj::ForkedPromise<kj::Own<ClientHook>> fork;
Kenton Varda's avatar
Kenton Varda committed
948 949 950 951 952

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

953
    bool receivedCall = false;
Kenton Varda's avatar
Kenton Varda committed
954

955
    void resolve(kj::Own<ClientHook> replacement, bool isError) {
956 957 958 959
      const void* replacementBrand = replacement->getBrand();
      if (replacementBrand != connectionState.get() &&
          replacementBrand != &ClientHook::NULL_CAPABILITY_BRAND &&
          receivedCall && !isError && connectionState->connection.is<Connected>()) {
960 961 962 963 964
        // The new capability is hosted locally, not on the remote machine.  And, we had made calls
        // to the promise.  We need to make sure those calls echo back to us before we allow new
        // calls to go directly to the local capability, so we need to set a local embargo and send
        // a `Disembargo` to echo through the peer.

965
        auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
966
            messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
Kenton Varda's avatar
Kenton Varda committed
967

Kenton Varda's avatar
Kenton Varda committed
968
        auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();
Kenton Varda's avatar
Kenton Varda committed
969 970

        {
971
          auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
Kenton Varda's avatar
Kenton Varda committed
972 973 974 975 976
          KJ_ASSERT(redirect == nullptr,
                    "Original promise target should always be from this RPC connection.");
        }

        EmbargoId embargoId;
977
        Embargo& embargo = connectionState->embargoes.next(embargoId);
Kenton Varda's avatar
Kenton Varda committed
978 979 980 981 982 983 984

        disembargo.getContext().setSenderLoopback(embargoId);

        auto paf = kj::newPromiseAndFulfiller<void>();
        embargo.fulfiller = kj::mv(paf.fulfiller);

        // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
985
        auto embargoPromise = paf.promise.then(
986
            kj::mvCapture(replacement, [](kj::Own<ClientHook>&& replacement) {
Kenton Varda's avatar
Kenton Varda committed
987 988 989 990 991
              return kj::mv(replacement);
            }));

        // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
        // client instead.
992
        replacement = newLocalPromiseClient(kj::mv(embargoPromise));
Kenton Varda's avatar
Kenton Varda committed
993 994 995 996 997

        // Send the `Disembargo`.
        message->send();
      }

998
      cap = kj::mv(replacement);
999
      isResolved = true;
Kenton Varda's avatar
Kenton Varda committed
1000
    }
Kenton Varda's avatar
Kenton Varda committed
1001
  };
1002

1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
  class NoInterceptClient final: public RpcClient {
    // A wrapper around an RpcClient which bypasses special handling of "save" requests. When we
    // intercept a "save" request and invoke a RealmGateway, we give it a version of the capability
    // with intercepting disabled, since usually the first thing the RealmGateway will do is turn
    // around and call save() again.
    //
    // This is admittedly sort of backwards: the interception of "save" ought to be the part
    // implemented by a wrapper. However, that would require placing a wrapper around every
    // RpcClient we create whereas NoInterceptClient only needs to be injected after a save()
    // request occurs and is intercepted.

  public:
    NoInterceptClient(RpcClient& inner)
        : RpcClient(*inner.connectionState),
          inner(kj::addRef(inner)) {}

    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
      return inner->writeDescriptor(descriptor);
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(rpc::MessageTarget::Builder target) override {
      return inner->writeTarget(target);
    }

    kj::Own<ClientHook> getInnermostClient() override {
      return inner->getInnermostClient();
    }

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
      return inner->newCallNoIntercept(interfaceId, methodId, sizeHint);
    }
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                                kj::Own<CallContextHook>&& context) override {
      return inner->callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    kj::Maybe<ClientHook&> getResolved() override {
      return nullptr;
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return nullptr;
    }

  private:
    kj::Own<RpcClient> inner;
  };

1052
  kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor) {
1053
    // Write a descriptor for the given capability.
Kenton Varda's avatar
Kenton Varda committed
1054

1055
    // Find the innermost wrapped capability.
1056
    ClientHook* inner = &cap;
1057 1058 1059 1060 1061 1062 1063 1064 1065
    for (;;) {
      KJ_IF_MAYBE(resolved, inner->getResolved()) {
        inner = resolved;
      } else {
        break;
      }
    }

    if (inner->getBrand() == this) {
1066
      return kj::downcast<RpcClient>(*inner).writeDescriptor(descriptor);
Kenton Varda's avatar
Kenton Varda committed
1067
    } else {
1068 1069
      auto iter = exportsByCap.find(inner);
      if (iter != exportsByCap.end()) {
1070
        // We've already seen and exported this capability before.  Just up the refcount.
1071
        auto& exp = KJ_ASSERT_NONNULL(exports.find(iter->second));
Kenton Varda's avatar
Kenton Varda committed
1072 1073 1074 1075
        ++exp.refcount;
        descriptor.setSenderHosted(iter->second);
        return iter->second;
      } else {
1076
        // This is the first time we've seen this capability.
Kenton Varda's avatar
Kenton Varda committed
1077
        ExportId exportId;
1078 1079
        auto& exp = exports.next(exportId);
        exportsByCap[inner] = exportId;
Kenton Varda's avatar
Kenton Varda committed
1080
        exp.refcount = 1;
1081 1082 1083 1084
        exp.clientHook = inner->addRef();

        KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
          // This is a promise.  Arrange for the `Resolve` message to be sent later.
Kenton Varda's avatar
Kenton Varda committed
1085
          exp.resolveOp = resolveExportedPromise(exportId, kj::mv(*wrapped));
1086 1087 1088
          descriptor.setSenderPromise(exportId);
        } else {
          descriptor.setSenderHosted(exportId);
1089 1090
        }

Kenton Varda's avatar
Kenton Varda committed
1091 1092
        return exportId;
      }
Kenton Varda's avatar
Kenton Varda committed
1093 1094 1095
    }
  }

1096
  kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
1097 1098 1099 1100
                                       rpc::Payload::Builder payload) {
    auto capTableBuilder = payload.initCapTable(capTable.size());
    kj::Vector<ExportId> exports(capTable.size());
    for (uint i: kj::indices(capTable)) {
1101 1102 1103 1104 1105 1106
      KJ_IF_MAYBE(cap, capTable[i]) {
        KJ_IF_MAYBE(exportId, writeDescriptor(**cap, capTableBuilder[i])) {
          exports.add(*exportId);
        }
      } else {
        capTableBuilder[i].setNone();
1107 1108 1109 1110 1111
      }
    }
    return exports.releaseAsArray();
  }

1112
  kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
Kenton Varda's avatar
Kenton Varda committed
1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
    // If calls to the given capability should pass over this connection, fill in `target`
    // appropriately for such a call and return nullptr.  Otherwise, return a `ClientHook` to which
    // the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
    //
    // The main case where this ends up returning non-null is if `cap` is a promise that has
    // recently resolved.  The application might have started building a request before the promise
    // resolved, and so the request may have been built on the assumption that it would be sent over
    // this network connection, but then the promise resolved to point somewhere else before the
    // request was sent.  Now the request has to be redirected to the new target instead.

    if (cap.getBrand() == this) {
1124
      return kj::downcast<RpcClient>(cap).writeTarget(target);
Kenton Varda's avatar
Kenton Varda committed
1125 1126 1127 1128 1129
    } else {
      return cap.addRef();
    }
  }

1130 1131
  kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
    ClientHook* ptr = &client;
Kenton Varda's avatar
Kenton Varda committed
1132 1133 1134 1135 1136 1137 1138 1139 1140
    for (;;) {
      KJ_IF_MAYBE(inner, ptr->getResolved()) {
        ptr = inner;
      } else {
        break;
      }
    }

    if (ptr->getBrand() == this) {
1141
      return kj::downcast<RpcClient>(*ptr).getInnermostClient();
Kenton Varda's avatar
Kenton Varda committed
1142 1143 1144 1145 1146 1147
    } else {
      return ptr->addRef();
    }
  }

  kj::Promise<void> resolveExportedPromise(
1148
      ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
Kenton Varda's avatar
Kenton Varda committed
1149 1150 1151 1152
    // Implements exporting of a promise.  The promise has been exported under the given ID, and is
    // to eventually resolve to the ClientHook produced by `promise`.  This method waits for that
    // resolve to happen and then sends the appropriate `Resolve` message to the peer.

1153
    return promise.then(
1154
        [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
Kenton Varda's avatar
Kenton Varda committed
1155 1156
      // Successful resolution.

1157 1158 1159 1160 1161
      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return kj::READY_NOW;
      }

Kenton Varda's avatar
Kenton Varda committed
1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172
      // Get the innermost ClientHook backing the resolved client.  This includes traversing
      // PromiseClients that haven't resolved yet to their underlying ImportClient or
      // PipelineClient, so that we get a remote promise that might resolve later.  This is
      // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
      // correctly instead of going to the result of some future resolution.  See the documentation
      // for `Disembargo` in `rpc.capnp`.
      resolution = getInnermostClient(*resolution);

      // Update the export table to point at this object instead.  We know that our entry in the
      // export table is still live because when it is destroyed the asynchronous resolution task
      // (i.e. this code) is canceled.
1173 1174
      auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
      exportsByCap.erase(exp.clientHook);
Kenton Varda's avatar
Kenton Varda committed
1175 1176
      exp.clientHook = kj::mv(resolution);

1177
      if (exp.clientHook->getBrand() != this) {
Kenton Varda's avatar
Kenton Varda committed
1178 1179 1180
        // We're resolving to a local capability.  If we're resolving to a promise, we might be
        // able to reuse our export table entry and avoid sending a message.

1181
        KJ_IF_MAYBE(promise, exp.clientHook->whenMoreResolved()) {
Kenton Varda's avatar
Kenton Varda committed
1182 1183 1184 1185
          // We're replacing a promise with another local promise.  In this case, we might actually
          // be able to just reuse the existing export table entry to represent the new promise --
          // unless it already has an entry.  Let's check.

1186
          auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));
Kenton Varda's avatar
Kenton Varda committed
1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197

          if (insertResult.second) {
            // The new promise was not already in the table, therefore the existing export table
            // entry has now been repurposed to represent it.  There is no need to send a resolve
            // message at all.  We do, however, have to start resolving the next promise.
            return resolveExportedPromise(exportId, kj::mv(*promise));
          }
        }
      }

      // OK, we have to send a `Resolve` message.
1198
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1199 1200 1201
          messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
1202
      writeDescriptor(*exp.clientHook, resolve.initCap());
Kenton Varda's avatar
Kenton Varda committed
1203 1204 1205 1206 1207
      message->send();

      return kj::READY_NOW;
    }, [this,exportId](kj::Exception&& exception) {
      // send error resolution
1208
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1209 1210 1211 1212 1213
          messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      fromException(exception, resolve.initException());
      message->send();
1214 1215 1216
    }).eagerlyEvaluate([this](kj::Exception&& exception) {
      // Put the exception on the TaskSet which will cause the connection to be terminated.
      tasks.add(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1217 1218 1219
    });
  }

Kenton Varda's avatar
Kenton Varda committed
1220
  // =====================================================================================
1221
  // Interpreting CapDescriptor
Kenton Varda's avatar
Kenton Varda committed
1222

1223
  kj::Own<ClientHook> import(ImportId importId, bool isPromise) {
1224
    // Receive a new import.
Kenton Varda's avatar
Kenton Varda committed
1225

1226 1227
    auto& import = imports[importId];
    kj::Own<ImportClient> importClient;
Kenton Varda's avatar
Kenton Varda committed
1228

1229 1230 1231 1232 1233 1234
    // Create the ImportClient, or if one already exists, use it.
    KJ_IF_MAYBE(c, import.importClient) {
      importClient = kj::addRef(*c);
    } else {
      importClient = kj::refcounted<ImportClient>(*this, importId);
      import.importClient = *importClient;
Kenton Varda's avatar
Kenton Varda committed
1235
    }
1236

1237 1238
    // We just received a copy of this import ID, so the remote refcount has gone up.
    importClient->addRemoteRef();
Kenton Varda's avatar
Kenton Varda committed
1239

1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
    if (isPromise) {
      // We need to construct a PromiseClient around this import, if we haven't already.
      KJ_IF_MAYBE(c, import.appClient) {
        // Use the existing one.
        return kj::addRef(*c);
      } else {
        // Create a promise for this import's resolution.
        auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
        import.promiseFulfiller = kj::mv(paf.fulfiller);

        // Make sure the import is not destroyed while this promise exists.
        paf.promise = paf.promise.attach(kj::addRef(*importClient));

        // Create a PromiseClient around it and return it.
        auto result = kj::refcounted<PromiseClient>(
            *this, kj::mv(importClient), kj::mv(paf.promise), importId);
        import.appClient = *result;
        return kj::mv(result);
1258
      }
1259 1260 1261
    } else {
      import.appClient = *importClient;
      return kj::mv(importClient);
1262
    }
1263
  }
1264

1265
  kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor) {
1266 1267
    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
1268
        return nullptr;
1269

1270 1271 1272 1273
      case rpc::CapDescriptor::SENDER_HOSTED:
        return import(descriptor.getSenderHosted(), false);
      case rpc::CapDescriptor::SENDER_PROMISE:
        return import(descriptor.getSenderPromise(), true);
1274

1275 1276 1277 1278
      case rpc::CapDescriptor::RECEIVER_HOSTED:
        KJ_IF_MAYBE(exp, exports.find(descriptor.getReceiverHosted())) {
          return exp->clientHook->addRef();
        } else {
1279 1280 1281
          return newBrokenCap("invalid 'receiverHosted' export ID");
        }

1282 1283
      case rpc::CapDescriptor::RECEIVER_ANSWER: {
        auto promisedAnswer = descriptor.getReceiverAnswer();
1284

1285 1286 1287 1288 1289 1290 1291
        KJ_IF_MAYBE(answer, answers.find(promisedAnswer.getQuestionId())) {
          if (answer->active) {
            KJ_IF_MAYBE(pipeline, answer->pipeline) {
              KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
                return pipeline->get()->getPipelinedCap(*ops);
              } else {
                return newBrokenCap("unrecognized pipeline ops");
Kenton Varda's avatar
Kenton Varda committed
1292 1293
              }
            }
1294
          }
1295 1296
        }

1297
        return newBrokenCap("invalid 'receiverAnswer'");
1298 1299
      }

1300 1301 1302
      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        // We don't support third-party caps, so use the vine instead.
        return import(descriptor.getThirdPartyHosted().getVineId(), false);
Kenton Varda's avatar
Kenton Varda committed
1303

1304 1305 1306
      default:
        KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
        return newBrokenCap("unknown CapDescriptor type");
Kenton Varda's avatar
Kenton Varda committed
1307
    }
1308
  }
1309

1310 1311
  kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
    auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
1312 1313
    for (auto cap: capTable) {
      result.add(receiveCap(cap));
Kenton Varda's avatar
Kenton Varda committed
1314
    }
1315 1316
    return result.finish();
  }
1317 1318

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
1319
  // RequestHook/PipelineHook/ResponseHook implementations
1320

Kenton Varda's avatar
Kenton Varda committed
1321 1322 1323 1324
  class QuestionRef: public kj::Refcounted {
    // A reference to an entry on the question table.  Used to detect when the `Finish` message
    // can be sent.

Kenton Varda's avatar
Kenton Varda committed
1325
  public:
1326
    inline QuestionRef(
1327 1328 1329
        RpcConnectionState& connectionState, QuestionId id,
        kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
        : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1330 1331

    ~QuestionRef() {
1332
      unwindDetector.catchExceptionsIfUnwinding([&]() {
1333 1334 1335
        auto& question = KJ_ASSERT_NONNULL(
            connectionState->questions.find(id), "Question ID no longer on table?");

1336
        // Send the "Finish" message (if the connection is not already broken).
1337
        if (connectionState->connection.is<Connected>() && !question.skipFinish) {
1338
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1339
              messageSizeHint<rpc::Finish>());
1340 1341
          auto builder = message->getBody().getAs<rpc::Message>().initFinish();
          builder.setQuestionId(id);
1342 1343 1344
          // If we're still awaiting a return, then this request is being canceled, and we're going
          // to ignore any capabilities in the return message, so set releaseResultCaps true. If we
          // already received the return, then we've already built local proxies for the caps and
David Renshaw's avatar
David Renshaw committed
1345
          // will send Release messages when those are destroyed.
1346
          builder.setReleaseResultCaps(question.isAwaitingReturn);
1347
          message->send();
1348
        }
Kenton Varda's avatar
Kenton Varda committed
1349

1350 1351 1352
        // Check if the question has returned and, if so, remove it from the table.
        // Remove question ID from the table.  Must do this *after* sending `Finish` to ensure that
        // the ID is not re-allocated before the `Finish` message can be sent.
1353 1354
        if (question.isAwaitingReturn) {
          // Still waiting for return, so just remove the QuestionRef pointer from the table.
1355
          question.selfRef = nullptr;
1356 1357 1358
        } else {
          // Call has already returned, so we can now remove it from the table.
          connectionState->questions.erase(id, question);
1359 1360
        }
      });
Kenton Varda's avatar
Kenton Varda committed
1361 1362 1363 1364
    }

    inline QuestionId getId() const { return id; }

1365
    void fulfill(kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1366 1367 1368
      fulfiller->fulfill(kj::mv(response));
    }

1369
    void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
1370 1371 1372
      fulfiller->fulfill(kj::mv(promise));
    }

Kenton Varda's avatar
Kenton Varda committed
1373 1374
    void reject(kj::Exception&& exception) {
      fulfiller->reject(kj::mv(exception));
1375
    }
Kenton Varda's avatar
Kenton Varda committed
1376 1377

  private:
1378
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1379
    QuestionId id;
1380
    kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
1381
    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
1382 1383
  };

Kenton Varda's avatar
Kenton Varda committed
1384
  class RpcRequest final: public RequestHook {
1385
  public:
1386 1387
    RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
               kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
1388
        : connectionState(kj::addRef(connectionState)),
Kenton Varda's avatar
Kenton Varda committed
1389
          target(kj::mv(target)),
1390
          message(connection.newOutgoingMessage(
1391 1392
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
                  sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
Kenton Varda's avatar
Kenton Varda committed
1393
          callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
1394
          paramsBuilder(capTable.imbue(callBuilder.getParams().getContent())) {}
Kenton Varda's avatar
Kenton Varda committed
1395

1396
    inline AnyPointer::Builder getRoot() {
Kenton Varda's avatar
Kenton Varda committed
1397 1398
      return paramsBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1399 1400 1401
    inline rpc::Call::Builder getCall() {
      return callBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1402

1403
    RemotePromise<AnyPointer> send() override {
1404
      if (!connectionState->connection.is<Connected>()) {
1405
        // Connection is broken.
1406
        const kj::Exception& e = connectionState->connection.get<Disconnected>();
1407
        return RemotePromise<AnyPointer>(
1408 1409
            kj::Promise<Response<AnyPointer>>(kj::cp(e)),
            AnyPointer::Pipeline(newBrokenPipeline(kj::cp(e))));
1410
      }
Kenton Varda's avatar
Kenton Varda committed
1411

1412 1413 1414
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy.  Ick.
Kenton Varda's avatar
Kenton Varda committed
1415

1416
        auto replacement = redirect->get()->newCall(
1417
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
1418 1419 1420
        replacement.set(paramsBuilder);
        return replacement.send();
      } else {
1421
        auto sendResult = sendInternal(false);
Kenton Varda's avatar
Kenton Varda committed
1422

1423
        auto forkedPromise = sendResult.promise.fork();
1424

1425 1426 1427
        // The pipeline must get notified of resolution before the app does to maintain ordering.
        auto pipeline = kj::refcounted<RpcPipeline>(
            *connectionState, kj::mv(sendResult.questionRef), forkedPromise.addBranch());
Kenton Varda's avatar
Kenton Varda committed
1428

1429 1430 1431 1432 1433
        auto appPromise = forkedPromise.addBranch().then(
            [=](kj::Own<RpcResponse>&& response) {
              auto reader = response->getResults();
              return Response<AnyPointer>(reader, kj::mv(response));
            });
Kenton Varda's avatar
Kenton Varda committed
1434

1435 1436 1437 1438
        return RemotePromise<AnyPointer>(
            kj::mv(appPromise),
            AnyPointer::Pipeline(kj::mv(pipeline)));
      }
Kenton Varda's avatar
Kenton Varda committed
1439 1440
    }

1441 1442 1443
    struct TailInfo {
      QuestionId questionId;
      kj::Promise<void> promise;
1444
      kj::Own<PipelineHook> pipeline;
1445 1446 1447 1448 1449 1450 1451 1452 1453 1454
    };

    kj::Maybe<TailInfo> tailSend() {
      // Send the request as a tail call.
      //
      // Returns null if for some reason a tail call is not possible and the caller should fall
      // back to using send() and copying the response.

      SendInternalResult sendResult;

1455
      if (!connectionState->connection.is<Connected>()) {
1456 1457 1458
        // Disconnected; fall back to a regular send() which will fail appropriately.
        return nullptr;
      }
1459

1460 1461 1462 1463 1464 1465
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // Fall back to regular send().
        return nullptr;
      } else {
        sendResult = sendInternal(true);
1466 1467
      }

1468
      auto promise = sendResult.promise.then([](kj::Own<RpcResponse>&& response) {
1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479
        // Response should be null if `Return` handling code is correct.
        KJ_ASSERT(!response) { break; }
      });

      QuestionId questionId = sendResult.questionRef->getId();

      auto pipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sendResult.questionRef));

      return TailInfo { questionId, kj::mv(promise), kj::mv(pipeline) };
    }

1480
    const void* getBrand() override {
1481 1482 1483
      return connectionState.get();
    }

Kenton Varda's avatar
Kenton Varda committed
1484
  private:
1485
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1486

1487
    kj::Own<RpcClient> target;
Kenton Varda's avatar
Kenton Varda committed
1488
    kj::Own<OutgoingRpcMessage> message;
1489
    BuilderCapabilityTable capTable;
Kenton Varda's avatar
Kenton Varda committed
1490
    rpc::Call::Builder callBuilder;
1491
    AnyPointer::Builder paramsBuilder;
1492 1493 1494

    struct SendInternalResult {
      kj::Own<QuestionRef> questionRef;
1495
      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
1496 1497
    };

1498
    SendInternalResult sendInternal(bool isTailCall) {
1499 1500
      // Build the cap table.
      auto exports = connectionState->writeDescriptors(
1501
          capTable.getTable(), callBuilder.getParams());
1502

1503
      // Init the question table.  Do this after writing descriptors to avoid interference.
1504
      QuestionId questionId;
1505
      auto& question = connectionState->questions.next(questionId);
1506 1507 1508
      question.isAwaitingReturn = true;
      question.paramExports = kj::mv(exports);
      question.isTailCall = isTailCall;
1509

1510 1511 1512 1513 1514 1515 1516 1517
      // Make the QuentionRef and result promise.
      SendInternalResult result;
      auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
      result.questionRef = kj::refcounted<QuestionRef>(
          *connectionState, questionId, kj::mv(paf.fulfiller));
      question.selfRef = *result.questionRef;
      result.promise = paf.promise.attach(kj::addRef(*result.questionRef));

1518
      // Finish and send.
1519 1520 1521 1522
      callBuilder.setQuestionId(questionId);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
1523 1524 1525 1526 1527
      KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
        KJ_CONTEXT("sending RPC call",
           callBuilder.getInterfaceId(), callBuilder.getMethodId());
        message->send();
      })) {
1528 1529 1530 1531 1532
        // We can't safely throw the exception from here since we've already modified the question
        // table state. We'll have to reject the promise instead.
        question.isAwaitingReturn = false;
        question.skipFinish = true;
        result.questionRef->reject(kj::mv(*exception));
1533
      }
1534

1535
      // Send and return.
1536 1537
      return kj::mv(result);
    }
Kenton Varda's avatar
Kenton Varda committed
1538 1539
  };

Kenton Varda's avatar
Kenton Varda committed
1540
  class RpcPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1541
  public:
1542 1543
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef,
                kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam)
1544
        : connectionState(kj::addRef(connectionState)),
1545 1546 1547
          redirectLater(redirectLaterParam.fork()),
          resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then(
              [this](kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1548
                resolve(kj::mv(response));
1549
              }, [this](kj::Exception&& exception) {
Kenton Varda's avatar
Kenton Varda committed
1550
                resolve(kj::mv(exception));
1551 1552 1553 1554
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
1555 1556 1557
              })) {
      // Construct a new RpcPipeline.

1558
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1559
    }
Kenton Varda's avatar
Kenton Varda committed
1560

1561
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef)
1562 1563 1564 1565
        : connectionState(kj::addRef(connectionState)),
          resolveSelfPromise(nullptr) {
      // Construct a new RpcPipeline that is never expected to resolve.

1566
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1567
    }
Kenton Varda's avatar
Kenton Varda committed
1568 1569 1570

    // implements PipelineHook ---------------------------------------

1571
    kj::Own<PipelineHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1572 1573 1574
      return kj::addRef(*this);
    }

1575
    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
Kenton Varda's avatar
Kenton Varda committed
1576 1577 1578 1579 1580 1581 1582
      auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
      for (auto& op: ops) {
        copy.add(op);
      }
      return getPipelinedCap(copy.finish());
    }

1583
    kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override {
1584
      if (state.is<Waiting>()) {
1585 1586
        // Wrap a PipelineClient in a PromiseClient.
        auto pipelineClient = kj::refcounted<PipelineClient>(
1587
            *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr()));
1588

1589
        KJ_IF_MAYBE(r, redirectLater) {
1590 1591 1592 1593
          auto resolutionPromise = r->addBranch().then(kj::mvCapture(ops,
              [](kj::Array<PipelineOp> ops, kj::Own<RpcResponse>&& response) {
                return response->getResults().getPipelinedCap(ops);
              }));
1594

1595 1596 1597 1598 1599 1600
          return kj::refcounted<PromiseClient>(
              *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr);
        } else {
          // Oh, this pipeline will never get redirected, so just return the PipelineClient.
          return kj::mv(pipelineClient);
        }
1601 1602
      } else if (state.is<Resolved>()) {
        return state.get<Resolved>()->getResults().getPipelinedCap(ops);
Kenton Varda's avatar
Kenton Varda committed
1603
      } else {
1604
        return newBrokenCap(kj::cp(state.get<Broken>()));
Kenton Varda's avatar
Kenton Varda committed
1605
      }
Kenton Varda's avatar
Kenton Varda committed
1606 1607 1608
    }

  private:
1609 1610
    kj::Own<RpcConnectionState> connectionState;
    kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater;
Kenton Varda's avatar
Kenton Varda committed
1611

1612 1613
    typedef kj::Own<QuestionRef> Waiting;
    typedef kj::Own<RpcResponse> Resolved;
Kenton Varda's avatar
Kenton Varda committed
1614
    typedef kj::Exception Broken;
1615
    kj::OneOf<Waiting, Resolved, Broken> state;
Kenton Varda's avatar
Kenton Varda committed
1616 1617 1618 1619 1620

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

1621
    void resolve(kj::Own<RpcResponse>&& response) {
1622 1623
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Resolved>(kj::mv(response));
Kenton Varda's avatar
Kenton Varda committed
1624 1625 1626
    }

    void resolve(const kj::Exception&& exception) {
1627 1628
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Broken>(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1629
    }
Kenton Varda's avatar
Kenton Varda committed
1630 1631
  };

1632 1633
  class RpcResponse: public ResponseHook {
  public:
1634
    virtual AnyPointer::Reader getResults() = 0;
1635
    virtual kj::Own<RpcResponse> addRef() = 0;
1636 1637 1638
  };

  class RpcResponseImpl final: public RpcResponse, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1639
  public:
1640
    RpcResponseImpl(RpcConnectionState& connectionState,
1641 1642
                    kj::Own<QuestionRef>&& questionRef,
                    kj::Own<IncomingRpcMessage>&& message,
1643
                    kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray,
1644
                    AnyPointer::Reader results)
1645 1646
        : connectionState(kj::addRef(connectionState)),
          message(kj::mv(message)),
1647 1648
          capTable(kj::mv(capTableArray)),
          reader(capTable.imbue(results)),
Kenton Varda's avatar
Kenton Varda committed
1649
          questionRef(kj::mv(questionRef)) {}
Kenton Varda's avatar
Kenton Varda committed
1650

1651
    AnyPointer::Reader getResults() override {
Kenton Varda's avatar
Kenton Varda committed
1652 1653 1654
      return reader;
    }

1655
    kj::Own<RpcResponse> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1656 1657 1658
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
1659
  private:
1660
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1661
    kj::Own<IncomingRpcMessage> message;
1662
    ReaderCapabilityTable capTable;
1663
    AnyPointer::Reader reader;
1664
    kj::Own<QuestionRef> questionRef;
Kenton Varda's avatar
Kenton Varda committed
1665 1666 1667 1668 1669 1670
  };

  // =====================================================================================
  // CallContextHook implementation

  class RpcServerResponse {
Kenton Varda's avatar
Kenton Varda committed
1671
  public:
1672
    virtual AnyPointer::Builder getResultsBuilder() = 0;
1673 1674 1675 1676
  };

  class RpcServerResponseImpl final: public RpcServerResponse {
  public:
1677
    RpcServerResponseImpl(RpcConnectionState& connectionState,
1678
                          kj::Own<OutgoingRpcMessage>&& message,
1679 1680 1681
                          rpc::Payload::Builder payload)
        : connectionState(connectionState),
          message(kj::mv(message)),
1682
          payload(payload) {}
Kenton Varda's avatar
Kenton Varda committed
1683

1684
    AnyPointer::Builder getResultsBuilder() override {
1685
      return capTable.imbue(payload.getContent());
Kenton Varda's avatar
Kenton Varda committed
1686 1687
    }

1688 1689 1690 1691 1692
    kj::Maybe<kj::Array<ExportId>> send() {
      // Send the response and return the export list.  Returns nullptr if there were no caps.
      // (Could return a non-null empty array if there were caps but none of them were exports.)

      // Build the cap table.
1693
      auto capTable = this->capTable.getTable();
1694 1695
      auto exports = connectionState.writeDescriptors(capTable, payload);

1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706
      // Capabilities that we are returning are subject to embargos. See `Disembargo` in rpc.capnp.
      // As explained there, in order to deal with the Tribble 4-way race condition, we need to
      // make sure that if we're returning any remote promises, that we ignore any subsequent
      // resolution of those promises for the purpose of pipelined requests on this answer. Luckily,
      // we can modify the cap table in-place.
      for (auto& slot: capTable) {
        KJ_IF_MAYBE(cap, slot) {
          slot = connectionState.getInnermostClient(**cap);
        }
      }

Kenton Varda's avatar
Kenton Varda committed
1707
      message->send();
1708 1709 1710 1711 1712
      if (capTable.size() == 0) {
        return nullptr;
      } else {
        return kj::mv(exports);
      }
Kenton Varda's avatar
Kenton Varda committed
1713 1714 1715
    }

  private:
1716
    RpcConnectionState& connectionState;
Kenton Varda's avatar
Kenton Varda committed
1717
    kj::Own<OutgoingRpcMessage> message;
1718
    BuilderCapabilityTable capTable;
1719
    rpc::Payload::Builder payload;
Kenton Varda's avatar
Kenton Varda committed
1720 1721
  };

1722 1723 1724
  class LocallyRedirectedRpcResponse final
      : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
  public:
1725
    LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
1726 1727
        : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
                          .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
1728

1729
    AnyPointer::Builder getResultsBuilder() override {
1730
      return message.getRoot<AnyPointer>();
1731 1732
    }

1733
    AnyPointer::Reader getResults() override {
1734
      return message.getRoot<AnyPointer>();
1735 1736
    }

1737
    kj::Own<RpcResponse> addRef() override {
1738 1739 1740 1741
      return kj::addRef(*this);
    }

  private:
1742
    MallocMessageBuilder message;
1743 1744
  };

Kenton Varda's avatar
Kenton Varda committed
1745 1746
  class RpcCallContext final: public CallContextHook, public kj::Refcounted {
  public:
1747
    RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
1748 1749 1750
                   kj::Own<IncomingRpcMessage>&& request,
                   kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray,
                   const AnyPointer::Reader& params,
1751 1752
                   bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller,
                   uint64_t interfaceId, uint16_t methodId)
1753
        : connectionState(kj::addRef(connectionState)),
1754
          answerId(answerId),
1755 1756
          interfaceId(interfaceId),
          methodId(methodId),
1757
          requestSize(request->getBody().targetSize().wordCount),
Kenton Varda's avatar
Kenton Varda committed
1758
          request(kj::mv(request)),
1759 1760
          paramsCapTable(kj::mv(capTableArray)),
          params(paramsCapTable.imbue(params)),
1761
          returnMessage(nullptr),
Kenton Varda's avatar
Kenton Varda committed
1762
          redirectResults(redirectResults),
1763 1764 1765
          cancelFulfiller(kj::mv(cancelFulfiller)) {
      connectionState.callWordsInFlight += requestSize;
    }
Kenton Varda's avatar
Kenton Varda committed
1766

1767 1768 1769 1770
    ~RpcCallContext() noexcept(false) {
      if (isFirstResponder()) {
        // We haven't sent a return yet, so we must have been canceled.  Send a cancellation return.
        unwindDetector.catchExceptionsIfUnwinding([&]() {
Kenton Varda's avatar
Kenton Varda committed
1771
          // Don't send anything if the connection is broken.
1772 1773
          if (connectionState->connection.is<Connected>()) {
            auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1774
                messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>());
Kenton Varda's avatar
Kenton Varda committed
1775
            auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1776

1777
            builder.setAnswerId(answerId);
1778
            builder.setReleaseParamCaps(false);
1779

Kenton Varda's avatar
Kenton Varda committed
1780 1781 1782 1783 1784 1785 1786 1787 1788
            if (redirectResults) {
              // The reason we haven't sent a return is because the results were sent somewhere
              // else.
              builder.setResultsSentElsewhere();
            } else {
              builder.setCanceled();
            }

            message->send();
1789
          }
1790

1791
          cleanupAnswerTable(nullptr, true);
1792 1793 1794 1795
        });
      }
    }

1796
    kj::Own<RpcResponse> consumeRedirectedResponse() {
1797 1798
      KJ_ASSERT(redirectResults);

1799
      if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
1800 1801 1802 1803 1804 1805

      // Note that the context needs to keep its own reference to the response so that it doesn't
      // get GC'd until the PipelineHook drops its reference to the context.
      return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef();
    }

Kenton Varda's avatar
Kenton Varda committed
1806
    void sendReturn() {
1807
      KJ_ASSERT(!redirectResults);
1808 1809 1810 1811

      // Avoid sending results if canceled so that we don't have to figure out whether or not
      // `releaseResultCaps` was set in the already-received `Finish`.
      if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) {
1812 1813 1814 1815 1816
        KJ_ASSERT(connectionState->connection.is<Connected>(),
                  "Cancellation should have been requested on disconnect.") {
          return;
        }

1817
        if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
Kenton Varda's avatar
Kenton Varda committed
1818

1819
        returnMessage.setAnswerId(answerId);
1820
        returnMessage.setReleaseParamCaps(false);
Kenton Varda's avatar
Kenton Varda committed
1821

1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832
        kj::Maybe<kj::Array<ExportId>> exports;
        KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
          // Debug info incase send() fails due to overside message.
          KJ_CONTEXT("returning from RPC call", interfaceId, methodId);
          exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
        })) {
          responseSent = false;
          sendErrorReturn(kj::mv(*exception));
          return;
        }

1833 1834 1835 1836 1837 1838 1839
        KJ_IF_MAYBE(e, exports) {
          // Caps were returned, so we can't free the pipeline yet.
          cleanupAnswerTable(kj::mv(*e), false);
        } else {
          // No caps in the results, therefore the pipeline is irrelevant.
          cleanupAnswerTable(nullptr, true);
        }
Kenton Varda's avatar
Kenton Varda committed
1840 1841 1842
      }
    }
    void sendErrorReturn(kj::Exception&& exception) {
1843
      KJ_ASSERT(!redirectResults);
Kenton Varda's avatar
Kenton Varda committed
1844
      if (isFirstResponder()) {
1845 1846 1847 1848
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Return>() + exceptionSizeHint(exception));
          auto builder = message->getBody().initAs<rpc::Message>().initReturn();
Kenton Varda's avatar
Kenton Varda committed
1849

1850 1851 1852
          builder.setAnswerId(answerId);
          builder.setReleaseParamCaps(false);
          fromException(exception, builder.initException());
Kenton Varda's avatar
Kenton Varda committed
1853

1854 1855
          message->send();
        }
1856 1857 1858 1859

        // Do not allow releasing the pipeline because we want pipelined calls to propagate the
        // exception rather than fail with a "no such field" exception.
        cleanupAnswerTable(nullptr, false);
Kenton Varda's avatar
Kenton Varda committed
1860 1861 1862
      }
    }

1863
    void requestCancel() {
Kenton Varda's avatar
Kenton Varda committed
1864 1865 1866 1867 1868 1869
      // Hints that the caller wishes to cancel this call.  At the next time when cancellation is
      // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
      // safe, the RpcCallContext will send a normal return when the call completes.  Either way
      // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
      // a Finish message was already received.

1870 1871 1872 1873
      bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED;
      cancellationFlags |= CANCEL_REQUESTED;

      if (previouslyAllowedButNotRequested) {
Kenton Varda's avatar
Kenton Varda committed
1874
        // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously.  Initiate
Kenton Varda's avatar
Kenton Varda committed
1875
        // the cancellation.
Kenton Varda's avatar
Kenton Varda committed
1876
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1877
      }
Kenton Varda's avatar
Kenton Varda committed
1878
    }
1879 1880 1881

    // implements CallContextHook ------------------------------------

1882
    AnyPointer::Reader getParams() override {
1883 1884 1885 1886 1887 1888
      KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
      return params;
    }
    void releaseParams() override {
      request = nullptr;
    }
1889
    AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
1890
      KJ_IF_MAYBE(r, response) {
1891
        return r->get()->getResultsBuilder();
1892
      } else {
1893 1894
        kj::Own<RpcServerResponse> response;

1895
        if (redirectResults || !connectionState->connection.is<Connected>()) {
1896
          response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
1897
        } else {
1898
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1899 1900
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
                               sizeInWords<rpc::Payload>()));
1901 1902 1903 1904 1905 1906
          returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
          response = kj::heap<RpcServerResponseImpl>(
              *connectionState, kj::mv(message), returnMessage.getResults());
        }

        auto results = response->getResultsBuilder();
Kenton Varda's avatar
Kenton Varda committed
1907 1908
        this->response = kj::mv(response);
        return results;
1909 1910
      }
    }
1911 1912 1913
    kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
      auto result = directTailCall(kj::mv(request));
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
1914
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
1915 1916 1917 1918
      }
      return kj::mv(result.promise);
    }
    ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
1919 1920 1921
      KJ_REQUIRE(response == nullptr,
                 "Can't call tailCall() after initializing the results struct.");

1922
      if (request->getBrand() == connectionState.get() && !redirectResults) {
1923 1924 1925 1926 1927
        // The tail call is headed towards the peer that called us in the first place, so we can
        // optimize out the return trip.

        KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
          if (isFirstResponder()) {
1928 1929 1930 1931
            if (connectionState->connection.is<Connected>()) {
              auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
                  messageSizeHint<rpc::Return>());
              auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1932

1933 1934 1935
              builder.setAnswerId(answerId);
              builder.setReleaseParamCaps(false);
              builder.setTakeFromOtherQuestion(tailInfo->questionId);
1936

1937 1938
              message->send();
            }
1939 1940 1941

            // There are no caps in our return message, but of course the tail results could have
            // caps, so we must continue to honor pipeline calls (and just bounce them back).
1942
            cleanupAnswerTable(nullptr, false);
1943
          }
1944
          return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
1945 1946 1947 1948 1949 1950 1951
        }
      }

      // Just forwarding to another local call.
      auto promise = request->send();

      // Wait for response.
1952
      auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
1953 1954 1955
        // Copy the response.
        // TODO(perf):  It would be nice if we could somehow make the response get built in-place
        //   but requires some refactoring.
1956
        getResults(tailResponse.targetSize()).set(tailResponse);
1957
      });
1958 1959

      return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
1960
    }
1961 1962
    kj::Promise<AnyPointer::Pipeline> onTailCall() override {
      auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
1963 1964 1965
      tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
      return kj::mv(paf.promise);
    }
1966
    void allowCancellation() override {
1967 1968 1969 1970
      bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
      cancellationFlags |= CANCEL_ALLOWED;

      if (previouslyRequestedButNotAllowed) {
Kenton Varda's avatar
Kenton Varda committed
1971 1972 1973
        // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
        // the cancellation.
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1974
      }
1975 1976 1977 1978 1979 1980
    }
    kj::Own<CallContextHook> addRef() override {
      return kj::addRef(*this);
    }

  private:
1981
    kj::Own<RpcConnectionState> connectionState;
1982
    AnswerId answerId;
1983

1984 1985 1986 1987
    uint64_t interfaceId;
    uint16_t methodId;
    // For debugging.

Kenton Varda's avatar
Kenton Varda committed
1988 1989
    // Request ---------------------------------------------

1990
    size_t requestSize;  // for flow limit purposes
Kenton Varda's avatar
Kenton Varda committed
1991
    kj::Maybe<kj::Own<IncomingRpcMessage>> request;
1992
    ReaderCapabilityTable paramsCapTable;
1993
    AnyPointer::Reader params;
Kenton Varda's avatar
Kenton Varda committed
1994

Kenton Varda's avatar
Kenton Varda committed
1995 1996 1997
    // Response --------------------------------------------

    kj::Maybe<kj::Own<RpcServerResponse>> response;
Kenton Varda's avatar
Kenton Varda committed
1998
    rpc::Return::Builder returnMessage;
1999
    bool redirectResults = false;
Kenton Varda's avatar
Kenton Varda committed
2000
    bool responseSent = false;
2001
    kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
Kenton Varda's avatar
Kenton Varda committed
2002 2003 2004 2005 2006 2007 2008 2009

    // Cancellation state ----------------------------------

    enum CancellationFlags {
      CANCEL_REQUESTED = 1,
      CANCEL_ALLOWED = 2
    };

2010
    uint8_t cancellationFlags = 0;
2011
    // When both flags are set, the cancellation process will begin.
Kenton Varda's avatar
Kenton Varda committed
2012

2013
    kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
Kenton Varda's avatar
Kenton Varda committed
2014 2015 2016
    // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise is
    // exclusive-joined with the outermost promise waiting on the call return, so fulfilling it
    // cancels that promise.
Kenton Varda's avatar
Kenton Varda committed
2017

2018 2019
    kj::UnwindDetector unwindDetector;

Kenton Varda's avatar
Kenton Varda committed
2020 2021 2022 2023 2024 2025 2026
    // -----------------------------------------------------

    bool isFirstResponder() {
      if (responseSent) {
        return false;
      } else {
        responseSent = true;
2027 2028 2029
        return true;
      }
    }
Kenton Varda's avatar
Kenton Varda committed
2030

2031
    void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
2032 2033 2034
      // We need to remove the `callContext` pointer -- which points back to us -- from the
      // answer table.  Or we might even be responsible for removing the entire answer table
      // entry.
Kenton Varda's avatar
Kenton Varda committed
2035

2036
      if (cancellationFlags & CANCEL_REQUESTED) {
2037 2038 2039
        // Already received `Finish` so it's our job to erase the table entry. We shouldn't have
        // sent results if canceled, so we shouldn't have an export list to deal with.
        KJ_ASSERT(resultExports.size() == 0);
2040
        connectionState->answers.erase(answerId);
2041
      } else {
2042
        // We just have to null out callContext and set the exports.
2043
        auto& answer = connectionState->answers[answerId];
2044
        answer.callContext = nullptr;
2045 2046 2047 2048 2049 2050
        answer.resultExports = kj::mv(resultExports);

        if (shouldFreePipeline) {
          // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
          // because there are no caps in the result to receive pipeline requests).
          KJ_ASSERT(resultExports.size() == 0);
Kenton Varda's avatar
Kenton Varda committed
2051
          answer.pipeline = nullptr;
Kenton Varda's avatar
Kenton Varda committed
2052 2053
        }
      }
2054 2055 2056 2057

      // Also, this is the right time to stop counting the call against the flow limit.
      connectionState->callWordsInFlight -= requestSize;
      connectionState->maybeUnblockFlow();
Kenton Varda's avatar
Kenton Varda committed
2058
    }
2059 2060
  };

Kenton Varda's avatar
Kenton Varda committed
2061 2062 2063
  // =====================================================================================
  // Message handling

2064 2065 2066 2067 2068 2069 2070 2071 2072
  void maybeUnblockFlow() {
    if (callWordsInFlight < flowLimit) {
      KJ_IF_MAYBE(w, flowWaiter) {
        w->get()->fulfill();
        flowWaiter = nullptr;
      }
    }
  }

2073
  kj::Promise<void> messageLoop() {
2074 2075 2076 2077
    if (!connection.is<Connected>()) {
      return kj::READY_NOW;
    }

2078 2079 2080 2081 2082 2083 2084 2085
    if (callWordsInFlight > flowLimit) {
      auto paf = kj::newPromiseAndFulfiller<void>();
      flowWaiter = kj::mv(paf.fulfiller);
      return paf.promise.then([this]() {
        return messageLoop();
      });
    }

2086
    return connection.get<Connected>()->receiveIncomingMessage().then(
2087
        [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) {
2088 2089
      KJ_IF_MAYBE(m, message) {
        handleMessage(kj::mv(*m));
2090
        return true;
2091
      } else {
2092
        disconnect(KJ_EXCEPTION(DISCONNECTED, "Peer disconnected."));
2093
        return false;
2094
      }
2095
    }).then([this](bool keepGoing) {
2096 2097 2098 2099
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
2100
      if (keepGoing) tasks.add(messageLoop());
2101
    });
2102 2103 2104 2105
  }

  void handleMessage(kj::Own<IncomingRpcMessage> message) {
    auto reader = message->getBody().getAs<rpc::Message>();
2106

2107 2108
    switch (reader.which()) {
      case rpc::Message::UNIMPLEMENTED:
Kenton Varda's avatar
Kenton Varda committed
2109
        handleUnimplemented(reader.getUnimplemented());
2110 2111 2112
        break;

      case rpc::Message::ABORT:
Kenton Varda's avatar
Kenton Varda committed
2113
        handleAbort(reader.getAbort());
2114 2115
        break;

2116 2117 2118 2119
      case rpc::Message::BOOTSTRAP:
        handleBootstrap(kj::mv(message), reader.getBootstrap());
        break;

2120
      case rpc::Message::CALL:
Kenton Varda's avatar
Kenton Varda committed
2121
        handleCall(kj::mv(message), reader.getCall());
2122 2123
        break;

Kenton Varda's avatar
Kenton Varda committed
2124
      case rpc::Message::RETURN:
Kenton Varda's avatar
Kenton Varda committed
2125
        handleReturn(kj::mv(message), reader.getReturn());
Kenton Varda's avatar
Kenton Varda committed
2126 2127 2128
        break;

      case rpc::Message::FINISH:
Kenton Varda's avatar
Kenton Varda committed
2129
        handleFinish(reader.getFinish());
Kenton Varda's avatar
Kenton Varda committed
2130 2131
        break;

Kenton Varda's avatar
Kenton Varda committed
2132
      case rpc::Message::RESOLVE:
2133
        handleResolve(reader.getResolve());
Kenton Varda's avatar
Kenton Varda committed
2134 2135 2136
        break;

      case rpc::Message::RELEASE:
2137
        handleRelease(reader.getRelease());
Kenton Varda's avatar
Kenton Varda committed
2138 2139
        break;

2140
      case rpc::Message::DISEMBARGO:
Kenton Varda's avatar
Kenton Varda committed
2141
        handleDisembargo(reader.getDisembargo());
2142 2143
        break;

2144
      default: {
2145 2146 2147 2148 2149 2150
        if (connection.is<Connected>()) {
          auto message = connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(reader.totalSize(), messageSizeHint<void>()));
          message->getBody().initAs<rpc::Message>().setUnimplemented(reader);
          message->send();
        }
2151 2152 2153 2154 2155
        break;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2156
  void handleUnimplemented(const rpc::Message::Reader& message) {
Kenton Varda's avatar
Kenton Varda committed
2157
    switch (message.which()) {
2158
      case rpc::Message::RESOLVE: {
2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180
        auto resolve = message.getResolve();
        switch (resolve.which()) {
          case rpc::Resolve::CAP: {
            auto cap = resolve.getCap();
            switch (cap.which()) {
              case rpc::CapDescriptor::NONE:
                // Nothing to do (but this ought never to happen).
                break;
              case rpc::CapDescriptor::SENDER_HOSTED:
                releaseExport(cap.getSenderHosted(), 1);
                break;
              case rpc::CapDescriptor::SENDER_PROMISE:
                releaseExport(cap.getSenderPromise(), 1);
                break;
              case rpc::CapDescriptor::RECEIVER_ANSWER:
              case rpc::CapDescriptor::RECEIVER_HOSTED:
                // Nothing to do.
                break;
              case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
                releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
                break;
            }
2181
            break;
2182 2183
          }
          case rpc::Resolve::EXCEPTION:
2184 2185 2186
            // Nothing to do.
            break;
        }
Kenton Varda's avatar
Kenton Varda committed
2187
        break;
2188
      }
Kenton Varda's avatar
Kenton Varda committed
2189 2190 2191 2192 2193

      default:
        KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
        break;
    }
2194 2195
  }

Kenton Varda's avatar
Kenton Varda committed
2196
  void handleAbort(const rpc::Exception::Reader& exception) {
2197 2198 2199
    kj::throwRecoverableException(toException(exception));
  }

2200 2201 2202
  // ---------------------------------------------------------------------------
  // Level 0

2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232
  class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
  public:
    SingleCapPipeline(kj::Own<ClientHook>&& cap)
        : cap(kj::mv(cap)) {}

    kj::Own<PipelineHook> addRef() override {
      return kj::addRef(*this);
    }

    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
      if (ops.size() == 0) {
        return cap->addRef();
      } else {
        return newBrokenCap("Invalid pipeline transform.");
      }
    }

  private:
    kj::Own<ClientHook> cap;
  };

  void handleBootstrap(kj::Own<IncomingRpcMessage>&& message,
                       const rpc::Bootstrap::Reader& bootstrap) {
    AnswerId answerId = bootstrap.getQuestionId();

    if (!connection.is<Connected>()) {
      // Disconnected; ignore.
      return;
    }

2233 2234
    VatNetworkBase::Connection& conn = *connection.get<Connected>();
    auto response = conn.newOutgoingMessage(
2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246
        messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);

    rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
    ret.setAnswerId(answerId);

    kj::Own<ClientHook> capHook;
    kj::Array<ExportId> resultExports;
    KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong

    // Call the restorer and initialize the answer.
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      Capability::Client cap = nullptr;
2247 2248 2249 2250 2251

      if (bootstrap.hasDeprecatedObjectId()) {
        KJ_IF_MAYBE(r, restorer) {
          cap = r->baseRestore(bootstrap.getDeprecatedObjectId());
        } else {
2252 2253 2254 2255
          KJ_FAIL_REQUIRE("This vat only supports a bootstrap interface, not the old "
                          "Cap'n-Proto-0.4-style named exports.") { return; }
        }
      } else {
2256
        cap = bootstrapFactory.baseCreateFor(conn.baseGetPeerVatId());
2257 2258
      }

2259
      BuilderCapabilityTable capTable;
2260
      auto payload = ret.initResults();
2261
      capTable.imbue(payload.getContent()).setAs<Capability>(kj::mv(cap));
2262

2263 2264 2265 2266
      auto capTableArray = capTable.getTable();
      KJ_DASSERT(capTableArray.size() == 1);
      resultExports = writeDescriptors(capTableArray, payload);
      capHook = KJ_ASSERT_NONNULL(capTableArray[0])->addRef();
2267 2268 2269 2270 2271 2272 2273 2274 2275
    })) {
      fromException(*exception, ret.initException());
      capHook = newBrokenCap(kj::mv(*exception));
    }

    message = nullptr;

    // Add the answer to the answer table for pipelining and send the response.
    auto& answer = answers[answerId];
2276
    KJ_REQUIRE(!answer.active, "questionId is already in use", answerId) {
2277 2278 2279 2280 2281 2282 2283 2284 2285 2286
      return;
    }

    answer.resultExports = kj::mv(resultExports);
    answer.active = true;
    answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));

    response->send();
  }

Kenton Varda's avatar
Kenton Varda committed
2287
  void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
2288
    kj::Own<ClientHook> capability;
2289

Kenton Varda's avatar
Kenton Varda committed
2290 2291 2292 2293 2294
    KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
      capability = kj::mv(*t);
    } else {
      // Exception already reported.
      return;
2295 2296
    }

2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308
    bool redirectResults;
    switch (call.getSendResultsTo().which()) {
      case rpc::Call::SendResultsTo::CALLER:
        redirectResults = false;
        break;
      case rpc::Call::SendResultsTo::YOURSELF:
        redirectResults = true;
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
    }

2309
    auto payload = call.getParams();
2310
    auto capTableArray = receiveCaps(payload.getCapTable());
Kenton Varda's avatar
Kenton Varda committed
2311 2312
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

2313
    AnswerId answerId = call.getQuestionId();
Kenton Varda's avatar
Kenton Varda committed
2314

2315
    auto context = kj::refcounted<RpcCallContext>(
2316
        *this, answerId, kj::mv(message), kj::mv(capTableArray), payload.getContent(),
2317 2318
        redirectResults, kj::mv(cancelPaf.fulfiller),
        call.getInterfaceId(), call.getMethodId());
2319

2320
    // No more using `call` after this point, as it now belongs to the context.
2321 2322

    {
2323
      auto& answer = answers[answerId];
2324 2325 2326 2327 2328 2329

      KJ_REQUIRE(!answer.active, "questionId is already in use") {
        return;
      }

      answer.active = true;
Kenton Varda's avatar
Kenton Varda committed
2330
      answer.callContext = *context;
Kenton Varda's avatar
Kenton Varda committed
2331 2332
    }

2333 2334
    auto promiseAndPipeline = startCall(
        call.getInterfaceId(), call.getMethodId(), kj::mv(capability), context->addRef());
Kenton Varda's avatar
Kenton Varda committed
2335

2336
    // Things may have changed -- in particular if startCall() immediately called
Kenton Varda's avatar
Kenton Varda committed
2337 2338 2339
    // context->directTailCall().

    {
2340
      auto& answer = answers[answerId];
Kenton Varda's avatar
Kenton Varda committed
2341

2342 2343
      answer.pipeline = kj::mv(promiseAndPipeline.pipeline);

2344
      if (redirectResults) {
Kenton Varda's avatar
Kenton Varda committed
2345
        auto resultsPromise = promiseAndPipeline.promise.then(
2346 2347 2348
            kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
              return context->consumeRedirectedResponse();
            }));
Kenton Varda's avatar
Kenton Varda committed
2349 2350

        // If the call that later picks up `redirectedResults` decides to discard it, we need to
2351
        // make sure our call is not itself canceled unless it has called allowCancellation().
Kenton Varda's avatar
Kenton Varda committed
2352 2353
        // So we fork the promise and join one branch with the cancellation promise, in order to
        // hold on to it.
2354
        auto forked = resultsPromise.fork();
Kenton Varda's avatar
Kenton Varda committed
2355 2356
        answer.redirectedResults = forked.addBranch();

2357 2358 2359
        cancelPaf.promise
            .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
            .detach([](kj::Exception&&) {});
2360 2361 2362 2363 2364
      } else {
        // Hack:  Both the success and error continuations need to use the context.  We could
        //   refcount, but both will be destroyed at the same time anyway.
        RpcCallContext* contextPtr = context;

2365
        promiseAndPipeline.promise.then(
2366 2367 2368 2369
            [contextPtr]() {
              contextPtr->sendReturn();
            }, [contextPtr](kj::Exception&& exception) {
              contextPtr->sendErrorReturn(kj::mv(exception));
2370
            }).catch_([&](kj::Exception&& exception) {
2371 2372
              // Handle exceptions that occur in sendReturn()/sendErrorReturn().
              taskFailed(kj::mv(exception));
2373 2374 2375
            }).attach(kj::mv(context))
            .exclusiveJoin(kj::mv(cancelPaf.promise))
            .detach([](kj::Exception&&) {});
2376
      }
2377 2378 2379
    }
  }

2380 2381 2382 2383 2384 2385 2386 2387
  ClientHook::VoidPromiseAndPipeline startCall(
      uint64_t interfaceId, uint64_t methodId,
      kj::Own<ClientHook>&& capability, kj::Own<CallContextHook>&& context) {
    if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
      KJ_IF_MAYBE(g, gateway) {
        // Wait, this is a call to Persistent.save() and we need to translate it through our
        // gateway.

2388 2389 2390 2391 2392 2393 2394 2395 2396
        KJ_IF_MAYBE(resolvedPromise, capability->whenMoreResolved()) {
          // The plot thickens: We're looking at a promise capability. It could end up resolving
          // to a capability outside the gateway, in which case we don't want to translate at all.

          auto promises = resolvedPromise->then(kj::mvCapture(context,
              [this,interfaceId,methodId](kj::Own<CallContextHook>&& context,
                                          kj::Own<ClientHook> resolvedCap) {
            auto vpap = startCall(interfaceId, methodId, kj::mv(resolvedCap), kj::mv(context));
            return kj::tuple(kj::mv(vpap.promise), kj::mv(vpap.pipeline));
2397
          })).attach(addRef(*this), kj::mv(capability)).split();
2398 2399 2400 2401 2402 2403 2404 2405 2406 2407 2408 2409 2410 2411 2412

          return {
            kj::mv(kj::get<0>(promises)),
            newLocalPromisePipeline(kj::mv(kj::get<1>(promises))),
          };
        }

        if (capability->getBrand() == this) {
          // This capability is one of our own, pointing back out over the network. That means
          // that it would be inappropriate to apply the gateway transformation. We just want to
          // reflect the call back.
          return kj::downcast<RpcClient>(*capability)
              .callNoIntercept(interfaceId, methodId, kj::mv(context));
        }

2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427 2428
        auto params = context->getParams().getAs<Persistent<>::SaveParams>();

        auto requestSize = params.totalSize();
        ++requestSize.capCount;
        requestSize.wordCount += sizeInWords<RealmGateway<>::ExportParams>();

        auto request = g->exportRequest(requestSize);
        request.setCap(Persistent<>::Client(capability->addRef()));
        request.setParams(params);

        context->allowCancellation();
        context->releaseParams();
        return context->directTailCall(RequestHook::from(kj::mv(request)));
      }
    }

2429
    return capability->call(interfaceId, methodId, kj::mv(context));
2430 2431
  }

2432
  kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
Kenton Varda's avatar
Kenton Varda committed
2433
    switch (target.which()) {
2434 2435
      case rpc::MessageTarget::IMPORTED_CAP: {
        KJ_IF_MAYBE(exp, exports.find(target.getImportedCap())) {
Kenton Varda's avatar
Kenton Varda committed
2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446
          return exp->clientHook->addRef();
        } else {
          KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
            return nullptr;
          }
        }
        break;
      }

      case rpc::MessageTarget::PROMISED_ANSWER: {
        auto promisedAnswer = target.getPromisedAnswer();
2447
        kj::Own<PipelineHook> pipeline;
Kenton Varda's avatar
Kenton Varda committed
2448

2449 2450 2451 2452 2453 2454 2455
        auto& base = answers[promisedAnswer.getQuestionId()];
        KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
          return nullptr;
        }
        KJ_IF_MAYBE(p, base.pipeline) {
          pipeline = p->get()->addRef();
        } else {
2456 2457
          pipeline = newBrokenPipeline(KJ_EXCEPTION(FAILED,
              "Pipeline call on a request that returned no capabilities or was already closed."));
Kenton Varda's avatar
Kenton Varda committed
2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472
        }

        KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
          return pipeline->getPipelinedCap(*ops);
        } else {
          // Exception already thrown.
          return nullptr;
        }
      }

      default:
        KJ_FAIL_REQUIRE("Unknown message target type.", target) {
          return nullptr;
        }
    }
2473 2474

    KJ_UNREACHABLE;
Kenton Varda's avatar
Kenton Varda committed
2475 2476
  }

Kenton Varda's avatar
Kenton Varda committed
2477
  void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
Kenton Varda's avatar
Kenton Varda committed
2478 2479
    // Transitive destructors can end up manipulating the question table and invalidating our
    // pointer into it, so make sure these destructors run later.
2480 2481
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2482
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;
2483

2484
    KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
2485 2486
      KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
      question->isAwaitingReturn = false;
Kenton Varda's avatar
Kenton Varda committed
2487

2488 2489
      if (ret.getReleaseParamCaps()) {
        exportsToRelease = kj::mv(question->paramExports);
Kenton Varda's avatar
Kenton Varda committed
2490
      } else {
2491
        question->paramExports = nullptr;
Kenton Varda's avatar
Kenton Varda committed
2492
      }
2493

2494 2495
      KJ_IF_MAYBE(questionRef, question->selfRef) {
        switch (ret.which()) {
2496
          case rpc::Return::RESULTS: {
2497 2498 2499
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2500
            }
Kenton Varda's avatar
Kenton Varda committed
2501

2502
            auto payload = ret.getResults();
2503
            auto capTableArray = receiveCaps(payload.getCapTable());
2504
            questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
2505 2506
                *this, kj::addRef(*questionRef), kj::mv(message),
                kj::mv(capTableArray), payload.getContent()));
2507
            break;
2508
          }
2509

2510 2511 2512 2513
          case rpc::Return::EXCEPTION:
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2514
            }
2515 2516 2517 2518 2519 2520 2521 2522 2523 2524 2525 2526

            questionRef->reject(toException(ret.getException()));
            break;

          case rpc::Return::CANCELED:
            KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
            break;

          case rpc::Return::RESULTS_SENT_ELSEWHERE:
            KJ_REQUIRE(question->isTailCall,
                "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
              return;
2527 2528
            }

2529 2530 2531 2532
            // Tail calls are fulfilled with a null pointer.
            questionRef->fulfill(kj::Own<RpcResponse>());
            break;

2533 2534
          case rpc::Return::TAKE_FROM_OTHER_QUESTION:
            KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2535 2536 2537
              KJ_IF_MAYBE(response, answer->redirectedResults) {
                questionRef->fulfill(kj::mv(*response));
              } else {
2538
                KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` referenced a call that did not "
2539
                                "use `sendResultsTo.yourself`.") { return; }
2540 2541
              }
            } else {
2542
              KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` had invalid answer ID.") { return; }
2543 2544
            }

2545
            break;
2546

2547 2548 2549 2550
          default:
            KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
        }
      } else {
2551
        if (ret.isTakeFromOtherQuestion()) {
2552
          // Be sure to release the tail call's promise.
2553
          KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2554 2555 2556
            promiseToRelease = kj::mv(answer->redirectedResults);
          }
        }
Kenton Varda's avatar
Kenton Varda committed
2557

2558 2559
        // Looks like this question was canceled earlier, so `Finish` was already sent, with
        // `releaseResultCaps` set true so that we don't have to release them here.  We can go
2560
        // ahead and delete it from the table.
2561
        questions.erase(ret.getAnswerId(), *question);
Kenton Varda's avatar
Kenton Varda committed
2562
      }
Kenton Varda's avatar
Kenton Varda committed
2563

Kenton Varda's avatar
Kenton Varda committed
2564 2565 2566 2567 2568
    } else {
      KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2569
  void handleFinish(const rpc::Finish::Reader& finish) {
Kenton Varda's avatar
Kenton Varda committed
2570 2571
    // Delay release of these things until return so that transitive destructors don't accidentally
    // modify the answer table and invalidate our pointer into it.
2572 2573
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2574
    Answer answerToRelease;
2575
    kj::Maybe<kj::Own<PipelineHook>> pipelineToRelease;
Kenton Varda's avatar
Kenton Varda committed
2576

2577
    KJ_IF_MAYBE(answer, answers.find(finish.getQuestionId())) {
2578
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2579

2580 2581 2582 2583
      if (finish.getReleaseResultCaps()) {
        exportsToRelease = kj::mv(answer->resultExports);
      } else {
        answer->resultExports = nullptr;
2584
      }
Kenton Varda's avatar
Kenton Varda committed
2585

2586 2587
      pipelineToRelease = kj::mv(answer->pipeline);

2588 2589 2590 2591 2592
      // If the call isn't actually done yet, cancel it.  Otherwise, we can go ahead and erase the
      // question from the table.
      KJ_IF_MAYBE(context, answer->callContext) {
        context->requestCancel();
      } else {
Kenton Varda's avatar
Kenton Varda committed
2593
        answerToRelease = answers.erase(finish.getQuestionId());
2594
      }
Kenton Varda's avatar
Kenton Varda committed
2595
    } else {
2596
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2597
    }
2598 2599
  }

2600 2601 2602
  // ---------------------------------------------------------------------------
  // Level 1

2603
  void handleResolve(const rpc::Resolve::Reader& resolve) {
2604
    kj::Own<ClientHook> replacement;
2605
    kj::Maybe<kj::Exception> exception;
2606 2607 2608 2609

    // Extract the replacement capability.
    switch (resolve.which()) {
      case rpc::Resolve::CAP:
2610 2611 2612 2613 2614
        KJ_IF_MAYBE(cap, receiveCap(resolve.getCap())) {
          replacement = kj::mv(*cap);
        } else {
          KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
        }
2615 2616 2617
        break;

      case rpc::Resolve::EXCEPTION:
2618 2619 2620 2621
        // We can't set `replacement` to a new broken cap here because this will confuse
        // PromiseClient::Resolve() into thinking that the remote promise resolved to a local
        // capability and therefore a Disembargo is needed. We must actually reject the promise.
        exception = toException(resolve.getException());
2622 2623 2624 2625 2626 2627 2628
        break;

      default:
        KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
    }

    // If the import is on the table, fulfill it.
2629
    KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
2630 2631
      KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
        // OK, this is in fact an unfulfilled promise!
2632 2633 2634 2635 2636
        KJ_IF_MAYBE(e, exception) {
          fulfiller->get()->reject(kj::mv(*e));
        } else {
          fulfiller->get()->fulfill(kj::mv(replacement));
        }
2637 2638 2639 2640 2641 2642 2643 2644
      } else if (import->importClient != nullptr) {
        // It appears this is a valid entry on the import table, but was not expected to be a
        // promise.
        KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
      }
    }
  }

2645
  void handleRelease(const rpc::Release::Reader& release) {
Kenton Varda's avatar
Kenton Varda committed
2646
    releaseExport(release.getId(), release.getReferenceCount());
2647 2648
  }

Kenton Varda's avatar
Kenton Varda committed
2649
  void releaseExport(ExportId id, uint refcount) {
2650
    KJ_IF_MAYBE(exp, exports.find(id)) {
2651
      KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
Kenton Varda's avatar
Kenton Varda committed
2652
        return;
2653 2654 2655 2656
      }

      exp->refcount -= refcount;
      if (exp->refcount == 0) {
2657
        exportsByCap.erase(exp->clientHook);
2658
        exports.erase(id, *exp);
2659 2660 2661
      }
    } else {
      KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
Kenton Varda's avatar
Kenton Varda committed
2662
        return;
2663 2664 2665 2666
      }
    }
  }

2667 2668 2669 2670 2671 2672
  void releaseExports(kj::ArrayPtr<ExportId> exports) {
    for (auto exportId: exports) {
      releaseExport(exportId, 1);
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2673 2674 2675 2676
  void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
    auto context = disembargo.getContext();
    switch (context.which()) {
      case rpc::Disembargo::Context::SENDER_LOOPBACK: {
2677
        kj::Own<ClientHook> target;
Kenton Varda's avatar
Kenton Varda committed
2678 2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689 2690 2691 2692 2693 2694 2695 2696 2697 2698 2699

        KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
          target = kj::mv(*t);
        } else {
          // Exception already reported.
          return;
        }

        for (;;) {
          KJ_IF_MAYBE(r, target->getResolved()) {
            target = r->addRef();
          } else {
            break;
          }
        }

        KJ_REQUIRE(target->getBrand() == this,
                   "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
                   "back to the sender.") {
          return;
        }

Kenton Varda's avatar
Kenton Varda committed
2700 2701 2702 2703
        EmbargoId embargoId = context.getSenderLoopback();

        // We need to insert an evalLater() here to make sure that any pending calls towards this
        // cap have had time to find their way through the event loop.
2704 2705
        tasks.add(kj::evalLater(kj::mvCapture(
            target, [this,embargoId](kj::Own<ClientHook>&& target) {
2706 2707 2708 2709
          if (!connection.is<Connected>()) {
            return;
          }

2710
          RpcClient& downcasted = kj::downcast<RpcClient>(*target);
Kenton Varda's avatar
Kenton Varda committed
2711

2712
          auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
2713 2714 2715 2716 2717 2718
              messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
          auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();

          {
            auto redirect = downcasted.writeTarget(builder.initTarget());

2719
            // Disembargoes should only be sent to capabilities that were previously the subject of
Kenton Varda's avatar
Kenton Varda committed
2720
            // a `Resolve` message.  But `writeTarget` only ever returns non-null when called on
2721 2722 2723
            // a PromiseClient.  The code which sends `Resolve` and `Return` should have replaced
            // any promise with a direct node in order to solve the Tribble 4-way race condition.
            // See the documentation of Disembargo in rpc.capnp for more.
Kenton Varda's avatar
Kenton Varda committed
2724 2725
            KJ_REQUIRE(redirect == nullptr,
                       "'Disembargo' of type 'senderLoopback' sent to an object that does not "
2726
                       "appear to have been the subject of a previous 'Resolve' message.") {
Kenton Varda's avatar
Kenton Varda committed
2727 2728
              return;
            }
Kenton Varda's avatar
Kenton Varda committed
2729 2730
          }

Kenton Varda's avatar
Kenton Varda committed
2731
          builder.getContext().setReceiverLoopback(embargoId);
Kenton Varda's avatar
Kenton Varda committed
2732

Kenton Varda's avatar
Kenton Varda committed
2733 2734
          message->send();
        })));
Kenton Varda's avatar
Kenton Varda committed
2735 2736 2737 2738

        break;
      }

Kenton Varda's avatar
Kenton Varda committed
2739
      case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
2740
        KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
Kenton Varda's avatar
Kenton Varda committed
2741
          KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
2742
          embargoes.erase(context.getReceiverLoopback(), *embargo);
Kenton Varda's avatar
Kenton Varda committed
2743 2744 2745 2746 2747 2748
        } else {
          KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
            return;
          }
        }
        break;
Kenton Varda's avatar
Kenton Varda committed
2749
      }
Kenton Varda's avatar
Kenton Varda committed
2750 2751 2752 2753 2754 2755

      default:
        KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
    }
  }

2756 2757
  // ---------------------------------------------------------------------------
  // Level 2
2758 2759 2760 2761
};

}  // namespace

2762
class RpcSystemBase::Impl final: private BootstrapFactoryBase, private kj::TaskSet::ErrorHandler {
2763
public:
2764 2765 2766
  Impl(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface,
       kj::Maybe<RealmGateway<>::Client> gateway)
      : network(network), bootstrapInterface(kj::mv(bootstrapInterface)),
2767 2768 2769 2770 2771 2772
        bootstrapFactory(*this), gateway(kj::mv(gateway)), tasks(*this) {
    tasks.add(acceptLoop());
  }
  Impl(VatNetworkBase& network, BootstrapFactoryBase& bootstrapFactory,
       kj::Maybe<RealmGateway<>::Client> gateway)
      : network(network), bootstrapFactory(bootstrapFactory),
2773
        gateway(kj::mv(gateway)), tasks(*this) {
2774 2775 2776
    tasks.add(acceptLoop());
  }
  Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2777
      : network(network), bootstrapFactory(*this), restorer(restorer), tasks(*this) {
2778 2779 2780 2781
    tasks.add(acceptLoop());
  }

  ~Impl() noexcept(false) {
2782 2783 2784 2785 2786
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // std::unordered_map doesn't like it when elements' destructors throw, so carefully
      // disassemble it.
      if (!connections.empty()) {
        kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
2787
        kj::Exception shutdownException = KJ_EXCEPTION(FAILED, "RpcSystem was destroyed.");
2788 2789 2790 2791
        for (auto& entry: connections) {
          entry.second->disconnect(kj::cp(shutdownException));
          deleteMe.add(kj::mv(entry.second));
        }
2792
      }
2793
    });
2794
  }
2795

2796
  Capability::Client bootstrap(AnyStruct::Reader vatId) {
2797 2798 2799 2800 2801
    // For now we delegate to restore() since it's equivalent, but eventually we'll remove restore()
    // and implement bootstrap() directly.
    return restore(vatId, AnyPointer::Reader());
  }

2802
  Capability::Client restore(AnyStruct::Reader vatId, AnyPointer::Reader objectId) {
2803
    KJ_IF_MAYBE(connection, network.baseConnect(vatId)) {
2804
      auto& state = getConnectionState(kj::mv(*connection));
2805 2806 2807 2808 2809 2810 2811
      return Capability::Client(state.restore(objectId));
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(objectId);
    } else {
      return Capability::Client(newBrokenCap(
          "SturdyRef referred to a local object but there is no local SturdyRef restorer."));
    }
2812 2813
  }

2814 2815 2816 2817 2818 2819 2820 2821
  void setFlowLimit(size_t words) {
    flowLimit = words;

    for (auto& conn: connections) {
      conn.second->setFlowLimit(words);
    }
  }

2822 2823
private:
  VatNetworkBase& network;
2824
  kj::Maybe<Capability::Client> bootstrapInterface;
2825
  BootstrapFactoryBase& bootstrapFactory;
2826
  kj::Maybe<RealmGateway<>::Client> gateway;
2827
  kj::Maybe<SturdyRefRestorerBase&> restorer;
2828
  size_t flowLimit = kj::maxValue;
2829 2830 2831 2832
  kj::TaskSet tasks;

  typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
      ConnectionMap;
2833
  ConnectionMap connections;
Kenton Varda's avatar
Kenton Varda committed
2834

2835 2836
  kj::UnwindDetector unwindDetector;

2837 2838 2839
  RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
    auto iter = connections.find(connection);
    if (iter == connections.end()) {
2840
      VatNetworkBase::Connection* connectionPtr = connection;
Kenton Varda's avatar
Kenton Varda committed
2841 2842 2843
      auto onDisconnect = kj::newPromiseAndFulfiller<RpcConnectionState::DisconnectInfo>();
      tasks.add(onDisconnect.promise
          .then([this,connectionPtr](RpcConnectionState::DisconnectInfo info) {
2844
        connections.erase(connectionPtr);
Kenton Varda's avatar
Kenton Varda committed
2845
        tasks.add(kj::mv(info.shutdownPromise));
2846 2847
      }));
      auto newState = kj::refcounted<RpcConnectionState>(
2848
          bootstrapFactory, gateway, restorer, kj::mv(connection),
2849
          kj::mv(onDisconnect.fulfiller), flowLimit);
2850
      RpcConnectionState& result = *newState;
2851
      connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
2852 2853 2854 2855 2856 2857 2858
      return result;
    } else {
      return *iter->second;
    }
  }

  kj::Promise<void> acceptLoop() {
2859
    auto receive = network.baseAccept().then(
2860
        [this](kj::Own<VatNetworkBase::Connection>&& connection) {
2861
      getConnectionState(kj::mv(connection));
2862 2863 2864 2865 2866 2867 2868 2869
    });
    return receive.then([this]() {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      tasks.add(acceptLoop());
    });
2870
  }
2871 2872 2873 2874 2875 2876 2877 2878 2879 2880 2881 2882 2883 2884 2885 2886 2887

  Capability::Client baseCreateFor(AnyStruct::Reader clientId) override {
    // Implements BootstrapFactory::baseCreateFor() in terms of `bootstrapInterface` or `restorer`,
    // for use when we were given one of those instead of an actual `bootstrapFactory`.

    KJ_IF_MAYBE(cap, bootstrapInterface) {
      return *cap;
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(AnyPointer::Reader());
    } else {
      return KJ_EXCEPTION(FAILED, "This vat does not expose any public/bootstrap interfaces.");
    }
  }

  void taskFailed(kj::Exception&& exception) override {
    KJ_LOG(ERROR, exception);
  }
2888 2889
};

2890
RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
2891 2892 2893
                             kj::Maybe<Capability::Client> bootstrapInterface,
                             kj::Maybe<RealmGateway<>::Client> gateway)
    : impl(kj::heap<Impl>(network, kj::mv(bootstrapInterface), kj::mv(gateway))) {}
2894 2895 2896 2897
RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
                             BootstrapFactoryBase& bootstrapFactory,
                             kj::Maybe<RealmGateway<>::Client> gateway)
    : impl(kj::heap<Impl>(network, bootstrapFactory, kj::mv(gateway))) {}
2898
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2899
    : impl(kj::heap<Impl>(network, restorer)) {}
2900
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
2901 2902
RpcSystemBase::~RpcSystemBase() noexcept(false) {}

2903
Capability::Client RpcSystemBase::baseBootstrap(AnyStruct::Reader vatId) {
2904 2905 2906
  return impl->bootstrap(vatId);
}

2907
Capability::Client RpcSystemBase::baseRestore(
2908
    AnyStruct::Reader hostId, AnyPointer::Reader objectId) {
2909
  return impl->restore(hostId, objectId);
2910 2911
}

2912 2913 2914 2915
void RpcSystemBase::baseSetFlowLimit(size_t words) {
  return impl->setFlowLimit(words);
}

2916 2917
}  // namespace _ (private)
}  // namespace capnp