norm_engine.cpp 25.9 KB
Newer Older
bebopagogo's avatar
bebopagogo committed
1

2 3
#include "precompiled.hpp"

bebopagogo's avatar
bebopagogo committed
4 5 6 7 8 9 10 11
#include "platform.hpp"

#if defined ZMQ_HAVE_NORM

#include "norm_engine.hpp"
#include "session_base.hpp"
#include "v2_protocol.hpp"

12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
//  Constructs an engine that is not yet attached to any NORM session:
//  all NORM handles start out invalid and every sender/receiver/ready flag
//  is cleared. init() creates the NORM objects, plug() attaches the session.
zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
                                   const options_t &options_) :
    io_object_t (parent_),
    zmq_session (NULL),
    options (options_),
    norm_instance (NORM_INSTANCE_INVALID),
    norm_session (NORM_SESSION_INVALID),
    is_sender (false),
    is_receiver (false),
    zmq_encoder (0),
    norm_tx_stream (NORM_OBJECT_INVALID),
    tx_first_msg (true),
    tx_more_bit (false),
    zmq_output_ready (false),
    norm_tx_ready (false),
    tx_index (0),
    tx_len (0),
    zmq_input_ready (false)
{
    //  tx_msg is the message object later handed to pull_msg() by
    //  send_data(); initialise it here and abort if that fails.
    const int init_result = tx_msg.init ();
    errno_assert (init_result == 0);
}

35
//  Destructor: releases whatever NORM state the engine still holds.
zmq::norm_engine_t::~norm_engine_t ()
{
    //  shutdown() guards each step with a flag/handle check and clears it
    //  afterwards, so running it again after terminate() does nothing.
    shutdown ();
}


41
int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
bebopagogo's avatar
bebopagogo committed
42 43 44 45
{
    // Parse the "network_" address int "iface", "addr", and "port"
    // norm endpoint format: [id,][<iface>;]<addr>:<port>
    // First, look for optional local NormNodeId
46
    // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
bebopagogo's avatar
bebopagogo committed
47
    NormNodeId localId = NORM_NODE_ANY;
48 49
    const char *ifacePtr = strchr (network_, ',');
    if (NULL != ifacePtr) {
bebopagogo's avatar
bebopagogo committed
50
        size_t idLen = ifacePtr - network_;
51 52
        if (idLen > 31)
            idLen = 31;
bebopagogo's avatar
bebopagogo committed
53
        char idText[32];
54
        strncpy (idText, network_, idLen);
bebopagogo's avatar
bebopagogo committed
55
        idText[idLen] = '\0';
56
        localId = (NormNodeId) atoi (idText);
bebopagogo's avatar
bebopagogo committed
57
        ifacePtr++;
58
    } else {
bebopagogo's avatar
bebopagogo committed
59 60
        ifacePtr = network_;
    }
61

bebopagogo's avatar
bebopagogo committed
62 63
    // Second, look for optional multicast ifaceName
    char ifaceName[256];
64 65
    const char *addrPtr = strchr (ifacePtr, ';');
    if (NULL != addrPtr) {
bebopagogo's avatar
bebopagogo committed
66
        size_t ifaceLen = addrPtr - ifacePtr;
67 68 69
        if (ifaceLen > 255)
            ifaceLen = 255; // return error instead?
        strncpy (ifaceName, ifacePtr, ifaceLen);
bebopagogo's avatar
bebopagogo committed
70 71 72
        ifaceName[ifaceLen] = '\0';
        ifacePtr = ifaceName;
        addrPtr++;
73
    } else {
bebopagogo's avatar
bebopagogo committed
74 75 76
        addrPtr = ifacePtr;
        ifacePtr = NULL;
    }
77

bebopagogo's avatar
bebopagogo committed
78
    // Finally, parse IP address and port number
79 80
    const char *portPtr = strrchr (addrPtr, ':');
    if (NULL == portPtr) {
bebopagogo's avatar
bebopagogo committed
81 82 83
        errno = EINVAL;
        return -1;
    }
84

bebopagogo's avatar
bebopagogo committed
85 86
    char addr[256];
    size_t addrLen = portPtr - addrPtr;
87 88 89
    if (addrLen > 255)
        addrLen = 255;
    strncpy (addr, addrPtr, addrLen);
bebopagogo's avatar
bebopagogo committed
90 91
    addr[addrLen] = '\0';
    portPtr++;
92
    unsigned short portNumber = atoi (portPtr);
93

94 95
    if (NORM_INSTANCE_INVALID == norm_instance) {
        if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance ())) {
bebopagogo's avatar
bebopagogo committed
96 97 98 99
            // errno set by whatever caused NormCreateInstance() to fail
            return -1;
        }
    }
100

bebopagogo's avatar
bebopagogo committed
101 102
    // TBD - What do we use for our local NormNodeId?
    //       (for now we use automatic, IP addr based assignment or passed in 'id')
103 104 105
    //       a) Use ZMQ Identity somehow?
    //       b) Add function to use iface addr
    //       c) Randomize and implement a NORM session layer
bebopagogo's avatar
bebopagogo committed
106
    //          conflict detection/resolution protocol
107

108 109
    norm_session = NormCreateSession (norm_instance, addr, portNumber, localId);
    if (NORM_SESSION_INVALID == norm_session) {
bebopagogo's avatar
bebopagogo committed
110
        int savedErrno = errno;
111
        NormDestroyInstance (norm_instance);
bebopagogo's avatar
bebopagogo committed
112 113 114 115
        norm_instance = NORM_INSTANCE_INVALID;
        errno = savedErrno;
        return -1;
    }
116
    // There's many other useful NORM options that could be applied here
117 118 119
    if (NormIsUnicastAddress (addr)) {
        NormSetDefaultUnicastNack (norm_session, true);
    } else {
bebopagogo's avatar
bebopagogo committed
120 121
        // These only apply for multicast sessions
        //NormSetTTL(norm_session, options.multicast_hops);  // ZMQ default is 1
122 123 124 125 126 127 128 129 130
        NormSetTTL (
          norm_session,
          255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
        NormSetRxPortReuse (
          norm_session,
          true); // port reuse doesn't work for non-connected unicast
        NormSetLoopback (norm_session,
                         true); // needed when multicast users on same machine
        if (NULL != ifacePtr) {
bebopagogo's avatar
bebopagogo committed
131 132
            // Note a bad interface may not be caught until sender or receiver start
            // (Since sender/receiver is not yet started, this always succeeds here)
133
            NormSetMulticastInterface (norm_session, ifacePtr);
bebopagogo's avatar
bebopagogo committed
134 135
        }
    }
136

137
    if (recv) {
138 139 140
        // The alternative NORM_SYNC_CURRENT here would provide "instant"
        // receiver sync to the sender's _current_ message transmission.
        // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
141 142
        NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
        if (!NormStartReceiver (norm_session, 2 * 1024 * 1024)) {
bebopagogo's avatar
bebopagogo committed
143 144
            // errno set by whatever failed
            int savedErrno = errno;
145
            NormDestroyInstance (norm_instance); // session gets closed, too
bebopagogo's avatar
bebopagogo committed
146 147 148 149 150 151 152
            norm_session = NORM_SESSION_INVALID;
            norm_instance = NORM_INSTANCE_INVALID;
            errno = savedErrno;
            return -1;
        }
        is_receiver = true;
    }
153

154
    if (send) {
bebopagogo's avatar
bebopagogo committed
155
        // Pick a random sender instance id (aka norm sender session id)
156
        NormSessionId instanceId = NormGetRandomSessionId ();
bebopagogo's avatar
bebopagogo committed
157
        // TBD - provide "options" for some NORM sender parameters
158 159
        if (!NormStartSender (norm_session, instanceId, 2 * 1024 * 1024, 1400,
                              16, 4)) {
bebopagogo's avatar
bebopagogo committed
160 161
            // errno set by whatever failed
            int savedErrno = errno;
162
            NormDestroyInstance (norm_instance); // session gets closed, too
bebopagogo's avatar
bebopagogo committed
163 164 165 166
            norm_session = NORM_SESSION_INVALID;
            norm_instance = NORM_INSTANCE_INVALID;
            errno = savedErrno;
            return -1;
167
        }
168
        NormSetCongestionControl (norm_session, true);
bebopagogo's avatar
bebopagogo committed
169
        norm_tx_ready = true;
170
        is_sender = true;
171 172 173
        if (NORM_OBJECT_INVALID
            == (norm_tx_stream =
                  NormStreamOpen (norm_session, 2 * 1024 * 1024))) {
bebopagogo's avatar
bebopagogo committed
174 175
            // errno set by whatever failed
            int savedErrno = errno;
176
            NormDestroyInstance (norm_instance); // session gets closed, too
bebopagogo's avatar
bebopagogo committed
177 178 179 180 181 182
            norm_session = NORM_SESSION_INVALID;
            norm_instance = NORM_INSTANCE_INVALID;
            errno = savedErrno;
            return -1;
        }
    }
183

184 185 186
    //NormSetMessageTrace(norm_session, true);
    //NormSetDebugLevel(3);
    //NormOpenDebugLog(norm_instance, "normLog.txt");
187

188 189
    return 0; // no error
} // end zmq::norm_engine_t::init()
bebopagogo's avatar
bebopagogo committed
190

191
void zmq::norm_engine_t::shutdown ()
bebopagogo's avatar
bebopagogo committed
192 193
{
    // TBD - implement a more graceful shutdown option
194 195
    if (is_receiver) {
        NormStopReceiver (norm_session);
196

bebopagogo's avatar
bebopagogo committed
197
        // delete any active NormRxStreamState
198 199 200
        rx_pending_list.Destroy ();
        rx_ready_list.Destroy ();
        msg_ready_list.Destroy ();
201

bebopagogo's avatar
bebopagogo committed
202 203
        is_receiver = false;
    }
204 205
    if (is_sender) {
        NormStopSender (norm_session);
bebopagogo's avatar
bebopagogo committed
206 207
        is_sender = false;
    }
208 209
    if (NORM_SESSION_INVALID != norm_session) {
        NormDestroySession (norm_session);
bebopagogo's avatar
bebopagogo committed
210 211
        norm_session = NORM_SESSION_INVALID;
    }
212 213 214
    if (NORM_INSTANCE_INVALID != norm_instance) {
        NormStopInstance (norm_instance);
        NormDestroyInstance (norm_instance);
bebopagogo's avatar
bebopagogo committed
215 216
        norm_instance = NORM_INSTANCE_INVALID;
    }
217
} // end zmq::norm_engine_t::shutdown()
bebopagogo's avatar
bebopagogo committed
218

219 220
//  Attaches the engine to its ZMQ session and starts polling the NORM
//  descriptor for events.
//
//  io_thread_  currently unused (see TBD below)
//  session_    session that messages are pulled from / pushed to
void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
                               session_base_t *session_)
{
    // TBD - we may assign the NORM engine to an io_thread in the future???
    zmq_session = session_;

    //  Mark each active direction as ready on the ZMQ side
    if (is_receiver) {
        zmq_input_ready = true;
    }
    if (is_sender) {
        zmq_output_ready = true;
    }

    //  Register the NORM descriptor and ask for POLLIN so that pending
    //  NormEvents are delivered to in_event()
    norm_descriptor_handle = add_fd (NormGetDescriptor (norm_instance));
    set_pollin (norm_descriptor_handle);

    //  A sender makes a first attempt to transmit right away
    if (is_sender) {
        send_data ();
    }
} // end zmq::norm_engine_t::plug()
bebopagogo's avatar
bebopagogo committed
238

239
void zmq::norm_engine_t::unplug ()
bebopagogo's avatar
bebopagogo committed
240
{
241
    rm_fd (norm_descriptor_handle);
242

bebopagogo's avatar
bebopagogo committed
243
    zmq_session = NULL;
244
} // end zmq::norm_engine_t::unplug()
bebopagogo's avatar
bebopagogo committed
245

246
void zmq::norm_engine_t::terminate ()
bebopagogo's avatar
bebopagogo committed
247
{
248 249
    unplug ();
    shutdown ();
bebopagogo's avatar
bebopagogo committed
250 251 252
    delete this;
}

253
void zmq::norm_engine_t::restart_output ()
bebopagogo's avatar
bebopagogo committed
254 255 256
{
    // There's new message data available from the session
    zmq_output_ready = true;
257 258
    if (norm_tx_ready)
        send_data ();
259

260
} // end zmq::norm_engine_t::restart_output()
bebopagogo's avatar
bebopagogo committed
261

262
void zmq::norm_engine_t::send_data ()
bebopagogo's avatar
bebopagogo committed
263 264
{
    // Here we write as much as is available or we can
265 266
    while (zmq_output_ready && norm_tx_ready) {
        if (0 == tx_len) {
267 268 269
            // Our tx_buffer needs data to send
            // Get more data from encoder
            size_t space = BUFFER_SIZE;
270 271 272 273
            unsigned char *bufPtr = (unsigned char *) tx_buffer;
            tx_len = zmq_encoder.encode (&bufPtr, space);
            if (0 == tx_len) {
                if (tx_first_msg) {
274 275
                    // We don't need to mark eom/flush until a message is sent
                    tx_first_msg = false;
276
                } else {
277 278 279 280 281 282 283
                    // A prior message was completely written to stream, so
                    // mark end-of-message and possibly flush (to force packet transmission,
                    // even if it's not a full segment so message gets delivered quickly)
                    // NormStreamMarkEom(norm_tx_stream);  // the flush below marks eom
                    // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
                    // but makes sure content is delivered quickly.  Positive acknowledgements
                    // with flush override would make NORM more succinct here
284
                    NormStreamFlush (norm_tx_stream, true, NORM_FLUSH_ACTIVE);
285 286
                }
                // Need to pull and load a new message to send
287
                if (-1 == zmq_session->pull_msg (&tx_msg)) {
288
                    // We need to wait for "restart_output()" to be called by ZMQ
289 290 291
                    zmq_output_ready = false;
                    break;
                }
292
                zmq_encoder.load_msg (&tx_msg);
293 294 295 296 297 298 299
                // Should we write message size header for NORM to use? Or expect NORM
                // receiver to decode ZMQ message framing format(s)?
                // OK - we need to use a byte to denote when the ZMQ frame is the _first_
                //      frame of a message so it can be decoded properly when a receiver
                //      'syncs' mid-stream.  We key off the the state of the 'more_flag'
                //      I.e.,If  more_flag _was_ false previously, this is the first
                //      frame of a ZMQ message.
300
                if (tx_more_bit)
301 302
                    tx_buffer[0] =
                      (char) 0xff; // this is not first frame of message
303
                else
304 305
                    tx_buffer[0] = 0x00; // this is first frame of message
                tx_more_bit = (0 != (tx_msg.flags () & msg_t::more));
306 307 308
                // Go ahead an get a first chunk of the message
                bufPtr++;
                space--;
309
                tx_len = 1 + zmq_encoder.encode (&bufPtr, space);
310 311 312
                tx_index = 0;
            }
        }
bebopagogo's avatar
bebopagogo committed
313
        // Do we have data in our tx_buffer pending
314
        if (tx_index < tx_len) {
315
            // We have data in our tx_buffer to send, so write it to the stream
316 317 318
            tx_index += NormStreamWrite (norm_tx_stream, tx_buffer + tx_index,
                                         tx_len - tx_index);
            if (tx_index < tx_len) {
bebopagogo's avatar
bebopagogo committed
319 320 321 322
                // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
                norm_tx_ready = false;
                break;
            }
323
            tx_len = 0; // all buffered data was written
bebopagogo's avatar
bebopagogo committed
324
        }
325 326
    } // end while (zmq_output_ready && norm_tx_ready)
} // end zmq::norm_engine_t::send_data()
bebopagogo's avatar
bebopagogo committed
327

328
void zmq::norm_engine_t::in_event ()
bebopagogo's avatar
bebopagogo committed
329 330 331
{
    // This means a NormEvent is pending, so call NormGetNextEvent() and handle
    NormEvent event;
332
    if (!NormGetNextEvent (norm_instance, &event)) {
bebopagogo's avatar
bebopagogo committed
333
        // NORM has died before we unplugged?!
334
        zmq_assert (false);
bebopagogo's avatar
bebopagogo committed
335 336
        return;
    }
337

338
    switch (event.type) {
bebopagogo's avatar
bebopagogo committed
339 340
        case NORM_TX_QUEUE_VACANCY:
        case NORM_TX_QUEUE_EMPTY:
341
            if (!norm_tx_ready) {
bebopagogo's avatar
bebopagogo committed
342
                norm_tx_ready = true;
343
                send_data ();
bebopagogo's avatar
bebopagogo committed
344 345
            }
            break;
346

bebopagogo's avatar
bebopagogo committed
347
        case NORM_RX_OBJECT_NEW:
348
            //break;
bebopagogo's avatar
bebopagogo committed
349
        case NORM_RX_OBJECT_UPDATED:
350
            recv_data (event.object);
bebopagogo's avatar
bebopagogo committed
351
            break;
352

353 354 355 356
        case NORM_RX_OBJECT_ABORTED: {
            NormRxStreamState *rxState =
              (NormRxStreamState *) NormObjectGetUserData (event.object);
            if (NULL != rxState) {
357 358 359
                // Remove the state from the list it's in
                // This is now unnecessary since deletion takes care of list removal
                // but in the interest of being clear ...
360 361 362
                NormRxStreamState::List *list = rxState->AccessList ();
                if (NULL != list)
                    list->Remove (*rxState);
bebopagogo's avatar
bebopagogo committed
363
            }
364
            delete rxState;
bebopagogo's avatar
bebopagogo committed
365
            break;
366
        }
bebopagogo's avatar
bebopagogo committed
367 368 369 370 371 372 373 374 375
        case NORM_REMOTE_SENDER_INACTIVE:
            // Here we free resources used for this formerly active sender.
            // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
            //  get some messages delivered twice.  NORM_SYNC_CURRENT would
            // mitigate that but might miss data at startup. Always tradeoffs.
            // Instead of immediately deleting, we could instead initiate a
            // user configurable timeout here to wait some amount of time
            // after this event to declare the remote sender truly dead
            // and delete its state???
376
            NormNodeDelete (event.sender);
bebopagogo's avatar
bebopagogo committed
377
            break;
378

bebopagogo's avatar
bebopagogo committed
379
        default:
380
            // We ignore some NORM events
bebopagogo's avatar
bebopagogo committed
381 382
            break;
    }
383
} // zmq::norm_engine_t::in_event()
bebopagogo's avatar
bebopagogo committed
384

385
bool zmq::norm_engine_t::restart_input ()
bebopagogo's avatar
bebopagogo committed
386 387 388 389
{
    // TBD - should we check/assert that zmq_input_ready was false???
    zmq_input_ready = true;
    // Process any pending received messages
390 391
    if (!msg_ready_list.IsEmpty ())
        recv_data (NORM_OBJECT_INVALID);
392

393
    return true;
394
} // end zmq::norm_engine_t::restart_input()
bebopagogo's avatar
bebopagogo committed
395

396
void zmq::norm_engine_t::recv_data (NormObjectHandle object)
bebopagogo's avatar
bebopagogo committed
397
{
398
    if (NORM_OBJECT_INVALID != object) {
bebopagogo's avatar
bebopagogo committed
399 400 401
        // Call result of NORM_RX_OBJECT_UPDATED notification
        // This is a rx_ready indication for a new or existing rx stream
        // First, determine if this is a stream we already know
402
        zmq_assert (NORM_OBJECT_STREAM == NormObjectGetType (object));
bebopagogo's avatar
bebopagogo committed
403 404
        // Since there can be multiple senders (publishers), we keep
        // state for each separate rx stream.
405 406 407
        NormRxStreamState *rxState =
          (NormRxStreamState *) NormObjectGetUserData (object);
        if (NULL == rxState) {
bebopagogo's avatar
bebopagogo committed
408
            // This is a new stream, so create rxState with zmq decoder, etc
409
            rxState = new (std::nothrow)
410 411
              NormRxStreamState (object, options.maxmsgsize, options.zero_copy,
                                 options.in_batch_size);
412
            errno_assert (rxState);
413

414 415
            if (!rxState->Init ()) {
                errno_assert (false);
bebopagogo's avatar
bebopagogo committed
416 417 418
                delete rxState;
                return;
            }
419 420
            NormObjectSetUserData (object, rxState);
        } else if (!rxState->IsRxReady ()) {
bebopagogo's avatar
bebopagogo committed
421 422
            // Existing non-ready stream, so remove from pending
            // list to be promoted to rx_ready_list ...
423
            rx_pending_list.Remove (*rxState);
bebopagogo's avatar
bebopagogo committed
424
        }
425
        if (!rxState->IsRxReady ()) {
bebopagogo's avatar
bebopagogo committed
426
            // TBD - prepend up front for immediate service?
427 428
            rxState->SetRxReady (true);
            rx_ready_list.Append (*rxState);
bebopagogo's avatar
bebopagogo committed
429 430 431 432
        }
    }
    // This loop repeats until we've read all data available from "rx ready" inbound streams
    // and pushed any accumulated messages we can up to the zmq session.
433 434
    while (!rx_ready_list.IsEmpty ()
           || (zmq_input_ready && !msg_ready_list.IsEmpty ())) {
bebopagogo's avatar
bebopagogo committed
435 436
        // Iterate through our rx_ready streams, reading data into the decoder
        // (This services incoming "rx ready" streams in a round-robin fashion)
437 438 439 440 441
        NormRxStreamState::List::Iterator iterator (rx_ready_list);
        NormRxStreamState *rxState;
        while (NULL != (rxState = iterator.GetNextItem ())) {
            switch (rxState->Decode ()) {
                case 1: // msg completed
bebopagogo's avatar
bebopagogo committed
442
                    // Complete message decoded, move this stream to msg_ready_list
443
                    // to push the message up to the session below.  Note the stream
bebopagogo's avatar
bebopagogo committed
444
                    // will be returned to the "rx_ready_list" after that's done
445 446
                    rx_ready_list.Remove (*rxState);
                    msg_ready_list.Append (*rxState);
bebopagogo's avatar
bebopagogo committed
447
                    continue;
448

bebopagogo's avatar
bebopagogo committed
449 450
                case -1: // decoding error (shouldn't happen w/ NORM, but ...)
                    // We need to re-sync this stream (decoder buffer was reset)
451
                    rxState->SetSync (false);
bebopagogo's avatar
bebopagogo committed
452
                    break;
453

454
                default: // 0 - need more data
bebopagogo's avatar
bebopagogo committed
455 456 457
                    break;
            }
            // Get more data from this stream
458
            NormObjectHandle stream = rxState->GetStreamHandle ();
bebopagogo's avatar
bebopagogo committed
459
            // First, make sure we're in sync ...
460
            while (!rxState->InSync ()) {
bebopagogo's avatar
bebopagogo committed
461
                // seek NORM message start
462
                if (!NormStreamSeekMsgStart (stream)) {
bebopagogo's avatar
bebopagogo committed
463 464 465 466 467 468
                    // Need to wait for more data
                    break;
                }
                // read message 'flag' byte to see if this it's a 'final' frame
                char syncFlag;
                unsigned int numBytes = 1;
469
                if (!NormStreamRead (stream, &syncFlag, &numBytes)) {
bebopagogo's avatar
bebopagogo committed
470
                    // broken stream (shouldn't happen after seek msg start?)
471
                    zmq_assert (false);
bebopagogo's avatar
bebopagogo committed
472 473
                    continue;
                }
474
                if (0 == numBytes) {
bebopagogo's avatar
bebopagogo committed
475 476 477 478
                    // This probably shouldn't happen either since we found msg start
                    // Need to wait for more data
                    break;
                }
479 480
                if (0 == syncFlag)
                    rxState->SetSync (true);
bebopagogo's avatar
bebopagogo committed
481
                // else keep seeking ...
482 483
            } // end while(!rxState->InSync())
            if (!rxState->InSync ()) {
bebopagogo's avatar
bebopagogo committed
484 485
                // Need more data for this stream, so remove from "rx ready"
                // list and iterate to next "rx ready" stream
486
                rxState->SetRxReady (false);
bebopagogo's avatar
bebopagogo committed
487
                // Move from rx_ready_list to rx_pending_list
488 489
                rx_ready_list.Remove (*rxState);
                rx_pending_list.Append (*rxState);
bebopagogo's avatar
bebopagogo committed
490 491 492 493
                continue;
            }
            // Now we're actually ready to read data from the NORM stream to the zmq_decoder
            // the underlying zmq_decoder->get_buffer() call sets how much is needed.
494 495
            unsigned int numBytes = rxState->GetBytesNeeded ();
            if (!NormStreamRead (stream, rxState->AccessBuffer (), &numBytes)) {
bebopagogo's avatar
bebopagogo committed
496
                // broken NORM stream, so re-sync
497
                rxState->Init (); // TBD - check result
bebopagogo's avatar
bebopagogo committed
498 499 500 501
                // This will retry syncing, and getting data from this stream
                // since we don't increment the "it" iterator
                continue;
            }
502 503
            rxState->IncrementBufferCount (numBytes);
            if (0 == numBytes) {
bebopagogo's avatar
bebopagogo committed
504 505
                // All the data available has been read
                // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
506
                rxState->SetRxReady (false);
bebopagogo's avatar
bebopagogo committed
507
                // Move from rx_ready_list to rx_pending_list
508 509
                rx_ready_list.Remove (*rxState);
                rx_pending_list.Append (*rxState);
bebopagogo's avatar
bebopagogo committed
510
            }
511
        } // end while(NULL != (rxState = iterator.GetNextItem()))
512

513
        if (zmq_input_ready) {
bebopagogo's avatar
bebopagogo committed
514
            // At this point, we've made a pass through the "rx_ready" stream list
515 516
            // Now make a pass through the "msg_pending" list (if the zmq session
            // ready for more input).  This may possibly return streams back to
bebopagogo's avatar
bebopagogo committed
517
            // the "rx ready" stream list after their pending message is handled
518 519 520 521 522 523 524
            NormRxStreamState::List::Iterator iterator (msg_ready_list);
            NormRxStreamState *rxState;
            while (NULL != (rxState = iterator.GetNextItem ())) {
                msg_t *msg = rxState->AccessMsg ();
                int rc = zmq_session->push_msg (msg);
                if (-1 == rc) {
                    if (EAGAIN == errno) {
bebopagogo's avatar
bebopagogo committed
525 526 527
                        // need to wait until session calls "restart_input()"
                        zmq_input_ready = false;
                        break;
528
                    } else {
bebopagogo's avatar
bebopagogo committed
529 530
                        // session rejected message?
                        // TBD - handle this better
531
                        zmq_assert (false);
bebopagogo's avatar
bebopagogo committed
532 533 534
                    }
                }
                // else message was accepted.
535 536 537 538 539 540 541 542 543 544
                msg_ready_list.Remove (*rxState);
                if (
                  rxState
                    ->IsRxReady ()) // Move back to "rx_ready" list to read more data
                    rx_ready_list.Append (*rxState);
                else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
                    msg_ready_list.Append (*rxState);
            } // end while(NULL != (rxState = iterator.GetNextItem()))
        }     // end if (zmq_input_ready)
    } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
545

bebopagogo's avatar
bebopagogo committed
546
    // Alert zmq of the messages we have pushed up
547 548 549 550 551
    zmq_session->flush ();

} // end zmq::norm_engine_t::recv_data()

//  Per-stream receive state for one NORM rx stream.
//
//  normStream   handle of the NORM stream this state belongs to
//  maxMsgSize   } stored and later passed to the v2 decoder
//  zeroCopy     } that Init() creates
//  inBatchSize  }
//
//  The object starts out unsynchronised, not rx-ready, without a decoder
//  or buffer, and not linked into any list. Init() must be called before
//  the state is used for decoding.
zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
  NormObjectHandle normStream,
  int64_t maxMsgSize,
  bool zeroCopy,
  int inBatchSize) :
    norm_stream (normStream),
    max_msg_size (maxMsgSize),
    zero_copy (zeroCopy),
    in_batch_size (inBatchSize),
    in_sync (false),
    rx_ready (false),
    zmq_decoder (NULL),
    skip_norm_sync (false),
    buffer_ptr (NULL),
    buffer_size (0),
    buffer_count (0),
    prev (NULL),
    next (NULL),
    list (NULL)
{
    //  Nothing else to do; the decoder is allocated by Init()
}

573
//  Frees the decoder and unlinks the state from whatever list it is on.
zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState ()
{
    //  Deleting a NULL pointer is a no-op, so no guard is required
    delete zmq_decoder;
    zmq_decoder = NULL;

    //  Make sure no list keeps a pointer to this object
    if (list != NULL) {
        list->Remove (*this);
        list = NULL;
    }
}

585
bool zmq::norm_engine_t::NormRxStreamState::Init ()
bebopagogo's avatar
bebopagogo committed
586 587 588
{
    in_sync = false;
    skip_norm_sync = false;
589 590
    if (NULL != zmq_decoder)
        delete zmq_decoder;
591 592
    zmq_decoder =
      new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
bebopagogo's avatar
bebopagogo committed
593
    alloc_assert (zmq_decoder);
594
    if (NULL != zmq_decoder) {
bebopagogo's avatar
bebopagogo committed
595 596
        buffer_count = 0;
        buffer_size = 0;
597
        zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
bebopagogo's avatar
bebopagogo committed
598
        return true;
599
    } else {
bebopagogo's avatar
bebopagogo committed
600 601
        return false;
    }
602
} // end zmq::norm_engine_t::NormRxStreamState::Init()
bebopagogo's avatar
bebopagogo committed
603 604 605

// This decodes any pending data sitting in our stream decoder buffer
// It returns 1 upon message completion, -1 on error, 1 on msg completion
606
int zmq::norm_engine_t::NormRxStreamState::Decode ()
bebopagogo's avatar
bebopagogo committed
607 608
{
    // If we have pending bytes to decode, process those first
609
    while (buffer_count > 0) {
bebopagogo's avatar
bebopagogo committed
610 611
        // There's pending data for the decoder to decode
        size_t processed = 0;
612

bebopagogo's avatar
bebopagogo committed
613 614 615
        // This a bit of a kludgy approach used to weed
        // out the NORM ZMQ message transport "syncFlag" byte
        // from the ZMQ message stream being decoded (but it works!)
616
        if (skip_norm_sync) {
bebopagogo's avatar
bebopagogo committed
617 618 619 620
            buffer_ptr++;
            buffer_count--;
            skip_norm_sync = false;
        }
621

622
        int rc = zmq_decoder->decode (buffer_ptr, buffer_count, processed);
bebopagogo's avatar
bebopagogo committed
623 624
        buffer_ptr += processed;
        buffer_count -= processed;
625
        switch (rc) {
bebopagogo's avatar
bebopagogo committed
626 627
            case 1:
                // msg completed
628
                if (0 == buffer_count) {
bebopagogo's avatar
bebopagogo committed
629
                    buffer_size = 0;
630
                    zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
bebopagogo's avatar
bebopagogo committed
631 632 633 634 635 636
                }
                skip_norm_sync = true;
                return 1;
            case -1:
                // decoder error (reset decoder and state variables)
                in_sync = false;
637 638
                skip_norm_sync = false; // will get consumed by norm sync check
                Init ();
bebopagogo's avatar
bebopagogo committed
639
                break;
640

bebopagogo's avatar
bebopagogo committed
641 642 643 644 645 646 647 648
            case 0:
                // need more data, keep decoding until buffer exhausted
                break;
        }
    }
    // Reset buffer pointer/count for next read
    buffer_count = 0;
    buffer_size = 0;
649 650
    zmq_decoder->get_buffer (&buffer_ptr, &buffer_size);
    return 0; //  need more data
651

652
} // end zmq::norm_engine_t::NormRxStreamState::Decode()
bebopagogo's avatar
bebopagogo committed
653

654
//  Creates an empty list (no head, no tail).
zmq::norm_engine_t::NormRxStreamState::List::List () :
    head (NULL),
    tail (NULL)
{
}

658
//  Destroying the list also deletes every item still linked into it.
zmq::norm_engine_t::NormRxStreamState::List::~List ()
{
    Destroy ();
}

663
void zmq::norm_engine_t::NormRxStreamState::List::Destroy ()
bebopagogo's avatar
bebopagogo committed
664
{
665 666 667
    NormRxStreamState *item = head;
    while (NULL != item) {
        Remove (*item);
bebopagogo's avatar
bebopagogo committed
668 669 670
        delete item;
        item = head;
    }
671
} // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
bebopagogo's avatar
bebopagogo committed
672

673 674
//  Links "item" in as the new tail of this list and records the list as
//  the item's owner. The item is expected not to be on any list already.
void zmq::norm_engine_t::NormRxStreamState::List::Append (
  NormRxStreamState &item)
{
    item.list = this;

    //  Hook the item behind the current tail (or make it the head of an
    //  empty list)
    item.prev = tail;
    if (tail == NULL) {
        head = &item;
    } else {
        tail->next = &item;
    }
    item.next = NULL;
    tail = &item;
} // end zmq::norm_engine_t::NormRxStreamState::List::Append()
bebopagogo's avatar
bebopagogo committed
685

686 687
//  Unlinks "item" from this list, fixing up head/tail as needed, and clears
//  the item's link and owner pointers.
void zmq::norm_engine_t::NormRxStreamState::List::Remove (
  NormRxStreamState &item)
{
    NormRxStreamState *const before = item.prev;
    NormRxStreamState *const after = item.next;

    //  Bypass the item in the forward direction
    if (before == NULL) {
        head = after;
    } else {
        before->next = after;
    }

    //  Bypass the item in the backward direction
    if (after == NULL) {
        tail = before;
    } else {
        after->prev = before;
    }

    //  The item no longer belongs to any list
    item.prev = NULL;
    item.next = NULL;
    item.list = NULL;
} // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
bebopagogo's avatar
bebopagogo committed
700

701 702 703
//  Positions the iterator at the head of "list"; the first call to
//  GetNextItem() returns that head (or NULL for an empty list).
zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator (
  const List &list) :
    next_item (list.head)
{
}

707 708
//  Returns the item the iterator currently points at and advances to its
//  successor; returns NULL once the end of the list has been reached.
//  The successor is read before the item is returned, so the caller may
//  remove the returned item from the list without breaking the iteration.
zmq::norm_engine_t::NormRxStreamState *
zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem ()
{
    NormRxStreamState *const current = next_item;
    if (current == NULL)
        return NULL;

    next_item = current->next;
    return current;
} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
715

716
//  The NORM engine does not track endpoint URIs; it always reports the
//  _empty_endpoint member.
const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
{
    const endpoint_uri_pair_t &endpoint = _empty_endpoint;
    return endpoint;
}
bebopagogo's avatar
bebopagogo committed
720 721

#endif // ZMQ_HAVE_NORM