Commit 841cf69e authored by Ian Barber's avatar Ian Barber

Merge branch 'master' of https://github.com/zeromq/libzmq

parents 3053f7e3 ace8f753
...@@ -145,7 +145,7 @@ bool zmq::decoder_t::eight_byte_size_ready () ...@@ -145,7 +145,7 @@ bool zmq::decoder_t::eight_byte_size_ready ()
bool zmq::decoder_t::flags_ready () bool zmq::decoder_t::flags_ready ()
{ {
// Store the flags from the wire into the message structure. // Store the flags from the wire into the message structure.
in_progress.set_flags (tmpbuf [0]); in_progress.set_flags (tmpbuf [0] & msg_t::more);
next_step (in_progress.data (), in_progress.size (), next_step (in_progress.data (), in_progress.size (),
&decoder_t::message_ready); &decoder_t::message_ready);
......
...@@ -84,11 +84,6 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -84,11 +84,6 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
// subsequent part should be immediately available. // subsequent part should be immediately available.
bool fetched = pipes [current]->read (msg_); bool fetched = pipes [current]->read (msg_);
// Check the atomicity of the message. If we've already received the
// first part of the message we should get the remaining parts
// without blocking.
zmq_assert (!more || fetched);
// Note that when message is not fetched, current pipe is deactivated // Note that when message is not fetched, current pipe is deactivated
// and replaced by another active pipe. Thus we don't have to increase // and replaced by another active pipe. Thus we don't have to increase
// the 'current' pointer. // the 'current' pointer.
...@@ -101,6 +96,11 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_) ...@@ -101,6 +96,11 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
return 0; return 0;
} }
// Check the atomicity of the message.
// If we've already received the first part of the message
// we should get the remaining parts without blocking.
zmq_assert (!more);
active--; active--;
pipes.swap (current, active); pipes.swap (current, active);
if (current == active) if (current == active)
......
...@@ -298,7 +298,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -298,7 +298,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Connect writer to the listener. // Connect writer to the listener.
rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); rc = connect (*w_, (struct sockaddr*) &addr, sizeof (addr));
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
// Accept connection from writer. // Accept connection from writer.
...@@ -327,7 +327,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_) ...@@ -327,7 +327,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
// //
// The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
// create the socket pair manually. // create the socket pair manually.
sockaddr_in lcladdr; struct sockaddr_in lcladdr;
memset (&lcladdr, 0, sizeof (lcladdr)); memset (&lcladdr, 0, sizeof (lcladdr));
lcladdr.sin_family = AF_INET; lcladdr.sin_family = AF_INET;
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment