Unverified Commit 0f9dd85e authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #3741 from drolevar/master

Change XSUB -> XPUB multipart message processing.
parents e0d9e213 04761133
# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL
This is a statement by Andrij Abyzov
that grants permission to relicense his copyrights in the libzmq C++
library (ZeroMQ) under the Mozilla Public License v2 (MPLv2) or any other
Open Source Initiative approved license chosen by the current ZeroMQ
BDFL (Benevolent Dictator for Life).
A portion of the commits made by the Github handle "drolevar", with
commit author "Andrij Abyzov <drolevar@gmail.com>", are copyright of Andrij Abyzov.
This document hereby grants the libzmq project team to relicense libzmq,
including all past, present and future contributions of the author listed
above.
Andrij Abyzov
2019/11/19
...@@ -373,10 +373,9 @@ void zmq::select_t::loop () ...@@ -373,10 +373,9 @@ void zmq::select_t::loop ()
// http://stackoverflow.com/q/35043420/188530 // http://stackoverflow.com/q/35043420/188530
if (FD_ISSET (fd, &family_entry.fds_set.read) if (FD_ISSET (fd, &family_entry.fds_set.read)
&& FD_ISSET (fd, &family_entry.fds_set.write)) && FD_ISSET (fd, &family_entry.fds_set.write))
rc = rc = WSAEventSelect (fd, wsa_events.events[3],
WSAEventSelect (fd, wsa_events.events[3], FD_READ | FD_ACCEPT | FD_CLOSE
FD_READ | FD_ACCEPT | FD_CLOSE | FD_WRITE | FD_CONNECT);
| FD_WRITE | FD_CONNECT);
else if (FD_ISSET (fd, &family_entry.fds_set.read)) else if (FD_ISSET (fd, &family_entry.fds_set.read))
rc = WSAEventSelect (fd, wsa_events.events[0], rc = WSAEventSelect (fd, wsa_events.events[0],
FD_READ | FD_ACCEPT | FD_CLOSE); FD_READ | FD_ACCEPT | FD_CLOSE);
......
...@@ -41,7 +41,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : ...@@ -41,7 +41,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
_verbose_subs (false), _verbose_subs (false),
_verbose_unsubs (false), _verbose_unsubs (false),
_more (false), _more_send (false),
_more_recv (false),
_lossy (true), _lossy (true),
_manual (false), _manual (false),
_send_last_pipe (false), _send_last_pipe (false),
...@@ -91,31 +92,40 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, ...@@ -91,31 +92,40 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
void zmq::xpub_t::xread_activated (pipe_t *pipe_) void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{ {
// There are some subscriptions waiting. Let's process them. // There are some subscriptions waiting. Let's process them.
msg_t sub; msg_t msg;
while (pipe_->read (&sub)) { while (pipe_->read (&msg)) {
metadata_t *metadata = sub.metadata (); metadata_t *metadata = msg.metadata ();
unsigned char *msg_data = static_cast<unsigned char *> (sub.data ()), unsigned char *msg_data = static_cast<unsigned char *> (msg.data ()),
*data = NULL; *data = NULL;
size_t size = 0; size_t size = 0;
bool subscribe = false; bool subscribe = false;
bool is_subscribe_or_cancel = false;
if (!_more_recv) {
// Apply the subscription to the trie
if (msg.is_subscribe () || msg.is_cancel ()) {
data = static_cast<unsigned char *> (msg.command_body ());
size = msg.command_body_size ();
subscribe = msg.is_subscribe ();
is_subscribe_or_cancel = true;
} else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
data = msg_data + 1;
size = msg.size () - 1;
subscribe = *msg_data == 1;
is_subscribe_or_cancel = true;
}
}
// Apply the subscription to the trie _more_recv = (msg.flags () & msg_t::more) != 0;
if (sub.is_subscribe () || sub.is_cancel ()) {
data = static_cast<unsigned char *> (sub.command_body ()); if (!is_subscribe_or_cancel) {
size = sub.command_body_size ();
subscribe = sub.is_subscribe ();
} else if (sub.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
data = msg_data + 1;
size = sub.size () - 1;
subscribe = *msg_data == 1;
} else {
// Process user message coming upstream from xsub socket // Process user message coming upstream from xsub socket
_pending_data.push_back (blob_t (msg_data, sub.size ())); _pending_data.push_back (blob_t (msg_data, msg.size ()));
if (metadata) if (metadata)
metadata->add_ref (); metadata->add_ref ();
_pending_metadata.push_back (metadata); _pending_metadata.push_back (metadata);
_pending_flags.push_back (sub.flags ()); _pending_flags.push_back (msg.flags ());
sub.close (); msg.close ();
continue; continue;
} }
...@@ -174,7 +184,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -174,7 +184,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
_pending_flags.push_back (0); _pending_flags.push_back (0);
} }
} }
sub.close (); msg.close ();
} }
} }
...@@ -278,7 +288,7 @@ int zmq::xpub_t::xsend (msg_t *msg_) ...@@ -278,7 +288,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
bool msg_more = (msg_->flags () & msg_t::more) != 0; bool msg_more = (msg_->flags () & msg_t::more) != 0;
// For the first part of multi-part message, find the matching pipes. // For the first part of multi-part message, find the matching pipes.
if (!_more) { if (!_more_send) {
if (unlikely (_manual && _last_pipe && _send_last_pipe)) { if (unlikely (_manual && _last_pipe && _send_last_pipe)) {
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()), _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_last_pipe_as_matching, msg_->size (), mark_last_pipe_as_matching,
...@@ -300,7 +310,7 @@ int zmq::xpub_t::xsend (msg_t *msg_) ...@@ -300,7 +310,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
// all the pipes as non-matching. // all the pipes as non-matching.
if (!msg_more) if (!msg_more)
_dist.unmatch (); _dist.unmatch ();
_more = msg_more; _more_send = msg_more;
rc = 0; // Yay, sent successfully rc = 0; // Yay, sent successfully
} }
} else } else
......
...@@ -91,7 +91,10 @@ class xpub_t : public socket_base_t ...@@ -91,7 +91,10 @@ class xpub_t : public socket_base_t
bool _verbose_unsubs; bool _verbose_unsubs;
// True if we are in the middle of sending a multi-part message. // True if we are in the middle of sending a multi-part message.
bool _more; bool _more_send;
// True if we are in the middle of receiving a multi-part message.
bool _more_recv;
// Drop messages if HWM reached, otherwise return with EAGAIN // Drop messages if HWM reached, otherwise return with EAGAIN
bool _lossy; bool _lossy;
......
...@@ -37,7 +37,8 @@ ...@@ -37,7 +37,8 @@
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_), socket_base_t (parent_, tid_, sid_),
_has_message (false), _has_message (false),
_more (false) _more_send (false),
_more_recv (false)
{ {
options.type = ZMQ_XSUB; options.type = ZMQ_XSUB;
...@@ -99,6 +100,13 @@ int zmq::xsub_t::xsend (msg_t *msg_) ...@@ -99,6 +100,13 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size_t size = msg_->size (); size_t size = msg_->size ();
unsigned char *data = static_cast<unsigned char *> (msg_->data ()); unsigned char *data = static_cast<unsigned char *> (msg_->data ());
bool send_more = _more_send;
_more_send = (msg_->flags () & msg_t::more) != 0;
if (send_more)
// User message sent upstream to XPUB socket
return _dist.send_to_all (msg_);
if (msg_->is_subscribe () || (size > 0 && *data == 1)) { if (msg_->is_subscribe () || (size > 0 && *data == 1)) {
// Process subscribe message // Process subscribe message
// This used to filter out duplicate subscriptions, // This used to filter out duplicate subscriptions,
...@@ -152,7 +160,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_) ...@@ -152,7 +160,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_)
int rc = msg_->move (_message); int rc = msg_->move (_message);
errno_assert (rc == 0); errno_assert (rc == 0);
_has_message = false; _has_message = false;
_more = (msg_->flags () & msg_t::more) != 0; _more_recv = (msg_->flags () & msg_t::more) != 0;
return 0; return 0;
} }
...@@ -170,8 +178,8 @@ int zmq::xsub_t::xrecv (msg_t *msg_) ...@@ -170,8 +178,8 @@ int zmq::xsub_t::xrecv (msg_t *msg_)
// Check whether the message matches at least one subscription. // Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed // Non-initial parts of the message are passed
if (_more || !options.filter || match (msg_)) { if (_more_recv || !options.filter || match (msg_)) {
_more = (msg_->flags () & msg_t::more) != 0; _more_recv = (msg_->flags () & msg_t::more) != 0;
return 0; return 0;
} }
...@@ -187,7 +195,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_) ...@@ -187,7 +195,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_)
bool zmq::xsub_t::xhas_in () bool zmq::xsub_t::xhas_in ()
{ {
// There are subsequent parts of the partly-read message available. // There are subsequent parts of the partly-read message available.
if (_more) if (_more_recv)
return true; return true;
// If there's already a message prepared by a previous call to zmq_poll, // If there's already a message prepared by a previous call to zmq_poll,
......
...@@ -93,9 +93,13 @@ class xsub_t : public socket_base_t ...@@ -93,9 +93,13 @@ class xsub_t : public socket_base_t
bool _has_message; bool _has_message;
msg_t _message; msg_t _message;
// If true, part of a multipart message was already sent, but
// there are following parts still waiting.
bool _more_send;
// If true, part of a multipart message was already received, but // If true, part of a multipart message was already received, but
// there are following parts still waiting. // there are following parts still waiting.
bool _more; bool _more_recv;
xsub_t (const xsub_t &); xsub_t (const xsub_t &);
const xsub_t &operator= (const xsub_t &); const xsub_t &operator= (const xsub_t &);
......
...@@ -456,6 +456,58 @@ void test_user_message () ...@@ -456,6 +456,58 @@ void test_user_message ()
test_context_socket_close (sub); test_context_socket_close (sub);
} }
void test_user_message_multi ()
{
// Create a publisher
void *pub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
// Create a subscriber
void *sub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
// Send some data that is neither sub nor unsub
const uint8_t msg_common[] = {'A', 'B', 'C'};
// Message starts with 0 but should still treated as user
const uint8_t msg_0[] = {0, 'B', 'C'};
// Message starts with 1 but should still treated as user
const uint8_t msg_1[] = {1, 'B', 'C'};
// Test second message starting with 0
send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_0, 0);
// Receive messages from subscriber
recv_array_expect_success (pub, msg_common, 0);
recv_array_expect_success (pub, msg_0, 0);
// Test second message starting with 1
send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_1, 0);
// Receive messages from subscriber
recv_array_expect_success (pub, msg_common, 0);
recv_array_expect_success (pub, msg_1, 0);
char buffer[255];
// Test first message starting with 0
send_array_expect_success (sub, msg_0, 0);
// wait
msleep (SETTLE_TIME);
int rc = zmq_recv (pub, buffer, sizeof (buffer), ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
// Test first message starting with 1
send_array_expect_success (sub, msg_1, 0);
recv_array_expect_success (pub, msg_1, 0);
// Clean up.
test_context_socket_close (pub);
test_context_socket_close (sub);
}
int main () int main ()
{ {
setup_test_environment (); setup_test_environment ();
...@@ -467,6 +519,7 @@ int main () ...@@ -467,6 +519,7 @@ int main ()
RUN_TEST (test_missing_subscriptions); RUN_TEST (test_missing_subscriptions);
RUN_TEST (test_unsubscribe_cleanup); RUN_TEST (test_unsubscribe_cleanup);
RUN_TEST (test_user_message); RUN_TEST (test_user_message);
RUN_TEST (test_user_message_multi);
return UNITY_END (); return UNITY_END ();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment