Commit bd6d75ba authored by Kenton Varda's avatar Kenton Varda

Introduce new 'stream' keyword.

This can be used on a method to indicate that it is used for "streaming", like:

    write @0 (bytes :Data) -> stream;

A "streaming" method is one which is expected to be called many times to transmit an ordered stream of items. For best throughput, it is often necessary to make multiple overlapping calls, so as not to wait for a round trip for every item. However, to avoid excess buffering, it may be necessary to apply backpressure by having the client limit the total number of overlapping calls. This logic is difficult to get right at the application level, so making it a language feature gives us the opportunity to implement it in the RPC layer.

We can, however, do it in a way that is backwards-compatible with implementations that don't support it. The above declaration is equivalent to:

    write @0 (bytes :Data) -> import "/capnp/stream.capnp".StreamResult;

RPC implementations that don't explicitly support streaming can thus instead leave it up to the application to handle.
parent 2ae7ca9b
......@@ -5,7 +5,7 @@ set -euo pipefail
export PATH=$PWD/bin:$PWD:$PATH
capnp compile -Isrc --no-standard-import --src-prefix=src -oc++:src \
src/capnp/c++.capnp src/capnp/schema.capnp \
src/capnp/c++.capnp src/capnp/schema.capnp src/capnp/stream.capnp \
src/capnp/compiler/lexer.capnp src/capnp/compiler/grammar.capnp \
src/capnp/rpc.capnp src/capnp/rpc-twoparty.capnp src/capnp/persistent.capnp \
src/capnp/compat/json.capnp
......@@ -51,8 +51,9 @@ fi
mkdir -p tmp/capnp/bootstrap-test-tmp
INPUTS="capnp/c++.capnp capnp/schema.capnp capnp/compiler/lexer.capnp capnp/compiler/grammar.capnp \
capnp/rpc.capnp capnp/rpc-twoparty.capnp capnp/persistent.capnp"
INPUTS="capnp/c++.capnp capnp/schema.capnp capnp/stream.capnp capnp/compiler/lexer.capnp \
capnp/compiler/grammar.capnp capnp/rpc.capnp capnp/rpc-twoparty.capnp capnp/persistent.capnp \
capnp/compat/json.capnp"
SRC_INPUTS=""
for file in $INPUTS; do
......
......@@ -33,6 +33,7 @@ INPUT=$1
case "$INPUT" in
*capnp/c++.capnp | \
*capnp/schema.capnp | \
*capnp/stream.capnp | \
*capnp/rpc.capnp | \
*capnp/rpc-twoparty.capnp | \
*capnp/persistent.capnp | \
......
......@@ -114,5 +114,5 @@ test_eval 'TestListDefaults.lists.int32ListList[2][0]' 12341234
test "x`$CAPNP eval $SCHEMA -ojson globalPrintableStruct | tr -d '\r'`" = "x{\"someText\": \"foo\"}" || fail eval json "globalPrintableStruct == {someText = \"foo\"}"
$CAPNP compile --src-prefix="$PREFIX" -ofoo $TESTDATA/errors.capnp.nobuild 2>&1 | sed -e "s,^.*errors[.]capnp[.]nobuild:,file:,g" | tr -d '\r' |
$CAPNP compile --no-standard-import --src-prefix="$PREFIX" -ofoo $TESTDATA/errors.capnp.nobuild 2>&1 | sed -e "s,^.*errors[.]capnp[.]nobuild:,file:,g" | tr -d '\r' |
cmp $TESTDATA/errors.txt - || fail error output
......@@ -39,6 +39,7 @@
#include <kj/main.h>
#include <algorithm>
#include <map>
#include <capnp/stream.capnp.h>
#if HAVE_CONFIG_H
#include "config.h"
......@@ -493,7 +494,9 @@ private:
kj::StringTree genParamList(InterfaceSchema interface, StructSchema schema,
schema::Brand::Reader brand, InterfaceSchema::Method method) {
if (schema.getProto().getScopeId() == 0) {
if (schema.getProto().getId() == typeId<StreamResult>()) {
return kj::strTree("stream");
} else if (schema.getProto().getScopeId() == 0) {
// A named parameter list.
return kj::strTree("(", kj::StringTree(
KJ_MAP(field, schema.getFields()) {
......
......@@ -1048,6 +1048,25 @@ static void findImports(Expression::Reader exp, std::set<kj::StringPtr>& output)
}
}
static void findImports(Declaration::ParamList::Reader paramList, std::set<kj::StringPtr>& output) {
switch (paramList.which()) {
case Declaration::ParamList::NAMED_LIST:
for (auto param: paramList.getNamedList()) {
findImports(param.getType(), output);
for (auto ann: param.getAnnotations()) {
findImports(ann.getName(), output);
}
}
break;
case Declaration::ParamList::TYPE:
findImports(paramList.getType(), output);
break;
case Declaration::ParamList::STREAM:
output.insert("/capnp/stream.capnp");
break;
}
}
static void findImports(Declaration::Reader decl, std::set<kj::StringPtr>& output) {
switch (decl.which()) {
case Declaration::USING:
......@@ -1067,30 +1086,9 @@ static void findImports(Declaration::Reader decl, std::set<kj::StringPtr>& outpu
case Declaration::METHOD: {
auto method = decl.getMethod();
auto params = method.getParams();
if (params.isNamedList()) {
for (auto param: params.getNamedList()) {
findImports(param.getType(), output);
for (auto ann: param.getAnnotations()) {
findImports(ann.getName(), output);
}
}
} else {
findImports(params.getType(), output);
}
findImports(method.getParams(), output);
if (method.getResults().isExplicit()) {
auto results = method.getResults().getExplicit();
if (results.isNamedList()) {
for (auto param: results.getNamedList()) {
findImports(param.getType(), output);
for (auto ann: param.getAnnotations()) {
findImports(ann.getName(), output);
}
}
} else {
findImports(results.getType(), output);
}
findImports(method.getResults().getExplicit(), output);
}
break;
}
......
......@@ -244,6 +244,9 @@ struct Declaration {
type @1 :Expression;
# Specified some other struct type instead of a named list.
stream @4 :Void;
# The keyword "stream".
}
startByte @2 :UInt32;
......
......@@ -28,6 +28,7 @@
#include <set>
#include <map>
#include <stdlib.h>
#include <capnp/stream.capnp.h>
namespace capnp {
namespace compiler {
......@@ -2384,9 +2385,13 @@ void NodeTranslator::compileInterface(Declaration::Interface::Reader decl,
implicitsBuilder[i].setName(implicits[i].getName());
}
auto params = methodReader.getParams();
if (params.isStream()) {
errorReporter.addErrorOn(params, "'stream' can only appear after '->', not before.");
}
methodBuilder.setParamStructType(compileParamList(
methodDecl.getName().getValue(), ordinal, false,
methodReader.getParams(), implicits,
params, implicits,
[&]() { return methodBuilder.initParamBrand(); }));
auto results = methodReader.getResults();
......@@ -2478,6 +2483,20 @@ uint64_t NodeTranslator::compileParamList(
}
}
return 0;
case Declaration::ParamList::STREAM:
KJ_IF_MAYBE(streamCapnp, resolver.resolveImport("/capnp/stream.capnp")) {
if (streamCapnp->resolver->resolveMember("StreamResult") == nullptr) {
errorReporter.addErrorOn(paramList,
"The version of '/capnp/stream.capnp' found in your import path does not appear "
"to be the official one; it is missing the declaration of StreamResult.");
}
} else {
errorReporter.addErrorOn(paramList,
"A method declaration uses streaming, but '/capnp/stream.capnp' is not found "
"in the import path. This is a standard file that should always be installed "
"with the Cap'n Proto compiler.");
}
return typeId<StreamResult>();
}
KJ_UNREACHABLE;
}
......
......@@ -218,6 +218,27 @@ constexpr auto op(const char* expected)
return p::transformOrReject(operatorToken, ExactString(expected));
}
class LocatedExactString {
public:
constexpr LocatedExactString(const char* expected): expected(expected) {}
kj::Maybe<Located<Text::Reader>> operator()(Located<Text::Reader>&& text) const {
if (text.value == expected) {
return kj::mv(text);
} else {
return nullptr;
}
}
private:
const char* expected;
};
constexpr auto locatedKeyword(const char* expected)
-> decltype(p::transformOrReject(identifier, LocatedExactString(expected))) {
return p::transformOrReject(identifier, LocatedExactString(expected));
}
// =======================================================================================
template <typename ItemParser>
......@@ -856,6 +877,14 @@ CapnpParser::CapnpParser(Orphanage orphanageParam, ErrorReporter& errorReporterP
}
return decl;
}),
p::transform(locatedKeyword("stream"),
[this](Located<Text::Reader>&& kw) -> Orphan<Declaration::ParamList> {
auto decl = orphanage.newOrphan<Declaration::ParamList>();
auto builder = decl.get();
kw.copyLocationTo(builder);
builder.setStream();
return decl;
}),
p::transform(parsers.expression,
[this](Orphan<Expression>&& name) -> Orphan<Declaration::ParamList> {
auto decl = orphanage.newOrphan<Declaration::ParamList>();
......
......@@ -389,6 +389,17 @@ TEST(SchemaLoader, Generics) {
}
}
TEST(SchemaLoader, LoadStreaming) {
SchemaLoader loader;
InterfaceSchema schema =
loader.load(Schema::from<test::TestStreaming>().getProto()).asInterface();
auto results = schema.getMethodByName("doStreamI").getResultType();
KJ_EXPECT(results.isStreamResult());
KJ_EXPECT(results.getShortDisplayName() == "StreamResult", results.getShortDisplayName());
}
} // namespace
} // namespace _ (private)
} // namespace capnp
......@@ -29,6 +29,7 @@
#include <kj/vector.h>
#include <algorithm>
#include <kj/map.h>
#include <capnp/stream.capnp.h>
#if _MSC_VER
#include <atomic>
......@@ -1728,9 +1729,17 @@ void SchemaLoader::Impl::makeDep(_::RawBrandedSchema::Binding& result,
uint64_t typeId, schema::Type::Which whichType, schema::Node::Which expectedKind,
schema::Brand::Reader brand, kj::StringPtr scopeName,
kj::Maybe<kj::ArrayPtr<const _::RawBrandedSchema::Scope>> brandBindings) {
const _::RawSchema* schema = loadEmpty(typeId,
kj::str("(unknown type; seen as dependency of ", scopeName, ")"),
expectedKind, true);
const _::RawSchema* schema;
if (typeId == capnp::typeId<StreamResult>()) {
// StreamResult is a very special type that is used to mark when a method is declared as
// streaming ("foo @0 () -> stream;"). We like to auto-load it if we see it as someone's
// dependency.
schema = loadNative(&_::rawSchema<StreamResult>());
} else {
schema = loadEmpty(typeId,
kj::str("(unknown type; seen as dependency of ", scopeName, ")"),
expectedKind, true);
}
result.which = static_cast<uint8_t>(whichType);
result.schema = makeBranded(schema, brand, brandBindings);
}
......
......@@ -22,6 +22,7 @@
#include "schema.h"
#include "message.h"
#include <kj/debug.h>
#include <capnp/stream.capnp.h>
namespace capnp {
......@@ -503,6 +504,11 @@ kj::Maybe<StructSchema::Field> StructSchema::getFieldByDiscriminant(uint16_t dis
}
}
bool StructSchema::isStreamResult() const {
auto& streamRaw = _::rawSchema<StreamResult>();
return raw->generic == &streamRaw || raw->generic->canCastTo == &streamRaw;
}
Type StructSchema::Field::getType() const {
auto proto = getProto();
uint location = _::RawBrandedSchema::makeDepLocation(_::RawBrandedSchema::DepKind::FIELD, index);
......
......@@ -260,6 +260,9 @@ public:
// there is no such field. (If the schema does not represent a union or a struct containing
// an unnamed union, then this always returns null.)
bool isStreamResult() const;
// Convenience method to check if this is the result type of a streaming RPC method.
private:
StructSchema(Schema base): Schema(base) {}
template <typename T> static inline StructSchema fromImpl() {
......@@ -491,6 +494,9 @@ public:
inline uint16_t getOrdinal() const { return ordinal; }
inline uint getIndex() const { return ordinal; }
bool isStreaming() const { return getResultType().isStreamResult(); }
// Check if this is a streaming method.
StructSchema getParamType() const;
StructSchema getResultType() const;
// Get the parameter and result types, including substituting generic parameters.
......
# Copyright (c) 2019 Cloudflare, Inc. and contributors
# Licensed under the MIT License:
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
@0x86c366a91393f3f8;
# Defines placeholder types used to provide backwards-compatibility while introducing streaming
# to the language. The goal is that old code generators that don't know about streaming can still
# generate code that functions, leaving it up to the application to implement flow control
# manually.
$import "/capnp/c++.capnp".namespace("capnp");
struct StreamResult @0x995f9a3377c0b16e {
# Empty struct that serves as the return type for "streaming" methods.
#
# Defining a method like:
#
# write @0 (bytes :Data) -> stream;
#
# Is equivalent to:
#
# write @0 (bytes :Data) -> import "/capnp/stream.capnp".StreamResult;
#
# However, implementations that recognize streaming will elide the reference to StreamResult
# and instead give write() a different signature appropriate for streaming.
#
# Streaming methods do not return a result -- that is, they return Promise<void>. This promise
# resolves not to indicate that the call was actually delivered, but instead to provide
# backpressure. When the previous call's promise resolves, it is time to make another call. On
# the client side, the RPC system will resolve promises immediately until an appropriate number
# of requests are in-flight, and then will delay promise resolution to apply back-pressure.
# On the server side, the RPC system will deliver one call at a time.
}
......@@ -818,6 +818,13 @@ interface TestTailCaller {
foo @0 (i :Int32, callee :TestTailCallee) -> TestTailCallee.TailResult;
}
interface TestStreaming {
doStreamI @0 (i :UInt32) -> stream;
doStreamJ @1 (j :UInt32) -> stream;
finishStream @2 () -> (totalI :UInt32, totalJ :UInt32);
# Test streaming. finishStream() returns the totals of the values streamed to the other calls.
}
interface TestHandle {}
interface TestMoreStuff extends(TestCallOrder) {
......
......@@ -158,4 +158,5 @@ using Baz = import "nosuchfile-unused.capnp".Baz;
interface TestInterface {
foo @0 (a :UInt32 = null);
bar @1 stream -> ();
}
......@@ -57,4 +57,6 @@ file:150:38-43: error: Sorry, only pointer types can be used as generic paramete
file:153:30-44: error: Embeds can only be used when Text, Data, or a struct is expected.
file:154:37-51: error: Couldn't read file for embed: no-such-file
file:160:23-27: error: Only pointer parameters can declare their default as 'null'.
file:161:10-16: error: 'stream' can only appear after '->', not before.
file:161:10-16: error: A method declaration uses streaming, but '/capnp/stream.capnp' is not found in the import path. This is a standard file that should always be installed with the Cap'n Proto compiler.
file:156:20-45: error: Import failed: nosuchfile-unused.capnp
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment