norm_engine.hpp 5.27 KB
Newer Older
bebopagogo's avatar
bebopagogo committed
1 2 3 4 5 6 7 8 9 10 11 12

#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__

#if defined ZMQ_HAVE_NORM

#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"

13
#include <normApi.h>
bebopagogo's avatar
bebopagogo committed
14 15 16

namespace zmq
{
17
class io_thread_t;
18
class msg_t;
19
class session_base_t;
20

21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
class norm_engine_t : public io_object_t, public i_engine
{
  public:
    norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
    ~norm_engine_t ();

    // create NORM instance, session, etc
    int init (const char *network_, bool send, bool recv);
    void shutdown ();

    //  i_engine interface implementation.
    //  Plug the engine to the session.
    virtual void plug (zmq::io_thread_t *io_thread_,
                       class session_base_t *session_);

    //  Terminate and deallocate the engine. Note that 'detached'
    //  events are not fired on termination.
    virtual void terminate ();

    //  This method is called by the session to signalise that more
    //  messages can be written to the pipe.
42
    virtual bool restart_input ();
43 44 45 46

    //  This method is called by the session to signalise that there
    //  are messages to send available.
    virtual void restart_output ();
47

48
    virtual void zap_msg_available (){};
49

50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
    virtual const char *get_endpoint () const;

    // i_poll_events interface implementation.
    // (we only need in_event() for NormEvent notification)
    // (i.e., don't have any output events or timers (yet))
    void in_event ();

  private:
    void unplug ();
    void send_data ();
    void recv_data (NormObjectHandle stream);


    enum
    {
        BUFFER_SIZE = 2048
    };

    // Used to keep track of streams from multiple senders
    class NormRxStreamState
    {
      public:
72 73 74
        NormRxStreamState (NormObjectHandle normStream,
                           int64_t maxMsgSize,
                           bool zeroCopy);
75
        ~NormRxStreamState ();
bebopagogo's avatar
bebopagogo committed
76

77
        NormObjectHandle GetStreamHandle () const { return norm_stream; }
bebopagogo's avatar
bebopagogo committed
78

79
        bool Init ();
bebopagogo's avatar
bebopagogo committed
80

81 82
        void SetRxReady (bool state) { rx_ready = state; }
        bool IsRxReady () const { return rx_ready; }
bebopagogo's avatar
bebopagogo committed
83

84 85
        void SetSync (bool state) { in_sync = state; }
        bool InSync () const { return in_sync; }
86

87 88 89 90 91 92 93 94 95 96 97 98
        // These are used to feed data to decoder
        // and its underlying "msg" buffer
        char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); }
        size_t GetBytesNeeded () const { return (buffer_size - buffer_count); }
        void IncrementBufferCount (size_t count) { buffer_count += count; }
        msg_t *AccessMsg () { return zmq_decoder->msg (); }
        // This invokes the decoder "decode" method
        // returning 0 if more data is needed,
        // 1 if the message is complete, If an error
        // occurs the 'sync' is dropped and the
        // decoder re-initialized
        int Decode ();
99

100 101 102 103 104
        class List
        {
          public:
            List ();
            ~List ();
105

106 107
            void Append (NormRxStreamState &item);
            void Remove (NormRxStreamState &item);
108

109
            bool IsEmpty () const { return (NULL == head); }
110

111
            void Destroy ();
112

113
            class Iterator
bebopagogo's avatar
bebopagogo committed
114
            {
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
              public:
                Iterator (const List &list);
                NormRxStreamState *GetNextItem ();

              private:
                NormRxStreamState *next_item;
            };
            friend class Iterator;

          private:
            NormRxStreamState *head;
            NormRxStreamState *tail;

        }; // end class zmq::norm_engine_t::NormRxStreamState::List

        friend class List;

        List *AccessList () { return list; }


      private:
        NormObjectHandle norm_stream;
        int64_t max_msg_size;
138
        bool zero_copy;
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
        bool in_sync;
        bool rx_ready;
        v2_decoder_t *zmq_decoder;
        bool skip_norm_sync;
        unsigned char *buffer_ptr;
        size_t buffer_size;
        size_t buffer_count;

        NormRxStreamState *prev;
        NormRxStreamState *next;
        NormRxStreamState::List *list;

    }; // end class zmq::norm_engine_t::NormRxStreamState

    session_base_t *zmq_session;
    options_t options;
    NormInstanceHandle norm_instance;
    handle_t norm_descriptor_handle;
    NormSessionHandle norm_session;
    bool is_sender;
    bool is_receiver;
    // Sender state
    msg_t tx_msg;
    v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
    NormObjectHandle norm_tx_stream;
    bool tx_first_msg;
    bool tx_more_bit;
    bool zmq_output_ready; // zmq has msg(s) to send
    bool norm_tx_ready;    // norm has tx queue vacancy
    // TBD - maybe don't need buffer if can access zmq message buffer directly?
    char tx_buffer[BUFFER_SIZE];
    unsigned int tx_index;
    unsigned int tx_len;

    // Receiver state
    // Lists of norm rx streams from remote senders
    bool zmq_input_ready; // zmq ready to receive msg(s)
    NormRxStreamState::List
      rx_pending_list; // rx streams waiting for data reception
    NormRxStreamState::List
      rx_ready_list; // rx streams ready for NormStreamRead()
    NormRxStreamState::List
      msg_ready_list; // rx streams w/ msg ready for push to zmq


}; // end class norm_engine_t
bebopagogo's avatar
bebopagogo committed
185 186 187 188 189
}

#endif // ZMQ_HAVE_NORM

#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__