rpc.c++ 90.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "rpc.h"
25
#include "message.h"
26 27 28
#include <kj/debug.h>
#include <kj/vector.h>
#include <kj/async.h>
Kenton Varda's avatar
Kenton Varda committed
29
#include <kj/one-of.h>
Kenton Varda's avatar
Kenton Varda committed
30
#include <kj/function.h>
31
#include <unordered_map>
Kenton Varda's avatar
Kenton Varda committed
32
#include <map>
33 34 35 36 37 38 39 40
#include <queue>
#include <capnp/rpc.capnp.h>

namespace capnp {
namespace _ {  // private

namespace {

Kenton Varda's avatar
Kenton Varda committed
41 42 43 44 45 46 47 48 49
template <typename T>
inline constexpr uint messageSizeHint() {
  return 1 + sizeInWords<rpc::Message>() + sizeInWords<T>();
}
template <>
inline constexpr uint messageSizeHint<void>() {
  return 1 + sizeInWords<rpc::Message>();
}

50 51 52
constexpr const uint MESSAGE_TARGET_SIZE_HINT = sizeInWords<rpc::MessageTarget>() +
    sizeInWords<rpc::PromisedAnswer>() + 16;  // +16 for ops; hope that's enough

53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
constexpr const uint CAP_DESCRIPTOR_SIZE_HINT = sizeInWords<rpc::CapDescriptor>() +
    sizeInWords<rpc::PromisedAnswer>();

constexpr const uint64_t MAX_SIZE_HINT = 1 << 20;

uint copySizeHint(MessageSize size) {
  uint64_t sizeHint = size.wordCount + size.capCount * CAP_DESCRIPTOR_SIZE_HINT;
  return kj::min(MAX_SIZE_HINT, sizeHint);
}

uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) {
  KJ_IF_MAYBE(s, sizeHint) {
    return copySizeHint(*s) + additional;
  } else {
    return 0;
  }
}

Kenton Varda's avatar
Kenton Varda committed
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) {
  auto result = kj::heapArrayBuilder<PipelineOp>(ops.size());
  for (auto opReader: ops) {
    PipelineOp op;
    switch (opReader.which()) {
      case rpc::PromisedAnswer::Op::NOOP:
        op.type = PipelineOp::NOOP;
        break;
      case rpc::PromisedAnswer::Op::GET_POINTER_FIELD:
        op.type = PipelineOp::GET_POINTER_FIELD;
        op.pointerIndex = opReader.getGetPointerField();
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) {
          return nullptr;
        }
    }
88
    result.add(op);
Kenton Varda's avatar
Kenton Varda committed
89 90 91 92 93
  }
  return result.finish();
}

Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps(
Kenton Varda's avatar
Kenton Varda committed
94
    Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) {
Kenton Varda's avatar
Kenton Varda committed
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
  auto result = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size());
  auto builder = result.get();
  for (uint i: kj::indices(ops)) {
    rpc::PromisedAnswer::Op::Builder opBuilder = builder[i];
    switch (ops[i].type) {
      case PipelineOp::NOOP:
        opBuilder.setNoop();
        break;
      case PipelineOp::GET_POINTER_FIELD:
        opBuilder.setGetPointerField(ops[i].pointerIndex);
        break;
    }
  }
  return result;
}

Kenton Varda's avatar
Kenton Varda committed
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
kj::Exception toException(const rpc::Exception::Reader& exception) {
  kj::Exception::Nature nature =
      exception.getIsCallersFault()
          ? kj::Exception::Nature::PRECONDITION
          : kj::Exception::Nature::LOCAL_BUG;

  kj::Exception::Durability durability;
  switch (exception.getDurability()) {
    default:
    case rpc::Exception::Durability::PERMANENT:
      durability = kj::Exception::Durability::PERMANENT;
      break;
    case rpc::Exception::Durability::TEMPORARY:
      durability = kj::Exception::Durability::TEMPORARY;
      break;
    case rpc::Exception::Durability::OVERLOADED:
      durability = kj::Exception::Durability::OVERLOADED;
      break;
  }

Kenton Varda's avatar
Kenton Varda committed
131 132
  return kj::Exception(nature, durability, "(remote)", 0,
                       kj::str("remote exception: ", exception.getReason()));
Kenton Varda's avatar
Kenton Varda committed
133 134 135
}

void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
Kenton Varda's avatar
Kenton Varda committed
136 137 138
  // TODO(someday):  Indicate the remote server name as part of the stack trace.  Maybe even
  //   transmit stack traces?
  builder.setReason(exception.getDescription());
Kenton Varda's avatar
Kenton Varda committed
139 140 141 142 143 144 145 146 147 148 149 150 151 152
  builder.setIsCallersFault(exception.getNature() == kj::Exception::Nature::PRECONDITION);
  switch (exception.getDurability()) {
    case kj::Exception::Durability::PERMANENT:
      builder.setDurability(rpc::Exception::Durability::PERMANENT);
      break;
    case kj::Exception::Durability::TEMPORARY:
      builder.setDurability(rpc::Exception::Durability::TEMPORARY);
      break;
    case kj::Exception::Durability::OVERLOADED:
      builder.setDurability(rpc::Exception::Durability::OVERLOADED);
      break;
  }
}

Kenton Varda's avatar
Kenton Varda committed
153
uint exceptionSizeHint(const kj::Exception& exception) {
Kenton Varda's avatar
Kenton Varda committed
154
  return sizeInWords<rpc::Exception>() + exception.getDescription().size() / sizeof(word) + 1;
Kenton Varda's avatar
Kenton Varda committed
155 156
}

Kenton Varda's avatar
Kenton Varda committed
157
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
158

159 160 161 162 163 164 165 166 167 168 169 170 171
template <typename Id, typename T>
class ExportTable {
  // Table mapping integers to T, where the integers are chosen locally.

public:
  kj::Maybe<T&> find(Id id) {
    if (id < slots.size() && slots[id] != nullptr) {
      return slots[id];
    } else {
      return nullptr;
    }
  }

172
  T erase(Id id, T& entry) {
Kenton Varda's avatar
Kenton Varda committed
173 174
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
175 176 177 178 179 180 181 182
    // `entry` is a reference to the entry being released -- we require this in order to prove
    // that the caller has already done a find() to check that this entry exists.  We can't check
    // ourselves because the caller may have nullified the entry in the meantime.
    KJ_DREQUIRE(&entry == &slots[id]);
    T toRelease = kj::mv(slots[id]);
    slots[id] = T();
    freeIds.push(id);
    return toRelease;
183 184 185 186 187 188 189 190 191 192 193 194 195
  }

  T& next(Id& id) {
    if (freeIds.empty()) {
      id = slots.size();
      return slots.add();
    } else {
      id = freeIds.top();
      freeIds.pop();
      return slots[id];
    }
  }

196 197 198 199 200 201 202 203 204
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i = 0; i < slots.size(); i++) {
      if (slots[i] != nullptr) {
        func(i, slots[i]);
      }
    }
  }

205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
private:
  kj::Vector<T> slots;
  std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds;
};

template <typename Id, typename T>
class ImportTable {
  // Table mapping integers to T, where the integers are chosen remotely.

public:
  T& operator[](Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      return high[id];
    }
  }

Kenton Varda's avatar
Kenton Varda committed
223 224 225 226 227
  kj::Maybe<T&> find(Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      auto iter = high.find(id);
Kenton Varda's avatar
Kenton Varda committed
228
      if (iter == high.end()) {
Kenton Varda's avatar
Kenton Varda committed
229 230 231 232 233 234 235
        return nullptr;
      } else {
        return iter->second;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
236 237 238
  T erase(Id id) {
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
239
    if (id < kj::size(low)) {
Kenton Varda's avatar
Kenton Varda committed
240
      T toRelease = kj::mv(low[id]);
241
      low[id] = T();
Kenton Varda's avatar
Kenton Varda committed
242
      return toRelease;
243
    } else {
Kenton Varda's avatar
Kenton Varda committed
244
      T toRelease = kj::mv(high[id]);
245
      high.erase(id);
Kenton Varda's avatar
Kenton Varda committed
246
      return toRelease;
247 248 249
    }
  }

250 251 252 253 254 255 256 257 258 259
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i: kj::indices(low)) {
      func(i, low[i]);
    }
    for (auto& entry: high) {
      func(entry.first, entry.second);
    }
  }

260 261 262 263 264
private:
  T low[16];
  std::unordered_map<Id, T> high;
};

Kenton Varda's avatar
Kenton Varda committed
265 266
// =======================================================================================

267
class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted {
268
public:
269
  RpcConnectionState(kj::Maybe<SturdyRefRestorerBase&> restorer,
270
                     kj::Own<VatNetworkBase::Connection>&& connectionParam,
271
                     kj::Own<kj::PromiseFulfiller<void>>&& disconnectFulfiller)
272 273
      : restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), tasks(*this) {
    connection.init<Connected>(kj::mv(connectionParam));
274 275 276
    tasks.add(messageLoop());
  }

277
  kj::Own<ClientHook> restore(AnyPointer::Reader objectId) {
278 279 280 281
    if (connection.is<Disconnected>()) {
      return newBrokenCap(kj::cp(connection.get<Disconnected>()));
    }

282
    QuestionId questionId;
283
    auto& question = questions.next(questionId);
284

285
    question.isAwaitingReturn = true;
286

287
    auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
Kenton Varda's avatar
Kenton Varda committed
288

289 290
    auto questionRef = kj::refcounted<QuestionRef>(*this, questionId, kj::mv(paf.fulfiller));
    question.selfRef = *questionRef;
Kenton Varda's avatar
Kenton Varda committed
291

292
    paf.promise = paf.promise.attach(kj::addRef(*questionRef));
293 294

    {
295
      auto message = connection.get<Connected>()->newOutgoingMessage(
296
          objectId.targetSize().wordCount + messageSizeHint<rpc::Restore>());
297 298 299

      auto builder = message->getBody().initAs<rpc::Message>().initRestore();
      builder.setQuestionId(questionId);
300
      builder.getObjectId().set(objectId);
301 302 303 304

      message->send();
    }

305
    auto pipeline = kj::refcounted<RpcPipeline>(*this, kj::mv(questionRef), kj::mv(paf.promise));
306

307
    return pipeline->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
Kenton Varda's avatar
Kenton Varda committed
308 309
  }

310
  void taskFailed(kj::Exception&& exception) override {
311 312 313 314
    disconnect(kj::mv(exception));
  }

  void disconnect(kj::Exception&& exception) {
315 316 317 318 319 320 321 322 323 324
    if (!connection.is<Connected>()) {
      // Already disconnected.
      return;
    }

    kj::Exception networkException(
        kj::Exception::Nature::NETWORK_FAILURE, kj::Exception::Durability::PERMANENT,
        __FILE__, __LINE__, kj::str("Disconnected: ", exception.getDescription()));

    KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
325 326
      // Carefully pull all the objects out of the tables prior to releasing them because their
      // destructors could come back and mess with the tables.
327 328 329 330
      kj::Vector<kj::Own<PipelineHook>> pipelinesToRelease;
      kj::Vector<kj::Own<ClientHook>> clientsToRelease;
      kj::Vector<kj::Promise<kj::Own<RpcResponse>>> tailCallsToRelease;
      kj::Vector<kj::Promise<void>> resolveOpsToRelease;
331 332

      // All current questions complete with exceptions.
333
      questions.forEach([&](QuestionId id, Question& question) {
Kenton Varda's avatar
Kenton Varda committed
334
        KJ_IF_MAYBE(questionRef, question.selfRef) {
335 336
          // QuestionRef still present.
          questionRef->reject(kj::cp(networkException));
Kenton Varda's avatar
Kenton Varda committed
337
        }
338 339
      });

340
      answers.forEach([&](AnswerId id, Answer& answer) {
341 342 343 344
        KJ_IF_MAYBE(p, answer.pipeline) {
          pipelinesToRelease.add(kj::mv(*p));
        }

345
        KJ_IF_MAYBE(promise, answer.redirectedResults) {
346
          tailCallsToRelease.add(kj::mv(*promise));
347 348
        }

349 350 351 352 353
        KJ_IF_MAYBE(context, answer.callContext) {
          context->requestCancel();
        }
      });

354
      exports.forEach([&](ExportId id, Export& exp) {
355
        clientsToRelease.add(kj::mv(exp.clientHook));
356
        resolveOpsToRelease.add(kj::mv(exp.resolveOp));
357 358 359
        exp = Export();
      });

360
      imports.forEach([&](ImportId id, Import& import) {
361 362
        KJ_IF_MAYBE(f, import.promiseFulfiller) {
          f->get()->reject(kj::cp(networkException));
363 364 365
        }
      });

366
      embargoes.forEach([&](EmbargoId id, Embargo& embargo) {
367 368 369 370
        KJ_IF_MAYBE(f, embargo.fulfiller) {
          f->get()->reject(kj::cp(networkException));
        }
      });
371 372 373 374 375
    })) {
      // Some destructor must have thrown an exception.  There is no appropriate place to report
      // these errors.
      KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
             *newException);
376 377
    }

378 379 380
    // Send an abort message, but ignore failure.
    kj::runCatchingExceptions([&]() {
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
381
          messageSizeHint<void>() + exceptionSizeHint(exception));
382 383
      fromException(exception, message->getBody().getAs<rpc::Message>().initAbort());
      message->send();
384
    });
385 386 387

    // Indicate disconnect.
    disconnectFulfiller->fulfill();
388
    connection.init<Disconnected>(kj::mv(networkException));
389 390
  }

391
private:
392
  class RpcClient;
Kenton Varda's avatar
Kenton Varda committed
393
  class ImportClient;
394
  class PromiseClient;
Kenton Varda's avatar
Kenton Varda committed
395
  class QuestionRef;
Kenton Varda's avatar
Kenton Varda committed
396
  class RpcPipeline;
Kenton Varda's avatar
Kenton Varda committed
397
  class RpcCallContext;
Kenton Varda's avatar
Kenton Varda committed
398
  class RpcResponse;
Kenton Varda's avatar
Kenton Varda committed
399

400 401 402 403 404 405
  // =======================================================================================
  // The Four Tables entry types
  //
  // We have to define these before we can define the class's fields.

  typedef uint32_t QuestionId;
406
  typedef QuestionId AnswerId;
407
  typedef uint32_t ExportId;
408 409 410 411 412 413 414 415 416 417
  typedef ExportId ImportId;
  // See equivalent definitions in rpc.capnp.
  //
  // We always use the type that refers to the local table of the same name.  So e.g. although
  // QuestionId and AnswerId are the same type, we use QuestionId when referring to an entry in
  // the local question table (which corresponds to the peer's answer table) and use AnswerId
  // to refer to an entry in our answer table (which corresponds to the peer's question table).
  // Since all messages in the RPC protocol are defined from the sender's point of view, this
  // means that any time we read an ID from a received message, its type should invert.
  // TODO(cleanup):  Perhaps we could enforce that in a type-safe way?  Hmm...
418 419

  struct Question {
420 421 422
    kj::Array<ExportId> paramExports;
    // List of exports that were sent in the request.  If the response has `releaseParamCaps` these
    // will need to be released.
423

Kenton Varda's avatar
Kenton Varda committed
424 425 426
    kj::Maybe<QuestionRef&> selfRef;
    // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    // sent.
427

428 429 430
    bool isAwaitingReturn = false;
    // True from when `Call` is sent until `Return` is received.

431 432 433
    bool isTailCall = false;
    // Is this a tail call?  If so, we don't expect to receive results in the `Return`.

Kenton Varda's avatar
Kenton Varda committed
434
    inline bool operator==(decltype(nullptr)) const {
435
      return !isAwaitingReturn && selfRef == nullptr;
Kenton Varda's avatar
Kenton Varda committed
436 437
    }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
438 439 440
  };

  struct Answer {
441 442 443 444 445 446
    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

447 448 449 450
    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

451
    kj::Maybe<kj::Own<PipelineHook>> pipeline;
452 453
    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.

454
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
455 456
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.
457

458
    kj::Maybe<RpcCallContext&> callContext;
459 460
    // The call context, if it's still active.  Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.
461

462 463 464
    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
    // will need to be released.
465 466 467 468 469 470
  };

  struct Export {
    uint refcount = 0;
    // When this reaches 0, drop `clientHook` and free this export.

471
    kj::Own<ClientHook> clientHook;
472

473 474 475 476
    kj::Promise<void> resolveOp = nullptr;
    // If this export is a promise (not a settled capability), the `resolveOp` represents the
    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.

477 478 479 480 481 482 483 484 485 486 487
    inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
  };

  struct Import {
    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

488
    kj::Maybe<ImportClient&> importClient;
489 490
    // Becomes null when the import is destroyed.

491 492 493 494 495
    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

496
    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
497 498 499
    // If non-null, the import is a promise.
  };

Kenton Varda's avatar
Kenton Varda committed
500 501 502 503 504 505 506 507 508 509 510 511
  typedef uint32_t EmbargoId;

  struct Embargo {
    // For handling the `Disembargo` message when looping back to self.

    kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    // Fulfill this when the Disembargo arrives.

    inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    inline bool operator!=(decltype(nullptr)) const { return fulfiller != nullptr; }
  };

512 513 514 515
  // =======================================================================================
  // OK, now we can define RpcConnectionState's member data.

  kj::Maybe<SturdyRefRestorerBase&> restorer;
516 517 518 519 520 521 522

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

523 524
  kj::Own<kj::PromiseFulfiller<void>> disconnectFulfiller;

525 526
  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
527 528
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
529 530 531 532 533 534 535 536 537
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables.  This definitely isn't a fifth table.  I don't know what you're
  // talking about.
538 539 540 541

  kj::TaskSet tasks;

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
542
  // ClientHook implementations
543

Kenton Varda's avatar
Kenton Varda committed
544
  class RpcClient: public ClientHook, public kj::Refcounted {
545
  public:
546
    RpcClient(RpcConnectionState& connectionState)
547
        : connectionState(kj::addRef(connectionState)) {}
548

549 550 551 552
    virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) = 0;
    // Writes a CapDescriptor referencing this client.  The CapDescriptor must be sent as part of
    // the very next message sent on the connection, as it may become invalid if other things
    // happen.
Kenton Varda's avatar
Kenton Varda committed
553 554 555 556
    //
    // If writing the descriptor adds a new export to the export table, or increments the refcount
    // on an existing one, then the ID is returned and the caller is responsible for removing it
    // later.
Kenton Varda's avatar
Kenton Varda committed
557

558 559
    virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) = 0;
Kenton Varda's avatar
Kenton Varda committed
560
    // Writes the appropriate call target for calls to this capability and returns null.
Kenton Varda's avatar
Kenton Varda committed
561
    //
Kenton Varda's avatar
Kenton Varda committed
562 563 564 565
    // - OR -
    //
    // If calls have been redirected to some other local ClientHook, returns that hook instead.
    // This can happen if the capability represents a promise that has been resolved.
Kenton Varda's avatar
Kenton Varda committed
566

567
    virtual kj::Own<ClientHook> getInnermostClient() = 0;
Kenton Varda's avatar
Kenton Varda committed
568 569 570 571
    // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    // that other client -- return a reference to the other client, transitively.  Otherwise,
    // return a new reference to *this.

Kenton Varda's avatar
Kenton Varda committed
572 573
    // implements ClientHook -----------------------------------------

574
    Request<AnyPointer, AnyPointer> newCall(
575
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
576 577 578 579
      if (!connectionState->connection.is<Connected>()) {
        return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
      }

580
      auto request = kj::heap<RpcRequest>(
581 582
          *connectionState, *connectionState->connection.get<Connected>(),
          sizeHint, kj::addRef(*this));
583 584 585 586 587 588
      auto callBuilder = request->getCall();

      callBuilder.setInterfaceId(interfaceId);
      callBuilder.setMethodId(methodId);

      auto root = request->getRoot();
589
      return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
590 591
    }

Kenton Varda's avatar
Kenton Varda committed
592
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
593
                                kj::Own<CallContextHook>&& context) override {
594 595
      // Implement call() by copying params and results messages.

Kenton Varda's avatar
Kenton Varda committed
596
      auto params = context->getParams();
597
      auto request = newCall(interfaceId, methodId, params.targetSize());
Kenton Varda's avatar
Kenton Varda committed
598

599
      request.set(params);
Kenton Varda's avatar
Kenton Varda committed
600 601
      context->releaseParams();

602
      // We can and should propagate cancellation.
603
      context->allowCancellation();
604

605
      return context->directTailCall(RequestHook::from(kj::mv(request)));
Kenton Varda's avatar
Kenton Varda committed
606 607
    }

608
    kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
609 610
      return kj::addRef(*this);
    }
611
    const void* getBrand() override {
612
      return connectionState.get();
Kenton Varda's avatar
Kenton Varda committed
613 614 615
    }

  protected:
616
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
617 618
  };

619 620 621 622
  class ImportClient final: public RpcClient {
    // A ClientHook that wraps an entry in the import table.

  public:
623
    ImportClient(RpcConnectionState& connectionState, ImportId importId)
Kenton Varda's avatar
Kenton Varda committed
624 625
        : RpcClient(connectionState), importId(importId) {}

Kenton Varda's avatar
Kenton Varda committed
626
    ~ImportClient() noexcept(false) {
627
      unwindDetector.catchExceptionsIfUnwinding([&]() {
628
        // Remove self from the import table, if the table is still pointing at us.
629 630 631 632 633
        KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
          KJ_IF_MAYBE(i, import->importClient) {
            if (i == this) {
              connectionState->imports.erase(importId);
            }
634 635
          }
        }
Kenton Varda's avatar
Kenton Varda committed
636

637
        // Send a message releasing our remote references.
638 639
        if (remoteRefcount > 0 && connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
640 641 642 643 644 645 646
              messageSizeHint<rpc::Release>());
          rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
          builder.setId(importId);
          builder.setReferenceCount(remoteRefcount);
          message->send();
        }
      });
647 648
    }

649 650 651
    void addRemoteRef() {
      // Add a new RemoteRef and return a new ref to this client representing it.
      ++remoteRefcount;
Kenton Varda's avatar
Kenton Varda committed
652
    }
653

654
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
655
      descriptor.setReceiverHosted(importId);
Kenton Varda's avatar
Kenton Varda committed
656
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
657 658
    }

659 660
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
661
      target.setImportedCap(importId);
Kenton Varda's avatar
Kenton Varda committed
662
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
663 664
    }

665
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
666 667 668
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
669
    // implements ClientHook -----------------------------------------
670

671
    kj::Maybe<ClientHook&> getResolved() override {
672 673 674
      return nullptr;
    }

675
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
676 677 678
      return nullptr;
    }

Kenton Varda's avatar
Kenton Varda committed
679
  private:
680
    ImportId importId;
Kenton Varda's avatar
Kenton Varda committed
681 682 683

    uint remoteRefcount = 0;
    // Number of times we've received this import from the peer.
684 685

    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
686 687
  };

688 689 690
  class PipelineClient final: public RpcClient {
    // A ClientHook representing a pipelined promise.  Always wrapped in PromiseClient.

Kenton Varda's avatar
Kenton Varda committed
691
  public:
692 693
    PipelineClient(RpcConnectionState& connectionState,
                   kj::Own<QuestionRef>&& questionRef,
694
                   kj::Array<PipelineOp>&& ops)
Kenton Varda's avatar
Kenton Varda committed
695
        : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}
Kenton Varda's avatar
Kenton Varda committed
696

697
   kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
698 699 700 701 702
      auto promisedAnswer = descriptor.initReceiverAnswer();
      promisedAnswer.setQuestionId(questionRef->getId());
      promisedAnswer.adoptTransform(fromPipelineOps(
          Orphanage::getForMessageContaining(descriptor), ops));
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
703 704
    }

705 706
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
Kenton Varda's avatar
Kenton Varda committed
707 708 709 710
      auto builder = target.initPromisedAnswer();
      builder.setQuestionId(questionRef->getId());
      builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
      return nullptr;
711 712
    }

713
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
714 715 716
      return kj::addRef(*this);
    }

717
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
718

719
    kj::Maybe<ClientHook&> getResolved() override {
720 721 722
      return nullptr;
    }

723
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
724
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
725 726 727
    }

  private:
728
    kj::Own<QuestionRef> questionRef;
729
    kj::Array<PipelineOp> ops;
Kenton Varda's avatar
Kenton Varda committed
730 731
  };

732 733 734 735
  class PromiseClient final: public RpcClient {
    // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    // PipelineClient) and then, later on, redirects to some other client.

Kenton Varda's avatar
Kenton Varda committed
736
  public:
737 738 739
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<ClientHook> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
740
                  kj::Maybe<ImportId> importId)
741
        : RpcClient(connectionState),
742 743
          isResolved(false),
          cap(kj::mv(initial)),
744
          importId(importId),
745 746 747
          fork(eventual.fork()),
          resolveSelfPromise(fork.addBranch().then(
              [this](kj::Own<ClientHook>&& resolution) {
748
                resolve(kj::mv(resolution), false);
749
              }, [this](kj::Exception&& exception) {
750
                resolve(newBrokenCap(kj::mv(exception)), true);
751 752 753 754
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
755
              })) {
756 757 758 759 760 761
      // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
      // resolves, will forward there instead.  In addition, `whenMoreResolved()` will return a fork
      // of `eventual`.  Note that this means the application could hold on to `eventual` even after
      // the `PromiseClient` is destroyed; `eventual` must therefore make sure to hold references to
      // anything that needs to stay alive in order to resolve it correctly (such as making sure the
      // import ID is not released).
Kenton Varda's avatar
Kenton Varda committed
762
    }
Kenton Varda's avatar
Kenton Varda committed
763

764 765 766 767 768 769
    ~PromiseClient() noexcept(false) {
      KJ_IF_MAYBE(id, importId) {
        // This object is representing an import promise.  That means the import table may still
        // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
        // the import still exists and the pointer still points back to this object because this
        // object may actually outlive the import.
770
        KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
771 772 773 774 775 776 777 778 779
          KJ_IF_MAYBE(c, import->appClient) {
            if (c == this) {
              import->appClient = nullptr;
            }
          }
        }
      }
    }

780
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
781
      receivedCall = true;
782
      return connectionState->writeDescriptor(*cap, descriptor);
Kenton Varda's avatar
Kenton Varda committed
783
    }
Kenton Varda's avatar
Kenton Varda committed
784

785 786
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
787
      receivedCall = true;
788
      return connectionState->writeTarget(*cap, target);
Kenton Varda's avatar
Kenton Varda committed
789 790
    }

791
    kj::Own<ClientHook> getInnermostClient() override {
792
      receivedCall = true;
793
      return connectionState->getInnermostClient(*cap);
Kenton Varda's avatar
Kenton Varda committed
794 795
    }

Kenton Varda's avatar
Kenton Varda committed
796
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
797

798
    Request<AnyPointer, AnyPointer> newCall(
799
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
800
      receivedCall = true;
801
      return cap->newCall(interfaceId, methodId, sizeHint);
802 803
    }

804
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
805
                                kj::Own<CallContextHook>&& context) override {
806
      receivedCall = true;
807
      return cap->call(interfaceId, methodId, kj::mv(context));
808 809
    }

810
    kj::Maybe<ClientHook&> getResolved() override {
811 812
      if (isResolved) {
        return *cap;
813 814 815
      } else {
        return nullptr;
      }
Kenton Varda's avatar
Kenton Varda committed
816 817
    }

818
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
819
      return fork.addBranch();
Kenton Varda's avatar
Kenton Varda committed
820
    }
Kenton Varda's avatar
Kenton Varda committed
821 822

  private:
823 824
    bool isResolved;
    kj::Own<ClientHook> cap;
825

826
    kj::Maybe<ImportId> importId;
827
    kj::ForkedPromise<kj::Own<ClientHook>> fork;
Kenton Varda's avatar
Kenton Varda committed
828 829 830 831 832

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

833
    bool receivedCall = false;
Kenton Varda's avatar
Kenton Varda committed
834

835
    void resolve(kj::Own<ClientHook> replacement, bool isError) {
836 837
      if (replacement->getBrand() != connectionState.get() && receivedCall && !isError &&
          connectionState->connection.is<Connected>()) {
838 839 840 841 842
        // The new capability is hosted locally, not on the remote machine.  And, we had made calls
        // to the promise.  We need to make sure those calls echo back to us before we allow new
        // calls to go directly to the local capability, so we need to set a local embargo and send
        // a `Disembargo` to echo through the peer.

843
        auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
844
            messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
Kenton Varda's avatar
Kenton Varda committed
845

Kenton Varda's avatar
Kenton Varda committed
846
        auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();
Kenton Varda's avatar
Kenton Varda committed
847 848

        {
849
          auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
Kenton Varda's avatar
Kenton Varda committed
850 851 852 853 854
          KJ_ASSERT(redirect == nullptr,
                    "Original promise target should always be from this RPC connection.");
        }

        EmbargoId embargoId;
855
        Embargo& embargo = connectionState->embargoes.next(embargoId);
Kenton Varda's avatar
Kenton Varda committed
856 857 858 859 860 861 862

        disembargo.getContext().setSenderLoopback(embargoId);

        auto paf = kj::newPromiseAndFulfiller<void>();
        embargo.fulfiller = kj::mv(paf.fulfiller);

        // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
863 864
        auto embargoPromise = paf.promise.then(
            kj::mvCapture(replacement, [this](kj::Own<ClientHook>&& replacement) {
Kenton Varda's avatar
Kenton Varda committed
865 866 867 868 869
              return kj::mv(replacement);
            }));

        // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
        // client instead.
870
        replacement = newLocalPromiseClient(kj::mv(embargoPromise));
Kenton Varda's avatar
Kenton Varda committed
871 872 873 874 875

        // Send the `Disembargo`.
        message->send();
      }

876 877
      cap = replacement->addRef();
      isResolved = true;
Kenton Varda's avatar
Kenton Varda committed
878
    }
Kenton Varda's avatar
Kenton Varda committed
879
  };
880

881
  kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor) {
882
    // Write a descriptor for the given capability.
Kenton Varda's avatar
Kenton Varda committed
883

884
    // Find the innermost wrapped capability.
885
    ClientHook* inner = &cap;
886 887 888 889 890 891 892 893 894
    for (;;) {
      KJ_IF_MAYBE(resolved, inner->getResolved()) {
        inner = resolved;
      } else {
        break;
      }
    }

    if (inner->getBrand() == this) {
895
      return kj::downcast<RpcClient>(*inner).writeDescriptor(descriptor);
Kenton Varda's avatar
Kenton Varda committed
896
    } else {
897 898
      auto iter = exportsByCap.find(inner);
      if (iter != exportsByCap.end()) {
899
        // We've already seen and exported this capability before.  Just up the refcount.
900
        auto& exp = KJ_ASSERT_NONNULL(exports.find(iter->second));
Kenton Varda's avatar
Kenton Varda committed
901 902 903 904
        ++exp.refcount;
        descriptor.setSenderHosted(iter->second);
        return iter->second;
      } else {
905
        // This is the first time we've seen this capability.
Kenton Varda's avatar
Kenton Varda committed
906
        ExportId exportId;
907 908
        auto& exp = exports.next(exportId);
        exportsByCap[inner] = exportId;
Kenton Varda's avatar
Kenton Varda committed
909
        exp.refcount = 1;
910 911 912 913
        exp.clientHook = inner->addRef();

        KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
          // This is a promise.  Arrange for the `Resolve` message to be sent later.
Kenton Varda's avatar
Kenton Varda committed
914
          exp.resolveOp = resolveExportedPromise(exportId, kj::mv(*wrapped));
915 916 917
          descriptor.setSenderPromise(exportId);
        } else {
          descriptor.setSenderHosted(exportId);
918 919
        }

Kenton Varda's avatar
Kenton Varda committed
920 921
        return exportId;
      }
Kenton Varda's avatar
Kenton Varda committed
922 923 924
    }
  }

925
  kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
926 927 928 929
                                       rpc::Payload::Builder payload) {
    auto capTableBuilder = payload.initCapTable(capTable.size());
    kj::Vector<ExportId> exports(capTable.size());
    for (uint i: kj::indices(capTable)) {
930 931 932 933 934 935
      KJ_IF_MAYBE(cap, capTable[i]) {
        KJ_IF_MAYBE(exportId, writeDescriptor(**cap, capTableBuilder[i])) {
          exports.add(*exportId);
        }
      } else {
        capTableBuilder[i].setNone();
936 937 938 939 940
      }
    }
    return exports.releaseAsArray();
  }

941
  kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
Kenton Varda's avatar
Kenton Varda committed
942 943 944 945 946 947 948 949 950 951 952
    // If calls to the given capability should pass over this connection, fill in `target`
    // appropriately for such a call and return nullptr.  Otherwise, return a `ClientHook` to which
    // the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
    //
    // The main case where this ends up returning non-null is if `cap` is a promise that has
    // recently resolved.  The application might have started building a request before the promise
    // resolved, and so the request may have been built on the assumption that it would be sent over
    // this network connection, but then the promise resolved to point somewhere else before the
    // request was sent.  Now the request has to be redirected to the new target instead.

    if (cap.getBrand() == this) {
953
      return kj::downcast<RpcClient>(cap).writeTarget(target);
Kenton Varda's avatar
Kenton Varda committed
954 955 956 957 958
    } else {
      return cap.addRef();
    }
  }

959 960
  kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
    ClientHook* ptr = &client;
Kenton Varda's avatar
Kenton Varda committed
961 962 963 964 965 966 967 968 969
    for (;;) {
      KJ_IF_MAYBE(inner, ptr->getResolved()) {
        ptr = inner;
      } else {
        break;
      }
    }

    if (ptr->getBrand() == this) {
970
      return kj::downcast<RpcClient>(*ptr).getInnermostClient();
Kenton Varda's avatar
Kenton Varda committed
971 972 973 974 975 976
    } else {
      return ptr->addRef();
    }
  }

  kj::Promise<void> resolveExportedPromise(
977
      ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
Kenton Varda's avatar
Kenton Varda committed
978 979 980 981
    // Implements exporting of a promise.  The promise has been exported under the given ID, and is
    // to eventually resolve to the ClientHook produced by `promise`.  This method waits for that
    // resolve to happen and then sends the appropriate `Resolve` message to the peer.

982
    return promise.then(
983
        [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
Kenton Varda's avatar
Kenton Varda committed
984 985
      // Successful resolution.

986 987 988 989 990
      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return kj::READY_NOW;
      }

Kenton Varda's avatar
Kenton Varda committed
991 992 993 994 995 996 997 998 999 1000 1001
      // Get the innermost ClientHook backing the resolved client.  This includes traversing
      // PromiseClients that haven't resolved yet to their underlying ImportClient or
      // PipelineClient, so that we get a remote promise that might resolve later.  This is
      // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
      // correctly instead of going to the result of some future resolution.  See the documentation
      // for `Disembargo` in `rpc.capnp`.
      resolution = getInnermostClient(*resolution);

      // Update the export table to point at this object instead.  We know that our entry in the
      // export table is still live because when it is destroyed the asynchronous resolution task
      // (i.e. this code) is canceled.
1002 1003
      auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
      exportsByCap.erase(exp.clientHook);
Kenton Varda's avatar
Kenton Varda committed
1004 1005
      exp.clientHook = kj::mv(resolution);

1006
      if (exp.clientHook->getBrand() != this) {
Kenton Varda's avatar
Kenton Varda committed
1007 1008 1009
        // We're resolving to a local capability.  If we're resolving to a promise, we might be
        // able to reuse our export table entry and avoid sending a message.

1010
        KJ_IF_MAYBE(promise, exp.clientHook->whenMoreResolved()) {
Kenton Varda's avatar
Kenton Varda committed
1011 1012 1013 1014
          // We're replacing a promise with another local promise.  In this case, we might actually
          // be able to just reuse the existing export table entry to represent the new promise --
          // unless it already has an entry.  Let's check.

1015
          auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));
Kenton Varda's avatar
Kenton Varda committed
1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026

          if (insertResult.second) {
            // The new promise was not already in the table, therefore the existing export table
            // entry has now been repurposed to represent it.  There is no need to send a resolve
            // message at all.  We do, however, have to start resolving the next promise.
            return resolveExportedPromise(exportId, kj::mv(*promise));
          }
        }
      }

      // OK, we have to send a `Resolve` message.
1027
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1028 1029 1030
          messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
1031
      writeDescriptor(*exp.clientHook, resolve.initCap());
Kenton Varda's avatar
Kenton Varda committed
1032 1033 1034 1035 1036
      message->send();

      return kj::READY_NOW;
    }, [this,exportId](kj::Exception&& exception) {
      // send error resolution
1037
      auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
1038 1039 1040 1041 1042
          messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      fromException(exception, resolve.initException());
      message->send();
1043 1044 1045
    }).eagerlyEvaluate([this](kj::Exception&& exception) {
      // Put the exception on the TaskSet which will cause the connection to be terminated.
      tasks.add(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1046 1047 1048
    });
  }

Kenton Varda's avatar
Kenton Varda committed
1049
  // =====================================================================================
1050
  // Interpreting CapDescriptor
Kenton Varda's avatar
Kenton Varda committed
1051

1052
  kj::Own<ClientHook> import(ImportId importId, bool isPromise) {
1053
    // Receive a new import.
Kenton Varda's avatar
Kenton Varda committed
1054

1055 1056
    auto& import = imports[importId];
    kj::Own<ImportClient> importClient;
Kenton Varda's avatar
Kenton Varda committed
1057

1058 1059 1060 1061 1062 1063
    // Create the ImportClient, or if one already exists, use it.
    KJ_IF_MAYBE(c, import.importClient) {
      importClient = kj::addRef(*c);
    } else {
      importClient = kj::refcounted<ImportClient>(*this, importId);
      import.importClient = *importClient;
Kenton Varda's avatar
Kenton Varda committed
1064
    }
1065

1066 1067
    // We just received a copy of this import ID, so the remote refcount has gone up.
    importClient->addRemoteRef();
Kenton Varda's avatar
Kenton Varda committed
1068

1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
    if (isPromise) {
      // We need to construct a PromiseClient around this import, if we haven't already.
      KJ_IF_MAYBE(c, import.appClient) {
        // Use the existing one.
        return kj::addRef(*c);
      } else {
        // Create a promise for this import's resolution.
        auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
        import.promiseFulfiller = kj::mv(paf.fulfiller);

        // Make sure the import is not destroyed while this promise exists.
        paf.promise = paf.promise.attach(kj::addRef(*importClient));

        // Create a PromiseClient around it and return it.
        auto result = kj::refcounted<PromiseClient>(
            *this, kj::mv(importClient), kj::mv(paf.promise), importId);
        import.appClient = *result;
        return kj::mv(result);
1087
      }
1088 1089 1090
    } else {
      import.appClient = *importClient;
      return kj::mv(importClient);
1091
    }
1092
  }
1093

1094
  kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor) {
1095 1096
    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
1097
        return nullptr;
1098

1099 1100 1101 1102
      case rpc::CapDescriptor::SENDER_HOSTED:
        return import(descriptor.getSenderHosted(), false);
      case rpc::CapDescriptor::SENDER_PROMISE:
        return import(descriptor.getSenderPromise(), true);
1103

1104 1105 1106 1107
      case rpc::CapDescriptor::RECEIVER_HOSTED:
        KJ_IF_MAYBE(exp, exports.find(descriptor.getReceiverHosted())) {
          return exp->clientHook->addRef();
        } else {
1108 1109 1110
          return newBrokenCap("invalid 'receiverHosted' export ID");
        }

1111 1112
      case rpc::CapDescriptor::RECEIVER_ANSWER: {
        auto promisedAnswer = descriptor.getReceiverAnswer();
1113

1114 1115 1116 1117 1118 1119 1120
        KJ_IF_MAYBE(answer, answers.find(promisedAnswer.getQuestionId())) {
          if (answer->active) {
            KJ_IF_MAYBE(pipeline, answer->pipeline) {
              KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
                return pipeline->get()->getPipelinedCap(*ops);
              } else {
                return newBrokenCap("unrecognized pipeline ops");
Kenton Varda's avatar
Kenton Varda committed
1121 1122
              }
            }
1123
          }
1124 1125
        }

1126
        return newBrokenCap("invalid 'receiverAnswer'");
1127 1128
      }

1129 1130 1131
      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        // We don't support third-party caps, so use the vine instead.
        return import(descriptor.getThirdPartyHosted().getVineId(), false);
Kenton Varda's avatar
Kenton Varda committed
1132

1133 1134 1135
      default:
        KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
        return newBrokenCap("unknown CapDescriptor type");
Kenton Varda's avatar
Kenton Varda committed
1136
    }
1137
  }
1138

1139 1140
  kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
    auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
1141 1142
    for (auto cap: capTable) {
      result.add(receiveCap(cap));
Kenton Varda's avatar
Kenton Varda committed
1143
    }
1144 1145
    return result.finish();
  }
1146 1147

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
1148
  // RequestHook/PipelineHook/ResponseHook implementations
1149

Kenton Varda's avatar
Kenton Varda committed
1150 1151 1152 1153
  class QuestionRef: public kj::Refcounted {
    // A reference to an entry on the question table.  Used to detect when the `Finish` message
    // can be sent.

Kenton Varda's avatar
Kenton Varda committed
1154
  public:
1155
    inline QuestionRef(
1156 1157 1158
        RpcConnectionState& connectionState, QuestionId id,
        kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
        : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1159 1160

    ~QuestionRef() {
1161
      unwindDetector.catchExceptionsIfUnwinding([&]() {
1162
        // Send the "Finish" message (if the connection is not already broken).
1163 1164
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1165
              messageSizeHint<rpc::Finish>());
1166 1167
          auto builder = message->getBody().getAs<rpc::Message>().initFinish();
          builder.setQuestionId(id);
1168
          builder.setReleaseResultCaps(false);
1169
          message->send();
1170
        }
Kenton Varda's avatar
Kenton Varda committed
1171

1172 1173 1174 1175 1176
        // Check if the question has returned and, if so, remove it from the table.
        // Remove question ID from the table.  Must do this *after* sending `Finish` to ensure that
        // the ID is not re-allocated before the `Finish` message can be sent.
        auto& question = KJ_ASSERT_NONNULL(
            connectionState->questions.find(id), "Question ID no longer on table?");
1177 1178
        if (question.isAwaitingReturn) {
          // Still waiting for return, so just remove the QuestionRef pointer from the table.
1179
          question.selfRef = nullptr;
1180 1181 1182
        } else {
          // Call has already returned, so we can now remove it from the table.
          connectionState->questions.erase(id, question);
1183 1184
        }
      });
Kenton Varda's avatar
Kenton Varda committed
1185 1186 1187 1188
    }

    inline QuestionId getId() const { return id; }

1189
    void fulfill(kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1190 1191 1192
      fulfiller->fulfill(kj::mv(response));
    }

1193
    void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
1194 1195 1196
      fulfiller->fulfill(kj::mv(promise));
    }

Kenton Varda's avatar
Kenton Varda committed
1197 1198
    void reject(kj::Exception&& exception) {
      fulfiller->reject(kj::mv(exception));
1199
    }
Kenton Varda's avatar
Kenton Varda committed
1200 1201

  private:
1202
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1203
    QuestionId id;
1204
    kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
1205
    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
1206 1207
  };

Kenton Varda's avatar
Kenton Varda committed
1208
  class RpcRequest final: public RequestHook {
1209
  public:
1210 1211
    RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
               kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
1212
        : connectionState(kj::addRef(connectionState)),
Kenton Varda's avatar
Kenton Varda committed
1213
          target(kj::mv(target)),
1214
          message(connection.newOutgoingMessage(
1215 1216
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
                  sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
Kenton Varda's avatar
Kenton Varda committed
1217
          callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
1218
          paramsBuilder(callBuilder.getParams().getContent()) {}
Kenton Varda's avatar
Kenton Varda committed
1219

1220
    inline AnyPointer::Builder getRoot() {
Kenton Varda's avatar
Kenton Varda committed
1221 1222
      return paramsBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1223 1224 1225
    inline rpc::Call::Builder getCall() {
      return callBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1226

1227
    RemotePromise<AnyPointer> send() override {
1228
      if (!connectionState->connection.is<Connected>()) {
1229
        // Connection is broken.
1230
        const kj::Exception& e = connectionState->connection.get<Disconnected>();
1231
        return RemotePromise<AnyPointer>(
1232 1233
            kj::Promise<Response<AnyPointer>>(kj::cp(e)),
            AnyPointer::Pipeline(newBrokenPipeline(kj::cp(e))));
1234
      }
Kenton Varda's avatar
Kenton Varda committed
1235

1236 1237 1238
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // We'll have to make a new request and do a copy.  Ick.
Kenton Varda's avatar
Kenton Varda committed
1239

1240
        auto replacement = redirect->get()->newCall(
1241
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
1242 1243 1244
        replacement.set(paramsBuilder);
        return replacement.send();
      } else {
1245
        auto sendResult = sendInternal(false);
Kenton Varda's avatar
Kenton Varda committed
1246

1247
        auto forkedPromise = sendResult.promise.fork();
1248

1249 1250 1251
        // The pipeline must get notified of resolution before the app does to maintain ordering.
        auto pipeline = kj::refcounted<RpcPipeline>(
            *connectionState, kj::mv(sendResult.questionRef), forkedPromise.addBranch());
Kenton Varda's avatar
Kenton Varda committed
1252

1253 1254 1255 1256 1257
        auto appPromise = forkedPromise.addBranch().then(
            [=](kj::Own<RpcResponse>&& response) {
              auto reader = response->getResults();
              return Response<AnyPointer>(reader, kj::mv(response));
            });
Kenton Varda's avatar
Kenton Varda committed
1258

1259 1260 1261 1262
        return RemotePromise<AnyPointer>(
            kj::mv(appPromise),
            AnyPointer::Pipeline(kj::mv(pipeline)));
      }
Kenton Varda's avatar
Kenton Varda committed
1263 1264
    }

1265 1266 1267
    struct TailInfo {
      QuestionId questionId;
      kj::Promise<void> promise;
1268
      kj::Own<PipelineHook> pipeline;
1269 1270 1271 1272 1273 1274 1275 1276 1277 1278
    };

    kj::Maybe<TailInfo> tailSend() {
      // Send the request as a tail call.
      //
      // Returns null if for some reason a tail call is not possible and the caller should fall
      // back to using send() and copying the response.

      SendInternalResult sendResult;

1279
      if (!connectionState->connection.is<Connected>()) {
1280 1281 1282
        // Disconnected; fall back to a regular send() which will fail appropriately.
        return nullptr;
      }
1283

1284 1285 1286 1287 1288 1289
      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // Fall back to regular send().
        return nullptr;
      } else {
        sendResult = sendInternal(true);
1290 1291
      }

1292
      auto promise = sendResult.promise.then([](kj::Own<RpcResponse>&& response) {
1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303
        // Response should be null if `Return` handling code is correct.
        KJ_ASSERT(!response) { break; }
      });

      QuestionId questionId = sendResult.questionRef->getId();

      auto pipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sendResult.questionRef));

      return TailInfo { questionId, kj::mv(promise), kj::mv(pipeline) };
    }

1304
    const void* getBrand() override {
1305 1306 1307
      return connectionState.get();
    }

Kenton Varda's avatar
Kenton Varda committed
1308
  private:
1309
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1310

1311
    kj::Own<RpcClient> target;
Kenton Varda's avatar
Kenton Varda committed
1312 1313
    kj::Own<OutgoingRpcMessage> message;
    rpc::Call::Builder callBuilder;
1314
    AnyPointer::Builder paramsBuilder;
1315 1316 1317

    struct SendInternalResult {
      kj::Own<QuestionRef> questionRef;
1318
      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
1319 1320
    };

1321
    SendInternalResult sendInternal(bool isTailCall) {
1322 1323
      // Build the cap table.
      auto exports = connectionState->writeDescriptors(
1324
          message->getCapTable(), callBuilder.getParams());
1325

1326
      // Init the question table.  Do this after writing descriptors to avoid interference.
1327
      QuestionId questionId;
1328
      auto& question = connectionState->questions.next(questionId);
1329 1330 1331
      question.isAwaitingReturn = true;
      question.paramExports = kj::mv(exports);
      question.isTailCall = isTailCall;
1332

1333
      // Finish and send.
1334 1335 1336 1337
      callBuilder.setQuestionId(questionId);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
1338
      message->send();
1339

1340
      // Make the result promise.
1341
      SendInternalResult result;
1342
      auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
1343
      result.questionRef = kj::refcounted<QuestionRef>(
1344
          *connectionState, questionId, kj::mv(paf.fulfiller));
1345
      question.selfRef = *result.questionRef;
1346
      result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
1347

1348
      // Send and return.
1349 1350
      return kj::mv(result);
    }
Kenton Varda's avatar
Kenton Varda committed
1351 1352
  };

Kenton Varda's avatar
Kenton Varda committed
1353
  class RpcPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1354
  public:
1355 1356
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef,
                kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam)
1357
        : connectionState(kj::addRef(connectionState)),
1358 1359 1360
          redirectLater(redirectLaterParam.fork()),
          resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then(
              [this](kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1361
                resolve(kj::mv(response));
1362
              }, [this](kj::Exception&& exception) {
Kenton Varda's avatar
Kenton Varda committed
1363
                resolve(kj::mv(exception));
1364 1365 1366 1367
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
1368 1369 1370
              })) {
      // Construct a new RpcPipeline.

1371
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1372
    }
Kenton Varda's avatar
Kenton Varda committed
1373

1374
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef)
1375 1376 1377 1378
        : connectionState(kj::addRef(connectionState)),
          resolveSelfPromise(nullptr) {
      // Construct a new RpcPipeline that is never expected to resolve.

1379
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1380
    }
Kenton Varda's avatar
Kenton Varda committed
1381 1382 1383

    // implements PipelineHook ---------------------------------------

1384
    kj::Own<PipelineHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1385 1386 1387
      return kj::addRef(*this);
    }

1388
    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
Kenton Varda's avatar
Kenton Varda committed
1389 1390 1391 1392 1393 1394 1395
      auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
      for (auto& op: ops) {
        copy.add(op);
      }
      return getPipelinedCap(copy.finish());
    }

1396
    kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override {
1397
      if (state.is<Waiting>()) {
1398 1399
        // Wrap a PipelineClient in a PromiseClient.
        auto pipelineClient = kj::refcounted<PipelineClient>(
1400
            *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr()));
1401

1402
        KJ_IF_MAYBE(r, redirectLater) {
1403 1404 1405 1406
          auto resolutionPromise = r->addBranch().then(kj::mvCapture(ops,
              [](kj::Array<PipelineOp> ops, kj::Own<RpcResponse>&& response) {
                return response->getResults().getPipelinedCap(ops);
              }));
1407

1408 1409 1410 1411 1412 1413
          return kj::refcounted<PromiseClient>(
              *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr);
        } else {
          // Oh, this pipeline will never get redirected, so just return the PipelineClient.
          return kj::mv(pipelineClient);
        }
1414 1415
      } else if (state.is<Resolved>()) {
        return state.get<Resolved>()->getResults().getPipelinedCap(ops);
Kenton Varda's avatar
Kenton Varda committed
1416
      } else {
1417
        return newBrokenCap(kj::cp(state.get<Broken>()));
Kenton Varda's avatar
Kenton Varda committed
1418
      }
Kenton Varda's avatar
Kenton Varda committed
1419 1420 1421
    }

  private:
1422 1423
    kj::Own<RpcConnectionState> connectionState;
    kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater;
Kenton Varda's avatar
Kenton Varda committed
1424

1425 1426
    typedef kj::Own<QuestionRef> Waiting;
    typedef kj::Own<RpcResponse> Resolved;
Kenton Varda's avatar
Kenton Varda committed
1427
    typedef kj::Exception Broken;
1428
    kj::OneOf<Waiting, Resolved, Broken> state;
Kenton Varda's avatar
Kenton Varda committed
1429 1430 1431 1432 1433

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

1434
    void resolve(kj::Own<RpcResponse>&& response) {
1435 1436
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Resolved>(kj::mv(response));
Kenton Varda's avatar
Kenton Varda committed
1437 1438 1439
    }

    void resolve(const kj::Exception&& exception) {
1440 1441
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Broken>(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1442
    }
Kenton Varda's avatar
Kenton Varda committed
1443 1444
  };

1445 1446
  class RpcResponse: public ResponseHook {
  public:
1447
    virtual AnyPointer::Reader getResults() = 0;
1448
    virtual kj::Own<RpcResponse> addRef() = 0;
1449 1450 1451
  };

  class RpcResponseImpl final: public RpcResponse, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1452
  public:
1453
    RpcResponseImpl(RpcConnectionState& connectionState,
1454 1455
                    kj::Own<QuestionRef>&& questionRef,
                    kj::Own<IncomingRpcMessage>&& message,
1456
                    AnyPointer::Reader results)
1457 1458
        : connectionState(kj::addRef(connectionState)),
          message(kj::mv(message)),
1459
          reader(results),
Kenton Varda's avatar
Kenton Varda committed
1460
          questionRef(kj::mv(questionRef)) {}
Kenton Varda's avatar
Kenton Varda committed
1461

1462
    AnyPointer::Reader getResults() override {
Kenton Varda's avatar
Kenton Varda committed
1463 1464 1465
      return reader;
    }

1466
    kj::Own<RpcResponse> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1467 1468 1469
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
1470
  private:
1471
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1472
    kj::Own<IncomingRpcMessage> message;
1473
    AnyPointer::Reader reader;
1474
    kj::Own<QuestionRef> questionRef;
Kenton Varda's avatar
Kenton Varda committed
1475 1476 1477 1478 1479 1480
  };

  // =====================================================================================
  // CallContextHook implementation

  class RpcServerResponse {
Kenton Varda's avatar
Kenton Varda committed
1481
  public:
1482
    virtual AnyPointer::Builder getResultsBuilder() = 0;
1483 1484 1485 1486
  };

  class RpcServerResponseImpl final: public RpcServerResponse {
  public:
1487
    RpcServerResponseImpl(RpcConnectionState& connectionState,
1488
                          kj::Own<OutgoingRpcMessage>&& message,
1489 1490 1491
                          rpc::Payload::Builder payload)
        : connectionState(connectionState),
          message(kj::mv(message)),
1492
          payload(payload) {}
Kenton Varda's avatar
Kenton Varda committed
1493

1494
    AnyPointer::Builder getResultsBuilder() override {
1495
      return payload.getContent();
Kenton Varda's avatar
Kenton Varda committed
1496 1497
    }

1498 1499 1500 1501 1502
    kj::Maybe<kj::Array<ExportId>> send() {
      // Send the response and return the export list.  Returns nullptr if there were no caps.
      // (Could return a non-null empty array if there were caps but none of them were exports.)

      // Build the cap table.
1503
      auto capTable = message->getCapTable();
1504 1505
      auto exports = connectionState.writeDescriptors(capTable, payload);

Kenton Varda's avatar
Kenton Varda committed
1506
      message->send();
1507 1508 1509 1510 1511
      if (capTable.size() == 0) {
        return nullptr;
      } else {
        return kj::mv(exports);
      }
Kenton Varda's avatar
Kenton Varda committed
1512 1513 1514
    }

  private:
1515
    RpcConnectionState& connectionState;
Kenton Varda's avatar
Kenton Varda committed
1516
    kj::Own<OutgoingRpcMessage> message;
1517
    rpc::Payload::Builder payload;
Kenton Varda's avatar
Kenton Varda committed
1518 1519
  };

1520 1521 1522
  class LocallyRedirectedRpcResponse final
      : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
  public:
1523
    LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
1524 1525
        : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
                          .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
1526

1527
    AnyPointer::Builder getResultsBuilder() override {
1528
      return message.getRoot<AnyPointer>();
1529 1530
    }

1531
    AnyPointer::Reader getResults() override {
1532
      return message.getRoot<AnyPointer>();
1533 1534
    }

1535
    kj::Own<RpcResponse> addRef() override {
1536 1537 1538 1539
      return kj::addRef(*this);
    }

  private:
1540
    MallocMessageBuilder message;
1541 1542
  };

Kenton Varda's avatar
Kenton Varda committed
1543 1544
  class RpcCallContext final: public CallContextHook, public kj::Refcounted {
  public:
1545
    RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
1546
                   kj::Own<IncomingRpcMessage>&& request, const AnyPointer::Reader& params,
1547
                   bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
1548
        : connectionState(kj::addRef(connectionState)),
1549
          answerId(answerId),
Kenton Varda's avatar
Kenton Varda committed
1550
          request(kj::mv(request)),
1551
          params(params),
1552
          returnMessage(nullptr),
Kenton Varda's avatar
Kenton Varda committed
1553 1554
          redirectResults(redirectResults),
          cancelFulfiller(kj::mv(cancelFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1555

1556 1557 1558 1559
    ~RpcCallContext() noexcept(false) {
      if (isFirstResponder()) {
        // We haven't sent a return yet, so we must have been canceled.  Send a cancellation return.
        unwindDetector.catchExceptionsIfUnwinding([&]() {
Kenton Varda's avatar
Kenton Varda committed
1560
          // Don't send anything if the connection is broken.
1561 1562
          if (connectionState->connection.is<Connected>()) {
            auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1563
                messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>());
Kenton Varda's avatar
Kenton Varda committed
1564
            auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1565

1566
            builder.setAnswerId(answerId);
1567
            builder.setReleaseParamCaps(false);
1568

Kenton Varda's avatar
Kenton Varda committed
1569 1570 1571 1572 1573 1574 1575 1576 1577
            if (redirectResults) {
              // The reason we haven't sent a return is because the results were sent somewhere
              // else.
              builder.setResultsSentElsewhere();
            } else {
              builder.setCanceled();
            }

            message->send();
1578
          }
1579

1580
          cleanupAnswerTable(nullptr, true);
1581 1582 1583 1584
        });
      }
    }

1585
    kj::Own<RpcResponse> consumeRedirectedResponse() {
1586 1587
      KJ_ASSERT(redirectResults);

1588
      if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
1589 1590 1591 1592 1593 1594

      // Note that the context needs to keep its own reference to the response so that it doesn't
      // get GC'd until the PipelineHook drops its reference to the context.
      return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef();
    }

Kenton Varda's avatar
Kenton Varda committed
1595
    void sendReturn() {
1596
      KJ_ASSERT(!redirectResults);
1597 1598 1599 1600

      // Avoid sending results if canceled so that we don't have to figure out whether or not
      // `releaseResultCaps` was set in the already-received `Finish`.
      if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) {
1601 1602 1603 1604 1605
        KJ_ASSERT(connectionState->connection.is<Connected>(),
                  "Cancellation should have been requested on disconnect.") {
          return;
        }

1606
        if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
Kenton Varda's avatar
Kenton Varda committed
1607

1608
        returnMessage.setAnswerId(answerId);
1609
        returnMessage.setReleaseParamCaps(false);
Kenton Varda's avatar
Kenton Varda committed
1610

1611 1612 1613 1614 1615 1616 1617 1618
        auto exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
        KJ_IF_MAYBE(e, exports) {
          // Caps were returned, so we can't free the pipeline yet.
          cleanupAnswerTable(kj::mv(*e), false);
        } else {
          // No caps in the results, therefore the pipeline is irrelevant.
          cleanupAnswerTable(nullptr, true);
        }
Kenton Varda's avatar
Kenton Varda committed
1619 1620 1621
      }
    }
    void sendErrorReturn(kj::Exception&& exception) {
1622
      KJ_ASSERT(!redirectResults);
Kenton Varda's avatar
Kenton Varda committed
1623
      if (isFirstResponder()) {
1624 1625 1626 1627
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Return>() + exceptionSizeHint(exception));
          auto builder = message->getBody().initAs<rpc::Message>().initReturn();
Kenton Varda's avatar
Kenton Varda committed
1628

1629 1630 1631
          builder.setAnswerId(answerId);
          builder.setReleaseParamCaps(false);
          fromException(exception, builder.initException());
Kenton Varda's avatar
Kenton Varda committed
1632

1633 1634
          message->send();
        }
1635 1636 1637 1638

        // Do not allow releasing the pipeline because we want pipelined calls to propagate the
        // exception rather than fail with a "no such field" exception.
        cleanupAnswerTable(nullptr, false);
Kenton Varda's avatar
Kenton Varda committed
1639 1640 1641
      }
    }

1642
    void requestCancel() {
Kenton Varda's avatar
Kenton Varda committed
1643 1644 1645 1646 1647 1648
      // Hints that the caller wishes to cancel this call.  At the next time when cancellation is
      // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
      // safe, the RpcCallContext will send a normal return when the call completes.  Either way
      // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
      // a Finish message was already received.

1649 1650 1651 1652
      bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED;
      cancellationFlags |= CANCEL_REQUESTED;

      if (previouslyAllowedButNotRequested) {
Kenton Varda's avatar
Kenton Varda committed
1653
        // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously.  Initiate
Kenton Varda's avatar
Kenton Varda committed
1654
        // the cancellation.
Kenton Varda's avatar
Kenton Varda committed
1655
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1656
      }
Kenton Varda's avatar
Kenton Varda committed
1657
    }
1658 1659 1660

    // implements CallContextHook ------------------------------------

1661
    AnyPointer::Reader getParams() override {
1662 1663 1664 1665 1666 1667
      KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
      return params;
    }
    void releaseParams() override {
      request = nullptr;
    }
1668
    AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
1669
      KJ_IF_MAYBE(r, response) {
1670
        return r->get()->getResultsBuilder();
1671
      } else {
1672 1673
        kj::Own<RpcServerResponse> response;

1674
        if (redirectResults || !connectionState->connection.is<Connected>()) {
1675
          response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
1676
        } else {
1677
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1678 1679
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
                               sizeInWords<rpc::Payload>()));
1680 1681 1682 1683 1684 1685
          returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
          response = kj::heap<RpcServerResponseImpl>(
              *connectionState, kj::mv(message), returnMessage.getResults());
        }

        auto results = response->getResultsBuilder();
Kenton Varda's avatar
Kenton Varda committed
1686 1687
        this->response = kj::mv(response);
        return results;
1688 1689
      }
    }
1690 1691 1692
    kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
      auto result = directTailCall(kj::mv(request));
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
1693
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
1694 1695 1696 1697
      }
      return kj::mv(result.promise);
    }
    ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
1698 1699 1700
      KJ_REQUIRE(response == nullptr,
                 "Can't call tailCall() after initializing the results struct.");

1701
      if (request->getBrand() == connectionState.get() && !redirectResults) {
1702 1703 1704 1705 1706
        // The tail call is headed towards the peer that called us in the first place, so we can
        // optimize out the return trip.

        KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
          if (isFirstResponder()) {
1707 1708 1709 1710
            if (connectionState->connection.is<Connected>()) {
              auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
                  messageSizeHint<rpc::Return>());
              auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1711

1712 1713 1714
              builder.setAnswerId(answerId);
              builder.setReleaseParamCaps(false);
              builder.setTakeFromOtherQuestion(tailInfo->questionId);
1715

1716 1717
              message->send();
            }
1718 1719 1720

            // There are no caps in our return message, but of course the tail results could have
            // caps, so we must continue to honor pipeline calls (and just bounce them back).
1721
            cleanupAnswerTable(nullptr, false);
1722
          }
1723
          return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
1724 1725 1726 1727 1728 1729 1730
        }
      }

      // Just forwarding to another local call.
      auto promise = request->send();

      // Wait for response.
1731
      auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
1732 1733 1734
        // Copy the response.
        // TODO(perf):  It would be nice if we could somehow make the response get built in-place
        //   but requires some refactoring.
1735
        getResults(tailResponse.targetSize()).set(tailResponse);
1736
      });
1737 1738

      return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
1739
    }
1740 1741
    kj::Promise<AnyPointer::Pipeline> onTailCall() override {
      auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
1742 1743 1744
      tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
      return kj::mv(paf.promise);
    }
1745
    void allowCancellation() override {
1746 1747 1748 1749
      bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
      cancellationFlags |= CANCEL_ALLOWED;

      if (previouslyRequestedButNotAllowed) {
Kenton Varda's avatar
Kenton Varda committed
1750 1751 1752
        // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
        // the cancellation.
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1753
      }
1754 1755 1756 1757 1758 1759
    }
    kj::Own<CallContextHook> addRef() override {
      return kj::addRef(*this);
    }

  private:
1760
    kj::Own<RpcConnectionState> connectionState;
1761
    AnswerId answerId;
1762

Kenton Varda's avatar
Kenton Varda committed
1763 1764 1765
    // Request ---------------------------------------------

    kj::Maybe<kj::Own<IncomingRpcMessage>> request;
1766
    AnyPointer::Reader params;
Kenton Varda's avatar
Kenton Varda committed
1767

Kenton Varda's avatar
Kenton Varda committed
1768 1769 1770
    // Response --------------------------------------------

    kj::Maybe<kj::Own<RpcServerResponse>> response;
Kenton Varda's avatar
Kenton Varda committed
1771
    rpc::Return::Builder returnMessage;
1772
    bool redirectResults = false;
Kenton Varda's avatar
Kenton Varda committed
1773
    bool responseSent = false;
1774
    kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1775 1776 1777 1778 1779 1780 1781 1782

    // Cancellation state ----------------------------------

    enum CancellationFlags {
      CANCEL_REQUESTED = 1,
      CANCEL_ALLOWED = 2
    };

1783
    uint8_t cancellationFlags = 0;
1784
    // When both flags are set, the cancellation process will begin.
Kenton Varda's avatar
Kenton Varda committed
1785

1786
    kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1787 1788 1789
    // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise is
    // exclusive-joined with the outermost promise waiting on the call return, so fulfilling it
    // cancels that promise.
Kenton Varda's avatar
Kenton Varda committed
1790

1791 1792
    kj::UnwindDetector unwindDetector;

Kenton Varda's avatar
Kenton Varda committed
1793 1794 1795 1796 1797 1798 1799
    // -----------------------------------------------------

    bool isFirstResponder() {
      if (responseSent) {
        return false;
      } else {
        responseSent = true;
1800 1801 1802
        return true;
      }
    }
Kenton Varda's avatar
Kenton Varda committed
1803

1804
    void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
1805 1806 1807
      // We need to remove the `callContext` pointer -- which points back to us -- from the
      // answer table.  Or we might even be responsible for removing the entire answer table
      // entry.
Kenton Varda's avatar
Kenton Varda committed
1808

1809
      if (cancellationFlags & CANCEL_REQUESTED) {
1810 1811 1812
        // Already received `Finish` so it's our job to erase the table entry. We shouldn't have
        // sent results if canceled, so we shouldn't have an export list to deal with.
        KJ_ASSERT(resultExports.size() == 0);
1813
        connectionState->answers.erase(answerId);
1814
      } else {
1815
        // We just have to null out callContext and set the exports.
1816
        auto& answer = connectionState->answers[answerId];
1817
        answer.callContext = nullptr;
1818 1819 1820 1821 1822 1823
        answer.resultExports = kj::mv(resultExports);

        if (shouldFreePipeline) {
          // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
          // because there are no caps in the result to receive pipeline requests).
          KJ_ASSERT(resultExports.size() == 0);
Kenton Varda's avatar
Kenton Varda committed
1824
          answer.pipeline = nullptr;
Kenton Varda's avatar
Kenton Varda committed
1825 1826 1827
        }
      }
    }
1828 1829
  };

Kenton Varda's avatar
Kenton Varda committed
1830 1831 1832
  // =====================================================================================
  // Message handling

1833
  kj::Promise<void> messageLoop() {
1834 1835 1836 1837 1838
    if (!connection.is<Connected>()) {
      return kj::READY_NOW;
    }

    return connection.get<Connected>()->receiveIncomingMessage().then(
1839
        [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) {
1840 1841
      KJ_IF_MAYBE(m, message) {
        handleMessage(kj::mv(*m));
1842
        return true;
1843 1844 1845 1846
      } else {
        disconnect(kj::Exception(
            kj::Exception::Nature::PRECONDITION, kj::Exception::Durability::PERMANENT,
            __FILE__, __LINE__, kj::str("Peer disconnected.")));
1847
        return false;
1848
      }
1849
    }).then([this](bool keepGoing) {
1850 1851 1852 1853
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
1854
      if (keepGoing) tasks.add(messageLoop());
1855
    });
1856 1857 1858 1859
  }

  void handleMessage(kj::Own<IncomingRpcMessage> message) {
    auto reader = message->getBody().getAs<rpc::Message>();
1860

1861 1862
    switch (reader.which()) {
      case rpc::Message::UNIMPLEMENTED:
Kenton Varda's avatar
Kenton Varda committed
1863
        handleUnimplemented(reader.getUnimplemented());
1864 1865 1866
        break;

      case rpc::Message::ABORT:
Kenton Varda's avatar
Kenton Varda committed
1867
        handleAbort(reader.getAbort());
1868 1869 1870
        break;

      case rpc::Message::CALL:
Kenton Varda's avatar
Kenton Varda committed
1871
        handleCall(kj::mv(message), reader.getCall());
1872 1873
        break;

Kenton Varda's avatar
Kenton Varda committed
1874
      case rpc::Message::RETURN:
Kenton Varda's avatar
Kenton Varda committed
1875
        handleReturn(kj::mv(message), reader.getReturn());
Kenton Varda's avatar
Kenton Varda committed
1876 1877 1878
        break;

      case rpc::Message::FINISH:
Kenton Varda's avatar
Kenton Varda committed
1879
        handleFinish(reader.getFinish());
Kenton Varda's avatar
Kenton Varda committed
1880 1881
        break;

Kenton Varda's avatar
Kenton Varda committed
1882
      case rpc::Message::RESOLVE:
1883
        handleResolve(reader.getResolve());
Kenton Varda's avatar
Kenton Varda committed
1884 1885 1886
        break;

      case rpc::Message::RELEASE:
1887
        handleRelease(reader.getRelease());
Kenton Varda's avatar
Kenton Varda committed
1888 1889
        break;

1890
      case rpc::Message::DISEMBARGO:
Kenton Varda's avatar
Kenton Varda committed
1891
        handleDisembargo(reader.getDisembargo());
1892 1893
        break;

1894 1895 1896 1897
      case rpc::Message::RESTORE:
        handleRestore(kj::mv(message), reader.getRestore());
        break;

1898
      default: {
1899 1900 1901 1902 1903 1904
        if (connection.is<Connected>()) {
          auto message = connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(reader.totalSize(), messageSizeHint<void>()));
          message->getBody().initAs<rpc::Message>().setUnimplemented(reader);
          message->send();
        }
1905 1906 1907 1908 1909
        break;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
1910
  void handleUnimplemented(const rpc::Message::Reader& message) {
Kenton Varda's avatar
Kenton Varda committed
1911
    switch (message.which()) {
1912 1913 1914
      case rpc::Message::RESOLVE: {
        auto cap = message.getResolve().getCap();
        switch (cap.which()) {
1915 1916 1917
          case rpc::CapDescriptor::NONE:
            // Nothing to do (but this ought never to happen).
            break;
1918
          case rpc::CapDescriptor::SENDER_HOSTED:
1919
            releaseExport(cap.getSenderHosted(), 1);
1920 1921
            break;
          case rpc::CapDescriptor::SENDER_PROMISE:
1922
            releaseExport(cap.getSenderPromise(), 1);
1923 1924 1925 1926 1927 1928
            break;
          case rpc::CapDescriptor::RECEIVER_ANSWER:
          case rpc::CapDescriptor::RECEIVER_HOSTED:
            // Nothing to do.
            break;
          case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
1929
            releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
1930 1931
            break;
        }
Kenton Varda's avatar
Kenton Varda committed
1932
        break;
1933
      }
Kenton Varda's avatar
Kenton Varda committed
1934 1935 1936 1937 1938

      default:
        KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
        break;
    }
1939 1940
  }

Kenton Varda's avatar
Kenton Varda committed
1941
  void handleAbort(const rpc::Exception::Reader& exception) {
1942 1943 1944
    kj::throwRecoverableException(toException(exception));
  }

1945 1946 1947
  // ---------------------------------------------------------------------------
  // Level 0

Kenton Varda's avatar
Kenton Varda committed
1948
  void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
1949
    kj::Own<ClientHook> capability;
1950

Kenton Varda's avatar
Kenton Varda committed
1951 1952 1953 1954 1955
    KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
      capability = kj::mv(*t);
    } else {
      // Exception already reported.
      return;
1956 1957
    }

1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969
    bool redirectResults;
    switch (call.getSendResultsTo().which()) {
      case rpc::Call::SendResultsTo::CALLER:
        redirectResults = false;
        break;
      case rpc::Call::SendResultsTo::YOURSELF:
        redirectResults = true;
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
    }

1970
    auto payload = call.getParams();
1971
    message->initCapTable(receiveCaps(payload.getCapTable()));
Kenton Varda's avatar
Kenton Varda committed
1972 1973
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

1974
    AnswerId answerId = call.getQuestionId();
Kenton Varda's avatar
Kenton Varda committed
1975

1976
    auto context = kj::refcounted<RpcCallContext>(
1977
        *this, answerId, kj::mv(message), payload.getContent(),
1978
        redirectResults, kj::mv(cancelPaf.fulfiller));
1979

1980
    // No more using `call` after this point, as it now belongs to the context.
1981 1982

    {
1983
      auto& answer = answers[answerId];
1984 1985 1986 1987 1988 1989

      KJ_REQUIRE(!answer.active, "questionId is already in use") {
        return;
      }

      answer.active = true;
Kenton Varda's avatar
Kenton Varda committed
1990
      answer.callContext = *context;
Kenton Varda's avatar
Kenton Varda committed
1991 1992 1993 1994 1995 1996 1997 1998 1999
    }

    auto promiseAndPipeline = capability->call(
        call.getInterfaceId(), call.getMethodId(), context->addRef());

    // Things may have changed -- in particular if call() immediately called
    // context->directTailCall().

    {
2000
      auto& answer = answers[answerId];
Kenton Varda's avatar
Kenton Varda committed
2001

2002 2003
      answer.pipeline = kj::mv(promiseAndPipeline.pipeline);

2004
      if (redirectResults) {
Kenton Varda's avatar
Kenton Varda committed
2005
        auto resultsPromise = promiseAndPipeline.promise.then(
2006 2007 2008
            kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
              return context->consumeRedirectedResponse();
            }));
Kenton Varda's avatar
Kenton Varda committed
2009 2010

        // If the call that later picks up `redirectedResults` decides to discard it, we need to
2011
        // make sure our call is not itself canceled unless it has called allowCancellation().
Kenton Varda's avatar
Kenton Varda committed
2012 2013
        // So we fork the promise and join one branch with the cancellation promise, in order to
        // hold on to it.
2014
        auto forked = resultsPromise.fork();
Kenton Varda's avatar
Kenton Varda committed
2015 2016
        answer.redirectedResults = forked.addBranch();

2017 2018 2019
        cancelPaf.promise
            .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
            .detach([](kj::Exception&&) {});
2020 2021 2022 2023 2024
      } else {
        // Hack:  Both the success and error continuations need to use the context.  We could
        //   refcount, but both will be destroyed at the same time anyway.
        RpcCallContext* contextPtr = context;

2025
        promiseAndPipeline.promise.then(
2026 2027 2028 2029 2030 2031 2032
            [contextPtr]() {
              contextPtr->sendReturn();
            }, [contextPtr](kj::Exception&& exception) {
              contextPtr->sendErrorReturn(kj::mv(exception));
            }).then([]() {}, [&](kj::Exception&& exception) {
              // Handle exceptions that occur in sendReturn()/sendErrorReturn().
              taskFailed(kj::mv(exception));
2033 2034 2035
            }).attach(kj::mv(context))
            .exclusiveJoin(kj::mv(cancelPaf.promise))
            .detach([](kj::Exception&&) {});
2036
      }
2037 2038 2039
    }
  }

2040
  kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
Kenton Varda's avatar
Kenton Varda committed
2041
    switch (target.which()) {
2042 2043
      case rpc::MessageTarget::IMPORTED_CAP: {
        KJ_IF_MAYBE(exp, exports.find(target.getImportedCap())) {
Kenton Varda's avatar
Kenton Varda committed
2044 2045 2046 2047 2048 2049 2050 2051 2052 2053 2054
          return exp->clientHook->addRef();
        } else {
          KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
            return nullptr;
          }
        }
        break;
      }

      case rpc::MessageTarget::PROMISED_ANSWER: {
        auto promisedAnswer = target.getPromisedAnswer();
2055
        kj::Own<PipelineHook> pipeline;
Kenton Varda's avatar
Kenton Varda committed
2056

2057 2058 2059 2060 2061 2062 2063 2064 2065
        auto& base = answers[promisedAnswer.getQuestionId()];
        KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
          return nullptr;
        }
        KJ_IF_MAYBE(p, base.pipeline) {
          pipeline = p->get()->addRef();
        } else {
          KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished or contained no "
                          "capabilities.") {
Kenton Varda's avatar
Kenton Varda committed
2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082
            return nullptr;
          }
        }

        KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
          return pipeline->getPipelinedCap(*ops);
        } else {
          // Exception already thrown.
          return nullptr;
        }
      }

      default:
        KJ_FAIL_REQUIRE("Unknown message target type.", target) {
          return nullptr;
        }
    }
2083 2084

    KJ_UNREACHABLE;
Kenton Varda's avatar
Kenton Varda committed
2085 2086
  }

Kenton Varda's avatar
Kenton Varda committed
2087
  void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
Kenton Varda's avatar
Kenton Varda committed
2088 2089
    // Transitive destructors can end up manipulating the question table and invalidating our
    // pointer into it, so make sure these destructors run later.
2090 2091
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2092
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;
2093

2094
    KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
2095 2096
      KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
      question->isAwaitingReturn = false;
Kenton Varda's avatar
Kenton Varda committed
2097

2098 2099
      if (ret.getReleaseParamCaps()) {
        exportsToRelease = kj::mv(question->paramExports);
Kenton Varda's avatar
Kenton Varda committed
2100
      } else {
2101
        question->paramExports = nullptr;
Kenton Varda's avatar
Kenton Varda committed
2102
      }
2103

2104 2105
      KJ_IF_MAYBE(questionRef, question->selfRef) {
        switch (ret.which()) {
2106
          case rpc::Return::RESULTS: {
2107 2108 2109
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2110
            }
Kenton Varda's avatar
Kenton Varda committed
2111

2112
            auto payload = ret.getResults();
2113
            message->initCapTable(receiveCaps(payload.getCapTable()));
2114
            questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
2115
                *this, kj::addRef(*questionRef), kj::mv(message), payload.getContent()));
2116
            break;
2117
          }
2118

2119 2120 2121 2122
          case rpc::Return::EXCEPTION:
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
              return;
Kenton Varda's avatar
Kenton Varda committed
2123
            }
2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135

            questionRef->reject(toException(ret.getException()));
            break;

          case rpc::Return::CANCELED:
            KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
            break;

          case rpc::Return::RESULTS_SENT_ELSEWHERE:
            KJ_REQUIRE(question->isTailCall,
                "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
              return;
2136 2137
            }

2138 2139 2140 2141
            // Tail calls are fulfilled with a null pointer.
            questionRef->fulfill(kj::Own<RpcResponse>());
            break;

2142 2143
          case rpc::Return::TAKE_FROM_OTHER_QUESTION:
            KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2144 2145 2146 2147 2148
              KJ_IF_MAYBE(response, answer->redirectedResults) {
                questionRef->fulfill(kj::mv(*response));
              } else {
                KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` referenced a call that did not "
                                "use `sendResultsTo.yourself`.") { return; }
2149 2150
              }
            } else {
2151
              KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` had invalid answer ID.") { return; }
2152 2153
            }

2154
            break;
2155

2156 2157 2158 2159
          default:
            KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
        }
      } else {
2160
        if (ret.isTakeFromOtherQuestion()) {
2161
          // Be sure to release the tail call's promise.
2162
          KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
2163 2164 2165
            promiseToRelease = kj::mv(answer->redirectedResults);
          }
        }
Kenton Varda's avatar
Kenton Varda committed
2166

2167 2168
        // Looks like this question was canceled earlier, so `Finish` was already sent.  We can go
        // ahead and delete it from the table.
2169
        questions.erase(ret.getAnswerId(), *question);
Kenton Varda's avatar
Kenton Varda committed
2170
      }
Kenton Varda's avatar
Kenton Varda committed
2171

Kenton Varda's avatar
Kenton Varda committed
2172 2173 2174 2175 2176
    } else {
      KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2177
  void handleFinish(const rpc::Finish::Reader& finish) {
Kenton Varda's avatar
Kenton Varda committed
2178 2179
    // Delay release of these things until return so that transitive destructors don't accidentally
    // modify the answer table and invalidate our pointer into it.
2180 2181
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
2182
    Answer answerToRelease;
2183
    kj::Maybe<kj::Own<PipelineHook>> pipelineToRelease;
Kenton Varda's avatar
Kenton Varda committed
2184

2185
    KJ_IF_MAYBE(answer, answers.find(finish.getQuestionId())) {
2186
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2187

2188 2189 2190 2191
      if (finish.getReleaseResultCaps()) {
        exportsToRelease = kj::mv(answer->resultExports);
      } else {
        answer->resultExports = nullptr;
2192
      }
Kenton Varda's avatar
Kenton Varda committed
2193

2194 2195
      pipelineToRelease = kj::mv(answer->pipeline);

2196 2197 2198 2199 2200
      // If the call isn't actually done yet, cancel it.  Otherwise, we can go ahead and erase the
      // question from the table.
      KJ_IF_MAYBE(context, answer->callContext) {
        context->requestCancel();
      } else {
Kenton Varda's avatar
Kenton Varda committed
2201
        answerToRelease = answers.erase(finish.getQuestionId());
2202
      }
Kenton Varda's avatar
Kenton Varda committed
2203
    } else {
2204
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
Kenton Varda's avatar
Kenton Varda committed
2205
    }
2206 2207
  }

2208 2209 2210
  // ---------------------------------------------------------------------------
  // Level 1

2211
  void handleResolve(const rpc::Resolve::Reader& resolve) {
2212
    kj::Own<ClientHook> replacement;
2213 2214 2215 2216

    // Extract the replacement capability.
    switch (resolve.which()) {
      case rpc::Resolve::CAP:
2217 2218 2219 2220 2221
        KJ_IF_MAYBE(cap, receiveCap(resolve.getCap())) {
          replacement = kj::mv(*cap);
        } else {
          KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
        }
2222 2223 2224 2225 2226 2227 2228 2229 2230 2231 2232
        break;

      case rpc::Resolve::EXCEPTION:
        replacement = newBrokenCap(toException(resolve.getException()));
        break;

      default:
        KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
    }

    // If the import is on the table, fulfill it.
2233
    KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244
      KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
        // OK, this is in fact an unfulfilled promise!
        fulfiller->get()->fulfill(kj::mv(replacement));
      } else if (import->importClient != nullptr) {
        // It appears this is a valid entry on the import table, but was not expected to be a
        // promise.
        KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
      }
    }
  }

2245
  void handleRelease(const rpc::Release::Reader& release) {
Kenton Varda's avatar
Kenton Varda committed
2246
    releaseExport(release.getId(), release.getReferenceCount());
2247 2248
  }

Kenton Varda's avatar
Kenton Varda committed
2249
  void releaseExport(ExportId id, uint refcount) {
2250
    KJ_IF_MAYBE(exp, exports.find(id)) {
2251
      KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
Kenton Varda's avatar
Kenton Varda committed
2252
        return;
2253 2254 2255 2256
      }

      exp->refcount -= refcount;
      if (exp->refcount == 0) {
2257
        exportsByCap.erase(exp->clientHook);
2258
        exports.erase(id, *exp);
2259 2260 2261
      }
    } else {
      KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
Kenton Varda's avatar
Kenton Varda committed
2262
        return;
2263 2264 2265 2266
      }
    }
  }

2267 2268 2269 2270 2271 2272
  void releaseExports(kj::ArrayPtr<ExportId> exports) {
    for (auto exportId: exports) {
      releaseExport(exportId, 1);
    }
  }

Kenton Varda's avatar
Kenton Varda committed
2273 2274 2275 2276
  void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
    auto context = disembargo.getContext();
    switch (context.which()) {
      case rpc::Disembargo::Context::SENDER_LOOPBACK: {
2277
        kj::Own<ClientHook> target;
Kenton Varda's avatar
Kenton Varda committed
2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299

        KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
          target = kj::mv(*t);
        } else {
          // Exception already reported.
          return;
        }

        for (;;) {
          KJ_IF_MAYBE(r, target->getResolved()) {
            target = r->addRef();
          } else {
            break;
          }
        }

        KJ_REQUIRE(target->getBrand() == this,
                   "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
                   "back to the sender.") {
          return;
        }

Kenton Varda's avatar
Kenton Varda committed
2300 2301 2302 2303
        EmbargoId embargoId = context.getSenderLoopback();

        // We need to insert an evalLater() here to make sure that any pending calls towards this
        // cap have had time to find their way through the event loop.
2304 2305
        tasks.add(kj::evalLater(kj::mvCapture(
            target, [this,embargoId](kj::Own<ClientHook>&& target) {
2306 2307 2308 2309
          if (!connection.is<Connected>()) {
            return;
          }

2310
          RpcClient& downcasted = kj::downcast<RpcClient>(*target);
Kenton Varda's avatar
Kenton Varda committed
2311

2312
          auto message = connection.get<Connected>()->newOutgoingMessage(
Kenton Varda's avatar
Kenton Varda committed
2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327
              messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
          auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();

          {
            auto redirect = downcasted.writeTarget(builder.initTarget());

            // Disembargoes should only be sent to capabilities that were previously the object of
            // a `Resolve` message.  But `writeTarget` only ever returns non-null when called on
            // a PromiseClient.  The code which sends `Resolve` should have replaced any promise
            // with a direct node in order to solve the Tribble 4-way race condition.
            KJ_REQUIRE(redirect == nullptr,
                       "'Disembargo' of type 'senderLoopback' sent to an object that does not "
                       "appear to have been the object of a previous 'Resolve' message.") {
              return;
            }
Kenton Varda's avatar
Kenton Varda committed
2328 2329
          }

Kenton Varda's avatar
Kenton Varda committed
2330
          builder.getContext().setReceiverLoopback(embargoId);
Kenton Varda's avatar
Kenton Varda committed
2331

Kenton Varda's avatar
Kenton Varda committed
2332 2333
          message->send();
        })));
Kenton Varda's avatar
Kenton Varda committed
2334 2335 2336 2337

        break;
      }

Kenton Varda's avatar
Kenton Varda committed
2338
      case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
2339
        KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
Kenton Varda's avatar
Kenton Varda committed
2340
          KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
2341
          embargoes.erase(context.getReceiverLoopback(), *embargo);
Kenton Varda's avatar
Kenton Varda committed
2342 2343 2344 2345 2346 2347
        } else {
          KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
            return;
          }
        }
        break;
Kenton Varda's avatar
Kenton Varda committed
2348
      }
Kenton Varda's avatar
Kenton Varda committed
2349 2350 2351 2352 2353 2354

      default:
        KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
    }
  }

2355 2356 2357 2358 2359
  // ---------------------------------------------------------------------------
  // Level 2

  class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
  public:
2360 2361
    SingleCapPipeline(kj::Own<ClientHook>&& cap)
        : cap(kj::mv(cap)) {}
2362

2363
    kj::Own<PipelineHook> addRef() override {
2364 2365 2366
      return kj::addRef(*this);
    }

2367
    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
2368 2369 2370 2371 2372 2373 2374 2375
      if (ops.size() == 0) {
        return cap->addRef();
      } else {
        return newBrokenCap("Invalid pipeline transform.");
      }
    }

  private:
2376
    kj::Own<ClientHook> cap;
2377 2378 2379
  };

  void handleRestore(kj::Own<IncomingRpcMessage>&& message, const rpc::Restore::Reader& restore) {
2380
    AnswerId answerId = restore.getQuestionId();
2381

2382 2383 2384 2385 2386 2387
    if (!connection.is<Connected>()) {
      // Disconnected; ignore.
      return;
    }

    auto response = connection.get<Connected>()->newOutgoingMessage(
2388 2389 2390
        messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);

    rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
2391
    ret.setAnswerId(answerId);
2392

2393
    kj::Own<ClientHook> capHook;
2394 2395
    kj::Array<ExportId> resultExports;
    KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong
2396 2397 2398 2399

    // Call the restorer and initialize the answer.
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      KJ_IF_MAYBE(r, restorer) {
2400
        Capability::Client cap = r->baseRestore(restore.getObjectId());
2401
        auto payload = ret.initResults();
2402
        payload.getContent().setAs<Capability>(kj::mv(cap));
2403

2404
        auto capTable = response->getCapTable();
2405 2406
        KJ_DASSERT(capTable.size() == 1);
        resultExports = writeDescriptors(capTable, payload);
2407
        capHook = KJ_ASSERT_NONNULL(capTable[0])->addRef();
2408 2409 2410 2411 2412 2413 2414 2415 2416 2417 2418
      } else {
        KJ_FAIL_REQUIRE("This vat cannot restore this SturdyRef.") { break; }
      }
    })) {
      fromException(*exception, ret.initException());
      capHook = newBrokenCap(kj::mv(*exception));
    }

    message = nullptr;

    // Add the answer to the answer table for pipelining and send the response.
2419
    auto& answer = answers[answerId];
2420 2421 2422
    KJ_REQUIRE(!answer.active, "questionId is already in use") {
      return;
    }
2423

2424
    answer.resultExports = kj::mv(resultExports);
2425
    answer.active = true;
2426
    answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));
2427

2428
    response->send();
2429
  }
2430 2431 2432 2433
};

}  // namespace

2434
class RpcSystemBase::Impl final: public kj::TaskSet::ErrorHandler {
2435
public:
2436 2437
  Impl(VatNetworkBase& network, kj::Maybe<SturdyRefRestorerBase&> restorer)
      : network(network), restorer(restorer), tasks(*this) {
2438 2439 2440 2441
    tasks.add(acceptLoop());
  }

  ~Impl() noexcept(false) {
2442 2443 2444 2445 2446 2447 2448 2449 2450 2451 2452 2453
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // std::unordered_map doesn't like it when elements' destructors throw, so carefully
      // disassemble it.
      if (!connections.empty()) {
        kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
        kj::Exception shutdownException(
            kj::Exception::Nature::LOCAL_BUG, kj::Exception::Durability::PERMANENT,
            __FILE__, __LINE__, kj::str("RpcSystem was destroyed."));
        for (auto& entry: connections) {
          entry.second->disconnect(kj::cp(shutdownException));
          deleteMe.add(kj::mv(entry.second));
        }
2454
      }
2455
    });
2456
  }
2457

2458
  Capability::Client restore(_::StructReader hostId, AnyPointer::Reader objectId) {
2459
    KJ_IF_MAYBE(connection, network.baseConnectToRefHost(hostId)) {
2460
      auto& state = getConnectionState(kj::mv(*connection));
2461 2462 2463 2464 2465 2466 2467
      return Capability::Client(state.restore(objectId));
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(objectId);
    } else {
      return Capability::Client(newBrokenCap(
          "SturdyRef referred to a local object but there is no local SturdyRef restorer."));
    }
2468 2469 2470
  }

  void taskFailed(kj::Exception&& exception) override {
2471
    KJ_LOG(ERROR, exception);
Kenton Varda's avatar
Kenton Varda committed
2472 2473
  }

2474 2475
private:
  VatNetworkBase& network;
2476 2477 2478 2479 2480
  kj::Maybe<SturdyRefRestorerBase&> restorer;
  kj::TaskSet tasks;

  typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
      ConnectionMap;
2481
  ConnectionMap connections;
Kenton Varda's avatar
Kenton Varda committed
2482

2483 2484
  kj::UnwindDetector unwindDetector;

2485 2486 2487
  RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
    auto iter = connections.find(connection);
    if (iter == connections.end()) {
2488
      VatNetworkBase::Connection* connectionPtr = connection;
2489
      auto onDisconnect = kj::newPromiseAndFulfiller<void>();
2490
      tasks.add(onDisconnect.promise.then([this,connectionPtr]() {
2491
        connections.erase(connectionPtr);
2492 2493
      }));
      auto newState = kj::refcounted<RpcConnectionState>(
2494
          restorer, kj::mv(connection), kj::mv(onDisconnect.fulfiller));
2495
      RpcConnectionState& result = *newState;
2496
      connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
2497 2498 2499 2500 2501 2502 2503
      return result;
    } else {
      return *iter->second;
    }
  }

  kj::Promise<void> acceptLoop() {
2504
    auto receive = network.baseAcceptConnectionAsRefHost().then(
2505
        [this](kj::Own<VatNetworkBase::Connection>&& connection) {
2506
      getConnectionState(kj::mv(connection));
2507 2508 2509 2510 2511 2512 2513 2514
    });
    return receive.then([this]() {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      tasks.add(acceptLoop());
    });
2515
  }
2516 2517
};

2518 2519
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, kj::Maybe<SturdyRefRestorerBase&> restorer)
    : impl(kj::heap<Impl>(network, restorer)) {}
2520
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
2521 2522
RpcSystemBase::~RpcSystemBase() noexcept(false) {}

2523
Capability::Client RpcSystemBase::baseRestore(
2524
    _::StructReader hostId, AnyPointer::Reader objectId) {
2525
  return impl->restore(hostId, objectId);
2526 2527 2528 2529
}

}  // namespace _ (private)
}  // namespace capnp