Commit 05d90849 authored by Martin Sustrik's avatar Martin Sustrik

WIP: Socket migration between threads, new zmq_close() semantics

Sockets may now be migrated between OS threads; sockets may not be used by
more than one thread at any time. To migrate a socket to another thread the
caller must ensure that a full memory barrier is called before using the
socket from the target thread.

The new zmq_close() semantics implement the behaviour discussed at:

http://lists.zeromq.org/pipermail/zeromq-dev/2010-July/004244.html

Specifically, zmq_close() is now deterministic and while it still returns
immediately, it does not discard any data that may still be queued for
sending. Further, zmq_term() will now block until all outstanding data has
been sent.

TODO: Many bugs have been introduced, needs testing. Further, SO_LINGER or
an equivalent mechanism (possibly a configurable timeout to zmq_term())
needs to be implemented.
parent b7e0fa97
......@@ -82,7 +82,6 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
#endif
/* Native 0MQ error codes. */
#define EMTHREAD (ZMQ_HAUSNUMERO + 50)
#define EFSM (ZMQ_HAUSNUMERO + 51)
#define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
#define ETERM (ZMQ_HAUSNUMERO + 53)
......@@ -161,12 +160,13 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_XREP 6
#define ZMQ_PULL 7
#define ZMQ_PUSH 8
#define ZMQ_UPSTREAM ZMQ_PULL /* Old alias, remove in 3.x */
#define ZMQ_DOWNSTREAM ZMQ_PUSH /* Old alias, remove in 3.x */
/* Deprecated aliases, to be removed in release 3.x */
#define ZMQ_UPSTREAM ZMQ_PULL
#define ZMQ_DOWNSTREAM ZMQ_PUSH
/* Socket options. */
#define ZMQ_HWM 1
/* ZMQ_LWM 2 no longer supported */
#define ZMQ_SWAP 3
#define ZMQ_AFFINITY 4
#define ZMQ_IDENTITY 5
......@@ -178,6 +178,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_SNDBUF 11
#define ZMQ_RCVBUF 12
#define ZMQ_RCVMORE 13
#define ZMQ_FD 14
#define ZMQ_EVENTS 15
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
......@@ -217,17 +219,15 @@ typedef struct
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
/* Devices */
/* Devices - Experimental. */
/******************************************************************************/
#define ZMQ_QUEUE 1
#define ZMQ_STREAMER 1
#define ZMQ_FORWARDER 2
#define ZMQ_STREAMER 3
#define ZMQ_QUEUE 3
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket);
#undef ZMQ_EXPORT
#ifdef __cplusplus
}
#endif
......
......@@ -49,7 +49,7 @@ endif
nodist_libzmq_la_SOURCES = $(pgm_sources)
libzmq_la_SOURCES = app_thread.hpp \
libzmq_la_SOURCES = \
atomic_counter.hpp \
atomic_ptr.hpp \
blob.hpp \
......@@ -58,7 +58,6 @@ libzmq_la_SOURCES = app_thread.hpp \
ctx.hpp \
decoder.hpp \
devpoll.hpp \
push.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
......@@ -69,7 +68,6 @@ libzmq_la_SOURCES = app_thread.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
i_endpoint.hpp \
i_engine.hpp \
i_poll_events.hpp \
kqueue.hpp \
......@@ -91,10 +89,13 @@ libzmq_la_SOURCES = app_thread.hpp \
pair.hpp \
prefix_tree.hpp \
pub.hpp \
pull.hpp \
push.hpp \
queue.hpp \
rep.hpp \
req.hpp \
select.hpp \
semaphore.hpp \
session.hpp \
signaler.hpp \
socket_base.hpp \
......@@ -105,7 +106,6 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
pull.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
......@@ -121,11 +121,9 @@ libzmq_la_SOURCES = app_thread.hpp \
zmq_engine.hpp \
zmq_init.hpp \
zmq_listener.hpp \
app_thread.cpp \
command.cpp \
ctx.cpp \
devpoll.cpp \
push.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
......@@ -139,13 +137,15 @@ libzmq_la_SOURCES = app_thread.hpp \
object.cpp \
options.cpp \
owned.cpp \
pair.cpp \
pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
pair.cpp \
prefix_tree.cpp \
pipe.cpp \
poll.cpp \
pull.cpp \
push.cpp \
pub.cpp \
queue.cpp \
rep.cpp \
......@@ -160,7 +160,6 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
pull.cpp \
uuid.cpp \
xrep.cpp \
xreq.cpp \
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <new>
#include <algorithm>
#include "../include/zmq.h"
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
#include <intrin.h>
#endif
#else
#include <unistd.h>
#endif
#include "app_thread.hpp"
#include "ctx.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
#include "pull.hpp"
#include "push.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
// system with x86 architecture and gcc or MSVC compiler.
#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
(defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
#define ZMQ_DELAY_COMMANDS
#endif
zmq::app_thread_t::app_thread_t (ctx_t *ctx_,
uint32_t thread_slot_) :
object_t (ctx_, thread_slot_),
last_processing_time (0),
terminated (false)
{
}
zmq::app_thread_t::~app_thread_t ()
{
zmq_assert (sockets.empty ());
}
void zmq::app_thread_t::stop ()
{
send_stop ();
}
zmq::signaler_t *zmq::app_thread_t::get_signaler ()
{
return &signaler;
}
bool zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
bool received;
command_t cmd;
if (block_) {
received = signaler.recv (&cmd, true);
zmq_assert (received);
}
else {
#if defined ZMQ_DELAY_COMMANDS
// Optimised version of command processing - it doesn't have to check
// for incoming commands each time. It does so only if certain time
// elapsed since last command processing. Command delay varies
// depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
// etc. The optimisation makes sense only on platforms where getting
// a timestamp is a very cheap operation (tens of nanoseconds).
if (throttle_) {
// Get timestamp counter.
#if defined __GNUC__
uint32_t low;
uint32_t high;
__asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
uint64_t current_time = (uint64_t) high << 32 | low;
#elif defined _MSC_VER
uint64_t current_time = __rdtsc ();
#else
#error
#endif
// Check whether certain time have elapsed since last command
// processing.
if (current_time - last_processing_time <= max_command_delay)
return !terminated;
last_processing_time = current_time;
}
#endif
// Check whether there are any commands pending for this thread.
received = signaler.recv (&cmd, false);
}
// Process all the commands available at the moment.
while (received) {
cmd.destination->process_command (cmd);
received = signaler.recv (&cmd, false);
}
return !terminated;
}
zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
case ZMQ_PAIR:
s = new (std::nothrow) pair_t (this);
break;
case ZMQ_PUB:
s = new (std::nothrow) pub_t (this);
break;
case ZMQ_SUB:
s = new (std::nothrow) sub_t (this);
break;
case ZMQ_REQ:
s = new (std::nothrow) req_t (this);
break;
case ZMQ_REP:
s = new (std::nothrow) rep_t (this);
break;
case ZMQ_XREQ:
s = new (std::nothrow) xreq_t (this);
break;
case ZMQ_XREP:
s = new (std::nothrow) xrep_t (this);
break;
case ZMQ_PULL:
s = new (std::nothrow) pull_t (this);
break;
case ZMQ_PUSH:
s = new (std::nothrow) push_t (this);
break;
default:
if (sockets.empty ())
get_ctx ()->no_sockets (this);
errno = EINVAL;
return NULL;
}
zmq_assert (s);
sockets.push_back (s);
return s;
}
void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{
sockets.erase (socket_);
if (sockets.empty ())
get_ctx ()->no_sockets (this);
}
void zmq::app_thread_t::process_stop ()
{
terminated = true;
}
bool zmq::app_thread_t::is_terminated ()
{
return terminated;
}
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_APP_THREAD_HPP_INCLUDED__
#define __ZMQ_APP_THREAD_HPP_INCLUDED__
#include <vector>
#include "stdint.hpp"
#include "object.hpp"
#include "yarray.hpp"
#include "signaler.hpp"
namespace zmq
{
class app_thread_t : public object_t
{
public:
app_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
~app_thread_t ();
// Interrupt blocking call if the app thread is stuck in one.
// This function is is called from a different thread!
void stop ();
// Returns signaler associated with this application thread.
signaler_t *get_signaler ();
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once
// in a predefined time period. The function returns false is the
// associated context was terminated, true otherwise.
bool process_commands (bool block_, bool throttle_);
// Create a socket of a specified type.
class socket_base_t *create_socket (int type_);
// Unregister the socket from the app_thread (called by socket itself).
void remove_socket (class socket_base_t *socket_);
// Returns true is the associated context was already terminated.
bool is_terminated ();
private:
// Command handlers.
void process_stop ();
// All the sockets created from this application thread.
typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets;
// App thread's signaler object.
signaler_t signaler;
// Timestamp of when commands were processed the last time.
uint64_t last_processing_time;
// If true, 'stop' command was already received.
bool terminated;
app_thread_t (const app_thread_t&);
void operator = (const app_thread_t&);
};
}
#endif
......@@ -27,9 +27,8 @@ namespace zmq
enum
{
// Maximal number of OS threads that can own 0MQ sockets
// at the same time.
max_app_threads = 512,
// Maximum number of sockets that can be opened at the same time.
max_sockets = 512,
// Number of new messages in message pipe needed to trigger new memory
// allocation. Setting this parameter to 256 decreases the impact of
......
......@@ -24,7 +24,6 @@
#include "ctx.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
#include "platform.hpp"
#include "err.hpp"
......@@ -32,11 +31,12 @@
#if defined ZMQ_HAVE_WINDOWS
#include "windows.h"
#else
#include "unistd.h"
#endif
zmq::ctx_t::ctx_t (uint32_t io_threads_) :
sockets (0),
terminated (false)
no_sockets_notify (false)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
......@@ -50,44 +50,32 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
#endif
// Initialise the array of signalers.
signalers_count = max_app_threads + io_threads_;
signalers = (signaler_t**) malloc (sizeof (signaler_t*) * signalers_count);
zmq_assert (signalers);
memset (signalers, 0, sizeof (signaler_t*) * signalers_count);
slot_count = max_sockets + io_threads_;
slots = (signaler_t**) malloc (sizeof (signaler_t*) * slot_count);
zmq_assert (slots);
// Create I/O thread objects and launch them.
for (uint32_t i = 0; i != io_threads_; i++) {
io_thread_t *io_thread = new (std::nothrow) io_thread_t (this, i);
zmq_assert (io_thread);
io_threads.push_back (io_thread);
signalers [i] = io_thread->get_signaler ();
slots [i] = io_thread->get_signaler ();
io_thread->start ();
}
}
int zmq::ctx_t::term ()
{
// First send stop command to application threads so that any
// blocking calls are interrupted.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
app_threads [i].app_thread->stop ();
// Then mark context as terminated.
term_sync.lock ();
zmq_assert (!terminated);
terminated = true;
bool destroy = (sockets == 0);
term_sync.unlock ();
// If there are no sockets open, destroy the context immediately.
if (destroy)
delete this;
return 0;
// In the unused part of the slot array, create a list of empty slots.
for (uint32_t i = slot_count - 1; i >= io_threads_; i--) {
empty_slots.push_back (i);
slots [i] = NULL;
}
}
zmq::ctx_t::~ctx_t ()
{
// Check that there are no remaining open or zombie sockets.
zmq_assert (sockets.empty ());
zmq_assert (zombies.empty ());
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
// thread subsequent invocation of destructor would hang-up.
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
......@@ -97,18 +85,10 @@ zmq::ctx_t::~ctx_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
delete app_threads [i].app_thread;
// Deallocate all the orphaned pipes.
while (!pipes.empty ())
delete *pipes.begin ();
// Deallocate the array of pointers to signalers. No special work is
// Deallocate the array of slot. No special work is
// needed as signalers themselves were deallocated with their
// corresponding (app_/io_) thread objects.
free (signalers);
// corresponding io_thread/socket objects.
free (slots);
#ifdef ZMQ_HAVE_WINDOWS
// On Windows, uninitialise socket layer.
......@@ -117,110 +97,113 @@ zmq::ctx_t::~ctx_t ()
#endif
}
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
int zmq::ctx_t::term ()
{
app_threads_sync.lock ();
// Find whether the calling thread has app_thread_t object associated
// already. At the same time find an unused app_thread_t so that it can
// be used if there's no associated object for the calling thread.
// Check whether thread ID is already assigned. If so, return it.
app_threads_t::size_type unused = app_threads.size ();
app_threads_t::size_type current;
for (current = 0; current != app_threads.size (); current++) {
if (app_threads [current].associated &&
thread_t::equal (thread_t::id (), app_threads [current].tid))
break;
if (!app_threads [current].associated)
unused = current;
// First send stop command to sockets so that any
// blocking calls are interrupted.
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
sockets [i]->stop ();
// Find out whether there are any open sockets to care about.
// If so, sleep till they are closed. Note that we can use
// no_sockets_notify safely out of the critical section as once set
// its value is never changed again.
slot_sync.lock ();
if (!sockets.empty ())
no_sockets_notify = true;
slot_sync.unlock ();
if (no_sockets_notify)
no_sockets_sync.wait ();
// At this point there's only one application thread (this one) remaining.
// We don't even have to synchronise access to data.
zmq_assert (sockets.empty ());
// Get rid of remaining zombie sockets.
while (!zombies.empty ()) {
dezombify ();
// Sleep for 1ms not to end up busy-looping in the case the I/O threads
// are still busy sending data. We can possibly add a grand poll here
// (polling for fds associated with all the zombie sockets), but it's
// probably not worth of implementing it.
#if defined ZMQ_HAVE_WINDOWS
Sleep (1);
#else
usleep (1000);
#endif
}
// If no app_thread_t is associated with the calling thread,
// associate it with one of the unused app_thread_t objects.
if (current == app_threads.size ()) {
// If all the existing app_threads are already used, create one more.
if (unused == app_threads.size ()) {
// Deallocate the resources.
delete this;
// If max_app_threads limit was reached, return error.
if (app_threads.size () == max_app_threads) {
app_threads_sync.unlock ();
errno = EMTHREAD;
return NULL;
}
return 0;
}
// Create the new application thread proxy object.
app_thread_info_t info;
memset (&info, 0, sizeof (info));
info.associated = false;
info.app_thread = new (std::nothrow) app_thread_t (this,
io_threads.size () + app_threads.size ());
zmq_assert (info.app_thread);
signalers [io_threads.size () + app_threads.size ()] =
info.app_thread->get_signaler ();
app_threads.push_back (info);
}
zmq::socket_base_t *zmq::ctx_t::create_socket (int type_)
{
slot_sync.lock ();
// Incidentally, this works both when there is an unused app_thread
// and when a new one is created.
current = unused;
// Free the slots, if possible.
dezombify ();
// Associate the selected app_thread with the OS thread.
app_threads [current].associated = true;
app_threads [current].tid = thread_t::id ();
// If max_sockets limit was reached, return error.
if (empty_slots.empty ()) {
slot_sync.unlock ();
errno = EMFILE;
return NULL;
}
app_thread_t *thread = app_threads [current].app_thread;
app_threads_sync.unlock ();
// Choose a slot for the socket.
uint32_t slot = empty_slots.back ();
empty_slots.pop_back ();
socket_base_t *s = thread->create_socket (type_);
if (!s)
// Create the socket and register its signaler.
socket_base_t *s = socket_base_t::create (type_, this, slot);
if (!s) {
empty_slots.push_back (slot);
slot_sync.unlock ();
return NULL;
}
sockets.push_back (s);
slots [slot] = s->get_signaler ();
term_sync.lock ();
sockets++;
term_sync.unlock ();
slot_sync.unlock ();
return s;
}
void zmq::ctx_t::destroy_socket ()
void zmq::ctx_t::zombify (socket_base_t *socket_)
{
// If zmq_term was already called and there are no more sockets,
// terminate the whole 0MQ infrastructure.
term_sync.lock ();
zmq_assert (sockets > 0);
sockets--;
bool destroy = (sockets == 0 && terminated);
term_sync.unlock ();
if (destroy)
delete this;
}
// Zombification of socket basically means that its ownership is tranferred
// from the application that created it to the context.
void zmq::ctx_t::no_sockets (app_thread_t *thread_)
{
app_threads_sync.lock ();
app_threads_t::size_type i;
for (i = 0; i != app_threads.size (); i++)
if (app_threads [i].app_thread == thread_) {
app_threads [i].associated = false;
break;
}
zmq_assert (i != app_threads.size ());
app_threads_sync.unlock ();
// Note that the lock provides the memory barrier needed to migrate
// zombie-to-be socket from it's native thread to shared data area
// synchronised by slot_sync.
slot_sync.lock ();
sockets.erase (socket_);
zombies.push_back (socket_);
// Try to get rid of at least some zombie sockets at this point.
dezombify ();
// If shutdown thread is interested in notification about no more
// open sockets, notify it now.
if (sockets.empty () && no_sockets_notify)
no_sockets_sync.post ();
slot_sync.unlock ();
}
void zmq::ctx_t::send_command (uint32_t destination_,
const command_t &command_)
void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
{
signalers [destination_]->send (command_);
slots [slot_]->send (command_);
}
bool zmq::ctx_t::recv_command (uint32_t thread_slot_,
command_t *command_, bool block_)
bool zmq::ctx_t::recv_command (uint32_t slot_, command_t *command_, bool block_)
{
return signalers [thread_slot_]->recv (command_, block_);
return slots [slot_]->recv (command_, block_);
}
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
......@@ -242,22 +225,6 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
return io_threads [result];
}
void zmq::ctx_t::register_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
bool inserted = pipes.insert (pipe_).second;
zmq_assert (inserted);
pipes_sync.unlock ();
}
void zmq::ctx_t::unregister_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
pipes_t::size_type erased = pipes.erase (pipe_);
zmq_assert (erased == 1);
pipes_sync.unlock ();
}
int zmq::ctx_t::register_endpoint (const char *addr_,
socket_base_t *socket_)
{
......@@ -315,3 +282,15 @@ zmq::socket_base_t *zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint;
}
void zmq::ctx_t::dezombify ()
{
// Try to dezombify each zombie in the list.
for (zombies_t::size_type i = 0; i != zombies.size ();)
if (zombies [i]->dezombify ()) {
empty_slots.push_back (zombies [i]->get_slot ());
zombies.erase (zombies [i]);
}
else
i++;
}
......@@ -26,7 +26,9 @@
#include <string>
#include "signaler.hpp"
#include "semaphore.hpp"
#include "ypipe.hpp"
#include "yarray.hpp"
#include "config.hpp"
#include "mutex.hpp"
#include "stdint.hpp"
......@@ -55,29 +57,19 @@ namespace zmq
// Create a socket.
class socket_base_t *create_socket (int type_);
// Destroy a socket.
void destroy_socket ();
// Make socket a zombie.
void zombify (socket_base_t *socket_);
// Called by app_thread_t when it has no more sockets. The function
// should disassociate the object from the current OS thread.
void no_sockets (class app_thread_t *thread_);
// Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_);
// Send command to the destination thread.
void send_command (uint32_t destination_, const command_t &command_);
// Receive command from another thread.
bool recv_command (uint32_t thread_slot_, command_t *command_,
bool block_);
// Receive command from the source slot.
bool recv_command (uint32_t slot_, command_t *command_, bool block_);
// Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
// All pipes are registered with the context so that even the
// orphaned pipes can be deallocated on the terminal shutdown.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
// Management of inproc endpoints.
int register_endpoint (const char *addr_, class socket_base_t *socket_);
void unregister_endpoints (class socket_base_t *socket_);
......@@ -87,57 +79,45 @@ namespace zmq
~ctx_t ();
struct app_thread_info_t
{
// If false, 0MQ application thread is free, there's no associated
// OS thread.
bool associated;
// Sockets belonging to this context.
typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets;
// ID of the associated OS thread. If 'associated' is false,
// this field contains bogus data.
thread_t::id_t tid;
// Array of sockets that were already closed but not yet deallocated.
// These sockets still have some pipes and I/O objects attached.
typedef yarray_t <socket_base_t> zombies_t;
zombies_t zombies;
// Pointer to the 0MQ application thread object.
class app_thread_t *app_thread;
};
// List of unused slots.
typedef std::vector <uint32_t> emtpy_slots_t;
emtpy_slots_t empty_slots;
// If true, shutdown thread wants to be informed when there are no
// more open sockets. Do so by posting no_sockets_sync semaphore.
// Note that this variable is synchronised by slot_sync mutex.
bool no_sockets_notify;
// Object used by zmq_term to wait while all the sockets are closed
// by different application threads.
semaphore_t no_sockets_sync;
// Application threads.
typedef std::vector <app_thread_info_t> app_threads_t;
app_threads_t app_threads;
// Synchronisation of accesses to global slot-related data:
// sockets, zombies, empty_slots, terminated. It also synchronises
// access to zombie sockets as such (as oposed to slots) and provides
// a memory barrier to ensure that all CPU cores see the same data.
mutex_t slot_sync;
// Synchronisation of accesses to shared application thread data.
mutex_t app_threads_sync;
// This function attempts to deallocate as many zombie sockets as
// possible. It must be called within a slot_sync critical section.
void dezombify ();
// I/O threads.
typedef std::vector <class io_thread_t*> io_threads_t;
io_threads_t io_threads;
// Array of pointers to signalers for both application and I/O threads.
int signalers_count;
signaler_t **signalers;
// As pipes may reside in orphaned state in particular moments
// of the pipe shutdown process, i.e. neither pipe reader nor
// pipe writer hold reference to the pipe, we have to hold references
// to all pipes in context so that we can deallocate them
// during terminal shutdown even though it conincides with the
// pipe being in the orphaned state.
typedef std::set <class pipe_t*> pipes_t;
pipes_t pipes;
// Synchronisation of access to the pipes repository.
mutex_t pipes_sync;
// Number of sockets alive.
int sockets;
// If true, zmq_term was already called. When last socket is closed
// the whole 0MQ infrastructure should be deallocated.
bool terminated;
// Synchronisation of access to the termination data (socket count
// and 'terminated' flag).
mutex_t term_sync;
uint32_t slot_count;
signaler_t **slots;
// List of inproc endpoints within this context.
typedef std::map <std::string, class socket_base_t*> endpoints_t;
......
......@@ -32,18 +32,19 @@ zmq::fq_t::fq_t () :
zmq::fq_t::~fq_t ()
{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->term ();
zmq_assert (pipes.empty ());
}
void zmq::fq_t::attach (reader_t *pipe_)
{
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
void zmq::fq_t::detach (reader_t *pipe_)
void zmq::fq_t::terminated (reader_t *pipe_)
{
zmq_assert (!more || pipes [current] != pipe_);
......@@ -57,16 +58,18 @@ void zmq::fq_t::detach (reader_t *pipe_)
pipes.erase (pipe_);
}
void zmq::fq_t::kill (reader_t *pipe_)
bool zmq::fq_t::has_pipes ()
{
// Move the pipe to the list of inactive pipes.
active--;
if (current == active)
current = 0;
pipes.swap (pipes.index (pipe_), active);
return !pipes.empty ();
}
void zmq::fq_t::term_pipes ()
{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
void zmq::fq_t::revive (reader_t *pipe_)
void zmq::fq_t::activated (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
......@@ -98,6 +101,12 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_)
}
return 0;
}
else {
active--;
pipes.swap (current, active);
if (current == active)
current = 0;
}
}
// No message is available. Initialise the output parameter
......
......@@ -21,6 +21,7 @@
#define __ZMQ_FQ_HPP_INCLUDED__
#include "yarray.hpp"
#include "pipe.hpp"
namespace zmq
{
......@@ -28,24 +29,28 @@ namespace zmq
// Class manages a set of inbound pipes. On receive it performs fair
// queueing (RFC970) so that senders gone berserk won't cause denial of
// service for decent senders.
class fq_t
class fq_t : public i_reader_events
{
public:
fq_t ();
~fq_t ();
void attach (class reader_t *pipe_);
void detach (class reader_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
void attach (reader_t *pipe_);
bool has_pipes ();
void term_pipes ();
int recv (zmq_msg_t *msg_, int flags_);
bool has_in ();
// i_reader_events implementation.
void activated (reader_t *pipe_);
void terminated (reader_t *pipe_);
private:
// Inbound pipes.
typedef yarray_t <class reader_t> pipes_t;
typedef yarray_t <reader_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#include "blob.hpp"
namespace zmq
{
struct i_endpoint
{
virtual ~i_endpoint () {}
virtual void attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0;
virtual void revive (class writer_t *pipe_) = 0;
};
}
#endif
......@@ -26,9 +26,8 @@
#include "err.hpp"
#include "ctx.hpp"
zmq::io_thread_t::io_thread_t (ctx_t *ctx_,
uint32_t thread_slot_) :
object_t (ctx_, thread_slot_)
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t slot_) :
object_t (ctx_, slot_)
{
poller = new (std::nothrow) poller_t;
zmq_assert (poller);
......
......@@ -38,7 +38,7 @@ namespace zmq
{
public:
io_thread_t (class ctx_t *ctx_, uint32_t thread_slot_);
io_thread_t (class ctx_t *ctx_, uint32_t slot_);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
......
......@@ -32,19 +32,27 @@ zmq::lb_t::lb_t () :
zmq::lb_t::~lb_t ()
{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->term ();
zmq_assert (pipes.empty ());
}
void zmq::lb_t::attach (writer_t *pipe_)
{
pipe_->set_event_sink (this);
pipes.push_back (pipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
void zmq::lb_t::detach (writer_t *pipe_)
void zmq::lb_t::term_pipes ()
{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
void zmq::lb_t::terminated (writer_t *pipe_)
{
// ???
zmq_assert (!more || pipes [current] != pipe_);
// Remove the pipe from the list; adjust number of active pipes
......@@ -57,7 +65,12 @@ void zmq::lb_t::detach (writer_t *pipe_)
pipes.erase (pipe_);
}
void zmq::lb_t::revive (writer_t *pipe_)
bool zmq::lb_t::has_pipes ()
{
return !pipes.empty ();
}
void zmq::lb_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
......
......@@ -21,25 +21,30 @@
#define __ZMQ_LB_HPP_INCLUDED__
#include "yarray.hpp"
#include "pipe.hpp"
namespace zmq
{
// Class manages a set of outbound pipes. On send it load balances
// messages fairly among the pipes.
class lb_t
class lb_t : public i_writer_events
{
public:
lb_t ();
~lb_t ();
void attach (class writer_t *pipe_);
void detach (class writer_t *pipe_);
void revive (class writer_t *pipe_);
void attach (writer_t *pipe_);
void term_pipes ();
bool has_pipes ();
int send (zmq_msg_t *msg_, int flags_);
bool has_out ();
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private:
// List of outbound pipes.
......
......@@ -28,15 +28,15 @@
#include "session.hpp"
#include "socket_base.hpp"
zmq::object_t::object_t (ctx_t *ctx_, uint32_t thread_slot_) :
zmq::object_t::object_t (ctx_t *ctx_, uint32_t slot_) :
ctx (ctx_),
thread_slot (thread_slot_)
slot (slot_)
{
}
zmq::object_t::object_t (object_t *parent_) :
ctx (parent_->ctx),
thread_slot (parent_->thread_slot)
slot (parent_->slot)
{
}
......@@ -44,9 +44,9 @@ zmq::object_t::~object_t ()
{
}
uint32_t zmq::object_t::get_thread_slot ()
uint32_t zmq::object_t::get_slot ()
{
return thread_slot;
return slot;
}
zmq::ctx_t *zmq::object_t::get_ctx ()
......@@ -123,16 +123,6 @@ void zmq::object_t::process_command (command_t &cmd_)
deallocate_command (&cmd_);
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
{
ctx->register_pipe (pipe_);
}
void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
{
ctx->unregister_pipe (pipe_);
}
int zmq::object_t::register_endpoint (const char *addr_, socket_base_t *socket_)
{
return ctx->register_endpoint (addr_, socket_);
......@@ -153,6 +143,11 @@ zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
return ctx->choose_io_thread (taskset_);
}
void zmq::object_t::zombify (socket_base_t *socket_)
{
ctx->zombify (socket_);
}
void zmq::object_t::send_stop ()
{
// 'stop' command goes always from administrative thread to
......@@ -160,7 +155,7 @@ void zmq::object_t::send_stop ()
command_t cmd;
cmd.destination = this;
cmd.type = command_t::stop;
ctx->send_command (thread_slot, cmd);
ctx->send_command (slot, cmd);
}
void zmq::object_t::send_plug (owned_t *destination_, bool inc_seqnum_)
......@@ -369,6 +364,6 @@ void zmq::object_t::process_seqnum ()
void zmq::object_t::send_command (command_t &cmd_)
{
ctx->send_command (cmd_.destination->get_thread_slot (), cmd_);
ctx->send_command (cmd_.destination->get_slot (), cmd_);
}
......@@ -32,18 +32,14 @@ namespace zmq
{
public:
object_t (class ctx_t *ctx_, uint32_t thread_slot_);
object_t (class ctx_t *ctx_, uint32_t slot_);
object_t (object_t *parent_);
virtual ~object_t ();
uint32_t get_thread_slot ();
uint32_t get_slot ();
ctx_t *get_ctx ();
void process_command (struct command_t &cmd_);
// Allow pipe to access corresponding context functions.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
protected:
// Using following function, socket is able to access global
......@@ -55,6 +51,10 @@ namespace zmq
// Chooses least loaded I/O thread.
class io_thread_t *choose_io_thread (uint64_t taskset_);
// Zombify particular socket. In other words, pass the ownership to
// the context.
void zombify (class socket_base_t *socket_);
// Derived object can use these functions to send commands
// to other objects.
void send_stop ();
......@@ -105,7 +105,7 @@ namespace zmq
class ctx_t *ctx;
// Slot ID of the thread the object belongs to.
uint32_t thread_slot;
uint32_t slot;
void send_command (command_t &cmd_);
......
......@@ -35,7 +35,7 @@ zmq::owned_t::~owned_t ()
void zmq::owned_t::inc_seqnum ()
{
// NB: This function may be called from a different thread!
// This function may be called from a different thread!
sent_seqnum.add (1);
}
......@@ -62,10 +62,16 @@ void zmq::owned_t::finalise ()
{
// If termination request was already received and there are no more
// commands to wait for, terminate the object.
if (shutting_down && processed_seqnum == sent_seqnum.get ()) {
if (shutting_down && processed_seqnum == sent_seqnum.get ()
&& is_terminable ()) {
process_unplug ();
send_term_ack (owner);
delete this;
}
}
bool zmq::owned_t::is_terminable ()
{
return true;
}
......@@ -45,6 +45,13 @@ namespace zmq
protected:
// A mechanism allowing derived owned objects to postpone the
// termination process. Default implementation defines no such delay.
// Note that the derived object has to call finalise method when the
// delay is over.
virtual bool is_terminable ();
void finalise ();
// Ask owner socket to terminate this object.
void term ();
......@@ -69,8 +76,6 @@ namespace zmq
void process_term ();
void process_seqnum ();
void finalise ();
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
......
......@@ -23,11 +23,12 @@
#include "err.hpp"
#include "pipe.hpp"
zmq::pair_t::pair_t (class app_thread_t *parent_) :
socket_base_t (parent_),
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_),
inpipe (NULL),
outpipe (NULL),
alive (true)
inpipe_alive (false),
outpipe_alive (false)
{
options.requires_in = true;
options.requires_out = true;
......@@ -35,56 +36,61 @@ zmq::pair_t::pair_t (class app_thread_t *parent_) :
zmq::pair_t::~pair_t ()
{
if (inpipe)
inpipe->term ();
if (outpipe)
outpipe->term ();
zmq_assert (!inpipe);
zmq_assert (!outpipe);
}
void zmq::pair_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe && !outpipe);
inpipe = inpipe_;
inpipe_alive = true;
inpipe->set_event_sink (this);
outpipe = outpipe_;
outpipe_alive = true;
outpipe->set_event_sink (this);
}
void zmq::pair_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::pair_t::terminated (class reader_t *pipe_)
{
zmq_assert (pipe_ == inpipe);
inpipe = NULL;
inpipe_alive = false;
}
void zmq::pair_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::pair_t::terminated (class writer_t *pipe_)
{
zmq_assert (pipe_ == outpipe);
outpipe = NULL;
outpipe_alive = false;
}
void zmq::pair_t::xkill (class reader_t *pipe_)
void zmq::pair_t::xterm_pipes ()
{
zmq_assert (alive);
alive = false;
if (inpipe)
inpipe->terminate ();
if (outpipe)
outpipe->terminate ();
}
void zmq::pair_t::xrevive (class reader_t *pipe_)
bool zmq::pair_t::xhas_pipes ()
{
zmq_assert (!alive);
alive = true;
return inpipe != NULL || outpipe != NULL;
}
void zmq::pair_t::xrevive (class writer_t *pipe_)
void zmq::pair_t::activated (class reader_t *pipe_)
{
zmq_assert (!outpipe_alive);
outpipe_alive = true;
zmq_assert (!inpipe_alive);
inpipe_alive = true;
}
int zmq::pair_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
void zmq::pair_t::activated (class writer_t *pipe_)
{
errno = EINVAL;
return -1;
zmq_assert (!outpipe_alive);
outpipe_alive = true;
}
int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
......@@ -100,6 +106,7 @@ int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
return -1;
}
if (!(flags_ & ZMQ_SNDMORE))
outpipe->flush ();
// Detach the original message from the data buffer.
......@@ -114,9 +121,12 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
// Deallocate old content of the message.
zmq_msg_close (msg_);
if (!alive || !inpipe || !inpipe->read (msg_)) {
// No message is available. Initialise the output parameter
// to be a 0-byte message.
if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {
// No message is available.
inpipe_alive = false;
// Initialise the output parameter to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
......@@ -126,14 +136,16 @@ int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
bool zmq::pair_t::xhas_in ()
{
if (alive && inpipe && inpipe->check_read ())
return true;
if (!inpipe || !inpipe_alive)
return false;
inpipe_alive = inpipe->check_read ();
return inpipe_alive;
}
bool zmq::pair_t::xhas_out ()
{
if (outpipe == NULL || !outpipe_alive)
if (!outpipe || !outpipe_alive)
return false;
outpipe_alive = outpipe->check_write ();
......
......@@ -21,37 +21,45 @@
#define __ZMQ_PAIR_HPP_INCLUDED__
#include "socket_base.hpp"
#include "pipe.hpp"
namespace zmq
{
class pair_t : public socket_base_t
class pair_t :
public socket_base_t,
public i_reader_events,
public i_writer_events
{
public:
pair_t (class app_thread_t *parent_);
pair_t (class ctx_t *parent_, uint32_t slot_);
~pair_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
// i_reader_events interface implementation.
void activated (class reader_t *pipe_);
void terminated (class reader_t *pipe_);
// i_writer_events interface implementation.
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
private:
class reader_t *inpipe;
class writer_t *outpipe;
bool alive;
bool inpipe_alive;
bool outpipe_alive;
pair_t (const pair_t&);
......
......@@ -17,31 +17,54 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <new>
#include "../include/zmq.h"
#include "pipe.hpp"
#include "likely.hpp"
zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) :
zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
uint64_t lwm_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
pipe (pipe_),
writer (NULL),
lwm (lwm_),
msgs_read (0),
endpoint (NULL)
{}
sink (NULL),
terminating (false)
{
// Note that writer is not set here. Writer will inform reader about its
// address once it is created (via set_writer method).
}
void zmq::reader_t::set_writer (writer_t *writer_)
{
zmq_assert (!writer);
writer = writer_;
}
zmq::reader_t::~reader_t ()
{
if (pipe)
unregister_pipe (pipe);
// Pipe as such is owned and deallocated by reader object.
// The point is that reader processes the last step of terminal
// handshaking (term_ack).
zmq_assert (pipe);
// First delete all the unread messages in the pipe. We have to do it by
// hand because zmq_msg_t is a POD, not a class, so there's no associated
// destructor.
zmq_msg_t msg;
while (pipe->read (&msg))
zmq_msg_close (&msg);
delete pipe;
}
void zmq::reader_t::set_pipe (pipe_t *pipe_)
void zmq::reader_t::set_event_sink (i_reader_events *sink_)
{
zmq_assert (!pipe);
pipe = pipe_;
peer = &pipe->writer;
register_pipe (pipe);
zmq_assert (!sink);
sink = sink_;
}
bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
......@@ -53,19 +76,20 @@ bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
bool zmq::reader_t::check_read ()
{
if (unlikely (terminating))
return false;
// Check if there's an item in the pipe.
// If not, deactivate the pipe.
if (!pipe->check_read ()) {
endpoint->kill (this);
terminate ();
return false;
}
// If the next item in the pipe is message delimiter,
// initiate its termination.
if (pipe->probe (is_delimiter)) {
if (endpoint)
endpoint->detach_inpipe (this);
term ();
terminate ();
return false;
}
......@@ -74,17 +98,16 @@ bool zmq::reader_t::check_read ()
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
if (!pipe->read (msg_)) {
endpoint->kill (this);
if (unlikely (terminating))
return false;
if (!pipe->read (msg_))
return false;
}
// If delimiter was read, start termination process of the pipe.
unsigned char *offset = 0;
if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
if (endpoint)
endpoint->detach_inpipe (this);
term ();
terminate ();
return false;
}
......@@ -92,51 +115,64 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
msgs_read++;
if (lwm > 0 && msgs_read % lwm == 0)
send_reader_info (peer, msgs_read);
send_reader_info (writer, msgs_read);
return true;
}
void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
void zmq::reader_t::terminate ()
{
endpoint = endpoint_;
// If termination was already started by the peer, do nothing.
if (terminating)
return;
terminating = true;
send_pipe_term (writer);
}
void zmq::reader_t::term ()
bool zmq::reader_t::is_terminating ()
{
endpoint = NULL;
send_pipe_term (peer);
return terminating;
}
void zmq::reader_t::process_revive ()
{
// Beacuse of command throttling mechanism, incoming termination request
// may not have been processed before subsequent send.
// In that case endpoint is NULL.
if (endpoint)
endpoint->revive (this);
// Forward the event to the sink (either socket or session).
sink->activated (this);
}
void zmq::reader_t::process_pipe_term_ack ()
{
peer = NULL;
delete pipe;
// At this point writer may already be deallocated.
// For safety's sake drop the reference to it.
writer = NULL;
// Notify owner about the termination.
zmq_assert (sink);
sink->terminated (this);
// Deallocate resources.
delete this;
}
zmq::writer_t::writer_t (object_t *parent_,
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
uint64_t hwm_, int64_t swap_size_) :
object_t (parent_),
pipe (NULL),
peer (NULL),
pipe (pipe_),
reader (reader_),
hwm (hwm_),
msgs_read (0),
msgs_written (0),
msg_store (NULL),
extra_msg_flag (false),
stalled (false),
pending_close (false),
endpoint (NULL)
sink (NULL),
terminating (false),
pending_close (false)
{
// Inform reader about the writer.
reader->set_writer (this);
if (swap_size_ > 0) {
msg_store = new (std::nothrow) msg_store_t (swap_size_);
if (msg_store != NULL) {
......@@ -148,11 +184,6 @@ zmq::writer_t::writer_t (object_t *parent_,
}
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
{
endpoint = endpoint_;
}
zmq::writer_t::~writer_t ()
{
if (extra_msg_flag)
......@@ -161,15 +192,17 @@ zmq::writer_t::~writer_t ()
delete msg_store;
}
void zmq::writer_t::set_pipe (pipe_t *pipe_)
void zmq::writer_t::set_event_sink (i_writer_events *sink_)
{
zmq_assert (!pipe);
pipe = pipe_;
peer = &pipe->reader;
zmq_assert (!sink);
sink = sink_;
}
bool zmq::writer_t::check_write ()
{
if (terminating)
return false;
if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) {
stalled = true;
return false;
......@@ -180,6 +213,9 @@ bool zmq::writer_t::check_write ()
bool zmq::writer_t::write (zmq_msg_t *msg_)
{
if (terminating)
return false;
if (!check_write ())
return false;
......@@ -216,23 +252,27 @@ void zmq::writer_t::rollback ()
while (pipe->unwrite (&msg)) {
zmq_assert (msg.flags & ZMQ_MSG_MORE);
zmq_msg_close (&msg);
msgs_written--;
}
if (stalled && endpoint != NULL && check_write ()) {
if (stalled && check_write ()) {
stalled = false;
endpoint->revive (this);
zmq_assert (sink);
sink->activated (this);
}
}
void zmq::writer_t::flush ()
{
if (!pipe->flush ())
send_revive (peer);
send_revive (reader);
}
void zmq::writer_t::term ()
void zmq::writer_t::terminate ()
{
endpoint = NULL;
// Prevent double termination.
if (terminating)
return;
// Rollback any unfinished messages.
rollback ();
......@@ -293,71 +333,69 @@ void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
flush ();
}
if (stalled && endpoint != NULL) {
if (stalled) {
stalled = false;
endpoint->revive (this);
zmq_assert (sink);
sink->activated (this);
}
}
void zmq::writer_t::process_pipe_term ()
{
if (endpoint)
endpoint->detach_outpipe (this);
send_pipe_term_ack (reader);
reader_t *p = peer;
peer = NULL;
send_pipe_term_ack (p);
}
// The above command allows reader to deallocate itself and the pipe.
// For safety's sake we'll drop the pointers here.
reader = NULL;
pipe = NULL;
bool zmq::writer_t::pipe_full ()
{
return hwm > 0 && msgs_written - msgs_read == hwm;
}
// Notify owner about the termination.
zmq_assert (sink);
sink->terminated (this);
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, int64_t swap_size_) :
reader (reader_parent_, compute_lwm (hwm_)),
writer (writer_parent_, hwm_, swap_size_)
{
reader.set_pipe (this);
writer.set_pipe (this);
// Deallocate the resources.
delete this;
}
zmq::pipe_t::~pipe_t ()
bool zmq::writer_t::pipe_full ()
{
// Deallocate all the unread messages in the pipe. We have to do it by
// hand because zmq_msg_t is a POD, not a class, so there's no associated
// destructor.
zmq_msg_t msg;
while (read (&msg))
zmq_msg_close (&msg);
return hwm > 0 && msgs_written - msgs_read == hwm;
}
uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, int64_t swap_size_, reader_t **reader_, writer_t **writer_)
{
// Following point should be taken into consideration when computing
// low watermark:
// First compute the low water mark. Following point should be taken
// into consideration:
//
// 1. LWM has to be less than HWM.
// 2. LWM cannot be set to very low value (such as zero) as after filling
// the queue it would start to refill only after all the messages are
// read from it and thus unnecessarily hold the progress back.
// 3. LWM cannot be set to very high value (such as HWM-1) as it would
// result in lock-step filling of the queue - if a single message is read
// from a full queue, writer thread is resumed to write exactly one
// result in lock-step filling of the queue - if a single message is
// read from a full queue, writer thread is resumed to write exactly one
// message to the queue and go back to sleep immediately. This would
// result in low performance.
//
// Given the 3. it would be good to keep HWM and LWM as far apart as
// possible to reduce the thread switching overhead to almost zero,
// say HWM-LWM should be 500 (max_wm_delta).
// say HWM-LWM should be max_wm_delta.
//
// That done, we still we have to account for the cases where HWM<500 thus
// driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases.
if (hwm_ > max_wm_delta * 2)
return hwm_ - max_wm_delta;
else
return (hwm_ + 1) / 2;
// That done, we still we have to account for the cases where
// HWM < max_wm_delta thus driving LWM to negative numbers.
// Let's make LWM 1/2 of HWM in such cases.
uint64_t lwm = (hwm_ > max_wm_delta * 2) ?
hwm_ - max_wm_delta : (hwm_ + 1) / 2;
// Create all three objects pipe consists of: the pipe per se, reader and
// writer. The pipe will be handled by reader and writer, its never passed
// to the user. Reader and writer are returned to the user.
pipe_t *pipe = new (std::nothrow) pipe_t ();
zmq_assert (pipe);
*reader_ = new (std::nothrow) reader_t (reader_parent_, pipe, lwm);
zmq_assert (*reader_);
*writer_ = new (std::nothrow) writer_t (writer_parent_, pipe, *reader_,
hwm_, swap_size_);
zmq_assert (*writer_);
}
......@@ -23,7 +23,6 @@
#include "../include/zmq.h"
#include "stdint.hpp"
#include "i_endpoint.hpp"
#include "yarray_item.hpp"
#include "ypipe.hpp"
#include "msg_store.hpp"
......@@ -33,15 +32,31 @@
namespace zmq
{
// The shutdown mechanism for pipe works as follows: Either endpoint
// (or even both of them) can ask pipe to terminate by calling 'terminate'
// method. Pipe then terminates in asynchronous manner. When the part of
// the shutdown tied to the endpoint is done it triggers 'terminated'
// event. When endpoint processes the event and returns, associated
// reader/writer object is deallocated.
typedef ypipe_t <zmq_msg_t, message_pipe_granularity> pipe_t;
struct i_reader_events
{
virtual void terminated (class reader_t *pipe_) = 0;
virtual void activated (class reader_t *pipe_) = 0;
};
class reader_t : public object_t, public yarray_item_t
{
public:
friend void zmq::create_pipe (object_t*, object_t*, uint64_t,
int64_t, reader_t**, writer_t**);
friend class writer_t;
reader_t (class object_t *parent_, uint64_t lwm_);
~reader_t ();
public:
void set_pipe (class pipe_t *pipe_);
void set_endpoint (i_endpoint *endpoint_);
// Specifies the object to get events from the reader.
void set_event_sink (i_reader_events *endpoint_);
// Returns true if there is at least one message to read in the pipe.
bool check_read ();
......@@ -50,10 +65,20 @@ namespace zmq
bool read (zmq_msg_t *msg_);
// Ask pipe to terminate.
void term ();
void terminate ();
// Returns true if the pipe is already terminating
// (say if delimiter was already read).
bool is_terminating ();
private:
reader_t (class object_t *parent_, pipe_t *pipe_, uint64_t lwm_);
~reader_t ();
// To be called only by writer itself!
void set_writer (class writer_t *writer_);
// Command handlers.
void process_revive ();
void process_pipe_term_ack ();
......@@ -62,10 +87,10 @@ namespace zmq
static bool is_delimiter (zmq_msg_t &msg_);
// The underlying pipe.
class pipe_t *pipe;
pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
class writer_t *peer;
class writer_t *writer;
// Low watermark for in-memory storage (in bytes).
uint64_t lwm;
......@@ -73,22 +98,32 @@ namespace zmq
// Number of messages read so far.
uint64_t msgs_read;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
// Sink for the events (either the socket of the session).
i_reader_events *sink;
// True is 'terminate' method was called or delimiter
// was read from the pipe.
bool terminating;
reader_t (const reader_t&);
void operator = (const reader_t&);
};
struct i_writer_events
{
virtual void terminated (class writer_t *pipe_) = 0;
virtual void activated (class writer_t *pipe_) = 0;
};
class writer_t : public object_t, public yarray_item_t
{
public:
friend void zmq::create_pipe (object_t*, object_t*, uint64_t,
int64_t, reader_t**, writer_t**);
writer_t (class object_t *parent_, uint64_t hwm_, int64_t swap_size_);
~writer_t ();
public:
void set_pipe (class pipe_t *pipe_);
void set_endpoint (i_endpoint *endpoint_);
// Specifies the object to get events from the writer.
void set_event_sink (i_writer_events *endpoint_);
// Checks whether a message can be written to the pipe.
// If writing the message would cause high watermark to be
......@@ -106,10 +141,14 @@ namespace zmq
void flush ();
// Ask pipe to terminate.
void term ();
void terminate ();
private:
writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
uint64_t hwm_, int64_t swap_size_);
~writer_t ();
void process_reader_info (uint64_t msgs_read_);
// Command handlers.
......@@ -123,10 +162,10 @@ namespace zmq
void write_delimiter ();
// The underlying pipe.
class pipe_t *pipe;
pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
class reader_t *peer;
reader_t *reader;
// High watermark for in-memory storage (in bytes).
uint64_t hwm;
......@@ -149,35 +188,23 @@ namespace zmq
// True iff the last attempt to write a message has failed.
bool stalled;
bool pending_close;
// Sink for the events (either the socket or the session).
i_writer_events *sink;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
// True is 'terminate' method was called of 'pipe_term' command
// arrived from the reader.
bool terminating;
bool pending_close;
writer_t (const writer_t&);
void operator = (const writer_t&);
};
// Message pipe.
class pipe_t : public ypipe_t <zmq_msg_t, message_pipe_granularity>
{
public:
pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, int64_t swap_size_);
~pipe_t ();
reader_t reader;
writer_t writer;
private:
uint64_t compute_lwm (uint64_t hwm_);
pipe_t (const pipe_t&);
void operator = (const pipe_t&);
};
// Creates a pipe. Returns pointer to reader and writer objects.
void create_pipe (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, int64_t swap_size_, reader_t **reader_,
writer_t **writer_);
}
#endif
......@@ -24,8 +24,8 @@
#include "msg_content.hpp"
#include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
zmq::pub_t::pub_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_),
active (0)
{
options.requires_in = false;
......@@ -34,56 +34,47 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) :
zmq::pub_t::~pub_t ()
{
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->term ();
pipes.clear ();
zmq_assert (pipes.empty ());
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
zmq_assert (!inpipe_);
outpipe_->set_event_sink (this);
pipes.push_back (outpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
{
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active)
active--;
pipes.erase (pipe_);
}
void zmq::pub_t::xkill (class reader_t *pipe_)
void zmq::pub_t::xterm_pipes ()
{
zmq_assert (false);
// Start shutdown process for all the pipes.
for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate ();
}
void zmq::pub_t::xrevive (class reader_t *pipe_)
bool zmq::pub_t::xhas_pipes ()
{
zmq_assert (false);
return !pipes.empty ();
}
void zmq::pub_t::xrevive (class writer_t *pipe_)
void zmq::pub_t::activated (writer_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
void zmq::pub_t::terminated (writer_t *pipe_)
{
errno = EINVAL;
return -1;
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
if (pipes.index (pipe_) < active)
active--;
pipes.erase (pipe_);
}
int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
......@@ -101,7 +92,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (pipes_t::size_type i = 0; i != active;)
for (pipes_t::size_type i = 0; i < active;)
if (write (pipes [i], msg_))
i++;
int rc = zmq_msg_init (msg_);
......@@ -133,7 +124,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
}
// Push the message to all destinations.
for (pipes_t::size_type i = 0; i != active;) {
for (pipes_t::size_type i = 0; i < active;) {
if (!write (pipes [i], msg_))
content->refcnt.sub (1);
else
......@@ -147,17 +138,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
return 0;
}
int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
bool zmq::pub_t::xhas_in ()
{
return false;
}
bool zmq::pub_t::xhas_out ()
{
return true;
......
......@@ -22,31 +22,30 @@
#include "socket_base.hpp"
#include "yarray.hpp"
#include "pipe.hpp"
namespace zmq
{
class pub_t : public socket_base_t
class pub_t : public socket_base_t, public i_writer_events
{
public:
pub_t (class app_thread_t *parent_);
pub_t (class ctx_t *parent_, uint32_t slot_);
~pub_t ();
// Overloads of functions from socket_base_t.
// Implementations of virtual functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private:
// Write the message to the pipe. Make the pipe inactive if writing
......
......@@ -22,8 +22,8 @@
#include "pull.hpp"
#include "err.hpp"
zmq::pull_t::pull_t (class app_thread_t *parent_) :
socket_base_t (parent_)
zmq::pull_t::pull_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_)
{
options.requires_in = true;
options.requires_out = false;
......@@ -40,45 +40,14 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
void zmq::pull_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::pull_t::xterm_pipes ()
{
zmq_assert (pipe_);
fq.detach (pipe_);
fq.term_pipes ();
}
void zmq::pull_t::xdetach_outpipe (class writer_t *pipe_)
bool zmq::pull_t::xhas_pipes ()
{
// There are no outpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::pull_t::xkill (class reader_t *pipe_)
{
fq.kill (pipe_);
}
void zmq::pull_t::xrevive (class reader_t *pipe_)
{
fq.revive (pipe_);
}
void zmq::pull_t::xrevive (class writer_t *pipe_)
{
zmq_assert (false);
}
int zmq::pull_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special options for this socket type.
errno = EINVAL;
return -1;
}
int zmq::pull_t::xsend (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
return fq.has_pipes ();
}
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
......@@ -91,8 +60,3 @@ bool zmq::pull_t::xhas_in ()
return fq.has_in ();
}
bool zmq::pull_t::xhas_out ()
{
return false;
}
......@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PULL_HPP_INCLUDED__
#define __ZMQ_PULL_HPP_INCLUDED__
#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
#define __ZMQ_UPSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "fq.hpp"
......@@ -30,22 +30,16 @@ namespace zmq
{
public:
pull_t (class app_thread_t *parent_);
pull_t (class ctx_t *parent_, uint32_t slot_);
~pull_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
void xterm_pipes ();
bool xhas_pipes ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
zmq::push_t::push_t (class app_thread_t *parent_) :
socket_base_t (parent_)
zmq::push_t::push_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_)
{
options.requires_in = false;
options.requires_out = true;
......@@ -41,41 +41,14 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
void zmq::push_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::push_t::xterm_pipes ()
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
lb.term_pipes ();
}
void zmq::push_t::xdetach_outpipe (class writer_t *pipe_)
bool zmq::push_t::xhas_pipes ()
{
zmq_assert (pipe_);
lb.detach (pipe_);
}
void zmq::push_t::xkill (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::push_t::xrevive (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::push_t::xrevive (class writer_t *pipe_)
{
lb.revive (pipe_);
}
int zmq::push_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special option for this socket type.
errno = EINVAL;
return -1;
return lb.has_pipes ();
}
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
......@@ -83,17 +56,6 @@ int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
return lb.send (msg_, flags_);
}
int zmq::push_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
bool zmq::push_t::xhas_in ()
{
return false;
}
bool zmq::push_t::xhas_out ()
{
return lb.has_out ();
......
......@@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PUSH_HPP_INCLUDED__
#define __ZMQ_PUSH_HPP_INCLUDED__
#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "lb.hpp"
......@@ -30,21 +30,15 @@ namespace zmq
{
public:
push_t (class app_thread_t *parent_);
push_t (class ctx_t *parent_, uint32_t slot_);
~push_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
zmq::rep_t::rep_t (class app_thread_t *parent_) :
socket_base_t (parent_),
zmq::rep_t::rep_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_),
active (0),
current (0),
sending_reply (false),
......@@ -42,6 +42,8 @@ zmq::rep_t::rep_t (class app_thread_t *parent_) :
zmq::rep_t::~rep_t ()
{
zmq_assert (in_pipes.empty ());
zmq_assert (out_pipes.empty ());
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
......@@ -50,15 +52,28 @@ void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
inpipe_->set_event_sink (this);
in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
outpipe_->set_event_sink (this);
out_pipes.push_back (outpipe_);
out_pipes.swap (active, out_pipes.size () - 1);
active++;
}
void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::rep_t::xterm_pipes ()
{
for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
in_pipes [i]->terminate ();
for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
out_pipes [i]->terminate ();
}
void zmq::rep_t::terminated (reader_t *pipe_)
{
// ???
zmq_assert (sending_reply || !more || in_pipes [current] != pipe_);
zmq_assert (pipe_);
......@@ -71,14 +86,17 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
if (current == active)
current = 0;
}
in_pipes.erase (index);
// ???
if (!zombie) {
if (out_pipes [index])
out_pipes [index]->term ();
in_pipes.erase (index);
out_pipes [index]->terminate ();
out_pipes.erase (index);
}
}
void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::rep_t::terminated (writer_t *pipe_)
{
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
......@@ -97,22 +115,22 @@ void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
current = 0;
}
out_pipes.erase (index);
// ???
if (!zombie) {
if (in_pipes [index])
in_pipes [index]->term ();
in_pipes [index]->terminate ();
in_pipes.erase (index);
out_pipes.erase (index);
}
}
void zmq::rep_t::xkill (class reader_t *pipe_)
bool zmq::rep_t::xhas_pipes ()
{
// Move the pipe to the list of inactive pipes.
in_pipes_t::size_type index = in_pipes.index (pipe_);
active--;
in_pipes.swap (index, active);
out_pipes.swap (index, active);
return !in_pipes.empty () || !out_pipes.empty ();
}
void zmq::rep_t::xrevive (class reader_t *pipe_)
void zmq::rep_t::activated (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = in_pipes.index (pipe_);
......@@ -121,15 +139,10 @@ void zmq::rep_t::xrevive (class reader_t *pipe_)
active++;
}
void zmq::rep_t::xrevive (class writer_t *pipe_)
{
}
int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
void zmq::rep_t::activated (writer_t *pipe_)
{
errno = EINVAL;
return -1;
// TODO: What here?
zmq_assert (false);
}
int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
......@@ -151,6 +164,8 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
// misbehaving requesters stop collecting replies.
// TODO: Tear down the underlying connection (?)
if (!written) {
// TODO: The reply socket becomes deactivated here...
errno = EAGAIN;
return -1;
}
......@@ -198,6 +213,13 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
for (count = active; count != 0; count--) {
if (in_pipes [current]->read (msg_))
break;
// Move the pipe to the list of inactive pipes.
active--;
in_pipes.swap (current, active);
out_pipes.swap (current, active);
// Move to next pipe.
current++;
if (current >= active)
current = 0;
......@@ -258,6 +280,13 @@ bool zmq::rep_t::xhas_in ()
for (int count = active; count != 0; count--) {
if (in_pipes [current]->check_read ())
return !sending_reply;
// Move the pipe to the list of inactive pipes.
active--;
in_pipes.swap (current, active);
out_pipes.swap (current, active);
// Move to the next pipe.
current++;
if (current >= active)
current = 0;
......
......@@ -22,39 +22,47 @@
#include "socket_base.hpp"
#include "yarray.hpp"
#include "pipe.hpp"
namespace zmq
{
class rep_t : public socket_base_t
class rep_t :
public socket_base_t,
public i_reader_events,
public i_writer_events
{
public:
rep_t (class app_thread_t *parent_);
rep_t (class ctx_t *parent_, uint32_t slot_);
~rep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
// i_reader_events interface implementation.
void activated (reader_t *pipe_);
void terminated (reader_t *pipe_);
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private:
// List in outbound and inbound pipes. Note that the two lists are
// always in sync. I.e. outpipe with index N communicates with the
// same session as inpipe with index N.
typedef yarray_t <class writer_t> out_pipes_t;
typedef yarray_t <writer_t> out_pipes_t;
out_pipes_t out_pipes;
typedef yarray_t <class reader_t> in_pipes_t;
typedef yarray_t <reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active inpipes. All the active inpipes are located at the
......@@ -73,7 +81,7 @@ namespace zmq
bool more;
// Pipe we are going to send reply to.
class writer_t *reply_pipe;
writer_t *reply_pipe;
rep_t (const rep_t&);
void operator = (const rep_t&);
......
......@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
zmq::req_t::req_t (class app_thread_t *parent_) :
socket_base_t (parent_),
zmq::req_t::req_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_),
active (0),
current (0),
receiving_reply (false),
......@@ -38,24 +38,36 @@ zmq::req_t::req_t (class app_thread_t *parent_) :
zmq::req_t::~req_t ()
{
zmq_assert (in_pipes.empty ());
zmq_assert (out_pipes.empty ());
}
void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
void zmq::req_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
inpipe_->set_event_sink (this);
in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
outpipe_->set_event_sink (this);
out_pipes.push_back (outpipe_);
out_pipes.swap (active, out_pipes.size () - 1);
active++;
}
void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::req_t::xterm_pipes ()
{
for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
in_pipes [i]->terminate ();
for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
out_pipes [i]->terminate ();
}
void zmq::req_t::terminated (reader_t *pipe_)
{
zmq_assert (!receiving_reply || !more || reply_pipe != pipe_);
......@@ -63,17 +75,21 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
zmq_assert (in_pipes.size () == out_pipes.size ());
// TODO: The pipe we are awaiting the reply from is detached. What now?
// Return ECONNRESET from subsequent recv?
if (receiving_reply && pipe_ == reply_pipe) {
zmq_assert (false);
}
in_pipes_t::size_type index = in_pipes.index (pipe_);
// ???
if (!zombie) {
if (out_pipes [index])
out_pipes [index]->term ();
in_pipes.erase (index);
out_pipes [index]->terminate ();
out_pipes.erase (index);
}
in_pipes.erase (index);
if (index < active) {
active--;
if (current == active)
......@@ -81,7 +97,7 @@ void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
}
}
void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::req_t::terminated (writer_t *pipe_)
{
zmq_assert (receiving_reply || !more || out_pipes [current] != pipe_);
......@@ -90,9 +106,13 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
out_pipes_t::size_type index = out_pipes.index (pipe_);
// ???
if (!zombie) {
if (in_pipes [index])
in_pipes [index]->term ();
in_pipes [index]->terminate ();
in_pipes.erase (index);
}
out_pipes.erase (index);
if (index < active) {
active--;
......@@ -101,15 +121,12 @@ void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
}
}
void zmq::req_t::xkill (class reader_t *pipe_)
bool zmq::req_t::xhas_pipes ()
{
zmq_assert (receiving_reply);
zmq_assert (pipe_ == reply_pipe);
reply_pipe_active = false;
return !in_pipes.empty () || !out_pipes.empty ();
}
void zmq::req_t::xrevive (class reader_t *pipe_)
void zmq::req_t::activated (reader_t *pipe_)
{
// TODO: Actually, misbehaving peer can cause this kind of thing.
// Handle it decently, presumably kill the offending connection.
......@@ -117,7 +134,7 @@ void zmq::req_t::xrevive (class reader_t *pipe_)
reply_pipe_active = true;
}
void zmq::req_t::xrevive (class writer_t *pipe_)
void zmq::req_t::activated (writer_t *pipe_)
{
out_pipes_t::size_type index = out_pipes.index (pipe_);
zmq_assert (index >= active);
......@@ -129,13 +146,6 @@ void zmq::req_t::xrevive (class writer_t *pipe_)
}
}
int zmq::req_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If we've sent a request and we still haven't got the reply,
......@@ -214,6 +224,7 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
// Get the reply from the reply pipe.
if (!reply_pipe_active || !reply_pipe->read (msg_)) {
reply_pipe_active = false;
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
......
......@@ -22,31 +22,39 @@
#include "socket_base.hpp"
#include "yarray.hpp"
#include "pipe.hpp"
namespace zmq
{
class req_t : public socket_base_t
class req_t :
public socket_base_t,
public i_reader_events,
public i_writer_events
{
public:
req_t (class app_thread_t *parent_);
req_t (class ctx_t *parent_, uint32_t slot_);
~req_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
// i_reader_events interface implementation.
void activated (reader_t *pipe_);
void terminated (reader_t *pipe_);
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private:
// List in outbound and inbound pipes. Note that the two lists are
......@@ -58,9 +66,9 @@ namespace zmq
// the beginning of the array). We don't have to do the same thing for
// inpipes, because we know which pipe we want to read the
// reply from.
typedef yarray_t <class writer_t> out_pipes_t;
typedef yarray_t <writer_t> out_pipes_t;
out_pipes_t out_pipes;
typedef yarray_t <class reader_t> in_pipes_t;
typedef yarray_t <reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active pipes.
......@@ -82,7 +90,7 @@ namespace zmq
bool more;
// Pipe we are awaiting the reply from.
class reader_t *reply_pipe;
reader_t *reply_pipe;
req_t (const req_t&);
void operator = (const req_t&);
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SEMAPHORE_HPP_INCLUDED__
#define __ZMQ_SEMAPHORE_HPP_INCLUDED__
#include "platform.hpp"
#include "err.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <semaphore.h>
#endif
namespace zmq
{
// Simple semaphore. Only single thread may be waiting at any given time.
// Also, the semaphore may not be posted before the previous post
// was matched by corresponding wait and the waiting thread was
// released.
#if defined ZMQ_HAVE_WINDOWS
// On Windows platform simple semaphore is implemeted using event object.
class semaphore_t
{
public:
// Initialise the semaphore.
inline semaphore_t ()
{
ev = CreateEvent (NULL, FALSE, FALSE, NULL);
win_assert (ev != NULL);
}
// Destroy the semaphore.
inline ~semaphore_t ()
{
int rc = CloseHandle (ev);
win_assert (rc != 0);
}
// Wait for the semaphore.
inline void wait ()
{
DWORD rc = WaitForSingleObject (ev, INFINITE);
win_assert (rc != WAIT_FAILED);
}
// Post the semaphore.
inline void post ()
{
int rc = SetEvent (ev);
win_assert (rc != 0);
}
private:
HANDLE ev;
// Disable copying of the object.
semaphore_t (const semaphore_t&);
void operator = (const semaphore_t&);
};
#else
// Default implementation maps simple semaphore to POSIX semaphore.
class semaphore_t
{
public:
// Initialise the semaphore.
inline semaphore_t ()
{
int rc = sem_init (&sem, 0, 0);
errno_assert (rc != -1);
}
// Destroy the semaphore.
inline ~semaphore_t ()
{
int rc = sem_destroy (&sem);
errno_assert (rc != -1);
}
// Wait for the semaphore.
inline void wait ()
{
int rc = sem_wait (&sem);
errno_assert (rc != -1);
}
// Post the semaphore.
inline void post ()
{
int rc = sem_post (&sem);
errno_assert (rc != -1);
}
private:
// Underlying system semaphore object.
sem_t sem;
// Disable copying of the object.
semaphore_t (const semaphore_t&);
void operator = (const semaphore_t&);
};
#endif
}
#endif
......@@ -69,13 +69,22 @@ zmq::session_t::~session_t ()
zmq_assert (!out_pipe);
}
bool zmq::session_t::is_terminable ()
{
return in_pipe->is_terminating ();
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
if (!in_pipe || !active)
return false;
if (!in_pipe->read (msg_))
if (!in_pipe->read (msg_)) {
active = false;
if (in_pipe->is_terminating ())
finalise ();
return false;
}
incomplete_in = msg_->flags & ZMQ_MSG_MORE;
return true;
......@@ -156,33 +165,28 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
zmq_assert (!in_pipe);
in_pipe = inpipe_;
active = true;
in_pipe->set_endpoint (this);
in_pipe->set_event_sink (this);
}
if (outpipe_) {
zmq_assert (!out_pipe);
out_pipe = outpipe_;
out_pipe->set_endpoint (this);
out_pipe->set_event_sink (this);
}
}
void zmq::session_t::detach_inpipe (reader_t *pipe_)
void zmq::session_t::terminated (reader_t *pipe_)
{
active = false;
in_pipe = NULL;
}
void zmq::session_t::detach_outpipe (writer_t *pipe_)
void zmq::session_t::terminated (writer_t *pipe_)
{
out_pipe = NULL;
}
void zmq::session_t::kill (reader_t *pipe_)
{
active = false;
}
void zmq::session_t::revive (reader_t *pipe_)
void zmq::session_t::activated (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
active = true;
......@@ -190,7 +194,7 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive ();
}
void zmq::session_t::revive (writer_t *pipe_)
void zmq::session_t::activated (writer_t *pipe_)
{
zmq_assert (out_pipe == pipe_);
if (engine)
......@@ -203,6 +207,11 @@ void zmq::session_t::process_plug ()
void zmq::session_t::process_unplug ()
{
// TODO: There may be a problem here. The called ensures that all the
// commands on the fly have been delivered. However, given that the
// session is unregistered from the global repository only at this point
// there may be some commands being sent to the session right now.
// Unregister the session from the socket.
if (ordinal)
owner->unregister_session (ordinal);
......@@ -210,14 +219,10 @@ void zmq::session_t::process_unplug ()
owner->unregister_session (peer_identity);
// Ask associated pipes to terminate.
if (in_pipe) {
in_pipe->term ();
in_pipe = NULL;
}
if (out_pipe) {
out_pipe->term ();
out_pipe = NULL;
}
if (in_pipe)
in_pipe->terminate ();
if (out_pipe)
out_pipe->terminate ();
if (engine) {
engine->unplug ();
......@@ -265,19 +270,15 @@ void zmq::session_t::process_attach (i_engine *engine_,
writer_t *socket_writer = NULL;
if (options.requires_in && !out_pipe) {
pipe_t *pipe = new (std::nothrow) pipe_t (owner, this, options.hwm, options.swap);
zmq_assert (pipe);
out_pipe = &pipe->writer;
out_pipe->set_endpoint (this);
socket_reader = &pipe->reader;
create_pipe (owner, this, options.hwm, options.swap, &socket_reader,
&out_pipe);
out_pipe->set_event_sink (this);
}
if (options.requires_out && !in_pipe) {
pipe_t *pipe = new (std::nothrow) pipe_t (this, owner, options.hwm, options.swap);
zmq_assert (pipe);
in_pipe = &pipe->reader;
in_pipe->set_endpoint (this);
socket_writer = &pipe->writer;
create_pipe (this, owner, options.hwm, options.swap, &in_pipe,
&socket_writer);
in_pipe->set_event_sink (this);
}
if (socket_reader || socket_writer)
......@@ -289,3 +290,4 @@ void zmq::session_t::process_attach (i_engine *engine_,
engine = engine_;
engine->plug (this);
}
......@@ -21,15 +21,19 @@
#define __ZMQ_SESSION_HPP_INCLUDED__
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
#include "blob.hpp"
#include "pipe.hpp"
namespace zmq
{
class session_t : public owned_t, public i_inout, public i_endpoint
class session_t :
public owned_t,
public i_inout,
public i_reader_events,
public i_writer_events
{
public:
......@@ -50,19 +54,25 @@ namespace zmq
class socket_base_t *get_owner ();
uint64_t get_ordinal ();
// i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
void revive (class writer_t *pipe_);
// i_reader_events interface implementation.
void activated (class reader_t *pipe_);
void terminated (class reader_t *pipe_);
// i_writer_events interface implementation.
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
private:
~session_t ();
// Define the delayed termination. (I.e. termination is postponed
// till all the data is flushed to the kernel.)
bool is_terminable ();
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
......
......@@ -23,9 +23,18 @@
#include "../include/zmq.h"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#if defined _MSC_VER
#include <intrin.h>
#endif
#else
#include <unistd.h>
#endif
#include "socket_base.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "io_thread.hpp"
......@@ -39,15 +48,73 @@
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "likely.hpp"
#include "pair.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "pull.hpp"
#include "push.hpp"
#include "xreq.hpp"
#include "xrep.hpp"
#include "uuid.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
// system with x86 architecture and gcc or MSVC compiler.
#if (defined __GNUC__ && (defined __i386__ || defined __x86_64__)) ||\
(defined _MSC_VER && (defined _M_IX86 || defined _M_X64))
#define ZMQ_DELAY_COMMANDS
#endif
zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
uint32_t slot_)
{
socket_base_t *s = NULL;
switch (type_) {
case ZMQ_PAIR:
s = new (std::nothrow) pair_t (parent_, slot_);
break;
case ZMQ_PUB:
s = new (std::nothrow) pub_t (parent_, slot_);
break;
case ZMQ_SUB:
s = new (std::nothrow) sub_t (parent_, slot_);
break;
case ZMQ_REQ:
s = new (std::nothrow) req_t (parent_, slot_);
break;
case ZMQ_REP:
s = new (std::nothrow) rep_t (parent_, slot_);
break;
case ZMQ_XREQ:
s = new (std::nothrow) xreq_t (parent_, slot_);
break;
case ZMQ_XREP:
s = new (std::nothrow) xrep_t (parent_, slot_);
break;
case ZMQ_PULL:
s = new (std::nothrow) pull_t (parent_, slot_);
break;
case ZMQ_PUSH:
s = new (std::nothrow) push_t (parent_, slot_);
break;
default:
errno = EINVAL;
return NULL;
}
zmq_assert (s);
return s;
}
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
object_t (parent_, slot_),
zombie (false),
last_processing_time (0),
pending_term_acks (0),
ticks (0),
rcvmore (false),
app_thread (parent_),
shutting_down (false),
sent_seqnum (0),
processed_seqnum (0),
next_ordinal (1)
......@@ -58,10 +125,38 @@ zmq::socket_base_t::~socket_base_t ()
{
}
zmq::signaler_t *zmq::socket_base_t::get_signaler ()
{
return &signaler;
}
void zmq::socket_base_t::stop ()
{
// Called by ctx when it is terminated (zmq_term).
// 'stop' command is sent from the threads that called zmq_term to
// the thread owning the socket. This way, blocking call in the
// owner thread can be interrupted.
send_stop ();
}
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
{
// If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) {
xattach_pipes (inpipe_, outpipe_, peer_identity_);
}
else {
blob_t identity (1, 0);
identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len);
xattach_pipes (inpipe_, outpipe_, identity);
}
}
int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
if (unlikely (app_thread->is_terminated ())) {
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -79,7 +174,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
if (unlikely (app_thread->is_terminated ())) {
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -94,12 +189,37 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
if (option_ == ZMQ_FD) {
if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL;
return -1;
}
*((fd_t*) optval_) = signaler.get_fd ();
*optvallen_ = sizeof (fd_t);
return 0;
}
if (option_ == ZMQ_EVENTS) {
if (*optvallen_ < sizeof (uint32_t)) {
errno = EINVAL;
return -1;
}
process_commands(false, false);
*((uint32_t*) optval_) = 0;
if (has_out ())
*((uint32_t*) optval_) |= ZMQ_POLLOUT;
if (has_in ())
*((uint32_t*) optval_) |= ZMQ_POLLIN;
*optvallen_ = sizeof (uint32_t);
return 0;
}
return options.getsockopt (option_, optval_, optvallen_);
}
int zmq::socket_base_t::bind (const char *addr_)
{
if (unlikely (app_thread->is_terminated ())) {
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -159,7 +279,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int zmq::socket_base_t::connect (const char *addr_)
{
if (unlikely (app_thread->is_terminated ())) {
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -190,30 +310,29 @@ int zmq::socket_base_t::connect (const char *addr_)
if (!peer)
return -1;
pipe_t *in_pipe = NULL;
pipe_t *out_pipe = NULL;
reader_t *inpipe_reader = NULL;
writer_t *inpipe_writer = NULL;
reader_t *outpipe_reader = NULL;
writer_t *outpipe_writer = NULL;
// Create inbound pipe, if required.
if (options.requires_in) {
in_pipe = new (std::nothrow) pipe_t (this, peer, options.hwm, options.swap);
zmq_assert (in_pipe);
}
if (options.requires_in)
create_pipe (this, peer, options.hwm, options.swap,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out) {
out_pipe = new (std::nothrow) pipe_t (peer, this, options.hwm, options.swap);
zmq_assert (out_pipe);
}
if (options.requires_out)
create_pipe (peer, this, options.hwm, options.swap,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to this socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
out_pipe ? &out_pipe->writer : NULL, blob_t ());
attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
// Attach the pipes to the peer socket. Note that peer's seqnum
// was incremented in find_endpoint function. The callee is notified
// about the fact via the last parameter.
send_bind (peer, out_pipe ? &out_pipe->reader : NULL,
in_pipe ? &in_pipe->writer : NULL, options.identity, false);
send_bind (peer, outpipe_reader, inpipe_writer,
options.identity, false);
return 0;
}
......@@ -224,34 +343,31 @@ int zmq::socket_base_t::connect (const char *addr_)
this, options);
zmq_assert (session);
// If 'immediate connect' feature is required, we'll created the pipes
// If 'immediate connect' feature is required, we'll create the pipes
// to the session straight away. Otherwise, they'll be created by the
// session once the connection is established.
if (options.immediate_connect) {
pipe_t *in_pipe = NULL;
pipe_t *out_pipe = NULL;
reader_t *inpipe_reader = NULL;
writer_t *inpipe_writer = NULL;
reader_t *outpipe_reader = NULL;
writer_t *outpipe_writer = NULL;
// Create inbound pipe, if required.
if (options.requires_in) {
in_pipe = new (std::nothrow) pipe_t (this, session, options.hwm, options.swap);
zmq_assert (in_pipe);
}
if (options.requires_in)
create_pipe (this, session, options.hwm, options.swap,
&inpipe_reader, &inpipe_writer);
// Create outbound pipe, if required.
if (options.requires_out) {
out_pipe = new (std::nothrow) pipe_t (session, this, options.hwm, options.swap);
zmq_assert (out_pipe);
}
if (options.requires_out)
create_pipe (session, this, options.hwm, options.swap,
&outpipe_reader, &outpipe_writer);
// Attach the pipes to the socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
out_pipe ? &out_pipe->writer : NULL, blob_t ());
attach_pipes (inpipe_reader, outpipe_writer, blob_t ());
// Attach the pipes to the session object.
session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
in_pipe ? &in_pipe->writer : NULL, blob_t ());
session->attach_pipes (outpipe_reader, inpipe_writer, blob_t ());
}
// Activate the session.
......@@ -347,8 +463,14 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
// Process pending commands, if any.
if (unlikely (!app_thread->process_commands (false, true))) {
process_commands (false, true);
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -372,7 +494,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
if (unlikely (!app_thread->process_commands (true, false))) {
process_commands (true, false);
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -383,6 +506,11 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
// Get the message.
int rc = xrecv (msg_, flags_);
int err = errno;
......@@ -396,7 +524,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) {
if (unlikely (!app_thread->process_commands (false, false))) {
process_commands (false, false);
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -420,7 +549,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN)
return -1;
if (unlikely (!app_thread->process_commands (false, false))) {
process_commands (false, false);
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -440,7 +570,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
while (rc != 0) {
if (errno != EAGAIN)
return -1;
if (unlikely (!app_thread->process_commands (true, false))) {
process_commands (true, false);
if (unlikely (zombie)) {
errno = ETERM;
return -1;
}
......@@ -456,74 +587,72 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
int zmq::socket_base_t::close ()
{
shutting_down = true;
// Let the thread know that the socket is no longer available.
app_thread->remove_socket (this);
// Pointer to the context must be retrieved before the socket is
// deallocated. Afterwards it is not available.
ctx_t *ctx = get_ctx ();
// Socket becomes a zombie. From now on all new arrived pipes (bind
// command) and I/O objects (own command) are immediately terminated.
// Also, any further requests form I/O object termination are ignored
// (we are going to shut them down anyway -- this way we assure that
// we do so once only).
zombie = true;
// Unregister all inproc endpoints associated with this socket.
// From this point we are sure that inc_seqnum won't be called again
// on this object.
ctx->unregister_endpoints (this);
// Wait till all undelivered commands are delivered. This should happen
// very quickly. There's no way to wait here for extensive period of time.
// Doing this we make sure that no new pipes from other sockets (inproc)
// will be initiated. However, there may be some inproc pipes already
// on the fly, but not yet received by this socket. To get finished
// with them we'll do the subsequent waiting from on-the-fly commands.
// This should happen very quickly. There's no way to block here for
// extensive period of time.
unregister_endpoints (this);
while (processed_seqnum != sent_seqnum.get ())
app_thread->process_commands (true, false);
process_commands (true, false);
// TODO: My feeling is that the above has to be done in the dezombification
// loop, otherwise we may end up with number of i/o object dropping to zero
// even though there are more i/o objects on the way.
while (true) {
// The above process ensures that only pipes that will arrive from now on
// are those initiated by sessions. These in turn have a nice property of
// not arriving totally asynchronously. When a session -- being an I/O
// object -- acknowledges its termination we are 100% sure that we'll get
// no new pipe from it.
// On third pass of the loop there should be no more I/O objects
// because all connecters and listerners were destroyed during
// the first pass and all engines delivered by delayed 'own' commands
// are destroyed during the second pass.
if (io_objects.empty () && !pending_term_acks)
break;
// Start termination of all the pipes presently associated with the socket.
xterm_pipes ();
// Send termination request to all associated I/O objects.
// Start waiting for the acks. Note that the actual waiting is not done
// in this function. Rather it is done in delayed manner as socket is
// being dezombified. The reason is that I/O object shutdown can take
// considerable amount of time in case there's still a lot of data to
// push to the network.
for (io_objects_t::iterator it = io_objects.begin ();
it != io_objects.end (); it++)
send_term (*it);
// Move the objects to the list of pending term acks.
pending_term_acks += io_objects.size ();
io_objects.clear ();
// Process commands till we get all the termination acknowledgements.
while (pending_term_acks)
app_thread->process_commands (true, false);
}
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (named_sessions.empty ());
zmq_assert (unnamed_sessions.empty ());
sessions_sync.unlock ();
delete this;
// This function must be called after the socket is completely deallocated
// as it may cause termination of the whole 0MQ infrastructure.
ctx->destroy_socket ();
// Note that new I/O objects may arrive even in zombie state (say new
// session initiated by a listener object), however, in such case number
// of pending acks never drops to zero. Here's the scenario: We have an
// pending ack for the listener object. Then 'own' commands arrives from
// the listener notifying the socket about new session. It immediately
// triggers termination request and number of of pending acks if
// incremented. Then term_acks arrives from the listener. Number of pending
// acks is decremented. Later on, the session itself will ack its
// termination. During the process, number of pending acks never dropped
// to zero and thus the socket remains safely in the zombie state.
// Transfer the ownership of the socket from this application thread
// to the context which will take care of the rest of shutdown process.
zombify (this);
return 0;
}
void zmq::socket_base_t::inc_seqnum ()
{
// NB: This function may be called from a different thread!
// Be aware: This function may be called from a different thread!
sent_seqnum.add (1);
}
zmq::app_thread_t *zmq::socket_base_t::get_thread ()
{
return app_thread;
}
bool zmq::socket_base_t::has_in ()
{
return xhas_in ();
......@@ -607,68 +736,133 @@ zmq::session_t *zmq::socket_base_t::find_session (uint64_t ordinal_)
return session;
}
void zmq::socket_base_t::kill (reader_t *pipe_)
bool zmq::socket_base_t::dezombify ()
{
xkill (pipe_);
}
zmq_assert (zombie);
void zmq::socket_base_t::revive (reader_t *pipe_)
{
xrevive (pipe_);
}
// Process any commands from other threads/sockets that may be available
// at the moment.
process_commands (false, false);
void zmq::socket_base_t::revive (writer_t *pipe_)
{
xrevive (pipe_);
// If there are no more pipes attached and there are no more I/O objects
// owned by the socket, we can kill the zombie.
if (!pending_term_acks && !xhas_pipes ()) {
// If all objects have acknowledged their termination there should
// definitely be no I/O object remaining in the list.
zmq_assert (io_objects.empty ());
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (named_sessions.empty ());
zmq_assert (unnamed_sessions.empty ());
sessions_sync.unlock ();
// Deallocate all the resources tied to this socket.
delete this;
// Notify the caller about the fact that the zombie is finally dead.
return true;
}
// The zombie remains undead.
return false;
}
void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
{
if (inpipe_)
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
// If the peer haven't specified it's identity, let's generate one.
if (peer_identity_.size ()) {
xattach_pipes (inpipe_, outpipe_, peer_identity_);
bool received;
command_t cmd;
if (block_) {
received = signaler.recv (&cmd, true);
zmq_assert (received);
}
else {
blob_t identity (1, 0);
identity.append (uuid_t ().to_blob (), uuid_t::uuid_blob_len);
xattach_pipes (inpipe_, outpipe_, identity);
#if defined ZMQ_DELAY_COMMANDS
// Optimised version of command processing - it doesn't have to check
// for incoming commands each time. It does so only if certain time
// elapsed since last command processing. Command delay varies
// depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
// etc. The optimisation makes sense only on platforms where getting
// a timestamp is a very cheap operation (tens of nanoseconds).
if (throttle_) {
// Get timestamp counter.
#if defined __GNUC__
uint32_t low;
uint32_t high;
__asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
uint64_t current_time = (uint64_t) high << 32 | low;
#elif defined _MSC_VER
uint64_t current_time = __rdtsc ();
#else
#error
#endif
// Check whether certain time have elapsed since last command
// processing.
if (current_time - last_processing_time <= max_command_delay)
return;
last_processing_time = current_time;
}
}
#endif
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
{
xdetach_inpipe (pipe_);
pipe_->set_endpoint (NULL); // ?
// Check whether there are any commands pending for this thread.
received = signaler.recv (&cmd, false);
}
// Process all the commands available at the moment.
while (received) {
cmd.destination->process_command (cmd);
received = signaler.recv (&cmd, false);
}
}
void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
void zmq::socket_base_t::process_stop ()
{
xdetach_outpipe (pipe_);
pipe_->set_endpoint (NULL); // ?
// Here, someone have called zmq_term while the socket was still alive.
// We'll zombify it so that any blocking call is interrupted and any
// further attempt to use the socket will return ETERM. The user is still
// responsible for calling zmq_close on the socket though!
zombie = true;
}
void zmq::socket_base_t::process_own (owned_t *object_)
{
// If the socket is already being shut down, new owned objects are
// immediately asked to terminate.
if (zombie) {
send_term (object_);
pending_term_acks++;
return;
}
io_objects.insert (object_);
}
void zmq::socket_base_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_,
const blob_t &peer_identity_)
{
// If the socket is already being shut down, the termination process on
// the new pipes is started immediately. However, they are still attached
// as to let the process finish in a decent manner.
if (unlikely (zombie)) {
if (in_pipe_)
in_pipe_->terminate ();
if (out_pipe_)
out_pipe_->terminate ();
}
attach_pipes (in_pipe_, out_pipe_, peer_identity_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// When shutting down we can ignore termination requests from owned
// objects. They are going to be terminated anyway.
if (shutting_down)
// objects. It means the termination request was already sent to
// the object.
if (zombie)
return;
// If I/O object is well and alive ask it to terminate.
......@@ -676,7 +870,7 @@ void zmq::socket_base_t::process_term_req (owned_t *object_)
io_objects.end (), object_);
// If not found, we assume that termination request was already sent to
// the object so we can sagely ignore the request.
// the object so we can safely ignore the request.
if (it == io_objects.end ())
return;
......@@ -696,3 +890,32 @@ void zmq::socket_base_t::process_seqnum ()
processed_seqnum++;
}
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
bool zmq::socket_base_t::xhas_out ()
{
return false;
}
int zmq::socket_base_t::xsend (zmq_msg_t *msg_, int options_)
{
errno = ENOTSUP;
return -1;
}
bool zmq::socket_base_t::xhas_in ()
{
return false;
}
int zmq::socket_base_t::xrecv (zmq_msg_t *msg_, int options_)
{
errno = ENOTSUP;
return -1;
}
......@@ -26,13 +26,13 @@
#include "../include/zmq.h"
#include "i_endpoint.hpp"
#include "object.hpp"
#include "yarray_item.hpp"
#include "mutex.hpp"
#include "options.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
#include "signaler.hpp"
#include "stdint.hpp"
#include "blob.hpp"
......@@ -40,11 +40,21 @@ namespace zmq
{
class socket_base_t :
public object_t, public i_endpoint, public yarray_item_t
public object_t,
public yarray_item_t
{
public:
socket_base_t (class app_thread_t *parent_);
// Create a socket of a specified type.
static socket_base_t *create (int type_, class ctx_t *parent_,
uint32_t slot_);
// Returns the signaler associated with this socket.
signaler_t *get_signaler ();
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
void stop ();
// Interface for communication with the API layer.
int setsockopt (int option_, const void *optval_, size_t optvallen_);
......@@ -60,11 +70,6 @@ namespace zmq
// before the command is delivered.
void inc_seqnum ();
// This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll
// is called from.
class app_thread_t *get_thread ();
// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
......@@ -85,43 +90,67 @@ namespace zmq
void unregister_session (uint64_t ordinal_);
class session_t *find_session (uint64_t ordinal_);
// i_endpoint interface implementation.
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
void revive (class writer_t *pipe_);
// i_reader_events interface implementation.
void activated (class reader_t *pipe_);
void terminated (class reader_t *pipe_);
// i_writer_events interface implementation.
void activated (class writer_t *pipe_);
void terminated (class writer_t *pipe_);
// This function should be called only on zombie sockets. It tries
// to deallocate the zombie and returns true is successful.
bool dezombify ();
protected:
// Destructor is protected. Socket is closed using 'close' function.
socket_base_t (class ctx_t *parent_, uint32_t slot_);
virtual ~socket_base_t ();
// Pipe management is done by individual socket types.
// Concrete algorithms for the x- methods are to be defined by
// individual socket types.
virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;
virtual void xrevive (class reader_t *pipe_) = 0;
virtual void xrevive (class writer_t *pipe_) = 0;
virtual void xterm_pipes () = 0;
virtual bool xhas_pipes () = 0;
// Actual algorithms are to be defined by individual socket types.
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, overload this
// method.
virtual int xsetsockopt (int option_, const void *optval_,
size_t optvallen_) = 0;
virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
virtual bool xhas_in () = 0;
virtual bool xhas_out () = 0;
size_t optvallen_);
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
virtual int xsend (zmq_msg_t *msg_, int options_);
// The default implementation assumes that recv in not supported.
virtual bool xhas_in ();
virtual int xrecv (zmq_msg_t *msg_, int options_);
// Socket options.
options_t options;
// If true, socket was already closed but not yet deallocated
// because either shutdown is in process or there are still pipes
// attached to the socket.
bool zombie;
private:
// If no identity set generate one and call xattach_pipes ().
void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
// Processes commands sent to this socket (if any). If 'block' is
// set to true, returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
void process_commands (bool block_, bool throttle_);
// Handlers for incoming commands.
void process_stop ();
void process_own (class owned_t *object_);
void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_,
const blob_t &peer_identity_);
......@@ -129,6 +158,12 @@ namespace zmq
void process_term_ack ();
void process_seqnum ();
// App thread's signaler object.
signaler_t signaler;
// Timestamp of when commands were processed the last time.
uint64_t last_processing_time;
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
......@@ -144,13 +179,6 @@ namespace zmq
// If true there's a half-read message in the socket.
bool rcvmore;
// Application thread the socket lives in.
class app_thread_t *app_thread;
// If true, socket is already shutting down. No new work should be
// started.
bool shutting_down;
// Sequence number of the last command sent to this object.
atomic_counter_t sent_seqnum;
......
......@@ -24,8 +24,8 @@
#include "sub.hpp"
#include "err.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_),
zmq::sub_t::sub_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_),
has_message (false),
more (false)
{
......@@ -46,31 +46,14 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
fq.attach (inpipe_);
}
void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::sub_t::xterm_pipes ()
{
zmq_assert (pipe_);
fq.detach (pipe_);
fq.term_pipes ();
}
void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
bool zmq::sub_t::xhas_pipes ()
{
// SUB socket is read-only thus there should be no outpipes.
zmq_assert (false);
}
void zmq::sub_t::xkill (class reader_t *pipe_)
{
fq.kill (pipe_);
}
void zmq::sub_t::xrevive (class reader_t *pipe_)
{
fq.revive (pipe_);
}
void zmq::sub_t::xrevive (class writer_t *pipe_)
{
zmq_assert (false);
return fq.has_pipes ();
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
......@@ -93,12 +76,6 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
return -1;
}
int zmq::sub_t::xsend (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// If there's already a message prepared by a previous call to zmq_poll,
......@@ -179,11 +156,6 @@ bool zmq::sub_t::xhas_in ()
}
}
bool zmq::sub_t::xhas_out ()
{
return false;
}
bool zmq::sub_t::match (zmq_msg_t *msg_)
{
return subscriptions.check ((unsigned char*) zmq_msg_data (msg_),
......
......@@ -33,7 +33,7 @@ namespace zmq
{
public:
sub_t (class app_thread_t *parent_);
sub_t (class ctx_t *parent_, uint32_t slot_);
~sub_t ();
protected:
......@@ -41,16 +41,11 @@ namespace zmq
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
void xterm_pipes ();
bool xhas_pipes ();
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
......
......@@ -38,16 +38,6 @@ void zmq::thread_t::stop ()
win_assert (rc != WAIT_FAILED);
}
zmq::thread_t::id_t zmq::thread_t::id ()
{
return GetCurrentThreadId ();
}
bool zmq::thread_t::equal (id_t id1_, id_t id2_)
{
return id1_ == id2_;
}
unsigned int __stdcall zmq::thread_t::thread_routine (void *arg_)
{
thread_t *self = (thread_t*) arg_;
......@@ -73,16 +63,6 @@ void zmq::thread_t::stop ()
errno_assert (rc == 0);
}
zmq::thread_t::id_t zmq::thread_t::id ()
{
return pthread_self ();
}
bool zmq::thread_t::equal (id_t id1_, id_t id2_)
{
return pthread_equal (id1_, id2_) != 0;
}
void *zmq::thread_t::thread_routine (void *arg_)
{
#if !defined ZMQ_HAVE_OPENVMS
......
......@@ -55,15 +55,6 @@ namespace zmq
// Waits for thread termination.
void stop ();
#ifdef ZMQ_HAVE_WINDOWS
typedef DWORD id_t;
#else
typedef pthread_t id_t;
#endif
static id_t id ();
static bool equal (id_t id1_, id_t id2_);
private:
#ifdef ZMQ_HAVE_WINDOWS
......
......@@ -23,8 +23,8 @@
#include "err.hpp"
#include "pipe.hpp"
zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
socket_base_t (parent_),
zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_),
current_in (0),
prefetched (false),
more_in (false),
......@@ -41,31 +41,41 @@ zmq::xrep_t::xrep_t (class app_thread_t *parent_) :
zmq::xrep_t::~xrep_t ()
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end (); it++)
it->reader->term ();
for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
it++)
it->second.writer->term ();
zmq_assert (inpipes.empty ());
zmq_assert (outpipes.empty ());
}
void zmq::xrep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_, const blob_t &peer_identity_)
void zmq::xrep_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_)
{
zmq_assert (inpipe_ && outpipe_);
outpipe_->set_event_sink (this);
// TODO: What if new connection has same peer identity as the old one?
outpipe_t outpipe = {outpipe_, true};
bool ok = outpipes.insert (std::make_pair (
peer_identity_, outpipe)).second;
zmq_assert (ok);
inpipe_->set_event_sink (this);
inpipe_t inpipe = {inpipe_, peer_identity_, true};
inpipes.push_back (inpipe);
}
void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::xrep_t::xterm_pipes ()
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++)
it->reader->terminate ();
for (outpipes_t::iterator it = outpipes.begin (); it != outpipes.end ();
it++)
it->second.writer->terminate ();
}
void zmq::xrep_t::terminated (reader_t *pipe_)
{
// TODO:!
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++) {
if (it->reader == pipe_) {
......@@ -76,7 +86,7 @@ void zmq::xrep_t::xdetach_inpipe (class reader_t *pipe_)
zmq_assert (false);
}
void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
void zmq::xrep_t::terminated (writer_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
......@@ -90,20 +100,12 @@ void zmq::xrep_t::xdetach_outpipe (class writer_t *pipe_)
zmq_assert (false);
}
void zmq::xrep_t::xkill (class reader_t *pipe_)
bool zmq::xrep_t::xhas_pipes ()
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++) {
if (it->reader == pipe_) {
zmq_assert (it->active);
it->active = false;
return;
}
}
zmq_assert (false);
return !inpipes.empty () || !outpipes.empty ();
}
void zmq::xrep_t::xrevive (class reader_t *pipe_)
void zmq::xrep_t::activated (reader_t *pipe_)
{
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
it++) {
......@@ -116,7 +118,7 @@ void zmq::xrep_t::xrevive (class reader_t *pipe_)
zmq_assert (false);
}
void zmq::xrep_t::xrevive (class writer_t *pipe_)
void zmq::xrep_t::activated (writer_t *pipe_)
{
for (outpipes_t::iterator it = outpipes.begin ();
it != outpipes.end (); ++it) {
......@@ -129,13 +131,6 @@ void zmq::xrep_t::xrevive (class writer_t *pipe_)
zmq_assert (false);
}
int zmq::xrep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If this is the first part of the message it's the identity of the
......@@ -232,7 +227,9 @@ int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}
// If me don't have a message, move to next pipe.
// If me don't have a message, mark the pipe as passive and
// move to next pipe.
inpipes [current_in].active = false;
current_in++;
if (current_in >= inpipes.size ())
current_in = 0;
......@@ -259,6 +256,10 @@ bool zmq::xrep_t::xhas_in ()
if (inpipes [current_in].active &&
inpipes [current_in].reader->check_read ())
return true;
// If me don't have a message, mark the pipe as passive and
// move to next pipe.
inpipes [current_in].active = false;
current_in++;
if (current_in >= inpipes.size ())
current_in = 0;
......
......@@ -25,32 +25,40 @@
#include "socket_base.hpp"
#include "blob.hpp"
#include "pipe.hpp"
namespace zmq
{
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class xrep_t : public socket_base_t
class xrep_t :
public socket_base_t,
public i_reader_events,
public i_writer_events
{
public:
xrep_t (class app_thread_t *parent_);
xrep_t (class ctx_t *parent_, uint32_t slot_);
~xrep_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
void xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
// i_reader_events interface implementation.
void activated (reader_t *pipe_);
void terminated (reader_t *pipe_);
// i_writer_events interface implementation.
void activated (writer_t *pipe_);
void terminated (writer_t *pipe_);
private:
struct inpipe_t
......
......@@ -22,8 +22,8 @@
#include "xreq.hpp"
#include "err.hpp"
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
socket_base_t (parent_)
zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t slot_) :
socket_base_t (parent_, slot_)
{
options.requires_in = true;
options.requires_out = true;
......@@ -41,38 +41,15 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
lb.attach (outpipe_);
}
void zmq::xreq_t::xdetach_inpipe (class reader_t *pipe_)
void zmq::xreq_t::xterm_pipes ()
{
zmq_assert (pipe_);
fq.detach (pipe_);
fq.term_pipes ();
lb.term_pipes ();
}
void zmq::xreq_t::xdetach_outpipe (class writer_t *pipe_)
bool zmq::xreq_t::xhas_pipes ()
{
zmq_assert (pipe_);
lb.detach (pipe_);
}
void zmq::xreq_t::xkill (class reader_t *pipe_)
{
fq.kill (pipe_);
}
void zmq::xreq_t::xrevive (class reader_t *pipe_)
{
fq.revive (pipe_);
}
void zmq::xreq_t::xrevive (class writer_t *pipe_)
{
lb.revive (pipe_);
}
int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
return fq.has_pipes () || lb.has_pipes ();
}
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
......
......@@ -31,18 +31,14 @@ namespace zmq
{
public:
xreq_t (class app_thread_t *parent_);
xreq_t (class ctx_t *parent_, uint32_t slot_);
~xreq_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_,
const blob_t &peer_identity_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
void xterm_pipes ();
bool xhas_pipes ();
int xsend (zmq_msg_t *msg_, int flags_);
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
......
......@@ -29,7 +29,6 @@
#include "queue.hpp"
#include "streamer.hpp"
#include "socket_base.hpp"
#include "app_thread.hpp"
#include "msg_content.hpp"
#include "platform.hpp"
#include "stdint.hpp"
......@@ -83,8 +82,6 @@ const char *zmq_strerror (int errnum_)
case EINPROGRESS:
return "Operation in progress";
#endif
case EMTHREAD:
return "Number of preallocated application threads exceeded";
case EFSM:
return "Operation cannot be accomplished in current state";
case ENOCOMPATPROTO:
......@@ -367,6 +364,7 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
{
/*
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
......@@ -679,6 +677,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno = ENOTSUP;
return -1;
#endif
*/
zmq_assert (false);
return -1;
}
int zmq_errno ()
......
......@@ -52,7 +52,7 @@ bool zmq::zmq_encoder_t::size_ready ()
bool zmq::zmq_encoder_t::message_ready ()
{
// Destroy content of the old message.
zmq_msg_close(&in_progress);
zmq_msg_close (&in_progress);
// Read new message. If there is none, return false.
// Note that new state is set only if write is successful. That way
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment