encoder.hpp 5.62 KB
Newer Older
Martin Sustrik's avatar
Martin Sustrik committed
1
/*
2
    Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
Martin Sustrik's avatar
Martin Sustrik committed
3 4 5 6

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
7
    the terms of the GNU Lesser General Public License as published by
Martin Sustrik's avatar
Martin Sustrik committed
8 9 10 11 12 13
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
    GNU Lesser General Public License for more details.
Martin Sustrik's avatar
Martin Sustrik committed
15

16
    You should have received a copy of the GNU Lesser General Public License
Martin Sustrik's avatar
Martin Sustrik committed
17 18 19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

Martin Sustrik's avatar
Martin Sustrik committed
20 21
#ifndef __ZMQ_ENCODER_HPP_INCLUDED__
#define __ZMQ_ENCODER_HPP_INCLUDED__
Martin Sustrik's avatar
Martin Sustrik committed
22

23 24 25 26 27 28
#if defined(_MSC_VER)
#ifndef NOMINMAX
#define NOMINMAX
#endif
#endif

Martin Sustrik's avatar
Martin Sustrik committed
29 30
#include <stddef.h>
#include <string.h>
31
#include <stdlib.h>
Martin Sustrik's avatar
Martin Sustrik committed
32 33
#include <algorithm>

34
#include "err.hpp"
35
#include "msg.hpp"
36
#include "i_encoder.hpp"
37

Martin Sustrik's avatar
Martin Sustrik committed
38
namespace zmq
Martin Sustrik's avatar
Martin Sustrik committed
39 40 41 42 43 44
{

    //  Helper base class for encoders. It implements the state machine that
    //  fills the outgoing buffer. Derived classes should implement individual
    //  state machine actions.

45
    template <typename T> class encoder_base_t : public i_encoder
Martin Sustrik's avatar
Martin Sustrik committed
46 47 48
    {
    public:

49
        inline encoder_base_t (size_t bufsize_) :
50 51
            bufsize (bufsize_),
            in_progress (NULL)
Martin Sustrik's avatar
Martin Sustrik committed
52
        {
53
            buf = (unsigned char*) malloc (bufsize_);
54
            alloc_assert (buf);
Martin Sustrik's avatar
Martin Sustrik committed
55 56
        }

57
        //  The destructor doesn't have to be virtual. It is made virtual
58
        //  just to keep ICC and code checking tools from complaining.
59
        inline virtual ~encoder_base_t ()
60 61 62 63
        {
            free (buf);
        }

Martin Sustrik's avatar
Martin Sustrik committed
64 65 66
        //  The function returns a batch of binary data. The data
        //  are filled to a supplied buffer. If no buffer is supplied (data_
        //  points to NULL) decoder object will provide buffer of its own.
67
        inline size_t encode (unsigned char **data_, size_t size_)
Martin Sustrik's avatar
Martin Sustrik committed
68
        {
Martin Sustrik's avatar
Martin Sustrik committed
69
            unsigned char *buffer = !*data_ ? buf : *data_;
70
            size_t buffersize = !*data_ ? bufsize : size_;
Martin Sustrik's avatar
Martin Sustrik committed
71

72 73
            if (in_progress == NULL)
                return 0;
74

Martin Hurton's avatar
Martin Hurton committed
75 76
            size_t pos = 0;
            while (pos < buffersize) {
77 78 79 80 81

                //  If there are no more data to return, run the state machine.
                //  If there are still no data, return what we already have
                //  in the buffer.
                if (!to_write) {
82 83 84 85 86 87
                    if (new_msg_flag) {
                        int rc = in_progress->close ();
                        errno_assert (rc == 0);
                        rc = in_progress->init ();
                        errno_assert (rc == 0);
                        in_progress = NULL;
Martin Hurton's avatar
Martin Hurton committed
88
                        break;
89 90
                    }
                    (static_cast <T*> (this)->*next) ();
91
                }
92

93 94 95 96 97 98 99
                //  If there are no data in the buffer yet and we are able to
                //  fill whole buffer in a single go, let's use zero-copy.
                //  There's no disadvantage to it as we cannot stuck multiple
                //  messages into the buffer anyway. Note that subsequent
                //  write(s) are non-blocking, thus each single write writes
                //  at most SO_SNDBUF bytes at once not depending on how large
                //  is the chunk returned from here.
100 101 102
                //  As a consequence, large messages being sent won't block
                //  other engines running in the same I/O thread for excessive
                //  amounts of time.
Martin Sustrik's avatar
Martin Sustrik committed
103
                if (!pos && !*data_ && to_write >= buffersize) {
104
                    *data_ = write_pos;
105
                    pos = to_write;
106
                    write_pos = NULL;
107
                    to_write = 0;
108
                    return pos;
109 110
                }

111
                //  Copy data to the buffer. If the buffer is full, return.
Martin Sustrik's avatar
Martin Sustrik committed
112 113
                size_t to_copy = std::min (to_write, buffersize - pos);
                memcpy (buffer + pos, write_pos, to_copy);
114 115 116
                pos += to_copy;
                write_pos += to_copy;
                to_write -= to_copy;
Martin Sustrik's avatar
Martin Sustrik committed
117
            }
Martin Hurton's avatar
Martin Hurton committed
118 119

            *data_ = buffer;
120 121 122 123 124 125 126 127
            return pos;
        }

        void load_msg (msg_t *msg_)
        {
            zmq_assert (in_progress == NULL);
            in_progress = msg_;
            (static_cast <T*> (this)->*next) ();
Martin Sustrik's avatar
Martin Sustrik committed
128
        }
129

130 131 132 133 134
        inline bool has_data ()
        {
            return to_write > 0;
        }

Martin Sustrik's avatar
Martin Sustrik committed
135 136 137
    protected:

        //  Prototype of state machine action.
138
        typedef void (T::*step_t) ();
Martin Sustrik's avatar
Martin Sustrik committed
139 140

        //  This function should be called from derived class to write the data
141
        //  to the buffer and schedule next state machine action.
Martin Sustrik's avatar
Martin Sustrik committed
142
        inline void next_step (void *write_pos_, size_t to_write_,
143
            step_t next_, bool new_msg_flag_)
Martin Sustrik's avatar
Martin Sustrik committed
144 145 146 147
        {
            write_pos = (unsigned char*) write_pos_;
            to_write = to_write_;
            next = next_;
148
            new_msg_flag = new_msg_flag_;
Martin Sustrik's avatar
Martin Sustrik committed
149 150 151 152
        }

    private:

153
        //  Where to get the data to write from.
Martin Sustrik's avatar
Martin Sustrik committed
154
        unsigned char *write_pos;
155 156

        //  How much data to write before next step should be executed.
Martin Sustrik's avatar
Martin Sustrik committed
157
        size_t to_write;
158 159 160

        //  Next step. If set to NULL, it means that associated data stream
        //  is dead.
Martin Sustrik's avatar
Martin Sustrik committed
161
        step_t next;
162

163
        bool new_msg_flag;
Martin Sustrik's avatar
Martin Sustrik committed
164

165
        //  The buffer for encoded data.
166 167 168
        size_t bufsize;
        unsigned char *buf;

169 170
        encoder_base_t (const encoder_base_t&);
        void operator = (const encoder_base_t&);
171 172 173 174 175

    protected:

        msg_t *in_progress;

176
    };
Martin Sustrik's avatar
Martin Sustrik committed
177 178 179
}

#endif
180