norm_engine.hpp 7.02 KB
Newer Older
bebopagogo's avatar
bebopagogo committed
1 2 3 4 5 6 7 8 9 10 11 12

#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__

#if defined ZMQ_HAVE_NORM

#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"

13
#include <normApi.h>
bebopagogo's avatar
bebopagogo committed
14 15 16 17 18

namespace zmq
{
    class io_thread_t;
    class session_base_t;
19

bebopagogo's avatar
bebopagogo committed
20 21 22 23 24
    //  Engine class built on the NORM API (<normApi.h>). It derives from
    //  io_object_t and implements the i_engine interface.
    class norm_engine_t : public io_object_t, public i_engine
    {
        public:
            //  parent_  - I/O thread passed in at construction.
            //  options_ - socket options; presumably copied into the 'options'
            //             member, which is held by value - TODO confirm in the
            //             implementation file.
            norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
            ~norm_engine_t ();
25

bebopagogo's avatar
bebopagogo committed
26 27 28
            //  Creates the NORM instance, session, etc.
            //  network_ - network specification string (format is defined by
            //             the implementation file - not visible here).
            //  send/recv - presumably select the sender and/or receiver role
            //             (cf. the is_sender / is_receiver members) - TODO confirm.
            //  Returns an int status; the success/failure convention is not
            //  visible in this header.
            int init(const char* network_, bool send, bool recv);
            //  NOTE(review): presumably the counterpart of init() - confirm in
            //  the implementation file.
            void shutdown();
29

bebopagogo's avatar
bebopagogo committed
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
            //  i_engine interface implementation.
            //  Plugs the engine to the session.
            //  io_thread_ - I/O thread supplied by the caller.
            //  session_   - session the engine is plugged to (presumably kept
            //               in the zmq_session member - TODO confirm).
            virtual void plug (zmq::io_thread_t *io_thread_,
                               class session_base_t *session_);

            //  Terminates and deallocates the engine. Note that no 'detached'
            //  events are fired on termination.
            virtual void terminate ();

            //  Called by the session to signal that more messages can be
            //  written to the pipe.
            virtual void restart_input ();

            //  Called by the session to signal that there are messages
            //  available to send.
            virtual void restart_output ();

            //  No-op implementation: the body is empty, so this callback does
            //  nothing in this engine.
            virtual void zap_msg_available ()
            {
            }
48

bebopagogo's avatar
bebopagogo committed
49 50 51 52
            //  i_poll_events interface implementation.
            //  Only in_event() is needed; it is used for NormEvent notification.
            //  There are no output events or timers (yet).
            void in_event ();
53

bebopagogo's avatar
bebopagogo committed
54 55 56
        private:
            //  NOTE(review): presumably undoes plug() - confirm in the
            //  implementation file.
            void unplug();
            //  NOTE(review): presumably drives the sender side (tx_msg,
            //  tx_buffer, norm_tx_stream) - confirm in the implementation file.
            void send_data();
57 58 59
            //  Receive-side counterpart of send_data().
            //  stream - handle of a NORM object; presumably the rx stream that
            //           triggered the call - TODO confirm.
            void recv_data(NormObjectHandle stream);


bebopagogo's avatar
bebopagogo committed
60
            //  Capacity, in chars, of the tx_buffer member declared below.
            enum {BUFFER_SIZE = 2048};
61 62

            // Used to keep track of streams from multiple senders
bebopagogo's avatar
bebopagogo committed
63 64 65 66 67 68
            //  Per-stream receive state: bundles a NORM stream handle with a
            //  v2 decoder, buffer bookkeeping and list links (see the private
            //  members of this class).
            class NormRxStreamState
            {
                public:
                    //  normStream - handle of the NORM stream this state describes.
                    //  maxMsgSize - presumably the maximum accepted message size
                    //               (cf. the max_msg_size member) - TODO confirm.
                    NormRxStreamState(NormObjectHandle normStream,
                                      int64_t          maxMsgSize);
                    ~NormRxStreamState();
69

bebopagogo's avatar
bebopagogo committed
70 71
                    //  Returns the NORM stream handle held by this state object.
                    NormObjectHandle GetStreamHandle() const
                    {
                        return norm_stream;
                    }
72

bebopagogo's avatar
bebopagogo committed
73
                    //  Second-stage initialisation, separate from the constructor.
                    //  NOTE(review): presumably returns true on success and sets up
                    //  the decoder/buffer members - confirm in the implementation.
                    bool Init();
74

bebopagogo's avatar
bebopagogo committed
75 76 77 78
                    //  Stores 'state' as this stream's rx-ready flag.
                    void SetRxReady(bool state)
                    {
                        rx_ready = state;
                    }
                    //  Returns the current value of the rx-ready flag.
                    bool IsRxReady() const
                    {
                        return rx_ready;
                    }
79

bebopagogo's avatar
bebopagogo committed
80 81 82 83
                    //  Stores 'state' as this stream's in-sync flag.
                    void SetSync(bool state)
                    {
                        in_sync = state;
                    }
                    //  Returns the current value of the in-sync flag.
                    bool InSync() const
                    {
                        return in_sync;
                    }
84

bebopagogo's avatar
bebopagogo committed
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
                    // These are used to feed data to decoder
                    // and its underlying "msg" buffer

                    //  Returns the address buffer_count bytes past buffer_ptr,
                    //  viewed as char*.
                    char* AccessBuffer()
                    {
                        unsigned char* const write_pos = buffer_ptr + buffer_count;
                        return reinterpret_cast<char*>(write_pos);
                    }
                    //  Returns buffer_size minus buffer_count, i.e. how many
                    //  bytes are still missing from the buffer.
                    size_t GetBytesNeeded() const
                    {
                        const size_t remaining = buffer_size - buffer_count;
                        return remaining;
                    }
                    //  Advances the buffer fill count by 'count'.
                    void IncrementBufferCount(size_t count)
                    {
                        buffer_count = buffer_count + count;
                    }
                    //  Returns the message object obtained from the decoder's
                    //  msg() accessor.
                    msg_t* AccessMsg()
                    {
                        msg_t* const decoded = zmq_decoder->msg();
                        return decoded;
                    }
                    // Invokes the decoder's "decode" method.
                    // Returns 0 if more data is needed and 1 if the message
                    // is complete. If an error occurs, the 'sync' is dropped
                    // and the decoder is re-initialized.
                    // NOTE(review): the value returned in the error case is not
                    // stated here - check the implementation file.
                    int Decode();
101

bebopagogo's avatar
bebopagogo committed
102 103 104 105 106
                    //  List of NormRxStreamState items, tracked through the
                    //  'head' and 'tail' pointers. The items themselves carry
                    //  prev/next/list fields and declare this class a friend,
                    //  so the links appear to live inside the items.
                    class List
                    {
                        public:
                            List();
                            ~List();
107

bebopagogo's avatar
bebopagogo committed
108 109
                            //  Adds 'item' to this list. The item is taken by
                            //  reference; presumably its link fields are updated
                            //  rather than the item being copied - TODO confirm.
                            void Append(NormRxStreamState& item);
                            //  Takes 'item' off this list. NOTE(review): whether
                            //  the item must currently be on this list is not
                            //  visible here - confirm in the implementation file.
                            void Remove(NormRxStreamState& item);
110

bebopagogo's avatar
bebopagogo committed
111 112
                            //  True when the list has no head item.
                            bool IsEmpty() const
                            {
                                return head == NULL;
                            }
113

bebopagogo's avatar
bebopagogo committed
114
                            //  NOTE(review): presumably removes and deletes every
                            //  item on the list - confirm ownership semantics in
                            //  the implementation file.
                            void Destroy();
115

bebopagogo's avatar
bebopagogo committed
116 117 118 119 120 121 122 123 124
                            //  Forward cursor over a List. It keeps a single
                            //  pointer to the item to hand out next.
                            class Iterator
                            {
                                public:
                                    //  Binds the iterator to 'list' (taken by
                                    //  const reference).
                                    Iterator(const List& list);
                                    //  Returns a pointer to the next item.
                                    //  Presumably NULL once the end is reached
                                    //  - TODO confirm.
                                    NormRxStreamState* GetNextItem();
                                private:
                                    //  Item to be returned by the next call to
                                    //  GetNextItem().
                                    NormRxStreamState* next_item;
                            };
                            friend class Iterator;
125

bebopagogo's avatar
bebopagogo committed
126 127
                        private:
                            //  First item of the list; IsEmpty() reports an empty
                            //  list when this is NULL.
                            NormRxStreamState*  head;
128 129
                            //  Presumably the last item of the list - TODO confirm.
                            NormRxStreamState*  tail;

bebopagogo's avatar
bebopagogo committed
130
                    };  // end class zmq::norm_engine_t::NormRxStreamState::List
131

bebopagogo's avatar
bebopagogo committed
132
                    friend class List;
133

bebopagogo's avatar
bebopagogo committed
134 135
                    //  Returns the 'list' member; presumably the List this item
                    //  is currently on - TODO confirm.
                    List* AccessList()
                    {
                        return list;
                    }
136 137


bebopagogo's avatar
bebopagogo committed
138 139 140
                private:
                    //  NORM stream handle; returned by GetStreamHandle().
                    NormObjectHandle            norm_stream;
                    //  Presumably the maximum accepted message size, taken from
                    //  the constructor's maxMsgSize argument - TODO confirm.
                    int64_t                     max_msg_size;
141
                    //  Flag written by SetSync() and read by InSync().
                    bool                        in_sync;
bebopagogo's avatar
bebopagogo committed
142 143 144 145 146 147
                    //  Flag written by SetRxReady() and read by IsRxReady().
                    bool                        rx_ready;
                    //  Decoder whose msg() result is exposed through AccessMsg().
                    v2_decoder_t*               zmq_decoder;
                    //  NOTE(review): not referenced anywhere in this header; its
                    //  purpose must be checked in the implementation file.
                    bool                        skip_norm_sync;
                    //  Buffer bookkeeping used by AccessBuffer(), GetBytesNeeded()
                    //  and IncrementBufferCount(): base pointer, target size and
                    //  current fill count.
                    unsigned char*              buffer_ptr;
                    size_t                      buffer_size;
                    size_t                      buffer_count;
148

bebopagogo's avatar
bebopagogo committed
149 150 151
                    //  Link fields. List is declared a friend of this class,
                    //  presumably so that it can maintain them - TODO confirm.
                    NormRxStreamState*          prev;
                    NormRxStreamState*          next;
                    //  Returned by AccessList().
                    NormRxStreamState::List*    list;
152

bebopagogo's avatar
bebopagogo committed
153
            };  // end class zmq::norm_engine_t::NormRxStreamState
154

bebopagogo's avatar
bebopagogo committed
155 156 157 158 159 160 161 162 163
            //  Presumably the session passed to plug() - TODO confirm.
            session_base_t*         zmq_session;
            //  Engine's own options object (held by value).
            options_t               options;
            //  NORM API handles for the instance and the session.
            NormInstanceHandle      norm_instance;
            //  NOTE(review): looks like the poller handle for the NORM
            //  descriptor watched via in_event() - confirm.
            handle_t                norm_descriptor_handle;
            NormSessionHandle       norm_session;
            //  Presumably set from the send/recv arguments of init() -
            //  TODO confirm.
            bool                    is_sender;
            bool                    is_receiver;
            // Sender state
            msg_t                   tx_msg;
164
            v2_encoder_t            zmq_encoder;    // for tx messages (we use v2 for now)
165 166
            NormObjectHandle        norm_tx_stream;
            bool                    tx_first_msg;
bebopagogo's avatar
bebopagogo committed
167
            bool                    tx_more_bit;
168
            bool                    zmq_output_ready; // zmq has msg(s) to send
bebopagogo's avatar
bebopagogo committed
169
            bool                    norm_tx_ready;    // norm has tx queue vacancy
170
            // TBD - this buffer may be unnecessary if the zmq message buffer can be accessed directly.
bebopagogo's avatar
bebopagogo committed
171 172 173
            //  Fixed-size transmit buffer of BUFFER_SIZE chars.
            char                    tx_buffer[BUFFER_SIZE];
            //  NOTE(review): presumably the current position within tx_buffer
            //  and the number of valid chars in it - confirm in the
            //  implementation file.
            unsigned int            tx_index;
            unsigned int            tx_len;
174

bebopagogo's avatar
bebopagogo committed
175 176 177 178 179 180
            // Receiver state
            bool                    zmq_input_ready; // zmq ready to receive msg(s)
            // Lists of norm rx streams from remote senders
            NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
            NormRxStreamState::List rx_ready_list;   // rx streams ready for NormStreamRead()
            NormRxStreamState::List msg_ready_list;  // rx streams w/ msg ready for push to zmq
181 182


bebopagogo's avatar
bebopagogo committed
183 184 185 186 187 188
    };  // end class norm_engine_t
}

#endif // ZMQ_HAVE_NORM

#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__