Commit 90d0435b authored by Martin Lucina's avatar Martin Lucina

Merge branch 'master' of github.com:zeromq/libzmq

parents 991f7e2c a7b0b0d3
Contributors Contributors
============ ============
AJ Lewis <aj.lewis@quantum.com>
Alexej Lotz <alexej.lotz@arcor.de> Alexej Lotz <alexej.lotz@arcor.de>
Andrew Thompson <andy@fud.org.nz> Andrew Thompson <andy@fud.org.nz>
Asko Kauppi <askok@dnainternet.net> Asko Kauppi <askok@dnainternet.net>
...@@ -40,7 +41,7 @@ Jon Dyte <jon@totient.co.uk> ...@@ -40,7 +41,7 @@ Jon Dyte <jon@totient.co.uk>
Kamil Shakirov <kamils80@gmail.com> Kamil Shakirov <kamils80@gmail.com>
Marc Rossi <mrossi19@gmail.com> Marc Rossi <mrossi19@gmail.com>
Martin Hurton <hurtonm@gmail.com> Martin Hurton <hurtonm@gmail.com>
Martin Lucina <mato@kotelna.sk> Martin Lucina <martin@lucina.net>
Martin Pales <m.pales@gmail.com> Martin Pales <m.pales@gmail.com>
Martin Sustrik <sustrik@250bpm.com> Martin Sustrik <sustrik@250bpm.com>
Matus Hamorsky <mhamorsky@gmail.com> Matus Hamorsky <mhamorsky@gmail.com>
......
...@@ -27,12 +27,18 @@ ...@@ -27,12 +27,18 @@
namespace zmq namespace zmq
{ {
class object_t;
class own_t;
struct i_engine;
class pipe_t;
class socket_base_t;
// This structure defines the commands that can be sent between threads. // This structure defines the commands that can be sent between threads.
struct command_t struct command_t
{ {
// Object to process the command. // Object to process the command.
class object_t *destination; zmq::object_t *destination;
enum type_t enum type_t
{ {
...@@ -67,7 +73,7 @@ namespace zmq ...@@ -67,7 +73,7 @@ namespace zmq
// Sent to socket to let it know about the newly created object. // Sent to socket to let it know about the newly created object.
struct { struct {
class own_t *object; zmq::own_t *object;
} own; } own;
// Attach the engine to the session. If engine is NULL, it informs // Attach the engine to the session. If engine is NULL, it informs
...@@ -79,7 +85,7 @@ namespace zmq ...@@ -79,7 +85,7 @@ namespace zmq
// Sent from session to socket to establish pipe(s) between them. // Sent from session to socket to establish pipe(s) between them.
// Caller have used inc_seqnum beforehand sending the command. // Caller have used inc_seqnum beforehand sending the command.
struct { struct {
class pipe_t *pipe; zmq::pipe_t *pipe;
} bind; } bind;
// Sent by pipe writer to inform dormant pipe reader that there // Sent by pipe writer to inform dormant pipe reader that there
...@@ -112,7 +118,7 @@ namespace zmq ...@@ -112,7 +118,7 @@ namespace zmq
// Sent by I/O object ot the socket to request the shutdown of // Sent by I/O object ot the socket to request the shutdown of
// the I/O object. // the I/O object.
struct { struct {
class own_t *object; zmq::own_t *object;
} term_req; } term_req;
// Sent by socket to I/O object to start its shutdown. // Sent by socket to I/O object to start its shutdown.
...@@ -128,7 +134,7 @@ namespace zmq ...@@ -128,7 +134,7 @@ namespace zmq
// Transfers the ownership of the closed socket // Transfers the ownership of the closed socket
// to the reaper thread. // to the reaper thread.
struct { struct {
class socket_base_t *socket; zmq::socket_base_t *socket;
} reap; } reap;
// Closed socket notifies the reaper that it's already deallocated. // Closed socket notifies the reaper that it's already deallocated.
......
...@@ -36,6 +36,12 @@ ...@@ -36,6 +36,12 @@
namespace zmq namespace zmq
{ {
class object_t;
class io_thread_t;
class socket_base_t;
class reaper_t;
// Information associated with inproc endpoint. Note that endpoint options // Information associated with inproc endpoint. Note that endpoint options
// are registered as well so that the peer can access them without a need // are registered as well so that the peer can access them without a need
// for synchronisation, handshaking or similar. // for synchronisation, handshaking or similar.
...@@ -66,8 +72,8 @@ namespace zmq ...@@ -66,8 +72,8 @@ namespace zmq
int terminate (); int terminate ();
// Create and destroy a socket. // Create and destroy a socket.
class socket_base_t *create_socket (int type_); zmq::socket_base_t *create_socket (int type_);
void destroy_socket (class socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);
// Send command to the destination thread. // Send command to the destination thread.
void send_command (uint32_t tid_, const command_t &command_); void send_command (uint32_t tid_, const command_t &command_);
...@@ -75,14 +81,14 @@ namespace zmq ...@@ -75,14 +81,14 @@ namespace zmq
// Returns the I/O thread that is the least busy at the moment. // Returns the I/O thread that is the least busy at the moment.
// Affinity specifies which I/O threads are eligible (0 = all). // Affinity specifies which I/O threads are eligible (0 = all).
// Returns NULL is no I/O thread is available. // Returns NULL is no I/O thread is available.
class io_thread_t *choose_io_thread (uint64_t affinity_); zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
// Returns reaper thread object. // Returns reaper thread object.
class object_t *get_reaper (); zmq::object_t *get_reaper ();
// Management of inproc endpoints. // Management of inproc endpoints.
int register_endpoint (const char *addr_, endpoint_t &endpoint_); int register_endpoint (const char *addr_, endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_); endpoint_t find_endpoint (const char *addr_);
// Logging. // Logging.
...@@ -120,10 +126,10 @@ namespace zmq ...@@ -120,10 +126,10 @@ namespace zmq
mutex_t slot_sync; mutex_t slot_sync;
// The reaper thread. // The reaper thread.
class reaper_t *reaper; zmq::reaper_t *reaper;
// I/O threads. // I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t; typedef std::vector <zmq::io_thread_t*> io_threads_t;
io_threads_t io_threads; io_threads_t io_threads;
// Array of pointers to mailboxes for both application and I/O threads. // Array of pointers to mailboxes for both application and I/O threads.
...@@ -142,7 +148,7 @@ namespace zmq ...@@ -142,7 +148,7 @@ namespace zmq
// PUB socket for logging. The socket is shared among all the threads, // PUB socket for logging. The socket is shared among all the threads,
// thus it is synchronised by a mutex. // thus it is synchronised by a mutex.
class socket_base_t *log_socket; zmq::socket_base_t *log_socket;
mutex_t log_sync; mutex_t log_sync;
ctx_t (const ctx_t&); ctx_t (const ctx_t&);
......
...@@ -34,6 +34,8 @@ ...@@ -34,6 +34,8 @@
namespace zmq namespace zmq
{ {
class session_base_t;
// Helper base class for decoders that know the amount of data to read // Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property // in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. 0MQ framing protocol is based size-prefixed // of the protocol used. 0MQ framing protocol is based size-prefixed
...@@ -193,7 +195,7 @@ namespace zmq ...@@ -193,7 +195,7 @@ namespace zmq
decoder_t (size_t bufsize_, int64_t maxmsgsize_); decoder_t (size_t bufsize_, int64_t maxmsgsize_);
~decoder_t (); ~decoder_t ();
void set_session (class session_base_t *session_); void set_session (zmq::session_base_t *session_);
private: private:
...@@ -202,7 +204,7 @@ namespace zmq ...@@ -202,7 +204,7 @@ namespace zmq
bool flags_ready (); bool flags_ready ();
bool message_ready (); bool message_ready ();
class session_base_t *session; zmq::session_base_t *session;
unsigned char tmpbuf [8]; unsigned char tmpbuf [8];
msg_t in_progress; msg_t in_progress;
......
...@@ -35,6 +35,8 @@ ...@@ -35,6 +35,8 @@
namespace zmq namespace zmq
{ {
struct i_poll_events;
// Implements socket polling mechanism using the "/dev/poll" interface. // Implements socket polling mechanism using the "/dev/poll" interface.
class devpoll_t : public poller_base_t class devpoll_t : public poller_base_t
...@@ -47,7 +49,7 @@ namespace zmq ...@@ -47,7 +49,7 @@ namespace zmq
~devpoll_t (); ~devpoll_t ();
// "poller" concept. // "poller" concept.
handle_t add_fd (fd_t fd_, struct i_poll_events *events_); handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
...@@ -70,7 +72,7 @@ namespace zmq ...@@ -70,7 +72,7 @@ namespace zmq
struct fd_entry_t struct fd_entry_t
{ {
short events; short events;
struct i_poll_events *reactor; zmq::i_poll_events *reactor;
bool valid; bool valid;
bool accepted; bool accepted;
}; };
......
...@@ -29,6 +29,9 @@ ...@@ -29,6 +29,9 @@
namespace zmq namespace zmq
{ {
class pipe_t;
class msg_t;
// Class manages a set of outbound pipes. It sends each messages to // Class manages a set of outbound pipes. It sends each messages to
// each of them. // each of them.
class dist_t class dist_t
...@@ -39,26 +42,26 @@ namespace zmq ...@@ -39,26 +42,26 @@ namespace zmq
~dist_t (); ~dist_t ();
// Adds the pipe to the distributor object. // Adds the pipe to the distributor object.
void attach (class pipe_t *pipe_); void attach (zmq::pipe_t *pipe_);
// Activates pipe that have previously reached high watermark. // Activates pipe that have previously reached high watermark.
void activated (class pipe_t *pipe_); void activated (zmq::pipe_t *pipe_);
// Mark the pipe as matching. Subsequent call to send_to_matching // Mark the pipe as matching. Subsequent call to send_to_matching
// will send message also to this pipe. // will send message also to this pipe.
void match (class pipe_t *pipe_); void match (zmq::pipe_t *pipe_);
// Mark all pipes as non-matching. // Mark all pipes as non-matching.
void unmatch (); void unmatch ();
// Removes the pipe from the distributor object. // Removes the pipe from the distributor object.
void terminated (class pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_);
// Send the message to the matching outbound pipes. // Send the message to the matching outbound pipes.
int send_to_matching (class msg_t *msg_, int flags_); int send_to_matching (zmq::msg_t *msg_, int flags_);
// Send the message to all the outbound pipes. // Send the message to all the outbound pipes.
int send_to_all (class msg_t *msg_, int flags_); int send_to_all (zmq::msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
...@@ -66,13 +69,13 @@ namespace zmq ...@@ -66,13 +69,13 @@ namespace zmq
// Write the message to the pipe. Make the pipe inactive if writing // Write the message to the pipe. Make the pipe inactive if writing
// fails. In such a case false is returned. // fails. In such a case false is returned.
bool write (class pipe_t *pipe_, class msg_t *msg_); bool write (zmq::pipe_t *pipe_, zmq::msg_t *msg_);
// Put the message to all active pipes. // Put the message to all active pipes.
void distribute (class msg_t *msg_, int flags_); void distribute (zmq::msg_t *msg_, int flags_);
// List of outbound pipes. // List of outbound pipes.
typedef array_t <class pipe_t, 2> pipes_t; typedef array_t <zmq::pipe_t, 2> pipes_t;
pipes_t pipes; pipes_t pipes;
// Number of all the pipes to send the next message to. // Number of all the pipes to send the next message to.
......
...@@ -33,6 +33,8 @@ ...@@ -33,6 +33,8 @@
namespace zmq namespace zmq
{ {
class session_base_t;
// Helper base class for encoders. It implements the state machine that // Helper base class for encoders. It implements the state machine that
// fills the outgoing buffer. Derived classes should implement individual // fills the outgoing buffer. Derived classes should implement individual
// state machine actions. // state machine actions.
...@@ -173,14 +175,14 @@ namespace zmq ...@@ -173,14 +175,14 @@ namespace zmq
encoder_t (size_t bufsize_); encoder_t (size_t bufsize_);
~encoder_t (); ~encoder_t ();
void set_session (class session_base_t *session_); void set_session (zmq::session_base_t *session_);
private: private:
bool size_ready (); bool size_ready ();
bool message_ready (); bool message_ready ();
class session_base_t *session; zmq::session_base_t *session;
msg_t in_progress; msg_t in_progress;
unsigned char tmpbuf [10]; unsigned char tmpbuf [10];
......
...@@ -36,6 +36,8 @@ ...@@ -36,6 +36,8 @@
namespace zmq namespace zmq
{ {
struct i_poll_events;
// This class implements socket polling mechanism using the Linux-specific // This class implements socket polling mechanism using the Linux-specific
// epoll mechanism. // epoll mechanism.
...@@ -49,7 +51,7 @@ namespace zmq ...@@ -49,7 +51,7 @@ namespace zmq
~epoll_t (); ~epoll_t ();
// "poller" concept. // "poller" concept.
handle_t add_fd (fd_t fd_, struct i_poll_events *events_); handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
...@@ -73,7 +75,7 @@ namespace zmq ...@@ -73,7 +75,7 @@ namespace zmq
{ {
fd_t fd; fd_t fd;
epoll_event ev; epoll_event ev;
struct i_poll_events *events; zmq::i_poll_events *events;
}; };
// List of retired event sources. // List of retired event sources.
......
...@@ -25,6 +25,8 @@ ...@@ -25,6 +25,8 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
// Abstract interface to be implemented by various engines. // Abstract interface to be implemented by various engines.
struct i_engine struct i_engine
...@@ -32,7 +34,7 @@ namespace zmq ...@@ -32,7 +34,7 @@ namespace zmq
virtual ~i_engine () {} virtual ~i_engine () {}
// Plug the engine to the session. // Plug the engine to the session.
virtual void plug (class io_thread_t *io_thread_, virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_) = 0; class session_base_t *session_) = 0;
// Unplug the engine from the session. // Unplug the engine from the session.
......
...@@ -31,6 +31,8 @@ ...@@ -31,6 +31,8 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
// Simple base class for objects that live in I/O threads. // Simple base class for objects that live in I/O threads.
// It makes communication with the poller object easier and // It makes communication with the poller object easier and
// makes defining unneeded event handlers unnecessary. // makes defining unneeded event handlers unnecessary.
...@@ -39,12 +41,12 @@ namespace zmq ...@@ -39,12 +41,12 @@ namespace zmq
{ {
public: public:
io_object_t (class io_thread_t *io_thread_ = NULL); io_object_t (zmq::io_thread_t *io_thread_ = NULL);
~io_object_t (); ~io_object_t ();
// When migrating an object from one I/O thread to another, first // When migrating an object from one I/O thread to another, first
// unplug it, then migrate it, then plug it to the new thread. // unplug it, then migrate it, then plug it to the new thread.
void plug (class io_thread_t *io_thread_); void plug (zmq::io_thread_t *io_thread_);
void unplug (); void unplug ();
protected: protected:
......
...@@ -33,6 +33,8 @@ ...@@ -33,6 +33,8 @@
namespace zmq namespace zmq
{ {
class ctx_t;
// Generic part of the I/O thread. Polling-mechanism-specific features // Generic part of the I/O thread. Polling-mechanism-specific features
// are implemented in separate "polling objects". // are implemented in separate "polling objects".
...@@ -40,7 +42,7 @@ namespace zmq ...@@ -40,7 +42,7 @@ namespace zmq
{ {
public: public:
io_thread_t (class ctx_t *ctx_, uint32_t tid_); io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
// Clean-up. If the thread was started, it's neccessary to call 'stop' // Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up. // before invoking destructor. Otherwise the destructor would hang up.
......
...@@ -34,14 +34,17 @@ ...@@ -34,14 +34,17 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
class session_base_t;
class ipc_connecter_t : public own_t, public io_object_t class ipc_connecter_t : public own_t, public io_object_t
{ {
public: public:
// If 'delay' is true connecter first waits for a while, then starts // If 'delay' is true connecter first waits for a while, then starts
// connection process. // connection process.
ipc_connecter_t (class io_thread_t *io_thread_, ipc_connecter_t (zmq::io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_, zmq::session_base_t *session_, const options_t &options_,
const char *address_, bool delay_); const char *address_, bool delay_);
~ipc_connecter_t (); ~ipc_connecter_t ();
...@@ -101,7 +104,7 @@ namespace zmq ...@@ -101,7 +104,7 @@ namespace zmq
bool wait; bool wait;
// Reference to the session we belong to. // Reference to the session we belong to.
class session_base_t *session; zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl; int current_reconnect_ivl;
......
...@@ -35,12 +35,15 @@ ...@@ -35,12 +35,15 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
class socket_base_t;
class ipc_listener_t : public own_t, public io_object_t class ipc_listener_t : public own_t, public io_object_t
{ {
public: public:
ipc_listener_t (class io_thread_t *io_thread_, ipc_listener_t (zmq::io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_); zmq::socket_base_t *socket_, const options_t &options_);
~ipc_listener_t (); ~ipc_listener_t ();
// Set address to listen on. // Set address to listen on.
...@@ -76,7 +79,7 @@ namespace zmq ...@@ -76,7 +79,7 @@ namespace zmq
handle_t handle; handle_t handle;
// Socket the listerner belongs to. // Socket the listerner belongs to.
class socket_base_t *socket; zmq::socket_base_t *socket;
ipc_listener_t (const ipc_listener_t&); ipc_listener_t (const ipc_listener_t&);
const ipc_listener_t &operator = (const ipc_listener_t&); const ipc_listener_t &operator = (const ipc_listener_t&);
......
...@@ -35,6 +35,8 @@ ...@@ -35,6 +35,8 @@
namespace zmq namespace zmq
{ {
struct i_poll_events;
// Implements socket polling mechanism using the BSD-specific // Implements socket polling mechanism using the BSD-specific
// kqueue interface. // kqueue interface.
...@@ -48,7 +50,7 @@ namespace zmq ...@@ -48,7 +50,7 @@ namespace zmq
~kqueue_t (); ~kqueue_t ();
// "poller" concept. // "poller" concept.
handle_t add_fd (fd_t fd_, struct i_poll_events *events_); handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
...@@ -79,7 +81,7 @@ namespace zmq ...@@ -79,7 +81,7 @@ namespace zmq
fd_t fd; fd_t fd;
bool flag_pollin; bool flag_pollin;
bool flag_pollout; bool flag_pollout;
i_poll_events *reactor; zmq::i_poll_events *reactor;
}; };
// List of retired event sources. // List of retired event sources.
......
...@@ -48,7 +48,7 @@ namespace zmq ...@@ -48,7 +48,7 @@ namespace zmq
private: private:
// List of outbound pipes. // List of outbound pipes.
typedef array_t <class pipe_t, 2> pipes_t; typedef array_t <pipe_t, 2> pipes_t;
pipes_t pipes; pipes_t pipes;
// Number of active pipes. All the active pipes are located at the // Number of active pipes. All the active pipes are located at the
......
...@@ -29,6 +29,8 @@ ...@@ -29,6 +29,8 @@
namespace zmq namespace zmq
{ {
class pipe_t;
// Multi-trie. Each node in the trie is a set of pointers to pipes. // Multi-trie. Each node in the trie is a set of pointers to pipes.
class mtrie_t class mtrie_t
...@@ -40,35 +42,35 @@ namespace zmq ...@@ -40,35 +42,35 @@ namespace zmq
// Add key to the trie. Returns true if it's a new subscription // Add key to the trie. Returns true if it's a new subscription
// rather than a duplicate. // rather than a duplicate.
bool add (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
// Remove all subscriptions for a specific peer from the trie. // Remove all subscriptions for a specific peer from the trie.
// If there are no subscriptions left on some topics, invoke the // If there are no subscriptions left on some topics, invoke the
// supplied callback function. // supplied callback function.
void rm (class pipe_t *pipe_, void rm (zmq::pipe_t *pipe_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_), void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_); void *arg_);
// Remove specific subscription from the trie. Return true is it was // Remove specific subscription from the trie. Return true is it was
// actually removed rather than de-duplicated. // actually removed rather than de-duplicated.
bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); bool rm (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
// Signal all the matching pipes. // Signal all the matching pipes.
void match (unsigned char *data_, size_t size_, void match (unsigned char *data_, size_t size_,
void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_); void (*func_) (zmq::pipe_t *pipe_, void *arg_), void *arg_);
private: private:
bool add_helper (unsigned char *prefix_, size_t size_, bool add_helper (unsigned char *prefix_, size_t size_,
class pipe_t *pipe_); zmq::pipe_t *pipe_);
void rm_helper (class pipe_t *pipe_, unsigned char **buff_, void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_,
size_t buffsize_, size_t maxbuffsize_, size_t buffsize_, size_t maxbuffsize_,
void (*func_) (unsigned char *data_, size_t size_, void *arg_), void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_); void *arg_);
bool rm_helper (unsigned char *prefix_, size_t size_, bool rm_helper (unsigned char *prefix_, size_t size_,
class pipe_t *pipe_); zmq::pipe_t *pipe_);
typedef std::set <class pipe_t*> pipes_t; typedef std::set <zmq::pipe_t*> pipes_t;
pipes_t pipes; pipes_t pipes;
unsigned char min; unsigned char min;
......
...@@ -26,6 +26,17 @@ ...@@ -26,6 +26,17 @@
namespace zmq namespace zmq
{ {
struct i_engine;
struct endpoint_t;
struct command_t;
class ctx_t;
class pipe_t;
class socket_base_t;
class session_base_t;
class io_thread_t;
class own_t;
// Base class for all objects that participate in inter-thread // Base class for all objects that participate in inter-thread
// communication. // communication.
...@@ -33,51 +44,51 @@ namespace zmq ...@@ -33,51 +44,51 @@ namespace zmq
{ {
public: public:
object_t (class ctx_t *ctx_, uint32_t tid_); object_t (zmq::ctx_t *ctx_, uint32_t tid_);
object_t (object_t *parent_); object_t (object_t *parent_);
virtual ~object_t (); virtual ~object_t ();
uint32_t get_tid (); uint32_t get_tid ();
ctx_t *get_ctx (); ctx_t *get_ctx ();
void process_command (struct command_t &cmd_); void process_command (zmq::command_t &cmd_);
protected: protected:
// Using following function, socket is able to access global // Using following function, socket is able to access global
// repository of inproc endpoints. // repository of inproc endpoints.
int register_endpoint (const char *addr_, struct endpoint_t &endpoint_); int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_);
void unregister_endpoints (class socket_base_t *socket_); void unregister_endpoints (zmq::socket_base_t *socket_);
struct endpoint_t find_endpoint (const char *addr_); zmq::endpoint_t find_endpoint (const char *addr_);
void destroy_socket (class socket_base_t *socket_); void destroy_socket (zmq::socket_base_t *socket_);
// Logs an message. // Logs an message.
void log (const char *format_, ...); void log (const char *format_, ...);
// Chooses least loaded I/O thread. // Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t affinity_); zmq::io_thread_t *choose_io_thread (uint64_t affinity_);
// Derived object can use these functions to send commands // Derived object can use these functions to send commands
// to other objects. // to other objects.
void send_stop (); void send_stop ();
void send_plug (class own_t *destination_, void send_plug (zmq::own_t *destination_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_own (class own_t *destination_, void send_own (zmq::own_t *destination_,
class own_t *object_); zmq::own_t *object_);
void send_attach (class session_base_t *destination_, void send_attach (zmq::session_base_t *destination_,
struct i_engine *engine_, bool inc_seqnum_ = true); zmq::i_engine *engine_, bool inc_seqnum_ = true);
void send_bind (class own_t *destination_, class pipe_t *pipe_, void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_,
bool inc_seqnum_ = true); bool inc_seqnum_ = true);
void send_activate_read (class pipe_t *destination_); void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (class pipe_t *destination_, void send_activate_write (zmq::pipe_t *destination_,
uint64_t msgs_read_); uint64_t msgs_read_);
void send_hiccup (class pipe_t *destination_, void *pipe_); void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
void send_pipe_term (class pipe_t *destination_); void send_pipe_term (zmq::pipe_t *destination_);
void send_pipe_term_ack (class pipe_t *destination_); void send_pipe_term_ack (zmq::pipe_t *destination_);
void send_term_req (class own_t *destination_, void send_term_req (zmq::own_t *destination_,
class own_t *object_); zmq::own_t *object_);
void send_term (class own_t *destination_, int linger_); void send_term (zmq::own_t *destination_, int linger_);
void send_term_ack (class own_t *destination_); void send_term_ack (zmq::own_t *destination_);
void send_reap (class socket_base_t *socket_); void send_reap (zmq::socket_base_t *socket_);
void send_reaped (); void send_reaped ();
void send_done (); void send_done ();
...@@ -85,18 +96,18 @@ namespace zmq ...@@ -85,18 +96,18 @@ namespace zmq
// called when command arrives from another thread. // called when command arrives from another thread.
virtual void process_stop (); virtual void process_stop ();
virtual void process_plug (); virtual void process_plug ();
virtual void process_own (class own_t *object_); virtual void process_own (zmq::own_t *object_);
virtual void process_attach (struct i_engine *engine_); virtual void process_attach (zmq::i_engine *engine_);
virtual void process_bind (class pipe_t *pipe_); virtual void process_bind (zmq::pipe_t *pipe_);
virtual void process_activate_read (); virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_); virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_); virtual void process_hiccup (void *pipe_);
virtual void process_pipe_term (); virtual void process_pipe_term ();
virtual void process_pipe_term_ack (); virtual void process_pipe_term_ack ();
virtual void process_term_req (class own_t *object_); virtual void process_term_req (zmq::own_t *object_);
virtual void process_term (int linger_); virtual void process_term (int linger_);
virtual void process_term_ack (); virtual void process_term_ack ();
virtual void process_reap (class socket_base_t *socket_); virtual void process_reap (zmq::socket_base_t *socket_);
virtual void process_reaped (); virtual void process_reaped ();
// Special handler called after a command that requires a seqnum // Special handler called after a command that requires a seqnum
...@@ -107,7 +118,7 @@ namespace zmq ...@@ -107,7 +118,7 @@ namespace zmq
private: private:
// Context provides access to the global state. // Context provides access to the global state.
class ctx_t *ctx; zmq::ctx_t *ctx;
// Thread ID of the thread the object belongs to. // Thread ID of the thread the object belongs to.
uint32_t tid; uint32_t tid;
......
...@@ -32,6 +32,9 @@ ...@@ -32,6 +32,9 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class io_thread_t;
// Base class for objects forming a part of ownership hierarchy. // Base class for objects forming a part of ownership hierarchy.
// It handles initialisation and destruction of such objects. // It handles initialisation and destruction of such objects.
...@@ -44,10 +47,10 @@ namespace zmq ...@@ -44,10 +47,10 @@ namespace zmq
// The object is not living within an I/O thread. It has it's own // The object is not living within an I/O thread. It has it's own
// thread outside of 0MQ infrastructure. // thread outside of 0MQ infrastructure.
own_t (class ctx_t *parent_, uint32_t tid_); own_t (zmq::ctx_t *parent_, uint32_t tid_);
// The object is living within I/O thread. // The object is living within I/O thread.
own_t (class io_thread_t *io_thread_, const options_t &options_); own_t (zmq::io_thread_t *io_thread_, const options_t &options_);
// When another owned object wants to send command to this object // When another owned object wants to send command to this object
// it calls this function to let it know it should not shut down // it calls this function to let it know it should not shut down
......
...@@ -28,27 +28,32 @@ ...@@ -28,27 +28,32 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class pair_t : class pair_t :
public socket_base_t public socket_base_t
{ {
public: public:
pair_t (class ctx_t *parent_, uint32_t tid_); pair_t (zmq::ctx_t *parent_, uint32_t tid_);
~pair_t (); ~pair_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (zmq::pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (class pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
private: private:
class pipe_t *pipe; zmq::pipe_t *pipe;
pair_t (const pair_t&); pair_t (const pair_t&);
const pair_t &operator = (const pair_t&); const pair_t &operator = (const pair_t&);
...@@ -58,8 +63,8 @@ namespace zmq ...@@ -58,8 +63,8 @@ namespace zmq
{ {
public: public:
pair_session_t (class io_thread_t *io_thread_, bool connect_, pair_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~pair_session_t (); ~pair_session_t ();
......
...@@ -43,19 +43,22 @@ ...@@ -43,19 +43,22 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
class session_base_t;
class pgm_receiver_t : public io_object_t, public i_engine class pgm_receiver_t : public io_object_t, public i_engine
{ {
public: public:
pgm_receiver_t (class io_thread_t *parent_, const options_t &options_); pgm_receiver_t (zmq::io_thread_t *parent_, const options_t &options_);
~pgm_receiver_t (); ~pgm_receiver_t ();
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_); zmq::session_base_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -108,7 +111,7 @@ namespace zmq ...@@ -108,7 +111,7 @@ namespace zmq
options_t options; options_t options;
// Associated session. // Associated session.
class session_base_t *session; zmq::session_base_t *session;
// Most recently used decoder. // Most recently used decoder.
decoder_t *mru_decoder; decoder_t *mru_decoder;
......
...@@ -41,19 +41,22 @@ ...@@ -41,19 +41,22 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
class session_base_t;
class pgm_sender_t : public io_object_t, public i_engine class pgm_sender_t : public io_object_t, public i_engine
{ {
public: public:
pgm_sender_t (class io_thread_t *parent_, const options_t &options_); pgm_sender_t (zmq::io_thread_t *parent_, const options_t &options_);
~pgm_sender_t (); ~pgm_sender_t ();
int init (bool udp_encapsulation_, const char *network_); int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_); zmq::session_base_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
......
...@@ -34,23 +34,26 @@ ...@@ -34,23 +34,26 @@
namespace zmq namespace zmq
{ {
class object_t;
class pipe_t;
// Create a pipepair for bi-directional transfer of messages. // Create a pipepair for bi-directional transfer of messages.
// First HWM is for messages passed from first pipe to the second pipe. // First HWM is for messages passed from first pipe to the second pipe.
// Second HWM is for messages passed from second pipe to the first pipe. // Second HWM is for messages passed from second pipe to the first pipe.
// Delay specifies how the pipe behaves when the peer terminates. If true // Delay specifies how the pipe behaves when the peer terminates. If true
// pipe receives all the pending messages before terminating, otherwise it // pipe receives all the pending messages before terminating, otherwise it
// terminates straight away. // terminates straight away.
int pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
int hwms_ [2], bool delays_ [2]); int hwms_ [2], bool delays_ [2]);
struct i_pipe_events struct i_pipe_events
{ {
virtual ~i_pipe_events () {} virtual ~i_pipe_events () {}
virtual void read_activated (class pipe_t *pipe_) = 0; virtual void read_activated (zmq::pipe_t *pipe_) = 0;
virtual void write_activated (class pipe_t *pipe_) = 0; virtual void write_activated (zmq::pipe_t *pipe_) = 0;
virtual void hiccuped (class pipe_t *pipe_) = 0; virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
virtual void terminated (class pipe_t *pipe_) = 0; virtual void terminated (zmq::pipe_t *pipe_) = 0;
}; };
// Note that pipe can be stored in three different arrays. // Note that pipe can be stored in three different arrays.
...@@ -64,8 +67,8 @@ namespace zmq ...@@ -64,8 +67,8 @@ namespace zmq
public array_item_t <3> public array_item_t <3>
{ {
// This allows pipepair to create pipe objects. // This allows pipepair to create pipe objects.
friend int pipepair (class object_t *parents_ [2], friend int pipepair (zmq::object_t *parents_ [2],
class pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
public: public:
......
...@@ -37,6 +37,8 @@ ...@@ -37,6 +37,8 @@
namespace zmq namespace zmq
{ {
struct i_poll_events;
// Implements socket polling mechanism using the POSIX.1-2001 // Implements socket polling mechanism using the POSIX.1-2001
// poll() system call. // poll() system call.
...@@ -50,7 +52,7 @@ namespace zmq ...@@ -50,7 +52,7 @@ namespace zmq
~poll_t (); ~poll_t ();
// "poller" concept. // "poller" concept.
handle_t add_fd (fd_t fd_, struct i_poll_events *events_); handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
...@@ -70,7 +72,7 @@ namespace zmq ...@@ -70,7 +72,7 @@ namespace zmq
struct fd_entry_t struct fd_entry_t
{ {
fd_t index; fd_t index;
struct i_poll_events *events; zmq::i_poll_events *events;
}; };
// This table stores data for registered descriptors. // This table stores data for registered descriptors.
......
...@@ -29,6 +29,8 @@ ...@@ -29,6 +29,8 @@
namespace zmq namespace zmq
{ {
struct i_poll_events;
class poller_base_t class poller_base_t
{ {
public: public:
...@@ -43,10 +45,10 @@ namespace zmq ...@@ -43,10 +45,10 @@ namespace zmq
// Add a timeout to expire in timeout_ milliseconds. After the // Add a timeout to expire in timeout_ milliseconds. After the
// expiration timer_event on sink_ object will be called with // expiration timer_event on sink_ object will be called with
// argument set to id_. // argument set to id_.
void add_timer (int timeout_, struct i_poll_events *sink_, int id_); void add_timer (int timeout_, zmq::i_poll_events *sink_, int id_);
// Cancel the timer created by sink_ object with ID equal to id_. // Cancel the timer created by sink_ object with ID equal to id_.
void cancel_timer (struct i_poll_events *sink_, int id_); void cancel_timer (zmq::i_poll_events *sink_, int id_);
protected: protected:
...@@ -65,7 +67,7 @@ namespace zmq ...@@ -65,7 +67,7 @@ namespace zmq
// List of active timers. // List of active timers.
struct timer_info_t struct timer_info_t
{ {
struct i_poll_events *sink; zmq::i_poll_events *sink;
int id; int id;
}; };
typedef std::multimap <uint64_t, timer_info_t> timers_t; typedef std::multimap <uint64_t, timer_info_t> timers_t;
......
...@@ -27,15 +27,20 @@ ...@@ -27,15 +27,20 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class io_thread_t;
class socket_base_t;
class msg_t;
class pub_t : public xpub_t class pub_t : public xpub_t
{ {
public: public:
pub_t (class ctx_t *parent_, uint32_t tid_); pub_t (zmq::ctx_t *parent_, uint32_t tid_);
~pub_t (); ~pub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
private: private:
...@@ -48,8 +53,8 @@ namespace zmq ...@@ -48,8 +53,8 @@ namespace zmq
{ {
public: public:
pub_session_t (class io_thread_t *io_thread_, bool connect_, pub_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~pub_session_t (); ~pub_session_t ();
......
...@@ -29,22 +29,27 @@ ...@@ -29,22 +29,27 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class pipe_t;
class msg_t;
class io_thread_t;
class pull_t : class pull_t :
public socket_base_t public socket_base_t
{ {
public: public:
pull_t (class ctx_t *parent_, uint32_t tid_); pull_t (zmq::ctx_t *parent_, uint32_t tid_);
~pull_t (); ~pull_t ();
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (zmq::pipe_t *pipe_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xread_activated (class pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
private: private:
...@@ -60,8 +65,8 @@ namespace zmq ...@@ -60,8 +65,8 @@ namespace zmq
{ {
public: public:
pull_session_t (class io_thread_t *io_thread_, bool connect_, pull_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~pull_session_t (); ~pull_session_t ();
......
...@@ -29,22 +29,27 @@ ...@@ -29,22 +29,27 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class pipe_t;
class msg_t;
class io_thread_t;
class push_t : class push_t :
public socket_base_t public socket_base_t
{ {
public: public:
push_t (class ctx_t *parent_, uint32_t tid_); push_t (zmq::ctx_t *parent_, uint32_t tid_);
~push_t (); ~push_t ();
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (zmq::pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
void xwrite_activated (class pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
private: private:
...@@ -59,8 +64,8 @@ namespace zmq ...@@ -59,8 +64,8 @@ namespace zmq
{ {
public: public:
push_session_t (class io_thread_t *io_thread_, bool connect_, push_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~push_session_t (); ~push_session_t ();
......
...@@ -29,11 +29,14 @@ ...@@ -29,11 +29,14 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class socket_base_t;
class reaper_t : public object_t, public i_poll_events class reaper_t : public object_t, public i_poll_events
{ {
public: public:
reaper_t (class ctx_t *ctx_, uint32_t tid_); reaper_t (zmq::ctx_t *ctx_, uint32_t tid_);
~reaper_t (); ~reaper_t ();
mailbox_t *get_mailbox (); mailbox_t *get_mailbox ();
...@@ -50,7 +53,7 @@ namespace zmq ...@@ -50,7 +53,7 @@ namespace zmq
// Command handlers. // Command handlers.
void process_stop (); void process_stop ();
void process_reap (class socket_base_t *socket_); void process_reap (zmq::socket_base_t *socket_);
void process_reaped (); void process_reaped ();
// Reaper thread accesses incoming commands via this mailbox. // Reaper thread accesses incoming commands via this mailbox.
......
...@@ -27,16 +27,21 @@ ...@@ -27,16 +27,21 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class msg_t;
class io_thread_t;
class socket_base_t;
class rep_t : public xrep_t class rep_t : public xrep_t
{ {
public: public:
rep_t (class ctx_t *parent_, uint32_t tid_); rep_t (zmq::ctx_t *parent_, uint32_t tid_);
~rep_t (); ~rep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
...@@ -59,8 +64,8 @@ namespace zmq ...@@ -59,8 +64,8 @@ namespace zmq
{ {
public: public:
rep_session_t (class io_thread_t *io_thread_, bool connect_, rep_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~rep_session_t (); ~rep_session_t ();
......
...@@ -29,16 +29,21 @@ ...@@ -29,16 +29,21 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class msg_t;
class io_thread_t;
class socket_base_t;
class req_t : public xreq_t class req_t : public xreq_t
{ {
public: public:
req_t (class ctx_t *parent_, uint32_t tid_); req_t (zmq::ctx_t *parent_, uint32_t tid_);
~req_t (); ~req_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
...@@ -60,8 +65,8 @@ namespace zmq ...@@ -60,8 +65,8 @@ namespace zmq
{ {
public: public:
req_session_t (class io_thread_t *io_thread_, bool connect_, req_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~req_session_t (); ~req_session_t ();
......
...@@ -47,6 +47,8 @@ ...@@ -47,6 +47,8 @@
namespace zmq namespace zmq
{ {
struct i_poll_events;
// Implements socket polling mechanism using POSIX.1-2001 select() // Implements socket polling mechanism using POSIX.1-2001 select()
// function. // function.
...@@ -60,7 +62,7 @@ namespace zmq ...@@ -60,7 +62,7 @@ namespace zmq
~select_t (); ~select_t ();
// "poller" concept. // "poller" concept.
handle_t add_fd (fd_t fd_, struct i_poll_events *events_); handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_);
void rm_fd (handle_t handle_); void rm_fd (handle_t handle_);
void set_pollin (handle_t handle_); void set_pollin (handle_t handle_);
void reset_pollin (handle_t handle_); void reset_pollin (handle_t handle_);
...@@ -80,7 +82,7 @@ namespace zmq ...@@ -80,7 +82,7 @@ namespace zmq
struct fd_entry_t struct fd_entry_t
{ {
fd_t fd; fd_t fd;
struct i_poll_events *events; zmq::i_poll_events *events;
}; };
// Checks if an fd_entry_t is retired. // Checks if an fd_entry_t is retired.
......
...@@ -26,13 +26,17 @@ ...@@ -26,13 +26,17 @@
#include <string> #include <string>
#include "own.hpp" #include "own.hpp"
#include "i_engine.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "pipe.hpp" #include "pipe.hpp"
namespace zmq namespace zmq
{ {
class pipe_t;
class io_thread_t;
class socket_base_t;
struct i_engine;
class session_base_t : class session_base_t :
public own_t, public own_t,
public io_object_t, public io_object_t,
...@@ -41,13 +45,13 @@ namespace zmq ...@@ -41,13 +45,13 @@ namespace zmq
public: public:
// Create a session of the particular type. // Create a session of the particular type.
static session_base_t *create (class io_thread_t *io_thread_, static session_base_t *create (zmq::io_thread_t *io_thread_,
bool connect_, class socket_base_t *socket_, bool connect_, zmq::socket_base_t *socket_,
const options_t &options_, const char *protocol_, const options_t &options_, const char *protocol_,
const char *address_); const char *address_);
// To be used once only, when creating the session. // To be used once only, when creating the session.
void attach_pipe (class pipe_t *pipe_); void attach_pipe (zmq::pipe_t *pipe_);
// Following functions are the interface exposed towards the engine. // Following functions are the interface exposed towards the engine.
virtual int read (msg_t *msg_); virtual int read (msg_t *msg_);
...@@ -56,15 +60,15 @@ namespace zmq ...@@ -56,15 +60,15 @@ namespace zmq
void detach (); void detach ();
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (class pipe_t *pipe_); void read_activated (zmq::pipe_t *pipe_);
void write_activated (class pipe_t *pipe_); void write_activated (zmq::pipe_t *pipe_);
void hiccuped (class pipe_t *pipe_); void hiccuped (zmq::pipe_t *pipe_);
void terminated (class pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_);
protected: protected:
session_base_t (class io_thread_t *io_thread_, bool connect_, session_base_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~session_base_t (); ~session_base_t ();
...@@ -76,7 +80,7 @@ namespace zmq ...@@ -76,7 +80,7 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
void process_attach (struct i_engine *engine_); void process_attach (zmq::i_engine *engine_);
void process_term (int linger_); void process_term (int linger_);
// i_poll_events handlers. // i_poll_events handlers.
...@@ -94,7 +98,7 @@ namespace zmq ...@@ -94,7 +98,7 @@ namespace zmq
bool connect; bool connect;
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
class pipe_t *pipe; zmq::pipe_t *pipe;
// This flag is true if the remainder of the message being processed // This flag is true if the remainder of the message being processed
// is still in the in pipe. // is still in the in pipe.
...@@ -105,14 +109,14 @@ namespace zmq ...@@ -105,14 +109,14 @@ namespace zmq
bool pending; bool pending;
// The protocol I/O engine connected to the session. // The protocol I/O engine connected to the session.
struct i_engine *engine; zmq::i_engine *engine;
// The socket the session belongs to. // The socket the session belongs to.
class socket_base_t *socket; zmq::socket_base_t *socket;
// I/O thread the session is living in. It will be used to plug in // I/O thread the session is living in. It will be used to plug in
// the engines into the same thread. // the engines into the same thread.
class io_thread_t *io_thread; zmq::io_thread_t *io_thread;
// ID of the linger timer // ID of the linger timer
enum {linger_timer_id = 0x20}; enum {linger_timer_id = 0x20};
......
...@@ -38,6 +38,10 @@ ...@@ -38,6 +38,10 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class msg_t;
class pipe_t;
class socket_base_t : class socket_base_t :
public own_t, public own_t,
public array_item_t <>, public array_item_t <>,
...@@ -52,7 +56,7 @@ namespace zmq ...@@ -52,7 +56,7 @@ namespace zmq
bool check_tag (); bool check_tag ();
// Create a socket of a specified type. // Create a socket of a specified type.
static socket_base_t *create (int type_, class ctx_t *parent_, static socket_base_t *create (int type_, zmq::ctx_t *parent_,
uint32_t tid_); uint32_t tid_);
// Returns the mailbox associated with this socket. // Returns the mailbox associated with this socket.
...@@ -67,8 +71,8 @@ namespace zmq ...@@ -67,8 +71,8 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_);
int bind (const char *addr_); int bind (const char *addr_);
int connect (const char *addr_); int connect (const char *addr_);
int send (class msg_t *msg_, int flags_); int send (zmq::msg_t *msg_, int flags_);
int recv (class msg_t *msg_, int flags_); int recv (zmq::msg_t *msg_, int flags_);
int close (); int close ();
// These functions are used by the polling mechanism to determine // These functions are used by the polling mechanism to determine
...@@ -94,12 +98,12 @@ namespace zmq ...@@ -94,12 +98,12 @@ namespace zmq
protected: protected:
socket_base_t (class ctx_t *parent_, uint32_t tid_); socket_base_t (zmq::ctx_t *parent_, uint32_t tid_);
virtual ~socket_base_t (); virtual ~socket_base_t ();
// Concrete algorithms for the x- methods are to be defined by // Concrete algorithms for the x- methods are to be defined by
// individual socket types. // individual socket types.
virtual void xattach_pipe (class pipe_t *pipe_) = 0; virtual void xattach_pipe (zmq::pipe_t *pipe_) = 0;
// The default implementation assumes there are no specific socket // The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, overload this // options for the particular socket type. If not so, overload this
...@@ -109,11 +113,11 @@ namespace zmq ...@@ -109,11 +113,11 @@ namespace zmq
// The default implementation assumes that send is not supported. // The default implementation assumes that send is not supported.
virtual bool xhas_out (); virtual bool xhas_out ();
virtual int xsend (class msg_t *msg_, int flags_); virtual int xsend (zmq::msg_t *msg_, int flags_);
// The default implementation assumes that recv in not supported. // The default implementation assumes that recv in not supported.
virtual bool xhas_in (); virtual bool xhas_in ();
virtual int xrecv (class msg_t *msg_, int flags_); virtual int xrecv (zmq::msg_t *msg_, int flags_);
// i_pipe_events will be forwarded to these functions. // i_pipe_events will be forwarded to these functions.
virtual void xread_activated (pipe_t *pipe_); virtual void xread_activated (pipe_t *pipe_);
...@@ -154,7 +158,7 @@ namespace zmq ...@@ -154,7 +158,7 @@ namespace zmq
int check_protocol (const std::string &protocol_); int check_protocol (const std::string &protocol_);
// Register the pipe with this socket. // Register the pipe with this socket.
void attach_pipe (class pipe_t *pipe_); void attach_pipe (zmq::pipe_t *pipe_);
// Processes commands sent to this socket (if any). If timeout is -1, // Processes commands sent to this socket (if any). If timeout is -1,
// returns only after at least one command was processed. // returns only after at least one command was processed.
...@@ -164,7 +168,7 @@ namespace zmq ...@@ -164,7 +168,7 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_stop (); void process_stop ();
void process_bind (class pipe_t *pipe_); void process_bind (zmq::pipe_t *pipe_);
void process_unplug (); void process_unplug ();
void process_term (int linger_); void process_term (int linger_);
......
...@@ -34,6 +34,9 @@ ...@@ -34,6 +34,9 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
class session_base_t;
// This engine handles any socket with SOCK_STREAM semantics, // This engine handles any socket with SOCK_STREAM semantics,
// e.g. TCP socket or an UNIX domain socket. // e.g. TCP socket or an UNIX domain socket.
...@@ -45,8 +48,8 @@ namespace zmq ...@@ -45,8 +48,8 @@ namespace zmq
~stream_engine_t (); ~stream_engine_t ();
// i_engine interface implementation. // i_engine interface implementation.
void plug (class io_thread_t *io_thread_, void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_); zmq::session_base_t *session_);
void unplug (); void unplug ();
void terminate (); void terminate ();
void activate_in (); void activate_in ();
...@@ -86,10 +89,10 @@ namespace zmq ...@@ -86,10 +89,10 @@ namespace zmq
encoder_t encoder; encoder_t encoder;
// The session this engine is attached to. // The session this engine is attached to.
class session_base_t *session; zmq::session_base_t *session;
// Detached transient session. // Detached transient session.
class session_base_t *leftover_session; zmq::session_base_t *leftover_session;
options_t options; options_t options;
......
...@@ -27,17 +27,22 @@ ...@@ -27,17 +27,22 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class msg_t;
class io_thread_t;
class socket_base_t;
class sub_t : public xsub_t class sub_t : public xsub_t
{ {
public: public:
sub_t (class ctx_t *parent_, uint32_t tid_); sub_t (zmq::ctx_t *parent_, uint32_t tid_);
~sub_t (); ~sub_t ();
protected: protected:
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
private: private:
...@@ -50,8 +55,8 @@ namespace zmq ...@@ -50,8 +55,8 @@ namespace zmq
{ {
public: public:
sub_session_t (class io_thread_t *io_thread_, bool connect_, sub_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~sub_session_t (); ~sub_session_t ();
......
...@@ -31,14 +31,17 @@ ...@@ -31,14 +31,17 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
class session_base_t;
class tcp_connecter_t : public own_t, public io_object_t class tcp_connecter_t : public own_t, public io_object_t
{ {
public: public:
// If 'delay' is true connecter first waits for a while, then starts // If 'delay' is true connecter first waits for a while, then starts
// connection process. // connection process.
tcp_connecter_t (class io_thread_t *io_thread_, tcp_connecter_t (zmq::io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_, zmq::session_base_t *session_, const options_t &options_,
const char *address_, bool delay_); const char *address_, bool delay_);
~tcp_connecter_t (); ~tcp_connecter_t ();
...@@ -98,7 +101,7 @@ namespace zmq ...@@ -98,7 +101,7 @@ namespace zmq
bool wait; bool wait;
// Reference to the session we belong to. // Reference to the session we belong to.
class session_base_t *session; zmq::session_base_t *session;
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl; int current_reconnect_ivl;
......
...@@ -31,12 +31,15 @@ ...@@ -31,12 +31,15 @@
namespace zmq namespace zmq
{ {
class io_thread_t;
class socket_base_t;
class tcp_listener_t : public own_t, public io_object_t class tcp_listener_t : public own_t, public io_object_t
{ {
public: public:
tcp_listener_t (class io_thread_t *io_thread_, tcp_listener_t (zmq::io_thread_t *io_thread_,
class socket_base_t *socket_, const options_t &options_); zmq::socket_base_t *socket_, const options_t &options_);
~tcp_listener_t (); ~tcp_listener_t ();
// Set address to listen on. // Set address to listen on.
...@@ -72,7 +75,7 @@ namespace zmq ...@@ -72,7 +75,7 @@ namespace zmq
handle_t handle; handle_t handle;
// Socket the listerner belongs to. // Socket the listerner belongs to.
class socket_base_t *socket; zmq::socket_base_t *socket;
tcp_listener_t (const tcp_listener_t&); tcp_listener_t (const tcp_listener_t&);
const tcp_listener_t &operator = (const tcp_listener_t&); const tcp_listener_t &operator = (const tcp_listener_t&);
......
...@@ -33,23 +33,28 @@ ...@@ -33,23 +33,28 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class xpub_t : class xpub_t :
public socket_base_t public socket_base_t
{ {
public: public:
xpub_t (class ctx_t *parent_, uint32_t tid_); xpub_t (zmq::ctx_t *parent_, uint32_t tid_);
~xpub_t (); ~xpub_t ();
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (zmq::pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xread_activated (class pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
private: private:
...@@ -59,7 +64,7 @@ namespace zmq ...@@ -59,7 +64,7 @@ namespace zmq
void *arg_); void *arg_);
// Function to be applied to each matching pipes. // Function to be applied to each matching pipes.
static void mark_as_matching (class pipe_t *pipe_, void *arg_); static void mark_as_matching (zmq::pipe_t *pipe_, void *arg_);
// List of all subscriptions mapped to corresponding pipes. // List of all subscriptions mapped to corresponding pipes.
mtrie_t subscriptions; mtrie_t subscriptions;
...@@ -84,8 +89,8 @@ namespace zmq ...@@ -84,8 +89,8 @@ namespace zmq
{ {
public: public:
xpub_session_t (class io_thread_t *io_thread_, bool connect_, xpub_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~xpub_session_t (); ~xpub_session_t ();
......
...@@ -35,24 +35,27 @@ ...@@ -35,24 +35,27 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class pipe_t;
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t : class xrep_t :
public socket_base_t public socket_base_t
{ {
public: public:
xrep_t (class ctx_t *parent_, uint32_t tid_); xrep_t (zmq::ctx_t *parent_, uint32_t tid_);
~xrep_t (); ~xrep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (zmq::pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_); int xsend (msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (class pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
protected: protected:
...@@ -75,7 +78,7 @@ namespace zmq ...@@ -75,7 +78,7 @@ namespace zmq
struct outpipe_t struct outpipe_t
{ {
class pipe_t *pipe; zmq::pipe_t *pipe;
bool active; bool active;
}; };
...@@ -84,7 +87,7 @@ namespace zmq ...@@ -84,7 +87,7 @@ namespace zmq
outpipes_t outpipes; outpipes_t outpipes;
// The pipe we are currently writing to. // The pipe we are currently writing to.
class pipe_t *current_out; zmq::pipe_t *current_out;
// If true, more outgoing message parts are expected. // If true, more outgoing message parts are expected.
bool more_out; bool more_out;
...@@ -101,8 +104,8 @@ namespace zmq ...@@ -101,8 +104,8 @@ namespace zmq
{ {
public: public:
xrep_session_t (class io_thread_t *io_thread_, bool connect_, xrep_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~xrep_session_t (); ~xrep_session_t ();
......
...@@ -29,25 +29,31 @@ ...@@ -29,25 +29,31 @@
namespace zmq namespace zmq
{ {
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class socket_base_t;
class xreq_t : class xreq_t :
public socket_base_t public socket_base_t
{ {
public: public:
xreq_t (class ctx_t *parent_, uint32_t tid_); xreq_t (zmq::ctx_t *parent_, uint32_t tid_);
~xreq_t (); ~xreq_t ();
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (zmq::pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
void xread_activated (class pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
private: private:
...@@ -64,8 +70,8 @@ namespace zmq ...@@ -64,8 +70,8 @@ namespace zmq
{ {
public: public:
xreq_session_t (class io_thread_t *io_thread_, bool connect_, xreq_session_t (zmq::io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, zmq::socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~xreq_session_t (); ~xreq_session_t ();
......
...@@ -26,36 +26,39 @@ ...@@ -26,36 +26,39 @@
#include "dist.hpp" #include "dist.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "trie.hpp" #include "trie.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
class ctx_t;
class pipe_t;
class io_thread_t;
class xsub_t : class xsub_t :
public socket_base_t public socket_base_t
{ {
public: public:
xsub_t (class ctx_t *parent_, uint32_t tid_); xsub_t (zmq::ctx_t *parent_, uint32_t tid_);
~xsub_t (); ~xsub_t ();
protected: protected:
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (class pipe_t *pipe_); void xattach_pipe (zmq::pipe_t *pipe_);
int xsend (class msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_, int flags_);
bool xhas_out (); bool xhas_out ();
int xrecv (class msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
void xread_activated (class pipe_t *pipe_); void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (class pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xhiccuped (pipe_t *pipe_); void xhiccuped (pipe_t *pipe_);
void xterminated (class pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
private: private:
// Check whether the message matches at least one subscription. // Check whether the message matches at least one subscription.
bool match (class msg_t *msg_); bool match (zmq::msg_t *msg_);
// Function to be applied to the trie to send all the subsciptions // Function to be applied to the trie to send all the subsciptions
// upstream. // upstream.
...@@ -89,7 +92,7 @@ namespace zmq ...@@ -89,7 +92,7 @@ namespace zmq
public: public:
xsub_session_t (class io_thread_t *io_thread_, bool connect_, xsub_session_t (class io_thread_t *io_thread_, bool connect_,
class socket_base_t *socket_, const options_t &options_, socket_base_t *socket_, const options_t &options_,
const char *protocol_, const char *address_); const char *protocol_, const char *address_);
~xsub_session_t (); ~xsub_session_t ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment