rpc.c++ 105 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22

#include "rpc.h"
23
#include "message.h"
24 25 26
#include <kj/debug.h>
#include <kj/vector.h>
#include <kj/async.h>
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/one-of.h>
Kenton Varda's avatar
Kenton Varda committed
28
#include <kj/function.h>
29
#include <functional>  // std::greater
30
#include <unordered_map>
Kenton Varda's avatar
Kenton Varda committed
31
#include <map>
32 33 34 35 36 37 38 39
#include <queue>
#include <capnp/rpc.capnp.h>

namespace capnp {
namespace _ {  // private

namespace {

Kenton Varda's avatar
Kenton Varda committed
40 41 42 43 44 45 46 47 48
template <typename T>
inline constexpr uint messageSizeHint() {
  return 1 + sizeInWords<rpc::Message>() + sizeInWords<T>();
}
template <>
inline constexpr uint messageSizeHint<void>() {
  return 1 + sizeInWords<rpc::Message>();
}

49 50 51
constexpr const uint MESSAGE_TARGET_SIZE_HINT = sizeInWords<rpc::MessageTarget>() +
    sizeInWords<rpc::PromisedAnswer>() + 16;  // +16 for ops; hope that's enough

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
constexpr const uint CAP_DESCRIPTOR_SIZE_HINT = sizeInWords<rpc::CapDescriptor>() +
    sizeInWords<rpc::PromisedAnswer>();

constexpr const uint64_t MAX_SIZE_HINT = 1 << 20;

uint copySizeHint(MessageSize size) {
  uint64_t sizeHint = size.wordCount + size.capCount * CAP_DESCRIPTOR_SIZE_HINT;
  return kj::min(MAX_SIZE_HINT, sizeHint);
}

uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) {
  KJ_IF_MAYBE(s, sizeHint) {
    return copySizeHint(*s) + additional;
  } else {
    return 0;
  }
}

Kenton Varda's avatar
Kenton Varda committed
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) {
  auto result = kj::heapArrayBuilder<PipelineOp>(ops.size());
  for (auto opReader: ops) {
    PipelineOp op;
    switch (opReader.which()) {
      case rpc::PromisedAnswer::Op::NOOP:
        op.type = PipelineOp::NOOP;
        break;
      case rpc::PromisedAnswer::Op::GET_POINTER_FIELD:
        op.type = PipelineOp::GET_POINTER_FIELD;
        op.pointerIndex = opReader.getGetPointerField();
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) {
          return nullptr;
        }
    }
87
    result.add(op);
Kenton Varda's avatar
Kenton Varda committed
88 89 90 91 92
  }
  return result.finish();
}

Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps(
Kenton Varda's avatar
Kenton Varda committed
93
    Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) {
Kenton Varda's avatar
Kenton Varda committed
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
  auto result = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size());
  auto builder = result.get();
  for (uint i: kj::indices(ops)) {
    rpc::PromisedAnswer::Op::Builder opBuilder = builder[i];
    switch (ops[i].type) {
      case PipelineOp::NOOP:
        opBuilder.setNoop();
        break;
      case PipelineOp::GET_POINTER_FIELD:
        opBuilder.setGetPointerField(ops[i].pointerIndex);
        break;
    }
  }
  return result;
}

Kenton Varda's avatar
Kenton Varda committed
110
kj::Exception toException(const rpc::Exception::Reader& exception) {
111 112
  return kj::Exception(static_cast<kj::Exception::Type>(exception.getType()),
      "(remote)", 0, kj::str("remote exception: ", exception.getReason()));
Kenton Varda's avatar
Kenton Varda committed
113 114 115
}

void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
Kenton Varda's avatar
Kenton Varda committed
116 117 118
  // TODO(someday):  Indicate the remote server name as part of the stack trace.  Maybe even
  //   transmit stack traces?
  builder.setReason(exception.getDescription());
119
  builder.setType(static_cast<rpc::Exception::Type>(exception.getType()));
Kenton Varda's avatar
Kenton Varda committed
120 121 122 123 124

  if (exception.getType() == kj::Exception::Type::FAILED &&
      !exception.getDescription().startsWith("remote exception:")) {
    KJ_LOG(INFO, "returning failure over rpc", exception);
  }
Kenton Varda's avatar
Kenton Varda committed
125 126
}

Kenton Varda's avatar
Kenton Varda committed
127
uint exceptionSizeHint(const kj::Exception& exception) {
Kenton Varda's avatar
Kenton Varda committed
128
  return sizeInWords<rpc::Exception>() + exception.getDescription().size() / sizeof(word) + 1;
Kenton Varda's avatar
Kenton Varda committed
129 130
}

Kenton Varda's avatar
Kenton Varda committed
131
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
132

133 134 135 136 137 138 139 140 141 142 143 144 145
template <typename Id, typename T>
class ExportTable {
  // Table mapping integers to T, where the integers are chosen locally.

public:
  kj::Maybe<T&> find(Id id) {
    if (id < slots.size() && slots[id] != nullptr) {
      return slots[id];
    } else {
      return nullptr;
    }
  }

146
  T erase(Id id, T& entry) {
Kenton Varda's avatar
Kenton Varda committed
147 148
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
149 150 151 152 153 154 155 156
    // `entry` is a reference to the entry being released -- we require this in order to prove
    // that the caller has already done a find() to check that this entry exists.  We can't check
    // ourselves because the caller may have nullified the entry in the meantime.
    KJ_DREQUIRE(&entry == &slots[id]);
    T toRelease = kj::mv(slots[id]);
    slots[id] = T();
    freeIds.push(id);
    return toRelease;
157 158 159 160 161 162 163 164 165 166 167 168 169
  }

  T& next(Id& id) {
    if (freeIds.empty()) {
      id = slots.size();
      return slots.add();
    } else {
      id = freeIds.top();
      freeIds.pop();
      return slots[id];
    }
  }

170 171 172 173 174 175 176 177 178
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i = 0; i < slots.size(); i++) {
      if (slots[i] != nullptr) {
        func(i, slots[i]);
      }
    }
  }

179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
private:
  kj::Vector<T> slots;
  std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds;
};

template <typename Id, typename T>
class ImportTable {
  // Table mapping integers to T, where the integers are chosen remotely.

public:
  T& operator[](Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      return high[id];
    }
  }

Kenton Varda's avatar
Kenton Varda committed
197 198 199 200 201
  kj::Maybe<T&> find(Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      auto iter = high.find(id);
Kenton Varda's avatar
Kenton Varda committed
202
      if (iter == high.end()) {
Kenton Varda's avatar
Kenton Varda committed
203 204 205 206 207 208 209
        return nullptr;
      } else {
        return iter->second;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
210 211 212
  T erase(Id id) {
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
213
    if (id < kj::size(low)) {
Kenton Varda's avatar
Kenton Varda committed
214
      T toRelease = kj::mv(low[id]);
215
      low[id] = T();
Kenton Varda's avatar
Kenton Varda committed
216
      return toRelease;
217
    } else {
Kenton Varda's avatar
Kenton Varda committed
218
      T toRelease = kj::mv(high[id]);
219
      high.erase(id);
Kenton Varda's avatar
Kenton Varda committed
220
      return toRelease;
221 222 223
    }
  }

224 225 226 227 228 229 230 231 232 233
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i: kj::indices(low)) {
      func(i, low[i]);
    }
    for (auto& entry: high) {
      func(entry.first, entry.second);
    }
  }

234 235 236 237 238
private:
  T low[16];
  std::unordered_map<Id, T> high;
};

Kenton Varda's avatar
Kenton Varda committed
239 240
// =======================================================================================

241
class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted {
242
public:
Kenton Varda's avatar
Kenton Varda committed
243 244 245 246 247
  struct DisconnectInfo {
    kj::Promise<void> shutdownPromise;
    // Task which is working on sending an abort message and cleanly ending the connection.
  };

248
  RpcConnectionState(BootstrapFactoryBase& bootstrapFactory,
249
                     kj::Maybe<RealmGateway<>::Client> gateway,
250
                     kj::Maybe<SturdyRefRestorerBase&> restorer,
251
                     kj::Own<VatNetworkBase::Connection>&& connectionParam,
252 253
                     kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller,
                     size_t flowLimit)
254
      : bootstrapFactory(bootstrapFactory), gateway(kj::mv(gateway)),
255 256
        restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), flowLimit(flowLimit),
        tasks(*this) {
257
    connection.init<Connected>(kj::mv(connectionParam));
258 259 260
    tasks.add(messageLoop());
  }

261
  kj::Own<ClientHook> restore(AnyPointer::Reader objectId) {
262 263 264 265
    if (connection.is<Disconnected>()) {
      return newBrokenCap(kj::cp(connection.get<Disconnected>()));
    }

266
    QuestionId questionId;
267
    auto& question = questions.next(questionId);
268

269
    question.isAwaitingReturn = true;
270

271
    auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
Kenton Varda's avatar
Kenton Varda committed
272

273 274
    auto questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller));
    question.selfRef = *questionRef;
Kenton Varda's avatar
Kenton Varda committed
275

276
    paf.promise = paf.promise.attach(kj::addRef(*questionRef));
277 278

    {
279
      auto message = connection.get<Connected>()->newOutgoingMessage(
280
          objectId.targetSize().wordCount + messageSizeHint<rpc::Bootstrap>());
281

282
      auto builder = message->getBody().initAs<rpc::Message>().initBootstrap();
283
      builder.setQuestionId(questionId);
284
      builder.getDeprecatedObjectId().set(objectId);
285 286 287 288

      message->send();
    }

289
    auto pipeline = kj::refcounted<RpcPipeline>(*this, kj::mv(questionRef), kj::mv(paf.promise));
290

291
    return pipeline->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
Kenton Varda's avatar
Kenton Varda committed
292 293
  }

294
  void taskFailed(kj::Exception&& exception) override {
295 296 297 298
    disconnect(kj::mv(exception));
  }

  void disconnect(kj::Exception&& exception) {
299 300 301 302 303
    if (!connection.is<Connected>()) {
      // Already disconnected.
      return;
    }

304 305
    kj::Exception networkException(kj::Exception::Type::DISCONNECTED,
        exception.getFile(), exception.getLine(), kj::heapString(exception.getDescription()));
306 307

    KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
308 309
      // Carefully pull all the objects out of the tables prior to releasing them because their
      // destructors could come back and mess with the tables.
310 311 312 313
      kj::Vector<kj::Own<PipelineHook>> pipelinesToRelease;
      kj::Vector<kj::Own<ClientHook>> clientsToRelease;
      kj::Vector<kj::Promise<kj::Own<RpcResponse>>> tailCallsToRelease;
      kj::Vector<kj::Promise<void>> resolveOpsToRelease;
314 315

      // All current questions complete with exceptions.
316
      questions.forEach([&](QuestionId id, Question& question) {
Kenton Varda's avatar
Kenton Varda committed
317
        KJ_IF_MAYBE(questionRef, question.selfRef) {
318 319
          // QuestionRef still present.
          questionRef->reject(kj::cp(networkException));
Kenton Varda's avatar
Kenton Varda committed
320
        }
321 322
      });

323
      answers.forEach([&](AnswerId id, Answer& answer) {
324 325 326 327
        KJ_IF_MAYBE(p, answer.pipeline) {
          pipelinesToRelease.add(kj::mv(*p));
        }

328
        KJ_IF_MAYBE(promise, answer.redirectedResults) {
329
          tailCallsToRelease.add(kj::mv(*promise));
330 331
        }

332 333 334 335 336
        KJ_IF_MAYBE(context, answer.callContext) {
          context->requestCancel();
        }
      });

337
      exports.forEach([&](ExportId id, Export& exp) {
338
        clientsToRelease.add(kj::mv(exp.clientHook));
339
        resolveOpsToRelease.add(kj::mv(exp.resolveOp));
340 341 342
        exp = Export();
      });

343
      imports.forEach([&](ImportId id, Import& import) {
344 345
        KJ_IF_MAYBE(f, import.promiseFulfiller) {
          f->get()->reject(kj::cp(networkException));
346 347 348
        }
      });

349
      embargoes.forEach([&](EmbargoId id, Embargo& embargo) {
350 351 352 353
        KJ_IF_MAYBE(f, embargo.fulfiller) {
          f->get()->reject(kj::cp(networkException));
        }
      });
354 355 356 357 358
    })) {
      // Some destructor must have thrown an exception.  There is no appropriate place to report
      // these errors.
      KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
             *newException);
359 360
    }

361 362 363
    // Send an abort message, but ignore failure.
    kj::runCatchingExceptions([&]() {
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
364
          messageSizeHint<void>() + exceptionSizeHint(exception));
365 366
      fromException(exception, message->getBody().getAs<rpc::Message>().initAbort());
      message->send();
367
    });
368 369

    // Indicate disconnect.
370 371 372 373 374 375 376 377 378 379 380
    auto shutdownPromise = connection.get<Connected>()->shutdown()
        .attach(kj::mv(connection.get<Connected>()))
        .then([]() -> kj::Promise<void> { return kj::READY_NOW; },
              [](kj::Exception&& e) -> kj::Promise<void> {
          // Don't report disconnects as an error.
          if (e.getType() != kj::Exception::Type::DISCONNECTED) {
            return kj::mv(e);
          }
          return kj::READY_NOW;
        });
    disconnectFulfiller->fulfill(DisconnectInfo { kj::mv(shutdownPromise) });
381
    connection.init<Disconnected>(kj::mv(networkException));
382 383
  }

384 385 386 387 388
  void setFlowLimit(size_t words) {
    flowLimit = words;
    maybeUnblockFlow();
  }

389
private:
390
  class RpcClient;
Kenton Varda's avatar
Kenton Varda committed
391
  class ImportClient;
392
  class PromiseClient;
Kenton Varda's avatar
Kenton Varda committed
393
  class QuestionRef;
Kenton Varda's avatar
Kenton Varda committed
394
  class RpcPipeline;
Kenton Varda's avatar
Kenton Varda committed
395
  class RpcCallContext;
Kenton Varda's avatar
Kenton Varda committed
396
  class RpcResponse;
Kenton Varda's avatar
Kenton Varda committed
397

398 399 400 401 402 403
  // =======================================================================================
  // The Four Tables entry types
  //
  // We have to define these before we can define the class's fields.

  typedef uint32_t QuestionId;
404
  typedef QuestionId AnswerId;
405
  typedef uint32_t ExportId;
406 407 408 409 410 411 412 413 414 415
  typedef ExportId ImportId;
  // See equivalent definitions in rpc.capnp.
  //
  // We always use the type that refers to the local table of the same name.  So e.g. although
  // QuestionId and AnswerId are the same type, we use QuestionId when referring to an entry in
  // the local question table (which corresponds to the peer's answer table) and use AnswerId
  // to refer to an entry in our answer table (which corresponds to the peer's question table).
  // Since all messages in the RPC protocol are defined from the sender's point of view, this
  // means that any time we read an ID from a received message, its type should invert.
  // TODO(cleanup):  Perhaps we could enforce that in a type-safe way?  Hmm...
416 417

  struct Question {
418 419 420
    kj::Array<ExportId> paramExports;
    // List of exports that were sent in the request.  If the response has `releaseParamCaps` these
    // will need to be released.
421

Kenton Varda's avatar
Kenton Varda committed
422 423 424
    kj::Maybe<QuestionRef&> selfRef;
    // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    // sent.
425

426 427 428
    bool isAwaitingReturn = false;
    // True from when `Call` is sent until `Return` is received.

429 430 431
    bool isTailCall = false;
    // Is this a tail call?  If so, we don't expect to receive results in the `Return`.

Kenton Varda's avatar
Kenton Varda committed
432
    inline bool operator==(decltype(nullptr)) const {
433
      return !isAwaitingReturn && selfRef == nullptr;
Kenton Varda's avatar
Kenton Varda committed
434 435
    }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
436 437 438
  };

  struct Answer {
439 440 441 442 443 444
    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

445 446 447 448
    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

449
    kj::Maybe<kj::Own<PipelineHook>> pipeline;
450 451
    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.

452
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
453 454
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.
455

456
    kj::Maybe<RpcCallContext&> callContext;
457 458
    // The call context, if it's still active.  Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.
459

460 461 462
    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
    // will need to be released.
463 464 465 466 467 468
  };

  struct Export {
    uint refcount = 0;
    // When this reaches 0, drop `clientHook` and free this export.

469
    kj::Own<ClientHook> clientHook;
470

471 472 473 474
    kj::Promise<void> resolveOp = nullptr;
    // If this export is a promise (not a settled capability), the `resolveOp` represents the
    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.

475 476 477 478 479 480 481 482 483 484 485
    inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
  };

  struct Import {
    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

486
    kj::Maybe<ImportClient&> importClient;
487 488
    // Becomes null when the import is destroyed.

489 490 491 492 493
    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

494
    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
495 496 497
    // If non-null, the import is a promise.
  };

Kenton Varda's avatar
Kenton Varda committed
498 499 500 501 502 503 504 505 506 507 508 509
  typedef uint32_t EmbargoId;

  struct Embargo {
    // For handling the `Disembargo` message when looping back to self.

    kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    // Fulfill this when the Disembargo arrives.

    inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    inline bool operator!=(decltype(nullptr)) const { return fulfiller != nullptr; }
  };

510 511 512
  // =======================================================================================
  // OK, now we can define RpcConnectionState's member data.

513
  BootstrapFactoryBase& bootstrapFactory;
514
  kj::Maybe<RealmGateway<>::Client> gateway;
515
  kj::Maybe<SturdyRefRestorerBase&> restorer;
516 517 518 519 520 521 522

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

Kenton Varda's avatar
Kenton Varda committed
523
  kj::Own<kj::PromiseFulfiller<DisconnectInfo>> disconnectFulfiller;
524

525 526
  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
527 528
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
529 530 531 532 533 534 535 536 537
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables.  This definitely isn't a fifth table.  I don't know what you're
  // talking about.
538

539 540 541 542 543 544 545
  size_t flowLimit;
  size_t callWordsInFlight = 0;

  kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> flowWaiter;
  // If non-null, we're currently blocking incoming messages waiting for callWordsInFlight to drop
  // below flowLimit. Fulfill this to un-block.

546 547 548
  kj::TaskSet tasks;

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
549
  // ClientHook implementations
550

Kenton Varda's avatar
Kenton Varda committed
551
  class RpcClient: public ClientHook, public kj::Refcounted {
552
  public:
553
    RpcClient(RpcConnectionState& connectionState)
554
        : connectionState(kj::addRef(connectionState)) {}
555

556 557 558 559
    virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) = 0;
    // Writes a CapDescriptor referencing this client.  The CapDescriptor must be sent as part of
    // the very next message sent on the connection, as it may become invalid if other things
    // happen.
Kenton Varda's avatar
Kenton Varda committed
560 561 562 563
    //
    // If writing the descriptor adds a new export to the export table, or increments the refcount
    // on an existing one, then the ID is returned and the caller is responsible for removing it
    // later.
Kenton Varda's avatar
Kenton Varda committed
564

565 566
    virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) = 0;
Kenton Varda's avatar
Kenton Varda committed
567
    // Writes the appropriate call target for calls to this capability and returns null.
Kenton Varda's avatar
Kenton Varda committed
568
    //
Kenton Varda's avatar
Kenton Varda committed
569 570 571 572
    // - OR -
    //
    // If calls have been redirected to some other local ClientHook, returns that hook instead.
    // This can happen if the capability represents a promise that has been resolved.
Kenton Varda's avatar
Kenton Varda committed
573

574
    virtual kj::Own<ClientHook> getInnermostClient() = 0;
Kenton Varda's avatar
Kenton Varda committed
575 576 577 578
    // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    // that other client -- return a reference to the other client, transitively.  Otherwise,
    // return a new reference to *this.

Kenton Varda's avatar
Kenton Varda committed
579 580
    // implements ClientHook -----------------------------------------

581
    Request<AnyPointer, AnyPointer> newCall(
582
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
583 584 585 586 587 588 589 590 591 592 593 594
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          //
          // We pull a neat trick here: We actually end up returning a RequestHook for an import
          // request on the gateway cap, but with the "root" of the request actually pointing
          // to the "params" field of the real request.

          sizeHint = sizeHint.map([](MessageSize hint) {
            ++hint.capCount;
            hint.wordCount += sizeInWords<RealmGateway<>::ImportParams>();
595
            return hint;
596 597 598
          });

          auto request = g->importRequest(sizeHint);
599
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
600

601 602 603 604 605 606 607 608 609 610 611
          // Awkwardly, request.initParams() would return a SaveParams struct, but to construct
          // the Request<AnyPointer, AnyPointer> to return we need an AnyPointer::Builder, and you
          // can't go backwards from a struct builder to an AnyPointer builder. So instead we
          // manually get at the pointer by converting the outer request to AnyStruct and then
          // pulling the pointer from the pointer section.
          auto pointers = toAny(request).getPointerSection();
          KJ_ASSERT(pointers.size() >= 2);
          auto paramsPtr = pointers[1];
          KJ_ASSERT(paramsPtr.isNull());

          return Request<AnyPointer, AnyPointer>(paramsPtr, RequestHook::from(kj::mv(request)));
612 613 614
        }
      }

615 616 617 618 619
      return newCallNoIntercept(interfaceId, methodId, sizeHint);
    }

    Request<AnyPointer, AnyPointer> newCallNoIntercept(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) {
620 621 622 623
      if (!connectionState->connection.is<Connected>()) {
        return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
      }

624
      auto request = kj::heap<RpcRequest>(
625 626
          *connectionState, *connectionState->connection.get<Connected>(),
          sizeHint, kj::addRef(*this));
627 628 629 630 631 632
      auto callBuilder = request->getCall();

      callBuilder.setInterfaceId(interfaceId);
      callBuilder.setMethodId(methodId);

      auto root = request->getRoot();
633
      return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
634 635
    }

Kenton Varda's avatar
Kenton Varda committed
636
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
637
                                kj::Own<CallContextHook>&& context) override {
638 639 640 641 642 643 644 645 646 647 648
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          auto params = context->getParams().getAs<Persistent<>::SaveParams>();

          auto requestSize = params.totalSize();
          ++requestSize.capCount;
          requestSize.wordCount += sizeInWords<RealmGateway<>::ImportParams>();

          auto request = g->importRequest(requestSize);
649
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
650 651 652 653 654 655 656 657
          request.setParams(params);

          context->allowCancellation();
          context->releaseParams();
          return context->directTailCall(RequestHook::from(kj::mv(request)));
        }
      }

658 659 660 661 662 663 664
      return callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    VoidPromiseAndPipeline callNoIntercept(uint64_t interfaceId, uint16_t methodId,
                                           kj::Own<CallContextHook>&& context) {
      // Implement call() by copying params and results messages.

Kenton Varda's avatar
Kenton Varda committed
665
      auto params = context->getParams();
666
      auto request = newCallNoIntercept(interfaceId, methodId, params.targetSize());
Kenton Varda's avatar
Kenton Varda committed
667

668
      request.set(params);
Kenton Varda's avatar
Kenton Varda committed
669 670
      context->releaseParams();

671
      // We can and should propagate cancellation.
672
      context->allowCancellation();
673

674
      return context->directTailCall(RequestHook::from(kj::mv(request)));
Kenton Varda's avatar
Kenton Varda committed
675 676
    }

677
    kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
678 679
      return kj::addRef(*this);
    }
680
    const void* getBrand() override {
681
      return connectionState.get();
Kenton Varda's avatar
Kenton Varda committed
682 683
    }

684
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
685 686
  };

687 688 689 690
  class ImportClient final: public RpcClient {
    // A ClientHook that wraps an entry in the import table.

  public:
691
    ImportClient(RpcConnectionState& connectionState, ImportId importId)
Kenton Varda's avatar
Kenton Varda committed
692 693
        : RpcClient(connectionState), importId(importId) {}

Kenton Varda's avatar
Kenton Varda committed
694
    ~ImportClient() noexcept(false) {
695
      unwindDetector.catchExceptionsIfUnwinding([&]() {
696
        // Remove self from the import table, if the table is still pointing at us.
697 698 699 700 701
        KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
          KJ_IF_MAYBE(i, import->importClient) {
            if (i == this) {
              connectionState->imports.erase(importId);
            }
702 703
          }
        }
Kenton Varda's avatar
Kenton Varda committed
704

705
        // Send a message releasing our remote references.
706 707
        if (remoteRefcount > 0 && connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
708 709 710 711 712 713 714
              messageSizeHint<rpc::Release>());
          rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
          builder.setId(importId);
          builder.setReferenceCount(remoteRefcount);
          message->send();
        }
      });
715 716
    }

717 718 719
    void addRemoteRef() {
      // Add a new RemoteRef and return a new ref to this client representing it.
      ++remoteRefcount;
Kenton Varda's avatar
Kenton Varda committed
720
    }
721

722
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
723
      descriptor.setReceiverHosted(importId);
Kenton Varda's avatar
Kenton Varda committed
724
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
725 726
    }

727 728
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
729
      target.setImportedCap(importId);
Kenton Varda's avatar
Kenton Varda committed
730
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
731 732
    }

733
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
734 735 736
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
737
    // implements ClientHook -----------------------------------------
738

739
    kj::Maybe<ClientHook&> getResolved() override {
740 741 742
      return nullptr;
    }

743
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
744 745 746
      return nullptr;
    }

Kenton Varda's avatar
Kenton Varda committed
747
  private:
748
    ImportId importId;
Kenton Varda's avatar
Kenton Varda committed
749 750 751

    uint remoteRefcount = 0;
    // Number of times we've received this import from the peer.
752 753

    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
754 755
  };

756 757 758
  class PipelineClient final: public RpcClient {
    // A ClientHook representing a pipelined promise.  Always wrapped in PromiseClient.

Kenton Varda's avatar
Kenton Varda committed
759
  public:
760 761
    PipelineClient(RpcConnectionState& connectionState,
                   kj::Own<QuestionRef>&& questionRef,
762
                   kj::Array<PipelineOp>&& ops)
Kenton Varda's avatar
Kenton Varda committed
763
        : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}
Kenton Varda's avatar
Kenton Varda committed
764

765
   kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
766 767 768 769 770
      auto promisedAnswer = descriptor.initReceiverAnswer();
      promisedAnswer.setQuestionId(questionRef->getId());
      promisedAnswer.adoptTransform(fromPipelineOps(
          Orphanage::getForMessageContaining(descriptor), ops));
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
771 772
    }

773 774
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
Kenton Varda's avatar
Kenton Varda committed
775 776 777 778
      auto builder = target.initPromisedAnswer();
      builder.setQuestionId(questionRef->getId());
      builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
      return nullptr;
779 780
    }

781
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
782 783 784
      return kj::addRef(*this);
    }

785
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
786

787
    kj::Maybe<ClientHook&> getResolved() override {
788 789 790
      return nullptr;
    }

791
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
792
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
793 794 795
    }

  private:
796
    kj::Own<QuestionRef> questionRef;
797
    kj::Array<PipelineOp> ops;
Kenton Varda's avatar
Kenton Varda committed
798 799
  };

800 801 802 803
  class PromiseClient final: public RpcClient {
    // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    // PipelineClient) and then, later on, redirects to some other client.

Kenton Varda's avatar
Kenton Varda committed
804
  public:
805 806 807
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<ClientHook> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
808
                  kj::Maybe<ImportId> importId)
809
        : RpcClient(connectionState),
810 811
          isResolved(false),
          cap(kj::mv(initial)),
812
          importId(importId),
813 814 815
          fork(eventual.fork()),
          resolveSelfPromise(fork.addBranch().then(
              [this](kj::Own<ClientHook>&& resolution) {
816
                resolve(kj::mv(resolution), false);
817
              }, [this](kj::Exception&& exception) {
818
                resolve(newBrokenCap(kj::mv(exception)), true);
819 820 821 822
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
823
              })) {
824 825 826 827 828 829
      // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
      // resolves, will forward there instead.  In addition, `whenMoreResolved()` will return a fork
      // of `eventual`.  Note that this means the application could hold on to `eventual` even after
      // the `PromiseClient` is destroyed; `eventual` must therefore make sure to hold references to
      // anything that needs to stay alive in order to resolve it correctly (such as making sure the
      // import ID is not released).
Kenton Varda's avatar
Kenton Varda committed
830
    }
Kenton Varda's avatar
Kenton Varda committed
831

832 833 834 835 836 837
    ~PromiseClient() noexcept(false) {
      KJ_IF_MAYBE(id, importId) {
        // This object is representing an import promise.  That means the import table may still
        // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
        // the import still exists and the pointer still points back to this object because this
        // object may actually outlive the import.
838
        KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
839 840 841 842 843 844 845 846 847
          KJ_IF_MAYBE(c, import->appClient) {
            if (c == this) {
              import->appClient = nullptr;
            }
          }
        }
      }
    }

848
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
849
      receivedCall = true;
850
      return connectionState->writeDescriptor(*cap, descriptor);
Kenton Varda's avatar
Kenton Varda committed
851
    }
Kenton Varda's avatar
Kenton Varda committed
852

853 854
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
855
      receivedCall = true;
856
      return connectionState->writeTarget(*cap, target);
Kenton Varda's avatar
Kenton Varda committed
857 858
    }

859
    kj::Own<ClientHook> getInnermostClient() override {
860
      receivedCall = true;
861
      return connectionState->getInnermostClient(*cap);
Kenton Varda's avatar
Kenton Varda committed
862 863
    }

Kenton Varda's avatar
Kenton Varda committed
864
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
865

866
    Request<AnyPointer, AnyPointer> newCall(
867
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
868 869 870 871 872 873 874 875 876 877
      if (!isResolved && interfaceId == typeId<Persistent<>>() && methodId == 0 &&
          connectionState->gateway != nullptr) {
        // This is a call to Persistent.save(), and we're not resolved yet, and the underlying
        // remote capability will perform a gateway translation. This isn't right if the promise
        // ultimately resolves to a local capability. Instead, we'll need to queue the call until
        // the promise resolves.
        return newLocalPromiseClient(fork.addBranch())
            ->newCall(interfaceId, methodId, sizeHint);
      }

878
      receivedCall = true;
879
      return cap->newCall(interfaceId, methodId, sizeHint);
880 881
    }

882
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
883
                                kj::Own<CallContextHook>&& context) override {
884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903
      if (!isResolved && interfaceId == typeId<Persistent<>>() && methodId == 0 &&
          connectionState->gateway != nullptr) {
        // This is a call to Persistent.save(), and we're not resolved yet, and the underlying
        // remote capability will perform a gateway translation. This isn't right if the promise
        // ultimately resolves to a local capability. Instead, we'll need to queue the call until
        // the promise resolves.

        auto vpapPromises = fork.addBranch().then(kj::mvCapture(context,
            [interfaceId,methodId](kj::Own<CallContextHook>&& context,
                                   kj::Own<ClientHook> resolvedCap) {
          auto vpap = resolvedCap->call(interfaceId, methodId, kj::mv(context));
          return kj::tuple(kj::mv(vpap.promise), kj::mv(vpap.pipeline));
        })).split();

        return {
          kj::mv(kj::get<0>(vpapPromises)),
          newLocalPromisePipeline(kj::mv(kj::get<1>(vpapPromises))),
        };
      }

904
      receivedCall = true;
905
      return cap->call(interfaceId, methodId, kj::mv(context));
906 907
    }

908
    kj::Maybe<ClientHook&> getResolved() override {
909 910
      if (isResolved) {
        return *cap;
911 912 913
      } else {
        return nullptr;
      }
Kenton Varda's avatar
Kenton Varda committed
914 915
    }

916
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
917
      return fork.addBranch();
Kenton Varda's avatar
Kenton Varda committed
918
    }
Kenton Varda's avatar
Kenton Varda committed
919 920

  private:
921 922
    bool isResolved;
    kj::Own<ClientHook> cap;
923

924
    kj::Maybe<ImportId> importId;
925
    kj::ForkedPromise<kj::Own<ClientHook>> fork;
Kenton Varda's avatar
Kenton Varda committed
926 927 928 929 930

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

931
    bool receivedCall = false;
Kenton Varda's avatar
Kenton Varda committed
932

933
    void resolve(kj::Own<ClientHook> replacement, bool isError) {
934 935 936 937
      const void* replacementBrand = replacement->getBrand();
      if (replacementBrand != connectionState.get() &&
          replacementBrand != &ClientHook::NULL_CAPABILITY_BRAND &&
          receivedCall && !isError && connectionState->connection.is<Connected>()) {
938 939 940 941 942
        // The new capability is hosted locally, not on the remote machine.  And, we had made calls
        // to the promise.  We need to make sure those calls echo back to us before we allow new
        // calls to go directly to the local capability, so we need to set a local embargo and send
        // a `Disembargo` to echo through the peer.

943
        auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
944
            messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
Kenton Varda's avatar
Kenton Varda committed
945

Kenton Varda's avatar
Kenton Varda committed
946
        auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();
Kenton Varda's avatar
Kenton Varda committed
947 948

        {
949
          auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
Kenton Varda's avatar
Kenton Varda committed
950 951 952 953 954
          KJ_ASSERT(redirect == nullptr,
                    "Original promise target should always be from this RPC connection.");
        }

        EmbargoId embargoId;
955
        Embargo& embargo = connectionState->embargoes.next(embargoId);
Kenton Varda's avatar
Kenton Varda committed
956 957 958 959 960 961 962

        disembargo.getContext().setSenderLoopback(embargoId);

        auto paf = kj::newPromiseAndFulfiller<void>();
        embargo.fulfiller = kj::mv(paf.fulfiller);

        // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
963 964
        auto embargoPromise = paf.promise.then(
            kj::mvCapture(replacement, [this](kj::Own<ClientHook>&& replacement) {
Kenton Varda's avatar
Kenton Varda committed
965 966 967 968 969
              return kj::mv(replacement);
            }));

        // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
        // client instead.
970
        replacement = newLocalPromiseClient(kj::mv(embargoPromise));
Kenton Varda's avatar
Kenton Varda committed
971 972 973 974 975

        // Send the `Disembargo`.
        message->send();
      }

976
      cap = kj::mv(replacement);
977
      isResolved = true;
Kenton Varda's avatar
Kenton Varda committed
978
    }
Kenton Varda's avatar
Kenton Varda committed
979
  };
980

981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029
  class NoInterceptClient final: public RpcClient {
    // A wrapper around an RpcClient which bypasses special handling of "save" requests. When we
    // intercept a "save" request and invoke a RealmGateway, we give it a version of the capability
    // with intercepting disabled, since usually the first thing the RealmGateway will do is turn
    // around and call save() again.
    //
    // This is admittedly sort of backwards: the interception of "save" ought to be the part
    // implemented by a wrapper. However, that would require placing a wrapper around every
    // RpcClient we create whereas NoInterceptClient only needs to be injected after a save()
    // request occurs and is intercepted.

  public:
    NoInterceptClient(RpcClient& inner)
        : RpcClient(*inner.connectionState),
          inner(kj::addRef(inner)) {}

    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
      return inner->writeDescriptor(descriptor);
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(rpc::MessageTarget::Builder target) override {
      return inner->writeTarget(target);
    }

    kj::Own<ClientHook> getInnermostClient() override {
      return inner->getInnermostClient();
    }

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
      return inner->newCallNoIntercept(interfaceId, methodId, sizeHint);
    }
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                                kj::Own<CallContextHook>&& context) override {
      return inner->callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    kj::Maybe<ClientHook&> getResolved() override {
      return nullptr;
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return nullptr;
    }

  private:
    kj::Own<RpcClient> inner;
  };

1030
  kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor) {
1031
    // Write a descriptor for the given capability.
Kenton Varda's avatar
Kenton Varda committed
1032

1033
    // Find the innermost wrapped capability.
1034
    ClientHook* inner = &cap;
1035 1036 1037 1038 1039 1040 1041 1042 1043
    for (;;) {
      KJ_IF_MAYBE(resolved, inner->getResolved()) {
        inner = resolved;
      } else {
        break;
      }
    }

    if (inner->getBrand() == this) {
1044
      return kj::downcast<RpcClient>(*inner).writeDescriptor(descriptor);
Kenton Varda's avatar
Kenton Varda committed
1045
    } else {
1046 1047
      auto iter = exportsByCap.find(inner);
      if (iter != exportsByCap.end()) {
1048
        // We've already seen and exported this capability before.  Just up the refcount.
1049
        auto& exp = KJ_ASSERT_NONNULL(exports.find(iter->second));
Kenton Varda's avatar
Kenton Varda committed
1050 1051 1052 1053
        ++exp.refcount;
        descriptor.setSenderHosted(iter->second);
        return iter->second;
      } else {
1054
        // This is the first time we've seen this capability.
Kenton Varda's avatar
Kenton Varda committed
1055
        ExportId exportId;
1056 1057
        auto& exp = exports.next(exportId);
        exportsByCap[inner] = exportId;
Kenton Varda's avatar
Kenton Varda committed
1058
        exp.refcount = 1;
1059 1060 1061 1062
        exp.clientHook = inner->addRef();

        KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
          // This is a promise.  Arrange for the `Resolve` message to be sent later.
Kenton Varda's avatar
Kenton Varda committed
1063
          exp.resolveOp = resolveExportedPromise(exportId, kj::mv(*wrapped));
1064 1065 1066
          descriptor.setSenderPromise(exportId);
        } else {
          descriptor.setSenderHosted(exportId);
1067 1068
        }

Kenton Varda's avatar
Kenton Varda committed
1069 1070
        return exportId;
      }
Kenton Varda's avatar
Kenton Varda committed
1071 1072 1073
    }
  }

1074
  kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
1075 1076 1077 1078
                                       rpc::Payload::Builder payload) {
    auto capTableBuilder = payload.initCapTable(capTable.size());
    kj::Vector<ExportId> exports(capTable.size());
    for (uint i: kj::indices(capTable)) {
1079 1080 1081 1082 1083 1084
      KJ_IF_MAYBE(cap, capTable[i]) {
        KJ_IF_MAYBE(exportId, writeDescriptor(**cap, capTableBuilder[i])) {
          exports.add(*exportId);
        }
      } else {
        capTableBuilder[i].setNone();
1085 1086 1087 1088 1089
      }
    }
    return exports.releaseAsArray();
  }

1090
  kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
Kenton Varda's avatar
Kenton Varda committed
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101
    // If calls to the given capability should pass over this connection, fill in `target`
    // appropriately for such a call and return nullptr.  Otherwise, return a `ClientHook` to which
    // the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
    //
    // The main case where this ends up returning non-null is if `cap` is a promise that has
    // recently resolved.  The application might have started building a request before the promise
    // resolved, and so the request may have been built on the assumption that it would be sent over
    // this network connection, but then the promise resolved to point somewhere else before the
    // request was sent.  Now the request has to be redirected to the new target instead.

    if (cap.getBrand() == this) {
1102
      return kj::downcast<RpcClient>(cap).writeTarget(target);
Kenton Varda's avatar
Kenton Varda committed
1103 1104 1105 1106 1107
    } else {
      return cap.addRef();
    }
  }

1108 1109
  kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
    ClientHook* ptr = &client;
Kenton Varda's avatar
Kenton Varda committed
1110 1111 1112 1113 1114 1115 1116 1117 1118
    for (;;) {
      KJ_IF_MAYBE(inner, ptr->getResolved()) {
        ptr = inner;
      } else {
        break;
      }
    }

    if (ptr->getBrand() == this) {
1119
      return kj::downcast<RpcClient>(*ptr).getInnermostClient();
Kenton Varda's avatar
Kenton Varda committed
1120 1121 1122 1123 1124 1125
    } else {
      return ptr->addRef();
    }
  }

  kj::Promise<void> resolveExportedPromise(
1126
      ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
Kenton Varda's avatar
Kenton Varda committed
1127 1128 1129 1130
    // Implements exporting of a promise.  The promise has been exported under the given ID, and is
    // to eventually resolve to the ClientHook produced by `promise`.  This method waits for that
    // resolve to happen and then sends the appropriate `Resolve` message to the peer.

1131
    return promise.then(
1132
        [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
Kenton Varda's avatar
Kenton Varda committed
1133 1134
      // Successful resolution.

1135 1136 1137 1138 1139
      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return kj::READY_NOW;
      }

Kenton Varda's avatar
Kenton Varda committed
1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
      // Get the innermost ClientHook backing the resolved client.  This includes traversing
      // PromiseClients that haven't resolved yet to their underlying ImportClient or
      // PipelineClient, so that we get a remote promise that might resolve later.  This is
      // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
      // correctly instead of going to the result of some future resolution.  See the documentation
      // for `Disembargo` in `rpc.capnp`.
      resolution = getInnermostClient(*resolution);

      // Update the export table to point at this object instead.  We know that our entry in the
      // export table is still live because when it is destroyed the asynchronous resolution task
      // (i.e. this code) is canceled.
1151 1152
      auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
      exportsByCap.erase(exp.clientHook);
Kenton Varda's avatar
Kenton Varda committed
1153 1154
      exp.clientHook = kj::mv(resolution);

1155
      if (exp.clientHook->getBrand() != this) {
Kenton Varda's avatar
Kenton Varda committed
1156 1157 1158
        // We're resolving to a local capability.  If we're resolving to a promise, we might be
        // able to reuse our export table entry and avoid sending a message.

1159
        KJ_IF_MAYBE(promise, exp.clientHook->whenMoreResolved()) {
Kenton Varda's avatar
Kenton Varda committed
1160 1161 1162 1163
          // We're replacing a promise with another local promise.  In this case, we might actually
          // be able to just reuse the existing export table entry to represent the new promise --
          // unless it already has an entry.  Let's check.

1164
          auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));
Kenton Varda's avatar
Kenton Varda committed
1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175

          if (insertResult.second) {
            // The new promise was not already in the table, therefore the existing export table
            // entry has now been repurposed to represent it.  There is no need to send a resolve
            // message at all.  We do, however, have to start resolving the next promise.
            return resolveExportedPromise(exportId, kj::mv(*promise));
          }
        }
      }

      // OK, we have to send a `Resolve` message.
1176
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1177 1178 1179
          messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
1180
      writeDescriptor(*exp.clientHook, resolve.initCap());
Kenton Varda's avatar
Kenton Varda committed
1181 1182 1183 1184 1185
      message->send();

      return kj::READY_NOW;
    }, [this,exportId](kj::Exception&& exception) {
      // send error resolution
1186
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1187 1188 1189 1190 1191
          messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      fromException(exception, resolve.initException());
      message->send();
1192 1193 1194
    }).eagerlyEvaluate([this](kj::Exception&& exception) {
      // Put the exception on the TaskSet which will cause the connection to be terminated.
      tasks.add(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1195 1196 1197
    });
  }

Kenton Varda's avatar
Kenton Varda committed
1198
  // =====================================================================================
1199
  // Interpreting CapDescriptor
Kenton Varda's avatar
Kenton Varda committed
1200

1201
  kj::Own<ClientHook> import(ImportId importId, bool isPromise) {
1202
    // Receive a new import.
Kenton Varda's avatar
Kenton Varda committed
1203

1204 1205
    auto& import = imports[importId];
    kj::Own<ImportClient> importClient;
Kenton Varda's avatar
Kenton Varda committed
1206

1207 1208 1209 1210 1211 1212
    // Create the ImportClient, or if one already exists, use it.
    KJ_IF_MAYBE(c, import.importClient) {
      importClient = kj::addRef(*c);
    } else {
      importClient = kj::refcounted<ImportClient>(*this, importId);
      import.importClient = *importClient;
Kenton Varda's avatar
Kenton Varda committed
1213
    }
1214

1215 1216
    // We just received a copy of this import ID, so the remote refcount has gone up.
    importClient->addRemoteRef();
Kenton Varda's avatar
Kenton Varda committed
1217

1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235
    if (isPromise) {
      // We need to construct a PromiseClient around this import, if we haven't already.
      KJ_IF_MAYBE(c, import.appClient) {
        // Use the existing one.
        return kj::addRef(*c);
      } else {
        // Create a promise for this import's resolution.
        auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
        import.promiseFulfiller = kj::mv(paf.fulfiller);

        // Make sure the import is not destroyed while this promise exists.
        paf.promise = paf.promise.attach(kj::addRef(*importClient));

        // Create a PromiseClient around it and return it.
        auto result = kj::refcounted<PromiseClient>(
            *this, kj::mv(importClient), kj::mv(paf.promise), importId);
        import.appClient = *result;
        return kj::mv(result);
1236
      }
1237 1238 1239
    } else {
      import.appClient = *importClient;
      return kj::mv(importClient);
1240
    }
1241
  }
1242

1243
  kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor) {
1244 1245
    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
1246
        return nullptr;
1247

1248 1249 1250 1251
      case rpc::CapDescriptor::SENDER_HOSTED:
        return import(descriptor.getSenderHosted(), false);
      case rpc::CapDescriptor::SENDER_PROMISE:
        return import(descriptor.getSenderPromise(), true);
1252

1253 1254 1255 1256
      case rpc::CapDescriptor::RECEIVER_HOSTED:
        KJ_IF_MAYBE(exp, exports.find(descriptor.getReceiverHosted())) {
          return exp->clientHook->addRef();
        } else {
1257 1258 1259
          return newBrokenCap("invalid 'receiverHosted' export ID");
        }

1260 1261
      case rpc::CapDescriptor::RECEIVER_ANSWER: {
        auto promisedAnswer = descriptor.getReceiverAnswer();
1262

1263 1264 1265 1266 1267 1268 1269
        KJ_IF_MAYBE(answer, answers.find(promisedAnswer.getQuestionId())) {
          if (answer->active) {
            KJ_IF_MAYBE(pipeline, answer->pipeline) {
              KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
                return pipeline->get()->getPipelinedCap(*ops);
              } else {
                return newBrokenCap("unrecognized pipeline ops");
Kenton Varda's avatar
Kenton Varda committed
1270 1271
              }
            }
1272
          }
1273 1274
        }

1275
        return newBrokenCap("invalid 'receiverAnswer'");
1276 1277
      }

1278 1279 1280
      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        // We don't support third-party caps, so use the vine instead.
        return import(descriptor.getThirdPartyHosted().getVineId(), false);
Kenton Varda's avatar
Kenton Varda committed
1281

1282 1283 1284
      default:
        KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
        return newBrokenCap("unknown CapDescriptor type");
Kenton Varda's avatar
Kenton Varda committed
1285
    }
1286
  }
1287

1288 1289
  kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
    auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
1290 1291
    for (auto cap: capTable) {
      result.add(receiveCap(cap));
Kenton Varda's avatar
Kenton Varda committed
1292
    }
1293 1294
    return result.finish();
  }
1295 1296

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
1297
  // RequestHook/PipelineHook/ResponseHook implementations
1298

Kenton Varda's avatar
Kenton Varda committed
1299 1300 1301 1302
  class QuestionRef: public kj::Refcounted {
    // A reference to an entry on the question table.  Used to detect when the `Finish` message
    // can be sent.

Kenton Varda's avatar
Kenton Varda committed
1303
  public:
1304
    inline QuestionRef(
1305 1306 1307
        RpcConnectionState& connectionState, QuestionId id,
        kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
        : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1308 1309

    ~QuestionRef() {
1310
      unwindDetector.catchExceptionsIfUnwinding([&]() {
1311 1312 1313
        auto& question = KJ_ASSERT_NONNULL(
            connectionState->questions.find(id), "Question ID no longer on table?");

1314
        // Send the "Finish" message (if the connection is not already broken).
1315 1316
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1317
              messageSizeHint<rpc::Finish>());
1318 1319
          auto builder = message->getBody().getAs<rpc::Message>().initFinish();
          builder.setQuestionId(id);
1320 1321 1322
          // If we're still awaiting a return, then this request is being canceled, and we're going
          // to ignore any capabilities in the return message, so set releaseResultCaps true. If we
          // already received the return, then we've already built local proxies for the caps and
David Renshaw's avatar
David Renshaw committed
1323
          // will send Release messages when those are destroyed.
1324
          builder.setReleaseResultCaps(question.isAwaitingReturn);
1325
          message->send();
1326
        }
Kenton Varda's avatar
Kenton Varda committed
1327

1328 1329 1330
        // Check if the question has returned and, if so, remove it from the table.
        // Remove question ID from the table.  Must do this *after* sending `Finish` to ensure that
        // the ID is not re-allocated before the `Finish` message can be sent.
1331 1332
        if (question.isAwaitingReturn) {
          // Still waiting for return, so just remove the QuestionRef pointer from the table.
1333
          question.selfRef = nullptr;
1334 1335 1336
        } else {
          // Call has already returned, so we can now remove it from the table.
          connectionState->questions.erase(id, question);
1337 1338
        }
      });
Kenton Varda's avatar
Kenton Varda committed
1339 1340 1341 1342
    }

    inline QuestionId getId() const { return id; }

1343
    void fulfill(kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1344 1345 1346
      fulfiller->fulfill(kj::mv(response));
    }

1347
    void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
1348 1349 1350
      fulfiller->fulfill(kj::mv(promise));
    }

Kenton Varda's avatar
Kenton Varda committed
1351 1352
    void reject(kj::Exception&& exception) {
      fulfiller->reject(kj::mv(exception));
1353
    }
Kenton Varda's avatar
Kenton Varda committed
1354 1355

  private:
1356
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1357
    QuestionId id;
1358
    kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
1359
    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
1360 1361
  };

Kenton Varda's avatar
Kenton Varda committed
1362
  class RpcRequest final: public RequestHook {
1363
  public:
1364 1365
    RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
               kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
1366
        : connectionState(kj::addRef(connectionState)),
Kenton Varda's avatar
Kenton Varda committed
1367
          target(kj::mv(target)),
1368
          message(connection.newOutgoingMessage(
1369 1370
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
                  sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
Kenton Varda's avatar
Kenton Varda committed
1371
          callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
1372
          paramsBuilder(capTable.imbue(callBuilder.getParams().getContent())) {}
Kenton Varda's avatar
Kenton Varda committed
1373

1374
    inline AnyPointer::Builder getRoot() {
Kenton Varda's avatar
Kenton Varda committed
1375 1376
      return paramsBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1377 1378 1379
    inline rpc::Call::Builder getCall() {
      return callBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1380

1381
    RemotePromise<AnyPointer> send() override {
1382
      if (!connectionState->connection.is<Connected>()) {
1383
        // Connection is broken.
1384
        const kj::Exception& e = connectionState->connection.get<Disconnected>();
1385
        return RemotePromise<AnyPointer>(
1386 1387
            kj::Promise<Response<AnyPointer>>(kj::cp(e)),
            AnyPointer::Pipeline(newBrokenPipeline(kj::cp(e))));
1388
      }
Kenton Varda's avatar
Kenton Varda committed
1389

1390 1391 1392
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy.  Ick.
Kenton Varda's avatar
Kenton Varda committed
1393

1394
        auto replacement = redirect->get()->newCall(
1395
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
1396 1397 1398
        replacement.set(paramsBuilder);
        return replacement.send();
      } else {
1399
        auto sendResult = sendInternal(false);
Kenton Varda's avatar
Kenton Varda committed
1400

1401
        auto forkedPromise = sendResult.promise.fork();
1402

1403 1404 1405
        // The pipeline must get notified of resolution before the app does to maintain ordering.
        auto pipeline = kj::refcounted<RpcPipeline>(
            *connectionState, kj::mv(sendResult.questionRef), forkedPromise.addBranch());
Kenton Varda's avatar
Kenton Varda committed
1406

1407 1408 1409 1410 1411
        auto appPromise = forkedPromise.addBranch().then(
            [=](kj::Own<RpcResponse>&& response) {
              auto reader = response->getResults();
              return Response<AnyPointer>(reader, kj::mv(response));
            });
Kenton Varda's avatar
Kenton Varda committed
1412

1413 1414 1415 1416
        return RemotePromise<AnyPointer>(
            kj::mv(appPromise),
            AnyPointer::Pipeline(kj::mv(pipeline)));
      }
Kenton Varda's avatar
Kenton Varda committed
1417 1418
    }

1419 1420 1421
    struct TailInfo {
      QuestionId questionId;
      kj::Promise<void> promise;
1422
      kj::Own<PipelineHook> pipeline;
1423 1424 1425 1426 1427 1428 1429 1430 1431 1432
    };

    kj::Maybe<TailInfo> tailSend() {
      // Send the request as a tail call.
      //
      // Returns null if for some reason a tail call is not possible and the caller should fall
      // back to using send() and copying the response.

      SendInternalResult sendResult;

1433
      if (!connectionState->connection.is<Connected>()) {
1434 1435 1436
        // Disconnected; fall back to a regular send() which will fail appropriately.
        return nullptr;
      }
1437

1438 1439 1440 1441 1442 1443
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // Fall back to regular send().
        return nullptr;
      } else {
        sendResult = sendInternal(true);
1444 1445
      }

1446
      auto promise = sendResult.promise.then([](kj::Own<RpcResponse>&& response) {
1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457
        // Response should be null if `Return` handling code is correct.
        KJ_ASSERT(!response) { break; }
      });

      QuestionId questionId = sendResult.questionRef->getId();

      auto pipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sendResult.questionRef));

      return TailInfo { questionId, kj::mv(promise), kj::mv(pipeline) };
    }

1458
    const void* getBrand() override {
1459 1460 1461
      return connectionState.get();
    }

Kenton Varda's avatar
Kenton Varda committed
1462
  private:
1463
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1464

1465
    kj::Own<RpcClient> target;
Kenton Varda's avatar
Kenton Varda committed
1466
    kj::Own<OutgoingRpcMessage> message;
1467
    BuilderCapabilityTable capTable;
Kenton Varda's avatar
Kenton Varda committed
1468
    rpc::Call::Builder callBuilder;
1469
    AnyPointer::Builder paramsBuilder;
1470 1471 1472

    struct SendInternalResult {
      kj::Own<QuestionRef> questionRef;
1473
      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
1474 1475
    };

1476
    SendInternalResult sendInternal(bool isTailCall) {
1477 1478
      // Build the cap table.
      auto exports = connectionState->writeDescriptors(
1479
          capTable.getTable(), callBuilder.getParams());
1480

1481
      // Init the question table.  Do this after writing descriptors to avoid interference.
1482
      QuestionId questionId;
1483
      auto& question = connectionState->questions.next(questionId);
1484 1485 1486
      question.isAwaitingReturn = true;
      question.paramExports = kj::mv(exports);
      question.isTailCall = isTailCall;
1487

1488
      // Finish and send.
1489 1490 1491 1492
      callBuilder.setQuestionId(questionId);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
1493
      message->send();
1494

1495
      // Make the result promise.
1496
      SendInternalResult result;
1497
      auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
1498
      result.questionRef = kj::refcounted<QuestionRef>(
1499
          *connectionState, questionId, kj::mv(paf.fulfiller));
1500
      question.selfRef = *result.questionRef;
1501
      result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
1502

1503
      // Send and return.
1504 1505
      return kj::mv(result);
    }
Kenton Varda's avatar
Kenton Varda committed
1506 1507
  };

Kenton Varda's avatar
Kenton Varda committed
1508
  class RpcPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1509
  public:
1510 1511
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef,
                kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam)
1512
        : connectionState(kj::addRef(connectionState)),
1513 1514 1515
          redirectLater(redirectLaterParam.fork()),
          resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then(
              [this](kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1516
                resolve(kj::mv(response));
1517
              }, [this](kj::Exception&& exception) {
Kenton Varda's avatar
Kenton Varda committed
1518
                resolve(kj::mv(exception));
1519 1520 1521 1522
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
1523 1524 1525
              })) {
      // Construct a new RpcPipeline.

1526
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1527
    }
Kenton Varda's avatar
Kenton Varda committed
1528

1529
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef)
1530 1531 1532 1533
        : connectionState(kj::addRef(connectionState)),
          resolveSelfPromise(nullptr) {
      // Construct a new RpcPipeline that is never expected to resolve.

1534
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1535
    }
Kenton Varda's avatar
Kenton Varda committed
1536 1537 1538

    // implements PipelineHook ---------------------------------------

1539
    kj::Own<PipelineHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1540 1541 1542
      return kj::addRef(*this);
    }

1543
    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
Kenton Varda's avatar
Kenton Varda committed
1544 1545 1546 1547 1548 1549 1550
      auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
      for (auto& op: ops) {
        copy.add(op);
      }
      return getPipelinedCap(copy.finish());
    }

1551
    kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override {
1552
      if (state.is<Waiting>()) {
1553 1554
        // Wrap a PipelineClient in a PromiseClient.
        auto pipelineClient = kj::refcounted<PipelineClient>(
1555
            *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr()));
1556

1557
        KJ_IF_MAYBE(r, redirectLater) {
1558 1559 1560 1561
          auto resolutionPromise = r->addBranch().then(kj::mvCapture(ops,
              [](kj::Array<PipelineOp> ops, kj::Own<RpcResponse>&& response) {
                return response->getResults().getPipelinedCap(ops);
              }));
1562

1563 1564 1565 1566 1567 1568
          return kj::refcounted<PromiseClient>(
              *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr);
        } else {
          // Oh, this pipeline will never get redirected, so just return the PipelineClient.
          return kj::mv(pipelineClient);
        }
1569 1570
      } else if (state.is<Resolved>()) {
        return state.get<Resolved>()->getResults().getPipelinedCap(ops);
Kenton Varda's avatar
Kenton Varda committed
1571
      } else {
1572
        return newBrokenCap(kj::cp(state.get<Broken>()));
Kenton Varda's avatar
Kenton Varda committed
1573
      }
Kenton Varda's avatar
Kenton Varda committed
1574 1575 1576
    }

  private:
1577 1578
    kj::Own<RpcConnectionState> connectionState;
    kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater;
Kenton Varda's avatar
Kenton Varda committed
1579

1580 1581
    typedef kj::Own<QuestionRef> Waiting;
    typedef kj::Own<RpcResponse> Resolved;
Kenton Varda's avatar
Kenton Varda committed
1582
    typedef kj::Exception Broken;
1583
    kj::OneOf<Waiting, Resolved, Broken> state;
Kenton Varda's avatar
Kenton Varda committed
1584 1585 1586 1587 1588

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

1589
    void resolve(kj::Own<RpcResponse>&& response) {
1590 1591
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Resolved>(kj::mv(response));
Kenton Varda's avatar
Kenton Varda committed
1592 1593 1594
    }

    void resolve(const kj::Exception&& exception) {
1595 1596
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Broken>(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1597
    }
Kenton Varda's avatar
Kenton Varda committed
1598 1599
  };

1600 1601
  class RpcResponse: public ResponseHook {
  public:
1602
    virtual AnyPointer::Reader getResults() = 0;
1603
    virtual kj::Own<RpcResponse> addRef() = 0;
1604 1605 1606
  };

  class RpcResponseImpl final: public RpcResponse, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1607
  public:
1608
    RpcResponseImpl(RpcConnectionState& connectionState,
1609 1610
                    kj::Own<QuestionRef>&& questionRef,
                    kj::Own<IncomingRpcMessage>&& message,
1611
                    kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray,
1612
                    AnyPointer::Reader results)
1613 1614
        : connectionState(kj::addRef(connectionState)),
          message(kj::mv(message)),
1615 1616
          capTable(kj::mv(capTableArray)),
          reader(capTable.imbue(results)),
Kenton Varda's avatar
Kenton Varda committed
1617
          questionRef(kj::mv(questionRef)) {}
Kenton Varda's avatar
Kenton Varda committed
1618

1619
    AnyPointer::Reader getResults() override {
Kenton Varda's avatar
Kenton Varda committed
1620 1621 1622
      return reader;
    }

1623
    kj::Own<RpcResponse> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1624 1625 1626
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
1627
  private:
1628
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1629
    kj::Own<IncomingRpcMessage> message;
1630
    ReaderCapabilityTable capTable;
1631
    AnyPointer::Reader reader;
1632
    kj::Own<QuestionRef> questionRef;
Kenton Varda's avatar
Kenton Varda committed
1633 1634 1635 1636 1637 1638
  };

  // =====================================================================================
  // CallContextHook implementation

  class RpcServerResponse {
Kenton Varda's avatar
Kenton Varda committed
1639
  public:
1640
    virtual AnyPointer::Builder getResultsBuilder() = 0;
1641 1642 1643 1644
  };

  class RpcServerResponseImpl final: public RpcServerResponse {
  public:
1645
    RpcServerResponseImpl(RpcConnectionState& connectionState,
1646
                          kj::Own<OutgoingRpcMessage>&& message,
1647 1648 1649
                          rpc::Payload::Builder payload)
        : connectionState(connectionState),
          message(kj::mv(message)),
1650
          payload(payload) {}
Kenton Varda's avatar
Kenton Varda committed
1651

1652
    AnyPointer::Builder getResultsBuilder() override {
1653
      return capTable.imbue(payload.getContent());
Kenton Varda's avatar
Kenton Varda committed
1654 1655
    }

1656 1657 1658 1659 1660
    kj::Maybe<kj::Array<ExportId>> send() {
      // Send the response and return the export list.  Returns nullptr if there were no caps.
      // (Could return a non-null empty array if there were caps but none of them were exports.)

      // Build the cap table.
1661
      auto capTable = this->capTable.getTable();
1662 1663
      auto exports = connectionState.writeDescriptors(capTable, payload);

1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674
      // Capabilities that we are returning are subject to embargos. See `Disembargo` in rpc.capnp.
      // As explained there, in order to deal with the Tribble 4-way race condition, we need to
      // make sure that if we're returning any remote promises, that we ignore any subsequent
      // resolution of those promises for the purpose of pipelined requests on this answer. Luckily,
      // we can modify the cap table in-place.
      for (auto& slot: capTable) {
        KJ_IF_MAYBE(cap, slot) {
          slot = connectionState.getInnermostClient(**cap);
        }
      }

Kenton Varda's avatar
Kenton Varda committed
1675
      message->send();
1676 1677 1678 1679 1680
      if (capTable.size() == 0) {
        return nullptr;
      } else {
        return kj::mv(exports);
      }
Kenton Varda's avatar
Kenton Varda committed
1681 1682 1683
    }

  private:
1684
    RpcConnectionState& connectionState;
Kenton Varda's avatar
Kenton Varda committed
1685
    kj::Own<OutgoingRpcMessage> message;
1686
    BuilderCapabilityTable capTable;
1687
    rpc::Payload::Builder payload;
Kenton Varda's avatar
Kenton Varda committed
1688 1689
  };

1690 1691 1692
  class LocallyRedirectedRpcResponse final
      : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
  public:
1693
    LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
1694 1695
        : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
                          .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
1696

1697
    AnyPointer::Builder getResultsBuilder() override {
1698
      return message.getRoot<AnyPointer>();
1699 1700
    }

1701
    AnyPointer::Reader getResults() override {
1702
      return message.getRoot<AnyPointer>();
1703 1704
    }

1705
    kj::Own<RpcResponse> addRef() override {
1706 1707 1708 1709
      return kj::addRef(*this);
    }

  private:
1710
    MallocMessageBuilder message;
1711 1712
  };

Kenton Varda's avatar
Kenton Varda committed
1713 1714
  class RpcCallContext final: public CallContextHook, public kj::Refcounted {
  public:
1715
    RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
1716 1717 1718
                   kj::Own<IncomingRpcMessage>&& request,
                   kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTableArray,
                   const AnyPointer::Reader& params,
1719
                   bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
1720
        : connectionState(kj::addRef(connectionState)),
1721
          answerId(answerId),
1722
          requestSize(request->getBody().targetSize().wordCount),
Kenton Varda's avatar
Kenton Varda committed
1723
          request(kj::mv(request)),
1724 1725
          paramsCapTable(kj::mv(capTableArray)),
          params(paramsCapTable.imbue(params)),
1726
          returnMessage(nullptr),
Kenton Varda's avatar
Kenton Varda committed
1727
          redirectResults(redirectResults),
1728 1729 1730
          cancelFulfiller(kj::mv(cancelFulfiller)) {
      connectionState.callWordsInFlight += requestSize;
    }
Kenton Varda's avatar
Kenton Varda committed
1731

1732 1733 1734 1735
    ~RpcCallContext() noexcept(false) {
      if (isFirstResponder()) {
        // We haven't sent a return yet, so we must have been canceled.  Send a cancellation return.
        unwindDetector.catchExceptionsIfUnwinding([&]() {
Kenton Varda's avatar
Kenton Varda committed
1736
          // Don't send anything if the connection is broken.
1737 1738
          if (connectionState->connection.is<Connected>()) {
            auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1739
                messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>());
Kenton Varda's avatar
Kenton Varda committed
1740
            auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1741

1742
            builder.setAnswerId(answerId);
1743
            builder.setReleaseParamCaps(false);
1744

Kenton Varda's avatar
Kenton Varda committed
1745 1746 1747 1748 1749 1750 1751 1752 1753
            if (redirectResults) {
              // The reason we haven't sent a return is because the results were sent somewhere
              // else.
              builder.setResultsSentElsewhere();
            } else {
              builder.setCanceled();
            }

            message->send();
1754
          }
1755

1756
          cleanupAnswerTable(nullptr, true);
1757 1758 1759 1760
        });
      }
    }

1761
    kj::Own<RpcResponse> consumeRedirectedResponse() {
1762 1763
      KJ_ASSERT(redirectResults);

1764
      if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
1765 1766 1767 1768 1769 1770

      // Note that the context needs to keep its own reference to the response so that it doesn't
      // get GC'd until the PipelineHook drops its reference to the context.
      return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef();
    }

Kenton Varda's avatar
Kenton Varda committed
1771
    void sendReturn() {
1772
      KJ_ASSERT(!redirectResults);
1773 1774 1775 1776

      // Avoid sending results if canceled so that we don't have to figure out whether or not
      // `releaseResultCaps` was set in the already-received `Finish`.
      if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) {
1777 1778 1779 1780 1781
        KJ_ASSERT(connectionState->connection.is<Connected>(),
                  "Cancellation should have been requested on disconnect.") {
          return;
        }

1782
        if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
Kenton Varda's avatar
Kenton Varda committed
1783

1784
        returnMessage.setAnswerId(answerId);
1785
        returnMessage.setReleaseParamCaps(false);
Kenton Varda's avatar
Kenton Varda committed
1786

1787 1788 1789 1790 1791 1792 1793 1794
        auto exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
        KJ_IF_MAYBE(e, exports) {
          // Caps were returned, so we can't free the pipeline yet.
          cleanupAnswerTable(kj::mv(*e), false);
        } else {
          // No caps in the results, therefore the pipeline is irrelevant.
          cleanupAnswerTable(nullptr, true);
        }
Kenton Varda's avatar
Kenton Varda committed
1795 1796 1797
      }
    }
    void sendErrorReturn(kj::Exception&& exception) {
1798
      KJ_ASSERT(!redirectResults);
Kenton Varda's avatar
Kenton Varda committed
1799
      if (isFirstResponder()) {
1800 1801 1802 1803
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Return>() + exceptionSizeHint(exception));
          auto builder = message->getBody().initAs<rpc::Message>().initReturn();
Kenton Varda's avatar
Kenton Varda committed
1804

1805 1806 1807
          builder.setAnswerId(answerId);
          builder.setReleaseParamCaps(false);
          fromException(exception, builder.initException());
Kenton Varda's avatar
Kenton Varda committed
1808

1809 1810
          message->send();
        }
1811 1812 1813 1814

        // Do not allow releasing the pipeline because we want pipelined calls to propagate the
        // exception rather than fail with a "no such field" exception.
        cleanupAnswerTable(nullptr, false);
Kenton Varda's avatar
Kenton Varda committed
1815 1816 1817
      }
    }

1818
    void requestCancel() {
Kenton Varda's avatar
Kenton Varda committed
1819 1820 1821 1822 1823 1824
      // Hints that the caller wishes to cancel this call.  At the next time when cancellation is
      // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
      // safe, the RpcCallContext will send a normal return when the call completes.  Either way
      // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
      // a Finish message was already received.

1825 1826 1827 1828
      bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED;
      cancellationFlags |= CANCEL_REQUESTED;

      if (previouslyAllowedButNotRequested) {
Kenton Varda's avatar
Kenton Varda committed
1829
        // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously.  Initiate
Kenton Varda's avatar
Kenton Varda committed
1830
        // the cancellation.
Kenton Varda's avatar
Kenton Varda committed
1831
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1832
      }
Kenton Varda's avatar
Kenton Varda committed
1833
    }
1834 1835 1836

    // implements CallContextHook ------------------------------------

1837
    AnyPointer::Reader getParams() override {
1838 1839 1840 1841 1842 1843
      KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
      return params;
    }
    void releaseParams() override {
      request = nullptr;
    }
1844
    AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
1845
      KJ_IF_MAYBE(r, response) {
1846
        return r->get()->getResultsBuilder();
1847
      } else {
1848 1849
        kj::Own<RpcServerResponse> response;

1850
        if (redirectResults || !connectionState->connection.is<Connected>()) {
1851
          response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
1852
        } else {
1853
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1854 1855
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
                               sizeInWords<rpc::Payload>()));
1856 1857 1858 1859 1860 1861
          returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
          response = kj::heap<RpcServerResponseImpl>(
              *connectionState, kj::mv(message), returnMessage.getResults());
        }

        auto results = response->getResultsBuilder();
Kenton Varda's avatar
Kenton Varda committed
1862 1863
        this->response = kj::mv(response);
        return results;
1864 1865
      }
    }
1866 1867 1868
    kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
      auto result = directTailCall(kj::mv(request));
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
1869
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
1870 1871 1872 1873
      }
      return kj::mv(result.promise);
    }
    ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
1874 1875 1876
      KJ_REQUIRE(response == nullptr,
                 "Can't call tailCall() after initializing the results struct.");

1877
      if (request->getBrand() == connectionState.get() && !redirectResults) {
1878 1879 1880 1881 1882
        // The tail call is headed towards the peer that called us in the first place, so we can
        // optimize out the return trip.

        KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
          if (isFirstResponder()) {
1883 1884 1885 1886
            if (connectionState->connection.is<Connected>()) {
              auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
                  messageSizeHint<rpc::Return>());
              auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1887

1888 1889 1890
              builder.setAnswerId(answerId);
              builder.setReleaseParamCaps(false);
              builder.setTakeFromOtherQuestion(tailInfo->questionId);
1891

1892 1893
              message->send();
            }
1894 1895 1896

            // There are no caps in our return message, but of course the tail results could have
            // caps, so we must continue to honor pipeline calls (and just bounce them back).
1897
            cleanupAnswerTable(nullptr, false);
1898
          }
1899
          return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
1900 1901 1902 1903 1904 1905 1906
        }
      }

      // Just forwarding to another local call.
      auto promise = request->send();

      // Wait for response.
1907
      auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
1908 1909 1910
        // Copy the response.
        // TODO(perf):  It would be nice if we could somehow make the response get built in-place
        //   but requires some refactoring.
1911
        getResults(tailResponse.targetSize()).set(tailResponse);
1912
      });
1913 1914

      return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
1915
    }
1916 1917
    kj::Promise<AnyPointer::Pipeline> onTailCall() override {
      auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
1918 1919 1920
      tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
      return kj::mv(paf.promise);
    }
1921
    void allowCancellation() override {
1922 1923 1924 1925
      bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
      cancellationFlags |= CANCEL_ALLOWED;

      if (previouslyRequestedButNotAllowed) {
Kenton Varda's avatar
Kenton Varda committed
1926 1927 1928
        // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
        // the cancellation.
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1929
      }
1930 1931 1932 1933 1934 1935
    }
    kj::Own<CallContextHook> addRef() override {
      return kj::addRef(*this);
    }

  private:
1936
    kj::Own<RpcConnectionState> connectionState;
1937
    AnswerId answerId;
1938

Kenton Varda's avatar
Kenton Varda committed
1939 1940
    // Request ---------------------------------------------

1941
    size_t requestSize;  // for flow limit purposes
Kenton Varda's avatar
Kenton Varda committed
1942
    kj::Maybe<kj::Own<IncomingRpcMessage>> request;
1943
    ReaderCapabilityTable paramsCapTable;
1944
    AnyPointer::Reader params;
Kenton Varda's avatar
Kenton Varda committed
1945

Kenton Varda's avatar
Kenton Varda committed
1946 1947 1948
    // Response --------------------------------------------

    kj::Maybe<kj::Own<RpcServerResponse>> response;
Kenton Varda's avatar
Kenton Varda committed
1949
    rpc::Return::Builder returnMessage;
1950
    bool redirectResults = false;
Kenton Varda's avatar
Kenton Varda committed
1951
    bool responseSent = false;
1952
    kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1953 1954 1955 1956 1957 1958 1959 1960

    // Cancellation state ----------------------------------

    enum CancellationFlags {
      CANCEL_REQUESTED = 1,
      CANCEL_ALLOWED = 2
    };

1961
    uint8_t cancellationFlags = 0;
1962
    // When both flags are set, the cancellation process will begin.
Kenton Varda's avatar
Kenton Varda committed
1963

1964
    kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1965 1966 1967
    // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise is
    // exclusive-joined with the outermost promise waiting on the call return, so fulfilling it
    // cancels that promise.
Kenton Varda's avatar
Kenton Varda committed
1968

1969 1970
    kj::UnwindDetector unwindDetector;

Kenton Varda's avatar
Kenton Varda committed
1971 1972 1973 1974 1975 1976 1977
    // -----------------------------------------------------

    bool isFirstResponder() {
      if (responseSent) {
        return false;
      } else {
        responseSent = true;
1978 1979 1980
        return true;
      }
    }
Kenton Varda's avatar
Kenton Varda committed
1981

1982
    void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
1983 1984 1985
      // We need to remove the `callContext` pointer -- which points back to us -- from the
      // answer table.  Or we might even be responsible for removing the entire answer table
      // entry.
Kenton Varda's avatar
Kenton Varda committed
1986

1987
      if (cancellationFlags & CANCEL_REQUESTED) {
1988 1989 1990
        // Already received `Finish` so it's our job to erase the table entry. We shouldn't have
        // sent results if canceled, so we shouldn't have an export list to deal with.
        KJ_ASSERT(resultExports.size() == 0);
1991
        connectionState->answers.erase(answerId);
1992
      } else {
1993
        // We just have to null out callContext and set the exports.
1994
        auto& answer = connectionState->answers[answerId];
1995
        answer.callContext = nullptr;
1996 1997 1998 1999 2000 2001
        answer.resultExports = kj::mv(resultExports);

        if (shouldFreePipeline) {
          // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
          // because there are no caps in the result to receive pipeline requests).
          KJ_ASSERT(resultExports.size() == 0);
Kenton Varda's avatar
Kenton Varda committed
2002
          answer.pipeline = nullptr;
Kenton Varda's avatar
Kenton Varda committed
2003 2004
        }
      }
2005 2006 2007 2008

      // Also, this is the right time to stop counting the call against the flow limit.
      connectionState->callWordsInFlight -= requestSize;
      connectionState->maybeUnblockFlow();
Kenton Varda's avatar
Kenton Varda committed
2009
    }
2010 2011
  };

Kenton Varda's avatar
Kenton Varda committed
2012 2013 2014
  // =====================================================================================
  // Message handling

2015 2016 2017 2018 2019 2020 2021 2022 2023
  void maybeUnblockFlow() {
    if (callWordsInFlight < flowLimit) {
      KJ_IF_MAYBE(w, flowWaiter) {
        w->get()->fulfill();
        flowWaiter = nullptr;
      }
    }
  }

2024
  kj::Promise<void> messageLoop() {
2025 2026 2027 2028
    if (!connection.is<Connected>()) {
      return kj::READY_NOW;
    }

2029 2030 2031 2032 2033 2034 2035 2036
    if (callWordsInFlight > flowLimit) {
      auto paf = kj::newPromiseAndFulfiller<void>();
      flowWaiter = kj::mv(paf.fulfiller);
      return paf.promise.then([this]() {
        return messageLoop();
      });
    }

2037
    return connection.get<Connected>()->receiveIncomingMessage().then(
2038
        [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) {
2039 2040
      KJ_IF_MAYBE(m, message) {
        handleMessage(kj::mv(*m));
2041
        return true;
2042
      } else {
2043
        disconnect(KJ_EXCEPTION(DISCONNECTED, "Peer disconnected."));
2044
        return false;
2045
      }
2046
    }).then([this](bool keepGoing) {
2047 2048 2049 2050
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
2051
      if (keepGoing) tasks.add(messageLoop());
2052
    });
2053 2054 2055 2056
  }

  void handleMessage(kj::Own<IncomingRpcMessage> message) {
    auto reader = message->getBody().getAs<rpc::Message>();
2057

2058 2059
    switch (reader.which()) {
      case rpc::Message::UNIMPLEMENTED:
Kenton Varda's avatar
Kenton Varda committed
2060
        handleUnimplemented(reader.getUnimplemented());
2061 2062 2063
        break;

      case rpc::Message::ABORT:
Kenton Varda's avatar
Kenton Varda committed
2064
        handleAbort(reader.getAbort());
2065 2066
        break;

2067 2068 2069 2070
      case rpc::Message::BOOTSTRAP:
        handleBootstrap(kj::mv(message), reader.getBootstrap());
        break;

2071
      case rpc::Message::CALL:
Kenton Varda's avatar
Kenton Varda committed
2072
        handleCall(kj::mv(message), reader.getCall());
2073 2074
        break;

Kenton Varda's avatar
Kenton Varda committed
2075
      case rpc::Message::RETURN:
Kenton Varda's avatar
Kenton Varda committed
2076
        handleReturn(kj::mv(message), reader.getReturn());
Kenton Varda's avatar
Kenton Varda committed
2077 2078 2079
        break;

      case rpc::Message::FINISH:
Kenton Varda's avatar
Kenton Varda committed
2080
        handleFinish(reader.getFinish());
Kenton Varda's avatar
Kenton Varda committed
2081 2082
        break;

Kenton Varda's avatar
Kenton Varda committed
2083
      case rpc::Message::RESOLVE:
2084
        handleResolve(reader.getResolve());
Kenton Varda's avatar
Kenton Varda committed
2085 2086 2087
        break;

      case rpc::Message::RELEASE:
2088
        handleRelease(reader.getRelease());
Kenton Varda's avatar
Kenton Varda committed
2089 2090
        break;

2091
      case rpc::Message::DISEMBARGO:
Kenton Varda's avatar
Kenton Varda committed
2092
        handleDisembargo(reader.getDisembargo());
2093 2094
        break;

2095
      default: {
2096 2097 2098 2099 2100 2101
        if (connection.is<Connected>()) {
          auto message = connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(reader.totalSize(), messageSizeHint<void>()));
          message->getBody().initAs<rpc::Message>().setUnimplemented(reader);
          message->send();
        }
2102 2103 2104 2105 2106
        break;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2107
  void handleUnimplemented(const rpc::Message::Reader& message) {
Kenton Varda's avatar
Kenton Varda committed
2108
    switch (message.which()) {
2109
      case rpc::Message::RESOLVE: {
2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131
        auto resolve = message.getResolve();
        switch (resolve.which()) {
          case rpc::Resolve::CAP: {
            auto cap = resolve.getCap();
            switch (cap.which()) {
              case rpc::CapDescriptor::NONE:
                // Nothing to do (but this ought never to happen).
                break;
              case rpc::CapDescriptor::SENDER_HOSTED:
                releaseExport(cap.getSenderHosted(), 1);
                break;
              case rpc::CapDescriptor::SENDER_PROMISE:
                releaseExport(cap.getSenderPromise(), 1);
                break;
              case rpc::CapDescriptor::RECEIVER_ANSWER:
              case rpc::CapDescriptor::RECEIVER_HOSTED:
                // Nothing to do.
                break;
              case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
                releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
                break;
            }
2132
            break;
2133 2134
          }
          case rpc::Resolve::EXCEPTION:
2135 2136 2137
            // Nothing to do.
            break;
        }
Kenton Varda's avatar
Kenton Varda committed
2138
        break;
2139
      }
Kenton Varda's avatar
Kenton Varda committed
2140 2141 2142 2143 2144

      default:
        KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
        break;
    }
2145 2146
  }

Kenton Varda's avatar
Kenton Varda committed
2147
  void handleAbort(const rpc::Exception::Reader& exception) {
2148 2149 2150
    kj::throwRecoverableException(toException(exception));
  }

2151 2152 2153
  // ---------------------------------------------------------------------------
  // Level 0

2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167 2168 2169 2170 2171 2172 2173 2174 2175 2176 2177 2178 2179 2180 2181 2182 2183
  class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
  public:
    SingleCapPipeline(kj::Own<ClientHook>&& cap)
        : cap(kj::mv(cap)) {}

    kj::Own<PipelineHook> addRef() override {
      return kj::addRef(*this);
    }

    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
      if (ops.size() == 0) {
        return cap->addRef();
      } else {
        return newBrokenCap("Invalid pipeline transform.");
      }
    }

  private:
    kj::Own<ClientHook> cap;
  };

  void handleBootstrap(kj::Own<IncomingRpcMessage>&& message,
                       const rpc::Bootstrap::Reader& bootstrap) {
    AnswerId answerId = bootstrap.getQuestionId();

    if (!connection.is<Connected>()) {
      // Disconnected; ignore.
      return;
    }

2184 2185
    VatNetworkBase::Connection& conn = *connection.get<Connected>();
    auto response = conn.newOutgoingMessage(
2186 2187 2188 2189 2190 2191 2192 2193 2194 2195 2196 2197
        messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);

    rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
    ret.setAnswerId(answerId);

    kj::Own<ClientHook> capHook;
    kj::Array<ExportId> resultExports;
    KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong

    // Call the restorer and initialize the answer.
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      Capability::Client cap = nullptr;
2198 2199 2200 2201 2202

      if (bootstrap.hasDeprecatedObjectId()) {
        KJ_IF_MAYBE(r, restorer) {
          cap = r->baseRestore(bootstrap.getDeprecatedObjectId());
        } else {
2203 2204 2205 2206
          KJ_FAIL_REQUIRE("This vat only supports a bootstrap interface, not the old "
                          "Cap'n-Proto-0.4-style named exports.") { return; }
        }
      } else {
2207
        cap = bootstrapFactory.baseCreateFor(conn.baseGetPeerVatId());
2208 2209
      }

2210
      BuilderCapabilityTable capTable;
2211
      auto payload = ret.initResults();
2212
      capTable.imbue(payload.getContent()).setAs<Capability>(kj::mv(cap));
2213

2214 2215 2216 2217
      auto capTableArray = capTable.getTable();
      KJ_DASSERT(capTableArray.size() == 1);
      resultExports = writeDescriptors(capTableArray, payload);
      capHook = KJ_ASSERT_NONNULL(capTableArray[0])->addRef();
2218 2219 2220 2221 2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232 2233 2234 2235 2236 2237
    })) {
      fromException(*exception, ret.initException());
      capHook = newBrokenCap(kj::mv(*exception));
    }

    message = nullptr;

    // Add the answer to the answer table for pipelining and send the response.
    auto& answer = answers[answerId];
    KJ_REQUIRE(!answer.active, "questionId is already in use") {
      return;
    }

    answer.resultExports = kj::mv(resultExports);
    answer.active = true;
    answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));

    response->send();
  }

Kenton Varda's avatar
Kenton Varda committed
2238
  void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
2239
    kj::Own<ClientHook> capability;
2240

Kenton Varda's avatar
Kenton Varda committed
2241 2242 2243 2244 2245
    KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
      capability = kj::mv(*t);
    } else {
      // Exception already reported.
      return;
2246 2247
    }

2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259
    bool redirectResults;
    switch (call.getSendResultsTo().which()) {
      case rpc::Call::SendResultsTo::CALLER:
        redirectResults = false;
        break;
      case rpc::Call::SendResultsTo::YOURSELF:
        redirectResults = true;
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
    }

2260
    auto payload = call.getParams();
2261
    auto capTableArray = receiveCaps(payload.getCapTable());
Kenton Varda's avatar
Kenton Varda committed
2262 2263
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

2264
    AnswerId answerId = call.getQuestionId();
Kenton Varda's avatar
Kenton Varda committed
2265

2266
    auto context = kj::refcounted<RpcCallContext>(
2267
        *this, answerId, kj::mv(message), kj::mv(capTableArray), payload.getContent(),
2268
        redirectResults, kj::mv(cancelPaf.fulfiller));
2269

2270
    // No more using `call` after this point, as it now belongs to the context.
2271 2272

    {
2273
      auto& answer = answers[answerId];
2274 2275 2276 2277 2278 2279

      KJ_REQUIRE(!answer.active, "questionId is already in use") {
        return;
      }

      answer.active = true;
Kenton Varda's avatar
Kenton Varda committed
2280
      answer.callContext = *context;
Kenton Varda's avatar
Kenton Varda committed
2281 2282
    }

2283 2284
    auto promiseAndPipeline = startCall(
        call.getInterfaceId(), call.getMethodId(), kj::mv(capability), context->addRef());
Kenton Varda's avatar
Kenton Varda committed
2285

2286
    // Things may have changed -- in particular if startCall() immediately called
Kenton Varda's avatar
Kenton Varda committed
2287 2288 2289
    // context->directTailCall().

    {
2290
      auto& answer = answers[answerId];
Kenton Varda's avatar
Kenton Varda committed
2291

2292 2293
      answer.pipeline = kj::mv(promiseAndPipeline.pipeline);

2294
      if (redirectResults) {
Kenton Varda's avatar
Kenton Varda committed
2295
        auto resultsPromise = promiseAndPipeline.promise.then(
2296 2297 2298
            kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
              return context->consumeRedirectedResponse();
            }));
Kenton Varda's avatar
Kenton Varda committed
2299 2300

        // If the call that later picks up `redirectedResults` decides to discard it, we need to
2301
        // make sure our call is not itself canceled unless it has called allowCancellation().
Kenton Varda's avatar
Kenton Varda committed
2302 2303
        // So we fork the promise and join one branch with the cancellation promise, in order to
        // hold on to it.
2304
        auto forked = resultsPromise.fork();
Kenton Varda's avatar
Kenton Varda committed
2305 2306
        answer.redirectedResults = forked.addBranch();

2307 2308 2309
        cancelPaf.promise
            .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
            .detach([](kj::Exception&&) {});
2310 2311 2312 2313 2314
      } else {
        // Hack:  Both the success and error continuations need to use the context.  We could
        //   refcount, but both will be destroyed at the same time anyway.
        RpcCallContext* contextPtr = context;

2315
        promiseAndPipeline.promise.then(
2316 2317 2318 2319 2320 2321 2322
            [contextPtr]() {
              contextPtr->sendReturn();
            }, [contextPtr](kj::Exception&& exception) {
              contextPtr->sendErrorReturn(kj::mv(exception));
            }).then([]() {}, [&](kj::Exception&& exception) {
              // Handle exceptions that occur in sendReturn()/sendErrorReturn().
              taskFailed(kj::mv(exception));
2323 2324 2325
            }).attach(kj::mv(context))
            .exclusiveJoin(kj::mv(cancelPaf.promise))
            .detach([](kj::Exception&&) {});
2326
      }
2327 2328 2329
    }
  }

2330 2331 2332 2333 2334 2335 2336 2337
  ClientHook::VoidPromiseAndPipeline startCall(
      uint64_t interfaceId, uint64_t methodId,
      kj::Own<ClientHook>&& capability, kj::Own<CallContextHook>&& context) {
    if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
      KJ_IF_MAYBE(g, gateway) {
        // Wait, this is a call to Persistent.save() and we need to translate it through our
        // gateway.

2338 2339 2340 2341 2342 2343 2344 2345 2346 2347 2348 2349 2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361 2362
        KJ_IF_MAYBE(resolvedPromise, capability->whenMoreResolved()) {
          // The plot thickens: We're looking at a promise capability. It could end up resolving
          // to a capability outside the gateway, in which case we don't want to translate at all.

          auto promises = resolvedPromise->then(kj::mvCapture(context,
              [this,interfaceId,methodId](kj::Own<CallContextHook>&& context,
                                          kj::Own<ClientHook> resolvedCap) {
            auto vpap = startCall(interfaceId, methodId, kj::mv(resolvedCap), kj::mv(context));
            return kj::tuple(kj::mv(vpap.promise), kj::mv(vpap.pipeline));
          })).attach(addRef(*this)).split();

          return {
            kj::mv(kj::get<0>(promises)),
            newLocalPromisePipeline(kj::mv(kj::get<1>(promises))),
          };
        }

        if (capability->getBrand() == this) {
          // This capability is one of our own, pointing back out over the network. That means
          // that it would be inappropriate to apply the gateway transformation. We just want to
          // reflect the call back.
          return kj::downcast<RpcClient>(*capability)
              .callNoIntercept(interfaceId, methodId, kj::mv(context));
        }

2363 2364 2365 2366 2367 2368 2369 2370 2371 2372 2373 2374 2375 2376 2377 2378
        auto params = context->getParams().getAs<Persistent<>::SaveParams>();

        auto requestSize = params.totalSize();
        ++requestSize.capCount;
        requestSize.wordCount += sizeInWords<RealmGateway<>::ExportParams>();

        auto request = g->exportRequest(requestSize);
        request.setCap(Persistent<>::Client(capability->addRef()));
        request.setParams(params);

        context->allowCancellation();
        context->releaseParams();
        return context->directTailCall(RequestHook::from(kj::mv(request)));
      }
    }

2379
    return capability->call(interfaceId, methodId, kj::mv(context));
2380 2381
  }

2382
  kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
Kenton Varda's avatar
Kenton Varda committed
2383
    switch (target.which()) {
2384 2385
      case rpc::MessageTarget::IMPORTED_CAP: {
        KJ_IF_MAYBE(exp, exports.find(target.getImportedCap())) {
Kenton Varda's avatar
Kenton Varda committed
2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396
          return exp->clientHook->addRef();
        } else {
          KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
            return nullptr;
          }
        }
        break;
      }

      case rpc::MessageTarget::PROMISED_ANSWER: {
        auto promisedAnswer = target.getPromisedAnswer();
2397
        kj::Own<PipelineHook> pipeline;
Kenton Varda's avatar
Kenton Varda committed
2398

2399 2400 2401 2402 2403 2404 2405
        auto& base = answers[promisedAnswer.getQuestionId()];
        KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
          return nullptr;
        }
        KJ_IF_MAYBE(p, base.pipeline) {
          pipeline = p->get()->addRef();
        } else {
2406 2407
          pipeline = newBrokenPipeline(KJ_EXCEPTION(FAILED,
              "Pipeline call on a request that returned no capabilities or was already closed."));
Kenton Varda's avatar
Kenton Varda committed
2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422
        }

        KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
          return pipeline->getPipelinedCap(*ops);
        } else {
          // Exception already thrown.
          return nullptr;
        }
      }

      default:
        KJ_FAIL_REQUIRE("Unknown message target type.", target) {
          return nullptr;
        }
    }
2423 2424

    KJ_UNREACHABLE;
Kenton Varda's avatar
Kenton Varda committed
2425 2426
  }

Kenton Varda's avatar
Kenton Varda committed
2427
  void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
Kenton Varda's avatar
Kenton Varda committed
2428 2429
    // Transitive destructors can end up manipulating the question table and invalidating our
    // pointer into it, so make sure these destructors run later.
2430 2431
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2432
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;
2433

2434
    KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
2435 2436
      KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
      question->isAwaitingReturn = false;
Kenton Varda's avatar
Kenton Varda committed
2437

2438 2439
      if (ret.getReleaseParamCaps()) {
        exportsToRelease = kj::mv(question->paramExports);
Kenton Varda's avatar
Kenton Varda committed
2440
      } else {
2441
        question->paramExports = nullptr;
Kenton Varda's avatar
Kenton Varda committed
2442
      }
2443

2444 2445
      KJ_IF_MAYBE(questionRef, question->selfRef) {
        switch (ret.which()) {
2446
          case rpc::Return::RESULTS: {
2447 2448 2449
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2450
            }
Kenton Varda's avatar
Kenton Varda committed
2451

2452
            auto payload = ret.getResults();
2453
            auto capTableArray = receiveCaps(payload.getCapTable());
2454
            questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
2455 2456
                *this, kj::addRef(*questionRef), kj::mv(message),
                kj::mv(capTableArray), payload.getContent()));
2457
            break;
2458
          }
2459

2460 2461 2462 2463
          case rpc::Return::EXCEPTION:
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2464
            }
2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476

            questionRef->reject(toException(ret.getException()));
            break;

          case rpc::Return::CANCELED:
            KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
            break;

          case rpc::Return::RESULTS_SENT_ELSEWHERE:
            KJ_REQUIRE(question->isTailCall,
                "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
              return;
2477 2478
            }

2479 2480 2481 2482
            // Tail calls are fulfilled with a null pointer.
            questionRef->fulfill(kj::Own<RpcResponse>());
            break;

2483 2484
          case rpc::Return::TAKE_FROM_OTHER_QUESTION:
            KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2485 2486 2487
              KJ_IF_MAYBE(response, answer->redirectedResults) {
                questionRef->fulfill(kj::mv(*response));
              } else {
2488
                KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` referenced a call that did not "
2489
                                "use `sendResultsTo.yourself`.") { return; }
2490 2491
              }
            } else {
2492
              KJ_FAIL_REQUIRE("`Return.takeFromOtherQuestion` had invalid answer ID.") { return; }
2493 2494
            }

2495
            break;
2496

2497 2498 2499 2500
          default:
            KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
        }
      } else {
2501
        if (ret.isTakeFromOtherQuestion()) {
2502
          // Be sure to release the tail call's promise.
2503
          KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2504 2505 2506
            promiseToRelease = kj::mv(answer->redirectedResults);
          }
        }
Kenton Varda's avatar
Kenton Varda committed
2507

2508 2509
        // Looks like this question was canceled earlier, so `Finish` was already sent, with
        // `releaseResultCaps` set true so that we don't have to release them here.  We can go
2510
        // ahead and delete it from the table.
2511
        questions.erase(ret.getAnswerId(), *question);
Kenton Varda's avatar
Kenton Varda committed
2512
      }
Kenton Varda's avatar
Kenton Varda committed
2513

Kenton Varda's avatar
Kenton Varda committed
2514 2515 2516 2517 2518
    } else {
      KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2519
  void handleFinish(const rpc::Finish::Reader& finish) {
Kenton Varda's avatar
Kenton Varda committed
2520 2521
    // Delay release of these things until return so that transitive destructors don't accidentally
    // modify the answer table and invalidate our pointer into it.
2522 2523
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2524
    Answer answerToRelease;
2525
    kj::Maybe<kj::Own<PipelineHook>> pipelineToRelease;
Kenton Varda's avatar
Kenton Varda committed
2526

2527
    KJ_IF_MAYBE(answer, answers.find(finish.getQuestionId())) {
2528
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2529

2530 2531 2532 2533
      if (finish.getReleaseResultCaps()) {
        exportsToRelease = kj::mv(answer->resultExports);
      } else {
        answer->resultExports = nullptr;
2534
      }
Kenton Varda's avatar
Kenton Varda committed
2535

2536 2537
      pipelineToRelease = kj::mv(answer->pipeline);

2538 2539 2540 2541 2542
      // If the call isn't actually done yet, cancel it.  Otherwise, we can go ahead and erase the
      // question from the table.
      KJ_IF_MAYBE(context, answer->callContext) {
        context->requestCancel();
      } else {
Kenton Varda's avatar
Kenton Varda committed
2543
        answerToRelease = answers.erase(finish.getQuestionId());
2544
      }
Kenton Varda's avatar
Kenton Varda committed
2545
    } else {
2546
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2547
    }
2548 2549
  }

2550 2551 2552
  // ---------------------------------------------------------------------------
  // Level 1

2553
  void handleResolve(const rpc::Resolve::Reader& resolve) {
2554
    kj::Own<ClientHook> replacement;
2555
    kj::Maybe<kj::Exception> exception;
2556 2557 2558 2559

    // Extract the replacement capability.
    switch (resolve.which()) {
      case rpc::Resolve::CAP:
2560 2561 2562 2563 2564
        KJ_IF_MAYBE(cap, receiveCap(resolve.getCap())) {
          replacement = kj::mv(*cap);
        } else {
          KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
        }
2565 2566 2567
        break;

      case rpc::Resolve::EXCEPTION:
2568 2569 2570 2571
        // We can't set `replacement` to a new broken cap here because this will confuse
        // PromiseClient::Resolve() into thinking that the remote promise resolved to a local
        // capability and therefore a Disembargo is needed. We must actually reject the promise.
        exception = toException(resolve.getException());
2572 2573 2574 2575 2576 2577 2578
        break;

      default:
        KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
    }

    // If the import is on the table, fulfill it.
2579
    KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
2580 2581
      KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
        // OK, this is in fact an unfulfilled promise!
2582 2583 2584 2585 2586
        KJ_IF_MAYBE(e, exception) {
          fulfiller->get()->reject(kj::mv(*e));
        } else {
          fulfiller->get()->fulfill(kj::mv(replacement));
        }
2587 2588 2589 2590 2591 2592 2593 2594
      } else if (import->importClient != nullptr) {
        // It appears this is a valid entry on the import table, but was not expected to be a
        // promise.
        KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
      }
    }
  }

2595
  void handleRelease(const rpc::Release::Reader& release) {
Kenton Varda's avatar
Kenton Varda committed
2596
    releaseExport(release.getId(), release.getReferenceCount());
2597 2598
  }

Kenton Varda's avatar
Kenton Varda committed
2599
  void releaseExport(ExportId id, uint refcount) {
2600
    KJ_IF_MAYBE(exp, exports.find(id)) {
2601
      KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
Kenton Varda's avatar
Kenton Varda committed
2602
        return;
2603 2604 2605 2606
      }

      exp->refcount -= refcount;
      if (exp->refcount == 0) {
2607
        exportsByCap.erase(exp->clientHook);
2608
        exports.erase(id, *exp);
2609 2610 2611
      }
    } else {
      KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
Kenton Varda's avatar
Kenton Varda committed
2612
        return;
2613 2614 2615 2616
      }
    }
  }

2617 2618 2619 2620 2621 2622
  void releaseExports(kj::ArrayPtr<ExportId> exports) {
    for (auto exportId: exports) {
      releaseExport(exportId, 1);
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2623 2624 2625 2626
  void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
    auto context = disembargo.getContext();
    switch (context.which()) {
      case rpc::Disembargo::Context::SENDER_LOOPBACK: {
2627
        kj::Own<ClientHook> target;
Kenton Varda's avatar
Kenton Varda committed
2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640 2641 2642 2643 2644 2645 2646 2647 2648 2649

        KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
          target = kj::mv(*t);
        } else {
          // Exception already reported.
          return;
        }

        for (;;) {
          KJ_IF_MAYBE(r, target->getResolved()) {
            target = r->addRef();
          } else {
            break;
          }
        }

        KJ_REQUIRE(target->getBrand() == this,
                   "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
                   "back to the sender.") {
          return;
        }

Kenton Varda's avatar
Kenton Varda committed
2650 2651 2652 2653
        EmbargoId embargoId = context.getSenderLoopback();

        // We need to insert an evalLater() here to make sure that any pending calls towards this
        // cap have had time to find their way through the event loop.
2654 2655
        tasks.add(kj::evalLater(kj::mvCapture(
            target, [this,embargoId](kj::Own<ClientHook>&& target) {
2656 2657 2658 2659
          if (!connection.is<Connected>()) {
            return;
          }

2660
          RpcClient& downcasted = kj::downcast<RpcClient>(*target);
Kenton Varda's avatar
Kenton Varda committed
2661

2662
          auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
2663 2664 2665 2666 2667 2668
              messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
          auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();

          {
            auto redirect = downcasted.writeTarget(builder.initTarget());

2669
            // Disembargoes should only be sent to capabilities that were previously the subject of
Kenton Varda's avatar
Kenton Varda committed
2670
            // a `Resolve` message.  But `writeTarget` only ever returns non-null when called on
2671 2672 2673
            // a PromiseClient.  The code which sends `Resolve` and `Return` should have replaced
            // any promise with a direct node in order to solve the Tribble 4-way race condition.
            // See the documentation of Disembargo in rpc.capnp for more.
Kenton Varda's avatar
Kenton Varda committed
2674 2675
            KJ_REQUIRE(redirect == nullptr,
                       "'Disembargo' of type 'senderLoopback' sent to an object that does not "
2676
                       "appear to have been the subject of a previous 'Resolve' message.") {
Kenton Varda's avatar
Kenton Varda committed
2677 2678
              return;
            }
Kenton Varda's avatar
Kenton Varda committed
2679 2680
          }

Kenton Varda's avatar
Kenton Varda committed
2681
          builder.getContext().setReceiverLoopback(embargoId);
Kenton Varda's avatar
Kenton Varda committed
2682

Kenton Varda's avatar
Kenton Varda committed
2683 2684
          message->send();
        })));
Kenton Varda's avatar
Kenton Varda committed
2685 2686 2687 2688

        break;
      }

Kenton Varda's avatar
Kenton Varda committed
2689
      case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
2690
        KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
Kenton Varda's avatar
Kenton Varda committed
2691
          KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
2692
          embargoes.erase(context.getReceiverLoopback(), *embargo);
Kenton Varda's avatar
Kenton Varda committed
2693 2694 2695 2696 2697 2698
        } else {
          KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
            return;
          }
        }
        break;
Kenton Varda's avatar
Kenton Varda committed
2699
      }
Kenton Varda's avatar
Kenton Varda committed
2700 2701 2702 2703 2704 2705

      default:
        KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
    }
  }

2706 2707
  // ---------------------------------------------------------------------------
  // Level 2
2708 2709 2710 2711
};

}  // namespace

2712
class RpcSystemBase::Impl final: private BootstrapFactoryBase, private kj::TaskSet::ErrorHandler {
2713
public:
2714 2715 2716
  Impl(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface,
       kj::Maybe<RealmGateway<>::Client> gateway)
      : network(network), bootstrapInterface(kj::mv(bootstrapInterface)),
2717 2718 2719 2720 2721 2722
        bootstrapFactory(*this), gateway(kj::mv(gateway)), tasks(*this) {
    tasks.add(acceptLoop());
  }
  Impl(VatNetworkBase& network, BootstrapFactoryBase& bootstrapFactory,
       kj::Maybe<RealmGateway<>::Client> gateway)
      : network(network), bootstrapFactory(bootstrapFactory),
2723
        gateway(kj::mv(gateway)), tasks(*this) {
2724 2725 2726
    tasks.add(acceptLoop());
  }
  Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2727
      : network(network), bootstrapFactory(*this), restorer(restorer), tasks(*this) {
2728 2729 2730 2731
    tasks.add(acceptLoop());
  }

  ~Impl() noexcept(false) {
2732 2733 2734 2735 2736
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // std::unordered_map doesn't like it when elements' destructors throw, so carefully
      // disassemble it.
      if (!connections.empty()) {
        kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
2737
        kj::Exception shutdownException = KJ_EXCEPTION(FAILED, "RpcSystem was destroyed.");
2738 2739 2740 2741
        for (auto& entry: connections) {
          entry.second->disconnect(kj::cp(shutdownException));
          deleteMe.add(kj::mv(entry.second));
        }
2742
      }
2743
    });
2744
  }
2745

2746
  Capability::Client bootstrap(AnyStruct::Reader vatId) {
2747 2748 2749 2750 2751
    // For now we delegate to restore() since it's equivalent, but eventually we'll remove restore()
    // and implement bootstrap() directly.
    return restore(vatId, AnyPointer::Reader());
  }

2752
  Capability::Client restore(AnyStruct::Reader vatId, AnyPointer::Reader objectId) {
2753
    KJ_IF_MAYBE(connection, network.baseConnect(vatId)) {
2754
      auto& state = getConnectionState(kj::mv(*connection));
2755 2756 2757 2758 2759 2760 2761
      return Capability::Client(state.restore(objectId));
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(objectId);
    } else {
      return Capability::Client(newBrokenCap(
          "SturdyRef referred to a local object but there is no local SturdyRef restorer."));
    }
2762 2763
  }

2764 2765 2766 2767 2768 2769 2770 2771
  void setFlowLimit(size_t words) {
    flowLimit = words;

    for (auto& conn: connections) {
      conn.second->setFlowLimit(words);
    }
  }

2772 2773
private:
  VatNetworkBase& network;
2774
  kj::Maybe<Capability::Client> bootstrapInterface;
2775
  BootstrapFactoryBase& bootstrapFactory;
2776
  kj::Maybe<RealmGateway<>::Client> gateway;
2777
  kj::Maybe<SturdyRefRestorerBase&> restorer;
2778
  size_t flowLimit = kj::maxValue;
2779 2780 2781 2782
  kj::TaskSet tasks;

  typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
      ConnectionMap;
2783
  ConnectionMap connections;
Kenton Varda's avatar
Kenton Varda committed
2784

2785 2786
  kj::UnwindDetector unwindDetector;

2787 2788 2789
  RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
    auto iter = connections.find(connection);
    if (iter == connections.end()) {
2790
      VatNetworkBase::Connection* connectionPtr = connection;
Kenton Varda's avatar
Kenton Varda committed
2791 2792 2793
      auto onDisconnect = kj::newPromiseAndFulfiller<RpcConnectionState::DisconnectInfo>();
      tasks.add(onDisconnect.promise
          .then([this,connectionPtr](RpcConnectionState::DisconnectInfo info) {
2794
        connections.erase(connectionPtr);
Kenton Varda's avatar
Kenton Varda committed
2795
        tasks.add(kj::mv(info.shutdownPromise));
2796 2797
      }));
      auto newState = kj::refcounted<RpcConnectionState>(
2798
          bootstrapFactory, gateway, restorer, kj::mv(connection),
2799
          kj::mv(onDisconnect.fulfiller), flowLimit);
2800
      RpcConnectionState& result = *newState;
2801
      connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
2802 2803 2804 2805 2806 2807 2808
      return result;
    } else {
      return *iter->second;
    }
  }

  kj::Promise<void> acceptLoop() {
2809
    auto receive = network.baseAccept().then(
2810
        [this](kj::Own<VatNetworkBase::Connection>&& connection) {
2811
      getConnectionState(kj::mv(connection));
2812 2813 2814 2815 2816 2817 2818 2819
    });
    return receive.then([this]() {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      tasks.add(acceptLoop());
    });
2820
  }
2821 2822 2823 2824 2825 2826 2827 2828 2829 2830 2831 2832 2833 2834 2835 2836 2837

  Capability::Client baseCreateFor(AnyStruct::Reader clientId) override {
    // Implements BootstrapFactory::baseCreateFor() in terms of `bootstrapInterface` or `restorer`,
    // for use when we were given one of those instead of an actual `bootstrapFactory`.

    KJ_IF_MAYBE(cap, bootstrapInterface) {
      return *cap;
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(AnyPointer::Reader());
    } else {
      return KJ_EXCEPTION(FAILED, "This vat does not expose any public/bootstrap interfaces.");
    }
  }

  void taskFailed(kj::Exception&& exception) override {
    KJ_LOG(ERROR, exception);
  }
2838 2839
};

2840
RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
2841 2842 2843
                             kj::Maybe<Capability::Client> bootstrapInterface,
                             kj::Maybe<RealmGateway<>::Client> gateway)
    : impl(kj::heap<Impl>(network, kj::mv(bootstrapInterface), kj::mv(gateway))) {}
2844 2845 2846 2847
RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
                             BootstrapFactoryBase& bootstrapFactory,
                             kj::Maybe<RealmGateway<>::Client> gateway)
    : impl(kj::heap<Impl>(network, bootstrapFactory, kj::mv(gateway))) {}
2848
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2849
    : impl(kj::heap<Impl>(network, restorer)) {}
2850
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
2851 2852
RpcSystemBase::~RpcSystemBase() noexcept(false) {}

2853
Capability::Client RpcSystemBase::baseBootstrap(AnyStruct::Reader vatId) {
2854 2855 2856
  return impl->bootstrap(vatId);
}

2857
Capability::Client RpcSystemBase::baseRestore(
2858
    AnyStruct::Reader hostId, AnyPointer::Reader objectId) {
2859
  return impl->restore(hostId, objectId);
2860 2861
}

2862 2863 2864 2865
void RpcSystemBase::baseSetFlowLimit(size_t words) {
  return impl->setFlowLimit(words);
}

2866 2867
}  // namespace _ (private)
}  // namespace capnp