Commit 4c6d07d3 authored by Martin Sustrik's avatar Martin Sustrik

single term ack counting mechanism for every socket (no separate mechanisms for fq_t and lb_t)

parent fb6ce536
...@@ -72,7 +72,6 @@ libzmq_la_SOURCES = \ ...@@ -72,7 +72,6 @@ libzmq_la_SOURCES = \
ip.hpp \ ip.hpp \
i_engine.hpp \ i_engine.hpp \
i_poll_events.hpp \ i_poll_events.hpp \
i_terminate_events.hpp \
kqueue.hpp \ kqueue.hpp \
lb.hpp \ lb.hpp \
likely.hpp \ likely.hpp \
......
...@@ -22,9 +22,9 @@ ...@@ -22,9 +22,9 @@
#include "fq.hpp" #include "fq.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "i_terminate_events.hpp" #include "own.hpp"
zmq::fq_t::fq_t (i_terminate_events *sink_) : zmq::fq_t::fq_t (own_t *sink_) :
active (0), active (0),
current (0), current (0),
more (false), more (false),
...@@ -47,8 +47,10 @@ void zmq::fq_t::attach (reader_t *pipe_) ...@@ -47,8 +47,10 @@ void zmq::fq_t::attach (reader_t *pipe_)
active++; active++;
// If we are already terminating, ask the pipe to terminate straight away. // If we are already terminating, ask the pipe to terminate straight away.
if (terminating) if (terminating) {
sink->register_term_acks (1);
pipe_->terminate (); pipe_->terminate ();
}
} }
void zmq::fq_t::terminated (reader_t *pipe_) void zmq::fq_t::terminated (reader_t *pipe_)
...@@ -67,8 +69,8 @@ void zmq::fq_t::terminated (reader_t *pipe_) ...@@ -67,8 +69,8 @@ void zmq::fq_t::terminated (reader_t *pipe_)
} }
pipes.erase (pipe_); pipes.erase (pipe_);
if (terminating && pipes.empty ()) if (terminating)
sink->terminated (); sink->unregister_term_ack ();
} }
void zmq::fq_t::terminate () void zmq::fq_t::terminate ()
...@@ -76,11 +78,7 @@ void zmq::fq_t::terminate () ...@@ -76,11 +78,7 @@ void zmq::fq_t::terminate ()
zmq_assert (!terminating); zmq_assert (!terminating);
terminating = true; terminating = true;
if (pipes.empty ()) { sink->register_term_acks (pipes.size ());
sink->terminated ();
return;
}
for (pipes_t::size_type i = 0; i != pipes.size (); i++) for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate (); pipes [i]->terminate ();
} }
......
...@@ -33,7 +33,7 @@ namespace zmq ...@@ -33,7 +33,7 @@ namespace zmq
{ {
public: public:
fq_t (struct i_terminate_events *sink_); fq_t (class own_t *sink_);
~fq_t (); ~fq_t ();
void attach (reader_t *pipe_); void attach (reader_t *pipe_);
...@@ -64,7 +64,7 @@ namespace zmq ...@@ -64,7 +64,7 @@ namespace zmq
bool more; bool more;
// Object to send events to. // Object to send events to.
i_terminate_events *sink; class own_t *sink;
// If true, termination process is already underway. // If true, termination process is already underway.
bool terminating; bool terminating;
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
#define __ZMQ_I_TERMINATE_EVENTS_HPP_INCLUDED__
namespace zmq
{
// Algorithms such as fair queueing (fq_t) and load balancing (lb_t)
// use this interface to communicate termination event to the socket.
struct i_terminate_events
{
virtual ~i_terminate_events () {}
virtual void terminated () = 0;
};
}
#endif
...@@ -22,9 +22,9 @@ ...@@ -22,9 +22,9 @@
#include "lb.hpp" #include "lb.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "err.hpp" #include "err.hpp"
#include "i_terminate_events.hpp" #include "own.hpp"
zmq::lb_t::lb_t (i_terminate_events *sink_) : zmq::lb_t::lb_t (own_t *sink_) :
active (0), active (0),
current (0), current (0),
more (false), more (false),
...@@ -46,8 +46,10 @@ void zmq::lb_t::attach (writer_t *pipe_) ...@@ -46,8 +46,10 @@ void zmq::lb_t::attach (writer_t *pipe_)
pipes.swap (active, pipes.size () - 1); pipes.swap (active, pipes.size () - 1);
active++; active++;
if (terminating) if (terminating) {
sink->register_term_acks (1);
pipe_->terminate (); pipe_->terminate ();
}
} }
void zmq::lb_t::terminate () void zmq::lb_t::terminate ()
...@@ -55,11 +57,7 @@ void zmq::lb_t::terminate () ...@@ -55,11 +57,7 @@ void zmq::lb_t::terminate ()
zmq_assert (!terminating); zmq_assert (!terminating);
terminating = true; terminating = true;
if (pipes.empty ()) { sink->register_term_acks (pipes.size ());
sink->terminated ();
return;
}
for (pipes_t::size_type i = 0; i != pipes.size (); i++) for (pipes_t::size_type i = 0; i != pipes.size (); i++)
pipes [i]->terminate (); pipes [i]->terminate ();
} }
...@@ -75,8 +73,8 @@ void zmq::lb_t::terminated (writer_t *pipe_) ...@@ -75,8 +73,8 @@ void zmq::lb_t::terminated (writer_t *pipe_)
} }
pipes.erase (pipe_); pipes.erase (pipe_);
if (terminating && pipes.empty ()) if (terminating)
sink->terminated (); sink->unregister_term_ack ();
} }
void zmq::lb_t::activated (writer_t *pipe_) void zmq::lb_t::activated (writer_t *pipe_)
......
...@@ -32,7 +32,7 @@ namespace zmq ...@@ -32,7 +32,7 @@ namespace zmq
{ {
public: public:
lb_t (struct i_terminate_events *sink_); lb_t (class own_t *sink_);
~lb_t (); ~lb_t ();
void attach (writer_t *pipe_); void attach (writer_t *pipe_);
...@@ -61,7 +61,7 @@ namespace zmq ...@@ -61,7 +61,7 @@ namespace zmq
bool more; bool more;
// Object to send events to. // Object to send events to.
struct i_terminate_events *sink; class own_t *sink;
// If true, termination process is already underway. // If true, termination process is already underway.
bool terminating; bool terminating;
......
...@@ -52,6 +52,14 @@ namespace zmq ...@@ -52,6 +52,14 @@ namespace zmq
// before the command is delivered. // before the command is delivered.
void inc_seqnum (); void inc_seqnum ();
// Use following two functions to wait for arbitrary events before
// terminating. Just add number of events to wait for using
// register_tem_acks functions. When event occurs, call
// remove_term_ack. When number of pending acks reaches zero
// object will be deallocated.
void register_term_acks (int count_);
void unregister_term_ack ();
protected: protected:
// Launch the supplied object and become its owner. // Launch the supplied object and become its owner.
...@@ -77,14 +85,6 @@ namespace zmq ...@@ -77,14 +85,6 @@ namespace zmq
// steps to the beginning of the termination process. // steps to the beginning of the termination process.
void process_term (); void process_term ();
// Use following two functions to wait for arbitrary events before
// terminating. Just add number of events to wait for using
// register_tem_acks functions. When event occurs, call
// remove_term_ack. When number of pending acks reaches zero
// object will be deallocated.
void register_term_acks (int count_);
void unregister_term_ack ();
// A place to hook in when phyicallal destruction of the object // A place to hook in when phyicallal destruction of the object
// is to be delayed. // is to be delayed.
virtual void process_destroy (); virtual void process_destroy ();
......
...@@ -43,17 +43,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_, ...@@ -43,17 +43,10 @@ void zmq::pull_t::xattach_pipes (class reader_t *inpipe_,
void zmq::pull_t::process_term () void zmq::pull_t::process_term ()
{ {
register_term_acks (1);
fq.terminate (); fq.terminate ();
socket_base_t::process_term (); socket_base_t::process_term ();
} }
void zmq::pull_t::terminated ()
{
unregister_term_ack ();
}
int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::pull_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); return fq.recv (msg_, flags_);
......
...@@ -20,14 +20,13 @@ ...@@ -20,14 +20,13 @@
#ifndef __ZMQ_PULL_HPP_INCLUDED__ #ifndef __ZMQ_PULL_HPP_INCLUDED__
#define __ZMQ_PULL_HPP_INCLUDED__ #define __ZMQ_PULL_HPP_INCLUDED__
#include "i_terminate_events.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
class pull_t : public socket_base_t, public i_terminate_events class pull_t : public socket_base_t
{ {
public: public:
...@@ -44,9 +43,6 @@ namespace zmq ...@@ -44,9 +43,6 @@ namespace zmq
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process. // Hook into the termination process.
void process_term (); void process_term ();
......
...@@ -44,17 +44,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_, ...@@ -44,17 +44,10 @@ void zmq::push_t::xattach_pipes (class reader_t *inpipe_,
void zmq::push_t::process_term () void zmq::push_t::process_term ()
{ {
register_term_acks (1);
lb.terminate (); lb.terminate ();
socket_base_t::process_term (); socket_base_t::process_term ();
} }
void zmq::push_t::terminated ()
{
unregister_term_ack ();
}
int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::push_t::xsend (zmq_msg_t *msg_, int flags_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_, flags_);
......
...@@ -20,14 +20,13 @@ ...@@ -20,14 +20,13 @@
#ifndef __ZMQ_PUSH_HPP_INCLUDED__ #ifndef __ZMQ_PUSH_HPP_INCLUDED__
#define __ZMQ_PUSH_HPP_INCLUDED__ #define __ZMQ_PUSH_HPP_INCLUDED__
#include "i_terminate_events.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
class push_t : public socket_base_t, public i_terminate_events class push_t : public socket_base_t
{ {
public: public:
...@@ -44,9 +43,6 @@ namespace zmq ...@@ -44,9 +43,6 @@ namespace zmq
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process. // Hook into the termination process.
void process_term (); void process_term ();
......
...@@ -49,17 +49,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_, ...@@ -49,17 +49,10 @@ void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
void zmq::sub_t::process_term () void zmq::sub_t::process_term ()
{ {
register_term_acks (1);
fq.terminate (); fq.terminate ();
socket_base_t::process_term (); socket_base_t::process_term ();
} }
void zmq::sub_t::terminated ()
{
unregister_term_ack ();
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_, int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
......
...@@ -24,13 +24,12 @@ ...@@ -24,13 +24,12 @@
#include "trie.hpp" #include "trie.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "i_terminate_events.hpp"
#include "fq.hpp" #include "fq.hpp"
namespace zmq namespace zmq
{ {
class sub_t : public socket_base_t, public i_terminate_events class sub_t : public socket_base_t
{ {
public: public:
...@@ -48,9 +47,6 @@ namespace zmq ...@@ -48,9 +47,6 @@ namespace zmq
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process. // Hook into the termination process.
void process_term (); void process_term ();
......
...@@ -45,18 +45,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_, ...@@ -45,18 +45,11 @@ void zmq::xreq_t::xattach_pipes (class reader_t *inpipe_,
void zmq::xreq_t::process_term () void zmq::xreq_t::process_term ()
{ {
register_term_acks (2);
fq.terminate (); fq.terminate ();
lb.terminate (); lb.terminate ();
socket_base_t::process_term (); socket_base_t::process_term ();
} }
void zmq::xreq_t::terminated ()
{
unregister_term_ack ();
}
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_, flags_);
......
...@@ -21,14 +21,13 @@ ...@@ -21,14 +21,13 @@
#define __ZMQ_XREQ_HPP_INCLUDED__ #define __ZMQ_XREQ_HPP_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "i_terminate_events.hpp"
#include "fq.hpp" #include "fq.hpp"
#include "lb.hpp" #include "lb.hpp"
namespace zmq namespace zmq
{ {
class xreq_t : public socket_base_t, public i_terminate_events class xreq_t : public socket_base_t
{ {
public: public:
...@@ -47,9 +46,6 @@ namespace zmq ...@@ -47,9 +46,6 @@ namespace zmq
private: private:
// i_terminate_events interface implementation.
void terminated ();
// Hook into the termination process. // Hook into the termination process.
void process_term (); void process_term ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment