Commit acf0b0e5 authored by Martin Sustrik's avatar Martin Sustrik

Introduces bi-directional pipes

So far, there was a pair of unidirectional pipes between a socket
and a session (or an inproc peer). This resulted in complex
problems with half-closed states and tracking which inpipe
corresponds to which outpipe.

This patch doesn't add any functionality in itself, but is
essential for further work on features like subscription
forwarding.
Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent 9e6b3992
...@@ -28,14 +28,17 @@ namespace zmq ...@@ -28,14 +28,17 @@ namespace zmq
{ {
// Base class for objects stored in the array. Note that each object can // Base class for objects stored in the array. Note that each object can
// be stored in at most one array. // be stored in at most two arrays. This is needed specifically in the
// case where single pipe object is stored both in array of inbound pipes
// and in the array of outbound pipes.
class array_item_t class array_item_t
{ {
public: public:
inline array_item_t () : inline array_item_t () :
array_index (-1) array_index1 (-1),
array_index2 (-1)
{ {
} }
...@@ -45,19 +48,30 @@ namespace zmq ...@@ -45,19 +48,30 @@ namespace zmq
{ {
} }
inline void set_array_index (int index_) inline void set_array_index1 (int index_)
{ {
array_index = index_; array_index1 = index_;
} }
inline int get_array_index () inline int get_array_index1 ()
{ {
return array_index; return array_index1;
}
inline void set_array_index2 (int index_)
{
array_index2 = index_;
}
inline int get_array_index2 ()
{
return array_index2;
} }
private: private:
int array_index; int array_index1;
int array_index2;
array_item_t (const array_item_t&); array_item_t (const array_item_t&);
const array_item_t &operator = (const array_item_t&); const array_item_t &operator = (const array_item_t&);
...@@ -65,9 +79,11 @@ namespace zmq ...@@ -65,9 +79,11 @@ namespace zmq
// Fast array implementation with O(1) access to item, insertion and // Fast array implementation with O(1) access to item, insertion and
// removal. Array stores pointers rather than objects. The objects have // removal. Array stores pointers rather than objects. The objects have
// to be derived from array_item_t class. // to be derived from array_item_t class, thus they can be stored in
// two arrays. Template parameter N specifies which index in array_item_t
// to use.
template <typename T> class array_t template <typename T, int N = 1> class array_t
{ {
public: public:
...@@ -98,28 +114,48 @@ namespace zmq ...@@ -98,28 +114,48 @@ namespace zmq
inline void push_back (T *item_) inline void push_back (T *item_)
{ {
if (item_) if (item_) {
item_->set_array_index ((int) items.size ()); if (N == 1)
item_->set_array_index1 ((int) items.size ());
else
item_->set_array_index2 ((int) items.size ());
}
items.push_back (item_); items.push_back (item_);
} }
inline void erase (T *item_) { inline void erase (T *item_)
erase (item_->get_array_index ()); {
if (N == 1)
erase (item_->get_array_index1 ());
else
erase (item_->get_array_index2 ());
} }
inline void erase (size_type index_) { inline void erase (size_type index_) {
if (items.back ()) if (items.back ()) {
items.back ()->set_array_index ((int) index_); if (N == 1)
items.back ()->set_array_index1 ((int) index_);
else
items.back ()->set_array_index2 ((int) index_);
}
items [index_] = items.back (); items [index_] = items.back ();
items.pop_back (); items.pop_back ();
} }
inline void swap (size_type index1_, size_type index2_) inline void swap (size_type index1_, size_type index2_)
{ {
if (N == 1) {
if (items [index1_])
items [index1_]->set_array_index1 ((int) index2_);
if (items [index2_])
items [index2_]->set_array_index1 ((int) index1_);
}
else {
if (items [index1_]) if (items [index1_])
items [index1_]->set_array_index ((int) index2_); items [index1_]->set_array_index2 ((int) index2_);
if (items [index2_]) if (items [index2_])
items [index2_]->set_array_index ((int) index1_); items [index2_]->set_array_index2 ((int) index1_);
}
std::swap (items [index1_], items [index2_]); std::swap (items [index1_], items [index2_]);
} }
...@@ -130,7 +166,10 @@ namespace zmq ...@@ -130,7 +166,10 @@ namespace zmq
inline size_type index (T *item_) inline size_type index (T *item_)
{ {
return (size_type) item_->get_array_index (); if (N == 1)
return (size_type) item_->get_array_index1 ();
else
return (size_type) item_->get_array_index2 ();
} }
private: private:
......
...@@ -40,8 +40,8 @@ namespace zmq ...@@ -40,8 +40,8 @@ namespace zmq
own, own,
attach, attach,
bind, bind,
activate_reader, activate_read,
activate_writer, activate_write,
pipe_term, pipe_term,
pipe_term_ack, pipe_term_ack,
term_req, term_req,
...@@ -79,8 +79,7 @@ namespace zmq ...@@ -79,8 +79,7 @@ namespace zmq
// Sent from session to socket to establish pipe(s) between them. // Sent from session to socket to establish pipe(s) between them.
// Caller have used inc_seqnum beforehand sending the command. // Caller have used inc_seqnum beforehand sending the command.
struct { struct {
class reader_t *in_pipe; class pipe_t *pipe;
class writer_t *out_pipe;
unsigned char peer_identity_size; unsigned char peer_identity_size;
unsigned char *peer_identity; unsigned char *peer_identity;
} bind; } bind;
...@@ -88,13 +87,13 @@ namespace zmq ...@@ -88,13 +87,13 @@ namespace zmq
// Sent by pipe writer to inform dormant pipe reader that there // Sent by pipe writer to inform dormant pipe reader that there
// are messages in the pipe. // are messages in the pipe.
struct { struct {
} activate_reader; } activate_read;
// Sent by pipe reader to inform pipe writer about how many // Sent by pipe reader to inform pipe writer about how many
// messages it has read so far. // messages it has read so far.
struct { struct {
uint64_t msgs_read; uint64_t msgs_read;
} activate_writer; } activate_write;
// Sent by pipe reader to pipe writer to ask it to terminate // Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe. // its end of the pipe.
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "pgm_sender.hpp" #include "pgm_sender.hpp"
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
#include "err.hpp"
zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_, zmq::connect_session_t::connect_session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_, class socket_base_t *socket_, const options_t &options_,
......
...@@ -39,10 +39,8 @@ zmq::dist_t::~dist_t () ...@@ -39,10 +39,8 @@ zmq::dist_t::~dist_t ()
zmq_assert (pipes.empty ()); zmq_assert (pipes.empty ());
} }
void zmq::dist_t::attach (writer_t *pipe_) void zmq::dist_t::attach (pipe_t *pipe_)
{ {
pipe_->set_event_sink (this);
// If we are in the middle of sending a message, we'll add new pipe // If we are in the middle of sending a message, we'll add new pipe
// into the list of eligible pipes. Otherwise we add it to the list // into the list of eligible pipes. Otherwise we add it to the list
// of active pipes. // of active pipes.
...@@ -74,7 +72,7 @@ void zmq::dist_t::terminate () ...@@ -74,7 +72,7 @@ void zmq::dist_t::terminate ()
pipes [i]->terminate (); pipes [i]->terminate ();
} }
void zmq::dist_t::terminated (writer_t *pipe_) void zmq::dist_t::terminated (pipe_t *pipe_)
{ {
// Remove the pipe from the list; adjust number of active and/or // Remove the pipe from the list; adjust number of active and/or
// eligible pipes accordingly. // eligible pipes accordingly.
...@@ -88,7 +86,7 @@ void zmq::dist_t::terminated (writer_t *pipe_) ...@@ -88,7 +86,7 @@ void zmq::dist_t::terminated (writer_t *pipe_)
sink->unregister_term_ack (); sink->unregister_term_ack ();
} }
void zmq::dist_t::activated (writer_t *pipe_) void zmq::dist_t::activated (pipe_t *pipe_)
{ {
// Move the pipe from passive to eligible state. // Move the pipe from passive to eligible state.
pipes.swap (pipes.index (pipe_), eligible); pipes.swap (pipes.index (pipe_), eligible);
...@@ -153,7 +151,7 @@ bool zmq::dist_t::has_out () ...@@ -153,7 +151,7 @@ bool zmq::dist_t::has_out ()
return true; return true;
} }
bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
{ {
if (!pipe_->write (msg_)) { if (!pipe_->write (msg_)) {
pipes.swap (pipes.index (pipe_), active - 1); pipes.swap (pipes.index (pipe_), active - 1);
......
...@@ -31,33 +31,32 @@ namespace zmq ...@@ -31,33 +31,32 @@ namespace zmq
// Class manages a set of outbound pipes. It sends each messages to // Class manages a set of outbound pipes. It sends each messages to
// each of them. // each of them.
class dist_t : public i_writer_events class dist_t
{ {
public: public:
dist_t (class own_t *sink_); dist_t (class own_t *sink_);
~dist_t (); ~dist_t ();
void attach (writer_t *pipe_); void attach (class pipe_t *pipe_);
void terminate (); void terminate ();
int send (class msg_t *msg_, int flags_); int send (class msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
// i_writer_events interface implementation. void activated (class pipe_t *pipe_);
void activated (writer_t *pipe_); void terminated (class pipe_t *pipe_);
void terminated (writer_t *pipe_);
private: private:
// Write the message to the pipe. Make the pipe inactive if writing // Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned. // fails. In such a case false is returned.
bool write (class writer_t *pipe_, class msg_t *msg_); bool write (class pipe_t *pipe_, class msg_t *msg_);
// Put the message to all active pipes. // Put the message to all active pipes.
void distribute (class msg_t *msg_, int flags_); void distribute (class msg_t *msg_, int flags_);
// List of outbound pipes. // List of outbound pipes.
typedef array_t <class writer_t> pipes_t; typedef array_t <class pipe_t, 2> pipes_t;
pipes_t pipes; pipes_t pipes;
// Number of active pipes. All the active pipes are located at the // Number of active pipes. All the active pipes are located at the
......
...@@ -38,10 +38,8 @@ zmq::fq_t::~fq_t () ...@@ -38,10 +38,8 @@ zmq::fq_t::~fq_t ()
zmq_assert (pipes.empty ()); zmq_assert (pipes.empty ());
} }
void zmq::fq_t::attach (reader_t *pipe_) void zmq::fq_t::attach (pipe_t *pipe_)
{ {
pipe_->set_event_sink (this);
pipes.push_back (pipe_); pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
...@@ -53,7 +51,7 @@ void zmq::fq_t::attach (reader_t *pipe_) ...@@ -53,7 +51,7 @@ void zmq::fq_t::attach (reader_t *pipe_)
} }
} }
void zmq::fq_t::terminated (reader_t *pipe_) void zmq::fq_t::terminated (pipe_t *pipe_)
{ {
// Make sure that we are not closing current pipe while // Make sure that we are not closing current pipe while
// message is half-read. // message is half-read.
...@@ -72,10 +70,6 @@ void zmq::fq_t::terminated (reader_t *pipe_) ...@@ -72,10 +70,6 @@ void zmq::fq_t::terminated (reader_t *pipe_)
sink->unregister_term_ack (); sink->unregister_term_ack ();
} }
void zmq::fq_t::delimited (reader_t *pipe_)
{
}
void zmq::fq_t::terminate () void zmq::fq_t::terminate ()
{ {
zmq_assert (!terminating); zmq_assert (!terminating);
...@@ -86,7 +80,7 @@ void zmq::fq_t::terminate () ...@@ -86,7 +80,7 @@ void zmq::fq_t::terminate ()
pipes [i]->terminate (); pipes [i]->terminate ();
} }
void zmq::fq_t::activated (reader_t *pipe_) void zmq::fq_t::activated (pipe_t *pipe_)
{ {
// Move the pipe to the list of active pipes. // Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active); pipes.swap (pipes.index (pipe_), active);
......
...@@ -31,28 +31,26 @@ namespace zmq ...@@ -31,28 +31,26 @@ namespace zmq
// Class manages a set of inbound pipes. On receive it performs fair // Class manages a set of inbound pipes. On receive it performs fair
// queueing (RFC970) so that senders gone berserk won't cause denial of // queueing (RFC970) so that senders gone berserk won't cause denial of
// service for decent senders. // service for decent senders.
class fq_t : public i_reader_events class fq_t
{ {
public: public:
fq_t (class own_t *sink_); fq_t (class own_t *sink_);
~fq_t (); ~fq_t ();
void attach (reader_t *pipe_); void attach (pipe_t *pipe_);
void terminate (); void terminate ();
int recv (msg_t *msg_, int flags_); int recv (msg_t *msg_, int flags_);
bool has_in (); bool has_in ();
// i_reader_events implementation. void activated (pipe_t *pipe_);
void activated (reader_t *pipe_); void terminated (pipe_t *pipe_);
void terminated (reader_t *pipe_);
void delimited (reader_t *pipe_);
private: private:
// Inbound pipes. // Inbound pipes.
typedef array_t <reader_t> pipes_t; typedef array_t <pipe_t, 1> pipes_t;
pipes_t pipes; pipes_t pipes;
// Number of active pipes. All the active pipes are located at the // Number of active pipes. All the active pipes are located at the
......
...@@ -39,10 +39,8 @@ zmq::lb_t::~lb_t () ...@@ -39,10 +39,8 @@ zmq::lb_t::~lb_t ()
zmq_assert (pipes.empty ()); zmq_assert (pipes.empty ());
} }
void zmq::lb_t::attach (writer_t *pipe_) void zmq::lb_t::attach (pipe_t *pipe_)
{ {
pipe_->set_event_sink (this);
pipes.push_back (pipe_); pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
...@@ -63,7 +61,7 @@ void zmq::lb_t::terminate () ...@@ -63,7 +61,7 @@ void zmq::lb_t::terminate ()
pipes [i]->terminate (); pipes [i]->terminate ();
} }
void zmq::lb_t::terminated (writer_t *pipe_) void zmq::lb_t::terminated (pipe_t *pipe_)
{ {
pipes_t::size_type index = pipes.index (pipe_); pipes_t::size_type index = pipes.index (pipe_);
...@@ -85,7 +83,7 @@ void zmq::lb_t::terminated (writer_t *pipe_) ...@@ -85,7 +83,7 @@ void zmq::lb_t::terminated (writer_t *pipe_)
sink->unregister_term_ack (); sink->unregister_term_ack ();
} }
void zmq::lb_t::activated (writer_t *pipe_) void zmq::lb_t::activated (pipe_t *pipe_)
{ {
// Move the pipe to the list of active pipes. // Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active); pipes.swap (pipes.index (pipe_), active);
......
...@@ -27,28 +27,28 @@ ...@@ -27,28 +27,28 @@
namespace zmq namespace zmq
{ {
// Class manages a set of outbound pipes. On send it load balances // This class manages a set of outbound pipes. On send it load balances
// messages fairly among the pipes. // messages fairly among the pipes.
class lb_t : public i_writer_events
class lb_t
{ {
public: public:
lb_t (class own_t *sink_); lb_t (class own_t *sink_);
~lb_t (); ~lb_t ();
void attach (writer_t *pipe_); void attach (pipe_t *pipe_);
void terminate (); void terminate ();
int send (msg_t *msg_, int flags_); int send (msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
// i_writer_events interface implementation. void activated (pipe_t *pipe_);
void activated (writer_t *pipe_); void terminated (pipe_t *pipe_);
void terminated (writer_t *pipe_);
private: private:
// List of outbound pipes. // List of outbound pipes.
typedef array_t <class writer_t> pipes_t; typedef array_t <class pipe_t, 2> pipes_t;
pipes_t pipes; pipes_t pipes;
// Number of active pipes. All the active pipes are located at the // Number of active pipes. All the active pipes are located at the
......
...@@ -59,12 +59,12 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -59,12 +59,12 @@ void zmq::object_t::process_command (command_t &cmd_)
{ {
switch (cmd_.type) { switch (cmd_.type) {
case command_t::activate_reader: case command_t::activate_read:
process_activate_reader (); process_activate_read ();
break; break;
case command_t::activate_writer: case command_t::activate_write:
process_activate_writer (cmd_.args.activate_writer.msgs_read); process_activate_write (cmd_.args.activate_write.msgs_read);
break; break;
case command_t::stop: case command_t::stop:
...@@ -90,8 +90,8 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -90,8 +90,8 @@ void zmq::object_t::process_command (command_t &cmd_)
break; break;
case command_t::bind: case command_t::bind:
process_bind (cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe, process_bind (cmd_.args.bind.pipe, cmd_.args.bind.peer_identity ?
cmd_.args.bind.peer_identity ? blob_t (cmd_.args.bind.peer_identity, blob_t (cmd_.args.bind.peer_identity,
cmd_.args.bind.peer_identity_size) : blob_t ()); cmd_.args.bind.peer_identity_size) : blob_t ());
process_seqnum (); process_seqnum ();
break; break;
...@@ -236,8 +236,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_, ...@@ -236,8 +236,8 @@ void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_,
writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_) const blob_t &peer_identity_, bool inc_seqnum_)
{ {
if (inc_seqnum_) if (inc_seqnum_)
destination_->inc_seqnum (); destination_->inc_seqnum ();
...@@ -248,8 +248,7 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, ...@@ -248,8 +248,7 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
#endif #endif
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::bind; cmd.type = command_t::bind;
cmd.args.bind.in_pipe = in_pipe_; cmd.args.bind.pipe = pipe_;
cmd.args.bind.out_pipe = out_pipe_;
if (peer_identity_.empty ()) { if (peer_identity_.empty ()) {
cmd.args.bind.peer_identity_size = 0; cmd.args.bind.peer_identity_size = 0;
cmd.args.bind.peer_identity = NULL; cmd.args.bind.peer_identity = NULL;
...@@ -267,18 +266,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, ...@@ -267,18 +266,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_,
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_activate_reader (reader_t *destination_) void zmq::object_t::send_activate_read (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY #if defined ZMQ_MAKE_VALGRIND_HAPPY
memset (&cmd, 0, sizeof (cmd)); memset (&cmd, 0, sizeof (cmd));
#endif #endif
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::activate_reader; cmd.type = command_t::activate_read;
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_activate_writer (writer_t *destination_, void zmq::object_t::send_activate_write (pipe_t *destination_,
uint64_t msgs_read_) uint64_t msgs_read_)
{ {
command_t cmd; command_t cmd;
...@@ -286,12 +285,12 @@ void zmq::object_t::send_activate_writer (writer_t *destination_, ...@@ -286,12 +285,12 @@ void zmq::object_t::send_activate_writer (writer_t *destination_,
memset (&cmd, 0, sizeof (cmd)); memset (&cmd, 0, sizeof (cmd));
#endif #endif
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::activate_writer; cmd.type = command_t::activate_write;
cmd.args.activate_writer.msgs_read = msgs_read_; cmd.args.activate_write.msgs_read = msgs_read_;
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_pipe_term (writer_t *destination_) void zmq::object_t::send_pipe_term (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY #if defined ZMQ_MAKE_VALGRIND_HAPPY
...@@ -302,7 +301,7 @@ void zmq::object_t::send_pipe_term (writer_t *destination_) ...@@ -302,7 +301,7 @@ void zmq::object_t::send_pipe_term (writer_t *destination_)
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_pipe_term_ack (reader_t *destination_) void zmq::object_t::send_pipe_term_ack (pipe_t *destination_)
{ {
command_t cmd; command_t cmd;
#if defined ZMQ_MAKE_VALGRIND_HAPPY #if defined ZMQ_MAKE_VALGRIND_HAPPY
...@@ -404,18 +403,17 @@ void zmq::object_t::process_attach (i_engine *engine_, ...@@ -404,18 +403,17 @@ void zmq::object_t::process_attach (i_engine *engine_,
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, void zmq::object_t::process_bind (pipe_t *pipe_, const blob_t &peer_identity_)
const blob_t &peer_identity_)
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_activate_reader () void zmq::object_t::process_activate_read ()
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_activate_writer (uint64_t msgs_read_) void zmq::object_t::process_activate_write (uint64_t msgs_read_)
{ {
zmq_assert (false); zmq_assert (false);
} }
......
...@@ -66,14 +66,13 @@ namespace zmq ...@@ -66,14 +66,13 @@ namespace zmq
void send_attach (class session_t *destination_, void send_attach (class session_t *destination_,
struct i_engine *engine_, const blob_t &peer_identity_, struct i_engine *engine_, const blob_t &peer_identity_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_bind (class own_t *destination_, void send_bind (class own_t *destination_, class pipe_t *pipe_,
class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_, bool inc_seqnum_ = true); const blob_t &peer_identity_, bool inc_seqnum_ = true);
void send_activate_reader (class reader_t *destination_); void send_activate_read (class pipe_t *destination_);
void send_activate_writer (class writer_t *destination_, void send_activate_write (class pipe_t *destination_,
uint64_t msgs_read_); uint64_t msgs_read_);
void send_pipe_term (class writer_t *destination_); void send_pipe_term (class pipe_t *destination_);
void send_pipe_term_ack (class reader_t *destination_); void send_pipe_term_ack (class pipe_t *destination_);
void send_term_req (class own_t *destination_, void send_term_req (class own_t *destination_,
class own_t *object_); class own_t *object_);
void send_term (class own_t *destination_, int linger_); void send_term (class own_t *destination_, int linger_);
...@@ -89,10 +88,10 @@ namespace zmq ...@@ -89,10 +88,10 @@ namespace zmq
virtual void process_own (class own_t *object_); virtual void process_own (class own_t *object_);
virtual void process_attach (struct i_engine *engine_, virtual void process_attach (struct i_engine *engine_,
const blob_t &peer_identity_); const blob_t &peer_identity_);
virtual void process_bind (class reader_t *in_pipe_, virtual void process_bind (class pipe_t *pipe_,
class writer_t *out_pipe_, const blob_t &peer_identity_); const blob_t &peer_identity_);
virtual void process_activate_reader (); virtual void process_activate_read ();
virtual void process_activate_writer (uint64_t msgs_read_); virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
virtual void process_term_req (class own_t *object_); virtual void process_term_req (class own_t *object_);
......
...@@ -38,8 +38,6 @@ zmq::options_t::options_t () : ...@@ -38,8 +38,6 @@ zmq::options_t::options_t () :
reconnect_ivl_max (0), reconnect_ivl_max (0),
backlog (100), backlog (100),
maxmsgsize (-1), maxmsgsize (-1),
requires_in (false),
requires_out (false),
immediate_connect (true) immediate_connect (true)
{ {
} }
......
...@@ -75,11 +75,6 @@ namespace zmq ...@@ -75,11 +75,6 @@ namespace zmq
// Maximal size of message to handle. // Maximal size of message to handle.
int64_t maxmsgsize; int64_t maxmsgsize;
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
bool requires_out;
// If true, when connecting, pipes are created immediately without // If true, when connecting, pipes are created immediately without
// waiting for the connection to be established. That way the socket // waiting for the connection to be established. That way the socket
// is not aware of the peer's identity, however, it is able to send // is not aware of the peer's identity, however, it is able to send
......
...@@ -173,6 +173,7 @@ void zmq::own_t::process_term (int linger_) ...@@ -173,6 +173,7 @@ void zmq::own_t::process_term (int linger_)
void zmq::own_t::register_term_acks (int count_) void zmq::own_t::register_term_acks (int count_)
{ {
term_acks += count_; term_acks += count_;
printf ("reg %d acks (%p, %d)\n", count_, (void*) this, term_acks);
} }
void zmq::own_t::unregister_term_ack () void zmq::own_t::unregister_term_ack ()
...@@ -180,6 +181,8 @@ void zmq::own_t::unregister_term_ack () ...@@ -180,6 +181,8 @@ void zmq::own_t::unregister_term_ack ()
zmq_assert (term_acks > 0); zmq_assert (term_acks > 0);
term_acks--; term_acks--;
printf ("unreg 1 acks (%p, %d)\n", (void*) this, term_acks);
// This may be a last ack we are waiting for before termination... // This may be a last ack we are waiting for before termination...
check_term_acks (); check_term_acks ();
} }
......
...@@ -25,111 +25,72 @@ ...@@ -25,111 +25,72 @@
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) : zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
socket_base_t (parent_, tid_), socket_base_t (parent_, tid_),
inpipe (NULL), pipe (NULL),
outpipe (NULL),
inpipe_alive (false),
outpipe_alive (false),
terminating (false) terminating (false)
{ {
options.type = ZMQ_PAIR; options.type = ZMQ_PAIR;
options.requires_in = true;
options.requires_out = true;
} }
zmq::pair_t::~pair_t () zmq::pair_t::~pair_t ()
{ {
zmq_assert (!inpipe); zmq_assert (!pipe);
zmq_assert (!outpipe);
} }
void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, void zmq::pair_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
const blob_t &peer_identity_)
{ {
zmq_assert (!inpipe && !outpipe); zmq_assert (!pipe);
inpipe = inpipe_; pipe = pipe_;
inpipe_alive = true; pipe->set_event_sink (this);
inpipe->set_event_sink (this);
outpipe = outpipe_;
outpipe_alive = true;
outpipe->set_event_sink (this);
if (terminating) { if (terminating) {
register_term_acks (2); register_term_acks (1);
inpipe_->terminate (); pipe_->terminate ();
outpipe_->terminate ();
} }
} }
void zmq::pair_t::terminated (reader_t *pipe_) void zmq::pair_t::terminated (pipe_t *pipe_)
{
zmq_assert (pipe_ == inpipe);
inpipe = NULL;
inpipe_alive = false;
if (terminating)
unregister_term_ack ();
}
void zmq::pair_t::terminated (writer_t *pipe_)
{ {
zmq_assert (pipe_ == outpipe); zmq_assert (pipe_ == pipe);
outpipe = NULL; pipe = NULL;
outpipe_alive = false;
if (terminating) if (terminating)
unregister_term_ack (); unregister_term_ack ();
} }
void zmq::pair_t::delimited (reader_t *pipe_)
{
}
void zmq::pair_t::process_term (int linger_) void zmq::pair_t::process_term (int linger_)
{ {
terminating = true; terminating = true;
if (inpipe) { if (pipe) {
register_term_acks (1); register_term_acks (1);
inpipe->terminate (); pipe->terminate ();
}
if (outpipe) {
register_term_acks (1);
outpipe->terminate ();
} }
socket_base_t::process_term (linger_); socket_base_t::process_term (linger_);
} }
void zmq::pair_t::activated (class reader_t *pipe_) void zmq::pair_t::read_activated (pipe_t *pipe_)
{ {
zmq_assert (!inpipe_alive); // There's just one pipe. No lists of active and inactive pipes.
inpipe_alive = true; // There's nothing to do here.
} }
void zmq::pair_t::activated (class writer_t *pipe_) void zmq::pair_t::write_activated (pipe_t *pipe_)
{ {
zmq_assert (!outpipe_alive); // There's just one pipe. No lists of active and inactive pipes.
outpipe_alive = true; // There's nothing to do here.
} }
int zmq::pair_t::xsend (msg_t *msg_, int flags_) int zmq::pair_t::xsend (msg_t *msg_, int flags_)
{ {
if (outpipe == NULL || !outpipe_alive) { if (!pipe || !pipe->write (msg_)) {
errno = EAGAIN;
return -1;
}
if (!outpipe->write (msg_)) {
outpipe_alive = false;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
if (!(flags_ & ZMQ_SNDMORE)) if (!(flags_ & ZMQ_SNDMORE))
outpipe->flush (); pipe->flush ();
// Detach the original message from the data buffer. // Detach the original message from the data buffer.
int rc = msg_->init (); int rc = msg_->init ();
...@@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) ...@@ -144,14 +105,12 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) { if (!pipe || !pipe->read (msg_)) {
// No message is available.
inpipe_alive = false;
// Initialise the output parameter to be a 0-byte message. // Initialise the output parameter to be a 0-byte message.
rc = msg_->init (); rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
...@@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_) ...@@ -160,24 +119,23 @@ int zmq::pair_t::xrecv (msg_t *msg_, int flags_)
bool zmq::pair_t::xhas_in () bool zmq::pair_t::xhas_in ()
{ {
if (!inpipe || !inpipe_alive) if (!pipe)
return false; return false;
inpipe_alive = inpipe->check_read (); return pipe->check_read ();
return inpipe_alive;
} }
bool zmq::pair_t::xhas_out () bool zmq::pair_t::xhas_out ()
{ {
if (!outpipe || !outpipe_alive) if (!pipe)
return false; return false;
msg_t msg; msg_t msg;
int rc = msg.init (); int rc = msg.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
outpipe_alive = outpipe->check_write (&msg); bool result = pipe->check_write (&msg);
rc = msg.close (); rc = msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
return outpipe_alive; return result;
} }
...@@ -29,8 +29,7 @@ namespace zmq ...@@ -29,8 +29,7 @@ namespace zmq
class pair_t : class pair_t :
public socket_base_t, public socket_base_t,
public i_reader_events, public i_pipe_events
public i_writer_events
{ {
public: public:
...@@ -38,32 +37,23 @@ namespace zmq ...@@ -38,32 +37,23 @@ namespace zmq
~pair_t (); ~pair_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
// i_reader_events interface implementation. // i_pipe_events interface implementation.
void activated (class reader_t *pipe_); void read_activated (class pipe_t *pipe_);
void terminated (class reader_t *pipe_); void write_activated (class pipe_t *pipe_);
void delimited (class reader_t *pipe_); void terminated (class pipe_t *pipe_);
// i_writer_events interface implementation.
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
private: private:
// Hook into termination process. // Hook into termination process.
void process_term (int linger_); void process_term (int linger_);
class reader_t *inpipe; class pipe_t *pipe;
class writer_t *outpipe;
bool inpipe_alive;
bool outpipe_alive;
bool terminating; bool terminating;
......
This diff is collapsed.
...@@ -22,47 +22,43 @@ ...@@ -22,47 +22,43 @@
#define __ZMQ_PIPE_HPP_INCLUDED__ #define __ZMQ_PIPE_HPP_INCLUDED__
#include "msg.hpp" #include "msg.hpp"
#include "array.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
#include "config.hpp" #include "config.hpp"
#include "object.hpp" #include "object.hpp"
#include "stdint.hpp" #include "array.hpp"
namespace zmq namespace zmq
{ {
// Creates a pipe. Returns pointer to reader and writer objects. // Create a pipepair for bi-directional transfer of messages.
void create_pipe (object_t *reader_parent_, object_t *writer_parent_, // First HWM is for messages passed from first pipe to the second pipe.
int hwm_, class reader_t **reader_, class writer_t **writer_); // Second HWM is for messages passed from second pipe to the first pipe.
// Delay specifies whether the pipe receives all the pending messages
// The shutdown mechanism for pipe works as follows: Either endpoint // before terminating or whether it terminates straight away.
// (or even both of them) can ask pipe to terminate by calling 'terminate' int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
// method. Pipe then terminates in asynchronous manner. When the part of int hwms_ [2], bool delays_ [2]);
// the shutdown tied to the endpoint is done it triggers 'terminated'
// event. When endpoint processes the event and returns, associated
// reader/writer object is deallocated.
typedef ypipe_t <msg_t, message_pipe_granularity> pipe_t;
struct i_reader_events struct i_pipe_events
{ {
virtual ~i_reader_events () {} virtual ~i_pipe_events () {}
virtual void terminated (class reader_t *pipe_) = 0; virtual void read_activated (class pipe_t *pipe_) = 0;
virtual void activated (class reader_t *pipe_) = 0; virtual void write_activated (class pipe_t *pipe_) = 0;
virtual void delimited (class reader_t *pipe_) = 0; virtual void terminated (class pipe_t *pipe_) = 0;
}; };
class reader_t : public object_t, public array_item_t class pipe_t :
public object_t,
public array_item_t
{ {
friend void create_pipe (object_t*, object_t*, int, // This allows pipepair to create pipe objects.
reader_t**, writer_t**); friend int pipepair (class object_t *parents_ [2],
friend class writer_t; class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
public: public:
// Specifies the object to get events from the reader. // Specifies the object to send events to.
void set_event_sink (i_reader_events *endpoint_); void set_event_sink (i_pipe_events *sink_);
// Returns true if there is at least one message to read in the pipe. // Returns true if there is at least one message to read in the pipe.
bool check_read (); bool check_read ();
...@@ -70,127 +66,100 @@ namespace zmq ...@@ -70,127 +66,100 @@ namespace zmq
// Reads a message to the underlying pipe. // Reads a message to the underlying pipe.
bool read (msg_t *msg_); bool read (msg_t *msg_);
// Ask pipe to terminate. // Checks whether messages can be written to the pipe. If writing
void terminate (); // the message would cause high watermark the function returns false.
private:
reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_);
~reader_t ();
// To be called only by writer itself!
void set_writer (class writer_t *writer_);
// Command handlers.
void process_activate_reader ();
void process_pipe_term_ack ();
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
// True, if pipe can be read from.
bool active;
// The underlying pipe.
pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
class writer_t *writer;
// Low watermark for in-memory storage (in bytes).
int lwm;
// Number of messages read so far.
uint64_t msgs_read;
// Sink for the events (either the socket of the session).
i_reader_events *sink;
// True is 'terminate' method was called or delimiter
// was read from the pipe.
bool terminating;
reader_t (const reader_t&);
const reader_t &operator = (const reader_t&);
};
struct i_writer_events
{
virtual ~i_writer_events () {}
virtual void terminated (class writer_t *pipe_) = 0;
virtual void activated (class writer_t *pipe_) = 0;
};
class writer_t : public object_t, public array_item_t
{
friend void create_pipe (object_t*, object_t*, int,
reader_t**, writer_t**);
public:
// Specifies the object to get events from the writer.
void set_event_sink (i_writer_events *endpoint_);
// Checks whether messages can be written to the pipe.
// If writing the message would cause high watermark
// the function returns false.
bool check_write (msg_t *msg_); bool check_write (msg_t *msg_);
// Writes a message to the underlying pipe. Returns false if the // Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached. // message cannot be written because high watermark was reached.
bool write (msg_t *msg_); bool write (msg_t *msg_);
// Remove unfinished part of a message from the pipe. // Remove unfinished parts of the outbound message from the pipe.
void rollback (); void rollback ();
// Flush the messages downsteam. // Flush the messages downsteam.
void flush (); void flush ();
// Ask pipe to terminate. // Ask pipe to terminate. The termination will happen asynchronously
// and user will be notified about actual deallocation by 'terminated'
// event.
void terminate (); void terminate ();
private: private:
writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
int hwm_);
~writer_t ();
// Command handlers. // Command handlers.
void process_activate_writer (uint64_t msgs_read_); void process_activate_read ();
void process_activate_write (uint64_t msgs_read_);
void process_pipe_term (); void process_pipe_term ();
void process_pipe_term_ack ();
// Type of the underlying lock-free pipe.
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
// Tests whether underlying pipe is already full. // Constructor is private. Pipe can only be created using
bool pipe_full (); // pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
int inhwm_, int outhwm_, bool delay_);
// True, if this object can be written to. // Pipepair uses this function to let us know about
bool active; // the peer pipe object.
void set_peer (pipe_t *pipe_);
// The underlying pipe. // Destructor is private. Pipe objects destroy themselves.
pipe_t *pipe; ~pipe_t ();
// Pipe reader associated with the other side of the pipe. // Underlying pipes for both directions.
reader_t *reader; upipe_t *inpipe;
upipe_t *outpipe;
// High watermark for in-memory storage (in bytes). // Can the pipe be read from / written to?
bool in_active;
bool out_active;
// High watermark for the outbound pipe.
int hwm; int hwm;
// Last confirmed number of messages read from the pipe. // Low watermark for the inbound pipe.
// The actual number can be higher. int lwm;
uint64_t msgs_read;
// Number of messages we have written so far. // Number of messages read and written so far.
uint64_t msgs_read;
uint64_t msgs_written; uint64_t msgs_written;
// Sink for the events (either the socket or the session). // Last received peer's msgs_read. The actual number in the peer
i_writer_events *sink; // can be higher at the moment.
uint64_t peers_msgs_read;
// True is 'terminate' method was called of 'pipe_term' command // The pipe object on the other side of the pipepair.
// arrived from the reader. pipe_t *peer;
// Sink to send events to.
i_pipe_events *sink;
// True is 'terminate' method was called or termination request
// was received from the peer.
bool terminating; bool terminating;
writer_t (const writer_t&); // True is we've already got pipe_term command from the peer.
const writer_t &operator = (const writer_t&); bool term_recvd;
// True if delimiter was already received from the peer.
bool delimited;
// If true, we receive all the pending inbound messages before
// terminating. If false, we terminate immediately when the peer
// asks us to.
bool delay;
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
// Computes appropriate low watermark from the given high watermark.
static int compute_lwm (int hwm_);
// Disable copying.
pipe_t (const pipe_t&);
const pipe_t &operator = (const pipe_t&);
}; };
} }
......
...@@ -27,19 +27,33 @@ zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -27,19 +27,33 @@ zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t tid_) :
fq (this) fq (this)
{ {
options.type = ZMQ_PULL; options.type = ZMQ_PULL;
options.requires_in = true;
options.requires_out = false;
} }
zmq::pull_t::~pull_t () zmq::pull_t::~pull_t ()
{ {
} }
void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, void zmq::pull_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (inpipe_ && !outpipe_); zmq_assert (pipe_);
fq.attach (inpipe_); pipe_->set_event_sink (this);
fq.attach (pipe_);
}
void zmq::pull_t::read_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::pull_t::write_activated (pipe_t *pipe_)
{
// There are no outbound messages in pull socket. This should never happen.
zmq_assert (false);
}
void zmq::pull_t::terminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
} }
void zmq::pull_t::process_term (int linger_) void zmq::pull_t::process_term (int linger_)
......
...@@ -22,12 +22,15 @@ ...@@ -22,12 +22,15 @@
#define __ZMQ_PULL_HPP_INCLUDED__ #define __ZMQ_PULL_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
class pull_t : public socket_base_t class pull_t :
public socket_base_t,
public i_pipe_events
{ {
public: public:
...@@ -37,13 +40,17 @@ namespace zmq ...@@ -37,13 +40,17 @@ namespace zmq
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process. // Hook into the termination process.
void process_term (int linger_); void process_term (int linger_);
......
...@@ -28,19 +28,33 @@ zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -28,19 +28,33 @@ zmq::push_t::push_t (class ctx_t *parent_, uint32_t tid_) :
lb (this) lb (this)
{ {
options.type = ZMQ_PUSH; options.type = ZMQ_PUSH;
options.requires_in = false;
options.requires_out = true;
} }
zmq::push_t::~push_t () zmq::push_t::~push_t ()
{ {
} }
void zmq::push_t::xattach_pipes (class reader_t *inpipe_, void zmq::push_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (!inpipe_ && outpipe_); zmq_assert (pipe_);
lb.attach (outpipe_); pipe_->set_event_sink (this);
lb.attach (pipe_);
}
void zmq::push_t::read_activated (pipe_t *pipe_)
{
// There are no inbound messages in push socket. This should never happen.
zmq_assert (false);
}
void zmq::push_t::write_activated (pipe_t *pipe_)
{
lb.activated (pipe_);
}
void zmq::push_t::terminated (pipe_t *pipe_)
{
lb.terminated (pipe_);
} }
void zmq::push_t::process_term (int linger_) void zmq::push_t::process_term (int linger_)
......
...@@ -22,12 +22,15 @@ ...@@ -22,12 +22,15 @@
#define __ZMQ_PUSH_HPP_INCLUDED__ #define __ZMQ_PUSH_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
class push_t : public socket_base_t class push_t :
public socket_base_t,
public i_pipe_events
{ {
public: public:
...@@ -37,13 +40,17 @@ namespace zmq ...@@ -37,13 +40,17 @@ namespace zmq
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process. // Hook into the termination process.
void process_term (int linger_); void process_term (int linger_);
......
...@@ -29,13 +29,12 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, ...@@ -29,13 +29,12 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_) : class socket_base_t *socket_, const options_t &options_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
in_pipe (NULL), pipe (NULL),
incomplete_in (false), incomplete_in (false),
out_pipe (NULL),
engine (NULL), engine (NULL),
socket (socket_), socket (socket_),
io_thread (io_thread_), io_thread (io_thread_),
pipes_attached (false), pipe_attached (false),
delimiter_processed (false), delimiter_processed (false),
force_terminate (false), force_terminate (false),
has_linger_timer (false), has_linger_timer (false),
...@@ -45,8 +44,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_, ...@@ -45,8 +44,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
zmq::session_t::~session_t () zmq::session_t::~session_t ()
{ {
zmq_assert (!in_pipe); zmq_assert (!pipe);
zmq_assert (!out_pipe);
if (engine) if (engine)
engine->terminate (); engine->terminate ();
...@@ -66,13 +64,9 @@ void zmq::session_t::proceed_with_term () ...@@ -66,13 +64,9 @@ void zmq::session_t::proceed_with_term ()
has_linger_timer = false; has_linger_timer = false;
} }
if (in_pipe) { if (pipe) {
register_term_acks (1); register_term_acks (1);
in_pipe->terminate (); pipe->terminate ();
}
if (out_pipe) {
register_term_acks (1);
out_pipe->terminate ();
} }
// The session has already waited for the linger period. We don't want // The session has already waited for the linger period. We don't want
...@@ -82,10 +76,10 @@ void zmq::session_t::proceed_with_term () ...@@ -82,10 +76,10 @@ void zmq::session_t::proceed_with_term ()
bool zmq::session_t::read (msg_t *msg_) bool zmq::session_t::read (msg_t *msg_)
{ {
if (!in_pipe) if (!pipe)
return false; return false;
if (!in_pipe->read (msg_)) if (!pipe->read (msg_))
return false; return false;
incomplete_in = msg_->flags () & msg_t::more; incomplete_in = msg_->flags () & msg_t::more;
...@@ -94,7 +88,7 @@ bool zmq::session_t::read (msg_t *msg_) ...@@ -94,7 +88,7 @@ bool zmq::session_t::read (msg_t *msg_)
bool zmq::session_t::write (msg_t *msg_) bool zmq::session_t::write (msg_t *msg_)
{ {
if (out_pipe && out_pipe->write (msg_)) { if (pipe && pipe->write (msg_)) {
int rc = msg_->init (); int rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
return true; return true;
...@@ -105,21 +99,20 @@ bool zmq::session_t::write (msg_t *msg_) ...@@ -105,21 +99,20 @@ bool zmq::session_t::write (msg_t *msg_)
void zmq::session_t::flush () void zmq::session_t::flush ()
{ {
if (out_pipe) if (pipe)
out_pipe->flush (); pipe->flush ();
} }
void zmq::session_t::clean_pipes () void zmq::session_t::clean_pipes ()
{ {
if (pipe) {
// Get rid of half-processed messages in the out pipe. Flush any // Get rid of half-processed messages in the out pipe. Flush any
// unflushed messages upstream. // unflushed messages upstream.
if (out_pipe) { pipe->rollback ();
out_pipe->rollback (); pipe->flush ();
out_pipe->flush ();
}
// Remove any half-read message from the in pipe. // Remove any half-read message from the in pipe.
if (in_pipe) {
while (incomplete_in) { while (incomplete_in) {
msg_t msg; msg_t msg;
int rc = msg.init (); int rc = msg.init ();
...@@ -134,78 +127,54 @@ void zmq::session_t::clean_pipes () ...@@ -134,78 +127,54 @@ void zmq::session_t::clean_pipes ()
} }
} }
void zmq::session_t::attach_pipes (class reader_t *inpipe_, void zmq::session_t::attach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (!pipes_attached); zmq_assert (!pipe_attached);
pipes_attached = true; pipe_attached = true;
if (inpipe_) {
zmq_assert (!in_pipe);
in_pipe = inpipe_;
in_pipe->set_event_sink (this);
}
if (outpipe_) { if (pipe_) {
zmq_assert (!out_pipe); zmq_assert (!pipe);
out_pipe = outpipe_; pipe = pipe_;
out_pipe->set_event_sink (this); pipe->set_event_sink (this);
} }
// If we are already terminating, terminate the pipes straight away. // If we are already terminating, terminate the pipes straight away.
if (state == terminating) { if (state == terminating) {
if (in_pipe) { if (pipe) {
in_pipe->terminate (); pipe->terminate ();
register_term_acks (1);
}
if (out_pipe) {
out_pipe->terminate ();
register_term_acks (1); register_term_acks (1);
} }
} }
} }
void zmq::session_t::delimited (reader_t *pipe_) void zmq::session_t::terminated (pipe_t *pipe_)
{ {
zmq_assert (in_pipe == pipe_); zmq_assert (pipe == pipe_);
zmq_assert (!delimiter_processed);
delimiter_processed = true;
// If we are in process of being closed, but still waiting for all // If we are in process of being closed, but still waiting for all
// pending messeges being sent, we can terminate here. // pending messeges being sent, we can terminate here.
if (state == pending) if (state == pending)
proceed_with_term (); proceed_with_term ();
}
void zmq::session_t::terminated (reader_t *pipe_) pipe = NULL;
{
zmq_assert (in_pipe == pipe_);
in_pipe = NULL;
if (state == terminating) if (state == terminating)
unregister_term_ack (); unregister_term_ack ();
} }
void zmq::session_t::terminated (writer_t *pipe_) void zmq::session_t::read_activated (pipe_t *pipe_)
{ {
zmq_assert (out_pipe == pipe_); zmq_assert (pipe == pipe_);
out_pipe = NULL;
if (state == terminating)
unregister_term_ack ();
}
void zmq::session_t::activated (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
if (likely (engine != NULL)) if (likely (engine != NULL))
engine->activate_out (); engine->activate_out ();
else else
in_pipe->check_read (); pipe->check_read ();
} }
void zmq::session_t::activated (writer_t *pipe_) void zmq::session_t::write_activated (pipe_t *pipe_)
{ {
zmq_assert (out_pipe == pipe_); zmq_assert (pipe == pipe_);
if (engine) if (engine)
engine->activate_in (); engine->activate_in ();
} }
...@@ -240,29 +209,27 @@ void zmq::session_t::process_attach (i_engine *engine_, ...@@ -240,29 +209,27 @@ void zmq::session_t::process_attach (i_engine *engine_,
return; return;
} }
// Check whether the required pipes already exist. If not so, we'll // Check whether the required pipe already exists and create it
// create them and bind them to the socket object. // if it does not.
if (!pipes_attached) { if (!pipe_attached) {
zmq_assert (!in_pipe && !out_pipe); zmq_assert (!pipe);
pipes_attached = true; pipe_attached = true;
reader_t *socket_reader = NULL;
writer_t *socket_writer = NULL; object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL};
// Create the pipes, as required. int hwms [2] = {options.rcvhwm, options.sndhwm};
if (options.requires_in) { bool delays [2] = {true, true};
create_pipe (socket, this, options.rcvhwm, &socket_reader, int rc = pipepair (parents, pipes, hwms, delays);
&out_pipe); errno_assert (rc == 0);
out_pipe->set_event_sink (this);
} // Plug the local end of the pipe.
if (options.requires_out) { pipes [0]->set_event_sink (this);
create_pipe (this, socket, options.sndhwm, &in_pipe,
&socket_writer); // Remember the local end of the pipe.
in_pipe->set_event_sink (this); pipe = pipes [0];
}
// Bind the pipes to the socket object. // Ask socket to plug into the remote end of the pipe.
if (socket_reader || socket_writer) send_bind (socket, pipes [1], peer_identity_);
send_bind (socket, socket_reader, socket_writer, peer_identity_);
} }
// Plug in the engine. // Plug in the engine.
...@@ -282,9 +249,9 @@ void zmq::session_t::detach () ...@@ -282,9 +249,9 @@ void zmq::session_t::detach ()
// Send the event to the derived class. // Send the event to the derived class.
detached (); detached ();
// Just in case, there's only a delimiter in the inbound pipe. // Just in case there's only a delimiter in the inbound pipe.
if (in_pipe) if (pipe)
in_pipe->check_read (); pipe->check_read ();
} }
void zmq::session_t::process_term (int linger_) void zmq::session_t::process_term (int linger_)
...@@ -308,16 +275,16 @@ void zmq::session_t::process_term (int linger_) ...@@ -308,16 +275,16 @@ void zmq::session_t::process_term (int linger_)
// If there's no engine and there's only delimiter in the pipe it wouldn't // If there's no engine and there's only delimiter in the pipe it wouldn't
// be ever read. Thus we check for it explicitly. // be ever read. Thus we check for it explicitly.
if (in_pipe) if (pipe)
in_pipe->check_read (); pipe->check_read ();
// If there's no in pipe there are no pending messages to send. // If there's no in pipe, there are no pending messages to send.
// We can proceed with the shutdown straight away. Also, if there is // We can proceed with the shutdown straight away. Also, if there is
// inbound pipe, but the delimiter was already processed, we can // pipe, but the delimiter was already processed, we can terminate
// terminate immediately. Alternatively, if the derived session type have // immediately. Alternatively, if the derived session type have
// called 'terminate' we'll finish straight away. // called 'terminate' we'll finish straight away.
if (!options.requires_out || delimiter_processed || force_terminate || if (delimiter_processed || force_terminate ||
(!options.immediate_connect && !in_pipe)) (!options.immediate_connect && !pipe))
proceed_with_term (); proceed_with_term ();
} }
......
...@@ -34,8 +34,7 @@ namespace zmq ...@@ -34,8 +34,7 @@ namespace zmq
public own_t, public own_t,
public io_object_t, public io_object_t,
public i_inout, public i_inout,
public i_reader_events, public i_pipe_events
public i_writer_events
{ {
public: public:
...@@ -50,17 +49,12 @@ namespace zmq ...@@ -50,17 +49,12 @@ namespace zmq
void flush (); void flush ();
void detach (); void detach ();
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
// i_reader_events interface implementation.
void activated (class reader_t *pipe_);
void terminated (class reader_t *pipe_);
void delimited (class reader_t *pipe_);
// i_writer_events interface implementation. // i_pipe_events interface implementation.
void activated (class writer_t *pipe_); void read_activated (class pipe_t *pipe_);
void terminated (class writer_t *pipe_); void write_activated (class pipe_t *pipe_);
void terminated (class pipe_t *pipe_);
protected: protected:
...@@ -103,16 +97,13 @@ namespace zmq ...@@ -103,16 +97,13 @@ namespace zmq
// Call this function to move on with the delayed process_term. // Call this function to move on with the delayed process_term.
void proceed_with_term (); void proceed_with_term ();
// Inbound pipe, i.e. one the session is getting messages from. // Pipe connecting the session to its socket.
class reader_t *in_pipe; class pipe_t *pipe;
// This flag is true if the remainder of the message being processed // This flag is true if the remainder of the message being processed
// is still in the in pipe. // is still in the in pipe.
bool incomplete_in; bool incomplete_in;
// Outbound pipe, i.e. one the socket is sending messages to.
class writer_t *out_pipe;
// The protocol I/O engine connected to the session. // The protocol I/O engine connected to the session.
struct i_engine *engine; struct i_engine *engine;
...@@ -123,8 +114,8 @@ namespace zmq ...@@ -123,8 +114,8 @@ namespace zmq
// the engines into the same thread. // the engines into the same thread.
class io_thread_t *io_thread; class io_thread_t *io_thread;
// If true, pipes were already attached to this session. // If true, pipe was already attached to this session.
bool pipes_attached; bool pipe_attached;
// If true, delimiter was already read from the inbound pipe. // If true, delimiter was already read from the inbound pipe.
bool delimiter_processed; bool delimiter_processed;
......
...@@ -211,17 +211,17 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) ...@@ -211,17 +211,17 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return 0; return 0;
} }
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_, void zmq::socket_base_t::attach_pipe (class pipe_t *pipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) const blob_t &peer_identity_)
{ {
// If the peer haven't specified it's identity, let's generate one. // If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) { if (peer_identity_.size ()) {
xattach_pipes (inpipe_, outpipe_, peer_identity_); xattach_pipe (pipe_, peer_identity_);
} }
else { else {
blob_t identity (17, 0); blob_t identity (17, 0);
generate_uuid ((unsigned char*) identity.data () + 1); generate_uuid ((unsigned char*) identity.data () + 1);
xattach_pipes (inpipe_, outpipe_, identity); xattach_pipe (pipe_, identity);
} }
} }
...@@ -378,11 +378,6 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -378,11 +378,6 @@ int zmq::socket_base_t::connect (const char *addr_)
if (!peer.socket) if (!peer.socket)
return -1; return -1;
reader_t *inpipe_reader = NULL;
writer_t *inpipe_writer = NULL;
reader_t *outpipe_reader = NULL;
writer_t *outpipe_writer = NULL;
// The total HWM for an inproc connection should be the sum of // The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM. // the binder's HWM and the connector's HWM.
int sndhwm; int sndhwm;
...@@ -396,24 +391,21 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -396,24 +391,21 @@ int zmq::socket_base_t::connect (const char *addr_)
else else
rcvhwm = options.rcvhwm + peer.options.sndhwm; rcvhwm = options.rcvhwm + peer.options.sndhwm;
// Create inbound pipe, if required. // Create a bi-directional pipe to connect the peers.
if (options.requires_in) object_t *parents [2] = {this, peer.socket};
create_pipe (this, peer.socket, rcvhwm, &inpipe_reader, pipe_t *pipes [2] = {NULL, NULL};
&inpipe_writer); int hwms [2] = {sndhwm, rcvhwm};
bool delays [2] = {true, true};
// Create outbound pipe, if required. int rc = pipepair (parents, pipes, hwms, delays);
if (options.requires_out) errno_assert (rc == 0);
create_pipe (peer.socket, this, sndhwm, &outpipe_reader,
&outpipe_writer);
// Attach the pipes to this socket object. // Attach local end of the pipe to this socket object.
attach_pipes (inpipe_reader, outpipe_writer, peer.options.identity); attach_pipe (pipes [0], peer.options.identity);
// Attach the pipes to the peer socket. Note that peer's seqnum // Attach remote end of the pipe to the peer socket. Note that peer's
// was incremented in find_endpoint function. We don't need it // seqnum was incremented in find_endpoint function. We don't need it
// increased here. // increased here.
send_bind (peer.socket, outpipe_reader, inpipe_writer, send_bind (peer.socket, pipes [1], options.identity, false);
options.identity, false);
return 0; return 0;
} }
...@@ -435,26 +427,19 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -435,26 +427,19 @@ int zmq::socket_base_t::connect (const char *addr_)
// session once the connection is established. // session once the connection is established.
if (options.immediate_connect) { if (options.immediate_connect) {
reader_t *inpipe_reader = NULL; // Create a bi-directional pipe.
writer_t *inpipe_writer = NULL; object_t *parents [2] = {this, session};
reader_t *outpipe_reader = NULL; pipe_t *pipes [2] = {NULL, NULL};
writer_t *outpipe_writer = NULL; int hwms [2] = {options.sndhwm, options.rcvhwm};
bool delays [2] = {true, true};
// Create inbound pipe, if required. int rc = pipepair (parents, pipes, hwms, delays);
if (options.requires_in) errno_assert (rc == 0);
create_pipe (this, session, options.rcvhwm,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out)
create_pipe (session, this, options.sndhwm,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to the socket object. // Attach local end of the pipe to the socket object.
attach_pipes (inpipe_reader, outpipe_writer, blob_t ()); attach_pipe (pipes [0], blob_t ());
// Attach the pipes to the session object. // Attach remote end of the pipe to the session object.
session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ()); session->attach_pipe (pipes [1], blob_t ());
} }
// Activate the session. Make it a child of this socket. // Activate the session. Make it a child of this socket.
...@@ -718,10 +703,10 @@ void zmq::socket_base_t::process_stop () ...@@ -718,10 +703,10 @@ void zmq::socket_base_t::process_stop ()
ctx_terminated = true; ctx_terminated = true;
} }
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, void zmq::socket_base_t::process_bind (pipe_t *pipe_,
const blob_t &peer_identity_) const blob_t &peer_identity_)
{ {
attach_pipes (in_pipe_, out_pipe_, peer_identity_); attach_pipe (pipe_, peer_identity_);
} }
void zmq::socket_base_t::process_unplug () void zmq::socket_base_t::process_unplug ()
......
...@@ -110,8 +110,8 @@ namespace zmq ...@@ -110,8 +110,8 @@ namespace zmq
// Concrete algorithms for the x- methods are to be defined by // Concrete algorithms for the x- methods are to be defined by
// individual socket types. // individual socket types.
virtual void xattach_pipes (class reader_t *inpipe_, virtual void xattach_pipe (class pipe_t *pipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) = 0; const blob_t &peer_identity_) = 0;
// The default implementation assumes there are no specific socket // The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, overload this // options for the particular socket type. If not so, overload this
...@@ -156,9 +156,8 @@ namespace zmq ...@@ -156,9 +156,8 @@ namespace zmq
// bind, is available and compatible with the socket type. // bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_); int check_protocol (const std::string &protocol_);
// If no identity set generate one and call xattach_pipes (). // If no identity is set, generate one and call xattach_pipe ().
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void attach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
// Processes commands sent to this socket (if any). If 'block' is // Processes commands sent to this socket (if any). If 'block' is
// set to true, returns only after at least one command was processed. // set to true, returns only after at least one command was processed.
...@@ -168,8 +167,7 @@ namespace zmq ...@@ -168,8 +167,7 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_stop (); void process_stop ();
void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, void process_bind (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
void process_unplug (); void process_unplug ();
// Socket's mailbox object. // Socket's mailbox object.
......
...@@ -28,19 +28,33 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -28,19 +28,33 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
dist (this) dist (this)
{ {
options.type = ZMQ_XPUB; options.type = ZMQ_XPUB;
options.requires_in = false;
options.requires_out = true;
} }
zmq::xpub_t::~xpub_t () zmq::xpub_t::~xpub_t ()
{ {
} }
void zmq::xpub_t::xattach_pipes (class reader_t *inpipe_, void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (!inpipe_ && outpipe_); zmq_assert (pipe_);
dist.attach (outpipe_); pipe_->set_event_sink (this);
dist.attach (pipe_);
}
void zmq::xpub_t::read_activated (pipe_t *pipe_)
{
// PUB socket never receives messages. This should never happen.
zmq_assert (false);
}
void zmq::xpub_t::write_activated (pipe_t *pipe_)
{
dist.activated (pipe_);
}
void zmq::xpub_t::terminated (pipe_t *pipe_)
{
dist.terminated (pipe_);
} }
void zmq::xpub_t::process_term (int linger_) void zmq::xpub_t::process_term (int linger_)
......
...@@ -29,7 +29,9 @@ ...@@ -29,7 +29,9 @@
namespace zmq namespace zmq
{ {
class xpub_t : public socket_base_t class xpub_t :
public socket_base_t,
public i_pipe_events
{ {
public: public:
...@@ -37,8 +39,7 @@ namespace zmq ...@@ -37,8 +39,7 @@ namespace zmq
~xpub_t (); ~xpub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
...@@ -46,6 +47,11 @@ namespace zmq ...@@ -46,6 +47,11 @@ namespace zmq
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process. // Hook into the termination process.
void process_term (int linger_); void process_term (int linger_);
......
...@@ -32,8 +32,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -32,8 +32,6 @@ zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
terminating (false) terminating (false)
{ {
options.type = ZMQ_XREP; options.type = ZMQ_XREP;
options.requires_in = true;
options.requires_out = true;
// On connect, pipes are created only after initial handshaking. // On connect, pipes are created only after initial handshaking.
// That way we are aware of the peer's identity when binding to the pipes. // That way we are aware of the peer's identity when binding to the pipes.
...@@ -46,36 +44,26 @@ zmq::xrep_t::~xrep_t () ...@@ -46,36 +44,26 @@ zmq::xrep_t::~xrep_t ()
zmq_assert (outpipes.empty ()); zmq_assert (outpipes.empty ());
} }
void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
const blob_t &peer_identity_)
{ {
if (outpipe_) { zmq_assert (pipe_);
pipe_->set_event_sink (this);
outpipe_->set_event_sink (this);
// Add the pipe to the map out outbound pipes.
// TODO: What if new connection has same peer identity as the old one? // TODO: What if new connection has same peer identity as the old one?
outpipe_t outpipe = {outpipe_, true}; outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type ( bool ok = outpipes.insert (outpipes_t::value_type (
peer_identity_, outpipe)).second; peer_identity_, outpipe)).second;
zmq_assert (ok); zmq_assert (ok);
if (terminating) { // Add the pipe to the list of inbound pipes.
register_term_acks (1); inpipe_t inpipe = {pipe_, peer_identity_, true};
outpipe_->terminate ();
}
}
if (inpipe_) {
inpipe_->set_event_sink (this);
inpipe_t inpipe = {inpipe_, peer_identity_, true};
inpipes.push_back (inpipe); inpipes.push_back (inpipe);
// In case we are already terminating, ask this pipe to terminate as well.
if (terminating) { if (terminating) {
register_term_acks (1); register_term_acks (1);
inpipe_->terminate (); pipe_->terminate ();
}
} }
} }
...@@ -85,21 +73,17 @@ void zmq::xrep_t::process_term (int linger_) ...@@ -85,21 +73,17 @@ void zmq::xrep_t::process_term (int linger_)
register_term_acks ((int) (inpipes.size () + outpipes.size ())); register_term_acks ((int) (inpipes.size () + outpipes.size ()));
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); ++it)
++it) it->pipe->terminate ();
it->reader->terminate ();
for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
++it)
it->second.writer->terminate ();
socket_base_t::process_term (linger_); socket_base_t::process_term (linger_);
} }
void zmq::xrep_t::terminated (reader_t *pipe_) void zmq::xrep_t::terminated (pipe_t *pipe_)
{ {
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) { ++it) {
if (it->reader == pipe_) { if (it->pipe == pipe_) {
if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in) if ((inpipes_t::size_type) (it - inpipes.begin ()) < current_in)
current_in--; current_in--;
inpipes.erase (it); inpipes.erase (it);
...@@ -107,17 +91,15 @@ void zmq::xrep_t::terminated (reader_t *pipe_) ...@@ -107,17 +91,15 @@ void zmq::xrep_t::terminated (reader_t *pipe_)
current_in = 0; current_in = 0;
if (terminating) if (terminating)
unregister_term_ack (); unregister_term_ack ();
return; goto clean_outpipes;
} }
} }
zmq_assert (false); zmq_assert (false);
}
void zmq::xrep_t::terminated (writer_t *pipe_) clean_outpipes:
{
for (outpipes_t::iterator it = outpipes.begin (); for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) { it != outpipes.end (); ++it) {
if (it->second.writer == pipe_) { if (it->second.pipe == pipe_) {
outpipes.erase (it); outpipes.erase (it);
if (pipe_ == current_out) if (pipe_ == current_out)
current_out = NULL; current_out = NULL;
...@@ -129,15 +111,11 @@ void zmq::xrep_t::terminated (writer_t *pipe_) ...@@ -129,15 +111,11 @@ void zmq::xrep_t::terminated (writer_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::xrep_t::delimited (reader_t *pipe_) void zmq::xrep_t::read_activated (pipe_t *pipe_)
{
}
void zmq::xrep_t::activated (reader_t *pipe_)
{ {
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
++it) { ++it) {
if (it->reader == pipe_) { if (it->pipe == pipe_) {
zmq_assert (!it->active); zmq_assert (!it->active);
it->active = true; it->active = true;
return; return;
...@@ -146,11 +124,11 @@ void zmq::xrep_t::activated (reader_t *pipe_) ...@@ -146,11 +124,11 @@ void zmq::xrep_t::activated (reader_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
void zmq::xrep_t::activated (writer_t *pipe_) void zmq::xrep_t::write_activated (pipe_t *pipe_)
{ {
for (outpipes_t::iterator it = outpipes.begin (); for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) { it != outpipes.end (); ++it) {
if (it->second.writer == pipe_) { if (it->second.pipe == pipe_) {
zmq_assert (!it->second.active); zmq_assert (!it->second.active);
it->second.active = true; it->second.active = true;
return; return;
...@@ -178,7 +156,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) ...@@ -178,7 +156,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
outpipes_t::iterator it = outpipes.find (identity); outpipes_t::iterator it = outpipes.find (identity);
if (it != outpipes.end ()) { if (it != outpipes.end ()) {
current_out = it->second.writer; current_out = it->second.pipe;
msg_t empty; msg_t empty;
int rc = empty.init (); int rc = empty.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -245,7 +223,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -245,7 +223,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// If we are in the middle of reading a message, just grab next part of it. // If we are in the middle of reading a message, just grab next part of it.
if (more_in) { if (more_in) {
zmq_assert (inpipes [current_in].active); zmq_assert (inpipes [current_in].active);
bool fetched = inpipes [current_in].reader->read (msg_); bool fetched = inpipes [current_in].pipe->read (msg_);
zmq_assert (fetched); zmq_assert (fetched);
more_in = msg_->flags () & msg_t::more; more_in = msg_->flags () & msg_t::more;
if (!more_in) { if (!more_in) {
...@@ -261,7 +239,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) ...@@ -261,7 +239,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
// Try to fetch new message. // Try to fetch new message.
if (inpipes [current_in].active) if (inpipes [current_in].active)
prefetched = inpipes [current_in].reader->read (&prefetched_msg); prefetched = inpipes [current_in].pipe->read (&prefetched_msg);
// If we have a message, create a prefix and return it to the caller. // If we have a message, create a prefix and return it to the caller.
if (prefetched) { if (prefetched) {
...@@ -311,7 +289,7 @@ bool zmq::xrep_t::xhas_in () ...@@ -311,7 +289,7 @@ bool zmq::xrep_t::xhas_in ()
// pipe holding messages, skipping only pipes with no messages available. // pipe holding messages, skipping only pipes with no messages available.
for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) { for (inpipes_t::size_type count = inpipes.size (); count != 0; count--) {
if (inpipes [current_in].active && if (inpipes [current_in].active &&
inpipes [current_in].reader->check_read ()) inpipes [current_in].pipe->check_read ())
return true; return true;
// If me don't have a message, mark the pipe as passive and // If me don't have a message, mark the pipe as passive and
......
...@@ -35,8 +35,7 @@ namespace zmq ...@@ -35,8 +35,7 @@ namespace zmq
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t : class xrep_t :
public socket_base_t, public socket_base_t,
public i_reader_events, public i_pipe_events
public i_writer_events
{ {
public: public:
...@@ -44,8 +43,7 @@ namespace zmq ...@@ -44,8 +43,7 @@ namespace zmq
~xrep_t (); ~xrep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_, void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
...@@ -61,18 +59,14 @@ namespace zmq ...@@ -61,18 +59,14 @@ namespace zmq
// Hook into the termination process. // Hook into the termination process.
void process_term (int linger_); void process_term (int linger_);
// i_reader_events interface implementation. // i_pipe_events interface implementation.
void activated (reader_t *pipe_); void read_activated (pipe_t *pipe_);
void terminated (reader_t *pipe_); void write_activated (pipe_t *pipe_);
void delimited (reader_t *pipe_); void terminated (pipe_t *pipe_);
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
struct inpipe_t struct inpipe_t
{ {
class reader_t *reader; class pipe_t *pipe;
blob_t identity; blob_t identity;
bool active; bool active;
}; };
...@@ -95,7 +89,7 @@ namespace zmq ...@@ -95,7 +89,7 @@ namespace zmq
struct outpipe_t struct outpipe_t
{ {
class writer_t *writer; class pipe_t *pipe;
bool active; bool active;
}; };
...@@ -104,7 +98,7 @@ namespace zmq ...@@ -104,7 +98,7 @@ namespace zmq
outpipes_t outpipes; outpipes_t outpipes;
// The pipe we are currently writing to. // The pipe we are currently writing to.
class writer_t *current_out; class pipe_t *current_out;
// If true, more outgoing message parts are expected. // If true, more outgoing message parts are expected.
bool more_out; bool more_out;
......
...@@ -28,20 +28,18 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -28,20 +28,18 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) :
lb (this) lb (this)
{ {
options.type = ZMQ_XREQ; options.type = ZMQ_XREQ;
options.requires_in = true;
options.requires_out = true;
} }
zmq::xreq_t::~xreq_t () zmq::xreq_t::~xreq_t ()
{ {
} }
void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, void zmq::xreq_t::xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (inpipe_ && outpipe_); zmq_assert (pipe_);
fq.attach (inpipe_); pipe_->set_event_sink (this);
lb.attach (outpipe_); fq.attach (pipe_);
lb.attach (pipe_);
} }
void zmq::xreq_t::process_term (int linger_) void zmq::xreq_t::process_term (int linger_)
...@@ -71,3 +69,19 @@ bool zmq::xreq_t::xhas_out () ...@@ -71,3 +69,19 @@ bool zmq::xreq_t::xhas_out ()
return lb.has_out (); return lb.has_out ();
} }
void zmq::xreq_t::read_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::xreq_t::write_activated (pipe_t *pipe_)
{
lb.activated (pipe_);
}
void zmq::xreq_t::terminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
lb.terminated (pipe_);
}
...@@ -23,13 +23,16 @@ ...@@ -23,13 +23,16 @@
#define __ZMQ_XREQ_HPP_INCLUDED__ #define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
class xreq_t : public socket_base_t class xreq_t :
public socket_base_t,
public i_pipe_events
{ {
public: public:
...@@ -39,8 +42,7 @@ namespace zmq ...@@ -39,8 +42,7 @@ namespace zmq
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int flags_); int xsend (class msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
...@@ -48,6 +50,11 @@ namespace zmq ...@@ -48,6 +50,11 @@ namespace zmq
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process. // Hook into the termination process.
void process_term (int linger_); void process_term (int linger_);
......
...@@ -30,8 +30,6 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) : ...@@ -30,8 +30,6 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_) :
more (false) more (false)
{ {
options.type = ZMQ_XSUB; options.type = ZMQ_XSUB;
options.requires_in = true;
options.requires_out = false;
int rc = message.init (); int rc = message.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
} }
...@@ -42,11 +40,27 @@ zmq::xsub_t::~xsub_t () ...@@ -42,11 +40,27 @@ zmq::xsub_t::~xsub_t ()
errno_assert (rc == 0); errno_assert (rc == 0);
} }
void zmq::xsub_t::xattach_pipes (class reader_t *inpipe_, void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
class writer_t *outpipe_, const blob_t &peer_identity_)
{ {
zmq_assert (inpipe_ && !outpipe_); zmq_assert (pipe_);
fq.attach (inpipe_); pipe_->set_event_sink (this);
fq.attach (pipe_);
}
void zmq::xsub_t::read_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::xsub_t::write_activated (pipe_t *pipe_)
{
// SUB socket never sends messages. This should never happen.
zmq_assert (false);
}
void zmq::xsub_t::terminated (pipe_t *pipe_)
{
fq.terminated (pipe_);
} }
void zmq::xsub_t::process_term (int linger_) void zmq::xsub_t::process_term (int linger_)
......
...@@ -23,13 +23,16 @@ ...@@ -23,13 +23,16 @@
#include "trie.hpp" #include "trie.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pipe.hpp"
#include "msg.hpp" #include "msg.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
class xsub_t : public socket_base_t class xsub_t :
public socket_base_t,
public i_pipe_events
{ {
public: public:
...@@ -39,8 +42,7 @@ namespace zmq ...@@ -39,8 +42,7 @@ namespace zmq
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_, void xattach_pipe (class pipe_t *pipe_, const blob_t &peer_identity_);
const blob_t &peer_identity_);
int xsend (class msg_t *msg_, int options_); int xsend (class msg_t *msg_, int options_);
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (class msg_t *msg_, int flags_);
...@@ -48,6 +50,11 @@ namespace zmq ...@@ -48,6 +50,11 @@ namespace zmq
private: private:
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_);
// Hook into the termination process. // Hook into the termination process.
void process_term (int linger_); void process_term (int linger_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment