Commit c081ed08 authored by zhujiashun's avatar zhujiashun Committed by gejun

Remove the global mutex in H2Context::ReclaimWindowSize

parent 1425a87b
......@@ -237,7 +237,7 @@ friend void InitFrameHandlers();
butil::Mutex _stream_mutex;
StreamMap _pending_streams;
butil::Mutex _conn_window_mutex;;
int64_t _pending_conn_window_size;
butil::atomic<int64_t> _pending_conn_window_size;
};
inline bool add_window_size(butil::atomic<int64_t>* window_size, int64_t diff) {
......@@ -925,7 +925,6 @@ H2ParseResult H2Context::OnWindowUpdate(
LOG(ERROR) << "Invalid window_size_increment=" << inc;
return MakeH2Error(H2_PROTOCOL_ERROR);
}
if (frame_head.stream_id == 0) {
if (!add_window_size(&_remote_conn_window_size, inc)) {
LOG(ERROR) << "Invalid window_size_increment=" << inc;
......@@ -973,23 +972,22 @@ void H2Context::ReclaimWindowSize(int64_t size) {
// Spec does not stipulate how a receiver decides when to send this frame or the value
// that it sends, nor does it specify how a sender chooses to send packets.
// Implementations are able to select any algorithm that suits their needs.
{
std::unique_lock<butil::Mutex> mu(_stream_mutex);
_pending_conn_window_size += size;
if (_pending_conn_window_size < FLAGS_http2_window_update_size) {
return;
}
_pending_conn_window_size -= FLAGS_http2_window_update_size;
int64_t window_update_size = 0;
if (_pending_conn_window_size.fetch_add(size) > FLAGS_http2_window_update_size) {
window_update_size =
_pending_conn_window_size.exchange(0, butil::memory_order_relaxed);
}
char cwinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(cwinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwinbuf + FRAME_HEAD_SIZE, FLAGS_http2_window_update_size);
butil::IOBuf sendbuf;
sendbuf.append(cwinbuf, sizeof(cwinbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
if (window_update_size > 0) {
char cwinbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(cwinbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(cwinbuf + FRAME_HEAD_SIZE, window_update_size);
butil::IOBuf sendbuf;
sendbuf.append(cwinbuf, sizeof(cwinbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE";
}
}
}
......@@ -1652,8 +1650,6 @@ void PackH2Request(butil::IOBuf*,
StreamUserData* H2GlobalStreamCreator::OnCreatingStream(
SocketUniquePtr* inout, Controller* cntl) {
// Although the critical section looks huge, it should rarely be contended
// since timeout of RPC is much larger than the delay of sending.
std::unique_lock<butil::Mutex> mu(_mutex);
SocketUniquePtr& agent_sock = (*inout)->_agent_socket;
if (!agent_sock || agent_sock->Failed() ||
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment