Commit 63c34d47 authored by Kenton Varda's avatar Kenton Varda

Implement byte streams over Cap'n Proto.

This implementation features path-shortening through pumps. That is, if an incoming Cap'n Proto stream wraps a KJ stream which ends up pumping to an outgoing Cap'n Proto stream, the incoming stream will be redirected directly to the outgoing stream, in such a way that the RPC system can recognize and reduce the number of network hops as appropriate.

This proved tricky due to the features of KJ's `pumpTo()`, in particular:
- The caller of `pumpTo()` expects eventually to be told how many bytes were pumped (e.g. before EOF was hit).
- A pump may have a specified length limit. In this case the rest of the stream can be pumped somewhere else.
- Multiple streams can be pumped to the same destination stream -- this implies that a pump does not propagate EOF.

These requirements mean that path-shortening is not as simple as redirecting the incoming stream to the outgoing. Intsead, we must first ask the outgoing stream's server to create a "substream" object, and then redirect to that. The substream can have a length limit, swallows EOF, informs the original creator on completion, and can even redirect *back* to the original creator to allow the stream to now pump somewhere else.
parent f203027c
This diff is collapsed.
This diff is collapsed.
@0x8f5d14e1c273738d;
$import "/capnp/c++.capnp".namespace("capnp");
interface ByteStream {
write @0 (bytes :Data) -> stream;
# Write a chunk.
end @1 ();
# Signals clean EOF. (If the ByteStream is dropped without calling this, then the stream was
# prematurely canceled and so thet body should not be considered complete.)
getSubstream @2 (callback :SubstreamCallback,
limit :UInt64 = 0xffffffffffffffff) -> (substream :ByteStream);
# This method is used to implement path shortening optimization. It is designed in particular
# with KJ streams' pumpTo() in mind.
#
# getSubstream() returns a new stream object that can be used to write to the same destination
# as this stream. The substream will operate until it has received `limit` bytes, or its `end()`
# method has been called, whichever occurs first. At that time, it invokes one of the methods of
# `callback` based on the termination condition.
#
# While a substream is active, it is an error to call write() on the original stream. Doing so
# may throw an exception or may arbitrarily interleave bytes with the substream's writes.
interface SubstreamCallback {
ended @0 (byteCount :UInt64);
# `end()` was called on the substream after writing `byteCount` bytes. The `end()` call was
# NOT forwarded to the underlying stream, which remains open.
reachedLimit @1 () -> (next :ByteStream);
# The number of bytes specified by the `limit` parameter of `getSubstream()` was reached.
# The substream will "resolve itself" to `next`, so that all future calls to the substream
# are forwarded to `next`.
#
# If the `write()` call which reached the limit included bytes past the limit, then the first
# `write()` call to `next` will be for those leftover bytes.
}
}
// Copyright (c) 2019 Cloudflare, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#pragma once
// Bridges from KJ streams to Cap'n Proto ByteStream RPC protocol.
#include <capnp/compat/byte-stream.capnp.h>
#include <kj/async-io.h>
namespace capnp {
class ByteStreamFactory {
// In order to allow path-shortening through KJ, a common factory must be used for converting
// between RPC ByteStreams and KJ streams.
public:
capnp::ByteStream::Client kjToCapnp(kj::Own<kj::AsyncOutputStream> kjStream);
kj::Own<kj::AsyncOutputStream> capnpToKj(capnp::ByteStream::Client capnpStream);
private:
CapabilityServerSet<capnp::ByteStream> streamSet;
class StreamServerBase;
class SubstreamImpl;
class CapnpToKjStreamAdapter;
class KjToCapnpStreamAdapter;
};
} // namespace capnp
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment