capability.c++ 25.5 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21

22 23
#define CAPNP_PRIVATE

24
#include "capability.h"
Kenton Varda's avatar
Kenton Varda committed
25
#include "message.h"
26
#include "arena.h"
Kenton Varda's avatar
Kenton Varda committed
27
#include <kj/refcount.h>
28
#include <kj/debug.h>
Kenton Varda's avatar
Kenton Varda committed
29
#include <kj/vector.h>
30
#include <map>
31
#include "generated-header-support.h"
32 33 34

namespace capnp {

35 36 37 38 39 40 41 42 43
namespace _ {

void setGlobalBrokenCapFactoryForLayoutCpp(BrokenCapFactory& factory);
// Defined in layout.c++.

}  // namespace _

namespace {

44 45
static kj::Own<ClientHook> newNullCap();

46 47 48 49 50
class BrokenCapFactoryImpl: public _::BrokenCapFactory {
public:
  kj::Own<ClientHook> newBrokenCap(kj::StringPtr description) override {
    return capnp::newBrokenCap(description);
  }
51 52 53
  kj::Own<ClientHook> newNullCap() override {
    return capnp::newNullCap();
  }
54 55 56 57 58 59 60 61 62 63 64 65
};

static BrokenCapFactoryImpl brokenCapFactory;

}  // namespace

ClientHook::ClientHook() {
  setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}

// =======================================================================================

66
Capability::Client::Client(decltype(nullptr))
67
    : hook(newNullCap()) {}
68

69 70 71
Capability::Client::Client(kj::Exception&& exception)
    : hook(newBrokenCap(kj::mv(exception))) {}

72 73
kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* actualInterfaceName, uint64_t requestedTypeId) {
74 75
  return KJ_EXCEPTION(UNIMPLEMENTED, "Requested interface not implemented.",
                      actualInterfaceName, requestedTypeId);
76 77 78 79
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, uint64_t typeId, uint16_t methodId) {
80
  return KJ_EXCEPTION(UNIMPLEMENTED, "Method not implemented.", interfaceName, typeId, methodId);
81 82 83 84
}

kj::Promise<void> Capability::Server::internalUnimplemented(
    const char* interfaceName, const char* methodName, uint64_t typeId, uint16_t methodId) {
85 86
  return KJ_EXCEPTION(UNIMPLEMENTED, "Method not implemented.", interfaceName,
                      typeId, methodName, methodId);
87 88
}

Kenton Varda's avatar
Kenton Varda committed
89 90
ResponseHook::~ResponseHook() noexcept(false) {}

91
kj::Promise<void> ClientHook::whenResolved() {
92
  KJ_IF_MAYBE(promise, whenMoreResolved()) {
93
    return promise->then([](kj::Own<ClientHook>&& resolution) {
94 95 96 97 98 99
      return resolution->whenResolved();
    });
  } else {
    return kj::READY_NOW;
  }
}
Kenton Varda's avatar
Kenton Varda committed
100

101
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
102

103 104 105 106 107 108 109 110
static inline uint firstSegmentSize(kj::Maybe<MessageSize> sizeHint) {
  KJ_IF_MAYBE(s, sizeHint) {
    return s->wordCount;
  } else {
    return SUGGESTED_FIRST_SEGMENT_WORDS;
  }
}

111
class LocalResponse final: public ResponseHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
112
public:
113
  LocalResponse(kj::Maybe<MessageSize> sizeHint)
114
      : message(firstSegmentSize(sizeHint)) {}
Kenton Varda's avatar
Kenton Varda committed
115

116
  MallocMessageBuilder message;
Kenton Varda's avatar
Kenton Varda committed
117 118
};

119
class LocalCallContext final: public CallContextHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
120
public:
121
  LocalCallContext(kj::Own<MallocMessageBuilder>&& request, kj::Own<ClientHook> clientRef,
122 123 124
                   kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller)
      : request(kj::mv(request)), clientRef(kj::mv(clientRef)),
        cancelAllowedFulfiller(kj::mv(cancelAllowedFulfiller)) {}
Kenton Varda's avatar
Kenton Varda committed
125

126
  AnyPointer::Reader getParams() override {
127
    KJ_IF_MAYBE(r, request) {
128
      return r->get()->getRoot<AnyPointer>();
129 130 131
    } else {
      KJ_FAIL_REQUIRE("Can't call getParams() after releaseParams().");
    }
Kenton Varda's avatar
Kenton Varda committed
132 133 134 135
  }
  void releaseParams() override {
    request = nullptr;
  }
136
  AnyPointer::Builder getResults(kj::Maybe<MessageSize> sizeHint) override {
137
    if (response == nullptr) {
138
      auto localResponse = kj::refcounted<LocalResponse>(sizeHint);
139
      responseBuilder = localResponse->message.getRoot<AnyPointer>();
140
      response = Response<AnyPointer>(responseBuilder.asReader(), kj::mv(localResponse));
Kenton Varda's avatar
Kenton Varda committed
141
    }
142 143
    return responseBuilder;
  }
144 145 146
  kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
    auto result = directTailCall(kj::mv(request));
    KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
147
      f->get()->fulfill(AnyPointer::Pipeline(kj::mv(result.pipeline)));
148 149 150 151
    }
    return kj::mv(result.promise);
  }
  ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
152 153 154 155
    KJ_REQUIRE(response == nullptr, "Can't call tailCall() after initializing the results struct.");

    auto promise = request->send();

156
    auto voidPromise = promise.then([this](Response<AnyPointer>&& tailResponse) {
157 158
      response = kj::mv(tailResponse);
    });
159 160

    return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
161
  }
162 163
  kj::Promise<AnyPointer::Pipeline> onTailCall() override {
    auto paf = kj::newPromiseAndFulfiller<AnyPointer::Pipeline>();
164 165
    tailCallPipelineFulfiller = kj::mv(paf.fulfiller);
    return kj::mv(paf.promise);
Kenton Varda's avatar
Kenton Varda committed
166
  }
167
  void allowCancellation() override {
168
    cancelAllowedFulfiller->fulfill();
Kenton Varda's avatar
Kenton Varda committed
169
  }
170 171
  kj::Own<CallContextHook> addRef() override {
    return kj::addRef(*this);
172
  }
Kenton Varda's avatar
Kenton Varda committed
173

174
  kj::Maybe<kj::Own<MallocMessageBuilder>> request;
175 176
  kj::Maybe<Response<AnyPointer>> response;
  AnyPointer::Builder responseBuilder = nullptr;  // only valid if `response` is non-null
177
  kj::Own<ClientHook> clientRef;
178
  kj::Maybe<kj::Own<kj::PromiseFulfiller<AnyPointer::Pipeline>>> tailCallPipelineFulfiller;
179
  kj::Own<kj::PromiseFulfiller<void>> cancelAllowedFulfiller;
Kenton Varda's avatar
Kenton Varda committed
180 181
};

182
class LocalRequest final: public RequestHook {
Kenton Varda's avatar
Kenton Varda committed
183
public:
184
  inline LocalRequest(uint64_t interfaceId, uint16_t methodId,
185
                      kj::Maybe<MessageSize> sizeHint, kj::Own<ClientHook> client)
186
      : message(kj::heap<MallocMessageBuilder>(firstSegmentSize(sizeHint))),
187 188
        interfaceId(interfaceId), methodId(methodId), client(kj::mv(client)) {}

189
  RemotePromise<AnyPointer> send() override {
190 191
    KJ_REQUIRE(message.get() != nullptr, "Already called send() on this request.");

192 193 194 195
    // For the lambda capture.
    uint64_t interfaceId = this->interfaceId;
    uint16_t methodId = this->methodId;

196 197 198 199
    auto cancelPaf = kj::newPromiseAndFulfiller<void>();

    auto context = kj::refcounted<LocalCallContext>(
        kj::mv(message), client->addRef(), kj::mv(cancelPaf.fulfiller));
200
    auto promiseAndPipeline = client->call(interfaceId, methodId, kj::addRef(*context));
201

202 203
    // We have to make sure the call is not canceled unless permitted.  We need to fork the promise
    // so that if the client drops their copy, the promise isn't necessarily canceled.
204
    auto forked = promiseAndPipeline.promise.fork();
205 206 207

    // We daemonize one branch, but only after joining it with the promise that fires if
    // cancellation is allowed.
208 209 210 211
    forked.addBranch()
        .attach(kj::addRef(*context))
        .exclusiveJoin(kj::mv(cancelPaf.promise))
        .detach([](kj::Exception&&) {});  // ignore exceptions
212 213

    // Now the other branch returns the response from the context.
214 215
    auto promise = forked.addBranch().then(kj::mvCapture(context,
        [](kj::Own<LocalCallContext>&& context) {
216
      context->getResults(MessageSize { 0, 0 });  // force response allocation
217 218
      return kj::mv(KJ_ASSERT_NONNULL(context->response));
    }));
219

220
    // We return the other branch.
221 222
    return RemotePromise<AnyPointer>(
        kj::mv(promise), AnyPointer::Pipeline(kj::mv(promiseAndPipeline.pipeline)));
223 224
  }

225
  const void* getBrand() override {
226 227 228
    return nullptr;
  }

229
  kj::Own<MallocMessageBuilder> message;
230 231 232 233

private:
  uint64_t interfaceId;
  uint16_t methodId;
234
  kj::Own<ClientHook> client;
235 236 237 238 239 240 241 242 243 244 245 246 247
};

// =======================================================================================
// Call queues
//
// These classes handle pipelining in the case where calls need to be queued in-memory until some
// local operation completes.

class QueuedPipeline final: public PipelineHook, public kj::Refcounted {
  // A PipelineHook which simply queues calls while waiting for a PipelineHook to which to forward
  // them.

public:
248 249
  QueuedPipeline(kj::Promise<kj::Own<PipelineHook>>&& promiseParam)
      : promise(promiseParam.fork()),
250 251 252 253 254
        selfResolutionOp(promise.addBranch().then([this](kj::Own<PipelineHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenPipeline(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)) {}
255

256
  kj::Own<PipelineHook> addRef() override {
257 258 259
    return kj::addRef(*this);
  }

260
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override {
261 262 263 264
    auto copy = kj::heapArrayBuilder<PipelineOp>(ops.size());
    for (auto& op: ops) {
      copy.add(op);
    }
265
    return getPipelinedCap(copy.finish());
266 267
  }

268
  kj::Own<ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) override;
269 270

private:
271
  kj::ForkedPromise<kj::Own<PipelineHook>> promise;
Kenton Varda's avatar
Kenton Varda committed
272

Kenton Varda's avatar
Kenton Varda committed
273
  kj::Maybe<kj::Own<PipelineHook>> redirect;
Kenton Varda's avatar
Kenton Varda committed
274 275 276 277
  // Once the promise resolves, this will become non-null and point to the underlying object.

  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.
278 279 280 281 282 283 284
};

class QueuedClient final: public ClientHook, public kj::Refcounted {
  // A ClientHook which simply queues calls while waiting for a ClientHook to which to forward
  // them.

public:
285 286
  QueuedClient(kj::Promise<kj::Own<ClientHook>>&& promiseParam)
      : promise(promiseParam.fork()),
287 288 289 290 291
        selfResolutionOp(promise.addBranch().then([this](kj::Own<ClientHook>&& inner) {
          redirect = kj::mv(inner);
        }, [this](kj::Exception&& exception) {
          redirect = newBrokenCap(kj::mv(exception));
        }).eagerlyEvaluate(nullptr)),
Kenton Varda's avatar
Kenton Varda committed
292
        promiseForCallForwarding(promise.addBranch().fork()),
293
        promiseForClientResolution(promise.addBranch().fork()) {}
Kenton Varda's avatar
Kenton Varda committed
294

295
  Request<AnyPointer, AnyPointer> newCall(
296
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
297
    auto hook = kj::heap<LocalRequest>(
298
        interfaceId, methodId, sizeHint, kj::addRef(*this));
299
    auto root = hook->message->getRoot<AnyPointer>();
300
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
301 302 303
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
304
                              kj::Own<CallContextHook>&& context) override {
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321
    // This is a bit complicated.  We need to initiate this call later on.  When we initiate the
    // call, we'll get a void promise for its completion and a pipeline object.  Right now, we have
    // to produce a similar void promise and pipeline that will eventually be chained to those.
    // The problem is, these are two independent objects, but they both depend on the result of
    // one future call.
    //
    // So, we need to set up a continuation that will initiate the call later, then we need to
    // fork the promise for that continuation in order to send the completion promise and the
    // pipeline to their respective places.
    //
    // TODO(perf):  Too much reference counting?  Can we do better?  Maybe a way to fork
    //   Promise<Tuple<T, U>> into Tuple<Promise<T>, Promise<U>>?

    struct CallResultHolder: public kj::Refcounted {
      // Essentially acts as a refcounted \VoidPromiseAndPipeline, so that we can create a promise
      // for it and fork that promise.

322
      VoidPromiseAndPipeline content;
323
      // One branch of the fork will use content.promise, the other branch will use
324
      // content.pipeline.  Neither branch will touch the other's piece.
325 326 327

      inline CallResultHolder(VoidPromiseAndPipeline&& content): content(kj::mv(content)) {}

328
      kj::Own<CallResultHolder> addRef() { return kj::addRef(*this); }
329 330 331
    };

    // Create a promise for the call initiation.
332
    kj::ForkedPromise<kj::Own<CallResultHolder>> callResultPromise =
Kenton Varda's avatar
Kenton Varda committed
333
        promiseForCallForwarding.addBranch().then(kj::mvCapture(context,
334
        [=](kj::Own<CallContextHook>&& context, kj::Own<ClientHook>&& client){
335 336
          return kj::refcounted<CallResultHolder>(
              client->call(interfaceId, methodId, kj::mv(context)));
337
        })).fork();
338 339 340

    // Create a promise that extracts the pipeline from the call initiation, and construct our
    // QueuedPipeline to chain to it.
341 342
    auto pipelinePromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
343 344
          return kj::mv(callResult->content.pipeline);
        });
345
    auto pipeline = kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise));
346 347

    // Create a promise that simply chains to the void promise produced by the call initiation.
348 349
    auto completionPromise = callResultPromise.addBranch().then(
        [](kj::Own<CallResultHolder>&& callResult){
350 351 352 353 354
          return kj::mv(callResult->content.promise);
        });

    // OK, now we can actually return our thing.
    return VoidPromiseAndPipeline { kj::mv(completionPromise), kj::mv(pipeline) };
Kenton Varda's avatar
Kenton Varda committed
355 356
  }

357
  kj::Maybe<ClientHook&> getResolved() override {
Kenton Varda's avatar
Kenton Varda committed
358
    KJ_IF_MAYBE(inner, redirect) {
359 360 361 362 363 364
      return **inner;
    } else {
      return nullptr;
    }
  }

365
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
366
    return promiseForClientResolution.addBranch();
Kenton Varda's avatar
Kenton Varda committed
367 368
  }

369
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
370 371 372
    return kj::addRef(*this);
  }

373
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
374 375 376 377
    return nullptr;
  }

private:
378
  typedef kj::ForkedPromise<kj::Own<ClientHook>> ClientHookPromiseFork;
379

Kenton Varda's avatar
Kenton Varda committed
380 381 382
  kj::Maybe<kj::Own<ClientHook>> redirect;
  // Once the promise resolves, this will become non-null and point to the underlying object.

383 384 385
  ClientHookPromiseFork promise;
  // Promise that resolves when we have a new ClientHook to forward to.
  //
Kenton Varda's avatar
Kenton Varda committed
386
  // This fork shall only have three branches:  `selfResolutionOp`, `promiseForCallForwarding`, and
387 388
  // `promiseForClientResolution`, in that order.

Kenton Varda's avatar
Kenton Varda committed
389 390 391 392
  kj::Promise<void> selfResolutionOp;
  // Represents the operation which will set `redirect` when possible.

  ClientHookPromiseFork promiseForCallForwarding;
393 394 395 396
  // When this promise resolves, each queued call will be forwarded to the real client.  This needs
  // to occur *before* any 'whenMoreResolved()' promises resolve, because we want to make sure
  // previously-queued calls are delivered before any new calls made in response to the resolution.

Kenton Varda's avatar
Kenton Varda committed
397
  ClientHookPromiseFork promiseForClientResolution;
398 399 400 401 402 403
  // whenMoreResolved() returns forks of this promise.  These must resolve *after* queued calls
  // have been initiated (so that any calls made in the whenMoreResolved() handler are correctly
  // delivered after calls made earlier), but *before* any queued calls return (because it might
  // confuse the application if a queued call returns before the capability on which it was made
  // resolves).  Luckily, we know that queued calls will involve, at the very least, an
  // eventLoop.evalLater.
Kenton Varda's avatar
Kenton Varda committed
404 405
};

406
kj::Own<ClientHook> QueuedPipeline::getPipelinedCap(kj::Array<PipelineOp>&& ops) {
Kenton Varda's avatar
Kenton Varda committed
407 408
  KJ_IF_MAYBE(r, redirect) {
    return r->get()->getPipelinedCap(kj::mv(ops));
Kenton Varda's avatar
Kenton Varda committed
409
  } else {
410 411
    auto clientPromise = promise.addBranch().then(kj::mvCapture(ops,
        [](kj::Array<PipelineOp>&& ops, kj::Own<PipelineHook> pipeline) {
Kenton Varda's avatar
Kenton Varda committed
412 413 414
          return pipeline->getPipelinedCap(kj::mv(ops));
        }));

415
    return kj::refcounted<QueuedClient>(kj::mv(clientPromise));
Kenton Varda's avatar
Kenton Varda committed
416
  }
417
}
Kenton Varda's avatar
Kenton Varda committed
418

419
// =======================================================================================
Kenton Varda's avatar
Kenton Varda committed
420

421
class LocalPipeline final: public PipelineHook, public kj::Refcounted {
Kenton Varda's avatar
Kenton Varda committed
422
public:
423 424
  inline LocalPipeline(kj::Own<CallContextHook>&& contextParam)
      : context(kj::mv(contextParam)),
425
        results(context->getResults(MessageSize { 0, 0 })) {}
Kenton Varda's avatar
Kenton Varda committed
426

427
  kj::Own<PipelineHook> addRef() {
428
    return kj::addRef(*this);
Kenton Varda's avatar
Kenton Varda committed
429 430
  }

431
  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
432
    return results.getPipelinedCap(ops);
433
  }
Kenton Varda's avatar
Kenton Varda committed
434 435

private:
436
  kj::Own<CallContextHook> context;
437
  AnyPointer::Reader results;
Kenton Varda's avatar
Kenton Varda committed
438 439 440 441
};

class LocalClient final: public ClientHook, public kj::Refcounted {
public:
442 443 444 445 446
  LocalClient(kj::Own<Capability::Server>&& serverParam)
      : server(kj::mv(serverParam)) {
    server->thisHook = this;
  }
  LocalClient(kj::Own<Capability::Server>&& serverParam,
447
              _::CapabilityServerSetBase& capServerSet, void* ptr)
448 449
      : server(kj::mv(serverParam)), capServerSet(&capServerSet), ptr(ptr) {
    server->thisHook = this;
450 451
  }

452 453
  ~LocalClient() noexcept(false) {
    server->thisHook = nullptr;
454
  }
Kenton Varda's avatar
Kenton Varda committed
455

456
  Request<AnyPointer, AnyPointer> newCall(
457
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
Kenton Varda's avatar
Kenton Varda committed
458
    auto hook = kj::heap<LocalRequest>(
459
        interfaceId, methodId, sizeHint, kj::addRef(*this));
460
    auto root = hook->message->getRoot<AnyPointer>();
461
    return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
Kenton Varda's avatar
Kenton Varda committed
462 463 464
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
465
                              kj::Own<CallContextHook>&& context) override {
466 467
    auto contextPtr = context.get();

468 469 470
    // We don't want to actually dispatch the call synchronously, because we don't want the callee
    // to have any side effects before the promise is returned to the caller.  This helps avoid
    // race conditions.
471 472 473 474 475
    //
    // So, we do an evalLater() here.
    //
    // Note also that QueuedClient depends on this evalLater() to ensure that pipelined calls don't
    // complete before 'whenMoreResolved()' promises resolve.
476 477
    auto promise = kj::evalLater([this,interfaceId,methodId,contextPtr]() {
      return server->dispatchCall(interfaceId, methodId,
478
                                  CallContext<AnyPointer, AnyPointer>(*contextPtr));
479
    }).attach(kj::addRef(*this));
480

481
    // We have to fork this promise for the pipeline to receive a copy of the answer.
482
    auto forked = promise.fork();
483

484 485
    auto pipelinePromise = forked.addBranch().then(kj::mvCapture(context->addRef(),
        [=](kj::Own<CallContextHook>&& context) -> kj::Own<PipelineHook> {
486 487 488
          context->releaseParams();
          return kj::refcounted<LocalPipeline>(kj::mv(context));
        }));
489

490
    auto tailPipelinePromise = context->onTailCall().then([](AnyPointer::Pipeline&& pipeline) {
491 492 493
      return kj::mv(pipeline.hook);
    });

494
    pipelinePromise = pipelinePromise.exclusiveJoin(kj::mv(tailPipelinePromise));
495

496
    auto completionPromise = forked.addBranch().attach(kj::mv(context));
Kenton Varda's avatar
Kenton Varda committed
497

498
    return VoidPromiseAndPipeline { kj::mv(completionPromise),
499
        kj::refcounted<QueuedPipeline>(kj::mv(pipelinePromise)) };
Kenton Varda's avatar
Kenton Varda committed
500 501
  }

502
  kj::Maybe<ClientHook&> getResolved() override {
503 504 505
    return nullptr;
  }

506
  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
Kenton Varda's avatar
Kenton Varda committed
507 508 509
    return nullptr;
  }

510
  kj::Own<ClientHook> addRef() override {
Kenton Varda's avatar
Kenton Varda committed
511 512 513
    return kj::addRef(*this);
  }

514
  const void* getBrand() override {
Kenton Varda's avatar
Kenton Varda committed
515 516 517 518
    // We have no need to detect local objects.
    return nullptr;
  }

519 520 521 522 523 524 525 526
  void* getLocalServer(_::CapabilityServerSetBase& capServerSet) override {
    if (this->capServerSet == &capServerSet) {
      return ptr;
    } else {
      return nullptr;
    }
  }

Kenton Varda's avatar
Kenton Varda committed
527
private:
528
  kj::Own<Capability::Server> server;
529 530
  _::CapabilityServerSetBase* capServerSet = nullptr;
  void* ptr = nullptr;
Kenton Varda's avatar
Kenton Varda committed
531 532
};

533 534
kj::Own<ClientHook> Capability::Client::makeLocalClient(kj::Own<Capability::Server>&& server) {
  return kj::refcounted<LocalClient>(kj::mv(server));
Kenton Varda's avatar
Kenton Varda committed
535 536
}

537 538
kj::Own<ClientHook> newLocalPromiseClient(kj::Promise<kj::Own<ClientHook>>&& promise) {
  return kj::refcounted<QueuedClient>(kj::mv(promise));
Kenton Varda's avatar
Kenton Varda committed
539 540
}

541 542 543 544
kj::Own<PipelineHook> newLocalPromisePipeline(kj::Promise<kj::Own<PipelineHook>>&& promise) {
  return kj::refcounted<QueuedPipeline>(kj::mv(promise));
}

545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572
// =======================================================================================

namespace {

class BrokenPipeline final: public PipelineHook, public kj::Refcounted {
public:
  BrokenPipeline(const kj::Exception& exception): exception(exception) {}

  kj::Own<PipelineHook> addRef() override {
    return kj::addRef(*this);
  }

  kj::Own<ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) override;

private:
  kj::Exception exception;
};

class BrokenRequest final: public RequestHook {
public:
  BrokenRequest(const kj::Exception& exception, kj::Maybe<MessageSize> sizeHint)
      : exception(exception), message(firstSegmentSize(sizeHint)) {}

  RemotePromise<AnyPointer> send() override {
    return RemotePromise<AnyPointer>(kj::cp(exception),
        AnyPointer::Pipeline(kj::refcounted<BrokenPipeline>(exception)));
  }

573
  const void* getBrand() override {
574 575 576 577 578 579 580 581 582
    return nullptr;
  }

  kj::Exception exception;
  MallocMessageBuilder message;
};

class BrokenClient final: public ClientHook, public kj::Refcounted {
public:
583 584 585 586 587
  BrokenClient(const kj::Exception& exception, bool resolved, const void* brand = nullptr)
      : exception(exception), resolved(resolved), brand(brand) {}
  BrokenClient(const kj::StringPtr description, bool resolved, const void* brand = nullptr)
      : exception(kj::Exception::Type::FAILED, "", 0, kj::str(description)),
        resolved(resolved), brand(brand) {}
588 589 590

  Request<AnyPointer, AnyPointer> newCall(
      uint64_t interfaceId, uint16_t methodId, kj::Maybe<MessageSize> sizeHint) override {
591
    return newBrokenRequest(kj::cp(exception), sizeHint);
592 593 594 595
  }

  VoidPromiseAndPipeline call(uint64_t interfaceId, uint16_t methodId,
                              kj::Own<CallContextHook>&& context) override {
596
    return VoidPromiseAndPipeline { kj::cp(exception), kj::refcounted<BrokenPipeline>(exception) };
597 598
  }

599
  kj::Maybe<ClientHook&> getResolved() override {
600 601 602 603
    return nullptr;
  }

  kj::Maybe<kj::Promise<kj::Own<ClientHook>>> whenMoreResolved() override {
604 605 606 607 608
    if (resolved) {
      return nullptr;
    } else {
      return kj::Promise<kj::Own<ClientHook>>(kj::cp(exception));
    }
609 610 611 612 613 614 615
  }

  kj::Own<ClientHook> addRef() override {
    return kj::addRef(*this);
  }

  const void* getBrand() override {
616
    return brand;
617 618 619 620
  }

private:
  kj::Exception exception;
621
  bool resolved;
622
  const void* brand;
623 624 625
};

kj::Own<ClientHook> BrokenPipeline::getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) {
626 627 628 629 630
  return kj::refcounted<BrokenClient>(exception, false);
}

kj::Own<ClientHook> newNullCap() {
  // A null capability, unlike other broken capabilities, is considered resolved.
631 632
  return kj::refcounted<BrokenClient>("Called null capability.", true,
                                      &ClientHook::NULL_CAPABILITY_BRAND);
633 634 635 636 637
}

}  // namespace

kj::Own<ClientHook> newBrokenCap(kj::StringPtr reason) {
638
  return kj::refcounted<BrokenClient>(reason, false);
639 640 641
}

kj::Own<ClientHook> newBrokenCap(kj::Exception&& reason) {
642
  return kj::refcounted<BrokenClient>(kj::mv(reason), false);
643 644 645 646 647 648
}

kj::Own<PipelineHook> newBrokenPipeline(kj::Exception&& reason) {
  return kj::refcounted<BrokenPipeline>(kj::mv(reason));
}

649 650 651 652 653 654 655
Request<AnyPointer, AnyPointer> newBrokenRequest(
    kj::Exception&& reason, kj::Maybe<MessageSize> sizeHint) {
  auto hook = kj::heap<BrokenRequest>(kj::mv(reason), sizeHint);
  auto root = hook->message.getRoot<AnyPointer>();
  return Request<AnyPointer, AnyPointer>(root, kj::mv(hook));
}

656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696
// =======================================================================================

ReaderCapabilityTable::ReaderCapabilityTable(
    kj::Array<kj::Maybe<kj::Own<ClientHook>>> table)
    : table(kj::mv(table)) {
  setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}

kj::Maybe<kj::Own<ClientHook>> ReaderCapabilityTable::extractCap(uint index) {
  if (index < table.size()) {
    return table[index].map([](kj::Own<ClientHook>& cap) { return cap->addRef(); });
  } else {
    return nullptr;
  }
}

BuilderCapabilityTable::BuilderCapabilityTable() {
  setGlobalBrokenCapFactoryForLayoutCpp(brokenCapFactory);
}

kj::Maybe<kj::Own<ClientHook>> BuilderCapabilityTable::extractCap(uint index) {
  if (index < table.size()) {
    return table[index].map([](kj::Own<ClientHook>& cap) { return cap->addRef(); });
  } else {
    return nullptr;
  }
}

uint BuilderCapabilityTable::injectCap(kj::Own<ClientHook>&& cap) {
  uint result = table.size();
  table.add(kj::mv(cap));
  return result;
}

void BuilderCapabilityTable::dropCap(uint index) {
  KJ_ASSERT(index < table.size(), "Invalid capability descriptor in message.") {
    return;
  }
  table[index] = nullptr;
}

697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
// =======================================================================================
// CapabilityServerSet

namespace _ {  // private

Capability::Client CapabilityServerSetBase::addInternal(
    kj::Own<Capability::Server>&& server, void* ptr) {
  return Capability::Client(kj::refcounted<LocalClient>(kj::mv(server), *this, ptr));
}

kj::Promise<void*> CapabilityServerSetBase::getLocalServerInternal(Capability::Client& client) {
  ClientHook* hook = client.hook.get();

  // Get the most-resolved-so-far version of the hook.
  KJ_IF_MAYBE(h, hook->getResolved()) {
    hook = h;
  };

  KJ_IF_MAYBE(p, hook->whenMoreResolved()) {
    // This hook is an unresolved promise. We need to wait for it.
    return p->attach(hook->addRef())
        .then([this](kj::Own<ClientHook>&& resolved) {
      Capability::Client client(kj::mv(resolved));
      return getLocalServerInternal(client);
    });
  } else {
    return hook->getLocalServer(*this);
  }
}

}  // namespace _ (private)

729
}  // namespace capnp