decoder.hpp 7.24 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
15

16
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
17 18 19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

Martin Sustrik's avatar
Martin Sustrik committed
20 21
#ifndef __ZMQ_DECODER_HPP_INCLUDED__
#define __ZMQ_DECODER_HPP_INCLUDED__
Martin Sustrik's avatar
Martin Sustrik committed
22 23 24

#include <stddef.h>
#include <string.h>
25
#include <stdlib.h>
Martin Sustrik's avatar
Martin Sustrik committed
26 27
#include <algorithm>

28
#include "err.hpp"
29
#include "msg.hpp"
30
#include "i_decoder.hpp"
31
#include "stdint.hpp"
32

Martin Sustrik's avatar
Martin Sustrik committed
33
namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
34
{
35
    class i_msg_sink;
36

Martin Sustrik's avatar
Martin Sustrik committed
37 38
    //  Helper base class for decoders that know the amount of data to read
    //  in advance at any moment. Knowing the amount in advance is a property
    //  of the protocol used. The 0MQ framing protocol is based on the
    //  size-prefixed paradigm, which qualifies it to be parsed by this class.
    //  On the other hand, XML-based transports (like XMPP or SOAP) don't allow
    //  for knowing the size of data to read in advance and should use different
    //  decoding algorithms.
    //
    //  This class implements the state machine that parses the incoming buffer.
    //  Derived classes should implement the individual state machine actions.

48
    template <typename T> class decoder_base_t : public i_decoder
Martin Sustrik's avatar
Martin Sustrik committed
49 50 51
    {
    public:

52
        inline decoder_base_t (size_t bufsize_) :
Martin Hurton's avatar
Martin Hurton committed
53
            next (NULL),
54
            read_pos (NULL),
Martin Sustrik's avatar
Martin Sustrik committed
55
            to_read (0),
56 57 58
            bufsize (bufsize_)
        {
            buf = (unsigned char*) malloc (bufsize_);
59
            alloc_assert (buf);
60 61
        }

62 63
        //  The destructor doesn't have to be virtual. It is mad virtual
        //  just to keep ICC and code checking tools from complaining.
64
        inline virtual ~decoder_base_t ()
65 66 67 68 69 70
        {
            free (buf);
        }

        //  Returns a buffer to be filled with binary data.
        inline void get_buffer (unsigned char **data_, size_t *size_)
Martin Sustrik's avatar
Martin Sustrik committed
71
        {
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
            //  If we are expected to read large message, we'll opt for zero-
            //  copy, i.e. we'll ask caller to fill the data directly to the
            //  message. Note that subsequent read(s) are non-blocking, thus
            //  each single read reads at most SO_RCVBUF bytes at once not
            //  depending on how large is the chunk returned from here.
            //  As a consequence, large messages being received won't block
            //  other engines running in the same I/O thread for excessive
            //  amounts of time.
            if (to_read >= bufsize) {
                *data_ = read_pos;
                *size_ = to_read;
                return;
            }

            *data_ = buf;
            *size_ = bufsize;
Martin Sustrik's avatar
Martin Sustrik committed
88 89
        }

90 91 92 93 94
        //  Processes the data in the buffer previously allocated using
        //  get_buffer function. size_ argument specifies nemuber of bytes
        //  actually filled into the buffer. Function returns number of
        //  bytes actually processed.
        inline size_t process_buffer (unsigned char *data_, size_t size_)
Martin Sustrik's avatar
Martin Sustrik committed
95
        {
96 97 98 99
            //  Check if we had an error in previous attempt.
            if (unlikely (!(static_cast <T*> (this)->next)))
                return (size_t) -1;

100 101 102 103 104 105 106
            //  In case of zero-copy simply adjust the pointers, no copying
            //  is required. Also, run the state machine in case all the data
            //  were processed.
            if (data_ == read_pos) {
                read_pos += size_;
                to_read -= size_;

107 108 109 110
                while (!to_read) {
                    if (!(static_cast <T*> (this)->*next) ()) {
                        if (unlikely (!(static_cast <T*> (this)->next)))
                            return (size_t) -1;
111
                        return size_;
112 113
                    }
                }
114 115 116
                return size_;
            }

Martin Sustrik's avatar
Martin Sustrik committed
117 118
            size_t pos = 0;
            while (true) {
119 120 121

                //  Try to get more space in the message to fill in.
                //  If none is available, return.
122 123 124 125
                while (!to_read) {
                    if (!(static_cast <T*> (this)->*next) ()) {
                        if (unlikely (!(static_cast <T*> (this)->next)))
                            return (size_t) -1;
Martin Sustrik's avatar
Martin Sustrik committed
126
                        return pos;
127 128
                    }
                }
129 130

                //  If there are no more data in the buffer, return.
Martin Sustrik's avatar
Martin Sustrik committed
131 132
                if (pos == size_)
                    return pos;
133 134 135 136 137 138 139

                //  Copy the data from buffer to the message.
                size_t to_copy = std::min (to_read, size_ - pos);
                memcpy (read_pos, data_ + pos, to_copy);
                read_pos += to_copy;
                pos += to_copy;
                to_read -= to_copy;
Martin Sustrik's avatar
Martin Sustrik committed
140 141 142
            }
        }

Martin Hurton's avatar
Martin Hurton committed
143 144 145 146 147
        //  Returns true if the decoder has been fed all required data
        //  but cannot proceed with the next decoding step.
        //  False is returned if the decoder has encountered an error.
        bool stalled ()
        {
148
            //  Check whether there was decoding error.
Victor Perron's avatar
Victor Perron committed
149
            if (unlikely (!(static_cast <T*> (this)->next)))
150 151
                return false;

Martin Hurton's avatar
Martin Hurton committed
152 153 154 155 156 157 158 159 160 161 162
            while (!to_read) {
                if (!(static_cast <T*> (this)->*next) ()) {
                    if (unlikely (!(static_cast <T*> (this)->next)))
                        return false;
                    return true;
                }
            }

            return false;
        }

163
        inline bool message_ready_size (size_t /* msg_sz */)
Martin Hurton's avatar
Martin Hurton committed
164 165
        {
            zmq_assert (false);
166 167 168
            return false;
        }

Martin Sustrik's avatar
Martin Sustrik committed
169 170 171 172 173 174 175 176
    protected:

        //  Prototype of state machine action. Action should return false if
        //  it is unable to push the data to the system.
        typedef bool (T::*step_t) ();

        //  This function should be called from derived class to read data
        //  from the buffer and schedule next state machine action.
177
        inline void next_step (void *read_pos_, size_t to_read_,
Martin Sustrik's avatar
Martin Sustrik committed
178 179
            step_t next_)
        {
180
            read_pos = (unsigned char*) read_pos_;
Martin Sustrik's avatar
Martin Sustrik committed
181 182 183 184
            to_read = to_read_;
            next = next_;
        }

185 186 187 188 189 190 191
        //  This function should be called from the derived class to
        //  abort decoder state machine.
        inline void decoding_error ()
        {
            next = NULL;
        }

Martin Hurton's avatar
Martin Hurton committed
192 193
    private:

Martin Hurton's avatar
Martin Hurton committed
194 195 196 197 198
        //  Next step. If set to NULL, it means that associated data stream
        //  is dead. Note that there can be still data in the process in such
        //  case.
        step_t next;

199
        //  Where to store the read data.
200
        unsigned char *read_pos;
201 202

        //  How much data to read before taking next step.
Martin Sustrik's avatar
Martin Sustrik committed
203
        size_t to_read;
204 205

        //  The duffer for data to decode.
206 207 208
        size_t bufsize;
        unsigned char *buf;

209
        decoder_base_t (const decoder_base_t&);
210
        const decoder_base_t &operator = (const decoder_base_t&);
211
    };
Martin Sustrik's avatar
Martin Sustrik committed
212 213 214
}

#endif
215