Commit 99c5d928 authored by Martin Sustrik's avatar Martin Sustrik

pipes added

parent aacdb7a4
......@@ -67,7 +67,7 @@ extern "C" {
// single accept. There's no message routing or message filtering involved.
#define ZMQ_P2P 0
// Socket to distribute data. Recv fuction is not implemeted for this socket
// Socket to distribute data. Recv fuction is not implemented for this socket
// type. Messages are distributed in fanout fashion to all peers.
#define ZMQ_PUB 1
......
......@@ -19,6 +19,7 @@ libzmq_la_SOURCES = \
io_object.hpp \
io_thread.hpp \
ip.hpp \
i_endpoint.hpp \
i_poller.hpp \
i_poll_events.hpp \
i_signaler.hpp \
......@@ -66,6 +67,7 @@ libzmq_la_SOURCES = \
object.cpp \
options.cpp \
owned.cpp \
pipe.cpp \
poll.cpp \
select.cpp \
session.cpp \
......
......@@ -77,7 +77,7 @@ bool zmq::app_thread_t::make_current ()
return true;
}
void zmq::app_thread_t::process_commands (bool block_)
void zmq::app_thread_t::process_commands (bool block_, bool throttle_)
{
ypollset_t::signals_t signals;
if (block_)
......@@ -91,24 +91,26 @@ void zmq::app_thread_t::process_commands (bool block_)
// depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
// etc. The optimisation makes sense only on platforms where getting
// a timestamp is a very cheap operation (tens of nanoseconds).
if (throttle_) {
// Get timestamp counter.
// Get timestamp counter.
#if defined __GNUC__
uint32_t low;
uint32_t high;
__asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
uint64_t current_time = (uint64_t) high << 32 | low;
uint32_t low;
uint32_t high;
__asm__ volatile ("rdtsc" : "=a" (low), "=d" (high));
uint64_t current_time = (uint64_t) high << 32 | low;
#elif defined _MSC_VER
uint64_t current_time = __rdtsc ();
uint64_t current_time = __rdtsc ();
#else
#error
#endif
// Check whether certain time have elapsed since last command
// processing.
if (current_time - last_processing_time <= max_command_delay)
return;
last_processing_time = current_time;
// Check whether certain time have elapsed since last command
// processing.
if (current_time - last_processing_time <= max_command_delay)
return;
last_processing_time = current_time;
}
#endif
// Check whether there are any commands pending for this thread.
......
......@@ -53,7 +53,9 @@ namespace zmq
// Processes commands sent to this thread (if any). If 'block' is
// set to true, returns only after at least one command was processed.
void process_commands (bool block_);
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
void process_commands (bool block_, bool throttle_);
// Create a socket of a specified type.
class socket_base_t *create_socket (int type_);
......
......@@ -39,6 +39,7 @@ namespace zmq
own,
attach,
bind,
revive,
term_req,
term,
term_ack
......@@ -65,10 +66,18 @@ namespace zmq
class zmq_engine_t *engine;
} attach;
// Sent between objects to establish pipe(s) between them.
// Sent from session to socket to establish pipe(s) between them.
struct {
class owned_t *session;
class reader_t *in_pipe;
class writer_t *out_pipe;
} bind;
// Sent by pipe writer to inform dormant pipe reader that there
// are messages in the pipe.
struct {
} revive;
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
......
......@@ -38,6 +38,14 @@ namespace zmq
// footprint of dispatcher.
command_pipe_granularity = 4,
// Determines how often does socket poll for new commands when it
// still has unprocessed messages to handle. Thus, if it is set to 100,
// socket will process 100 inbound messages before doing the poll.
// If there are no unprocessed messages available, poll is done
// immediately. Decreasing the value trades overall latency for more
// real-time behaviour (less latency peaks).
inbound_poll_rate = 100,
// Maximal batching size for engines with receiving functionality.
// So, if there are 10 messages that fit into the batch size, all of
// them may be read by a single 'recv' system call, thus avoiding
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_ENDPOINT_HPP_INCLUDED__
#define __ZMQ_I_ENDPOINT_HPP_INCLUDED__
namespace zmq
{
struct i_endpoint
{
virtual void revive (class reader_t *pipe_) = 0;
};
}
#endif
......@@ -20,6 +20,7 @@
#include "object.hpp"
#include "dispatcher.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "io_thread.hpp"
#include "simple_semaphore.hpp"
#include "owned.hpp"
......@@ -57,6 +58,10 @@ void zmq::object_t::process_command (command_t &cmd_)
{
switch (cmd_.type) {
case command_t::revive:
process_revive ();
break;
case command_t::stop:
process_stop ();
break;
......@@ -74,7 +79,8 @@ void zmq::object_t::process_command (command_t &cmd_)
return;
case command_t::bind:
process_bind ();
process_bind (cmd_.args.bind.session,
cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
return;
case command_t::term_req:
......@@ -140,11 +146,23 @@ void zmq::object_t::send_attach (session_t *destination_, zmq_engine_t *engine_)
send_command (cmd);
}
void zmq::object_t::send_bind (object_t *destination_)
void zmq::object_t::send_bind (object_t *destination_, owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::bind;
cmd.args.bind.session = session_;
cmd.args.bind.in_pipe = in_pipe_;
cmd.args.bind.out_pipe = out_pipe_;
send_command (cmd);
}
void zmq::object_t::send_revive (object_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::revive;
send_command (cmd);
}
......@@ -194,7 +212,13 @@ void zmq::object_t::process_attach (zmq_engine_t *engine_)
zmq_assert (false);
}
void zmq::object_t::process_bind ()
void zmq::object_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
zmq_assert (false);
}
void zmq::object_t::process_revive ()
{
zmq_assert (false);
}
......
......@@ -24,7 +24,6 @@
namespace zmq
{
// Base class for all objects that participate in inter-thread
// communication.
......@@ -58,7 +57,9 @@ namespace zmq
class owned_t *object_);
void send_attach (class session_t *destination_,
class zmq_engine_t *engine_);
void send_bind (object_t *destination_);
void send_bind (object_t *destination_, class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_);
void send_revive (class object_t *destination_);
void send_term_req (class socket_base_t *destination_,
class owned_t *object_);
void send_term (class owned_t *destination_);
......@@ -70,7 +71,9 @@ namespace zmq
virtual void process_plug ();
virtual void process_own (class owned_t *object_);
virtual void process_attach (class zmq_engine_t *engine_);
virtual void process_bind ();
virtual void process_bind (class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_);
virtual void process_revive ();
virtual void process_term_req (class owned_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
......
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <pthread.h>
#include "pipe.hpp"
zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
pipe (pipe_),
peer (&pipe_->writer),
hwm (hwm_),
lwm (lwm_),
index (-1),
endpoint (NULL)
{
}
zmq::reader_t::~reader_t ()
{
}
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
return pipe->read (msg_);
// TODO: Adjust the size of the pipe.
}
void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
{
endpoint = endpoint_;
}
void zmq::reader_t::set_index (int index_)
{
index = index_;
}
int zmq::reader_t::get_index ()
{
return index;
}
void zmq::reader_t::process_revive ()
{
endpoint->revive (this);
}
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
pipe (pipe_),
peer (&pipe_->reader),
hwm (hwm_),
lwm (lwm_)
{
}
zmq::writer_t::~writer_t ()
{
}
bool zmq::writer_t::check_write (uint64_t size_)
{
// TODO: Check whether hwm is exceeded.
return true;
}
bool zmq::writer_t::write (struct zmq_msg_t *msg_)
{
pipe->write (*msg_);
return true;
// TODO: Adjust size of the pipe.
}
void zmq::writer_t::flush ()
{
if (!pipe->flush ())
send_revive (peer);
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, this, hwm_, lwm_),
writer (writer_parent_, this, hwm_, lwm_)
{
}
zmq::pipe_t::~pipe_t ()
{
}
......@@ -22,15 +22,117 @@
#include "../include/zmq.h"
#include "stdint.hpp"
#include "i_endpoint.hpp"
#include "ypipe.hpp"
#include "config.hpp"
#include "object.hpp"
namespace zmq
{
class reader_t : public object_t
{
public:
reader_t (class object_t *parent_, class pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_);
~reader_t ();
// Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_);
// Mnaipulation of index of the pipe.
void set_endpoint (i_endpoint *endpoint_);
void set_index (int index_);
int get_index ();
private:
// Command handlers.
void process_revive ();
// The underlying pipe.
class pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
class object_t *peer;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
uint64_t lwm;
// Positions of head and tail of the pipe (in bytes).
uint64_t head;
uint64_t tail;
uint64_t last_sent_head;
// Index of the pipe in the socket's list of inbound pipes.
int index;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
reader_t (const reader_t&);
void operator = (const reader_t&);
};
class writer_t : public object_t
{
public:
writer_t (class object_t *parent_, class pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_);
~writer_t ();
// Checks whether message with specified size can be written to the
// pipe. If writing the message would cause high watermark to be
// exceeded, the function returns false.
bool check_write (uint64_t size_);
// Writes a message to the underlying pipe. Returns false if the
// message cannot be written because high watermark was reached.
bool write (struct zmq_msg_t *msg_);
// Flush the messages downsteam.
void flush ();
private:
// The underlying pipe.
class pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
class object_t *peer;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
uint64_t lwm;
// Positions of head and tail of the pipe (in bytes).
uint64_t head;
uint64_t tail;
writer_t (const writer_t&);
void operator = (const writer_t&);
};
// Message pipe.
class pipe_t : public ypipe_t <zmq_msg_t, false, message_pipe_granularity>
{
public:
pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_);
~pipe_t ();
reader_t reader;
writer_t writer;
private:
pipe_t (const pipe_t&);
void operator = (const pipe_t&);
};
}
......
......@@ -20,12 +20,17 @@
#include "session.hpp"
#include "zmq_engine.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const char *name_) :
const char *name_, const options_t &options_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (false),
out_pipe (NULL),
engine (NULL),
name (name_)
name (name_),
options (options_)
{
}
......@@ -33,18 +38,48 @@ zmq::session_t::~session_t ()
{
}
void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
{
zmq_assert (!in_pipe);
in_pipe = pipe_;
active = true;
in_pipe->set_endpoint (this);
}
void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
{
zmq_assert (!out_pipe);
out_pipe = pipe_;
}
bool zmq::session_t::read (::zmq_msg_t *msg_)
{
return false;
if (!active)
return false;
bool fetched = in_pipe->read (msg_);
if (!fetched)
active = false;
return fetched;
}
bool zmq::session_t::write (::zmq_msg_t *msg_)
{
return false;
return out_pipe->write (msg_);
}
void zmq::session_t::flush ()
{
out_pipe->flush ();
}
void zmq::session_t::revive (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
active = true;
if (engine)
engine->revive ();
}
void zmq::session_t::process_plug ()
......@@ -56,6 +91,19 @@ void zmq::session_t::process_plug ()
// We should syslog it and drop the session. TODO
zmq_assert (ok);
// If session is created by 'connect' function, it has the pipes set
// already. Otherwise, it's being created by the listener and the pipes
// are yet to be created.
if (!in_pipe && !out_pipe) {
pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
zmq_assert (inbound);
in_pipe = &inbound->reader;
pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
zmq_assert (outbound);
out_pipe = &outbound->writer;
send_bind (owner, this, &outbound->reader, &inbound->writer);
}
owned_t::process_plug ();
}
......
......@@ -23,17 +23,22 @@
#include <string>
#include "i_inout.hpp"
#include "i_endpoint.hpp"
#include "owned.hpp"
#include "options.hpp"
namespace zmq
{
class session_t : public owned_t, public i_inout
class session_t : public owned_t, public i_inout, public i_endpoint
{
public:
session_t (object_t *parent_, socket_base_t *owner_, const char *name_);
session_t (object_t *parent_, socket_base_t *owner_, const char *name_,
const options_t &options_);
void set_inbound_pipe (class reader_t *pipe_);
void set_outbound_pipe (class writer_t *pipe_);
private:
......@@ -44,17 +49,32 @@ namespace zmq
bool write (::zmq_msg_t *msg_);
void flush ();
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);
// Handlers for incoming commands.
void process_plug ();
void process_unplug ();
void process_attach (class zmq_engine_t *engine_);
// Inbound pipe, i.e. one the session is getting messages from.
class reader_t *in_pipe;
// If true, in_pipe is active. Otherwise there are no messages to get.
bool active;
// Outbound pipe, i.e. one the socket is sending messages to.
class writer_t *out_pipe;
class zmq_engine_t *engine;
// The name of the session. One that is used to register it with
// socket-level repository of sessions.
std::string name;
// Inherited socket options.
options_t options;
session_t (const session_t&);
void operator = (const session_t&);
};
......
......@@ -27,18 +27,23 @@
#include "err.hpp"
#include "zmq_listener.hpp"
#include "zmq_connecter.hpp"
#include "msg_content.hpp"
#include "io_thread.hpp"
#include "session.hpp"
#include "config.hpp"
#include "owned.hpp"
#include "uuid.hpp"
#include "pipe.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
object_t (parent_),
current (0),
active (0),
pending_term_acks (0),
ticks (0),
app_thread (parent_),
shutting_down (false)
{
{
}
zmq::socket_base_t::~socket_base_t ()
......@@ -65,7 +70,7 @@ zmq::socket_base_t::~socket_base_t ()
// Process commands till we get all the termination acknowledgements.
while (pending_term_acks)
app_thread->process_commands (true);
app_thread->process_commands (true, false);
}
// Check whether there are no session leaks.
......@@ -150,8 +155,28 @@ int zmq::socket_base_t::connect (const char *addr_)
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str ());
session_t *session = new session_t (io_thread, this, session_name.c_str (),
options);
zmq_assert (session);
// Create inbound pipe.
pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
session->set_outbound_pipe (&in_pipe->writer);
in_pipes.push_back (std::make_pair (&in_pipe->reader, session));
in_pipes.back ().first->set_index (active);
in_pipes [active].first->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
// Create outbound pipe.
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
session->set_inbound_pipe (&out_pipe->reader);
out_pipes.push_back (std::make_pair (&out_pipe->writer, session));
// Activate the session.
send_plug (session);
send_own (this, session);
......@@ -173,17 +198,79 @@ int zmq::socket_base_t::connect (const char *addr_)
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
// Process pending commands, if any.
app_thread->process_commands (false, true);
// Try to send the message.
bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
if (!(flags_ & ZMQ_NOBLOCK)) {
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
while (!sent) {
app_thread->process_commands (true, false);
sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
}
}
else if (!sent) {
errno = EAGAIN;
return -1;
}
return 0;
}
int zmq::socket_base_t::flush ()
{
zmq_assert (false);
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
it->first->flush ();
return 0;
}
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
// If the message cannot be fetched immediately, there are two scenarios.
// For non-blocking recv, commands are processed in case there's a message
// already waiting we don't know about. If it's not, return EAGAIN.
// In blocking scenario, commands are processed over and over again until
// we are able to fetch a message.
bool fetched = fetch (msg_);
if (!fetched) {
if (flags_ & ZMQ_NOBLOCK) {
app_thread->process_commands (false, false);
fetched = fetch (msg_);
}
else {
while (!fetched) {
app_thread->process_commands (true, false);
ticks = 0;
fetched = fetch (msg_);
}
}
}
// Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether
// because there are messages available all the time. If poll occurs,
// ticks is set to zero and thus we avoid this code.
//
// Note that 'recv' uses different command throttling algorithm (the one
// described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) {
app_thread->process_commands (false, false);
ticks = 0;
}
if (!fetched) {
errno = EAGAIN;
return -1;
}
return 0;
}
int zmq::socket_base_t::close ()
......@@ -229,11 +316,35 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second;
}
void zmq::socket_base_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
in_pipes [index].first->set_index (active);
in_pipes [active].first->set_index (index);
std::swap (in_pipes [index], in_pipes [active]);
active++;
}
void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
}
void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_)
{
zmq_assert (in_pipe_);
in_pipe_->set_endpoint (this);
in_pipes.push_back (std::make_pair (in_pipe_, session_));
in_pipes.back ().first->set_index (active);
in_pipes [active].first->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
zmq_assert (out_pipe_);
out_pipes.push_back (std::make_pair (out_pipe_, session_));
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
{
// When shutting down we can ignore termination requests from owned
......@@ -260,3 +371,107 @@ void zmq::socket_base_t::process_term_ack ()
zmq_assert (pending_term_acks);
pending_term_acks--;
}
bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
{
int pipes_count = out_pipes.size ();
// If there are no pipes available, simply drop the message.
if (pipes_count == 0) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
// First check whether all pipes are available for writing.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
if (!it->first->check_write (zmq_msg_size (msg_)))
return false;
msg_content_t *content = (msg_content_t*) msg_->content;
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::iterator it = out_pipes.begin ();
it != out_pipes.end (); it++) {
it->first->write (msg_);
if (flush_)
it->first->flush ();
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
// Optimisation for the case when there's only a single pipe
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
out_pipes.begin ()->first->write (msg_);
if (flush_)
out_pipes.begin ()->first->flush ();
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1).
if (msg_->shared)
content->refcnt.add (pipes_count - 1);
else {
content->refcnt.set (pipes_count);
msg_->shared = true;
}
// Push the message to all destinations.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++) {
it->first->write (msg_);
if (flush_)
it->first->flush ();
}
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current].first->read (msg_);
// If there's no message in the pipe, move it to the list of
// non-active pipes.
if (!fetched) {
in_pipes [current].first->set_index (active - 1);
in_pipes [active - 1].first->set_index (current);
std::swap (in_pipes [current], in_pipes [active - 1]);
active--;
}
current ++;
if (current >= active)
current = 0;
if (fetched)
return true;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
return false;
}
......@@ -22,8 +22,11 @@
#include <set>
#include <map>
#include <vector>
#include <string>
#include <utility>
#include "i_endpoint.hpp"
#include "object.hpp"
#include "mutex.hpp"
#include "options.hpp"
......@@ -32,7 +35,7 @@
namespace zmq
{
class socket_base_t : public object_t
class socket_base_t : public object_t, public i_endpoint
{
public:
......@@ -57,22 +60,59 @@ namespace zmq
bool unregister_session (const char *name_);
class session_t *find_session (const char *name_);
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);
private:
// Handlers for incoming commands.
void process_own (class owned_t *object_);
void process_bind (class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_);
void process_term_req (class owned_t *object_);
void process_term_ack ();
// Attempts to distribute the message to all the outbound pipes.
// Returns false if not possible because of pipe overflow.
bool distribute (struct zmq_msg_t *msg_, bool flush_);
// Gets a message from one of the inbound pipes. Implementation of
// fair queueing.
bool fetch (struct zmq_msg_t *msg_);
// List of all I/O objects owned by this socket. The socket is
// responsible for deallocating them before it quits.
typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects;
// Inbound pipes, i.e. those the socket is getting messages from.
// The second member in the pair indicates the object on the other
// side of the pipe.
typedef std::vector <std::pair <class reader_t*, owned_t*> >
in_pipes_t;
in_pipes_t in_pipes;
// Index of the next inbound pipe to read messages from.
in_pipes_t::size_type current;
// Number of active inbound pipes. Active pipes are stored in the
// initial section of the in_pipes array.
in_pipes_t::size_type active;
// Outbound pipes, i.e. those the socket is sending messages to.
// The second member in the pair indicates the object on the other
// side of the pipe.
typedef std::vector <std::pair <class writer_t*, owned_t*> >
out_pipes_t;
out_pipes_t out_pipes;
// Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet.
int pending_term_acks;
// Number of messages received since last command processing.
int ticks;
// Application thread the socket lives in.
class app_thread_t *app_thread;
......
......@@ -43,9 +43,9 @@ namespace zmq
{
public:
// Initialises the pipe. If 'dead' is set to true, the pipe is
// created in dead state.
inline ypipe_t (bool dead_ = true) :
// Initialises the pipe. In D scenario it is created in dead state.
// Otherwise it's alive.
inline ypipe_t () :
stop (false)
{
// Insert terminator element into the queue.
......@@ -54,7 +54,7 @@ namespace zmq
// Let all the pointers to point to the terminator
// (unless pipe is dead, in which case c is set to NULL).
r = w = &queue.back ();
c.set (dead_ ? NULL : &queue.back ());
c.set (D ? NULL : &queue.back ());
}
// Following function (write) deliberately copies uninitialised data
......@@ -110,7 +110,7 @@ namespace zmq
// available.
inline bool read (T *value_)
{
// Was the value was prefetched already? If so, return it.
// Was the value prefetched already? If so, return it.
if (&queue.front () != r) {
*value_ = queue.front ();
queue.pop ();
......@@ -159,14 +159,14 @@ namespace zmq
// If there are no elements prefetched, exit.
// During pipe's lifetime r should never be NULL, however,
// during pipe shutdown when retrieving messages from it
// to deallocate them, this can happen.
// it can happen during pipe shutdown when messages
// are being deallocated.
if (&queue.front () == r || !r)
return false;
}
// There was at least one value prefetched -
// return it to the caller.
// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front ();
queue.pop ();
return true;
......@@ -188,8 +188,8 @@ namespace zmq
// exclusively by reader thread.
T *r;
// The single contention point of contention between writer and
// reader thread. Points past the last flushed item. If it is NULL,
// The single point of contention between writer and reader thread.
// Points past the last flushed item. If it is NULL,
// reader is asleep. This pointer should be always accessed using
// atomic operations.
atomic_ptr_t <T> c;
......
......@@ -88,7 +88,7 @@ namespace zmq
back_chunk = end_chunk;
back_pos = end_pos;
if (++ end_pos != N)
if (++end_pos != N)
return;
end_chunk->next = new chunk_t;
......
......@@ -54,9 +54,9 @@ bool zmq::zmq_encoder_t::message_ready ()
// Note that new state is set only if write is successful. That way
// unsuccessful write will cause retry on the next state machine
// invocation.
if (!source->read (&in_progress)) {
if (!source->read (&in_progress))
return false;
}
size_t size = zmq_msg_size (&in_progress);
// For messages less than 255 bytes long, write one byte of message size.
......
......@@ -129,7 +129,12 @@ void zmq::zmq_engine_t::out_event ()
}
}
void zmq::zmq_engine_t::revive ()
{
set_pollout (handle);
}
void zmq::zmq_engine_t::error ()
{
// zmq_assert (false);
zmq_assert (false);
}
......@@ -42,6 +42,10 @@ namespace zmq
void in_event ();
void out_event ();
// This method is called by the session to signalise that there
// are messages to send available.
void revive ();
private:
// Function to handle network disconnections.
......
......@@ -59,7 +59,8 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
session_t *session = owner->find_session (session_name.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
session = new session_t (io_thread, owner, session_name.c_str ());
session = new session_t (io_thread, owner, session_name.c_str (),
options);
zmq_assert (session);
send_plug (session);
send_own (owner, session);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment