Commit 92c7c183 authored by Martin Sustrik's avatar Martin Sustrik

Message atomicity problem solved in PUB socket

When new peer connects to a PUB socket while it is in the middle
of sending of multi-part messages, it gets just the remaining
part of the message, i.e. message atomicity is broken.

This patch drops the tail part of the message and starts sending
to the peer only when new message is started.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent fac9c2da
...@@ -41,6 +41,13 @@ zmq::dist_t::~dist_t () ...@@ -41,6 +41,13 @@ zmq::dist_t::~dist_t ()
void zmq::dist_t::attach (writer_t *pipe_) void zmq::dist_t::attach (writer_t *pipe_)
{ {
// If we are in the middle of sending a message, let's postpone plugging
// in the pipe.
if (!terminating && more) {
new_pipes.push_back (pipe_);
return;
}
pipe_->set_event_sink (this); pipe_->set_event_sink (this);
pipes.push_back (pipe_); pipes.push_back (pipe_);
...@@ -83,6 +90,23 @@ void zmq::dist_t::activated (writer_t *pipe_) ...@@ -83,6 +90,23 @@ void zmq::dist_t::activated (writer_t *pipe_)
} }
int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
{
// Is this end of a multipart message?
bool msg_more = msg_->flags & ZMQ_MSG_MORE;
// Push the message to active pipes.
distribute (msg_, flags_);
// If mutlipart message is fully sent, activate new pipes.
if (more && !msg_more)
clear_new_pipes ();
more = msg_more;
return 0;
}
void zmq::dist_t::distribute (zmq_msg_t *msg_, int flags_)
{ {
// If there are no active pipes available, simply drop the message. // If there are no active pipes available, simply drop the message.
if (active == 0) { if (active == 0) {
...@@ -90,7 +114,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) ...@@ -90,7 +114,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
zmq_assert (rc == 0); zmq_assert (rc == 0);
rc = zmq_msg_init (msg_); rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
return 0; return;
} }
msg_content_t *content = (msg_content_t*) msg_->content; msg_content_t *content = (msg_content_t*) msg_->content;
...@@ -102,7 +126,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) ...@@ -102,7 +126,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
i++; i++;
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
return 0; return;
} }
// Optimisation for the case when there's only a single pipe // Optimisation for the case when there's only a single pipe
...@@ -115,7 +139,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) ...@@ -115,7 +139,7 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
} }
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
return 0; return;
} }
// There are at least 2 destinations for the message. That means we have // There are at least 2 destinations for the message. That means we have
...@@ -139,8 +163,6 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_) ...@@ -139,8 +163,6 @@ int zmq::dist_t::send (zmq_msg_t *msg_, int flags_)
// Detach the original message from the data buffer. // Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
return 0;
} }
bool zmq::dist_t::has_out () bool zmq::dist_t::has_out ()
...@@ -160,3 +182,15 @@ bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_) ...@@ -160,3 +182,15 @@ bool zmq::dist_t::write (class writer_t *pipe_, zmq_msg_t *msg_)
return true; return true;
} }
void zmq::dist_t::clear_new_pipes ()
{
for (new_pipes_t::iterator it = new_pipes.begin (); it != new_pipes.end ();
++it) {
(*it)->set_event_sink (this);
pipes.push_back (*it);
pipes.swap (active, pipes.size () - 1);
active++;
}
new_pipes.clear ();
}
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#ifndef __ZMQ_DIST_HPP_INCLUDED__ #ifndef __ZMQ_DIST_HPP_INCLUDED__
#define __ZMQ_DIST_HPP_INCLUDED__ #define __ZMQ_DIST_HPP_INCLUDED__
#include <vector>
#include "array.hpp" #include "array.hpp"
#include "pipe.hpp" #include "pipe.hpp"
...@@ -51,10 +53,23 @@ namespace zmq ...@@ -51,10 +53,23 @@ namespace zmq
// fails. In such a case false is returned. // fails. In such a case false is returned.
bool write (class writer_t *pipe_, zmq_msg_t *msg_); bool write (class writer_t *pipe_, zmq_msg_t *msg_);
// Put the message to all active pipes.
void distribute (zmq_msg_t *msg_, int flags_);
// Plug in all the delayed pipes.
void clear_new_pipes ();
// List of outbound pipes. // List of outbound pipes.
typedef array_t <class writer_t> pipes_t; typedef array_t <class writer_t> pipes_t;
pipes_t pipes; pipes_t pipes;
// List of new pipes that were not yet inserted into 'pipes' list.
// These pipes are moves to 'pipes' list once the current multipart
// message is fully sent. This way we avoid sending incomplete messages
// to peers.
typedef std::vector <class writer_t*> new_pipes_t;
new_pipes_t new_pipes;
// Number of active pipes. All the active pipes are located at the // Number of active pipes. All the active pipes are located at the
// beginning of the pipes array. // beginning of the pipes array.
pipes_t::size_type active; pipes_t::size_type active;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment