test_inproc_connect.cpp 11 KB
Newer Older
1
/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
31
#include "testutil_unity.hpp"
32

33 34 35 36 37 38 39 40 41 42 43
// Per-test fixture hook (presumably invoked by Unity before each RUN_TEST;
// confirm against the Unity configuration). Sets up the shared test context
// that the tests obtain through get_test_context / test_context_socket.
void setUp ()
{
    setup_test_context ();
}

// Per-test fixture hook (presumably invoked by Unity after each RUN_TEST;
// confirm against the Unity configuration). Tears down the test context
// created in setUp.
void tearDown ()
{
    teardown_test_context ();
}

static void pusher (void * /*unused*/)
44 45
{
    // Connect first
46
    // do not use test_context_socket here, as it is not thread-safe
47
    void *connect_socket = zmq_socket (get_test_context (), ZMQ_PAIR);
48

49
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://sink"));
50 51

    // Queue up some data
52
    send_string_expect_success (connect_socket, "foobar", 0);
53 54

    // Cleanup
55
    TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
56 57
}

58
static void simult_conn (void *endpt_)
59
{
60 61
    // Pull out arguments - endpoint string
    const char *endpt = static_cast<const char *> (endpt_);
62 63

    // Connect
64
    // do not use test_context_socket here, as it is not thread-safe
65 66
    void *connect_socket = zmq_socket (get_test_context (), ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, endpt));
67 68

    // Cleanup
69
    TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
70 71
}

72
static void simult_bind (void *endpt_)
73 74
{
    // Pull out arguments - context followed by endpoint string
75
    const char *endpt = static_cast<const char *> (endpt_);
76 77

    // Bind
78
    // do not use test_context_socket here, as it is not thread-safe
79 80
    void *bind_socket = zmq_socket (get_test_context (), ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, endpt));
81 82

    // Cleanup
83
    TEST_ASSERT_SUCCESS_ERRNO (zmq_close (bind_socket));
84 85
}

86
void test_bind_before_connect ()
87 88
{
    // Bind first
89 90
    void *bind_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://bbc"));
91 92

    // Now connect
93 94
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://bbc"));
95

96
    // Queue up some data
97
    send_string_expect_success (connect_socket, "foobar", 0);
98 99

    // Read pending message
100
    recv_string_expect_success (bind_socket, "foobar", 0);
101 102

    // Cleanup
103 104
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket);
105 106
}

107
void test_connect_before_bind ()
108 109
{
    // Connect first
110 111
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
112 113

    // Queue up some data
114
    send_string_expect_success (connect_socket, "foobar", 0);
115 116

    // Now bind
117 118
    void *bind_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbb"));
119

120
    // Read pending message
121
    recv_string_expect_success (bind_socket, "foobar", 0);
122 123

    // Cleanup
124 125
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket);
126 127
}

128
void test_connect_before_bind_pub_sub ()
129 130
{
    // Connect first
131 132
    void *connect_socket = test_context_socket (ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbbps"));
133 134

    // Queue up some data, this will be dropped
135
    send_string_expect_success (connect_socket, "before", 0);
136 137

    // Now bind
138
    void *bind_socket = test_context_socket (ZMQ_SUB);
139
    TEST_ASSERT_SUCCESS_ERRNO (
140 141
      zmq_setsockopt (bind_socket, ZMQ_SUBSCRIBE, "", 0));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbbps"));
142

143
    // Wait for pub-sub connection to happen
144
    msleep (SETTLE_TIME);
145 146

    // Queue up some data, this not will be dropped
147
    send_string_expect_success (connect_socket, "after", 0);
148 149

    // Read pending message
150
    recv_string_expect_success (bind_socket, "after", 0);
151 152

    // Cleanup
153 154
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket);
155 156
}

157 158 159 160
void test_connect_before_bind_ctx_term ()
{
    for (int i = 0; i < 20; ++i) {
        // Connect first
161
        void *connect_socket = test_context_socket (ZMQ_ROUTER);
162 163

        char ep[20];
164
        sprintf (ep, "inproc://cbbrr%d", i);
165
        TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, ep));
166 167

        // Cleanup
168
        test_context_socket_close (connect_socket);
169 170 171
    }
}

172
void test_multiple_connects ()
173 174 175
{
    const unsigned int no_of_connects = 10;

176
    void *connect_socket[no_of_connects];
177 178

    // Connect first
179
    for (unsigned int i = 0; i < no_of_connects; ++i) {
180
        connect_socket[i] = test_context_socket (ZMQ_PUSH);
181
        TEST_ASSERT_SUCCESS_ERRNO (
182
          zmq_connect (connect_socket[i], "inproc://multiple"));
183 184

        // Queue up some data
185
        send_string_expect_success (connect_socket[i], "foobar", 0);
186 187 188
    }

    // Now bind
189 190
    void *bind_socket = test_context_socket (ZMQ_PULL);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://multiple"));
191

192
    for (unsigned int i = 0; i < no_of_connects; ++i) {
193
        recv_string_expect_success (bind_socket, "foobar", 0);
194 195 196
    }

    // Cleanup
197
    for (unsigned int i = 0; i < no_of_connects; ++i) {
198
        test_context_socket_close (connect_socket[i]);
199 200
    }

201
    test_context_socket_close (bind_socket);
202 203
}

204
void test_multiple_threads ()
205
{
206
    const unsigned int no_of_threads = 30;
207

208
    void *threads[no_of_threads];
209 210

    // Connect first
211
    for (unsigned int i = 0; i < no_of_threads; ++i) {
212
        threads[i] = zmq_threadstart (&pusher, NULL);
213 214 215
    }

    // Now bind
216 217
    void *bind_socket = test_context_socket (ZMQ_PULL);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://sink"));
218

219
    for (unsigned int i = 0; i < no_of_threads; ++i) {
220
        // Read pending message
221
        recv_string_expect_success (bind_socket, "foobar", 0);
222 223 224
    }

    // Cleanup
225 226
    for (unsigned int i = 0; i < no_of_threads; ++i) {
        zmq_threadclose (threads[i]);
227 228
    }

229
    test_context_socket_close (bind_socket);
230 231
}

232 233 234
void test_simultaneous_connect_bind_threads ()
{
    const unsigned int no_of_times = 50;
235
    void *threads[no_of_times * 2];
236
    void *thr_args[no_of_times];
237 238 239
    char endpts[no_of_times][20];

    // Set up thread arguments: context followed by endpoint string
240
    for (unsigned int i = 0; i < no_of_times; ++i) {
241
        thr_args[i] = (void *) endpts[i];
242 243 244 245
        sprintf (endpts[i], "inproc://foo_%d", i);
    }

    // Spawn all threads as simultaneously as possible
246 247 248 249 250
    for (unsigned int i = 0; i < no_of_times; ++i) {
        threads[i * 2 + 0] =
          zmq_threadstart (&simult_conn, (void *) thr_args[i]);
        threads[i * 2 + 1] =
          zmq_threadstart (&simult_bind, (void *) thr_args[i]);
251 252 253
    }

    // Close all threads
254 255 256
    for (unsigned int i = 0; i < no_of_times; ++i) {
        zmq_threadclose (threads[i * 2 + 0]);
        zmq_threadclose (threads[i * 2 + 1]);
257 258 259
    }
}

260
void test_routing_id ()
261 262
{
    //  Create the infrastructure
263
    void *sc = test_context_socket (ZMQ_DEALER);
264

265
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id"));
266

267
    void *sb = test_context_socket (ZMQ_ROUTER);
268

269
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id"));
270 271

    //  Send 2-part message.
272 273 274 275
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "A", 1, ZMQ_SNDMORE)));
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B", 1, 0)));
276

277
    //  Routing id comes first.
278
    zmq_msg_t msg;
279 280 281
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0));
    TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
282 283

    //  Then the first part of the message body.
284 285 286
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
    TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
287 288

    //  And finally, the second part of the message body.
289 290 291
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
    TEST_ASSERT_EQUAL_INT (0, zmq_msg_more (&msg));
292 293

    //  Deallocate the infrastructure.
294 295
    test_context_socket_close (sc);
    test_context_socket_close (sb);
296 297 298 299
}

void test_connect_only ()
{
300 301
    void *connect_socket = test_context_socket (ZMQ_PUSH);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://a"));
302

303
    test_context_socket_close (connect_socket);
304 305
}

306 307 308 309

void test_unbind ()
{
    // Bind and unbind socket 1
310 311 312
    void *bind_socket1 = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket1, "inproc://unbind"));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bind_socket1, "inproc://unbind"));
313 314

    // Bind socket 2
315 316
    void *bind_socket2 = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket2, "inproc://unbind"));
317 318

    // Now connect
319 320
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://unbind"));
321 322

    // Queue up some data
323
    send_string_expect_success (connect_socket, "foobar", 0);
324 325

    // Read pending message
326
    recv_string_expect_success (bind_socket2, "foobar", 0);
327 328

    // Cleanup
329 330 331
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket1);
    test_context_socket_close (bind_socket2);
332 333
}

334 335 336
void test_shutdown_during_pend ()
{
    // Connect first
337 338
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
339

340
    zmq_ctx_shutdown (get_test_context ());
341 342

    // Cleanup
343
    test_context_socket_close (connect_socket);
344 345
}

346 347
int main (void)
{
348
    setup_test_environment ();
349

350 351 352 353 354 355 356 357 358 359 360 361 362
    UNITY_BEGIN ();
    RUN_TEST (test_bind_before_connect);
    RUN_TEST (test_connect_before_bind);
    RUN_TEST (test_connect_before_bind_pub_sub);
    RUN_TEST (test_connect_before_bind_ctx_term);
    RUN_TEST (test_multiple_connects);
    RUN_TEST (test_multiple_threads);
    RUN_TEST (test_simultaneous_connect_bind_threads);
    RUN_TEST (test_routing_id);
    RUN_TEST (test_connect_only);
    RUN_TEST (test_unbind);
    RUN_TEST (test_shutdown_during_pend);
    return UNITY_END ();
363
}