Commit 7d3fa3af authored by Martin Hurton's avatar Martin Hurton

Tell the session why the engine has stopped

parent adddda17
...@@ -366,7 +366,8 @@ void zmq::session_base_t::process_attach (i_engine *engine_) ...@@ -366,7 +366,8 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
engine->plug (io_thread, this); engine->plug (io_thread, this);
} }
void zmq::session_base_t::engine_error () void zmq::session_base_t::engine_error (
zmq::stream_engine_t::error_reason_t reason)
{ {
// Engine is dead. Let's forget about it. // Engine is dead. Let's forget about it.
engine = NULL; engine = NULL;
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "io_object.hpp" #include "io_object.hpp"
#include "pipe.hpp" #include "pipe.hpp"
#include "socket_base.hpp" #include "socket_base.hpp"
#include "stream_engine.hpp"
namespace zmq namespace zmq
{ {
...@@ -55,7 +56,7 @@ namespace zmq ...@@ -55,7 +56,7 @@ namespace zmq
// Following functions are the interface exposed towards the engine. // Following functions are the interface exposed towards the engine.
virtual void reset (); virtual void reset ();
void flush (); void flush ();
void engine_error (); void engine_error (zmq::stream_engine_t::error_reason_t reason);
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_); void read_activated (zmq::pipe_t *pipe_);
......
...@@ -274,12 +274,12 @@ void zmq::stream_engine_t::in_event () ...@@ -274,12 +274,12 @@ void zmq::stream_engine_t::in_event ()
int const rc = read (inpos, bufsize); int const rc = read (inpos, bufsize);
if (rc == 0) { if (rc == 0) {
error (); error (connection_error);
return; return;
} }
if (rc == -1) { if (rc == -1) {
if (errno != EAGAIN) if (errno != EAGAIN)
error (); error (connection_error);
return; return;
} }
...@@ -306,7 +306,7 @@ void zmq::stream_engine_t::in_event () ...@@ -306,7 +306,7 @@ void zmq::stream_engine_t::in_event ()
// or the session has rejected the message. // or the session has rejected the message.
if (rc == -1) { if (rc == -1) {
if (errno != EAGAIN) { if (errno != EAGAIN) {
error (); error (connection_error);
return; return;
} }
input_stopped = true; input_stopped = true;
...@@ -407,7 +407,7 @@ void zmq::stream_engine_t::restart_input () ...@@ -407,7 +407,7 @@ void zmq::stream_engine_t::restart_input ()
if (errno == EAGAIN) if (errno == EAGAIN)
session->flush (); session->flush ();
else else
error (); error (protocol_error);
return; return;
} }
...@@ -427,8 +427,11 @@ void zmq::stream_engine_t::restart_input () ...@@ -427,8 +427,11 @@ void zmq::stream_engine_t::restart_input ()
if (rc == -1 && errno == EAGAIN) if (rc == -1 && errno == EAGAIN)
session->flush (); session->flush ();
else else
if (rc == -1 || io_error) if (io_error)
error (); error (connection_error);
else
if (rc == -1)
error (protocol_error);
else { else {
input_stopped = false; input_stopped = false;
set_pollin (handle); set_pollin (handle);
...@@ -448,12 +451,12 @@ bool zmq::stream_engine_t::handshake () ...@@ -448,12 +451,12 @@ bool zmq::stream_engine_t::handshake ()
const int n = read (greeting_recv + greeting_bytes_read, const int n = read (greeting_recv + greeting_bytes_read,
greeting_size - greeting_bytes_read); greeting_size - greeting_bytes_read);
if (n == 0) { if (n == 0) {
error (); error (connection_error);
return false; return false;
} }
if (n == -1) { if (n == -1) {
if (errno != EAGAIN) if (errno != EAGAIN)
error (); error (connection_error);
return false; return false;
} }
...@@ -631,7 +634,7 @@ bool zmq::stream_engine_t::handshake () ...@@ -631,7 +634,7 @@ bool zmq::stream_engine_t::handshake ()
} }
#endif #endif
else { else {
error (); error (protocol_error);
return false; return false;
} }
next_msg = &stream_engine_t::next_handshake_command; next_msg = &stream_engine_t::next_handshake_command;
...@@ -732,7 +735,7 @@ void zmq::stream_engine_t::zap_msg_available () ...@@ -732,7 +735,7 @@ void zmq::stream_engine_t::zap_msg_available ()
const int rc = mechanism->zap_msg_available (); const int rc = mechanism->zap_msg_available ();
if (rc == -1) { if (rc == -1) {
error (); error (protocol_error);
return; return;
} }
if (input_stopped) if (input_stopped)
...@@ -871,7 +874,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) ...@@ -871,7 +874,7 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_)
return push_msg_to_session (msg_); return push_msg_to_session (msg_);
} }
void zmq::stream_engine_t::error () void zmq::stream_engine_t::error (error_reason_t reason)
{ {
if (options.raw_sock) { if (options.raw_sock) {
// For raw sockets, send a final 0-length message to the application // For raw sockets, send a final 0-length message to the application
...@@ -884,7 +887,7 @@ void zmq::stream_engine_t::error () ...@@ -884,7 +887,7 @@ void zmq::stream_engine_t::error ()
zmq_assert (session); zmq_assert (session);
socket->event_disconnected (endpoint, s); socket->event_disconnected (endpoint, s);
session->flush (); session->flush ();
session->engine_error (); session->engine_error (reason);
unplug (); unplug ();
delete this; delete this;
} }
...@@ -1006,5 +1009,5 @@ void zmq::stream_engine_t::timer_event (int id_) ...@@ -1006,5 +1009,5 @@ void zmq::stream_engine_t::timer_event (int id_)
has_handshake_timer = false; has_handshake_timer = false;
// handshake timer expired before handshake completed, so engine fails // handshake timer expired before handshake completed, so engine fails
error (); error (timeout_error);
} }
...@@ -53,6 +53,12 @@ namespace zmq ...@@ -53,6 +53,12 @@ namespace zmq
{ {
public: public:
enum error_reason_t {
protocol_error,
connection_error,
timeout_error
};
stream_engine_t (fd_t fd_, const options_t &options_, stream_engine_t (fd_t fd_, const options_t &options_,
const std::string &endpoint); const std::string &endpoint);
~stream_engine_t (); ~stream_engine_t ();
...@@ -78,7 +84,7 @@ namespace zmq ...@@ -78,7 +84,7 @@ namespace zmq
void unplug (); void unplug ();
// Function to handle network disconnections. // Function to handle network disconnections.
void error (); void error (error_reason_t reason);
// Receives the greeting message from the peer. // Receives the greeting message from the peer.
int receive_greeting (); int receive_greeting ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment