// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#include "async.h"
#include "debug.h"
24
#include "vector.h"
25
#include "threadlocal.h"
26
#include <exception>
27
#include <map>
28

Kenton Varda's avatar
Kenton Varda committed
29
#if KJ_USE_FUTEX
30 31 32
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
33
#endif
34

35 36
#if !KJ_NO_RTTI
#include <typeinfo>
37 38
#if __GNUC__
#include <cxxabi.h>
39 40
#include <stdlib.h>
#endif
41 42
#endif

43 44 45 46
namespace kj {

namespace {

47
// The EventLoop registered for the calling thread.  Set by EventLoop::enterScope(), cleared by
// EventLoop::leaveScope(); null while no loop is registered.
KJ_THREADLOCAL_PTR(EventLoop) threadLocalEventLoop = nullptr;
48

49
// Sentinel Event pointer (address 1, not a real object).  PromiseNode::OnReadyEvent stores it in
// place of an event to record that arm() happened before any event was registered via init().
#define _kJ_ALREADY_READY reinterpret_cast< ::kj::_::Event*>(1)
50

51 52 53 54 55 56
// Returns the EventLoop registered for the calling thread.  Fails the KJ_REQUIRE check if
// threadLocalEventLoop is null, i.e. no loop is currently registered on this thread.
EventLoop& currentEventLoop() {
  EventLoop* const current = threadLocalEventLoop;
  KJ_REQUIRE(current != nullptr, "No event loop is running on this thread.");
  return *current;
}

57
class BoolEvent: public _::Event {
58 59 60
public:
  bool fired = false;

61
  Maybe<Own<_::Event>> fire() override {
62
    fired = true;
63
    return nullptr;
64 65 66
  }
};

67 68
// Promise node backing yield().  It never waits on anything: as soon as an event is registered
// it is armed breadth-first, i.e. appended at the tail of the loop's queue.
class YieldPromiseNode final: public _::PromiseNode {
public:
  void onReady(_::Event& event) noexcept override {
    event.armBreadthFirst();
  }
  void get(_::ExceptionOrValue& output) noexcept override {
    // The result is always a plain Void value.
    output.as<_::Void>() = _::Void();
  }
};

77
class NeverDonePromiseNode final: public _::PromiseNode {
78
public:
Kenton Varda's avatar
Kenton Varda committed
79 80
  void onReady(_::Event& event) noexcept override {
    // ignore
81 82 83 84 85 86
  }
  void get(_::ExceptionOrValue& output) noexcept override {
    KJ_FAIL_REQUIRE("Not ready.");
  }
};

87 88
}  // namespace

Kenton Varda's avatar
Kenton Varda committed
89 90 91 92
namespace _ {  // private

class TaskSetImpl {
public:
93 94
  inline TaskSetImpl(TaskSet::ErrorHandler& errorHandler)
    : errorHandler(errorHandler) {}
Kenton Varda's avatar
Kenton Varda committed
95 96 97

  ~TaskSetImpl() noexcept(false) {
    // std::map doesn't like it when elements' destructors throw, so carefully disassemble it.
98 99 100
    if (!tasks.empty()) {
      Vector<Own<Task>> deleteMe(tasks.size());
      for (auto& entry: tasks) {
Kenton Varda's avatar
Kenton Varda committed
101 102 103 104 105
        deleteMe.add(kj::mv(entry.second));
      }
    }
  }

106
  class Task final: public Event {
Kenton Varda's avatar
Kenton Varda committed
107
  public:
108 109
    Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam)
        : taskSet(taskSet), node(kj::mv(nodeParam)) {
110
      node->setSelfPointer(&node);
Kenton Varda's avatar
Kenton Varda committed
111
      node->onReady(*this);
Kenton Varda's avatar
Kenton Varda committed
112 113 114
    }

  protected:
115
    Maybe<Own<Event>> fire() override {
Kenton Varda's avatar
Kenton Varda committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
      // Get the result.
      _::ExceptionOr<_::Void> result;
      node->get(result);

      // Delete the node, catching any exceptions.
      KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
        node = nullptr;
      })) {
        result.addException(kj::mv(*exception));
      }

      // Call the error handler if there was an exception.
      KJ_IF_MAYBE(e, result.exception) {
        taskSet.errorHandler.taskFailed(kj::mv(*e));
      }
131 132 133 134 135 136 137 138 139 140 141

      // Remove from the task map.
      auto iter = taskSet.tasks.find(this);
      KJ_ASSERT(iter != taskSet.tasks.end());
      Own<Event> self = kj::mv(iter->second);
      taskSet.tasks.erase(iter);
      return mv(self);
    }

    _::PromiseNode* getInnerForTrace() override {
      return node;
Kenton Varda's avatar
Kenton Varda committed
142 143 144
    }

  private:
145
    TaskSetImpl& taskSet;
Kenton Varda's avatar
Kenton Varda committed
146 147 148
    kj::Own<_::PromiseNode> node;
  };

149 150
  void add(Promise<void>&& promise) {
    auto task = heap<Task>(*this, kj::mv(promise.node));
Kenton Varda's avatar
Kenton Varda committed
151
    Task* ptr = task;
152 153 154 155 156 157 158 159 160
    tasks.insert(std::make_pair(ptr, kj::mv(task)));
  }

  kj::String trace() {
    kj::Vector<kj::String> traces;
    for (auto& entry: tasks) {
      traces.add(entry.second->trace());
    }
    return kj::strArray(traces, "\n============================================\n");
Kenton Varda's avatar
Kenton Varda committed
161 162 163 164 165
  }

private:
  TaskSet::ErrorHandler& errorHandler;

166
  // TODO(perf):  Use a linked list instead.
167
  std::map<Task*, Own<Task>> tasks;
Kenton Varda's avatar
Kenton Varda committed
168 169 170 171 172 173 174 175 176 177 178 179 180
};

class LoggingErrorHandler: public TaskSet::ErrorHandler {
public:
  static LoggingErrorHandler instance;

  void taskFailed(kj::Exception&& exception) override {
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
  }
};

LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler();

181 182
class NullEventPort: public EventPort {
public:
183
  bool wait() override {
184 185 186
    KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
  }

187 188 189
  bool poll() override { return false; }

  void wake() const override {
190
    // TODO(someday): Implement using condvar.
191 192 193
    kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED,
        "Cross-thread events are not yet implemented for EventLoops with no EventPort."));
  }
194 195 196 197 198 199

  static NullEventPort instance;
};

NullEventPort NullEventPort::instance = NullEventPort();

Kenton Varda's avatar
Kenton Varda committed
200 201 202 203
}  // namespace _ (private)

// =======================================================================================

204
// Default implementation: ignores the notification.  EventLoop::setRunnable() calls this
// whenever its runnable state changes.
void EventPort::setRunnable(bool runnable) {}
205

206 207 208 209 210
void EventPort::wake() const {
  kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED,
      "cross-thread wake() not implemented by this EventPort implementation"));
}

211 212
// Constructs a loop with no external event source: uses the shared NullEventPort and a daemon
// task set whose failures are logged through LoggingErrorHandler.
EventLoop::EventLoop()
    : port(_::NullEventPort::instance),
      daemons(kj::heap<_::TaskSetImpl>(_::LoggingErrorHandler::instance)) {}
214

215 216
// Constructs a loop driven by the given EventPort.  Only a reference to `port` is stored.  The
// daemon task set logs failures through LoggingErrorHandler.
EventLoop::EventLoop(EventPort& port)
    : port(port),
      daemons(kj::heap<_::TaskSetImpl>(_::LoggingErrorHandler::instance)) {}
Kenton Varda's avatar
Kenton Varda committed
218

219
// Tears down the loop: drops daemon tasks first, then verifies that the queue is empty and that
// the loop is no longer registered as the thread's current loop.  Both checks have recovery
// blocks that repair the state before breaking out.
EventLoop::~EventLoop() noexcept(false) {
  // Destroy all "daemon" tasks, noting that their destructors might try to access the EventLoop
  // some more.
  daemons = nullptr;

  // The application _should_ destroy everything using the EventLoop before destroying the
  // EventLoop itself, so if there are events on the loop, this indicates a memory leak.
  KJ_REQUIRE(head == nullptr, "EventLoop destroyed with events still in the queue.  Memory leak?",
             head->trace()) {
    // Unlink all the events and hope that no one ever fires them...
    // Clearing prev also keeps ~Event() from touching this loop's queue pointers later.
    _::Event* event = head;
    while (event != nullptr) {
      _::Event* next = event->next;
      event->next = nullptr;
      event->prev = nullptr;
      event = next;
    }
    break;
  }

  KJ_REQUIRE(threadLocalEventLoop != this,
             "EventLoop destroyed while still current for the thread.") {
    threadLocalEventLoop = nullptr;
    break;
  }
}
245

246
// Fires up to `maxTurnCount` queued events, stopping early once the queue is empty.  `running`
// is set for the duration (waitImpl() checks it to reject nested waits).  Afterwards the port is
// told whether events remain queued.
void EventLoop::run(uint maxTurnCount) {
  running = true;
  KJ_DEFER(running = false);

  for (uint i = 0; i < maxTurnCount; i++) {
    if (!turn()) {
      break;
    }
  }

  setRunnable(head != nullptr);
}

259 260
bool EventLoop::turn() {
  _::Event* event = head;
261

262 263 264 265 266 267 268 269
  if (event == nullptr) {
    // No events in the queue.
    return false;
  } else {
    head = event->next;
    if (head != nullptr) {
      head->prev = &head;
    }
270

271 272 273 274
    depthFirstInsertPoint = &head;
    if (tail == &event->next) {
      tail = &head;
    }
275

276 277
    event->next = nullptr;
    event->prev = nullptr;
278

279 280 281 282 283 284
    Maybe<Own<_::Event>> eventToDestroy;
    {
      event->firing = true;
      KJ_DEFER(event->firing = false);
      eventToDestroy = event->fire();
    }
Kenton Varda's avatar
Kenton Varda committed
285

286 287 288 289
    depthFirstInsertPoint = &head;
    return true;
  }
}
290

291 292 293 294 295 296 297
// Forwards the runnable state to the port, but only when it differs from the last state
// reported, so the port sees transitions rather than repeated notifications.
void EventLoop::setRunnable(bool runnable) {
  if (runnable == lastRunnableState) {
    return;
  }
  port.setRunnable(runnable);
  lastRunnableState = runnable;
}

298 299 300 301 302 303 304 305 306 307 308 309 310
// Registers this loop as the calling thread's current EventLoop.  Requires that the thread has
// no current loop yet.
void EventLoop::enterScope() {
  KJ_REQUIRE(threadLocalEventLoop == nullptr, "This thread already has an EventLoop.");
  threadLocalEventLoop = this;
}

// Unregisters the calling thread's current EventLoop.  If the current loop is not this one, the
// check fails; its recovery block just breaks, so the thread-local is cleared regardless.
void EventLoop::leaveScope() {
  KJ_REQUIRE(threadLocalEventLoop == this,
             "WaitScope destroyed in a different thread than it was created in.") {
    break;
  }
  threadLocalEventLoop = nullptr;
}

311
namespace _ {  // private
312

313 314 315
// Runs the event loop of `waitScope` until `node` is ready, then stores the node's result in
// `result` and destroys the node.  An exception thrown by the node's destructor is added to
// `result`.
//
// Requires that the loop belongs to the calling thread and is not already running; waiting from
// inside an event callback is rejected.
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope) {
  EventLoop& loop = waitScope.loop;
  KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
  KJ_REQUIRE(!loop.running, "wait() is not allowed from within event callbacks.");

  // doneEvent is armed by the node once its result is available.
  BoolEvent doneEvent;
  node->setSelfPointer(&node);
  node->onReady(doneEvent);

  loop.running = true;
  KJ_DEFER(loop.running = false);

  while (!doneEvent.fired) {
    if (!loop.turn()) {
      // No events in the queue.  Wait for callback.
      loop.port.wait();
    }
  }

  // Tell the port whether events are still queued now that we stop turning the loop.
  loop.setRunnable(loop.head != nullptr);

  node->get(result);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    node = nullptr;
  })) {
    result.addException(kj::mv(*exception));
  }
}

342
// Returns a promise backed by a YieldPromiseNode, which arms its waiter breadth-first (at the
// back of the event queue) as soon as one is registered.
Promise<void> yield() {
  return Promise<void>(false, kj::heap<YieldPromiseNode>());
}

346 347 348 349
// Returns a freshly allocated NeverDonePromiseNode, a node that never notifies its waiter.
Own<PromiseNode> neverDone() {
  return kj::heap<NeverDonePromiseNode>();
}

Kenton Varda's avatar
Kenton Varda committed
350
// Waits on a never-ready node, i.e. runs the event loop without the wait ever completing
// normally.  The code after waitImpl() is marked unreachable.
void NeverDone::wait(WaitScope& waitScope) const {
  ExceptionOr<Void> dummy;
  waitImpl(neverDone(), dummy, waitScope);
  KJ_UNREACHABLE;
}

356
// Hands `promise` to the current thread's EventLoop as a daemon task.  If the loop has already
// dropped its daemon set (as ~EventLoop() does), the check fails and its recovery block returns
// without adding the promise.
void detach(kj::Promise<void>&& promise) {
  EventLoop& loop = currentEventLoop();
  KJ_REQUIRE(loop.daemons.get() != nullptr, "EventLoop is shutting down.") { return; }
  loop.daemons->add(kj::mv(promise));
}

362
// Binds the event to the calling thread's current EventLoop, which must exist (see
// currentEventLoop()).  The event starts out unlinked from the queue.
Event::Event()
    : loop(currentEventLoop()), next(nullptr), prev(nullptr) {}
364

365
// Unlinks the event from its loop's queue if it is still armed, then checks that it is not being
// destroyed while firing or from a thread whose current loop is a different one.
Event::~Event() noexcept(false) {
  if (prev != nullptr) {
    // Still queued.  Fix up any loop cursor that points at our `next` field before unlinking.
    if (loop.tail == &next) {
      loop.tail = prev;
    }
    if (loop.depthFirstInsertPoint == &next) {
      loop.depthFirstInsertPoint = prev;
    }

    *prev = next;
    if (next != nullptr) {
      next->prev = prev;
    }
  }

  KJ_REQUIRE(!firing, "Promise callback destroyed itself.");
  KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
             "Promise destroyed from a different thread than it was created in.");
}

385
void Event::armDepthFirst() {
386 387 388
  KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
             "Event armed from different thread than it was created in.  You must use "
             "the thread-safe work queue to queue events cross-thread.");
389

390 391 392 393 394 395 396
  if (prev == nullptr) {
    next = *loop.depthFirstInsertPoint;
    prev = loop.depthFirstInsertPoint;
    *prev = this;
    if (next != nullptr) {
      next->prev = &next;
    }
397

398 399 400 401 402
    loop.depthFirstInsertPoint = &next;

    if (loop.tail == prev) {
      loop.tail = &next;
    }
403 404

    loop.setRunnable(true);
405 406 407
  }
}

408
void Event::armBreadthFirst() {
409 410 411 412 413 414 415 416 417 418 419 420 421
  KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
             "Event armed from different thread than it was created in.  You must use "
             "the thread-safe work queue to queue events cross-thread.");

  if (prev == nullptr) {
    next = *loop.tail;
    prev = loop.tail;
    *prev = this;
    if (next != nullptr) {
      next->prev = &next;
    }

    loop.tail = &next;
422 423

    loop.setRunnable(true);
424 425 426
  }
}

427
// Default implementation: the event has no promise node to report in a trace.
_::PromiseNode* Event::getInnerForTrace() {
  return nullptr;
}

431
#if !KJ_NO_RTTI
432 433 434 435
#if __GNUC__
static kj::String demangleTypeName(const char* name) {
  int status;
  char* buf = abi::__cxa_demangle(name, nullptr, nullptr, &status);
436
  kj::String result = kj::heapString(buf == nullptr ? name : buf);
437 438 439 440 441 442 443 444
  free(buf);
  return kj::mv(result);
}
#else
static kj::String demangleTypeName(const char* name) {
  return kj::heapString(name);
}
#endif
445
#endif
446

447
// Builds a newline-separated list of dynamic type names: first the event's (if non-null), then
// each promise node reached by following getInnerForTrace() from `node`.  Without RTTI a fixed
// explanatory message is returned instead.
static kj::String traceImpl(Event* event, _::PromiseNode* node) {
#if KJ_NO_RTTI
  return heapString("Trace not available because RTTI is disabled.");
#else
  kj::Vector<kj::String> trace;

  if (event != nullptr) {
    trace.add(demangleTypeName(typeid(*event).name()));
  }

  while (node != nullptr) {
    trace.add(demangleTypeName(typeid(*node).name()));
    node = node->getInnerForTrace();
  }

  return strArray(trace, "\n");
#endif
}

466
// Describes this event followed by the chain of promise nodes it reports via
// getInnerForTrace().
kj::String Event::trace() {
  return traceImpl(this, getInnerForTrace());
}

470 471
}  // namespace _ (private)

472 473
// =======================================================================================

474 475
// Creates an empty task set.  `errorHandler` is passed by reference to the heap-allocated
// implementation, which stores that reference.
TaskSet::TaskSet(ErrorHandler& errorHandler)
    : impl(heap<_::TaskSetImpl>(errorHandler)) {}
476 477 478

// Declared noexcept(false) so that exceptions from member destruction can propagate.
TaskSet::~TaskSet() noexcept(false) {}

479
// Adds `promise` to the set; forwards to the implementation object.
void TaskSet::add(Promise<void>&& promise) {
  impl->add(kj::mv(promise));
}

483 484 485 486
// Returns the combined trace of all pending tasks; forwards to the implementation object.
kj::String TaskSet::trace() {
  return impl->trace();
}

487 488
namespace _ {  // private

489 490 491 492
// Describes the chain of promise nodes starting at this promise's node.  No event is included.
kj::String PromiseBase::trace() {
  return traceImpl(nullptr, node);
}

493 494
// Default implementation: ignores the self-pointer.  ChainPromiseNode overrides this so that it
// can replace itself in the owner's pointer.
void PromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {}

495
// Default implementation: no inner node, which ends the chain walked by traceImpl().
PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; }
496

Kenton Varda's avatar
Kenton Varda committed
497
// Registers `newEvent` as the event to arm when the promise becomes ready.  If readiness was
// already recorded (the sentinel is stored), the event is armed right away instead.
void PromiseNode::OnReadyEvent::init(Event& newEvent) {
  if (event == _kJ_ALREADY_READY) {
    // A new continuation was added to a promise that was already ready.  In this case, we schedule
    // breadth-first, to make it difficult for applications to accidentally starve the event loop
    // by repeatedly waiting on immediate promises.
    newEvent.armBreadthFirst();
  } else {
    event = &newEvent;
  }
}

508 509 510 511
// Records that the promise is ready and arms the registered event, if any.
//
// Once armed, the stored pointer is always replaced by the sentinel.  This makes a repeated
// arm() a harmless no-op instead of a call through the sentinel or a stale event pointer, and
// lets a later init() see that the promise is already ready.
void PromiseNode::OnReadyEvent::arm() {
  if (event == _kJ_ALREADY_READY) {
    // Readiness was already recorded; never treat the sentinel as a real Event.
    return;
  }

  if (event != nullptr) {
    // A promise resolved and an event is already waiting on it.  In this case, arm in depth-first
    // order so that the event runs immediately after the current one.  This way, chained promises
    // execute together for better cache locality and lower latency.
    event->armDepthFirst();
  }

  event = _kJ_ALREADY_READY;
}

519 520
// -------------------------------------------------------------------

521 522 523
// Out-of-line constructor and destructor; both bodies are empty.  The destructor is declared
// noexcept(false).
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}

Kenton Varda's avatar
Kenton Varda committed
524 525 526
// The result is available from the start, so the event is armed right away.  It is armed
// breadth-first, i.e. at the back of the event queue.
void ImmediatePromiseNodeBase::onReady(Event& event) noexcept {
  event.armBreadthFirst();
}
527 528 529 530 531 532 533 534

// Takes ownership of the exception that this node will report from get().
ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
    : exception(kj::mv(exception)) {}

// Moves the stored exception into `output`.
void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept {
  output.exception = kj::mv(exception);
}

535 536
// -------------------------------------------------------------------

537 538 539 540
// Takes ownership of the dependency node and tells it where its owning pointer lives, so that a
// node which replaces itself (see ChainPromiseNode::setSelfPointer) can update that pointer.
AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependencyParam)
    : dependency(kj::mv(dependencyParam)) {
  dependency->setSelfPointer(&dependency);
}
541

Kenton Varda's avatar
Kenton Varda committed
542 543
// Readiness is entirely determined by the dependency, so the event is registered directly on it.
void AttachmentPromiseNodeBase::onReady(Event& event) noexcept {
  dependency->onReady(event);
}

// Passes the dependency's result through unchanged.
void AttachmentPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
  dependency->get(output);
}

550 551
// Reports the dependency as the next node in a trace.
PromiseNode* AttachmentPromiseNodeBase::getInnerForTrace() {
  return dependency;
}

// Releases (destroys) the dependency node.
void AttachmentPromiseNodeBase::dropDependency() {
  dependency = nullptr;
}

// -------------------------------------------------------------------

560 561 562 563
// Takes ownership of the dependency node and tells it where its owning pointer lives, so that a
// node which replaces itself (see ChainPromiseNode::setSelfPointer) can update that pointer.
TransformPromiseNodeBase::TransformPromiseNodeBase(Own<PromiseNode>&& dependencyParam)
    : dependency(kj::mv(dependencyParam)) {
  dependency->setSelfPointer(&dependency);
}
564

Kenton Varda's avatar
Kenton Varda committed
565 566
// Registers the event directly on the dependency: this node is ready when the dependency is.
void TransformPromiseNodeBase::onReady(Event& event) noexcept {
  dependency->onReady(event);
}

// Produces the result via getImpl() and then drops the dependency.  Since this function is
// noexcept, anything thrown along the way is caught and recorded in `output` instead.
void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    getImpl(output);
    dropDependency();
  })) {
    output.addException(kj::mv(*exception));
  }
}

578 579
// Reports the dependency as the next node in a trace.
PromiseNode* TransformPromiseNodeBase::getInnerForTrace() {
  return dependency;
}

582 583 584 585
// Releases (destroys) the dependency node.
void TransformPromiseNodeBase::dropDependency() {
  dependency = nullptr;
}

586 587 588 589 590 591 592 593 594
// Fetches the dependency's result into `output`, then destroys the dependency.  An exception
// thrown while destroying it is added to `output` rather than propagated.
void TransformPromiseNodeBase::getDepResult(ExceptionOrValue& output) {
  dependency->get(output);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    dependency = nullptr;
  })) {
    output.addException(kj::mv(*exception));
  }
}

595 596
// -------------------------------------------------------------------

597
// Attaches a new branch to `hub`.  A null hub->tailBranch means the hub has already fired (it
// clears tailBranch in ForkHubBase::fire()), so the branch is marked ready immediately.
// Otherwise the branch is appended to the hub's list, to be notified when the hub fires.
ForkBranchBase::ForkBranchBase(Own<ForkHubBase>&& hubParam): hub(kj::mv(hubParam)) {
  if (hub->tailBranch == nullptr) {
    onReadyEvent.arm();
  } else {
    // Insert into hub's linked list of branches.
    prevPtr = hub->tailBranch;
    *prevPtr = this;
    next = nullptr;
    hub->tailBranch = &next;
  }
}

Kenton Varda's avatar
Kenton Varda committed
609
// Unlinks the branch from the hub's list if it is still linked (prevPtr non-null).
ForkBranchBase::~ForkBranchBase() noexcept(false) {
  if (prevPtr != nullptr) {
    // Remove from hub's linked list of branches.
    *prevPtr = next;
    // Whoever pointed back at us (the next branch, or the hub's tail if we were last) now
    // points back at our predecessor's slot.
    (next == nullptr ? hub->tailBranch : next->prevPtr) = prevPtr;
  }
}

void ForkBranchBase::hubReady() noexcept {
618
  onReadyEvent.arm();
619 620 621 622
}

// Drops this branch's reference to the hub.  An exception thrown while releasing it is added to
// `output` rather than propagated.
void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    hub = nullptr;
  })) {
    output.addException(kj::mv(*exception));
  }
}

Kenton Varda's avatar
Kenton Varda committed
629 630
// Registers the event to be armed when the hub notifies this branch (or arms it immediately if
// the branch is already marked ready).
void ForkBranchBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
}

633 634
// Continues the trace with whatever node the hub reports.
// NOTE(review): dereferences `hub` unconditionally; presumably never called after releaseHub()
// has cleared it -- confirm against callers.
PromiseNode* ForkBranchBase::getInnerForTrace() {
  return hub->getInnerForTrace();
}

// -------------------------------------------------------------------

639 640
// Takes ownership of the forked promise's node and registers the hub itself as the event to fire
// when that node is ready.  `resultRef` is where fire() stores the node's result.
ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef)
    : inner(kj::mv(innerParam)), resultRef(resultRef) {
  inner->setSelfPointer(&inner);
  inner->onReady(*this);
}
644

645
// Runs when the forked promise is ready: captures its result, destroys the inner node, notifies
// every attached branch, and dismantles the branch list.
Maybe<Own<Event>> ForkHubBase::fire() {
  // Dependency is ready.  Fetch its result and then delete the node.
  inner->get(resultRef);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    inner = nullptr;
  })) {
    resultRef.addException(kj::mv(*exception));
  }

  for (auto branch = headBranch; branch != nullptr; branch = branch->next) {
    branch->hubReady();
    // Detach the branch from its predecessor (or from headBranch).  The branch's own `next`
    // stays intact until the following iteration, so the traversal can continue.
    *branch->prevPtr = nullptr;
    branch->prevPtr = nullptr;
  }
  *tailBranch = nullptr;

  // Indicate that the list is no longer active.
  tailBranch = nullptr;

  return nullptr;
}

// Reports the forked promise's node as the next node in a trace (null once fire() has
// released it).
_::PromiseNode* ForkHubBase::getInnerForTrace() {
  return inner;
}

// -------------------------------------------------------------------

673 674
// Starts in STEP1, waiting on `inner` (the node whose result is itself a promise).  The chain
// node registers itself as the event to fire when that first step completes.
ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam)
    : state(STEP1), inner(kj::mv(innerParam)) {
  inner->setSelfPointer(&inner);
  inner->onReady(*this);
}

679
// Declared noexcept(false) so that exceptions from member destruction can propagate.
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
680

Kenton Varda's avatar
Kenton Varda committed
681
// In STEP1 the event is only remembered; fire() forwards it to the second-step node later.  In
// STEP2 the event is registered directly on the second-step node.
void ChainPromiseNode::onReady(Event& event) noexcept {
  switch (state) {
    case STEP1:
      KJ_REQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
      onReadyEvent = &event;
      return;
    case STEP2:
      inner->onReady(event);
      return;
  }
  KJ_UNREACHABLE;
}

694 695 696 697 698 699 700 701 702
// Receives the address of the Own<> that owns this node.  In STEP2 the chain node is no longer
// needed, so it immediately replaces itself in that pointer with its inner node.  Otherwise the
// address is remembered so that fire() can do the replacement later.
void ChainPromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {
  if (state == STEP2) {
    // After this assignment `this` is gone; only the parameter `selfPtr` may be used.
    *selfPtr = kj::mv(inner);  // deletes this!
    selfPtr->get()->setSelfPointer(selfPtr);
  } else {
    this->selfPtr = selfPtr;
  }
}

703
// Valid only in STEP2, where the final result comes from the second-step node.
void ChainPromiseNode::get(ExceptionOrValue& output) noexcept {
  KJ_REQUIRE(state == STEP2);
  return inner->get(output);
}

708 709
// Reports the node currently being waited on (first or second step) as the next trace node.
PromiseNode* ChainPromiseNode::getInnerForTrace() {
  return inner;
}

712
// Runs when the first step is ready.  Its result is itself a promise, whose node becomes the
// new `inner` (or a broken-promise node if the first step failed), and the state advances to
// STEP2.  If the owner's pointer is known, the chain node splices the new inner node into it
// and returns itself for the caller to destroy.
Maybe<Own<Event>> ChainPromiseNode::fire() {
  KJ_REQUIRE(state != STEP2);

  static_assert(sizeof(Promise<int>) == sizeof(PromiseBase),
      "This code assumes Promise<T> does not add any new members to PromiseBase.");

  ExceptionOr<PromiseBase> intermediate;
  inner->get(intermediate);

  // Destroy the first-step node, folding any destructor exception into the result.
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    inner = nullptr;
  })) {
    intermediate.addException(kj::mv(*exception));
  }

  KJ_IF_MAYBE(exception, intermediate.exception) {
    // There is an exception.  If there is also a value, delete it.
    kj::runCatchingExceptions([&,this]() { intermediate.value = nullptr; });
    // Now set step2 to a rejected promise.
    inner = heap<ImmediateBrokenPromiseNode>(kj::mv(*exception));
  } else KJ_IF_MAYBE(value, intermediate.value) {
    // There is a value and no exception.  The value is itself a promise.  Adopt it as our
    // step2.
    inner = kj::mv(value->node);
  } else {
    // We can only get here if inner->get() returned neither an exception nor a
    // value, which never actually happens.
    KJ_FAIL_ASSERT("Inner node returned empty value.");
  }
  state = STEP2;

  if (selfPtr != nullptr) {
    // Hey, we can shorten the chain here.
    // `chain` takes over ownership of this node before the owner's pointer is overwritten, so
    // the node stays alive until the caller disposes of the returned event.
    auto chain = selfPtr->downcast<ChainPromiseNode>();
    *selfPtr = kj::mv(inner);
    selfPtr->get()->setSelfPointer(selfPtr);
    if (onReadyEvent != nullptr) {
      selfPtr->get()->onReady(*onReadyEvent);
    }

    // Return our self-pointer so that the caller takes care of deleting it.
    return Own<Event>(kj::mv(chain));
  } else {
    inner->setSelfPointer(&inner);
    if (onReadyEvent != nullptr) {
      inner->onReady(*onReadyEvent);
    }

    return nullptr;
  }
}

764 765
// -------------------------------------------------------------------

766 767
// Wraps each of the two input nodes in a Branch that reports back to this join node.
ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right)
    : left(*this, kj::mv(left)), right(*this, kj::mv(right)) {}
768 769 770

// Declared noexcept(false) so that exceptions from member destruction can propagate.
ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}

Kenton Varda's avatar
Kenton Varda committed
771 772
// Registers the event to be armed when one of the branches fires (or arms it immediately if
// that has already happened).
void ExclusiveJoinPromiseNode::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
}

// Takes the result from whichever branch still holds its dependency, trying the left branch
// first.  If neither branch does, the check fails.
void ExclusiveJoinPromiseNode::get(ExceptionOrValue& output) noexcept {
  KJ_REQUIRE(left.get(output) || right.get(output), "get() called before ready.");
}

779 780 781 782 783 784
// Continues the trace with the left branch's node, or the right branch's if the left has none.
PromiseNode* ExclusiveJoinPromiseNode::getInnerForTrace() {
  auto result = left.getInnerForTrace();
  if (result == nullptr) {
    result = right.getInnerForTrace();
  }
  return result;
}

// Takes ownership of one input node and registers this branch as the event to fire when that
// node is ready.
ExclusiveJoinPromiseNode::Branch::Branch(
    ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam)
    : joinNode(joinNode), dependency(kj::mv(dependencyParam)) {
  dependency->setSelfPointer(&dependency);
  dependency->onReady(*this);
}

794
// Declared noexcept(false) so that exceptions from member destruction can propagate.
ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {}
795 796

// If this branch still holds its dependency, fetches the dependency's result into `output` and
// returns true.  Returns false if the dependency has been dropped (the branch was canceled).
bool ExclusiveJoinPromiseNode::Branch::get(ExceptionOrValue& output) {
  if (dependency) {
    dependency->get(output);
    return true;
  } else {
    return false;
  }
}

805
// Runs when this branch's dependency is ready.  The first branch to fire wins: it cancels the
// other branch by dropping its dependency and marks the join node ready.
//
// Both branches can be queued in the same turn.  Cancelling the loser drops its dependency but
// does not remove its already-queued event, so the loser still fires afterwards.  It must then
// do nothing; otherwise it would cancel the winner (leaving get() with no result) and arm the
// join node a second time.
Maybe<Own<Event>> ExclusiveJoinPromiseNode::Branch::fire() {
  if (dependency) {
    // Cancel the branch that didn't return first.  Ignore exceptions caused by cancellation.
    if (this == &joinNode.left) {
      kj::runCatchingExceptions([&]() { joinNode.right.dependency = nullptr; });
    } else {
      kj::runCatchingExceptions([&]() { joinNode.left.dependency = nullptr; });
    }

    joinNode.onReadyEvent.arm();
  }
  // else: the other branch fired first and this one was canceled; nothing to do.

  return nullptr;
}
816

817 818
// Reports this branch's dependency as the next trace node (null if the branch was canceled).
PromiseNode* ExclusiveJoinPromiseNode::Branch::getInnerForTrace() {
  return dependency;
}

// -------------------------------------------------------------------

823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889
// Creates one Branch per input promise.  `resultParts` is treated as a byte buffer holding one
// result slot every `partSize` bytes; branch i is given the slot at offset i * partSize.  With
// no inputs there is nothing to wait for, so the node is marked ready immediately.
ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase(
    Array<Own<PromiseNode>> promises, ExceptionOrValue* resultParts, size_t partSize)
    : countLeft(promises.size()) {
  byte* const partsBase = reinterpret_cast<byte*>(resultParts);

  auto branchBuilder = heapArrayBuilder<Branch>(promises.size());
  for (uint i: indices(promises)) {
    ExceptionOrValue& part = *reinterpret_cast<ExceptionOrValue*>(partsBase + i * partSize);
    branchBuilder.add(*this, kj::mv(promises[i]), part);
  }
  branches = branchBuilder.finish();

  if (branches.size() == 0) {
    onReadyEvent.arm();
  }
}
// Declared noexcept(false) so that exceptions from member destruction can propagate.
ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}

// Registers the event to be armed once every branch has fired (or arms it immediately if the
// node is already marked ready).
void ArrayJoinPromiseNodeBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
}

// Collects each branch's part.  Exceptions from the parts are added to `output`; only when
// none occurred is getNoError() asked to assemble the combined result.
void ArrayJoinPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
  // If any of the elements threw exceptions, propagate them.
  for (auto& branch: branches) {
    KJ_IF_MAYBE(partException, branch.getPart()) {
      output.addException(kj::mv(*partException));
    }
  }

  if (output.exception != nullptr) {
    return;
  }

  // No errors.  The template subclass will need to fill in the result.
  getNoError(output);
}

// Continues the trace through the first branch only; with no branches there is nothing to
// report.
PromiseNode* ArrayJoinPromiseNodeBase::getInnerForTrace() {
  if (branches.size() == 0) {
    return nullptr;
  }
  return branches[0].getInnerForTrace();
}

// Takes ownership of one input node, remembers the slot (`output`) its result goes into, and
// registers this branch as the event to fire when the node is ready.
ArrayJoinPromiseNodeBase::Branch::Branch(
    ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output)
    : joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
  dependency->setSelfPointer(&dependency);
  dependency->onReady(*this);
}

// Declared noexcept(false) so that exceptions from member destruction can propagate.
ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}

// Runs when this branch's dependency is ready.  Counts the branch as finished; the branch that
// brings the outstanding count to zero marks the join node ready.
Maybe<Own<Event>> ArrayJoinPromiseNodeBase::Branch::fire() {
  --joinNode.countLeft;
  if (joinNode.countLeft == 0) {
    joinNode.onReadyEvent.arm();
  }
  return nullptr;
}

// Continues the trace with the node *inside* the dependency; the dependency node itself is not
// reported.
// NOTE(review): ExclusiveJoinPromiseNode::Branch returns `dependency` directly -- confirm that
// skipping one level here is intended.
_::PromiseNode* ArrayJoinPromiseNodeBase::Branch::getInnerForTrace() {
  return dependency->getInnerForTrace();
}

// Fetches the dependency's result into this branch's output slot and returns the slot's
// exception (if any), moving it out of the slot.
Maybe<Exception> ArrayJoinPromiseNodeBase::Branch::getPart() {
  dependency->get(output);
  return kj::mv(output.exception);
}

// -------------------------------------------------------------------

890 891 892
// Takes ownership of the dependency and immediately registers this node as the event to fire
// when the dependency is ready, without waiting for a consumer to call onReady().  `resultRef`
// is where fire() stores the dependency's result.
EagerPromiseNodeBase::EagerPromiseNodeBase(
    Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
    : dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
  dependency->setSelfPointer(&dependency);
  dependency->onReady(*this);
}

Kenton Varda's avatar
Kenton Varda committed
897 898
// Registers the event to be armed once fire() has captured the result (or arms it immediately
// if that has already happened).
void EagerPromiseNodeBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
}

901 902
// Reports the dependency as the next trace node (null once fire() has released it).
PromiseNode* EagerPromiseNodeBase::getInnerForTrace() {
  return dependency;
}

905
// Runs when the dependency is ready: stores its result in `resultRef`, destroys the dependency
// (recording any destructor exception in the result), and marks this node ready.
Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
  dependency->get(resultRef);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    dependency = nullptr;
  })) {
    resultRef.addException(kj::mv(*exception));
  }

  onReadyEvent.arm();
  return nullptr;
}

917 918
// -------------------------------------------------------------------

Kenton Varda's avatar
Kenton Varda committed
919 920
// Registers the event with this node's OnReadyEvent; it is armed when the node is marked ready
// (or immediately if it already has been).
void AdapterPromiseNodeBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
}

}  // namespace _ (private)
924
}  // namespace kj