pipe.cpp 8.83 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2010 iMatix Corporation
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the Lesser GNU General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    Lesser GNU General Public License for more details.

    You should have received a copy of the Lesser GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "../include/zmq.h"
Martin Sustrik's avatar
Martin Sustrik committed
21

Martin Sustrik's avatar
Martin Sustrik committed
22 23
#include "pipe.hpp"

Martin Hurton's avatar
Martin Hurton committed
24
zmq::reader_t::reader_t (object_t *parent_, uint64_t lwm_) :
Martin Sustrik's avatar
Martin Sustrik committed
25
    object_t (parent_),
unknown's avatar
unknown committed
26 27
    pipe (NULL),
    peer (NULL),
Martin Sustrik's avatar
Martin Sustrik committed
28
    lwm (lwm_),
Martin Hurton's avatar
Martin Hurton committed
29
    msgs_read (0),
Martin Sustrik's avatar
Martin Sustrik committed
30
    endpoint (NULL)
Martin Hurton's avatar
Martin Hurton committed
31
{}
Martin Sustrik's avatar
Martin Sustrik committed
32 33 34

zmq::reader_t::~reader_t ()
{
35 36
    if (pipe)
        unregister_pipe (pipe);
Martin Sustrik's avatar
Martin Sustrik committed
37 38
}

unknown's avatar
unknown committed
39 40 41 42
void zmq::reader_t::set_pipe (pipe_t *pipe_)
{
    zmq_assert (!pipe);
    pipe = pipe_;
43 44
    peer = &pipe->writer;
    register_pipe (pipe);
unknown's avatar
unknown committed
45 46
}

47 48 49 50 51 52 53
bool zmq::reader_t::is_delimiter (zmq_msg_t &msg_)
{
    unsigned char *offset = 0;

    return msg_.content == (void*) (offset + ZMQ_DELIMITER);
}

54 55 56 57
bool zmq::reader_t::check_read ()
{
    //  Check if there's an item in the pipe.
    //  If not, deactivate the pipe.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
    if (!pipe->check_read ()) {
        endpoint->kill (this);
        return false;
    }

    //  If the next item in the pipe is message delimiter,
    //  initiate its termination.
    if (pipe->probe (is_delimiter)) {
        if (endpoint)
            endpoint->detach_inpipe (this);
        term ();
        return false;
    }

    return true;
73 74
}

Martin Sustrik's avatar
Martin Sustrik committed
75 76
bool zmq::reader_t::read (zmq_msg_t *msg_)
{
77 78
    if (!pipe->read (msg_)) {
        endpoint->kill (this);
Martin Sustrik's avatar
Martin Sustrik committed
79
        return false;
80
    }
Martin Sustrik's avatar
Martin Sustrik committed
81 82 83 84 85 86 87 88 89

    //  If delimiter was read, start termination process of the pipe.
    unsigned char *offset = 0;
    if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
        if (endpoint)
            endpoint->detach_inpipe (this);
        term ();
        return false;
    }
Martin Sustrik's avatar
Martin Sustrik committed
90

91
    if (!(msg_->flags & ZMQ_MSG_MORE))
92 93
        msgs_read++;

Martin Hurton's avatar
Martin Hurton committed
94 95
    if (lwm > 0 && msgs_read % lwm == 0)
        send_reader_info (peer, msgs_read);
Martin Sustrik's avatar
Martin Sustrik committed
96 97

    return true;
Martin Sustrik's avatar
Martin Sustrik committed
98 99 100 101 102 103 104
}

void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
{
    endpoint = endpoint_;
}

Martin Sustrik's avatar
Martin Sustrik committed
105 106 107 108 109 110
void zmq::reader_t::term ()
{
    endpoint = NULL;
    send_pipe_term (peer);
}

Martin Sustrik's avatar
Martin Sustrik committed
111 112
void zmq::reader_t::process_revive ()
{
113 114 115 116 117
    //  Beacuse of command throttling mechanism, incoming termination request
    //  may not have been processed before subsequent send.
    //  In that case endpoint is NULL.
    if (endpoint)
        endpoint->revive (this);
Martin Sustrik's avatar
Martin Sustrik committed
118 119
}

Martin Sustrik's avatar
Martin Sustrik committed
120 121 122 123 124 125
void zmq::reader_t::process_pipe_term_ack ()
{
    peer = NULL;
    delete pipe;
}

unknown's avatar
unknown committed
126
zmq::writer_t::writer_t (object_t *parent_,
Martin Hurton's avatar
Martin Hurton committed
127
      uint64_t hwm_, int64_t swap_size_) :
Martin Sustrik's avatar
Martin Sustrik committed
128
    object_t (parent_),
unknown's avatar
unknown committed
129 130
    pipe (NULL),
    peer (NULL),
Martin Sustrik's avatar
Martin Sustrik committed
131
    hwm (hwm_),
Martin Hurton's avatar
Martin Hurton committed
132 133
    msgs_read (0),
    msgs_written (0),
Martin Hurton's avatar
Martin Hurton committed
134 135
    msg_store (NULL),
    extra_msg_flag (false),
Martin Hurton's avatar
Martin Hurton committed
136
    stalled (false),
Martin Hurton's avatar
Martin Hurton committed
137
    pending_close (false),
Martin Sustrik's avatar
Martin Sustrik committed
138 139
    endpoint (NULL)
{
Martin Hurton's avatar
Martin Hurton committed
140 141 142 143 144 145 146 147 148
    if (swap_size_ > 0) {
        msg_store = new (std::nothrow) msg_store_t (swap_size_);
        if (msg_store != NULL) {
            if (msg_store->init () < 0) {
                delete msg_store;
                msg_store = NULL;
            }
        }
    }
Martin Sustrik's avatar
Martin Sustrik committed
149 150 151 152 153 154 155
}

void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
{
    endpoint = endpoint_;
}

Martin Sustrik's avatar
Martin Sustrik committed
156 157
zmq::writer_t::~writer_t ()
{
Martin Hurton's avatar
Martin Hurton committed
158 159 160 161
    if (extra_msg_flag)
        zmq_msg_close (&extra_msg);

    delete msg_store;
Martin Sustrik's avatar
Martin Sustrik committed
162 163
}

unknown's avatar
unknown committed
164 165 166 167
void zmq::writer_t::set_pipe (pipe_t *pipe_)
{
    zmq_assert (!pipe);
    pipe = pipe_;
168
    peer = &pipe->reader;
unknown's avatar
unknown committed
169 170
}

Martin Hurton's avatar
Martin Hurton committed
171
bool zmq::writer_t::check_write ()
Martin Sustrik's avatar
Martin Sustrik committed
172
{
Martin Hurton's avatar
Martin Hurton committed
173
    if (pipe_full () && (msg_store == NULL || msg_store->full () || extra_msg_flag)) {
Martin Hurton's avatar
Martin Hurton committed
174 175 176
        stalled = true;
        return false;
    }
Martin Sustrik's avatar
Martin Sustrik committed
177 178 179 180

    return true;
}

181
bool zmq::writer_t::write (zmq_msg_t *msg_)
Martin Sustrik's avatar
Martin Sustrik committed
182
{
Martin Hurton's avatar
Martin Hurton committed
183
    if (!check_write ())
Martin Hurton's avatar
Martin Hurton committed
184
        return false;
Martin Hurton's avatar
Martin Hurton committed
185 186 187 188 189 190 191 192 193 194 195 196 197 198

    if (pipe_full ()) {
        if (msg_store->store (msg_)) {
            if (!(msg_->flags & ZMQ_MSG_MORE))
                msg_store->commit ();
        } else {
            extra_msg = *msg_;
            extra_msg_flag = true;
        }
    }
    else {
        pipe->write (*msg_, msg_->flags & ZMQ_MSG_MORE);
        if (!(msg_->flags & ZMQ_MSG_MORE))
            msgs_written++;
Martin Hurton's avatar
Martin Hurton committed
199 200
    }

Martin Sustrik's avatar
Martin Sustrik committed
201 202 203
    return true;
}

204 205
void zmq::writer_t::rollback ()
{
Martin Hurton's avatar
Martin Hurton committed
206 207 208 209
    if (extra_msg_flag && extra_msg.flags & ZMQ_MSG_MORE) {
        zmq_msg_close (&extra_msg);
        extra_msg_flag = false;
    }
Martin Hurton's avatar
Martin Hurton committed
210

Martin Hurton's avatar
Martin Hurton committed
211 212 213 214
    if (msg_store != NULL)
        msg_store->rollback ();

    zmq_msg_t msg;
215
    //  Remove all incomplete messages from the pipe.
Martin Hurton's avatar
Martin Hurton committed
216
    while (pipe->unwrite (&msg)) {
217
        zmq_assert (msg.flags & ZMQ_MSG_MORE);
218 219 220
        zmq_msg_close (&msg);
    }

Martin Hurton's avatar
Martin Hurton committed
221
    if (stalled && endpoint != NULL && check_write ()) {
Martin Hurton's avatar
Martin Hurton committed
222 223 224
        stalled = false;
        endpoint->revive (this);
    }
225 226
}

Martin Sustrik's avatar
Martin Sustrik committed
227 228 229 230 231 232
void zmq::writer_t::flush ()
{
    if (!pipe->flush ())
        send_revive (peer);
}

Martin Sustrik's avatar
Martin Sustrik committed
233 234 235 236
void zmq::writer_t::term ()
{
    endpoint = NULL;

237 238 239
    //  Rollback any unfinished messages.
    rollback ();

Martin Hurton's avatar
Martin Hurton committed
240 241 242 243 244 245 246 247
    if (msg_store == NULL || (msg_store->empty () && !extra_msg_flag))
        write_delimiter ();
    else
        pending_close = true;
}

void zmq::writer_t::write_delimiter ()
{
Martin Sustrik's avatar
Martin Sustrik committed
248 249 250 251 252
    //  Push delimiter into the pipe.
    //  Trick the compiler to belive that the tag is a valid pointer.
    zmq_msg_t msg;
    const unsigned char *offset = 0;
    msg.content = (void*) (offset + ZMQ_DELIMITER);
253
    msg.flags = 0;
254
    pipe->write (msg, false);
255
    flush ();
Martin Sustrik's avatar
Martin Sustrik committed
256 257
}

Martin Hurton's avatar
Martin Hurton committed
258 259
void zmq::writer_t::process_reader_info (uint64_t msgs_read_)
{
Martin Hurton's avatar
Martin Hurton committed
260 261
    zmq_msg_t msg;

Martin Hurton's avatar
Martin Hurton committed
262
    msgs_read = msgs_read_;
Martin Hurton's avatar
Martin Hurton committed
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
    if (msg_store) {

        //  Move messages from backing store into pipe.
        while (!pipe_full () && !msg_store->empty ()) {
            msg_store->fetch(&msg);
            //  Write message into the pipe.
            pipe->write (msg, msg.flags & ZMQ_MSG_MORE);
            if (!(msg.flags & ZMQ_MSG_MORE))
                msgs_written++;
        }

        if (extra_msg_flag) {
            if (!pipe_full ()) {
                pipe->write (extra_msg, extra_msg.flags & ZMQ_MSG_MORE);
                if (!(extra_msg.flags & ZMQ_MSG_MORE))
                    msgs_written++;
                extra_msg_flag = false;
            }
            else if (msg_store->store (&extra_msg)) {
                if (!(extra_msg.flags & ZMQ_MSG_MORE))
                    msg_store->commit ();
                extra_msg_flag = false;
            }
        }

        if (pending_close && msg_store->empty () && !extra_msg_flag) {
            write_delimiter ();
            pending_close = false;
        }

        flush ();
    }

Martin Hurton's avatar
Martin Hurton committed
296 297 298 299 300 301
    if (stalled && endpoint != NULL) {
        stalled = false;
        endpoint->revive (this);
    }
}

Martin Sustrik's avatar
Martin Sustrik committed
302 303 304 305 306 307 308 309 310 311
void zmq::writer_t::process_pipe_term ()
{
    if (endpoint)
        endpoint->detach_outpipe (this);

    reader_t *p = peer;
    peer = NULL;
    send_pipe_term_ack (p);
}

Martin Hurton's avatar
Martin Hurton committed
312 313 314 315 316
bool zmq::writer_t::pipe_full ()
{
    return hwm > 0 && msgs_written - msgs_read == hwm;
}

Martin Sustrik's avatar
Martin Sustrik committed
317
zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_,
Martin Hurton's avatar
Martin Hurton committed
318 319 320
      uint64_t hwm_, int64_t swap_size_) :
    reader (reader_parent_, compute_lwm (hwm_)),
    writer (writer_parent_, hwm_, swap_size_)
Martin Sustrik's avatar
Martin Sustrik committed
321
{
unknown's avatar
unknown committed
322 323
    reader.set_pipe (this);
    writer.set_pipe (this);
Martin Sustrik's avatar
Martin Sustrik committed
324 325 326 327
}

zmq::pipe_t::~pipe_t ()
{
Martin Sustrik's avatar
Martin Sustrik committed
328 329 330 331 332 333
    //  Deallocate all the unread messages in the pipe. We have to do it by
    //  hand because zmq_msg_t is a POD, not a class, so there's no associated
    //  destructor.
    zmq_msg_t msg;
    while (read (&msg))
       zmq_msg_close (&msg);
Martin Sustrik's avatar
Martin Sustrik committed
334
}
335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360

uint64_t zmq::pipe_t::compute_lwm (uint64_t hwm_)
{
   //  Following point should be taken into consideration when computing
   //  low watermark:
   //
   //  1. LWM has to be less than HWM.
   //  2. LWM cannot be set to very low value (such as zero) as after filling
   //     the queue it would start to refill only after all the messages are
   //     read from it and thus unnecessarily hold the progress back.
   //  3. LWM cannot be set to very high value (such as HWM-1) as it would
   //     result in lock-step filling of the queue - if a single message is read
   //     from a full queue, writer thread is resumed to write exactly one
   //     message to the queue and go back to sleep immediately. This would
   //     result in low performance.
   //
   //  Given the 3. it would be good to keep HWM and LWM as far apart as
   //  possible to reduce the thread switching overhead to almost zero,
   //  say HWM-LWM should be 500 (max_wm_delta).
   //
   //  That done, we still we have to account for the cases where HWM<500 thus
   //  driving LWM to negative numbers. Let's make LWM 1/2 of HWM in such cases.

    if (hwm_ > max_wm_delta * 2)
        return hwm_ - max_wm_delta;
    else
361
        return (hwm_ + 1) / 2;
362 363
}