Commit 55d661f7 authored by Kenton Varda's avatar Kenton Varda

Actually use tail calls when an RPC call bounces back over the same connection.

parent 312c10f5
......@@ -109,21 +109,24 @@ public:
}
return responseBuilder;
}
kj::Promise<void> tailCall(kj::Own<RequestHook> request) override {
kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
auto result = directTailCall(kj::mv(request));
KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
f->get()->fulfill(ObjectPointer::Pipeline(kj::mv(result.pipeline)));
}
return kj::mv(result.promise);
}
ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
KJ_REQUIRE(response == nullptr, "Can't call tailCall() after initializing the results struct.");
releaseParams();
auto promise = request->send();
// Link pipelines.
KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
f->get()->fulfill(kj::mv(kj::implicitCast<ObjectPointer::Pipeline&>(promise)));
}
// Wait for response.
return promise.then([this](Response<ObjectPointer>&& tailResponse) {
auto voidPromise = promise.then([this](Response<ObjectPointer>&& tailResponse) {
response = kj::mv(tailResponse);
});
return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
}
kj::Promise<ObjectPointer::Pipeline> onTailCall() override {
auto paf = kj::newPromiseAndFulfiller<ObjectPointer::Pipeline>();
......
......@@ -85,6 +85,7 @@ private:
friend struct DynamicCapability;
template <typename, typename>
friend class CallContext;
friend class RequestHook;
};
template <typename Results>
......@@ -315,6 +316,11 @@ public:
// Returns a void* that identifies who made this request. This can be used by an RPC adapter to
// discover when tail call is going to be sent over its own connection and therefore can be
// optimized into a remote tail call.
template <typename T, typename U>
inline static kj::Own<RequestHook> from(Request<T, U>&& request) {
return kj::mv(request.hook);
}
};
class ResponseHook {
......@@ -386,7 +392,7 @@ public:
virtual ObjectPointer::Reader getParams() = 0;
virtual void releaseParams() = 0;
virtual ObjectPointer::Builder getResults(uint firstSegmentWordSize) = 0;
virtual kj::Promise<void> tailCall(kj::Own<RequestHook> request) = 0;
virtual kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) = 0;
virtual void allowAsyncCancellation() = 0;
virtual bool isCanceled() = 0;
......@@ -394,6 +400,11 @@ public:
// If `tailCall()` is called, resolves to the PipelineHook from the tail call. An
// implementation of `ClientHook::call()` is allowed to call this at most once.
virtual ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) = 0;
// Call this when you would otherwise call onTailCall() immediately followed by tailCall().
// Implementations of tailCall() should typically call directTailCall() and then fulfill the
// promise fulfiller for onTailCall() with the returned pipeline.
virtual kj::Own<CallContextHook> addRef() = 0;
};
......
......@@ -36,44 +36,7 @@ class InterfaceSchema;
class Orphanage;
class ClientHook;
class PipelineHook;
// =======================================================================================
// Pipeline helpers
//
// These relate to capabilities, but we don't declare them in capability.h because generated code
// for structs needs to know about these, even in files that contain no interfaces.
struct PipelineOp {
// Corresponds to rpc.capnp's PromisedAnswer.Op.
enum Type {
NOOP, // for convenience
GET_POINTER_FIELD
// There may be other types in the future...
};
Type type;
union {
uint16_t pointerIndex; // for GET_POINTER_FIELD
};
};
class PipelineHook {
// Represents a currently-running call, and implements pipelined requests on its result.
public:
virtual kj::Own<const PipelineHook> addRef() const = 0;
// Increment this object's reference count.
virtual kj::Own<const ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) const = 0;
// Extract a promised Capability from the results.
virtual kj::Own<const ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) const;
// Version of getPipelinedCap() passing the array by move. May avoid a copy in some cases.
// Default implementation just calls the other version.
};
struct PipelineOp;
// =======================================================================================
// ObjectPointer!
......@@ -249,6 +212,7 @@ struct ObjectPointer {
: hook(kj::mv(hook)), ops(kj::mv(ops)) {}
friend class LocalClient;
friend class PipelineHook;
};
};
......@@ -316,6 +280,48 @@ private:
friend class ObjectPointer::Builder;
};
// =======================================================================================
// Pipeline helpers
//
// These relate to capabilities, but we don't declare them in capability.h because generated code
// for structs needs to know about these, even in files that contain no interfaces.
struct PipelineOp {
// Corresponds to rpc.capnp's PromisedAnswer.Op.
enum Type {
NOOP, // for convenience
GET_POINTER_FIELD
// There may be other types in the future...
};
Type type;
union {
uint16_t pointerIndex; // for GET_POINTER_FIELD
};
};
class PipelineHook {
// Represents a currently-running call, and implements pipelined requests on its result.
public:
virtual kj::Own<const PipelineHook> addRef() const = 0;
// Increment this object's reference count.
virtual kj::Own<const ClientHook> getPipelinedCap(kj::ArrayPtr<const PipelineOp> ops) const = 0;
// Extract a promised Capability from the results.
virtual kj::Own<const ClientHook> getPipelinedCap(kj::Array<PipelineOp>&& ops) const;
// Version of getPipelinedCap() passing the array by move. May avoid a copy in some cases.
// Default implementation just calls the other version.
static inline kj::Own<const PipelineHook> from(ObjectPointer::Pipeline&& pipeline) {
return kj::mv(pipeline.hook);
}
};
// =======================================================================================
// Inline implementation details
......
......@@ -75,7 +75,9 @@ public:
auto paramType = schema.getDependency(methodProto.getParamStructType()).asStruct();
auto resultType = schema.getDependency(methodProto.getResultStructType()).asStruct();
returnTypes[std::make_pair(sender, call.getQuestionId())] = resultType;
if (call.getSendResultsTo().isCaller()) {
returnTypes[std::make_pair(sender, call.getQuestionId())] = resultType;
}
CapExtractorImpl extractor;
CapReaderContext context(extractor);
......
......@@ -691,8 +691,8 @@ private:
size_t sizeHint = params.targetSizeInWords();
// TODO(perf): Extend targetSizeInWords() to include a capability count? Here we increase
// the size by 1/16 to deal with cap descriptors possibly expanding. See also below, when
// handling the response, and in RpcRequest::send().
// the size by 1/16 to deal with cap descriptors possibly expanding. See also in
// RpcRequest::send() and RpcCallContext::directTailCall().
sizeHint += sizeHint / 16;
// Don't overflow.
......@@ -708,26 +708,7 @@ private:
// We can and should propagate cancellation.
context->allowAsyncCancellation();
auto promise = request.send();
auto pipeline = promise.releasePipelineHook();
auto voidPromise = promise.thenInAnyThread(kj::mvCapture(context,
[](kj::Own<CallContextHook>&& context, Response<ObjectPointer> response) {
size_t sizeHint = response.targetSizeInWords();
// See above TODO.
sizeHint += sizeHint / 16;
// Don't overflow.
if (uint(sizeHint) != sizeHint) {
sizeHint = ~uint(0);
}
context->getResults(sizeHint).set(response);
}));
return { kj::mv(voidPromise), kj::mv(pipeline) };
return context->directTailCall(RequestHook::from(kj::mv(request)));
}
kj::Own<const ClientHook> addRef() const override {
......@@ -2068,7 +2049,14 @@ private:
return results;
}
}
kj::Promise<void> tailCall(kj::Own<RequestHook> request) override {
kj::Promise<void> tailCall(kj::Own<RequestHook>&& request) override {
auto result = directTailCall(kj::mv(request));
KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
f->get()->fulfill(ObjectPointer::Pipeline(kj::mv(result.pipeline)));
}
return kj::mv(result.promise);
}
ClientHook::VoidPromiseAndPipeline directTailCall(kj::Own<RequestHook>&& request) override {
KJ_REQUIRE(response == nullptr,
"Can't call tailCall() after initializing the results struct.");
releaseParams();
......@@ -2078,11 +2066,6 @@ private:
// optimize out the return trip.
KJ_IF_MAYBE(tailInfo, kj::downcast<RpcRequest>(*request).tailSend()) {
// Link pipelines.
KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
f->get()->fulfill(ObjectPointer::Pipeline(kj::mv(tailInfo->pipeline)));
}
if (isFirstResponder()) {
auto message = connectionState->connection->newOutgoingMessage(
messageSizeHint<rpc::Return>() + requestCapExtractor.retainedListSizeHint(true));
......@@ -2097,25 +2080,24 @@ private:
message->send();
cleanupAnswerTable(connectionState->tables.lockExclusive(), nullptr);
}
return kj::mv(tailInfo->promise);
return { kj::mv(tailInfo->promise), kj::mv(tailInfo->pipeline) };
}
}
// Just forwarding to another local call.
auto promise = request->send();
// Link pipelines.
KJ_IF_MAYBE(f, tailCallPipelineFulfiller) {
f->get()->fulfill(kj::mv(kj::implicitCast<ObjectPointer::Pipeline&>(promise)));
}
// Wait for response.
return promise.then([this](Response<ObjectPointer>&& tailResponse) {
auto voidPromise = promise.then([this](Response<ObjectPointer>&& tailResponse) {
// Copy the response.
// TODO(perf): It would be nice if we could somehow make the response get built in-place
// but requires some refactoring.
getResults(tailResponse.targetSizeInWords()).set(tailResponse);
size_t sizeHint = tailResponse.targetSizeInWords();
sizeHint += sizeHint / 16; // see TODO in RpcClient::call().
getResults(sizeHint).set(tailResponse);
});
return { kj::mv(voidPromise), PipelineHook::from(kj::mv(promise)) };
}
kj::Promise<ObjectPointer::Pipeline> onTailCall() override {
auto paf = kj::newPromiseAndFulfiller<ObjectPointer::Pipeline>();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment