Commit cb09c695 authored by Martin Sustrik's avatar Martin Sustrik

pipe deallocation added

parent 2dd50165
......@@ -40,6 +40,8 @@ namespace zmq
attach,
bind,
revive,
pipe_term,
pipe_term_ack,
term_req,
term,
term_ack
......@@ -78,6 +80,15 @@ namespace zmq
struct {
} revive;
// Sent by pipe reader to pipe writer to ask it to terminate
// its end of the pipe.
struct {
} pipe_term;
// Pipe writer acknowledges pipe_term command.
struct {
} pipe_term_ack;
// Sent by I/O object ot the socket to request the shutdown of
// the I/O object.
struct {
......
......@@ -83,6 +83,10 @@ zmq::dispatcher_t::~dispatcher_t ()
for (io_threads_t::size_type i = 0; i != io_threads.size (); i++)
delete io_threads [i];
// Deallocate all the orphaned pipes.
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); it++)
delete *it;
delete [] command_pipes;
#ifdef ZMQ_HAVE_WINDOWS
......@@ -146,3 +150,19 @@ zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
return io_threads [result];
}
void zmq::dispatcher_t::register_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
bool inserted = pipes.insert (pipe_).second;
zmq_assert (inserted);
pipes_sync.unlock ();
}
void zmq::dispatcher_t::unregister_pipe (class pipe_t *pipe_)
{
pipes_sync.lock ();
pipes_t::size_type erased = pipes.erase (pipe_);
zmq_assert (erased == 1);
pipes_sync.unlock ();
}
......@@ -21,6 +21,7 @@
#define __ZMQ_DISPATCHER_HPP_INCLUDED__
#include <vector>
#include <set>
#include <map>
#include <string>
......@@ -85,6 +86,11 @@ namespace zmq
// Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_);
// All pipes are registered with the dispatcher so that even the
// orphaned pipes can be deallocated on the terminal shutdown.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
private:
// Returns the app thread associated with the current thread.
......@@ -112,6 +118,18 @@ namespace zmq
// Synchronisation of accesses to shared thread data.
mutex_t threads_sync;
// As pipes may reside in orphaned state in particular moments
// of the pipe shutdown process, i.e. neither pipe reader nor
// pipe writer hold reference to the pipe, we have to hold references
// to all pipes in dispatcher so that we can deallocate them
// during terminal shutdown even though it conincides with the
// pipe being in the orphaned state.
typedef std::set <class pipe_t*> pipes_t;
pipes_t pipes;
// Synchronisation of access to the pipes repository.
mutex_t pipes_sync;
dispatcher_t (const dispatcher_t&);
void operator = (const dispatcher_t&);
};
......
......@@ -26,6 +26,8 @@ namespace zmq
struct i_endpoint
{
virtual void revive (class reader_t *pipe_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0;
};
}
......
......@@ -83,6 +83,14 @@ void zmq::object_t::process_command (command_t &cmd_)
cmd_.args.bind.in_pipe, cmd_.args.bind.out_pipe);
return;
case command_t::pipe_term:
process_pipe_term ();
return;
case command_t::pipe_term_ack:
process_pipe_term_ack ();
return;
case command_t::term_req:
process_term_req (cmd_.args.term_req.object);
return;
......@@ -100,6 +108,16 @@ void zmq::object_t::process_command (command_t &cmd_)
}
}
void zmq::object_t::register_pipe (class pipe_t *pipe_)
{
dispatcher->register_pipe (pipe_);
}
void zmq::object_t::unregister_pipe (class pipe_t *pipe_)
{
dispatcher->unregister_pipe (pipe_);
}
zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
{
return dispatcher->choose_io_thread (taskset_);
......@@ -166,6 +184,22 @@ void zmq::object_t::send_revive (object_t *destination_)
send_command (cmd);
}
void zmq::object_t::send_pipe_term (writer_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::pipe_term;
send_command (cmd);
}
void zmq::object_t::send_pipe_term_ack (reader_t *destination_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::pipe_term_ack;
send_command (cmd);
}
void zmq::object_t::send_term_req (socket_base_t *destination_,
owned_t *object_)
{
......@@ -223,6 +257,16 @@ void zmq::object_t::process_revive ()
zmq_assert (false);
}
void zmq::object_t::process_pipe_term ()
{
zmq_assert (false);
}
void zmq::object_t::process_pipe_term_ack ()
{
zmq_assert (false);
}
void zmq::object_t::process_term_req (owned_t *object_)
{
zmq_assert (false);
......
......@@ -42,6 +42,10 @@ namespace zmq
int get_thread_slot ();
void process_command (struct command_t &cmd_);
// Allow pipe to access corresponding dispatcher functions.
void register_pipe (class pipe_t *pipe_);
void unregister_pipe (class pipe_t *pipe_);
protected:
// Derived object can use following functions to interact with
......@@ -60,6 +64,8 @@ namespace zmq
void send_bind (object_t *destination_, class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_);
void send_revive (class object_t *destination_);
void send_pipe_term (class writer_t *destination_);
void send_pipe_term_ack (class reader_t *destination_);
void send_term_req (class socket_base_t *destination_,
class owned_t *object_);
void send_term (class owned_t *destination_);
......@@ -74,6 +80,8 @@ namespace zmq
virtual void process_bind (class owned_t *session_,
class reader_t *in_pipe_, class writer_t *out_pipe_);
virtual void process_revive ();
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_term_req (class owned_t *object_);
virtual void process_term ();
virtual void process_term_ack ();
......
......@@ -19,6 +19,8 @@
#include <pthread.h>
#include <../include/zmq.h>
#include "pipe.hpp"
zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
......@@ -39,9 +41,21 @@ zmq::reader_t::~reader_t ()
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
return pipe->read (msg_);
if (!pipe->read (msg_))
return false;
// If delimiter was read, start termination process of the pipe.
unsigned char *offset = 0;
if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
if (endpoint)
endpoint->detach_inpipe (this);
term ();
return false;
}
// TODO: Adjust the size of the pipe.
return true;
}
void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
......@@ -59,19 +73,48 @@ int zmq::reader_t::get_index ()
return index;
}
void zmq::reader_t::term ()
{
endpoint = NULL;
send_pipe_term (peer);
}
void zmq::reader_t::process_revive ()
{
endpoint->revive (this);
}
void zmq::reader_t::process_pipe_term_ack ()
{
peer = NULL;
delete pipe;
}
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
uint64_t hwm_, uint64_t lwm_) :
object_t (parent_),
pipe (pipe_),
peer (&pipe_->reader),
hwm (hwm_),
lwm (lwm_)
lwm (lwm_),
index (-1),
endpoint (NULL)
{
}
void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
{
endpoint = endpoint_;
}
void zmq::writer_t::set_index (int index_)
{
index = index_;
}
int zmq::writer_t::get_index ()
{
return index;
}
zmq::writer_t::~writer_t ()
......@@ -99,14 +142,46 @@ void zmq::writer_t::flush ()
send_revive (peer);
}
void zmq::writer_t::term ()
{
endpoint = NULL;
// Push delimiter into the pipe.
// Trick the compiler to belive that the tag is a valid pointer.
zmq_msg_t msg;
const unsigned char *offset = 0;
msg.content = (void*) (offset + ZMQ_DELIMITER);
msg.shared = false;
pipe->write (msg);
pipe->flush ();
}
void zmq::writer_t::process_pipe_term ()
{
if (endpoint)
endpoint->detach_outpipe (this);
reader_t *p = peer;
peer = NULL;
send_pipe_term_ack (p);
}
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, uint64_t lwm_) :
reader (reader_parent_, this, hwm_, lwm_),
writer (writer_parent_, this, hwm_, lwm_)
{
reader.register_pipe (this);
}
zmq::pipe_t::~pipe_t ()
{
// Deallocate all the unread messages in the pipe. We have to do it by
// hand because zmq_msg_t is a POD, not a class, so there's no associated
// destructor.
zmq_msg_t msg;
while (read (&msg))
zmq_msg_close (&msg);
reader.unregister_pipe (this);
}
......@@ -39,25 +39,29 @@ namespace zmq
uint64_t hwm_, uint64_t lwm_);
~reader_t ();
void set_endpoint (i_endpoint *endpoint_);
// Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_);
void set_endpoint (i_endpoint *endpoint_);
// Mnaipulation of index of the pipe.
void set_index (int index_);
int get_index ();
// Ask pipe to terminate.
void term ();
private:
// Command handlers.
void process_revive ();
void process_pipe_term_ack ();
// The underlying pipe.
class pipe_t *pipe;
// Pipe writer associated with the other side of the pipe.
class object_t *peer;
class writer_t *peer;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
......@@ -86,6 +90,8 @@ namespace zmq
uint64_t hwm_, uint64_t lwm_);
~writer_t ();
void set_endpoint (i_endpoint *endpoint_);
// Checks whether message with specified size can be written to the
// pipe. If writing the message would cause high watermark to be
// exceeded, the function returns false.
......@@ -98,13 +104,23 @@ namespace zmq
// Flush the messages downsteam.
void flush ();
// Mnaipulation of index of the pipe.
void set_index (int index_);
int get_index ();
// Ask pipe to terminate.
void term ();
private:
// Command handlers.
void process_pipe_term ();
// The underlying pipe.
class pipe_t *pipe;
// Pipe reader associated with the other side of the pipe.
class object_t *peer;
class reader_t *peer;
// High and low watermarks for in-memory storage (in bytes).
uint64_t hwm;
......@@ -114,6 +130,12 @@ namespace zmq
uint64_t head;
uint64_t tail;
// Index of the pipe in the socket's list of outbound pipes.
int index;
// Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint;
writer_t (const writer_t&);
void operator = (const writer_t&);
};
......
......@@ -36,6 +36,11 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
zmq::session_t::~session_t ()
{
// Ask associated pipes to terminate.
if (in_pipe)
in_pipe->term ();
if (out_pipe)
out_pipe->term ();
}
void zmq::session_t::set_inbound_pipe (reader_t *pipe_)
......@@ -49,6 +54,7 @@ void zmq::session_t::set_outbound_pipe (writer_t *pipe_)
{
zmq_assert (!out_pipe);
out_pipe = pipe_;
out_pipe->set_endpoint (this);
}
......@@ -92,6 +98,17 @@ void zmq::session_t::revive (reader_t *pipe_)
engine->revive ();
}
void zmq::session_t::detach_inpipe (reader_t *pipe_)
{
active = false;
in_pipe = NULL;
}
void zmq::session_t::detach_outpipe (writer_t *pipe_)
{
out_pipe = NULL;
}
void zmq::session_t::process_plug ()
{
// Register the session with the socket.
......@@ -112,6 +129,7 @@ void zmq::session_t::process_plug ()
pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
zmq_assert (outbound);
out_pipe = &outbound->writer;
out_pipe->set_endpoint (this);
send_bind (owner, this, &outbound->reader, &inbound->writer);
}
......
......@@ -52,6 +52,8 @@ namespace zmq
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
// Handlers for incoming commands.
void process_plug ();
......
......@@ -50,6 +50,16 @@ zmq::socket_base_t::~socket_base_t ()
{
shutting_down = true;
// Ask all pipes to terminate.
for (in_pipes_t::iterator it = in_pipes.begin ();
it != in_pipes.end (); it++)
(*it)->term ();
in_pipes.clear ();
for (out_pipes_t::iterator it = out_pipes.begin ();
it != out_pipes.end (); it++)
(*it)->term ();
out_pipes.clear ();
while (true) {
// On third pass of the loop there should be no more I/O objects
......@@ -164,17 +174,18 @@ int zmq::socket_base_t::connect (const char *addr_)
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
session->set_outbound_pipe (&in_pipe->writer);
in_pipes.push_back (std::make_pair (&in_pipe->reader, session));
in_pipes.back ().first->set_index (active);
in_pipes [active].first->set_index (in_pipes.size () - 1);
in_pipes.push_back (&in_pipe->reader);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
// Create outbound pipe.
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
out_pipe->writer.set_endpoint (this);
session->set_inbound_pipe (&out_pipe->reader);
out_pipes.push_back (std::make_pair (&out_pipe->writer, session));
out_pipes.push_back (&out_pipe->writer);
// Activate the session.
send_plug (session);
......@@ -225,7 +236,7 @@ int zmq::socket_base_t::flush ()
{
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
it->first->flush ();
(*it)->flush ();
return 0;
}
......@@ -320,12 +331,38 @@ void zmq::socket_base_t::revive (reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
in_pipes [index].first->set_index (active);
in_pipes [active].first->set_index (index);
in_pipes [index]->set_index (active);
in_pipes [active]->set_index (index);
std::swap (in_pipes [index], in_pipes [active]);
active++;
}
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
{
// Remove the pipe from the list of inbound pipes.
in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index ();
if (index < active) {
in_pipes [index]->set_index (active - 1);
in_pipes [active - 1]->set_index (index);
std::swap (in_pipes [index], in_pipes [active - 1]);
active--;
index = active;
}
in_pipes [index]->set_index (in_pipes.size () - 1);
in_pipes [in_pipes.size () - 1]->set_index (index);
std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]);
in_pipes.pop_back ();
}
void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
{
out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index ();
out_pipes [index]->set_index (out_pipes.size () - 1);
out_pipes [out_pipes.size () - 1]->set_index (index);
std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]);
out_pipes.pop_back ();
}
void zmq::socket_base_t::process_own (owned_t *object_)
{
io_objects.insert (object_);
......@@ -336,13 +373,14 @@ void zmq::socket_base_t::process_bind (owned_t *session_,
{
zmq_assert (in_pipe_);
in_pipe_->set_endpoint (this);
in_pipes.push_back (std::make_pair (in_pipe_, session_));
in_pipes.back ().first->set_index (active);
in_pipes [active].first->set_index (in_pipes.size () - 1);
in_pipes.push_back (in_pipe_);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
zmq_assert (out_pipe_);
out_pipes.push_back (std::make_pair (out_pipe_, session_));
out_pipe_->set_endpoint (this);
out_pipes.push_back (out_pipe_);
}
void zmq::socket_base_t::process_term_req (owned_t *object_)
......@@ -388,7 +426,7 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// First check whether all pipes are available for writing.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
if (!it->first->check_write (zmq_msg_size (msg_)))
if (!(*it)->check_write (zmq_msg_size (msg_)))
return false;
msg_content_t *content = (msg_content_t*) msg_->content;
......@@ -397,9 +435,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::iterator it = out_pipes.begin ();
it != out_pipes.end (); it++) {
it->first->write (msg_);
(*it)->write (msg_);
if (flush_)
it->first->flush ();
(*it)->flush ();
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
......@@ -410,9 +448,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
out_pipes.begin ()->first->write (msg_);
(*out_pipes.begin ())->write (msg_);
if (flush_)
out_pipes.begin ()->first->flush ();
(*out_pipes.begin ())->flush ();
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
......@@ -431,9 +469,9 @@ bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
// Push the message to all destinations.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++) {
it->first->write (msg_);
(*it)->write (msg_);
if (flush_)
it->first->flush ();
(*it)->flush ();
}
// Detach the original message from the data buffer.
......@@ -451,13 +489,13 @@ bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current].first->read (msg_);
bool fetched = in_pipes [current]->read (msg_);
// If there's no message in the pipe, move it to the list of
// non-active pipes.
if (!fetched) {
in_pipes [current].first->set_index (active - 1);
in_pipes [active - 1].first->set_index (current);
in_pipes [current]->set_index (active - 1);
in_pipes [active - 1]->set_index (current);
std::swap (in_pipes [current], in_pipes [active - 1]);
active--;
}
......
......@@ -24,7 +24,6 @@
#include <map>
#include <vector>
#include <string>
#include <utility>
#include "i_endpoint.hpp"
#include "object.hpp"
......@@ -62,6 +61,8 @@ namespace zmq
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_);
private:
......@@ -86,10 +87,7 @@ namespace zmq
io_objects_t io_objects;
// Inbound pipes, i.e. those the socket is getting messages from.
// The second member in the pair indicates the object on the other
// side of the pipe.
typedef std::vector <std::pair <class reader_t*, owned_t*> >
in_pipes_t;
typedef std::vector <class reader_t*> in_pipes_t;
in_pipes_t in_pipes;
// Index of the next inbound pipe to read messages from.
......@@ -100,10 +98,7 @@ namespace zmq
in_pipes_t::size_type active;
// Outbound pipes, i.e. those the socket is sending messages to.
// The second member in the pair indicates the object on the other
// side of the pipe.
typedef std::vector <std::pair <class writer_t*, owned_t*> >
out_pipes_t;
typedef std::vector <class writer_t*> out_pipes_t;
out_pipes_t out_pipes;
// Number of I/O objects that were already asked to terminate
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment