Commit 0b5cc026 authored by Martin Sustrik's avatar Martin Sustrik

clean up - session/socket/engine stuff removed

parent b8b4acef
......@@ -7,53 +7,30 @@ libzmq_la_SOURCES = \
atomic_ptr.hpp \
command.hpp \
config.hpp \
connecter.hpp \
context.hpp \
data_distributor.hpp \
decoder.hpp \
devpoll.hpp \
dummy_aggregator.hpp \
dummy_distributor.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
fair_aggregator.hpp \
fd.hpp \
fd_signaler.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
i_api.hpp \
i_demux.hpp \
i_mux.hpp \
i_poller.hpp \
i_poll_events.hpp \
i_session.hpp \
i_signaler.hpp \
i_engine.hpp \
i_thread.hpp \
listener.hpp \
i_socket.hpp \
kqueue.hpp \
load_balancer.hpp \
msg.hpp \
mutex.hpp \
object.hpp \
p2p.hpp \
pipe.hpp \
pipe_reader.hpp \
pipe_writer.hpp \
platform.hpp \
poll.hpp \
pub.hpp \
rep.hpp \
req.hpp \
safe_object.hpp \
select.hpp \
session.hpp \
session_stub.hpp \
simple_semaphore.hpp \
socket_base.hpp \
sub.hpp \
stdint.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
......@@ -65,50 +42,24 @@ libzmq_la_SOURCES = \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
zmq_decoder.hpp \
zmq_encoder.hpp \
zmq_tcp_engine.hpp \
app_thread.cpp \
connecter.cpp \
context.cpp \
data_distributor.cpp \
devpoll.hpp \
dummy_aggregator.cpp \
dummy_distributor.cpp \
epoll.cpp \
err.cpp \
fair_aggregator.cpp \
fd_signaler.cpp \
io_object.cpp \
io_thread.cpp \
ip.cpp \
kqueue.cpp \
listener.cpp \
load_balancer.cpp \
object.cpp \
p2p.cpp \
pipe.cpp \
pipe_reader.cpp \
pipe_writer.cpp \
poll.cpp \
pub.cpp \
rep.cpp \
req.cpp \
safe_object.cpp \
select.cpp \
session.cpp \
session_stub.cpp \
socket_base.cpp \
sub.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
uuid.cpp \
ypollset.cpp \
zmq_decoder.cpp \
zmq_encoder.cpp \
zmq_tcp_engine.cpp \
zmq.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
......
......@@ -28,20 +28,8 @@
#include "app_thread.hpp"
#include "context.hpp"
#include "err.hpp"
#include "session.hpp"
#include "pipe.hpp"
#include "config.hpp"
#include "i_api.hpp"
#include "dummy_aggregator.hpp"
#include "fair_aggregator.hpp"
#include "dummy_distributor.hpp"
#include "data_distributor.hpp"
#include "load_balancer.hpp"
#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
......@@ -58,37 +46,15 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
{
}
void zmq::app_thread_t::shutdown ()
{
// Deallocate all the sessions associated with the thread.
while (!sessions.empty ())
sessions [0]->shutdown ();
delete this;
}
zmq::app_thread_t::~app_thread_t ()
{
}
void zmq::app_thread_t::attach_session (session_t *session_)
{
session_->set_index (sessions.size ());
sessions.push_back (session_);
}
void zmq::app_thread_t::detach_session (session_t *session_)
{
// O(1) removal of the session from the list.
sessions_t::size_type i = session_->get_index ();
sessions [i] = sessions [sessions.size () - 1];
sessions [i]->set_index (i);
sessions.pop_back ();
}
// Ask all the sockets to start termination, then wait till it is complete.
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
(*it)->stop ();
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
delete *it;
zmq::i_poller *zmq::app_thread_t::get_poller ()
{
zmq_assert (false);
delete this;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
......@@ -98,76 +64,20 @@ zmq::i_signaler *zmq::app_thread_t::get_signaler ()
bool zmq::app_thread_t::is_current ()
{
return !sessions.empty () && tid == getpid ();
return !sockets.empty () && tid == getpid ();
}
bool zmq::app_thread_t::make_current ()
{
// If there are object managed by this slot we cannot assign the slot
// to a different thread.
if (!sessions.empty ())
if (!sockets.empty ())
return false;
tid = getpid ();
return true;
}
zmq::i_api *zmq::app_thread_t::create_socket (int type_)
{
i_mux *mux = NULL;
i_demux *demux = NULL;
session_t *session = NULL;
i_api *api = NULL;
switch (type_) {
case ZMQ_P2P:
mux = new dummy_aggregator_t;
zmq_assert (mux);
demux = new dummy_distributor_t;
zmq_assert (demux);
session = new session_t (this, this, mux, demux, true, false);
zmq_assert (session);
api = new p2p_t (this, session);
zmq_assert (api);
break;
case ZMQ_PUB:
demux = new data_distributor_t;
zmq_assert (demux);
session = new session_t (this, this, mux, demux, true, false);
zmq_assert (session);
api = new pub_t (this, session);
zmq_assert (api);
break;
case ZMQ_SUB:
mux = new fair_aggregator_t;
zmq_assert (mux);
session = new session_t (this, this, mux, demux, true, false);
zmq_assert (session);
api = new sub_t (this, session);
zmq_assert (api);
break;
case ZMQ_REQ:
// TODO
zmq_assert (false);
api = new req_t (this, session);
zmq_assert (api);
break;
case ZMQ_REP:
// TODO
zmq_assert (false);
api = new rep_t (this, session);
zmq_assert (api);
break;
default:
errno = EINVAL;
return NULL;
}
attach_session (session);
return api;
}
void zmq::app_thread_t::process_commands (bool block_)
{
ypollset_t::signals_t signals;
......
......@@ -22,7 +22,7 @@
#include <vector>
#include "i_thread.hpp"
#include "i_socket.hpp"
#include "stdint.hpp"
#include "object.hpp"
#include "ypollset.hpp"
......@@ -30,23 +30,18 @@
namespace zmq
{
class app_thread_t : public object_t, public i_thread
class app_thread_t : public object_t
{
public:
app_thread_t (class context_t *context_, int thread_slot_);
// To be called when the whole infrastrucure is being closed.
void shutdown ();
~app_thread_t ();
// Returns signaler associated with this application thread.
i_signaler *get_signaler ();
// Create socket engine in this thread. Return false if the calling
// thread doesn't match the thread handled by this app thread object.
struct i_api *create_socket (int type_);
// Nota bene: The following two functions are accessed from different
// Nota bene: Following two functions are accessed from different
// threads. The caller (context) is responsible for synchronisation
// of accesses.
......@@ -61,25 +56,17 @@ namespace zmq
// set to true, returns only after at least one command was processed.
void process_commands (bool block_);
// i_thread implementation.
void attach_session (class session_t *session_);
void detach_session (class session_t *session_);
struct i_poller *get_poller ();
private:
// Clean-up.
~app_thread_t ();
// All the sockets created from this application thread.
typedef std::vector <i_socket*> sockets_t;
sockets_t sockets;
// Thread ID associated with this slot.
// TODO: Virtualise pid_t!
// TODO: Check whether getpid returns unique ID for each thread.
int tid;
// Vector of all sessionss associated with this app thread.
typedef std::vector <class session_t*> sessions_t;
sessions_t sessions;
// App thread's signaler object.
ypollset_t pollset;
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "connecter.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "err.hpp"
#include "simple_semaphore.hpp"
#include "zmq_tcp_engine.hpp"
zmq::connecter_t::connecter_t (io_thread_t *thread_, const char *addr_,
session_t *session_) :
io_object_t (thread_),
state (idle),
poller (NULL),
session (session_),
addr (addr_),
identity ("abcde"),
engine (NULL)
{
}
void zmq::connecter_t::terminate ()
{
delete this;
}
void zmq::connecter_t::shutdown ()
{
delete this;
}
zmq::connecter_t::~connecter_t ()
{
}
void zmq::connecter_t::process_reg (simple_semaphore_t *smph_)
{
// Fet poller pointer for further use.
zmq_assert (!poller);
poller = get_poller ();
// Ask the session to register itself with the I/O thread. Note that
// the session is living in the same I/O thread, thus this results
// in a synchronous call.
session->inc_seqnum ();
send_reg (session, NULL);
// Unlock the application thread that created the connecter.
if (smph_)
smph_->post ();
// Manually trigger timer event which will launch asynchronous connect.
state = waiting;
timer_event ();
}
void zmq::connecter_t::process_unreg (simple_semaphore_t *smph_)
{
// Unregister connecter/engine from the poller.
zmq_assert (poller);
if (state == connecting)
poller->rm_fd (handle);
else if (state == waiting)
poller->cancel_timer (this);
else if (state == sending)
engine->terminate ();
// Unlock the application thread closing the connecter.
if (smph_)
smph_->post ();
}
void zmq::connecter_t::in_event ()
{
// Error occured in asynchronous connect. Retry to connect after a while.
if (state == connecting) {
fd_t fd = tcp_connecter.connect ();
zmq_assert (fd == retired_fd);
poller->rm_fd (handle);
poller->add_timer (this);
state = waiting;
return;
}
zmq_assert (false);
}
void zmq::connecter_t::out_event ()
{
if (state == connecting) {
fd_t fd = tcp_connecter.connect ();
if (fd == retired_fd) {
poller->rm_fd (handle);
poller->add_timer (this);
state = waiting;
return;
}
poller->rm_fd (handle);
engine = new zmq_tcp_engine_t (fd);
zmq_assert (engine);
engine->attach (poller, this);
state = sending;
return;
}
zmq_assert (false);
}
void zmq::connecter_t::timer_event ()
{
zmq_assert (state == waiting);
// Initiate async connect and start polling for its completion. If async
// connect fails instantly, try to reconnect after a while.
int rc = tcp_connecter.open (addr.c_str ());
if (rc == 0) {
state = connecting;
in_event ();
}
else if (rc == 1) {
handle = poller->add_fd (tcp_connecter.get_fd (), this);
poller->set_pollout (handle);
state = connecting;
}
else {
poller->add_timer (this);
state = waiting;
}
}
void zmq::connecter_t::set_engine (struct i_engine *engine_)
{
engine = engine_;
}
bool zmq::connecter_t::read (zmq_msg *msg_)
{
zmq_assert (state == sending);
// Deallocate old content of the message just in case.
zmq_msg_close (msg_);
// Send the identity.
zmq_msg_init_size (msg_, identity.size ());
memcpy (zmq_msg_data (msg_), identity.c_str (), identity.size ());
// Ask engine to unregister from the poller.
i_engine *e = engine;
engine->detach ();
// Attach the engine to the session. (Note that this is actually
// a synchronous call.
session->inc_seqnum ();
send_engine (session, e);
state = idle;
return true;
}
bool zmq::connecter_t::write (struct zmq_msg *msg_)
{
// No incoming messages are accepted till identity is sent.
return false;
}
void zmq::connecter_t::flush ()
{
// No incoming messages are accepted till identity is sent.
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_CONNECTER_HPP_INCLUDED__
#define __ZMQ_CONNECTER_HPP_INCLUDED__
#include <string>
#include "../include/zmq.h"
#include "i_poller.hpp"
#include "io_object.hpp"
#include "i_poll_events.hpp"
#include "i_session.hpp"
#include "tcp_connecter.hpp"
namespace zmq
{
class connecter_t : public io_object_t, public i_poll_events,
public i_session
{
public:
connecter_t (class io_thread_t *thread_, const char *addr_,
class session_t *session_);
void terminate ();
void shutdown ();
void process_reg (class simple_semaphore_t *smph_);
void process_unreg (class simple_semaphore_t *smph_);
// i_poll_events implementation.
void in_event ();
void out_event ();
void timer_event ();
// i_session implementation
void set_engine (struct i_engine *engine_);
// void shutdown ();
bool read (struct zmq_msg *msg_);
bool write (struct zmq_msg *msg_);
void flush ();
private:
// Clean-up.
~connecter_t ();
enum {
idle,
waiting,
connecting,
sending
} state;
// Cached pointer to the poller.
struct i_poller *poller;
// Handle of the connecting socket.
handle_t handle;
// Associated session. It lives in the same I/O thread.
class session_t *session;
// Address to connect to.
std::string addr;
// Identity of the connection.
std::string identity;
tcp_connecter_t tcp_connecter;
struct i_engine *engine;
connecter_t (const connecter_t&);
void operator = (const connecter_t&);
};
}
#endif
......@@ -20,15 +20,12 @@
#include "../include/zmq.h"
#include "context.hpp"
#include "i_api.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "pipe_reader.hpp"
#include "pipe_writer.hpp"
#include "session.hpp"
#include "i_api.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
......@@ -72,37 +69,23 @@ zmq::context_t::context_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
void zmq::context_t::shutdown ()
{
delete this;
}
zmq::context_t::~context_t ()
{
// Ask I/O threads to terminate.
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
delete app_threads [i];
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->stop ();
// Wait till I/O threads actually terminate.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->join ();
// At this point the current thread is the only thread with access to
// our internal data. Deallocation will be done exclusively in this thread.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
app_threads [i]->shutdown ();
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
io_threads [i]->shutdown ();
delete io_threads [i];
delete [] command_pipes;
// Deallocate all the pipes, pipe readers and pipe writers.
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++) {
delete it->pipe;
delete it->reader;
delete it->writer;
}
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
int rc = WSACleanup ();
......@@ -123,7 +106,11 @@ zmq::i_api *zmq::context_t::create_socket (int type_)
threads_sync.unlock ();
return NULL;
}
i_api *s = thread->create_socket (type_);
zmq_assert (false);
i_api *s = NULL;
//i_api *s = thread->create_socket (type_);
threads_sync.unlock ();
return s;
}
......@@ -164,103 +151,3 @@ zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_)
return io_threads [result];
}
void zmq::context_t::create_pipe (object_t *reader_parent_,
object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
pipe_reader_t **reader_, pipe_writer_t **writer_)
{
// Create the pipe, reader & writer triple.
pipe_t *pipe = new pipe_t;
zmq_assert (pipe);
pipe_reader_t *reader = new pipe_reader_t (reader_parent_, pipe,
hwm_, lwm_);
zmq_assert (reader);
pipe_writer_t *writer = new pipe_writer_t (writer_parent_, pipe, reader,
hwm_, lwm_);
zmq_assert (writer);
reader->set_peer (writer);
// Store the pipe in the repository.
pipe_info_t info = {pipe, reader, writer};
pipes_sync.lock ();
pipe->set_index (pipes.size ());
pipes.push_back (info);
pipes_sync.unlock ();
*reader_ = reader;
*writer_ = writer;
}
void zmq::context_t::destroy_pipe (pipe_t *pipe_)
{
// Remove the pipe from the repository.
pipe_info_t info;
pipes_sync.lock ();
pipes_t::size_type i = pipe_->get_index ();
info = pipes [i];
pipes [i] = pipes.back ();
pipes.pop_back ();
pipes_sync.unlock ();
// Deallocate the pipe and associated pipe reader & pipe writer.
zmq_assert (info.pipe == pipe_);
delete info.pipe;
delete info.reader;
delete info.writer;
}
int zmq::context_t::register_inproc_endpoint (const char *endpoint_,
session_t *session_)
{
inproc_endpoint_sync.lock ();
inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
if (it != inproc_endpoints.end ()) {
inproc_endpoint_sync.unlock ();
errno = EADDRINUSE;
return -1;
}
inproc_endpoints.insert (std::make_pair (endpoint_, session_));
inproc_endpoint_sync.unlock ();
return 0;
}
zmq::object_t *zmq::context_t::get_inproc_endpoint (const char *endpoint_)
{
inproc_endpoint_sync.lock ();
inproc_endpoints_t::iterator it = inproc_endpoints.find (endpoint_);
if (it == inproc_endpoints.end ()) {
inproc_endpoint_sync.unlock ();
errno = EADDRNOTAVAIL;
return NULL;
}
it->second->inc_seqnum ();
object_t *session = it->second;
inproc_endpoint_sync.unlock ();
return session;
}
void zmq::context_t::unregister_inproc_endpoints (session_t *session_)
{
inproc_endpoint_sync.lock ();
// Remove the connection from the repository.
// TODO: Yes, the algorithm has O(n^2) complexity. Should be O(log n).
for (inproc_endpoints_t::iterator it = inproc_endpoints.begin ();
it != inproc_endpoints.end ();) {
if (it->second == session_) {
inproc_endpoints.erase (it);
it = inproc_endpoints.begin ();
}
else
it++;
}
inproc_endpoint_sync.unlock ();
}
......@@ -52,9 +52,9 @@ namespace zmq
context_t (int app_threads_, int io_threads_);
// To be called to terminate the whole infrastructure (zmq_term).
void shutdown ();
~context_t ();
// Create a socket engine.
// Create a socket.
struct i_api *create_socket (int type_);
// Returns number of thread slots in the context. To be used by
......@@ -81,37 +81,12 @@ namespace zmq
destination_].read (command_);
}
// Creates new pipe.
void create_pipe (class object_t *reader_parent_,
class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
class pipe_reader_t **reader_, class pipe_writer_t **writer_);
// Deallocates the pipe.
void destroy_pipe (class pipe_t *pipe_);
// Registers existing session object as an inproc endpoint.
int register_inproc_endpoint (const char *endpoint_,
class session_t *session_);
// Retrieves an inproc endpoint. Increments the command sequence number
// of the object by one. Caller is thus bound to send the command
// to the connection after invoking this function. Returns NULL if
// the endpoint doesn't exist.
class object_t *get_inproc_endpoint (const char *endpoint_);
// Removes all the inproc endpoints associated with the given session
// object from the global repository.
void unregister_inproc_endpoints (class session_t *session_);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
private:
// Clean-up.
~context_t ();
// Returns the app thread associated with the current thread.
// NULL if we are out of app thread slots.
class app_thread_t *choose_app_thread ();
......@@ -137,29 +112,6 @@ namespace zmq
// Synchronisation of accesses to shared thread data.
mutex_t threads_sync;
// Global repository of pipes. It's used only on terminal shutdown
// to deallocate all the pipes irrespective of whether they are
// referenced from pipe_reader, pipe_writer or both.
struct pipe_info_t
{
class pipe_t *pipe;
class pipe_reader_t *reader;
class pipe_writer_t *writer;
};
typedef std::vector <pipe_info_t> pipes_t;
pipes_t pipes;
// Synchronisation of access to global repository of pipes.
mutex_t pipes_sync;
// Global repository of available inproc endpoints.
typedef std::map <std::string, class session_t*> inproc_endpoints_t;
inproc_endpoints_t inproc_endpoints;
// Synchronisation of access to the global repository
// of inproc endpoints.
mutex_t inproc_endpoint_sync;
context_t (const context_t&);
void operator = (const context_t&);
};
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "data_distributor.hpp"
#include "pipe_writer.hpp"
#include "err.hpp"
#include "session.hpp"
#include "msg.hpp"
zmq::data_distributor_t::data_distributor_t () :
session (NULL)
{
}
void zmq::data_distributor_t::set_session (session_t *session_)
{
zmq_assert (!session);
session = session_;
}
void zmq::data_distributor_t::shutdown ()
{
// No need to deallocate pipes here. They'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zmq::data_distributor_t::terminate ()
{
// Pipe unregisters itself during the call to terminate, so the pipes
// list shinks by one in each iteration.
while (!pipes.empty ())
pipes [0]->terminate ();
delete this;
}
zmq::data_distributor_t::~data_distributor_t ()
{
}
void zmq::data_distributor_t::attach_pipe (pipe_writer_t *pipe_)
{
// Associate demux with a new pipe.
pipe_->set_demux (this);
pipe_->set_index (pipes.size ());
pipes.push_back (pipe_);
}
void zmq::data_distributor_t::detach_pipe (pipe_writer_t *pipe_)
{
// Release the reference to the pipe.
int index = pipe_->get_index ();
pipe_->set_index (-1);
pipes [index] = pipes.back ();
pipes [index]->set_index (index);
pipes.pop_back ();
}
bool zmq::data_distributor_t::empty ()
{
return pipes.empty ();
}
bool zmq::data_distributor_t::send (zmq_msg *msg_)
{
int pipes_count = pipes.size ();
// If there are no pipes available, simply drop the message.
if (pipes_count == 0) {
zmq_msg_close (msg_);
zmq_msg_init (msg_);
return true;
}
// TODO: ???
// First check whether all pipes are available for writing.
// for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
// if (!(*it)->check_write (msg_))
// return false;
// For VSMs the copying is straighforward.
if (msg_->content == (zmq_msg_content*) ZMQ_VSM) {
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
write_to_pipe (*it, msg_);
zmq_msg_init (msg_);
return true;
}
// Optimisation for the case when there's only a single pipe
// to send the message to - no refcount adjustment (i.e. atomic
// operations) needed.
if (pipes_count == 1) {
write_to_pipe (*pipes.begin (), msg_);
zmq_msg_init (msg_);
return true;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why the -1).
if (msg_->shared)
msg_->content->refcnt.add (pipes_count - 1);
else {
msg_->shared = true;
// TODO: Add memory barrier here.
msg_->content->refcnt.set (pipes_count);
}
// Push the message to all destinations.
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
write_to_pipe (*it, msg_);
// Detach the original message from the data buffer.
zmq_msg_init (msg_);
return true;
}
void zmq::data_distributor_t::flush ()
{
// Flush all pipes. If there's large number of pipes, it can be pretty
// inefficient (especially if there's new message only in a single pipe).
// Can it be improved?
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
(*it)->flush ();
}
void zmq::data_distributor_t::write_to_pipe (class pipe_writer_t *pipe_,
struct zmq_msg *msg_)
{
if (!pipe_->write (msg_)) {
// TODO: Push gap notification to the pipe.
zmq_assert (false);
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__
#define __ZMQ_DATA_DISTRIBUTOR_HPP_INCLUDED__
#include <vector>
#include <i_demux.hpp>
namespace zmq
{
// Object to distribute messages to outbound pipes.
class data_distributor_t : public i_demux
{
public:
data_distributor_t ();
// i_demux implementation.
void set_session (class session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_writer_t *pipe_);
void detach_pipe (class pipe_writer_t *pipe_);
bool empty ();
bool send (struct zmq_msg *msg_);
void flush ();
private:
// Clean-up.
~data_distributor_t ();
// Reference to the owner session object.
class session_t *session;
// Writes the message to the pipe if possible. If it isn't, writes
// a gap notification to the pipe.
void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_);
// The list of outbound pipes.
typedef std::vector <class pipe_writer_t*> pipes_t;
pipes_t pipes;
data_distributor_t (const data_distributor_t&);
void operator = (const data_distributor_t&);
};
}
#endif
......@@ -52,6 +52,10 @@ zmq::devpoll_t::devpoll_t ()
zmq::devpoll_t::~devpoll_t ()
{
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
worker.stop ();
close (devpoll_fd);
}
......@@ -152,11 +156,6 @@ void zmq::devpoll_t::stop ()
stopping = true;
}
void zmq::devpoll_t::join ()
{
worker.stop ();
}
bool zmq::devpoll_t::loop ()
{
// According to the poll(7d) man page, we can retrieve
......
......@@ -42,7 +42,7 @@ namespace zmq
public:
devpoll_t ();
virtual ~devpoll_t ();
~devpoll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
......@@ -56,7 +56,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
void join ();
private:
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "dummy_aggregator.hpp"
#include "err.hpp"
#include "pipe_reader.hpp"
#include "session.hpp"
// Swaps pipes at specified indices.
#define swap_pipes(i1_, i2_) \
std::swap (pipes [i1_], pipes [i2_]);\
pipes [i1_]->set_index (i1_);\
pipes [i2_]->set_index (i2_);
zmq::dummy_aggregator_t::dummy_aggregator_t () :
session (NULL),
pipe (NULL),
active (false)
{
}
void zmq::dummy_aggregator_t::set_session (session_t *session_)
{
zmq_assert (!session);
session = session_;
}
void zmq::dummy_aggregator_t::shutdown ()
{
// No need to deallocate the pipe here. It'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zmq::dummy_aggregator_t::terminate ()
{
if (pipe)
pipe->terminate ();
delete this;
}
zmq::dummy_aggregator_t::~dummy_aggregator_t ()
{
}
void zmq::dummy_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
{
zmq_assert (!pipe);
pipe = pipe_;
active = true;
// Associate new pipe with the mux object.
pipe_->set_mux (this);
session->revive ();
}
void zmq::dummy_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
{
zmq_assert (pipe == pipe_);
deactivate (pipe_);
pipe = NULL;
}
bool zmq::dummy_aggregator_t::empty ()
{
return pipe == NULL;
}
bool zmq::dummy_aggregator_t::recv (zmq_msg *msg_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Try to read from the pipe.
if (pipe && pipe->read (msg_))
return true;
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
return false;
}
void zmq::dummy_aggregator_t::deactivate (pipe_reader_t *pipe_)
{
active = false;
}
void zmq::dummy_aggregator_t::reactivate (pipe_reader_t *pipe_)
{
active = true;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__
#define __ZMQ_DUMMY_AGGREGATOR_HPP_INCLUDED__
#include <vector>
#include "i_mux.hpp"
namespace zmq
{
// Fake message aggregator. There can be at most one pipe bound to it,
// so there's no real aggregation going on. However, it is more efficient
// than a real aggregator. It's intended to be used in the contexts
// where business logic ensures there'll be at most one pipe bound.
class dummy_aggregator_t : public i_mux
{
public:
dummy_aggregator_t ();
// i_mux interface implementation.
void set_session (session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_reader_t *pipe_);
void detach_pipe (class pipe_reader_t *pipe_);
bool empty ();
void deactivate (class pipe_reader_t *pipe_);
void reactivate (class pipe_reader_t *pipe_);
bool recv (struct zmq_msg *msg_);
private:
// Clean-up.
~dummy_aggregator_t ();
// Reference to the owner session object.
class session_t *session;
// The single pipe bound.
class pipe_reader_t *pipe;
// If true, the pipe is active.
bool active;
dummy_aggregator_t (const dummy_aggregator_t&);
void operator = (const dummy_aggregator_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "dummy_distributor.hpp"
#include "pipe_writer.hpp"
#include "err.hpp"
#include "session.hpp"
#include "msg.hpp"
zmq::dummy_distributor_t::dummy_distributor_t () :
session (NULL)
{
}
void zmq::dummy_distributor_t::set_session (session_t *session_)
{
zmq_assert (!session);
session = session_;
}
void zmq::dummy_distributor_t::shutdown ()
{
// No need to deallocate pipe here. It'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zmq::dummy_distributor_t::terminate ()
{
if (pipe)
pipe->terminate ();
delete this;
}
zmq::dummy_distributor_t::~dummy_distributor_t ()
{
}
void zmq::dummy_distributor_t::attach_pipe (pipe_writer_t *pipe_)
{
zmq_assert (!pipe);
pipe = pipe_;
}
void zmq::dummy_distributor_t::detach_pipe (pipe_writer_t *pipe_)
{
zmq_assert (pipe == pipe_);
pipe = NULL;
}
bool zmq::dummy_distributor_t::empty ()
{
return pipe == NULL;
}
bool zmq::dummy_distributor_t::send (zmq_msg *msg_)
{
return pipe && pipe->write (msg_);
}
void zmq::dummy_distributor_t::flush ()
{
if (pipe)
pipe->flush ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__
#define __ZMQ_DUMMY_DISTRIBUTOR_HPP_INCLUDED__
#include <vector>
#include <i_demux.hpp>
namespace zmq
{
// Fake message distributor. There can be only one pipe bound to it
// so there no real distribution going on. However, it is more efficient
// than a real distributor and should be used where business logic
// ensures there'll be at most one pipe bound.
class dummy_distributor_t : public i_demux
{
public:
dummy_distributor_t ();
// i_demux implementation.
void set_session (class session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_writer_t *pipe_);
void detach_pipe (class pipe_writer_t *pipe_);
bool empty ();
bool send (struct zmq_msg *msg_);
void flush ();
private:
// Clean-up.
~dummy_distributor_t ();
// Reference to the owner session object.
class session_t *session;
// The bound pipe.
class pipe_writer_t *pipe;
dummy_distributor_t (const dummy_distributor_t&);
void operator = (const dummy_distributor_t&);
};
}
#endif
......@@ -41,8 +41,11 @@ zmq::epoll_t::epoll_t () :
zmq::epoll_t::~epoll_t ()
{
close (epoll_fd);
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
worker.stop ();
close (epoll_fd);
for (retired_t::iterator it = retired.begin (); it != retired.end (); it ++)
delete *it;
}
......@@ -144,11 +147,6 @@ void zmq::epoll_t::stop ()
stopping = true;
}
void zmq::epoll_t::join ()
{
worker.stop ();
}
void zmq::epoll_t::loop ()
{
epoll_event ev_buf [max_io_events];
......
......@@ -44,7 +44,7 @@ namespace zmq
public:
epoll_t ();
virtual ~epoll_t ();
~epoll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
......@@ -58,7 +58,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
void join ();
private:
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "fair_aggregator.hpp"
#include "err.hpp"
#include "pipe_reader.hpp"
#include "session.hpp"
// Swaps pipes at specified indices.
#define swap_pipes(i1_, i2_) \
std::swap (pipes [i1_], pipes [i2_]);\
pipes [i1_]->set_index (i1_);\
pipes [i2_]->set_index (i2_);
zmq::fair_aggregator_t::fair_aggregator_t () :
session (NULL),
active (0),
current (0)
{
}
void zmq::fair_aggregator_t::set_session (session_t *session_)
{
zmq_assert (!session);
session = session_;
}
void zmq::fair_aggregator_t::shutdown ()
{
// No need to deallocate pipes here. They'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zmq::fair_aggregator_t::terminate ()
{
// Pipe unregisters itself during the call to terminate, so the pipes
// list shinks by one in each iteration.
while (!pipes.empty ())
pipes [0]->terminate ();
delete this;
}
zmq::fair_aggregator_t::~fair_aggregator_t ()
{
}
void zmq::fair_aggregator_t::attach_pipe (pipe_reader_t *pipe_)
{
// Associate new pipe with the mux object.
pipe_->set_mux (this);
pipes.push_back (pipe_);
active++;
if (pipes.size () > active)
swap_pipes (pipes.size () - 1, active - 1);
if (active == 1)
session->revive ();
}
void zmq::fair_aggregator_t::detach_pipe (pipe_reader_t *pipe_)
{
// Move the pipe from the list of active pipes to the list of idle pipes.
deactivate (pipe_);
// Move the pipe to the end of the idle list and remove it.
swap_pipes (pipe_->get_index (), pipes.size () - 1);
pipes.pop_back ();
}
bool zmq::fair_aggregator_t::empty ()
{
return pipes.empty ();
}
bool zmq::fair_aggregator_t::recv (zmq_msg *msg_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// O(1) fair queueing. Round-robin over the active pipes to get
// next message.
for (pipes_t::size_type i = active; i != 0; i--) {
// Update current.
current = (current + 1) % active;
// Try to read from current.
if (pipes [current]->read (msg_))
return true;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
return false;
}
void zmq::fair_aggregator_t::deactivate (pipe_reader_t *pipe_)
{
int index = pipe_->get_index ();
// Suspend an active pipe.
swap_pipes (index, active - 1);
active--;
// If the deactiveted pipe is the current one, shift the current one pipe
// backwards so that the pipe that replaced the deactiveted one will be
// processed immediately rather than skipped.
if (index == (int) current) {
index--;
if (index == -1)
index = active - 1;
current = index;
}
}
void zmq::fair_aggregator_t::reactivate (pipe_reader_t *pipe_)
{
// Revive an idle pipe.
swap_pipes (pipe_->get_index (), active);
active++;
if (active == 1)
session->revive ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__
#define __ZMQ_FAIR_AGGREGATOR_HPP_INCLUDED__
#include <vector>
#include "i_mux.hpp"
namespace zmq
{
// Object to aggregate messages from inbound pipes.
class fair_aggregator_t : public i_mux
{
public:
fair_aggregator_t ();
// i_mux interface implementation.
void set_session (session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_reader_t *pipe_);
void detach_pipe (class pipe_reader_t *pipe_);
bool empty ();
void deactivate (class pipe_reader_t *pipe_);
void reactivate (class pipe_reader_t *pipe_);
bool recv (struct zmq_msg *msg_);
private:
// Clean-up.
~fair_aggregator_t ();
// Reference to the owner session object.
class session_t *session;
// The list of inbound pipes. The active pipes are occupying indices
// from 0 to active-1. Suspended pipes occupy indices from 'active'
// to the end of the array.
typedef std::vector <class pipe_reader_t*> pipes_t;
pipes_t pipes;
// The number of active pipes.
pipes_t::size_type active;
// Pipe to retrieve next message from. The messages are retrieved
// from the pipes in round-robin fashion (a.k.a. fair queueing).
pipes_t::size_type current;
fair_aggregator_t (const fair_aggregator_t&);
void operator = (const fair_aggregator_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_API_HPP_INCLUDED__
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_DEMUX_HPP_INCLUDED__
#define __ZMQ_I_DEMUX_HPP_INCLUDED__
namespace zmq
{
struct i_demux
{
// Attaches mux to a particular session.
virtual void set_session (class session_t *session_) = 0;
// To be called when the whole infrastrucure
// is being closed (zmq_term).
virtual void shutdown () = 0;
// To be called when session is being closed.
virtual void terminate () = 0;
// Adds new pipe to the demux to send messages to.
virtual void attach_pipe (class pipe_writer_t *pipe_) = 0;
// Removes pipe from the demux.
virtual void detach_pipe (class pipe_writer_t *pipe_) = 0;
// Returns true if there's no pipe attached.
virtual bool empty () = 0;
// Sends the message. Returns false if the message cannot be sent
// because the pipes are full.
virtual bool send (struct zmq_msg *msg_) = 0;
// Flushes messages downstream.
virtual void flush () = 0;
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_ENGINE_HPP_INCLUDED__
#define __ZMQ_I_ENGINE_HPP_INCLUDED__
namespace zmq
{
// Generic interface to access engines from MD objects.
struct i_engine
{
// Attach the engine with specified context.
virtual void attach (struct i_poller *poller_,
struct i_session *session_) = 0;
// Detach the engine from the current context.
virtual void detach () = 0;
// Notify the engine that new messages are available.
virtual void revive () = 0;
// Called by session when it decides the engine
// should terminate itself.
virtual void schedule_terminate () = 0;
// Called by normal object termination process.
virtual void terminate () = 0;
// To be called by MD when terminal shutdown (zmq_term) is in progress.
virtual void shutdown () = 0;
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_MUX_HPP_INCLUDED__
#define __ZMQ_I_MUX_HPP_INCLUDED__
namespace zmq
{
struct i_mux
{
// Attaches mux to a particular session.
virtual void set_session (class session_t *session_) = 0;
// To be called when the whole infrastrucure
// is being closed (zmq_term).
virtual void shutdown () = 0;
// To be called when session is being closed.
virtual void terminate () = 0;
// Adds new pipe to the mux to send messages to.
virtual void attach_pipe (class pipe_reader_t *pipe_) = 0;
// Removes pipe from the mux.
virtual void detach_pipe (class pipe_reader_t *pipe_) = 0;
// Returns true if there's no pipe attached.
virtual bool empty () = 0;
// Shifts the pipe from active to passive state and vice versa.
// TODO: Check whether state transitions cannot be done by
// mux object itself without a need for external APIs.
virtual void deactivate (class pipe_reader_t *pipe_) = 0;
virtual void reactivate (class pipe_reader_t *pipe_) = 0;
// Receives a message. Returns false when there is no message
// to receive.
virtual bool recv (struct zmq_msg *msg_) = 0;
};
}
#endif
......@@ -75,13 +75,8 @@ namespace zmq
// This method is called from a foreign thread.
virtual void start () = 0;
// Ask underlying I/O thread to stop. This method is called from
// underlying thread (callback from io_thread object).
// Ask underlying I/O thread to stop.
virtual void stop () = 0;
// Wait for termination of undelying I/O thread.
// This method is called from a foreign thread.
virtual void join () = 0;
};
}
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_SESSION_HPP_INCLUDED__
#define __ZMQ_I_SESSION_HPP_INCLUDED__
namespace zmq
{
struct i_session
{
virtual void set_engine (struct i_engine *engine_) = 0;
virtual void shutdown () = 0;
virtual bool read (struct zmq_msg *msg_) = 0;
virtual bool write (struct zmq_msg *msg_) = 0;
virtual void flush () = 0;
};
}
#endif
......@@ -17,13 +17,20 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#ifndef __ZMQ_I_SOCKET_HPP_INCLUDED__
#define __ZMQ_I_SOCKET_HPP_INCLUDED__
#include "req.hpp"
#include "app_thread.hpp"
#include "session.hpp"
zmq::req_t::req_t (app_thread_t *thread_, session_t *session_) :
socket_base_t (thread_, session_)
namespace zmq
{
struct i_socket
{
virtual ~i_socket () {};
// Start shutting down the socket.
virtual void stop () = 0;
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_THREAD_HPP_INCLUDED__
#define __ZMQ_I_THREAD_HPP_INCLUDED__
namespace zmq
{
// Interface used by session object to communicate with the thread
// it belongs to.
struct i_thread
{
virtual void attach_session (class session_t *session_) = 0;
virtual void detach_session (class session_t *session_) = 0;
virtual struct i_poller *get_poller () = 0;
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "io_object.hpp"
#include "io_thread.hpp"
#include "i_poller.hpp"
zmq::io_object_t::io_object_t (io_thread_t *thread_) :
object_t (thread_),
thread (thread_)
{
}
zmq::io_object_t::~io_object_t ()
{
}
zmq::i_poller *zmq::io_object_t::get_poller ()
{
return thread->get_poller ();
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_IO_OBJECT_HPP_INCLUDED__
#define __ZMQ_IO_OBJECT_HPP_INCLUDED__
#include "object.hpp"
namespace zmq
{
// All objects running within the context of an I/O thread should be
// derived from this class to allow owning application threads to
// destroy them.
class io_object_t : public object_t
{
public:
io_object_t (class io_thread_t *thread_);
~io_object_t ();
virtual void terminate () = 0;
virtual void shutdown () = 0;
struct i_poller *get_poller ();
private:
class io_thread_t *thread;
};
}
#endif
......@@ -30,9 +30,7 @@
#include "devpoll.hpp"
#include "kqueue.hpp"
#include "context.hpp"
#include "session.hpp"
#include "simple_semaphore.hpp"
#include "session.hpp"
zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) :
object_t (context_, thread_slot_)
......@@ -76,15 +74,6 @@ zmq::io_thread_t::io_thread_t (context_t *context_, int thread_slot_) :
poller->set_pollin (signaler_handle);
}
void zmq::io_thread_t::shutdown ()
{
// Deallocate all the sessions associated with the thread.
while (!sessions.empty ())
sessions [0]->shutdown ();
delete this;
}
zmq::io_thread_t::~io_thread_t ()
{
delete poller;
......@@ -101,11 +90,6 @@ void zmq::io_thread_t::stop ()
send_stop ();
}
void zmq::io_thread_t::join ()
{
poller->join ();
}
zmq::i_signaler *zmq::io_thread_t::get_signaler ()
{
return &signaler;
......@@ -149,21 +133,6 @@ void zmq::io_thread_t::timer_event ()
zmq_assert (false);
}
void zmq::io_thread_t::attach_session (session_t *session_)
{
session_->set_index (sessions.size ());
sessions.push_back (session_);
}
void zmq::io_thread_t::detach_session (session_t *session_)
{
// O(1) removal of the session from the list.
sessions_t::size_type i = session_->get_index ();
sessions [i] = sessions [sessions.size () - 1];
sessions [i]->set_index (i);
sessions.pop_back ();
}
zmq::i_poller *zmq::io_thread_t::get_poller ()
{
zmq_assert (poller);
......
......@@ -23,7 +23,6 @@
#include <vector>
#include "object.hpp"
#include "i_thread.hpp"
#include "i_poller.hpp"
#include "i_poll_events.hpp"
#include "fd_signaler.hpp"
......@@ -34,26 +33,22 @@ namespace zmq
// Generic part of the I/O thread. Polling-mechanism-specific features
// are implemented in separate "polling objects".
class io_thread_t : public object_t, public i_poll_events, public i_thread
class io_thread_t : public object_t, public i_poll_events
{
public:
io_thread_t (class context_t *context_, int thread_slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
~io_thread_t ();
// Launch the physical thread.
void start ();
// Ask underlying thread to stop.
void stop ();
// Wait till undelying thread terminates.
void join ();
// To be called when the whole infrastrucure is being closed (zmq_term).
// It's vital to call the individual commands in this sequence:
// stop, join, shutdown.
void shutdown ();
// Returns signaler associated with this I/O thread.
i_signaler *get_signaler ();
......@@ -62,9 +57,7 @@ namespace zmq
void out_event ();
void timer_event ();
// i_thread implementation.
void attach_session (class session_t *session_);
void detach_session (class session_t *session_);
// ???
struct i_poller *get_poller ();
// Command handlers.
......@@ -75,9 +68,6 @@ namespace zmq
private:
// Clean-up.
~io_thread_t ();
// Poll thread gets notifications about incoming commands using
// this signaler.
fd_signaler_t signaler;
......@@ -87,11 +77,6 @@ namespace zmq
// I/O multiplexing is performed using a poller object.
i_poller *poller;
// Vector of all sessions associated with this app thread.
typedef std::vector <class session_t*> sessions_t;
sessions_t sessions;
};
}
......
......@@ -42,6 +42,10 @@ zmq::kqueue_t::kqueue_t ()
zmq::kqueue_t::~kqueue_t ()
{
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
worker.stop ();
close (kqueue_fd);
}
......@@ -144,11 +148,6 @@ void zmq::kqueue_t::stop ()
stopping = true;
}
void zmq::kqueue_t::join ()
{
worker.stop ();
}
void zmq::kqueue_t::loop ()
{
while (!stopping) {
......
......@@ -42,7 +42,7 @@ namespace zmq
public:
kqueue_t ();
virtual ~kqueue_t ();
~kqueue_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
......@@ -56,7 +56,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
void join ();
private:
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "listener.hpp"
#include "simple_semaphore.hpp"
#include "zmq_tcp_engine.hpp"
#include "io_thread.hpp"
#include "session_stub.hpp"
#include "session.hpp"
#include "err.hpp"
#include "dummy_aggregator.hpp"
#include "dummy_distributor.hpp"
zmq::listener_t::listener_t (io_thread_t *thread_, const char *addr_,
session_t *peer_, bool has_in_, bool has_out_, uint64_t taskset_) :
io_object_t (thread_),
poller (NULL),
addr (addr_),
peer (peer_),
taskset (taskset_),
has_in (has_in_),
has_out (has_out_)
{
}
void zmq::listener_t::terminate ()
{
for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++)
session_stubs [i]->terminate ();
delete this;
}
void zmq::listener_t::shutdown ()
{
for (session_stubs_t::size_type i = 0; i != session_stubs.size (); i++)
session_stubs [i]->shutdown ();
delete this;
}
zmq::listener_t::~listener_t ()
{
}
void zmq::listener_t::got_identity (session_stub_t *session_stub_,
const char *identity_)
{
// Get the engine allready disconnected from the stub and poller.
i_engine *engine = session_stub_->detach_engine ();
zmq_assert (engine);
// Find the corresponding session.
session_t *session;
sessions_t::iterator it = sessions.find (identity_);
// Destroy the stub.
int i = session_stub_->get_index ();
session_stubs [i] = session_stubs [session_stubs.size () - 1];
session_stubs [i]->set_index (i);
session_stubs.pop_back ();
session_stub_->terminate ();
// If there's no session with the specified identity, create one.
if (it != sessions.end ()) {
session = it->second;
session->inc_seqnum ();
}
else {
// Choose an I/O thread with the least load to handle the new session.
io_thread_t *io_thread = choose_io_thread (taskset);
// Create the session and bind it to the I/O thread and peer. Make
// sure that the peer session won't get deallocated till it processes
// the subsequent bind command.
i_mux *mux = new dummy_aggregator_t;
zmq_assert (mux);
i_demux *demux = new dummy_distributor_t;
zmq_assert (demux);
session = new session_t (io_thread, io_thread, mux, demux, false, true);
zmq_assert (session);
session->inc_seqnum ();
session->inc_seqnum ();
peer->inc_seqnum ();
send_reg_and_bind (session, peer, has_in, has_out);
}
// Attach the engine to the session.
send_engine (session, engine);
}
void zmq::listener_t::process_reg (simple_semaphore_t *smph_)
{
zmq_assert (!poller);
poller = get_poller ();
// Open the listening socket.
int rc = tcp_listener.open (addr.c_str ());
zmq_assert (rc == 0);
// Unlock the application thread that created the listener.
if (smph_)
smph_->post ();
// Start polling for incoming connections.
handle = poller->add_fd (tcp_listener.get_fd (), this);
poller->set_pollin (handle);
}
void zmq::listener_t::process_unreg (simple_semaphore_t *smph_)
{
// Disassociate listener from the poller.
zmq_assert (poller);
poller->rm_fd (handle);
poller = NULL;
// Unlock the application thread closing the listener.
if (smph_)
smph_->post ();
}
void zmq::listener_t::in_event ()
{
fd_t fd = tcp_listener.accept ();
// If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd)
return;
// Create an session stub for the engine to take care for it till its
// identity is retreived.
session_stub_t *session_stub = new session_stub_t (this);
zmq_assert (session_stub);
session_stub->set_index (session_stubs.size ());
session_stubs.push_back (session_stub);
// Create an engine to encaspulate the socket. Engine will register itself
// with the stub so the stub will be able to free it in case of shutdown.
zmq_tcp_engine_t *engine = new zmq_tcp_engine_t (fd);
zmq_assert (engine);
engine->attach (poller, session_stub);
}
void zmq::listener_t::out_event ()
{
zmq_assert (false);
}
void zmq::listener_t::timer_event ()
{
zmq_assert (false);
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_LISTENER_HPP_INCLUDED__
#define __ZMQ_LISTENER_HPP_INCLUDED__
#include <map>
#include <vector>
#include <string>
#include "io_object.hpp"
#include "tcp_listener.hpp"
#include "i_poller.hpp"
#include "i_poll_events.hpp"
#include "stdint.hpp"
namespace zmq
{
class listener_t : public io_object_t, public i_poll_events
{
public:
listener_t (class io_thread_t *thread_, const char *addr_,
class session_t *peer_, bool has_in_, bool has_out_,
uint64_t taskset_);
void terminate ();
void shutdown ();
// This function is called by session stub once the identity
// is retrieved from the incoming connection.
void got_identity (class session_stub_t *session_stub_,
const char *identity_);
void process_reg (class simple_semaphore_t *smph_);
void process_unreg (class simple_semaphore_t *smph_);
// i_poll_events implementation.
void in_event ();
void out_event ();
void timer_event ();
private:
~listener_t ();
struct i_poller *poller;
// Handle corresponding to the listening socket.
handle_t handle;
// Actual listening socket.
tcp_listener_t tcp_listener;
// Address to bind to.
std::string addr;
// Peer session. All the newly created connections should bind to
// this session.
session_t *peer;
// Taskset specifies which I/O threads are to be use to handle
// newly created connections (0 = all).
uint64_t taskset;
// Sessions created by this listener are stored in this map. They are
// indexed by peer identities so that the same peer connects to the
// same session after reconnection.
// NB: Sessions are destroyed from other place and possibly later on,
// so no need to care about them during listener object termination.
typedef std::map <std::string, class session_t*> sessions_t;
sessions_t sessions;
// List of engines (bound to temorary session stubs) that we haven't
// retrieved the identity from so far.
typedef std::vector <class session_stub_t*> session_stubs_t;
session_stubs_t session_stubs;
// If true, create inbound pipe when binding new connection
// to the peer.
bool has_in;
// If true, create outbound pipe when binding new connection
// to the peer.
bool has_out;
listener_t (const listener_t&);
void operator = (const listener_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "load_balancer.hpp"
#include "pipe_writer.hpp"
#include "err.hpp"
#include "session.hpp"
#include "msg.hpp"
zmq::load_balancer_t::load_balancer_t () :
session (NULL),
current (0)
{
}
void zmq::load_balancer_t::set_session (session_t *session_)
{
zmq_assert (!session);
session = session_;
}
void zmq::load_balancer_t::shutdown ()
{
// No need to deallocate pipes here. They'll be deallocated during the
// shutdown of the dispatcher.
delete this;
}
void zmq::load_balancer_t::terminate ()
{
// Pipe unregisters itself during the call to terminate, so the pipes
// list shinks by one in each iteration.
while (!pipes.empty ())
pipes [0]->terminate ();
delete this;
}
zmq::load_balancer_t::~load_balancer_t ()
{
}
void zmq::load_balancer_t::attach_pipe (pipe_writer_t *pipe_)
{
// Associate demux with a new pipe.
pipe_->set_demux (this);
pipe_->set_index (pipes.size ());
pipes.push_back (pipe_);
}
void zmq::load_balancer_t::detach_pipe (pipe_writer_t *pipe_)
{
// Release the reference to the pipe.
int index = pipe_->get_index ();
pipe_->set_index (-1);
pipes [index] = pipes.back ();
pipes [index]->set_index (index);
pipes.pop_back ();
}
bool zmq::load_balancer_t::empty ()
{
return pipes.empty ();
}
bool zmq::load_balancer_t::send (zmq_msg *msg_)
{
// If there are no pipes, message cannot be sent.
if (pipes.size () == 0)
return false;
// Find the first pipe that is ready to accept the message.
bool found = false;
for (pipes_t::size_type i = 0; !found && i < pipes.size (); i++) {
// if (pipes [current]->check_write (msg))
found = true;
// else
// current = (current + 1) % pipes.size ();
}
// Oops, no pipe is ready to accept the message.
if (!found)
return false;
// Send the message to the selected pipe.
write_to_pipe (pipes [current], msg_);
current = (current + 1) % pipes.size ();
// Detach the original message from the data buffer.
zmq_msg_init (msg_);
return true;
}
void zmq::load_balancer_t::flush ()
{
// Flush all pipes. If there's large number of pipes, it can be pretty
// inefficient (especially if there's new message only in a single pipe).
// Can it be improved?
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it ++)
(*it)->flush ();
}
void zmq::load_balancer_t::write_to_pipe (class pipe_writer_t *pipe_,
struct zmq_msg *msg_)
{
if (!pipe_->write (msg_)) {
// TODO: Push gap notification to the pipe.
zmq_assert (false);
}
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_LOAD_BALANCER_HPP_INCLUDED__
#define __ZMQ_LOAD_BALANCER_HPP_INCLUDED__
#include <vector>
#include <i_demux.hpp>
namespace zmq
{
// Object to distribute messages to outbound pipes.
class load_balancer_t : public i_demux
{
public:
load_balancer_t ();
// i_demux implementation.
void set_session (class session_t *session_);
void shutdown ();
void terminate ();
void attach_pipe (class pipe_writer_t *pipe_);
void detach_pipe (class pipe_writer_t *pipe_);
bool empty ();
bool send (struct zmq_msg *msg_);
void flush ();
private:
// Clean-up.
~load_balancer_t ();
// Reference to the owner session object.
class session_t *session;
// Writes the message to the pipe if possible. If it isn't, writes
// a gap notification to the pipe.
void write_to_pipe (class pipe_writer_t *pipe_, struct zmq_msg *msg_);
// The list of outbound pipes.
typedef std::vector <class pipe_writer_t*> pipes_t;
pipes_t pipes;
// Current pipe to write next message to.
pipes_t::size_type current;
load_balancer_t (const load_balancer_t&);
void operator = (const load_balancer_t&);
};
}
#endif
......@@ -20,12 +20,8 @@
#include "object.hpp"
#include "context.hpp"
#include "err.hpp"
#include "pipe_reader.hpp"
#include "pipe_writer.hpp"
#include "session.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
#include "i_engine.hpp"
zmq::object_t::object_t (context_t *context_, int thread_slot_) :
context (context_),
......@@ -103,35 +99,6 @@ void zmq::object_t::process_command (command_t &cmd_)
}
}
void zmq::object_t::create_pipe (object_t *reader_parent_,
object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
pipe_reader_t **reader_, pipe_writer_t **writer_)
{
context->create_pipe (reader_parent_, writer_parent_, hwm_, lwm_,
reader_, writer_);
}
void zmq::object_t::destroy_pipe (pipe_t *pipe_)
{
context->destroy_pipe (pipe_);
}
int zmq::object_t::register_inproc_endpoint (const char *endpoint_,
session_t *session_)
{
return context->register_inproc_endpoint (endpoint_, session_);
}
zmq::object_t *zmq::object_t::get_inproc_endpoint (const char *endpoint_)
{
return context->get_inproc_endpoint (endpoint_);
}
void zmq::object_t::unregister_inproc_endpoints (session_t *session_)
{
context->unregister_inproc_endpoints (session_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return context->choose_io_thread (taskset_);
......
......@@ -44,14 +44,6 @@ namespace zmq
// Derived object can use following functions to interact with
// global repositories. See context.hpp for function details.
int thread_slot_count ();
void create_pipe (class object_t *reader_parent_,
class object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_,
class pipe_reader_t **reader_, class pipe_writer_t **writer_);
void destroy_pipe (class pipe_t *pipe_);
int register_inproc_endpoint (const char *endpoint_,
class session_t *session_);
class object_t *get_inproc_endpoint (const char *endpoint_);
void unregister_inproc_endpoints (class session_t *session_);
class io_thread_t *choose_io_thread (uint64_t taskset_);
// Derived object can use these functions to send commands
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "p2p.hpp"
#include "app_thread.hpp"
#include "session.hpp"
zmq::p2p_t::p2p_t (app_thread_t *thread_, session_t *session_) :
socket_base_t (thread_, session_)
{
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_P2P_HPP_INCLUDED__
#define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class p2p_t : public socket_base_t
{
public:
p2p_t (class app_thread_t *thread_, class session_t *session_);
private:
p2p_t (const p2p_t&);
void operator = (const p2p_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pipe.hpp"
zmq::pipe_t::pipe_t () :
ypipe_t <zmq_msg, false, message_pipe_granularity> (false),
index (-1)
{
}
zmq::pipe_t::~pipe_t ()
{
// Flush any outstanding messages to the pipe.
flush ();
// Deallocate all the messages in the pipe.
zmq_msg msg;
while (read (&msg))
zmq_msg_close (&msg);
}
void zmq::pipe_t::set_index (int index_)
{
index = index_;
}
int zmq::pipe_t::get_index ()
{
return index;
}
......@@ -28,28 +28,9 @@
namespace zmq
{
// Message pipe. A simple wrapper on top of ypipe.
// Message pipe.
class pipe_t : public ypipe_t <zmq_msg, false, message_pipe_granularity>
{
// Context is a friend so that it can create & destroy the pipes.
// By making constructor & destructor private we are sure that nobody
// except context messes with pipes.
friend class context_t;
private:
pipe_t ();
~pipe_t ();
void set_index (int index_);
int get_index ();
// Index of the pipe in context's array of pipes.
int index;
pipe_t (const pipe_t&);
void operator = (const pipe_t&);
};
}
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "pipe_reader.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "i_mux.hpp"
zmq::pipe_reader_t::pipe_reader_t (object_t *parent_, pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
pipe (pipe_),
peer (NULL),
mux (NULL),
index (-1),
hwm (hwm_),
lwm (lwm_),
head (0),
tail (0),
last_sent_head (0)
{
}
void zmq::pipe_reader_t::set_peer (object_t *peer_)
{
peer = peer_;
}
zmq::pipe_reader_t::~pipe_reader_t ()
{
}
void zmq::pipe_reader_t::set_mux (i_mux *mux_)
{
mux = mux_;
}
void zmq::pipe_reader_t::set_index (int index_)
{
index = index_;
}
int zmq::pipe_reader_t::get_index ()
{
return index;
}
void zmq::pipe_reader_t::process_tail (uint64_t bytes_)
{
tail = bytes_;
mux->reactivate (this);
}
bool zmq::pipe_reader_t::read (struct zmq_msg *msg_)
{
// Read a message.
if (!pipe->read (msg_)) {
mux->deactivate (this);
return false;
}
// If successfull, adjust the head of the pipe.
head += zmq_msg_size (msg_);
// If pipe writer wasn't notified about the head position for long enough,
// notify it.
if (head - last_sent_head >= hwm - lwm) {
send_head (peer, head);
last_sent_head = head;
}
if (zmq_msg_type (msg_) == ZMQ_DELIMITER) {
// Detach the pipe from the mux and send termination request to
// the pipe writer.
mux->detach_pipe (this);
mux = NULL;
send_terminate (peer);
return false;
}
return true;
}
void zmq::pipe_reader_t::terminate ()
{
// Detach the pipe from the mux and send termination request to
// the pipe writer.
if (mux) {
mux->detach_pipe (this);
mux = NULL;
}
send_terminate (peer);
}
void zmq::pipe_reader_t::process_terminate_ack ()
{
// Ask context to deallocate the pipe.
destroy_pipe (pipe);
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PIPE_READER_HPP_INCLUDED__
#define __ZMQ_PIPE_READER_HPP_INCLUDED__
#include "object.hpp"
#include "stdint.hpp"
namespace zmq
{
class pipe_reader_t : public object_t
{
// Context is a friend so that it can create & destroy the reader.
// By making constructor & destructor private we are sure that nobody
// except context messes with readers.
friend class context_t;
public:
// Set & get index in the associated mux object.
void set_mux (struct i_mux *mux_);
void set_index (int index_);
int get_index ();
// Reads a message to the underlying pipe.
bool read (struct zmq_msg *msg_);
// Asks pipe to destroy itself.
void terminate ();
private:
pipe_reader_t (class object_t *parent_, class pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_);
~pipe_reader_t ();
// Second step of reader construction. The parameter cannot be passed
// in constructor as peer object doesn't yet exist at the time.
void set_peer (class object_t *peer_);
void process_tail (uint64_t bytes_);
void process_terminate_ack ();
// The underlying pipe.
class pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
class object_t *peer;
// Associated mux object.
struct i_mux *mux;
// Index in the associated mux object.
int index;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
uint64_t lwm;
// Positions of head and tail of the pipe (in bytes).
uint64_t head;
uint64_t tail;
uint64_t last_sent_head;
pipe_reader_t (const pipe_reader_t&);
void operator = (const pipe_reader_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "pipe_writer.hpp"
#include "pipe.hpp"
#include "i_demux.hpp"
zmq::pipe_writer_t::pipe_writer_t (object_t *parent_, pipe_t *pipe_,
object_t *peer_, uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
pipe (pipe_),
peer (peer_),
demux (NULL),
index (-1),
hwm (hwm_),
lwm (lwm_),
head (0),
tail (0)
{
}
zmq::pipe_writer_t::~pipe_writer_t ()
{
}
void zmq::pipe_writer_t::set_demux (i_demux *demux_)
{
demux = demux_;
}
void zmq::pipe_writer_t::set_index (int index_)
{
index = index_;
}
int zmq::pipe_writer_t::get_index ()
{
return index;
}
bool zmq::pipe_writer_t::write (zmq_msg *msg_)
{
size_t msg_size = zmq_msg_size (msg_);
// If message won't fit into the in-memory pipe, there's no way
// to pass it further.
// TODO: It should be discarded and 'oversized' notification should be
// placed into the pipe.
zmq_assert (!hwm || msg_size <= hwm);
// If there's not enough space in the pipe at the moment, return false.
if (hwm && tail + msg_size - head > hwm)
return false;
// Write the message to the pipe and adjust tail position.
pipe->write (*msg_);
flush ();
tail += msg_size;
return true;
}
void zmq::pipe_writer_t::flush ()
{
if (!pipe->flush ())
send_tail (peer, tail);
}
void zmq::pipe_writer_t::process_head (uint64_t bytes_)
{
head = bytes_;
}
void zmq::pipe_writer_t::terminate ()
{
// Disconnect from the associated demux.
if (demux) {
demux->detach_pipe (this);
demux = NULL;
}
// Push the delimiter to the pipe. Delimiter is a notification for pipe
// reader that there will be no more messages in the pipe.
zmq_msg delimiter;
delimiter.content = (zmq_msg_content*) ZMQ_DELIMITER;
delimiter.shared = false;
delimiter.vsm_size = 0;
pipe->write (delimiter);
flush ();
}
void zmq::pipe_writer_t::process_terminate ()
{
// Disconnect from the associated demux.
if (demux) {
demux->detach_pipe (this);
demux = NULL;
}
// Send termination acknowledgement to the pipe reader.
send_terminate_ack (peer);
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PIPE_WRITER_HPP_INCLUDED__
#define __ZMQ_PIPE_WRITER_HPP_INCLUDED__
#include "object.hpp"
#include "stdint.hpp"
namespace zmq
{
class pipe_writer_t : public object_t
{
// Context is a friend so that it can create & destroy the writer.
// By making constructor & destructor private we are sure that nobody
// except context messes with writers.
friend class context_t;
public:
// Set & get index in the associated demux object.
void set_demux (struct i_demux *demux_);
void set_index (int index_);
int get_index ();
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written to the pipe at the moment.
bool write (struct zmq_msg *msg_);
// Flush the messages downsteam.
void flush ();
// Asks pipe to destroy itself.
void terminate ();
private:
pipe_writer_t (class object_t *parent_, class pipe_t *pipe_,
class object_t *peer_, uint64_t hwm_, uint64_t lwm_);
~pipe_writer_t ();
void process_head (uint64_t bytes_);
void process_terminate ();
// The underlying pipe.
class pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
class object_t *peer;
// Associated demux object.
struct i_demux *demux;
// Index in the associated demux object.
int index;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
uint64_t lwm;
// Positions of head and tail of the pipe (in bytes).
uint64_t head;
uint64_t tail;
pipe_writer_t (const pipe_writer_t&);
void operator = (const pipe_writer_t&);
};
}
#endif
......@@ -50,6 +50,14 @@ zmq::poll_t::poll_t () :
fd_table [i].index = retired_fd;
}
zmq::poll_t::~poll_t ()
{
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
worker.stop ();
}
zmq::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_)
{
pollfd pfd = {fd_, 0, 0};
......@@ -132,11 +140,6 @@ void zmq::poll_t::stop ()
stopping = true;
}
void zmq::poll_t::join ()
{
worker.stop ();
}
void zmq::poll_t::loop ()
{
while (!stopping) {
......
......@@ -47,7 +47,7 @@ namespace zmq
public:
poll_t ();
virtual ~poll_t () {}
~poll_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
......@@ -61,7 +61,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
void join ();
private:
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "pub.hpp"
#include "app_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::pub_t::pub_t (app_thread_t *thread_, session_t *session_) :
socket_base_t (thread_, session_)
{
disable_in ();
}
int zmq::pub_t::recv (struct zmq_msg *msg_, int flags_)
{
// Publisher socket has no recv function.
errno = ENOTSUP;
return -1;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PUB_HPP_INCLUDED__
#define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class pub_t : public socket_base_t
{
public:
pub_t (class app_thread_t *thread_, class session_t *session_);
// i_api overloads.
int recv (struct zmq_msg *msg_, int flags_);
private:
pub_t (const pub_t&);
void operator = (const pub_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "rep.hpp"
#include "app_thread.hpp"
#include "session.hpp"
zmq::rep_t::rep_t (app_thread_t *thread_, session_t *session_) :
socket_base_t (thread_, session_)
{
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_REP_HPP_INCLUDED__
#define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class rep_t : public socket_base_t
{
public:
rep_t (class app_thread_t *thread_, class session_t *session_);
private:
rep_t (const rep_t&);
void operator = (const rep_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_REQ_HPP_INCLUDED__
#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class req_t : public socket_base_t
{
public:
req_t (class app_thread_t *thread_, class session_t *session_);
private:
req_t (const req_t&);
void operator = (const req_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "safe_object.hpp"
zmq::safe_object_t::safe_object_t (class context_t *context_,
int thread_slot_) :
object_t (context_, thread_slot_),
processed_seqnum (0),
terminating (false)
{
}
zmq::safe_object_t::safe_object_t (object_t *parent_) :
object_t (parent_),
processed_seqnum (0),
terminating (false)
{
}
void zmq::safe_object_t::inc_seqnum ()
{
// This function is called from the sender thread to ensure that this
// object will still exist when the command sent to it arrives in the
// destination thread.
sent_seqnum.add (1);
}
void zmq::safe_object_t::process_command (struct command_t &cmd_)
{
object_t::process_command (cmd_);
// Adjust sequence number of the last processed command.
processed_seqnum++;
// If we are already in the termination phase and all commands sent to
// this object are processed, it's safe to deallocate it.
if (terminating && sent_seqnum.get () == processed_seqnum)
delete this;
}
void zmq::safe_object_t::terminate ()
{
// Wait till all commands sent to this session are processed.
terminating = true;
// If there's no pending command we can deallocate the session
// straight saway.
if (sent_seqnum.get () == processed_seqnum)
delete this;
}
bool zmq::safe_object_t::is_terminating ()
{
return terminating;
}
zmq::safe_object_t::~safe_object_t ()
{
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SAFE_OBJECT_HPP_INCLUDED__
#define __ZMQ_SAFE_OBJECT_HPP_INCLUDED__
#include "object.hpp"
#include "atomic_counter.hpp"
namespace zmq
{
// Same as object_t with the exception of termination mechanism. While
// object_t is destroyed immediately on terminate (assuming that the caller
// have ensured that there are no more commands for the object on the
// fly), safe_object_t switches into termination mode and waits for all
// the on-the-fly commands to be delivered before it deallocates itself.
class safe_object_t : public object_t
{
public:
safe_object_t (class context_t *context_, int thread_slot_);
safe_object_t (object_t *parent_);
void inc_seqnum ();
void process_command (struct command_t &cmd_);
protected:
void terminate ();
bool is_terminating ();
virtual ~safe_object_t ();
private:
// Sequence number of the last command sent to the object and last
// command processed by the object. The former is an atomic counter
// meaning that other threads can increment it safely.
atomic_counter_t sent_seqnum;
uint32_t processed_seqnum;
bool terminating;
safe_object_t (const safe_object_t&);
void operator = (const safe_object_t&);
};
}
#endif
......@@ -51,6 +51,14 @@ zmq::select_t::select_t () :
FD_ZERO (&source_set_err);
}
zmq::select_t::~select_t ()
{
// Make sure there are no fds registered on shutdown.
zmq_assert (load.get () == 0);
worker.stop ();
}
zmq::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_)
{
// Store the file descriptor.
......@@ -156,11 +164,6 @@ void zmq::select_t::stop ()
stopping = true;
}
void zmq::select_t::join ()
{
worker.stop ();
}
void zmq::select_t::loop ()
{
while (!stopping) {
......
......@@ -50,6 +50,7 @@ namespace zmq
public:
select_t ();
~select_t ();
// i_poller implementation.
handle_t add_fd (fd_t fd_, i_poll_events *events_);
......@@ -63,7 +64,6 @@ namespace zmq
int get_load ();
void start ();
void stop ();
void join ();
private:
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "session.hpp"
#include "i_engine.hpp"
#include "i_thread.hpp"
#include "i_mux.hpp"
#include "i_demux.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "pipe_reader.hpp"
#include "pipe_writer.hpp"
#include "simple_semaphore.hpp"
zmq::session_t::session_t (object_t *parent_, i_thread *thread_,
i_mux *mux_, i_demux *demux_,
bool terminate_on_disconnect_, bool terminate_on_no_pipes_) :
safe_object_t (parent_),
mux (mux_),
demux (demux_),
thread (thread_),
engine (NULL),
terminate_on_disconnect (terminate_on_disconnect_),
terminate_on_no_pipes (false),
terminate_on_no_pipes_delayed (terminate_on_no_pipes_),
index (-1)
{
// At least one way to terminate the session should be allowed. Otherwise
// the session can be orphaned forever.
zmq_assert (terminate_on_disconnect || terminate_on_no_pipes_delayed);
// Give the mux and the demux callback pointer to ourselves.
if (mux)
mux->set_session (this);
if (demux)
demux->set_session (this);
}
void zmq::session_t::shutdown ()
{
// Session may live even without an associated engine, thus we have
// to check if for NULL value.
if (engine)
engine->shutdown ();
// Propagate the shutdown signal to both inbound and outbound pipes.
if (mux)
mux->shutdown ();
if (demux)
demux->shutdown ();
delete this;
}
void zmq::session_t::disconnected ()
{
// It's engine who calls this function so there's no need to deallocate
// the engine. Just drop the reference.
engine = NULL;
// Some sessions won't shut down because of disconnect. New engine will
// attached to the session later on.
if (!terminate_on_disconnect)
return;
terminate ();
}
void zmq::session_t::bind (object_t *peer_, bool in_, bool out_)
{
// Create the out pipe (if required).
pipe_reader_t *pipe_reader = NULL;
if (out_) {
pipe_writer_t *pipe_writer;
create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
demux->attach_pipe (pipe_writer);
// There's at least one pipe attached. We can deallocate the object
// when there are no pipes (if required).
terminate_on_no_pipes = terminate_on_no_pipes_delayed;
}
// Ask peer to attach to the out pipe (if one exists). If required, ask
// it to create a pipe in opposite direction. It's assumed that peer's
// seqnum was already incremented, so we don't need to care whether it's
// alive at the moment.
if (in_)
inc_seqnum ();
send_bind (peer_, pipe_reader, in_ ? this : NULL);
}
void zmq::session_t::revive ()
{
if (engine)
engine->revive ();
}
void zmq::session_t::terminate ()
{
// Terminate is always called by engine, thus it'll terminate itself,
// we just have to drop the pointer.
engine = NULL;
// Propagate the terminate signal to both inbound and outbound pipes.
if (mux) {
mux->terminate ();
mux = NULL;
}
if (demux) {
demux->terminate ();
demux = NULL;
}
// Session cannot be deallocated at this point. There can still be
// pending commands to process. Unregister session from global
// repository thus ensuring that no new commands will be sent.
unregister_inproc_endpoints (this);
// Move to terminating state.
safe_object_t::terminate ();
}
zmq::session_t::~session_t ()
{
// When session is actually deallocated it unregisters from its thread.
// Unregistration cannot be done earlier as it would result in memory
// leak if global shutdown happens in the middle of session termination.
thread->detach_session (this);
}
void zmq::session_t::set_engine (i_engine *engine_)
{
zmq_assert (!engine || !engine_);
engine = engine_;
}
void zmq::session_t::set_index (int index_)
{
index = index_;
}
int zmq::session_t::get_index ()
{
return index;
}
bool zmq::session_t::write (zmq_msg *msg_)
{
return demux->send (msg_);
}
void zmq::session_t::flush ()
{
demux->flush ();
}
bool zmq::session_t::read (zmq_msg *msg_)
{
bool retrieved = mux->recv (msg_);
if (terminate_on_no_pipes && mux->empty () && demux->empty ()) {
zmq_assert (engine);
engine->schedule_terminate ();
terminate ();
}
return retrieved;
}
void zmq::session_t::process_bind (pipe_reader_t *reader_, session_t *peer_)
{
if (is_terminating ()) {
// If session is already in termination phase, we'll ask newly arrived
// pipe reader & writer to terminate straight away.
if (reader_)
reader_->terminate ();
// Peer session has already incremented its seqnum. We have to send
// a dummy command to avoid a memory leak.
if (peer_)
send_bind (peer_, NULL, NULL);
return;
}
// If inbound pipe is provided, bind it to the mux.
if (reader_) {
mux->attach_pipe (reader_);
// There's at least one pipe attached. We can deallocate the object
// when there are no pipes (if required).
terminate_on_no_pipes = terminate_on_no_pipes_delayed;
}
// If peer wants to get messages from ourselves, we'll bind to it.
if (peer_) {
pipe_reader_t *pipe_reader;
pipe_writer_t *pipe_writer;
create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
demux->attach_pipe (pipe_writer);
send_bind (peer_, pipe_reader, NULL);
// There's at least one pipe attached. We can deallocate the object
// when there are no pipes (if required).
terminate_on_no_pipes = terminate_on_no_pipes_delayed;
}
}
void zmq::session_t::process_reg (simple_semaphore_t *smph_)
{
zmq_assert (!is_terminating ());
// Add the session to the list of sessions associated with this I/O thread.
// This way the session will be deallocated on the terminal shutdown.
thread->attach_session (this);
// Release calling thead (if required).
if (smph_)
smph_->post ();
}
void zmq::session_t::process_reg_and_bind (session_t *peer_,
bool flow_in_, bool flow_out_)
{
zmq_assert (!is_terminating ());
// Add the session to the list of sessions associated with this I/O thread.
// This way the session will be deallocated on the terminal shutdown.
thread->attach_session (this);
// Bind to the peer. Note that caller have already incremented command
// sequence number of the peer so we are sure it still exists.
pipe_reader_t *pipe_reader = NULL;
if (flow_out_) {
pipe_writer_t *pipe_writer;
create_pipe (peer_, this, 0, 0, &pipe_reader, &pipe_writer);
demux->attach_pipe (pipe_writer);
// There's at least one pipe attached. We can deallocate the object
// when there are no pipes (if required).
terminate_on_no_pipes = terminate_on_no_pipes_delayed;
}
send_bind (peer_, pipe_reader, flow_in_ ? this : NULL);
}
void zmq::session_t::process_engine (i_engine *engine_)
{
if (is_terminating ()) {
// Kill the engine. It won't be needed anymore.
engine_->terminate ();
return;
}
engine_->attach (thread->get_poller (), this);
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SESSION_HPP_INCLUDED__
#define __ZMQ_SESSION_HPP_INCLUDED__
#include "i_session.hpp"
#include "safe_object.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
namespace zmq
{
// Object that encapsulates both mux and demux.
class session_t : public safe_object_t, public i_session
{
public:
// Creates the session object.
session_t (struct object_t *parent_, struct i_thread *thread_,
struct i_mux *mux_, struct i_demux *demux_,
bool terminate_on_disconnect_, bool terminate_on_no_pipes_);
// i_session implementation
void set_engine (struct i_engine *engine_);
void shutdown ();
bool read (struct zmq_msg *msg_);
bool write (struct zmq_msg *msg_);
void flush ();
// Called by the engine when it is being closed.
void disconnected ();
// Creates a message flow between this session and the peer session.
// If in_ is true, the messages can flow from the peer to ourselves.
// If out_ is true, messages can flow from ourselves to the peer.
// It's assumed that peer's seqnum was already incremented.
void bind (class object_t *peer_, bool in_, bool out_);
// Called by mux if new messages are available.
void revive ();
// Functions to set & retrieve index of this MD in thread's array
// of session objects.
void set_index (int index_);
int get_index ();
private:
// Clean-up.
~session_t ();
// Terminate is private here. It is called by either when disconnected
// or no_pipes event occurs.
void terminate ();
void process_bind (class pipe_reader_t *reader_,
class session_t *peer_);
void process_reg (class simple_semaphore_t *smph_);
void process_reg_and_bind (class session_t *peer_,
bool flow_in_, bool flow_out_);
void process_engine (i_engine *engine_);
struct i_mux *mux;
struct i_demux *demux;
struct i_thread *thread;
struct i_engine *engine;
// If true termination of the session can be triggered by engine
// disconnect/close.
bool terminate_on_disconnect;
// If true termination of the session can be triggered when the last
// pipe detaches from it.
bool terminate_on_no_pipes;
// If true, terminate_on_no_pipes should be set when at least one
// pipe was bound.
bool terminate_on_no_pipes_delayed;
// Index in thread's session array.
int index;
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include "../include/zmq.h"
#include "session_stub.hpp"
#include "i_engine.hpp"
#include "listener.hpp"
#include "err.hpp"
zmq::session_stub_t::session_stub_t (listener_t *listener_) :
state (reading_identity),
engine (NULL),
listener (listener_),
index (-1)
{
}
void zmq::session_stub_t::terminate ()
{
if (engine)
engine->terminate ();
delete this;
}
void zmq::session_stub_t::shutdown ()
{
if (engine)
engine->shutdown ();
delete this;
}
zmq::session_stub_t::~session_stub_t ()
{
}
void zmq::session_stub_t::set_engine (i_engine *engine_)
{
zmq_assert (!engine_ || !engine);
engine = engine_;
}
bool zmq::session_stub_t::read (struct zmq_msg *msg_)
{
// No messages are sent to the connecting peer.
return false;
}
bool zmq::session_stub_t::write (struct zmq_msg *msg_)
{
// The first message arrived is the connection identity.
if (state == reading_identity) {
identity = std::string ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
state = has_identity;
return true;
}
// We are not interested in any subsequent messages.
return false;
}
void zmq::session_stub_t::flush ()
{
// We have the identity. At this point we can find the correct session and
// attach it to the connection.
if (state == has_identity) {
// At this point the stub will be deleted. Return immediately without
// touching 'this' pointer.
listener->got_identity (this, identity.c_str ());
return;
}
}
zmq::i_engine *zmq::session_stub_t::detach_engine ()
{
// Ask engine to unregister from the poller.
i_engine *e = engine;
engine->detach ();
return e;
}
void zmq::session_stub_t::set_index (int index_)
{
index = index_;
}
int zmq::session_stub_t::get_index ()
{
return index;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SESSION_STUB_HPP_INCLUDED__
#define __ZMQ_SESSION_STUB_HPP_INCLUDED__
#include <string>
#include "i_session.hpp"
namespace zmq
{
// This class is used instead of regular session till the identity of
// incomming connection is established and connection is attached
// to corresponding session.
class session_stub_t : public i_session
{
public:
session_stub_t (class listener_t *listener_);
// i_session implementation.
void set_engine (struct i_engine *engine_);
void terminate ();
void shutdown ();
bool read (struct zmq_msg *msg_);
bool write (struct zmq_msg *msg_);
void flush ();
// Detaches engine from the stub. Returns it to the caller.
struct i_engine *detach_engine ();
// Manipulate stubs's index in listener's array of stubs.
void set_index (int index_);
int get_index ();
private:
// Clean-up.
virtual ~session_stub_t ();
enum {
reading_identity,
has_identity
} state;
// Reference to the associated engine.
i_engine *engine;
// Reference to the listener object that owns this stub.
class listener_t *listener;
// Index of the stub in listener's array of stubs.
int index;
// Identity of the connection.
std::string identity;
session_stub_t (const session_stub_t&);
void operator = (const session_stub_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include "../include/zmq.h"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "err.hpp"
#include "listener.hpp"
#include "connecter.hpp"
#include "simple_semaphore.hpp"
#include "io_thread.hpp"
#include "io_object.hpp"
#include "session.hpp"
#include "dummy_aggregator.hpp"
#include "dummy_distributor.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *thread_, session_t *session_) :
object_t (thread_),
thread (thread_),
session (session_),
has_in (true),
has_out (true)
{
session->set_engine (this);
}
void zmq::socket_base_t::shutdown ()
{
// Destroy all the I/O objects created from this socket.
for (io_objects_t::size_type i = 0; i != io_objects.size (); i++)
io_objects [i]->shutdown ();
delete this;
}
void zmq::socket_base_t::schedule_terminate ()
{
// Terminate is never scheduled on socket engines.
zmq_assert (false);
}
void zmq::socket_base_t::terminate ()
{
// Destroy all the I/O objects created from this socket.
// First unregister the object from I/O thread, then terminate it in
// this application thread.
simple_semaphore_t smph;
for (io_objects_t::size_type i = 0; i != io_objects.size (); i++) {
send_unreg (io_objects [i], &smph);
smph.wait ();
io_objects [i]->terminate ();
}
zmq_assert (session);
session->disconnected ();
delete this;
}
zmq::socket_base_t::~socket_base_t ()
{
}
void zmq::socket_base_t::disable_in ()
{
has_in = false;
}
void zmq::socket_base_t::disable_out ()
{
has_out = false;
}
int zmq::socket_base_t::bind (const char *addr_, zmq_opts *opts_)
{
thread->process_commands (false);
std::string addr (addr_);
std::string::size_type pos = addr.find ("://");
if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
// Choose the I/O thread with the least load, create the listener.
// Note that same taskset is used to choose the I/O thread to handle
// the listening socket and newly created connections.
// Note that has_in and has_out are twisted at this place - listener
// is going to create peer objects, so the message flows are viewed
// from the opposite direction.
io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
listener_t *listener = new listener_t (io_thread, addr_, session,
has_out, has_in, opts_ ? opts_->taskset : 0);
// Ask it to start interacting with the I/O thread.
simple_semaphore_t smph;
send_reg (listener, &smph);
// Store the reference to the listener so that it can be terminated
// when the socket is closed.
io_objects.push_back (listener);
// Wait while listener is actually registered with the I/O thread.
smph.wait ();
return 0;
}
else if (addr.substr (0, pos) == "inproc") {
// For inproc transport the only thing we have to do is to register
// this socket as an inproc endpoint with the supplied name.
return register_inproc_endpoint (addr.substr (pos + 3).c_str (),
session);
}
else {
// Unknown protocol requested.
errno = EINVAL;
return -1;
}
}
int zmq::socket_base_t::connect (const char *addr_, zmq_opts *opts_)
{
thread->process_commands (false);
std::string addr (addr_);
std::string::size_type pos = addr.find ("://");
if (pos == std::string::npos || addr.substr (0, pos) == "zmq.tcp") {
// Choose the I/O thread with the least load, create the connecter and
// session.
io_thread_t *io_thread = choose_io_thread (opts_ ? opts_->taskset : 0);
i_mux *mux = new dummy_aggregator_t;
zmq_assert (mux);
i_demux *demux = new dummy_distributor_t;
zmq_assert (demux);
session_t *peer = new session_t (io_thread, io_thread, mux, demux,
false, true);
zmq_assert (peer);
connecter_t *connecter = new connecter_t (io_thread, addr_, peer);
zmq_assert (connecter);
// Increment session's command sequence number so that it won't get
// deallocated till the subsequent bind command arrives.
peer->inc_seqnum ();
// Register the connecter (and session) with its I/O thread.
simple_semaphore_t smph;
send_reg (connecter, &smph);
// Store the reference to the connecter so that it can be terminated
// when the socket is closed.
io_objects.push_back (connecter);
// Wait till registration succeeds.
smph.wait ();
// Bind local session with the connecter's session so that messages
// can flow in both directions.
session->bind (peer, has_in, has_out);
return 0;
}
else if (addr.substr (0, pos) == "inproc") {
// Get the MD responsible for the address. In case of invalid address
// return error.
object_t *peer = get_inproc_endpoint (addr.substr (pos + 3).c_str ());
if (!peer) {
errno = EADDRNOTAVAIL;
return -1;
}
// Create bidirectional message pipes between this session and
// the peer session.
session->bind (peer, has_in, has_out);
return 0;
}
else {
// Unknown protocol requested.
errno = EINVAL;
return -1;
}
}
int zmq::socket_base_t::subscribe (const char *criteria_)
{
// No implementation at the moment...
errno = ENOTSUP;
return -1;
}
int zmq::socket_base_t::send (zmq_msg *msg_, int flags_)
{
thread->process_commands (false);
while (true) {
if (session->write (msg_))
return 0;
if (flags_ & ZMQ_NOBLOCK) {
errno = EAGAIN;
return -1;
}
thread->process_commands (true);
}
}
int zmq::socket_base_t::flush ()
{
thread->process_commands (false);
session->flush ();
return 0;
}
int zmq::socket_base_t::recv (zmq_msg *msg_, int flags_)
{
thread->process_commands (false);
while (true) {
if (session->read (msg_))
return 0;
if (flags_ & ZMQ_NOBLOCK) {
errno = EAGAIN;
return -1;
}
thread->process_commands (true);
}
}
int zmq::socket_base_t::close ()
{
terminate ();
return 0;
}
void zmq::socket_base_t::attach (struct i_poller *poller_, i_session *session_)
{
zmq_assert (false);
}
void zmq::socket_base_t::detach ()
{
zmq_assert (false);
}
void zmq::socket_base_t::revive ()
{
// We can ignore the event safely here.
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <vector>
#include "i_engine.hpp"
#include "i_api.hpp"
#include "object.hpp"
namespace zmq
{
class socket_base_t : public object_t, public i_engine, public i_api
{
public:
// TODO: Possibly, session can be attached to the engine using
// attach function.
socket_base_t (class app_thread_t *thread_, class session_t *session_);
// i_engine interface implementation.
void attach (struct i_poller *poller_, struct i_session *session_);
void detach ();
void revive ();
void schedule_terminate ();
void terminate ();
void shutdown ();
// i_api interface implementation.
int bind (const char *addr_, struct zmq_opts *opts_);
int connect (const char *addr_, struct zmq_opts *opts_);
int subscribe (const char *criteria_);
int send (struct zmq_msg *msg_, int flags_);
int flush ();
int recv (struct zmq_msg *msg_, int flags_);
int close ();
protected:
// Clean-up. The function has to be protected rather than private,
// otherwise auto-generated destructor in derived classes
// cannot be compiled. It has to be virtual so that socket_base_t's
// terminate & shutdown functions deallocate correct type of object.
virtual ~socket_base_t ();
// By default, socket is able to pass messages in both inward and
// outward directions. By calling these functions, particular
// socket type is able to eliminate one direction.
void disable_in ();
void disable_out ();
private:
// Pointer to the application thread the socket belongs to.
class app_thread_t *thread;
// Pointer to the associated session object.
class session_t *session;
// List of I/O object created via this socket. These have to be shut
// down when the socket is closed.
typedef std::vector <class io_object_t*> io_objects_t;
io_objects_t io_objects;
// If true, socket creates inbound pipe when binding to an engine.
bool has_in;
// If true, socket creates outbound pipe when binding to an engine.
bool has_out;
socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&);
};
}
#endif
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "sub.hpp"
#include "app_thread.hpp"
#include "session.hpp"
#include "err.hpp"
zmq::sub_t::sub_t (app_thread_t *thread_, session_t *session_) :
socket_base_t (thread_, session_)
{
disable_out ();
}
int zmq::sub_t::send (struct zmq_msg *msg_, int flags_)
{
// Subscriber socket has no send function.
errno = ENOTSUP;
return -1;
}
int zmq::sub_t::flush ()
{
// Subscriber socket has no flush function.
errno = ENOTSUP;
return -1;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SUB_HPP_INCLUDED__
#define __ZMQ_SUB_HPP_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class sub_t : public socket_base_t
{
public:
sub_t (class app_thread_t *thread_, class session_t *session_);
// i_api overloads.
int send (struct zmq_msg *msg_, int flags_);
int flush ();
private:
sub_t (const sub_t&);
void operator = (const sub_t&);
};
}
#endif
......@@ -176,7 +176,7 @@ void *zmq_init (int app_threads_, int io_threads_)
int zmq_term (void *context_)
{
((zmq::context_t*) context_)->shutdown ();
delete (zmq::context_t*) context_;
return 0;
}
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "zmq_decoder.hpp"
#include "i_session.hpp"
#include "wire.hpp"
zmq::zmq_decoder_t::zmq_decoder_t () :
destination (NULL)
{
zmq_msg_init (&in_progress);
// At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
zmq::zmq_decoder_t::~zmq_decoder_t ()
{
zmq_msg_close (&in_progress);
}
void zmq::zmq_decoder_t::set_session (i_session *destination_)
{
destination = destination_;
}
bool zmq::zmq_decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the
// message data into it.
if (*tmpbuf == 0xff)
next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready);
else {
zmq_msg_init_size (&in_progress, *tmpbuf);
next_step (zmq_msg_data (&in_progress), *tmpbuf,
&zmq_decoder_t::message_ready);
}
return true;
}
bool zmq::zmq_decoder_t::eight_byte_size_ready ()
{
// 8-byte size is read. Allocate the buffer for message body and
// read the message data into it.
size_t size = (size_t) get_uint64 (tmpbuf);
zmq_msg_init_size (&in_progress, size);
next_step (zmq_msg_data (&in_progress), size,
&zmq_decoder_t::message_ready);
return true;
}
bool zmq::zmq_decoder_t::message_ready ()
{
// Message is completely read. Push it further and start reading
// new message.
if (!destination->write (&in_progress))
return false;
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
return true;
}
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
#include "../include/zmq.h"
#include "decoder.hpp"
namespace zmq
{
// Decoder for 0MQ backend protocol. Converts data batches into messages.
class zmq_decoder_t : public decoder_t <zmq_decoder_t>
{
public:
zmq_decoder_t ();
~zmq_decoder_t ();
void set_session (struct i_session *destination_);
private:
bool one_byte_size_ready ();
bool eight_byte_size_ready ();
bool message_ready ();
struct i_session *destination;
unsigned char tmpbuf [8];
::zmq_msg in_progress;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
};
}
#endif
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment