Commit 7865f96e authored by Martin Hurton's avatar Martin Hurton

Don't pass flags to xsend method

parent 793895c4
...@@ -49,9 +49,9 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) ...@@ -49,9 +49,9 @@ void zmq::dealer_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
lb.attach (pipe_); lb.attach (pipe_);
} }
int zmq::dealer_t::xsend (msg_t *msg_, int flags_) int zmq::dealer_t::xsend (msg_t *msg_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_);
} }
int zmq::dealer_t::xrecv (msg_t *msg_, int flags_) int zmq::dealer_t::xrecv (msg_t *msg_, int flags_)
......
...@@ -47,7 +47,7 @@ namespace zmq ...@@ -47,7 +47,7 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -103,19 +103,19 @@ void zmq::dist_t::activated (pipe_t *pipe_) ...@@ -103,19 +103,19 @@ void zmq::dist_t::activated (pipe_t *pipe_)
} }
} }
int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) int zmq::dist_t::send_to_all (msg_t *msg_)
{ {
matching = active; matching = active;
return send_to_matching (msg_, flags_); return send_to_matching (msg_);
} }
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) int zmq::dist_t::send_to_matching (msg_t *msg_)
{ {
// Is this end of a multipart message? // Is this end of a multipart message?
bool msg_more = msg_->flags () & msg_t::more ? true : false; bool msg_more = msg_->flags () & msg_t::more ? true : false;
// Push the message to matching pipes. // Push the message to matching pipes.
distribute (msg_, flags_); distribute (msg_);
// If mutlipart message is fully sent, activate all the eligible pipes. // If mutlipart message is fully sent, activate all the eligible pipes.
if (!msg_more) if (!msg_more)
...@@ -126,11 +126,8 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) ...@@ -126,11 +126,8 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
return 0; return 0;
} }
void zmq::dist_t::distribute (msg_t *msg_, int flags_) void zmq::dist_t::distribute (msg_t *msg_)
{ {
// flags_ is unused
(void)flags_;
// If there are no matching pipes available, simply drop the message. // If there are no matching pipes available, simply drop the message.
if (matching == 0) { if (matching == 0) {
int rc = msg_->close (); int rc = msg_->close ();
......
...@@ -58,10 +58,10 @@ namespace zmq ...@@ -58,10 +58,10 @@ namespace zmq
void terminated (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_);
// Send the message to the matching outbound pipes. // Send the message to the matching outbound pipes.
int send_to_matching (zmq::msg_t *msg_, int flags_); int send_to_matching (zmq::msg_t *msg_);
// Send the message to all the outbound pipes. // Send the message to all the outbound pipes.
int send_to_all (zmq::msg_t *msg_, int flags_); int send_to_all (zmq::msg_t *msg_);
bool has_out (); bool has_out ();
...@@ -72,7 +72,7 @@ namespace zmq ...@@ -72,7 +72,7 @@ namespace zmq
bool write (zmq::pipe_t *pipe_, zmq::msg_t *msg_); bool write (zmq::pipe_t *pipe_, zmq::msg_t *msg_);
// Put the message to all active pipes. // Put the message to all active pipes.
void distribute (zmq::msg_t *msg_, int flags_); void distribute (zmq::msg_t *msg_);
// List of outbound pipes. // List of outbound pipes.
typedef array_t <zmq::pipe_t, 2> pipes_t; typedef array_t <zmq::pipe_t, 2> pipes_t;
......
...@@ -71,11 +71,8 @@ void zmq::lb_t::activated (pipe_t *pipe_) ...@@ -71,11 +71,8 @@ void zmq::lb_t::activated (pipe_t *pipe_)
active++; active++;
} }
int zmq::lb_t::send (msg_t *msg_, int flags_) int zmq::lb_t::send (msg_t *msg_)
{ {
// flags_ is unused
(void)flags_;
// Drop the message if required. If we are at the end of the message // Drop the message if required. If we are at the end of the message
// switch back to non-dropping mode. // switch back to non-dropping mode.
if (dropping) { if (dropping) {
......
...@@ -42,7 +42,7 @@ namespace zmq ...@@ -42,7 +42,7 @@ namespace zmq
void activated (pipe_t *pipe_); void activated (pipe_t *pipe_);
void terminated (pipe_t *pipe_); void terminated (pipe_t *pipe_);
int send (msg_t *msg_, int flags_); int send (msg_t *msg_);
bool has_out (); bool has_out ();
private: private:
......
...@@ -69,14 +69,14 @@ void zmq::pair_t::xwrite_activated (pipe_t *) ...@@ -69,14 +69,14 @@ void zmq::pair_t::xwrite_activated (pipe_t *)
// There's nothing to do here. // There's nothing to do here.
} }
int zmq::pair_t::xsend (msg_t *msg_, int flags_) int zmq::pair_t::xsend (msg_t *msg_)
{ {
if (!pipe || !pipe->write (msg_)) { if (!pipe || !pipe->write (msg_)) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
if (!(flags_ & ZMQ_SNDMORE)) if (!(msg_->flags () & msg_t::more))
pipe->flush (); pipe->flush ();
// Detach the original message from the data buffer. // Detach the original message from the data buffer.
......
...@@ -43,7 +43,7 @@ namespace zmq ...@@ -43,7 +43,7 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -53,9 +53,9 @@ void zmq::push_t::xterminated (pipe_t *pipe_) ...@@ -53,9 +53,9 @@ void zmq::push_t::xterminated (pipe_t *pipe_)
lb.terminated (pipe_); lb.terminated (pipe_);
} }
int zmq::push_t::xsend (msg_t *msg_, int flags_) int zmq::push_t::xsend (msg_t *msg_)
{ {
return lb.send (msg_, flags_); return lb.send (msg_);
} }
bool zmq::push_t::xhas_out () bool zmq::push_t::xhas_out ()
......
...@@ -46,7 +46,7 @@ namespace zmq ...@@ -46,7 +46,7 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
void xwrite_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_);
void xterminated (zmq::pipe_t *pipe_); void xterminated (zmq::pipe_t *pipe_);
......
...@@ -35,7 +35,7 @@ zmq::rep_t::~rep_t () ...@@ -35,7 +35,7 @@ zmq::rep_t::~rep_t ()
{ {
} }
int zmq::rep_t::xsend (msg_t *msg_, int flags_) int zmq::rep_t::xsend (msg_t *msg_)
{ {
// If we are in the middle of receiving a request, we cannot send reply. // If we are in the middle of receiving a request, we cannot send reply.
if (!sending_reply) { if (!sending_reply) {
...@@ -46,7 +46,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_) ...@@ -46,7 +46,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
bool more = msg_->flags () & msg_t::more ? true : false; bool more = msg_->flags () & msg_t::more ? true : false;
// Push message to the reply pipe. // Push message to the reply pipe.
int rc = router_t::xsend (msg_, flags_); int rc = router_t::xsend (msg_);
if (rc != 0) if (rc != 0)
return rc; return rc;
...@@ -78,7 +78,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) ...@@ -78,7 +78,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
bool bottom = (msg_->size () == 0); bool bottom = (msg_->size () == 0);
// Push it to the reply pipe. // Push it to the reply pipe.
rc = router_t::xsend (msg_, flags_); rc = router_t::xsend (msg_);
errno_assert (rc == 0); errno_assert (rc == 0);
if (bottom) if (bottom)
......
...@@ -40,7 +40,7 @@ namespace zmq ...@@ -40,7 +40,7 @@ namespace zmq
~rep_t (); ~rep_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -39,7 +39,7 @@ zmq::req_t::~req_t () ...@@ -39,7 +39,7 @@ zmq::req_t::~req_t ()
{ {
} }
int zmq::req_t::xsend (msg_t *msg_, int flags_) int zmq::req_t::xsend (msg_t *msg_)
{ {
// If we've sent a request and we still haven't got the reply, // If we've sent a request and we still haven't got the reply,
// we can't send another request. // we can't send another request.
...@@ -54,7 +54,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) ...@@ -54,7 +54,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
int rc = bottom.init (); int rc = bottom.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
bottom.set_flags (msg_t::more); bottom.set_flags (msg_t::more);
rc = dealer_t::xsend (&bottom, 0); rc = dealer_t::xsend (&bottom);
if (rc != 0) if (rc != 0)
return -1; return -1;
message_begins = false; message_begins = false;
...@@ -62,7 +62,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) ...@@ -62,7 +62,7 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
bool more = msg_->flags () & msg_t::more ? true : false; bool more = msg_->flags () & msg_t::more ? true : false;
int rc = dealer_t::xsend (msg_, flags_); int rc = dealer_t::xsend (msg_);
if (rc != 0) if (rc != 0)
return rc; return rc;
......
...@@ -42,7 +42,7 @@ namespace zmq ...@@ -42,7 +42,7 @@ namespace zmq
~req_t (); ~req_t ();
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -140,11 +140,8 @@ void zmq::router_t::xwrite_activated (pipe_t *pipe_) ...@@ -140,11 +140,8 @@ void zmq::router_t::xwrite_activated (pipe_t *pipe_)
it->second.active = true; it->second.active = true;
} }
int zmq::router_t::xsend (msg_t *msg_, int flags_) int zmq::router_t::xsend (msg_t *msg_)
{ {
// flags_ is unused
(void)flags_;
// If this is the first part of the message it's the ID of the // If this is the first part of the message it's the ID of the
// peer to send the message to. // peer to send the message to.
if (!more_out) { if (!more_out) {
......
...@@ -50,7 +50,7 @@ namespace zmq ...@@ -50,7 +50,7 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (msg_t *msg_, int flags_); int xsend (msg_t *msg_);
int xrecv (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -622,7 +622,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -622,7 +622,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
msg_->set_flags (msg_t::more); msg_->set_flags (msg_t::more);
// Try to send the message. // Try to send the message.
rc = xsend (msg_, flags_); rc = xsend (msg_);
if (rc == 0) if (rc == 0)
return 0; return 0;
if (unlikely (errno != EAGAIN)) if (unlikely (errno != EAGAIN))
...@@ -644,7 +644,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) ...@@ -644,7 +644,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
while (true) { while (true) {
if (unlikely (process_commands (timeout, false) != 0)) if (unlikely (process_commands (timeout, false) != 0))
return -1; return -1;
rc = xsend (msg_, flags_); rc = xsend (msg_);
if (rc == 0) if (rc == 0)
break; break;
if (unlikely (errno != EAGAIN)) if (unlikely (errno != EAGAIN))
...@@ -886,7 +886,7 @@ bool zmq::socket_base_t::xhas_out () ...@@ -886,7 +886,7 @@ bool zmq::socket_base_t::xhas_out ()
return false; return false;
} }
int zmq::socket_base_t::xsend (msg_t *, int) int zmq::socket_base_t::xsend (msg_t *)
{ {
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
......
...@@ -133,7 +133,7 @@ namespace zmq ...@@ -133,7 +133,7 @@ namespace zmq
// The default implementation assumes that send is not supported. // The default implementation assumes that send is not supported.
virtual bool xhas_out (); virtual bool xhas_out ();
virtual int xsend (zmq::msg_t *msg_, int flags_); virtual int xsend (zmq::msg_t *msg_);
// The default implementation assumes that recv in not supported. // The default implementation assumes that recv in not supported.
virtual bool xhas_in (); virtual bool xhas_in ();
......
...@@ -58,7 +58,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, ...@@ -58,7 +58,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
// Pass it further on in the stack. // Pass it further on in the stack.
int err = 0; int err = 0;
rc = xsub_t::xsend (&msg, 0); rc = xsub_t::xsend (&msg);
if (rc != 0) if (rc != 0)
err = errno; err = errno;
int rc2 = msg.close (); int rc2 = msg.close ();
...@@ -68,7 +68,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, ...@@ -68,7 +68,7 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
return rc; return rc;
} }
int zmq::sub_t::xsend (msg_t *, int) int zmq::sub_t::xsend (msg_t *)
{ {
// Overload the XSUB's send. // Overload the XSUB's send.
errno = ENOTSUP; errno = ENOTSUP;
......
...@@ -42,7 +42,7 @@ namespace zmq ...@@ -42,7 +42,7 @@ namespace zmq
protected: protected:
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
private: private:
......
...@@ -115,7 +115,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) ...@@ -115,7 +115,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
self->dist.match (pipe_); self->dist.match (pipe_);
} }
int zmq::xpub_t::xsend (msg_t *msg_, int flags_) int zmq::xpub_t::xsend (msg_t *msg_)
{ {
bool msg_more = msg_->flags () & msg_t::more ? true : false; bool msg_more = msg_->flags () & msg_t::more ? true : false;
...@@ -126,7 +126,7 @@ int zmq::xpub_t::xsend (msg_t *msg_, int flags_) ...@@ -126,7 +126,7 @@ int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
// Send the message to all the pipes that were marked as matching // Send the message to all the pipes that were marked as matching
// in the previous step. // in the previous step.
int rc = dist.send_to_matching (msg_, flags_); int rc = dist.send_to_matching (msg_);
if (rc != 0) if (rc != 0)
return rc; return rc;
......
...@@ -48,7 +48,7 @@ namespace zmq ...@@ -48,7 +48,7 @@ namespace zmq
// Implementations of virtual functions from socket_base_t. // Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false); void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false);
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
......
...@@ -82,7 +82,7 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_) ...@@ -82,7 +82,7 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
pipe_->flush (); pipe_->flush ();
} }
int zmq::xsub_t::xsend (msg_t *msg_, int flags_) int zmq::xsub_t::xsend (msg_t *msg_)
{ {
size_t size = msg_->size (); size_t size = msg_->size ();
unsigned char *data = (unsigned char*) msg_->data (); unsigned char *data = (unsigned char*) msg_->data ();
...@@ -100,11 +100,11 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_) ...@@ -100,11 +100,11 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
// doing it here as well breaks ZMQ_XPUB_VERBOSE // doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved // when there are forwarding devices involved
subscriptions.add (data + 1, size - 1); subscriptions.add (data + 1, size - 1);
return dist.send_to_all (msg_, flags_); return dist.send_to_all (msg_);
} }
else { else {
if (subscriptions.rm (data + 1, size - 1)) if (subscriptions.rm (data + 1, size - 1))
return dist.send_to_all (msg_, flags_); return dist.send_to_all (msg_);
} }
int rc = msg_->close (); int rc = msg_->close ();
......
...@@ -46,7 +46,7 @@ namespace zmq ...@@ -46,7 +46,7 @@ namespace zmq
// Overloads of functions from socket_base_t. // Overloads of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
int xsend (zmq::msg_t *msg_, int flags_); int xsend (zmq::msg_t *msg_);
bool xhas_out (); bool xhas_out ();
int xrecv (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment