Commit 9fbdcc79 authored by malosek's avatar malosek

removed reset method from zmq_decoder_t

parent 969522bb
......@@ -46,6 +46,7 @@
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_, const char *session_name_) :
io_object_t (parent_),
decoder (NULL),
pgm_socket (true, options_),
options (options_),
session_name (session_name_),
......@@ -56,10 +57,15 @@ zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
zmq::pgm_receiver_t::~pgm_receiver_t ()
{
if (decoder)
delete decoder;
}
int zmq::pgm_receiver_t::init (const char *network_)
{
decoder = new zmq_decoder_t;
zmq_assert (decoder);
return pgm_socket.init (network_);
}
......@@ -69,7 +75,7 @@ void zmq::pgm_receiver_t::plug (i_inout *inout_)
int socket_fd;
int waiting_pipe_fd;
decoder.set_inout (inout_);
decoder->set_inout (inout_);
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
......@@ -91,7 +97,7 @@ void zmq::pgm_receiver_t::unplug ()
{
rm_fd (socket_handle);
rm_fd (pipe_handle);
decoder.set_inout (NULL);
decoder->set_inout (NULL);
inout = NULL;
}
......@@ -105,9 +111,14 @@ void zmq::pgm_receiver_t::reconnect ()
// Save inout ptr.
i_inout *inout_tmp = inout;
// PGM receiver is not joined anymore.
joined = false;
// Unplug - plug PGM transport.
unplug ();
decoder.reset ();
delete decoder;
decoder = new zmq_decoder_t;
zmq_assert (decoder);
plug (inout_tmp);
}
......@@ -121,7 +132,7 @@ void zmq::pgm_receiver_t::in_event ()
while ((nbytes = receive_with_offset (&data_with_offset)) > 0) {
// Push all the data to the decoder.
decoder.write ((unsigned char*)data_with_offset, nbytes);
decoder->write ((unsigned char*)data_with_offset, nbytes);
}
// Flush any messages decoder may have produced to the dispatcher.
......@@ -130,12 +141,6 @@ void zmq::pgm_receiver_t::in_event ()
// Data loss detected.
if (nbytes == -1) {
// Throw message in progress from decoder
decoder.reset ();
// PGM receive is not joined anymore.
joined = false;
// Recreate PGM transport.
reconnect ();
}
......
......@@ -64,7 +64,7 @@ namespace zmq
ssize_t receive_with_offset (void **data_);
// Message decoder.
zmq_decoder_t decoder;
zmq_decoder_t *decoder;
// PGM socket.
pgm_socket_t pgm_socket;
......
......@@ -36,16 +36,6 @@ zmq::zmq_decoder_t::~zmq_decoder_t ()
zmq_msg_close (&in_progress);
}
void zmq::zmq_decoder_t::reset ()
{
// Free and reinit message buffer.
zmq_msg_close (&in_progress);
zmq_msg_init (&in_progress);
// Restart the FSM.
next_step (tmpbuf, 1, &zmq_decoder_t::one_byte_size_ready);
}
void zmq::zmq_decoder_t::set_inout (i_inout *destination_)
{
destination = destination_;
......
......@@ -37,8 +37,6 @@ namespace zmq
void set_inout (struct i_inout *destination_);
// Clears any partially decoded messages.
void reset ();
private:
bool one_byte_size_ready ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment