Commit 538a767e authored by Kenton Varda's avatar Kenton Varda

Implement basic connection-level flow control.

This is blunt: A peer can choose to stop reading new messages from a connection whenever the calls in-flight cross a certain threshold. This is needed in Sandstorm to prevent errant (or malicious) apps from consuming excessive RAM in other parts of the system by flooding them with calls.
parent 72e69b7f
......@@ -94,6 +94,7 @@ private:
Capability::Client baseBootstrap(AnyStruct::Reader vatId);
Capability::Client baseRestore(AnyStruct::Reader vatId, AnyPointer::Reader objectId);
void baseSetFlowLimit(size_t words);
template <typename>
friend class capnp::RpcSystem;
......
......@@ -249,9 +249,11 @@ public:
kj::Maybe<RealmGateway<>::Client> gateway,
kj::Maybe<SturdyRefRestorerBase&> restorer,
kj::Own<VatNetworkBase::Connection>&& connectionParam,
kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller)
kj::Own<kj::PromiseFulfiller<DisconnectInfo>>&& disconnectFulfiller,
size_t flowLimit)
: bootstrapFactory(bootstrapFactory), gateway(kj::mv(gateway)),
restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), tasks(*this) {
restorer(restorer), disconnectFulfiller(kj::mv(disconnectFulfiller)), flowLimit(flowLimit),
tasks(*this) {
connection.init<Connected>(kj::mv(connectionParam));
tasks.add(messageLoop());
}
......@@ -379,6 +381,11 @@ public:
connection.init<Disconnected>(kj::mv(networkException));
}
void setFlowLimit(size_t words) {
flowLimit = words;
maybeUnblockFlow();
}
private:
class RpcClient;
class ImportClient;
......@@ -529,6 +536,13 @@ private:
// There are only four tables. This definitely isn't a fifth table. I don't know what you're
// talking about.
size_t flowLimit;
size_t callWordsInFlight = 0;
kj::Maybe<kj::Own<kj::PromiseFulfiller<void>>> flowWaiter;
// If non-null, we're currently blocking incoming messages waiting for callWordsInFlight to drop
// below flowLimit. Fulfill this to un-block.
kj::TaskSet tasks;
// =====================================================================================
......@@ -1675,12 +1689,15 @@ private:
bool redirectResults, kj::Own<kj::PromiseFulfiller<void>>&& cancelFulfiller)
: connectionState(kj::addRef(connectionState)),
answerId(answerId),
requestSize(request->getBody().targetSize().wordCount),
request(kj::mv(request)),
paramsCapTable(kj::mv(capTableArray)),
params(paramsCapTable.imbue(params)),
returnMessage(nullptr),
redirectResults(redirectResults),
cancelFulfiller(kj::mv(cancelFulfiller)) {}
cancelFulfiller(kj::mv(cancelFulfiller)) {
connectionState.callWordsInFlight += requestSize;
}
~RpcCallContext() noexcept(false) {
if (isFirstResponder()) {
......@@ -1891,6 +1908,7 @@ private:
// Request ---------------------------------------------
size_t requestSize; // for flow limit purposes
kj::Maybe<kj::Own<IncomingRpcMessage>> request;
ReaderCapabilityTable paramsCapTable;
AnyPointer::Reader params;
......@@ -1954,17 +1972,38 @@ private:
answer.pipeline = nullptr;
}
}
// Also, this is the right time to stop counting the call against the flow limit.
connectionState->callWordsInFlight -= requestSize;
connectionState->maybeUnblockFlow();
}
};
// =====================================================================================
// Message handling
void maybeUnblockFlow() {
if (callWordsInFlight < flowLimit) {
KJ_IF_MAYBE(w, flowWaiter) {
w->get()->fulfill();
flowWaiter = nullptr;
}
}
}
kj::Promise<void> messageLoop() {
if (!connection.is<Connected>()) {
return kj::READY_NOW;
}
if (callWordsInFlight > flowLimit) {
auto paf = kj::newPromiseAndFulfiller<void>();
flowWaiter = kj::mv(paf.fulfiller);
return paf.promise.then([this]() {
return messageLoop();
});
}
return connection.get<Connected>()->receiveIncomingMessage().then(
[this](kj::Maybe<kj::Own<IncomingRpcMessage>>&& message) {
KJ_IF_MAYBE(m, message) {
......@@ -2667,12 +2706,21 @@ public:
}
}
void setFlowLimit(size_t words) {
flowLimit = words;
for (auto& conn: connections) {
conn.second->setFlowLimit(words);
}
}
private:
VatNetworkBase& network;
kj::Maybe<Capability::Client> bootstrapInterface;
BootstrapFactoryBase& bootstrapFactory;
kj::Maybe<RealmGateway<>::Client> gateway;
kj::Maybe<SturdyRefRestorerBase&> restorer;
size_t flowLimit = kj::maxValue;
kj::TaskSet tasks;
typedef std::unordered_map<VatNetworkBase::Connection*, kj::Own<RpcConnectionState>>
......@@ -2693,7 +2741,7 @@ private:
}));
auto newState = kj::refcounted<RpcConnectionState>(
bootstrapFactory, gateway, restorer, kj::mv(connection),
kj::mv(onDisconnect.fulfiller));
kj::mv(onDisconnect.fulfiller), flowLimit);
RpcConnectionState& result = *newState;
connections.insert(std::make_pair(connectionPtr, kj::mv(newState)));
return result;
......@@ -2756,5 +2804,9 @@ Capability::Client RpcSystemBase::baseRestore(
return impl->restore(hostId, objectId);
}
void RpcSystemBase::baseSetFlowLimit(size_t words) {
return impl->setFlowLimit(words);
}
} // namespace _ (private)
} // namespace capnp
......@@ -111,6 +111,37 @@ public:
// to using bootstrap(), which is equivalent to calling restore() with a null `objectId`.
// You may emulate the old concept of object IDs by exporting a bootstrap interface which has
// methods that can be used to obtain other capabilities by ID.
void setFlowLimit(size_t words);
// Sets the incoming call flow limit. If more than `words` worth of call messages have not yet
// received responses, the RpcSystem will not read further messages from the stream. This can be
// used as a crude way to prevent a resource exhaustion attack (or bug) in which a peer makes an
// excessive number of simultaneous calls that consume the receiver's RAM.
//
// There are some caveats. When over the flow limit, all messages are blocked, including returns.
// If the outstanding calls are themselves waiting on calls going in the opposite direction, the
// flow limit may prevent those calls from completing, leading to deadlock. However, a
// sufficiently high limit should make this unlikely.
//
// Note that a call's parameter size counts against the flow limit until the call returns, even
// if the recipient calls releaseParams() to free the parameter memory early. This is because
// releaseParams() may simply indicate that the parameters have been forwarded to another
// machine, but are still in-memory there. For illustration, say that Alice made a call to Bob
// who forwarded the call to Carol. Bob has imposed a flow limit on Alice. Alice's calls are
// being forwarded to Carol, so Bob never keeps the parameters in-memory for more than a brief
// period. However, the flow limit counts all calls that haven't returned, even if Bob has
// already freed the memory they consumed. You might argue that the right solution here is
// instead for Carol to impose her own flow limit on Bob. This has a serious problem, though:
// Bob might be forwarding requests to Carol on behalf of many different parties, not just Alice.
// If Alice can pump enough data to hit the Bob -> Carol flow limit, then those other parties
// will be disrupted. Thus, we can only really impose the limit on the Alice -> Bob link, which
// only affects Alice. We need that one flow limit to limit Alice's impact on the whole system,
// so it has to count all in-flight calls.
//
// In Sandstorm, flow limits are imposed by the supervisor on calls coming out of a grain, in
// order to prevent a grain from innundating the system with in-flight calls. In practice, the
// main time this happens is when a grain is pushing a large file download and doesn't implement
// proper cooperative flow control.
};
template <typename VatId, typename ProvisionId, typename RecipientId,
......@@ -437,6 +468,11 @@ Capability::Client RpcSystem<VatId>::restore(
return baseRestore(_::PointerHelpers<VatId>::getInternalReader(hostId), objectId);
}
template <typename VatId>
inline void RpcSystem<VatId>::setFlowLimit(size_t words) {
baseSetFlowLimit(words);
}
template <typename VatId, typename ProvisionId, typename RecipientId,
typename ThirdPartyCapId, typename JoinResult>
RpcSystem<VatId> makeRpcServer(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment