Commit 36bfaaab authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #332 from hurtonm/fix_issue_264

Fix issue #264
parents 9098f4d6 776563fc
...@@ -51,6 +51,11 @@ void zmq::decoder_t::set_session (session_base_t *session_) ...@@ -51,6 +51,11 @@ void zmq::decoder_t::set_session (session_base_t *session_)
session = session_; session = session_;
} }
bool zmq::decoder_t::stalled () const
{
return next == &decoder_t::message_ready;
}
bool zmq::decoder_t::one_byte_size_ready () bool zmq::decoder_t::one_byte_size_ready ()
{ {
// First byte of size is read. If it is 0xff read 8-byte size. // First byte of size is read. If it is 0xff read 8-byte size.
......
...@@ -52,9 +52,9 @@ namespace zmq ...@@ -52,9 +52,9 @@ namespace zmq
public: public:
inline decoder_base_t (size_t bufsize_) : inline decoder_base_t (size_t bufsize_) :
next (NULL),
read_pos (NULL), read_pos (NULL),
to_read (0), to_read (0),
next (NULL),
bufsize (bufsize_) bufsize (bufsize_)
{ {
buf = (unsigned char*) malloc (bufsize_); buf = (unsigned char*) malloc (bufsize_);
...@@ -165,6 +165,11 @@ namespace zmq ...@@ -165,6 +165,11 @@ namespace zmq
next = NULL; next = NULL;
} }
// Next step. If set to NULL, it means that associated data stream
// is dead. Note that there can be still data in the process in such
// case.
step_t next;
private: private:
// Where to store the read data. // Where to store the read data.
...@@ -173,11 +178,6 @@ namespace zmq ...@@ -173,11 +178,6 @@ namespace zmq
// How much data to read before taking next step. // How much data to read before taking next step.
size_t to_read; size_t to_read;
// Next step. If set to NULL, it means that associated data stream
// is dead. Note that there can be still data in the process in such
// case.
step_t next;
// The duffer for data to decode. // The duffer for data to decode.
size_t bufsize; size_t bufsize;
unsigned char *buf; unsigned char *buf;
...@@ -197,6 +197,10 @@ namespace zmq ...@@ -197,6 +197,10 @@ namespace zmq
void set_session (zmq::session_base_t *session_); void set_session (zmq::session_base_t *session_);
// Returns true if there is a decoded message
// waiting to be delivered to the session.
bool stalled () const;
private: private:
bool one_byte_size_ready (); bool one_byte_size_ready ();
......
...@@ -47,6 +47,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : ...@@ -47,6 +47,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
inpos (NULL), inpos (NULL),
insize (0), insize (0),
decoder (in_batch_size, options_.maxmsgsize), decoder (in_batch_size, options_.maxmsgsize),
input_error (false),
outpos (NULL), outpos (NULL),
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size),
...@@ -55,7 +56,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : ...@@ -55,7 +56,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
options (options_), options (options_),
plugged (false) plugged (false)
{ {
// Get the socket into non-blocking mode. // Put the socket into non-blocking mode.
unblock_socket (s); unblock_socket (s);
// Set the socket buffer limits for the underlying socket. // Set the socket buffer limits for the underlying socket.
...@@ -202,8 +203,18 @@ void zmq::stream_engine_t::in_event () ...@@ -202,8 +203,18 @@ void zmq::stream_engine_t::in_event ()
session->flush (); session->flush ();
} }
if (session && disconnection) // Input error has occurred. If the last decoded
error (); // message has already been accepted, we terminate
// the engine immediately. Otherwise, we stop
// waiting for input events and postpone the termination
// until after the session has accepted the message.
if (session != NULL && disconnection) {
input_error = true;
if (decoder.stalled ())
reset_pollin (handle);
else
error ();
}
} }
void zmq::stream_engine_t::out_event () void zmq::stream_engine_t::out_event ()
...@@ -235,9 +246,11 @@ void zmq::stream_engine_t::out_event () ...@@ -235,9 +246,11 @@ void zmq::stream_engine_t::out_event ()
// written should be reasonably modest. // written should be reasonably modest.
int nbytes = write (outpos, outsize); int nbytes = write (outpos, outsize);
// Handle problems with the connection. // IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
// this is necessary to prevent losing incomming messages.
if (nbytes == -1) { if (nbytes == -1) {
error (); reset_pollout (handle);
return; return;
} }
...@@ -258,6 +271,17 @@ void zmq::stream_engine_t::activate_out () ...@@ -258,6 +271,17 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in () void zmq::stream_engine_t::activate_in ()
{ {
if (input_error) {
// There was an input error but the engine could not
// be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now.
decoder.process_buffer (inpos, 0);
zmq_assert (!decoder.stalled ());
session->flush ();
error ();
return;
}
set_pollin (handle); set_pollin (handle);
// Speculative read. // Speculative read.
......
...@@ -83,6 +83,7 @@ namespace zmq ...@@ -83,6 +83,7 @@ namespace zmq
unsigned char *inpos; unsigned char *inpos;
size_t insize; size_t insize;
decoder_t decoder; decoder_t decoder;
bool input_error;
unsigned char *outpos; unsigned char *outpos;
size_t outsize; size_t outsize;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment