Commit f93ea92c authored by Kenton Varda's avatar Kenton Varda

Fix RPC oversized message handling with -fno-exceptions.

parent 41c15b12
...@@ -351,13 +351,14 @@ TEST(TwoPartyNetwork, HugeMessage) { ...@@ -351,13 +351,14 @@ TEST(TwoPartyNetwork, HugeMessage) {
{ {
auto req = client.methodWithDefaultsRequest(); auto req = client.methodWithDefaultsRequest();
req.initA(100000000); // 100 MB req.initA(100000000); // 100 MB
KJ_EXPECT_THROW_MESSAGE("larger than the single-message size limit",
req.send().wait(ioContext.waitScope)); KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than the single-message size limit",
req.send().ignoreResult().wait(ioContext.waitScope));
} }
// Oversized response fails. // Oversized response fails.
KJ_EXPECT_THROW_MESSAGE("larger than the single-message size limit", KJ_EXPECT_THROW_RECOVERABLE_MESSAGE("larger than the single-message size limit",
client.getEnormousStringRequest().send().wait(ioContext.waitScope)); client.getEnormousStringRequest().send().ignoreResult().wait(ioContext.waitScope));
// Connection is still up. // Connection is still up.
{ {
......
...@@ -448,6 +448,9 @@ private: ...@@ -448,6 +448,9 @@ private:
bool isTailCall = false; bool isTailCall = false;
// Is this a tail call? If so, we don't expect to receive results in the `Return`. // Is this a tail call? If so, we don't expect to receive results in the `Return`.
bool skipFinish = false;
// If true, don't send a Finish message.
inline bool operator==(decltype(nullptr)) const { inline bool operator==(decltype(nullptr)) const {
return !isAwaitingReturn && selfRef == nullptr; return !isAwaitingReturn && selfRef == nullptr;
} }
...@@ -1331,7 +1334,7 @@ private: ...@@ -1331,7 +1334,7 @@ private:
connectionState->questions.find(id), "Question ID no longer on table?"); connectionState->questions.find(id), "Question ID no longer on table?");
// Send the "Finish" message (if the connection is not already broken). // Send the "Finish" message (if the connection is not already broken).
if (connectionState->connection.is<Connected>()) { if (connectionState->connection.is<Connected>() && !question.skipFinish) {
auto message = connectionState->connection.get<Connected>()->newOutgoingMessage( auto message = connectionState->connection.get<Connected>()->newOutgoingMessage(
messageSizeHint<rpc::Finish>()); messageSizeHint<rpc::Finish>());
auto builder = message->getBody().getAs<rpc::Message>().initFinish(); auto builder = message->getBody().getAs<rpc::Message>().initFinish();
...@@ -1504,6 +1507,14 @@ private: ...@@ -1504,6 +1507,14 @@ private:
question.paramExports = kj::mv(exports); question.paramExports = kj::mv(exports);
question.isTailCall = isTailCall; question.isTailCall = isTailCall;
// Make the QuentionRef and result promise.
SendInternalResult result;
auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
result.questionRef = kj::refcounted<QuestionRef>(
*connectionState, questionId, kj::mv(paf.fulfiller));
question.selfRef = *result.questionRef;
result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
// Finish and send. // Finish and send.
callBuilder.setQuestionId(questionId); callBuilder.setQuestionId(questionId);
if (isTailCall) { if (isTailCall) {
...@@ -1514,18 +1525,13 @@ private: ...@@ -1514,18 +1525,13 @@ private:
callBuilder.getInterfaceId(), callBuilder.getMethodId()); callBuilder.getInterfaceId(), callBuilder.getMethodId());
message->send(); message->send();
})) { })) {
KJ_LOG(WARNING, *exception); // We can't safely throw the exception from here since we've already modified the question
kj::throwRecoverableException(kj::mv(*exception)); // table state. We'll have to reject the promise instead.
question.isAwaitingReturn = false;
question.skipFinish = true;
result.questionRef->reject(kj::mv(*exception));
} }
// Make the result promise.
SendInternalResult result;
auto paf = kj::newPromiseAndFulfiller<kj::Promise<kj::Own<RpcResponse>>>();
result.questionRef = kj::refcounted<QuestionRef>(
*connectionState, questionId, kj::mv(paf.fulfiller));
question.selfRef = *result.questionRef;
result.promise = paf.promise.attach(kj::addRef(*result.questionRef));
// Send and return. // Send and return.
return kj::mv(result); return kj::mv(result);
} }
...@@ -1819,7 +1825,6 @@ private: ...@@ -1819,7 +1825,6 @@ private:
KJ_CONTEXT("returning from RPC call", interfaceId, methodId); KJ_CONTEXT("returning from RPC call", interfaceId, methodId);
exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send(); exports = kj::downcast<RpcServerResponseImpl>(*KJ_ASSERT_NONNULL(response)).send();
})) { })) {
KJ_LOG(WARNING, *exception);
responseSent = false; responseSent = false;
sendErrorReturn(kj::mv(*exception)); sendErrorReturn(kj::mv(*exception));
return; return;
...@@ -2268,7 +2273,7 @@ private: ...@@ -2268,7 +2273,7 @@ private:
// Add the answer to the answer table for pipelining and send the response. // Add the answer to the answer table for pipelining and send the response.
auto& answer = answers[answerId]; auto& answer = answers[answerId];
KJ_REQUIRE(!answer.active, "questionId is already in use") { KJ_REQUIRE(!answer.active, "questionId is already in use", answerId) {
return; return;
} }
......
...@@ -869,6 +869,26 @@ T Promise<T>::wait(WaitScope& waitScope) { ...@@ -869,6 +869,26 @@ T Promise<T>::wait(WaitScope& waitScope) {
} }
} }
template <>
inline void Promise<void>::wait(WaitScope& waitScope) {
// Override <void> case to use throwRecoverableException().
_::ExceptionOr<_::Void> result;
waitImpl(kj::mv(node), result, waitScope);
if (result.value != nullptr) {
KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
}
} else KJ_IF_MAYBE(exception, result.exception) {
throwRecoverableException(kj::mv(*exception));
} else {
// Result contained neither a value nor an exception?
KJ_UNREACHABLE;
}
}
template <typename T> template <typename T>
ForkedPromise<T> Promise<T>::fork() { ForkedPromise<T> Promise<T>::fork() {
return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node))); return ForkedPromise<T>(false, refcounted<_::ForkHub<_::FixVoid<T>>>(kj::mv(node)));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment