Commit 4e028ecb authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #461 from hurtonm/code_cleanup

session_base: code cleanup
parents 5da97127 e51a1f04
...@@ -157,8 +157,7 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) ...@@ -157,8 +157,7 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::pull_msg (msg_t *msg_) int zmq::session_base_t::pull_msg (msg_t *msg_)
{ {
// First message to send is identity // First message to send is identity
if (!identity_sent) { if (unlikely (!identity_sent)) {
zmq_assert (!(msg_->flags () & msg_t::more));
int rc = msg_->init_size (options.identity_size); int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0); errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size); memcpy (msg_->data (), options.identity, options.identity_size);
...@@ -179,7 +178,7 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) ...@@ -179,7 +178,7 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
int zmq::session_base_t::push_msg (msg_t *msg_) int zmq::session_base_t::push_msg (msg_t *msg_)
{ {
// First message to receive is identity // First message to receive is identity
if (!identity_received) { if (unlikely (!identity_received)) {
msg_->set_flags (msg_t::identity); msg_->set_flags (msg_t::identity);
identity_received = true; identity_received = true;
if (!options.recv_identity) { if (!options.recv_identity) {
...@@ -228,10 +227,8 @@ void zmq::session_base_t::clean_pipes () ...@@ -228,10 +227,8 @@ void zmq::session_base_t::clean_pipes ()
msg_t msg; msg_t msg;
int rc = msg.init (); int rc = msg.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
if (pull_msg (&msg) != 0) { rc = pull_msg (&msg);
zmq_assert (!incomplete_in); errno_assert (rc == 0);
break;
}
rc = msg.close (); rc = msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
} }
...@@ -258,11 +255,10 @@ void zmq::session_base_t::terminated (pipe_t *pipe_) ...@@ -258,11 +255,10 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
terminate (); terminate ();
} }
// If we are waiting for pending messages to be sent, at this point // If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed // we are sure that there will be no more messages and we can proceed
// with termination safely. // with termination safely.
if (pending && !pipe && terminating_pipes.size () == 0) if (pending && !pipe && terminating_pipes.empty ())
proceed_with_term (); proceed_with_term ();
} }
...@@ -470,13 +466,16 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -470,13 +466,16 @@ void zmq::session_base_t::start_connecting (bool wait_)
} }
#endif #endif
#if defined ZMQ_HAVE_OPENPGM #ifdef ZMQ_HAVE_OPENPGM
// Both PGM and EPGM transports are using the same infrastructure. // Both PGM and EPGM transports are using the same infrastructure.
if (addr->protocol == "pgm" || addr->protocol == "epgm") { if (addr->protocol == "pgm" || addr->protocol == "epgm") {
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
|| options.type == ZMQ_SUB || options.type == ZMQ_XSUB);
// For EPGM transport with UDP encapsulation of PGM is used. // For EPGM transport with UDP encapsulation of PGM is used.
bool udp_encapsulation = (addr->protocol == "epgm"); bool const udp_encapsulation = addr->protocol == "epgm";
// At this point we'll create message pipes to the session straight // At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect' // away. There's no point in delaying it as no concept of 'connect'
...@@ -484,7 +483,7 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -484,7 +483,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
// PGM sender. // PGM sender.
pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t ( pgm_sender_t *pgm_sender = new (std::nothrow) pgm_sender_t (
io_thread, options); io_thread, options);
alloc_assert (pgm_sender); alloc_assert (pgm_sender);
...@@ -493,11 +492,10 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -493,11 +492,10 @@ void zmq::session_base_t::start_connecting (bool wait_)
send_attach (this, pgm_sender); send_attach (this, pgm_sender);
} }
else else {
if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
// PGM receiver. // PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
io_thread, options); io_thread, options);
alloc_assert (pgm_receiver); alloc_assert (pgm_receiver);
...@@ -506,8 +504,6 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -506,8 +504,6 @@ void zmq::session_base_t::start_connecting (bool wait_)
send_attach (this, pgm_receiver); send_attach (this, pgm_receiver);
} }
else
zmq_assert (false);
return; return;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment