capability.c++ 22.7 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21

22 23
#define CAPNP_PRIVATE

24
#include "capability.h"
Kenton Varda's avatar
Kenton Varda committed
25
#include "message.h"
26
#include "arena.h"
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/refcount.h>
28
#include <kj/debug.h>
Kenton Varda's avatar
Kenton Varda committed
29
#include <kj/vector.h>
30
#include <map>
31 32 33

namespace capnp {

34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
namespace _ {

void setGlobalBrokenCapFactoryForLayoutCpp(BrokenCapFactory& factory);
// Defined in layout.c++.

}  // namespace _

namespace {

class BrokenCapFactoryImpl: public _::BrokenCapFactory {
public:
  kj::Own<ClientHook> newBrokenCap(kj::StringPtr description) override {
    return capnp::newBrokenCap(description);
  }
};

static BrokenCapFactoryImpl brokenCapFactory;

}  // namespace

ClientHook::ClientHook() {
  setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}

void MessageReader::initCapTable(kj::Array<kj::Maybe<kj::Own<ClientHook>>> capTable) {
  setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
  arena()->initCapTable(kj::mv(capTable));
}

// =======================================================================================

65 66 67
Capability::Client::Client(decltype(nullptr))
    : hook(newBrokenCap("Called null capability.")) {}

68 69 70
Capability::Client::Client(kj::Exception&& exception)
    : hook(newBrokenCap(kj::mv(exception))) {}

71 72 73 74
kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* actualInterfaceName, uint64_t requestedTypeId) {
  KJ_FAIL_REQUIRE("Requested interface not implemented.", actualInterfaceName, requestedTypeId) {
    // Recoverable exception will be caught by promise framework.
75 76 77 78 79 80

    // We can't "return kj::READY_NOW;" inside this block because it causes a memory leak due to
    // a bug that exists in both Clang and GCC:
    //   http://gcc.gnu.org/bugzilla/show_bug.cgi?id=33799
    //   http://llvm.org/bugs/show_bug.cgi?id=12286
    break;
81
  }
82
  return kj::READY_NOW;
83 84 85 86 87 88
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, uint64_t typeId, uint16_t methodId) {
  KJ_FAIL_REQUIRE("Method not implemented.", interfaceName, typeId, methodId) {
    // Recoverable exception will be caught by promise framework.
89
    break;
90
  }
91
  return kj::READY_NOW;
92 93 94 95 96 97
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, const char* methodName, uint64_t typeId, uint16_t methodId) {
  KJ_FAIL_REQUIRE("Method not implemented.", interfaceName, typeId, methodName, methodId) {
    // Recoverable exception will be caught by promise framework.
98
    break;
99
  }
100
  return kj::READY_NOW;
101 102
}

Kenton Varda's avatar
Kenton Varda committed
103 104
ResponseHook::~ResponseHook() noexcept(false) {}

105
kj::Promise<void> ClientHook::whenResolved() {
106
  KJ_IF_MAYBE(promise, whenMoreResolved()) {
107
    return promise->then([](kj::Own<ClientHook>&& resolution) {
108 109 110 111 112 113
      return resolution->whenResolved();
    });
  } else {
    return kj::READY_NOW;
  }
}
Kenton Varda's avatar
Kenton Varda committed
114

115
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
116

117 118 119 120 121 122 123 124
static inline uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint) {
  KJ_IF_MAYBE(s, sizeHint) {
    return s->wordCount;
  } else {
    return SUGGESTED_FIRST_SEGMENT_WORDS;
  }
}

125
class LocalResponse final: public ResponseHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
126
public:
127
  LocalResponse(kj::Maybe<MessageSize> sizeHint)
128
      : message(firstSegmentSize(sizeHint)) {}
Kenton Varda's avatar
Kenton Varda committed
129

130
  MallocMessageBuilder message;
Kenton Varda's avatar
Kenton Varda committed
131 132
};

133
class LocalCallContext final: public CallContextHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
134
public:
135
  LocalCallContext(kj::Own<MallocMessageBuilder>&& request, kj::Own<ClientHook> clientRef,
136 137 138
                   kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller)
      : request(kj::mv(request)), clientRef(kj::mv(clientRef)),
        cancelAllowedFulfiller(kj::mv(cancelAllowedFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
139

140
  AnyPointer::Reader getParams() override {
141
    KJ_IF_MAYBE(r, request) {
142
      return r->get()->getRoot<AnyPointer>();
143 144 145
    } else {
      KJ_FAIL_REQUIRE("Can't call getParams() after releaseParams().");
    }
Kenton Varda's avatar
Kenton Varda committed
146 147 148 149
  }
  void releaseParams() override {
    request = nullptr;
  }
150
  AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
151
    if (response == nullptr) {
152
      auto localResponse = kj::refcounted<LocalResponse>(sizeHint);
153
      responseBuilder = localResponse->message.getRoot<AnyPointer>();
154
      response = Response<AnyPointer>(responseBuilder.asReader(), kj::mv(localResponse));
Kenton Varda's avatar
Kenton Varda committed
155
    }
156 157
    return responseBuilder;
  }
158 159 160
  kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
    auto result = directTailCall(kj::mv(request));
    KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
161
      f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
162 163 164 165
    }
    return kj::mv(result.promise);
  }
  ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
166 167 168 169
    KJ_REQUIRE(response == nullptr, "Can't call tailCall() after initializing the results struct.");

    auto promise = request->send();

170
    auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
171 172
      response = kj::mv(tailResponse);
    });
173 174

    return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
175
  }
176 177
  kj::Promise<AnyPointer::Pipeline> onTailCall() override {
    auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
178 179
    tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
    return kj::mv(paf.promise);
Kenton Varda's avatar
Kenton Varda committed
180
  }
181
  void allowCancellation() override {
182
    cancelAllowedFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
183
  }
184 185
  kj::Own<CallContextHook> addRef() override {
    return kj::addRef(*this);
186
  }
Kenton Varda's avatar
Kenton Varda committed
187

188
  kj::Maybe<kj::Own<MallocMessageBuilder>> request;
189 190
  kj::Maybe<Response<AnyPointer>> response;
  AnyPointer::Builder responseBuilder = nullptr;  // only valid if `response` is non-null
191
  kj::Own<ClientHook> clientRef;
192
  kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
193
  kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller;
Kenton Varda's avatar
Kenton Varda committed
194 195
};

196
class LocalRequest final: public RequestHook {
Kenton Varda's avatar
Kenton Varda committed
197
public:
198
  inline LocalRequest(uint64_t interfaceId, uint16_t methodId,
199
                      kj::Maybe<MessageSize> sizeHint, kj::Own<ClientHook> client)
200
      : message(kj::heap<MallocMessageBuilder>(firstSegmentSize(sizeHint))),
201 202
        interfaceId(interfaceId), methodId(methodId), client(kj::mv(client)) {}

203
  RemotePromise<AnyPointer> send() override {
204 205
    KJ_REQUIRE(message.get() != nullptr, "Already called send() on this request.");

206 207 208 209
    // For the lambda capture.
    uint64_t interfaceId = this->interfaceId;
    uint16_t methodId = this->methodId;

210 211 212 213
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

    auto context = kj::refcounted<LocalCallContext>(
        kj::mv(message), client->addRef(), kj::mv(cancelPaf.fulfiller));
214
    auto promiseAndPipeline = client->call(interfaceId, methodId, kj::addRef(*context));
215

216 217
    // We have to make sure the call is not canceled unless permitted.  We need to fork the promise
    // so that if the client drops their copy, the promise isn't necessarily canceled.
218
    auto forked = promiseAndPipeline.promise.fork();
219 220 221

    // We daemonize one branch, but only after joining it with the promise that fires if
    // cancellation is allowed.
222 223 224 225
    forked.addBranch()
        .attach(kj::addRef(*context))
        .exclusiveJoin(kj::mv(cancelPaf.promise))
        .detach([](kj::Exception&&) {});  // ignore exceptions
226 227

    // Now the other branch returns the response from the context.
228 229
    auto promise = forked.addBranch().then(kj::mvCapture(context,
        [](kj::Own<LocalCallContext>&& context) {
230
      context->getResults(MessageSize { 0, 0 });  // force response allocation
231 232
      return kj::mv(KJ_ASSERT_NONNULL(context->response));
    }));
233

234
    // We return the other branch.
235 236
    return RemotePromise<AnyPointer>(
        kj::mv(promise), AnyPointer::Pipeline(kj::mv(promiseAndPipeline.pipeline)));
237 238
  }

239
  const void* getBrand() override {
240 241 242
    return nullptr;
  }

243
  kj::Own<MallocMessageBuilder> message;
244 245 246 247

private:
  uint64_t interfaceId;
  uint16_t methodId;
248
  kj::Own<ClientHook> client;
249 250 251 252 253 254 255 256 257 258 259 260 261
};

// =======================================================================================
// Call queues
//
// These classes handle pipelining in the case where calls need to be queued in-memory until some
// local operation completes.

class QueuedPipeline final: public PipelineHook, public kj::Refcounted {
  // A PipelineHook which simply queues calls while waiting for a PipelineHook to which to forward
  // them.

public:
262 263
  QueuedPipeline(kj::Promise<kj::Own<PipelineHook>>&& promiseParam)
      : promise(promiseParam.fork()),
264 265 266 267 268
        selfResolutionOp(promise.addBranch().then([this](kj::Own<PipelineHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenPipeline(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)) {}
269

270
  kj::Own<PipelineHook> addRef() override {
271 272 273
    return kj::addRef(*this);
  }

274
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
275 276 277 278
    auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
    for (auto& op: ops) {
      copy.add(op);
    }
279
    return getPipelinedCap(copy.finish());
280 281
  }

282
  kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override;
283 284

private:
285
  kj::ForkedPromise<kj::Own<PipelineHook>> promise;
Kenton Varda's avatar
Kenton Varda committed
286

Kenton Varda's avatar
Kenton Varda committed
287
  kj::Maybe<kj::Own<PipelineHook>> redirect;
Kenton Varda's avatar
Kenton Varda committed
288 289 290 291
  // Once the promise resolves, this will become non-null and point to the underlying object.

  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.
292 293 294 295 296 297 298
};

class QueuedClient final: public ClientHook, public kj::Refcounted {
  // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward
  // them.

public:
299 300
  QueuedClient(kj::Promise<kj::Own<ClientHook>>&& promiseParam)
      : promise(promiseParam.fork()),
301 302 303 304 305
        selfResolutionOp(promise.addBranch().then([this](kj::Own<ClientHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenCap(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)),
Kenton Varda's avatar
Kenton Varda committed
306
        promiseForCallForwarding(promise.addBranch().fork()),
307
        promiseForClientResolution(promise.addBranch().fork()) {}
Kenton Varda's avatar
Kenton Varda committed
308

309
  Request<AnyPointer, AnyPointer> newCall(
310
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
311
    auto hook = kj::heap<LocalRequest>(
312
        interfaceId, methodId, sizeHint, kj::addRef(*this));
313
    auto root = hook->message->getRoot<AnyPointer>();
314
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
315 316 317
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
318
                              kj::Own<CallContextHook>&& context) override {
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335
    // This is a bit complicated.  We need to initiate this call later on.  When we initiate the
    // call, we'll get a void promise for its completion and a pipeline object.  Right now, we have
    // to produce a similar void promise and pipeline that will eventually be chained to those.
    // The problem is, these are two independent objects, but they both depend on the result of
    // one future call.
    //
    // So, we need to set up a continuation that will initiate the call later, then we need to
    // fork the promise for that continuation in order to send the completion promise and the
    // pipeline to their respective places.
    //
    // TODO(perf):  Too much reference counting?  Can we do better?  Maybe a way to fork
    //   Promise<Tuple<T, U>> into Tuple<Promise<T>, Promise<U>>?

    struct CallResultHolder: public kj::Refcounted {
      // Essentially acts as a refcounted \VoidPromiseAndPipeline, so that we can create a promise
      // for it and fork that promise.

336
      VoidPromiseAndPipeline content;
337
      // One branch of the fork will use content.promise, the other branch will use
338
      // content.pipeline.  Neither branch will touch the other's piece.
339 340 341

      inline CallResultHolder(VoidPromiseAndPipeline&& content): content(kj::mv(content)) {}

342
      kj::Own<CallResultHolder> addRef() { return kj::addRef(*this); }
343 344 345
    };

    // Create a promise for the call initiation.
346
    kj::ForkedPromise<kj::Own<CallResultHolder>> callResultPromise =
Kenton Varda's avatar
Kenton Varda committed
347
        promiseForCallForwarding.addBranch().then(kj::mvCapture(context,
348
        [=](kj::Own<CallContextHook>&& context, kj::Own<ClientHook>&& client){
349 350
          return kj::refcounted<CallResultHolder>(
              client->call(interfaceId, methodId, kj::mv(context)));
351
        })).fork();
352 353 354

    // Create a promise that extracts the pipeline from the call initiation, and construct our
    // QueuedPipeline to chain to it.
355 356
    auto pipelinePromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
357 358
          return kj::mv(callResult->content.pipeline);
        });
359
    auto pipeline = kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise));
360 361

    // Create a promise that simply chains to the void promise produced by the call initiation.
362 363
    auto completionPromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
364 365 366 367 368
          return kj::mv(callResult->content.promise);
        });

    // OK, now we can actually return our thing.
    return VoidPromiseAndPipeline { kj::mv(completionPromise), kj::mv(pipeline) };
Kenton Varda's avatar
Kenton Varda committed
369 370
  }

371
  kj::Maybe<ClientHook&> getResolved() override {
Kenton Varda's avatar
Kenton Varda committed
372
    KJ_IF_MAYBE(inner, redirect) {
373 374 375 376 377 378
      return **inner;
    } else {
      return nullptr;
    }
  }

379
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
380
    return promiseForClientResolution.addBranch();
Kenton Varda's avatar
Kenton Varda committed
381 382
  }

383
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
384 385 386
    return kj::addRef(*this);
  }

387
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
388 389 390 391
    return nullptr;
  }

private:
392
  typedef kj::ForkedPromise<kj::Own<ClientHook>> ClientHookPromiseFork;
393

Kenton Varda's avatar
Kenton Varda committed
394 395 396
  kj::Maybe<kj::Own<ClientHook>> redirect;
  // Once the promise resolves, this will become non-null and point to the underlying object.

397 398 399
  ClientHookPromiseFork promise;
  // Promise that resolves when we have a new ClientHook to forward to.
  //
Kenton Varda's avatar
Kenton Varda committed
400
  // This fork shall only have three branches:  `selfResolutionOp`, `promiseForCallForwarding`, and
401 402
  // `promiseForClientResolution`, in that order.

Kenton Varda's avatar
Kenton Varda committed
403 404 405 406
  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.

  ClientHookPromiseFork promiseForCallForwarding;
407 408 409 410
  // When this promise resolves, each queued call will be forwarded to the real client.  This needs
  // to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure
  // previously-queued calls are delivered before any new calls made in response to the resolution.

Kenton Varda's avatar
Kenton Varda committed
411
  ClientHookPromiseFork promiseForClientResolution;
412 413 414 415 416 417
  // whenMoreResolved() returns forks of this promise.  These must resolve *after* queued calls
  // have been initiated (so that any calls made in the whenMoreResolved() handler are correctly
  // delivered after calls made earlier), but *before* any queued calls return (because it might
  // confuse the application if a queued call returns before the capability on which it was made
  // resolves).  Luckily, we know that queued calls will involve, at the very least, an
  // eventLoop.evalLater.
Kenton Varda's avatar
Kenton Varda committed
418 419
};

420
kj::Own<ClientHook> QueuedPipeline::getPipelinedCap(kj::Array<PipelineOp>&& ops) {
Kenton Varda's avatar
Kenton Varda committed
421 422
  KJ_IF_MAYBE(r, redirect) {
    return r->get()->getPipelinedCap(kj::mv(ops));
Kenton Varda's avatar
Kenton Varda committed
423
  } else {
424 425
    auto clientPromise = promise.addBranch().then(kj::mvCapture(ops,
        [](kj::Array<PipelineOp>&& ops, kj::Own<PipelineHook> pipeline) {
Kenton Varda's avatar
Kenton Varda committed
426 427 428
          return pipeline->getPipelinedCap(kj::mv(ops));
        }));

429
    return kj::refcounted<QueuedClient>(kj::mv(clientPromise));
Kenton Varda's avatar
Kenton Varda committed
430
  }
431
}
Kenton Varda's avatar
Kenton Varda committed
432

433
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
434

435
class LocalPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
436
public:
437 438
  inline LocalPipeline(kj::Own<CallContextHook>&& contextParam)
      : context(kj::mv(contextParam)),
439
        results(context->getResults(MessageSize { 0, 0 })) {}
Kenton Varda's avatar
Kenton Varda committed
440

441
  kj::Own<PipelineHook> addRef() {
442
    return kj::addRef(*this);
Kenton Varda's avatar
Kenton Varda committed
443 444
  }

445
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
446
    return results.getPipelinedCap(ops);
447
  }
Kenton Varda's avatar
Kenton Varda committed
448 449

private:
450
  kj::Own<CallContextHook> context;
451
  AnyPointer::Reader results;
Kenton Varda's avatar
Kenton Varda committed
452 453 454 455
};

class LocalClient final: public ClientHook, public kj::Refcounted {
public:
456 457
  LocalClient(kj::Own<Capability::Server>&& server)
      : server(kj::mv(server)) {}
Kenton Varda's avatar
Kenton Varda committed
458

459
  Request<AnyPointer, AnyPointer> newCall(
460
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
Kenton Varda's avatar
Kenton Varda committed
461
    auto hook = kj::heap<LocalRequest>(
462
        interfaceId, methodId, sizeHint, kj::addRef(*this));
463
    auto root = hook->message->getRoot<AnyPointer>();
464
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
465 466 467
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
468
                              kj::Own<CallContextHook>&& context) override {
469 470
    auto contextPtr = context.get();

471 472 473
    // We don't want to actually dispatch the call synchronously, because we don't want the callee
    // to have any side effects before the promise is returned to the caller.  This helps avoid
    // race conditions.
474 475 476 477 478
    //
    // So, we do an evalLater() here.
    //
    // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't
    // complete before 'whenMoreResolved()' promises resolve.
479 480
    auto promise = kj::evalLater([this,interfaceId,methodId,contextPtr]() {
      return server->dispatchCall(interfaceId, methodId,
481
                                  CallContext<AnyPointer, AnyPointer>(*contextPtr));
482
    }).attach(kj::addRef(*this));
483

484
    // We have to fork this promise for the pipeline to receive a copy of the answer.
485
    auto forked = promise.fork();
486

487 488
    auto pipelinePromise = forked.addBranch().then(kj::mvCapture(context->addRef(),
        [=](kj::Own<CallContextHook>&& context) -> kj::Own<PipelineHook> {
489 490 491
          context->releaseParams();
          return kj::refcounted<LocalPipeline>(kj::mv(context));
        }));
492

493
    auto tailPipelinePromise = context->onTailCall().then([](AnyPointer::Pipeline&& pipeline) {
494 495 496
      return kj::mv(pipeline.hook);
    });

497
    pipelinePromise = pipelinePromise.exclusiveJoin(kj::mv(tailPipelinePromise));
498

499
    auto completionPromise = forked.addBranch().attach(kj::mv(context));
Kenton Varda's avatar
Kenton Varda committed
500

501
    return VoidPromiseAndPipeline { kj::mv(completionPromise),
502
        kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise)) };
Kenton Varda's avatar
Kenton Varda committed
503 504
  }

505
  kj::Maybe<ClientHook&> getResolved() override {
506 507 508
    return nullptr;
  }

509
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
510 511 512
    return nullptr;
  }

513
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
514 515 516
    return kj::addRef(*this);
  }

517
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
518 519 520 521 522
    // We have no need to detect local objects.
    return nullptr;
  }

private:
523
  kj::Own<Capability::Server> server;
Kenton Varda's avatar
Kenton Varda committed
524 525
};

526 527
kj::Own<ClientHook> Capability::Client::makeLocalClient(kj::Own<Capability::Server>&& server) {
  return kj::refcounted<LocalClient>(kj::mv(server));
Kenton Varda's avatar
Kenton Varda committed
528 529
}

530 531
kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& promise) {
  return kj::refcounted<QueuedClient>(kj::mv(promise));
Kenton Varda's avatar
Kenton Varda committed
532 533
}

534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
// =======================================================================================

namespace {

class BrokenPipeline final: public PipelineHook, public kj::Refcounted {
public:
  BrokenPipeline(const kj::Exception& exception): exception(exception) {}

  kj::Own<PipelineHook> addRef() override {
    return kj::addRef(*this);
  }

  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override;

private:
  kj::Exception exception;
};

class BrokenRequest final: public RequestHook {
public:
  BrokenRequest(const kj::Exception& exception, kj::Maybe<MessageSize> sizeHint)
      : exception(exception), message(firstSegmentSize(sizeHint)) {}

  RemotePromise<AnyPointer> send() override {
    return RemotePromise<AnyPointer>(kj::cp(exception),
        AnyPointer::Pipeline(kj::refcounted<BrokenPipeline>(exception)));
  }

  const void* getBrand() {
    return nullptr;
  }

  kj::Exception exception;
  MallocMessageBuilder message;
};

class BrokenClient final: public ClientHook, public kj::Refcounted {
public:
  BrokenClient(const kj::Exception& exception): exception(exception) {}
  BrokenClient(const kj::StringPtr description)
      : exception(kj::Exception::Nature::PRECONDITION, kj::Exception::Durability::PERMANENT,
                  "", 0, kj::str(description)) {}

  Request<AnyPointer, AnyPointer> newCall(
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
579
    return newBrokenRequest(kj::cp(exception), sizeHint);
580 581 582 583
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                              kj::Own<CallContextHook>&& context) override {
584
    return VoidPromiseAndPipeline { kj::cp(exception), kj::refcounted<BrokenPipeline>(exception) };
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624
  }

  kj::Maybe<ClientHook&> getResolved() {
    return nullptr;
  }

  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
    return kj::Promise<kj::Own<ClientHook>>(kj::cp(exception));
  }

  kj::Own<ClientHook> addRef() override {
    return kj::addRef(*this);
  }

  const void* getBrand() override {
    return nullptr;
  }

private:
  kj::Exception exception;
};

kj::Own<ClientHook> BrokenPipeline::getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
  return kj::refcounted<BrokenClient>(exception);
}

}  // namespace

kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason) {
  return kj::refcounted<BrokenClient>(reason);
}

kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason) {
  return kj::refcounted<BrokenClient>(kj::mv(reason));
}

kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason) {
  return kj::refcounted<BrokenPipeline>(kj::mv(reason));
}

625 626 627 628 629 630 631
Request<AnyPointer, AnyPointer> newBrokenRequest(
    kj::Exception&& reason, kj::Maybe<MessageSize> sizeHint) {
  auto hook = kj::heap<BrokenRequest>(kj::mv(reason), sizeHint);
  auto root = hook->message.getRoot<AnyPointer>();
  return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
}

632
}  // namespace capnp