Commit 2928c91a authored by Martin Hurton's avatar Martin Hurton

Implement ZAP and integrate it with PLAIN mechanism

parent 4e47084d
...@@ -46,6 +46,8 @@ namespace zmq ...@@ -46,6 +46,8 @@ namespace zmq
// This method is called by the session to signalise that there // This method is called by the session to signalise that there
// are messages to send available. // are messages to send available.
virtual void activate_out () = 0; virtual void activate_out () = 0;
virtual void zap_msg_available () = 0;
}; };
} }
......
...@@ -46,6 +46,9 @@ namespace zmq ...@@ -46,6 +46,9 @@ namespace zmq
// Process the handshake message received from the peer. // Process the handshake message received from the peer.
virtual int process_handshake_message (msg_t *msg_) = 0; virtual int process_handshake_message (msg_t *msg_) = 0;
// Notifies mechanism about availability of ZAP message.
virtual int zap_msg_available () { return 0; }
// True iff the handshake stage is complete? // True iff the handshake stage is complete?
virtual bool is_handshake_complete () const = 0; virtual bool is_handshake_complete () const = 0;
......
...@@ -59,6 +59,7 @@ namespace zmq ...@@ -59,6 +59,7 @@ namespace zmq
void terminate (); void terminate ();
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
void zap_msg_available () {}
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
......
...@@ -58,6 +58,7 @@ namespace zmq ...@@ -58,6 +58,7 @@ namespace zmq
void terminate (); void terminate ();
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
void zap_msg_available () {}
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
......
...@@ -26,12 +26,15 @@ ...@@ -26,12 +26,15 @@
#include <string> #include <string>
#include "msg.hpp" #include "msg.hpp"
#include "session_base.hpp"
#include "err.hpp" #include "err.hpp"
#include "plain_mechanism.hpp" #include "plain_mechanism.hpp"
#include "wire.hpp" #include "wire.hpp"
zmq::plain_mechanism_t::plain_mechanism_t (const options_t &options_) : zmq::plain_mechanism_t::plain_mechanism_t (session_base_t *session_,
const options_t &options_) :
mechanism_t (options_), mechanism_t (options_),
session (session_),
state (options.as_server? waiting_for_hello: sending_hello) state (options.as_server? waiting_for_hello: sending_hello)
{ {
} }
...@@ -79,8 +82,16 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_) ...@@ -79,8 +82,16 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_)
switch (state) { switch (state) {
case waiting_for_hello: case waiting_for_hello:
rc = process_hello_command (msg_); rc = process_hello_command (msg_);
if (rc == 0) if (rc == 0) {
state = sending_welcome; rc = receive_and_process_zap_reply ();
if (rc == 0)
state = sending_welcome;
else
if (errno == EAGAIN) {
rc = 0;
state = waiting_for_zap_reply;
}
}
break; break;
case waiting_for_welcome: case waiting_for_welcome:
rc = process_welcome_command (msg_); rc = process_welcome_command (msg_);
...@@ -107,7 +118,7 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_) ...@@ -107,7 +118,7 @@ int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_)
rc = msg_->init (); rc = msg_->init ();
errno_assert (rc == 0); errno_assert (rc == 0);
} }
return 0; return rc;
} }
bool zmq::plain_mechanism_t::is_handshake_complete () const bool zmq::plain_mechanism_t::is_handshake_complete () const
...@@ -115,6 +126,17 @@ bool zmq::plain_mechanism_t::is_handshake_complete () const ...@@ -115,6 +126,17 @@ bool zmq::plain_mechanism_t::is_handshake_complete () const
return state == ready; return state == ready;
} }
int zmq::plain_mechanism_t::zap_msg_available ()
{
if (state != waiting_for_zap_reply) {
errno = EFSM;
return -1;
}
const int rc = receive_and_process_zap_reply ();
if (rc == 0)
state = sending_welcome;
return rc;
}
int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const
{ {
...@@ -192,9 +214,66 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_) ...@@ -192,9 +214,66 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_)
errno = EPROTO; errno = EPROTO;
return -1; return -1;
} }
// TODO: Add user authentication // Use ZAP protocol (RFC 27) to authenticate user.
// Note: maybe use RFC 27 (ZAP) for this int rc = session->zap_connect ();
if (rc == -1) {
errno = EPROTO;
return -1;
}
msg_t msg;
// Address delimiter frame
rc = msg.init ();
errno_assert (rc == 0);
msg.set_flags (msg_t::more);
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);
// Version frame
rc = msg.init_size (3);
errno_assert (rc == 0);
memcpy (msg.data (), "1.0", 3);
msg.set_flags (msg_t::more);
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);
// Sequence frame
rc = msg.init_size (1);
errno_assert (rc == 0);
memcpy (msg.data (), "1", 1);
msg.set_flags (msg_t::more);
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);
// Domain frame
rc = msg.init ();
errno_assert (rc == 0);
msg.set_flags (msg_t::more);
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);
// Mechanism frame
rc = msg.init_size (5);
errno_assert (rc == 0);
memcpy (msg.data (), "PLAIN", 5);
msg.set_flags (msg_t::more);
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);
// Credentials frame
rc = msg.init_size (1 + username_length + 1 + password_length);
errno_assert (rc == 0);
char *data_ptr = static_cast <char *> (msg.data ());
*data_ptr++ = static_cast <unsigned char> (username_length);
memcpy (data_ptr, username.c_str (), username_length);
data_ptr += username_length;
*data_ptr++ = static_cast <unsigned char> (password_length);
memcpy (data_ptr, password.c_str (), password_length);
rc = session->write_zap_msg (&msg);
errno_assert (rc == 0);
return 0; return 0;
} }
...@@ -306,6 +385,65 @@ int zmq::plain_mechanism_t::process_ready_command (msg_t *msg_) ...@@ -306,6 +385,65 @@ int zmq::plain_mechanism_t::process_ready_command (msg_t *msg_)
return parse_property_list (ptr + 8, bytes_left - 8); return parse_property_list (ptr + 8, bytes_left - 8);
} }
int zmq::plain_mechanism_t::receive_and_process_zap_reply ()
{
int rc = 0;
msg_t msg [6];
for (int i = 0; i < 6; i++) {
rc = msg [i].init ();
errno_assert (rc == 0);
}
for (int i = 0; i < 6; i++) {
rc = session->read_zap_msg (&msg [i]);
if (rc == -1)
break;
if ((msg [i].flags () & msg_t::more) == (i < 5? 0: msg_t::more)) {
errno = EPROTO;
rc = -1;
break;
}
}
if (rc != 0)
goto error;
return 0;
// Address delimiter frame
if (msg [0].size () > 0) {
errno = EPROTO;
goto error;
}
// Version frame
if (msg [1].size () != 3 || memcmp (msg [1].data (), "1.0", 3)) {
errno = EPROTO;
goto error;
}
// Sequence number frame
if (msg [2].size () != 1 || memcmp (msg [2].data (), "1", 1)) {
errno = EPROTO;
goto error;
}
// Status code frame
if (msg [3].size () != 3 || memcmp (msg [3].data (), "200", 3)) {
errno = EACCES;
goto error;
}
error:
for (int i = 0; i < 6; i++) {
const int rc2 = msg [i].close ();
errno_assert (rc2 == 0);
}
return rc;
}
int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr, int zmq::plain_mechanism_t::parse_property_list (const unsigned char *ptr,
size_t bytes_left) size_t bytes_left)
{ {
......
...@@ -27,17 +27,20 @@ namespace zmq ...@@ -27,17 +27,20 @@ namespace zmq
{ {
class msg_t; class msg_t;
class session_base_t;
class plain_mechanism_t : public mechanism_t class plain_mechanism_t : public mechanism_t
{ {
public: public:
plain_mechanism_t (const options_t &options_); plain_mechanism_t (session_base_t *session_,
const options_t &options_);
virtual ~plain_mechanism_t (); virtual ~plain_mechanism_t ();
// mechanism implementation // mechanism implementation
virtual int next_handshake_message (msg_t *msg_); virtual int next_handshake_message (msg_t *msg_);
virtual int process_handshake_message (msg_t *msg_); virtual int process_handshake_message (msg_t *msg_);
virtual int zap_msg_available ();
virtual bool is_handshake_complete () const; virtual bool is_handshake_complete () const;
private: private:
...@@ -51,9 +54,11 @@ namespace zmq ...@@ -51,9 +54,11 @@ namespace zmq
waiting_for_initiate, waiting_for_initiate,
sending_ready, sending_ready,
waiting_for_ready, waiting_for_ready,
waiting_for_zap_reply,
ready ready
}; };
session_base_t * const session;
state_t state; state_t state;
int hello_command (msg_t *msg_) const; int hello_command (msg_t *msg_) const;
...@@ -66,6 +71,8 @@ namespace zmq ...@@ -66,6 +71,8 @@ namespace zmq
int process_ready_command (msg_t *msg_); int process_ready_command (msg_t *msg_);
int process_initiate_command (msg_t *msg_); int process_initiate_command (msg_t *msg_);
int receive_and_process_zap_reply ();
int parse_property_list (const unsigned char *ptr, size_t length); int parse_property_list (const unsigned char *ptr, size_t length);
}; };
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "pgm_receiver.hpp" #include "pgm_receiver.hpp"
#include "address.hpp" #include "address.hpp"
#include "ctx.hpp"
#include "req.hpp" #include "req.hpp"
#include "dealer.hpp" #include "dealer.hpp"
#include "rep.hpp" #include "rep.hpp"
...@@ -105,6 +106,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -105,6 +106,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_), io_object_t (io_thread_),
connect (connect_), connect (connect_),
pipe (NULL), pipe (NULL),
zap_pipe (NULL),
incomplete_in (false), incomplete_in (false),
pending (false), pending (false),
engine (NULL), engine (NULL),
...@@ -118,6 +120,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -118,6 +120,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
zmq::session_base_t::~session_base_t () zmq::session_base_t::~session_base_t ()
{ {
zmq_assert (!pipe); zmq_assert (!pipe);
zmq_assert (!zap_pipe);
// If there's still a pending linger timer, remove it. // If there's still a pending linger timer, remove it.
if (has_linger_timer) { if (has_linger_timer) {
...@@ -165,6 +168,39 @@ int zmq::session_base_t::push_msg (msg_t *msg_) ...@@ -165,6 +168,39 @@ int zmq::session_base_t::push_msg (msg_t *msg_)
return -1; return -1;
} }
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
if (zap_pipe == NULL) {
errno = ENOTCONN;
return -1;
}
if (!zap_pipe->read (msg_)) {
errno = EAGAIN;
return -1;
}
return 0;
}
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
if (zap_pipe == NULL) {
errno = ENOTCONN;
return -1;
}
const bool ok = zap_pipe->write (msg_);
zmq_assert (ok);
if ((msg_->flags () & msg_t::more) == 0)
zap_pipe->flush ();
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
void zmq::session_base_t::reset () void zmq::session_base_t::reset ()
{ {
} }
...@@ -200,11 +236,17 @@ void zmq::session_base_t::clean_pipes () ...@@ -200,11 +236,17 @@ void zmq::session_base_t::clean_pipes ()
void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{ {
// Drop the reference to the deallocated pipe if required. // Drop the reference to the deallocated pipe if required.
zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1); zmq_assert (pipe_ == pipe
|| pipe_ == zap_pipe
|| terminating_pipes.count (pipe_) == 1);
if (pipe == pipe_) if (pipe_ == pipe)
// If this is our current pipe, remove it // If this is our current pipe, remove it
pipe = NULL; pipe = NULL;
else
if (pipe_ == zap_pipe) {
zap_pipe = NULL;
}
else else
// Remove the pipe from the detached pipes set // Remove the pipe from the detached pipes set
terminating_pipes.erase (pipe_); terminating_pipes.erase (pipe_);
...@@ -220,22 +262,27 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) ...@@ -220,22 +262,27 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
// If we are waiting for pending messages to be sent, at this point // If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed // we are sure that there will be no more messages and we can proceed
// with termination safely. // with termination safely.
if (pending && !pipe && terminating_pipes.empty ()) if (pending && !pipe && !zap_pipe && terminating_pipes.empty ())
proceed_with_term (); proceed_with_term ();
} }
void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::read_activated (pipe_t *pipe_)
{ {
// Skip activating if we're detaching this pipe // Skip activating if we're detaching this pipe
if (pipe != pipe_) { if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) {
zmq_assert (terminating_pipes.count (pipe_) == 1); zmq_assert (terminating_pipes.count (pipe_) == 1);
return; return;
} }
if (likely (engine != NULL)) if (unlikely (engine == NULL)) {
pipe->check_read ();
return;
}
if (likely (pipe_ == pipe))
engine->activate_out (); engine->activate_out ();
else else
pipe->check_read (); engine->zap_msg_available ();
} }
void zmq::session_base_t::write_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_)
...@@ -268,6 +315,50 @@ void zmq::session_base_t::process_plug () ...@@ -268,6 +315,50 @@ void zmq::session_base_t::process_plug ()
start_connecting (false); start_connecting (false);
} }
int zmq::session_base_t::zap_connect ()
{
zmq_assert (zap_pipe == NULL);
endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
if (peer.socket == NULL) {
errno = ECONNREFUSED;
return -1;
}
if (peer.options.type != ZMQ_REP
&& peer.options.type != ZMQ_ROUTER) {
errno = ECONNREFUSED;
return -1;
}
// Create a bi-directional pipe that will connect
// session with zap socket.
object_t *parents [2] = {this, peer.socket};
pipe_t *new_pipes [2] = {NULL, NULL};
int hwms [2] = {0, 0};
bool delays [2] = {false, false};
int rc = pipepair (parents, new_pipes, hwms, delays);
errno_assert (rc == 0);
// Attach local end of the pipe to this socket object.
zap_pipe = new_pipes [0];
zap_pipe->set_event_sink (this);
send_bind (peer.socket, new_pipes [1], false);
// Send empty identity if required by the peer.
if (peer.options.recv_identity) {
msg_t id;
rc = id.init ();
errno_assert (rc == 0);
id.set_flags (msg_t::identity);
bool ok = zap_pipe->write (&id);
zmq_assert (ok);
zap_pipe->flush ();
}
return 0;
}
void zmq::session_base_t::process_attach (i_engine *engine_) void zmq::session_base_t::process_attach (i_engine *engine_)
{ {
zmq_assert (engine_ != NULL); zmq_assert (engine_ != NULL);
...@@ -312,6 +403,9 @@ void zmq::session_base_t::detach () ...@@ -312,6 +403,9 @@ void zmq::session_base_t::detach ()
// Just in case there's only a delimiter in the pipe. // Just in case there's only a delimiter in the pipe.
if (pipe) if (pipe)
pipe->check_read (); pipe->check_read ();
if (zap_pipe)
zap_pipe->check_read ();
} }
void zmq::session_base_t::process_term (int linger_) void zmq::session_base_t::process_term (int linger_)
...@@ -321,30 +415,35 @@ void zmq::session_base_t::process_term (int linger_) ...@@ -321,30 +415,35 @@ void zmq::session_base_t::process_term (int linger_)
// If the termination of the pipe happens before the term command is // If the termination of the pipe happens before the term command is
// delivered there's nothing much to do. We can proceed with the // delivered there's nothing much to do. We can proceed with the
// stadard termination immediately. // stadard termination immediately.
if (!pipe) { if (!pipe && !zap_pipe) {
proceed_with_term (); proceed_with_term ();
return; return;
} }
pending = true; pending = true;
// If there's finite linger value, delay the termination. if (pipe != NULL) {
// If linger is infinite (negative) we don't even have to set // If there's finite linger value, delay the termination.
// the timer. // If linger is infinite (negative) we don't even have to set
if (linger_ > 0) { // the timer.
zmq_assert (!has_linger_timer); if (linger_ > 0) {
add_timer (linger_, linger_timer_id); zmq_assert (!has_linger_timer);
has_linger_timer = true; add_timer (linger_, linger_timer_id);
} has_linger_timer = true;
}
// Start pipe termination process. Delay the termination till all messages
// are processed in case the linger time is non-zero.
pipe->terminate (linger_ != 0);
// Start pipe termination process. Delay the termination till all messages // TODO: Should this go into pipe_t::terminate ?
// are processed in case the linger time is non-zero. // In case there's no engine and there's only delimiter in the
pipe->terminate (linger_ != 0); // pipe it wouldn't be ever read. Thus we check for it explicitly.
pipe->check_read ();
}
// TODO: Should this go into pipe_t::terminate ? if (zap_pipe != NULL)
// In case there's no engine and there's only delimiter in the zap_pipe->terminate (false);
// pipe it wouldn't be ever read. Thus we check for it explicitly.
pipe->check_read ();
} }
void zmq::session_base_t::proceed_with_term () void zmq::session_base_t::proceed_with_term ()
......
...@@ -67,11 +67,23 @@ namespace zmq ...@@ -67,11 +67,23 @@ namespace zmq
// The function takes ownership of the message. // The function takes ownership of the message.
int push_msg (msg_t *msg_); int push_msg (msg_t *msg_);
int zap_connect ();
// Fetches a message. Returns 0 if successful; -1 otherwise. // Fetches a message. Returns 0 if successful; -1 otherwise.
// The caller is responsible for freeing the message when no // The caller is responsible for freeing the message when no
// longer used. // longer used.
int pull_msg (msg_t *msg_); int pull_msg (msg_t *msg_);
// Receives message from ZAP socket.
// Returns 0 on success; -1 otherwise.
// The caller is responsible for freeing the message.
int read_zap_msg (msg_t *msg_);
// Sends message to ZAP socket.
// Returns 0 on success; -1 otherwise.
// The function takes ownership of the message.
int write_zap_msg (msg_t *msg_);
socket_base_t *get_socket (); socket_base_t *get_socket ();
protected: protected:
...@@ -109,6 +121,9 @@ namespace zmq ...@@ -109,6 +121,9 @@ namespace zmq
// Pipe connecting the session to its socket. // Pipe connecting the session to its socket.
zmq::pipe_t *pipe; zmq::pipe_t *pipe;
// Pipe used to exchange messages with ZAP socket.
zmq::pipe_t *zap_pipe;
// This set is added to with pipes we are disconnecting, but haven't yet completed // This set is added to with pipes we are disconnecting, but haven't yet completed
std::set <pipe_t *> terminating_pipes; std::set <pipe_t *> terminating_pipes;
......
...@@ -70,7 +70,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, ...@@ -70,7 +70,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
read_msg (&stream_engine_t::read_identity), read_msg (&stream_engine_t::read_identity),
write_msg (&stream_engine_t::write_identity), write_msg (&stream_engine_t::write_identity),
io_error (false), io_error (false),
congested (false),
subscription_required (false), subscription_required (false),
mechanism (NULL), mechanism (NULL),
input_paused (false), input_paused (false),
...@@ -222,7 +221,7 @@ void zmq::stream_engine_t::in_event () ...@@ -222,7 +221,7 @@ void zmq::stream_engine_t::in_event ()
zmq_assert (decoder); zmq_assert (decoder);
// If there has been an I/O error, stop polling. // If there has been an I/O error, stop polling.
if (congested) { if (input_paused) {
rm_fd (handle); rm_fd (handle);
io_error = true; io_error = true;
return; return;
...@@ -270,7 +269,7 @@ void zmq::stream_engine_t::in_event () ...@@ -270,7 +269,7 @@ void zmq::stream_engine_t::in_event ()
error (); error ();
return; return;
} }
congested = true; input_paused = true;
reset_pollin (handle); reset_pollin (handle);
} }
...@@ -309,6 +308,7 @@ void zmq::stream_engine_t::out_event () ...@@ -309,6 +308,7 @@ void zmq::stream_engine_t::out_event ()
// If there is no data to send, stop polling for output. // If there is no data to send, stop polling for output.
if (outsize == 0) { if (outsize == 0) {
output_paused = true;
reset_pollout (handle); reset_pollout (handle);
return; return;
} }
...@@ -350,7 +350,10 @@ void zmq::stream_engine_t::activate_out () ...@@ -350,7 +350,10 @@ void zmq::stream_engine_t::activate_out ()
if (unlikely (io_error)) if (unlikely (io_error))
return; return;
set_pollout (handle); if (likely (output_paused)) {
set_pollout (handle);
output_paused = false;
}
// Speculative write: The assumption is that at the moment new message // Speculative write: The assumption is that at the moment new message
// was sent by the user the socket is probably available for writing. // was sent by the user the socket is probably available for writing.
...@@ -361,7 +364,7 @@ void zmq::stream_engine_t::activate_out () ...@@ -361,7 +364,7 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in () void zmq::stream_engine_t::activate_in ()
{ {
zmq_assert (congested); zmq_assert (input_paused);
zmq_assert (session != NULL); zmq_assert (session != NULL);
zmq_assert (decoder != NULL); zmq_assert (decoder != NULL);
...@@ -393,7 +396,7 @@ void zmq::stream_engine_t::activate_in () ...@@ -393,7 +396,7 @@ void zmq::stream_engine_t::activate_in ()
if (rc == -1 || io_error) if (rc == -1 || io_error)
error (); error ();
else { else {
congested = false; input_paused = false;
set_pollin (handle); set_pollin (handle);
session->flush (); session->flush ();
...@@ -533,7 +536,7 @@ bool zmq::stream_engine_t::handshake () ...@@ -533,7 +536,7 @@ bool zmq::stream_engine_t::handshake ()
} }
else else
if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { if (memcmp (greeting_recv + 12, "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) {
mechanism = new (std::nothrow) plain_mechanism_t (options); mechanism = new (std::nothrow) plain_mechanism_t (session, options);
alloc_assert (mechanism); alloc_assert (mechanism);
} }
else { else {
...@@ -596,15 +599,8 @@ int zmq::stream_engine_t::next_handshake_message (msg_t *msg_) ...@@ -596,15 +599,8 @@ int zmq::stream_engine_t::next_handshake_message (msg_t *msg_)
if (rc == 0) { if (rc == 0) {
if (mechanism->is_handshake_complete ()) if (mechanism->is_handshake_complete ())
mechanism_ready (); mechanism_ready ();
if (input_paused) { if (input_paused)
activate_in (); activate_in ();
input_paused = false;
}
}
else
if (rc == -1) {
zmq_assert (errno == EAGAIN);
output_paused = true;
} }
return rc; return rc;
...@@ -618,18 +614,28 @@ int zmq::stream_engine_t::process_handshake_message (msg_t *msg_) ...@@ -618,18 +614,28 @@ int zmq::stream_engine_t::process_handshake_message (msg_t *msg_)
if (rc == 0) { if (rc == 0) {
if (mechanism->is_handshake_complete ()) if (mechanism->is_handshake_complete ())
mechanism_ready (); mechanism_ready ();
if (output_paused) { if (output_paused)
activate_out (); activate_out ();
output_paused = false;
}
} }
else
if (rc == -1 && errno == EAGAIN)
input_paused = true;
return rc; return rc;
} }
void zmq::stream_engine_t::zap_msg_available ()
{
zmq_assert (mechanism != NULL);
const int rc = mechanism->zap_msg_available ();
if (rc == -1) {
error ();
return;
}
if (input_paused)
activate_in ();
if (output_paused)
activate_out ();
}
void zmq::stream_engine_t::mechanism_ready () void zmq::stream_engine_t::mechanism_ready ()
{ {
if (options.recv_identity) { if (options.recv_identity) {
......
...@@ -62,6 +62,7 @@ namespace zmq ...@@ -62,6 +62,7 @@ namespace zmq
void terminate (); void terminate ();
void activate_in (); void activate_in ();
void activate_out (); void activate_out ();
void zap_msg_available ();
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
...@@ -168,10 +169,6 @@ namespace zmq ...@@ -168,10 +169,6 @@ namespace zmq
bool io_error; bool io_error;
// True iff the session could not accept more
// messages due to flow control.
bool congested;
// Indicates whether the engine is to inject a phony // Indicates whether the engine is to inject a phony
// subscription message into the incomming stream. // subscription message into the incomming stream.
// Needed to support old peers. // Needed to support old peers.
...@@ -179,7 +176,10 @@ namespace zmq ...@@ -179,7 +176,10 @@ namespace zmq
mechanism_t *mechanism; mechanism_t *mechanism;
// True iff the engine couldn't consume the last decoded message.
bool input_paused; bool input_paused;
// True iff the engine doesn't have any message to encode.
bool output_paused; bool output_paused;
// Socket // Socket
......
...@@ -17,8 +17,124 @@ ...@@ -17,8 +17,124 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <pthread.h>
#include <string.h>
#include "testutil.hpp" #include "testutil.hpp"
static bool
authenticate (const unsigned char *data, size_t data_length)
{
const char *username = "admin";
const size_t username_length = strlen (username);
const char *password = "password";
const size_t password_length = strlen (password);
if (data_length != 1 + username_length + 1 + password_length)
return false;
if (data [0] != username_length)
return false;
if (memcmp (data + 1, username, username_length))
return false;
if (data [1 + username_length] != password_length)
return false;
if (memcmp (data + 1 + username_length + 1, password, password_length))
return false;
return true;
}
static void *
zap_handler (void *zap)
{
int rc, more;
size_t optlen;
zmq_msg_t version, seqno, domain, mechanism, credentials;
zmq_msg_t status_code, status_text, user_id;
// Version
rc = zmq_msg_init (&version);
assert (rc == 0);
rc = zmq_msg_recv (&version, zap, 0);
assert (rc == 3 && memcmp (zmq_msg_data (&version), "1.0", 3) == 0);
optlen = sizeof more;
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
assert (rc == 0 && more == 1);
// Sequence number
rc = zmq_msg_init (&seqno);
assert (rc == 0);
rc = zmq_msg_recv (&seqno, zap, 0);
assert (rc != -1);
optlen = sizeof more;
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
assert (rc == 0 && more == 1);
// Domain
rc = zmq_msg_init (&domain);
assert (rc == 0);
rc = zmq_msg_recv (&domain, zap, 0);
assert (rc != -1);
optlen = sizeof more;
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
assert (rc == 0 && more == 1);
// Mechanism
rc = zmq_msg_init (&mechanism);
assert (rc == 0);
rc = zmq_msg_recv (&mechanism, zap, 0);
assert (rc == 5 && memcmp (zmq_msg_data (&mechanism), "PLAIN", 5) == 0);
optlen = sizeof more;
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
assert (rc == 0 && more == 1);
// Credentials
rc = zmq_msg_init (&credentials);
assert (rc == 0);
rc = zmq_msg_recv (&credentials, zap, 0);
optlen = sizeof more;
rc = zmq_getsockopt (zap, ZMQ_RCVMORE, &more, &optlen);
assert (rc == 0 && more == 0);
const bool auth_ok =
authenticate ((unsigned char *) zmq_msg_data (&credentials),
zmq_msg_size (&credentials));
rc = zmq_msg_send (&version, zap, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_msg_send (&seqno, zap, ZMQ_SNDMORE);
assert (rc != -1);
rc = zmq_msg_init_size (&status_code, 3);
assert (rc == 0);
memcpy (zmq_msg_data (&status_code), auth_ok? "200": "400", 3);
rc = zmq_msg_send (&status_code, zap, ZMQ_SNDMORE);
assert (rc == 3);
rc = zmq_msg_init (&status_text);
assert (rc == 0);
rc = zmq_msg_send (&status_text, zap, ZMQ_SNDMORE);
assert (rc == 0);
rc = zmq_msg_init (&user_id);
assert (rc == 0);
rc = zmq_msg_send (&user_id, zap, 0);
assert (rc == 0);
rc = zmq_msg_close (&domain);
assert (rc == 0);
rc = zmq_msg_close (&mechanism);
assert (rc == 0);
rc = zmq_msg_close (&credentials);
assert (rc == 0);
rc = zmq_close (zap);
assert (rc == 0);
return NULL;
}
int main (void) int main (void)
{ {
void *ctx = zmq_ctx_new (); void *ctx = zmq_ctx_new ();
...@@ -122,6 +238,18 @@ int main (void) ...@@ -122,6 +238,18 @@ int main (void)
assert (rc == 0); assert (rc == 0);
assert (as_server == 1); assert (as_server == 1);
// Create and bind ZAP socket
void *zap = zmq_socket (ctx, ZMQ_REP);
assert (zap);
rc = zmq_bind (zap, "inproc://zeromq.zap.01");
assert (rc == 0);
// Spawn ZAP handler
pthread_t zap_thread;
rc = pthread_create (&zap_thread, NULL, &zap_handler, zap);
assert (rc == 0);
rc = zmq_bind (server, "tcp://*:9998"); rc = zmq_bind (server, "tcp://*:9998");
assert (rc == 0); assert (rc == 0);
rc = zmq_connect (client, "tcp://localhost:9998"); rc = zmq_connect (client, "tcp://localhost:9998");
...@@ -133,6 +261,9 @@ int main (void) ...@@ -133,6 +261,9 @@ int main (void)
assert (rc == 0); assert (rc == 0);
rc = zmq_close (server); rc = zmq_close (server);
assert (rc == 0); assert (rc == 0);
// Wait until ZAP handler terminates.
pthread_join (zap_thread, NULL);
// Check PLAIN security -- two servers trying to talk to each other // Check PLAIN security -- two servers trying to talk to each other
server = zmq_socket (ctx, ZMQ_DEALER); server = zmq_socket (ctx, ZMQ_DEALER);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment