/*
    Copyright (c) 2007-2010 iMatix Corporation

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "../include/zmq.h"
21

Martin Sustrik's avatar
Martin Sustrik committed
22
#include "pair.hpp"
23
#include "err.hpp"
24
#include "pipe.hpp"
25

Martin Sustrik's avatar
Martin Sustrik committed
26 27
//  Constructs a PAIR socket with no pipes attached: both pipe pointers
//  start out NULL, both "alive" flags are cleared and termination has
//  not been requested.
zmq::pair_t::pair_t (class ctx_t *parent_, uint32_t tid_) :
    socket_base_t (parent_, tid_),
    inpipe (NULL),
    outpipe (NULL),
    inpipe_alive (false),
    outpipe_alive (false),
    terminating (false)
{
    //  This socket type asks for both an inbound and an outbound pipe.
    options.requires_out = true;
    options.requires_in = true;
    options.type = ZMQ_PAIR;
}

Martin Sustrik's avatar
Martin Sustrik committed
39
//  By the time the socket is destroyed both pipe pointers must already
//  have been reset to NULL (the terminated () handlers do that).
zmq::pair_t::~pair_t ()
{
    //  Inbound pipe first, then the outbound one.
    zmq_assert (!inpipe);
    zmq_assert (!outpipe);
}

45 46
//  Attaches the inbound/outbound pipe pair to this socket.
//
//  inpipe_         reader end the socket receives from; must not be NULL.
//  outpipe_        writer end the socket sends to; must not be NULL.
//  peer_identity_  not used by this socket type.
//
//  Asserts that no pipe is attached yet. Both pipes are marked alive and
//  the socket registers itself as their event sink. If termination has
//  already been requested, the freshly attached pipes are asked to
//  terminate straight away and two term acks are registered for them.
void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
    const blob_t &peer_identity_)
{
    //  No pipe may be attached to the socket at this point.
    zmq_assert (!inpipe && !outpipe);

    //  Both pipes are dereferenced below; fail with a clear diagnostic
    //  instead of dereferencing a NULL pointer.
    zmq_assert (inpipe_ && outpipe_);

    inpipe = inpipe_;
    inpipe_alive = true;
    inpipe->set_event_sink (this);

    outpipe = outpipe_;
    outpipe_alive = true;
    outpipe->set_event_sink (this);

    //  The socket is already shutting down: terminate the new pipes as
    //  well and wait for one acknowledgement from each of them.
    if (terminating) {
        register_term_acks (2);
        inpipe_->terminate ();
        outpipe_->terminate ();
    }
}

65
//  Event-sink callback for the inbound pipe: forgets the pipe and, when
//  the socket itself is terminating, unregisters one pending term ack.
void zmq::pair_t::terminated (reader_t *pipe_)
{
    //  The only reader this socket knows about is its inbound pipe.
    zmq_assert (pipe_ == inpipe);

    inpipe_alive = false;
    inpipe = NULL;

    if (terminating) {
        unregister_term_ack ();
    }
}

75
//  Event-sink callback for the outbound pipe: forgets the pipe and, when
//  the socket itself is terminating, unregisters one pending term ack.
void zmq::pair_t::terminated (writer_t *pipe_)
{
    //  The only writer this socket knows about is its outbound pipe.
    zmq_assert (pipe_ == outpipe);

    outpipe_alive = false;
    outpipe = NULL;

    if (terminating) {
        unregister_term_ack ();
    }
}

85 86 87 88
//  Event-sink callback for the inbound pipe. This socket type takes no
//  action here; the body is intentionally empty.
void zmq::pair_t::delimited (reader_t *pipe_)
{
}

89
void zmq::pair_t::process_term (int linger_)
90
{
91 92
    terminating = true;

93 94 95 96 97 98 99 100 101
    if (inpipe) {
        register_term_acks (1);
        inpipe->terminate ();
    }

    if (outpipe) {
        register_term_acks (1);
        outpipe->terminate ();
    }
102

103
    socket_base_t::process_term (linger_);
104 105
}

106
void zmq::pair_t::activated (class reader_t *pipe_)
Martin Hurton's avatar
Martin Hurton committed
107
{
108 109
    zmq_assert (!inpipe_alive);
    inpipe_alive = true;
Martin Hurton's avatar
Martin Hurton committed
110 111
}

112
void zmq::pair_t::activated (class writer_t *pipe_)
113
{
114 115
    zmq_assert (!outpipe_alive);
    outpipe_alive = true;
116 117
}

Martin Sustrik's avatar
Martin Sustrik committed
118
//  Sends a message to the peer.
//
//  Returns 0 on success. Returns -1 with errno set to EAGAIN when there
//  is no outbound pipe, the pipe is flagged as not alive, or the pipe
//  refuses the write (in which case it is flagged as not alive).
//  On success msg_ is re-initialised, so the caller no longer refers to
//  the data that was handed over to the pipe.
int zmq::pair_t::xsend (zmq_msg_t *msg_, int flags_)
{
    //  Nothing to write to.
    if (!outpipe || !outpipe_alive) {
        errno = EAGAIN;
        return -1;
    }

    //  The pipe did not accept the message; remember that until the
    //  pipe is flagged alive again.
    const bool written = outpipe->write (msg_);
    if (!written) {
        outpipe_alive = false;
        errno = EAGAIN;
        return -1;
    }

    //  Flush only when ZMQ_SNDMORE is not set on this message.
    const bool more = (flags_ & ZMQ_SNDMORE) != 0;
    if (!more)
        outpipe->flush ();

    //  Detach the original message from the data buffer.
    int rc = zmq_msg_init (msg_);
    zmq_assert (rc == 0);

    return 0;
}

Martin Sustrik's avatar
Martin Sustrik committed
141
//  Receives a message from the peer into msg_.
//
//  Any previous content of msg_ is released first. Returns 0 when a
//  message was read from the inbound pipe. Otherwise the inbound pipe is
//  flagged as not alive, msg_ is re-initialised as an empty message and
//  -1 is returned with errno set to EAGAIN. flags_ is not inspected here.
int zmq::pair_t::xrecv (zmq_msg_t *msg_, int flags_)
{
    //  Deallocate old content of the message.
    zmq_msg_close (msg_);

    if (!inpipe_alive || !inpipe || !inpipe->read (msg_)) {

        //  No message is available.
        inpipe_alive = false;

        //  Initialise the output parameter to be a 0-byte message.
        //  The result is checked the same way as on the send path.
        int rc = zmq_msg_init (msg_);
        zmq_assert (rc == 0);

        errno = EAGAIN;
        return -1;
    }
    return 0;
}

Martin Sustrik's avatar
Martin Sustrik committed
159
bool zmq::pair_t::xhas_in ()
160
{
161 162 163 164 165
    if (!inpipe || !inpipe_alive)
        return false;

    inpipe_alive = inpipe->check_read ();
    return inpipe_alive;
166 167
}

Martin Sustrik's avatar
Martin Sustrik committed
168
bool zmq::pair_t::xhas_out ()
169
{
170
    if (!outpipe || !outpipe_alive)
171 172 173 174
        return false;

    outpipe_alive = outpipe->check_write ();
    return outpipe_alive;
175
}
176