Commit 8aabf87f authored by gejun's avatar gejun

Simplify socket writing in h2 with WriteAck

parent ad00daf8
...@@ -138,6 +138,14 @@ inline void SerializeFrameHead(void* out_buf, const H2FrameHead& h) { ...@@ -138,6 +138,14 @@ inline void SerializeFrameHead(void* out_buf, const H2FrameHead& h) {
h.flags, h.stream_id); h.flags, h.stream_id);
} }
static int WriteAck(Socket* s, const void* data, size_t n) {
butil::IOBuf sendbuf;
sendbuf.append(data, n);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
return s->Write(&sendbuf, &wopt);
}
// [ https://tools.ietf.org/html/rfc7540#section-6.5.1 ] // [ https://tools.ietf.org/html/rfc7540#section-6.5.1 ]
enum H2SettingsIdentifier { enum H2SettingsIdentifier {
...@@ -471,11 +479,7 @@ ParseResult H2Context::Consume( ...@@ -471,11 +479,7 @@ ParseResult H2Context::Consume(
char settingsbuf[FRAME_HEAD_SIZE + H2_SETTINGS_MAX_BYTE_SIZE + char settingsbuf[FRAME_HEAD_SIZE + H2_SETTINGS_MAX_BYTE_SIZE +
FRAME_HEAD_SIZE + 4/*for WU*/]; FRAME_HEAD_SIZE + 4/*for WU*/];
const size_t nb = SerializeH2SettingsFrameAndWU(_unack_local_settings, settingsbuf); const size_t nb = SerializeH2SettingsFrameAndWU(_unack_local_settings, settingsbuf);
butil::IOBuf buf; if (WriteAck(socket, settingsbuf, nb) != 0) {
buf.append(settingsbuf, nb);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (socket->Write(&buf, &wopt) != 0) {
LOG(WARNING) << "Fail to respond http2-client with settings to " << *socket; LOG(WARNING) << "Fail to respond http2-client with settings to " << *socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
...@@ -503,11 +507,7 @@ ParseResult H2Context::Consume( ...@@ -503,11 +507,7 @@ ParseResult H2Context::Consume(
SerializeFrameHead(rstbuf, 4, H2_FRAME_RST_STREAM, SerializeFrameHead(rstbuf, 4, H2_FRAME_RST_STREAM,
0, h2_res.stream_id()); 0, h2_res.stream_id());
SaveUint32(rstbuf + FRAME_HEAD_SIZE, h2_res.error()); SaveUint32(rstbuf + FRAME_HEAD_SIZE, h2_res.error());
butil::IOBuf sendbuf; if (WriteAck(_socket, rstbuf, sizeof(rstbuf)) != 0) {
sendbuf.append(rstbuf, sizeof(rstbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send RST_STREAM to " << *_socket; LOG(WARNING) << "Fail to send RST_STREAM to " << *_socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
...@@ -528,11 +528,7 @@ ParseResult H2Context::Consume( ...@@ -528,11 +528,7 @@ ParseResult H2Context::Consume(
SerializeFrameHead(goawaybuf, 8, H2_FRAME_GOAWAY, 0, 0); SerializeFrameHead(goawaybuf, 8, H2_FRAME_GOAWAY, 0, 0);
SaveUint32(goawaybuf + FRAME_HEAD_SIZE, 0/*last-stream-id*/); SaveUint32(goawaybuf + FRAME_HEAD_SIZE, 0/*last-stream-id*/);
SaveUint32(goawaybuf + FRAME_HEAD_SIZE + 4, h2_res.error()); SaveUint32(goawaybuf + FRAME_HEAD_SIZE + 4, h2_res.error());
butil::IOBuf sendbuf; if (WriteAck(_socket, goawaybuf, sizeof(goawaybuf)) != 0) {
sendbuf.append(goawaybuf, sizeof(goawaybuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send GOAWAY to " << *_socket; LOG(WARNING) << "Fail to send GOAWAY to " << *_socket;
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG); return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
...@@ -771,12 +767,7 @@ H2ParseResult H2StreamContext::OnData( ...@@ -771,12 +767,7 @@ H2ParseResult H2StreamContext::OnData(
const int64_t conn_wu = stream_wu + _conn_ctx->ReleaseDeferredWindowUpdate(); const int64_t conn_wu = stream_wu + _conn_ctx->ReleaseDeferredWindowUpdate();
SerializeFrameHead(p, 4, H2_FRAME_WINDOW_UPDATE, 0, 0); SerializeFrameHead(p, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(p + FRAME_HEAD_SIZE, conn_wu); SaveUint32(p + FRAME_HEAD_SIZE, conn_wu);
if (WriteAck(_conn_ctx->_socket, winbuf, sizeof(winbuf)) != 0) {
butil::IOBuf sendbuf;
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_conn_ctx->_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE to " << *_conn_ctx->_socket; LOG(WARNING) << "Fail to send WINDOW_UPDATE to " << *_conn_ctx->_socket;
return MakeH2Error(H2_INTERNAL_ERROR); return MakeH2Error(H2_INTERNAL_ERROR);
} }
...@@ -899,11 +890,7 @@ H2ParseResult H2Context::OnSettings( ...@@ -899,11 +890,7 @@ H2ParseResult H2Context::OnSettings(
// Respond with ack // Respond with ack
char headbuf[FRAME_HEAD_SIZE]; char headbuf[FRAME_HEAD_SIZE];
SerializeFrameHead(headbuf, 0, H2_FRAME_SETTINGS, H2_FLAGS_ACK, 0); SerializeFrameHead(headbuf, 0, H2_FRAME_SETTINGS, H2_FLAGS_ACK, 0);
butil::IOBuf sendbuf; if (WriteAck(_socket, headbuf, sizeof(headbuf)) != 0) {
sendbuf.append(headbuf, FRAME_HEAD_SIZE);
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to respond settings with ack to " << *_socket; LOG(WARNING) << "Fail to respond settings with ack to " << *_socket;
return MakeH2Error(H2_PROTOCOL_ERROR); return MakeH2Error(H2_PROTOCOL_ERROR);
} }
...@@ -939,11 +926,7 @@ H2ParseResult H2Context::OnPing( ...@@ -939,11 +926,7 @@ H2ParseResult H2Context::OnPing(
char pongbuf[FRAME_HEAD_SIZE + 8]; char pongbuf[FRAME_HEAD_SIZE + 8];
SerializeFrameHead(pongbuf, 8, H2_FRAME_PING, H2_FLAGS_ACK, 0); SerializeFrameHead(pongbuf, 8, H2_FRAME_PING, H2_FLAGS_ACK, 0);
it.copy_and_forward(pongbuf + FRAME_HEAD_SIZE, 8); it.copy_and_forward(pongbuf + FRAME_HEAD_SIZE, 8);
butil::IOBuf sendbuf; if (WriteAck(_socket, pongbuf, sizeof(pongbuf)) != 0) {
sendbuf.append(pongbuf, sizeof(pongbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send ack of PING to " << *_socket; LOG(WARNING) << "Fail to send ack of PING to " << *_socket;
return MakeH2Error(H2_PROTOCOL_ERROR); return MakeH2Error(H2_PROTOCOL_ERROR);
} }
...@@ -1070,12 +1053,7 @@ void H2Context::DeferWindowUpdate(int64_t size) { ...@@ -1070,12 +1053,7 @@ void H2Context::DeferWindowUpdate(int64_t size) {
char winbuf[FRAME_HEAD_SIZE + 4]; char winbuf[FRAME_HEAD_SIZE + 4];
SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0); SerializeFrameHead(winbuf, 4, H2_FRAME_WINDOW_UPDATE, 0, 0);
SaveUint32(winbuf + FRAME_HEAD_SIZE, conn_wu); SaveUint32(winbuf + FRAME_HEAD_SIZE, conn_wu);
if (WriteAck(_socket, winbuf, sizeof(winbuf)) != 0) {
butil::IOBuf sendbuf;
sendbuf.append(winbuf, sizeof(winbuf));
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
if (_socket->Write(&sendbuf, &wopt) != 0) {
LOG(WARNING) << "Fail to send WINDOW_UPDATE"; LOG(WARNING) << "Fail to send WINDOW_UPDATE";
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment