test_stream.cpp 9.3 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

20
#include "testutil.hpp"
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38

//  ZMTP protocol greeting structure

typedef unsigned char byte;
typedef struct {
    byte signature [10];    //  0xFF 8*0x00 0x7F
    byte version [2];       //  0x03 0x00 for ZMTP/3.0
    byte mechanism [20];    //  "NULL"
    byte as_server;
    byte filler [31];
} zmtp_greeting_t;

#define ZMTP_DEALER  5      //  Socket type constants

//  This is a greeting matching what 0MQ will send us; note the
//  8-byte size is set to 1 for backwards compatibility

static zmtp_greeting_t greeting
39
= { { 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, { 3, 0 }, { 'N', 'U', 'L', 'L'}, 0, { 0 } };
40 41

static void
42
test_stream_to_dealer (void)
43 44 45 46 47 48 49 50 51 52 53 54 55 56
{
    int rc;

    //  Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    //  We'll be using this socket in raw mode
    void *stream = zmq_socket (ctx, ZMQ_STREAM);
    assert (stream);

    int zero = 0;
    rc = zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero));
    assert (rc == 0);
57 58 59
    int enabled = 1;
    rc = zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
    assert (rc == 0);
60
    rc = zmq_bind (stream, "tcp://127.0.0.1:5556");
61 62 63 64 65 66 67 68 69 70 71 72 73
    assert (rc == 0);

    //  We'll be using this socket as the other peer
    void *dealer = zmq_socket (ctx, ZMQ_DEALER);
    assert (dealer);
    rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
    assert (rc == 0);
    rc = zmq_connect (dealer, "tcp://localhost:5556");

    //  Send a message on the dealer socket
    rc = zmq_send (dealer, "Hello", 5, 0);
    assert (rc == 5);

74
    //  Connecting sends a zero message
75 76 77 78 79 80 81 82
    //  First frame is identity
    zmq_msg_t identity;
    rc = zmq_msg_init (&identity);
    assert (rc == 0);
    rc = zmq_msg_recv (&identity, stream, 0);
    assert (rc > 0);
    assert (zmq_msg_more (&identity));

83
    // Verify the existence of Peer-Address metadata
84 85 86
    char const* peer_address = zmq_msg_gets (&identity, "Peer-Address");
    assert (peer_address != 0);
    assert (streq (peer_address, "127.0.0.1"));
87

88
    //  Second frame is zero
89 90
    byte buffer [255];
    rc = zmq_recv (stream, buffer, 255, 0);
91
    assert (rc == 0);
92

93
    // Verify the existence of Peer-Address metadata
94 95 96
    peer_address = zmq_msg_gets (&identity, "Peer-Address");
    assert (peer_address != 0);
    assert (streq (peer_address, "127.0.0.1"));
97

98 99 100 101 102 103
    //  Real data follows
    //  First frame is identity
    rc = zmq_msg_recv (&identity, stream, 0);
    assert (rc > 0);
    assert (zmq_msg_more (&identity));

104
    // Verify the existence of Peer-Address metadata
105 106 107
    peer_address = zmq_msg_gets (&identity, "Peer-Address");
    assert (peer_address != 0);
    assert (streq (peer_address, "127.0.0.1"));
108

109
    //  Second frame is greeting signature
110
    rc = zmq_recv (stream, buffer, 255, 0);
111 112 113 114 115 116 117 118 119 120
    assert (rc == 10);
    assert (memcmp (buffer, greeting.signature, 10) == 0);

    //  Send our own protocol greeting
    rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE);
    assert (rc > 0);
    rc = zmq_send (stream, &greeting, sizeof (greeting), 0);
    assert (rc == sizeof (greeting));

    //  Now we expect the data from the DEALER socket
121
    //  We want the rest of greeting along with the Ready command
Pieter Hintjens's avatar
Pieter Hintjens committed
122 123
    int bytes_read = 0;
    while (bytes_read < 97) {
124 125 126 127 128
        //  First frame is the identity of the connection (each time)
        rc = zmq_msg_recv (&identity, stream, 0);
        assert (rc > 0);
        assert (zmq_msg_more (&identity));
        //  Second frame contains the next chunk of data
Pieter Hintjens's avatar
Pieter Hintjens committed
129 130 131 132
        rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0);
        assert (rc >= 0);
        bytes_read += rc;
    }
133 134 135 136 137 138

    //  First two bytes are major and minor version numbers.
    assert (buffer [0] == 3);       //  ZMTP/3.0
    assert (buffer [1] == 0);

    //  Mechanism is "NULL"
139 140
    assert (memcmp (buffer + 2, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0);
    assert (memcmp (buffer + 54, "\4\51\5READY", 8) == 0);
141 142
    assert (memcmp (buffer + 62, "\13Socket-Type\0\0\0\6DEALER", 22) == 0);
    assert (memcmp (buffer + 84, "\10Identity\0\0\0\0", 13) == 0);
143 144

    //  Announce we are ready
145
    memcpy (buffer, "\4\51\5READY", 8);
146
    memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER", 22);
147
    memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);
148 149 150 151

    //  Send Ready command
    rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE);
    assert (rc > 0);
152 153
    rc = zmq_send (stream, buffer, 43, 0);
    assert (rc == 43);
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199

    //  Now we expect the data from the DEALER socket
    //  First frame is, again, the identity of the connection
    rc = zmq_msg_recv (&identity, stream, 0);
    assert (rc > 0);
    assert (zmq_msg_more (&identity));

    //  Third frame contains Hello message from DEALER
    rc = zmq_recv (stream, buffer, sizeof buffer, 0);
    assert (rc == 7);

    //  Then we have a 5-byte message "Hello"
    assert (buffer [0] == 0);       //  Flags = 0
    assert (buffer [1] == 5);       //  Size = 5
    assert (memcmp (buffer + 2, "Hello", 5) == 0);

    //  Send "World" back to DEALER
    rc = zmq_msg_send (&identity, stream, ZMQ_SNDMORE);
    assert (rc > 0);
    byte world [] = { 0, 5, 'W', 'o', 'r', 'l', 'd' };
    rc = zmq_send (stream, world, sizeof (world), 0);
    assert (rc == sizeof (world));

    //  Expect response on DEALER socket
    rc = zmq_recv (dealer, buffer, 255, 0);
    assert (rc == 5);
    assert (memcmp (buffer, "World", 5) == 0);

    rc = zmq_close (dealer);
    assert (rc == 0);

    rc = zmq_close (stream);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}


static void
test_stream_to_stream (void)
{
    int rc;
    //  Set-up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);
200

201 202
    void *server = zmq_socket (ctx, ZMQ_STREAM);
    assert (server);
203 204 205
    int enabled = 1;
    rc = zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
    assert (rc == 0);
206
    rc = zmq_bind (server, "tcp://127.0.0.1:9070");
207 208 209 210
    assert (rc == 0);

    void *client = zmq_socket (ctx, ZMQ_STREAM);
    assert (client);
211 212
    rc = zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled));
    assert (rc == 0);
213
    rc = zmq_connect (client, "tcp://localhost:9070");
214 215 216
    assert (rc == 0);
    uint8_t id [256];
    size_t id_size = 256;
217
    uint8_t buffer [256];
218

219 220 221 222 223 224 225 226 227 228 229 230
    //  Connecting sends a zero message
    //  Server: First frame is identity, second frame is zero
    id_size = zmq_recv (server, id, 256, 0);
    assert (id_size > 0);
    rc = zmq_recv (server, buffer, 256, 0);
    assert (rc == 0);
    //  Client: First frame is identity, second frame is zero
    id_size = zmq_recv (client, id, 256, 0);
    assert (id_size > 0);
    rc = zmq_recv (client, buffer, 256, 0);
    assert (rc == 0);

231
    //  Sent HTTP request on client socket
232 233 234
    //  Get server identity
    rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size);
    assert (rc == 0);
235 236 237 238 239 240
    //  First frame is server identity
    rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
    assert (rc == (int) id_size);
    //  Second frame is HTTP GET request
    rc = zmq_send (client, "GET /\n\n", 7, 0);
    assert (rc == 7);
241

242 243 244 245
    //  Get HTTP request; ID frame and then request
    id_size = zmq_recv (server, id, 256, 0);
    assert (id_size > 0);
    rc = zmq_recv (server, buffer, 256, 0);
246
    assert (rc != -1);
247
    assert (memcmp (buffer, "GET /\n\n", 7) == 0);
248

249 250
    //  Send reply back to client
    char http_response [] =
251 252 253
        "HTTP/1.0 200 OK\r\n"
        "Content-Type: text/plain\r\n"
        "\r\n"
254
        "Hello, World!";
255 256 257 258 259 260 261 262 263 264
    rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
    assert (rc != -1);
    rc = zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE);
    assert (rc != -1);

    //  Send zero to close connection to client
    rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
    assert (rc != -1);
    rc = zmq_send (server, NULL, 0, ZMQ_SNDMORE);
    assert (rc != -1);
265 266 267 268 269 270 271

    //  Get reply at client and check that it's complete
    id_size = zmq_recv (client, id, 256, 0);
    assert (id_size > 0);
    rc = zmq_recv (client, buffer, 256, 0);
    assert (rc == sizeof (http_response));
    assert (memcmp (buffer, http_response, sizeof (http_response)) == 0);
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287

    // //  Get disconnection notification
    // FIXME: why does this block? Bug in STREAM disconnect notification?
    // id_size = zmq_recv (client, id, 256, 0);
    // assert (id_size > 0);
    // rc = zmq_recv (client, buffer, 256, 0);
    // assert (rc == 0);

    rc = zmq_close (server);
    assert (rc == 0);

    rc = zmq_close (client);
    assert (rc == 0);

    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
288 289 290 291 292
}


int main (void)
{
293
    setup_test_environment();
294 295
    test_stream_to_dealer ();
    test_stream_to_stream ();
296
}