/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

Martin Sustrik's avatar
Martin Sustrik committed
22 23
#ifndef __ZMQ_DECODER_HPP_INCLUDED__
#define __ZMQ_DECODER_HPP_INCLUDED__
Martin Sustrik's avatar
Martin Sustrik committed
24 25 26

#include <stddef.h>
#include <string.h>
27
#include <stdlib.h>
Martin Sustrik's avatar
Martin Sustrik committed
28 29
#include <algorithm>

30
#include "err.hpp"
31
#include "msg.hpp"
32
#include "i_decoder.hpp"
33
#include "stdint.hpp"
34

Martin Sustrik's avatar
Martin Sustrik committed
35
namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
36 37
{

38
    class i_msg_sink;
39

    //  Helper base class for decoders that know the amount of data to read
    //  in advance at any moment. Knowing the amount in advance is a property
    //  of the protocol used. 0MQ framing protocol is based on the
    //  size-prefixed paradigm, which qualifies it to be parsed by this class.
    //  On the other hand, XML-based transports (like XMPP or SOAP) don't allow
    //  for knowing the size of data to read in advance and should use different
    //  decoding algorithms.
    //
    //  This class implements the state machine that parses the incoming buffer.
    //  Derived class should implement individual state machine actions.

    template <typename T> class decoder_base_t : public i_decoder
Martin Sustrik's avatar
Martin Sustrik committed
52 53 54
    {
    public:

55
        inline decoder_base_t (size_t bufsize_) :
Martin Hurton's avatar
Martin Hurton committed
56
            next (NULL),
57
            read_pos (NULL),
Martin Sustrik's avatar
Martin Sustrik committed
58
            to_read (0),
59 60 61
            bufsize (bufsize_)
        {
            buf = (unsigned char*) malloc (bufsize_);
62
            alloc_assert (buf);
63 64
        }

65 66
        //  The destructor doesn't have to be virtual. It is mad virtual
        //  just to keep ICC and code checking tools from complaining.
67
        inline virtual ~decoder_base_t ()
68 69 70 71 72 73
        {
            free (buf);
        }

        //  Returns a buffer to be filled with binary data.
        inline void get_buffer (unsigned char **data_, size_t *size_)
Martin Sustrik's avatar
Martin Sustrik committed
74
        {
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
            //  If we are expected to read large message, we'll opt for zero-
            //  copy, i.e. we'll ask caller to fill the data directly to the
            //  message. Note that subsequent read(s) are non-blocking, thus
            //  each single read reads at most SO_RCVBUF bytes at once not
            //  depending on how large is the chunk returned from here.
            //  As a consequence, large messages being received won't block
            //  other engines running in the same I/O thread for excessive
            //  amounts of time.
            if (to_read >= bufsize) {
                *data_ = read_pos;
                *size_ = to_read;
                return;
            }

            *data_ = buf;
            *size_ = bufsize;
Martin Sustrik's avatar
Martin Sustrik committed
91 92
        }

93 94 95 96 97
        //  Processes the data in the buffer previously allocated using
        //  get_buffer function. size_ argument specifies nemuber of bytes
        //  actually filled into the buffer. Function returns number of
        //  bytes actually processed.
        inline size_t process_buffer (unsigned char *data_, size_t size_)
Martin Sustrik's avatar
Martin Sustrik committed
98
        {
99 100 101 102
            //  Check if we had an error in previous attempt.
            if (unlikely (!(static_cast <T*> (this)->next)))
                return (size_t) -1;

103 104 105 106 107 108 109
            //  In case of zero-copy simply adjust the pointers, no copying
            //  is required. Also, run the state machine in case all the data
            //  were processed.
            if (data_ == read_pos) {
                read_pos += size_;
                to_read -= size_;

110 111 112 113
                while (!to_read) {
                    if (!(static_cast <T*> (this)->*next) ()) {
                        if (unlikely (!(static_cast <T*> (this)->next)))
                            return (size_t) -1;
114
                        return size_;
115 116
                    }
                }
117 118 119
                return size_;
            }

Martin Sustrik's avatar
Martin Sustrik committed
120 121
            size_t pos = 0;
            while (true) {
122 123 124

                //  Try to get more space in the message to fill in.
                //  If none is available, return.
125 126 127 128
                while (!to_read) {
                    if (!(static_cast <T*> (this)->*next) ()) {
                        if (unlikely (!(static_cast <T*> (this)->next)))
                            return (size_t) -1;
Martin Sustrik's avatar
Martin Sustrik committed
129
                        return pos;
130 131
                    }
                }
132 133

                //  If there are no more data in the buffer, return.
Martin Sustrik's avatar
Martin Sustrik committed
134 135
                if (pos == size_)
                    return pos;
136 137 138 139 140 141 142

                //  Copy the data from buffer to the message.
                size_t to_copy = std::min (to_read, size_ - pos);
                memcpy (read_pos, data_ + pos, to_copy);
                read_pos += to_copy;
                pos += to_copy;
                to_read -= to_copy;
Martin Sustrik's avatar
Martin Sustrik committed
143 144 145
            }
        }

Martin Hurton's avatar
Martin Hurton committed
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
        //  Returns true if the decoder has been fed all required data
        //  but cannot proceed with the next decoding step.
        //  False is returned if the decoder has encountered an error.
        bool stalled ()
        {
            while (!to_read) {
                if (!(static_cast <T*> (this)->*next) ()) {
                    if (unlikely (!(static_cast <T*> (this)->next)))
                        return false;
                    return true;
                }
            }

            return false;
        }

Martin Hurton's avatar
Martin Hurton committed
162 163 164
        inline bool message_ready_size (size_t msg_sz)
        {
            zmq_assert (false);
165 166 167
            return false;
        }

Martin Sustrik's avatar
Martin Sustrik committed
168 169 170 171 172 173 174 175
    protected:

        //  Prototype of state machine action. Action should return false if
        //  it is unable to push the data to the system.
        typedef bool (T::*step_t) ();

        //  This function should be called from derived class to read data
        //  from the buffer and schedule next state machine action.
176
        inline void next_step (void *read_pos_, size_t to_read_,
Martin Sustrik's avatar
Martin Sustrik committed
177 178
            step_t next_)
        {
179
            read_pos = (unsigned char*) read_pos_;
Martin Sustrik's avatar
Martin Sustrik committed
180 181 182 183
            to_read = to_read_;
            next = next_;
        }

184 185 186 187 188 189 190
        //  This function should be called from the derived class to
        //  abort decoder state machine.
        inline void decoding_error ()
        {
            next = NULL;
        }

Martin Hurton's avatar
Martin Hurton committed
191 192
    private:

Martin Hurton's avatar
Martin Hurton committed
193 194 195 196 197
        //  Next step. If set to NULL, it means that associated data stream
        //  is dead. Note that there can be still data in the process in such
        //  case.
        step_t next;

198
        //  Where to store the read data.
199
        unsigned char *read_pos;
200 201

        //  How much data to read before taking next step.
Martin Sustrik's avatar
Martin Sustrik committed
202
        size_t to_read;
203 204

        //  The duffer for data to decode.
205 206 207
        size_t bufsize;
        unsigned char *buf;

208
        decoder_base_t (const decoder_base_t&);
209
        const decoder_base_t &operator = (const decoder_base_t&);
210 211 212 213 214 215 216 217
    };

    //  Decoder for 0MQ framing protocol. Converts data batches into messages.

    class decoder_t : public decoder_base_t <decoder_t>
    {
    public:

        //  bufsize_ is forwarded to the base class as the size of the
        //  intermediate receive buffer.
        //  NOTE(review): maxmsgsize_ presumably caps the accepted message
        //  size -- confirm against the implementation file.
        decoder_t (size_t bufsize_, int64_t maxmsgsize_);
        ~decoder_t ();

        //  Set the receiver of decoded messages.
        void set_msg_sink (i_msg_sink *msg_sink_);

    private:

        //  Copying is disabled: declared private, intentionally undefined.
        decoder_t (const decoder_t&);
        void operator = (const decoder_t&);

        //  State machine actions. Each one matches the step_t prototype
        //  of the base class and is scheduled through next_step.
        bool one_byte_size_ready ();
        bool eight_byte_size_ready ();
        bool flags_ready ();
        bool message_ready ();

        //  Receiver of decoded messages, as set by set_msg_sink.
        i_msg_sink *msg_sink;

        //  Small scratch area.
        //  NOTE(review): presumably holds the size/flags bytes while they
        //  are being read -- confirm against the implementation file.
        unsigned char tmpbuf [8];

        //  Message currently being assembled.
        msg_t in_progress;

        //  Value passed to the constructor as maxmsgsize_.
        int64_t maxmsgsize;
    };

}

#endif
244