Commit bb83561c authored by Kenton Varda's avatar Kenton Varda

Fix streaming: RpcFlowController::send() must send immediately.

The documentation for this method clearly says that sending the message cannot be delayed because ordering may matter. Only resolving of the returned promise can be delayed to implement flow control.
parent 77da9266
......@@ -2789,7 +2789,7 @@ private:
answerToRelease = answers.erase(finish.getQuestionId());
} else {
KJ_REQUIRE(answer->active, "'Finish' for invalid question ID.") { return; }
KJ_FAIL_REQUIRE("'Finish' for invalid question ID.") { return; }
......@@ -3119,18 +3119,53 @@ class WindowFlowController final: public RpcFlowController, private kj::TaskSet:
WindowFlowController(RpcFlowController::WindowGetter& windowGetter)
: windowGetter(windowGetter), tasks(*this) {
kj::Promise<void> send(kj::Own<OutgoingRpcMessage> message, kj::Promise<void> ack) override {
auto size = message->sizeInWords() * sizeof(capnp::word);
maxMessageSize = kj::max(size, maxMessageSize);
// We are REQUIRED to send the message NOW to maintain correct ordering.
inFlight += size;
tasks.add(ack.then([this, size]() {
inFlight -= size;
KJ_CASE_ONEOF(blockedSends, Running) {
if (isReady()) {
// Release all fulfillers.
for (auto& fulfiller: blockedSends) {
KJ_IF_MAYBE(f, emptyFulfiller) {
if (inFlight == 0) {
KJ_CASE_ONEOF(exception, kj::Exception) {
// A previous call failed, but this one -- which was already in-flight at the time --
// ended up succeeding. That may indicate that the server side is not properly
// handling streaming error propagation. Nothing much we can do about it here though.
KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
auto size = message->sizeInWords() * sizeof(capnp::word);
maxMessageSize = kj::max(size, maxMessageSize);
auto paf = kj::newPromiseAndFulfiller<void>();
queue.push({kj::mv(message), kj::mv(ack), kj::mv(paf.fulfiller), size});
return kj::mv(paf.promise);
KJ_CASE_ONEOF(blockedSends, Running) {
if (isReady()) {
return kj::READY_NOW;
} else {
auto paf = kj::newPromiseAndFulfiller<void>();
return kj::mv(paf.promise);
KJ_CASE_ONEOF(exception, kj::Exception) {
return kj::cp(exception);
......@@ -3140,7 +3175,7 @@ public:
kj::Promise<void> waitAllAcked() override {
KJ_IF_MAYBE(q, state.tryGet<std::queue<QueuedMessage>>()) {
KJ_IF_MAYBE(q, state.tryGet<Running>()) {
if (!q->empty()) {
auto paf = kj::newPromiseAndFulfiller<kj::Promise<void>>();
emptyFulfiller = kj::mv(paf.fulfiller);
......@@ -3155,63 +3190,19 @@ private:
size_t inFlight = 0;
size_t maxMessageSize = 0;
struct QueuedMessage {
kj::Own<OutgoingRpcMessage> message;
kj::Promise<void> ack;
kj::Own<kj::PromiseFulfiller<void>> sentFulfiller;
size_t size;
kj::OneOf<std::queue<QueuedMessage>, kj::Exception> state;
typedef kj::Vector<kj::Own<kj::PromiseFulfiller<void>>> Running;
kj::OneOf<Running, kj::Exception> state;
kj::Maybe<kj::Own<kj::PromiseFulfiller<kj::Promise<void>>>> emptyFulfiller;
kj::TaskSet tasks;
void pumpQueue(std::queue<QueuedMessage>& queue) {
size_t window = windowGetter.getWindow();
// We extend the window by maxMessageSize to avoid a pathological situation when a message
// is larger than the window size. Otherwise, after sending that message, we would end up
// not sending any others until the ack was received, wasting a round trip's worth of
// bandwidth.
while (!queue.empty() && inFlight < window + maxMessageSize) {
auto front = kj::mv(queue.front());
front.sentFulfiller->rejectIfThrows([&]() {
inFlight += front.size;
tasks.add(front.ack.then([this, size = front.size]() {
inFlight -= size;
KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
KJ_CASE_ONEOF(exception, kj::Exception) {
// A previous call failed, but this one -- which was already in-flight at the time --
// ended up succeeding. That may indicate that the server side is not properly
// handling streaming error propagation. Nothing much we can do about it here though.
KJ_IF_MAYBE(f, emptyFulfiller) {
if (queue.empty()) {
void taskFailed(kj::Exception&& exception) override {
KJ_CASE_ONEOF(queue, std::queue<QueuedMessage>) {
KJ_CASE_ONEOF(blockedSends, Running) {
// Fail out all pending sends.
while (!queue.empty()) {
for (auto& fulfiller: blockedSends) {
// Fail out all future sends.
state = kj::mv(exception);
......@@ -3221,6 +3212,15 @@ private:
bool isReady() {
// We extend the window by maxMessageSize to avoid a pathological situation when a message
// is larger than the window size. Otherwise, after sending that message, we would end up
// not sending any others until the ack was received, wasting a round trip's worth of
// bandwidth.
return inFlight <= maxMessageSize // avoid getWindow() call if unnecessary
|| inFlight < windowGetter.getWindow() + maxMessageSize;
class FixedWindowFlowController final
