test_poller.cpp 12.1 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"

32 33
// duplicated from fd.hpp
#ifdef ZMQ_HAVE_WINDOWS
34 35 36 37 38 39
#if defined _MSC_VER && _MSC_VER <= 1400
typedef UINT_PTR fd_t;
enum
{
    retired_fd = (fd_t) (~0)
};
40
#else
41 42 43 44 45
typedef SOCKET fd_t;
enum
{
    retired_fd = (fd_t) INVALID_SOCKET
};
46 47
#endif
#else
48 49 50 51 52
typedef int fd_t;
enum
{
    retired_fd = -1
};
53 54
#endif

55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
void test_null_poller_pointers (void *ctx)
{
    int rc = zmq_poller_destroy (NULL);
    assert (rc == -1 && errno == EFAULT);
    void *null_poller = NULL;
    rc = zmq_poller_destroy (&null_poller);
    assert (rc == -1 && errno == EFAULT);

    void *socket = zmq_socket (ctx, ZMQ_PAIR);
    assert (socket != NULL);

    rc = zmq_poller_add (NULL, socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_add (&null_poller, socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_modify (NULL, socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_modify (&null_poller, socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_remove (NULL, socket);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_remove (&null_poller, socket);
    assert (rc == -1 && errno == EFAULT);

81
    fd_t fd;
82
    size_t fd_size = sizeof fd;
83
    rc = zmq_getsockopt (socket, ZMQ_FD, &fd, &fd_size);
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
    assert (rc == 0);

    rc = zmq_poller_add_fd (NULL, fd, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_add_fd (&null_poller, fd, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_modify_fd (NULL, fd, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_modify_fd (&null_poller, fd, ZMQ_POLLIN);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_remove_fd (NULL, fd);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_remove_fd (&null_poller, fd);
    assert (rc == -1 && errno == EFAULT);

101 102 103 104 105 106 107 108 109 110 111
    zmq_poller_event_t event;
    rc = zmq_poller_wait (NULL, &event, 0);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_wait (&null_poller, &event, 0);
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_wait_all (NULL, &event, 1, 0);
    assert (rc == -1 && errno == EFAULT);
    rc = zmq_poller_wait_all (&null_poller, &event, 1, 0);
    assert (rc == -1 && errno == EFAULT);

112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
    rc = zmq_close (socket);
    assert (rc == 0);
}

void test_null_socket_pointers ()
{
    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    int rc = zmq_poller_add (poller, NULL, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == ENOTSOCK);

    rc = zmq_poller_modify (poller, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == ENOTSOCK);

    rc = zmq_poller_remove (poller, NULL);
    assert (rc == -1 && errno == ENOTSOCK);

130
    fd_t null_socket_fd = retired_fd;
131

132
    rc = zmq_poller_add_fd (poller, null_socket_fd, NULL, ZMQ_POLLIN);
133
    assert (rc == -1 && errno == EBADF);
134 135

    rc = zmq_poller_modify_fd (poller, null_socket_fd, ZMQ_POLLIN);
136
    assert (rc == -1 && errno == EBADF);
137 138

    rc = zmq_poller_remove_fd (poller, null_socket_fd);
139
    assert (rc == -1 && errno == EBADF);
140

141 142 143 144
    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);
}

145 146 147 148 149 150 151 152 153 154 155
void test_null_event_pointers (void *ctx)
{
    void *socket = zmq_socket (ctx, ZMQ_PAIR);
    assert (socket != NULL);

    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    int rc = zmq_poller_add (poller, socket, NULL, ZMQ_POLLIN);
    assert (rc == 0);

156
    rc = zmq_poller_wait (poller, NULL, 0);
157 158 159 160 161
    assert (rc == -1 && errno == EFAULT);

    rc = zmq_poller_wait_all (poller, NULL, 1, 0);
    assert (rc == -1 && errno == EFAULT);

162 163
    //  TODO this causes an assertion, which is not consistent if the number
    //  of events may be 0, the pointer should be allowed to by NULL in that
164 165 166 167 168 169 170 171 172 173 174 175 176
    //  case too
#if 0
    rc = zmq_poller_wait_all (poller, NULL, 0, 0);
    assert (rc == 0);
#endif

    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);

    rc = zmq_close (socket);
    assert (rc == 0);
}

177
void test_add_modify_remove_corner_cases (void *ctx)
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
{
    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    void *zeromq_socket = zmq_socket (ctx, ZMQ_PAIR);
    assert (zeromq_socket != NULL);

    int rc = zmq_poller_add (poller, zeromq_socket, NULL, ZMQ_POLLIN);
    assert (rc == 0);

    //  attempt to add the same socket twice
    rc = zmq_poller_add (poller, zeromq_socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    rc = zmq_poller_remove (poller, zeromq_socket);
    assert (rc == 0);

    //  attempt to remove socket that is not present
    rc = zmq_poller_remove (poller, zeromq_socket);
    assert (rc == -1 && errno == EINVAL);

    //  attempt to modify socket that is not present
    rc = zmq_poller_modify (poller, zeromq_socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    //  add a socket with no events
    //  TODO should this really be legal? it does not make any sense...
    rc = zmq_poller_add (poller, zeromq_socket, NULL, 0);
    assert (rc == 0);

    fd_t plain_socket = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    rc = zmq_poller_add_fd (poller, plain_socket, NULL, ZMQ_POLLIN);
    assert (rc == 0);

    //  attempt to add the same plain socket twice
    rc = zmq_poller_add_fd (poller, plain_socket, NULL, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    rc = zmq_poller_remove_fd (poller, plain_socket);
    assert (rc == 0);

    //  attempt to remove plain socket that is not present
    rc = zmq_poller_remove_fd (poller, plain_socket);
    assert (rc == -1 && errno == EINVAL);

    //  attempt to modify plain socket that is not present
    rc = zmq_poller_modify_fd (poller, plain_socket, ZMQ_POLLIN);
    assert (rc == -1 && errno == EINVAL);

    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);

    rc = zmq_close (zeromq_socket);
    assert (rc == 0);

    rc = close (plain_socket);
    assert (rc == 0);
}

237
void test_wait_corner_cases (void)
238 239 240 241 242
{
    void *poller = zmq_poller_new ();
    assert (poller != NULL);

    zmq_poller_event_t event;
243
    int rc = zmq_poller_wait (poller, &event, 0);
244
    assert (rc == -1 && errno == EAGAIN);
245

246
    //  this can never return since no socket was registered, and should yield an error
247
    rc = zmq_poller_wait (poller, &event, -1);
248
    assert (rc == -1 && errno == EFAULT);
249

250 251 252
    rc = zmq_poller_wait_all (poller, &event, -1, 0);
    assert (rc == -1 && errno == EINVAL);

253
    rc = zmq_poller_wait_all (poller, &event, 0, 0);
254
    assert (rc == -1 && errno == EAGAIN);
255

256
    //  this can never return since no socket was registered, and should yield an error
257
    rc = zmq_poller_wait_all (poller, &event, 0, -1);
258
    assert (rc == -1 && errno == EFAULT);
259 260 261 262 263

    rc = zmq_poller_destroy (&poller);
    assert (rc == 0);
}

264 265
int main (void)
{
266 267 268 269
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint_0[MAX_SOCKET_STRING];
    char my_endpoint_1[MAX_SOCKET_STRING];

270 271 272 273 274 275 276 277
    setup_test_environment ();

    void *ctx = zmq_ctx_new ();
    assert (ctx);

    //  Create few sockets
    void *vent = zmq_socket (ctx, ZMQ_PUSH);
    assert (vent);
278 279 280
    int rc = zmq_bind (vent, "tcp://127.0.0.1:*");
    assert (rc == 0);
    rc = zmq_getsockopt (vent, ZMQ_LAST_ENDPOINT, my_endpoint_0, &len);
281 282 283 284
    assert (rc == 0);

    void *sink = zmq_socket (ctx, ZMQ_PULL);
    assert (sink);
285
    rc = zmq_connect (sink, my_endpoint_0);
286 287 288 289 290
    assert (rc == 0);

    void *bowl = zmq_socket (ctx, ZMQ_PULL);
    assert (bowl);

291
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
292 293
    void *server = zmq_socket (ctx, ZMQ_SERVER);
    assert (server);
294 295 296 297
    rc = zmq_bind (server, "tcp://127.0.0.1:*");
    assert (rc == 0);
    len = MAX_SOCKET_STRING;
    rc = zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, my_endpoint_1, &len);
298 299 300 301
    assert (rc == 0);

    void *client = zmq_socket (ctx, ZMQ_CLIENT);
    assert (client);
302
#endif
303 304

    //  Set up poller
305
    void *poller = zmq_poller_new ();
306 307 308
    zmq_poller_event_t event;

    // waiting on poller with no registered sockets should report error
309
    rc = zmq_poller_wait (poller, &event, 0);
310
    assert (rc == -1);
311
    assert (errno == EAGAIN);
312 313

    // register sink
314
    rc = zmq_poller_add (poller, sink, sink, ZMQ_POLLIN);
315
    assert (rc == 0);
316

317 318 319
    //  Send a message
    char data[1] = {'H'};
    rc = zmq_send_const (vent, data, 1, 0);
320
    assert (rc == 1);
321 322 323 324 325 326 327 328

    //  We expect a message only on the sink
    rc = zmq_poller_wait (poller, &event, -1);
    assert (rc == 0);
    assert (event.socket == sink);
    assert (event.user_data == sink);
    rc = zmq_recv (sink, data, 1, 0);
    assert (rc == 1);
329 330 331 332

    //  We expect timed out
    rc = zmq_poller_wait (poller, &event, 0);
    assert (rc == -1);
333
    assert (errno == EAGAIN);
334

335
    //  Stop polling sink
336
    rc = zmq_poller_remove (poller, sink);
337 338 339
    assert (rc == 0);

    //  Check we can poll an FD
340
    rc = zmq_connect (bowl, my_endpoint_0);
341 342
    assert (rc == 0);

343 344
    fd_t fd;
    size_t fd_size = sizeof (fd);
345 346 347

    rc = zmq_getsockopt (bowl, ZMQ_FD, &fd, &fd_size);
    assert (rc == 0);
348
    rc = zmq_poller_add_fd (poller, fd, bowl, ZMQ_POLLIN);
349 350 351 352 353 354 355 356
    assert (rc == 0);
    rc = zmq_poller_wait (poller, &event, 500);
    assert (rc == 0);
    assert (event.socket == NULL);
    assert (event.fd == fd);
    assert (event.user_data == bowl);
    zmq_poller_remove_fd (poller, fd);

357
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
358
    //  Polling on thread safe sockets
359
    rc = zmq_poller_add (poller, server, NULL, ZMQ_POLLIN);
360
    assert (rc == 0);
361
    rc = zmq_connect (client, my_endpoint_1);
362 363 364 365 366 367
    assert (rc == 0);
    rc = zmq_send_const (client, data, 1, 0);
    assert (rc == 1);
    rc = zmq_poller_wait (poller, &event, 500);
    assert (rc == 0);
    assert (event.socket == server);
368
    assert (event.user_data == NULL);
369
    rc = zmq_recv (server, data, 1, 0);
370
    assert (rc == 1);
371

372
    //  Polling on pollout
373
    rc = zmq_poller_modify (poller, server, ZMQ_POLLOUT | ZMQ_POLLIN);
374 375 376 377 378 379
    assert (rc == 0);
    rc = zmq_poller_wait (poller, &event, 0);
    assert (rc == 0);
    assert (event.socket == server);
    assert (event.user_data == NULL);
    assert (event.events == ZMQ_POLLOUT);
380 381 382 383

    //  Stop polling server
    rc = zmq_poller_remove (poller, server);
    assert (rc == 0);
384
#endif
385

386
    //  Destroy sockets, poller and ctx
387 388 389 390 391 392
    rc = zmq_close (sink);
    assert (rc == 0);
    rc = zmq_close (vent);
    assert (rc == 0);
    rc = zmq_close (bowl);
    assert (rc == 0);
393
#if defined(ZMQ_SERVER) && defined(ZMQ_CLIENT)
394 395 396 397
    rc = zmq_close (server);
    assert (rc == 0);
    rc = zmq_close (client);
    assert (rc == 0);
398
#endif
399

400 401
    test_null_poller_pointers (ctx);
    test_null_socket_pointers ();
402 403
    test_null_event_pointers (ctx);

404
    test_add_modify_remove_corner_cases (ctx);
405
    test_wait_corner_cases ();
406 407

    rc = zmq_poller_destroy (&poller);
408
    assert (rc == 0);
409
    rc = zmq_ctx_term (ctx);
410 411 412 413
    assert (rc == 0);

    return 0;
}