Commit 5e85fa6a authored by Luca Boccassi's avatar Luca Boccassi Committed by GitHub

Merge pull request #2698 from sigiesec/client-side-auth-error-events

Problem: no tests for client-side events for successful handshake and authentication failure in handshake
parents 7481fba5 74203729
...@@ -275,17 +275,21 @@ int zmq::curve_client_t::process_error ( ...@@ -275,17 +275,21 @@ int zmq::curve_client_t::process_error (
} }
if (msg_size < 7) { if (msg_size < 7) {
session->get_socket ()->event_handshake_failed_protocol ( session->get_socket ()->event_handshake_failed_protocol (
session->get_endpoint (), ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR); session->get_endpoint (),
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR);
errno = EPROTO; errno = EPROTO;
return -1; return -1;
} }
const size_t error_reason_len = static_cast <size_t> (msg_data [6]); const size_t error_reason_len = static_cast <size_t> (msg_data [6]);
if (error_reason_len > msg_size - 7) { if (error_reason_len > msg_size - 7) {
session->get_socket ()->event_handshake_failed_protocol ( session->get_socket ()->event_handshake_failed_protocol (
session->get_endpoint (), ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR); session->get_endpoint (),
ZMQ_PROTOCOL_ERROR_ZMTP_MALFORMED_COMMAND_ERROR);
errno = EPROTO; errno = EPROTO;
return -1; return -1;
} }
const char *error_reason = reinterpret_cast<const char *> (msg_data) + 7;
handle_error_reason (error_reason, error_reason_len);
state = error_received; state = error_received;
return 0; return 0;
} }
......
...@@ -52,3 +52,14 @@ int zmq::mechanism_base_t::check_basic_command_structure (msg_t *msg_) ...@@ -52,3 +52,14 @@ int zmq::mechanism_base_t::check_basic_command_structure (msg_t *msg_)
return 0; return 0;
} }
void zmq::mechanism_base_t::handle_error_reason (const char *error_reason,
int error_reason_len)
{
if (error_reason_len == 3 && error_reason[1] == '0'
&& error_reason[2] == '0' && error_reason[0] >= '3'
&& error_reason[0] <= '5') {
// it is a ZAP status code, so emit an authentication failure event
session->get_socket ()->event_handshake_failed_auth (
session->get_endpoint (), (error_reason[0] - '0') * 100);
}
}
...@@ -43,6 +43,8 @@ class mechanism_base_t : public mechanism_t ...@@ -43,6 +43,8 @@ class mechanism_base_t : public mechanism_t
session_base_t *const session; session_base_t *const session;
int check_basic_command_structure (msg_t *msg_); int check_basic_command_structure (msg_t *msg_);
void handle_error_reason (const char *error_reason, int error_reason_len);
}; };
} }
......
...@@ -83,15 +83,21 @@ int zmq::null_mechanism_t::next_handshake_command (msg_t *msg_) ...@@ -83,15 +83,21 @@ int zmq::null_mechanism_t::next_handshake_command (msg_t *msg_)
} }
if (zap_reply_received && status_code != "200") { if (zap_reply_received && status_code != "200") {
error_command_sent = true;
if (status_code != "300") {
const size_t status_code_len = 3; const size_t status_code_len = 3;
const int rc = msg_->init_size (6 + 1 + status_code_len); const int rc = msg_->init_size (6 + 1 + status_code_len);
zmq_assert (rc == 0); zmq_assert (rc == 0);
unsigned char *msg_data = static_cast<unsigned char *> (msg_->data ()); unsigned char *msg_data =
static_cast<unsigned char *> (msg_->data ());
memcpy (msg_data, "\5ERROR", 6); memcpy (msg_data, "\5ERROR", 6);
msg_data [6] = status_code_len; msg_data[6] = status_code_len;
memcpy (msg_data + 7, status_code.c_str (), status_code_len); memcpy (msg_data + 7, status_code.c_str (), status_code_len);
error_command_sent = true;
return 0; return 0;
} else {
errno = EAGAIN;
return -1;
}
} }
make_command_with_basic_properties (msg_, "\5READY", 6); make_command_with_basic_properties (msg_, "\5READY", 6);
...@@ -165,6 +171,8 @@ int zmq::null_mechanism_t::process_error_command ( ...@@ -165,6 +171,8 @@ int zmq::null_mechanism_t::process_error_command (
errno = EPROTO; errno = EPROTO;
return -1; return -1;
} }
const char *error_reason = reinterpret_cast<const char *> (cmd_data) + 7;
handle_error_reason (error_reason, error_reason_len);
error_command_received = true; error_command_received = true;
return 0; return 0;
} }
......
...@@ -86,9 +86,8 @@ int zmq::plain_client_t::process_handshake_command (msg_t *msg_) ...@@ -86,9 +86,8 @@ int zmq::plain_client_t::process_handshake_command (msg_t *msg_)
if (data_size >= 6 && !memcmp (cmd_data, "\5ERROR", 6)) if (data_size >= 6 && !memcmp (cmd_data, "\5ERROR", 6))
rc = process_error (cmd_data, data_size); rc = process_error (cmd_data, data_size);
else { else {
// TODO see comment in curve_server_t::process_handshake_command
session->get_socket ()->event_handshake_failed_protocol ( session->get_socket ()->event_handshake_failed_protocol (
session->get_endpoint (), ZMQ_PROTOCOL_ERROR_ZMTP_UNSPECIFIED); session->get_endpoint (), ZMQ_PROTOCOL_ERROR_ZMTP_UNEXPECTED_COMMAND);
errno = EPROTO; errno = EPROTO;
rc = -1; rc = -1;
} }
...@@ -215,6 +214,8 @@ int zmq::plain_client_t::process_error ( ...@@ -215,6 +214,8 @@ int zmq::plain_client_t::process_error (
errno = EPROTO; errno = EPROTO;
return -1; return -1;
} }
const char *error_reason = reinterpret_cast<const char *> (cmd_data) + 7;
handle_error_reason (error_reason, error_reason_len);
state = error_command_received; state = error_command_received;
return 0; return 0;
} }
...@@ -281,9 +281,18 @@ void zap_client_common_handshake_t::handle_zap_status_code () ...@@ -281,9 +281,18 @@ void zap_client_common_handshake_t::handle_zap_status_code ()
// we can assume here that status_code is a valid ZAP status code, // we can assume here that status_code is a valid ZAP status code,
// i.e. 200, 300, 400 or 500 // i.e. 200, 300, 400 or 500
if (status_code[0] == '2') { switch (status_code[0]) {
case '2':
state = zap_reply_ok_state; state = zap_reply_ok_state;
} else { break;
case '3':
// a 300 error code (temporary failure)
// should NOT result in an ERROR message, but instead the
// client should be silently disconnected (see CURVEZMQ RFC)
// therefore, go immediately to state error_sent
state = error_sent;
break;
default:
state = sending_error; state = sending_error;
} }
} }
......
...@@ -53,24 +53,6 @@ const char large_identity[] = "0123456789012345678901234567890123456789" ...@@ -53,24 +53,6 @@ const char large_identity[] = "0123456789012345678901234567890123456789"
"0123456789012345678901234567890123456789" "0123456789012345678901234567890123456789"
"012345678901234"; "012345678901234";
#ifdef ZMQ_BUILD_DRAFT_API
// assert_* are macros rather than functions, to allow assertion failures be
// attributed to the causing source code line
#define assert_no_more_monitor_events_with_timeout(monitor, timeout) \
{ \
int event_count = 0; \
int event, err; \
while ((event = get_monitor_event_with_timeout ((monitor), &err, NULL, \
(timeout))) \
!= -1) { \
++event_count; \
fprintf (stderr, "Unexpected event: %x (err = %i)\n", event, err); \
} \
assert (event_count == 0); \
}
#endif
static void zap_handler_large_identity (void *ctx) static void zap_handler_large_identity (void *ctx)
{ {
zap_handler_generic (ctx, zap_ok, large_identity); zap_handler_generic (ctx, zap_ok, large_identity);
...@@ -81,15 +63,17 @@ void expect_new_client_curve_bounce_fail (void *ctx, ...@@ -81,15 +63,17 @@ void expect_new_client_curve_bounce_fail (void *ctx,
char *client_public, char *client_public,
char *client_secret, char *client_secret,
char *my_endpoint, char *my_endpoint,
void *server) void *server,
void **client_mon = NULL)
{ {
curve_client_data_t curve_client_data = {server_public, client_public, curve_client_data_t curve_client_data = {server_public, client_public,
client_secret}; client_secret};
expect_new_client_bounce_fail ( expect_new_client_bounce_fail (ctx, my_endpoint, server,
ctx, my_endpoint, server, socket_config_curve_client, &curve_client_data); socket_config_curve_client,
&curve_client_data, client_mon);
} }
void test_garbage_key(void *ctx, void test_null_key (void *ctx,
void *server, void *server,
void *server_mon, void *server_mon,
char *my_endpoint, char *my_endpoint,
...@@ -113,7 +97,9 @@ void test_garbage_key(void *ctx, ...@@ -113,7 +97,9 @@ void test_garbage_key(void *ctx,
// long) // long)
fprintf (stderr, fprintf (stderr,
"count of ZMQ_EVENT_HANDSHAKE_FAILED_ENCRYPTION events: %i\n", "count of "
"ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL/"
"ZMQ_PROTOCOL_ERROR_ZMTP_CRYPTOGRAPHIC events: %i\n",
handshake_failed_encryption_event_count); handshake_failed_encryption_event_count);
#endif #endif
} }
...@@ -123,8 +109,10 @@ void test_curve_security_with_valid_credentials ( ...@@ -123,8 +109,10 @@ void test_curve_security_with_valid_credentials (
{ {
curve_client_data_t curve_client_data = { curve_client_data_t curve_client_data = {
valid_server_public, valid_client_public, valid_client_secret}; valid_server_public, valid_client_public, valid_client_secret};
void *client = create_and_connect_client ( void *client_mon;
ctx, my_endpoint, socket_config_curve_client, &curve_client_data); void *client =
create_and_connect_client (ctx, my_endpoint, socket_config_curve_client,
&curve_client_data, &client_mon);
bounce (server, client); bounce (server, client);
int rc = zmq_close (client); int rc = zmq_close (client);
assert (rc == 0); assert (rc == 0);
...@@ -134,6 +122,14 @@ void test_curve_security_with_valid_credentials ( ...@@ -134,6 +122,14 @@ void test_curve_security_with_valid_credentials (
assert (event == ZMQ_EVENT_HANDSHAKE_SUCCEEDED); assert (event == ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
assert_no_more_monitor_events_with_timeout (server_mon, timeout); assert_no_more_monitor_events_with_timeout (server_mon, timeout);
event = get_monitor_event_with_timeout (client_mon, NULL, NULL, -1);
assert (event == ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
assert_no_more_monitor_events_with_timeout (client_mon, timeout);
rc = zmq_close (client_mon);
assert (rc == 0);
#endif #endif
} }
...@@ -147,18 +143,29 @@ void test_curve_security_with_bogus_client_credentials ( ...@@ -147,18 +143,29 @@ void test_curve_security_with_bogus_client_credentials (
char bogus_secret [41]; char bogus_secret [41];
zmq_curve_keypair (bogus_public, bogus_secret); zmq_curve_keypair (bogus_public, bogus_secret);
void *client_mon;
expect_new_client_curve_bounce_fail (ctx, valid_server_public, bogus_public, expect_new_client_curve_bounce_fail (ctx, valid_server_public, bogus_public,
bogus_secret, my_endpoint, server); bogus_secret, my_endpoint, server,
&client_mon);
int event_count = 0; int server_event_count = 0;
#ifdef ZMQ_BUILD_DRAFT_API #ifdef ZMQ_BUILD_DRAFT_API
event_count = expect_monitor_event_multiple ( server_event_count = expect_monitor_event_multiple (
server_mon, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 400); server_mon, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 400);
assert (event_count <= 1); assert (server_event_count <= 1);
int client_event_count = expect_monitor_event_multiple (
client_mon, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 400, true);
// this should actually be client_event_count == 1, but this is not always
// true, see https://github.com/zeromq/libzmq/issues/2705
assert (client_event_count <= 1);
int rc = zmq_close (client_mon);
assert (rc == 0);
#endif #endif
// there may be more than one ZAP request due to repeated attempts by the client // there may be more than one ZAP request due to repeated attempts by the client
assert (0 == event_count assert (0 == server_event_count
|| 1 <= zmq_atomic_counter_value (zap_requests_handled)); || 1 <= zmq_atomic_counter_value (zap_requests_handled));
} }
...@@ -643,35 +650,35 @@ int main (void) ...@@ -643,35 +650,35 @@ int main (void)
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon, shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler); handler);
char garbage_key[] = "0000000000000000000000000000000000000000"; char null_key[] = "0000000000000000000000000000000000000000";
// Check CURVE security with a garbage server key // Check CURVE security with a null server key
// This will be caught by the curve_server class, not passed to ZAP // This will be caught by the curve_server class, not passed to ZAP
fprintf (stderr, "test_garbage_server_key\n"); fprintf (stderr, "test_null_key (server)\n");
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server, setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint); &server_mon, my_endpoint);
test_garbage_key (ctx, server, server_mon, my_endpoint, garbage_key, test_null_key (ctx, server, server_mon, my_endpoint, null_key,
valid_client_public, valid_client_secret); valid_client_public, valid_client_secret);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon, shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler); handler);
// Check CURVE security with a garbage client public key // Check CURVE security with a null client public key
// This will be caught by the curve_server class, not passed to ZAP // This will be caught by the curve_server class, not passed to ZAP
fprintf (stderr, "test_garbage_client_public_key\n"); fprintf (stderr, "test_null_key (client public)\n");
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server, setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint); &server_mon, my_endpoint);
test_garbage_key (ctx, server, server_mon, my_endpoint, valid_server_public, test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
garbage_key, valid_client_secret); null_key, valid_client_secret);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon, shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler); handler);
// Check CURVE security with a garbage client secret key // Check CURVE security with a null client secret key
// This will be caught by the curve_server class, not passed to ZAP // This will be caught by the curve_server class, not passed to ZAP
fprintf (stderr, "test_garbage_client_secret_key\n"); fprintf (stderr, "test_null_key (client secret)\n");
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server, setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint); &server_mon, my_endpoint);
test_garbage_key (ctx, server, server_mon, my_endpoint, valid_server_public, test_null_key (ctx, server, server_mon, my_endpoint, valid_server_public,
valid_client_public, garbage_key); valid_client_public, null_key);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon, shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler); handler);
...@@ -682,6 +689,7 @@ int main (void) ...@@ -682,6 +689,7 @@ int main (void)
server_mon, timeout); server_mon, timeout);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon, shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler); handler);
fprintf (stderr, "test_curve_security_with_null_client_credentials\n"); fprintf (stderr, "test_curve_security_with_null_client_credentials\n");
setup_context_and_server_side (&ctx, &handler, &zap_thread, &server, setup_context_and_server_side (&ctx, &handler, &zap_thread, &server,
&server_mon, my_endpoint); &server_mon, my_endpoint);
......
...@@ -66,10 +66,11 @@ void test_zap_unsuccessful (void *ctx, ...@@ -66,10 +66,11 @@ void test_zap_unsuccessful (void *ctx,
int expected_event, int expected_event,
int expected_err, int expected_err,
socket_config_fn socket_config_, socket_config_fn socket_config_,
void *socket_config_data_) void *socket_config_data_,
void **client_mon = NULL)
{ {
expect_new_client_bounce_fail (ctx, my_endpoint, server, socket_config_, expect_new_client_bounce_fail (ctx, my_endpoint, server, socket_config_,
socket_config_data_); socket_config_data_, client_mon);
int events_received = 0; int events_received = 0;
#ifdef ZMQ_BUILD_DRAFT_API #ifdef ZMQ_BUILD_DRAFT_API
...@@ -77,7 +78,8 @@ void test_zap_unsuccessful (void *ctx, ...@@ -77,7 +78,8 @@ void test_zap_unsuccessful (void *ctx,
expect_monitor_event_multiple (server_mon, expected_event, expected_err); expect_monitor_event_multiple (server_mon, expected_event, expected_err);
#endif #endif
// there may be more than one ZAP request due to repeated attempts by the client // there may be more than one ZAP request due to repeated attempts by the
// client (actually only in case if ZAP status code 300)
assert (events_received == 0 assert (events_received == 0
|| 1 <= zmq_atomic_counter_value (zap_requests_handled)); || 1 <= zmq_atomic_counter_value (zap_requests_handled));
} }
...@@ -99,6 +101,62 @@ void test_zap_protocol_error (void *ctx, ...@@ -99,6 +101,62 @@ void test_zap_protocol_error (void *ctx,
socket_config_, socket_config_data_); socket_config_, socket_config_data_);
} }
void test_zap_unsuccessful_status_300 (void *ctx,
char *my_endpoint,
void *server,
void *server_mon,
socket_config_fn client_socket_config_,
void *client_socket_config_data_)
{
void *client_mon;
test_zap_unsuccessful (ctx, my_endpoint, server, server_mon,
#ifdef ZMQ_BUILD_DRAFT_API
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 300,
#else
0, 0,
#endif
client_socket_config_, client_socket_config_data_,
&client_mon);
#ifdef ZMQ_BUILD_DRAFT_API
assert_no_more_monitor_events_with_timeout (client_mon, 250);
int rc = zmq_close (client_mon);
assert (rc == 0);
#endif
}
void test_zap_unsuccessful_status_500 (void *ctx,
char *my_endpoint,
void *server,
void *server_mon,
socket_config_fn client_socket_config_,
void *client_socket_config_data_)
{
void *client_mon;
test_zap_unsuccessful (ctx, my_endpoint, server, server_mon,
#ifdef ZMQ_BUILD_DRAFT_API
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 500,
#else
0, 0,
#endif
client_socket_config_, client_socket_config_data_,
&client_mon);
#ifdef ZMQ_BUILD_DRAFT_API
int events_received = 0;
events_received = expect_monitor_event_multiple (
client_mon, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 500, true);
// this should actually be events_received == 1, but this is not always
// true, see https://github.com/zeromq/libzmq/issues/2705
assert (events_received <= 1);
int rc = zmq_close (client_mon);
assert (rc == 0);
#endif
}
void test_zap_errors (socket_config_fn server_socket_config_, void test_zap_errors (socket_config_fn server_socket_config_,
void *server_socket_config_data_, void *server_socket_config_data_,
socket_config_fn client_socket_config_, socket_config_fn client_socket_config_,
...@@ -192,13 +250,9 @@ void test_zap_errors (socket_config_fn server_socket_config_, ...@@ -192,13 +250,9 @@ void test_zap_errors (socket_config_fn server_socket_config_,
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint, &ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
&zap_handler_wrong_status_temporary_failure, server_socket_config_, &zap_handler_wrong_status_temporary_failure, server_socket_config_,
server_socket_config_data_); server_socket_config_data_);
test_zap_unsuccessful (ctx, my_endpoint, server, server_mon, test_zap_unsuccessful_status_300 (ctx, my_endpoint, server, server_mon,
#ifdef ZMQ_BUILD_DRAFT_API client_socket_config_,
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 300, client_socket_config_data_);
#else
0, 0,
#endif
client_socket_config_, client_socket_config_data_);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon, shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler); handler);
...@@ -207,13 +261,9 @@ void test_zap_errors (socket_config_fn server_socket_config_, ...@@ -207,13 +261,9 @@ void test_zap_errors (socket_config_fn server_socket_config_,
setup_context_and_server_side ( setup_context_and_server_side (
&ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint, &ctx, &handler, &zap_thread, &server, &server_mon, my_endpoint,
&zap_handler_wrong_status_internal_error, server_socket_config_); &zap_handler_wrong_status_internal_error, server_socket_config_);
test_zap_unsuccessful (ctx, my_endpoint, server, server_mon, test_zap_unsuccessful_status_500 (ctx, my_endpoint, server, server_mon,
#ifdef ZMQ_BUILD_DRAFT_API client_socket_config_,
ZMQ_EVENT_HANDSHAKE_FAILED_AUTH, 500, client_socket_config_data_);
#else
0, 0,
#endif
client_socket_config_, client_socket_config_data_);
shutdown_context_and_server_side (ctx, zap_thread, server, server_mon, shutdown_context_and_server_side (ctx, zap_thread, server, server_mon,
handler); handler);
} }
......
...@@ -288,6 +288,30 @@ void zap_handler (void *ctx) ...@@ -288,6 +288,30 @@ void zap_handler (void *ctx)
zap_handler_generic (ctx, zap_ok); zap_handler_generic (ctx, zap_ok);
} }
void setup_handshake_socket_monitor (void *ctx,
void *server,
void **server_mon,
const char *monitor_endpoint)
{
#ifdef ZMQ_BUILD_DRAFT_API
// Monitor handshake events on the server
int rc = zmq_socket_monitor (server, monitor_endpoint,
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
| ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
| ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
| ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
assert (rc == 0);
// Create socket for collecting monitor events
*server_mon = zmq_socket (ctx, ZMQ_PAIR);
assert (*server_mon);
// Connect it to the inproc endpoints so they'll get events
rc = zmq_connect (*server_mon, monitor_endpoint);
assert (rc == 0);
#endif
}
void setup_context_and_server_side ( void setup_context_and_server_side (
void **ctx, void **ctx,
void **handler, void **handler,
...@@ -335,25 +359,9 @@ void setup_context_and_server_side ( ...@@ -335,25 +359,9 @@ void setup_context_and_server_side (
rc = zmq_getsockopt (*server, ZMQ_LAST_ENDPOINT, my_endpoint, &len); rc = zmq_getsockopt (*server, ZMQ_LAST_ENDPOINT, my_endpoint, &len);
assert (rc == 0); assert (rc == 0);
#ifdef ZMQ_BUILD_DRAFT_API const char server_monitor_endpoint [] = "inproc://monitor-server";
char monitor_endpoint [] = "inproc://monitor-server"; setup_handshake_socket_monitor (*ctx, *server, server_mon,
server_monitor_endpoint);
// Monitor handshake events on the server
rc = zmq_socket_monitor (*server, monitor_endpoint,
ZMQ_EVENT_HANDSHAKE_SUCCEEDED
| ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
| ZMQ_EVENT_HANDSHAKE_FAILED_AUTH
| ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
assert (rc == 0);
// Create socket for collecting monitor events
*server_mon = zmq_socket (*ctx, ZMQ_PAIR);
assert (*server_mon);
// Connect it to the inproc endpoints so they'll get events
rc = zmq_connect (*server_mon, monitor_endpoint);
assert (rc == 0);
#endif
} }
void shutdown_context_and_server_side (void *ctx, void shutdown_context_and_server_side (void *ctx,
...@@ -389,7 +397,8 @@ void shutdown_context_and_server_side (void *ctx, ...@@ -389,7 +397,8 @@ void shutdown_context_and_server_side (void *ctx,
void *create_and_connect_client (void *ctx, void *create_and_connect_client (void *ctx,
char *my_endpoint, char *my_endpoint,
socket_config_fn socket_config_, socket_config_fn socket_config_,
void *socket_config_data_) void *socket_config_data_,
void **client_mon = NULL)
{ {
void *client = zmq_socket (ctx, ZMQ_DEALER); void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client); assert (client);
...@@ -399,6 +408,12 @@ void *create_and_connect_client (void *ctx, ...@@ -399,6 +408,12 @@ void *create_and_connect_client (void *ctx,
int rc = zmq_connect (client, my_endpoint); int rc = zmq_connect (client, my_endpoint);
assert (rc == 0); assert (rc == 0);
if (client_mon)
{
setup_handshake_socket_monitor (ctx, client, client_mon,
"inproc://client-monitor");
}
return client; return client;
} }
...@@ -406,10 +421,11 @@ void expect_new_client_bounce_fail (void *ctx, ...@@ -406,10 +421,11 @@ void expect_new_client_bounce_fail (void *ctx,
char *my_endpoint, char *my_endpoint,
void *server, void *server,
socket_config_fn socket_config_, socket_config_fn socket_config_,
void *socket_config_data_) void *socket_config_data_,
void **client_mon = NULL)
{ {
void *client = create_and_connect_client (ctx, my_endpoint, socket_config_, void *client = create_and_connect_client (ctx, my_endpoint, socket_config_,
socket_config_data_); socket_config_data_, client_mon);
expect_bounce_fail (server, client); expect_bounce_fail (server, client);
close_zero_linger (client); close_zero_linger (client);
} }
...@@ -484,6 +500,19 @@ int get_monitor_event_with_timeout (void *monitor, ...@@ -484,6 +500,19 @@ int get_monitor_event_with_timeout (void *monitor,
} }
#ifdef ZMQ_BUILD_DRAFT_API #ifdef ZMQ_BUILD_DRAFT_API
void print_unexpected_event (int event,
int err,
int expected_event,
int expected_err)
{
fprintf(
stderr,
"Unexpected event: 0x%x, value = %i/0x%x (expected: 0x%x, value "
"= %i/0x%x)\n",
event, err, err, expected_event, expected_err, expected_err);
}
// expects that one or more occurrences of the expected event are received // expects that one or more occurrences of the expected event are received
// via the specified socket monitor // via the specified socket monitor
// returns the number of occurrences of the expected event // returns the number of occurrences of the expected event
...@@ -493,20 +522,29 @@ int get_monitor_event_with_timeout (void *monitor, ...@@ -493,20 +522,29 @@ int get_monitor_event_with_timeout (void *monitor,
// https://github.com/zeromq/libzmq/issues/2644 // https://github.com/zeromq/libzmq/issues/2644
int expect_monitor_event_multiple (void *server_mon, int expect_monitor_event_multiple (void *server_mon,
int expected_event, int expected_event,
int expected_err = -1) int expected_err = -1,
bool optional = false)
{ {
int count_of_expected_events = 0; int count_of_expected_events = 0;
int client_closed_connection = 0; int client_closed_connection = 0;
// infinite timeout at the start int timeout = 250;
int timeout = -1; int wait_time = 0;
int event; int event;
int err; int err;
while ( while (
(event = get_monitor_event_with_timeout (server_mon, &err, NULL, timeout)) (event = get_monitor_event_with_timeout (server_mon, &err, NULL, timeout))
!= -1) { != -1 || !count_of_expected_events) {
timeout = 250; if (event == -1) {
if (optional)
break;
wait_time += timeout;
fprintf (stderr,
"Still waiting for first event after %ims (expected event "
"%x (value %i/%x))\n",
wait_time, expected_event, expected_err, expected_err);
continue;
}
// ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen // ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen
// ECONNRESET can happen on very slow machines, when the engine writes // ECONNRESET can happen on very slow machines, when the engine writes
// to the peer and then tries to read the socket before the peer reads // to the peer and then tries to read the socket before the peer reads
...@@ -522,19 +560,40 @@ int expect_monitor_event_multiple (void *server_mon, ...@@ -522,19 +560,40 @@ int expect_monitor_event_multiple (void *server_mon,
} }
if (event != expected_event if (event != expected_event
|| (-1 != expected_err && err != expected_err)) { || (-1 != expected_err && err != expected_err)) {
fprintf ( print_unexpected_event (event, err, expected_event, expected_err);
stderr,
"Unexpected event: 0x%x, value = %i/0x%x (expected: 0x%x, value "
"= %i/0x%x)\n",
event, err, err, expected_event, expected_err, expected_err);
assert (false); assert (false);
} }
++count_of_expected_events; ++count_of_expected_events;
} }
assert (count_of_expected_events > 0 || client_closed_connection); assert (optional || count_of_expected_events > 0 || client_closed_connection);
return count_of_expected_events; return count_of_expected_events;
} }
// assert_* are macros rather than functions, to allow assertion failures be
// attributed to the causing source code line
#define assert_no_more_monitor_events_with_timeout(monitor, timeout) \
{ \
int event_count = 0; \
int event, err; \
while ((event = get_monitor_event_with_timeout ((monitor), &err, NULL, \
(timeout))) \
!= -1) { \
if (event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL \
&& (err == EPIPE || err == ECONNRESET \
|| err == ECONNABORTED)) { \
fprintf (stderr, \
"Ignored event (skipping any further events): %x " \
"(err = %i)\n", \
event, err); \
continue; \
} \
++event_count; \
print_unexpected_event (event, err, 0, 0); \
} \
assert (event_count == 0); \
}
#endif #endif
#endif #endif
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment