Commit 8e8fdcc9 authored by laplaceyang's avatar laplaceyang

fix bug zmq4.x PUB msg to ZMTP1.0 SUB server

parent f9d23143
...@@ -752,9 +752,18 @@ int zmq::stream_engine_t::process_identity_msg (msg_t *msg_) ...@@ -752,9 +752,18 @@ int zmq::stream_engine_t::process_identity_msg (msg_t *msg_)
errno_assert (rc == 0); errno_assert (rc == 0);
} }
if (subscription_required) if (subscription_required) {
process_msg = &stream_engine_t::write_subscription_msg; msg_t subscription;
else
// Inject the subscription message, so that also
// ZMQ 2.x peers receive published messages.
int rc = subscription.init_size (1);
errno_assert (rc == 0);
*(unsigned char*) subscription.data () = 1;
rc = session->push_msg (&subscription);
errno_assert (rc == 0);
}
process_msg = &stream_engine_t::push_msg_to_session; process_msg = &stream_engine_t::push_msg_to_session;
return 0; return 0;
...@@ -947,23 +956,6 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_) ...@@ -947,23 +956,6 @@ int zmq::stream_engine_t::push_one_then_decode_and_push (msg_t *msg_)
return rc; return rc;
} }
int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
{
msg_t subscription;
// Inject the subscription message, so that also
// ZMQ 2.x peers receive published messages.
int rc = subscription.init_size (1);
errno_assert (rc == 0);
*(unsigned char*) subscription.data () = 1;
rc = session->push_msg (&subscription);
if (rc == -1)
return -1;
process_msg = &stream_engine_t::push_msg_to_session;
return push_msg_to_session (msg_);
}
void zmq::stream_engine_t::error (error_reason_t reason) void zmq::stream_engine_t::error (error_reason_t reason)
{ {
if (options.raw_socket && options.raw_notify) { if (options.raw_socket && options.raw_notify) {
......
...@@ -116,8 +116,6 @@ namespace zmq ...@@ -116,8 +116,6 @@ namespace zmq
void mechanism_ready (); void mechanism_ready ();
int write_subscription_msg (msg_t *msg_);
size_t add_property (unsigned char *ptr, size_t add_property (unsigned char *ptr,
const char *name, const void *value, size_t value_len); const char *name, const void *value, size_t value_len);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment