/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "testutil.hpp"
31 32
#include "testutil_unity.hpp"

33 34
#include <string.h>

35
SETUP_TEARDOWN_TESTCONTEXT
36 37 38

//  ZMTP protocol greeting structure.
//
//  Every member is a plain byte array (or a single byte), so the struct
//  is 10 + 2 + 20 + 1 + 31 = 64 bytes and can be sent as-is on the wire.
typedef uint8_t byte;

typedef struct zmtp_greeting_s
{
    //  0xFF 8*0x00 0x7F
    byte signature[10];
    //  0x03 0x01 for ZMTP/3.1
    byte version[2];
    //  "NULL", padded with zero bytes
    byte mechanism[20];
    byte as_server;
    byte filler[31];
} zmtp_greeting_t;

//  Socket type constants
#define ZMTP_DEALER 5

//  This is a greeting matching what 0MQ will send us; note the
//  8-byte size is set to 1 for backwards compatibility
static zmtp_greeting_t greeting = {
  //  signature
  {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F},
  //  version
  {3, 1},
  //  mechanism (remaining bytes are zero-filled)
  {'N', 'U', 'L', 'L'},
  //  as_server
  0,
  //  filler
  {0}};
56

57
static void test_stream_to_dealer ()
58 59
{
    int rc;
60
    char my_endpoint[MAX_SOCKET_STRING];
61 62

    //  We'll be using this socket in raw mode
63
    void *stream = test_context_socket (ZMQ_STREAM);
64 65

    int zero = 0;
66 67
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)));
68
    int enabled = 1;
69 70 71
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
    bind_loopback_ipv4 (stream, my_endpoint, sizeof my_endpoint);
72 73

    //  We'll be using this socket as the other peer
74 75 76 77
    void *dealer = test_context_socket (ZMQ_DEALER);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));
78 79

    //  Send a message on the dealer socket
80
    send_string_expect_success (dealer, "Hello", 0);
81

82
    //  Connecting sends a zero message
83 84
    //  First frame is routing id
    zmq_msg_t routing_id;
85 86 87
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
    TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
88

89
    //  Verify the existence of Peer-Address metadata
90
    char const *peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
91 92
    TEST_ASSERT_NOT_NULL (peer_address);
    TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
93

94
    //  Second frame is zero
95
    byte buffer[255];
96 97
    TEST_ASSERT_EQUAL_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 255, 0)));
98

99
    //  Verify the existence of Peer-Address metadata
100
    peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
101 102
    TEST_ASSERT_NOT_NULL (peer_address);
    TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
103

104
    //  Real data follows
105
    //  First frame is routing id
106 107
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
    TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
108

109
    //  Verify the existence of Peer-Address metadata
110
    peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
111 112
    TEST_ASSERT_NOT_NULL (peer_address);
    TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
113

114
    //  Second frame is greeting signature
115
    recv_array_expect_success (stream, greeting.signature, 0);
116 117

    //  Send our own protocol greeting
118 119 120 121
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE));
    TEST_ASSERT_EQUAL_INT (
      sizeof (greeting), TEST_ASSERT_SUCCESS_ERRNO (
                           zmq_send (stream, &greeting, sizeof (greeting), 0)));
122 123

    //  Now we expect the data from the DEALER socket
124
    //  We want the rest of greeting along with the Ready command
Pieter Hintjens's avatar
Pieter Hintjens committed
125 126
    int bytes_read = 0;
    while (bytes_read < 97) {
127
        //  First frame is the routing id of the connection (each time)
128 129 130
        TEST_ASSERT_GREATER_THAN_INT (
          0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
        TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
131
        //  Second frame contains the next chunk of data
132 133
        TEST_ASSERT_SUCCESS_ERRNO (
          rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0));
Pieter Hintjens's avatar
Pieter Hintjens committed
134 135
        bytes_read += rc;
    }
136 137

    //  First two bytes are major and minor version numbers.
138 139
    TEST_ASSERT_EQUAL_INT (3, buffer[0]); //  ZMTP/3.1
    TEST_ASSERT_EQUAL_INT (1, buffer[1]);
140 141

    //  Mechanism is "NULL"
142 143 144 145 146 147
    TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2,
                                  "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20);
    TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 54, "\4\51\5READY", 8);
    TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 62, "\13Socket-Type\0\0\0\6DEALER",
                                  22);
    TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 84, "\10Identity\0\0\0\0", 13);
148 149

    //  Announce we are ready
150
    memcpy (buffer, "\4\51\5READY", 8);
151
    memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER", 22);
152
    memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);
153 154

    //  Send Ready command
155 156 157 158
    TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (
                                       &routing_id, stream, ZMQ_SNDMORE)));
    TEST_ASSERT_EQUAL_INT (
      43, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, buffer, 43, 0)));
159 160

    //  Now we expect the data from the DEALER socket
161
    //  First frame is, again, the routing id of the connection
162 163 164
    TEST_ASSERT_GREATER_THAN_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
    TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
165 166

    //  Third frame contains Hello message from DEALER
167 168
    TEST_ASSERT_EQUAL_INT (7, TEST_ASSERT_SUCCESS_ERRNO (
                                zmq_recv (stream, buffer, sizeof buffer, 0)));
169 170

    //  Then we have a 5-byte message "Hello"
171 172 173
    TEST_ASSERT_EQUAL_INT (0, buffer[0]); //  Flags = 0
    TEST_ASSERT_EQUAL_INT (5, buffer[1]); //  Size = 5
    TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, "Hello", 5);
174 175

    //  Send "World" back to DEALER
176 177
    TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (
                                       &routing_id, stream, ZMQ_SNDMORE)));
178
    byte world[] = {0, 5, 'W', 'o', 'r', 'l', 'd'};
179 180 181
    TEST_ASSERT_EQUAL_INT (
      sizeof (world),
      TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, world, sizeof (world), 0)));
182 183

    //  Expect response on DEALER socket
184
    recv_string_expect_success (dealer, "World", 0);
185

186
    //  Test large messages over STREAM socket
187 188
#define size 64000
    uint8_t msgout[size];
189 190 191
    memset (msgout, 0xAB, size);
    zmq_send (dealer, msgout, size, 0);

192
    uint8_t msgin[9 + size];
193 194 195
    memset (msgin, 0, 9 + size);
    bytes_read = 0;
    while (bytes_read < 9 + size) {
196
        //  Get routing id frame
197 198
        TEST_ASSERT_GREATER_THAN_INT (
          0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 256, 0)));
199
        //  Get next chunk
200 201 202 203
        TEST_ASSERT_GREATER_THAN_INT (
          0,
          TEST_ASSERT_SUCCESS_ERRNO (rc = zmq_recv (stream, msgin + bytes_read,
                                                    9 + size - bytes_read, 0)));
204 205
        bytes_read += rc;
    }
206 207
    for (int byte_nbr = 0; byte_nbr < size; byte_nbr++) {
        TEST_ASSERT_EQUAL_UINT8 (0xAB, msgin[9 + byte_nbr]);
208
    }
209 210
    test_context_socket_close (dealer);
    test_context_socket_close (stream);
211 212 213
}


214
static void test_stream_to_stream ()
215
{
216
    char my_endpoint[MAX_SOCKET_STRING];
217
    //  Set-up our context and sockets
218

219
    void *server = test_context_socket (ZMQ_STREAM);
220
    int enabled = 1;
221 222 223 224 225 226 227 228
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
    bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint);

    void *client = test_context_socket (ZMQ_STREAM);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
229 230
    uint8_t id[256];
    uint8_t buffer[256];
231

232
    //  Connecting sends a zero message
233
    //  Server: First frame is routing id, second frame is zero
234 235 236 237
    TEST_ASSERT_GREATER_THAN_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
    TEST_ASSERT_EQUAL_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0)));
238
    //  Client: First frame is routing id, second frame is zero
239 240 241 242
    TEST_ASSERT_GREATER_THAN_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
    TEST_ASSERT_EQUAL_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0)));
243

244
    //  Sent HTTP request on client socket
245
    //  Get server routing id
246 247 248
    size_t id_size = sizeof id;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size));
249
    //  First frame is server routing id
250 251
    TEST_ASSERT_EQUAL_INT ((int) id_size, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (
                                            client, id, id_size, ZMQ_SNDMORE)));
252
    //  Second frame is HTTP GET request
253 254
    TEST_ASSERT_EQUAL_INT (
      7, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (client, "GET /\n\n", 7, 0)));
255

256
    //  Get HTTP request; ID frame and then request
257 258 259 260
    TEST_ASSERT_GREATER_THAN_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0));
    TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, "GET /\n\n", 7);
261

262
    //  Send reply back to client
263 264 265 266
    char http_response[] = "HTTP/1.0 200 OK\r\n"
                           "Content-Type: text/plain\r\n"
                           "\r\n"
                           "Hello, World!";
267 268 269
    TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE));
270 271

    //  Send zero to close connection to client
272 273
    TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, NULL, 0, ZMQ_SNDMORE));
274 275

    //  Get reply at client and check that it's complete
276 277 278 279 280 281 282
    TEST_ASSERT_GREATER_THAN_INT (
      0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
    TEST_ASSERT_EQUAL_INT (
      sizeof http_response,
      TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0)));
    TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, http_response,
                                  sizeof (http_response));
283 284 285 286 287 288 289 290

    // //  Get disconnection notification
    // FIXME: why does this block? Bug in STREAM disconnect notification?
    // id_size = zmq_recv (client, id, 256, 0);
    // assert (id_size > 0);
    // rc = zmq_recv (client, buffer, 256, 0);
    // assert (rc == 0);

291 292
    test_context_socket_close (server);
    test_context_socket_close (client);
293 294
}

295
int main ()
296
{
297
    setup_test_environment ();
298 299 300 301 302

    UNITY_BEGIN ();
    RUN_TEST (test_stream_to_dealer);
    RUN_TEST (test_stream_to_stream);
    return UNITY_END ();
303
}