async.c++ 25.9 KB
Newer Older
Kenton Varda's avatar
Kenton Varda committed
1 2
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
3
//
Kenton Varda's avatar
Kenton Varda committed
4 5 6 7 8 9
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
10
//
Kenton Varda's avatar
Kenton Varda committed
11 12
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
13
//
Kenton Varda's avatar
Kenton Varda committed
14 15 16 17 18 19 20
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
21 22 23

#include "async.h"
#include "debug.h"
24
#include "vector.h"
25
#include "threadlocal.h"
26
#include <exception>
27
#include <map>
28

Kenton Varda's avatar
Kenton Varda committed
29
#if KJ_USE_FUTEX
30 31 32
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/futex.h>
33
#endif
34

35 36
#if !KJ_NO_RTTI
#include <typeinfo>
37 38
#if __GNUC__
#include <cxxabi.h>
39 40
#include <stdlib.h>
#endif
41 42
#endif

43 44 45 46
namespace kj {

namespace {

47
KJ_THREADLOCAL_PTR(EventLoop) threadLocalEventLoop = nullptr;
48

49
#define _kJ_ALREADY_READY reinterpret_cast< ::kj::_::Event*>(1)
50

51 52 53 54 55 56
EventLoop& currentEventLoop() {
  EventLoop* loop = threadLocalEventLoop;
  KJ_REQUIRE(loop != nullptr, "No event loop is running on this thread.");
  return *loop;
}

57
class BoolEvent: public _::Event {
58 59 60
public:
  bool fired = false;

61
  Maybe<Own<_::Event>> fire() override {
62
    fired = true;
63
    return nullptr;
64 65 66
  }
};

67 68
class YieldPromiseNode final: public _::PromiseNode {
public:
Kenton Varda's avatar
Kenton Varda committed
69
  void onReady(_::Event& event) noexcept override {
70
    event.armBreadthFirst();
71 72
  }
  void get(_::ExceptionOrValue& output) noexcept override {
Kenton Varda's avatar
Kenton Varda committed
73
    output.as<_::Void>() = _::Void();
74 75 76
  }
};

77
class NeverDonePromiseNode final: public _::PromiseNode {
78
public:
Kenton Varda's avatar
Kenton Varda committed
79 80
  void onReady(_::Event& event) noexcept override {
    // ignore
81 82 83 84 85 86
  }
  void get(_::ExceptionOrValue& output) noexcept override {
    KJ_FAIL_REQUIRE("Not ready.");
  }
};

87 88
}  // namespace

Kenton Varda's avatar
Kenton Varda committed
89 90 91 92
namespace _ {  // private

class TaskSetImpl {
public:
93 94
  inline TaskSetImpl(TaskSet::ErrorHandler& errorHandler)
    : errorHandler(errorHandler) {}
Kenton Varda's avatar
Kenton Varda committed
95 96 97

  ~TaskSetImpl() noexcept(false) {
    // std::map doesn't like it when elements' destructors throw, so carefully disassemble it.
98 99 100
    if (!tasks.empty()) {
      Vector<Own<Task>> deleteMe(tasks.size());
      for (auto& entry: tasks) {
Kenton Varda's avatar
Kenton Varda committed
101 102 103 104 105
        deleteMe.add(kj::mv(entry.second));
      }
    }
  }

106
  class Task final: public Event {
Kenton Varda's avatar
Kenton Varda committed
107
  public:
108 109
    Task(TaskSetImpl& taskSet, Own<_::PromiseNode>&& nodeParam)
        : taskSet(taskSet), node(kj::mv(nodeParam)) {
110
      node->setSelfPointer(&node);
Kenton Varda's avatar
Kenton Varda committed
111
      node->onReady(*this);
Kenton Varda's avatar
Kenton Varda committed
112 113 114
    }

  protected:
115
    Maybe<Own<Event>> fire() override {
Kenton Varda's avatar
Kenton Varda committed
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
      // Get the result.
      _::ExceptionOr<_::Void> result;
      node->get(result);

      // Delete the node, catching any exceptions.
      KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
        node = nullptr;
      })) {
        result.addException(kj::mv(*exception));
      }

      // Call the error handler if there was an exception.
      KJ_IF_MAYBE(e, result.exception) {
        taskSet.errorHandler.taskFailed(kj::mv(*e));
      }
131 132 133 134 135 136 137 138 139 140 141

      // Remove from the task map.
      auto iter = taskSet.tasks.find(this);
      KJ_ASSERT(iter != taskSet.tasks.end());
      Own<Event> self = kj::mv(iter->second);
      taskSet.tasks.erase(iter);
      return mv(self);
    }

    _::PromiseNode* getInnerForTrace() override {
      return node;
Kenton Varda's avatar
Kenton Varda committed
142 143 144
    }

  private:
145
    TaskSetImpl& taskSet;
Kenton Varda's avatar
Kenton Varda committed
146 147 148
    kj::Own<_::PromiseNode> node;
  };

149 150
  void add(Promise<void>&& promise) {
    auto task = heap<Task>(*this, kj::mv(promise.node));
Kenton Varda's avatar
Kenton Varda committed
151
    Task* ptr = task;
152 153 154 155 156 157 158 159 160
    tasks.insert(std::make_pair(ptr, kj::mv(task)));
  }

  kj::String trace() {
    kj::Vector<kj::String> traces;
    for (auto& entry: tasks) {
      traces.add(entry.second->trace());
    }
    return kj::strArray(traces, "\n============================================\n");
Kenton Varda's avatar
Kenton Varda committed
161 162 163 164 165
  }

private:
  TaskSet::ErrorHandler& errorHandler;

166
  // TODO(perf):  Use a linked list instead.
167
  std::map<Task*, Own<Task>> tasks;
Kenton Varda's avatar
Kenton Varda committed
168 169 170 171 172 173 174 175 176 177 178 179 180
};

class LoggingErrorHandler: public TaskSet::ErrorHandler {
public:
  static LoggingErrorHandler instance;

  void taskFailed(kj::Exception&& exception) override {
    KJ_LOG(ERROR, "Uncaught exception in daemonized task.", exception);
  }
};

LoggingErrorHandler LoggingErrorHandler::instance = LoggingErrorHandler();

181 182
class NullEventPort: public EventPort {
public:
183
  bool wait() override {
184 185 186
    KJ_FAIL_REQUIRE("Nothing to wait for; this thread would hang forever.");
  }

187 188 189
  bool poll() override { return false; }

  void wake() const override {
190
    // TODO(someday): Implement using condvar.
191 192 193
    kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED,
        "Cross-thread events are not yet implemented for EventLoops with no EventPort."));
  }
194 195 196 197 198 199

  static NullEventPort instance;
};

NullEventPort NullEventPort::instance = NullEventPort();

Kenton Varda's avatar
Kenton Varda committed
200 201 202 203
}  // namespace _ (private)

// =======================================================================================

204
void EventPort::setRunnable(bool runnable) {}
205

206 207 208 209 210
void EventPort::wake() const {
  kj::throwRecoverableException(KJ_EXCEPTION(UNIMPLEMENTED,
      "cross-thread wake() not implemented by this EventPort implementation"));
}

211 212
EventLoop::EventLoop()
    : port(_::NullEventPort::instance),
213
      daemons(kj::heap<_::TaskSetImpl>(_::LoggingErrorHandler::instance)) {}
214

215 216
EventLoop::EventLoop(EventPort& port)
    : port(port),
217
      daemons(kj::heap<_::TaskSetImpl>(_::LoggingErrorHandler::instance)) {}
Kenton Varda's avatar
Kenton Varda committed
218

219
EventLoop::~EventLoop() noexcept(false) {
220 221 222 223 224 225 226 227 228
  // Destroy all "daemon" tasks, noting that their destructors might try to access the EventLoop
  // some more.
  daemons = nullptr;

  // The application _should_ destroy everything using the EventLoop before destroying the
  // EventLoop itself, so if there are events on the loop, this indicates a memory leak.
  KJ_REQUIRE(head == nullptr, "EventLoop destroyed with events still in the queue.  Memory leak?",
             head->trace()) {
    // Unlink all the events and hope that no one ever fires them...
229
    _::Event* event = head;
230
    while (event != nullptr) {
231
      _::Event* next = event->next;
232 233 234 235 236 237
      event->next = nullptr;
      event->prev = nullptr;
      event = next;
    }
    break;
  }
238 239 240 241 242 243

  KJ_REQUIRE(threadLocalEventLoop != this,
             "EventLoop destroyed while still current for the thread.") {
    threadLocalEventLoop = nullptr;
    break;
  }
244
}
245

246
void EventLoop::run(uint maxTurnCount) {
247 248 249
  running = true;
  KJ_DEFER(running = false);

250 251 252 253 254
  for (uint i = 0; i < maxTurnCount; i++) {
    if (!turn()) {
      break;
    }
  }
255

256
  setRunnable(isRunnable());
257 258
}

259 260
bool EventLoop::turn() {
  _::Event* event = head;
261

262 263 264 265 266 267 268 269
  if (event == nullptr) {
    // No events in the queue.
    return false;
  } else {
    head = event->next;
    if (head != nullptr) {
      head->prev = &head;
    }
270

271 272 273 274
    depthFirstInsertPoint = &head;
    if (tail == &event->next) {
      tail = &head;
    }
275

276 277
    event->next = nullptr;
    event->prev = nullptr;
278

279 280 281 282 283 284
    Maybe<Own<_::Event>> eventToDestroy;
    {
      event->firing = true;
      KJ_DEFER(event->firing = false);
      eventToDestroy = event->fire();
    }
Kenton Varda's avatar
Kenton Varda committed
285

286 287 288 289
    depthFirstInsertPoint = &head;
    return true;
  }
}
290

291 292 293 294
bool EventLoop::isRunnable() {
  return head != nullptr;
}

295 296 297 298 299 300 301
void EventLoop::setRunnable(bool runnable) {
  if (runnable != lastRunnableState) {
    port.setRunnable(runnable);
    lastRunnableState = runnable;
  }
}

302 303 304 305 306 307 308 309 310 311 312 313 314
void EventLoop::enterScope() {
  KJ_REQUIRE(threadLocalEventLoop == nullptr, "This thread already has an EventLoop.");
  threadLocalEventLoop = this;
}

void EventLoop::leaveScope() {
  KJ_REQUIRE(threadLocalEventLoop == this,
             "WaitScope destroyed in a different thread than it was created in.") {
    break;
  }
  threadLocalEventLoop = nullptr;
}

315
namespace _ {  // private
316

317 318 319
void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope& waitScope) {
  EventLoop& loop = waitScope.loop;
  KJ_REQUIRE(&loop == threadLocalEventLoop, "WaitScope not valid for this thread.");
320
  KJ_REQUIRE(!loop.running, "wait() is not allowed from within event callbacks.");
321

322
  BoolEvent doneEvent;
323
  node->setSelfPointer(&node);
324 325 326 327 328 329 330 331 332 333
  node->onReady(doneEvent);

  loop.running = true;
  KJ_DEFER(loop.running = false);

  while (!doneEvent.fired) {
    if (!loop.turn()) {
      // No events in the queue.  Wait for callback.
      loop.port.wait();
    }
334
  }
335

336
  loop.setRunnable(loop.isRunnable());
337

338
  node->get(result);
339
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
340 341 342 343
    node = nullptr;
  })) {
    result.addException(kj::mv(*exception));
  }
344 345
}

346
Promise<void> yield() {
347 348 349
  return Promise<void>(false, kj::heap<YieldPromiseNode>());
}

350 351 352 353
Own<PromiseNode> neverDone() {
  return kj::heap<NeverDonePromiseNode>();
}

Kenton Varda's avatar
Kenton Varda committed
354
void NeverDone::wait(WaitScope& waitScope) const {
355 356 357 358 359
  ExceptionOr<Void> dummy;
  waitImpl(neverDone(), dummy, waitScope);
  KJ_UNREACHABLE;
}

360
void detach(kj::Promise<void>&& promise) {
361 362 363
  EventLoop& loop = currentEventLoop();
  KJ_REQUIRE(loop.daemons.get() != nullptr, "EventLoop is shutting down.") { return; }
  loop.daemons->add(kj::mv(promise));
364 365
}

366
Event::Event()
367
    : loop(currentEventLoop()), next(nullptr), prev(nullptr) {}
368

369
Event::~Event() noexcept(false) {
370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386
  if (prev != nullptr) {
    if (loop.tail == &next) {
      loop.tail = prev;
    }
    if (loop.depthFirstInsertPoint == &next) {
      loop.depthFirstInsertPoint = prev;
    }

    *prev = next;
    if (next != nullptr) {
      next->prev = prev;
    }
  }

  KJ_REQUIRE(!firing, "Promise callback destroyed itself.");
  KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
             "Promise destroyed from a different thread than it was created in.");
Kenton Varda's avatar
Kenton Varda committed
387 388
}

389
void Event::armDepthFirst() {
390 391 392
  KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
             "Event armed from different thread than it was created in.  You must use "
             "the thread-safe work queue to queue events cross-thread.");
393

394 395 396 397 398 399 400
  if (prev == nullptr) {
    next = *loop.depthFirstInsertPoint;
    prev = loop.depthFirstInsertPoint;
    *prev = this;
    if (next != nullptr) {
      next->prev = &next;
    }
401

402 403 404 405 406
    loop.depthFirstInsertPoint = &next;

    if (loop.tail == prev) {
      loop.tail = &next;
    }
407 408

    loop.setRunnable(true);
409 410 411
  }
}

412
void Event::armBreadthFirst() {
413 414 415 416 417 418 419 420 421 422 423 424 425
  KJ_REQUIRE(threadLocalEventLoop == &loop || threadLocalEventLoop == nullptr,
             "Event armed from different thread than it was created in.  You must use "
             "the thread-safe work queue to queue events cross-thread.");

  if (prev == nullptr) {
    next = *loop.tail;
    prev = loop.tail;
    *prev = this;
    if (next != nullptr) {
      next->prev = &next;
    }

    loop.tail = &next;
426 427

    loop.setRunnable(true);
428 429 430
  }
}

431
_::PromiseNode* Event::getInnerForTrace() {
432 433 434
  return nullptr;
}

435
#if !KJ_NO_RTTI
436 437 438 439
#if __GNUC__
static kj::String demangleTypeName(const char* name) {
  int status;
  char* buf = abi::__cxa_demangle(name, nullptr, nullptr, &status);
440
  kj::String result = kj::heapString(buf == nullptr ? name : buf);
441 442 443 444 445 446 447 448
  free(buf);
  return kj::mv(result);
}
#else
static kj::String demangleTypeName(const char* name) {
  return kj::heapString(name);
}
#endif
449
#endif
450

451
static kj::String traceImpl(Event* event, _::PromiseNode* node) {
452 453 454
#if KJ_NO_RTTI
  return heapString("Trace not available because RTTI is disabled.");
#else
455 456 457 458 459 460 461 462 463 464 465 466
  kj::Vector<kj::String> trace;

  if (event != nullptr) {
    trace.add(demangleTypeName(typeid(*event).name()));
  }

  while (node != nullptr) {
    trace.add(demangleTypeName(typeid(*node).name()));
    node = node->getInnerForTrace();
  }

  return strArray(trace, "\n");
467
#endif
468 469
}

470
kj::String Event::trace() {
471
  return traceImpl(this, getInnerForTrace());
472 473
}

474 475
}  // namespace _ (private)

476 477
// =======================================================================================

478 479
TaskSet::TaskSet(ErrorHandler& errorHandler)
    : impl(heap<_::TaskSetImpl>(errorHandler)) {}
480 481 482

TaskSet::~TaskSet() noexcept(false) {}

483
void TaskSet::add(Promise<void>&& promise) {
484 485 486
  impl->add(kj::mv(promise));
}

487 488 489 490
kj::String TaskSet::trace() {
  return impl->trace();
}

491 492
namespace _ {  // private

493 494 495 496
kj::String PromiseBase::trace() {
  return traceImpl(nullptr, node);
}

497 498
void PromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {}

499
PromiseNode* PromiseNode::getInnerForTrace() { return nullptr; }
500

Kenton Varda's avatar
Kenton Varda committed
501
void PromiseNode::OnReadyEvent::init(Event& newEvent) {
502
  if (event == _kJ_ALREADY_READY) {
Kenton Varda's avatar
Kenton Varda committed
503 504 505 506
    // A new continuation was added to a promise that was already ready.  In this case, we schedule
    // breadth-first, to make it difficult for applications to accidentally starve the event loop
    // by repeatedly waiting on immediate promises.
    newEvent.armBreadthFirst();
507 508
  } else {
    event = &newEvent;
509 510 511
  }
}

512 513 514 515
void PromiseNode::OnReadyEvent::arm() {
  if (event == nullptr) {
    event = _kJ_ALREADY_READY;
  } else {
Kenton Varda's avatar
Kenton Varda committed
516 517 518
    // A promise resolved and an event is already waiting on it.  In this case, arm in depth-first
    // order so that the event runs immediately after the current one.  This way, chained promises
    // execute together for better cache locality and lower latency.
519
    event->armDepthFirst();
520 521 522
  }
}

523 524
// -------------------------------------------------------------------

525 526 527
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}

Kenton Varda's avatar
Kenton Varda committed
528 529 530
void ImmediatePromiseNodeBase::onReady(Event& event) noexcept {
  event.armBreadthFirst();
}
531 532 533 534 535 536 537 538

ImmediateBrokenPromiseNode::ImmediateBrokenPromiseNode(Exception&& exception)
    : exception(kj::mv(exception)) {}

void ImmediateBrokenPromiseNode::get(ExceptionOrValue& output) noexcept {
  output.exception = kj::mv(exception);
}

539 540
// -------------------------------------------------------------------

541 542 543 544
AttachmentPromiseNodeBase::AttachmentPromiseNodeBase(Own<PromiseNode>&& dependencyParam)
    : dependency(kj::mv(dependencyParam)) {
  dependency->setSelfPointer(&dependency);
}
545

Kenton Varda's avatar
Kenton Varda committed
546 547
void AttachmentPromiseNodeBase::onReady(Event& event) noexcept {
  dependency->onReady(event);
548 549 550 551 552 553
}

void AttachmentPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
  dependency->get(output);
}

554 555
PromiseNode* AttachmentPromiseNodeBase::getInnerForTrace() {
  return dependency;
556 557 558 559 560 561 562 563
}

void AttachmentPromiseNodeBase::dropDependency() {
  dependency = nullptr;
}

// -------------------------------------------------------------------

Kenton Varda's avatar
Kenton Varda committed
564 565 566
TransformPromiseNodeBase::TransformPromiseNodeBase(
    Own<PromiseNode>&& dependencyParam, void* continuationTracePtr)
    : dependency(kj::mv(dependencyParam)), continuationTracePtr(continuationTracePtr) {
567 568
  dependency->setSelfPointer(&dependency);
}
569

Kenton Varda's avatar
Kenton Varda committed
570 571
void TransformPromiseNodeBase::onReady(Event& event) noexcept {
  dependency->onReady(event);
572 573 574 575 576
}

void TransformPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    getImpl(output);
577
    dropDependency();
578 579 580 581 582
  })) {
    output.addException(kj::mv(*exception));
  }
}

583 584
PromiseNode* TransformPromiseNodeBase::getInnerForTrace() {
  return dependency;
585 586
}

587 588 589 590
void TransformPromiseNodeBase::dropDependency() {
  dependency = nullptr;
}

591 592 593 594 595 596 597
void TransformPromiseNodeBase::getDepResult(ExceptionOrValue& output) {
  dependency->get(output);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    dependency = nullptr;
  })) {
    output.addException(kj::mv(*exception));
  }
Kenton Varda's avatar
Kenton Varda committed
598 599 600 601

  KJ_IF_MAYBE(e, output.exception) {
    e->addTrace(continuationTracePtr);
  }
602 603
}

604 605
// -------------------------------------------------------------------

606
ForkBranchBase::ForkBranchBase(Own<ForkHubBase>&& hubParam): hub(kj::mv(hubParam)) {
607
  if (hub->tailBranch == nullptr) {
608
    onReadyEvent.arm();
609 610
  } else {
    // Insert into hub's linked list of branches.
611
    prevPtr = hub->tailBranch;
612 613
    *prevPtr = this;
    next = nullptr;
614
    hub->tailBranch = &next;
615 616 617
  }
}

Kenton Varda's avatar
Kenton Varda committed
618
ForkBranchBase::~ForkBranchBase() noexcept(false) {
619 620 621
  if (prevPtr != nullptr) {
    // Remove from hub's linked list of branches.
    *prevPtr = next;
622
    (next == nullptr ? hub->tailBranch : next->prevPtr) = prevPtr;
623 624 625 626
  }
}

void ForkBranchBase::hubReady() noexcept {
627
  onReadyEvent.arm();
628 629 630 631
}

void ForkBranchBase::releaseHub(ExceptionOrValue& output) {
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
632
    hub = nullptr;
633 634 635 636 637
  })) {
    output.addException(kj::mv(*exception));
  }
}

Kenton Varda's avatar
Kenton Varda committed
638 639
void ForkBranchBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
640 641
}

642 643
PromiseNode* ForkBranchBase::getInnerForTrace() {
  return hub->getInnerForTrace();
644 645 646 647
}

// -------------------------------------------------------------------

648 649
ForkHubBase::ForkHubBase(Own<PromiseNode>&& innerParam, ExceptionOrValue& resultRef)
    : inner(kj::mv(innerParam)), resultRef(resultRef) {
650
  inner->setSelfPointer(&inner);
Kenton Varda's avatar
Kenton Varda committed
651
  inner->onReady(*this);
652
}
653

654
Maybe<Own<Event>> ForkHubBase::fire() {
655 656 657 658 659 660 661
  // Dependency is ready.  Fetch its result and then delete the node.
  inner->get(resultRef);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    inner = nullptr;
  })) {
    resultRef.addException(kj::mv(*exception));
  }
662

663
  for (auto branch = headBranch; branch != nullptr; branch = branch->next) {
664 665 666
    branch->hubReady();
    *branch->prevPtr = nullptr;
    branch->prevPtr = nullptr;
667
  }
668
  *tailBranch = nullptr;
669 670

  // Indicate that the list is no longer active.
671
  tailBranch = nullptr;
672 673 674 675 676 677

  return nullptr;
}

_::PromiseNode* ForkHubBase::getInnerForTrace() {
  return inner;
678 679 680 681
}

// -------------------------------------------------------------------

682 683
ChainPromiseNode::ChainPromiseNode(Own<PromiseNode> innerParam)
    : state(STEP1), inner(kj::mv(innerParam)) {
684
  inner->setSelfPointer(&inner);
Kenton Varda's avatar
Kenton Varda committed
685
  inner->onReady(*this);
686 687
}

688
ChainPromiseNode::~ChainPromiseNode() noexcept(false) {}
689

Kenton Varda's avatar
Kenton Varda committed
690
void ChainPromiseNode::onReady(Event& event) noexcept {
691 692
  switch (state) {
    case STEP1:
693
      KJ_REQUIRE(onReadyEvent == nullptr, "onReady() can only be called once.");
694
      onReadyEvent = &event;
Kenton Varda's avatar
Kenton Varda committed
695
      return;
696
    case STEP2:
Kenton Varda's avatar
Kenton Varda committed
697 698
      inner->onReady(event);
      return;
699 700 701 702
  }
  KJ_UNREACHABLE;
}

703 704 705 706 707 708 709 710 711
void ChainPromiseNode::setSelfPointer(Own<PromiseNode>* selfPtr) noexcept {
  if (state == STEP2) {
    *selfPtr = kj::mv(inner);  // deletes this!
    selfPtr->get()->setSelfPointer(selfPtr);
  } else {
    this->selfPtr = selfPtr;
  }
}

712
void ChainPromiseNode::get(ExceptionOrValue& output) noexcept {
713
  KJ_REQUIRE(state == STEP2);
714 715 716
  return inner->get(output);
}

717 718
PromiseNode* ChainPromiseNode::getInnerForTrace() {
  return inner;
719 720
}

721
Maybe<Own<Event>> ChainPromiseNode::fire() {
722
  KJ_REQUIRE(state != STEP2);
723 724 725 726 727 728 729 730 731 732 733 734 735 736 737

  static_assert(sizeof(Promise<int>) == sizeof(PromiseBase),
      "This code assumes Promise<T> does not add any new members to PromiseBase.");

  ExceptionOr<PromiseBase> intermediate;
  inner->get(intermediate);

  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    inner = nullptr;
  })) {
    intermediate.addException(kj::mv(*exception));
  }

  KJ_IF_MAYBE(exception, intermediate.exception) {
    // There is an exception.  If there is also a value, delete it.
738
    kj::runCatchingExceptions([&]() { intermediate.value = nullptr; });
739 740 741 742 743 744 745 746 747
    // Now set step2 to a rejected promise.
    inner = heap<ImmediateBrokenPromiseNode>(kj::mv(*exception));
  } else KJ_IF_MAYBE(value, intermediate.value) {
    // There is a value and no exception.  The value is itself a promise.  Adopt it as our
    // step2.
    inner = kj::mv(value->node);
  } else {
    // We can only get here if inner->get() returned neither an exception nor a
    // value, which never actually happens.
748
    KJ_FAIL_ASSERT("Inner node returned empty value.");
749 750 751
  }
  state = STEP2;

752 753 754 755 756 757 758 759
  if (selfPtr != nullptr) {
    // Hey, we can shorten the chain here.
    auto chain = selfPtr->downcast<ChainPromiseNode>();
    *selfPtr = kj::mv(inner);
    selfPtr->get()->setSelfPointer(selfPtr);
    if (onReadyEvent != nullptr) {
      selfPtr->get()->onReady(*onReadyEvent);
    }
760

761 762 763 764 765 766 767 768 769 770
    // Return our self-pointer so that the caller takes care of deleting it.
    return Own<Event>(kj::mv(chain));
  } else {
    inner->setSelfPointer(&inner);
    if (onReadyEvent != nullptr) {
      inner->onReady(*onReadyEvent);
    }

    return nullptr;
  }
771 772
}

773 774
// -------------------------------------------------------------------

775 776
ExclusiveJoinPromiseNode::ExclusiveJoinPromiseNode(Own<PromiseNode> left, Own<PromiseNode> right)
    : left(*this, kj::mv(left)), right(*this, kj::mv(right)) {}
777 778 779

ExclusiveJoinPromiseNode::~ExclusiveJoinPromiseNode() noexcept(false) {}

Kenton Varda's avatar
Kenton Varda committed
780 781
void ExclusiveJoinPromiseNode::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
782 783 784
}

void ExclusiveJoinPromiseNode::get(ExceptionOrValue& output) noexcept {
785
  KJ_REQUIRE(left.get(output) || right.get(output), "get() called before ready.");
786 787
}

788 789 790 791 792 793
PromiseNode* ExclusiveJoinPromiseNode::getInnerForTrace() {
  auto result = left.getInnerForTrace();
  if (result == nullptr) {
    result = right.getInnerForTrace();
  }
  return result;
794 795 796
}

ExclusiveJoinPromiseNode::Branch::Branch(
797 798
    ExclusiveJoinPromiseNode& joinNode, Own<PromiseNode> dependencyParam)
    : joinNode(joinNode), dependency(kj::mv(dependencyParam)) {
799
  dependency->setSelfPointer(&dependency);
Kenton Varda's avatar
Kenton Varda committed
800
  dependency->onReady(*this);
801 802
}

803
ExclusiveJoinPromiseNode::Branch::~Branch() noexcept(false) {}
804 805

bool ExclusiveJoinPromiseNode::Branch::get(ExceptionOrValue& output) {
806
  if (dependency) {
807 808 809 810 811 812 813
    dependency->get(output);
    return true;
  } else {
    return false;
  }
}

814
Maybe<Own<Event>> ExclusiveJoinPromiseNode::Branch::fire() {
815 816 817
  // Cancel the branch that didn't return first.  Ignore exceptions caused by cancellation.
  if (this == &joinNode.left) {
    kj::runCatchingExceptions([&]() { joinNode.right.dependency = nullptr; });
818
  } else {
819 820
    kj::runCatchingExceptions([&]() { joinNode.left.dependency = nullptr; });
  }
821

822 823 824
  joinNode.onReadyEvent.arm();
  return nullptr;
}
825

826 827
PromiseNode* ExclusiveJoinPromiseNode::Branch::getInnerForTrace() {
  return dependency;
828 829 830 831
}

// -------------------------------------------------------------------

832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896
ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase(
    Array<Own<PromiseNode>> promises, ExceptionOrValue* resultParts, size_t partSize)
    : countLeft(promises.size()) {
  // Make the branches.
  auto builder = heapArrayBuilder<Branch>(promises.size());
  for (uint i: indices(promises)) {
    ExceptionOrValue& output = *reinterpret_cast<ExceptionOrValue*>(
        reinterpret_cast<byte*>(resultParts) + i * partSize);
    builder.add(*this, kj::mv(promises[i]), output);
  }
  branches = builder.finish();

  if (branches.size() == 0) {
    onReadyEvent.arm();
  }
}
ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}

void ArrayJoinPromiseNodeBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
}

void ArrayJoinPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
  // If any of the elements threw exceptions, propagate them.
  for (auto& branch: branches) {
    KJ_IF_MAYBE(exception, branch.getPart()) {
      output.addException(kj::mv(*exception));
    }
  }

  if (output.exception == nullptr) {
    // No errors.  The template subclass will need to fill in the result.
    getNoError(output);
  }
}

PromiseNode* ArrayJoinPromiseNodeBase::getInnerForTrace() {
  return branches.size() == 0 ? nullptr : branches[0].getInnerForTrace();
}

ArrayJoinPromiseNodeBase::Branch::Branch(
    ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output)
    : joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
  dependency->setSelfPointer(&dependency);
  dependency->onReady(*this);
}

ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}

Maybe<Own<Event>> ArrayJoinPromiseNodeBase::Branch::fire() {
  if (--joinNode.countLeft == 0) {
    joinNode.onReadyEvent.arm();
  }
  return nullptr;
}

_::PromiseNode* ArrayJoinPromiseNodeBase::Branch::getInnerForTrace() {
  return dependency->getInnerForTrace();
}

Maybe<Exception> ArrayJoinPromiseNodeBase::Branch::getPart() {
  dependency->get(output);
  return kj::mv(output.exception);
}

Kenton Varda's avatar
Kenton Varda committed
897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917
ArrayJoinPromiseNode<void>::ArrayJoinPromiseNode(
    Array<Own<PromiseNode>> promises, Array<ExceptionOr<_::Void>> resultParts)
    : ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<_::Void>)),
      resultParts(kj::mv(resultParts)) {}

ArrayJoinPromiseNode<void>::~ArrayJoinPromiseNode() {}

void ArrayJoinPromiseNode<void>::getNoError(ExceptionOrValue& output) noexcept {
  output.as<_::Void>() = _::Void();
}

}  // namespace _ (private)

Promise<void> joinPromises(Array<Promise<void>>&& promises) {
  return Promise<void>(false, kj::heap<_::ArrayJoinPromiseNode<void>>(
      KJ_MAP(p, promises) { return kj::mv(p.node); },
      heapArray<_::ExceptionOr<_::Void>>(promises.size())));
}

namespace _ {  // (private)

918 919
// -------------------------------------------------------------------

920 921 922
EagerPromiseNodeBase::EagerPromiseNodeBase(
    Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
    : dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
923
  dependency->setSelfPointer(&dependency);
Kenton Varda's avatar
Kenton Varda committed
924
  dependency->onReady(*this);
925 926
}

Kenton Varda's avatar
Kenton Varda committed
927 928
void EagerPromiseNodeBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
929 930
}

931 932
PromiseNode* EagerPromiseNodeBase::getInnerForTrace() {
  return dependency;
933 934
}

935
Maybe<Own<Event>> EagerPromiseNodeBase::fire() {
936 937 938 939 940
  dependency->get(resultRef);
  KJ_IF_MAYBE(exception, kj::runCatchingExceptions([this]() {
    dependency = nullptr;
  })) {
    resultRef.addException(kj::mv(*exception));
941
  }
942 943 944

  onReadyEvent.arm();
  return nullptr;
945 946
}

947 948
// -------------------------------------------------------------------

Kenton Varda's avatar
Kenton Varda committed
949 950
void AdapterPromiseNodeBase::onReady(Event& event) noexcept {
  onReadyEvent.init(event);
951 952
}

Kenton Varda's avatar
Kenton Varda committed
953 954 955 956
// -------------------------------------------------------------------

Promise<void> IdentityFunc<Promise<void>>::operator()() const { return READY_NOW; }

957
}  // namespace _ (private)
958
}  // namespace kj