own.cpp 5.74 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25

26
    You should have received a copy of the GNU Lesser General Public License
27 28 29
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "precompiled.hpp"
31 32 33 34
#include "own.hpp"
#include "err.hpp"
#include "io_thread.hpp"

Martin Sustrik's avatar
Martin Sustrik committed
35 36
zmq::own_t::own_t (class ctx_t *parent_, uint32_t tid_) :
    object_t (parent_, tid_),
37 38 39 40 41
    _terminating (false),
    _sent_seqnum (0),
    _processed_seqnum (0),
    _owner (NULL),
    _term_acks (0)
42 43 44
{
}

45
zmq::own_t::own_t (io_thread_t *io_thread_, const options_t &options_) :
46
    object_t (io_thread_),
47
    options (options_),
48 49 50 51 52
    _terminating (false),
    _sent_seqnum (0),
    _processed_seqnum (0),
    _owner (NULL),
    _term_acks (0)
53 54 55 56 57 58 59 60 61
{
}

zmq::own_t::~own_t ()
{
}

void zmq::own_t::set_owner (own_t *owner_)
{
62 63
    zmq_assert (!_owner);
    _owner = owner_;
64 65 66 67 68
}

void zmq::own_t::inc_seqnum ()
{
    //  This function may be called from a different thread!
69
    _sent_seqnum.add (1);
70 71 72 73 74
}

void zmq::own_t::process_seqnum ()
{
    //  Catch up with counter of processed commands.
75
    _processed_seqnum++;
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92

    //  We may have catched up and still have pending terms acks.
    check_term_acks ();
}

void zmq::own_t::launch_child (own_t *object_)
{
    //  Specify the owner of the object.
    object_->set_owner (this);

    //  Plug the object into the I/O thread.
    send_plug (object_);

    //  Take ownership of the object.
    send_own (this, object_);
}

93 94 95 96 97
void zmq::own_t::term_child (own_t *object_)
{
    process_term_req (object_);
}

98 99 100 101
void zmq::own_t::process_term_req (own_t *object_)
{
    //  When shutting down we can ignore termination requests from owned
    //  objects. The termination request was already sent to the object.
102
    if (_terminating)
103 104 105 106
        return;

    //  If not found, we assume that termination request was already sent to
    //  the object so we can safely ignore the request.
107
    if (0 == _owned.erase (object_))
108 109
        return;

110
    //  If I/O object is well and alive let's ask it to terminate.
111
    register_term_acks (1);
112 113 114

    //  Note that this object is the root of the (partial shutdown) thus, its
    //  value of linger is used, rather than the value stored by the children.
115
    send_term (object_, options.linger.load ());
116 117 118 119 120
}

void zmq::own_t::process_own (own_t *object_)
{
    //  If the object is already being shut down, new owned objects are
121
    //  immediately asked to terminate. Note that linger is set to zero.
122
    if (_terminating) {
123
        register_term_acks (1);
124
        send_term (object_, 0);
125 126 127 128
        return;
    }

    //  Store the reference to the owned object.
129
    _owned.insert (object_);
130 131 132 133 134 135
}

void zmq::own_t::terminate ()
{
    //  If termination is already underway, there's no point
    //  in starting it anew.
136
    if (_terminating)
137 138
        return;

139
    //  As for the root of the ownership tree, there's no one to terminate it,
140
    //  so it has to terminate itself.
141
    if (!_owner) {
142
        process_term (options.linger.load ());
143 144 145 146
        return;
    }

    //  If I am an owned object, I'll ask my owner to terminate me.
147
    send_term_req (_owner, this);
148 149
}

150 151
bool zmq::own_t::is_terminating ()
{
152
    return _terminating;
153 154
}

155
void zmq::own_t::process_term (int linger_)
156 157
{
    //  Double termination should never happen.
158
    zmq_assert (!_terminating);
159

160
    //  Send termination request to all owned objects.
161 162
    for (owned_t::iterator it = _owned.begin (), end = _owned.end (); it != end;
         ++it)
163
        send_term (*it, linger_);
164 165
    register_term_acks (static_cast<int> (_owned.size ()));
    _owned.clear ();
166 167 168

    //  Start termination process and check whether by chance we cannot
    //  terminate immediately.
169
    _terminating = true;
170 171 172 173 174
    check_term_acks ();
}

void zmq::own_t::register_term_acks (int count_)
{
175
    _term_acks += count_;
176 177 178 179
}

void zmq::own_t::unregister_term_ack ()
{
180 181
    zmq_assert (_term_acks > 0);
    _term_acks--;
182 183

    //  This may be a last ack we are waiting for before termination...
184
    check_term_acks ();
185 186 187 188 189 190 191 192 193
}

void zmq::own_t::process_term_ack ()
{
    unregister_term_ack ();
}

void zmq::own_t::check_term_acks ()
{
194 195
    if (_terminating && _processed_seqnum == _sent_seqnum.get ()
        && _term_acks == 0) {
196
        //  Sanity check. There should be no active children at this point.
197
        zmq_assert (_owned.empty ());
198 199 200

        //  The root object has nobody to confirm the termination to.
        //  Other nodes will confirm the termination to the owner.
201 202
        if (_owner)
            send_term_ack (_owner);
203 204

        //  Deallocate the resources.
205
        process_destroy ();
206 207 208
    }
}

209 210 211 212
void zmq::own_t::process_destroy ()
{
    delete this;
}