Commit d9307c9f authored by Martin Hurton's avatar Martin Hurton

Make ZMQ interoperate with ZMQ 2.x SUB sockets

Since ZMQ 2.x does not support subscription forwarding, it's not
possible to use ZMQ 2.x SUB socket to receive messages from a PUB
socket.

This patch adds some compatibility layer so that ZMQ 2.x SUB socket
receives messages from PUB socket.
parent dfc0222e
...@@ -401,6 +401,14 @@ bool zmq::stream_engine_t::handshake () ...@@ -401,6 +401,14 @@ bool zmq::stream_engine_t::handshake ()
// Make sure the decoder sees the data we have already received. // Make sure the decoder sees the data we have already received.
inpos = greeting; inpos = greeting;
insize = greeting_bytes_read; insize = greeting_bytes_read;
// To allow for interoperability with peers that do not forward
// their subscriptions, we inject a phony subsription
// message into the incomming message stream. To put this
// message right after the identity message, we temporarily
// divert the message stream from session to ourselves.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
decoder.set_msg_sink (this);
} }
// Start polling for output if necessary. // Start polling for output if necessary.
...@@ -414,6 +422,30 @@ bool zmq::stream_engine_t::handshake () ...@@ -414,6 +422,30 @@ bool zmq::stream_engine_t::handshake ()
return true; return true;
} }
int zmq::stream_engine_t::push_msg (msg_t *msg_)
{
zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB);
// The first message is identity.
// Let the session process it.
int rc = session->push_msg (msg_);
errno_assert (rc == 0);
// Inject the subscription message so that the ZMQ 2.x peer
// receives our messages.
rc = msg_->init_size (1);
errno_assert (rc == 0);
*(unsigned char*) msg_->data () = 1;
rc = session->push_msg (msg_);
session->flush ();
// Once we have injected the subscription message, we can
// Divert the message flow back to the session.
decoder.set_msg_sink (session);
return rc;
}
void zmq::stream_engine_t::error () void zmq::stream_engine_t::error ()
{ {
zmq_assert (session); zmq_assert (session);
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "fd.hpp" #include "fd.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "i_msg_sink.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "encoder.hpp" #include "encoder.hpp"
#include "decoder.hpp" #include "decoder.hpp"
...@@ -41,7 +42,7 @@ namespace zmq ...@@ -41,7 +42,7 @@ namespace zmq
// This engine handles any socket with SOCK_STREAM semantics, // This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket. // e.g. TCP socket or an UNIX domain socket.
class stream_engine_t : public io_object_t, public i_engine class stream_engine_t : public io_object_t, public i_engine, public i_msg_sink
{ {
public: public:
...@@ -55,6 +56,9 @@ namespace zmq ...@@ -55,6 +56,9 @@ namespace zmq
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
// i_msg_sink interface implementation.
virtual int push_msg (msg_t *msg_);
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
void out_event (); void out_event ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment