// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21

22 23
#define CAPNP_PRIVATE

24
#include "capability.h"
Kenton Varda's avatar
Kenton Varda committed
25
#include "message.h"
26
#include "arena.h"
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/refcount.h>
28
#include <kj/debug.h>
Kenton Varda's avatar
Kenton Varda committed
29
#include <kj/vector.h>
30
#include <map>
31
#include "generated-header-support.h"
32 33 34

namespace capnp {

35 36 37
const _::RawBrandedSchema* const Capability::_capnpPrivate::brand =
    &_::NULL_INTERFACE_SCHEMA.defaultBrand;
// Out-of-line definition of the `brand` constant for the generic `Capability` type.  It points
// at the default brand of the null interface schema.

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
namespace _ {

void setGlobalBrokenCapFactoryForLayoutCpp(BrokenCapFactory& factory);
// Defined in layout.c++.
//
// Forward-declared here so that this file can hand its BrokenCapFactory implementation to
// layout.c++.  In this file it is called from the ClientHook constructor and from
// MessageReader::initCapTable().

}  // namespace _

namespace {

class BrokenCapFactoryImpl: public _::BrokenCapFactory {
public:
  kj::Own<ClientHook> newBrokenCap(kj::StringPtr description) override {
    return capnp::newBrokenCap(description);
  }
};

static BrokenCapFactoryImpl brokenCapFactory;

56 57
static kj::Own<ClientHook> newNullCap();

58 59 60 61 62 63
}  // namespace

ClientHook::ClientHook() {
  // Every ClientHook construction (re-)registers this file's broken-capability factory with
  // layout.c++.
  _::setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}

64 65 66 67
void* ClientHook::getLocalServer(_::CapabilityServerSetBase& capServerSet) {
  // Base implementation: always reports "no local server" and ignores `capServerSet`.
  // LocalClient overrides this to return its server pointer when the set matches.
  return nullptr;
}

68 69 70 71 72 73 74
void MessageReader::initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTable) {
  // Register the broken-capability factory first, then hand the capability table to the arena.
  _::setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);

  auto* readerArena = arena();
  readerArena->initCapTable(kj::mv(capTable));
}

// =======================================================================================

75
Capability::Client::Client(decltype(nullptr))
    : hook(newNullCap()) {}
// Constructs a client from `nullptr`; its hook is the null capability made by newNullCap().
77

78 79 80
Capability::Client::Client(kj::Exception&& exception)
    : hook(newBrokenCap(kj::mv(exception))) {}
// Constructs a client whose hook is a broken capability built from `exception`.

81 82
kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* actualInterfaceName, uint64_t requestedTypeId) {
  // Reports, via KJ_UNIMPLEMENTED, that the interface identified by `requestedTypeId` is not
  // implemented; `actualInterfaceName` is included in the report.  If control continues past the
  // recovery block, an already-fulfilled promise is returned.
  KJ_UNIMPLEMENTED("Requested interface not implemented.", actualInterfaceName, requestedTypeId) {
    // Recoverable exception will be caught by promise framework.

    // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    // a bug that exists in both Clang and GCC:
    //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    //   http://llvm.org/bugs/show_bug.cgi?id=12286
    break;
  }
  return kj::READY_NOW;
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, uint64_t typeId, uint16_t methodId) {
  // Reports, via KJ_UNIMPLEMENTED, that method number `methodId` of the interface
  // (`interfaceName`, `typeId`) is not implemented.  If control continues past the recovery
  // block, an already-fulfilled promise is returned.
  KJ_UNIMPLEMENTED("Method not implemented.", interfaceName, typeId, methodId) {
    // Recoverable exception will be caught by promise framework.
    // (We `break` rather than return from inside this block; the return happens below.)
    break;
  }
  return kj::READY_NOW;
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, const char* methodName, uint64_t typeId, uint16_t methodId) {
  // Same as the overload without `methodName`, but the report also includes the method's name.
  // If control continues past the recovery block, an already-fulfilled promise is returned.
  KJ_UNIMPLEMENTED("Method not implemented.", interfaceName, typeId, methodName, methodId) {
    // Recoverable exception will be caught by promise framework.
    // (We `break` rather than return from inside this block; the return happens below.)
    break;
  }
  return kj::READY_NOW;
}

Kenton Varda's avatar
Kenton Varda committed
113 114
// Out-of-line, empty destructor for ResponseHook; declared noexcept(false).
ResponseHook::~ResponseHook() noexcept(false) {}

115
kj::Promise<void> ClientHook::whenResolved() {
  // Returns a promise that completes once this hook has no further resolution pending.  While
  // whenMoreResolved() yields a promise, we wait for it and then recurse on the hook it produces;
  // when it yields nothing, we are done immediately.
  KJ_IF_MAYBE(promise, whenMoreResolved()) {
    return promise->then([](kj::Own<ClientHook>&& resolution) {
      return resolution->whenResolved();
    });
  } else {
    return kj::READY_NOW;
  }
}
Kenton Varda's avatar
Kenton Varda committed
124

125
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
126

127 128 129 130 131 132 133 134
static inline uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint) {
  // Picks the first-segment size (in words) for a new message: the hinted word count when a hint
  // is given, otherwise SUGGESTED_FIRST_SEGMENT_WORDS.
  KJ_IF_MAYBE(hint, sizeHint) {
    return hint->wordCount;
  }
  return SUGGESTED_FIRST_SEGMENT_WORDS;
}

135
class LocalResponse final: public ResponseHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
136
public:
137
  LocalResponse(kj::Maybe<MessageSize> sizeHint)
138
      : message(firstSegmentSize(sizeHint)) {}
Kenton Varda's avatar
Kenton Varda committed
139

140
  MallocMessageBuilder message;
Kenton Varda's avatar
Kenton Varda committed
141 142
};

143
class LocalCallContext final: public CallContextHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
144
public:
145
  LocalCallContext(kj::Own<MallocMessageBuilder>&& request, kj::Own<ClientHook> clientRef,
146 147 148
                   kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller)
      : request(kj::mv(request)), clientRef(kj::mv(clientRef)),
        cancelAllowedFulfiller(kj::mv(cancelAllowedFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
149

150
  AnyPointer::Reader getParams() override {
151
    KJ_IF_MAYBE(r, request) {
152
      return r->get()->getRoot<AnyPointer>();
153 154 155
    } else {
      KJ_FAIL_REQUIRE("Can't call getParams() after releaseParams().");
    }
Kenton Varda's avatar
Kenton Varda committed
156 157 158 159
  }
  void releaseParams() override {
    request = nullptr;
  }
160
  AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
161
    if (response == nullptr) {
162
      auto localResponse = kj::refcounted<LocalResponse>(sizeHint);
163
      responseBuilder = localResponse->message.getRoot<AnyPointer>();
164
      response = Response<AnyPointer>(responseBuilder.asReader(), kj::mv(localResponse));
Kenton Varda's avatar
Kenton Varda committed
165
    }
166 167
    return responseBuilder;
  }
168 169 170
  kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
    auto result = directTailCall(kj::mv(request));
    KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
171
      f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
172 173 174 175
    }
    return kj::mv(result.promise);
  }
  ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
176 177 178 179
    KJ_REQUIRE(response == nullptr, "Can't call tailCall() after initializing the results struct.");

    auto promise = request->send();

180
    auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
181 182
      response = kj::mv(tailResponse);
    });
183 184

    return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
185
  }
186 187
  kj::Promise<AnyPointer::Pipeline> onTailCall() override {
    auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
188 189
    tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
    return kj::mv(paf.promise);
Kenton Varda's avatar
Kenton Varda committed
190
  }
191
  void allowCancellation() override {
192
    cancelAllowedFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
193
  }
194 195
  kj::Own<CallContextHook> addRef() override {
    return kj::addRef(*this);
196
  }
Kenton Varda's avatar
Kenton Varda committed
197

198
  kj::Maybe<kj::Own<MallocMessageBuilder>> request;
199 200
  kj::Maybe<Response<AnyPointer>> response;
  AnyPointer::Builder responseBuilder = nullptr;  // only valid if `response` is non-null
201
  kj::Own<ClientHook> clientRef;
202
  kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
203
  kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller;
Kenton Varda's avatar
Kenton Varda committed
204 205
};

206
class LocalRequest final: public RequestHook {
Kenton Varda's avatar
Kenton Varda committed
207
public:
208
  inline LocalRequest(uint64_t interfaceId, uint16_t methodId,
209
                      kj::Maybe<MessageSize> sizeHint, kj::Own<ClientHook> client)
210
      : message(kj::heap<MallocMessageBuilder>(firstSegmentSize(sizeHint))),
211 212
        interfaceId(interfaceId), methodId(methodId), client(kj::mv(client)) {}

213
  RemotePromise<AnyPointer> send() override {
214 215
    KJ_REQUIRE(message.get() != nullptr, "Already called send() on this request.");

216 217 218 219
    // For the lambda capture.
    uint64_t interfaceId = this->interfaceId;
    uint16_t methodId = this->methodId;

220 221 222 223
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

    auto context = kj::refcounted<LocalCallContext>(
        kj::mv(message), client->addRef(), kj::mv(cancelPaf.fulfiller));
224
    auto promiseAndPipeline = client->call(interfaceId, methodId, kj::addRef(*context));
225

226 227
    // We have to make sure the call is not canceled unless permitted.  We need to fork the promise
    // so that if the client drops their copy, the promise isn't necessarily canceled.
228
    auto forked = promiseAndPipeline.promise.fork();
229 230 231

    // We daemonize one branch, but only after joining it with the promise that fires if
    // cancellation is allowed.
232 233 234 235
    forked.addBranch()
        .attach(kj::addRef(*context))
        .exclusiveJoin(kj::mv(cancelPaf.promise))
        .detach([](kj::Exception&&) {});  // ignore exceptions
236 237

    // Now the other branch returns the response from the context.
238 239
    auto promise = forked.addBranch().then(kj::mvCapture(context,
        [](kj::Own<LocalCallContext>&& context) {
240
      context->getResults(MessageSize { 0, 0 });  // force response allocation
241 242
      return kj::mv(KJ_ASSERT_NONNULL(context->response));
    }));
243

244
    // We return the other branch.
245 246
    return RemotePromise<AnyPointer>(
        kj::mv(promise), AnyPointer::Pipeline(kj::mv(promiseAndPipeline.pipeline)));
247 248
  }

249
  const void* getBrand() override {
250 251 252
    return nullptr;
  }

253
  kj::Own<MallocMessageBuilder> message;
254 255 256 257

private:
  uint64_t interfaceId;
  uint16_t methodId;
258
  kj::Own<ClientHook> client;
259 260 261 262 263 264 265 266 267 268 269 270 271
};

// =======================================================================================
// Call queues
//
// These classes handle pipelining in the case where calls need to be queued in-memory until some
// local operation completes.

class QueuedPipeline final: public PipelineHook, public kj::Refcounted {
  // A PipelineHook which simply queues calls while waiting for a PipelineHook to which to forward
  // them.

public:
272 273
  QueuedPipeline(kj::Promise<kj::Own<PipelineHook>>&& promiseParam)
      : promise(promiseParam.fork()),
274 275 276 277 278
        selfResolutionOp(promise.addBranch().then([this](kj::Own<PipelineHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenPipeline(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)) {}
279

280
  kj::Own<PipelineHook> addRef() override {
281 282 283
    return kj::addRef(*this);
  }

284
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
285 286 287 288
    auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
    for (auto& op: ops) {
      copy.add(op);
    }
289
    return getPipelinedCap(copy.finish());
290 291
  }

292
  kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override;
293 294

private:
295
  kj::ForkedPromise<kj::Own<PipelineHook>> promise;
Kenton Varda's avatar
Kenton Varda committed
296

Kenton Varda's avatar
Kenton Varda committed
297
  kj::Maybe<kj::Own<PipelineHook>> redirect;
Kenton Varda's avatar
Kenton Varda committed
298 299 300 301
  // Once the promise resolves, this will become non-null and point to the underlying object.

  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.
302 303 304 305 306 307 308
};

class QueuedClient final: public ClientHook, public kj::Refcounted {
  // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward
  // them.

public:
309 310
  QueuedClient(kj::Promise<kj::Own<ClientHook>>&& promiseParam)
      : promise(promiseParam.fork()),
311 312 313 314 315
        selfResolutionOp(promise.addBranch().then([this](kj::Own<ClientHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenCap(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)),
Kenton Varda's avatar
Kenton Varda committed
316
        promiseForCallForwarding(promise.addBranch().fork()),
317
        promiseForClientResolution(promise.addBranch().fork()) {}
Kenton Varda's avatar
Kenton Varda committed
318

319
  Request<AnyPointer, AnyPointer> newCall(
320
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
321
    auto hook = kj::heap<LocalRequest>(
322
        interfaceId, methodId, sizeHint, kj::addRef(*this));
323
    auto root = hook->message->getRoot<AnyPointer>();
324
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
325 326 327
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
328
                              kj::Own<CallContextHook>&& context) override {
329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
    // This is a bit complicated.  We need to initiate this call later on.  When we initiate the
    // call, we'll get a void promise for its completion and a pipeline object.  Right now, we have
    // to produce a similar void promise and pipeline that will eventually be chained to those.
    // The problem is, these are two independent objects, but they both depend on the result of
    // one future call.
    //
    // So, we need to set up a continuation that will initiate the call later, then we need to
    // fork the promise for that continuation in order to send the completion promise and the
    // pipeline to their respective places.
    //
    // TODO(perf):  Too much reference counting?  Can we do better?  Maybe a way to fork
    //   Promise<Tuple<T, U>> into Tuple<Promise<T>, Promise<U>>?

    struct CallResultHolder: public kj::Refcounted {
      // Essentially acts as a refcounted \VoidPromiseAndPipeline, so that we can create a promise
      // for it and fork that promise.

346
      VoidPromiseAndPipeline content;
347
      // One branch of the fork will use content.promise, the other branch will use
348
      // content.pipeline.  Neither branch will touch the other's piece.
349 350 351

      inline CallResultHolder(VoidPromiseAndPipeline&& content): content(kj::mv(content)) {}

352
      kj::Own<CallResultHolder> addRef() { return kj::addRef(*this); }
353 354 355
    };

    // Create a promise for the call initiation.
356
    kj::ForkedPromise<kj::Own<CallResultHolder>> callResultPromise =
Kenton Varda's avatar
Kenton Varda committed
357
        promiseForCallForwarding.addBranch().then(kj::mvCapture(context,
358
        [=](kj::Own<CallContextHook>&& context, kj::Own<ClientHook>&& client){
359 360
          return kj::refcounted<CallResultHolder>(
              client->call(interfaceId, methodId, kj::mv(context)));
361
        })).fork();
362 363 364

    // Create a promise that extracts the pipeline from the call initiation, and construct our
    // QueuedPipeline to chain to it.
365 366
    auto pipelinePromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
367 368
          return kj::mv(callResult->content.pipeline);
        });
369
    auto pipeline = kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise));
370 371

    // Create a promise that simply chains to the void promise produced by the call initiation.
372 373
    auto completionPromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
374 375 376 377 378
          return kj::mv(callResult->content.promise);
        });

    // OK, now we can actually return our thing.
    return VoidPromiseAndPipeline { kj::mv(completionPromise), kj::mv(pipeline) };
Kenton Varda's avatar
Kenton Varda committed
379 380
  }

381
  kj::Maybe<ClientHook&> getResolved() override {
Kenton Varda's avatar
Kenton Varda committed
382
    KJ_IF_MAYBE(inner, redirect) {
383 384 385 386 387 388
      return **inner;
    } else {
      return nullptr;
    }
  }

389
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
390
    return promiseForClientResolution.addBranch();
Kenton Varda's avatar
Kenton Varda committed
391 392
  }

393
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
394 395 396
    return kj::addRef(*this);
  }

397
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
398 399 400 401
    return nullptr;
  }

private:
402
  typedef kj::ForkedPromise<kj::Own<ClientHook>> ClientHookPromiseFork;
403

Kenton Varda's avatar
Kenton Varda committed
404 405 406
  kj::Maybe<kj::Own<ClientHook>> redirect;
  // Once the promise resolves, this will become non-null and point to the underlying object.

407 408 409
  ClientHookPromiseFork promise;
  // Promise that resolves when we have a new ClientHook to forward to.
  //
Kenton Varda's avatar
Kenton Varda committed
410
  // This fork shall only have three branches:  `selfResolutionOp`, `promiseForCallForwarding`, and
411 412
  // `promiseForClientResolution`, in that order.

Kenton Varda's avatar
Kenton Varda committed
413 414 415 416
  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.

  ClientHookPromiseFork promiseForCallForwarding;
417 418 419 420
  // When this promise resolves, each queued call will be forwarded to the real client.  This needs
  // to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure
  // previously-queued calls are delivered before any new calls made in response to the resolution.

Kenton Varda's avatar
Kenton Varda committed
421
  ClientHookPromiseFork promiseForClientResolution;
422 423 424 425 426 427
  // whenMoreResolved() returns forks of this promise.  These must resolve *after* queued calls
  // have been initiated (so that any calls made in the whenMoreResolved() handler are correctly
  // delivered after calls made earlier), but *before* any queued calls return (because it might
  // confuse the application if a queued call returns before the capability on which it was made
  // resolves).  Luckily, we know that queued calls will involve, at the very least, an
  // eventLoop.evalLater.
Kenton Varda's avatar
Kenton Varda committed
428 429
};

430
kj::Own<ClientHook> QueuedPipeline::getPipelinedCap(kj::Array<PipelineOp>&& ops) {
  // If the target pipeline is already known, ask it directly.  Otherwise return a QueuedClient
  // that waits for the pipeline and then applies `ops` to it.
  KJ_IF_MAYBE(r, redirect) {
    return r->get()->getPipelinedCap(kj::mv(ops));
  } else {
    auto clientPromise = promise.addBranch().then(kj::mvCapture(ops,
        [](kj::Array<PipelineOp>&& ops, kj::Own<PipelineHook> pipeline) {
          return pipeline->getPipelinedCap(kj::mv(ops));
        }));

    return kj::refcounted<QueuedClient>(kj::mv(clientPromise));
  }
}
Kenton Varda's avatar
Kenton Varda committed
442

443
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
444

445
class LocalPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
446
public:
447 448
  inline LocalPipeline(kj::Own<CallContextHook>&& contextParam)
      : context(kj::mv(contextParam)),
449
        results(context->getResults(MessageSize { 0, 0 })) {}
Kenton Varda's avatar
Kenton Varda committed
450

451
  kj::Own<PipelineHook> addRef() {
452
    return kj::addRef(*this);
Kenton Varda's avatar
Kenton Varda committed
453 454
  }

455
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
456
    return results.getPipelinedCap(ops);
457
  }
Kenton Varda's avatar
Kenton Varda committed
458 459

private:
460
  kj::Own<CallContextHook> context;
461
  AnyPointer::Reader results;
Kenton Varda's avatar
Kenton Varda committed
462 463 464 465
};

class LocalClient final: public ClientHook, public kj::Refcounted {
public:
466 467 468 469 470
  LocalClient(kj::Own<Capability::Server>&& serverParam)
      : server(kj::mv(serverParam)) {
    server->thisHook = this;
  }
  LocalClient(kj::Own<Capability::Server>&& serverParam,
471
              _::CapabilityServerSetBase& capServerSet, void* ptr)
472 473
      : server(kj::mv(serverParam)), capServerSet(&capServerSet), ptr(ptr) {
    server->thisHook = this;
474 475
  }

476 477
  ~LocalClient() noexcept(false) {
    server->thisHook = nullptr;
478
  }
Kenton Varda's avatar
Kenton Varda committed
479

480
  Request<AnyPointer, AnyPointer> newCall(
481
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
Kenton Varda's avatar
Kenton Varda committed
482
    auto hook = kj::heap<LocalRequest>(
483
        interfaceId, methodId, sizeHint, kj::addRef(*this));
484
    auto root = hook->message->getRoot<AnyPointer>();
485
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
486 487 488
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
489
                              kj::Own<CallContextHook>&& context) override {
490 491
    auto contextPtr = context.get();

492 493 494
    // We don't want to actually dispatch the call synchronously, because we don't want the callee
    // to have any side effects before the promise is returned to the caller.  This helps avoid
    // race conditions.
495 496 497 498 499
    //
    // So, we do an evalLater() here.
    //
    // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't
    // complete before 'whenMoreResolved()' promises resolve.
500 501
    auto promise = kj::evalLater([this,interfaceId,methodId,contextPtr]() {
      return server->dispatchCall(interfaceId, methodId,
502
                                  CallContext<AnyPointer, AnyPointer>(*contextPtr));
503
    }).attach(kj::addRef(*this));
504

505
    // We have to fork this promise for the pipeline to receive a copy of the answer.
506
    auto forked = promise.fork();
507

508 509
    auto pipelinePromise = forked.addBranch().then(kj::mvCapture(context->addRef(),
        [=](kj::Own<CallContextHook>&& context) -> kj::Own<PipelineHook> {
510 511 512
          context->releaseParams();
          return kj::refcounted<LocalPipeline>(kj::mv(context));
        }));
513

514
    auto tailPipelinePromise = context->onTailCall().then([](AnyPointer::Pipeline&& pipeline) {
515 516 517
      return kj::mv(pipeline.hook);
    });

518
    pipelinePromise = pipelinePromise.exclusiveJoin(kj::mv(tailPipelinePromise));
519

520
    auto completionPromise = forked.addBranch().attach(kj::mv(context));
Kenton Varda's avatar
Kenton Varda committed
521

522
    return VoidPromiseAndPipeline { kj::mv(completionPromise),
523
        kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise)) };
Kenton Varda's avatar
Kenton Varda committed
524 525
  }

526
  kj::Maybe<ClientHook&> getResolved() override {
527 528 529
    return nullptr;
  }

530
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
531 532 533
    return nullptr;
  }

534
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
535 536 537
    return kj::addRef(*this);
  }

538
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
539 540 541 542
    // We have no need to detect local objects.
    return nullptr;
  }

543 544 545 546 547 548 549 550
  void* getLocalServer(_::CapabilityServerSetBase& capServerSet) override {
    if (this->capServerSet == &capServerSet) {
      return ptr;
    } else {
      return nullptr;
    }
  }

Kenton Varda's avatar
Kenton Varda committed
551
private:
552
  kj::Own<Capability::Server> server;
553 554
  _::CapabilityServerSetBase* capServerSet = nullptr;
  void* ptr = nullptr;
Kenton Varda's avatar
Kenton Varda committed
555 556
};

557 558
kj::Own<ClientHook> Capability::Client::makeLocalClient(kj::Own<Capability::Server>&& server) {
  // Wraps `server` in a new LocalClient, which takes ownership of it.
  return kj::refcounted<LocalClient>(kj::mv(server));
}

561 562
kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& promise) {
  // Returns a QueuedClient that queues calls until `promise` resolves to the real client.
  return kj::refcounted<QueuedClient>(kj::mv(promise));
}

565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602
// =======================================================================================

namespace {

class BrokenPipeline final: public PipelineHook, public kj::Refcounted {
  // PipelineHook that stores an exception; every capability pipelined from it is a BrokenClient
  // carrying that same exception.

public:
  explicit BrokenPipeline(const kj::Exception& exception): exception(exception) {}
  // Stores a copy of `exception`.

  kj::Own<PipelineHook> addRef() override {
    return kj::addRef(*this);
  }

  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override;
  // Defined after BrokenClient, which it needs to construct.

private:
  kj::Exception exception;
};

class BrokenRequest final: public RequestHook {
public:
  BrokenRequest(const kj::Exception& exception, kj::Maybe<MessageSize> sizeHint)
      : exception(exception), message(firstSegmentSize(sizeHint)) {}

  RemotePromise<AnyPointer> send() override {
    return RemotePromise<AnyPointer>(kj::cp(exception),
        AnyPointer::Pipeline(kj::refcounted<BrokenPipeline>(exception)));
  }

  const void* getBrand() {
    return nullptr;
  }

  kj::Exception exception;
  MallocMessageBuilder message;
};

class BrokenClient final: public ClientHook, public kj::Refcounted {
public:
603 604 605 606
  BrokenClient(const kj::Exception& exception, bool resolved)
      : exception(exception), resolved(resolved) {}
  BrokenClient(const kj::StringPtr description, bool resolved)
      : exception(kj::Exception::Type::FAILED, "", 0, kj::str(description)), resolved(resolved) {}
607 608 609

  Request<AnyPointer, AnyPointer> newCall(
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
610
    return newBrokenRequest(kj::cp(exception), sizeHint);
611 612 613 614
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                              kj::Own<CallContextHook>&& context) override {
615
    return VoidPromiseAndPipeline { kj::cp(exception), kj::refcounted<BrokenPipeline>(exception) };
616 617 618 619 620 621 622
  }

  kj::Maybe<ClientHook&> getResolved() {
    return nullptr;
  }

  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
623 624 625 626 627
    if (resolved) {
      return nullptr;
    } else {
      return kj::Promise<kj::Own<ClientHook>>(kj::cp(exception));
    }
628 629 630 631 632 633 634 635 636 637 638 639
  }

  kj::Own<ClientHook> addRef() override {
    return kj::addRef(*this);
  }

  const void* getBrand() override {
    return nullptr;
  }

private:
  kj::Exception exception;
640
  bool resolved;
641 642 643
};

kj::Own<ClientHook> BrokenPipeline::getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
  // `ops` is ignored: any capability pipelined from a broken pipeline is an unresolved
  // BrokenClient carrying the same exception.
  return kj::refcounted<BrokenClient>(exception, false);
}

kj::Own<ClientHook> newNullCap() {
  // A null capability, unlike other broken capabilities, is considered resolved.
  return kj::refcounted<BrokenClient>("Called null capability.", true);
}

}  // namespace

kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason) {
  // Creates an unresolved BrokenClient whose exception is built from the text `reason`.
  return kj::refcounted<BrokenClient>(reason, false);
}

kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason) {
  // Creates an unresolved BrokenClient that reports `reason` on every call.
  return kj::refcounted<BrokenClient>(kj::mv(reason), false);
}

kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason) {
  // Creates a BrokenPipeline carrying `reason`.
  return kj::refcounted<BrokenPipeline>(kj::mv(reason));
}

666 667 668 669 670 671 672
Request<AnyPointer, AnyPointer> newBrokenRequest(
    kj::Exception&& reason, kj::Maybe<MessageSize> sizeHint) {
  // Builds a request backed by a BrokenRequest hook.  The caller gets the root of the hook's
  // message to fill in, and the hook reports `reason` when sent.
  auto brokenHook = kj::heap<BrokenRequest>(kj::mv(reason), sizeHint);
  auto paramsRoot = brokenHook->message.getRoot<AnyPointer>();
  return Request<AnyPointer, AnyPointer>(paramsRoot, kj::mv(brokenHook));
}

673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704
// =======================================================================================
// CapabilityServerSet

namespace _ {  // private

Capability::Client CapabilityServerSetBase::addInternal(
    kj::Own<Capability::Server>&& server, void* ptr) {
  // Wraps `server` in a LocalClient tagged with this set and `ptr`, so that a later
  // getLocalServer() call made with this same set returns `ptr`.
  kj::Own<ClientHook> localHook = kj::refcounted<LocalClient>(kj::mv(server), *this, ptr);
  return Capability::Client(kj::mv(localHook));
}

kj::Promise<void*> CapabilityServerSetBase::getLocalServerInternal(Capability::Client& client) {
  // Resolves `client` as far as possible, then asks the final hook for the local server pointer
  // associated with this set.
  ClientHook* current = client.hook.get();

  // Start from the most-resolved version of the hook known so far.
  KJ_IF_MAYBE(alreadyResolved, current->getResolved()) {
    current = alreadyResolved;
  }

  KJ_IF_MAYBE(pending, current->whenMoreResolved()) {
    // Still an unresolved promise: wait for it (holding a reference to the hook meanwhile), then
    // repeat the lookup on whatever it resolves to.
    return pending->attach(current->addRef())
        .then([this](kj::Own<ClientHook>&& next) {
      Capability::Client nextClient(kj::mv(next));
      return getLocalServerInternal(nextClient);
    });
  }

  // Fully resolved: answer immediately.
  return current->getLocalServer(*this);
}

}  // namespace _ (private)

705
}  // namespace capnp