Unverified Commit 181cc66a authored by Kenton Varda's avatar Kenton Varda Committed by GitHub

Merge pull request #899 from capnproto/http-over-rpc-fixes

Various bug fixes
parents 379a8444 6327f6b7
...@@ -603,6 +603,8 @@ protected: ...@@ -603,6 +603,8 @@ protected:
KJ_CASE_ONEOF(capnpStream, capnp::ByteStream::Client) { KJ_CASE_ONEOF(capnpStream, capnp::ByteStream::Client) {
auto params = context.getParams(); auto params = context.getParams();
auto req = capnpStream.getSubstreamRequest(params.totalSize()); auto req = capnpStream.getSubstreamRequest(params.totalSize());
req.setCallback(params.getCallback());
req.setLimit(params.getLimit());
return context.tailCall(kj::mv(req)); return context.tailCall(kj::mv(req));
} }
KJ_CASE_ONEOF(b, Borrowed) { KJ_CASE_ONEOF(b, Borrowed) {
......
...@@ -449,12 +449,13 @@ public: ...@@ -449,12 +449,13 @@ public:
kj::HttpService& kjService) kj::HttpService& kjService)
: factory(factory), : factory(factory),
method(validateMethod(request.getMethod())), method(validateMethod(request.getMethod())),
url(kj::str(request.getUrl())),
headers(factory.headersToKj(request.getHeaders()).clone()), headers(factory.headersToKj(request.getHeaders()).clone()),
clientContext(kj::mv(clientContext)), clientContext(kj::mv(clientContext)),
// Note we attach `requestBodyIn` to `task` so that we will implicitly cancel reading // Note we attach `requestBodyIn` to `task` so that we will implicitly cancel reading
// the request body as soon as the service returns. This is important in particular when // the request body as soon as the service returns. This is important in particular when
// the request body is not fully consumed, in order to propagate cancellation. // the request body is not fully consumed, in order to propagate cancellation.
task(kjService.request(method, request.getUrl(), headers, *requestBodyIn, *this) task(kjService.request(method, url, headers, *requestBodyIn, *this)
.attach(kj::mv(requestBodyIn))) {} .attach(kj::mv(requestBodyIn))) {}
KJ_DISALLOW_COPY(ServerRequestContextImpl); KJ_DISALLOW_COPY(ServerRequestContextImpl);
...@@ -544,6 +545,7 @@ public: ...@@ -544,6 +545,7 @@ public:
private: private:
HttpOverCapnpFactory& factory; HttpOverCapnpFactory& factory;
kj::HttpMethod method; kj::HttpMethod method;
kj::String url;
kj::HttpHeaders headers; kj::HttpHeaders headers;
capnp::HttpService::ClientRequestContext::Client clientContext; capnp::HttpService::ClientRequestContext::Client clientContext;
kj::Promise<void> task; kj::Promise<void> task;
......
...@@ -262,7 +262,7 @@ public: ...@@ -262,7 +262,7 @@ public:
} }
void releaseParams() override { void releaseParams() override {
KJ_REQUIRE(!releasedParams); // Note that releaseParams() is idempotent -- it can be called multiple times.
releasedParams = true; releasedParams = true;
inner->releaseParams(); inner->releaseParams();
} }
......
...@@ -3059,7 +3059,12 @@ private: ...@@ -3059,7 +3059,12 @@ private:
} }
kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override { kj::Promise<void> close(uint16_t code, kj::StringPtr reason) override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress"); KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
return canceler.wrap(output.close(code, reason)); return canceler.wrap(output.close(code, reason).then([this]() {
// A pump is expected to end upon seeing a Close message.
canceler.release();
pipe.endState(*this);
fulfiller.fulfill();
}));
} }
kj::Promise<void> disconnect() override { kj::Promise<void> disconnect() override {
KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress"); KJ_REQUIRE(canceler.isEmpty(), "another message send is already in progress");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment