// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#include "rpc.h"
23
#include "message.h"
24 25 26
#include <kj/debug.h>
#include <kj/vector.h>
#include <kj/async.h>
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/one-of.h>
Kenton Varda's avatar
Kenton Varda committed
28
#include <kj/function.h>
29
#include <unordered_map>
Kenton Varda's avatar
Kenton Varda committed
30
#include <map>
31 32 33 34 35 36 37 38
#include <queue>
#include <capnp/rpc.capnp.h>

namespace capnp {
namespace _ {  // private

namespace {

Kenton Varda's avatar
Kenton Varda committed
39 40 41 42 43 44 45 46 47
template <typename T>
inline constexpr uint messageSizeHint() {
  // Size hint, in words, for an outgoing message whose rpc::Message root carries a struct of
  // type T:  the rpc::Message struct, the T struct, and one extra word.
  // NOTE(review): the extra word is presumably the message's root pointer -- confirm.
  return sizeInWords<rpc::Message>() + sizeInWords<T>() + 1;
}
template <>
inline constexpr uint messageSizeHint<void>() {
  // Specialization for messages that carry no struct beyond rpc::Message itself.
  return sizeInWords<rpc::Message>() + 1;
}

48 49 50
constexpr uint MESSAGE_TARGET_SIZE_HINT =
    sizeInWords<rpc::MessageTarget>() + sizeInWords<rpc::PromisedAnswer>()
    + 16;  // +16 for ops; hope that's enough
// Words to reserve for an rpc::MessageTarget, including a PromisedAnswer and its op list.

constexpr uint CAP_DESCRIPTOR_SIZE_HINT =
    sizeInWords<rpc::CapDescriptor>() + sizeInWords<rpc::PromisedAnswer>();
// Words to reserve per capability descriptor (see copySizeHint()).

constexpr uint64_t MAX_SIZE_HINT = uint64_t(1) << 20;
// Upper bound (2^20) applied to size hints computed by copySizeHint().

uint copySizeHint(MessageSize size) {
  // Computes a size hint, in words, for a message that will receive a copy of content of the
  // given size:  the content's word count plus CAP_DESCRIPTOR_SIZE_HINT words for each
  // capability, clamped to MAX_SIZE_HINT.
  //
  // The per-capability product is computed in 64 bits.  Multiplying `capCount` by the (uint)
  // per-descriptor hint at its native width could wrap before being widened, yielding a hint
  // that is far too small for messages with very many capabilities.
  uint64_t capWords = static_cast<uint64_t>(size.capCount) * CAP_DESCRIPTOR_SIZE_HINT;
  uint64_t sizeHint = size.wordCount + capWords;

  // The clamp guarantees the result fits in `uint`.
  return kj::min(MAX_SIZE_HINT, sizeHint);
}

uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint, uint additional) {
  // Returns the first-segment size (in words) to request for an outgoing message:  the clamped
  // copy hint plus `additional` words when the caller supplied a hint, or 0 when it did not.
  KJ_IF_MAYBE(hint, sizeHint) {
    return copySizeHint(*hint) + additional;
  }

  // No hint available.
  return 0;
}

Kenton Varda's avatar
Kenton Varda committed
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
kj::Maybe<kj::Array<PipelineOp>> toPipelineOps(List<rpc::PromisedAnswer::Op>::Reader ops) {
  // Converts a wire-format PromisedAnswer transform into an array of PipelineOps.  Fails the
  // requirement -- returning null if the failure is recovered from -- when an op of an
  // unrecognized kind is encountered.
  auto converted = kj::heapArrayBuilder<PipelineOp>(ops.size());

  for (auto opReader: ops) {
    PipelineOp op;

    switch (opReader.which()) {
      case rpc::PromisedAnswer::Op::NOOP:
        op.type = PipelineOp::NOOP;
        break;

      case rpc::PromisedAnswer::Op::GET_POINTER_FIELD:
        op.type = PipelineOp::GET_POINTER_FIELD;
        op.pointerIndex = opReader.getGetPointerField();
        break;

      default:
        KJ_FAIL_REQUIRE("Unsupported pipeline op.", (uint)opReader.which()) {
          return nullptr;
        }
    }

    converted.add(op);
  }

  return converted.finish();
}

Orphan<List<rpc::PromisedAnswer::Op>> fromPipelineOps(
    Orphanage orphanage, kj::ArrayPtr<const PipelineOp> ops) {
  // Builds the wire-format op list equivalent to `ops` as an orphan allocated from `orphanage`.
  // The caller adopts the result into a PromisedAnswer's transform.
  auto result = orphanage.newOrphan<List<rpc::PromisedAnswer::Op>>(ops.size());
  auto list = result.get();

  for (uint index = 0; index < ops.size(); index++) {
    const PipelineOp& op = ops[index];
    rpc::PromisedAnswer::Op::Builder wireOp = list[index];

    switch (op.type) {
      case PipelineOp::NOOP:
        wireOp.setNoop();
        break;
      case PipelineOp::GET_POINTER_FIELD:
        wireOp.setGetPointerField(op.pointerIndex);
        break;
    }
  }

  return result;
}

Kenton Varda's avatar
Kenton Varda committed
109
kj::Exception toException(const rpc::Exception::Reader& exception) {
  // Converts an exception received from the peer into a kj::Exception.  The type is carried
  // over by numeric cast, the source location is reported as "(remote)":0, and the description
  // is the peer's reason prefixed with "remote exception: ".
  auto type = static_cast<kj::Exception::Type>(exception.getType());
  auto description = kj::str("remote exception: ", exception.getReason());
  return kj::Exception(type, "(remote)", 0, kj::mv(description));
}

void fromException(const kj::Exception& exception, rpc::Exception::Builder builder) {
  // Fills in a wire-format exception from a local kj::Exception:  the type (by numeric cast)
  // and the description.  File and line are not transmitted.
  //
  // TODO(someday):  Indicate the remote server name as part of the stack trace.  Maybe even
  //   transmit stack traces?
  builder.setType(static_cast<rpc::Exception::Type>(exception.getType()));
  builder.setReason(exception.getDescription());
}

Kenton Varda's avatar
Kenton Varda committed
121
uint exceptionSizeHint(const kj::Exception& exception) {
  // Estimates the number of words needed to encode `exception` with fromException():  the
  // rpc::Exception struct plus the description text, rounded up by always adding one word.
  auto descriptionWords = exception.getDescription().size() / sizeof(word) + 1;
  return sizeInWords<rpc::Exception>() + descriptionWords;
}

Kenton Varda's avatar
Kenton Varda committed
125
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
126

127 128 129 130 131 132 133 134 135 136 137 138 139
template <typename Id, typename T>
class ExportTable {
  // Table mapping integers to T, where the integers are chosen locally.

public:
  kj::Maybe<T&> find(Id id) {
    if (id < slots.size() && slots[id] != nullptr) {
      return slots[id];
    } else {
      return nullptr;
    }
  }

140
  T erase(Id id, T& entry) {
Kenton Varda's avatar
Kenton Varda committed
141 142
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
143 144 145 146 147 148 149 150
    // `entry` is a reference to the entry being released -- we require this in order to prove
    // that the caller has already done a find() to check that this entry exists.  We can't check
    // ourselves because the caller may have nullified the entry in the meantime.
    KJ_DREQUIRE(&entry == &slots[id]);
    T toRelease = kj::mv(slots[id]);
    slots[id] = T();
    freeIds.push(id);
    return toRelease;
151 152 153 154 155 156 157 158 159 160 161 162 163
  }

  T& next(Id& id) {
    if (freeIds.empty()) {
      id = slots.size();
      return slots.add();
    } else {
      id = freeIds.top();
      freeIds.pop();
      return slots[id];
    }
  }

164 165 166 167 168 169 170 171 172
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i = 0; i < slots.size(); i++) {
      if (slots[i] != nullptr) {
        func(i, slots[i]);
      }
    }
  }

173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
private:
  kj::Vector<T> slots;
  std::priority_queue<Id, std::vector<Id>, std::greater<Id>> freeIds;
};

template <typename Id, typename T>
class ImportTable {
  // Table mapping integers to T, where the integers are chosen remotely.

public:
  T& operator[](Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      return high[id];
    }
  }

Kenton Varda's avatar
Kenton Varda committed
191 192 193 194 195
  kj::Maybe<T&> find(Id id) {
    if (id < kj::size(low)) {
      return low[id];
    } else {
      auto iter = high.find(id);
Kenton Varda's avatar
Kenton Varda committed
196
      if (iter == high.end()) {
Kenton Varda's avatar
Kenton Varda committed
197 198 199 200 201 202 203
        return nullptr;
      } else {
        return iter->second;
      }
    }
  }

Kenton Varda's avatar
Kenton Varda committed
204 205 206
  T erase(Id id) {
    // Remove an entry from the table and return it.  We return it so that the caller can be
    // careful to release it (possibly invoking arbitrary destructors) at a time that makes sense.
207
    if (id < kj::size(low)) {
Kenton Varda's avatar
Kenton Varda committed
208
      T toRelease = kj::mv(low[id]);
209
      low[id] = T();
Kenton Varda's avatar
Kenton Varda committed
210
      return toRelease;
211
    } else {
Kenton Varda's avatar
Kenton Varda committed
212
      T toRelease = kj::mv(high[id]);
213
      high.erase(id);
Kenton Varda's avatar
Kenton Varda committed
214
      return toRelease;
215 216 217
    }
  }

218 219 220 221 222 223 224 225 226 227
  template <typename Func>
  void forEach(Func&& func) {
    for (Id i: kj::indices(low)) {
      func(i, low[i]);
    }
    for (auto& entry: high) {
      func(entry.first, entry.second);
    }
  }

228 229 230 231 232
private:
  T low[16];
  std::unordered_map<Id, T> high;
};

Kenton Varda's avatar
Kenton Varda committed
233 234
// =======================================================================================

235
class RpcConnectionState final: public kj::TaskSet::ErrorHandler, public kj::Refcounted {
236
public:
237
  RpcConnectionState(kj::Maybe<Capability::Client> bootstrapInterface,
                     kj::Maybe<RealmGateway<>::Client> gateway,
                     kj::Maybe<SturdyRefRestorerBase&> restorer,
                     kj::Own<VatNetworkBase::Connection>&& connectionParam,
                     kj::Own<kj::PromiseFulfiller<void>>&& disconnectFulfiller)
      : bootstrapInterface(kj::mv(bootstrapInterface)),
        gateway(kj::mv(gateway)),
        restorer(restorer),
        disconnectFulfiller(kj::mv(disconnectFulfiller)),
        tasks(*this) {
    // Takes ownership of the connection and starts the message loop as a background task.
    // Failures of that task are reported to taskFailed(), since `tasks` uses this object as its
    // error handler.
    connection.init<Connected>(kj::mv(connectionParam));
    tasks.add(messageLoop());
  }

248
  kj::Own<ClientHook> restore(AnyPointer::Reader objectId) {
    // Sends a `Bootstrap` message carrying `objectId` and returns a capability pipelined on its
    // eventual answer.  If the connection is already down, returns a broken capability carrying
    // the disconnect exception instead.
    if (connection.is<Disconnected>()) {
      return newBrokenCap(kj::cp(connection.get<Disconnected>()));
    }

    // Allocate a question table entry for the request.
    QuestionId id;
    auto& entry = questions.next(id);
    entry.isAwaitingReturn = true;

    auto responsePaf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();

    auto questionRef = kj::refcounted<QuestionRef>(*this, id, kj::mv(responsePaf.fulfiller));
    entry.selfRef = *questionRef;

    // The response promise holds its own reference to the QuestionRef.
    responsePaf.promise = responsePaf.promise.attach(kj::addRef(*questionRef));

    {
      auto sizeHint = objectId.targetSize().wordCount + messageSizeHint<rpc::Bootstrap>();
      auto outgoing = connection.get<Connected>()->newOutgoingMessage(sizeHint);

      auto bootstrap = outgoing->getBody().initAs<rpc::Message>().initBootstrap();
      bootstrap.setQuestionId(id);
      bootstrap.getDeprecatedObjectId().set(objectId);

      outgoing->send();
    }

    auto pipeline = kj::refcounted<RpcPipeline>(
        *this, kj::mv(questionRef), kj::mv(responsePaf.promise));

    // An empty op list selects the answer's root capability.
    return pipeline->getPipelinedCap(kj::Array<const PipelineOp>(nullptr));
  }

281
  void taskFailed(kj::Exception&& exception) override {
    // implements kj::TaskSet::ErrorHandler:  a failed background task takes down the whole
    // connection.
    disconnect(kj::mv(exception));
  }

  void disconnect(kj::Exception&& exception) {
    // Shuts down the connection because of `exception`.  Outstanding questions, promised
    // imports and embargoes are rejected with a DISCONNECTED exception; answers and exports
    // are emptied; an `Abort` message is sent on a best-effort basis; and finally the
    // connection is replaced by the exception.  Does nothing if already disconnected.
    if (!connection.is<Connected>()) {
      // Already disconnected.
      return;
    }

    // Same location and description as the cause, but typed as a disconnect.
    kj::Exception networkException(kj::Exception::Type::DISCONNECTED,
        exception.getFile(), exception.getLine(), kj::heapString(exception.getDescription()));

    KJ_IF_MAYBE(newException, kj::runCatchingExceptions([&]() {
      // Carefully pull all the objects out of the tables prior to releasing them because their
      // destructors could come back and mess with the tables.  The vectors are destroyed -- and
      // their contents released -- only when this lambda returns.
      kj::Vector<kj::Own<PipelineHook>> droppedPipelines;
      kj::Vector<kj::Own<ClientHook>> droppedClients;
      kj::Vector<kj::Promise<kj::Own<RpcResponse>>> droppedTailCalls;
      kj::Vector<kj::Promise<void>> droppedResolveOps;

      // All current questions complete with exceptions.
      questions.forEach([&](QuestionId id, Question& question) {
        KJ_IF_MAYBE(ref, question.selfRef) {
          // QuestionRef still present.
          ref->reject(kj::cp(networkException));
        }
      });

      // Answers give up their pipelines and redirected results; live calls are asked to cancel.
      answers.forEach([&](AnswerId id, Answer& answer) {
        KJ_IF_MAYBE(pipeline, answer.pipeline) {
          droppedPipelines.add(kj::mv(*pipeline));
        }

        KJ_IF_MAYBE(redirected, answer.redirectedResults) {
          droppedTailCalls.add(kj::mv(*redirected));
        }

        KJ_IF_MAYBE(callContext, answer.callContext) {
          callContext->requestCancel();
        }
      });

      // Every export is emptied.
      exports.forEach([&](ExportId id, Export& exp) {
        droppedClients.add(kj::mv(exp.clientHook));
        droppedResolveOps.add(kj::mv(exp.resolveOp));
        exp = Export();
      });

      // Imports that are promises get rejected.
      imports.forEach([&](ImportId id, Import& import) {
        KJ_IF_MAYBE(fulfiller, import.promiseFulfiller) {
          fulfiller->get()->reject(kj::cp(networkException));
        }
      });

      // So do pending embargoes.
      embargoes.forEach([&](EmbargoId id, Embargo& embargo) {
        KJ_IF_MAYBE(fulfiller, embargo.fulfiller) {
          fulfiller->get()->reject(kj::cp(networkException));
        }
      });
    })) {
      // Some destructor must have thrown an exception.  There is no appropriate place to report
      // these errors.
      KJ_LOG(ERROR, "Uncaught exception when destroying capabilities dropped by disconnect.",
             *newException);
    }

    // Send an abort message, but ignore failure.
    kj::runCatchingExceptions([&]() {
      auto abortSizeHint = messageSizeHint<void>() + exceptionSizeHint(exception);
      auto outgoing = connection.get<Connected>()->newOutgoingMessage(abortSizeHint);
      fromException(exception, outgoing->getBody().getAs<rpc::Message>().initAbort());
      outgoing->send();
    });

    // Indicate disconnect.
    disconnectFulfiller->fulfill();
    connection.init<Disconnected>(kj::mv(networkException));
  }

361
private:
362
  // Forward declarations of nested classes that are referenced by the table entry types and
  // member functions before their definitions appear.

  // ClientHook implementations.
  class RpcClient;
  class ImportClient;
  class PromiseClient;

  // Question / call plumbing.
  class QuestionRef;
  class RpcPipeline;
  class RpcCallContext;
  class RpcResponse;
Kenton Varda's avatar
Kenton Varda committed
369

370 371 372 373 374 375
  // =======================================================================================
  // The Four Tables entry types
  //
  // We have to define these before we can define the class's fields.

  using QuestionId = uint32_t;
  using AnswerId = QuestionId;
  using ExportId = uint32_t;
  using ImportId = ExportId;
  // See equivalent definitions in rpc.capnp.
  //
  // We always use the type that refers to the local table of the same name.  So e.g. although
  // QuestionId and AnswerId are the same type, we use QuestionId when referring to an entry in
  // the local question table (which corresponds to the peer's answer table) and use AnswerId
  // to refer to an entry in our answer table (which corresponds to the peer's question table).
  // Since all messages in the RPC protocol are defined from the sender's point of view, this
  // means that any time we read an ID from a received message, its type should invert.
  // TODO(cleanup):  Perhaps we could enforce that in a type-safe way?  Hmm...
388 389

  struct Question {
390 391 392
    kj::Array<ExportId> paramExports;
    // List of exports that were sent in the request.  If the response has `releaseParamCaps` these
    // will need to be released.
393

Kenton Varda's avatar
Kenton Varda committed
394 395 396
    kj::Maybe<QuestionRef&> selfRef;
    // The local QuestionRef, set to nullptr when it is destroyed, which is also when `Finish` is
    // sent.
397

398 399 400
    bool isAwaitingReturn = false;
    // True from when `Call` is sent until `Return` is received.

401 402 403
    bool isTailCall = false;
    // Is this a tail call?  If so, we don't expect to receive results in the `Return`.

Kenton Varda's avatar
Kenton Varda committed
404
    inline bool operator==(decltype(nullptr)) const {
405
      return !isAwaitingReturn && selfRef == nullptr;
Kenton Varda's avatar
Kenton Varda committed
406 407
    }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
408 409 410
  };

  struct Answer {
411 412 413 414 415 416
    Answer() = default;
    Answer(const Answer&) = delete;
    Answer(Answer&&) = default;
    Answer& operator=(Answer&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

417 418 419 420
    bool active = false;
    // True from the point when the Call message is received to the point when both the `Finish`
    // message has been received and the `Return` has been sent.

421
    kj::Maybe<kj::Own<PipelineHook>> pipeline;
422 423
    // Send pipelined calls here.  Becomes null as soon as a `Finish` is received.

424
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> redirectedResults;
425 426
    // For locally-redirected calls (Call.sendResultsTo.yourself), this is a promise for the call
    // result, to be picked up by a subsequent `Return`.
427

428
    kj::Maybe<RpcCallContext&> callContext;
429 430
    // The call context, if it's still active.  Becomes null when the `Return` message is sent.
    // This object, if non-null, is owned by `asyncOp`.
431

432 433 434
    kj::Array<ExportId> resultExports;
    // List of exports that were sent in the results.  If the finish has `releaseResultCaps` these
    // will need to be released.
435 436 437 438 439 440
  };

  struct Export {
    uint refcount = 0;
    // When this reaches 0, drop `clientHook` and free this export.

441
    kj::Own<ClientHook> clientHook;
442

443 444 445 446
    kj::Promise<void> resolveOp = nullptr;
    // If this export is a promise (not a settled capability), the `resolveOp` represents the
    // ongoing operation to wait for that promise to resolve and then send a `Resolve` message.

447 448 449 450 451 452 453 454 455 456 457
    inline bool operator==(decltype(nullptr)) const { return refcount == 0; }
    inline bool operator!=(decltype(nullptr)) const { return refcount != 0; }
  };

  struct Import {
    Import() = default;
    Import(const Import&) = delete;
    Import(Import&&) = default;
    Import& operator=(Import&&) = default;
    // If we don't explicitly write all this, we get some stupid error deep in STL.

458
    kj::Maybe<ImportClient&> importClient;
459 460
    // Becomes null when the import is destroyed.

461 462 463 464 465
    kj::Maybe<RpcClient&> appClient;
    // Either a copy of importClient, or, in the case of promises, the wrapping PromiseClient.
    // Becomes null when it is discarded *or* when the import is destroyed (e.g. the promise is
    // resolved and the import is no longer needed).

466
    kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Own<ClientHook>>>> promiseFulfiller;
467 468 469
    // If non-null, the import is a promise.
  };

Kenton Varda's avatar
Kenton Varda committed
470 471 472 473 474 475 476 477 478 479 480 481
  using EmbargoId = uint32_t;

  struct Embargo {
    // For handling the `Disembargo` message when looping back to self.

    kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> fulfiller;
    // Fulfill this when the Disembargo arrives.

    // An entry without a fulfiller counts as empty.
    inline bool operator==(decltype(nullptr)) const { return fulfiller == nullptr; }
    inline bool operator!=(decltype(nullptr)) const { return !operator==(nullptr); }
  };

482 483 484
  // =======================================================================================
  // OK, now we can define RpcConnectionState's member data.

485
  kj::Maybe<Capability::Client> bootstrapInterface;
486
  kj::Maybe<RealmGateway<>::Client> gateway;
487
  kj::Maybe<SturdyRefRestorerBase&> restorer;
488 489 490 491 492 493 494

  typedef kj::Own<VatNetworkBase::Connection> Connected;
  typedef kj::Exception Disconnected;
  kj::OneOf<Connected, Disconnected> connection;
  // Once the connection has failed, we drop it and replace it with an exception, which will be
  // thrown from all further calls.

495 496
  kj::Own<kj::PromiseFulfiller<void>> disconnectFulfiller;

497 498
  ExportTable<ExportId, Export> exports;
  ExportTable<QuestionId, Question> questions;
499 500
  ImportTable<AnswerId, Answer> answers;
  ImportTable<ImportId, Import> imports;
501 502 503 504 505 506 507 508 509
  // The Four Tables!
  // The order of the tables is important for correct destruction.

  std::unordered_map<ClientHook*, ExportId> exportsByCap;
  // Maps already-exported ClientHook objects to their ID in the export table.

  ExportTable<EmbargoId, Embargo> embargoes;
  // There are only four tables.  This definitely isn't a fifth table.  I don't know what you're
  // talking about.
510 511 512 513

  kj::TaskSet tasks;

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
514
  // ClientHook implementations
515

Kenton Varda's avatar
Kenton Varda committed
516
  class RpcClient: public ClientHook, public kj::Refcounted {
517
  public:
518
    RpcClient(RpcConnectionState& connectionState)
519
        : connectionState(kj::addRef(connectionState)) {}
520

521 522 523 524
    virtual kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) = 0;
    // Writes a CapDescriptor referencing this client.  The CapDescriptor must be sent as part of
    // the very next message sent on the connection, as it may become invalid if other things
    // happen.
Kenton Varda's avatar
Kenton Varda committed
525 526 527 528
    //
    // If writing the descriptor adds a new export to the export table, or increments the refcount
    // on an existing one, then the ID is returned and the caller is responsible for removing it
    // later.
Kenton Varda's avatar
Kenton Varda committed
529

530 531
    virtual kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) = 0;
Kenton Varda's avatar
Kenton Varda committed
532
    // Writes the appropriate call target for calls to this capability and returns null.
Kenton Varda's avatar
Kenton Varda committed
533
    //
Kenton Varda's avatar
Kenton Varda committed
534 535 536 537
    // - OR -
    //
    // If calls have been redirected to some other local ClientHook, returns that hook instead.
    // This can happen if the capability represents a promise that has been resolved.
Kenton Varda's avatar
Kenton Varda committed
538

539
    virtual kj::Own<ClientHook> getInnermostClient() = 0;
Kenton Varda's avatar
Kenton Varda committed
540 541 542 543
    // If this client just wraps some other client -- even if it is only *temporarily* wrapping
    // that other client -- return a reference to the other client, transitively.  Otherwise,
    // return a new reference to *this.

Kenton Varda's avatar
Kenton Varda committed
544 545
    // implements ClientHook -----------------------------------------

546
    Request<AnyPointer, AnyPointer> newCall(
547
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
548 549 550 551 552 553 554 555 556 557 558 559
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          //
          // We pull a neat trick here: We actually end up returning a RequestHook for an import
          // request on the gateway cap, but with the "root" of the request actually pointing
          // to the "params" field of the real request.

          sizeHint = sizeHint.map([](MessageSize hint) {
            ++hint.capCount;
            hint.wordCount += sizeInWords<RealmGateway<>::ImportParams>();
560
            return hint;
561 562 563
          });

          auto request = g->importRequest(sizeHint);
564
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
565

566 567 568 569 570 571 572 573 574 575 576
          // Awkwardly, request.initParams() would return a SaveParams struct, but to construct
          // the Request<AnyPointer, AnyPointer> to return we need an AnyPointer::Builder, and you
          // can't go backwards from a struct builder to an AnyPointer builder. So instead we
          // manually get at the pointer by converting the outer request to AnyStruct and then
          // pulling the pointer from the pointer section.
          auto pointers = toAny(request).getPointerSection();
          KJ_ASSERT(pointers.size() >= 2);
          auto paramsPtr = pointers[1];
          KJ_ASSERT(paramsPtr.isNull());

          return Request<AnyPointer, AnyPointer>(paramsPtr, RequestHook::from(kj::mv(request)));
577 578 579
        }
      }

580 581 582 583 584
      return newCallNoIntercept(interfaceId, methodId, sizeHint);
    }

    Request<AnyPointer, AnyPointer> newCallNoIntercept(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) {
585 586 587 588
      if (!connectionState->connection.is<Connected>()) {
        return newBrokenRequest(kj::cp(connectionState->connection.get<Disconnected>()), sizeHint);
      }

589
      auto request = kj::heap<RpcRequest>(
590 591
          *connectionState, *connectionState->connection.get<Connected>(),
          sizeHint, kj::addRef(*this));
592 593 594 595 596 597
      auto callBuilder = request->getCall();

      callBuilder.setInterfaceId(interfaceId);
      callBuilder.setMethodId(methodId);

      auto root = request->getRoot();
598
      return Request<AnyPointer, AnyPointer>(root, kj::mv(request));
599 600
    }

Kenton Varda's avatar
Kenton Varda committed
601
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
602
                                kj::Own<CallContextHook>&& context) override {
603 604 605 606 607 608 609 610 611 612 613
      if (interfaceId == typeId<Persistent<>>() && methodId == 0) {
        KJ_IF_MAYBE(g, connectionState->gateway) {
          // Wait, this is a call to Persistent.save() and we need to translate it through our
          // gateway.
          auto params = context->getParams().getAs<Persistent<>::SaveParams>();

          auto requestSize = params.totalSize();
          ++requestSize.capCount;
          requestSize.wordCount += sizeInWords<RealmGateway<>::ImportParams>();

          auto request = g->importRequest(requestSize);
614
          request.setCap(Persistent<>::Client(kj::refcounted<NoInterceptClient>(*this)));
615 616 617 618 619 620 621 622
          request.setParams(params);

          context->allowCancellation();
          context->releaseParams();
          return context->directTailCall(RequestHook::from(kj::mv(request)));
        }
      }

623 624 625 626 627 628 629
      return callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    VoidPromiseAndPipeline callNoIntercept(uint64_t interfaceId, uint16_t methodId,
                                           kj::Own<CallContextHook>&& context) {
      // Implement call() by copying params and results messages.

Kenton Varda's avatar
Kenton Varda committed
630
      auto params = context->getParams();
631
      auto request = newCall(interfaceId, methodId, params.targetSize());
Kenton Varda's avatar
Kenton Varda committed
632

633
      request.set(params);
Kenton Varda's avatar
Kenton Varda committed
634 635
      context->releaseParams();

636
      // We can and should propagate cancellation.
637
      context->allowCancellation();
638

639
      return context->directTailCall(RequestHook::from(kj::mv(request)));
Kenton Varda's avatar
Kenton Varda committed
640 641
    }

642
    kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
643 644
      return kj::addRef(*this);
    }
645
    const void* getBrand() override {
646
      return connectionState.get();
Kenton Varda's avatar
Kenton Varda committed
647 648
    }

649
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
650 651
  };

652 653 654 655
  class ImportClient final: public RpcClient {
    // A ClientHook that wraps an entry in the import table.

  public:
656
    ImportClient(RpcConnectionState& connectionState, ImportId importId)
Kenton Varda's avatar
Kenton Varda committed
657 658
        : RpcClient(connectionState), importId(importId) {}

Kenton Varda's avatar
Kenton Varda committed
659
    ~ImportClient() noexcept(false) {
660
      unwindDetector.catchExceptionsIfUnwinding([&]() {
661
        // Remove self from the import table, if the table is still pointing at us.
662 663 664 665 666
        KJ_IF_MAYBE(import, connectionState->imports.find(importId)) {
          KJ_IF_MAYBE(i, import->importClient) {
            if (i == this) {
              connectionState->imports.erase(importId);
            }
667 668
          }
        }
Kenton Varda's avatar
Kenton Varda committed
669

670
        // Send a message releasing our remote references.
671 672
        if (remoteRefcount > 0 && connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
673 674 675 676 677 678 679
              messageSizeHint<rpc::Release>());
          rpc::Release::Builder builder = message->getBody().initAs<rpc::Message>().initRelease();
          builder.setId(importId);
          builder.setReferenceCount(remoteRefcount);
          message->send();
        }
      });
680 681
    }

682 683 684
    void addRemoteRef() {
      // Add a new RemoteRef and return a new ref to this client representing it.
      ++remoteRefcount;
Kenton Varda's avatar
Kenton Varda committed
685
    }
686

687
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
688
      descriptor.setReceiverHosted(importId);
Kenton Varda's avatar
Kenton Varda committed
689
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
690 691
    }

692 693
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
694
      target.setImportedCap(importId);
Kenton Varda's avatar
Kenton Varda committed
695
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
696 697
    }

698
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
699 700 701
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
702
    // implements ClientHook -----------------------------------------
703

704
    kj::Maybe<ClientHook&> getResolved() override {
705 706 707
      return nullptr;
    }

708
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      // This client offers no promise for a further resolution.
      return nullptr;
    }

Kenton Varda's avatar
Kenton Varda committed
712
  private:
713
    ImportId importId;
Kenton Varda's avatar
Kenton Varda committed
714 715 716

    uint remoteRefcount = 0;
    // Number of times we've received this import from the peer.
717 718

    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
719 720
  };

721 722 723
  class PipelineClient final: public RpcClient {
    // A ClientHook representing a pipelined promise.  Always wrapped in PromiseClient.

Kenton Varda's avatar
Kenton Varda committed
724
  public:
725 726
    PipelineClient(RpcConnectionState& connectionState,
                   kj::Own<QuestionRef>&& questionRef,
727
                   kj::Array<PipelineOp>&& ops)
Kenton Varda's avatar
Kenton Varda committed
728
        : RpcClient(connectionState), questionRef(kj::mv(questionRef)), ops(kj::mv(ops)) {}
Kenton Varda's avatar
Kenton Varda committed
729

730
   kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
Kenton Varda's avatar
Kenton Varda committed
731 732 733 734 735
      auto promisedAnswer = descriptor.initReceiverAnswer();
      promisedAnswer.setQuestionId(questionRef->getId());
      promisedAnswer.adoptTransform(fromPipelineOps(
          Orphanage::getForMessageContaining(descriptor), ops));
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
736 737
    }

738 739
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
Kenton Varda's avatar
Kenton Varda committed
740 741 742 743
      auto builder = target.initPromisedAnswer();
      builder.setQuestionId(questionRef->getId());
      builder.adoptTransform(fromPipelineOps(Orphanage::getForMessageContaining(builder), ops));
      return nullptr;
744 745
    }

746
    kj::Own<ClientHook> getInnermostClient() override {
Kenton Varda's avatar
Kenton Varda committed
747 748 749
      return kj::addRef(*this);
    }

750
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
751

752
    kj::Maybe<ClientHook&> getResolved() override {
753 754 755
      return nullptr;
    }

756
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
757
      return nullptr;
Kenton Varda's avatar
Kenton Varda committed
758 759 760
    }

  private:
761
    kj::Own<QuestionRef> questionRef;
762
    kj::Array<PipelineOp> ops;
Kenton Varda's avatar
Kenton Varda committed
763 764
  };

765 766 767 768
  class PromiseClient final: public RpcClient {
    // A ClientHook that initially wraps one client (in practice, an ImportClient or a
    // PipelineClient) and then, later on, redirects to some other client.

Kenton Varda's avatar
Kenton Varda committed
769
  public:
770 771 772
    PromiseClient(RpcConnectionState& connectionState,
                  kj::Own<ClientHook> initial,
                  kj::Promise<kj::Own<ClientHook>> eventual,
773
                  kj::Maybe<ImportId> importId)
774
        : RpcClient(connectionState),
775 776
          isResolved(false),
          cap(kj::mv(initial)),
777
          importId(importId),
778 779 780
          fork(eventual.fork()),
          resolveSelfPromise(fork.addBranch().then(
              [this](kj::Own<ClientHook>&& resolution) {
781
                resolve(kj::mv(resolution), false);
782
              }, [this](kj::Exception&& exception) {
783
                resolve(newBrokenCap(kj::mv(exception)), true);
784 785 786 787
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
788
              })) {
789 790 791 792 793 794
      // Create a client that starts out forwarding all calls to `initial` but, once `eventual`
      // resolves, will forward there instead.  In addition, `whenMoreResolved()` will return a fork
      // of `eventual`.  Note that this means the application could hold on to `eventual` even after
      // the `PromiseClient` is destroyed; `eventual` must therefore make sure to hold references to
      // anything that needs to stay alive in order to resolve it correctly (such as making sure the
      // import ID is not released).
Kenton Varda's avatar
Kenton Varda committed
795
    }
Kenton Varda's avatar
Kenton Varda committed
796

797 798 799 800 801 802
    ~PromiseClient() noexcept(false) {
      KJ_IF_MAYBE(id, importId) {
        // This object is representing an import promise.  That means the import table may still
        // contain a pointer back to it.  Remove that pointer.  Note that we have to verify that
        // the import still exists and the pointer still points back to this object because this
        // object may actually outlive the import.
803
        KJ_IF_MAYBE(import, connectionState->imports.find(*id)) {
804 805 806 807 808 809 810 811 812
          KJ_IF_MAYBE(c, import->appClient) {
            if (c == this) {
              import->appClient = nullptr;
            }
          }
        }
      }
    }

813
    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
814
      receivedCall = true;
815
      return connectionState->writeDescriptor(*cap, descriptor);
Kenton Varda's avatar
Kenton Varda committed
816
    }
Kenton Varda's avatar
Kenton Varda committed
817

818 819
    kj::Maybe<kj::Own<ClientHook>> writeTarget(
        rpc::MessageTarget::Builder target) override {
820
      receivedCall = true;
821
      return connectionState->writeTarget(*cap, target);
Kenton Varda's avatar
Kenton Varda committed
822 823
    }

824
    kj::Own<ClientHook> getInnermostClient() override {
825
      receivedCall = true;
826
      return connectionState->getInnermostClient(*cap);
Kenton Varda's avatar
Kenton Varda committed
827 828
    }

Kenton Varda's avatar
Kenton Varda committed
829
    // implements ClientHook -----------------------------------------
Kenton Varda's avatar
Kenton Varda committed
830

831
    Request<AnyPointer, AnyPointer> newCall(
832
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
833
      receivedCall = true;
834
      return cap->newCall(interfaceId, methodId, sizeHint);
835 836
    }

837
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
838
                                kj::Own<CallContextHook>&& context) override {
839
      receivedCall = true;
840
      return cap->call(interfaceId, methodId, kj::mv(context));
841 842
    }

843
    kj::Maybe<ClientHook&> getResolved() override {
844 845
      if (isResolved) {
        return *cap;
846 847 848
      } else {
        return nullptr;
      }
Kenton Varda's avatar
Kenton Varda committed
849 850
    }

851
    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
852
      return fork.addBranch();
Kenton Varda's avatar
Kenton Varda committed
853
    }
Kenton Varda's avatar
Kenton Varda committed
854 855

  private:
856 857
    bool isResolved;
    kj::Own<ClientHook> cap;
858

859
    kj::Maybe<ImportId> importId;
860
    kj::ForkedPromise<kj::Own<ClientHook>> fork;
Kenton Varda's avatar
Kenton Varda committed
861 862 863 864 865

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

866
    bool receivedCall = false;
Kenton Varda's avatar
Kenton Varda committed
867

868
    void resolve(kj::Own<ClientHook> replacement, bool isError) {
869 870
      if (replacement->getBrand() != connectionState.get() && receivedCall && !isError &&
          connectionState->connection.is<Connected>()) {
871 872 873 874 875
        // The new capability is hosted locally, not on the remote machine.  And, we had made calls
        // to the promise.  We need to make sure those calls echo back to us before we allow new
        // calls to go directly to the local capability, so we need to set a local embargo and send
        // a `Disembargo` to echo through the peer.

876
        auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
877
            messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
Kenton Varda's avatar
Kenton Varda committed
878

Kenton Varda's avatar
Kenton Varda committed
879
        auto disembargo = message->getBody().initAs<rpc::Message>().initDisembargo();
Kenton Varda's avatar
Kenton Varda committed
880 881

        {
882
          auto redirect = connectionState->writeTarget(*cap, disembargo.initTarget());
Kenton Varda's avatar
Kenton Varda committed
883 884 885 886 887
          KJ_ASSERT(redirect == nullptr,
                    "Original promise target should always be from this RPC connection.");
        }

        EmbargoId embargoId;
888
        Embargo& embargo = connectionState->embargoes.next(embargoId);
Kenton Varda's avatar
Kenton Varda committed
889 890 891 892 893 894 895

        disembargo.getContext().setSenderLoopback(embargoId);

        auto paf = kj::newPromiseAndFulfiller<void>();
        embargo.fulfiller = kj::mv(paf.fulfiller);

        // Make a promise which resolves to `replacement` as soon as the `Disembargo` comes back.
896 897
        auto embargoPromise = paf.promise.then(
            kj::mvCapture(replacement, [this](kj::Own<ClientHook>&& replacement) {
Kenton Varda's avatar
Kenton Varda committed
898 899 900 901 902
              return kj::mv(replacement);
            }));

        // We need to queue up calls in the meantime, so we'll resolve ourselves to a local promise
        // client instead.
903
        replacement = newLocalPromiseClient(kj::mv(embargoPromise));
Kenton Varda's avatar
Kenton Varda committed
904 905 906 907 908

        // Send the `Disembargo`.
        message->send();
      }

909 910
      cap = replacement->addRef();
      isResolved = true;
Kenton Varda's avatar
Kenton Varda committed
911
    }
Kenton Varda's avatar
Kenton Varda committed
912
  };
913

914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962
  class NoInterceptClient final: public RpcClient {
    // A wrapper around an RpcClient which bypasses special handling of "save" requests. When we
    // intercept a "save" request and invoke a RealmGateway, we give it a version of the capability
    // with intercepting disabled, since usually the first thing the RealmGateway will do is turn
    // around and call save() again.
    //
    // This is admittedly sort of backwards: the interception of "save" ought to be the part
    // implemented by a wrapper. However, that would require placing a wrapper around every
    // RpcClient we create whereas NoInterceptClient only needs to be injected after a save()
    // request occurs and is intercepted.

  public:
    explicit NoInterceptClient(RpcClient& inner)
        : RpcClient(*inner.connectionState),
          inner(kj::addRef(inner)) {}
    // Wraps `inner`, sharing its connection state and holding a reference to it.  `explicit` so
    // that an RpcClient is never silently converted into a wrapper.

    // Descriptor/target/innermost-client queries are answered by the wrapped client unchanged.

    kj::Maybe<ExportId> writeDescriptor(rpc::CapDescriptor::Builder descriptor) override {
      return inner->writeDescriptor(descriptor);
    }

    kj::Maybe<kj::Own<ClientHook>> writeTarget(rpc::MessageTarget::Builder target) override {
      return inner->writeTarget(target);
    }

    kj::Own<ClientHook> getInnermostClient() override {
      return inner->getInnermostClient();
    }

    // Calls are routed to the wrapped client's *NoIntercept entry points.

    Request<AnyPointer, AnyPointer> newCall(
        uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
      return inner->newCallNoIntercept(interfaceId, methodId, sizeHint);
    }
    VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                                kj::Own<CallContextHook>&& context) override {
      return inner->callNoIntercept(interfaceId, methodId, kj::mv(context));
    }

    kj::Maybe<ClientHook&> getResolved() override {
      return nullptr;
    }

    kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
      return nullptr;
    }

  private:
    kj::Own<RpcClient> inner;
    // The wrapped client; kept alive for as long as this wrapper exists.
  };

963
  kj::Maybe<ExportId> writeDescriptor(ClientHook& cap, rpc::CapDescriptor::Builder descriptor) {
    // Write a descriptor for the given capability.
    //
    // If the capability belongs to this connection, the RpcClient writes its own descriptor and
    // decides the return value.  Otherwise the capability is entered into (or found in) the
    // export table and the export ID is returned.

    // Find the innermost wrapped capability.
    ClientHook* inner = &cap;
    for (;;) {
      KJ_IF_MAYBE(resolved, inner->getResolved()) {
        inner = resolved;
      } else {
        break;
      }
    }

    if (inner->getBrand() == this) {
      return kj::downcast<RpcClient>(*inner).writeDescriptor(descriptor);
    } else {
      auto iter = exportsByCap.find(inner);
      if (iter != exportsByCap.end()) {
        // We've already seen and exported this capability before.  Just up the refcount.
        auto& exp = KJ_ASSERT_NONNULL(exports.find(iter->second));
        ++exp.refcount;
        descriptor.setSenderHosted(iter->second);
        return iter->second;
      } else {
        // This is the first time we've seen this capability.
        ExportId exportId;
        auto& exp = exports.next(exportId);
        exportsByCap[inner] = exportId;
        exp.refcount = 1;
        exp.clientHook = inner->addRef();

        KJ_IF_MAYBE(wrapped, inner->whenMoreResolved()) {
          // This is a promise.  Arrange for the `Resolve` message to be sent later.
          exp.resolveOp = resolveExportedPromise(exportId, kj::mv(*wrapped));
          descriptor.setSenderPromise(exportId);
        } else {
          descriptor.setSenderHosted(exportId);
        }

        return exportId;
      }
    }
  }

1007
  kj::Array<ExportId> writeDescriptors(kj::ArrayPtr<kj::Maybe<kj::Own<ClientHook>>> capTable,
                                       rpc::Payload::Builder payload) {
    // Fill in `payload`'s cap table with one descriptor per entry of `capTable`; null entries
    // are written as `none`.  Returns the export IDs that writeDescriptor() reported, in table
    // order (entries that produced no export ID contribute nothing).
    auto capTableBuilder = payload.initCapTable(capTable.size());
    kj::Vector<ExportId> exports(capTable.size());
    for (uint i: kj::indices(capTable)) {
      KJ_IF_MAYBE(cap, capTable[i]) {
        KJ_IF_MAYBE(exportId, writeDescriptor(**cap, capTableBuilder[i])) {
          exports.add(*exportId);
        }
      } else {
        capTableBuilder[i].setNone();
      }
    }
    return exports.releaseAsArray();
  }

1023
  kj::Maybe<kj::Own<ClientHook>> writeTarget(ClientHook& cap, rpc::MessageTarget::Builder target) {
    // If calls to the given capability should pass over this connection, fill in `target`
    // appropriately for such a call and return nullptr.  Otherwise, return a `ClientHook` to which
    // the call should be forwarded; the caller should then delegate the call to that `ClientHook`.
    //
    // The main case where this ends up returning non-null is if `cap` is a promise that has
    // recently resolved.  The application might have started building a request before the promise
    // resolved, and so the request may have been built on the assumption that it would be sent over
    // this network connection, but then the promise resolved to point somewhere else before the
    // request was sent.  Now the request has to be redirected to the new target instead.

    if (cap.getBrand() == this) {
      return kj::downcast<RpcClient>(cap).writeTarget(target);
    } else {
      return cap.addRef();
    }
  }

1041 1042
  kj::Own<ClientHook> getInnermostClient(ClientHook& client) {
    // Follow getResolved() links until reaching a client that reports no resolution.  If that
    // client belongs to this connection, let the RpcClient unwrap itself further; otherwise
    // return a new reference to it.
    ClientHook* ptr = &client;
    for (;;) {
      KJ_IF_MAYBE(inner, ptr->getResolved()) {
        ptr = inner;
      } else {
        break;
      }
    }

    if (ptr->getBrand() == this) {
      return kj::downcast<RpcClient>(*ptr).getInnermostClient();
    } else {
      return ptr->addRef();
    }
  }

  kj::Promise<void> resolveExportedPromise(
      ExportId exportId, kj::Promise<kj::Own<ClientHook>>&& promise) {
    // Implements exporting of a promise.  The promise has been exported under the given ID, and is
    // to eventually resolve to the ClientHook produced by `promise`.  This method waits for that
    // resolve to happen and then sends the appropriate `Resolve` message to the peer.
    //
    // Exceptions thrown while handling the resolution are added to the connection's TaskSet.

    return promise.then(
        [this,exportId](kj::Own<ClientHook>&& resolution) -> kj::Promise<void> {
      // Successful resolution.

      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return kj::READY_NOW;
      }

      // Get the innermost ClientHook backing the resolved client.  This includes traversing
      // PromiseClients that haven't resolved yet to their underlying ImportClient or
      // PipelineClient, so that we get a remote promise that might resolve later.  This is
      // important to make sure that if the peer sends a `Disembargo` back to us, it bounces back
      // correctly instead of going to the result of some future resolution.  See the documentation
      // for `Disembargo` in `rpc.capnp`.
      resolution = getInnermostClient(*resolution);

      // Update the export table to point at this object instead.  We know that our entry in the
      // export table is still live because when it is destroyed the asynchronous resolution task
      // (i.e. this code) is canceled.
      auto& exp = KJ_ASSERT_NONNULL(exports.find(exportId));
      exportsByCap.erase(exp.clientHook);
      exp.clientHook = kj::mv(resolution);

      if (exp.clientHook->getBrand() != this) {
        // We're resolving to a local capability.  If we're resolving to a promise, we might be
        // able to reuse our export table entry and avoid sending a message.

        KJ_IF_MAYBE(promise, exp.clientHook->whenMoreResolved()) {
          // We're replacing a promise with another local promise.  In this case, we might actually
          // be able to just reuse the existing export table entry to represent the new promise --
          // unless it already has an entry.  Let's check.

          auto insertResult = exportsByCap.insert(std::make_pair(exp.clientHook.get(), exportId));

          if (insertResult.second) {
            // The new promise was not already in the table, therefore the existing export table
            // entry has now been repurposed to represent it.  There is no need to send a resolve
            // message at all.  We do, however, have to start resolving the next promise.
            return resolveExportedPromise(exportId, kj::mv(*promise));
          }
        }
      }

      // OK, we have to send a `Resolve` message.
      auto message = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<rpc::Resolve>() + sizeInWords<rpc::CapDescriptor>() + 16);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      writeDescriptor(*exp.clientHook, resolve.initCap());
      message->send();

      return kj::READY_NOW;
    }, [this,exportId](kj::Exception&& exception) {
      // Failed resolution: tell the peer the promise resolved to an exception.

      // Same guard as the success path: never touch `connection.get<Connected>()` unless the
      // connection really is in the Connected state.
      KJ_ASSERT(connection.is<Connected>(),
                "Resolving export should have been canceled on disconnect.") {
        return;
      }

      auto message = connection.get<Connected>()->newOutgoingMessage(
          messageSizeHint<rpc::Resolve>() + exceptionSizeHint(exception) + 8);
      auto resolve = message->getBody().initAs<rpc::Message>().initResolve();
      resolve.setPromiseId(exportId);
      fromException(exception, resolve.initException());
      message->send();
    }).eagerlyEvaluate([this](kj::Exception&& exception) {
      // Put the exception on the TaskSet which will cause the connection to be terminated.
      tasks.add(kj::mv(exception));
    });
  }

Kenton Varda's avatar
Kenton Varda committed
1131
  // =====================================================================================
1132
  // Interpreting CapDescriptor
Kenton Varda's avatar
Kenton Varda committed
1133

1134
  kj::Own<ClientHook> import(ImportId importId, bool isPromise) {
    // Receive a new import.
    //
    // Looks up (creating if necessary) the import table entry for `importId`, counts the newly
    // received remote reference, and returns the client to use for it: the ImportClient itself,
    // or -- when `isPromise` is true -- a PromiseClient wrapped around it.

    auto& import = imports[importId];
    kj::Own<ImportClient> importClient;

    // Create the ImportClient, or if one already exists, use it.
    KJ_IF_MAYBE(c, import.importClient) {
      importClient = kj::addRef(*c);
    } else {
      importClient = kj::refcounted<ImportClient>(*this, importId);
      import.importClient = *importClient;
    }

    // We just received a copy of this import ID, so the remote refcount has gone up.
    importClient->addRemoteRef();

    if (isPromise) {
      // We need to construct a PromiseClient around this import, if we haven't already.
      KJ_IF_MAYBE(c, import.appClient) {
        // Use the existing one.
        return kj::addRef(*c);
      } else {
        // Create a promise for this import's resolution.
        auto paf = kj::newPromiseAndFulfiller<kj::Own<ClientHook>>();
        import.promiseFulfiller = kj::mv(paf.fulfiller);

        // Make sure the import is not destroyed while this promise exists.
        paf.promise = paf.promise.attach(kj::addRef(*importClient));

        // Create a PromiseClient around it and return it.
        auto result = kj::refcounted<PromiseClient>(
            *this, kj::mv(importClient), kj::mv(paf.promise), importId);
        import.appClient = *result;
        return kj::mv(result);
      }
    } else {
      import.appClient = *importClient;
      return kj::mv(importClient);
    }
  }
1175

1176
  kj::Maybe<kj::Own<ClientHook>> receiveCap(rpc::CapDescriptor::Reader descriptor) {
    // Convert a received CapDescriptor into a ClientHook.  Returns null only for `none`;
    // descriptors that cannot be honored yield a broken capability describing the problem.
    switch (descriptor.which()) {
      case rpc::CapDescriptor::NONE:
        return nullptr;

      case rpc::CapDescriptor::SENDER_HOSTED:
        return import(descriptor.getSenderHosted(), false);
      case rpc::CapDescriptor::SENDER_PROMISE:
        return import(descriptor.getSenderPromise(), true);

      case rpc::CapDescriptor::RECEIVER_HOSTED:
        // The ID refers to an entry in our own export table.
        KJ_IF_MAYBE(exp, exports.find(descriptor.getReceiverHosted())) {
          return exp->clientHook->addRef();
        } else {
          return newBrokenCap("invalid 'receiverHosted' export ID");
        }

      case rpc::CapDescriptor::RECEIVER_ANSWER: {
        // The descriptor names a pipelined capability on one of our answers.
        auto promisedAnswer = descriptor.getReceiverAnswer();

        KJ_IF_MAYBE(answer, answers.find(promisedAnswer.getQuestionId())) {
          if (answer->active) {
            KJ_IF_MAYBE(pipeline, answer->pipeline) {
              KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
                return pipeline->get()->getPipelinedCap(*ops);
              } else {
                return newBrokenCap("unrecognized pipeline ops");
              }
            }
          }
        }

        // No such answer, or it is inactive, or it has no pipeline.
        return newBrokenCap("invalid 'receiverAnswer'");
      }

      case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
        // We don't support third-party caps, so use the vine instead.
        return import(descriptor.getThirdPartyHosted().getVineId(), false);

      default:
        KJ_FAIL_REQUIRE("unknown CapDescriptor type") { break; }
        return newBrokenCap("unknown CapDescriptor type");
    }
  }
1220

1221 1222
  kj::Array<kj::Maybe<kj::Own<ClientHook>>> receiveCaps(List<rpc::CapDescriptor>::Reader capTable) {
    // Convert every descriptor of a received cap table with receiveCap(), preserving order.
    auto result = kj::heapArrayBuilder<kj::Maybe<kj::Own<ClientHook>>>(capTable.size());
    for (auto cap: capTable) {
      result.add(receiveCap(cap));
    }
    return result.finish();
  }
1228 1229

  // =====================================================================================
Kenton Varda's avatar
Kenton Varda committed
1230
  // RequestHook/PipelineHook/ResponseHook implementations
1231

Kenton Varda's avatar
Kenton Varda committed
1232 1233 1234 1235
  class QuestionRef: public kj::Refcounted {
    // A reference to an entry on the question table.  Used to detect when the `Finish` message
    // can be sent.

Kenton Varda's avatar
Kenton Varda committed
1236
  public:
1237
    inline QuestionRef(
1238 1239 1240
        RpcConnectionState& connectionState, QuestionId id,
        kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller)
        : connectionState(kj::addRef(connectionState)), id(id), fulfiller(kj::mv(fulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1241 1242

    ~QuestionRef() {
1243
      unwindDetector.catchExceptionsIfUnwinding([&]() {
1244 1245 1246
        auto& question = KJ_ASSERT_NONNULL(
            connectionState->questions.find(id), "Question ID no longer on table?");

1247
        // Send the "Finish" message (if the connection is not already broken).
1248 1249
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1250
              messageSizeHint<rpc::Finish>());
1251 1252
          auto builder = message->getBody().getAs<rpc::Message>().initFinish();
          builder.setQuestionId(id);
1253 1254 1255 1256 1257
          // If we're still awaiting a return, then this request is being canceled, and we're going
          // to ignore any capabilities in the return message, so set releaseResultCaps true. If we
          // already received the return, then we've already built local proxies for the caps and
          // will send Release messages when those are destoryed.
          builder.setReleaseResultCaps(question.isAwaitingReturn);
1258
          message->send();
1259
        }
Kenton Varda's avatar
Kenton Varda committed
1260

1261 1262 1263
        // Check if the question has returned and, if so, remove it from the table.
        // Remove question ID from the table.  Must do this *after* sending `Finish` to ensure that
        // the ID is not re-allocated before the `Finish` message can be sent.
1264 1265
        if (question.isAwaitingReturn) {
          // Still waiting for return, so just remove the QuestionRef pointer from the table.
1266
          question.selfRef = nullptr;
1267 1268 1269
        } else {
          // Call has already returned, so we can now remove it from the table.
          connectionState->questions.erase(id, question);
1270 1271
        }
      });
Kenton Varda's avatar
Kenton Varda committed
1272 1273 1274 1275
    }

    inline QuestionId getId() const { return id; }

1276
    void fulfill(kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1277 1278 1279
      fulfiller->fulfill(kj::mv(response));
    }

1280
    void fulfill(kj::Promise<kj::Own<RpcResponse>>&& promise) {
1281 1282 1283
      fulfiller->fulfill(kj::mv(promise));
    }

Kenton Varda's avatar
Kenton Varda committed
1284 1285
    void reject(kj::Exception&& exception) {
      fulfiller->reject(kj::mv(exception));
1286
    }
Kenton Varda's avatar
Kenton Varda committed
1287 1288

  private:
1289
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1290
    QuestionId id;
1291
    kj::Own<kj::PromiseFulfiller<kj::Promise<kj::Own<RpcResponse>>>> fulfiller;
1292
    kj::UnwindDetector unwindDetector;
Kenton Varda's avatar
Kenton Varda committed
1293 1294
  };

Kenton Varda's avatar
Kenton Varda committed
1295
  class RpcRequest final: public RequestHook {
1296
  public:
1297 1298
    RpcRequest(RpcConnectionState& connectionState, VatNetworkBase::Connection& connection,
               kj::Maybe<MessageSize> sizeHint, kj::Own<RpcClient>&& target)
1299
        : connectionState(kj::addRef(connectionState)),
Kenton Varda's avatar
Kenton Varda committed
1300
          target(kj::mv(target)),
1301
          message(connection.newOutgoingMessage(
1302 1303
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Call>() +
                  sizeInWords<rpc::Payload>() + MESSAGE_TARGET_SIZE_HINT))),
Kenton Varda's avatar
Kenton Varda committed
1304
          callBuilder(message->getBody().getAs<rpc::Message>().initCall()),
1305
          paramsBuilder(callBuilder.getParams().getContent()) {}
Kenton Varda's avatar
Kenton Varda committed
1306

1307
    inline AnyPointer::Builder getRoot() {
Kenton Varda's avatar
Kenton Varda committed
1308 1309
      return paramsBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1310 1311 1312
    inline rpc::Call::Builder getCall() {
      return callBuilder;
    }
Kenton Varda's avatar
Kenton Varda committed
1313

1314
    RemotePromise<AnyPointer> send() override {
      // Sends the call.  Returns the promise for the response bundled with a pipeline on which
      // further calls can be made before the response arrives.

      if (!connectionState->connection.is<Connected>()) {
        // The connection is already broken: fail both the response promise and the pipeline
        // with copies of the disconnect exception.
        const kj::Exception& reason = connectionState->connection.get<Disconnected>();
        return RemotePromise<AnyPointer>(
            kj::Promise<Response<AnyPointer>>(kj::cp(reason)),
            AnyPointer::Pipeline(newBrokenPipeline(kj::cp(reason))));
      }

      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // The capability was redirected while the request was being built, so the message we
        // prepared cannot be used.  Start a new request on the redirect target and copy the
        // params into it.
        auto forwarded = redirect->get()->newCall(
            callBuilder.getInterfaceId(), callBuilder.getMethodId(), paramsBuilder.targetSize());
        forwarded.set(paramsBuilder);
        return forwarded.send();
      }

      auto sent = sendInternal(false);
      auto responseFork = sent.promise.fork();

      // The pipeline's branch is added first: the pipeline must get notified of resolution
      // before the app does to maintain ordering.
      auto pipeline = kj::refcounted<RpcPipeline>(
          *connectionState, kj::mv(sent.questionRef), responseFork.addBranch());

      auto appPromise = responseFork.addBranch().then(
          [](kj::Own<RpcResponse>&& response) {
            // Fetch the reader before `response` is moved into the Response that holds it.
            auto results = response->getResults();
            return Response<AnyPointer>(results, kj::mv(response));
          });

      return RemotePromise<AnyPointer>(
          kj::mv(appPromise),
          AnyPointer::Pipeline(kj::mv(pipeline)));
    }

1352 1353 1354
    struct TailInfo {
      // Result of a successful tailSend().

      QuestionId questionId;
      // ID of the question that was created for the tail call.

      kj::Promise<void> promise;
      // Derived from the question's response promise; tailSend() attaches a continuation that
      // only checks that the delivered response is null.

      kj::Own<PipelineHook> pipeline;
      // Pipeline for the tail call's question.
    };

    kj::Maybe<TailInfo> tailSend() {
      // Send the request as a tail call.
      //
      // Returns null if for some reason a tail call is not possible and the caller should fall
      // back to using send() and copying the response.

      if (!connectionState->connection.is<Connected>()) {
        // Disconnected; fall back to a regular send() which will fail appropriately.
        return nullptr;
      }

      KJ_IF_MAYBE(redirect, target->writeTarget(callBuilder.getTarget())) {
        // Whoops, this capability has been redirected while we were building the request!
        // Fall back to regular send().
        return nullptr;
      }

      auto sent = sendInternal(true);

      auto donePromise = sent.promise.then([](kj::Own<RpcResponse>&& response) {
        // Response should be null if `Return` handling code is correct.
        KJ_ASSERT(!response) { break; }
      });

      // Read the ID now; the QuestionRef is about to be handed over to the pipeline.
      QuestionId id = sent.questionRef->getId();

      auto tailPipeline = kj::refcounted<RpcPipeline>(*connectionState, kj::mv(sent.questionRef));

      return TailInfo { id, kj::mv(donePromise), kj::mv(tailPipeline) };
    }

1391
    const void* getBrand() override {
1392 1393 1394
      return connectionState.get();
    }

Kenton Varda's avatar
Kenton Varda committed
1395
  private:
1396
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1397

1398
    kj::Own<RpcClient> target;
Kenton Varda's avatar
Kenton Varda committed
1399 1400
    kj::Own<OutgoingRpcMessage> message;
    rpc::Call::Builder callBuilder;
1401
    AnyPointer::Builder paramsBuilder;
1402 1403 1404

    struct SendInternalResult {
      // What sendInternal() hands back to send() / tailSend().

      kj::Own<QuestionRef> questionRef;
      // Reference to the question that was created for the call.

      kj::Promise<kj::Own<RpcResponse>> promise = nullptr;
      // Promise for the response.  Initialized to null so the struct can be declared before
      // the real promise exists.
    };

1408
    SendInternalResult sendInternal(bool isTailCall) {
      // Shared implementation of send() and tailSend(): writes the cap descriptors, allocates
      // the question table entry, sends the message, and builds the QuestionRef and response
      // promise.  When `isTailCall` is true the call asks for results to be sent to "yourself".

      // Build the cap table.
      auto paramExports = connectionState->writeDescriptors(
          message->getCapTable(), callBuilder.getParams());

      // Init the question table.  Do this after writing descriptors to avoid interference.
      QuestionId id;
      auto& entry = connectionState->questions.next(id);
      entry.isAwaitingReturn = true;
      entry.paramExports = kj::mv(paramExports);
      entry.isTailCall = isTailCall;

      // Finish and send.
      callBuilder.setQuestionId(id);
      if (isTailCall) {
        callBuilder.getSendResultsTo().setYourself();
      }
      message->send();

      // Make the result promise.  The promise keeps its own reference to the QuestionRef.
      SendInternalResult out;
      auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
      out.questionRef = kj::refcounted<QuestionRef>(
          *connectionState, id, kj::mv(paf.fulfiller));
      entry.selfRef = *out.questionRef;
      out.promise = paf.promise.attach(kj::addRef(*out.questionRef));

      return out;
    }
Kenton Varda's avatar
Kenton Varda committed
1438 1439
  };

Kenton Varda's avatar
Kenton Varda committed
1440
  class RpcPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1441
  public:
1442 1443
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef,
                kj::Promise<kj::Own<RpcResponse>>&& redirectLaterParam)
1444
        : connectionState(kj::addRef(connectionState)),
1445 1446 1447
          redirectLater(redirectLaterParam.fork()),
          resolveSelfPromise(KJ_ASSERT_NONNULL(redirectLater).addBranch().then(
              [this](kj::Own<RpcResponse>&& response) {
Kenton Varda's avatar
Kenton Varda committed
1448
                resolve(kj::mv(response));
1449
              }, [this](kj::Exception&& exception) {
Kenton Varda's avatar
Kenton Varda committed
1450
                resolve(kj::mv(exception));
1451 1452 1453 1454
              }).eagerlyEvaluate([&](kj::Exception&& e) {
                // Make any exceptions thrown from resolve() go to the connection's TaskSet which
                // will cause the connection to be terminated.
                connectionState.tasks.add(kj::mv(e));
Kenton Varda's avatar
Kenton Varda committed
1455 1456 1457
              })) {
      // Construct a new RpcPipeline.

1458
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1459
    }
Kenton Varda's avatar
Kenton Varda committed
1460

1461
    RpcPipeline(RpcConnectionState& connectionState, kj::Own<QuestionRef>&& questionRef)
1462 1463 1464 1465
        : connectionState(kj::addRef(connectionState)),
          resolveSelfPromise(nullptr) {
      // Construct a new RpcPipeline that is never expected to resolve.

1466
      state.init<Waiting>(kj::mv(questionRef));
Kenton Varda's avatar
Kenton Varda committed
1467
    }
Kenton Varda's avatar
Kenton Varda committed
1468 1469 1470

    // implements PipelineHook ---------------------------------------

1471
    kj::Own<PipelineHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1472 1473 1474
      return kj::addRef(*this);
    }

1475
    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
Kenton Varda's avatar
Kenton Varda committed
1476 1477 1478 1479 1480 1481 1482
      auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
      for (auto& op: ops) {
        copy.add(op);
      }
      return getPipelinedCap(copy.finish());
    }

1483
    kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override {
1484
      if (state.is<Waiting>()) {
1485 1486
        // Wrap a PipelineClient in a PromiseClient.
        auto pipelineClient = kj::refcounted<PipelineClient>(
1487
            *connectionState, kj::addRef(*state.get<Waiting>()), kj::heapArray(ops.asPtr()));
1488

1489
        KJ_IF_MAYBE(r, redirectLater) {
1490 1491 1492 1493
          auto resolutionPromise = r->addBranch().then(kj::mvCapture(ops,
              [](kj::Array<PipelineOp> ops, kj::Own<RpcResponse>&& response) {
                return response->getResults().getPipelinedCap(ops);
              }));
1494

1495 1496 1497 1498 1499 1500
          return kj::refcounted<PromiseClient>(
              *connectionState, kj::mv(pipelineClient), kj::mv(resolutionPromise), nullptr);
        } else {
          // Oh, this pipeline will never get redirected, so just return the PipelineClient.
          return kj::mv(pipelineClient);
        }
1501 1502
      } else if (state.is<Resolved>()) {
        return state.get<Resolved>()->getResults().getPipelinedCap(ops);
Kenton Varda's avatar
Kenton Varda committed
1503
      } else {
1504
        return newBrokenCap(kj::cp(state.get<Broken>()));
Kenton Varda's avatar
Kenton Varda committed
1505
      }
Kenton Varda's avatar
Kenton Varda committed
1506 1507 1508
    }

  private:
1509 1510
    kj::Own<RpcConnectionState> connectionState;
    kj::Maybe<kj::ForkedPromise<kj::Own<RpcResponse>>> redirectLater;
Kenton Varda's avatar
Kenton Varda committed
1511

1512 1513
    typedef kj::Own<QuestionRef> Waiting;
    typedef kj::Own<RpcResponse> Resolved;
Kenton Varda's avatar
Kenton Varda committed
1514
    typedef kj::Exception Broken;
1515
    kj::OneOf<Waiting, Resolved, Broken> state;
Kenton Varda's avatar
Kenton Varda committed
1516 1517 1518 1519 1520

    // Keep this last, because the continuation uses *this, so it should be destroyed first to
    // ensure the continuation is not still running.
    kj::Promise<void> resolveSelfPromise;

1521
    void resolve(kj::Own<RpcResponse>&& response) {
1522 1523
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Resolved>(kj::mv(response));
Kenton Varda's avatar
Kenton Varda committed
1524 1525 1526
    }

    void resolve(const kj::Exception&& exception) {
1527 1528
      KJ_ASSERT(state.is<Waiting>(), "Already resolved?");
      state.init<Broken>(kj::mv(exception));
Kenton Varda's avatar
Kenton Varda committed
1529
    }
Kenton Varda's avatar
Kenton Varda committed
1530 1531
  };

1532 1533
  class RpcResponse: public ResponseHook {
  public:
1534
    virtual AnyPointer::Reader getResults() = 0;
1535
    virtual kj::Own<RpcResponse> addRef() = 0;
1536 1537 1538
  };

  class RpcResponseImpl final: public RpcResponse, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
1539
  public:
1540
    RpcResponseImpl(RpcConnectionState& connectionState,
1541 1542
                    kj::Own<QuestionRef>&& questionRef,
                    kj::Own<IncomingRpcMessage>&& message,
1543
                    AnyPointer::Reader results)
1544 1545
        : connectionState(kj::addRef(connectionState)),
          message(kj::mv(message)),
1546
          reader(results),
Kenton Varda's avatar
Kenton Varda committed
1547
          questionRef(kj::mv(questionRef)) {}
Kenton Varda's avatar
Kenton Varda committed
1548

1549
    AnyPointer::Reader getResults() override {
Kenton Varda's avatar
Kenton Varda committed
1550 1551 1552
      return reader;
    }

1553
    kj::Own<RpcResponse> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
1554 1555 1556
      return kj::addRef(*this);
    }

Kenton Varda's avatar
Kenton Varda committed
1557
  private:
1558
    kj::Own<RpcConnectionState> connectionState;
Kenton Varda's avatar
Kenton Varda committed
1559
    kj::Own<IncomingRpcMessage> message;
1560
    AnyPointer::Reader reader;
1561
    kj::Own<QuestionRef> questionRef;
Kenton Varda's avatar
Kenton Varda committed
1562 1563 1564 1565 1566 1567
  };

  // =====================================================================================
  // CallContextHook implementation

  class RpcServerResponse {
Kenton Varda's avatar
Kenton Varda committed
1568
  public:
1569
    virtual AnyPointer::Builder getResultsBuilder() = 0;
1570 1571 1572 1573
  };

  class RpcServerResponseImpl final: public RpcServerResponse {
  public:
1574
    RpcServerResponseImpl(RpcConnectionState& connectionState,
1575
                          kj::Own<OutgoingRpcMessage>&& message,
1576 1577 1578
                          rpc::Payload::Builder payload)
        : connectionState(connectionState),
          message(kj::mv(message)),
1579
          payload(payload) {}
Kenton Varda's avatar
Kenton Varda committed
1580

1581
    AnyPointer::Builder getResultsBuilder() override {
1582
      return payload.getContent();
Kenton Varda's avatar
Kenton Varda committed
1583 1584
    }

1585 1586 1587 1588 1589
    kj::Maybe<kj::Array<ExportId>> send() {
      // Send the response and return the export list.  Returns nullptr if there were no caps.
      // (Could return a non-null empty array if there were caps but none of them were exports.)

      // Build the cap table.
1590
      auto capTable = message->getCapTable();
1591 1592
      auto exports = connectionState.writeDescriptors(capTable, payload);

1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603
      // Capabilities that we are returning are subject to embargos. See `Disembargo` in rpc.capnp.
      // As explained there, in order to deal with the Tribble 4-way race condition, we need to
      // make sure that if we're returning any remote promises, that we ignore any subsequent
      // resolution of those promises for the purpose of pipelined requests on this answer. Luckily,
      // we can modify the cap table in-place.
      for (auto& slot: capTable) {
        KJ_IF_MAYBE(cap, slot) {
          slot = connectionState.getInnermostClient(**cap);
        }
      }

Kenton Varda's avatar
Kenton Varda committed
1604
      message->send();
1605 1606 1607 1608 1609
      if (capTable.size() == 0) {
        return nullptr;
      } else {
        return kj::mv(exports);
      }
Kenton Varda's avatar
Kenton Varda committed
1610 1611 1612
    }

  private:
1613
    RpcConnectionState& connectionState;
Kenton Varda's avatar
Kenton Varda committed
1614
    kj::Own<OutgoingRpcMessage> message;
1615
    rpc::Payload::Builder payload;
Kenton Varda's avatar
Kenton Varda committed
1616 1617
  };

1618 1619 1620
  class LocallyRedirectedRpcResponse final
      : public RpcResponse, public RpcServerResponse, public kj::Refcounted{
  public:
1621
    LocallyRedirectedRpcResponse(kj::Maybe<MessageSize> sizeHint)
1622 1623
        : message(sizeHint.map([](MessageSize size) { return size.wordCount; })
                          .orDefault(SUGGESTED_FIRST_SEGMENT_WORDS)) {}
1624

1625
    AnyPointer::Builder getResultsBuilder() override {
1626
      return message.getRoot<AnyPointer>();
1627 1628
    }

1629
    AnyPointer::Reader getResults() override {
1630
      return message.getRoot<AnyPointer>();
1631 1632
    }

1633
    kj::Own<RpcResponse> addRef() override {
1634 1635 1636 1637
      return kj::addRef(*this);
    }

  private:
1638
    MallocMessageBuilder message;
1639 1640
  };

Kenton Varda's avatar
Kenton Varda committed
1641 1642
  class RpcCallContext final: public CallContextHook, public kj::Refcounted {
  public:
1643
    RpcCallContext(RpcConnectionState& connectionState, AnswerId answerId,
1644
                   kj::Own<IncomingRpcMessage>&& request, const AnyPointer::Reader& params,
1645
                   bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
1646
        : connectionState(kj::addRef(connectionState)),
1647
          answerId(answerId),
Kenton Varda's avatar
Kenton Varda committed
1648
          request(kj::mv(request)),
1649
          params(params),
1650
          returnMessage(nullptr),
Kenton Varda's avatar
Kenton Varda committed
1651 1652
          redirectResults(redirectResults),
          cancelFulfiller(kj::mv(cancelFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
1653

1654 1655 1656 1657
    ~RpcCallContext() noexcept(false) {
      if (isFirstResponder()) {
        // We haven't sent a return yet, so we must have been canceled.  Send a cancellation return.
        unwindDetector.catchExceptionsIfUnwinding([&]() {
Kenton Varda's avatar
Kenton Varda committed
1658
          // Don't send anything if the connection is broken.
1659 1660
          if (connectionState->connection.is<Connected>()) {
            auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1661
                messageSizeHint<rpc::Return>() + sizeInWords<rpc::Payload>());
Kenton Varda's avatar
Kenton Varda committed
1662
            auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1663

1664
            builder.setAnswerId(answerId);
1665
            builder.setReleaseParamCaps(false);
1666

Kenton Varda's avatar
Kenton Varda committed
1667 1668 1669 1670 1671 1672 1673 1674 1675
            if (redirectResults) {
              // The reason we haven't sent a return is because the results were sent somewhere
              // else.
              builder.setResultsSentElsewhere();
            } else {
              builder.setCanceled();
            }

            message->send();
1676
          }
1677

1678
          cleanupAnswerTable(nullptr, true);
1679 1680 1681 1682
        });
      }
    }

1683
    kj::Own<RpcResponse> consumeRedirectedResponse() {
1684 1685
      KJ_ASSERT(redirectResults);

1686
      if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
1687 1688 1689 1690 1691 1692

      // Note that the context needs to keep its own reference to the response so that it doesn't
      // get GC'd until the PipelineHook drops its reference to the context.
      return kj::downcast<LocallyRedirectedRpcResponse>(*KJ_ASSERT_NONNULL(response)).addRef();
    }

Kenton Varda's avatar
Kenton Varda committed
1693
    void sendReturn() {
1694
      KJ_ASSERT(!redirectResults);
1695 1696 1697 1698

      // Avoid sending results if canceled so that we don't have to figure out whether or not
      // `releaseResultCaps` was set in the already-received `Finish`.
      if (!(cancellationFlags & CANCEL_REQUESTED) && isFirstResponder()) {
1699 1700 1701 1702 1703
        KJ_ASSERT(connectionState->connection.is<Connected>(),
                  "Cancellation should have been requested on disconnect.") {
          return;
        }

1704
        if (response == nullptr) getResults(MessageSize{0, 0});  // force initialization of response
Kenton Varda's avatar
Kenton Varda committed
1705

1706
        returnMessage.setAnswerId(answerId);
1707
        returnMessage.setReleaseParamCaps(false);
Kenton Varda's avatar
Kenton Varda committed
1708

1709 1710 1711 1712 1713 1714 1715 1716
        auto exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
        KJ_IF_MAYBE(e, exports) {
          // Caps were returned, so we can't free the pipeline yet.
          cleanupAnswerTable(kj::mv(*e), false);
        } else {
          // No caps in the results, therefore the pipeline is irrelevant.
          cleanupAnswerTable(nullptr, true);
        }
Kenton Varda's avatar
Kenton Varda committed
1717 1718 1719
      }
    }
    void sendErrorReturn(kj::Exception&& exception) {
1720
      KJ_ASSERT(!redirectResults);
Kenton Varda's avatar
Kenton Varda committed
1721
      if (isFirstResponder()) {
1722 1723 1724 1725
        if (connectionState->connection.is<Connected>()) {
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Return>() + exceptionSizeHint(exception));
          auto builder = message->getBody().initAs<rpc::Message>().initReturn();
Kenton Varda's avatar
Kenton Varda committed
1726

1727 1728 1729
          builder.setAnswerId(answerId);
          builder.setReleaseParamCaps(false);
          fromException(exception, builder.initException());
Kenton Varda's avatar
Kenton Varda committed
1730

1731 1732
          message->send();
        }
1733 1734 1735 1736

        // Do not allow releasing the pipeline because we want pipelined calls to propagate the
        // exception rather than fail with a "no such field" exception.
        cleanupAnswerTable(nullptr, false);
Kenton Varda's avatar
Kenton Varda committed
1737 1738 1739
      }
    }

1740
    void requestCancel() {
Kenton Varda's avatar
Kenton Varda committed
1741 1742 1743 1744 1745 1746
      // Hints that the caller wishes to cancel this call.  At the next time when cancellation is
      // deemed safe, the RpcCallContext shall send a canceled Return -- or if it never becomes
      // safe, the RpcCallContext will send a normal return when the call completes.  Either way
      // the RpcCallContext is now responsible for cleaning up the entry in the answer table, since
      // a Finish message was already received.

1747 1748 1749 1750
      bool previouslyAllowedButNotRequested = cancellationFlags == CANCEL_ALLOWED;
      cancellationFlags |= CANCEL_REQUESTED;

      if (previouslyAllowedButNotRequested) {
Kenton Varda's avatar
Kenton Varda committed
1751
        // We just set CANCEL_REQUESTED, and CANCEL_ALLOWED was already set previously.  Initiate
Kenton Varda's avatar
Kenton Varda committed
1752
        // the cancellation.
Kenton Varda's avatar
Kenton Varda committed
1753
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1754
      }
Kenton Varda's avatar
Kenton Varda committed
1755
    }
1756 1757 1758

    // implements CallContextHook ------------------------------------

1759
    AnyPointer::Reader getParams() override {
1760 1761 1762 1763 1764 1765
      KJ_REQUIRE(request != nullptr, "Can't call getParams() after releaseParams().");
      return params;
    }
    void releaseParams() override {
      request = nullptr;
    }
1766
    AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
1767
      KJ_IF_MAYBE(r, response) {
1768
        return r->get()->getResultsBuilder();
1769
      } else {
1770 1771
        kj::Own<RpcServerResponse> response;

1772
        if (redirectResults || !connectionState->connection.is<Connected>()) {
1773
          response = kj::refcounted<LocallyRedirectedRpcResponse>(sizeHint);
1774
        } else {
1775
          auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
1776 1777
              firstSegmentSize(sizeHint, messageSizeHint<rpc::Return>() +
                               sizeInWords<rpc::Payload>()));
1778 1779 1780 1781 1782 1783
          returnMessage = message->getBody().initAs<rpc::Message>().initReturn();
          response = kj::heap<RpcServerResponseImpl>(
              *connectionState, kj::mv(message), returnMessage.getResults());
        }

        auto results = response->getResultsBuilder();
Kenton Varda's avatar
Kenton Varda committed
1784 1785
        this->response = kj::mv(response);
        return results;
1786 1787
      }
    }
1788 1789 1790
    kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
      auto result = directTailCall(kj::mv(request));
      KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
1791
        f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
1792 1793 1794 1795
      }
      return kj::mv(result.promise);
    }
    ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
1796 1797 1798
      KJ_REQUIRE(response == nullptr,
                 "Can't call tailCall() after initializing the results struct.");

1799
      if (request->getBrand() == connectionState.get() && !redirectResults) {
1800 1801 1802 1803 1804
        // The tail call is headed towards the peer that called us in the first place, so we can
        // optimize out the return trip.

        KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
          if (isFirstResponder()) {
1805 1806 1807 1808
            if (connectionState->connection.is<Connected>()) {
              auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
                  messageSizeHint<rpc::Return>());
              auto builder = message->getBody().initAs<rpc::Message>().initReturn();
1809

1810 1811 1812
              builder.setAnswerId(answerId);
              builder.setReleaseParamCaps(false);
              builder.setTakeFromOtherQuestion(tailInfo->questionId);
1813

1814 1815
              message->send();
            }
1816 1817 1818

            // There are no caps in our return message, but of course the tail results could have
            // caps, so we must continue to honor pipeline calls (and just bounce them back).
1819
            cleanupAnswerTable(nullptr, false);
1820
          }
1821
          return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
1822 1823 1824 1825 1826 1827 1828
        }
      }

      // Just forwarding to another local call.
      auto promise = request->send();

      // Wait for response.
1829
      auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
1830 1831 1832
        // Copy the response.
        // TODO(perf):  It would be nice if we could somehow make the response get built in-place
        //   but requires some refactoring.
1833
        getResults(tailResponse.targetSize()).set(tailResponse);
1834
      });
1835 1836

      return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
1837
    }
1838 1839
    kj::Promise<AnyPointer::Pipeline> onTailCall() override {
      auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
1840 1841 1842
      tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
      return kj::mv(paf.promise);
    }
1843
    void allowCancellation() override {
1844 1845 1846 1847
      bool previouslyRequestedButNotAllowed = cancellationFlags == CANCEL_REQUESTED;
      cancellationFlags |= CANCEL_ALLOWED;

      if (previouslyRequestedButNotAllowed) {
Kenton Varda's avatar
Kenton Varda committed
1848 1849 1850
        // We just set CANCEL_ALLOWED, and CANCEL_REQUESTED was already set previously.  Initiate
        // the cancellation.
        cancelFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
1851
      }
1852 1853 1854 1855 1856 1857
    }
    kj::Own<CallContextHook> addRef() override {
      return kj::addRef(*this);
    }

  private:
1858
    kj::Own<RpcConnectionState> connectionState;
1859
    AnswerId answerId;
1860

Kenton Varda's avatar
Kenton Varda committed
1861 1862 1863
    // Request ---------------------------------------------

    kj::Maybe<kj::Own<IncomingRpcMessage>> request;
1864
    AnyPointer::Reader params;
Kenton Varda's avatar
Kenton Varda committed
1865

Kenton Varda's avatar
Kenton Varda committed
1866 1867 1868
    // Response --------------------------------------------

    kj::Maybe<kj::Own<RpcServerResponse>> response;
Kenton Varda's avatar
Kenton Varda committed
1869
    rpc::Return::Builder returnMessage;
1870
    bool redirectResults = false;
Kenton Varda's avatar
Kenton Varda committed
1871
    bool responseSent = false;
1872
    kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1873 1874 1875 1876 1877 1878 1879 1880

    // Cancellation state ----------------------------------

    enum CancellationFlags {
      CANCEL_REQUESTED = 1,
      CANCEL_ALLOWED = 2
    };

1881
    uint8_t cancellationFlags = 0;
1882
    // When both flags are set, the cancellation process will begin.
Kenton Varda's avatar
Kenton Varda committed
1883

1884
    kj::Own<kj::PromiseFulfiller<void>> cancelFulfiller;
Kenton Varda's avatar
Kenton Varda committed
1885 1886 1887
    // Fulfilled when cancellation has been both requested and permitted.  The fulfilled promise is
    // exclusive-joined with the outermost promise waiting on the call return, so fulfilling it
    // cancels that promise.
Kenton Varda's avatar
Kenton Varda committed
1888

1889 1890
    kj::UnwindDetector unwindDetector;

Kenton Varda's avatar
Kenton Varda committed
1891 1892 1893 1894 1895 1896 1897
    // -----------------------------------------------------

    bool isFirstResponder() {
      if (responseSent) {
        return false;
      } else {
        responseSent = true;
1898 1899 1900
        return true;
      }
    }
Kenton Varda's avatar
Kenton Varda committed
1901

1902
    void cleanupAnswerTable(kj::Array<ExportId> resultExports, bool shouldFreePipeline) {
1903 1904 1905
      // We need to remove the `callContext` pointer -- which points back to us -- from the
      // answer table.  Or we might even be responsible for removing the entire answer table
      // entry.
Kenton Varda's avatar
Kenton Varda committed
1906

1907
      if (cancellationFlags & CANCEL_REQUESTED) {
1908 1909 1910
        // Already received `Finish` so it's our job to erase the table entry. We shouldn't have
        // sent results if canceled, so we shouldn't have an export list to deal with.
        KJ_ASSERT(resultExports.size() == 0);
1911
        connectionState->answers.erase(answerId);
1912
      } else {
1913
        // We just have to null out callContext and set the exports.
1914
        auto& answer = connectionState->answers[answerId];
1915
        answer.callContext = nullptr;
1916 1917 1918 1919 1920 1921
        answer.resultExports = kj::mv(resultExports);

        if (shouldFreePipeline) {
          // We can free the pipeline early, because we know all pipeline calls are invalid (e.g.
          // because there are no caps in the result to receive pipeline requests).
          KJ_ASSERT(resultExports.size() == 0);
Kenton Varda's avatar
Kenton Varda committed
1922
          answer.pipeline = nullptr;
Kenton Varda's avatar
Kenton Varda committed
1923 1924 1925
        }
      }
    }
1926 1927
  };

Kenton Varda's avatar
Kenton Varda committed
1928 1929 1930
  // =====================================================================================
  // Message handling

1931
  kj::Promise<void> messageLoop() {
    // Receives one incoming message and handles it, then queues the next iteration of the
    // loop on `tasks`.  The loop ends once the connection is no longer in the Connected state
    // or the receive yields no message.

    if (connection.is<Connected>()) {
      return connection.get<Connected>()->receiveIncomingMessage().then(
          [this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& incoming) {
        KJ_IF_MAYBE(m, incoming) {
          handleMessage(kj::mv(*m));
          return true;
        } else {
          disconnect(KJ_EXCEPTION(DISCONNECTED, "Peer disconnected."));
          return false;
        }
      }).then([this](bool keepGoing) {
        // No exceptions; continue loop.
        //
        // (We do this in a separate continuation to handle the case where exceptions are
        // disabled.)
        if (keepGoing) tasks.add(messageLoop());
      });
    }

    return kj::READY_NOW;
  }

  void handleMessage(kj::Own<IncomingRpcMessage> message) {
    // Dispatches an incoming message to the handler for its type.  Handlers for Bootstrap,
    // Call and Return are handed the message itself; the others get only the relevant reader.
    // Any other message type is echoed back to the sender as `unimplemented`.

    auto reader = message->getBody().getAs<rpc::Message>();

    switch (reader.which()) {
      case rpc::Message::UNIMPLEMENTED:
        handleUnimplemented(reader.getUnimplemented());
        return;

      case rpc::Message::ABORT:
        handleAbort(reader.getAbort());
        return;

      case rpc::Message::BOOTSTRAP:
        handleBootstrap(kj::mv(message), reader.getBootstrap());
        return;

      case rpc::Message::CALL:
        handleCall(kj::mv(message), reader.getCall());
        return;

      case rpc::Message::RETURN:
        handleReturn(kj::mv(message), reader.getReturn());
        return;

      case rpc::Message::FINISH:
        handleFinish(reader.getFinish());
        return;

      case rpc::Message::RESOLVE:
        handleResolve(reader.getResolve());
        return;

      case rpc::Message::RELEASE:
        handleRelease(reader.getRelease());
        return;

      case rpc::Message::DISEMBARGO:
        handleDisembargo(reader.getDisembargo());
        return;

      default: {
        // A message type with no handler here: reply with an `unimplemented` message that
        // carries a copy of what we received, provided we are still connected.
        if (connection.is<Connected>()) {
          auto reply = connection.get<Connected>()->newOutgoingMessage(
              firstSegmentSize(reader.totalSize(), messageSizeHint<void>()));
          reply->getBody().initAs<rpc::Message>().setUnimplemented(reader);
          reply->send();
        }
        return;
      }
    }
  }

  void handleUnimplemented(const rpc::Message::Reader& message) {
    // Handles an `unimplemented` reply, i.e. the peer echoing back a message of ours that it did
    // not understand.
    //
    // Only an echoed `Resolve` is tolerated: the export reference carried by its capability
    // descriptor (if any) is released.  Any other echoed message type fails an assertion.
    switch (message.which()) {
      case rpc::Message::RESOLVE: {
        auto cap = message.getResolve().getCap();
        switch (cap.which()) {
          case rpc::CapDescriptor::NONE:
            // Nothing to do (but this ought never to happen).
            break;
          case rpc::CapDescriptor::SENDER_HOSTED:
            releaseExport(cap.getSenderHosted(), 1);
            break;
          case rpc::CapDescriptor::SENDER_PROMISE:
            releaseExport(cap.getSenderPromise(), 1);
            break;
          case rpc::CapDescriptor::RECEIVER_ANSWER:
          case rpc::CapDescriptor::RECEIVER_HOSTED:
            // Nothing to do.
            break;
          case rpc::CapDescriptor::THIRD_PARTY_HOSTED:
            releaseExport(cap.getThirdPartyHosted().getVineId(), 1);
            break;
        }
        break;
      }

      default:
        KJ_FAIL_ASSERT("Peer did not implement required RPC message type.", (uint)message.which());
        break;
    }
  }

  void handleAbort(const rpc::Exception::Reader& exception) {
    // Handles an `abort` message by converting the peer's exception and throwing it as a
    // recoverable exception.
    kj::throwRecoverableException(toException(exception));
  }

  // ---------------------------------------------------------------------------
  // Level 0

  class SingleCapPipeline: public PipelineHook, public kj::Refcounted {
    // A PipelineHook that wraps exactly one capability.  The empty transform yields a new
    // reference to that capability; any non-empty transform yields a broken capability.
  public:
    SingleCapPipeline(kj::Own<ClientHook>&& hook)
        : target(kj::mv(hook)) {}

    kj::Own<PipelineHook> addRef() override {
      return kj::addRef(*this);
    }

    kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
      // There is nothing to traverse into, so only the empty op list is valid.
      if (ops.size() != 0) {
        return newBrokenCap("Invalid pipeline transform.");
      }
      return target->addRef();
    }

  private:
    kj::Own<ClientHook> target;
  };

  void handleBootstrap(kj::Own<IncomingRpcMessage>&& message,
                       const rpc::Bootstrap::Reader& bootstrap) {
    // Handles a `Bootstrap` message: obtains the capability to hand out (from the restorer or the
    // bootstrap interface), writes it into a `Return` message, registers the answer in the answer
    // table so that the peer can pipeline on it, and sends the response.  If obtaining or encoding
    // the capability throws, the `Return` carries the exception and the pipeline resolves to a
    // broken capability.

    AnswerId answerId = bootstrap.getQuestionId();

    if (!connection.is<Connected>()) {
      // Disconnected; ignore.
      return;
    }

    auto response = connection.get<Connected>()->newOutgoingMessage(
        messageSizeHint<rpc::Return>() + sizeInWords<rpc::CapDescriptor>() + 32);

    rpc::Return::Builder ret = response->getBody().getAs<rpc::Message>().initReturn();
    ret.setAnswerId(answerId);

    kj::Own<ClientHook> capHook;
    kj::Array<ExportId> resultExports;
    KJ_DEFER(releaseExports(resultExports));  // in case something goes wrong

    // Call the restorer and initialize the answer.
    KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
      Capability::Client cap = nullptr;
      KJ_IF_MAYBE(r, restorer) {
        cap = r->baseRestore(bootstrap.getDeprecatedObjectId());
      } else KJ_IF_MAYBE(b, bootstrapInterface) {
        if (bootstrap.hasDeprecatedObjectId()) {
          KJ_FAIL_REQUIRE("This vat only supports a bootstrap interface, not the old "
                          "Cap'n-Proto-0.4-style named exports.") { return; }
        } else {
          cap = *b;
        }
      } else {
        KJ_FAIL_REQUIRE("This vat does not expose any public/bootstrap interfaces.") { return; }
      }

      auto payload = ret.initResults();
      payload.getContent().setAs<Capability>(kj::mv(cap));

      // The payload holds exactly one capability; export it and keep a reference for pipelining.
      auto capTable = response->getCapTable();
      KJ_DASSERT(capTable.size() == 1);
      resultExports = writeDescriptors(capTable, payload);
      capHook = KJ_ASSERT_NONNULL(capTable[0])->addRef();
    })) {
      // Report the failure to the peer and make pipelined calls fail with the same exception.
      fromException(*exception, ret.initException());
      capHook = newBrokenCap(kj::mv(*exception));
    }

    // Drop the incoming message; `bootstrap` (a reader into it) is not used past this point.
    message = nullptr;

    // Add the answer to the answer table for pipelining and send the response.
    auto& answer = answers[answerId];
    KJ_REQUIRE(!answer.active, "questionId is already in use") {
      return;
    }

    // Ownership of the exports moves to the answer entry, leaving the deferred release above with
    // a moved-from array.
    answer.resultExports = kj::mv(resultExports);
    answer.active = true;
    answer.pipeline = kj::Own<PipelineHook>(kj::refcounted<SingleCapPipeline>(kj::mv(capHook)));

    response->send();
  }

  void handleCall(kj::Own<IncomingRpcMessage>&& message, const rpc::Call::Reader& call) {
    // Handles a `Call` message: resolves the target capability, builds an RpcCallContext that
    // takes ownership of the incoming message, registers the call in the answer table, starts the
    // call, and arranges for the result to be either returned to the caller or held as
    // `redirectedResults` (when the caller asked for `sendResultsTo.yourself`).

    kj::Own<ClientHook> capability;

    KJ_IF_MAYBE(t, getMessageTarget(call.getTarget())) {
      capability = kj::mv(*t);
    } else {
      // Exception already reported.
      return;
    }

    bool redirectResults;
    switch (call.getSendResultsTo().which()) {
      case rpc::Call::SendResultsTo::CALLER:
        redirectResults = false;
        break;
      case rpc::Call::SendResultsTo::YOURSELF:
        redirectResults = true;
        break;
      default:
        KJ_FAIL_REQUIRE("Unsupported `Call.sendResultsTo`.") { return; }
    }

    auto payload = call.getParams();
    message->initCapTable(receiveCaps(payload.getCapTable()));
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

    AnswerId answerId = call.getQuestionId();

    // Read everything still needed from `call` before the message is handed to the context.
    auto interfaceId = call.getInterfaceId();
    auto methodId = call.getMethodId();

    auto context = kj::refcounted<RpcCallContext>(
        *this, answerId, kj::mv(message), payload.getContent(),
        redirectResults, kj::mv(cancelPaf.fulfiller));

    // No more using `call` after this point, as it now belongs to the context.

    {
      auto& answer = answers[answerId];

      KJ_REQUIRE(!answer.active, "questionId is already in use") {
        return;
      }

      answer.active = true;
      answer.callContext = *context;
    }

    auto promiseAndPipeline = startCall(
        interfaceId, methodId, kj::mv(capability), context->addRef());

    // Things may have changed -- in particular if startCall() immediately called
    // context->directTailCall().

    {
      // Look the entry up again rather than reusing the earlier reference, per the note above.
      auto& answer = answers[answerId];

      answer.pipeline = kj::mv(promiseAndPipeline.pipeline);

      if (redirectResults) {
        auto resultsPromise = promiseAndPipeline.promise.then(
            kj::mvCapture(context, [](kj::Own<RpcCallContext>&& context) {
              return context->consumeRedirectedResponse();
            }));

        // If the call that later picks up `redirectedResults` decides to discard it, we need to
        // make sure our call is not itself canceled unless it has called allowCancellation().
        // So we fork the promise and join one branch with the cancellation promise, in order to
        // hold on to it.
        auto forked = resultsPromise.fork();
        answer.redirectedResults = forked.addBranch();

        cancelPaf.promise
            .exclusiveJoin(forked.addBranch().then([](kj::Own<RpcResponse>&&){}))
            .detach([](kj::Exception&&) {});
      } else {
        // Hack:  Both the success and error continuations need to use the context.  We could
        //   refcount, but both will be destroyed at the same time anyway.
        RpcCallContext* contextPtr = context;

        promiseAndPipeline.promise.then(
            [contextPtr]() {
              contextPtr->sendReturn();
            }, [contextPtr](kj::Exception&& exception) {
              contextPtr->sendErrorReturn(kj::mv(exception));
            }).then([]() {}, [this](kj::Exception&& exception) {
              // Handle exceptions that occur in sendReturn()/sendErrorReturn().
              taskFailed(kj::mv(exception));
            }).attach(kj::mv(context))
            .exclusiveJoin(kj::mv(cancelPaf.promise))
            .detach([](kj::Exception&&) {});
      }
    }
  }

  ClientHook::VoidPromiseAndPipeline startCall(
      uint64_t interfaceId, uint64_t methodId,
      kj::Own<ClientHook>&& capability, kj::Own<CallContextHook>&& context) {
    // Starts the given method on `capability`.  The one special case: when a gateway is
    // configured, method 0 of the Persistent interface (save()) is not delivered directly but is
    // re-issued as a tail call to the gateway's export request.
    const bool isPersistentSave = interfaceId == typeId<Persistent<>>() && methodId == 0;

    if (isPersistentSave) {
      KJ_IF_MAYBE(realmGateway, gateway) {
        auto saveParams = context->getParams().getAs<Persistent<>::SaveParams>();

        // Size hint for the wrapping request: the original params, one extra capability, and the
        // ExportParams struct itself.
        auto sizeHint = saveParams.totalSize();
        ++sizeHint.capCount;
        sizeHint.wordCount += sizeInWords<RealmGateway<>::ExportParams>();

        auto exportCall = realmGateway->exportRequest(sizeHint);
        exportCall.setCap(Persistent<>::Client(capability->addRef()));
        exportCall.setParams(saveParams);

        // The params have been copied into `exportCall`, so the originals can be released before
        // the tail call takes over.
        context->allowCancellation();
        context->releaseParams();
        return context->directTailCall(RequestHook::from(kj::mv(exportCall)));
      }
    }

    return capability->call(interfaceId, methodId, context->addRef());
  }

  kj::Maybe<kj::Own<ClientHook>> getMessageTarget(const rpc::MessageTarget::Reader& target) {
    // Resolves a MessageTarget to a capability: either one of our current exports, or a
    // capability pipelined from one of our active answers.  Returns nullptr after reporting a
    // failed requirement when the target is invalid.
    switch (target.which()) {
      case rpc::MessageTarget::IMPORTED_CAP: {
        KJ_IF_MAYBE(exp, exports.find(target.getImportedCap())) {
          return exp->clientHook->addRef();
        } else {
          KJ_FAIL_REQUIRE("Message target is not a current export ID.") {
            return nullptr;
          }
        }
        break;
      }

      case rpc::MessageTarget::PROMISED_ANSWER: {
        auto promisedAnswer = target.getPromisedAnswer();
        kj::Own<PipelineHook> pipeline;

        auto& base = answers[promisedAnswer.getQuestionId()];
        KJ_REQUIRE(base.active, "PromisedAnswer.questionId is not a current question.") {
          return nullptr;
        }
        KJ_IF_MAYBE(p, base.pipeline) {
          pipeline = p->get()->addRef();
        } else {
          KJ_FAIL_REQUIRE("PromisedAnswer.questionId is already finished or contained no "
                          "capabilities.") {
            return nullptr;
          }
        }

        KJ_IF_MAYBE(ops, toPipelineOps(promisedAnswer.getTransform())) {
          return pipeline->getPipelinedCap(*ops);
        } else {
          // Exception already thrown.
          return nullptr;
        }
      }

      default:
        KJ_FAIL_REQUIRE("Unknown message target type.", target) {
          return nullptr;
        }
    }

    KJ_UNREACHABLE;
  }

  void handleReturn(kj::Own<IncomingRpcMessage>&& message, const rpc::Return::Reader& ret) {
    // Handles a `Return` message for one of our outstanding questions: optionally releases the
    // parameter exports, then fulfills or rejects the question according to the kind of return.
    // If the question no longer has a live reference (`selfRef` is null), the table entry is
    // simply erased.

    // Transitive destructors can end up manipulating the question table and invalidating our
    // pointer into it, so make sure these destructors run later.
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
    kj::Maybe<kj::Promise<kj::Own<RpcResponse>>> promiseToRelease;

    KJ_IF_MAYBE(question, questions.find(ret.getAnswerId())) {
      KJ_REQUIRE(question->isAwaitingReturn, "Duplicate Return.") { return; }
      question->isAwaitingReturn = false;

      if (ret.getReleaseParamCaps()) {
        exportsToRelease = kj::mv(question->paramExports);
      } else {
        question->paramExports = nullptr;
      }

      KJ_IF_MAYBE(questionRef, question->selfRef) {
        switch (ret.which()) {
          case rpc::Return::RESULTS: {
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `results`.") {
              return;
            }

            auto payload = ret.getResults();
            message->initCapTable(receiveCaps(payload.getCapTable()));
            questionRef->fulfill(kj::refcounted<RpcResponseImpl>(
                *this, kj::addRef(*questionRef), kj::mv(message), payload.getContent()));
            break;
          }

          case rpc::Return::EXCEPTION:
            KJ_REQUIRE(!question->isTailCall,
                "Tail call `Return` must set `resultsSentElsewhere`, not `exception`.") {
              return;
            }

            questionRef->reject(toException(ret.getException()));
            break;

          case rpc::Return::CANCELED:
            KJ_FAIL_REQUIRE("Return message falsely claims call was canceled.") { return; }
            break;

          case rpc::Return::RESULTS_SENT_ELSEWHERE:
            KJ_REQUIRE(question->isTailCall,
                "`Return` had `resultsSentElsewhere` but this was not a tail call.") {
              return;
            }

            // Tail calls are fulfilled with a null pointer.
            questionRef->fulfill(kj::Own<RpcResponse>());
            break;

          case rpc::Return::TAKE_FROM_OTHER_QUESTION:
            KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
              KJ_IF_MAYBE(response, answer->redirectedResults) {
                questionRef->fulfill(kj::mv(*response));
                // The promise has been moved out; clear the slot so that a later `Return` naming
                // the same answer is rejected instead of picking up a moved-from promise.
                answer->redirectedResults = nullptr;
              } else {
                KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` referenced a call that did not "
                                "use `sendResultsTo.yourself`.") { return; }
              }
            } else {
              KJ_FAIL_REQUIRE("`Return.takeFromOtherAnswer` had invalid answer ID.") { return; }
            }

            break;

          default:
            KJ_FAIL_REQUIRE("Unknown 'Return' type.") { return; }
        }
      } else {
        if (ret.isTakeFromOtherQuestion()) {
          // Be sure to release the tail call's promise.
          KJ_IF_MAYBE(answer, answers.find(ret.getTakeFromOtherQuestion())) {
            promiseToRelease = kj::mv(answer->redirectedResults);
          }
        }

        // Looks like this question was canceled earlier, so `Finish` was already sent, with
        // `releaseResultCaps` set true so that we don't have to release them here.  We can go
        // ahead and delete it from the table.
        questions.erase(ret.getAnswerId(), *question);
      }

    } else {
      KJ_FAIL_REQUIRE("Invalid question ID in Return message.") { return; }
    }
  }

  void handleFinish(const rpc::Finish::Reader& finish) {
    // Handles a `Finish` message: optionally releases the result exports of the named answer,
    // drops its pipeline, and then either requests cancellation (if the call context is still
    // present) or erases the answer from the table.

    // Delay release of these things until return so that transitive destructors don't accidentally
    // modify the answer table and invalidate our pointer into it.
    kj::Array<ExportId> exportsToRelease;
    KJ_DEFER(releaseExports(exportsToRelease));
    Answer answerToRelease;
    kj::Maybe<kj::Own<PipelineHook>> pipelineToRelease;

    KJ_IF_MAYBE(answer, answers.find(finish.getQuestionId())) {
      KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }

      if (finish.getReleaseResultCaps()) {
        exportsToRelease = kj::mv(answer->resultExports);
      } else {
        answer->resultExports = nullptr;
      }

      pipelineToRelease = kj::mv(answer->pipeline);

      // If the call isn't actually done yet, cancel it.  Otherwise, we can go ahead and erase the
      // question from the table.
      KJ_IF_MAYBE(context, answer->callContext) {
        context->requestCancel();
      } else {
        answerToRelease = answers.erase(finish.getQuestionId());
      }
    } else {
      // No such entry: `answer` is null in this branch and must not be dereferenced, so fail
      // unconditionally instead of testing `answer->active`.
      KJ_FAIL_REQUIRE("'Finish' for invalid question ID.") { return; }
    }
  }

  // ---------------------------------------------------------------------------
  // Level 1

  void handleResolve(const rpc::Resolve::Reader& resolve) {
    // Handles a `Resolve` message: decodes the replacement capability (or the exception), then,
    // if the named import is an unfulfilled promise, fulfills or rejects it accordingly.

    kj::Own<ClientHook> replacement;
    kj::Maybe<kj::Exception> exception;

    // Extract the replacement capability.
    switch (resolve.which()) {
      case rpc::Resolve::CAP:
        KJ_IF_MAYBE(cap, receiveCap(resolve.getCap())) {
          replacement = kj::mv(*cap);
        } else {
          KJ_FAIL_REQUIRE("'Resolve' contained 'CapDescriptor.none'.") { return; }
        }
        break;

      case rpc::Resolve::EXCEPTION:
        // We can't set `replacement` to a new broken cap here because this will confuse
        // PromiseClient::Resolve() into thinking that the remote promise resolved to a local
        // capability and therefore a Disembargo is needed. We must actually reject the promise.
        exception = toException(resolve.getException());
        break;

      default:
        KJ_FAIL_REQUIRE("Unknown 'Resolve' type.") { return; }
    }

    // If the import is on the table, fulfill it.
    KJ_IF_MAYBE(import, imports.find(resolve.getPromiseId())) {
      KJ_IF_MAYBE(fulfiller, import->promiseFulfiller) {
        // OK, this is in fact an unfulfilled promise!
        KJ_IF_MAYBE(e, exception) {
          fulfiller->get()->reject(kj::mv(*e));
        } else {
          fulfiller->get()->fulfill(kj::mv(replacement));
        }
      } else if (import->importClient != nullptr) {
        // It appears this is a valid entry on the import table, but was not expected to be a
        // promise.
        KJ_FAIL_REQUIRE("Got 'Resolve' for a non-promise import.") { break; }
      }
    }
  }

  void handleRelease(const rpc::Release::Reader& release) {
    // Handles a `Release` message by dropping the stated number of references from the export.
    releaseExport(release.getId(), release.getReferenceCount());
  }

  void releaseExport(ExportId id, uint refcount) {
    // Drops `refcount` references from export `id`.  When the count reaches zero the export is
    // removed from both `exportsByCap` and the export table.  Releasing an unknown ID, or more
    // references than are held, fails a requirement and leaves the tables untouched.
    KJ_IF_MAYBE(exp, exports.find(id)) {
      KJ_REQUIRE(refcount <= exp->refcount, "Tried to drop export's refcount below zero.") {
        return;
      }

      exp->refcount -= refcount;
      if (exp->refcount == 0) {
        exportsByCap.erase(exp->clientHook);
        exports.erase(id, *exp);
      }
    } else {
      KJ_FAIL_REQUIRE("Tried to release invalid export ID.") {
        return;
      }
    }
  }

  void releaseExports(kj::ArrayPtr<ExportId> exports) {
    // Drops a single reference from each export ID in the given list.
    for (ExportId id: exports) {
      releaseExport(id, 1);
    }
  }

  void handleDisembargo(const rpc::Disembargo::Reader& disembargo) {
    // Handles a `Disembargo` message.
    //
    // senderLoopback:   follows the target to its final resolution, checks that it points back
    //                   at this connection, and (after an evalLater) echoes the disembargo back
    //                   as receiverLoopback.
    // receiverLoopback: fulfills and removes the matching entry in the embargo table.
    auto context = disembargo.getContext();
    switch (context.which()) {
      case rpc::Disembargo::Context::SENDER_LOOPBACK: {
        kj::Own<ClientHook> target;

        KJ_IF_MAYBE(t, getMessageTarget(disembargo.getTarget())) {
          target = kj::mv(*t);
        } else {
          // Exception already reported.
          return;
        }

        // Follow the chain of resolutions until reaching a hook with no further resolution.
        for (;;) {
          KJ_IF_MAYBE(r, target->getResolved()) {
            target = r->addRef();
          } else {
            break;
          }
        }

        KJ_REQUIRE(target->getBrand() == this,
                   "'Disembargo' of type 'senderLoopback' sent to an object that does not point "
                   "back to the sender.") {
          return;
        }

        EmbargoId embargoId = context.getSenderLoopback();

        // We need to insert an evalLater() here to make sure that any pending calls towards this
        // cap have had time to find their way through the event loop.
        tasks.add(kj::evalLater(kj::mvCapture(
            target, [this,embargoId](kj::Own<ClientHook>&& target) {
          if (!connection.is<Connected>()) {
            return;
          }

          RpcClient& downcasted = kj::downcast<RpcClient>(*target);

          auto message = connection.get<Connected>()->newOutgoingMessage(
              messageSizeHint<rpc::Disembargo>() + MESSAGE_TARGET_SIZE_HINT);
          auto builder = message->getBody().initAs<rpc::Message>().initDisembargo();

          {
            auto redirect = downcasted.writeTarget(builder.initTarget());

            // Disembargoes should only be sent to capabilities that were previously the subject of
            // a `Resolve` message.  But `writeTarget` only ever returns non-null when called on
            // a PromiseClient.  The code which sends `Resolve` and `Return` should have replaced
            // any promise with a direct node in order to solve the Tribble 4-way race condition.
            // See the documentation of Disembargo in rpc.capnp for more.
            KJ_REQUIRE(redirect == nullptr,
                       "'Disembargo' of type 'senderLoopback' sent to an object that does not "
                       "appear to have been the subject of a previous 'Resolve' message.") {
              return;
            }
          }

          builder.getContext().setReceiverLoopback(embargoId);

          message->send();
        })));

        break;
      }

      case rpc::Disembargo::Context::RECEIVER_LOOPBACK: {
        KJ_IF_MAYBE(embargo, embargoes.find(context.getReceiverLoopback())) {
          KJ_ASSERT_NONNULL(embargo->fulfiller)->fulfill();
          embargoes.erase(context.getReceiverLoopback(), *embargo);
        } else {
          KJ_FAIL_REQUIRE("Invalid embargo ID in 'Disembargo.context.receiverLoopback'.") {
            return;
          }
        }
        break;
      }

      default:
        KJ_FAIL_REQUIRE("Unimplemented Disembargo type.", disembargo) { return; }
    }
  }

  // ---------------------------------------------------------------------------
  // Level 2
};

}  // namespace

class RpcSystemBase::Impl final: public kj::TaskSet::ErrorHandler {
  // Implementation behind RpcSystemBase.  Keeps one RpcConnectionState per network connection in
  // `connections`, continuously accepts incoming connections via acceptLoop(), and serves as the
  // error handler for its own task set.
public:
  Impl(VatNetworkBase& network, kj::Maybe<Capability::Client> bootstrapInterface,
       kj::Maybe<RealmGateway<>::Client> gateway)
      : network(network), bootstrapInterface(kj::mv(bootstrapInterface)),
        gateway(kj::mv(gateway)), tasks(*this) {
    tasks.add(acceptLoop());
  }
  Impl(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
      : network(network), restorer(restorer), tasks(*this) {
    tasks.add(acceptLoop());
  }

  ~Impl() noexcept(false) {
    unwindDetector.catchExceptionsIfUnwinding([&]() {
      // std::unordered_map doesn't like it when elements' destructors throw, so carefully
      // disassemble it.
      if (!connections.empty()) {
        // Move every state out of the map first; the states are then destroyed together when
        // `deleteMe` goes out of scope.
        kj::Vector<kj::Own<RpcConnectionState>> deleteMe(connections.size());
        kj::Exception shutdownException = KJ_EXCEPTION(FAILED, "RpcSystem was destroyed.");
        for (auto& entry: connections) {
          entry.second->disconnect(kj::cp(shutdownException));
          deleteMe.add(kj::mv(entry.second));
        }
      }
    });
  }

  Capability::Client bootstrap(_::StructReader vatId) {
    // For now we delegate to restore() since it's equivalent, but eventually we'll remove restore()
    // and implement bootstrap() directly.
    return restore(vatId, AnyPointer::Reader());
  }

  Capability::Client restore(_::StructReader vatId, AnyPointer::Reader objectId) {
    // Restores `objectId` from the vat identified by `vatId`.  If the network yields no
    // connection for that vat, the local restorer is used when there is one; otherwise a broken
    // capability is returned.
    KJ_IF_MAYBE(connection, network.baseConnect(vatId)) {
      auto& state = getConnectionState(kj::mv(*connection));
      return Capability::Client(state.restore(objectId));
    } else KJ_IF_MAYBE(r, restorer) {
      return r->baseRestore(objectId);
    } else {
      return Capability::Client(newBrokenCap(
          "SturdyRef referred to a local object but there is no local SturdyRef restorer."));
    }
  }

  void taskFailed(kj::Exception&& exception) override {
    // Failures of background tasks are logged, not propagated.
    KJ_LOG(ERROR, exception);
  }

private:
  VatNetworkBase& network;
  kj::Maybe<Capability::Client> bootstrapInterface;
  kj::Maybe<RealmGateway<>::Client> gateway;
  kj::Maybe<SturdyRefRestorerBase&> restorer;
  kj::TaskSet tasks;

  typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
      ConnectionMap;
  ConnectionMap connections;
  // Connection states, keyed by the address of the connection object each one was built from.

  kj::UnwindDetector unwindDetector;

  RpcConnectionState& getConnectionState(kj::Own<VatNetworkBase::Connection>&& connection) {
    // Returns the state for `connection`, creating and registering it on first use.  A newly
    // created state is given a fulfiller whose promise, once fulfilled, removes the entry from
    // `connections` again.
    auto iter = connections.find(connection);
    if (iter == connections.end()) {
      // Remember the raw pointer now; `connection` is moved into the new state below.
      VatNetworkBase::Connection* connectionPtr = connection;
      auto onDisconnect = kj::newPromiseAndFulfiller<void>();
      tasks.add(onDisconnect.promise.then([this,connectionPtr]() {
        connections.erase(connectionPtr);
      }));
      auto newState = kj::refcounted<RpcConnectionState>(
          bootstrapInterface, gateway, restorer, kj::mv(connection),
          kj::mv(onDisconnect.fulfiller));
      RpcConnectionState& result = *newState;
      connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
      return result;
    } else {
      return *iter->second;
    }
  }

  kj::Promise<void> acceptLoop() {
    // Accepts one incoming connection, sets up its state, then schedules the next iteration.
    auto receive = network.baseAccept().then(
        [this](kj::Own<VatNetworkBase::Connection>&& connection) {
      getConnectionState(kj::mv(connection));
    });
    return receive.then([this]() {
      // No exceptions; continue loop.
      //
      // (We do this in a separate continuation to handle the case where exceptions are
      // disabled.)
      tasks.add(acceptLoop());
    });
  }
};

RpcSystemBase::RpcSystemBase(VatNetworkBase& network,
2675 2676 2677
                             kj::Maybe<Capability::Client> bootstrapInterface,
                             kj::Maybe<RealmGateway<>::Client> gateway)
    : impl(kj::heap<Impl>(network, kj::mv(bootstrapInterface), kj::mv(gateway))) {}
2678
RpcSystemBase::RpcSystemBase(VatNetworkBase& network, SturdyRefRestorerBase& restorer)
2679
    : impl(kj::heap<Impl>(network, restorer)) {}
2680
RpcSystemBase::RpcSystemBase(RpcSystemBase&& other) noexcept = default;
2681 2682
RpcSystemBase::~RpcSystemBase() noexcept(false) {}

2683 2684 2685 2686
Capability::Client RpcSystemBase::baseBootstrap(_::StructReader vatId) {
  return impl->bootstrap(vatId);
}

2687
Capability::Client RpcSystemBase::baseRestore(
2688
    _::StructReader hostId, AnyPointer::Reader objectId) {
2689
  return impl->restore(hostId, objectId);
2690 2691 2692 2693
}

}  // namespace _ (private)
}  // namespace capnp