Commit c7542981 authored by Martin Sustrik's avatar Martin Sustrik

PGM transport reconciled with subscription forwarding

As PGM is not capable of passing subscriptions upstream,
subscriptions are ignored at sub side and engine subscribes
for all messages on pub side.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent d7adc3f1
...@@ -68,6 +68,9 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) ...@@ -68,6 +68,9 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
set_pollin (socket_handle); set_pollin (socket_handle);
sink = sink_; sink = sink_;
// If there are any subscriptions already queued in the session, drop them.
drop_subscriptions ();
} }
void zmq::pgm_receiver_t::unplug () void zmq::pgm_receiver_t::unplug ()
...@@ -101,7 +104,7 @@ void zmq::pgm_receiver_t::terminate () ...@@ -101,7 +104,7 @@ void zmq::pgm_receiver_t::terminate ()
void zmq::pgm_receiver_t::activate_out () void zmq::pgm_receiver_t::activate_out ()
{ {
zmq_assert (false); drop_subscriptions ();
} }
void zmq::pgm_receiver_t::activate_in () void zmq::pgm_receiver_t::activate_in ()
...@@ -255,5 +258,12 @@ void zmq::pgm_receiver_t::timer_event (int token) ...@@ -255,5 +258,12 @@ void zmq::pgm_receiver_t::timer_event (int token)
in_event (); in_event ();
} }
void zmq::pgm_receiver_t::drop_subscriptions ()
{
msg_t msg;
while (sink->read (&msg))
msg.close ();
}
#endif #endif
...@@ -64,6 +64,10 @@ namespace zmq ...@@ -64,6 +64,10 @@ namespace zmq
private: private:
// PGM is not able to move subscriptions upstream. Thus, drop all
// the pending subscriptions.
void drop_subscriptions ();
// RX timeout timer ID. // RX timeout timer ID.
enum {rx_timer_id = 0xa1}; enum {rx_timer_id = 0xa1};
......
...@@ -88,6 +88,15 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_) ...@@ -88,6 +88,15 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, i_engine_sink *sink_)
// Set POLLOUT for downlink_socket_handle. // Set POLLOUT for downlink_socket_handle.
set_pollout (handle); set_pollout (handle);
// PGM is not able to pass subscriptions upstream, thus we have no idea
// what messages are peers interested in. Because of that we have to
// subscribe for all the messages.
msg_t msg;
msg.init ();
bool ok = sink_->write (&msg);
zmq_assert (ok);
sink_->flush ();
} }
void zmq::pgm_sender_t::unplug () void zmq::pgm_sender_t::unplug ()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment