test_router_mandatory.cpp 9.2 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "testutil.hpp"
31

32
#ifdef ZMQ_BUILD_DRAFT_API
33
bool send_msg_to_peer_if_ready (void *router, const char *peer_routing_id)
34
{
35
    int rc = zmq_socket_get_peer_state (router, peer_routing_id, 1);
36
    if (rc == -1)
37
        printf ("zmq_socket_get_peer_state failed for %s: %i\n", peer_routing_id,
38 39 40
                errno);
    assert (rc != -1);
    if (rc & ZMQ_POLLOUT) {
41
        rc = zmq_send (router, peer_routing_id, 1, ZMQ_SNDMORE | ZMQ_DONTWAIT);
42 43 44 45 46 47 48 49 50 51
        assert (rc == 1);
        rc = zmq_send (router, "Hello", 5, ZMQ_DONTWAIT);
        assert (rc == 5);

        return true;
    }
    return false;
}
#endif

52 53 54 55 56 57 58 59
void test_get_peer_state ()
{
#ifdef ZMQ_BUILD_DRAFT_API
    void *ctx = zmq_ctx_new ();
    assert (ctx);
    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    assert (router);

60
    int rc;
61 62 63 64
    int mandatory = 1;
    rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory,
                         sizeof (mandatory));

65 66
    const char *my_endpoint = "inproc://test_get_peer_state";
    rc = zmq_bind (router, my_endpoint);
67 68
    assert (rc == 0);

69 70
    void *dealer1 = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer1);
71 72 73 74 75 76

    void *dealer2 = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer2);

    //  Lower HWMs to allow doing the test with fewer messages
    int hwm = 100;
77 78 79 80 81 82 83
    rc = zmq_setsockopt (router, ZMQ_SNDHWM, &hwm, sizeof (int));
    assert (rc == 0);
    rc = zmq_setsockopt (dealer1, ZMQ_RCVHWM, &hwm, sizeof (int));
    assert (rc == 0);
    rc = zmq_setsockopt (dealer2, ZMQ_RCVHWM, &hwm, sizeof (int));
    assert (rc == 0);

84 85
    const char *dealer1_routing_id = "X";
    const char *dealer2_routing_id = "Y";
86 87

    //  Name dealer1 "X" and connect it to our router
88
    rc = zmq_setsockopt (dealer1, ZMQ_ROUTING_ID, dealer1_routing_id, 1);
89 90 91 92
    assert (rc == 0);
    rc = zmq_connect (dealer1, my_endpoint);
    assert (rc == 0);

93
    //  Name dealer2 "Y" and connect it to our router
94
    rc = zmq_setsockopt (dealer2, ZMQ_ROUTING_ID, dealer2_routing_id, 1);
95 96 97 98
    assert (rc == 0);
    rc = zmq_connect (dealer2, my_endpoint);
    assert (rc == 0);

99
    //  Get message from both dealers to know when connection is ready
100 101 102 103 104
    char buffer[255];
    rc = zmq_send (dealer1, "Hello", 5, 0);
    assert (rc == 5);
    rc = zmq_recv (router, buffer, 255, 0);
    assert (rc == 1);
105
    assert (0 == memcmp (buffer, dealer1_routing_id, rc));
106 107 108 109 110 111 112
    rc = zmq_recv (router, buffer, 255, 0);
    assert (rc == 5);

    rc = zmq_send (dealer2, "Hello", 5, 0);
    assert (rc == 5);
    rc = zmq_recv (router, buffer, 255, 0);
    assert (rc == 1);
113
    assert (0 == memcmp (buffer, dealer2_routing_id, rc));
114 115 116
    rc = zmq_recv (router, buffer, 255, 0);
    assert (rc == 5);

117 118 119 120 121 122 123 124 125
    void *poller = zmq_poller_new ();
    assert (poller);

    //  Poll on router and dealer1, but not on dealer2
    rc = zmq_poller_add (poller, router, NULL, ZMQ_POLLOUT);
    assert (rc == 0);
    rc = zmq_poller_add (poller, dealer1, NULL, ZMQ_POLLIN);
    assert (rc == 0);

126 127
    const unsigned int count = 10000;
    const unsigned int event_size = 2;
128
    bool dealer2_blocked = false;
129
    unsigned int dealer1_sent = 0, dealer2_sent = 0, dealer1_received = 0;
130
    zmq_poller_event_t events[event_size];
131
    for (unsigned int iteration = 0; iteration < count; ++iteration) {
132 133
        rc = zmq_poller_wait_all (poller, events, event_size, -1);
        assert (rc != -1);
134
        for (unsigned int event_no = 0; event_no < event_size; ++event_no) {
135 136 137
            const zmq_poller_event_t &current_event = events[event_no];
            if (current_event.socket == router
                && current_event.events & ZMQ_POLLOUT) {
138
                if (send_msg_to_peer_if_ready (router, dealer1_routing_id))
139
                    ++dealer1_sent;
140

141
                if (send_msg_to_peer_if_ready (router, dealer2_routing_id))
142
                    ++dealer2_sent;
143
                else
144
                    dealer2_blocked = true;
145
            }
146 147
            if (current_event.socket == dealer1
                && current_event.events & ZMQ_POLLIN) {
148 149
                rc = zmq_recv (dealer1, buffer, 255, ZMQ_DONTWAIT);
                assert (rc == 5);
150 151 152 153 154
                int more;
                size_t more_size = sizeof (more);
                rc = zmq_getsockopt (dealer1, ZMQ_RCVMORE, &more, &more_size);
                assert (rc == 0);
                assert (!more);
155 156

                ++dealer1_received;
157 158 159 160
            }
            // never read from dealer2, so its pipe becomes full eventually
        }
    }
161
    printf ("dealer1_sent = %u, dealer2_sent = %u, dealer1_received = %u\n",
162 163
            dealer1_sent, dealer2_sent, dealer1_received);
    assert (dealer2_blocked);
164
    zmq_poller_destroy (&poller);
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179

    rc = zmq_close (router);
    assert (rc == 0);

    rc = zmq_close (dealer1);
    assert (rc == 0);

    rc = zmq_close (dealer2);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
#endif
}

180 181 182
void test_get_peer_state_corner_cases ()
{
#ifdef ZMQ_BUILD_DRAFT_API
183
    const char peer_routing_id[] = "foo";
184 185 186

    //  call get_peer_state with NULL socket
    int rc =
187
      zmq_socket_get_peer_state (NULL, peer_routing_id, strlen (peer_routing_id));
188 189 190 191 192 193 194 195 196 197 198
    assert (rc == -1 && errno == ENOTSOCK);

    void *ctx = zmq_ctx_new ();
    assert (ctx);
    void *dealer = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer);
    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    assert (router);

    //  call get_peer_state with a non-ROUTER socket
    rc =
199
      zmq_socket_get_peer_state (dealer, peer_routing_id, strlen (peer_routing_id));
200 201
    assert (rc == -1 && errno == ENOTSUP);

202
    //  call get_peer_state for an unknown routing id
203
    rc =
204
      zmq_socket_get_peer_state (router, peer_routing_id, strlen (peer_routing_id));
205 206 207 208 209 210 211 212 213 214 215 216 217 218
    assert (rc == -1 && errno == EHOSTUNREACH);

    rc = zmq_close (router);
    assert (rc == 0);

    rc = zmq_close (dealer);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);

#endif
}

219
void test_basic ()
220
{
221 222
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];
223
    void *ctx = zmq_ctx_new ();
224
    assert (ctx);
225 226
    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    assert (router);
227

228 229 230 231
    int rc = zmq_bind (router, "tcp://127.0.0.1:*");
    assert (rc == 0);

    rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
232 233
    assert (rc == 0);

234 235 236
    //  Send a message to an unknown peer with the default setting
    //  This will not report any error
    rc = zmq_send (router, "UNKNOWN", 7, ZMQ_SNDMORE);
237
    assert (rc == 7);
238
    rc = zmq_send (router, "DATA", 4, 0);
239 240
    assert (rc == 4);

241 242
    //  Send a message to an unknown peer with mandatory routing
    //  This will fail
243
    int mandatory = 1;
244 245
    rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory,
                         sizeof (mandatory));
246
    assert (rc == 0);
247
    rc = zmq_send (router, "UNKNOWN", 7, ZMQ_SNDMORE);
Martin Hurton's avatar
Martin Hurton committed
248
    assert (rc == -1 && errno == EHOSTUNREACH);
249

250 251 252
    //  Create dealer called "X" and connect it to our router
    void *dealer = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer);
253
    rc = zmq_setsockopt (dealer, ZMQ_ROUTING_ID, "X", 1);
254
    assert (rc == 0);
255
    rc = zmq_connect (dealer, my_endpoint);
256 257
    assert (rc == 0);

258
    //  Get message from dealer to know when connection is ready
259
    char buffer[255];
260 261 262 263
    rc = zmq_send (dealer, "Hello", 5, 0);
    assert (rc == 5);
    rc = zmq_recv (router, buffer, 255, 0);
    assert (rc == 1);
264
    assert (buffer[0] == 'X');
265

266 267 268
    //  Send a message to connected dealer now
    //  It should work
    rc = zmq_send (router, "X", 1, ZMQ_SNDMORE);
269
    assert (rc == 1);
270
    rc = zmq_send (router, "Hello", 5, 0);
271
    assert (rc == 5);
272

273
    rc = zmq_close (router);
274 275
    assert (rc == 0);

276
    rc = zmq_close (dealer);
277 278
    assert (rc == 0);

279
    rc = zmq_ctx_term (ctx);
280
    assert (rc == 0);
281 282 283 284 285 286 287
}

int main (void)
{
    setup_test_environment ();

    test_basic ();
288
    test_get_peer_state ();
289
    test_get_peer_state_corner_cases ();
290

291
    return 0;
292
}