Commit 936dbf95 authored by Martin Sustrik's avatar Martin Sustrik

dezombification procedure fixed

parent 76bd6e73
...@@ -97,7 +97,7 @@ zmq::ctx_t::~ctx_t () ...@@ -97,7 +97,7 @@ zmq::ctx_t::~ctx_t ()
#endif #endif
} }
int zmq::ctx_t::term () int zmq::ctx_t::terminate ()
{ {
// First send stop command to sockets so that any // First send stop command to sockets so that any
// blocking calls are interrupted. // blocking calls are interrupted.
...@@ -115,12 +115,15 @@ int zmq::ctx_t::term () ...@@ -115,12 +115,15 @@ int zmq::ctx_t::term ()
if (no_sockets_notify) if (no_sockets_notify)
no_sockets_sync.wait (); no_sockets_sync.wait ();
// At this point there's only one application thread (this one) remaining. // At this point there should be no active sockets. What we have is a set
// We don't even have to synchronise access to data. // of zombies waiting to be dezombified.
zmq_assert (sockets.empty ()); zmq_assert (sockets.empty ());
// TODO: We are accessing the list of zombies in unsynchronised manner here! // Get rid of remaining zombie sockets. Note that the lock won't block
// Get rid of remaining zombie sockets. // anyone here. There's noone else having open sockets anyway. The only
// purpose of the lock is to double-check all the CPU caches have been
// synchronised.
slot_sync.lock ();
while (!zombies.empty ()) { while (!zombies.empty ()) {
dezombify (); dezombify ();
...@@ -134,6 +137,7 @@ int zmq::ctx_t::term () ...@@ -134,6 +137,7 @@ int zmq::ctx_t::term ()
usleep (1000); usleep (1000);
#endif #endif
} }
slot_sync.unlock ();
// Deallocate the resources. // Deallocate the resources.
delete this; delete this;
...@@ -197,6 +201,22 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_) ...@@ -197,6 +201,22 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
slot_sync.unlock (); slot_sync.unlock ();
} }
void zmq::ctx_t::dezombify_socket (socket_base_t *socket_)
{
// We assume that this function is called only within dezombification
// process, which in turn is running within a slot_sync critical section.
// Therefore, we need no locking here.
// TODO: Can we do this better than O(n)?
zombies_t::iterator it = std::find (zombies.begin (), zombies.end (),
socket_);
zmq_assert (it != zombies.end ());
// Move from the slot from 'zombie' to 'empty' state.
empty_slots.push_back ((*it)->get_slot ());
zombies.erase (it);
}
void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
{ {
slots [slot_]->send (command_); slots [slot_]->send (command_);
...@@ -287,12 +307,14 @@ void zmq::ctx_t::dezombify () ...@@ -287,12 +307,14 @@ void zmq::ctx_t::dezombify ()
{ {
// Try to dezombify each zombie in the list. Note that caller is // Try to dezombify each zombie in the list. Note that caller is
// responsible for calling this method in the slot_sync critical section. // responsible for calling this method in the slot_sync critical section.
for (zombies_t::size_type i = 0; i != zombies.size ();) zombies_t::iterator it = zombies.begin ();
if (zombies [i]->dezombify ()) { while (it != zombies.end ()) {
empty_slots.push_back (zombies [i]->get_slot ()); zombies_t::iterator old = it;
zombies.erase (zombies [i]); ++it;
}
else // dezombify_socket can be called here that will invalidate
i++; // the iterator. That's why we've got the next zombie beforehand.
(*old)->dezombify ();
}
} }
...@@ -20,9 +20,10 @@ ...@@ -20,9 +20,10 @@
#ifndef __ZMQ_CTX_HPP_INCLUDED__ #ifndef __ZMQ_CTX_HPP_INCLUDED__
#define __ZMQ_CTX_HPP_INCLUDED__ #define __ZMQ_CTX_HPP_INCLUDED__
#include <vector>
#include <set> #include <set>
#include <map> #include <map>
#include <list>
#include <vector>
#include <string> #include <string>
#include "signaler.hpp" #include "signaler.hpp"
...@@ -52,7 +53,7 @@ namespace zmq ...@@ -52,7 +53,7 @@ namespace zmq
// no more sockets open it'll cause all the infrastructure to be shut // no more sockets open it'll cause all the infrastructure to be shut
// down. If there are open sockets still, the deallocation happens // down. If there are open sockets still, the deallocation happens
// after the last one is closed. // after the last one is closed.
int term (); int terminate ();
// Create a socket. // Create a socket.
class socket_base_t *create_socket (int type_); class socket_base_t *create_socket (int type_);
...@@ -60,6 +61,9 @@ namespace zmq ...@@ -60,6 +61,9 @@ namespace zmq
// Make socket a zombie. // Make socket a zombie.
void zombify_socket (socket_base_t *socket_); void zombify_socket (socket_base_t *socket_);
// Kill the zombie socket.
void dezombify_socket (socket_base_t *socket_);
// Send command to the destination slot. // Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_); void send_command (uint32_t slot_, const command_t &command_);
...@@ -83,9 +87,9 @@ namespace zmq ...@@ -83,9 +87,9 @@ namespace zmq
typedef yarray_t <socket_base_t> sockets_t; typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets; sockets_t sockets;
// Array of sockets that were already closed but not yet deallocated. // List of sockets that were already closed but not yet deallocated.
// These sockets still have some pipes and I/O objects attached. // These sockets still have some pipes and I/O objects attached.
typedef yarray_t <socket_base_t> zombies_t; typedef std::list <socket_base_t*> zombies_t;
zombies_t zombies; zombies_t zombies;
// List of unused slots. // List of unused slots.
......
...@@ -147,6 +147,11 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_) ...@@ -147,6 +147,11 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_)
ctx->zombify_socket (socket_); ctx->zombify_socket (socket_);
} }
void zmq::object_t::dezombify_socket (socket_base_t *socket_)
{
ctx->dezombify_socket (socket_);
}
void zmq::object_t::send_stop () void zmq::object_t::send_stop ()
{ {
// 'stop' command goes always from administrative thread to // 'stop' command goes always from administrative thread to
......
...@@ -55,6 +55,9 @@ namespace zmq ...@@ -55,6 +55,9 @@ namespace zmq
// the context. // the context.
void zombify_socket (class socket_base_t *socket_); void zombify_socket (class socket_base_t *socket_);
// Dezombify particular socket, i.e. destroy it.
void dezombify_socket (class socket_base_t *socket_);
// Derived object can use these functions to send commands // Derived object can use these functions to send commands
// to other objects. // to other objects.
void send_stop (); void send_stop ();
......
...@@ -118,10 +118,15 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : ...@@ -118,10 +118,15 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
zmq::socket_base_t::~socket_base_t () zmq::socket_base_t::~socket_base_t ()
{ {
zmq_assert (zombie);
// Check whether there are no session leaks. // Check whether there are no session leaks.
sessions_sync.lock (); sessions_sync.lock ();
zmq_assert (sessions.empty ()); zmq_assert (sessions.empty ());
sessions_sync.unlock (); sessions_sync.unlock ();
// Mark the socket slot as empty.
dezombify_socket (this);
} }
zmq::signaler_t *zmq::socket_base_t::get_signaler () zmq::signaler_t *zmq::socket_base_t::get_signaler ()
...@@ -599,16 +604,13 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) ...@@ -599,16 +604,13 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
return session; return session;
} }
bool zmq::socket_base_t::dezombify () void zmq::socket_base_t::dezombify ()
{ {
zmq_assert (zombie); zmq_assert (zombie);
// Process any commands from other threads/sockets that may be available // Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed. // at the moment. Ultimately, socket will be destroyed.
process_commands (false, false); process_commands (false, false);
// TODO: ???
return true;
} }
void zmq::socket_base_t::process_commands (bool block_, bool throttle_) void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
......
...@@ -85,8 +85,8 @@ namespace zmq ...@@ -85,8 +85,8 @@ namespace zmq
void terminated (class writer_t *pipe_); void terminated (class writer_t *pipe_);
// This function should be called only on zombie sockets. It tries // This function should be called only on zombie sockets. It tries
// to deallocate the zombie. Returns true if zombie is finally dead. // to deallocate the zombie.
bool dezombify (); void dezombify ();
protected: protected:
......
...@@ -269,7 +269,7 @@ void *zmq_init (int io_threads_) ...@@ -269,7 +269,7 @@ void *zmq_init (int io_threads_)
int zmq_term (void *ctx_) int zmq_term (void *ctx_)
{ {
int rc = ((zmq::ctx_t*) ctx_)->term (); int rc = ((zmq::ctx_t*) ctx_)->terminate ();
int en = errno; int en = errno;
if (!ctx_) { if (!ctx_) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment