test_immediate.cpp 7.47 KB
Newer Older
/*
    Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

30
#include "testutil.hpp"
31
#include "testutil_unity.hpp"
32

33
//  NOTE(review): presumably expands to the Unity setUp ()/tearDown () hooks
//  that create and destroy the context used by test_context_socket () in the
//  tests below - confirm against testutil_unity.hpp.
SETUP_TEARDOWN_TESTCONTEXT
34 35

void test_immediate_1 ()
36 37 38 39
{
    int val;
    int rc;
    char buffer[16];
40 41
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];
42
    // TEST 1.
43 44 45 46
    // First we're going to attempt to send messages to two
    // pipes, one connected, the other not. We should see
    // the PUSH load balancing to both pipes, and hence half
    // of the messages getting queued, as connect() creates a
47 48
    // pipe immediately.

49
    void *to = test_context_socket (ZMQ_PULL);
50

51
    // Bind the one valid receiver
52
    val = 0;
53 54 55
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
    bind_loopback_ipv4 (to, my_endpoint, len);
56 57

    // Create a socket pushing to two endpoints - only 1 message should arrive.
58
    void *from = test_context_socket (ZMQ_PUSH);
59 60

    val = 0;
61 62 63 64
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
    // This pipe will not connect (provided the ephemeral port is not 5556)
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5556"));
65
    // This pipe will
66
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, my_endpoint));
67

68 69
    msleep (SETTLE_TIME);

70 71
    // We send 10 messages, 5 should just get stuck in the queue
    // for the not-yet-connected pipe
72
    for (int i = 0; i < 10; ++i) {
73
        send_string_expect_success (from, "Hello", 0);
74 75
    }

76 77
    // We now consume from the connected pipe
    // - we should see just 5
78
    int timeout = 250;
79 80
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
81 82 83 84 85

    int seen = 0;
    while (true) {
        rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
        if (rc == -1)
86
            break; //  Break when we didn't get a message
87 88
        seen++;
    }
89
    TEST_ASSERT_EQUAL_INT (5, seen);
90

91 92 93
    test_context_socket_close (from);
    test_context_socket_close (to);
}
94 95


96 97
void test_immediate_2 ()
{
98 99 100 101
    // This time we will do the same thing, connect two pipes,
    // one of which will succeed in connecting to a bound
    // receiver, the other of which will fail. However, we will
    // also set the delay attach on connect flag, which should
102
    // cause the pipe attachment to be delayed until the connection
103
    // succeeds.
104

105
    // Bind the valid socket
106 107 108 109
    void *to = test_context_socket (ZMQ_PULL);
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];
    bind_loopback_ipv4 (to, my_endpoint, len);
110

111 112 113
    int val = 0;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
114 115

    // Create a socket pushing to two endpoints - all messages should arrive.
116
    void *from = test_context_socket (ZMQ_PUSH);
117 118

    val = 0;
119 120
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
121

122
    // Set the key flag
123
    val = 1;
124 125
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (from, ZMQ_IMMEDIATE, &val, sizeof (val)));
126

127
    // Connect to the invalid socket
128
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5561"));
129
    // Connect to the valid socket
130
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, my_endpoint));
131

132
    // Send 10 messages, all should be routed to the connected pipe
133
    for (int i = 0; i < 10; ++i) {
134
        send_string_expect_success (from, "Hello", 0);
135
    }
136 137 138
    int timeout = 250;
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
139

140
    int seen = 0;
141
    while (true) {
142 143
        char buffer[16];
        int rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
144
        if (rc == -1)
145
            break; //  Break when we didn't get a message
146
        seen++;
147
    }
148
    TEST_ASSERT_EQUAL_INT (10, seen);
149

150 151 152
    test_context_socket_close (from);
    test_context_socket_close (to);
}
153

154 155
void test_immediate_3 ()
{
156 157
    // This time we want to validate that the same blocking behaviour
    // occurs with an existing connection that is broken. We will send
158
    // messages to a connected pipe, disconnect and verify the messages
159
    // block. Then we reconnect and verify messages flow again.
160 161
    void *backend = test_context_socket (ZMQ_DEALER);
    void *frontend = test_context_socket (ZMQ_DEALER);
162

163
    int zero = 0;
164 165 166 167
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)));
168

169
    //  Frontend connects to backend using IMMEDIATE
170
    int on = 1;
171 172 173 174 175 176 177 178
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (frontend, ZMQ_IMMEDIATE, &on, sizeof (on)));

    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];
    bind_loopback_ipv4 (backend, my_endpoint, len);

    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (frontend, my_endpoint));
179 180

    //  Ping backend to frontend so we know when the connection is up
181 182
    send_string_expect_success (backend, "Hello", 0);
    recv_string_expect_success (frontend, "Hello", 0);
183

184
    // Send message from frontend to backend
185
    send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
186

187
    test_context_socket_close (backend);
188

189
    //  Give time to process disconnect
190
    msleep (SETTLE_TIME * 10);
191

192
    // Send a message, should fail
193 194
    TEST_ASSERT_FAILURE_ERRNO (EAGAIN,
                               zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT));
195

196
    //  Recreate backend socket
197 198 199 200
    backend = test_context_socket (ZMQ_DEALER);
    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, my_endpoint));
201 202

    //  Ping backend to frontend so we know when the connection is up
203 204
    send_string_expect_success (backend, "Hello", 0);
    recv_string_expect_success (frontend, "Hello", 0);
205

206
    // After the reconnect, should succeed
207
    send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
208

209 210 211
    test_context_socket_close (backend);
    test_context_socket_close (frontend);
}
212

213 214 215 216 217 218 219 220
//  Test entry point: runs the three ZMQ_IMMEDIATE scenarios through Unity
//  and returns the value of UNITY_END ().
int main (void)
{
    setup_test_environment ();
    UNITY_BEGIN ();
    RUN_TEST (test_immediate_1);
    RUN_TEST (test_immediate_2);
    RUN_TEST (test_immediate_3);
    return UNITY_END ();
}