Commit 60ef14f8 authored by Doron Somech's avatar Doron Somech

problem: ws_engine don't support WS RFC close control msg

Solution: when peer send a close msg, close the connection
parent b120ec33
...@@ -454,6 +454,11 @@ bool zmq::msg_t::is_pong () const ...@@ -454,6 +454,11 @@ bool zmq::msg_t::is_pong () const
return (_u.base.flags & CMD_TYPE_MASK) == pong; return (_u.base.flags & CMD_TYPE_MASK) == pong;
} }
bool zmq::msg_t::is_close_cmd () const
{
return (_u.base.flags & CMD_TYPE_MASK) == close_cmd;
}
size_t zmq::msg_t::command_body_size () const size_t zmq::msg_t::command_body_size () const
{ {
if (this->is_ping () || this->is_pong ()) if (this->is_ping () || this->is_pong ())
......
...@@ -84,6 +84,7 @@ class msg_t ...@@ -84,6 +84,7 @@ class msg_t
pong = 8, pong = 8,
subscribe = 12, subscribe = 12,
cancel = 16, cancel = 16,
close_cmd = 20,
credential = 32, credential = 32,
routing_id = 64, routing_id = 64,
shared = 128 shared = 128
...@@ -126,6 +127,7 @@ class msg_t ...@@ -126,6 +127,7 @@ class msg_t
bool is_leave () const; bool is_leave () const;
bool is_ping () const; bool is_ping () const;
bool is_pong () const; bool is_pong () const;
bool is_close_cmd () const;
// These are called on each message received by the session_base class, // These are called on each message received by the session_base class,
// so get them inlined to avoid the overhead of 2 function calls per msg // so get them inlined to avoid the overhead of 2 function calls per msg
......
...@@ -77,7 +77,7 @@ int zmq::ws_decoder_t::opcode_ready (unsigned char const *) ...@@ -77,7 +77,7 @@ int zmq::ws_decoder_t::opcode_ready (unsigned char const *)
case zmq::ws_protocol_t::opcode_binary: case zmq::ws_protocol_t::opcode_binary:
break; break;
case zmq::ws_protocol_t::opcode_close: case zmq::ws_protocol_t::opcode_close:
_msg_flags = msg_t::command; // TODO: set the command name to CLOSE _msg_flags = msg_t::command | msg_t::close_cmd;
break; break;
case zmq::ws_protocol_t::opcode_ping: case zmq::ws_protocol_t::opcode_ping:
_msg_flags = msg_t::ping | msg_t::command; _msg_flags = msg_t::ping | msg_t::command;
......
...@@ -59,6 +59,8 @@ void zmq::ws_encoder_t::message_ready () ...@@ -59,6 +59,8 @@ void zmq::ws_encoder_t::message_ready ()
_tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_ping; _tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_ping;
else if (in_progress ()->is_pong ()) else if (in_progress ()->is_pong ())
_tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_pong; _tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_pong;
else if (in_progress ()->is_close_cmd ())
_tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_close;
else else
_tmp_buf[offset++] = 0x82; // Final | binary _tmp_buf[offset++] = 0x82; // Final | binary
......
...@@ -131,6 +131,7 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_, ...@@ -131,6 +131,7 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_,
_next_msg = &ws_engine_t::next_handshake_command; _next_msg = &ws_engine_t::next_handshake_command;
_process_msg = &ws_engine_t::process_handshake_command; _process_msg = &ws_engine_t::process_handshake_command;
_close_msg.init ();
if (_options.heartbeat_interval > 0) { if (_options.heartbeat_interval > 0) {
_heartbeat_timeout = _options.heartbeat_timeout; _heartbeat_timeout = _options.heartbeat_timeout;
...@@ -141,6 +142,7 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_, ...@@ -141,6 +142,7 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_,
zmq::ws_engine_t::~ws_engine_t () zmq::ws_engine_t::~ws_engine_t ()
{ {
_close_msg.close ();
} }
void zmq::ws_engine_t::start_ws_handshake () void zmq::ws_engine_t::start_ws_handshake ()
...@@ -921,7 +923,7 @@ int zmq::ws_engine_t::decode_and_push (msg_t *msg_) ...@@ -921,7 +923,7 @@ int zmq::ws_engine_t::decode_and_push (msg_t *msg_)
zmq_assert (_mechanism != NULL); zmq_assert (_mechanism != NULL);
// with WS engine, ping and pong commands are control messages and should not go through any mechanism // with WS engine, ping and pong commands are control messages and should not go through any mechanism
if (msg_->is_ping () || msg_->is_pong ()) { if (msg_->is_ping () || msg_->is_pong () || msg_->is_close_cmd ()) {
if (process_command_message (msg_) == -1) if (process_command_message (msg_) == -1)
return -1; return -1;
} else if (_mechanism->decode (msg_) == -1) } else if (_mechanism->decode (msg_) == -1)
...@@ -933,7 +935,7 @@ int zmq::ws_engine_t::decode_and_push (msg_t *msg_) ...@@ -933,7 +935,7 @@ int zmq::ws_engine_t::decode_and_push (msg_t *msg_)
} }
if (msg_->flags () & msg_t::command && !msg_->is_ping () if (msg_->flags () & msg_t::command && !msg_->is_ping ()
&& !msg_->is_pong ()) && !msg_->is_pong () && !msg_->is_close_cmd ())
process_command_message (msg_); process_command_message (msg_);
if (_metadata) if (_metadata)
...@@ -946,6 +948,32 @@ int zmq::ws_engine_t::decode_and_push (msg_t *msg_) ...@@ -946,6 +948,32 @@ int zmq::ws_engine_t::decode_and_push (msg_t *msg_)
return 0; return 0;
} }
int zmq::ws_engine_t::produce_close_message (msg_t *msg_)
{
int rc = msg_->move (_close_msg);
errno_assert (rc == 0);
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::produce_no_msg_after_close);
return rc;
}
int zmq::ws_engine_t::produce_no_msg_after_close (msg_t *msg_)
{
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::close_connection_after_close);
errno = EAGAIN;
return -1;
}
int zmq::ws_engine_t::close_connection_after_close (msg_t *msg_)
{
error (connection_error);
errno = ECONNRESET;
return -1;
}
int zmq::ws_engine_t::produce_ping_message (msg_t *msg_) int zmq::ws_engine_t::produce_ping_message (msg_t *msg_)
{ {
...@@ -980,6 +1008,12 @@ int zmq::ws_engine_t::process_command_message (msg_t *msg_) ...@@ -980,6 +1008,12 @@ int zmq::ws_engine_t::process_command_message (msg_t *msg_)
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> ( _next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::produce_pong_message); &ws_engine_t::produce_pong_message);
out_event (); out_event ();
} else if (msg_->is_close_cmd ()) {
int rc = _close_msg.copy (*msg_);
errno_assert (rc == 0);
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::produce_close_message);
out_event ();
} }
return 0; return 0;
......
...@@ -146,6 +146,9 @@ class ws_engine_t : public stream_engine_base_t ...@@ -146,6 +146,9 @@ class ws_engine_t : public stream_engine_base_t
private: private:
int routing_id_msg (msg_t *msg_); int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_); int process_routing_id_msg (msg_t *msg_);
int produce_close_message (msg_t *msg_);
int produce_no_msg_after_close (msg_t *msg_);
int close_connection_after_close (msg_t *msg_);
bool select_protocol (char *protocol); bool select_protocol (char *protocol);
...@@ -172,6 +175,7 @@ class ws_engine_t : public stream_engine_base_t ...@@ -172,6 +175,7 @@ class ws_engine_t : public stream_engine_base_t
char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1]; char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1];
int _heartbeat_timeout; int _heartbeat_timeout;
msg_t _close_msg;
}; };
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment