test_stream_disconnect.cpp 10.3 KB
Newer Older
1
/*
2
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3

4
    This file is part of libzmq, the ZeroMQ core engine in C++.
5

6 7 8
    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
9 10
    (at your option) any later version.

11 12 13 14 15 16 17 18 19 20 21 22 23 24
    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.
25 26 27 28 29 30 31 32 33 34

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"

static const int SERVER = 0;
static const int CLIENT = 1;

35 36
struct test_message_t
{
37
    int turn;
38
    const char *text;
39 40 41
};

// NOTE: messages are sent without null terminator.
42 43 44 45
const test_message_t dialog[] = {
  {CLIENT, "i can haz cheez burger?"},
  {SERVER, "y u no disonnect?"},
  {CLIENT, ""},
46
};
47
const int steps = sizeof (dialog) / sizeof (dialog[0]);
48

49
bool has_more (void *socket)
50 51
{
    int more = 0;
52
    size_t more_size = sizeof (more);
53
    int rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
54
    if (rc != 0)
55 56 57 58
        return false;
    return more != 0;
}

59
bool get_routing_id (void *socket, char *data, size_t *size)
60
{
61
    int rc = zmq_getsockopt (socket, ZMQ_ROUTING_ID, data, size);
62 63 64
    return rc == 0;
}

65
int main (int, char **)
66
{
67
    setup_test_environment ();
68

69 70 71
    size_t len = MAX_SOCKET_STRING;
    char bind_endpoint[MAX_SOCKET_STRING];
    char connect_endpoint[MAX_SOCKET_STRING];
72
    void *context = zmq_ctx_new ();
73
    void *sockets[2];
74 75
    int rc = 0;

76
    sockets[SERVER] = zmq_socket (context, ZMQ_STREAM);
77
    int enabled = 1;
78 79
    rc = zmq_setsockopt (sockets[SERVER], ZMQ_STREAM_NOTIFY, &enabled,
                         sizeof (enabled));
80
    assert (rc == 0);
81
    rc = zmq_bind (sockets[SERVER], "tcp://0.0.0.0:*");
82
    assert (rc == 0);
83 84
    rc =
      zmq_getsockopt (sockets[SERVER], ZMQ_LAST_ENDPOINT, bind_endpoint, &len);
85 86 87 88 89
    assert (rc == 0);

    //  Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
#ifdef ZMQ_HAVE_WINDOWS
    sprintf (connect_endpoint, "tcp://127.0.0.1:%s",
90
             strrchr (bind_endpoint, ':') + 1);
91 92 93
#else
    strcpy (connect_endpoint, bind_endpoint);
#endif
94

95 96 97
    sockets[CLIENT] = zmq_socket (context, ZMQ_STREAM);
    rc = zmq_setsockopt (sockets[CLIENT], ZMQ_STREAM_NOTIFY, &enabled,
                         sizeof (enabled));
98
    assert (rc == 0);
99
    rc = zmq_connect (sockets[CLIENT], connect_endpoint);
100 101
    assert (rc == 0);

102
    // wait for connect notification
103
    // Server: Grab the 1st frame (peer routing id).
104 105 106
    zmq_msg_t peer_frame;
    rc = zmq_msg_init (&peer_frame);
    assert (rc == 0);
107
    rc = zmq_msg_recv (&peer_frame, sockets[SERVER], 0);
108
    assert (rc != -1);
109 110
    assert (zmq_msg_size (&peer_frame) > 0);
    assert (has_more (sockets[SERVER]));
111 112
    rc = zmq_msg_close (&peer_frame);
    assert (rc == 0);
113 114 115 116 117

    // Server: Grab the 2nd frame (actual payload).
    zmq_msg_t data_frame;
    rc = zmq_msg_init (&data_frame);
    assert (rc == 0);
118
    rc = zmq_msg_recv (&data_frame, sockets[SERVER], 0);
119
    assert (rc != -1);
120
    assert (zmq_msg_size (&data_frame) == 0);
121 122
    rc = zmq_msg_close (&data_frame);
    assert (rc == 0);
123

124
    // Client: Grab the 1st frame (peer routing id).
125 126
    rc = zmq_msg_init (&peer_frame);
    assert (rc == 0);
127
    rc = zmq_msg_recv (&peer_frame, sockets[CLIENT], 0);
128
    assert (rc != -1);
129 130
    assert (zmq_msg_size (&peer_frame) > 0);
    assert (has_more (sockets[CLIENT]));
131 132
    rc = zmq_msg_close (&peer_frame);
    assert (rc == 0);
133 134 135 136

    // Client: Grab the 2nd frame (actual payload).
    rc = zmq_msg_init (&data_frame);
    assert (rc == 0);
137
    rc = zmq_msg_recv (&data_frame, sockets[CLIENT], 0);
138
    assert (rc != -1);
139
    assert (zmq_msg_size (&data_frame) == 0);
140 141
    rc = zmq_msg_close (&data_frame);
    assert (rc == 0);
142 143

    // Send initial message.
144 145 146 147
    char blob_data[256];
    size_t blob_size = sizeof (blob_data);
    rc =
      zmq_getsockopt (sockets[CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size);
148
    assert (rc != -1);
149
    assert (blob_size > 0);
150
    zmq_msg_t msg;
151 152
    rc = zmq_msg_init_size (&msg, blob_size);
    assert (rc == 0);
153
    memcpy (zmq_msg_data (&msg), blob_data, blob_size);
154
    rc = zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE);
155 156 157
    assert (rc != -1);
    rc = zmq_msg_close (&msg);
    assert (rc == 0);
158
    rc = zmq_msg_init_size (&msg, strlen (dialog[0].text));
159
    assert (rc == 0);
160 161
    memcpy (zmq_msg_data (&msg), dialog[0].text, strlen (dialog[0].text));
    rc = zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE);
162 163 164
    assert (rc != -1);
    rc = zmq_msg_close (&msg);
    assert (rc == 0);
165 166 167 168 169 170 171

    // TODO: make sure this loop doesn't loop forever if something is wrong
    //       with the test (or the implementation).

    int step = 0;
    while (step < steps) {
        // Wait until something happens.
172 173 174
        zmq_pollitem_t items[] = {
          {sockets[SERVER], 0, ZMQ_POLLIN, 0},
          {sockets[CLIENT], 0, ZMQ_POLLIN, 0},
175 176 177 178 179
        };
        int rc = zmq_poll (items, 2, 100);
        assert (rc >= 0);

        // Check for data received by the server.
180 181
        if (items[SERVER].revents & ZMQ_POLLIN) {
            assert (dialog[step].turn == CLIENT);
182

183
            // Grab the 1st frame (peer routing id).
184
            zmq_msg_t peer_frame;
185
            rc = zmq_msg_init (&peer_frame);
186
            assert (rc == 0);
187
            rc = zmq_msg_recv (&peer_frame, sockets[SERVER], 0);
188
            assert (rc != -1);
189 190
            assert (zmq_msg_size (&peer_frame) > 0);
            assert (has_more (sockets[SERVER]));
191 192 193

            // Grab the 2nd frame (actual payload).
            zmq_msg_t data_frame;
194
            rc = zmq_msg_init (&data_frame);
195
            assert (rc == 0);
196
            rc = zmq_msg_recv (&data_frame, sockets[SERVER], 0);
197
            assert (rc != -1);
198 199

            // Make sure payload matches what we expect.
200
            const char *const data = (const char *) zmq_msg_data (&data_frame);
201
            const size_t size = zmq_msg_size (&data_frame);
202 203 204
            // 0-length frame is a disconnection notification.  The server
            // should receive it as the last step in the dialogue.
            if (size == 0) {
205
                ++step;
206
                assert (step == steps);
207
            } else {
208
                assert (size == strlen (dialog[step].text));
209
                int cmp = memcmp (dialog[step].text, data, size);
210
                assert (cmp == 0);
211

212
                ++step;
213

214 215 216
                assert (step < steps);

                // Prepare the response.
217
                rc = zmq_msg_close (&data_frame);
218
                assert (rc == 0);
219 220
                rc =
                  zmq_msg_init_size (&data_frame, strlen (dialog[step].text));
221
                assert (rc == 0);
222
                memcpy (zmq_msg_data (&data_frame), dialog[step].text,
223
                        zmq_msg_size (&data_frame));
224 225

                // Send the response.
226
                rc = zmq_msg_send (&peer_frame, sockets[SERVER], ZMQ_SNDMORE);
227
                assert (rc != -1);
228
                rc = zmq_msg_send (&data_frame, sockets[SERVER], ZMQ_SNDMORE);
229
                assert (rc != -1);
230 231 232
            }

            // Release resources.
233
            rc = zmq_msg_close (&peer_frame);
234
            assert (rc == 0);
235
            rc = zmq_msg_close (&data_frame);
236
            assert (rc == 0);
237 238 239
        }

        // Check for data received by the client.
240 241
        if (items[CLIENT].revents & ZMQ_POLLIN) {
            assert (dialog[step].turn == SERVER);
242

243
            // Grab the 1st frame (peer routing id).
244
            zmq_msg_t peer_frame;
245
            rc = zmq_msg_init (&peer_frame);
246
            assert (rc == 0);
247
            rc = zmq_msg_recv (&peer_frame, sockets[CLIENT], 0);
248
            assert (rc != -1);
249 250
            assert (zmq_msg_size (&peer_frame) > 0);
            assert (has_more (sockets[CLIENT]));
251 252 253

            // Grab the 2nd frame (actual payload).
            zmq_msg_t data_frame;
254
            rc = zmq_msg_init (&data_frame);
255
            assert (rc == 0);
256
            rc = zmq_msg_recv (&data_frame, sockets[CLIENT], 0);
257
            assert (rc != -1);
258
            assert (zmq_msg_size (&data_frame) > 0);
259 260

            // Make sure payload matches what we expect.
261
            const char *const data = (const char *) zmq_msg_data (&data_frame);
262 263
            const size_t size = zmq_msg_size (&data_frame);
            assert (size == strlen (dialog[step].text));
264
            int cmp = memcmp (dialog[step].text, data, size);
265 266 267 268 269 270
            assert (cmp == 0);

            ++step;

            // Prepare the response (next line in the dialog).
            assert (step < steps);
271
            rc = zmq_msg_close (&data_frame);
272
            assert (rc == 0);
273
            rc = zmq_msg_init_size (&data_frame, strlen (dialog[step].text));
274
            assert (rc == 0);
275 276
            memcpy (zmq_msg_data (&data_frame), dialog[step].text,
                    zmq_msg_size (&data_frame));
277 278

            // Send the response.
279
            rc = zmq_msg_send (&peer_frame, sockets[CLIENT], ZMQ_SNDMORE);
280
            assert (rc != -1);
281
            rc = zmq_msg_send (&data_frame, sockets[CLIENT], ZMQ_SNDMORE);
282
            assert (rc != -1);
283 284

            // Release resources.
285
            rc = zmq_msg_close (&peer_frame);
286
            assert (rc == 0);
287
            rc = zmq_msg_close (&data_frame);
288
            assert (rc == 0);
289 290 291
        }
    }
    assert (step == steps);
292
    rc = zmq_close (sockets[CLIENT]);
293
    assert (rc == 0);
294
    rc = zmq_close (sockets[SERVER]);
295 296 297
    assert (rc == 0);
    rc = zmq_ctx_term (context);
    assert (rc == 0);
298 299
    return 0;
}