Commit 45f83d78 authored by Martin Sustrik's avatar Martin Sustrik

one more dezombification bug fixed

parent 936dbf95
...@@ -201,22 +201,6 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_) ...@@ -201,22 +201,6 @@ void zmq::ctx_t::zombify_socket (socket_base_t *socket_)
slot_sync.unlock (); slot_sync.unlock ();
} }
void zmq::ctx_t::dezombify_socket (socket_base_t *socket_)
{
// We assume that this function is called only within dezombification
// process, which in turn is running within a slot_sync critical section.
// Therefore, we need no locking here.
// TODO: Can we do this better than O(n)?
zombies_t::iterator it = std::find (zombies.begin (), zombies.end (),
socket_);
zmq_assert (it != zombies.end ());
// Move from the slot from 'zombie' to 'empty' state.
empty_slots.push_back ((*it)->get_slot ());
zombies.erase (it);
}
void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_) void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
{ {
slots [slot_]->send (command_); slots [slot_]->send (command_);
...@@ -307,14 +291,15 @@ void zmq::ctx_t::dezombify () ...@@ -307,14 +291,15 @@ void zmq::ctx_t::dezombify ()
{ {
// Try to dezombify each zombie in the list. Note that caller is // Try to dezombify each zombie in the list. Note that caller is
// responsible for calling this method in the slot_sync critical section. // responsible for calling this method in the slot_sync critical section.
zombies_t::iterator it = zombies.begin (); for (zombies_t::iterator it = zombies.begin (); it != zombies.end ();) {
while (it != zombies.end ()) { uint32_t slot = (*it)->get_slot ();
zombies_t::iterator old = it; if ((*it)->dezombify ()) {
++it; zombies.erase (it);
empty_slots.push_back (slot);
// dezombify_socket can be called here that will invalidate slots [slot] = NULL;
// the iterator. That's why we've got the next zombie beforehand. }
(*old)->dezombify (); else
it++;
} }
} }
...@@ -20,9 +20,7 @@ ...@@ -20,9 +20,7 @@
#ifndef __ZMQ_CTX_HPP_INCLUDED__ #ifndef __ZMQ_CTX_HPP_INCLUDED__
#define __ZMQ_CTX_HPP_INCLUDED__ #define __ZMQ_CTX_HPP_INCLUDED__
#include <set>
#include <map> #include <map>
#include <list>
#include <vector> #include <vector>
#include <string> #include <string>
...@@ -61,9 +59,6 @@ namespace zmq ...@@ -61,9 +59,6 @@ namespace zmq
// Make socket a zombie. // Make socket a zombie.
void zombify_socket (socket_base_t *socket_); void zombify_socket (socket_base_t *socket_);
// Kill the zombie socket.
void dezombify_socket (socket_base_t *socket_);
// Send command to the destination slot. // Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_); void send_command (uint32_t slot_, const command_t &command_);
...@@ -89,7 +84,7 @@ namespace zmq ...@@ -89,7 +84,7 @@ namespace zmq
// List of sockets that were already closed but not yet deallocated. // List of sockets that were already closed but not yet deallocated.
// These sockets still have some pipes and I/O objects attached. // These sockets still have some pipes and I/O objects attached.
typedef std::list <socket_base_t*> zombies_t; typedef std::vector <socket_base_t*> zombies_t;
zombies_t zombies; zombies_t zombies;
// List of unused slots. // List of unused slots.
......
...@@ -147,11 +147,6 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_) ...@@ -147,11 +147,6 @@ void zmq::object_t::zombify_socket (socket_base_t *socket_)
ctx->zombify_socket (socket_); ctx->zombify_socket (socket_);
} }
void zmq::object_t::dezombify_socket (socket_base_t *socket_)
{
ctx->dezombify_socket (socket_);
}
void zmq::object_t::send_stop () void zmq::object_t::send_stop ()
{ {
// 'stop' command goes always from administrative thread to // 'stop' command goes always from administrative thread to
......
...@@ -55,9 +55,6 @@ namespace zmq ...@@ -55,9 +55,6 @@ namespace zmq
// the context. // the context.
void zombify_socket (class socket_base_t *socket_); void zombify_socket (class socket_base_t *socket_);
// Dezombify particular socket, i.e. destroy it.
void dezombify_socket (class socket_base_t *socket_);
// Derived object can use these functions to send commands // Derived object can use these functions to send commands
// to other objects. // to other objects.
void send_stop (); void send_stop ();
......
...@@ -192,7 +192,12 @@ void zmq::own_t::check_term_acks () ...@@ -192,7 +192,12 @@ void zmq::own_t::check_term_acks ()
send_term_ack (owner); send_term_ack (owner);
// Deallocate the resources. // Deallocate the resources.
delete this; process_destroy ();
} }
} }
void zmq::own_t::process_destroy ()
{
delete this;
}
...@@ -85,6 +85,10 @@ namespace zmq ...@@ -85,6 +85,10 @@ namespace zmq
void register_term_acks (int count_); void register_term_acks (int count_);
void unregister_term_ack (); void unregister_term_ack ();
// A place to hook in when phyicallal destruction of the object
// is to be delayed.
virtual void process_destroy ();
private: private:
// Set owner of the object // Set owner of the object
...@@ -94,7 +98,6 @@ namespace zmq ...@@ -94,7 +98,6 @@ namespace zmq
void process_own (own_t *object_); void process_own (own_t *object_);
void process_term_req (own_t *object_); void process_term_req (own_t *object_);
void process_term_ack (); void process_term_ack ();
void process_seqnum (); void process_seqnum ();
// Check whether all the peding term acks were delivered. // Check whether all the peding term acks were delivered.
......
...@@ -110,6 +110,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, ...@@ -110,6 +110,7 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
own_t (parent_, slot_), own_t (parent_, slot_),
zombie (false), zombie (false),
destroyed (false),
last_processing_time (0), last_processing_time (0),
ticks (0), ticks (0),
rcvmore (false) rcvmore (false)
...@@ -118,15 +119,12 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) : ...@@ -118,15 +119,12 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t slot_) :
zmq::socket_base_t::~socket_base_t () zmq::socket_base_t::~socket_base_t ()
{ {
zmq_assert (zombie); zmq_assert (zombie && destroyed);
// Check whether there are no session leaks. // Check whether there are no session leaks.
sessions_sync.lock (); sessions_sync.lock ();
zmq_assert (sessions.empty ()); zmq_assert (sessions.empty ());
sessions_sync.unlock (); sessions_sync.unlock ();
// Mark the socket slot as empty.
dezombify_socket (this);
} }
zmq::signaler_t *zmq::socket_base_t::get_signaler () zmq::signaler_t *zmq::socket_base_t::get_signaler ()
...@@ -604,13 +602,21 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_) ...@@ -604,13 +602,21 @@ zmq::session_t *zmq::socket_base_t::find_session (const blob_t &peer_identity_)
return session; return session;
} }
void zmq::socket_base_t::dezombify () bool zmq::socket_base_t::dezombify ()
{ {
zmq_assert (zombie); zmq_assert (zombie);
// Process any commands from other threads/sockets that may be available // Process any commands from other threads/sockets that may be available
// at the moment. Ultimately, socket will be destroyed. // at the moment. Ultimately, socket will be destroyed.
process_commands (false, false); process_commands (false, false);
// If the object was already marked as destroyed, finish the deallocation.
if (destroyed) {
own_t::process_destroy ();
return true;
}
return false;
} }
void zmq::socket_base_t::process_commands (bool block_, bool throttle_) void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
...@@ -705,6 +711,11 @@ void zmq::socket_base_t::process_term () ...@@ -705,6 +711,11 @@ void zmq::socket_base_t::process_term ()
own_t::process_term (); own_t::process_term ();
} }
void zmq::socket_base_t::process_destroy ()
{
destroyed = true;
}
int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
......
...@@ -85,8 +85,8 @@ namespace zmq ...@@ -85,8 +85,8 @@ namespace zmq
void terminated (class writer_t *pipe_); void terminated (class writer_t *pipe_);
// This function should be called only on zombie sockets. It tries // This function should be called only on zombie sockets. It tries
// to deallocate the zombie. // to deallocate the zombie. Returns true is object is destroyed.
void dezombify (); bool dezombify ();
protected: protected:
...@@ -120,6 +120,9 @@ namespace zmq ...@@ -120,6 +120,9 @@ namespace zmq
// by overloading it. // by overloading it.
void process_term (); void process_term ();
// Delay actual destruction of the socket.
void process_destroy ();
private: private:
// TODO: Check whether we still need this flag... // TODO: Check whether we still need this flag...
...@@ -128,6 +131,11 @@ namespace zmq ...@@ -128,6 +131,11 @@ namespace zmq
// attached to the socket. // attached to the socket.
bool zombie; bool zombie;
// If true, object should have been already destroyed. However,
// destruction is delayed while we unwind the stack to the point
// where it doesn't intersect the object being destroyed.
bool destroyed;
// Check whether transport protocol, as specified in connect or // Check whether transport protocol, as specified in connect or
// bind, is available and compatible with the socket type. // bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_); int check_protocol (const std::string &protocol_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment