Commit 253e9dd2 authored by Luca Boccassi's avatar Luca Boccassi

Problem: libzmq does not send ZMTP 3.1 sub/cancel commands

Solution: if all peers of a socket are >= 3.1 use sub/cancel commands
instead of the old 0/1 messages.
For backward compatibility, move the handling of 0/1 or sub/cancel
command strings to the encoders, so that the right thing can be done
depending on the protocol version.
Do not set the command flag until the encoder, so that we can handle
the inproc case (which skips the encoder).
parent e7f802d1
...@@ -865,6 +865,7 @@ set(cxx-sources ...@@ -865,6 +865,7 @@ set(cxx-sources
v1_encoder.cpp v1_encoder.cpp
v2_decoder.cpp v2_decoder.cpp
v2_encoder.cpp v2_encoder.cpp
v3_1_encoder.cpp
xpub.cpp xpub.cpp
xsub.cpp xsub.cpp
zmq.cpp zmq.cpp
...@@ -1007,6 +1008,7 @@ set(cxx-sources ...@@ -1007,6 +1008,7 @@ set(cxx-sources
v1_encoder.hpp v1_encoder.hpp
v2_decoder.hpp v2_decoder.hpp
v2_encoder.hpp v2_encoder.hpp
v3_1_encoder.hpp
v2_protocol.hpp v2_protocol.hpp
vmci.hpp vmci.hpp
vmci_address.hpp vmci_address.hpp
......
...@@ -239,6 +239,8 @@ src_libzmq_la_SOURCES = \ ...@@ -239,6 +239,8 @@ src_libzmq_la_SOURCES = \
src/v1_encoder.hpp \ src/v1_encoder.hpp \
src/v2_encoder.cpp \ src/v2_encoder.cpp \
src/v2_encoder.hpp \ src/v2_encoder.hpp \
src/v3_1_encoder.cpp \
src/v3_1_encoder.hpp \
src/v2_protocol.hpp \ src/v2_protocol.hpp \
src/vmci.cpp \ src/vmci.cpp \
src/vmci.hpp \ src/vmci.hpp \
......
...@@ -212,6 +212,36 @@ int zmq::msg_t::init_leave () ...@@ -212,6 +212,36 @@ int zmq::msg_t::init_leave ()
return 0; return 0;
} }
int zmq::msg_t::init_subscribe (const size_t size_, const unsigned char *topic)
{
int rc = init_size (size_);
if (rc == 0) {
set_flags (zmq::msg_t::subscribe);
// We explicitly allow a NULL subscription with size zero
if (size_) {
assert (topic);
memcpy (data (), topic, size_);
}
}
return rc;
}
int zmq::msg_t::init_cancel (const size_t size_, const unsigned char *topic)
{
int rc = init_size (size_);
if (rc == 0) {
set_flags (zmq::msg_t::cancel);
// We explicitly allow a NULL subscription with size zero
if (size_) {
assert (topic);
memcpy (data (), topic, size_);
}
}
return rc;
}
int zmq::msg_t::close () int zmq::msg_t::close ()
{ {
// Check the validity of the message. // Check the validity of the message.
...@@ -487,9 +517,12 @@ size_t zmq::msg_t::command_body_size () const ...@@ -487,9 +517,12 @@ size_t zmq::msg_t::command_body_size () const
{ {
if (this->is_ping () || this->is_pong ()) if (this->is_ping () || this->is_pong ())
return this->size () - ping_cmd_name_size; return this->size () - ping_cmd_name_size;
if (this->is_subscribe ()) else if (!(this->flags () & msg_t::command)
&& (this->is_subscribe () || this->is_cancel ()))
return this->size ();
else if (this->is_subscribe ())
return this->size () - sub_cmd_name_size; return this->size () - sub_cmd_name_size;
if (this->is_cancel ()) else if (this->is_cancel ())
return this->size () - cancel_cmd_name_size; return this->size () - cancel_cmd_name_size;
return 0; return 0;
...@@ -498,12 +531,17 @@ size_t zmq::msg_t::command_body_size () const ...@@ -498,12 +531,17 @@ size_t zmq::msg_t::command_body_size () const
void *zmq::msg_t::command_body () void *zmq::msg_t::command_body ()
{ {
unsigned char *data = NULL; unsigned char *data = NULL;
if (this->is_ping () || this->is_pong ()) if (this->is_ping () || this->is_pong ())
data = data =
static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size; static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size;
if (this->is_subscribe ()) // With inproc, command flag is not set for sub/cancel
else if (!(this->flags () & msg_t::command)
&& (this->is_subscribe () || this->is_cancel ()))
data = static_cast<unsigned char *> (this->data ());
else if (this->is_subscribe ())
data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size; data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size;
if (this->is_cancel ()) else if (this->is_cancel ())
data = data =
static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size; static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size;
......
...@@ -54,6 +54,9 @@ namespace zmq ...@@ -54,6 +54,9 @@ namespace zmq
// Note that this structure needs to be explicitly constructed // Note that this structure needs to be explicitly constructed
// (init functions) and destructed (close function). // (init functions) and destructed (close function).
static const char cancel_cmd_name[] = "\6CANCEL";
static const char sub_cmd_name[] = "\x9SUBSCRIBE";
class msg_t class msg_t
{ {
public: public:
...@@ -109,6 +112,8 @@ class msg_t ...@@ -109,6 +112,8 @@ class msg_t
int init_delimiter (); int init_delimiter ();
int init_join (); int init_join ();
int init_leave (); int init_leave ();
int init_subscribe (const size_t size_, const unsigned char *topic);
int init_cancel (const size_t size_, const unsigned char *topic);
int close (); int close ();
int move (msg_t &src_); int move (msg_t &src_);
int copy (msg_t &src_); int copy (msg_t &src_);
......
...@@ -56,15 +56,15 @@ int zmq::sub_t::xsetsockopt (int option_, ...@@ -56,15 +56,15 @@ int zmq::sub_t::xsetsockopt (int option_,
// Create the subscription message. // Create the subscription message.
msg_t msg; msg_t msg;
int rc = msg.init_size (optvallen_ + 1); int rc;
errno_assert (rc == 0); const unsigned char *data = static_cast<const unsigned char *> (optval_);
unsigned char *data = static_cast<unsigned char *> (msg.data ()); if (option_ == ZMQ_SUBSCRIBE) {
*data = (option_ == ZMQ_SUBSCRIBE); rc = msg.init_subscribe (optvallen_, data);
// We explicitly allow a NULL subscription with size zero } else {
if (optvallen_) { rc = msg.init_cancel (optvallen_, data);
assert (optval_);
memcpy (data + 1, optval_, optvallen_);
} }
errno_assert (rc == 0);
// Pass it further on in the stack. // Pass it further on in the stack.
rc = xsub_t::xsend (&msg); rc = xsub_t::xsend (&msg);
return close_and_return (&msg, rc); return close_and_return (&msg, rc);
......
...@@ -55,23 +55,41 @@ void zmq::v1_encoder_t::size_ready () ...@@ -55,23 +55,41 @@ void zmq::v1_encoder_t::size_ready ()
void zmq::v1_encoder_t::message_ready () void zmq::v1_encoder_t::message_ready ()
{ {
size_t header_size = 2; // flags byte + size byte
// Get the message size. // Get the message size.
size_t size = in_progress ()->size (); size_t size = in_progress ()->size ();
// Account for the 'flags' byte. // Account for the 'flags' byte.
size++; size++;
// Account for the subscribe/cancel byte.
if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ())
size++;
// For messages less than 255 bytes long, write one byte of message size. // For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte // For longer messages write 0xff escape character followed by 8-byte
// message size. In both cases 'flags' field follows. // message size. In both cases 'flags' field follows.
if (size < UCHAR_MAX) { if (size < UCHAR_MAX) {
_tmpbuf[0] = static_cast<unsigned char> (size); _tmpbuf[0] = static_cast<unsigned char> (size);
_tmpbuf[1] = (in_progress ()->flags () & msg_t::more); _tmpbuf[1] = (in_progress ()->flags () & msg_t::more);
next_step (_tmpbuf, 2, &v1_encoder_t::size_ready, false);
} else { } else {
_tmpbuf[0] = UCHAR_MAX; _tmpbuf[0] = UCHAR_MAX;
put_uint64 (_tmpbuf + 1, size); put_uint64 (_tmpbuf + 1, size);
_tmpbuf[9] = (in_progress ()->flags () & msg_t::more); _tmpbuf[9] = (in_progress ()->flags () & msg_t::more);
next_step (_tmpbuf, 10, &v1_encoder_t::size_ready, false); header_size = 10;
} }
// Encode the subscribe/cancel byte. This is done in the encoder as
// opposed to when the subscribe message is created to allow different
// protocol behaviour on the wire in the v3.1 and legacy encoders.
// It results in the work being done multiple times in case the sub
// is sending the subscription/cancel to multiple pubs, but it cannot
// be avoided. This processing can be moved to xsub once support for
// ZMTP < 3.1 is dropped.
if (in_progress ()->is_subscribe ())
_tmpbuf[header_size++] = 1;
else if (in_progress ()->is_cancel ())
_tmpbuf[header_size++] = 0;
next_step (_tmpbuf, header_size, &v1_encoder_t::size_ready, false);
} }
...@@ -46,7 +46,7 @@ class v1_encoder_t ZMQ_FINAL : public encoder_base_t<v1_encoder_t> ...@@ -46,7 +46,7 @@ class v1_encoder_t ZMQ_FINAL : public encoder_base_t<v1_encoder_t>
void size_ready (); void size_ready ();
void message_ready (); void message_ready ();
unsigned char _tmpbuf[10]; unsigned char _tmpbuf[11];
ZMQ_NON_COPYABLE_NOR_MOVABLE (v1_encoder_t) ZMQ_NON_COPYABLE_NOR_MOVABLE (v1_encoder_t)
}; };
......
...@@ -50,6 +50,8 @@ zmq::v2_encoder_t::~v2_encoder_t () ...@@ -50,6 +50,8 @@ zmq::v2_encoder_t::~v2_encoder_t ()
void zmq::v2_encoder_t::message_ready () void zmq::v2_encoder_t::message_ready ()
{ {
// Encode flags. // Encode flags.
size_t size = in_progress ()->size ();
size_t header_size = 2; // flags byte + size byte
unsigned char &protocol_flags = _tmp_buf[0]; unsigned char &protocol_flags = _tmp_buf[0];
protocol_flags = 0; protocol_flags = 0;
if (in_progress ()->flags () & msg_t::more) if (in_progress ()->flags () & msg_t::more)
...@@ -58,18 +60,32 @@ void zmq::v2_encoder_t::message_ready () ...@@ -58,18 +60,32 @@ void zmq::v2_encoder_t::message_ready ()
protocol_flags |= v2_protocol_t::large_flag; protocol_flags |= v2_protocol_t::large_flag;
if (in_progress ()->flags () & msg_t::command) if (in_progress ()->flags () & msg_t::command)
protocol_flags |= v2_protocol_t::command_flag; protocol_flags |= v2_protocol_t::command_flag;
if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ())
++size;
// Encode the message length. For messages less then 256 bytes, // Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger // the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used. // messages, 64-bit unsigned integer in network byte order is used.
const size_t size = in_progress ()->size ();
if (unlikely (size > UCHAR_MAX)) { if (unlikely (size > UCHAR_MAX)) {
put_uint64 (_tmp_buf + 1, size); put_uint64 (_tmp_buf + 1, size);
next_step (_tmp_buf, 9, &v2_encoder_t::size_ready, false); header_size = 9; // flags byte + size 8 bytes
} else { } else {
_tmp_buf[1] = static_cast<uint8_t> (size); _tmp_buf[1] = static_cast<uint8_t> (size);
next_step (_tmp_buf, 2, &v2_encoder_t::size_ready, false);
} }
// Encode the subscribe/cancel byte. This is done in the encoder as
// opposed to when the subscribe message is created to allow different
// protocol behaviour on the wire in the v3.1 and legacy encoders.
// It results in the work being done multiple times in case the sub
// is sending the subscription/cancel to multiple pubs, but it cannot
// be avoided. This processing can be moved to xsub once support for
// ZMTP < 3.1 is dropped.
if (in_progress ()->is_subscribe ())
_tmp_buf[header_size++] = 1;
else if (in_progress ()->is_cancel ())
_tmp_buf[header_size++] = 0;
next_step (_tmp_buf, header_size, &v2_encoder_t::size_ready, false);
} }
void zmq::v2_encoder_t::size_ready () void zmq::v2_encoder_t::size_ready ()
......
...@@ -46,7 +46,8 @@ class v2_encoder_t ZMQ_FINAL : public encoder_base_t<v2_encoder_t> ...@@ -46,7 +46,8 @@ class v2_encoder_t ZMQ_FINAL : public encoder_base_t<v2_encoder_t>
void size_ready (); void size_ready ();
void message_ready (); void message_ready ();
unsigned char _tmp_buf[9]; // flags byte + size byte (or 8 bytes) + sub/cancel byte
unsigned char _tmp_buf[10];
ZMQ_NON_COPYABLE_NOR_MOVABLE (v2_encoder_t) ZMQ_NON_COPYABLE_NOR_MOVABLE (v2_encoder_t)
}; };
......
/*
Copyright (c) 2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "v2_protocol.hpp"
#include "v3_1_encoder.hpp"
#include "msg.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include <limits.h>
zmq::v3_1_encoder_t::v3_1_encoder_t (size_t bufsize_) :
encoder_base_t<v3_1_encoder_t> (bufsize_)
{
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &v3_1_encoder_t::message_ready, true);
}
zmq::v3_1_encoder_t::~v3_1_encoder_t ()
{
}
void zmq::v3_1_encoder_t::message_ready ()
{
// Encode flags.
size_t size = in_progress ()->size ();
size_t header_size = 2; // flags byte + size byte
unsigned char &protocol_flags = _tmp_buf[0];
protocol_flags = 0;
if (in_progress ()->flags () & msg_t::more)
protocol_flags |= v2_protocol_t::more_flag;
if (in_progress ()->size () > UCHAR_MAX)
protocol_flags |= v2_protocol_t::large_flag;
if (in_progress ()->flags () & msg_t::command
|| in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) {
protocol_flags |= v2_protocol_t::command_flag;
if (in_progress ()->is_subscribe ())
size += zmq::msg_t::sub_cmd_name_size;
else if (in_progress ()->is_cancel ())
size += zmq::msg_t::cancel_cmd_name_size;
}
// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
if (unlikely (size > UCHAR_MAX)) {
put_uint64 (_tmp_buf + 1, size);
header_size = 9; // flags byte + size 8 bytes
} else {
_tmp_buf[1] = static_cast<uint8_t> (size);
}
// Encode the sub/cancel command string. This is done in the encoder as
// opposed to when the subscribe message is created to allow different
// protocol behaviour on the wire in the v3.1 and legacy encoders.
// It results in the work being done multiple times in case the sub
// is sending the subscription/cancel to multiple pubs, but it cannot
// be avoided. This processing can be moved to xsub once support for
// ZMTP < 3.1 is dropped.
if (in_progress ()->is_subscribe ()) {
memcpy (_tmp_buf + header_size, zmq::sub_cmd_name,
zmq::msg_t::sub_cmd_name_size);
header_size += zmq::msg_t::sub_cmd_name_size;
} else if (in_progress ()->is_cancel ()) {
memcpy (_tmp_buf + header_size, zmq::cancel_cmd_name,
zmq::msg_t::cancel_cmd_name_size);
header_size += zmq::msg_t::cancel_cmd_name_size;
}
next_step (_tmp_buf, header_size, &v3_1_encoder_t::size_ready, false);
}
void zmq::v3_1_encoder_t::size_ready ()
{
// Write message body into the buffer.
next_step (in_progress ()->data (), in_progress ()->size (),
&v3_1_encoder_t::message_ready, true);
}
/*
Copyright (c) 2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_V3_1_ENCODER_HPP_INCLUDED__
#define __ZMQ_V3_1_ENCODER_HPP_INCLUDED__
#include "encoder.hpp"
#include "msg.hpp"
namespace zmq
{
// Encoder for 0MQ framing protocol. Converts messages into data stream.
class v3_1_encoder_t ZMQ_FINAL : public encoder_base_t<v3_1_encoder_t>
{
public:
v3_1_encoder_t (size_t bufsize_);
~v3_1_encoder_t () ZMQ_FINAL;
private:
void size_ready ();
void message_ready ();
unsigned char _tmp_buf[9 + zmq::msg_t::sub_cmd_name_size];
ZMQ_NON_COPYABLE_NOR_MOVABLE (v3_1_encoder_t)
};
}
#endif
...@@ -102,6 +102,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -102,6 +102,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
size_t size = 0; size_t size = 0;
bool subscribe = false; bool subscribe = false;
bool is_subscribe_or_cancel = false; bool is_subscribe_or_cancel = false;
bool notify = false;
const bool first_part = !_more_recv; const bool first_part = !_more_recv;
_more_recv = (msg.flags () & msg_t::more) != 0; _more_recv = (msg.flags () & msg_t::more) != 0;
...@@ -144,53 +145,46 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) ...@@ -144,53 +145,46 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
_manual_subscriptions.add (data, size, pipe_); _manual_subscriptions.add (data, size, pipe_);
_pending_pipes.push_back (pipe_); _pending_pipes.push_back (pipe_);
} else {
if (!subscribe) {
const mtrie_t::rm_result rm_result =
_subscriptions.rm (data, size, pipe_);
// TODO reconsider what to do if rm_result == mtrie_t::not_found
notify = rm_result != mtrie_t::values_remain || _verbose_unsubs;
} else {
const bool first_added = _subscriptions.add (data, size, pipe_);
notify = first_added || _verbose_subs;
}
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode or manual mode are enabled, store it
// so that it can be passed to the user on next recv call.
if (_manual || (options.type == ZMQ_XPUB && notify)) {
// ZMTP 3.1 hack: we need to support sub/cancel commands, but // ZMTP 3.1 hack: we need to support sub/cancel commands, but
// we can't give them back to userspace as it would be an API // we can't give them back to userspace as it would be an API
// breakage since the payload of the message is completely // breakage since the payload of the message is completely
// different. Manually craft an old-style message instead. // different. Manually craft an old-style message instead.
data = data - 1; // Although with other transports it would be possible to simply
size = size + 1; // reuse the same buffer and prefix a 0/1 byte to the topic, with
// inproc the subscribe/cancel command string is not present in
// the message, so this optimization is not possible.
// The pushback makes a copy of the data array anyway, so the
// number of buffer copies does not change.
blob_t notification (size + 1);
if (subscribe) if (subscribe)
*data = 1; *notification.data () = 1;
else else
*data = 0; *notification.data () = 0;
memcpy (notification.data () + 1, data, size);
_pending_data.push_back (blob_t (data, size)); _pending_data.push_back (ZMQ_MOVE (notification));
if (metadata) if (metadata)
metadata->add_ref (); metadata->add_ref ();
_pending_metadata.push_back (metadata); _pending_metadata.push_back (metadata);
_pending_flags.push_back (0); _pending_flags.push_back (0);
} else {
bool notify;
if (!subscribe) {
const mtrie_t::rm_result rm_result =
_subscriptions.rm (data, size, pipe_);
// TODO reconsider what to do if rm_result == mtrie_t::not_found
notify = rm_result != mtrie_t::values_remain || _verbose_unsubs;
} else {
const bool first_added = _subscriptions.add (data, size, pipe_);
notify = first_added || _verbose_subs;
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode is enabled, store it so that
// it can be passed to the user on next recv call.
if (options.type == ZMQ_XPUB && notify) {
data = data - 1;
size = size + 1;
if (subscribe)
*data = 1;
else
*data = 0;
_pending_data.push_back (blob_t (data, size));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
}
} }
msg.close (); msg.close ();
} }
} }
......
...@@ -135,10 +135,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) ...@@ -135,10 +135,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
// however this is alread done on the XPUB side and // however this is alread done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE // doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved. // when there are forwarding devices involved.
if (msg_->is_subscribe ()) { if (!msg_->is_subscribe ()) {
data = static_cast<unsigned char *> (msg_->command_body ());
size = msg_->command_body_size ();
} else {
data = data + 1; data = data + 1;
size = size - 1; size = size - 1;
} }
...@@ -148,10 +145,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) ...@@ -148,10 +145,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
} }
if (msg_->is_cancel () || (size > 0 && *data == 0)) { if (msg_->is_cancel () || (size > 0 && *data == 0)) {
// Process unsubscribe message // Process unsubscribe message
if (msg_->is_cancel ()) { if (!msg_->is_cancel ()) {
data = static_cast<unsigned char *> (msg_->command_body ());
size = msg_->command_body_size ();
} else {
data = data + 1; data = data + 1;
size = size - 1; size = size - 1;
} }
...@@ -271,16 +265,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, ...@@ -271,16 +265,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_,
// Create the subscription message. // Create the subscription message.
msg_t msg; msg_t msg;
const int rc = msg.init_size (size_ + 1); const int rc = msg.init_subscribe (size_, data_);
errno_assert (rc == 0); errno_assert (rc == 0);
unsigned char *data = static_cast<unsigned char *> (msg.data ());
data[0] = 1;
// We explicitly allow a NULL subscription with size zero
if (size_) {
assert (data_);
memcpy (data + 1, data_, size_);
}
// Send it to the pipe. // Send it to the pipe.
const bool sent = pipe->write (&msg); const bool sent = pipe->write (&msg);
......
...@@ -47,6 +47,7 @@ ...@@ -47,6 +47,7 @@
#include "v1_decoder.hpp" #include "v1_decoder.hpp"
#include "v2_encoder.hpp" #include "v2_encoder.hpp"
#include "v2_decoder.hpp" #include "v2_decoder.hpp"
#include "v3_1_encoder.hpp"
#include "null_mechanism.hpp" #include "null_mechanism.hpp"
#include "plain_client.hpp" #include "plain_client.hpp"
#include "plain_server.hpp" #include "plain_server.hpp"
...@@ -115,8 +116,9 @@ void zmq::zmtp_engine_t::plug_internal () ...@@ -115,8 +116,9 @@ void zmq::zmtp_engine_t::plug_internal ()
in_event (); in_event ();
} }
// Position of the revision field in the greeting. // Position of the revision and minor fields in the greeting.
const size_t revision_pos = 10; const size_t revision_pos = 10;
const size_t minor_pos = 11;
bool zmq::zmtp_engine_t::handshake () bool zmq::zmtp_engine_t::handshake ()
{ {
...@@ -128,8 +130,8 @@ bool zmq::zmtp_engine_t::handshake () ...@@ -128,8 +130,8 @@ bool zmq::zmtp_engine_t::handshake ()
const bool unversioned = rc != 0; const bool unversioned = rc != 0;
if (!(this if (!(this
->*select_handshake_fun (unversioned, ->*select_handshake_fun (unversioned, _greeting_recv[revision_pos],
_greeting_recv[revision_pos])) ()) _greeting_recv[minor_pos])) ())
return false; return false;
// Start polling for output if necessary. // Start polling for output if necessary.
...@@ -228,9 +230,8 @@ void zmq::zmtp_engine_t::receive_greeting_versioned () ...@@ -228,9 +230,8 @@ void zmq::zmtp_engine_t::receive_greeting_versioned ()
} }
} }
zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::select_handshake_fun (
zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_, bool unversioned_, unsigned char revision_, unsigned char minor_)
unsigned char revision_)
{ {
// Is the peer using ZMTP/1.0 with no revision number? // Is the peer using ZMTP/1.0 with no revision number?
if (unversioned_) { if (unversioned_) {
...@@ -241,8 +242,15 @@ zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_, ...@@ -241,8 +242,15 @@ zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_,
return &zmtp_engine_t::handshake_v1_0; return &zmtp_engine_t::handshake_v1_0;
case ZMTP_2_0: case ZMTP_2_0:
return &zmtp_engine_t::handshake_v2_0; return &zmtp_engine_t::handshake_v2_0;
case ZMTP_3_x:
switch (minor_) {
case 0:
return &zmtp_engine_t::handshake_v3_0;
default:
return &zmtp_engine_t::handshake_v3_1;
}
default: default:
return &zmtp_engine_t::handshake_v3_0; return &zmtp_engine_t::handshake_v3_1;
} }
} }
...@@ -339,15 +347,8 @@ bool zmq::zmtp_engine_t::handshake_v2_0 () ...@@ -339,15 +347,8 @@ bool zmq::zmtp_engine_t::handshake_v2_0 ()
return true; return true;
} }
bool zmq::zmtp_engine_t::handshake_v3_0 () bool zmq::zmtp_engine_t::handshake_v3_x ()
{ {
_encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
alloc_assert (_encoder);
_decoder = new (std::nothrow) v2_decoder_t (
_options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder);
if (_options.mechanism == ZMQ_NULL if (_options.mechanism == ZMQ_NULL
&& memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
20) 20)
...@@ -408,6 +409,30 @@ bool zmq::zmtp_engine_t::handshake_v3_0 () ...@@ -408,6 +409,30 @@ bool zmq::zmtp_engine_t::handshake_v3_0 ()
return true; return true;
} }
bool zmq::zmtp_engine_t::handshake_v3_0 ()
{
_encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
alloc_assert (_encoder);
_decoder = new (std::nothrow) v2_decoder_t (
_options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder);
return zmq::zmtp_engine_t::handshake_v3_x ();
}
bool zmq::zmtp_engine_t::handshake_v3_1 ()
{
_encoder = new (std::nothrow) v3_1_encoder_t (_options.out_batch_size);
alloc_assert (_encoder);
_decoder = new (std::nothrow) v2_decoder_t (
_options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder);
return zmq::zmtp_engine_t::handshake_v3_x ();
}
int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_) int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_)
{ {
const int rc = msg_->init_size (_options.routing_id_size); const int rc = msg_->init_size (_options.routing_id_size);
......
...@@ -49,7 +49,8 @@ namespace zmq ...@@ -49,7 +49,8 @@ namespace zmq
enum enum
{ {
ZMTP_1_0 = 0, ZMTP_1_0 = 0,
ZMTP_2_0 = 1 ZMTP_2_0 = 1,
ZMTP_3_x = 3
}; };
class io_thread_t; class io_thread_t;
...@@ -85,12 +86,15 @@ class zmtp_engine_t ZMQ_FINAL : public stream_engine_base_t ...@@ -85,12 +86,15 @@ class zmtp_engine_t ZMQ_FINAL : public stream_engine_base_t
typedef bool (zmtp_engine_t::*handshake_fun_t) (); typedef bool (zmtp_engine_t::*handshake_fun_t) ();
static handshake_fun_t select_handshake_fun (bool unversioned, static handshake_fun_t select_handshake_fun (bool unversioned,
unsigned char revision); unsigned char revision,
unsigned char minor);
bool handshake_v1_0_unversioned (); bool handshake_v1_0_unversioned ();
bool handshake_v1_0 (); bool handshake_v1_0 ();
bool handshake_v2_0 (); bool handshake_v2_0 ();
bool handshake_v3_x ();
bool handshake_v3_0 (); bool handshake_v3_0 ();
bool handshake_v3_1 ();
int routing_id_msg (msg_t *msg_); int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_); int process_routing_id_msg (msg_t *msg_);
......
...@@ -81,7 +81,7 @@ static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_) ...@@ -81,7 +81,7 @@ static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_)
} }
} }
static void mock_handshake (raw_socket fd_) static void mock_handshake (raw_socket fd_, bool sub_command, bool mock_pub)
{ {
const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0, const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0,
0x7f, 3, 0, 'N', 'U', 'L', 'L', 0}; 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0};
...@@ -89,31 +89,44 @@ static void mock_handshake (raw_socket fd_) ...@@ -89,31 +89,44 @@ static void mock_handshake (raw_socket fd_)
memset (buffer, 0, sizeof (buffer)); memset (buffer, 0, sizeof (buffer));
memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting)); memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting));
// Mock ZMTP 3.1 which uses commands
if (sub_command) {
buffer[11] = 1;
}
int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0)); int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0));
TEST_ASSERT_EQUAL_INT (64, rc); TEST_ASSERT_EQUAL_INT (64, rc);
recv_with_retry (fd_, buffer, 64); recv_with_retry (fd_, buffer, 64);
const uint8_t zmtp_ready[27] = { if (!mock_pub) {
4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', const uint8_t zmtp_ready[27] = {
't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'}; 4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e',
't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (fd_, (const char *) zmtp_ready, 27, 0));
TEST_ASSERT_EQUAL_INT (27, rc);
} else {
const uint8_t zmtp_ready[28] = {
4, 26, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e',
't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 4, 'X', 'P', 'U', 'B'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (fd_, (const char *) zmtp_ready, 28, 0));
TEST_ASSERT_EQUAL_INT (28, rc);
}
// greeting - XPUB has one extra byte
memset (buffer, 0, sizeof (buffer)); memset (buffer, 0, sizeof (buffer));
memcpy (buffer, zmtp_ready, 27); recv_with_retry (fd_, buffer, mock_pub ? 27 : 28);
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 27, 0));
TEST_ASSERT_EQUAL_INT (27, rc);
// greeting - XPUB so one extra byte
recv_with_retry (fd_, buffer, 28);
} }
static void prep_server_socket (void **server_out_, static void prep_server_socket (void **server_out_,
void **mon_out_, void **mon_out_,
char *endpoint_, char *endpoint_,
size_t ep_length_) size_t ep_length_,
int socket_type)
{ {
// We'll be using this socket in raw mode // We'll be using this socket in raw mode
void *server = test_context_socket (ZMQ_XPUB); void *server = test_context_socket (socket_type);
int value = 0; int value = 0;
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
...@@ -136,13 +149,14 @@ static void prep_server_socket (void **server_out_, ...@@ -136,13 +149,14 @@ static void prep_server_socket (void **server_out_,
*mon_out_ = server_mon; *mon_out_ = server_mon;
} }
static void test_mock_sub (bool sub_command_) static void test_mock_pub_sub (bool sub_command_, bool mock_pub_)
{ {
int rc; int rc;
char my_endpoint[MAX_SOCKET_STRING]; char my_endpoint[MAX_SOCKET_STRING];
void *server, *server_mon; void *server, *server_mon;
prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING); prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING,
mock_pub_ ? ZMQ_SUB : ZMQ_XPUB);
struct sockaddr_in ip4addr; struct sockaddr_in ip4addr;
raw_socket s; raw_socket s;
...@@ -161,36 +175,62 @@ static void test_mock_sub (bool sub_command_) ...@@ -161,36 +175,62 @@ static void test_mock_sub (bool sub_command_)
TEST_ASSERT_GREATER_THAN_INT (-1, rc); TEST_ASSERT_GREATER_THAN_INT (-1, rc);
// Mock a ZMTP 3 client so we can forcibly try sub commands // Mock a ZMTP 3 client so we can forcibly try sub commands
mock_handshake (s); mock_handshake (s, sub_command_, mock_pub_);
// By now everything should report as connected // By now everything should report as connected
rc = get_monitor_event (server_mon); rc = get_monitor_event (server_mon);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc); TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc);
if (sub_command_) { char buffer[32];
const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S',
'C', 'R', 'I', 'B', 'E', 'A'};
rc =
TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 13, 0));
TEST_ASSERT_EQUAL_INT (13, rc);
} else {
const uint8_t sub[4] = {0, 2, 1, 'A'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 4, 0));
TEST_ASSERT_EQUAL_INT (4, rc);
}
char buffer[16];
memset (buffer, 0, sizeof (buffer)); memset (buffer, 0, sizeof (buffer));
rc = zmq_recv (server, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2));
rc = zmq_send (server, "ALOL", 4, 0); if (mock_pub_) {
TEST_ASSERT_EQUAL_INT (4, rc); rc = zmq_setsockopt (server, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
// SUB binds, let its state machine run
zmq_recv (server, buffer, 16, ZMQ_DONTWAIT);
if (sub_command_) {
recv_with_retry (s, buffer, 13);
TEST_ASSERT_EQUAL_INT (0,
memcmp (buffer, "\4\xb\x9SUBSCRIBEA", 13));
} else {
recv_with_retry (s, buffer, 4);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\2\1A", 4));
}
memcpy (buffer, "\0\4ALOL", 6);
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, buffer, 6, 0));
TEST_ASSERT_EQUAL_INT (6, rc);
memset (buffer, 0, sizeof (buffer)); memset (buffer, 0, sizeof (buffer));
recv_with_retry (s, buffer, 6); rc = zmq_recv (server, buffer, 4, 0);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6)); TEST_ASSERT_EQUAL_INT (4, rc);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "ALOL", 4));
} else {
if (sub_command_) {
const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S',
'C', 'R', 'I', 'B', 'E', 'A'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (s, (const char *) sub, 13, 0));
TEST_ASSERT_EQUAL_INT (13, rc);
} else {
const uint8_t sub[4] = {0, 2, 1, 'A'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (s, (const char *) sub, 4, 0));
TEST_ASSERT_EQUAL_INT (4, rc);
}
rc = zmq_recv (server, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2));
rc = zmq_send (server, "ALOL", 4, 0);
TEST_ASSERT_EQUAL_INT (4, rc);
memset (buffer, 0, sizeof (buffer));
recv_with_retry (s, buffer, 6);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6));
}
close (s); close (s);
...@@ -200,12 +240,22 @@ static void test_mock_sub (bool sub_command_) ...@@ -200,12 +240,22 @@ static void test_mock_sub (bool sub_command_)
void test_mock_sub_command () void test_mock_sub_command ()
{ {
test_mock_sub (true); test_mock_pub_sub (true, false);
} }
void test_mock_sub_legacy () void test_mock_sub_legacy ()
{ {
test_mock_sub (false); test_mock_pub_sub (false, false);
}
void test_mock_pub_command ()
{
test_mock_pub_sub (true, true);
}
void test_mock_pub_legacy ()
{
test_mock_pub_sub (false, true);
} }
int main (void) int main (void)
...@@ -216,6 +266,8 @@ int main (void) ...@@ -216,6 +266,8 @@ int main (void)
RUN_TEST (test_mock_sub_command); RUN_TEST (test_mock_sub_command);
RUN_TEST (test_mock_sub_legacy); RUN_TEST (test_mock_sub_legacy);
RUN_TEST (test_mock_pub_command);
RUN_TEST (test_mock_pub_legacy);
return UNITY_END (); return UNITY_END ();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment