Commit b15f6959 authored by Martin Sustrik's avatar Martin Sustrik

different fixes to req/rep

parent cb1b6fe3
...@@ -88,10 +88,6 @@ int zmq::dispatcher_t::term () ...@@ -88,10 +88,6 @@ int zmq::dispatcher_t::term ()
zmq::dispatcher_t::~dispatcher_t () zmq::dispatcher_t::~dispatcher_t ()
{ {
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
delete app_threads [i];
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O // Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up. // thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
...@@ -101,6 +97,10 @@ zmq::dispatcher_t::~dispatcher_t () ...@@ -101,6 +97,10 @@ zmq::dispatcher_t::~dispatcher_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i]; delete io_threads [i];
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
delete app_threads [i];
// Deallocate all the orphaned pipes. // Deallocate all the orphaned pipes.
while (!pipes.empty ()) while (!pipes.empty ())
delete *pipes.begin (); delete *pipes.begin ();
......
...@@ -222,22 +222,22 @@ void zmq::fd_signaler_t::signal (int signal_) ...@@ -222,22 +222,22 @@ void zmq::fd_signaler_t::signal (int signal_)
uint64_t zmq::fd_signaler_t::poll () uint64_t zmq::fd_signaler_t::poll ()
{ {
// If there are signals available, return straight away. unsigned char buffer [64];
uint64_t signals = check (); ssize_t nbytes = recv (r, buffer, 64, 0);
if (signals) zmq_assert (nbytes != -1);
return signals;
// If there are no signals, wait until at least one signal arrives. uint64_t signals = 0;
unsigned char sig; for (int pos = 0; pos != nbytes; pos ++) {
ssize_t nbytes = recv (r, &sig, 1, 0); zmq_assert (buffer [pos] < 64);
errno_assert (nbytes != -1); signals |= (uint64_t (1) << (buffer [pos]));
return uint64_t (1) << sig; }
return signals;
} }
uint64_t zmq::fd_signaler_t::check () uint64_t zmq::fd_signaler_t::check ()
{ {
unsigned char buffer [32]; unsigned char buffer [64];
ssize_t nbytes = recv (r, buffer, 32, MSG_DONTWAIT); ssize_t nbytes = recv (r, buffer, 64, MSG_DONTWAIT);
if (nbytes == -1 && errno == EAGAIN) if (nbytes == -1 && errno == EAGAIN)
return 0; return 0;
zmq_assert (nbytes != -1); zmq_assert (nbytes != -1);
......
...@@ -29,7 +29,9 @@ zmq::options_t::options_t () : ...@@ -29,7 +29,9 @@ zmq::options_t::options_t () :
affinity (0), affinity (0),
rate (100), rate (100),
recovery_ivl (10), recovery_ivl (10),
use_multicast_loop (false) use_multicast_loop (false),
requires_in (false),
requires_out (false)
{ {
} }
......
...@@ -48,6 +48,11 @@ namespace zmq ...@@ -48,6 +48,11 @@ namespace zmq
// Enable multicast loopback. Default disabled (false). // Enable multicast loopback. Default disabled (false).
bool use_multicast_loop; bool use_multicast_loop;
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;
bool requires_out;
}; };
} }
......
...@@ -23,24 +23,16 @@ ...@@ -23,24 +23,16 @@
#include "err.hpp" #include "err.hpp"
zmq::p2p_t::p2p_t (class app_thread_t *parent_) : zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_P2P) socket_base_t (parent_)
{ {
options.requires_in = true;
options.requires_out = true;
} }
zmq::p2p_t::~p2p_t () zmq::p2p_t::~p2p_t ()
{ {
} }
bool zmq::p2p_t::xrequires_in ()
{
return true;
}
bool zmq::p2p_t::xrequires_out ()
{
return true;
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_, void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
......
...@@ -33,8 +33,6 @@ namespace zmq ...@@ -33,8 +33,6 @@ namespace zmq
~p2p_t (); ~p2p_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_); void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
......
...@@ -25,8 +25,10 @@ ...@@ -25,8 +25,10 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) : zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_PUB) socket_base_t (parent_)
{ {
options.requires_in = false;
options.requires_out = true;
} }
zmq::pub_t::~pub_t () zmq::pub_t::~pub_t ()
...@@ -36,16 +38,6 @@ zmq::pub_t::~pub_t () ...@@ -36,16 +38,6 @@ zmq::pub_t::~pub_t ()
out_pipes.clear (); out_pipes.clear ();
} }
bool zmq::pub_t::xrequires_in ()
{
return false;
}
bool zmq::pub_t::xrequires_out ()
{
return true;
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_, void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
......
...@@ -34,8 +34,6 @@ namespace zmq ...@@ -34,8 +34,6 @@ namespace zmq
~pub_t (); ~pub_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_); void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
......
...@@ -24,28 +24,20 @@ ...@@ -24,28 +24,20 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::rep_t::rep_t (class app_thread_t *parent_) : zmq::rep_t::rep_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_REP), socket_base_t (parent_),
active (0), active (0),
current (0), current (0),
waiting_for_reply (false), waiting_for_reply (false),
reply_pipe (NULL) reply_pipe (NULL)
{ {
options.requires_in = true;
options.requires_out = true;
} }
zmq::rep_t::~rep_t () zmq::rep_t::~rep_t ()
{ {
} }
bool zmq::rep_t::xrequires_in ()
{
return true;
}
bool zmq::rep_t::xrequires_out ()
{
return true;
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_, void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
......
...@@ -34,8 +34,6 @@ namespace zmq ...@@ -34,8 +34,6 @@ namespace zmq
~rep_t (); ~rep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_); void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
......
...@@ -24,28 +24,20 @@ ...@@ -24,28 +24,20 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::req_t::req_t (class app_thread_t *parent_) : zmq::req_t::req_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_REQ), socket_base_t (parent_),
current (0), current (0),
waiting_for_reply (false), waiting_for_reply (false),
reply_pipe_active (false), reply_pipe_active (false),
reply_pipe (NULL) reply_pipe (NULL)
{ {
options.requires_in = true;
options.requires_out = true;
} }
zmq::req_t::~req_t () zmq::req_t::~req_t ()
{ {
} }
bool zmq::req_t::xrequires_in ()
{
return true;
}
bool zmq::req_t::xrequires_out ()
{
return true;
}
void zmq::req_t::xattach_pipes (class reader_t *inpipe_, void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
......
...@@ -34,8 +34,6 @@ namespace zmq ...@@ -34,8 +34,6 @@ namespace zmq
~req_t (); ~req_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_); void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
......
...@@ -43,7 +43,7 @@ zmq::session_t::~session_t () ...@@ -43,7 +43,7 @@ zmq::session_t::~session_t ()
bool zmq::session_t::read (::zmq_msg_t *msg_) bool zmq::session_t::read (::zmq_msg_t *msg_)
{ {
if (!active) if (!in_pipe || !active)
return false; return false;
return in_pipe->read (msg_); return in_pipe->read (msg_);
...@@ -51,8 +51,9 @@ bool zmq::session_t::read (::zmq_msg_t *msg_) ...@@ -51,8 +51,9 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
bool zmq::session_t::write (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_)
{ {
if (!out_pipe) // The communication is unidirectional.
return true; // We don't expect any message to arrive.
zmq_assert (out_pipe);
if (out_pipe->write (msg_)) { if (out_pipe->write (msg_)) {
zmq_msg_init (msg_); zmq_msg_init (msg_);
...@@ -136,15 +137,26 @@ void zmq::session_t::process_plug () ...@@ -136,15 +137,26 @@ void zmq::session_t::process_plug ()
// already. Otherwise, it's being created by the listener and the pipes // already. Otherwise, it's being created by the listener and the pipes
// are yet to be created. // are yet to be created.
if (!in_pipe && !out_pipe) { if (!in_pipe && !out_pipe) {
pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
pipe_t *inbound = NULL;
pipe_t *outbound = NULL;
if (options.requires_out) {
inbound = new pipe_t (this, owner, options.hwm, options.lwm);
zmq_assert (inbound); zmq_assert (inbound);
in_pipe = &inbound->reader; in_pipe = &inbound->reader;
in_pipe->set_endpoint (this); in_pipe->set_endpoint (this);
pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm); }
if (options.requires_in) {
outbound = new pipe_t (owner, this, options.hwm, options.lwm);
zmq_assert (outbound); zmq_assert (outbound);
out_pipe = &outbound->writer; out_pipe = &outbound->writer;
out_pipe->set_endpoint (this); out_pipe->set_endpoint (this);
send_bind (owner, this, &outbound->reader, &inbound->writer); }
send_bind (owner, this, outbound ? &outbound->reader : NULL,
inbound ? &inbound->writer : NULL);
} }
owned_t::process_plug (); owned_t::process_plug ();
......
...@@ -38,9 +38,8 @@ ...@@ -38,9 +38,8 @@
#include "pgm_sender.hpp" #include "pgm_sender.hpp"
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_), object_t (parent_),
type (type_),
pending_term_acks (0), pending_term_acks (0),
ticks (0), ticks (0),
app_thread (parent_), app_thread (parent_),
...@@ -137,14 +136,14 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -137,14 +136,14 @@ int zmq::socket_base_t::connect (const char *addr_)
pipe_t *out_pipe = NULL; pipe_t *out_pipe = NULL;
// Create inbound pipe, if required. // Create inbound pipe, if required.
if (xrequires_in ()) { if (options.requires_in) {
in_pipe = new pipe_t (this, session, options.hwm, options.lwm); in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe); zmq_assert (in_pipe);
} }
// Create outbound pipe, if required. // Create outbound pipe, if required.
if (xrequires_out ()) { if (options.requires_out) {
out_pipe = new pipe_t (session, this, options.hwm, options.lwm); out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe); zmq_assert (out_pipe);
} }
...@@ -163,8 +162,9 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -163,8 +162,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "tcp") { if (addr_type == "tcp") {
// Create the connecter object. Supply it with the session name so that // Create the connecter object. Supply it with the session name
// it can bind the new connection to the session once it is established. // so that it can bind the new connection to the session once
// it is established.
zmq_connecter_t *connecter = new zmq_connecter_t ( zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options, choose_io_thread (options.affinity), this, options,
session_name.c_str (), false); session_name.c_str (), false);
...@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -184,7 +184,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// If the socket type requires bi-directional communication // If the socket type requires bi-directional communication
// multicast is not an option (it is uni-directional). // multicast is not an option (it is uni-directional).
if (xrequires_in () && xrequires_out ()) { if (options.requires_in && options.requires_out) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
...@@ -194,11 +194,9 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -194,11 +194,9 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "udp") if (addr_type == "udp")
udp_encapsulation = true; udp_encapsulation = true;
switch (type) { if (options.requires_out) {
// PGM sender. // PGM sender.
case ZMQ_PUB:
{
pgm_sender_t *pgm_sender = pgm_sender_t *pgm_sender =
new pgm_sender_t (choose_io_thread (options.affinity), options, new pgm_sender_t (choose_io_thread (options.affinity), options,
session_name.c_str ()); session_name.c_str ());
...@@ -212,15 +210,10 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -212,15 +210,10 @@ int zmq::socket_base_t::connect (const char *addr_)
// Reserve a sequence number for following 'attach' command. // Reserve a sequence number for following 'attach' command.
session->inc_seqnum (); session->inc_seqnum ();
send_attach (session, pgm_sender); send_attach (session, pgm_sender);
pgm_sender = NULL;
break;
} }
else if (options.requires_in) {
// PGM receiver. // PGM receiver.
case ZMQ_SUB:
{
pgm_receiver_t *pgm_receiver = pgm_receiver_t *pgm_receiver =
new pgm_receiver_t (choose_io_thread (options.affinity), options, new pgm_receiver_t (choose_io_thread (options.affinity), options,
session_name.c_str ()); session_name.c_str ());
...@@ -234,16 +227,9 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -234,16 +227,9 @@ int zmq::socket_base_t::connect (const char *addr_)
// Reserve a sequence number for following 'attach' command. // Reserve a sequence number for following 'attach' command.
session->inc_seqnum (); session->inc_seqnum ();
send_attach (session, pgm_receiver); send_attach (session, pgm_receiver);
pgm_receiver = NULL;
break;
}
default:
errno = EINVAL;
return -1;
} }
else
zmq_assert (false);
return 0; return 0;
} }
......
...@@ -40,7 +40,7 @@ namespace zmq ...@@ -40,7 +40,7 @@ namespace zmq
{ {
public: public:
socket_base_t (class app_thread_t *parent_, int type_); socket_base_t (class app_thread_t *parent_);
// Interface for communication with the API layer. // Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_, int setsockopt (int option_, const void *optval_,
...@@ -73,8 +73,6 @@ namespace zmq ...@@ -73,8 +73,6 @@ namespace zmq
virtual ~socket_base_t (); virtual ~socket_base_t ();
// Pipe management is done by individual socket types. // Pipe management is done by individual socket types.
virtual bool xrequires_in () = 0;
virtual bool xrequires_out () = 0;
virtual void xattach_pipes (class reader_t *inpipe_, virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) = 0; class writer_t *outpipe_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0; virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
...@@ -89,6 +87,9 @@ namespace zmq ...@@ -89,6 +87,9 @@ namespace zmq
virtual int xflush () = 0; virtual int xflush () = 0;
virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0; virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0;
// Socket options.
options_t options;
private: private:
// Handlers for incoming commands. // Handlers for incoming commands.
...@@ -98,9 +99,6 @@ namespace zmq ...@@ -98,9 +99,6 @@ namespace zmq
void process_term_req (class owned_t *object_); void process_term_req (class owned_t *object_);
void process_term_ack (); void process_term_ack ();
// Type of the socket.
int type;
// List of all I/O objects owned by this socket. The socket is // List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits. // responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t; typedef std::set <class owned_t*> io_objects_t;
...@@ -116,9 +114,6 @@ namespace zmq ...@@ -116,9 +114,6 @@ namespace zmq
// Application thread the socket lives in. // Application thread the socket lives in.
class app_thread_t *app_thread; class app_thread_t *app_thread;
// Socket options.
options_t options;
// If true, socket is already shutting down. No new work should be // If true, socket is already shutting down. No new work should be
// started. // started.
bool shutting_down; bool shutting_down;
......
...@@ -24,11 +24,13 @@ ...@@ -24,11 +24,13 @@
#include "pipe.hpp" #include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) : zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_SUB), socket_base_t (parent_),
active (0), active (0),
current (0), current (0),
all_count (0) all_count (0)
{ {
options.requires_in = true;
options.requires_out = false;
} }
zmq::sub_t::~sub_t () zmq::sub_t::~sub_t ()
...@@ -38,16 +40,6 @@ zmq::sub_t::~sub_t () ...@@ -38,16 +40,6 @@ zmq::sub_t::~sub_t ()
in_pipes.clear (); in_pipes.clear ();
} }
bool zmq::sub_t::xrequires_in ()
{
return true;
}
bool zmq::sub_t::xrequires_out ()
{
return false;
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) class writer_t *outpipe_)
{ {
......
...@@ -39,8 +39,6 @@ namespace zmq ...@@ -39,8 +39,6 @@ namespace zmq
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_); void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_); void xdetach_outpipe (class writer_t *pipe_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment