/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
31
#include "testutil_unity.hpp"
32

33
// NOTE(review): presumably expands to the Unity setUp/tearDown hooks that
// create and destroy the context returned by get_test_context () - confirm
// against testutil_unity.hpp.
SETUP_TEARDOWN_TESTCONTEXT
34 35

static void pusher (void * /*unused*/)
36 37
{
    // Connect first
38
    // do not use test_context_socket here, as it is not thread-safe
39
    void *connect_socket = zmq_socket (get_test_context (), ZMQ_PAIR);
40

41
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://sink"));
42 43

    // Queue up some data
44
    send_string_expect_success (connect_socket, "foobar", 0);
45 46

    // Cleanup
47
    TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
48 49
}

50
static void simult_conn (void *endpt_)
51
{
52 53
    // Pull out arguments - endpoint string
    const char *endpt = static_cast<const char *> (endpt_);
54 55

    // Connect
56
    // do not use test_context_socket here, as it is not thread-safe
57 58
    void *connect_socket = zmq_socket (get_test_context (), ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, endpt));
59 60

    // Cleanup
61
    TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
62 63
}

64
static void simult_bind (void *endpt_)
65 66
{
    // Pull out arguments - context followed by endpoint string
67
    const char *endpt = static_cast<const char *> (endpt_);
68 69

    // Bind
70
    // do not use test_context_socket here, as it is not thread-safe
71 72
    void *bind_socket = zmq_socket (get_test_context (), ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, endpt));
73 74

    // Cleanup
75
    TEST_ASSERT_SUCCESS_ERRNO (zmq_close (bind_socket));
76 77
}

78
void test_bind_before_connect ()
79 80
{
    // Bind first
81 82
    void *bind_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://bbc"));
83 84

    // Now connect
85 86
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://bbc"));
87

88
    // Queue up some data
89
    send_string_expect_success (connect_socket, "foobar", 0);
90 91

    // Read pending message
92
    recv_string_expect_success (bind_socket, "foobar", 0);
93 94

    // Cleanup
95 96
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket);
97 98
}

99
void test_connect_before_bind ()
100 101
{
    // Connect first
102 103
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
104 105

    // Queue up some data
106
    send_string_expect_success (connect_socket, "foobar", 0);
107 108

    // Now bind
109 110
    void *bind_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbb"));
111

112
    // Read pending message
113
    recv_string_expect_success (bind_socket, "foobar", 0);
114 115

    // Cleanup
116 117
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket);
118 119
}

120
void test_connect_before_bind_pub_sub ()
121 122
{
    // Connect first
123 124
    void *connect_socket = test_context_socket (ZMQ_PUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbbps"));
125 126

    // Queue up some data, this will be dropped
127
    send_string_expect_success (connect_socket, "before", 0);
128 129

    // Now bind
130
    void *bind_socket = test_context_socket (ZMQ_SUB);
131
    TEST_ASSERT_SUCCESS_ERRNO (
132 133
      zmq_setsockopt (bind_socket, ZMQ_SUBSCRIBE, "", 0));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbbps"));
134

135
    // Wait for pub-sub connection to happen
136
    msleep (SETTLE_TIME);
137 138

    // Queue up some data, this not will be dropped
139
    send_string_expect_success (connect_socket, "after", 0);
140 141

    // Read pending message
142
    recv_string_expect_success (bind_socket, "after", 0);
143 144

    // Cleanup
145 146
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket);
147 148
}

149 150 151 152
void test_connect_before_bind_ctx_term ()
{
    for (int i = 0; i < 20; ++i) {
        // Connect first
153
        void *connect_socket = test_context_socket (ZMQ_ROUTER);
154

155
        char ep[32];
156
        sprintf (ep, "inproc://cbbrr%d", i);
157
        TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, ep));
158 159

        // Cleanup
160
        test_context_socket_close (connect_socket);
161 162 163
    }
}

164
void test_multiple_connects ()
165 166 167
{
    const unsigned int no_of_connects = 10;

168
    void *connect_socket[no_of_connects];
169 170

    // Connect first
171
    for (unsigned int i = 0; i < no_of_connects; ++i) {
172
        connect_socket[i] = test_context_socket (ZMQ_PUSH);
173
        TEST_ASSERT_SUCCESS_ERRNO (
174
          zmq_connect (connect_socket[i], "inproc://multiple"));
175 176

        // Queue up some data
177
        send_string_expect_success (connect_socket[i], "foobar", 0);
178 179 180
    }

    // Now bind
181 182
    void *bind_socket = test_context_socket (ZMQ_PULL);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://multiple"));
183

184
    for (unsigned int i = 0; i < no_of_connects; ++i) {
185
        recv_string_expect_success (bind_socket, "foobar", 0);
186 187 188
    }

    // Cleanup
189
    for (unsigned int i = 0; i < no_of_connects; ++i) {
190
        test_context_socket_close (connect_socket[i]);
191 192
    }

193
    test_context_socket_close (bind_socket);
194 195
}

196
void test_multiple_threads ()
197
{
198
    const unsigned int no_of_threads = 30;
199

200
    void *threads[no_of_threads];
201 202

    // Connect first
203
    for (unsigned int i = 0; i < no_of_threads; ++i) {
204
        threads[i] = zmq_threadstart (&pusher, NULL);
205 206 207
    }

    // Now bind
208 209
    void *bind_socket = test_context_socket (ZMQ_PULL);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://sink"));
210

211
    for (unsigned int i = 0; i < no_of_threads; ++i) {
212
        // Read pending message
213
        recv_string_expect_success (bind_socket, "foobar", 0);
214 215 216
    }

    // Cleanup
217 218
    for (unsigned int i = 0; i < no_of_threads; ++i) {
        zmq_threadclose (threads[i]);
219 220
    }

221
    test_context_socket_close (bind_socket);
222 223
}

224 225 226
void test_simultaneous_connect_bind_threads ()
{
    const unsigned int no_of_times = 50;
227
    void *threads[no_of_times * 2];
228
    void *thr_args[no_of_times];
229 230 231
    char endpts[no_of_times][20];

    // Set up thread arguments: context followed by endpoint string
232
    for (unsigned int i = 0; i < no_of_times; ++i) {
233
        thr_args[i] = (void *) endpts[i];
234 235 236 237
        sprintf (endpts[i], "inproc://foo_%d", i);
    }

    // Spawn all threads as simultaneously as possible
238 239 240 241 242
    for (unsigned int i = 0; i < no_of_times; ++i) {
        threads[i * 2 + 0] =
          zmq_threadstart (&simult_conn, (void *) thr_args[i]);
        threads[i * 2 + 1] =
          zmq_threadstart (&simult_bind, (void *) thr_args[i]);
243 244 245
    }

    // Close all threads
246 247 248
    for (unsigned int i = 0; i < no_of_times; ++i) {
        zmq_threadclose (threads[i * 2 + 0]);
        zmq_threadclose (threads[i * 2 + 1]);
249 250 251
    }
}

252
void test_routing_id ()
253 254
{
    //  Create the infrastructure
255
    void *sc = test_context_socket (ZMQ_DEALER);
256

257
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id"));
258

259
    void *sb = test_context_socket (ZMQ_ROUTER);
260

261
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id"));
262 263

    //  Send 2-part message.
264 265 266 267
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "A", 1, ZMQ_SNDMORE)));
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B", 1, 0)));
268

269
    //  Routing id comes first.
270
    zmq_msg_t msg;
271 272 273
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0));
    TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
274 275

    //  Then the first part of the message body.
276 277 278
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
    TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
279 280

    //  And finally, the second part of the message body.
281 282 283
    TEST_ASSERT_EQUAL_INT (
      1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
    TEST_ASSERT_EQUAL_INT (0, zmq_msg_more (&msg));
284 285

    //  Deallocate the infrastructure.
286 287
    test_context_socket_close (sc);
    test_context_socket_close (sb);
288 289 290 291
}

void test_connect_only ()
{
292 293
    void *connect_socket = test_context_socket (ZMQ_PUSH);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://a"));
294

295
    test_context_socket_close (connect_socket);
296 297
}

298 299 300 301

void test_unbind ()
{
    // Bind and unbind socket 1
302 303 304
    void *bind_socket1 = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket1, "inproc://unbind"));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bind_socket1, "inproc://unbind"));
305 306

    // Bind socket 2
307 308
    void *bind_socket2 = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket2, "inproc://unbind"));
309 310

    // Now connect
311 312
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://unbind"));
313 314

    // Queue up some data
315
    send_string_expect_success (connect_socket, "foobar", 0);
316 317

    // Read pending message
318
    recv_string_expect_success (bind_socket2, "foobar", 0);
319 320

    // Cleanup
321 322 323
    test_context_socket_close (connect_socket);
    test_context_socket_close (bind_socket1);
    test_context_socket_close (bind_socket2);
324 325
}

326 327 328
void test_shutdown_during_pend ()
{
    // Connect first
329 330
    void *connect_socket = test_context_socket (ZMQ_PAIR);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
331

332
    zmq_ctx_shutdown (get_test_context ());
333 334

    // Cleanup
335
    test_context_socket_close (connect_socket);
336 337
}

338 339
int main (void)
{
340
    setup_test_environment ();
341

342 343 344 345 346 347 348 349 350 351 352 353 354
    UNITY_BEGIN ();
    RUN_TEST (test_bind_before_connect);
    RUN_TEST (test_connect_before_bind);
    RUN_TEST (test_connect_before_bind_pub_sub);
    RUN_TEST (test_connect_before_bind_ctx_term);
    RUN_TEST (test_multiple_connects);
    RUN_TEST (test_multiple_threads);
    RUN_TEST (test_simultaneous_connect_bind_threads);
    RUN_TEST (test_routing_id);
    RUN_TEST (test_connect_only);
    RUN_TEST (test_unbind);
    RUN_TEST (test_shutdown_during_pend);
    return UNITY_END ();
355
}