/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"

32 33 34 35 36 37 38 39 40 41 42 43 44 45
// duplicated from fd.hpp
#ifdef ZMQ_HAVE_WINDOWS
#if defined _MSC_VER &&_MSC_VER <= 1400
    typedef UINT_PTR fd_t;
    enum {retired_fd = (fd_t)(~0)};
#else
    typedef SOCKET fd_t;
    enum {retired_fd = (fd_t)INVALID_SOCKET};
#endif
#else
    typedef int fd_t;
    enum {retired_fd = -1};
#endif

46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
void test_null_poller_pointers (void *ctx)
{
    int rc = zmq_poller_destroy (NULL);
    assert (rc == -1 && errno == EFAULT);
    void *null_poller = NULL;
    rc = zmq_poller_destroy (&null_poller);
    assert (rc == -1 && errno == EFAULT);

    void *socket = zmq_socket (ctx, ZMQ_PAIR);
    assert (socket != NULL);

    rc = zmq_poller_add (NULL, socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_add (&null_poller, socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_modify (NULL, socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_modify (&null_poller, socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_remove (NULL, socket);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_remove (&null_poller, socket);
    assert (rc == -1 && errno == EFAULT);

72
    fd_t fd;
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
    size_t fd_size = sizeof fd;
    rc = zmq_getsockopt(socket, ZMQ_FD, &fd, &fd_size);
    assert (rc == 0);

    rc = zmq_poller_add_fd (NULL, fd, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_add_fd (&null_poller, fd, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_modify_fd (NULL, fd, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_modify_fd (&null_poller, fd, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_remove_fd (NULL, fd);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_remove_fd (&null_poller, fd);
    assert (rc == -1 && errno == EFAULT);

92 93 94 95 96 97 98 99 100 101 102
    zmq_poller_event_t event;
    rc = zmq_poller_wait (NULL, &event, 0);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_wait (&null_poller, &event, 0);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_wait_all (NULL, &event, 1, 0);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_wait_all (&null_poller, &event, 1, 0);
    assert (rc == -1 && errno == EFAULT);

103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
    rc = zmq_close (socket);
    assert (rc == 0);
}

void test_null_socket_pointers ()
{
    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    int rc = zmq_poller_add (poller, NULL, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == ENOTSOCK);

    rc = zmq_poller_modify (poller, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == ENOTSOCK);

    rc = zmq_poller_remove (poller, NULL);
    assert (rc == -1 && errno == ENOTSOCK);

121 122 123
    fd_t null_socket_fd = retired_fd;
    
    rc = zmq_poller_add_fd (poller, null_socket_fd, NULL, ZMQ_POLLIN);
124
    assert (rc == -1 && errno == EBADF);
125 126

    rc = zmq_poller_modify_fd (poller, null_socket_fd, ZMQ_POLLIN);
127
    assert (rc == -1 && errno == EBADF);
128 129

    rc = zmq_poller_remove_fd (poller, null_socket_fd);
130
    assert (rc == -1 && errno == EBADF);
131

132 133 134 135
    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);
}

136 137 138 139 140 141 142 143 144 145 146
void test_null_event_pointers (void *ctx)
{
    void *socket = zmq_socket (ctx, ZMQ_PAIR);
    assert (socket != NULL);

    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    int rc = zmq_poller_add (poller, socket, NULL, ZMQ_POLLIN);
    assert (rc == 0);

147
    rc = zmq_poller_wait (poller, NULL, 0);
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_wait_all (poller, NULL, 1, 0);
    assert (rc == -1 && errno == EFAULT);

    //  TODO this causes an assertion, which is not consistent if the number 
    //  of events may be 0, the pointer should be allowed to by NULL in that 
    //  case too
#if 0
    rc = zmq_poller_wait_all (poller, NULL, 0, 0);
    assert (rc == 0);
#endif

    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);

    rc = zmq_close (socket);
    assert (rc == 0);
}

168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
void test_add_modify_remove_corner_cases(void *ctx)
{
    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    void *zeromq_socket = zmq_socket (ctx, ZMQ_PAIR);
    assert (zeromq_socket != NULL);

    int rc = zmq_poller_add (poller, zeromq_socket, NULL, ZMQ_POLLIN);
    assert (rc == 0);

    //  attempt to add the same socket twice
    rc = zmq_poller_add (poller, zeromq_socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    rc = zmq_poller_remove (poller, zeromq_socket);
    assert (rc == 0);

    //  attempt to remove socket that is not present
    rc = zmq_poller_remove (poller, zeromq_socket);
    assert (rc == -1 && errno == EINVAL);

    //  attempt to modify socket that is not present
    rc = zmq_poller_modify (poller, zeromq_socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    //  add a socket with no events
    //  TODO should this really be legal? it does not make any sense...
    rc = zmq_poller_add (poller, zeromq_socket, NULL, 0);
    assert (rc == 0);

    fd_t plain_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    rc = zmq_poller_add_fd (poller, plain_socket, NULL, ZMQ_POLLIN);
    assert (rc == 0);

    //  attempt to add the same plain socket twice
    rc = zmq_poller_add_fd (poller, plain_socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    rc = zmq_poller_remove_fd (poller, plain_socket);
    assert (rc == 0);

    //  attempt to remove plain socket that is not present
    rc = zmq_poller_remove_fd (poller, plain_socket);
    assert (rc == -1 && errno == EINVAL);

    //  attempt to modify plain socket that is not present
    rc = zmq_poller_modify_fd (poller, plain_socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);

    rc = zmq_close (zeromq_socket);
    assert (rc == 0);

    rc = close (plain_socket);
    assert (rc == 0);
}

228 229 230 231 232 233 234
void test_wait_corner_cases (void *ctx)
{
    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    zmq_poller_event_t event;
    int rc = zmq_poller_wait(poller, &event, 0);
235
    assert (rc == -1 && errno == EAGAIN);
236

237
    //  this can never return since no socket was registered, and should yield an error
238
    rc = zmq_poller_wait(poller, &event, -1);
239
    assert (rc == -1 && errno == EFAULT);
240

241 242 243
    rc = zmq_poller_wait_all (poller, &event, -1, 0);
    assert (rc == -1 && errno == EINVAL);

244
    rc = zmq_poller_wait_all (poller, &event, 0, 0);
245
    assert (rc == -1 && errno == EAGAIN);
246

247
    //  this can never return since no socket was registered, and should yield an error
248
    rc = zmq_poller_wait_all (poller, &event, 0, -1);
249
    assert (rc == -1 && errno == EFAULT);
250 251 252 253 254

    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);
}

255 256
int main (void)
{
257 258 259 260
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint_0[MAX_SOCKET_STRING];
    char my_endpoint_1[MAX_SOCKET_STRING];

261 262 263 264 265 266 267 268
    setup_test_environment ();

    void *ctx = zmq_ctx_new ();
    assert (ctx);

    //  Create few sockets
    void *vent = zmq_socket (ctx, ZMQ_PUSH);
    assert (vent);
269 270 271
    int rc = zmq_bind (vent, "tcp://127.0.0.1:*");
    assert (rc == 0);
    rc = zmq_getsockopt (vent, ZMQ_LAST_ENDPOINT, my_endpoint_0, &len);
272 273 274 275
    assert (rc == 0);

    void *sink = zmq_socket (ctx, ZMQ_PULL);
    assert (sink);
276
    rc = zmq_connect (sink, my_endpoint_0);
277 278 279 280 281
    assert (rc == 0);

    void *bowl = zmq_socket (ctx, ZMQ_PULL);
    assert (bowl);

282
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
283 284
    void *server = zmq_socket (ctx, ZMQ_SERVER);
    assert (server);
285 286 287 288
    rc = zmq_bind (server, "tcp://127.0.0.1:*");
    assert (rc == 0);
    len = MAX_SOCKET_STRING;
    rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint_1, &len);
289 290 291 292
    assert (rc == 0);

    void *client = zmq_socket (ctx, ZMQ_CLIENT);
    assert (client);
293
#endif
294 295

    //  Set up poller
296
    void *poller = zmq_poller_new ();
297 298 299
    zmq_poller_event_t event;

    // waiting on poller with no registered sockets should report error
300
    rc = zmq_poller_wait (poller, &event, 0);
301
    assert (rc == -1);
302
    assert (errno == EAGAIN);
303 304

    // register sink
305
    rc = zmq_poller_add (poller, sink, sink, ZMQ_POLLIN);
306
    assert (rc == 0);
307

308 309 310
    //  Send a message
    char data[1] = {'H'};
    rc = zmq_send_const (vent, data, 1, 0);
311
    assert (rc == 1);
312 313 314 315 316 317 318 319

    //  We expect a message only on the sink
    rc = zmq_poller_wait (poller, &event, -1);
    assert (rc == 0);
    assert (event.socket == sink);
    assert (event.user_data == sink);
    rc = zmq_recv (sink, data, 1, 0);
    assert (rc == 1);
320 321 322 323

    //  We expect timed out
    rc = zmq_poller_wait (poller, &event, 0);
    assert (rc == -1);
324
    assert (errno == EAGAIN);
325

326
    //  Stop polling sink
327
    rc = zmq_poller_remove (poller, sink);
328 329 330
    assert (rc == 0);

    //  Check we can poll an FD
331
    rc = zmq_connect (bowl, my_endpoint_0);
332 333
    assert (rc == 0);

334 335
    fd_t fd;
    size_t fd_size = sizeof (fd);
336 337 338

    rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size);
    assert (rc == 0);
339
    rc = zmq_poller_add_fd (poller, fd, bowl, ZMQ_POLLIN);
340 341 342 343 344 345 346 347
    assert (rc == 0);
    rc = zmq_poller_wait (poller, &event, 500);
    assert (rc == 0);
    assert (event.socket == NULL);
    assert (event.fd == fd);
    assert (event.user_data == bowl);
    zmq_poller_remove_fd (poller, fd);

348
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
349
    //  Polling on thread safe sockets
350
    rc = zmq_poller_add (poller, server, NULL, ZMQ_POLLIN);
351
    assert (rc == 0);
352
    rc = zmq_connect (client, my_endpoint_1);
353 354 355 356 357 358
    assert (rc == 0);
    rc = zmq_send_const (client, data, 1, 0);
    assert (rc == 1);
    rc = zmq_poller_wait (poller, &event, 500);
    assert (rc == 0);
    assert (event.socket == server);
359
    assert (event.user_data == NULL);
360
    rc = zmq_recv (server, data, 1, 0);
361
    assert (rc == 1);
362

363
    //  Polling on pollout
364
    rc = zmq_poller_modify (poller, server, ZMQ_POLLOUT | ZMQ_POLLIN);
365 366 367 368 369 370
    assert (rc == 0);
    rc = zmq_poller_wait (poller, &event, 0);
    assert (rc == 0);
    assert (event.socket == server);
    assert (event.user_data == NULL);
    assert (event.events == ZMQ_POLLOUT);
371 372 373 374

    //  Stop polling server
    rc = zmq_poller_remove (poller, server);
    assert (rc == 0);
375
#endif
376

377
    //  Destroy sockets, poller and ctx
378 379 380 381 382 383
    rc = zmq_close (sink);
    assert (rc == 0);
    rc = zmq_close (vent);
    assert (rc == 0);
    rc = zmq_close (bowl);
    assert (rc == 0);
384
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
385 386 387 388
    rc = zmq_close (server);
    assert (rc == 0);
    rc = zmq_close (client);
    assert (rc == 0);
389
#endif
390

391 392
    test_null_poller_pointers (ctx);
    test_null_socket_pointers ();
393 394
    test_null_event_pointers (ctx);

395
    test_add_modify_remove_corner_cases (ctx);
396
    test_wait_corner_cases (ctx);
397 398

    rc = zmq_poller_destroy (&poller);
399
    assert (rc == 0);
400
    rc = zmq_ctx_term (ctx);
401 402 403 404
    assert (rc == 0);

    return 0;
}