Commit 963c6a8e authored by Pieter Hintjens's avatar Pieter Hintjens

Lots of cleanups to self-tests

* Removed or truncated sleeps so the tests run faster
* Removed dependencies on zmq_utils
* Rewrote a few tests that were confusing
* Minor code cleanups
parent c39cb0bd
/* /*
Copyright (c) 2012 Ian Barber Copyright (c) 2012 Ian Barber
Copyright (c) 2012 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2013 iMatix Corporation
This file is part of 0MQ.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
0MQ is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by
the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or
the Free Software Foundation; either version 3 of the License, or (at your option) any later version.
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>.
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
...@@ -31,12 +29,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>. ...@@ -31,12 +29,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
int main (void) int main (void)
{ {
fprintf (stderr, "test_connect_delay running...\n");
int val; int val;
int rc; int rc;
char buffer[16]; char buffer[16];
int seen = 0;
// TEST 1. // TEST 1.
// First we're going to attempt to send messages to two // First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see // pipes, one connected, the other not. We should see
...@@ -53,7 +48,7 @@ int main (void) ...@@ -53,7 +48,7 @@ int main (void)
val = 0; val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind(to, "tcp://*:6555"); rc = zmq_bind (to, "tcp://*:6555");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive. // Create a socket pushing to two endpoints - only 1 message should arrive.
...@@ -71,26 +66,22 @@ int main (void) ...@@ -71,26 +66,22 @@ int main (void)
// We send 10 messages, 5 should just get stuck in the queue // We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe // for the not-yet-connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert(rc >= 0);
} }
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// We now consume from the connected pipe // We now consume from the connected pipe
// - we should see just 5 // - we should see just 5
seen = 0; int timeout = 100;
for (int i = 0; i < 10; ++i) rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
{ assert (rc == 0);
memset (&buffer, 0, sizeof(buffer));
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); int seen = 0;
if( rc == -1) while (true) {
break; rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
if (rc == -1)
break; // Break when we didn't get a message
seen++; seen++;
} }
assert (seen == 5); assert (seen == 5);
...@@ -144,26 +135,21 @@ int main (void) ...@@ -144,26 +135,21 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Send 10 messages, all should be routed to the connected pipe // Send 10 messages, all should be routed to the connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
} }
rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// Send 10 messages, all should arrive.
seen = 0; seen = 0;
for (int i = 0; i < 10; ++i) while (true) {
{ rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
memset(&buffer, 0, sizeof(buffer)); if (rc == -1)
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); break; // Break when we didn't get a message
// If there is a failed delivery, assert! seen++;
assert (rc != -1);
} }
assert (seen == 10);
rc = zmq_close (from); rc = zmq_close (from);
assert (rc == 0); assert (rc == 0);
...@@ -177,82 +163,75 @@ int main (void) ...@@ -177,82 +163,75 @@ int main (void)
// TEST 3 // TEST 3
// This time we want to validate that the same blocking behaviour // This time we want to validate that the same blocking behaviour
// occurs with an existing connection that is broken. We will send // occurs with an existing connection that is broken. We will send
// messaages to a connected pipe, disconnect and verify the messages // messages to a connected pipe, disconnect and verify the messages
// block. Then we reconnect and verify messages flow again. // block. Then we reconnect and verify messages flow again.
context = zmq_ctx_new(); context = zmq_ctx_new ();
void *context2 = zmq_ctx_new();
to = zmq_socket (context2, ZMQ_PULL); void *backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); void *frontend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
int zero = 0;
rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
val = 0;
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
// Create a socket pushing // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
from = zmq_socket (context, ZMQ_PUSH); int on = 1;
assert (from); rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on));
val = 0;
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
val = 1; rc = zmq_bind (backend, "tcp://*:5560");
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (frontend, "tcp://localhost:5560");
// Connect to the valid socket socket
rc = zmq_connect (from, "tcp://localhost:5560");
assert (rc == 0); assert (rc == 0);
// Allow connections to stabilise // Ping backend to frontend so we know when the connection is up
zmq_sleep(1); rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
assert (rc == 5);
// Send a message, should succeed // Send message from frontend to backend
std::string message("message "); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
rc = zmq_send (from, message.data(), message.size(), 0); assert (rc == 5);
assert (rc >= 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_term(context2); rc = zmq_close (backend);
assert (rc == 0); assert (rc == 0);
// Give time to process disconnect // Give time to process disconnect
zmq_sleep(1); // There's no way to do this except with a sleep
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Send a message, should fail // Send a message, should fail
rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == -1); assert (rc == -1);
context2 = zmq_ctx_new(); // Recreate backend socket
to = zmq_socket (context2, ZMQ_PULL); backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (backend, "tcp://*:5560");
val = 0;
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
// Allow connections to stabilise // Ping backend to frontend so we know when the connection is up
zmq_sleep(1); rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
assert (rc == 5);
// After the reconnect, should succeed // After the reconnect, should succeed
rc = zmq_send (from, message.data(), message.size(), 0); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc >= 0); assert (rc == 5);
rc = zmq_close (to); rc = zmq_close (backend);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (from); rc = zmq_close (frontend);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_term(context); rc = zmq_ctx_term(context);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_term(context2);
assert (rc == 0);
} }
...@@ -27,8 +27,6 @@ ...@@ -27,8 +27,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_connect_resolve running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -28,14 +28,14 @@ int main(int argc, char** argv) { ...@@ -28,14 +28,14 @@ int main(int argc, char** argv) {
size_t more_size = sizeof(more); size_t more_size = sizeof(more);
int iteration = 0; int iteration = 0;
while(1) { while (1) {
zmq_pollitem_t items [] = { zmq_pollitem_t items [] = {
{ subSocket, 0, ZMQ_POLLIN, 0 }, // read publications { subSocket, 0, ZMQ_POLLIN, 0 }, // read publications
{ pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions { pubSocket, 0, ZMQ_POLLIN, 0 }, // read subscriptions
}; };
zmq_poll(items, 2, 500); int rc = zmq_poll (items, 2, 100);
if (items[1].revents & ZMQ_POLLIN) { if (items [1].revents & ZMQ_POLLIN) {
while (1) { while (1) {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
...@@ -73,34 +73,29 @@ int main(int argc, char** argv) { ...@@ -73,34 +73,29 @@ int main(int argc, char** argv) {
} }
} }
} }
if (iteration == 1) { if (iteration == 1) {
zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno)); zmq_connect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_connect: %s\n", zmq_strerror(errno));
//zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno)); //zmq_connect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_connect: %s\n", zmq_strerror(errno));
} }
if (iteration == 4) { if (iteration == 4) {
zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno)); zmq_disconnect(subSocket, "inproc://someInProcDescriptor") && printf("zmq_disconnect(%d): %s\n", errno, zmq_strerror(errno));
//zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno)); //zmq_disconnect(subSocket, "tcp://127.0.0.1:30010") && printf("zmq_disconnect: %s\n", zmq_strerror(errno));
} }
if (iteration > 4 && rc == 0)
if (iteration == 10) {
break; break;
}
zmq_msg_t channelEnvlp; zmq_msg_t channelEnvlp;
ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3); ZMQ_PREPARE_STRING(channelEnvlp, "foo", 3);
zmq_sendmsg(pubSocket, &channelEnvlp, ZMQ_SNDMORE) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); zmq_msg_send (&channelEnvlp, pubSocket, ZMQ_SNDMORE) >= 0 || printf("zmq_msg_send: %s\n",zmq_strerror(errno));
zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); zmq_msg_close(&channelEnvlp) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
zmq_msg_t message; zmq_msg_t message;
ZMQ_PREPARE_STRING(message, "this is foo!", 12); ZMQ_PREPARE_STRING(message, "this is foo!", 12);
zmq_sendmsg(pubSocket, &message, 0) >= 0 || printf("zmq_sendmsg: %s\n",zmq_strerror(errno)); zmq_msg_send (&message, pubSocket, 0) >= 0 || printf("zmq_msg_send: %s\n",zmq_strerror(errno));
zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno)); zmq_msg_close(&message) && printf("zmq_msg_close: %s\n",zmq_strerror(errno));
iteration++; iteration++;
} }
assert(publicationsReceived == 3); assert(publicationsReceived == 3);
assert(!isSubscribed); assert(!isSubscribed);
......
...@@ -19,13 +19,14 @@ ...@@ -19,13 +19,14 @@
*/ */
#include "../include/zmq.h"
#include <stdio.h> #include <stdio.h>
#include "testutil.hpp" #include <string.h>
#undef NDEBUG
#include <assert.h>
int main (void) int main (void)
{ {
fprintf (stderr, "test_hwm running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -27,8 +27,6 @@ ...@@ -27,8 +27,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_invalid_rep running...\n");
// Create REQ/ROUTER wiring. // Create REQ/ROUTER wiring.
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <pthread.h> #include <pthread.h>
#include <string.h> #include <string.h>
#include "testutil.hpp" #include "testutil.hpp"
...@@ -34,11 +33,9 @@ static int rep_socket_events; ...@@ -34,11 +33,9 @@ static int rep_socket_events;
const char *addr; const char *addr;
extern "C" // REQ socket monitor thread
static void *req_socket_monitor (void *ctx)
{ {
// REQ socket monitor thread
static void *req_socket_monitor (void *ctx)
{
zmq_event_t event; zmq_event_t event;
int rc; int rc;
...@@ -50,9 +47,11 @@ extern "C" ...@@ -50,9 +47,11 @@ extern "C"
while (true) { while (true) {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0); rc = zmq_msg_recv (&msg, s, 0);
if (rc == -1 && zmq_errno() == ETERM) break; if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1); assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event)); memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) { switch (event.event) {
case ZMQ_EVENT_CONNECTED: case ZMQ_EVENT_CONNECTED:
...@@ -85,14 +84,11 @@ extern "C" ...@@ -85,14 +84,11 @@ extern "C"
} }
zmq_close (s); zmq_close (s);
return NULL; return NULL;
}
} }
extern "C" // 2nd REQ socket monitor thread
static void *req2_socket_monitor (void *ctx)
{ {
// 2nd REQ socket monitor thread
static void *req2_socket_monitor (void *ctx)
{
zmq_event_t event; zmq_event_t event;
int rc; int rc;
...@@ -104,9 +100,11 @@ extern "C" ...@@ -104,9 +100,11 @@ extern "C"
while (true) { while (true) {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0); rc = zmq_msg_recv (&msg, s, 0);
if (rc == -1 && zmq_errno() == ETERM) break; if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1); assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event)); memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) { switch (event.event) {
case ZMQ_EVENT_CONNECTED: case ZMQ_EVENT_CONNECTED:
...@@ -123,15 +121,11 @@ extern "C" ...@@ -123,15 +121,11 @@ extern "C"
} }
zmq_close (s); zmq_close (s);
return NULL; return NULL;
}
} }
// REP socket monitor thread
extern "C" static void *rep_socket_monitor (void *ctx)
{ {
// REP socket monitor thread
static void *rep_socket_monitor (void *ctx)
{
zmq_event_t event; zmq_event_t event;
int rc; int rc;
...@@ -143,9 +137,11 @@ extern "C" ...@@ -143,9 +137,11 @@ extern "C"
while (true) { while (true) {
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init (&msg); zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0); rc = zmq_msg_recv (&msg, s, 0);
if (rc == -1 && zmq_errno() == ETERM) break; if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1); assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event)); memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) { switch (event.event) {
case ZMQ_EVENT_LISTENING: case ZMQ_EVENT_LISTENING:
...@@ -178,7 +174,6 @@ extern "C" ...@@ -178,7 +174,6 @@ extern "C"
} }
zmq_close (s); zmq_close (s);
return NULL; return NULL;
}
} }
int main (void) int main (void)
...@@ -230,6 +225,8 @@ int main (void) ...@@ -230,6 +225,8 @@ int main (void)
rc = zmq_connect (req, addr); rc = zmq_connect (req, addr);
assert (rc == 0); assert (rc == 0);
bounce (rep, req);
// 2nd REQ socket // 2nd REQ socket
req2 = zmq_socket (ctx, ZMQ_REQ); req2 = zmq_socket (ctx, ZMQ_REQ);
assert (req2); assert (req2);
...@@ -243,17 +240,13 @@ int main (void) ...@@ -243,17 +240,13 @@ int main (void)
rc = zmq_connect (req2, addr); rc = zmq_connect (req2, addr);
assert (rc == 0); assert (rc == 0);
bounce (rep, req);
// Allow a window for socket events as connect can be async
zmq_sleep (1);
// Close the REP socket // Close the REP socket
rc = zmq_close (rep); rc = zmq_close (rep);
assert (rc == 0); assert (rc == 0);
// Allow some time for detecting error states // Allow some time for detecting error states
zmq_sleep (1); struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Close the REQ socket // Close the REQ socket
rc = zmq_close (req); rc = zmq_close (req);
...@@ -263,9 +256,6 @@ int main (void) ...@@ -263,9 +256,6 @@ int main (void)
rc = zmq_close (req2); rc = zmq_close (req2);
assert (rc == 0); assert (rc == 0);
// Allow for closed or disconnected events to bubble up
zmq_sleep (1);
zmq_ctx_term (ctx); zmq_ctx_term (ctx);
// Expected REP socket events // Expected REP socket events
......
...@@ -53,21 +53,21 @@ int main (void) ...@@ -53,21 +53,21 @@ int main (void)
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_recvmsg (sb, &msg, 0); rc = zmq_msg_recv (&msg, sb, 0);
assert (rc >= 0); assert (rc >= 0);
int more = zmq_msg_get (&msg, ZMQ_MORE); int more = zmq_msg_more (&msg);
assert (more == 1); assert (more == 1);
// Then the first part of the message body. // Then the first part of the message body.
rc = zmq_recvmsg (sb, &msg, 0); rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1); assert (rc == 1);
more = zmq_msg_get (&msg, ZMQ_MORE); more = zmq_msg_more (&msg);
assert (more == 1); assert (more == 1);
// And finally, the second part of the message body. // And finally, the second part of the message body.
rc = zmq_recvmsg (sb, &msg, 0); rc = zmq_msg_recv (&msg, sb, 0);
assert (rc == 1); assert (rc == 1);
more = zmq_msg_get (&msg, ZMQ_MORE); more = zmq_msg_more (&msg);
assert (more == 0); assert (more == 0);
// Deallocate the infrastructure. // Deallocate the infrastructure.
......
...@@ -23,8 +23,6 @@ ...@@ -23,8 +23,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_pair_inproc running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -23,8 +23,6 @@ ...@@ -23,8 +23,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_pair_ipc running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_pair_tcp running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
/* /*
Copyright (c) 2007-2012 iMatix Corporation Copyright (c) 2007-2013 iMatix Corporation
Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
...@@ -18,10 +18,11 @@ ...@@ -18,10 +18,11 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <zmq.h> #include "../include/zmq.h"
#include <assert.h>
#include <string.h> #include <string.h>
#include <stdbool.h> #include <stdbool.h>
#undef NDEBUG
#include <assert.h>
// ZMTP protocol greeting structure // ZMTP protocol greeting structure
...@@ -44,7 +45,6 @@ static zmtp_greeting_t greeting ...@@ -44,7 +45,6 @@ static zmtp_greeting_t greeting
int main (void) int main (void)
{ {
fprintf (stderr, "test_raw_sock running...\n");
int rc; int rc;
// Set up our context and sockets // Set up our context and sockets
......
...@@ -28,8 +28,6 @@ ...@@ -28,8 +28,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_reqrep_device running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -66,13 +64,13 @@ int main (void) ...@@ -66,13 +64,13 @@ int main (void)
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_recvmsg (router, &msg, 0); rc = zmq_msg_recv (&msg, router, 0);
assert (rc >= 0); assert (rc >= 0);
int rcvmore; int rcvmore;
size_t sz = sizeof (rcvmore); size_t sz = sizeof (rcvmore);
rc = zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz); rc = zmq_getsockopt (router, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0); assert (rc == 0);
rc = zmq_sendmsg (dealer, &msg, rcvmore ? ZMQ_SNDMORE : 0); rc = zmq_msg_send (&msg, dealer, rcvmore? ZMQ_SNDMORE: 0);
assert (rc >= 0); assert (rc >= 0);
} }
...@@ -104,12 +102,12 @@ int main (void) ...@@ -104,12 +102,12 @@ int main (void)
zmq_msg_t msg; zmq_msg_t msg;
rc = zmq_msg_init (&msg); rc = zmq_msg_init (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_recvmsg (dealer, &msg, 0); rc = zmq_msg_recv (&msg, dealer, 0);
assert (rc >= 0); assert (rc >= 0);
int rcvmore; int rcvmore;
rc = zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz); rc = zmq_getsockopt (dealer, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0); assert (rc == 0);
rc = zmq_sendmsg (router, &msg, rcvmore ? ZMQ_SNDMORE : 0); rc = zmq_msg_send (&msg, router, rcvmore? ZMQ_SNDMORE: 0);
assert (rc >= 0); assert (rc >= 0);
} }
......
...@@ -23,8 +23,6 @@ ...@@ -23,8 +23,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_reqrep_inproc running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -23,8 +23,6 @@ ...@@ -23,8 +23,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_reqrep_ipc running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
int main (void) int main (void)
{ {
fprintf (stderr, "test_reqrep_tcp running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
......
...@@ -19,7 +19,6 @@ ...@@ -19,7 +19,6 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#undef NDEBUG #undef NDEBUG
......
...@@ -55,8 +55,6 @@ int main (void) ...@@ -55,8 +55,6 @@ int main (void)
int rc; int rc;
pthread_t threads [THREAD_COUNT]; pthread_t threads [THREAD_COUNT];
fprintf (stderr, "test_shutdown_stress running...\n");
for (j = 0; j != 10; j++) { for (j = 0; j != 10; j++) {
// Check the shutdown with many parallel I/O threads. // Check the shutdown with many parallel I/O threads.
......
...@@ -20,20 +20,17 @@ ...@@ -20,20 +20,17 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h> #include <stdio.h>
#include <time.h>
#undef NDEBUG #undef NDEBUG
#include <assert.h> #include <assert.h>
int main (void) int main (void)
{ {
fprintf (stderr, "test_sub_forward running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
// First, create an intermediate device. // First, create an intermediate device
void *xpub = zmq_socket (ctx, ZMQ_XPUB); void *xpub = zmq_socket (ctx, ZMQ_XPUB);
assert (xpub); assert (xpub);
int rc = zmq_bind (xpub, "tcp://127.0.0.1:5560"); int rc = zmq_bind (xpub, "tcp://127.0.0.1:5560");
...@@ -43,13 +40,13 @@ int main (void) ...@@ -43,13 +40,13 @@ int main (void)
rc = zmq_bind (xsub, "tcp://127.0.0.1:5561"); rc = zmq_bind (xsub, "tcp://127.0.0.1:5561");
assert (rc == 0); assert (rc == 0);
// Create a publisher. // Create a publisher
void *pub = zmq_socket (ctx, ZMQ_PUB); void *pub = zmq_socket (ctx, ZMQ_PUB);
assert (pub); assert (pub);
rc = zmq_connect (pub, "tcp://127.0.0.1:5561"); rc = zmq_connect (pub, "tcp://127.0.0.1:5561");
assert (rc == 0); assert (rc == 0);
// Create a subscriber. // Create a subscriber
void *sub = zmq_socket (ctx, ZMQ_SUB); void *sub = zmq_socket (ctx, ZMQ_SUB);
assert (sub); assert (sub);
rc = zmq_connect (sub, "tcp://127.0.0.1:5560"); rc = zmq_connect (sub, "tcp://127.0.0.1:5560");
...@@ -59,27 +56,28 @@ int main (void) ...@@ -59,27 +56,28 @@ int main (void)
rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0); rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0); assert (rc == 0);
// Pass the subscription upstream through the device. // Pass the subscription upstream through the device
char buff [32]; char buff [32];
rc = zmq_recv (xpub, buff, sizeof (buff), 0); rc = zmq_recv (xpub, buff, sizeof (buff), 0);
assert (rc >= 0); assert (rc >= 0);
rc = zmq_send (xsub, buff, rc, 0); rc = zmq_send (xsub, buff, rc, 0);
assert (rc >= 0); assert (rc >= 0);
// Wait a bit till the subscription gets to the publisher. // Wait a bit till the subscription gets to the publisher
zmq_sleep (1); struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Send an empty message. // Send an empty message
rc = zmq_send (pub, NULL, 0, 0); rc = zmq_send (pub, NULL, 0, 0);
assert (rc == 0); assert (rc == 0);
// Pass the message downstream through the device. // Pass the message downstream through the device
rc = zmq_recv (xsub, buff, sizeof (buff), 0); rc = zmq_recv (xsub, buff, sizeof (buff), 0);
assert (rc >= 0); assert (rc >= 0);
rc = zmq_send (xpub, buff, rc, 0); rc = zmq_send (xpub, buff, rc, 0);
assert (rc >= 0); assert (rc >= 0);
// Receive the message in the subscriber. // Receive the message in the subscriber
rc = zmq_recv (sub, buff, sizeof (buff), 0); rc = zmq_recv (sub, buff, sizeof (buff), 0);
assert (rc == 0); assert (rc == 0);
......
...@@ -20,9 +20,9 @@ ...@@ -20,9 +20,9 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <time.h>
#undef NDEBUG #undef NDEBUG
#include <assert.h> #include <assert.h>
...@@ -33,8 +33,6 @@ int main (void) ...@@ -33,8 +33,6 @@ int main (void)
char buf[32]; char buf[32];
const char *ep = "tcp://127.0.0.1:5560"; const char *ep = "tcp://127.0.0.1:5560";
fprintf (stderr, "unbind endpoint test running...\n");
// Create infrastructure. // Create infrastructure.
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
...@@ -47,24 +45,25 @@ int main (void) ...@@ -47,24 +45,25 @@ int main (void)
rc = zmq_connect (pull, ep); rc = zmq_connect (pull, ep);
assert (rc == 0); assert (rc == 0);
// Pass one message through to ensure the connection is established. // Pass one message through to ensure the connection is established
rc = zmq_send (push, "ABC", 3, 0); rc = zmq_send (push, "ABC", 3, 0);
assert (rc == 3); assert (rc == 3);
rc = zmq_recv (pull, buf, sizeof (buf), 0); rc = zmq_recv (pull, buf, sizeof (buf), 0);
assert (rc == 3); assert (rc == 3);
// Unbind the lisnening endpoint // Unbind the listening endpoint
rc = zmq_unbind (push, ep); rc = zmq_unbind (push, ep);
assert (rc == 0); assert (rc == 0);
// Let events some time // Allow unbind to settle
zmq_sleep (1); struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Check that sending would block (there's no outbound connection). // Check that sending would block (there's no outbound connection)
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
assert (rc == -1 && zmq_errno () == EAGAIN); assert (rc == -1 && zmq_errno () == EAGAIN);
// Clean up. // Clean up
rc = zmq_close (pull); rc = zmq_close (pull);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (push); rc = zmq_close (push);
...@@ -72,10 +71,7 @@ int main (void) ...@@ -72,10 +71,7 @@ int main (void)
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
// Now the other way round. // Create infrastructure
fprintf (stderr, "disconnect endpoint test running...\n");
// Create infrastructure.
ctx = zmq_ctx_new (); ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
push = zmq_socket (ctx, ZMQ_PUSH); push = zmq_socket (ctx, ZMQ_PUSH);
...@@ -97,8 +93,8 @@ int main (void) ...@@ -97,8 +93,8 @@ int main (void)
rc = zmq_disconnect (push, ep); rc = zmq_disconnect (push, ep);
assert (rc == 0); assert (rc == 0);
// Let events some time // Allow disconnect to settle
zmq_sleep (1); nanosleep (&t, NULL);
// Check that sending would block (there's no inbound connections). // Check that sending would block (there's no inbound connections).
rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT);
......
/* /*
Copyright (c) 2007-2013 iMatix Corporation
Copyright (c) 2010-2011 250bpm s.r.o. Copyright (c) 2010-2011 250bpm s.r.o.
Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
...@@ -19,101 +20,69 @@ ...@@ -19,101 +20,69 @@
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h" #include <sys/time.h>
#include <pthread.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#undef NDEBUG #undef NDEBUG
#include <assert.h> #include <assert.h>
extern "C"
{
void *worker(void *ctx)
{
// Worker thread connects after delay of 1 second. Then it waits
// for 1 more second, so that async connect has time to succeed.
zmq_sleep (1);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
assert (sc);
int rc = zmq_connect (sc, "inproc://timeout_test");
assert (rc == 0);
zmq_sleep (1);
rc = zmq_close (sc);
assert (rc == 0);
return NULL;
}
}
int main (void) int main (void)
{ {
fprintf (stderr, "test_timeo running...\n");
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
assert (ctx); assert (ctx);
// Create a disconnected socket. void *frontend = zmq_socket (ctx, ZMQ_DEALER);
void *sb = zmq_socket (ctx, ZMQ_PULL); assert (frontend);
assert (sb); int rc = zmq_bind (frontend, "inproc://timeout_test");
int rc = zmq_bind (sb, "inproc://timeout_test");
assert (rc == 0); assert (rc == 0);
// Check whether non-blocking recv returns immediately. // Receive on disconnected socket returns immediately
char buf [] = "12345678ABCDEFGH12345678abcdefgh"; char buffer [32];
rc = zmq_recv (sb, buf, 32, ZMQ_DONTWAIT); rc = zmq_recv (frontend, buffer, 32, ZMQ_DONTWAIT);
assert (rc == -1); assert (rc == -1);
assert (zmq_errno() == EAGAIN); assert (zmq_errno() == EAGAIN);
// Check whether recv timeout is honoured. // Check whether receive timeout is honored
int timeout = 500; int timeout = 250;
size_t timeout_size = sizeof timeout; rc = zmq_setsockopt (frontend, ZMQ_RCVTIMEO, &timeout, sizeof (int));
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0); assert (rc == 0);
void *watch = zmq_stopwatch_start ();
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == -1);
assert (zmq_errno () == EAGAIN);
unsigned long elapsed = zmq_stopwatch_stop (watch);
assert (elapsed > 440000 && elapsed < 550000);
// Check whether connection during the wait doesn't distort the timeout. struct timeval before, after;
timeout = 2000; gettimeofday (&before, NULL);
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size); rc = zmq_recv (frontend, buffer, 32, 0);
assert (rc == 0);
pthread_t thread;
rc = pthread_create (&thread, NULL, worker, ctx);
assert (rc == 0);
watch = zmq_stopwatch_start ();
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == -1); assert (rc == -1);
assert (zmq_errno () == EAGAIN); assert (zmq_errno () == EAGAIN);
elapsed = zmq_stopwatch_stop (watch); gettimeofday (&after, NULL);
assert (elapsed > 1900000 && elapsed < 2100000);
rc = pthread_join (thread, NULL);
assert (rc == 0);
// Check that timeouts don't break normal message transfer. long elapsed = (long)
void *sc = zmq_socket (ctx, ZMQ_PUSH); ((after.tv_sec * 1000 + after.tv_usec / 1000)
assert (sc); - (before.tv_sec * 1000 + before.tv_usec / 1000));
rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
assert (rc == 0); assert (elapsed > 200 && elapsed < 300);
rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
// Check that normal message flow works as expected
void *backend = zmq_socket (ctx, ZMQ_DEALER);
assert (backend);
rc = zmq_connect (backend, "inproc://timeout_test");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (sc, "inproc://timeout_test"); rc = zmq_setsockopt (backend, ZMQ_SNDTIMEO, &timeout, sizeof (int));
assert (rc == 0); assert (rc == 0);
rc = zmq_send (sc, buf, 32, 0);
assert (rc == 32);
rc = zmq_recv (sb, buf, 32, 0);
assert (rc == 32);
// Clean-up. rc = zmq_send (backend, "Hello", 5, 0);
rc = zmq_close (sc); assert (rc == 5);
rc = zmq_recv (frontend, buffer, 32, 0);
assert (rc == 5);
// Clean-up
rc = zmq_close (backend);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (sb);
rc = zmq_close (frontend);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_term (ctx); rc = zmq_ctx_term (ctx);
assert (rc == 0); assert (rc == 0);
return 0 ; return 0 ;
} }
...@@ -24,11 +24,11 @@ ...@@ -24,11 +24,11 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include <string.h> #include <string.h>
#undef NDEBUG #undef NDEBUG
#include <assert.h> #include <assert.h>
inline void bounce (void *sb, void *sc) static void
bounce (void *sb, void *sc)
{ {
const char *content = "12345678ABCDEFGH12345678abcdefgh"; const char *content = "12345678ABCDEFGH12345678abcdefgh";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment