norm_engine.hpp 7.07 KB
Newer Older
bebopagogo's avatar
bebopagogo committed
1 2 3 4 5 6 7 8 9 10 11 12

#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__

#if defined ZMQ_HAVE_NORM

#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"

13
#include <normApi.h>
bebopagogo's avatar
bebopagogo committed
14 15 16 17 18

namespace zmq
{
    class io_thread_t;
    class session_base_t;
19

bebopagogo's avatar
bebopagogo committed
20 21 22 23 24
    class norm_engine_t : public io_object_t, public i_engine
    {
        public:
            norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
            ~norm_engine_t ();
25

bebopagogo's avatar
bebopagogo committed
26 27 28
            // create NORM instance, session, etc
            int init(const char* network_, bool send, bool recv);
            void shutdown();
29

bebopagogo's avatar
bebopagogo committed
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
            //  i_engine interface implementation.
            //  Plug the engine to the session.
            virtual void plug (zmq::io_thread_t *io_thread_,
                               class session_base_t *session_);

            //  Terminate and deallocate the engine. Note that 'detached'
            //  events are not fired on termination.
            virtual void terminate ();

            //  This method is called by the session to signalise that more
            //  messages can be written to the pipe.
            virtual void restart_input ();

            //  This method is called by the session to signalise that there
            //  are messages to send available.
            virtual void restart_output ();

            virtual void zap_msg_available () {};
48

49 50
            virtual const char *get_endpoint () const;

bebopagogo's avatar
bebopagogo committed
51 52 53 54
            // i_poll_events interface implementation.
            // (we only need in_event() for NormEvent notification)
            // (i.e., don't have any output events or timers (yet))
            void in_event ();
55

bebopagogo's avatar
bebopagogo committed
56 57 58
        private:
            void unplug();
            void send_data();
59 60 61
            void recv_data(NormObjectHandle stream);


bebopagogo's avatar
bebopagogo committed
62
            enum {BUFFER_SIZE = 2048};
63 64

            // Used to keep track of streams from multiple senders
bebopagogo's avatar
bebopagogo committed
65 66 67 68 69 70
            class NormRxStreamState
            {
                public:
                    NormRxStreamState(NormObjectHandle normStream,
                                      int64_t          maxMsgSize);
                    ~NormRxStreamState();
71

bebopagogo's avatar
bebopagogo committed
72 73
                    NormObjectHandle GetStreamHandle() const
                        {return norm_stream;}
74

bebopagogo's avatar
bebopagogo committed
75
                    bool Init();
76

bebopagogo's avatar
bebopagogo committed
77 78 79 80
                    void SetRxReady(bool state)
                        {rx_ready = state;}
                    bool IsRxReady() const
                        {return rx_ready;}
81

bebopagogo's avatar
bebopagogo committed
82 83 84 85
                    void SetSync(bool state)
                        {in_sync = state;}
                    bool InSync() const
                        {return in_sync;}
86

bebopagogo's avatar
bebopagogo committed
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
                    // These are used to feed data to decoder
                    // and its underlying "msg" buffer
                    char* AccessBuffer()
                        {return (char*)(buffer_ptr + buffer_count);}
                    size_t GetBytesNeeded() const
                        {return (buffer_size - buffer_count);}
                    void IncrementBufferCount(size_t count)
                        {buffer_count += count;}
                    msg_t* AccessMsg()
                        {return zmq_decoder->msg();}
                    // This invokes the decoder "decode" method
                    // returning 0 if more data is needed,
                    // 1 if the message is complete, If an error
                    // occurs the 'sync' is dropped and the
                    // decoder re-initialized
                    int Decode();
103

bebopagogo's avatar
bebopagogo committed
104 105 106 107 108
                    class List
                    {
                        public:
                            List();
                            ~List();
109

bebopagogo's avatar
bebopagogo committed
110 111
                            void Append(NormRxStreamState& item);
                            void Remove(NormRxStreamState& item);
112

bebopagogo's avatar
bebopagogo committed
113 114
                            bool IsEmpty() const
                                {return (NULL == head);}
115

bebopagogo's avatar
bebopagogo committed
116
                            void Destroy();
117

bebopagogo's avatar
bebopagogo committed
118 119 120 121 122 123 124 125 126
                            class Iterator
                            {
                                public:
                                    Iterator(const List& list);
                                    NormRxStreamState* GetNextItem();
                                private:
                                    NormRxStreamState* next_item;
                            };
                            friend class Iterator;
127

bebopagogo's avatar
bebopagogo committed
128 129
                        private:
                            NormRxStreamState*  head;
130 131
                            NormRxStreamState*  tail;

bebopagogo's avatar
bebopagogo committed
132
                    };  // end class zmq::norm_engine_t::NormRxStreamState::List
133

bebopagogo's avatar
bebopagogo committed
134
                    friend class List;
135

bebopagogo's avatar
bebopagogo committed
136 137
                    List* AccessList()
                        {return list;}
138 139


bebopagogo's avatar
bebopagogo committed
140 141 142
                private:
                    NormObjectHandle            norm_stream;
                    int64_t                     max_msg_size;
143
                    bool                        in_sync;
bebopagogo's avatar
bebopagogo committed
144 145 146 147 148 149
                    bool                        rx_ready;
                    v2_decoder_t*               zmq_decoder;
                    bool                        skip_norm_sync;
                    unsigned char*              buffer_ptr;
                    size_t                      buffer_size;
                    size_t                      buffer_count;
150

bebopagogo's avatar
bebopagogo committed
151 152 153
                    NormRxStreamState*          prev;
                    NormRxStreamState*          next;
                    NormRxStreamState::List*    list;
154

bebopagogo's avatar
bebopagogo committed
155
            };  // end class zmq::norm_engine_t::NormRxStreamState
156

bebopagogo's avatar
bebopagogo committed
157 158 159 160 161 162 163 164 165
            session_base_t*         zmq_session;
            options_t               options;
            NormInstanceHandle      norm_instance;
            handle_t                norm_descriptor_handle;
            NormSessionHandle       norm_session;
            bool                    is_sender;
            bool                    is_receiver;
            // Sender state
            msg_t                   tx_msg;
166
            v2_encoder_t            zmq_encoder;    // for tx messages (we use v2 for now)
167 168
            NormObjectHandle        norm_tx_stream;
            bool                    tx_first_msg;
bebopagogo's avatar
bebopagogo committed
169
            bool                    tx_more_bit;
170
            bool                    zmq_output_ready; // zmq has msg(s) to send
bebopagogo's avatar
bebopagogo committed
171
            bool                    norm_tx_ready;    // norm has tx queue vacancy
172
            // TBD - maybe don't need buffer if can access zmq message buffer directly?
bebopagogo's avatar
bebopagogo committed
173 174 175
            char                    tx_buffer[BUFFER_SIZE];
            unsigned int            tx_index;
            unsigned int            tx_len;
176

bebopagogo's avatar
bebopagogo committed
177 178 179 180 181 182
            // Receiver state
            // Lists of norm rx streams from remote senders
            bool                    zmq_input_ready; // zmq ready to receive msg(s)
            NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
            NormRxStreamState::List rx_ready_list;   // rx streams ready for NormStreamRead()
            NormRxStreamState::List msg_ready_list;  // rx streams w/ msg ready for push to zmq
183 184


bebopagogo's avatar
bebopagogo committed
185 186 187 188 189 190
    };  // end class norm_engine_t
}

#endif // ZMQ_HAVE_NORM

#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__