Commit 7bee77b3 authored by Zhangyi Chen's avatar Zhangyi Chen

Fail the RPC ASSP if the sending socket is overcrowded

parent d28ae56f
...@@ -386,12 +386,19 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { ...@@ -386,12 +386,19 @@ void ProcessRpcRequest(InputMessageBase* msg_base) {
cntl->SetFailed(ELOGOFF, "Server is stopping"); cntl->SetFailed(ELOGOFF, "Server is stopping");
break; break;
} }
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->options().max_concurrency);
break; break;
} }
if (FLAGS_usercode_in_pthread && TooManyUserCode()) { if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
cntl->SetFailed(ELIMIT, "Too many user code to run when" cntl->SetFailed(ELIMIT, "Too many user code to run when"
" -usercode_in_pthread is on"); " -usercode_in_pthread is on");
......
...@@ -1183,6 +1183,11 @@ void ProcessHttpRequest(InputMessageBase *msg) { ...@@ -1183,6 +1183,11 @@ void ProcessHttpRequest(InputMessageBase *msg) {
// NOTE: accesses to builtin services are not counted as part of // NOTE: accesses to builtin services are not counted as part of
// concurrency, therefore are not limited by ServerOptions.max_concurrency. // concurrency, therefore are not limited by ServerOptions.max_concurrency.
if (!sp->is_builtin_service && !sp->params.is_tabbed) { if (!sp->is_builtin_service && !sp->params.is_tabbed) {
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
return SendHttpResponse(cntl.release(), server, method_status);
}
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->options().max_concurrency);
......
...@@ -418,6 +418,12 @@ void ProcessHuluRequest(InputMessageBase* msg_base) { ...@@ -418,6 +418,12 @@ void ProcessHuluRequest(InputMessageBase* msg_base) {
break; break;
} }
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->options().max_concurrency);
......
...@@ -290,6 +290,11 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) { ...@@ -290,6 +290,11 @@ void ProcessNsheadRequest(InputMessageBase* msg_base) {
cntl->SetFailed(ELOGOFF, "Server is stopping"); cntl->SetFailed(ELOGOFF, "Server is stopping");
break; break;
} }
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
if (!server_accessor.AddConcurrency(cntl)) { if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->options().max_concurrency);
......
...@@ -382,6 +382,12 @@ void ProcessSofaRequest(InputMessageBase* msg_base) { ...@@ -382,6 +382,12 @@ void ProcessSofaRequest(InputMessageBase* msg_base) {
break; break;
} }
if (socket->is_overcrowded()) {
cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded",
butil::endpoint2str(socket->remote_side()).c_str());
break;
}
if (!server_accessor.AddConcurrency(cntl.get())) { if (!server_accessor.AddConcurrency(cntl.get())) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->options().max_concurrency); server->options().max_concurrency);
......
...@@ -452,6 +452,9 @@ public: ...@@ -452,6 +452,9 @@ public:
// A brief description of this socket, consistent with os << *this // A brief description of this socket, consistent with os << *this
std::string description() const; std::string description() const;
// Returns true if the remote side is overcrowded.
bool is_overcrowded() const { return _overcrowded; }
private: private:
DISALLOW_COPY_AND_ASSIGN(Socket); DISALLOW_COPY_AND_ASSIGN(Socket);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment