/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "testutil.hpp"
#include "testutil_unity.hpp"

#include <stdio.h>
#include <string.h>

35
SETUP_TEARDOWN_TESTCONTEXT
36 37 38 39

static const int SERVER = 0;
static const int CLIENT = 1;

//  One scripted line of the client/server dialog.
struct test_message_t
{
    //  Which peer sends this line (SERVER or CLIENT).
    int turn;
    //  Payload text; only strlen (text) bytes are put on the wire.
    const char *text;
};

// NOTE: messages are sent without null terminator.
47 48 49 50
const test_message_t dialog[] = {
  {CLIENT, "i can haz cheez burger?"},
  {SERVER, "y u no disonnect?"},
  {CLIENT, ""},
51
};
52
const int steps = sizeof (dialog) / sizeof (dialog[0]);
53

54
bool has_more (void *socket_)
55 56
{
    int more = 0;
57
    size_t more_size = sizeof (more);
58
    int rc = zmq_getsockopt (socket_, ZMQ_RCVMORE, &more, &more_size);
59
    if (rc != 0)
60 61 62 63
        return false;
    return more != 0;
}

64
void test_stream_disconnect ()
65
{
66 67 68
    size_t len = MAX_SOCKET_STRING;
    char bind_endpoint[MAX_SOCKET_STRING];
    char connect_endpoint[MAX_SOCKET_STRING];
69
    void *sockets[2];
70

71
    sockets[SERVER] = test_context_socket (ZMQ_STREAM);
72
    int enabled = 1;
73 74 75 76 77
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
      sockets[SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sockets[SERVER], "tcp://0.0.0.0:*"));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (sockets[SERVER], ZMQ_LAST_ENDPOINT, bind_endpoint, &len));
78 79 80 81

    //  Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
#ifdef ZMQ_HAVE_WINDOWS
    sprintf (connect_endpoint, "tcp://127.0.0.1:%s",
82
             strrchr (bind_endpoint, ':') + 1);
83 84 85
#else
    strcpy (connect_endpoint, bind_endpoint);
#endif
86

87 88 89 90
    sockets[CLIENT] = test_context_socket (ZMQ_STREAM);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
      sockets[CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sockets[CLIENT], connect_endpoint));
91

92
    // wait for connect notification
93
    // Server: Grab the 1st frame (peer routing id).
94
    zmq_msg_t peer_frame;
95 96 97 98 99
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
    TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
    TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
100 101 102

    // Server: Grab the 2nd frame (actual payload).
    zmq_msg_t data_frame;
103 104 105 106
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[SERVER], 0));
    TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
107

108
    // Client: Grab the 1st frame (peer routing id).
109 110 111 112 113
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
    TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
    TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
114 115

    // Client: Grab the 2nd frame (actual payload).
116 117 118 119
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
    TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
120 121

    // Send initial message.
122 123
    char blob_data[256];
    size_t blob_size = sizeof (blob_data);
124 125 126
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_getsockopt (sockets[CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size));
    TEST_ASSERT_GREATER_THAN (0, blob_size);
127
    zmq_msg_t msg;
128
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, blob_size));
129
    memcpy (zmq_msg_data (&msg), blob_data, blob_size);
130 131 132 133 134
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_msg_init_size (&msg, strlen (dialog[0].text)));
135
    memcpy (zmq_msg_data (&msg), dialog[0].text, strlen (dialog[0].text));
136 137 138
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
139 140 141 142 143 144 145

    // TODO: make sure this loop doesn't loop forever if something is wrong
    //       with the test (or the implementation).

    int step = 0;
    while (step < steps) {
        // Wait until something happens.
146 147 148
        zmq_pollitem_t items[] = {
          {sockets[SERVER], 0, ZMQ_POLLIN, 0},
          {sockets[CLIENT], 0, ZMQ_POLLIN, 0},
149
        };
150
        TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (items, 2, 100));
151 152

        // Check for data received by the server.
153
        if (items[SERVER].revents & ZMQ_POLLIN) {
154
            TEST_ASSERT_EQUAL_INT (CLIENT, dialog[step].turn);
155

156
            // Grab the 1st frame (peer routing id).
157
            zmq_msg_t peer_frame;
158 159 160 161 162
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
            TEST_ASSERT_SUCCESS_ERRNO (
              zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
            TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
            TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
163 164 165

            // Grab the 2nd frame (actual payload).
            zmq_msg_t data_frame;
166 167 168
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
            TEST_ASSERT_SUCCESS_ERRNO (
              zmq_msg_recv (&data_frame, sockets[SERVER], 0));
169 170

            // Make sure payload matches what we expect.
171
            const char *const data = (const char *) zmq_msg_data (&data_frame);
172
            const size_t size = zmq_msg_size (&data_frame);
173 174 175
            // 0-length frame is a disconnection notification.  The server
            // should receive it as the last step in the dialogue.
            if (size == 0) {
176
                ++step;
177
                TEST_ASSERT_EQUAL_INT (steps, step);
178
            } else {
179 180
                TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
                TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
181

182
                ++step;
183

184
                TEST_ASSERT_LESS_THAN_INT (steps, step);
185 186

                // Prepare the response.
187 188 189
                TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
                TEST_ASSERT_SUCCESS_ERRNO (
                  zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
190
                memcpy (zmq_msg_data (&data_frame), dialog[step].text,
191
                        zmq_msg_size (&data_frame));
192 193

                // Send the response.
194 195 196 197
                TEST_ASSERT_SUCCESS_ERRNO (
                  zmq_msg_send (&peer_frame, sockets[SERVER], ZMQ_SNDMORE));
                TEST_ASSERT_SUCCESS_ERRNO (
                  zmq_msg_send (&data_frame, sockets[SERVER], ZMQ_SNDMORE));
198 199 200
            }

            // Release resources.
201 202
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
203 204 205
        }

        // Check for data received by the client.
206
        if (items[CLIENT].revents & ZMQ_POLLIN) {
207
            TEST_ASSERT_EQUAL_INT (SERVER, dialog[step].turn);
208

209
            // Grab the 1st frame (peer routing id).
210
            zmq_msg_t peer_frame;
211 212 213 214 215
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
            TEST_ASSERT_SUCCESS_ERRNO (
              zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
            TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
            TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
216 217 218

            // Grab the 2nd frame (actual payload).
            zmq_msg_t data_frame;
219 220 221 222
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
            TEST_ASSERT_SUCCESS_ERRNO (
              zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
            TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&data_frame));
223 224

            // Make sure payload matches what we expect.
225
            const char *const data = (const char *) zmq_msg_data (&data_frame);
226
            const size_t size = zmq_msg_size (&data_frame);
227 228
            TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
            TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
229 230 231 232

            ++step;

            // Prepare the response (next line in the dialog).
233 234 235 236
            TEST_ASSERT_LESS_THAN_INT (steps, step);
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
            TEST_ASSERT_SUCCESS_ERRNO (
              zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
237 238
            memcpy (zmq_msg_data (&data_frame), dialog[step].text,
                    zmq_msg_size (&data_frame));
239 240

            // Send the response.
241 242 243 244
            TEST_ASSERT_SUCCESS_ERRNO (
              zmq_msg_send (&peer_frame, sockets[CLIENT], ZMQ_SNDMORE));
            TEST_ASSERT_SUCCESS_ERRNO (
              zmq_msg_send (&data_frame, sockets[CLIENT], ZMQ_SNDMORE));
245 246

            // Release resources.
247 248
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
            TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
249 250
        }
    }
251 252 253 254 255 256 257 258 259 260 261 262
    TEST_ASSERT_EQUAL_INT (steps, step);
    test_context_socket_close (sockets[CLIENT]);
    test_context_socket_close (sockets[SERVER]);
}

//  Test runner entry point: prepares the test environment, runs the single
//  Unity test case and returns the value produced by UNITY_END ().
int main (int, char **)
{
    setup_test_environment ();

    UNITY_BEGIN ();
    RUN_TEST (test_stream_disconnect);
    return UNITY_END ();
}