capability.c++ 22.8 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21

22 23
#define CAPNP_PRIVATE

24
#include "capability.h"
Kenton Varda's avatar
Kenton Varda committed
25
#include "message.h"
26
#include "arena.h"
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/refcount.h>
28
#include <kj/debug.h>
Kenton Varda's avatar
Kenton Varda committed
29
#include <kj/vector.h>
30
#include <map>
31
#include "generated-header-support.h"
32 33 34

namespace capnp {

35 36 37
const _::RawBrandedSchema* const Capability::_capnpPrivate::brand =
    &_::NULL_INTERFACE_SCHEMA.defaultBrand;

38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
namespace _ {

void setGlobalBrokenCapFactoryForLayoutCpp(BrokenCapFactory& factory);
// Defined in layout.c++.

}  // namespace _

namespace {

class BrokenCapFactoryImpl: public _::BrokenCapFactory {
public:
  kj::Own<ClientHook> newBrokenCap(kj::StringPtr description) override {
    return capnp::newBrokenCap(description);
  }
};

static BrokenCapFactoryImpl brokenCapFactory;

}  // namespace

ClientHook::ClientHook() {
  setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}

void MessageReader::initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTable) {
  setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
  arena()->initCapTable(kj::mv(capTable));
}

// =======================================================================================

69 70 71
Capability::Client::Client(decltype(nullptr))
    : hook(newBrokenCap("Called null capability.")) {}

72 73 74
Capability::Client::Client(kj::Exception&& exception)
    : hook(newBrokenCap(kj::mv(exception))) {}

75 76
kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* actualInterfaceName, uint64_t requestedTypeId) {
77
  KJ_UNIMPLEMENTED("Requested interface not implemented.", actualInterfaceName, requestedTypeId) {
78
    // Recoverable exception will be caught by promise framework.
79 80 81 82 83 84

    // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    // a bug that exists in both Clang and GCC:
    //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    //   http://llvm.org/bugs/show_bug.cgi?id=12286
    break;
85
  }
86
  return kj::READY_NOW;
87 88 89 90
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, uint64_t typeId, uint16_t methodId) {
91
  KJ_UNIMPLEMENTED("Method not implemented.", interfaceName, typeId, methodId) {
92
    // Recoverable exception will be caught by promise framework.
93
    break;
94
  }
95
  return kj::READY_NOW;
96 97 98 99
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, const char* methodName, uint64_t typeId, uint16_t methodId) {
100
  KJ_UNIMPLEMENTED("Method not implemented.", interfaceName, typeId, methodName, methodId) {
101
    // Recoverable exception will be caught by promise framework.
102
    break;
103
  }
104
  return kj::READY_NOW;
105 106
}

Kenton Varda's avatar
Kenton Varda committed
107 108
ResponseHook::~ResponseHook() noexcept(false) {}

109
kj::Promise<void> ClientHook::whenResolved() {
110
  KJ_IF_MAYBE(promise, whenMoreResolved()) {
111
    return promise->then([](kj::Own<ClientHook>&& resolution) {
112 113 114 115 116 117
      return resolution->whenResolved();
    });
  } else {
    return kj::READY_NOW;
  }
}
Kenton Varda's avatar
Kenton Varda committed
118

119
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
120

121 122 123 124 125 126 127 128
static inline uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint) {
  KJ_IF_MAYBE(s, sizeHint) {
    return s->wordCount;
  } else {
    return SUGGESTED_FIRST_SEGMENT_WORDS;
  }
}

129
class LocalResponse final: public ResponseHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
130
public:
131
  LocalResponse(kj::Maybe<MessageSize> sizeHint)
132
      : message(firstSegmentSize(sizeHint)) {}
Kenton Varda's avatar
Kenton Varda committed
133

134
  MallocMessageBuilder message;
Kenton Varda's avatar
Kenton Varda committed
135 136
};

137
class LocalCallContext final: public CallContextHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
138
public:
139
  LocalCallContext(kj::Own<MallocMessageBuilder>&& request, kj::Own<ClientHook> clientRef,
140 141 142
                   kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller)
      : request(kj::mv(request)), clientRef(kj::mv(clientRef)),
        cancelAllowedFulfiller(kj::mv(cancelAllowedFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
143

144
  AnyPointer::Reader getParams() override {
145
    KJ_IF_MAYBE(r, request) {
146
      return r->get()->getRoot<AnyPointer>();
147 148 149
    } else {
      KJ_FAIL_REQUIRE("Can't call getParams() after releaseParams().");
    }
Kenton Varda's avatar
Kenton Varda committed
150 151 152 153
  }
  void releaseParams() override {
    request = nullptr;
  }
154
  AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
155
    if (response == nullptr) {
156
      auto localResponse = kj::refcounted<LocalResponse>(sizeHint);
157
      responseBuilder = localResponse->message.getRoot<AnyPointer>();
158
      response = Response<AnyPointer>(responseBuilder.asReader(), kj::mv(localResponse));
Kenton Varda's avatar
Kenton Varda committed
159
    }
160 161
    return responseBuilder;
  }
162 163 164
  kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
    auto result = directTailCall(kj::mv(request));
    KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
165
      f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
166 167 168 169
    }
    return kj::mv(result.promise);
  }
  ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
170 171 172 173
    KJ_REQUIRE(response == nullptr, "Can't call tailCall() after initializing the results struct.");

    auto promise = request->send();

174
    auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
175 176
      response = kj::mv(tailResponse);
    });
177 178

    return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
179
  }
180 181
  kj::Promise<AnyPointer::Pipeline> onTailCall() override {
    auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
182 183
    tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
    return kj::mv(paf.promise);
Kenton Varda's avatar
Kenton Varda committed
184
  }
185
  void allowCancellation() override {
186
    cancelAllowedFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
187
  }
188 189
  kj::Own<CallContextHook> addRef() override {
    return kj::addRef(*this);
190
  }
Kenton Varda's avatar
Kenton Varda committed
191

192
  kj::Maybe<kj::Own<MallocMessageBuilder>> request;
193 194
  kj::Maybe<Response<AnyPointer>> response;
  AnyPointer::Builder responseBuilder = nullptr;  // only valid if `response` is non-null
195
  kj::Own<ClientHook> clientRef;
196
  kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
197
  kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller;
Kenton Varda's avatar
Kenton Varda committed
198 199
};

200
class LocalRequest final: public RequestHook {
Kenton Varda's avatar
Kenton Varda committed
201
public:
202
  inline LocalRequest(uint64_t interfaceId, uint16_t methodId,
203
                      kj::Maybe<MessageSize> sizeHint, kj::Own<ClientHook> client)
204
      : message(kj::heap<MallocMessageBuilder>(firstSegmentSize(sizeHint))),
205 206
        interfaceId(interfaceId), methodId(methodId), client(kj::mv(client)) {}

207
  RemotePromise<AnyPointer> send() override {
208 209
    KJ_REQUIRE(message.get() != nullptr, "Already called send() on this request.");

210 211 212 213
    // For the lambda capture.
    uint64_t interfaceId = this->interfaceId;
    uint16_t methodId = this->methodId;

214 215 216 217
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

    auto context = kj::refcounted<LocalCallContext>(
        kj::mv(message), client->addRef(), kj::mv(cancelPaf.fulfiller));
218
    auto promiseAndPipeline = client->call(interfaceId, methodId, kj::addRef(*context));
219

220 221
    // We have to make sure the call is not canceled unless permitted.  We need to fork the promise
    // so that if the client drops their copy, the promise isn't necessarily canceled.
222
    auto forked = promiseAndPipeline.promise.fork();
223 224 225

    // We daemonize one branch, but only after joining it with the promise that fires if
    // cancellation is allowed.
226 227 228 229
    forked.addBranch()
        .attach(kj::addRef(*context))
        .exclusiveJoin(kj::mv(cancelPaf.promise))
        .detach([](kj::Exception&&) {});  // ignore exceptions
230 231

    // Now the other branch returns the response from the context.
232 233
    auto promise = forked.addBranch().then(kj::mvCapture(context,
        [](kj::Own<LocalCallContext>&& context) {
234
      context->getResults(MessageSize { 0, 0 });  // force response allocation
235 236
      return kj::mv(KJ_ASSERT_NONNULL(context->response));
    }));
237

238
    // We return the other branch.
239 240
    return RemotePromise<AnyPointer>(
        kj::mv(promise), AnyPointer::Pipeline(kj::mv(promiseAndPipeline.pipeline)));
241 242
  }

243
  const void* getBrand() override {
244 245 246
    return nullptr;
  }

247
  kj::Own<MallocMessageBuilder> message;
248 249 250 251

private:
  uint64_t interfaceId;
  uint16_t methodId;
252
  kj::Own<ClientHook> client;
253 254 255 256 257 258 259 260 261 262 263 264 265
};

// =======================================================================================
// Call queues
//
// These classes handle pipelining in the case where calls need to be queued in-memory until some
// local operation completes.

class QueuedPipeline final: public PipelineHook, public kj::Refcounted {
  // A PipelineHook which simply queues calls while waiting for a PipelineHook to which to forward
  // them.

public:
266 267
  QueuedPipeline(kj::Promise<kj::Own<PipelineHook>>&& promiseParam)
      : promise(promiseParam.fork()),
268 269 270 271 272
        selfResolutionOp(promise.addBranch().then([this](kj::Own<PipelineHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenPipeline(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)) {}
273

274
  kj::Own<PipelineHook> addRef() override {
275 276 277
    return kj::addRef(*this);
  }

278
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
279 280 281 282
    auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
    for (auto& op: ops) {
      copy.add(op);
    }
283
    return getPipelinedCap(copy.finish());
284 285
  }

286
  kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override;
287 288

private:
289
  kj::ForkedPromise<kj::Own<PipelineHook>> promise;
Kenton Varda's avatar
Kenton Varda committed
290

Kenton Varda's avatar
Kenton Varda committed
291
  kj::Maybe<kj::Own<PipelineHook>> redirect;
Kenton Varda's avatar
Kenton Varda committed
292 293 294 295
  // Once the promise resolves, this will become non-null and point to the underlying object.

  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.
296 297 298 299 300 301 302
};

class QueuedClient final: public ClientHook, public kj::Refcounted {
  // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward
  // them.

public:
303 304
  QueuedClient(kj::Promise<kj::Own<ClientHook>>&& promiseParam)
      : promise(promiseParam.fork()),
305 306 307 308 309
        selfResolutionOp(promise.addBranch().then([this](kj::Own<ClientHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenCap(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)),
Kenton Varda's avatar
Kenton Varda committed
310
        promiseForCallForwarding(promise.addBranch().fork()),
311
        promiseForClientResolution(promise.addBranch().fork()) {}
Kenton Varda's avatar
Kenton Varda committed
312

313
  Request<AnyPointer, AnyPointer> newCall(
314
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
315
    auto hook = kj::heap<LocalRequest>(
316
        interfaceId, methodId, sizeHint, kj::addRef(*this));
317
    auto root = hook->message->getRoot<AnyPointer>();
318
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
319 320 321
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
322
                              kj::Own<CallContextHook>&& context) override {
323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
    // This is a bit complicated.  We need to initiate this call later on.  When we initiate the
    // call, we'll get a void promise for its completion and a pipeline object.  Right now, we have
    // to produce a similar void promise and pipeline that will eventually be chained to those.
    // The problem is, these are two independent objects, but they both depend on the result of
    // one future call.
    //
    // So, we need to set up a continuation that will initiate the call later, then we need to
    // fork the promise for that continuation in order to send the completion promise and the
    // pipeline to their respective places.
    //
    // TODO(perf):  Too much reference counting?  Can we do better?  Maybe a way to fork
    //   Promise<Tuple<T, U>> into Tuple<Promise<T>, Promise<U>>?

    struct CallResultHolder: public kj::Refcounted {
      // Essentially acts as a refcounted \VoidPromiseAndPipeline, so that we can create a promise
      // for it and fork that promise.

340
      VoidPromiseAndPipeline content;
341
      // One branch of the fork will use content.promise, the other branch will use
342
      // content.pipeline.  Neither branch will touch the other's piece.
343 344 345

      inline CallResultHolder(VoidPromiseAndPipeline&& content): content(kj::mv(content)) {}

346
      kj::Own<CallResultHolder> addRef() { return kj::addRef(*this); }
347 348 349
    };

    // Create a promise for the call initiation.
350
    kj::ForkedPromise<kj::Own<CallResultHolder>> callResultPromise =
Kenton Varda's avatar
Kenton Varda committed
351
        promiseForCallForwarding.addBranch().then(kj::mvCapture(context,
352
        [=](kj::Own<CallContextHook>&& context, kj::Own<ClientHook>&& client){
353 354
          return kj::refcounted<CallResultHolder>(
              client->call(interfaceId, methodId, kj::mv(context)));
355
        })).fork();
356 357 358

    // Create a promise that extracts the pipeline from the call initiation, and construct our
    // QueuedPipeline to chain to it.
359 360
    auto pipelinePromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
361 362
          return kj::mv(callResult->content.pipeline);
        });
363
    auto pipeline = kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise));
364 365

    // Create a promise that simply chains to the void promise produced by the call initiation.
366 367
    auto completionPromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
368 369 370 371 372
          return kj::mv(callResult->content.promise);
        });

    // OK, now we can actually return our thing.
    return VoidPromiseAndPipeline { kj::mv(completionPromise), kj::mv(pipeline) };
Kenton Varda's avatar
Kenton Varda committed
373 374
  }

375
  kj::Maybe<ClientHook&> getResolved() override {
Kenton Varda's avatar
Kenton Varda committed
376
    KJ_IF_MAYBE(inner, redirect) {
377 378 379 380 381 382
      return **inner;
    } else {
      return nullptr;
    }
  }

383
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
384
    return promiseForClientResolution.addBranch();
Kenton Varda's avatar
Kenton Varda committed
385 386
  }

387
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
388 389 390
    return kj::addRef(*this);
  }

391
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
392 393 394 395
    return nullptr;
  }

private:
396
  typedef kj::ForkedPromise<kj::Own<ClientHook>> ClientHookPromiseFork;
397

Kenton Varda's avatar
Kenton Varda committed
398 399 400
  kj::Maybe<kj::Own<ClientHook>> redirect;
  // Once the promise resolves, this will become non-null and point to the underlying object.

401 402 403
  ClientHookPromiseFork promise;
  // Promise that resolves when we have a new ClientHook to forward to.
  //
Kenton Varda's avatar
Kenton Varda committed
404
  // This fork shall only have three branches:  `selfResolutionOp`, `promiseForCallForwarding`, and
405 406
  // `promiseForClientResolution`, in that order.

Kenton Varda's avatar
Kenton Varda committed
407 408 409 410
  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.

  ClientHookPromiseFork promiseForCallForwarding;
411 412 413 414
  // When this promise resolves, each queued call will be forwarded to the real client.  This needs
  // to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure
  // previously-queued calls are delivered before any new calls made in response to the resolution.

Kenton Varda's avatar
Kenton Varda committed
415
  ClientHookPromiseFork promiseForClientResolution;
416 417 418 419 420 421
  // whenMoreResolved() returns forks of this promise.  These must resolve *after* queued calls
  // have been initiated (so that any calls made in the whenMoreResolved() handler are correctly
  // delivered after calls made earlier), but *before* any queued calls return (because it might
  // confuse the application if a queued call returns before the capability on which it was made
  // resolves).  Luckily, we know that queued calls will involve, at the very least, an
  // eventLoop.evalLater.
Kenton Varda's avatar
Kenton Varda committed
422 423
};

424
kj::Own<ClientHook> QueuedPipeline::getPipelinedCap(kj::Array<PipelineOp>&& ops) {
Kenton Varda's avatar
Kenton Varda committed
425 426
  KJ_IF_MAYBE(r, redirect) {
    return r->get()->getPipelinedCap(kj::mv(ops));
Kenton Varda's avatar
Kenton Varda committed
427
  } else {
428 429
    auto clientPromise = promise.addBranch().then(kj::mvCapture(ops,
        [](kj::Array<PipelineOp>&& ops, kj::Own<PipelineHook> pipeline) {
Kenton Varda's avatar
Kenton Varda committed
430 431 432
          return pipeline->getPipelinedCap(kj::mv(ops));
        }));

433
    return kj::refcounted<QueuedClient>(kj::mv(clientPromise));
Kenton Varda's avatar
Kenton Varda committed
434
  }
435
}
Kenton Varda's avatar
Kenton Varda committed
436

437
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
438

439
class LocalPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
440
public:
441 442
  inline LocalPipeline(kj::Own<CallContextHook>&& contextParam)
      : context(kj::mv(contextParam)),
443
        results(context->getResults(MessageSize { 0, 0 })) {}
Kenton Varda's avatar
Kenton Varda committed
444

445
  kj::Own<PipelineHook> addRef() {
446
    return kj::addRef(*this);
Kenton Varda's avatar
Kenton Varda committed
447 448
  }

449
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
450
    return results.getPipelinedCap(ops);
451
  }
Kenton Varda's avatar
Kenton Varda committed
452 453

private:
454
  kj::Own<CallContextHook> context;
455
  AnyPointer::Reader results;
Kenton Varda's avatar
Kenton Varda committed
456 457 458 459
};

class LocalClient final: public ClientHook, public kj::Refcounted {
public:
460 461
  LocalClient(kj::Own<Capability::Server>&& server)
      : server(kj::mv(server)) {}
Kenton Varda's avatar
Kenton Varda committed
462

463
  Request<AnyPointer, AnyPointer> newCall(
464
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
Kenton Varda's avatar
Kenton Varda committed
465
    auto hook = kj::heap<LocalRequest>(
466
        interfaceId, methodId, sizeHint, kj::addRef(*this));
467
    auto root = hook->message->getRoot<AnyPointer>();
468
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
469 470 471
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
472
                              kj::Own<CallContextHook>&& context) override {
473 474
    auto contextPtr = context.get();

475 476 477
    // We don't want to actually dispatch the call synchronously, because we don't want the callee
    // to have any side effects before the promise is returned to the caller.  This helps avoid
    // race conditions.
478 479 480 481 482
    //
    // So, we do an evalLater() here.
    //
    // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't
    // complete before 'whenMoreResolved()' promises resolve.
483 484
    auto promise = kj::evalLater([this,interfaceId,methodId,contextPtr]() {
      return server->dispatchCall(interfaceId, methodId,
485
                                  CallContext<AnyPointer, AnyPointer>(*contextPtr));
486
    }).attach(kj::addRef(*this));
487

488
    // We have to fork this promise for the pipeline to receive a copy of the answer.
489
    auto forked = promise.fork();
490

491 492
    auto pipelinePromise = forked.addBranch().then(kj::mvCapture(context->addRef(),
        [=](kj::Own<CallContextHook>&& context) -> kj::Own<PipelineHook> {
493 494 495
          context->releaseParams();
          return kj::refcounted<LocalPipeline>(kj::mv(context));
        }));
496

497
    auto tailPipelinePromise = context->onTailCall().then([](AnyPointer::Pipeline&& pipeline) {
498 499 500
      return kj::mv(pipeline.hook);
    });

501
    pipelinePromise = pipelinePromise.exclusiveJoin(kj::mv(tailPipelinePromise));
502

503
    auto completionPromise = forked.addBranch().attach(kj::mv(context));
Kenton Varda's avatar
Kenton Varda committed
504

505
    return VoidPromiseAndPipeline { kj::mv(completionPromise),
506
        kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise)) };
Kenton Varda's avatar
Kenton Varda committed
507 508
  }

509
  kj::Maybe<ClientHook&> getResolved() override {
510 511 512
    return nullptr;
  }

513
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
514 515 516
    return nullptr;
  }

517
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
518 519 520
    return kj::addRef(*this);
  }

521
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
522 523 524 525 526
    // We have no need to detect local objects.
    return nullptr;
  }

private:
527
  kj::Own<Capability::Server> server;
Kenton Varda's avatar
Kenton Varda committed
528 529
};

530 531
kj::Own<ClientHook> Capability::Client::makeLocalClient(kj::Own<Capability::Server>&& server) {
  return kj::refcounted<LocalClient>(kj::mv(server));
Kenton Varda's avatar
Kenton Varda committed
532 533
}

534 535
kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& promise) {
  return kj::refcounted<QueuedClient>(kj::mv(promise));
Kenton Varda's avatar
Kenton Varda committed
536 537
}

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
// =======================================================================================

namespace {

class BrokenPipeline final: public PipelineHook, public kj::Refcounted {
public:
  BrokenPipeline(const kj::Exception& exception): exception(exception) {}

  kj::Own<PipelineHook> addRef() override {
    return kj::addRef(*this);
  }

  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override;

private:
  kj::Exception exception;
};

class BrokenRequest final: public RequestHook {
public:
  BrokenRequest(const kj::Exception& exception, kj::Maybe<MessageSize> sizeHint)
      : exception(exception), message(firstSegmentSize(sizeHint)) {}

  RemotePromise<AnyPointer> send() override {
    return RemotePromise<AnyPointer>(kj::cp(exception),
        AnyPointer::Pipeline(kj::refcounted<BrokenPipeline>(exception)));
  }

  const void* getBrand() {
    return nullptr;
  }

  kj::Exception exception;
  MallocMessageBuilder message;
};

class BrokenClient final: public ClientHook, public kj::Refcounted {
public:
  BrokenClient(const kj::Exception& exception): exception(exception) {}
  BrokenClient(const kj::StringPtr description)
578
      : exception(kj::Exception::Type::FAILED, "", 0, kj::str(description)) {}
579 580 581

  Request<AnyPointer, AnyPointer> newCall(
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
582
    return newBrokenRequest(kj::cp(exception), sizeHint);
583 584 585 586
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                              kj::Own<CallContextHook>&& context) override {
587
    return VoidPromiseAndPipeline { kj::cp(exception), kj::refcounted<BrokenPipeline>(exception) };
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627
  }

  kj::Maybe<ClientHook&> getResolved() {
    return nullptr;
  }

  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
    return kj::Promise<kj::Own<ClientHook>>(kj::cp(exception));
  }

  kj::Own<ClientHook> addRef() override {
    return kj::addRef(*this);
  }

  const void* getBrand() override {
    return nullptr;
  }

private:
  kj::Exception exception;
};

kj::Own<ClientHook> BrokenPipeline::getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
  return kj::refcounted<BrokenClient>(exception);
}

}  // namespace

kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason) {
  return kj::refcounted<BrokenClient>(reason);
}

kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason) {
  return kj::refcounted<BrokenClient>(kj::mv(reason));
}

kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason) {
  return kj::refcounted<BrokenPipeline>(kj::mv(reason));
}

628 629 630 631 632 633 634
Request<AnyPointer, AnyPointer> newBrokenRequest(
    kj::Exception&& reason, kj::Maybe<MessageSize> sizeHint) {
  auto hook = kj::heap<BrokenRequest>(kj::mv(reason), sizeHint);
  auto root = hook->message.getRoot<AnyPointer>();
  return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
}

635
}  // namespace capnp