Commit fa34f823 authored by Kenton Varda's avatar Kenton Varda

Add a sample RPC application, and fix some bugs / add some missing features needed by said app.

parent b72dd35a
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
/c++/src/os /c++/src/os
/c++/src/protobuf /c++/src/protobuf
/c++/src/snappy /c++/src/snappy
/c++/src/samples
# Ekam build artifacts. # Ekam build artifacts.
/c++/tmp/ /c++/tmp/
......
#include "calculator.capnp.h"
#include <capnp/ez-rpc.h>
#include <kj/debug.h>
#include <math.h>
#include <iostream>
class PowerFunction final: public Calculator::Function::Server {
// An implementation of the Function interface wrapping pow(). Note that
// we're implementing this on the client side and will pass a reference to
// the server. The server will then be able to make calls back to the client.
public:
kj::Promise<void> call(CallContext context) {
auto params = context.getParams().getParams();
KJ_REQUIRE(params.size() == 2, "Wrong number of parameters.");
context.getResults().setValue(pow(params[0], params[1]));
return kj::READY_NOW;
}
};
int main(int argc, const char* argv[]) {
if (argc != 2) {
std::cerr << "usage: " << argv[0] << " HOST:PORT\n"
"Connects to the Calculator server at the given address and "
"does some RPCs." << std::endl;
return 1;
}
capnp::EzRpcClient client(argv[1]);
Calculator::Client calculator = client.importCap<Calculator>("calculator");
// Keep an eye on `waitScope`. Whenever you see it used is a place where we
// stop and wait for the server to respond. If a line of code does not use
// `waitScope`, then it does not block!
auto& waitScope = client.getWaitScope();
{
// Make a request that just evaluates the literal value 123.
//
// What's interesting here is that evaluate() returns a "Value", which is
// another interface and therefore points back to an object living on the
// server. We then have to call read() on that object to read it.
// However, even though we are making two RPC's, this block executes in
// *one* network round trip because of promise pipelining: we do not wait
// for the first call to complete before we send the second call to the
// server.
std::cout << "Evaluating a literal... ";
std::cout.flush();
// Set up the request.
auto request = calculator.evaluateRequest();
request.getExpression().setLiteral(123);
// Send it, which returns a promise for the result (without blocking).
auto evalPromise = request.send();
// Using the promise, create a pipelined request to call read() on the
// returned object, and then send that.
auto readPromise = evalPromise.getValue().readRequest().send();
// Now that we've sent all the requests, wait for the response. Until this
// point, we haven't waited at all!
auto response = readPromise.wait(waitScope);
KJ_ASSERT(response.getValue() == 123);
std::cout << "PASS" << std::endl;
}
{
// Make a request to evaluate 123 + 45 - 67.
//
// The Calculator interface requires that we first call getOperator() to
// get the addition and subtraction functions, then call evaluate() to use
// them. But, once again, we can get both functions, call evaluate(), and
// then read() the result -- four RPCs -- in the time of *one* network
// round trip, because of promise pipelining.
std::cout << "Using add and subtract... ";
std::cout.flush();
Calculator::Function::Client add = nullptr;
Calculator::Function::Client subtract = nullptr;
{
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
request.setOp(Calculator::Operator::ADD);
add = request.send().getFunc();
}
{
// Get the "subtract" function from the server.
auto request = calculator.getOperatorRequest();
request.setOp(Calculator::Operator::SUBTRACT);
subtract = request.send().getFunc();
}
// Build the request to evaluate 123 + 45 - 67.
auto request = calculator.evaluateRequest();
auto subtractCall = request.getExpression().initCall();
subtractCall.setFunction(subtract);
auto subtractParams = subtractCall.initParams(2);
subtractParams[1].setLiteral(67);
auto addCall = subtractParams[0].initCall();
addCall.setFunction(add);
auto addParams = addCall.initParams(2);
addParams[0].setLiteral(123);
addParams[1].setLiteral(45);
// Send the evaluate() request, read() the result, and wait for read() to
// finish.
auto evalPromise = request.send();
auto readPromise = evalPromise.getValue().readRequest().send();
auto response = readPromise.wait(waitScope);
KJ_ASSERT(response.getValue() == 101);
std::cout << "PASS" << std::endl;
}
{
// Make a request to evaluate 4 * 6, then use the result in two more
// requests that add 3 and 5.
//
// Since evaluate() returns its result wrapped in a `Value`, we can pass
// that `Value` back to the server in subsequent requests before the first
// `evaluate()` has actually returned. Thus, this example again does only
// one network round trip.
std::cout << "Pipelining eval() calls... ";
std::cout.flush();
Calculator::Function::Client add = nullptr;
Calculator::Function::Client multiply = nullptr;
{
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
request.setOp(Calculator::Operator::ADD);
add = request.send().getFunc();
}
{
// Get the "multiply" function from the server.
auto request = calculator.getOperatorRequest();
request.setOp(Calculator::Operator::MULTIPLY);
multiply = request.send().getFunc();
}
// Build the request to evaluate 4 * 6
auto request = calculator.evaluateRequest();
auto multiplyCall = request.getExpression().initCall();
multiplyCall.setFunction(multiply);
auto multiplyParams = multiplyCall.initParams(2);
multiplyParams[0].setLiteral(4);
multiplyParams[1].setLiteral(6);
auto multiplyResult = request.send().getValue();
// Use the result in two calls that add 3 and add 5.
auto add3Request = calculator.evaluateRequest();
auto add3Call = add3Request.getExpression().initCall();
add3Call.setFunction(add);
auto add3Params = add3Call.initParams(2);
add3Params[0].setPreviousResult(multiplyResult);
add3Params[1].setLiteral(3);
auto add3Promise = add3Request.send().getValue().readRequest().send();
auto add5Request = calculator.evaluateRequest();
auto add5Call = add5Request.getExpression().initCall();
add5Call.setFunction(add);
auto add5Params = add5Call.initParams(2);
add5Params[0].setPreviousResult(multiplyResult);
add5Params[1].setLiteral(5);
auto add5Promise = add5Request.send().getValue().readRequest().send();
// Now wait for the results.
KJ_ASSERT(add3Promise.wait(waitScope).getValue() == 27);
KJ_ASSERT(add5Promise.wait(waitScope).getValue() == 29);
std::cout << "PASS" << std::endl;
}
{
// Our calculator interface supports defining functions. Here we use it
// to define two functions and then make calls to them as follows:
//
// f(x, y) = x * 100 + y
// g(x) = f(x, x + 1) * 2;
// f(12, 34)
// g(21)
//
// Once again, the whole thing takes only one network round trip.
std::cout << "Defining functions... ";
std::cout.flush();
Calculator::Function::Client add = nullptr;
Calculator::Function::Client multiply = nullptr;
Calculator::Function::Client f = nullptr;
Calculator::Function::Client g = nullptr;
{
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
request.setOp(Calculator::Operator::ADD);
add = request.send().getFunc();
}
{
// Get the "multiply" function from the server.
auto request = calculator.getOperatorRequest();
request.setOp(Calculator::Operator::MULTIPLY);
multiply = request.send().getFunc();
}
{
// Define f.
auto request = calculator.defFunctionRequest();
request.setParamCount(2);
{
// Build the function body.
auto addCall = request.getBody().initCall();
addCall.setFunction(add);
auto addParams = addCall.initParams(2);
addParams[1].setParameter(1); // y
auto multiplyCall = addParams[0].initCall();
multiplyCall.setFunction(multiply);
auto multiplyParams = multiplyCall.initParams(2);
multiplyParams[0].setParameter(0); // x
multiplyParams[1].setLiteral(100);
}
f = request.send().getFunc();
}
{
// Define g.
auto request = calculator.defFunctionRequest();
request.setParamCount(1);
{
// Build the function body.
auto multiplyCall = request.getBody().initCall();
multiplyCall.setFunction(multiply);
auto multiplyParams = multiplyCall.initParams(2);
multiplyParams[1].setLiteral(2);
auto fCall = multiplyParams[0].initCall();
fCall.setFunction(f);
auto fParams = fCall.initParams(2);
fParams[0].setParameter(0);
auto addCall = fParams[1].initCall();
addCall.setFunction(add);
auto addParams = addCall.initParams(2);
addParams[0].setParameter(0);
addParams[1].setLiteral(1);
}
g = request.send().getFunc();
}
// OK, we've defined all our functions. Now create our eval requests.
// f(12, 34)
auto fEvalRequest = calculator.evaluateRequest();
auto fCall = fEvalRequest.initExpression().initCall();
fCall.setFunction(f);
auto fParams = fCall.initParams(2);
fParams[0].setLiteral(12);
fParams[1].setLiteral(34);
auto fEvalPromise = fEvalRequest.send().getValue().readRequest().send();
// g(21)
auto gEvalRequest = calculator.evaluateRequest();
auto gCall = gEvalRequest.initExpression().initCall();
gCall.setFunction(g);
gCall.initParams(1)[0].setLiteral(21);
auto gEvalPromise = gEvalRequest.send().getValue().readRequest().send();
// Wait for the results.
KJ_ASSERT(fEvalPromise.wait(waitScope).getValue() == 1234);
KJ_ASSERT(gEvalPromise.wait(waitScope).getValue() == 4244);
std::cout << "PASS" << std::endl;
}
{
// Make a request that will call back to a function defined locally.
//
// Specifically, we will compute 2^(4 + 5). However, exponent is not
// defined by the Calculator server. So, we'll implement the Function
// interface locally and pass it to the server for it to use when
// evaluating the expression.
//
// This example requires two network round trips to complete, because the
// server calls back to the client once before finishing. In this
// particular case, this could potentially be optimized by using a tail
// call on the server side -- see CallContext::tailCall(). However, to
// keep the example simpler, we haven't implemented this optimization in
// the sample server.
std::cout << "Using a callback... ";
std::cout.flush();
Calculator::Function::Client add = nullptr;
{
// Get the "add" function from the server.
auto request = calculator.getOperatorRequest();
request.setOp(Calculator::Operator::ADD);
add = request.send().getFunc();
}
// Build the eval request for 2^(4+5).
auto request = calculator.evaluateRequest();
auto powCall = request.getExpression().initCall();
powCall.setFunction(kj::heap<PowerFunction>());
auto powParams = powCall.initParams(2);
powParams[0].setLiteral(2);
auto addCall = powParams[1].initCall();
addCall.setFunction(add);
auto addParams = addCall.initParams(2);
addParams[0].setLiteral(4);
addParams[1].setLiteral(5);
// Send the request and wait.
auto response = request.send().getValue().readRequest()
.send().wait(waitScope);
KJ_ASSERT(response.getValue() == 512);
std::cout << "PASS" << std::endl;
}
return 0;
}
#include "calculator.capnp.h"
#include <kj/debug.h>
#include <capnp/ez-rpc.h>
#include <capnp/capability-context.h> // for LocalMessage
#include <iostream>
typedef unsigned int uint;
kj::Promise<double> readValue(Calculator::Value::Client value) {
// Helper function to asynchronously call read() on a Calculator::Value and
// return a promise for the result. (In the future, the generated code might
// include something like this automatically.)
return value.readRequest().send()
.then([](capnp::Response<Calculator::Value::ReadResults> result) {
return result.getValue();
});
}
kj::Promise<double> evaluateImpl(
Calculator::Expression::Reader expression,
capnp::List<double>::Reader params = capnp::List<double>::Reader()) {
// Implementation of CalculatorImpl::evaluate(), also shared by
// FunctionImpl::call(). In the latter case, `params` are the parameter
// values passed to the function; in the former case, `params` is just an
// empty list.
switch (expression.which()) {
case Calculator::Expression::LITERAL:
return expression.getLiteral();
case Calculator::Expression::PREVIOUS_RESULT:
return readValue(expression.getPreviousResult());
case Calculator::Expression::PARAMETER: {
KJ_REQUIRE(expression.getParameter() < params.size(),
"Parameter index out-of-range.");
return params[expression.getParameter()];
}
case Calculator::Expression::CALL: {
auto call = expression.getCall();
auto func = call.getFunction();
// Evaluate each parameter.
kj::Array<kj::Promise<double>> paramPromises =
KJ_MAP(param, call.getParams()) {
return evaluateImpl(param, params);
};
// Join the array of promises into a promise for an array.
kj::Promise<kj::Array<double>> joinedParams =
kj::joinPromises(kj::mv(paramPromises));
// When the parameters are complete, call the function.
return joinedParams.then([func](kj::Array<double>&& paramValues) mutable {
auto request = func.callRequest();
request.setParams(paramValues);
return request.send().then(
[](capnp::Response<Calculator::Function::CallResults>&& result) {
return result.getValue();
});
});
}
default:
// Throw an exception.
KJ_FAIL_REQUIRE("Unknown expression type.");
}
}
class ValueImpl final: public Calculator::Value::Server {
// Simple implementation of the Calculator.Value Cap'n Proto interface.
public:
ValueImpl(double value): value(value) {}
kj::Promise<void> read(ReadContext context) {
context.getResults().setValue(value);
return kj::READY_NOW;
}
private:
double value;
};
class FunctionImpl final: public Calculator::Function::Server {
// Implementation of the Calculator.Function Cap'n Proto interface, where the
// function is defined by a Calculator.Expression.
public:
FunctionImpl(uint paramCount, Calculator::Expression::Reader body)
: paramCount(paramCount), body(body) {}
kj::Promise<void> call(CallContext context) {
auto params = context.getParams().getParams();
KJ_REQUIRE(params.size() == paramCount, "Wrong number of parameters.");
return evaluateImpl(body.getRoot().getAs<Calculator::Expression>(), params)
.then([context](double value) mutable {
context.getResults().setValue(value);
});
}
private:
uint paramCount;
// The function's arity.
capnp::LocalMessage body;
// LocalMessage holds a message that might contain capabilities (interface
// references). Here we're using it to hold a Calculator.Expression, which
// might contain Calculator.Function and/or Calculator.Value capabilities.
};
class OperatorImpl final: public Calculator::Function::Server {
// Implementation of the Calculator.Function Cap'n Proto interface, wrapping
// basic binary arithmetic operators.
public:
OperatorImpl(Calculator::Operator op): op(op) {}
kj::Promise<void> call(CallContext context) {
auto params = context.getParams().getParams();
KJ_REQUIRE(params.size() == 2, "Wrong number of parameters.");
double result;
switch (op) {
case Calculator::Operator::ADD: result = params[0] + params[1]; break;
case Calculator::Operator::SUBTRACT:result = params[0] - params[1]; break;
case Calculator::Operator::MULTIPLY:result = params[0] * params[1]; break;
case Calculator::Operator::DIVIDE: result = params[0] / params[1]; break;
default:
KJ_FAIL_REQUIRE("Unknown operator.");
}
context.getResults().setValue(result);
return kj::READY_NOW;
}
private:
Calculator::Operator op;
};
class CalculatorImpl final: public Calculator::Server {
// Implementation of the Calculator Cap'n Proto interface.
public:
kj::Promise<void> evaluate(EvaluateContext context) override {
return evaluateImpl(context.getParams().getExpression())
.then([context](double value) mutable {
context.getResults().setValue(kj::heap<ValueImpl>(value));
});
}
kj::Promise<void> defFunction(DefFunctionContext context) override {
auto params = context.getParams();
context.getResults().setFunc(kj::heap<FunctionImpl>(
params.getParamCount(), params.getBody()));
return kj::READY_NOW;
}
kj::Promise<void> getOperator(GetOperatorContext context) override {
context.getResults().setFunc(kj::heap<OperatorImpl>(
context.getParams().getOp()));
return kj::READY_NOW;
}
};
int main(int argc, const char* argv[]) {
if (argc != 2) {
std::cerr << "usage: " << argv[0] << " ADDRESS[:PORT]\n"
"Runs the server bound to the given address/port.\n"
"ADDRESS may be '*' to bind to all local addresses.\n"
":PORT may be omitted to choose a port automatically." << std::endl;
return 1;
}
// Set up a server.
capnp::EzRpcServer server(argv[1]);
server.exportCap("calculator", kj::heap<CalculatorImpl>());
// Write the port number to stdout, in case it was chosen automatically.
auto& waitScope = server.getWaitScope();
uint port = server.getPort().wait(waitScope);
if (port == 0) {
// The address format "unix:/path/to/socket" opens a unix domain socket,
// in which case the port will be zero.
std::cout << "Listening on Unix socket..." << std::endl;
} else {
std::cout << "Listening on port " << port << "..." << std::endl;
}
// Run forever, accepting connections and handling requests.
kj::NEVER_DONE.wait(waitScope);
}
@0x85150b117366d14b;
interface Calculator {
# A "simple" mathematical calculator, callable via RPC.
#
# But, to show off Cap'n Proto, we add some twists:
#
# - You can use the result from one call as the input to the next
# without a network round trip. To accomplish this, evaluate()
# returns a `Value` object wrapping the actual numeric value.
# This object may be used in a subsequent expression. With
# promise pipelining, the Value can actually be used before
# the evaluate() call that creates it returns!
#
# - You can define new functions, and then call them. This again
# shows off pipelining, but it also gives the client the
# opportunity to define a function on the client side and have
# the server call back to it.
#
# - The basic arithmetic operators are exposed as Functions, and
# you have to call getOperator() to obtain them from the server.
# This again demonstrates pipelining -- using getOperator() to
# get each operator and then using them in evaluate() still
# only takes one network round trip.
evaluate @0 (expression: Expression) -> (value: Value);
# Evaluate the given expression and return the result. The
# result is returned wrapped in a Value interface so that you
# may pass it back to the server in a pipelined request. To
# actually get the numeric value, you must call read() on the
# Value -- but again, this can be pipelined so that it incurs
# no additional latency.
struct Expression {
# A numeric expression.
union {
literal @0 :Float64;
# A literal numeric value.
previousResult @1 :Value;
# A value that was (or, will be) returned by a previous
# evaluate().
parameter @2 :UInt32;
# A parameter to the function (only valid in function bodies;
# see defFunction).
call :group {
# Call a function on a list of parameters.
function @3 :Function;
params @4 :List(Expression);
}
}
}
interface Value {
# Wraps a numeric value in an RPC object. This allows the value
# to be used in subsequent evaluate() requests without the client
# waiting for the evaluate() that returns the Value to finish.
read @0 () -> (value :Float64);
# Read back the raw numeric value.
}
defFunction @1 (paramCount :Int32, body :Expression)
-> (func :Function);
# Define a function that takes `paramCount` parameters and returns the
# evaluation of `body` after substituting these parameters.
interface Function {
# An algebraic function. Can be called directly, or can be used inside
# an Expression.
#
# A client can create a Function that runs on the server side using
# `defFunction()` or `getOperator()`. Alternatively, a client can
# implement a Function on the client side and the server will call back
# to it. However, a function defined on the client side will require a
# network round trip whenever the server needs to call it, whereas
# functions defined on the server and then passed back to it are called
# locally.
call @0 (params :List(Float64)) -> (value: Float64);
# Call the function on the given parameters.
}
getOperator @2 (op: Operator) -> (func: Function);
# Get a Function representing an arithmetic operator, which can then be
# used in Expressions.
enum Operator {
add @0;
subtract @1;
multiply @2;
divide @3;
}
}
...@@ -11,3 +11,15 @@ c++ -std=c++11 -Wall addressbook.c++ addressbook.capnp.c++ -lcapnp -lkj -pthread ...@@ -11,3 +11,15 @@ c++ -std=c++11 -Wall addressbook.c++ addressbook.capnp.c++ -lcapnp -lkj -pthread
./addressbook dwrite | ./addressbook dread ./addressbook dwrite | ./addressbook dread
rm addressbook addressbook.capnp.c++ addressbook.capnp.h rm addressbook addressbook.capnp.c++ addressbook.capnp.h
capnpc -oc++ calculator.capnp
c++ -std=c++11 -Wall calculator-client.c++ calculator.capnp.c++ -lcapnp-rpc -lcapnp -lkj-async \
-lkj -pthread -o calculator-client
c++ -std=c++11 -Wall calculator-server.c++ calculator.capnp.c++ -lcapnp-rpc -lcapnp -lkj-async \
-lkj -pthread -o calculator-server
rm -f /tmp/capnp-calculator-example-$$
./calculator-server unix:/tmp/capnp-calculator-example-$$ &
sleep 0.1
./calculator-client unix:/tmp/capnp-calculator-example-$$
kill %+
wait %+ || true
rm calculator-client calculator-server calculator.capnp.c++ calculator.capnp.h /tmp/capnp-calculator-example-$$
...@@ -105,7 +105,12 @@ class LocalMessage final { ...@@ -105,7 +105,12 @@ class LocalMessage final {
// know how to properly serialize its capabilities. // know how to properly serialize its capabilities.
public: public:
LocalMessage(kj::Maybe<MessageSize> sizeHint = nullptr); explicit LocalMessage(kj::Maybe<MessageSize> sizeHint = nullptr);
template <typename T, typename = FromReader<T>>
inline LocalMessage(T&& reader): LocalMessage(reader.totalSize()) {
// Create a LocalMessage that is a copy of a given reader.
getRoot().setAs<FromReader<T>>(kj::fwd<T>(reader));
}
inline AnyPointer::Builder getRoot() { return root; } inline AnyPointer::Builder getRoot() { return root; }
inline AnyPointer::Reader getRootReader() const { return root.asReader(); } inline AnyPointer::Reader getRootReader() const { return root.asReader(); }
......
...@@ -51,7 +51,7 @@ if test "$INTERCEPTOR" = ""; then ...@@ -51,7 +51,7 @@ if test "$INTERCEPTOR" = ""; then
exit 1 exit 1
fi fi
echo findProvider file:capnp echo findProvider file:compiler/capnp
read CAPNP read CAPNP
if test "$CAPNP" = ""; then if test "$CAPNP" = ""; then
......
...@@ -944,7 +944,7 @@ private: ...@@ -944,7 +944,7 @@ private:
" inline void set", titleCase, "(", type, "::Reader value);\n", " inline void set", titleCase, "(", type, "::Reader value);\n",
kind == FieldKind::LIST && !isStructOrCapList kind == FieldKind::LIST && !isStructOrCapList
? kj::strTree( ? kj::strTree(
" inline void set", titleCase, "(std::initializer_list<", elementReaderType, "> value);\n") " inline void set", titleCase, "(::kj::ArrayPtr<const ", elementReaderType, "> value);\n")
: kj::strTree(), : kj::strTree(),
kind == FieldKind::STRUCT kind == FieldKind::STRUCT
? kj::strTree( ? kj::strTree(
...@@ -994,7 +994,7 @@ private: ...@@ -994,7 +994,7 @@ private:
"}\n", "}\n",
kind == FieldKind::LIST && !isStructOrCapList kind == FieldKind::LIST && !isStructOrCapList
? kj::strTree( ? kj::strTree(
"inline void ", scope, "Builder::set", titleCase, "(std::initializer_list<", elementReaderType, "> value) {\n", "inline void ", scope, "Builder::set", titleCase, "(::kj::ArrayPtr<const ", elementReaderType, "> value) {\n",
unionDiscrim.set, unionDiscrim.set,
" ::capnp::_::PointerHelpers<", type, ">::set(\n" " ::capnp::_::PointerHelpers<", type, ">::set(\n"
" _builder.getPointerField(", offset, " * ::capnp::POINTERS), value);\n" " _builder.getPointerField(", offset, " * ::capnp::POINTERS), value);\n"
......
...@@ -1403,12 +1403,12 @@ DynamicValue::Reader::Reader(const Reader& other) { ...@@ -1403,12 +1403,12 @@ DynamicValue::Reader::Reader(const Reader& other) {
case ENUM: case ENUM:
case STRUCT: case STRUCT:
case ANY_POINTER: case ANY_POINTER:
static_assert(__has_trivial_copy(Text::Reader) && static_assert(kj::canMemcpy<Text::Reader>() &&
__has_trivial_copy(Data::Reader) && kj::canMemcpy<Data::Reader>() &&
__has_trivial_copy(DynamicList::Reader) && kj::canMemcpy<DynamicList::Reader>() &&
__has_trivial_copy(DynamicEnum) && kj::canMemcpy<DynamicEnum>() &&
__has_trivial_copy(DynamicStruct::Reader) && kj::canMemcpy<DynamicStruct::Reader>() &&
__has_trivial_copy(AnyPointer::Reader), kj::canMemcpy<AnyPointer::Reader>(),
"Assumptions here don't hold."); "Assumptions here don't hold.");
break; break;
...@@ -1434,12 +1434,12 @@ DynamicValue::Reader::Reader(Reader&& other) noexcept { ...@@ -1434,12 +1434,12 @@ DynamicValue::Reader::Reader(Reader&& other) noexcept {
case ENUM: case ENUM:
case STRUCT: case STRUCT:
case ANY_POINTER: case ANY_POINTER:
static_assert(__has_trivial_copy(Text::Reader) && static_assert(kj::canMemcpy<Text::Reader>() &&
__has_trivial_copy(Data::Reader) && kj::canMemcpy<Data::Reader>() &&
__has_trivial_copy(DynamicList::Reader) && kj::canMemcpy<DynamicList::Reader>() &&
__has_trivial_copy(DynamicEnum) && kj::canMemcpy<DynamicEnum>() &&
__has_trivial_copy(DynamicStruct::Reader) && kj::canMemcpy<DynamicStruct::Reader>() &&
__has_trivial_copy(AnyPointer::Reader), kj::canMemcpy<AnyPointer::Reader>(),
"Assumptions here don't hold."); "Assumptions here don't hold.");
break; break;
...@@ -1486,7 +1486,7 @@ DynamicValue::Builder::Builder(Builder& other) { ...@@ -1486,7 +1486,7 @@ DynamicValue::Builder::Builder(Builder& other) {
case ENUM: case ENUM:
case STRUCT: case STRUCT:
case ANY_POINTER: case ANY_POINTER:
// Unfortunately __has_trivial_copy doesn't work on these types due to the use of // Unfortunately canMemcpy() doesn't work on these types due to the use of
// DisallowConstCopy, but __has_trivial_destructor should detect if any of these types // DisallowConstCopy, but __has_trivial_destructor should detect if any of these types
// become non-trivial. // become non-trivial.
static_assert(__has_trivial_destructor(Text::Builder) && static_assert(__has_trivial_destructor(Text::Builder) &&
......
...@@ -1830,7 +1830,12 @@ struct WireHelpers { ...@@ -1830,7 +1830,12 @@ struct WireHelpers {
SegmentReader* segment, const WirePointer* ref, int nestingLimit)) { SegmentReader* segment, const WirePointer* ref, int nestingLimit)) {
kj::Maybe<kj::Own<ClientHook>> maybeCap; kj::Maybe<kj::Own<ClientHook>> maybeCap;
if (ref->isNull()) { if (segment == nullptr) {
// No capability context for unchecked messages.
// TODO(now): This means a capability read from an omitted (and therefore default-valued)
// sub-message will throw a fatal exception.
maybeCap = nullptr;
} else if (ref->isNull()) {
maybeCap = segment->getArena()->newBrokenCap("Calling null capability pointer."); maybeCap = segment->getArena()->newBrokenCap("Calling null capability pointer.");
} else if (!ref->isCapability()) { } else if (!ref->isCapability()) {
KJ_FAIL_REQUIRE( KJ_FAIL_REQUIRE(
......
...@@ -77,8 +77,7 @@ struct PointerHelpers<List<T>, Kind::LIST> { ...@@ -77,8 +77,7 @@ struct PointerHelpers<List<T>, Kind::LIST> {
static inline void set(PointerBuilder builder, typename List<T>::Reader value) { static inline void set(PointerBuilder builder, typename List<T>::Reader value) {
builder.setList(value.reader); builder.setList(value.reader);
} }
template <typename U> static void set(PointerBuilder builder, kj::ArrayPtr<const ReaderFor<T>> value) {
static void set(PointerBuilder builder, std::initializer_list<U> value) {
auto l = init(builder, value.size()); auto l = init(builder, value.size());
uint i = 0; uint i = 0;
for (auto& element: value) { for (auto& element: value) {
......
...@@ -723,9 +723,9 @@ private: ...@@ -723,9 +723,9 @@ private:
fork(eventual.fork()), fork(eventual.fork()),
resolveSelfPromise(fork.addBranch().then( resolveSelfPromise(fork.addBranch().then(
[this](kj::Own<ClientHook>&& resolution) { [this](kj::Own<ClientHook>&& resolution) {
resolve(kj::mv(resolution)); resolve(kj::mv(resolution), false);
}, [this](kj::Exception&& exception) { }, [this](kj::Exception&& exception) {
resolve(newBrokenCap(kj::mv(exception))); resolve(newBrokenCap(kj::mv(exception)), true);
}).eagerlyEvaluate([&](kj::Exception&& e) { }).eagerlyEvaluate([&](kj::Exception&& e) {
// Make any exceptions thrown from resolve() go to the connection's TaskSet which // Make any exceptions thrown from resolve() go to the connection's TaskSet which
// will cause the connection to be terminated. // will cause the connection to be terminated.
...@@ -810,8 +810,8 @@ private: ...@@ -810,8 +810,8 @@ private:
bool receivedCall = false; bool receivedCall = false;
void resolve(kj::Own<ClientHook> replacement) { void resolve(kj::Own<ClientHook> replacement, bool isError) {
if (replacement->getBrand() != connectionState.get() && receivedCall) { if (replacement->getBrand() != connectionState.get() && receivedCall && !isError) {
// The new capability is hosted locally, not on the remote machine. And, we had made calls // The new capability is hosted locally, not on the remote machine. And, we had made calls
// to the promise. We need to make sure those calls echo back to us before we allow new // to the promise. We need to make sure those calls echo back to us before we allow new
// calls to go directly to the local capability, so we need to set a local embargo and send // calls to go directly to the local capability, so we need to set a local embargo and send
......
...@@ -570,8 +570,7 @@ T* HeapArrayDisposer::allocateUninitialized(size_t count) { ...@@ -570,8 +570,7 @@ T* HeapArrayDisposer::allocateUninitialized(size_t count) {
return Allocate_<T, true, true>::allocate(0, count); return Allocate_<T, true, true>::allocate(0, count);
} }
template <typename Element, typename Iterator, template <typename Element, typename Iterator, bool = canMemcpy<Element>()>
bool trivial = __has_trivial_copy(Element) && __has_trivial_assign(Element)>
struct CopyConstructArray_; struct CopyConstructArray_;
template <typename T> template <typename T>
......
...@@ -172,6 +172,9 @@ protected: ...@@ -172,6 +172,9 @@ protected:
class ImmediatePromiseNodeBase: public PromiseNode { class ImmediatePromiseNodeBase: public PromiseNode {
public: public:
ImmediatePromiseNodeBase();
~ImmediatePromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override; void onReady(Event& event) noexcept override;
}; };
...@@ -480,6 +483,70 @@ private: ...@@ -480,6 +483,70 @@ private:
// ------------------------------------------------------------------- // -------------------------------------------------------------------
class ArrayJoinPromiseNodeBase: public PromiseNode {
public:
ArrayJoinPromiseNodeBase(Array<Own<PromiseNode>> promises,
ExceptionOrValue* resultParts, size_t partSize);
~ArrayJoinPromiseNodeBase() noexcept(false);
void onReady(Event& event) noexcept override final;
void get(ExceptionOrValue& output) noexcept override final;
PromiseNode* getInnerForTrace() override final;
protected:
virtual void getNoError(ExceptionOrValue& output) noexcept = 0;
// Called to compile the result only in the case where there were no errors.
private:
uint countLeft;
OnReadyEvent onReadyEvent;
class Branch: public Event {
public:
Branch(ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependency,
ExceptionOrValue& output);
~Branch() noexcept(false);
Maybe<Own<Event>> fire() override;
_::PromiseNode* getInnerForTrace() override;
Maybe<Exception> getPart();
// Calls dependency->get(output). If there was an exception, return it.
private:
ArrayJoinPromiseNodeBase& joinNode;
Own<PromiseNode> dependency;
ExceptionOrValue& output;
};
Array<Branch> branches;
};
template <typename T>
class ArrayJoinPromiseNode final: public ArrayJoinPromiseNodeBase {
public:
ArrayJoinPromiseNode(Array<Own<PromiseNode>> promises,
Array<ExceptionOr<T>> resultParts)
: ArrayJoinPromiseNodeBase(kj::mv(promises), resultParts.begin(), sizeof(ExceptionOr<T>)),
resultParts(kj::mv(resultParts)) {}
protected:
void getNoError(ExceptionOrValue& output) noexcept override {
auto builder = heapArrayBuilder<T>(resultParts.size());
for (auto& part: resultParts) {
KJ_IASSERT(part.value != nullptr,
"Bug in KJ promise framework: Promise result had neither value no exception.");
builder.add(kj::mv(*_::readMaybe(part.value)));
}
output.as<Array<T>>() = builder.finish();
}
private:
Array<ExceptionOr<T>> resultParts;
};
// -------------------------------------------------------------------
class EagerPromiseNodeBase: public PromiseNode, protected Event { class EagerPromiseNodeBase: public PromiseNode, protected Event {
// A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly // A PromiseNode that eagerly evaluates its dependency even if its dependent does not eagerly
// evaluate it. // evaluate it.
...@@ -682,6 +749,13 @@ void Promise<void>::detach(ErrorFunc&& errorHandler) { ...@@ -682,6 +749,13 @@ void Promise<void>::detach(ErrorFunc&& errorHandler) {
return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler))); return _::detach(then([]() {}, kj::fwd<ErrorFunc>(errorHandler)));
} }
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises) {
return Promise<Array<T>>(false, kj::heap<_::ArrayJoinPromiseNode<T>>(
KJ_MAP(p, promises) { return kj::mv(p.node); },
heapArray<_::ExceptionOr<T>>(promises.size())));
}
// ======================================================================================= // =======================================================================================
namespace _ { // private namespace _ { // private
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/un.h> #include <sys/un.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <netinet/tcp.h>
#include <stddef.h> #include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <arpa/inet.h> #include <arpa/inet.h>
...@@ -294,11 +295,26 @@ public: ...@@ -294,11 +295,26 @@ public:
} }
int socket(int type) const { int socket(int type) const {
bool isStream = type == SOCK_STREAM;
int result; int result;
#if __linux__ #if __linux__
type |= SOCK_NONBLOCK | SOCK_CLOEXEC; type |= SOCK_NONBLOCK | SOCK_CLOEXEC;
#endif #endif
KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0)); KJ_SYSCALL(result = ::socket(addr.generic.sa_family, type, 0));
if (isStream && (addr.generic.sa_family == AF_INET ||
addr.generic.sa_family == AF_INET6)) {
// TODO(0.5): As a hack for the 0.4 release we are always setting
// TCP_NODELAY because Nagle's algorithm pretty much kills Cap'n Proto's
// RPC protocol. Later, we should extend the interface to provide more
// control over this. Perhaps write() should have a flag which
// specifies whether to pass MSG_MORE.
int one = 1;
KJ_SYSCALL(setsockopt(
result, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(one)));
}
return result; return result;
} }
...@@ -642,7 +658,7 @@ Promise<Array<SocketAddress>> SocketAddress::lookupHost( ...@@ -642,7 +658,7 @@ Promise<Array<SocketAddress>> SocketAddress::lookupHost(
addr.addrlen = cur->ai_addrlen; addr.addrlen = cur->ai_addrlen;
memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen); memcpy(&addr.addr.generic, cur->ai_addr, cur->ai_addrlen);
} }
static_assert(__has_trivial_copy(SocketAddress), "Can't write() SocketAddress..."); static_assert(canMemcpy<SocketAddress>(), "Can't write() SocketAddress...");
output.write(&addr, sizeof(addr)); output.write(&addr, sizeof(addr));
cur = cur->ai_next; cur = cur->ai_next;
} }
......
...@@ -36,6 +36,9 @@ template <typename T> ...@@ -36,6 +36,9 @@ template <typename T>
class Promise; class Promise;
class WaitScope; class WaitScope;
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
namespace _ { // private namespace _ { // private
template <typename T> struct JoinPromises_ { typedef T Type; }; template <typename T> struct JoinPromises_ { typedef T Type; };
...@@ -44,6 +47,9 @@ template <typename T> struct JoinPromises_<Promise<T>> { typedef T Type; }; ...@@ -44,6 +47,9 @@ template <typename T> struct JoinPromises_<Promise<T>> { typedef T Type; };
template <typename T> template <typename T>
using JoinPromises = typename JoinPromises_<T>::Type; using JoinPromises = typename JoinPromises_<T>::Type;
// If T is Promise<U>, resolves to U, otherwise resolves to T. // If T is Promise<U>, resolves to U, otherwise resolves to T.
//
// TODO(cleanup): Rename to avoid confusion with joinPromises() call which is completely
// unrelated.
class PropagateException { class PropagateException {
// A functor which accepts a kj::Exception as a parameter and returns a broken promise of // A functor which accepts a kj::Exception as a parameter and returns a broken promise of
...@@ -171,6 +177,8 @@ private: ...@@ -171,6 +177,8 @@ private:
template <typename> template <typename>
friend class kj::Promise; friend class kj::Promise;
friend class TaskSetImpl; friend class TaskSetImpl;
template <typename U>
friend Promise<Array<U>> kj::joinPromises(Array<Promise<U>>&& promises);
}; };
void detach(kj::Promise<void>&& promise); void detach(kj::Promise<void>&& promise);
......
...@@ -514,6 +514,25 @@ TEST(Async, ExclusiveJoin) { ...@@ -514,6 +514,25 @@ TEST(Async, ExclusiveJoin) {
} }
} }
TEST(Async, ArrayJoin) {
EventLoop loop;
WaitScope waitScope(loop);
auto builder = heapArrayBuilder<Promise<int>>(3);
builder.add(123);
builder.add(456);
builder.add(789);
Promise<Array<int>> promise = joinPromises(builder.finish());
auto result = promise.wait(waitScope);
ASSERT_EQ(3u, result.size());
EXPECT_EQ(123, result[0]);
EXPECT_EQ(456, result[1]);
EXPECT_EQ(789, result[2]);
}
class ErrorHandlerImpl: public TaskSet::ErrorHandler { class ErrorHandlerImpl: public TaskSet::ErrorHandler {
public: public:
uint exceptionCount = 0; uint exceptionCount = 0;
......
...@@ -54,8 +54,6 @@ public: ...@@ -54,8 +54,6 @@ public:
Promise<short> onFdEvent(int fd, short eventMask); Promise<short> onFdEvent(int fd, short eventMask);
// `eventMask` is a bitwise-OR of poll events (e.g. `POLLIN`, `POLLOUT`, etc.). The next time // `eventMask` is a bitwise-OR of poll events (e.g. `POLLIN`, `POLLOUT`, etc.). The next time
// one or more of the given events occurs on `fd`, the set of events that occurred are returned. // one or more of the given events occurs on `fd`, the set of events that occurred are returned.
//
// The result of waiting on the same FD twice at once is undefined.
Promise<siginfo_t> onSignal(int signum); Promise<siginfo_t> onSignal(int signum);
// When the given signal is delivered to this thread, return the corresponding siginfo_t. // When the given signal is delivered to this thread, return the corresponding siginfo_t.
......
...@@ -508,6 +508,9 @@ void PromiseNode::OnReadyEvent::arm() { ...@@ -508,6 +508,9 @@ void PromiseNode::OnReadyEvent::arm() {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
ImmediatePromiseNodeBase::ImmediatePromiseNodeBase() {}
ImmediatePromiseNodeBase::~ImmediatePromiseNodeBase() noexcept(false) {}
void ImmediatePromiseNodeBase::onReady(Event& event) noexcept { void ImmediatePromiseNodeBase::onReady(Event& event) noexcept {
event.armBreadthFirst(); event.armBreadthFirst();
} }
...@@ -807,6 +810,73 @@ PromiseNode* ExclusiveJoinPromiseNode::Branch::getInnerForTrace() { ...@@ -807,6 +810,73 @@ PromiseNode* ExclusiveJoinPromiseNode::Branch::getInnerForTrace() {
// ------------------------------------------------------------------- // -------------------------------------------------------------------
ArrayJoinPromiseNodeBase::ArrayJoinPromiseNodeBase(
Array<Own<PromiseNode>> promises, ExceptionOrValue* resultParts, size_t partSize)
: countLeft(promises.size()) {
// Make the branches.
auto builder = heapArrayBuilder<Branch>(promises.size());
for (uint i: indices(promises)) {
ExceptionOrValue& output = *reinterpret_cast<ExceptionOrValue*>(
reinterpret_cast<byte*>(resultParts) + i * partSize);
builder.add(*this, kj::mv(promises[i]), output);
}
branches = builder.finish();
if (branches.size() == 0) {
onReadyEvent.arm();
}
}
ArrayJoinPromiseNodeBase::~ArrayJoinPromiseNodeBase() noexcept(false) {}
void ArrayJoinPromiseNodeBase::onReady(Event& event) noexcept {
onReadyEvent.init(event);
}
void ArrayJoinPromiseNodeBase::get(ExceptionOrValue& output) noexcept {
// If any of the elements threw exceptions, propagate them.
for (auto& branch: branches) {
KJ_IF_MAYBE(exception, branch.getPart()) {
output.addException(kj::mv(*exception));
}
}
if (output.exception == nullptr) {
// No errors. The template subclass will need to fill in the result.
getNoError(output);
}
}
PromiseNode* ArrayJoinPromiseNodeBase::getInnerForTrace() {
return branches.size() == 0 ? nullptr : branches[0].getInnerForTrace();
}
ArrayJoinPromiseNodeBase::Branch::Branch(
ArrayJoinPromiseNodeBase& joinNode, Own<PromiseNode> dependencyParam, ExceptionOrValue& output)
: joinNode(joinNode), dependency(kj::mv(dependencyParam)), output(output) {
dependency->setSelfPointer(&dependency);
dependency->onReady(*this);
}
ArrayJoinPromiseNodeBase::Branch::~Branch() noexcept(false) {}
Maybe<Own<Event>> ArrayJoinPromiseNodeBase::Branch::fire() {
if (--joinNode.countLeft == 0) {
joinNode.onReadyEvent.arm();
}
return nullptr;
}
_::PromiseNode* ArrayJoinPromiseNodeBase::Branch::getInnerForTrace() {
return dependency->getInnerForTrace();
}
Maybe<Exception> ArrayJoinPromiseNodeBase::Branch::getPart() {
dependency->get(output);
return kj::mv(output.exception);
}
// -------------------------------------------------------------------
EagerPromiseNodeBase::EagerPromiseNodeBase( EagerPromiseNodeBase::EagerPromiseNodeBase(
Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef) Own<PromiseNode>&& dependencyParam, ExceptionOrValue& resultRef)
: dependency(kj::mv(dependencyParam)), resultRef(resultRef) { : dependency(kj::mv(dependencyParam)), resultRef(resultRef) {
......
...@@ -290,6 +290,8 @@ private: ...@@ -290,6 +290,8 @@ private:
friend class _::TaskSetImpl; friend class _::TaskSetImpl;
friend Promise<void> _::yield(); friend Promise<void> _::yield();
friend class _::NeverDone; friend class _::NeverDone;
template <typename U>
friend Promise<Array<U>> joinPromises(Array<Promise<U>>&& promises);
}; };
template <typename T> template <typename T>
...@@ -339,6 +341,10 @@ PromiseForResult<Func, void> evalLater(Func&& func); ...@@ -339,6 +341,10 @@ PromiseForResult<Func, void> evalLater(Func&& func);
// If you schedule several evaluations with `evalLater` during the same callback, they are // If you schedule several evaluations with `evalLater` during the same callback, they are
// guaranteed to be executed in order. // guaranteed to be executed in order.
template <typename T>
Promise<Array<T>> joinPromises(Array<Promise<T>>&& promises);
// Join an array of promises into a promise for an array.
// ======================================================================================= // =======================================================================================
// Hack for creating a lambda that holds an owned pointer. // Hack for creating a lambda that holds an owned pointer.
......
...@@ -25,8 +25,6 @@ ...@@ -25,8 +25,6 @@
// //
// This defines very simple utilities that are widely applicable. // This defines very simple utilities that are widely applicable.
#include <stddef.h>
#ifndef KJ_COMMON_H_ #ifndef KJ_COMMON_H_
#define KJ_COMMON_H_ #define KJ_COMMON_H_
...@@ -69,6 +67,9 @@ ...@@ -69,6 +67,9 @@
#endif #endif
#endif #endif
#include <stddef.h>
#include <initializer_list>
// ======================================================================================= // =======================================================================================
namespace kj { namespace kj {
...@@ -344,6 +345,29 @@ constexpr bool canConvert() { ...@@ -344,6 +345,29 @@ constexpr bool canConvert() {
return sizeof(CanConvert_<U>::sfinae(instance<T>())) == sizeof(int); return sizeof(CanConvert_<U>::sfinae(instance<T>())) == sizeof(int);
} }
#if __clang__
template <typename T>
constexpr bool canMemcpy() {
// Returns true if T can be copied using memcpy instead of using the copy constructor or
// assignment operator.
// Clang unhelpfully defines __has_trivial_{copy,assign}(T) to be true if the copy constructor /
// assign operator are deleted, on the basis that a strict reading of the definition of "trivial"
// according to the standard says that deleted functions are in fact trivial. Meanwhile Clang
// provides these admittedly-better intrinsics, but GCC does not.
return __is_trivially_constructible(T, const T&) && __is_trivially_assignable(T, const T&);
}
#else
template <typename T>
constexpr bool canMemcpy() {
// Returns true if T can be copied using memcpy instead of using the copy constructor or
// assignment operator.
// GCC defines these to mean what we want them to mean.
return __has_trivial_copy(T) && __has_trivial_assign(T);
}
#endif
// ======================================================================================= // =======================================================================================
// Equivalents to std::move() and std::forward(), since these are very commonly needed and the // Equivalents to std::move() and std::forward(), since these are very commonly needed and the
// std header <utility> pulls in lots of other stuff. // std header <utility> pulls in lots of other stuff.
...@@ -917,6 +941,8 @@ public: ...@@ -917,6 +941,8 @@ public:
inline constexpr ArrayPtr(decltype(nullptr)): ptr(nullptr), size_(0) {} inline constexpr ArrayPtr(decltype(nullptr)): ptr(nullptr), size_(0) {}
inline constexpr ArrayPtr(T* ptr, size_t size): ptr(ptr), size_(size) {} inline constexpr ArrayPtr(T* ptr, size_t size): ptr(ptr), size_(size) {}
inline constexpr ArrayPtr(T* begin, T* end): ptr(begin), size_(end - begin) {} inline constexpr ArrayPtr(T* begin, T* end): ptr(begin), size_(end - begin) {}
inline constexpr ArrayPtr(std::initializer_list<RemoveConstOrDisable<T>> init)
: ptr(init.begin()), size_(init.size()) {}
template <size_t size> template <size_t size>
inline constexpr ArrayPtr(T (&native)[size]): ptr(native), size_(size) {} inline constexpr ArrayPtr(T (&native)[size]): ptr(native), size_(size) {}
......
linux-gcc-4.7 14763 ./super-test.sh tmpdir capnp-gcc-4.7 linux-gcc-4.7 14779 ./super-test.sh tmpdir capnp-gcc-4.7
linux-gcc-4.8 13108 ./super-test.sh tmpdir capnp-gcc-4.8 gcc-4.8 linux-gcc-4.8 13122 ./super-test.sh tmpdir capnp-gcc-4.8 gcc-4.8
linux-clang 14891 ./super-test.sh tmpdir capnp-clang clang linux-clang 14907 ./super-test.sh tmpdir capnp-clang clang
mac 5740 ./super-test.sh remote beat caffeinate mac 5740 ./super-test.sh remote beat caffeinate
cygwin 6762 ./super-test.sh remote Kenton@flashman cygwin 6762 ./super-test.sh remote Kenton@flashman
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment