Commit f5acbb50 authored by Martin Sustrik's avatar Martin Sustrik

naming cleanup: zmq_encoder->encoder, zmq_decoder->decoder

parent e45583c0
...@@ -120,15 +120,15 @@ libzmq_la_SOURCES = \ ...@@ -120,15 +120,15 @@ libzmq_la_SOURCES = \
ypipe.hpp \ ypipe.hpp \
yqueue.hpp \ yqueue.hpp \
zmq_connecter.hpp \ zmq_connecter.hpp \
zmq_decoder.hpp \
zmq_encoder.hpp \
zmq_engine.hpp \ zmq_engine.hpp \
zmq_init.hpp \ zmq_init.hpp \
zmq_listener.hpp \ zmq_listener.hpp \
command.cpp \ command.cpp \
ctx.cpp \ ctx.cpp \
connect_session.cpp \ connect_session.cpp \
decoder.cpp \
devpoll.cpp \ devpoll.cpp \
encoder.cpp \
epoll.cpp \ epoll.cpp \
err.cpp \ err.cpp \
forwarder.cpp \ forwarder.cpp \
...@@ -172,8 +172,6 @@ libzmq_la_SOURCES = \ ...@@ -172,8 +172,6 @@ libzmq_la_SOURCES = \
xreq.cpp \ xreq.cpp \
zmq.cpp \ zmq.cpp \
zmq_connecter.cpp \ zmq_connecter.cpp \
zmq_decoder.cpp \
zmq_encoder.cpp \
zmq_engine.cpp \ zmq_engine.cpp \
zmq_init.cpp \ zmq_init.cpp \
zmq_listener.cpp zmq_listener.cpp
......
...@@ -20,38 +20,38 @@ ...@@ -20,38 +20,38 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include "zmq_decoder.hpp" #include "decoder.hpp"
#include "i_inout.hpp" #include "i_inout.hpp"
#include "wire.hpp" #include "wire.hpp"
#include "err.hpp" #include "err.hpp"
zmq::zmq_decoder_t::zmq_decoder_t (size_t bufsize_) : zmq::decoder_t::decoder_t (size_t bufsize_) :
decoder_t <zmq_decoder_t> (bufsize_), decoder_base_t <decoder_t> (bufsize_),
destination (NULL) destination (NULL)
{ {
zmq_msg_init (&in_progress); zmq_msg_init (&in_progress);
// At the beginning, read one byte and go to one_byte_size_ready state. // At the beginning, read one byte and go to one_byte_size_ready state.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
} }
zmq::zmq_decoder_t::~zmq_decoder_t () zmq::decoder_t::~decoder_t ()
{ {
zmq_msg_close (&in_progress); zmq_msg_close (&in_progress);
} }
void zmq::zmq_decoder_t::set_inout (i_inout *destination_) void zmq::decoder_t::set_inout (i_inout *destination_)
{ {
destination = destination_; destination = destination_;
} }
bool zmq::zmq_decoder_t::one_byte_size_ready () bool zmq::decoder_t::one_byte_size_ready ()
{ {
// First byte of size is read. If it is 0xff read 8-byte size. // First byte of size is read. If it is 0xff read 8-byte size.
// Otherwise allocate the buffer for message data and read the // Otherwise allocate the buffer for message data and read the
// message data into it. // message data into it.
if (*tmpbuf == 0xff) if (*tmpbuf == 0xff)
next_step (tmpbuf, 8, &zmq_decoder_t::eight_byte_size_ready); next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);
else { else {
// TODO: Handle over-sized message decently. // TODO: Handle over-sized message decently.
...@@ -64,12 +64,12 @@ bool zmq::zmq_decoder_t::one_byte_size_ready () ...@@ -64,12 +64,12 @@ bool zmq::zmq_decoder_t::one_byte_size_ready ()
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1); int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
errno_assert (rc == 0); errno_assert (rc == 0);
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); next_step (tmpbuf, 1, &decoder_t::flags_ready);
} }
return true; return true;
} }
bool zmq::zmq_decoder_t::eight_byte_size_ready () bool zmq::decoder_t::eight_byte_size_ready ()
{ {
// 8-byte size is read. Allocate the buffer for message body and // 8-byte size is read. Allocate the buffer for message body and
// read the message data into it. // read the message data into it.
...@@ -86,29 +86,29 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready () ...@@ -86,29 +86,29 @@ bool zmq::zmq_decoder_t::eight_byte_size_ready ()
// message and thus we can treat it as uninitialised... // message and thus we can treat it as uninitialised...
int rc = zmq_msg_init_size (&in_progress, size - 1); int rc = zmq_msg_init_size (&in_progress, size - 1);
errno_assert (rc == 0); errno_assert (rc == 0);
next_step (tmpbuf, 1, &zmq_decoder_t::flags_ready); next_step (tmpbuf, 1, &decoder_t::flags_ready);
return true; return true;
} }
bool zmq::zmq_decoder_t::flags_ready () bool zmq::decoder_t::flags_ready ()
{ {
// Store the flags from the wire into the message structure. // Store the flags from the wire into the message structure.
in_progress.flags = tmpbuf [0]; in_progress.flags = tmpbuf [0];
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_decoder_t::message_ready); &decoder_t::message_ready);
return true; return true;
} }
bool zmq::zmq_decoder_t::message_ready () bool zmq::decoder_t::message_ready ()
{ {
// Message is completely read. Push it further and start reading // Message is completely read. Push it further and start reading
// new message. (in_progress is a 0-byte message after this point.) // new message. (in_progress is a 0-byte message after this point.)
if (!destination || !destination->write (&in_progress)) if (!destination || !destination->write (&in_progress))
return false; return false;
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready); next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
return true; return true;
} }
...@@ -27,25 +27,27 @@ ...@@ -27,25 +27,27 @@
#include "err.hpp" #include "err.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
// Helper base class for decoders that know the amount of data to read // Helper base class for decoders that know the amount of data to read
// in advance at any moment. Knowing the amount in advance is a property // in advance at any moment. Knowing the amount in advance is a property
// of the protocol used. Both AMQP and backend protocol are based on // of the protocol used. 0MQ framing protocol is based size-prefixed
// size-prefixed paradigm, therefore they are using decoder_t to parse // paradigm, whixh qualifies it to be parsed by this class.
// the messages. On the other hand, XML-based transports (like XMPP or // On the other hand, XML-based transports (like XMPP or SOAP) don't allow
// SOAP) don't allow for knowing the size of data to read in advance and // for knowing the size of data to read in advance and should use different
// should use different decoding algorithms. // decoding algorithms.
// //
// Decoder implements the state machine that parses the incoming buffer. // This class implements the state machine that parses the incoming buffer.
// Derived class should implement individual state machine actions. // Derived class should implement individual state machine actions.
template <typename T> class decoder_t template <typename T> class decoder_base_t
{ {
public: public:
inline decoder_t (size_t bufsize_) : inline decoder_base_t (size_t bufsize_) :
read_pos (NULL), read_pos (NULL),
to_read (0), to_read (0),
next (NULL), next (NULL),
...@@ -57,7 +59,7 @@ namespace zmq ...@@ -57,7 +59,7 @@ namespace zmq
// The destructor doesn't have to be virtual. It is mad virtual // The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining. // just to keep ICC and code checking tools from complaining.
inline virtual ~decoder_t () inline virtual ~decoder_base_t ()
{ {
free (buf); free (buf);
} }
...@@ -149,6 +151,32 @@ namespace zmq ...@@ -149,6 +151,32 @@ namespace zmq
size_t bufsize; size_t bufsize;
unsigned char *buf; unsigned char *buf;
decoder_base_t (const decoder_base_t&);
void operator = (const decoder_base_t&);
};
// Decoder for 0MQ framing protocol. Converts data batches into messages.
class decoder_t : public decoder_base_t <decoder_t>
{
public:
decoder_t (size_t bufsize_);
~decoder_t ();
void set_inout (struct i_inout *destination_);
private:
bool one_byte_size_ready ();
bool eight_byte_size_ready ();
bool flags_ready ();
bool message_ready ();
struct i_inout *destination;
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
decoder_t (const decoder_t&); decoder_t (const decoder_t&);
void operator = (const decoder_t&); void operator = (const decoder_t&);
}; };
...@@ -156,3 +184,4 @@ namespace zmq ...@@ -156,3 +184,4 @@ namespace zmq
} }
#endif #endif
...@@ -17,39 +17,39 @@ ...@@ -17,39 +17,39 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "zmq_encoder.hpp" #include "encoder.hpp"
#include "i_inout.hpp" #include "i_inout.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::zmq_encoder_t::zmq_encoder_t (size_t bufsize_) : zmq::encoder_t::encoder_t (size_t bufsize_) :
encoder_t <zmq_encoder_t> (bufsize_), encoder_base_t <encoder_t> (bufsize_),
source (NULL) source (NULL)
{ {
zmq_msg_init (&in_progress); zmq_msg_init (&in_progress);
// Write 0 bytes to the batch and go to message_ready state. // Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &zmq_encoder_t::message_ready, true); next_step (NULL, 0, &encoder_t::message_ready, true);
} }
zmq::zmq_encoder_t::~zmq_encoder_t () zmq::encoder_t::~encoder_t ()
{ {
zmq_msg_close (&in_progress); zmq_msg_close (&in_progress);
} }
void zmq::zmq_encoder_t::set_inout (i_inout *source_) void zmq::encoder_t::set_inout (i_inout *source_)
{ {
source = source_; source = source_;
} }
bool zmq::zmq_encoder_t::size_ready () bool zmq::encoder_t::size_ready ()
{ {
// Write message body into the buffer. // Write message body into the buffer.
next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress), next_step (zmq_msg_data (&in_progress), zmq_msg_size (&in_progress),
&zmq_encoder_t::message_ready, false); &encoder_t::message_ready, false);
return true; return true;
} }
bool zmq::zmq_encoder_t::message_ready () bool zmq::encoder_t::message_ready ()
{ {
// Destroy content of the old message. // Destroy content of the old message.
zmq_msg_close (&in_progress); zmq_msg_close (&in_progress);
...@@ -75,14 +75,14 @@ bool zmq::zmq_encoder_t::message_ready () ...@@ -75,14 +75,14 @@ bool zmq::zmq_encoder_t::message_ready ()
if (size < 255) { if (size < 255) {
tmpbuf [0] = (unsigned char) size; tmpbuf [0] = (unsigned char) size;
tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED); tmpbuf [1] = (in_progress.flags & ~ZMQ_MSG_SHARED);
next_step (tmpbuf, 2, &zmq_encoder_t::size_ready, next_step (tmpbuf, 2, &encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_MORE)); !(in_progress.flags & ZMQ_MSG_MORE));
} }
else { else {
tmpbuf [0] = 0xff; tmpbuf [0] = 0xff;
put_uint64 (tmpbuf + 1, size); put_uint64 (tmpbuf + 1, size);
tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED); tmpbuf [9] = (in_progress.flags & ~ZMQ_MSG_SHARED);
next_step (tmpbuf, 10, &zmq_encoder_t::size_ready, next_step (tmpbuf, 10, &encoder_t::size_ready,
!(in_progress.flags & ZMQ_MSG_MORE)); !(in_progress.flags & ZMQ_MSG_MORE));
} }
return true; return true;
......
...@@ -20,11 +20,6 @@ ...@@ -20,11 +20,6 @@
#ifndef __ZMQ_ENCODER_HPP_INCLUDED__ #ifndef __ZMQ_ENCODER_HPP_INCLUDED__
#define __ZMQ_ENCODER_HPP_INCLUDED__ #define __ZMQ_ENCODER_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#endif
#include <stddef.h> #include <stddef.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -32,6 +27,8 @@ ...@@ -32,6 +27,8 @@
#include "err.hpp" #include "err.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
...@@ -39,11 +36,11 @@ namespace zmq ...@@ -39,11 +36,11 @@ namespace zmq
// fills the outgoing buffer. Derived classes should implement individual // fills the outgoing buffer. Derived classes should implement individual
// state machine actions. // state machine actions.
template <typename T> class encoder_t template <typename T> class encoder_base_t
{ {
public: public:
inline encoder_t (size_t bufsize_) : inline encoder_base_t (size_t bufsize_) :
bufsize (bufsize_) bufsize (bufsize_)
{ {
buf = (unsigned char*) malloc (bufsize_); buf = (unsigned char*) malloc (bufsize_);
...@@ -52,7 +49,7 @@ namespace zmq ...@@ -52,7 +49,7 @@ namespace zmq
// The destructor doesn't have to be virtual. It is mad virtual // The destructor doesn't have to be virtual. It is mad virtual
// just to keep ICC and code checking tools from complaining. // just to keep ICC and code checking tools from complaining.
inline virtual ~encoder_t () inline virtual ~encoder_base_t ()
{ {
free (buf); free (buf);
} }
...@@ -153,10 +150,34 @@ namespace zmq ...@@ -153,10 +150,34 @@ namespace zmq
size_t bufsize; size_t bufsize;
unsigned char *buf; unsigned char *buf;
encoder_base_t (const encoder_base_t&);
void operator = (const encoder_base_t&);
};
// Encoder for 0MQ framing protocol. Converts messages into data batches.
class encoder_t : public encoder_base_t <encoder_t>
{
public:
encoder_t (size_t bufsize_);
~encoder_t ();
void set_inout (struct i_inout *source_);
private:
bool size_ready ();
bool message_ready ();
struct i_inout *source;
::zmq_msg_t in_progress;
unsigned char tmpbuf [10];
encoder_t (const encoder_t&); encoder_t (const encoder_t&);
void operator = (const encoder_t&); void operator = (const encoder_t&);
}; };
} }
#endif #endif
...@@ -195,7 +195,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -195,7 +195,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true; it->second.joined = true;
// Create and connect decoder for the peer. // Create and connect decoder for the peer.
it->second.decoder = new (std::nothrow) zmq_decoder_t (0); it->second.decoder = new (std::nothrow) decoder_t (0);
it->second.decoder->set_inout (inout); it->second.decoder->set_inout (inout);
} }
......
...@@ -34,7 +34,7 @@ ...@@ -34,7 +34,7 @@
#include "io_object.hpp" #include "io_object.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "options.hpp" #include "options.hpp"
#include "zmq_decoder.hpp" #include "decoder.hpp"
#include "pgm_socket.hpp" #include "pgm_socket.hpp"
namespace zmq namespace zmq
...@@ -68,7 +68,7 @@ namespace zmq ...@@ -68,7 +68,7 @@ namespace zmq
struct peer_info_t struct peer_info_t
{ {
bool joined; bool joined;
zmq_decoder_t *decoder; decoder_t *decoder;
}; };
struct tsi_comp struct tsi_comp
...@@ -98,7 +98,7 @@ namespace zmq ...@@ -98,7 +98,7 @@ namespace zmq
i_inout *inout; i_inout *inout;
// Most recently used decoder. // Most recently used decoder.
zmq_decoder_t *mru_decoder; decoder_t *mru_decoder;
// Number of bytes not consumed by the decoder due to pipe overflow. // Number of bytes not consumed by the decoder due to pipe overflow.
size_t pending_bytes; size_t pending_bytes;
......
...@@ -33,7 +33,7 @@ ...@@ -33,7 +33,7 @@
#include "i_engine.hpp" #include "i_engine.hpp"
#include "options.hpp" #include "options.hpp"
#include "pgm_socket.hpp" #include "pgm_socket.hpp"
#include "zmq_encoder.hpp" #include "encoder.hpp"
namespace zmq namespace zmq
{ {
...@@ -62,7 +62,7 @@ namespace zmq ...@@ -62,7 +62,7 @@ namespace zmq
private: private:
// Message encoder. // Message encoder.
zmq_encoder_t encoder; encoder_t encoder;
// PGM socket. // PGM socket.
pgm_socket_t pgm_socket; pgm_socket_t pgm_socket;
......
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
#define __ZMQ_ZMQ_DECODER_HPP_INCLUDED__
#include "../include/zmq.h"
#include "decoder.hpp"
#include "blob.hpp"
namespace zmq
{
// Decoder for 0MQ backend protocol. Converts data batches into messages.
class zmq_decoder_t : public decoder_t <zmq_decoder_t>
{
public:
zmq_decoder_t (size_t bufsize_);
~zmq_decoder_t ();
void set_inout (struct i_inout *destination_);
private:
bool one_byte_size_ready ();
bool eight_byte_size_ready ();
bool flags_ready ();
bool message_ready ();
struct i_inout *destination;
unsigned char tmpbuf [8];
::zmq_msg_t in_progress;
zmq_decoder_t (const zmq_decoder_t&);
void operator = (const zmq_decoder_t&);
};
}
#endif
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
#define __ZMQ_ZMQ_ENCODER_HPP_INCLUDED__
#include "../include/zmq.h"
#include "encoder.hpp"
namespace zmq
{
// Encoder for 0MQ backend protocol. Converts messages into data batches.
class zmq_encoder_t : public encoder_t <zmq_encoder_t>
{
public:
zmq_encoder_t (size_t bufsize_);
~zmq_encoder_t ();
void set_inout (struct i_inout *source_);
private:
bool size_ready ();
bool message_ready ();
struct i_inout *source;
::zmq_msg_t in_progress;
unsigned char tmpbuf [10];
zmq_encoder_t (const zmq_encoder_t&);
void operator = (const zmq_encoder_t&);
};
}
#endif
...@@ -27,8 +27,8 @@ ...@@ -27,8 +27,8 @@
#include "i_engine.hpp" #include "i_engine.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_socket.hpp" #include "tcp_socket.hpp"
#include "zmq_encoder.hpp" #include "encoder.hpp"
#include "zmq_decoder.hpp" #include "decoder.hpp"
#include "options.hpp" #include "options.hpp"
namespace zmq namespace zmq
...@@ -62,11 +62,11 @@ namespace zmq ...@@ -62,11 +62,11 @@ namespace zmq
unsigned char *inpos; unsigned char *inpos;
size_t insize; size_t insize;
zmq_decoder_t decoder; decoder_t decoder;
unsigned char *outpos; unsigned char *outpos;
size_t outsize; size_t outsize;
zmq_encoder_t encoder; encoder_t encoder;
i_inout *inout; i_inout *inout;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment