Commit abec711a authored by zhujiashun's avatar zhujiashun

remove _goaway_received and use _goaway_stream_id

parent 7883472f
...@@ -325,9 +325,9 @@ H2Context::H2Context(Socket* socket, const Server* server) ...@@ -325,9 +325,9 @@ H2Context::H2Context(Socket* socket, const Server* server)
: _socket(socket) : _socket(socket)
, _remote_window_left(H2Settings::DEFAULT_INITIAL_WINDOW_SIZE) , _remote_window_left(H2Settings::DEFAULT_INITIAL_WINDOW_SIZE)
, _conn_state(H2_CONNECTION_UNINITIALIZED) , _conn_state(H2_CONNECTION_UNINITIALIZED)
, _last_receive_stream_id(-1) , _last_received_stream_id(-1)
, _last_send_stream_id(1) , _last_sent_stream_id(1)
, _goaway_received(false) , _goaway_stream_id(-1)
, _goaway_sent(false) , _goaway_sent(false)
, _deferred_window_update(0) { , _deferred_window_update(0) {
// Stop printing the field which is useless for remote settings. // Stop printing the field which is useless for remote settings.
...@@ -341,9 +341,9 @@ H2Context::H2Context(Socket* socket, const Server* server) ...@@ -341,9 +341,9 @@ H2Context::H2Context(Socket* socket, const Server* server)
_unack_local_settings.connection_window_size = FLAGS_h2_client_connection_window_size; _unack_local_settings.connection_window_size = FLAGS_h2_client_connection_window_size;
} }
#if defined(UNIT_TEST) #if defined(UNIT_TEST)
// In ut, we hope _last_send_stream_id run out quickly to test the correctness // In ut, we hope _last_sent_stream_id run out quickly to test the correctness
// of creating new h2 socket. This value is 10,000 less than 0x7FFFFFFF. // of creating new h2 socket. This value is 10,000 less than 0x7FFFFFFF.
_last_send_stream_id = 0x7fffd8ef; _last_sent_stream_id = 0x7fffd8ef;
#endif #endif
} }
...@@ -389,7 +389,7 @@ void H2Context::RemoveGoAwayStreams( ...@@ -389,7 +389,7 @@ void H2Context::RemoveGoAwayStreams(
StreamMap tmp; StreamMap tmp;
{ {
std::unique_lock<butil::Mutex> mu(_stream_mutex); std::unique_lock<butil::Mutex> mu(_stream_mutex);
_goaway_received = true; _goaway_stream_id = goaway_stream_id;
_pending_streams.swap(tmp); _pending_streams.swap(tmp);
} }
for (StreamMap::const_iterator it = tmp.begin(); it != tmp.end(); ++it) { for (StreamMap::const_iterator it = tmp.begin(); it != tmp.end(); ++it) {
...@@ -397,7 +397,7 @@ void H2Context::RemoveGoAwayStreams( ...@@ -397,7 +397,7 @@ void H2Context::RemoveGoAwayStreams(
} }
} else { } else {
std::unique_lock<butil::Mutex> mu(_stream_mutex); std::unique_lock<butil::Mutex> mu(_stream_mutex);
_goaway_received = true; _goaway_stream_id = goaway_stream_id;
for (StreamMap::const_iterator it = _pending_streams.begin(); for (StreamMap::const_iterator it = _pending_streams.begin();
it != _pending_streams.end(); ++it) { it != _pending_streams.end(); ++it) {
if (it->first > goaway_stream_id) { if (it->first > goaway_stream_id) {
...@@ -421,7 +421,7 @@ H2StreamContext* H2Context::FindStream(int stream_id) { ...@@ -421,7 +421,7 @@ H2StreamContext* H2Context::FindStream(int stream_id) {
int H2Context::TryToInsertStream(int stream_id, H2StreamContext* ctx) { int H2Context::TryToInsertStream(int stream_id, H2StreamContext* ctx) {
std::unique_lock<butil::Mutex> mu(_stream_mutex); std::unique_lock<butil::Mutex> mu(_stream_mutex);
if (_goaway_received) { if (_goaway_stream_id >= 0 && stream_id > _goaway_stream_id) {
return 1; return 1;
} }
H2StreamContext*& sctx = _pending_streams[stream_id]; H2StreamContext*& sctx = _pending_streams[stream_id];
...@@ -526,7 +526,7 @@ ParseResult H2Context::Consume( ...@@ -526,7 +526,7 @@ ParseResult H2Context::Consume(
} else { // send GOAWAY } else { // send GOAWAY
char goawaybuf[FRAME_HEAD_SIZE + 8]; char goawaybuf[FRAME_HEAD_SIZE + 8];
SerializeFrameHead(goawaybuf, 8, H2_FRAME_GOAWAY, 0, 0); SerializeFrameHead(goawaybuf, 8, H2_FRAME_GOAWAY, 0, 0);
SaveUint32(goawaybuf + FRAME_HEAD_SIZE, _last_receive_stream_id); SaveUint32(goawaybuf + FRAME_HEAD_SIZE, _last_received_stream_id);
SaveUint32(goawaybuf + FRAME_HEAD_SIZE + 4, h2_res.error()); SaveUint32(goawaybuf + FRAME_HEAD_SIZE + 4, h2_res.error());
if (WriteAck(_socket, goawaybuf, sizeof(goawaybuf)) != 0) { if (WriteAck(_socket, goawaybuf, sizeof(goawaybuf)) != 0) {
LOG(WARNING) << "Fail to send GOAWAY to " << *_socket; LOG(WARNING) << "Fail to send GOAWAY to " << *_socket;
...@@ -580,13 +580,13 @@ H2ParseResult H2Context::OnHeaders( ...@@ -580,13 +580,13 @@ H2ParseResult H2Context::OnHeaders(
frag_size -= pad_length; frag_size -= pad_length;
H2StreamContext* sctx = NULL; H2StreamContext* sctx = NULL;
if (is_server_side() && if (is_server_side() &&
frame_head.stream_id > _last_receive_stream_id) { // new stream frame_head.stream_id > _last_received_stream_id) { // new stream
if ((frame_head.stream_id & 1) == 0) { if ((frame_head.stream_id & 1) == 0) {
LOG(ERROR) << "stream_id=" << frame_head.stream_id LOG(ERROR) << "stream_id=" << frame_head.stream_id
<< " created by client is not odd"; << " created by client is not odd";
return MakeH2Error(H2_PROTOCOL_ERROR); return MakeH2Error(H2_PROTOCOL_ERROR);
} }
_last_receive_stream_id = frame_head.stream_id; _last_received_stream_id = frame_head.stream_id;
sctx = new H2StreamContext(_socket->is_read_progressive()); sctx = new H2StreamContext(_socket->is_read_progressive());
sctx->Init(this, frame_head.stream_id); sctx->Init(this, frame_head.stream_id);
const int rc = TryToInsertStream(frame_head.stream_id, sctx); const int rc = TryToInsertStream(frame_head.stream_id, sctx);
...@@ -1023,8 +1023,8 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const { ...@@ -1023,8 +1023,8 @@ void H2Context::Describe(std::ostream& os, const DescribeOptions& opt) const {
} }
const char sep = (opt.verbose ? '\n' : ' '); const char sep = (opt.verbose ? '\n' : ' ');
os << "conn_state=" << H2ConnectionState2Str(_conn_state); os << "conn_state=" << H2ConnectionState2Str(_conn_state);
os << sep << "last_receive_stream_id=" << _last_receive_stream_id; os << sep << "last_received_stream_id=" << _last_received_stream_id
os << sep << "last_send_stream_id=" << _last_send_stream_id; << sep << "last_sent_stream_id=" << _last_sent_stream_id;
os << sep << "deferred_window_update=" os << sep << "deferred_window_update="
<< _deferred_window_update.load(butil::memory_order_relaxed) << _deferred_window_update.load(butil::memory_order_relaxed)
<< sep << "remote_conn_window_left=" << sep << "remote_conn_window_left="
......
...@@ -380,9 +380,9 @@ friend void InitFrameHandlers(); ...@@ -380,9 +380,9 @@ friend void InitFrameHandlers();
Socket* _socket; Socket* _socket;
butil::atomic<int64_t> _remote_window_left; butil::atomic<int64_t> _remote_window_left;
H2ConnectionState _conn_state; H2ConnectionState _conn_state;
int _last_receive_stream_id; int _last_received_stream_id;
uint32_t _last_send_stream_id; uint32_t _last_sent_stream_id;
bool _goaway_received; int _goaway_stream_id;
bool _goaway_sent; bool _goaway_sent;
H2Settings _remote_settings; H2Settings _remote_settings;
H2Settings _local_settings; H2Settings _local_settings;
...@@ -402,13 +402,13 @@ inline int H2Context::AllocateClientStreamId() { ...@@ -402,13 +402,13 @@ inline int H2Context::AllocateClientStreamId() {
<< _last_client_stream_id; << _last_client_stream_id;
return -1; return -1;
} }
const int id = _last_send_stream_id; const int id = _last_sent_stream_id;
_last_send_stream_id += 2; _last_sent_stream_id += 2;
return id; return id;
} }
inline bool H2Context::RunOutStreams() const { inline bool H2Context::RunOutStreams() const {
return (_last_send_stream_id > 0x7FFFFFFF); return (_last_sent_stream_id > 0x7FFFFFFF);
} }
inline std::ostream& operator<<(std::ostream& os, const H2UnsentRequest& req) { inline std::ostream& operator<<(std::ostream& os, const H2UnsentRequest& req) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment