Commit d13b74e9 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #832 from mrvn/pull_stream-connect-notification

Add STREAM connect notification.
parents 53d0199e afb24b53
...@@ -177,6 +177,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -177,6 +177,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
read_msg = &stream_engine_t::pull_msg_from_session; read_msg = &stream_engine_t::pull_msg_from_session;
write_msg = &stream_engine_t::push_msg_to_session; write_msg = &stream_engine_t::push_msg_to_session;
// For raw sockets, send an initial 0-length message to the
// application so that it knows a peer has connected.
msg_t connector;
connector.init();
int rc = (this->*write_msg) (&connector);
connector.close();
session->flush ();
} }
else { else {
// Send the 'length' and 'flags' fields of the identity message. // Send the 'length' and 'flags' fields of the identity message.
......
...@@ -68,6 +68,7 @@ test_stream_to_dealer (void) ...@@ -68,6 +68,7 @@ test_stream_to_dealer (void)
rc = zmq_send (dealer, "Hello", 5, 0); rc = zmq_send (dealer, "Hello", 5, 0);
assert (rc == 5); assert (rc == 5);
// Connecting sends a zero message
// First frame is identity // First frame is identity
zmq_msg_t identity; zmq_msg_t identity;
rc = zmq_msg_init (&identity); rc = zmq_msg_init (&identity);
...@@ -76,9 +77,19 @@ test_stream_to_dealer (void) ...@@ -76,9 +77,19 @@ test_stream_to_dealer (void)
assert (rc > 0); assert (rc > 0);
assert (zmq_msg_more (&identity)); assert (zmq_msg_more (&identity));
// Second frame is greeting signature // Second frame is zero
byte buffer [255]; byte buffer [255];
rc = zmq_recv (stream, buffer, 255, 0); rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 0);
// Real data follows
// First frame is identity
rc = zmq_msg_recv (&identity, stream, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
// Second frame is greeting signature
rc = zmq_recv (stream, buffer, 255, 0);
assert (rc == 10); assert (rc == 10);
assert (memcmp (buffer, greeting.signature, 10) == 0); assert (memcmp (buffer, greeting.signature, 10) == 0);
...@@ -178,14 +189,26 @@ test_stream_to_stream (void) ...@@ -178,14 +189,26 @@ test_stream_to_stream (void)
assert (client); assert (client);
rc = zmq_connect (client, "tcp://localhost:9070"); rc = zmq_connect (client, "tcp://localhost:9070");
assert (rc == 0); assert (rc == 0);
// It would be less surprising to get an empty message instead
// of having to fetch the identity like this [PH 2013/06/27]
uint8_t id [256]; uint8_t id [256];
size_t id_size = 256; size_t id_size = 256;
rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size); uint8_t buffer [256];
assert (rc == 0);
// Connecting sends a zero message
// Server: First frame is identity, second frame is zero
id_size = zmq_recv (server, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (server, buffer, 256, 0);
assert (rc == 0);
// Client: First frame is identity, second frame is zero
id_size = zmq_recv (client, id, 256, 0);
assert (id_size > 0);
rc = zmq_recv (client, buffer, 256, 0);
assert (rc == 0);
// Sent HTTP request on client socket // Sent HTTP request on client socket
// Get server identity
rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size);
assert (rc == 0);
// First frame is server identity // First frame is server identity
rc = zmq_send (client, id, id_size, ZMQ_SNDMORE); rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
assert (rc == (int) id_size); assert (rc == (int) id_size);
...@@ -196,9 +219,8 @@ test_stream_to_stream (void) ...@@ -196,9 +219,8 @@ test_stream_to_stream (void)
// Get HTTP request; ID frame and then request // Get HTTP request; ID frame and then request
id_size = zmq_recv (server, id, 256, 0); id_size = zmq_recv (server, id, 256, 0);
assert (id_size > 0); assert (id_size > 0);
uint8_t buffer [256];
rc = zmq_recv (server, buffer, 256, 0); rc = zmq_recv (server, buffer, 256, 0);
assert (rc > 0); assert (rc != -1);
assert (memcmp (buffer, "GET /\n\n", 7) == 0); assert (memcmp (buffer, "GET /\n\n", 7) == 0);
// Send reply back to client // Send reply back to client
...@@ -207,8 +229,16 @@ test_stream_to_stream (void) ...@@ -207,8 +229,16 @@ test_stream_to_stream (void)
"Content-Type: text/plain\r\n" "Content-Type: text/plain\r\n"
"\r\n" "\r\n"
"Hello, World!"; "Hello, World!";
zmq_send (server, id, id_size, ZMQ_SNDMORE); rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
zmq_send (server, http_response, sizeof (http_response), 0); assert (rc != -1);
rc = zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE);
assert (rc != -1);
// Send zero to close connection to client
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_send (server, NULL, 0, ZMQ_SNDMORE);
assert (rc != -1);
// Get reply at client and check that it's complete // Get reply at client and check that it's complete
id_size = zmq_recv (client, id, 256, 0); id_size = zmq_recv (client, id, 256, 0);
...@@ -216,6 +246,22 @@ test_stream_to_stream (void) ...@@ -216,6 +246,22 @@ test_stream_to_stream (void)
rc = zmq_recv (client, buffer, 256, 0); rc = zmq_recv (client, buffer, 256, 0);
assert (rc == sizeof (http_response)); assert (rc == sizeof (http_response));
assert (memcmp (buffer, http_response, sizeof (http_response)) == 0); assert (memcmp (buffer, http_response, sizeof (http_response)) == 0);
// // Get disconnection notification
// FIXME: why does this block? Bug in STREAM disconnect notification?
// id_size = zmq_recv (client, id, 256, 0);
// assert (id_size > 0);
// rc = zmq_recv (client, buffer, 256, 0);
// assert (rc == 0);
rc = zmq_close (server);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
} }
......
...@@ -68,22 +68,60 @@ int main(int argc, char** argv) ...@@ -68,22 +68,60 @@ int main(int argc, char** argv)
rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666"); rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666");
assert (rc == 0); assert (rc == 0);
// TODO: wait for client to become ready. // wait for connect notification
// Server: Grab the 1st frame (peer identity).
zmq_msg_t peer_frame;
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
assert (rc != -1);
assert(zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets [SERVER]));
// Server: Grab the 2nd frame (actual payload).
zmq_msg_t data_frame;
rc = zmq_msg_init (&data_frame);
assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0);
assert (rc != -1);
assert(zmq_msg_size (&data_frame) == 0);
// Client: Grab the 1st frame (peer identity).
rc = zmq_msg_init (&peer_frame);
assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
assert (rc != -1);
assert(zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets [CLIENT]));
// Client: Grab the 2nd frame (actual payload).
rc = zmq_msg_init (&data_frame);
assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
assert (rc != -1);
assert(zmq_msg_size (&data_frame) == 0);
// Send initial message. // Send initial message.
char blob_data [256]; char blob_data [256];
size_t blob_size = sizeof(blob_data); size_t blob_size = sizeof(blob_data);
rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size); rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size);
assert (rc == 0); assert (rc != -1);
assert(blob_size > 0);
zmq_msg_t msg; zmq_msg_t msg;
zmq_msg_init_size (&msg, blob_size); rc = zmq_msg_init_size (&msg, blob_size);
assert (rc == 0);
memcpy (zmq_msg_data (&msg), blob_data, blob_size); memcpy (zmq_msg_data (&msg), blob_data, blob_size);
zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE); rc = zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
zmq_msg_close (&msg); assert (rc != -1);
zmq_msg_init_size (&msg, strlen(dialog [0].text)+1); rc = zmq_msg_close (&msg);
memcpy (zmq_msg_data (&msg), dialog [0].text, strlen(dialog [0].text)+1); assert (rc == 0);
zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE); rc = zmq_msg_init_size (&msg, strlen(dialog [0].text));
zmq_msg_close (&msg); assert (rc == 0);
memcpy (zmq_msg_data (&msg), dialog [0].text, strlen(dialog [0].text));
rc = zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_msg_close (&msg);
assert (rc == 0);
// TODO: make sure this loop doesn't loop forever if something is wrong // TODO: make sure this loop doesn't loop forever if something is wrong
// with the test (or the implementation). // with the test (or the implementation).
...@@ -106,47 +144,64 @@ int main(int argc, char** argv) ...@@ -106,47 +144,64 @@ int main(int argc, char** argv)
// Grab the 1st frame (peer identity). // Grab the 1st frame (peer identity).
zmq_msg_t peer_frame; zmq_msg_t peer_frame;
zmq_msg_init (&peer_frame); rc = zmq_msg_init (&peer_frame);
zmq_msg_recv (&peer_frame, sockets [SERVER], 0); assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
assert (rc != -1);
assert(zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets [SERVER])); assert (has_more (sockets [SERVER]));
// Grab the 2nd frame (actual payload). // Grab the 2nd frame (actual payload).
zmq_msg_t data_frame; zmq_msg_t data_frame;
zmq_msg_init (&data_frame); rc = zmq_msg_init (&data_frame);
zmq_msg_recv (&data_frame, sockets [SERVER], 0); assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0);
assert (rc != -1);
// Make sure payload matches what we expect. // Make sure payload matches what we expect.
const char * const data = (const char*)zmq_msg_data (&data_frame); const char * const data = (const char*)zmq_msg_data (&data_frame);
const int size = zmq_msg_size (&data_frame); const int size = zmq_msg_size (&data_frame);
int cmp = memcmp(dialog [step].text, data, size);
assert (cmp == 0);
++step;
// 0-length frame is a disconnection notification. The server // 0-length frame is a disconnection notification. The server
// should receive it as the last step in the dialogue. // should receive it as the last step in the dialogue.
if (size == 0) { if (size == 0) {
printf ("server received disconnection notification!\n"); printf ("server received disconnection notification!\n");
++step;
assert (step == steps); assert (step == steps);
} }
else { else {
printf ("server received %d bytes.\n", size); printf ("server received %d bytes.\n", size);
fprintf(stderr, "size = %d, len = %ld\n", size, strlen(dialog [step].text));
assert((size_t)size == strlen(dialog [step].text));
int cmp = memcmp(dialog [step].text, data, size);
assert (cmp == 0);
++step;
assert (step < steps); assert (step < steps);
// Prepare the response. // Prepare the response.
zmq_msg_close (&data_frame); rc = zmq_msg_close (&data_frame);
zmq_msg_init_size (&data_frame, strlen (dialog [step].text)); assert (rc == 0);
memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame)); rc = zmq_msg_init_size (&data_frame,
strlen (dialog [step].text));
assert (rc == 0);
memcpy (zmq_msg_data (&data_frame), dialog [step].text,
zmq_msg_size (&data_frame));
// Send the response. // Send the response.
printf ("server sending %d bytes.\n", (int)zmq_msg_size (&data_frame)); printf ("server sending %d bytes.\n",
zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE); (int)zmq_msg_size (&data_frame));
zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE); rc = zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE);
assert (rc != -1);
} }
// Release resources. // Release resources.
zmq_msg_close (&peer_frame); rc = zmq_msg_close (&peer_frame);
zmq_msg_close (&data_frame); assert (rc == 0);
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
} }
// Check for data received by the client. // Check for data received by the client.
...@@ -155,18 +210,26 @@ int main(int argc, char** argv) ...@@ -155,18 +210,26 @@ int main(int argc, char** argv)
// Grab the 1st frame (peer identity). // Grab the 1st frame (peer identity).
zmq_msg_t peer_frame; zmq_msg_t peer_frame;
zmq_msg_init (&peer_frame); rc = zmq_msg_init (&peer_frame);
zmq_msg_recv (&peer_frame, sockets [CLIENT], 0); assert (rc == 0);
rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
assert (rc != -1);
assert(zmq_msg_size (&peer_frame) > 0);
assert (has_more (sockets [CLIENT])); assert (has_more (sockets [CLIENT]));
// Grab the 2nd frame (actual payload). // Grab the 2nd frame (actual payload).
zmq_msg_t data_frame; zmq_msg_t data_frame;
zmq_msg_init (&data_frame); rc = zmq_msg_init (&data_frame);
zmq_msg_recv (&data_frame, sockets [CLIENT], 0); assert (rc == 0);
rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
assert (rc != -1);
assert(zmq_msg_size (&data_frame) > 0);
// Make sure payload matches what we expect. // Make sure payload matches what we expect.
const char * const data = (const char*)zmq_msg_data (&data_frame); const char * const data = (const char*)zmq_msg_data (&data_frame);
const int size = zmq_msg_size (&data_frame); const int size = zmq_msg_size (&data_frame);
fprintf(stderr, "size = %d, len = %ld\n", size, strlen(dialog [step].text));
assert((size_t)size == strlen(dialog [step].text));
int cmp = memcmp(dialog [step].text, data, size); int cmp = memcmp(dialog [step].text, data, size);
assert (cmp == 0); assert (cmp == 0);
...@@ -176,25 +239,34 @@ int main(int argc, char** argv) ...@@ -176,25 +239,34 @@ int main(int argc, char** argv)
// Prepare the response (next line in the dialog). // Prepare the response (next line in the dialog).
assert (step < steps); assert (step < steps);
zmq_msg_close (&data_frame); rc = zmq_msg_close (&data_frame);
zmq_msg_init_size (&data_frame, strlen (dialog [step].text)); assert (rc == 0);
rc = zmq_msg_init_size (&data_frame, strlen (dialog [step].text));
assert (rc == 0);
memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame)); memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame));
// Send the response. // Send the response.
printf ("client sending %d bytes.\n", (int)zmq_msg_size (&data_frame)); printf ("client sending %d bytes.\n", (int)zmq_msg_size (&data_frame));
zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE); rc = zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE);
zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE); assert (rc != -1);
rc = zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE);
assert (rc != -1);
// Release resources. // Release resources.
zmq_msg_close (&peer_frame); rc = zmq_msg_close (&peer_frame);
zmq_msg_close (&data_frame); assert (rc == 0);
rc = zmq_msg_close (&data_frame);
assert (rc == 0);
} }
} }
assert (step == steps); assert (step == steps);
printf ("Done, exiting now.\n"); printf ("Done, exiting now.\n");
zmq_close (sockets [CLIENT]); rc = zmq_close (sockets [CLIENT]);
zmq_close (sockets [SERVER]); assert (rc == 0);
zmq_ctx_term (context); rc = zmq_close (sockets [SERVER]);
assert (rc == 0);
rc = zmq_ctx_term (context);
assert (rc == 0);
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment