Commit a8b410e6 authored by Martin Sustrik's avatar Martin Sustrik

lockfree interaction patter for 3 theads implemented

parent 0b5cc026
...@@ -38,7 +38,7 @@ namespace zmq ...@@ -38,7 +38,7 @@ namespace zmq
message_delimiter = 1 << ZMQ_DELIMITER message_delimiter = 1 << ZMQ_DELIMITER
}; };
class no_memory : public exception class no_memory : public std::exception
{ {
virtual const char *what () virtual const char *what ()
{ {
...@@ -46,7 +46,7 @@ namespace zmq ...@@ -46,7 +46,7 @@ namespace zmq
} }
}; };
class invalid_argument : public exception class invalid_argument : public std::exception
{ {
virtual const char *what () virtual const char *what ()
{ {
...@@ -54,7 +54,7 @@ namespace zmq ...@@ -54,7 +54,7 @@ namespace zmq
} }
}; };
class too_many_threads : public exception class too_many_threads : public std::exception
{ {
virtual const char *what () virtual const char *what ()
{ {
...@@ -62,7 +62,7 @@ namespace zmq ...@@ -62,7 +62,7 @@ namespace zmq
} }
}; };
class address_in_use : public exception class address_in_use : public std::exception
{ {
virtual const char *what () virtual const char *what ()
{ {
......
...@@ -7,14 +7,15 @@ libzmq_la_SOURCES = \ ...@@ -7,14 +7,15 @@ libzmq_la_SOURCES = \
atomic_ptr.hpp \ atomic_ptr.hpp \
command.hpp \ command.hpp \
config.hpp \ config.hpp \
context.hpp \
decoder.hpp \ decoder.hpp \
devpoll.hpp \ devpoll.hpp \
dispatcher.hpp \
encoder.hpp \ encoder.hpp \
epoll.hpp \ epoll.hpp \
err.hpp \ err.hpp \
fd.hpp \ fd.hpp \
fd_signaler.hpp \ fd_signaler.hpp \
io_object.hpp \
io_thread.hpp \ io_thread.hpp \
ip.hpp \ ip.hpp \
i_api.hpp \ i_api.hpp \
...@@ -31,6 +32,7 @@ libzmq_la_SOURCES = \ ...@@ -31,6 +32,7 @@ libzmq_la_SOURCES = \
poll.hpp \ poll.hpp \
select.hpp \ select.hpp \
simple_semaphore.hpp \ simple_semaphore.hpp \
socket_base.hpp \
stdint.hpp \ stdint.hpp \
tcp_connecter.hpp \ tcp_connecter.hpp \
tcp_listener.hpp \ tcp_listener.hpp \
...@@ -42,25 +44,29 @@ libzmq_la_SOURCES = \ ...@@ -42,25 +44,29 @@ libzmq_la_SOURCES = \
ypipe.hpp \ ypipe.hpp \
ypollset.hpp \ ypollset.hpp \
yqueue.hpp \ yqueue.hpp \
zmq_listener.hpp \
app_thread.cpp \ app_thread.cpp \
context.cpp \ devpoll.cpp \
devpoll.hpp \ dispatcher.cpp \
epoll.cpp \ epoll.cpp \
err.cpp \ err.cpp \
fd_signaler.cpp \ fd_signaler.cpp \
io_object.cpp \
io_thread.cpp \ io_thread.cpp \
ip.cpp \ ip.cpp \
kqueue.cpp \ kqueue.cpp \
object.cpp \ object.cpp \
poll.cpp \ poll.cpp \
select.cpp \ select.cpp \
socket_base.cpp \
tcp_connecter.cpp \ tcp_connecter.cpp \
tcp_listener.cpp \ tcp_listener.cpp \
tcp_socket.cpp \ tcp_socket.cpp \
thread.cpp \ thread.cpp \
uuid.cpp \ uuid.cpp \
ypollset.cpp \ ypollset.cpp \
zmq.cpp zmq.cpp \
zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0 libzmq_la_LDFLAGS = -version-info 0:0:0
libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@ libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <algorithm>
#include "../include/zmq.h" #include "../include/zmq.h"
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
...@@ -26,10 +28,12 @@ ...@@ -26,10 +28,12 @@
#endif #endif
#include "app_thread.hpp" #include "app_thread.hpp"
#include "context.hpp" #include "i_api.hpp"
#include "dispatcher.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "config.hpp" #include "config.hpp"
#include "socket_base.hpp"
// If the RDTSC is available we use it to prevent excessive // If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any // polling for commands. The nice thing here is that it will work on any
...@@ -39,8 +43,8 @@ ...@@ -39,8 +43,8 @@
#define ZMQ_DELAY_COMMANDS #define ZMQ_DELAY_COMMANDS
#endif #endif
zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (context_, thread_slot_), object_t (dispatcher_, thread_slot_),
tid (0), tid (0),
last_processing_time (0) last_processing_time (0)
{ {
...@@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) : ...@@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
zmq::app_thread_t::~app_thread_t () zmq::app_thread_t::~app_thread_t ()
{ {
// Ask all the sockets to start termination, then wait till it is complete. // Destroy all the sockets owned by this application thread.
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
(*it)->stop ();
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++) for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
delete *it; delete *it;
delete this;
} }
zmq::i_signaler *zmq::app_thread_t::get_signaler () zmq::i_signaler *zmq::app_thread_t::get_signaler ()
...@@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_) ...@@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_)
for (int i = 0; i != thread_slot_count (); i++) { for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) { if (signals & (ypollset_t::signals_t (1) << i)) {
command_t cmd; command_t cmd;
while (context->read (i, get_thread_slot (), &cmd)) while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
} }
} }
} }
} }
zmq::i_api *zmq::app_thread_t::create_socket (int type_)
{
// TODO: type is ignored for the time being.
socket_base_t *s = new socket_base_t (this);
zmq_assert (s);
sockets.push_back (s);
return s;
}
void zmq::app_thread_t::remove_socket (i_api *socket_)
{
// TODO: To speed this up we can possibly use the system where each socket
// holds its index (see I/O scheduler implementation).
sockets_t::iterator it = std::find (sockets.begin (), sockets.end (),
socket_);
zmq_assert (it != sockets.end ());
sockets.erase (it);
}
...@@ -22,7 +22,6 @@ ...@@ -22,7 +22,6 @@
#include <vector> #include <vector>
#include "i_socket.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "object.hpp" #include "object.hpp"
#include "ypollset.hpp" #include "ypollset.hpp"
...@@ -34,7 +33,7 @@ namespace zmq ...@@ -34,7 +33,7 @@ namespace zmq
{ {
public: public:
app_thread_t (class context_t *context_, int thread_slot_); app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
~app_thread_t (); ~app_thread_t ();
...@@ -42,7 +41,7 @@ namespace zmq ...@@ -42,7 +41,7 @@ namespace zmq
i_signaler *get_signaler (); i_signaler *get_signaler ();
// Nota bene: Following two functions are accessed from different // Nota bene: Following two functions are accessed from different
// threads. The caller (context) is responsible for synchronisation // threads. The caller (dispatcher) is responsible for synchronisation
// of accesses. // of accesses.
// Returns true is current thread is associated with the app thread. // Returns true is current thread is associated with the app thread.
...@@ -56,10 +55,16 @@ namespace zmq ...@@ -56,10 +55,16 @@ namespace zmq
// set to true, returns only after at least one command was processed. // set to true, returns only after at least one command was processed.
void process_commands (bool block_); void process_commands (bool block_);
// Create a socket of a specified type.
struct i_api *create_socket (int type_);
// Unregister the socket from the app_thread (called by socket itself).
void remove_socket (struct i_api *socket_);
private: private:
// All the sockets created from this application thread. // All the sockets created from this application thread.
typedef std::vector <i_socket*> sockets_t; typedef std::vector <struct i_api*> sockets_t;
sockets_t sockets; sockets_t sockets;
// Thread ID associated with this slot. // Thread ID associated with this slot.
......
...@@ -35,60 +35,49 @@ namespace zmq ...@@ -35,60 +35,49 @@ namespace zmq
enum type_t enum type_t
{ {
stop, stop,
plug,
own,
bind, bind,
head, term_req,
tail, term,
reg, term_ack
reg_and_bind,
unreg,
engine,
terminate,
terminate_ack
} type; } type;
union { union {
// Sent to I/O thread to let it know that it should
// terminate itself.
struct { struct {
} stop; } stop;
// Sent to I/O object to make it register with its I/O thread.
struct { struct {
class pipe_reader_t *reader; } plug;
class session_t *peer;
} bind;
// Sent to socket to let it know about the newly created object.
struct { struct {
uint64_t bytes; class object_t *object;
} tail; } own;
// Sent between objects to establish pipe(s) between them.
struct { struct {
uint64_t bytes; } bind;
} head;
struct {
class simple_semaphore_t *smph;
} reg;
struct {
class session_t *peer;
bool flow_in;
bool flow_out;
} reg_and_bind;
struct {
class simple_semaphore_t *smph;
} unreg;
// TODO: Engine object won't be deallocated on terminal shutdown // Sent by I/O object ot the socket to request the shutdown of
// while the command is still on the fly! // the I/O object.
struct { struct {
class i_engine *engine; class object_t *object;
} engine; } term_req;
// Sent by socket to I/O object to start its shutdown.
struct { struct {
} terminate; } term;
// Sent by I/O object to the socket to acknowledge it has
// shut down.
struct { struct {
} terminate_ack; } term_ack;
} args; } args;
}; };
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "context.hpp" #include "dispatcher.hpp"
#include "i_api.hpp" #include "i_api.hpp"
#include "app_thread.hpp" #include "app_thread.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
#include "windows.h" #include "windows.h"
#endif #endif
zmq::context_t::context_t (int app_threads_, int io_threads_) zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
{ {
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple // Intialise Windows sockets. Note that WSAStartup can be called multiple
...@@ -69,7 +69,7 @@ zmq::context_t::context_t (int app_threads_, int io_threads_) ...@@ -69,7 +69,7 @@ zmq::context_t::context_t (int app_threads_, int io_threads_)
io_threads [i]->start (); io_threads [i]->start ();
} }
zmq::context_t::~context_t () zmq::dispatcher_t::~dispatcher_t ()
{ {
// Close all application theads, sockets, io_objects etc. // Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
...@@ -93,12 +93,12 @@ zmq::context_t::~context_t () ...@@ -93,12 +93,12 @@ zmq::context_t::~context_t ()
#endif #endif
} }
int zmq::context_t::thread_slot_count () int zmq::dispatcher_t::thread_slot_count ()
{ {
return signalers.size (); return signalers.size ();
} }
zmq::i_api *zmq::context_t::create_socket (int type_) zmq::i_api *zmq::dispatcher_t::create_socket (int type_)
{ {
threads_sync.lock (); threads_sync.lock ();
app_thread_t *thread = choose_app_thread (); app_thread_t *thread = choose_app_thread ();
...@@ -106,16 +106,12 @@ zmq::i_api *zmq::context_t::create_socket (int type_) ...@@ -106,16 +106,12 @@ zmq::i_api *zmq::context_t::create_socket (int type_)
threads_sync.unlock (); threads_sync.unlock ();
return NULL; return NULL;
} }
zmq_assert (false);
i_api *s = NULL;
//i_api *s = thread->create_socket (type_);
threads_sync.unlock (); threads_sync.unlock ();
return s;
return thread->create_socket (type_);
} }
zmq::app_thread_t *zmq::context_t::choose_app_thread () zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{ {
// Check whether thread ID is already assigned. If so, return it. // Check whether thread ID is already assigned. If so, return it.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++) for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
...@@ -132,7 +128,7 @@ zmq::app_thread_t *zmq::context_t::choose_app_thread () ...@@ -132,7 +128,7 @@ zmq::app_thread_t *zmq::context_t::choose_app_thread ()
return NULL; return NULL;
} }
zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_) zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
{ {
zmq_assert (io_threads.size () > 0); zmq_assert (io_threads.size () > 0);
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__ #ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
#define __ZMQ_CONTEXT_HPP_INCLUDED__ #define __ZMQ_DISPATCHER_HPP_INCLUDED__
#include <vector> #include <vector>
#include <map> #include <map>
...@@ -37,27 +37,27 @@ namespace zmq ...@@ -37,27 +37,27 @@ namespace zmq
// Dispatcher implements bidirectional thread-safe passing of commands // Dispatcher implements bidirectional thread-safe passing of commands
// between N threads. It consists of a ypipes to pass commands and // between N threads. It consists of a ypipes to pass commands and
// signalers to wake up the receiver thread when new commands are // signalers to wake up the receiver thread when new commands are
// available. Note that context is inefficient for passing messages // available. Note that dispatcher is inefficient for passing messages
// within a thread (sender thread = receiver thread). The optimisation is // within a thread (sender thread = receiver thread). The optimisation is
// not part of the class and should be implemented by individual threads // not part of the class and should be implemented by individual threads
// (presumably by calling the command handling function directly). // (presumably by calling the command handling function directly).
class context_t class dispatcher_t
{ {
public: public:
// Create the context object. Matrix of pipes to communicate between // Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate // each socket and each I/O thread is created along with appropriate
// signalers. // signalers.
context_t (int app_threads_, int io_threads_); dispatcher_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zmq_term). // To be called to terminate the whole infrastructure (zmq_term).
~context_t (); ~dispatcher_t ();
// Create a socket. // Create a socket.
struct i_api *create_socket (int type_); struct i_api *create_socket (int type_);
// Returns number of thread slots in the context. To be used by // Returns number of thread slots in the dispatcher. To be used by
// individual threads to find out how many distinct signals can be // individual threads to find out how many distinct signals can be
// received. // received.
int thread_slot_count (); int thread_slot_count ();
...@@ -112,8 +112,8 @@ namespace zmq ...@@ -112,8 +112,8 @@ namespace zmq
// Synchronisation of accesses to shared thread data. // Synchronisation of accesses to shared thread data.
mutex_t threads_sync; mutex_t threads_sync;
context_t (const context_t&); dispatcher_t (const dispatcher_t&);
void operator = (const context_t&); void operator = (const dispatcher_t&);
}; };
} }
......
...@@ -80,6 +80,12 @@ namespace zmq ...@@ -80,6 +80,12 @@ namespace zmq
abort ();\ abort ();\
}} while (false) }} while (false)
// Provides convenient way to check for POSIX errors.
#define posix_assert(x) do {\
fprintf (stderr, "%s (%s:%d)\n", strerror (x), __FILE__, __LINE__);\
abort ();\
} while (false)
// Provides convenient way to check for errors from getaddrinfo. // Provides convenient way to check for errors from getaddrinfo.
#define gai_assert(x) do { if (x) {\ #define gai_assert(x) do { if (x) {\
const char *errstr = gai_strerror (x);\ const char *errstr = gai_strerror (x);\
......
/* /*
Copyright (c) 2007-2009 FastMQ Inc. Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ. This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under 0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or the Free Software Foundation; either version 3 of the License, or
(at your option) any later version. (at your option) any later version.
0MQ is distributed in the hope that it will be useful, 0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details. Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_I_API_HPP_INCLUDED__ #ifndef __ZMQ_I_API_HPP_INCLUDED__
...@@ -25,6 +25,8 @@ namespace zmq ...@@ -25,6 +25,8 @@ namespace zmq
struct i_api struct i_api
{ {
virtual ~i_api () {}
virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0; virtual int bind (const char *addr_, struct zmq_opts *opts_) = 0;
virtual int connect (const char *addr_, struct zmq_opts *opts_) = 0; virtual int connect (const char *addr_, struct zmq_opts *opts_) = 0;
virtual int subscribe (const char *criteria_) = 0; virtual int subscribe (const char *criteria_) = 0;
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "io_object.hpp"
zmq::io_object_t::io_object_t (object_t *parent_, object_t *owner_) :
object_t (parent_),
owner (owner_)
{
}
zmq::io_object_t::~io_object_t ()
{
}
void zmq::io_object_t::term ()
{
send_term_req (owner, this);
}
void zmq::io_object_t::process_term ()
{
send_term_ack (owner);
delete this;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
#include "object.hpp"
namespace zmq
{
class io_object_t : public object_t
{
public:
// I/O object will live in the thread inherited from the parent.
// However, it's lifetime is managed by the owner.
io_object_t (object_t *parent_, object_t *owner_);
protected:
// Ask owner socket to terminate this I/O object. This may not happen
void term ();
// I/O object destroys itself. No point in allowing others to invoke
// the destructor. At the same time, it has to be virtual so that
// generic io_object deallocation mechanism destroys specific type
// of I/O object correctly.
virtual ~io_object_t ();
private:
// Handlers for incoming commands.
void process_term ();
// Socket owning this I/O object. It is responsible for destroying
// it when it's being closed.
object_t *owner;
io_object_t (const io_object_t&);
void operator = (const io_object_t&);
};
}
#endif
...@@ -29,11 +29,11 @@ ...@@ -29,11 +29,11 @@
#include "select.hpp" #include "select.hpp"
#include "devpoll.hpp" #include "devpoll.hpp"
#include "kqueue.hpp" #include "kqueue.hpp"
#include "context.hpp" #include "dispatcher.hpp"
#include "simple_semaphore.hpp" #include "simple_semaphore.hpp"
zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) : zmq::io_thread_t::io_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
object_t (context_, thread_slot_) object_t (dispatcher_, thread_slot_)
{ {
#if defined ZMQ_FORCE_SELECT #if defined ZMQ_FORCE_SELECT
poller = new select_t; poller = new select_t;
...@@ -115,7 +115,7 @@ void zmq::io_thread_t::in_event () ...@@ -115,7 +115,7 @@ void zmq::io_thread_t::in_event ()
// Read all the commands from particular thread. // Read all the commands from particular thread.
command_t cmd; command_t cmd;
while (context->read (source_thread_slot, thread_slot, &cmd)) while (dispatcher->read (source_thread_slot, thread_slot, &cmd))
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
} }
} }
......
...@@ -37,7 +37,7 @@ namespace zmq ...@@ -37,7 +37,7 @@ namespace zmq
{ {
public: public:
io_thread_t (class context_t *context_, int thread_slot_); io_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop' // Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up. // before invoking destructor. Otherwise the destructor would hang up.
......
...@@ -72,43 +72,47 @@ namespace zmq ...@@ -72,43 +72,47 @@ namespace zmq
namespace zmq namespace zmq
{ {
class mutex_t class mutex_t
{ {
public: public:
inline mutex_t () inline mutex_t ()
{ {
int rc = pthread_mutex_init (&mutex, NULL); int rc = pthread_mutex_init (&mutex, NULL);
errno_assert (rc == 0); if (rc)
posix_assert (rc);
} }
inline ~mutex_t () inline ~mutex_t ()
{ {
int rc = pthread_mutex_destroy (&mutex); int rc = pthread_mutex_destroy (&mutex);
errno_assert (rc == 0); if (rc)
posix_assert (rc);
} }
inline void lock () inline void lock ()
{ {
int rc = pthread_mutex_lock (&mutex); int rc = pthread_mutex_lock (&mutex);
errno_assert (rc == 0); if (rc)
posix_assert (rc);
} }
inline void unlock () inline void unlock ()
{ {
int rc = pthread_mutex_unlock (&mutex); int rc = pthread_mutex_unlock (&mutex);
errno_assert (rc == 0); if (rc)
posix_assert (rc);
} }
private: private:
pthread_mutex_t mutex; pthread_mutex_t mutex;
// Disable copy construction and assignment. // Disable copy construction and assignment.
mutex_t (const mutex_t&); mutex_t (const mutex_t&);
void operator = (const mutex_t&); void operator = (const mutex_t&);
}; };
} }
#endif #endif
......
...@@ -18,19 +18,19 @@ ...@@ -18,19 +18,19 @@
*/ */
#include "object.hpp" #include "object.hpp"
#include "context.hpp" #include "dispatcher.hpp"
#include "err.hpp" #include "err.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "simple_semaphore.hpp" #include "simple_semaphore.hpp"
zmq::object_t::object_t (context_t *context_, int thread_slot_) : zmq::object_t::object_t (dispatcher_t *dispatcher_, int thread_slot_) :
context (context_), dispatcher (dispatcher_),
thread_slot (thread_slot_) thread_slot (thread_slot_)
{ {
} }
zmq::object_t::object_t (object_t *parent_) : zmq::object_t::object_t (object_t *parent_) :
context (parent_->context), dispatcher (parent_->dispatcher),
thread_slot (parent_->thread_slot) thread_slot (parent_->thread_slot)
{ {
} }
...@@ -41,7 +41,7 @@ zmq::object_t::~object_t () ...@@ -41,7 +41,7 @@ zmq::object_t::~object_t ()
int zmq::object_t::thread_slot_count () int zmq::object_t::thread_slot_count ()
{ {
return context->thread_slot_count (); return dispatcher->thread_slot_count ();
} }
int zmq::object_t::get_thread_slot () int zmq::object_t::get_thread_slot ()
...@@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -53,45 +53,32 @@ void zmq::object_t::process_command (command_t &cmd_)
{ {
switch (cmd_.type) { switch (cmd_.type) {
case command_t::head: case command_t::stop:
process_head (cmd_.args.head.bytes); process_stop ();
break; break;
case command_t::tail: case command_t::plug:
process_tail (cmd_.args.tail.bytes); process_plug ();
break; return;
case command_t::engine: case command_t::own:
process_engine (cmd_.args.engine.engine); process_own (cmd_.args.own.object);
break; return;
case command_t::bind: case command_t::bind:
process_bind (cmd_.args.bind.reader, cmd_.args.bind.peer); process_bind ();
break; return;
case command_t::reg:
process_reg (cmd_.args.reg.smph);
break;
case command_t::reg_and_bind:
process_reg_and_bind (cmd_.args.reg_and_bind.peer,
cmd_.args.reg_and_bind.flow_in, cmd_.args.reg_and_bind.flow_out);
break;
case command_t::unreg:
process_unreg (cmd_.args.unreg.smph);
break;
case command_t::terminate:
process_terminate ();
break;
case command_t::terminate_ack: case command_t::term_req:
process_terminate_ack (); process_term_req (cmd_.args.term_req.object);
break; return;
case command_t::term:
process_term ();
return;
case command_t::stop: case command_t::term_ack:
process_stop (); process_term_ack ();
return; return;
default: default:
...@@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_) ...@@ -101,7 +88,7 @@ void zmq::object_t::process_command (command_t &cmd_)
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_) zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{ {
return context->choose_io_thread (taskset_); return dispatcher->choose_io_thread (taskset_);
} }
void zmq::object_t::send_stop () void zmq::object_t::send_stop ()
...@@ -111,91 +98,56 @@ void zmq::object_t::send_stop () ...@@ -111,91 +98,56 @@ void zmq::object_t::send_stop ()
command_t cmd; command_t cmd;
cmd.destination = this; cmd.destination = this;
cmd.type = command_t::stop; cmd.type = command_t::stop;
context->write (thread_slot, thread_slot, cmd); dispatcher->write (thread_slot, thread_slot, cmd);
} }
void zmq::object_t::send_bind (object_t *destination_, pipe_reader_t *reader_, void zmq::object_t::send_plug (object_t *destination_)
session_t *peer_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::bind; cmd.type = command_t::plug;
cmd.args.bind.reader = reader_;
cmd.args.bind.peer = peer_;
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_head (object_t *destination_, uint64_t bytes_) void zmq::object_t::send_own (object_t *destination_, object_t *object_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::head; cmd.type = command_t::own;
cmd.args.head.bytes = bytes_; cmd.args.own.object = object_;
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_tail (object_t *destination_, uint64_t bytes_) void zmq::object_t::send_bind (object_t *destination_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::tail; cmd.type = command_t::bind;
cmd.args.tail.bytes = bytes_;
send_command (cmd);
}
void zmq::object_t::send_reg (object_t *destination_, simple_semaphore_t *smph_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::reg;
cmd.args.reg.smph = smph_;
send_command (cmd);
}
void zmq::object_t::send_reg_and_bind (object_t *destination_,
session_t *peer_, bool flow_in_, bool flow_out_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::reg_and_bind;
cmd.args.reg_and_bind.peer = peer_;
cmd.args.reg_and_bind.flow_in = flow_in_;
cmd.args.reg_and_bind.flow_out = flow_out_;
send_command (cmd);
}
void zmq::object_t::send_unreg (object_t *destination_,
simple_semaphore_t *smph_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::unreg;
cmd.args.unreg.smph = smph_;
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_engine (object_t *destination_, i_engine *engine_) void zmq::object_t::send_term_req (object_t *destination_, object_t *object_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::engine; cmd.type = command_t::term_req;
cmd.args.engine.engine = engine_; cmd.args.term_req.object = object_;
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_terminate (object_t *destination_) void zmq::object_t::send_term (object_t *destination_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::terminate; cmd.type = command_t::term;
send_command (cmd); send_command (cmd);
} }
void zmq::object_t::send_terminate_ack (object_t *destination_) void zmq::object_t::send_term_ack (object_t *destination_)
{ {
command_t cmd; command_t cmd;
cmd.destination = destination_; cmd.destination = destination_;
cmd.type = command_t::terminate_ack; cmd.type = command_t::term_ack;
send_command (cmd); send_command (cmd);
} }
...@@ -204,48 +156,32 @@ void zmq::object_t::process_stop () ...@@ -204,48 +156,32 @@ void zmq::object_t::process_stop ()
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_bind (pipe_reader_t *reader_, session_t *peer_) void zmq::object_t::process_plug ()
{
zmq_assert (false);
}
void zmq::object_t::process_head (uint64_t bytes_)
{
zmq_assert (false);
}
void zmq::object_t::process_tail (uint64_t bytes_)
{
zmq_assert (false);
}
void zmq::object_t::process_reg (simple_semaphore_t *smph_)
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_reg_and_bind (session_t *session_, void zmq::object_t::process_own (object_t *object_)
bool flow_in_, bool flow_out_)
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_unreg (simple_semaphore_t *smph_) void zmq::object_t::process_bind ()
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_engine (i_engine *engine_) void zmq::object_t::process_term_req (object_t *object_)
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_terminate () void zmq::object_t::process_term ()
{ {
zmq_assert (false); zmq_assert (false);
} }
void zmq::object_t::process_terminate_ack () void zmq::object_t::process_term_ack ()
{ {
zmq_assert (false); zmq_assert (false);
} }
...@@ -256,6 +192,6 @@ void zmq::object_t::send_command (command_t &cmd_) ...@@ -256,6 +192,6 @@ void zmq::object_t::send_command (command_t &cmd_)
if (destination_thread_slot == thread_slot) if (destination_thread_slot == thread_slot)
cmd_.destination->process_command (cmd_); cmd_.destination->process_command (cmd_);
else else
context->write (thread_slot, destination_thread_slot, cmd_); dispatcher->write (thread_slot, destination_thread_slot, cmd_);
} }
...@@ -32,7 +32,7 @@ namespace zmq ...@@ -32,7 +32,7 @@ namespace zmq
{ {
public: public:
object_t (class context_t *context_, int thread_slot_); object_t (class dispatcher_t *dispatcher_, int thread_slot_);
object_t (object_t *parent_); object_t (object_t *parent_);
~object_t (); ~object_t ();
...@@ -42,44 +42,32 @@ namespace zmq ...@@ -42,44 +42,32 @@ namespace zmq
protected: protected:
// Derived object can use following functions to interact with // Derived object can use following functions to interact with
// global repositories. See context.hpp for function details. // global repositories. See dispatcher.hpp for function details.
int thread_slot_count (); int thread_slot_count ();
class io_thread_t *choose_io_thread (uint64_t taskset_); class io_thread_t *choose_io_thread (uint64_t taskset_);
// Derived object can use these functions to send commands // Derived object can use these functions to send commands
// to other objects. // to other objects.
void send_stop (); void send_stop ();
void send_bind (object_t *destination_, class pipe_reader_t *reader_, void send_plug (object_t *destination_);
class session_t *peer_); void send_own (object_t *destination_, object_t *object_);
void send_head (object_t *destination_, uint64_t bytes_); void send_bind (object_t *destination_);
void send_tail (object_t *destination_, uint64_t bytes_); void send_term_req (object_t *destination_, object_t *object_);
void send_reg (object_t *destination_, void send_term (object_t *destination_);
class simple_semaphore_t *smph_); void send_term_ack (object_t *destination_);
void send_reg_and_bind (object_t *destination_, class session_t *peer_,
bool flow_in_, bool flow_out_);
void send_unreg (object_t *destination_,
class simple_semaphore_t *smph_);
void send_engine (object_t *destination_, struct i_engine *engine_);
void send_terminate (object_t *destination_);
void send_terminate_ack (object_t *destination_);
// These handlers can be overloaded by the derived objects. They are // These handlers can be overloaded by the derived objects. They are
// called when command arrives from another thread. // called when command arrives from another thread.
virtual void process_stop (); virtual void process_stop ();
virtual void process_bind (class pipe_reader_t *reader_, virtual void process_plug ();
class session_t *peer_); virtual void process_own (object_t *object_);
virtual void process_head (uint64_t bytes_); virtual void process_bind ();
virtual void process_tail (uint64_t bytes_); virtual void process_term_req (object_t *object_);
virtual void process_reg (class simple_semaphore_t *smph_); virtual void process_term ();
virtual void process_reg_and_bind (class session_t *peer_, virtual void process_term_ack ();
bool flow_in_, bool flow_out_);
virtual void process_unreg (class simple_semaphore_t *smph_);
virtual void process_engine (struct i_engine *engine_);
virtual void process_terminate ();
virtual void process_terminate_ack ();
// Pointer to the root of the infrastructure. // Pointer to the root of the infrastructure.
class context_t *context; class dispatcher_t *dispatcher;
// Slot ID of the thread the object belongs to. // Slot ID of the thread the object belongs to.
int thread_slot; int thread_slot;
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <algorithm>
#include "../include/zmq.h"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "err.hpp"
#include "zmq_listener.hpp"
#include "io_thread.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
pending_term_acks (0),
app_thread (parent_)
{
}
zmq::socket_base_t::~socket_base_t ()
{
while (true) {
// On third pass of the loop there should be no more I/O objects
// because all connecters and listerners were destroyed during
// the first pass and all engines delivered by delayed 'own' commands
// are destroyed during the second pass.
if (io_objects.empty () && !pending_term_acks)
break;
// Send termination request to all associated I/O objects.
for (io_objects_t::size_type i = 0; i != io_objects.size (); i++)
send_term (io_objects [i]);
// Move the objects to the list of pending term acks.
pending_term_acks += io_objects.size ();
io_objects.clear ();
// Process commands till we get all the termination acknowledgements.
while (pending_term_acks)
app_thread->process_commands (true);
}
}
int zmq::socket_base_t::bind (const char *addr_, struct zmq_opts *opts_)
{
uint64_t taskset = opts_ ? opts_->taskset : 0;
object_t *listener = new zmq_listener_t (choose_io_thread (taskset), this);
send_plug (listener);
send_own (this, listener);
return 0;
}
int zmq::socket_base_t::connect (const char *addr_, struct zmq_opts *opts_)
{
zmq_assert (false);
}
int zmq::socket_base_t::subscribe (const char *criteria_)
{
zmq_assert (false);
}
int zmq::socket_base_t::send (struct zmq_msg *msg_, int flags_)
{
zmq_assert (false);
}
int zmq::socket_base_t::flush ()
{
zmq_assert (false);
}
int zmq::socket_base_t::recv (struct zmq_msg *msg_, int flags_)
{
zmq_assert (false);
}
int zmq::socket_base_t::close ()
{
app_thread->remove_socket (this);
delete this;
return 0;
}
void zmq::socket_base_t::process_own (object_t *object_)
{
io_objects.push_back (object_);
}
void zmq::socket_base_t::process_term_req (object_t *object_)
{
// If I/O object is well and alive ask it to terminate.
// TODO: Following find may produce an unacceptable jitter in
// C10K-style applications. If so, use set instead of vector.
io_objects_t::iterator it = std::find (io_objects.begin (),
io_objects.end (), object_);
if (it != io_objects.end ()) {
pending_term_acks++;
io_objects.erase (it);
send_term (object_);
}
// If not found, we assume that termination request was already sent to
// the object so we can sagely ignore the request.
}
void zmq::socket_base_t::process_term_ack ()
{
zmq_assert (pending_term_acks);
pending_term_acks--;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <vector>
#include "i_api.hpp"
#include "object.hpp"
namespace zmq
{
class socket_base_t : public object_t, public i_api
{
public:
socket_base_t (class app_thread_t *parent_);
~socket_base_t ();
// i_api interface implementation.
int bind (const char *addr_, struct zmq_opts *opts_);
int connect (const char *addr_, struct zmq_opts *opts_);
int subscribe (const char *criteria_);
int send (struct zmq_msg *msg_, int flags_);
int flush ();
int recv (struct zmq_msg *msg_, int flags_);
int close ();
private:
// Handlers for incoming commands.
void process_own (object_t *object_);
void process_term_req (object_t *object_);
void process_term_ack ();
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::vector <object_t*> io_objects_t;
io_objects_t io_objects;
// Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet.
int pending_term_acks;
// Application thread the socket lives in.
class app_thread_t *app_thread;
socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&);
};
}
#endif
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#include "i_api.hpp" #include "i_api.hpp"
#include "err.hpp" #include "err.hpp"
#include "context.hpp" #include "dispatcher.hpp"
#include "msg.hpp" #include "msg.hpp"
int zmq_msg_init (zmq_msg *msg_) int zmq_msg_init (zmq_msg *msg_)
...@@ -162,27 +162,28 @@ int zmq_msg_type (zmq_msg *msg_) ...@@ -162,27 +162,28 @@ int zmq_msg_type (zmq_msg *msg_)
void *zmq_init (int app_threads_, int io_threads_) void *zmq_init (int app_threads_, int io_threads_)
{ {
// There should be at least a single thread managed by the context. // There should be at least a single thread managed by the dispatcher.
if (app_threads_ < 0 || io_threads_ < 0 || if (app_threads_ < 0 || io_threads_ < 0 ||
app_threads_ + io_threads_ == 0) { app_threads_ + io_threads_ == 0) {
errno = EINVAL; errno = EINVAL;
return NULL; return NULL;
} }
zmq::context_t *context = new zmq::context_t (app_threads_, io_threads_); zmq::dispatcher_t *dispatcher = new zmq::dispatcher_t (app_threads_,
zmq_assert (context); io_threads_);
return (void*) context; zmq_assert (dispatcher);
return (void*) dispatcher;
} }
int zmq_term (void *context_) int zmq_term (void *dispatcher_)
{ {
delete (zmq::context_t*) context_; delete (zmq::dispatcher_t*) dispatcher_;
return 0; return 0;
} }
void *zmq_socket (void *context_, int type_) void *zmq_socket (void *dispatcher_, int type_)
{ {
return (void*) (((zmq::context_t*) context_)->create_socket (type_)); return (void*) (((zmq::dispatcher_t*) dispatcher_)->create_socket (type_));
} }
int zmq_close (void *s_) int zmq_close (void *s_)
......
...@@ -17,20 +17,19 @@ ...@@ -17,20 +17,19 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef __ZMQ_I_SOCKET_HPP_INCLUDED__ #include "zmq_listener.hpp"
#define __ZMQ_I_SOCKET_HPP_INCLUDED__ #include "err.hpp"
namespace zmq zmq::zmq_listener_t::zmq_listener_t (object_t *parent_, object_t *owner_) :
io_object_t (parent_, owner_)
{ {
}
struct i_socket zmq::zmq_listener_t::~zmq_listener_t ()
{ {
virtual ~i_socket () {};
// Start shutting down the socket.
virtual void stop () = 0;
};
} }
#endif void zmq::zmq_listener_t::process_plug ()
{
// TODO: Register with the I/O thread here.
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__
#include "io_object.hpp"
namespace zmq
{
class zmq_listener_t : public io_object_t
{
public:
zmq_listener_t (object_t *parent_, object_t *owner_);
~zmq_listener_t ();
private:
// Handlers for incoming commands.
void process_plug ();
zmq_listener_t (const zmq_listener_t&);
void operator = (const zmq_listener_t&);
};
}
#endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment