Commit aeb918c7 authored by Kenton Varda's avatar Kenton Varda

Add a sample RPC application, and fix some bugs / add some missing features needed by said app.

parent 70f59e43
......@@ -16,6 +16,7 @@
# Ekam build artifacts.
#include "calculator.capnp.h"
#include <capnp/ez-rpc.h>
#include <kj/debug.h>
#include <math.h>
#include <iostream>
class PowerFunction final: public Calculator::Function::Server {
// An implementation of the Function interface wrapping pow(). Note that
// we're implementing this on the client side and will pass a reference to
// the server. The server will then be able to make calls back to the client.
kj::Promise<void> call(CallContext context) {
auto params = context.getParams().getParams();
KJ_REQUIRE(params.size() == 2, "Wrong number of parameters.");
context.getResults().setValue(pow(params[0], params[1]));
return kj::READY_NOW;
int main(int argc, const char* argv[]) {
if (argc != 2) {
std::cerr << "usage: " << argv[0] << " HOST:PORT\n"
"Connects to the Calculator server at the given address and "
"does some RPCs." << std::endl;
return 1;
capnp::EzRpcClient client(argv[1]);
Calculator::Client calculator = client.importCap<Calculator>("calculator");
// Keep an eye on `waitScope`. Whenever you see it used is a place where we
// stop and wait for the server to respond. If a line of code does not use
// `waitScope`, then it does not block!
auto& waitScope = client.getWaitScope();
// Make a request that just evaluates the literal value 123.
// What's interesting here is that evaluate() returns a "Value", which is
// another interface and therefore points back to an object living on the
// server. We then have to call read() on that object to read it.
// However, even though we are making two RPC's, this block executes in
// *one* network round trip because of promise pipelining: we do not wait
// for the first call to complete before we send the second call to the
// server.
std::cout << "Evaluating a literal... ";
// Set up the request.
auto request = calculator.evaluateRequest();
// Send it, which returns a promise for the result (without blocking).
auto evalPromise = request.send();
// Using the promise, create a pipelined request to call read() on the
// returned object, and then send that.
auto readPromise = evalPromise.getValue().readRequest().send();
// Now that we've sent all the requests, wait for the response. Until this
// point, we haven't waited at all!
auto response = readPromise.wait(waitScope);
KJ_ASSERT(response.getValue() == 123);
std::cout << "PASS" << std::endl;
// Make a request to evaluate 123 + 45 - 67.
// The Calculator interface requires that we first call getOperator() to
// get the addition and subtraction functions, then call evaluate() to use
// them. But, once again, we can get both functions, call evaluate(), and
// then read() the result -- four RPCs -- in the time of *one* network
// round trip, because of promise pipelining.
std::cout << "Using add and subtract... ";
Calculator::Function::Client add = nullptr;
Calculator::Function::Client subtract = nullptr;
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
add = request.send().getFunc();
// Get the "subtract" function from the server.
auto request = calculator.getOperatorRequest();
subtract = request.send().getFunc();
// Build the request to evaluate 123 + 45 - 67.
auto request = calculator.evaluateRequest();
auto subtractCall = request.getExpression().initCall();
auto subtractParams = subtractCall.initParams(2);
auto addCall = subtractParams[0].initCall();
auto addParams = addCall.initParams(2);
// Send the evaluate() request, read() the result, and wait for read() to
// finish.
auto evalPromise = request.send();
auto readPromise = evalPromise.getValue().readRequest().send();
auto response = readPromise.wait(waitScope);
KJ_ASSERT(response.getValue() == 101);
std::cout << "PASS" << std::endl;
// Make a request to evaluate 4 * 6, then use the result in two more
// requests that add 3 and 5.
// Since evaluate() returns its result wrapped in a `Value`, we can pass
// that `Value` back to the server in subsequent requests before the first
// `evaluate()` has actually returned. Thus, this example again does only
// one network round trip.
std::cout << "Pipelining eval() calls... ";
Calculator::Function::Client add = nullptr;
Calculator::Function::Client multiply = nullptr;
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
add = request.send().getFunc();
// Get the "multiply" function from the server.
auto request = calculator.getOperatorRequest();
multiply = request.send().getFunc();
// Build the request to evaluate 4 * 6
auto request = calculator.evaluateRequest();
auto multiplyCall = request.getExpression().initCall();
auto multiplyParams = multiplyCall.initParams(2);
auto multiplyResult = request.send().getValue();
// Use the result in two calls that add 3 and add 5.
auto add3Request = calculator.evaluateRequest();
auto add3Call = add3Request.getExpression().initCall();
auto add3Params = add3Call.initParams(2);
auto add3Promise = add3Request.send().getValue().readRequest().send();
auto add5Request = calculator.evaluateRequest();
auto add5Call = add5Request.getExpression().initCall();
auto add5Params = add5Call.initParams(2);
auto add5Promise = add5Request.send().getValue().readRequest().send();
// Now wait for the results.
KJ_ASSERT(add3Promise.wait(waitScope).getValue() == 27);
KJ_ASSERT(add5Promise.wait(waitScope).getValue() == 29);
std::cout << "PASS" << std::endl;
// Our calculator interface supports defining functions. Here we use it
// to define two functions and then make calls to them as follows:
// f(x, y) = x * 100 + y
// g(x) = f(x, x + 1) * 2;
// f(12, 34)
// g(21)
// Once again, the whole thing takes only one network round trip.
std::cout << "Defining functions... ";
Calculator::Function::Client add = nullptr;
Calculator::Function::Client multiply = nullptr;
Calculator::Function::Client f = nullptr;
Calculator::Function::Client g = nullptr;
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
add = request.send().getFunc();
// Get the "multiply" function from the server.
auto request = calculator.getOperatorRequest();
multiply = request.send().getFunc();
// Define f.
auto request = calculator.defFunctionRequest();
// Build the function body.
auto addCall = request.getBody().initCall();
auto addParams = addCall.initParams(2);
addParams[1].setParameter(1); // y
auto multiplyCall = addParams[0].initCall();
auto multiplyParams = multiplyCall.initParams(2);
multiplyParams[0].setParameter(0); // x
f = request.send().getFunc();
// Define g.
auto request = calculator.defFunctionRequest();
// Build the function body.
auto multiplyCall = request.getBody().initCall();
auto multiplyParams = multiplyCall.initParams(2);
auto fCall = multiplyParams[0].initCall();
auto fParams = fCall.initParams(2);
auto addCall = fParams[1].initCall();
auto addParams = addCall.initParams(2);
g = request.send().getFunc();
// OK, we've defined all our functions. Now create our eval requests.
// f(12, 34)
auto fEvalRequest = calculator.evaluateRequest();
auto fCall = fEvalRequest.initExpression().initCall();
auto fParams = fCall.initParams(2);
auto fEvalPromise = fEvalRequest.send().getValue().readRequest().send();
// g(21)
auto gEvalRequest = calculator.evaluateRequest();
auto gCall = gEvalRequest.initExpression().initCall();
auto gEvalPromise = gEvalRequest.send().getValue().readRequest().send();
// Wait for the results.
KJ_ASSERT(fEvalPromise.wait(waitScope).getValue() == 1234);
KJ_ASSERT(gEvalPromise.wait(waitScope).getValue() == 4244);
std::cout << "PASS" << std::endl;
// Make a request that will call back to a function defined locally.
// Specifically, we will compute 2^(4 + 5). However, exponent is not
// defined by the Calculator server. So, we'll implement the Function
// interface locally and pass it to the server for it to use when
// evaluating the expression.
// This example requires two network round trips to complete, because the
// server calls back to the client once before finishing. In this
// particular case, this could potentially be optimized by using a tail
// call on the server side -- see CallContext::tailCall(). However, to
// keep the example simpler, we haven't implemented this optimization in
// the sample server.
std::cout << "Using a callback... ";
Calculator::Function::Client add = nullptr;
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
add = request.send().getFunc();
// Build the eval request for 2^(4+5).
auto request = calculator.evaluateRequest();
auto powCall = request.getExpression().initCall();
auto powParams = powCall.initParams(2);
auto addCall = powParams[1].initCall();
auto addParams = addCall.initParams(2);
// Send the request and wait.
auto response = request.send().getValue().readRequest()
KJ_ASSERT(response.getValue() == 512);
std::cout << "PASS" << std::endl;
return 0;
#include "calculator.capnp.h"
#include <kj/debug.h>
#include <capnp/ez-rpc.h>
#include <capnp/capability-context.h> // for LocalMessage
#include <iostream>
typedef unsigned int uint;
kj::Promise<double> readValue(Calculator::Value::Client value) {
// Helper function to asynchronously call read() on a Calculator::Value and
// return a promise for the result. (In the future, the generated code might
// include something like this automatically.)
return value.readRequest().send()
.then([](capnp::Response<Calculator::Value::ReadResults> result) {
return result.getValue();
kj::Promise<double> evaluateImpl(
Calculator::Expression::Reader expression,
capnp::List<double>::Reader params = capnp::List<double>::Reader()) {
// Implementation of CalculatorImpl::evaluate(), also shared by
// FunctionImpl::call(). In the latter case, `params` are the parameter
// values passed to the function; in the former case, `params` is just an
// empty list.
switch (expression.which()) {
case Calculator::Expression::LITERAL:
return expression.getLiteral();
case Calculator::Expression::PREVIOUS_RESULT:
return readValue(expression.getPreviousResult());
case Calculator::Expression::PARAMETER: {
KJ_REQUIRE(expression.getParameter() < params.size(),
"Parameter index out-of-range.");
return params[expression.getParameter()];
case Calculator::Expression::CALL: {
auto call = expression.getCall();
auto func = call.getFunction();
// Evaluate each parameter.
kj::Array<kj::Promise<double>> paramPromises =
KJ_MAP(param, call.getParams()) {
return evaluateImpl(param, params);
// Join the array of promises into a promise for an array.
kj::Promise<kj::Array<double>> joinedParams =
// When the parameters are complete, call the function.
return joinedParams.then([func](kj::Array<double>&& paramValues) mutable {
auto request = func.callRequest();
return request.send().then(
[](capnp::Response<Calculator::Function::CallResults>&& result) {
return result.getValue();
// Throw an exception.
KJ_FAIL_REQUIRE("Unknown expression type.");
class ValueImpl final: public Calculator::Value::Server {
// Simple implementation of the Calculator.Value Cap'n Proto interface.
ValueImpl(double value): value(value) {}
kj::Promise<void> read(ReadContext context) {
return kj::READY_NOW;
double value;
class FunctionImpl final: public Calculator::Function::Server {
// Implementation of the Calculator.Function Cap'n Proto interface, where the
// function is defined by a Calculator.Expression.
FunctionImpl(uint paramCount, Calculator::Expression::Reader body)
: paramCount(paramCount), body(body) {}
kj::Promise<void> call(CallContext context) {
auto params = context.getParams().getParams();
KJ_REQUIRE(params.size() == paramCount, "Wrong number of parameters.");
return evaluateImpl(body.getRoot().getAs<Calculator::Expression>(), params)
.then([context](double value) mutable {
uint paramCount;
// The function's arity.
capnp::LocalMessage body;
// LocalMessage holds a message that might contain capabilities (interface
// references). Here we're using it to hold a Calculator.Expression, which
// might contain Calculator.Function and/or Calculator.Value capabilities.
class OperatorImpl final: public Calculator::Function::Server {
// Implementation of the Calculator.Function Cap'n Proto interface, wrapping
// basic binary arithmetic operators.
OperatorImpl(Calculator::Operator op): op(op) {}
kj::Promise<void> call(CallContext context) {
auto params = context.getParams().getParams();
KJ_REQUIRE(params.size() == 2, "Wrong number of parameters.");
double result;
switch (op) {
case Calculator::Operator::ADD: result = params[0] + params[1]; break;
case Calculator::Operator::SUBTRACT:result = params[0] - params[1]; break;
case Calculator::Operator::MULTIPLY:result = params[0] * params[1]; break;
case Calculator::Operator::DIVIDE: result = params[0] / params[1]; break;
KJ_FAIL_REQUIRE("Unknown operator.");
return kj::READY_NOW;
Calculator::Operator op;
class CalculatorImpl final: public Calculator::Server {
// Implementation of the Calculator Cap'n Proto interface.
kj::Promise<void> evaluate(EvaluateContext context) override {
return evaluateImpl(context.getParams().getExpression())
.then([context](double value) mutable {
kj::Promise<void> defFunction(DefFunctionContext context) override {
auto params = context.getParams();
params.getParamCount(), params.getBody()));
return kj::READY_NOW;
kj::Promise<void> getOperator(GetOperatorContext context) override {
return kj::READY_NOW;
int main(int argc, const char* argv[]) {
if (argc != 2) {
std::cerr << "usage: " << argv[0] << " ADDRESS[:PORT]\n"
"Runs the server bound to the given address/port.\n"
"ADDRESS may be '*' to bind to all local addresses.\n"
":PORT may be omitted to choose a port automatically." << std::endl;
return 1;
// Set up a server.
capnp::EzRpcServer server(argv[1]);
server.exportCap("calculator", kj::heap<CalculatorImpl>());
// Write the port number to stdout, in case it was chosen automatically.
auto& waitScope = server.getWaitScope();
uint port = server.getPort().wait(waitScope);
if (port == 0) {
// The address format "unix:/path/to/socket" opens a unix domain socket,
// in which case the port will be zero.
std::cout << "Listening on Unix socket..." << std::endl;
} else {
std::cout << "Listening on port " << port << "..." << std::endl;
// Run forever, accepting connections and handling requests.
interface Calculator {
# A "simple" mathematical calculator, callable via RPC.
# But, to show off Cap'n Proto, we add some twists:
# - You can use the result from one call as the input to the next
# without a network round trip. To accomplish this, evaluate()
# returns a `Value` object wrapping the actual numeric value.
# This object may be used in a subsequent expression. With
# promise pipelining, the Value can actually be used before
# the evaluate() call that creates it returns!
# - You can define new functions, and then call them. This again
# shows off pipelining, but it also gives the client the
# opportunity to define a function on the client side and have
# the server call back to it.
# - The basic arithmetic operators are exposed as Functions, and
# you have to call getOperator() to obtain them from the server.
# This again demonstrates pipelining -- using getOperator() to
# get each operator and then using them in evaluate() still
# only takes one network round trip.
evaluate @0 (expression: Expression) -> (value: Value);
# Evaluate the given expression and return the result. The
# result is returned wrapped in a Value interface so that you
# may pass it back to the server in a pipelined request. To
# actually get the numeric value, you must call read() on the
# Value -- but again, this can be pipelined so that it incurs
# no additional latency.
struct Expression {
# A numeric expression.
union {
literal @0 :Float64;
# A literal numeric value.
previousResult @1 :Value;
# A value that was (or, will be) returned by a previous
# evaluate().
parameter @2 :UInt32;
# A parameter to the function (only valid in function bodies;
# see defFunction).
call :group {
# Call a function on a list of parameters.
function @3 :Function;
params @4 :List(Expression);
interface Value {
# Wraps a numeric value in an RPC object. This allows the value
# to be used in subsequent evaluate() requests without the client
# waiting for the evaluate() that returns the Value to finish.
read @0 () -> (value :Float64);
# Read back the raw numeric value.
defFunction @1 (paramCount :Int32, body :Expression)
-> (func :Function);
# Define a function that takes `paramCount` parameters and returns the
# evaluation of `body` after substituting these parameters.
interface Function {
# An algebraic function. Can be called directly, or can be used inside
# an Expression.
# A client can create a Function that runs on the server side using
# `defFunction()` or `getOperator()`. Alternatively, a client can
# implement a Function on the client side and the server will call back
# to it. However, a function defined on the client side will require a
# network round trip whenever the server needs to call it, whereas
# functions defined on the server and then passed back to it are called
# locally.
call @0 (params :List(Float64)) -> (value: Float64);
# Call the function on the given parameters.
getOperator @2 (op: Operator) -> (func: Function);
# Get a Function representing an arithmetic operator, which can then be
# used in Expressions.
enum Operator {
add @0;
subtract @1;
multiply @2;
divide @3;
......@@ -11,3 +11,15 @@ c++ -std=c++11 -Wall addressbook.c++ addressbook.capnp.c++ -lcapnp -lkj -pthread
./addressbook dwrite | ./addressbook dread
rm addressbook addressbook.capnp.c++ addressbook.capnp.h
capnpc -oc++ calculator.capnp
c++ -std=c++11 -Wall calculator-client.c++ calculator.capnp.c++ -lcapnp-rpc -lcapnp -lkj-async \
-lkj -pthread -o calculator-client
c++ -std=c++11 -Wall calculator-server.c++ calculator.capnp.c++ -lcapnp-rpc -lcapnp -lkj-async \
-lkj -pthread -o calculator-server
rm -f /tmp/capnp-calculator-example-$$
./calculator-server unix:/tmp/capnp-calculator-example-$$ &
sleep 0.1
./calculator-client unix:/tmp/capnp-calculator-example-$$
kill %+
wait %+ || true
rm calculator-client calculator-server calculator.capnp.c++ calculator.capnp.h /tmp/capnp-calculator-example-$$
......@@ -105,7 +105,12 @@ class LocalMessage final {
// know how to properly serialize its capabilities.
LocalMessage(kj::Maybe<MessageSize> sizeHint = nullptr);
explicit LocalMessage(kj::Maybe<MessageSize> sizeHint = nullptr);
template <typename T, typename = FromReader<T>>
inline LocalMessage(T&& reader): LocalMessage(reader.totalSize()) {
// Create a LocalMessage that is a copy of a given reader.
inline AnyPointer::Builder getRoot() { return root; }
inline AnyPointer::Reader getRootReader() const { return root.asReader(); }
......@@ -51,7 +51,7 @@ if test "$INTERCEPTOR" = ""; then
exit 1
echo findProvider file:capnp
echo findProvider file:compiler/capnp
read CAPNP
if test "$CAPNP" = ""; then
......@@ -944,7 +944,7 @@ private:
" inline void set", titleCase, "(", type, "::Reader value);\n",
kind == FieldKind::LIST && !isStructOrCapList
? kj::strTree(
" inline void set", titleCase, "(std::initializer_list<", elementReaderType, "> value);\n")
" inline void set", titleCase, "(::kj::ArrayPtr<const ", elementReaderType, "> value);\n")
: kj::strTree(),
kind == FieldKind::STRUCT
? kj::strTree(
......@@ -994,7 +994,7 @@ private:
kind == FieldKind::LIST && !isStructOrCapList
? kj::strTree(
"inline void ", scope, "Builder::set", titleCase, "(std::initializer_list<", elementReaderType, "> value) {\n",
"inline void ", scope, "Builder::set", titleCase, "(::kj::ArrayPtr<const ", elementReaderType, "> value) {\n",
" ::capnp::_::PointerHelpers<", type, ">::set(\n"
" _builder.getPointerField(", offset, " * ::capnp::POINTERS), value);\n"
......@@ -1403,12 +1403,12 @@ DynamicValue::Reader::Reader(const Reader& other) {
case ENUM:
case STRUCT:
static_assert(__has_trivial_copy(Text::Reader) &&
__has_trivial_copy(Data::Reader) &&
__has_trivial_copy(DynamicList::Reader) &&
__has_trivial_copy(DynamicEnum) &&
__has_trivial_copy(DynamicStruct::Reader) &&
static_assert(kj::canMemcpy<Text::Reader>() &&
kj::canMemcpy<Data::Reader>() &&
kj::canMemcpy<DynamicList::Reader>() &&
kj::canMemcpy<DynamicEnum>() &&
kj::canMemcpy<DynamicStruct::Reader>() &&
"Assumptions here don't hold.");
......@@ -1434,12 +1434,12 @@ DynamicValue::Reader::Reader(Reader&& other) noexcept {
case ENUM:
case STRUCT:
static_assert(__has_trivial_copy(Text::Reader) &&
__has_trivial_copy(Data::Reader) &&
__has_trivial_copy(DynamicList::Reader) &&
__has_trivial_copy(DynamicEnum) &&
__has_trivial_copy(DynamicStruct::Reader) &&
static_assert(kj::canMemcpy<Text::Reader>() &&
kj::canMemcpy<Data::Reader>() &&
kj::canMemcpy<DynamicList::Reader>() &&
kj::canMemcpy<DynamicEnum>() &&
kj::canMemcpy<DynamicStruct::Reader>() &&
"Assumptions here don't hold.");
......@@ -1486,7 +1486,7 @@ DynamicValue::Builder::Builder(Builder& other) {
case ENUM:
case STRUCT:
// Unfortunately __has_trivial_copy doesn't work on these types due to the use of
// Unfortunately canMemcpy() doesn't work on these types due to the use of
// DisallowConstCopy, but __has_trivial_destructor should detect if any of these types
// become non-trivial.
static_assert(__has_trivial_destructor(Text::Builder) &&
......@@ -1830,7 +1830,12 @@ struct WireHelpers {
SegmentReader* segment, const WirePointer* ref, int nestingLimit)) {
kj::Maybe<kj::Own<ClientHook>> maybeCap;
if (ref->isNull()) {
if (segment == nullptr) {
// No capability context for unchecked messages.
// TODO(now): This means a capability read from an omitted (and therefore default-valued)
// sub-message will throw a fatal exception.
maybeCap = nullptr;
} else if (ref->isNull()) {
maybeCap = segment->getArena()->newBrokenCap("Calling null capability pointer.");
} else if (!ref->isCapability()) {
......@@ -77,8 +77,7 @@ struct PointerHelpers<List<T>, Kind::LIST> {
static inline void set(PointerBuilder builder, typename List<T>::Reader value) {
template <typename U>
static void set(PointerBuilder builder, std::initializer_list<U> value) {
static void set(PointerBuilder builder, kj::ArrayPtr<const ReaderFor<T>> value) {
auto l = init(builder, value.size());
uint i = 0;
for (auto& element: value) {
......@@ -723,9 +723,9 @@ private:
[this](kj::Own<ClientHook>&& resolution) {
resolve(kj::mv(resolution), false);
}, [this](kj::Exception&& exception) {
resolve(newBrokenCap(kj::mv(exception)), true);
}).eagerlyEvaluate([&](kj::Exception&& e) {
// Make any exceptions thrown from resolve() go to the connection's TaskSet which
// will cause the connection to be terminated.
......@@ -810,8 +810,8 @@ private:
bool receivedCall = false;
void resolve(kj::Own<ClientHook> replacement) {
if (replacement->getBrand() != connectionState.get() && receivedCall) {
void resolve(kj::Own<ClientHook> replacement, bool isError) {
if (replacement->getBrand() != connectionState.get() && receivedCall && !isError) {
// The new capability is hosted locally, not on the remote machine. And, we had made calls
// to the promise. We need to make sure those calls echo back to us before we allow new
// calls to go directly to the local capability, so we need to set a local embargo and send
......@@ -570,8 +570,7 @@ T* HeapArrayDisposer::allocateUninitialized(size_t count) {
return Allocate_<T, true, true>::allocate(0, count);
template <typename Element, typename Iterator,
bool trivial = __has_trivial_copy(Element) && __has_trivial_assign(Element)>
template <typename Element, typename Iterator, bool = canMemcpy<Element>()>
struct CopyConstructArray_;
template <typename T>
......@@ -172,6 +172,9 @@ protected:
class ImmediatePromiseNodeBase: public PromiseNode {
~ImmediatePromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override;
......@@ -480,6 +483,70 @@ private:
// -------------------------------------------------------------------
class ArrayJoinPromiseNodeBase: public PromiseNode {
ArrayJoinPromiseNodeBase(Array<Own<PromiseNode>> promises,
ExceptionOrValue* resultParts, size_t partSize);
~ArrayJoinPromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override final;
void get(ExceptionOrValue& output) noexcept override final;
PromiseNode* getInnerForTrace() override final;
virtual void getNoError(ExceptionOrValue& output) noexcept = 0;
// Called to compile the result only in the case where there were no errors.
uint countLeft;
OnReadyEvent onReadyEvent;
class Branch: public Event {
Branch(ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependency,
ExceptionOrValue& output);
~Branch() noexcept(false);
Maybe<Own<Event>> fire() override;
_::PromiseNode* getInnerForTrace() override;
Maybe<Exception> getPart();
// Calls dependency->get(output). If there was an exception, return it.
ArrayJoinPromiseNodeBase& joinNode;
Own<PromiseNode> dependency;
ExceptionOrValue& output;
Array<Branch> branches;
template <typename T>
class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase {
ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises,
Array<ExceptionOr<T>> resultParts)
: ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>)),
resultParts(kj::mv(resultParts)) {}
void getNoError(ExceptionOrValue& output) noexcept override {
auto builder = heapArrayBuilder<T>(resultParts.size());
for (auto& part: resultParts) {
KJ_IASSERT(part.value != nullptr,
"Bug in KJ promise framework: Promise result had neither value no exception.");
}<Array<T>>() = builder.finish();
Array<ExceptionOr<T>> resultParts;
// -------------------------------------------------------------------
class EagerPromiseNodeBase: public PromiseNode, protected Event {
// A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
// evaluate it.
......@@ -682,6 +749,13 @@ void Promise<void>::detach(ErrorFunc&& errorHandler) {
return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises) {
return Promise<Array<T>>(false, kj::heap<_::ArrayJoinPromiseNode<T>>(
KJ_MAP(p, promises) { return kj::mv(p.node); },
// =======================================================================================
namespace _ { // private
......@@ -34,6 +34,7 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stddef.h>
#include <stdlib.h>
#include <arpa/inet.h>
......@@ -294,11 +295,26 @@ public:
int socket(int type) const {
bool isStream = type == SOCK_STREAM;
int result;
#if __linux__
KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0));
if (isStream && (addr.generic.sa_family == AF_INET ||
addr.generic.sa_family == AF_INET6)) {
// TODO(0.5): As a hack for the 0.4 release we are always setting
// TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
// RPC protocol. Later, we should extend the interface to provide more
// control over this. Perhaps write() should have a flag which
// specifies whether to pass MSG_MORE.
int one = 1;
result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
return result;
......@@ -642,7 +658,7 @@ Promise<Array<SocketAddress>> SocketAddress::lookupHost(
addr.addrlen = cur->ai_addrlen;
memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
static_assert(__has_trivial_copy(SocketAddress), "Can't write() SocketAddress...");
static_assert(canMemcpy<SocketAddress>(), "Can't write() SocketAddress...");
output.write(&addr, sizeof(addr));
cur = cur->ai_next;
......@@ -36,6 +36,9 @@ template <typename T>
class Promise;
class WaitScope;
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
namespace _ { // private
template <typename T> struct JoinPromises_ { typedef T Type; };
......@@ -44,6 +47,9 @@ template <typename T> struct JoinPromises_<Promise<T>> { typedef T Type; };
template <typename T>
using JoinPromises = typename JoinPromises_<T>::Type;
// If T is Promise<U>, resolves to U, otherwise resolves to T.
// TODO(cleanup): Rename to avoid confusion with joinPromises() call which is completely
// unrelated.
class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of
......@@ -171,6 +177,8 @@ private:
template <typename>
friend class kj::Promise;
friend class TaskSetImpl;
template <typename U>
friend Promise<Array<U>> kj::joinPromises(Array<Promise<U>>&& promises);
void detach(kj::Promise<void>&& promise);
......@@ -514,6 +514,25 @@ TEST(Async, ExclusiveJoin) {
TEST(Async, ArrayJoin) {
EventLoop loop;
WaitScope waitScope(loop);
auto builder = heapArrayBuilder<Promise<int>>(3);
Promise<Array<int>> promise = joinPromises(builder.finish());
auto result = promise.wait(waitScope);
ASSERT_EQ(3u, result.size());
EXPECT_EQ(123, result[0]);
EXPECT_EQ(456, result[1]);
EXPECT_EQ(789, result[2]);
class ErrorHandlerImpl: public TaskSet::ErrorHandler {
uint exceptionCount = 0;
......@@ -54,8 +54,6 @@ public:
Promise<short> onFdEvent(int fd, short eventMask);
// `eventMask` is a bitwise-OR of poll events (e.g. `POLLIN`, `POLLOUT`, etc.). The next time
// one or more of the given events occurs on `fd`, the set of events that occurred are returned.
// The result of waiting on the same FD twice at once is undefined.
Promise<siginfo_t> onSignal(int signum);
// When the given signal is delivered to this thread, return the corresponding siginfo_t.
......@@ -508,6 +508,9 @@ void PromiseNode::OnReadyEvent::arm() {
// -------------------------------------------------------------------
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}
void ImmediatePromiseNodeBase::onReady(Event& event) noexcept {
......@@ -807,6 +810,73 @@ PromiseNode* ExclusiveJoinPromiseNode::Branch::getInnerForTrace() {
// -------------------------------------------------------------------
Array<Own<PromiseNode>> promises, ExceptionOrValue* resultParts, size_t partSize)
: countLeft(promises.size()) {
// Make the branches.
auto builder = heapArrayBuilder<Branch>(promises.size());
for (uint i: indices(promises)) {
ExceptionOrValue& output = *reinterpret_cast<ExceptionOrValue*>(
reinterpret_cast<byte*>(resultParts) + i * partSize);
builder.add(*this, kj::mv(promises[i]), output);
branches = builder.finish();
if (branches.size() == 0) {
ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}
void ArrayJoinPromiseNodeBase::onReady(Event& event) noexcept {
void ArrayJoinPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
// If any of the elements threw exceptions, propagate them.
for (auto& branch: branches) {
KJ_IF_MAYBE(exception, branch.getPart()) {
if (output.exception == nullptr) {
// No errors. The template subclass will need to fill in the result.
PromiseNode* ArrayJoinPromiseNodeBase::getInnerForTrace() {
return branches.size() == 0 ? nullptr : branches[0].getInnerForTrace();
ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output)
: joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}
Maybe<Own<Event>> ArrayJoinPromiseNodeBase::Branch::fire() {
if (--joinNode.countLeft == 0) {
return nullptr;
_::PromiseNode* ArrayJoinPromiseNodeBase::Branch::getInnerForTrace() {
return dependency->getInnerForTrace();
Maybe<Exception> ArrayJoinPromiseNodeBase::Branch::getPart() {
return kj::mv(output.exception);
// -------------------------------------------------------------------
Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
: dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
......@@ -290,6 +290,8 @@ private:
friend class _::TaskSetImpl;
friend Promise<void> _::yield();
friend class _::NeverDone;
template <typename U>
friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises);
template <typename T>
......@@ -339,6 +341,10 @@ PromiseForResult<Func, void> evalLater(Func&& func);
// If you schedule several evaluations with `evalLater` during the same callback, they are
// guaranteed to be executed in order.
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
// Join an array of promises into a promise for an array.
// =======================================================================================
// Hack for creating a lambda that holds an owned pointer.
......@@ -25,8 +25,6 @@
// This defines very simple utilities that are widely applicable.
#include <stddef.h>
#ifndef KJ_COMMON_H_
#define KJ_COMMON_H_
......@@ -69,6 +67,9 @@
#include <stddef.h>
#include <initializer_list>
// =======================================================================================
namespace kj {
......@@ -344,6 +345,29 @@ constexpr bool canConvert() {
return sizeof(CanConvert_<U>::sfinae(instance<T>())) == sizeof(int);
#if __clang__
template <typename T>
constexpr bool canMemcpy() {
// Returns true if T can be copied using memcpy instead of using the copy constructor or
// assignment operator.
// Clang unhelpfully defines __has_trivial_{copy,assign}(T) to be true if the copy constructor /
// assign operator are deleted, on the basis that a strict reading of the definition of "trivial"
// according to the standard says that deleted functions are in fact trivial. Meanwhile Clang
// provides these admittedly-better intrinsics, but GCC does not.
return __is_trivially_constructible(T, const T&) && __is_trivially_assignable(T, const T&);
template <typename T>
constexpr bool canMemcpy() {
// Returns true if T can be copied using memcpy instead of using the copy constructor or
// assignment operator.
// GCC defines these to mean what we want them to mean.
return __has_trivial_copy(T) && __has_trivial_assign(T);
// =======================================================================================
// Equivalents to std::move() and std::forward(), since these are very commonly needed and the
// std header <utility> pulls in lots of other stuff.
......@@ -917,6 +941,8 @@ public:
inline constexpr ArrayPtr(decltype(nullptr)): ptr(nullptr), size_(0) {}
inline constexpr ArrayPtr(T* ptr, size_t size): ptr(ptr), size_(size) {}
inline constexpr ArrayPtr(T* begin, T* end): ptr(begin), size_(end - begin) {}
inline constexpr ArrayPtr(std::initializer_list<RemoveConstOrDisable<T>> init)
: ptr(init.begin()), size_(init.size()) {}
template <size_t size>
inline constexpr ArrayPtr(T (&native)[size]): ptr(native), size_(size) {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment