Commit 61ad236e authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_NOFLUSH and zmq_flush obsoleted

parent c42343d3
MAN1 = zmq_forwarder.1 zmq_streamer.1 zmq_queue.1 MAN1 = zmq_forwarder.1 zmq_streamer.1 zmq_queue.1
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_flush.3 zmq_init.3 \ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \ zmq_poll.3 zmq_recv.3 zmq_send.3 zmq_setsockopt.3 zmq_socket.3 \
......
...@@ -115,7 +115,6 @@ Establishing a message flow:: ...@@ -115,7 +115,6 @@ Establishing a message flow::
Sending and receiving messages:: Sending and receiving messages::
linkzmq:zmq_send[3] linkzmq:zmq_send[3]
linkzmq:zmq_flush[3]
linkzmq:zmq_recv[3] linkzmq:zmq_recv[3]
......
zmq_flush(3)
============
NAME
----
zmq_flush - flush messages queued on a socket
SYNOPSIS
--------
*int zmq_flush (void '*socket');*
DESCRIPTION
-----------
The _zmq_flush()_ function shall flush messages previously queued on the socket
referenced by the 'socket' argument. The _zmq_flush()_ function only affects
messages that have been queued on the _message queue_ associated with 'socket'
using the 'ZMQ_NOFLUSH' flag to the _zmq_send()_ function. If no such messages
exist, the function has no effect.
CAUTION: A successful invocation of _zmq_flush()_ does not indicate that the
flushed messages have been transmitted to the network, or even that such a
transmission has been initiated by 0MQ. This function exists merely as a way
for the application programmer to supply a hint to the 0MQ infrastructure that
the queued messages *may* be flushed as a single batch.
RETURN VALUE
------------
The _zmq_flush()_ function shall return zero if successful. Otherwise it shall
return `-1` and set 'errno' to one of the values defined below.
ERRORS
------
*ENOTSUP*::
The _zmq_flush()_ operation is not supported by this socket type.
*EFSM*::
The _zmq_flush()_ operation cannot be performed on this socket at the moment
due to the socket not being in the appropriate state.
SEE ALSO
--------
linkzmq:zmq_send[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHOR
------
The 0MQ documentation was written by Martin Sustrik <sustrik@250bpm.com> and
Martin Lucina <mato@kotelna.sk>.
...@@ -23,13 +23,6 @@ Specifies that the operation should be performed in non-blocking mode. If the ...@@ -23,13 +23,6 @@ Specifies that the operation should be performed in non-blocking mode. If the
message cannot be queued on the underlying _message queue_ associated with message cannot be queued on the underlying _message queue_ associated with
'socket', the _zmq_send()_ function shall fail with 'errno' set to EAGAIN. 'socket', the _zmq_send()_ function shall fail with 'errno' set to EAGAIN.
*ZMQ_NOFLUSH*::
Specifies that the _zmq_send()_ function should not flush the underlying
_message queue_ associated with 'socket' to the network automatically.
Instead, it should batch all messages queued with the 'ZMQ_NOFLUSH' flag and
only flush the _message queue_ once either a message without the 'ZMQ_NOFLUSH'
flag is queued, or manually on invocation of the _zmq_flush()_ function.
NOTE: A successful invocation of _zmq_send()_ does not indicate that the NOTE: A successful invocation of _zmq_send()_ does not indicate that the
message has been transmitted to the network, only that it has been queued on message has been transmitted to the network, only that it has been queued on
the _message queue_ associated with the socket and 0MQ has assumed the _message queue_ associated with the socket and 0MQ has assumed
...@@ -73,7 +66,6 @@ assert (rc == 0); ...@@ -73,7 +66,6 @@ assert (rc == 0);
SEE ALSO SEE ALSO
-------- --------
linkzmq:zmq_flush[3]
linkzmq:zmq_recv[3] linkzmq:zmq_recv[3]
linkzmq:zmq_socket[7] linkzmq:zmq_socket[7]
linkzmq:zmq[7] linkzmq:zmq[7]
......
...@@ -125,7 +125,6 @@ linkzmq:zmq_setsockopt[3] ...@@ -125,7 +125,6 @@ linkzmq:zmq_setsockopt[3]
linkzmq:zmq_bind[3] linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3] linkzmq:zmq_connect[3]
linkzmq:zmq_send[3] linkzmq:zmq_send[3]
linkzmq:zmq_flush[3]
linkzmq:zmq_recv[3] linkzmq:zmq_recv[3]
......
...@@ -236,13 +236,6 @@ namespace zmq ...@@ -236,13 +236,6 @@ namespace zmq
throw error_t (); throw error_t ();
} }
inline void flush ()
{
int rc = zmq_flush (ptr);
if (rc != 0)
throw error_t ();
}
inline bool recv (message_t *msg_, int flags_ = 0) inline bool recv (message_t *msg_, int flags_ = 0)
{ {
int rc = zmq_recv (ptr, msg_, flags_); int rc = zmq_recv (ptr, msg_, flags_);
......
...@@ -83,16 +83,6 @@ int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -83,16 +83,6 @@ int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
return lb.send (msg_, flags_); return lb.send (msg_, flags_);
} }
int zmq::downstream_t::xflush ()
{
// TODO: Maybe there's a point in flushing messages downstream.
// It may be useful in the case where number of messages in a single
// transaction is much greater than the number of attached pipes.
errno = ENOTSUP;
return -1;
}
int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
errno = ENOTSUP; errno = ENOTSUP;
......
...@@ -43,7 +43,6 @@ namespace zmq ...@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -80,8 +80,7 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_) ...@@ -80,8 +80,7 @@ int zmq::lb_t::send (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
if (!(flags_ & ZMQ_NOFLUSH)) pipes [current]->flush ();
pipes [current]->flush ();
// Detach the message from the data buffer. // Detach the message from the data buffer.
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
......
...@@ -100,8 +100,7 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -100,8 +100,7 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
if (!(flags_ & ZMQ_NOFLUSH)) outpipe->flush ();
outpipe->flush ();
// Detach the original message from the data buffer. // Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
...@@ -110,13 +109,6 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -110,13 +109,6 @@ int zmq::p2p_t::xsend (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::p2p_t::xflush ()
{
if (outpipe)
outpipe->flush ();
return 0;
}
int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
// Deallocate old content of the message. // Deallocate old content of the message.
......
...@@ -42,7 +42,6 @@ namespace zmq ...@@ -42,7 +42,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -107,8 +107,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -107,8 +107,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
bool written = out_pipes [i]->write (msg_); bool written = out_pipes [i]->write (msg_);
zmq_assert (written); zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH)) out_pipes [i]->flush ();
out_pipes [i]->flush ();
} }
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
...@@ -121,8 +120,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -121,8 +120,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
if (pipes_count == 1) { if (pipes_count == 1) {
bool written = out_pipes [0]->write (msg_); bool written = out_pipes [0]->write (msg_);
zmq_assert (written); zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH)) out_pipes [0]->flush ();
out_pipes [0]->flush ();
int rc = zmq_msg_init (msg_); int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0); zmq_assert (rc == 0);
return 0; return 0;
...@@ -142,8 +140,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -142,8 +140,7 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) { for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
bool written = out_pipes [i]->write (msg_); bool written = out_pipes [i]->write (msg_);
zmq_assert (written); zmq_assert (written);
if (!(flags_ & ZMQ_NOFLUSH)) out_pipes [i]->flush ();
out_pipes [i]->flush ();
} }
// Detach the original message from the data buffer. // Detach the original message from the data buffer.
...@@ -153,14 +150,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -153,14 +150,6 @@ int zmq::pub_t::xsend (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::pub_t::xflush ()
{
out_pipes_t::size_type pipe_count = out_pipes.size ();
for (out_pipes_t::size_type i = 0; i != pipe_count; i++)
out_pipes [i]->flush ();
return 0;
}
int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
errno = ENOTSUP; errno = ENOTSUP;
......
...@@ -43,7 +43,6 @@ namespace zmq ...@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -188,12 +188,6 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -188,12 +188,6 @@ int zmq::rep_t::xsend (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::rep_t::xflush ()
{
errno = ENOTSUP;
return -1;
}
int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
// Deallocate old content of the message. // Deallocate old content of the message.
......
...@@ -43,7 +43,6 @@ namespace zmq ...@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -207,12 +207,6 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -207,12 +207,6 @@ int zmq::req_t::xsend (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::req_t::xflush ()
{
errno = ENOTSUP;
return -1;
}
int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
// Deallocate old content of the message. // Deallocate old content of the message.
......
...@@ -43,7 +43,6 @@ namespace zmq ...@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -335,11 +335,6 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_) ...@@ -335,11 +335,6 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::socket_base_t::flush ()
{
return xflush ();
}
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{ {
// Get the message. // Get the message.
......
...@@ -52,7 +52,6 @@ namespace zmq ...@@ -52,7 +52,6 @@ namespace zmq
int bind (const char *addr_); int bind (const char *addr_);
int connect (const char *addr_); int connect (const char *addr_);
int send (zmq_msg_t *msg_, int flags_); int send (zmq_msg_t *msg_, int flags_);
int flush ();
int recv (zmq_msg_t *msg_, int flags_); int recv (zmq_msg_t *msg_, int flags_);
int close (); int close ();
...@@ -113,7 +112,6 @@ namespace zmq ...@@ -113,7 +112,6 @@ namespace zmq
virtual int xsetsockopt (int option_, const void *optval_, virtual int xsetsockopt (int option_, const void *optval_,
size_t optvallen_) = 0; size_t optvallen_) = 0;
virtual int xsend (zmq_msg_t *msg_, int options_) = 0; virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
virtual int xflush () = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0; virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
virtual bool xhas_in () = 0; virtual bool xhas_in () = 0;
virtual bool xhas_out () = 0; virtual bool xhas_out () = 0;
......
...@@ -98,12 +98,6 @@ int zmq::sub_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -98,12 +98,6 @@ int zmq::sub_t::xsend (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
int zmq::sub_t::xflush ()
{
errno = ENOTSUP;
return -1;
}
int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::sub_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
// If there's already a message prepared by a previous call to zmq_poll, // If there's already a message prepared by a previous call to zmq_poll,
......
...@@ -48,7 +48,6 @@ namespace zmq ...@@ -48,7 +48,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -81,12 +81,6 @@ int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -81,12 +81,6 @@ int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
return -1; return -1;
} }
int zmq::upstream_t::xflush ()
{
errno = ENOTSUP;
return -1;
}
int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); return fq.recv (msg_, flags_);
......
...@@ -43,7 +43,6 @@ namespace zmq ...@@ -43,7 +43,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -129,12 +129,6 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -129,12 +129,6 @@ int zmq::xrep_t::xsend (zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
int zmq::xrep_t::xflush ()
{
zmq_assert (false);
return -1;
}
int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xrep_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); return fq.recv (msg_, flags_);
......
...@@ -46,7 +46,6 @@ namespace zmq ...@@ -46,7 +46,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -80,13 +80,6 @@ int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_) ...@@ -80,13 +80,6 @@ int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
return lb.send (msg_, flags_); return lb.send (msg_, flags_);
} }
int zmq::xreq_t::xflush ()
{
// TODO: Implement flushing.
zmq_assert (false);
return -1;
}
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
{ {
return fq.recv (msg_, flags_); return fq.recv (msg_, flags_);
......
...@@ -44,7 +44,6 @@ namespace zmq ...@@ -44,7 +44,6 @@ namespace zmq
void xrevive (class writer_t *pipe_); void xrevive (class writer_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_); int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_); int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in (); bool xhas_in ();
bool xhas_out (); bool xhas_out ();
......
...@@ -315,7 +315,8 @@ int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) ...@@ -315,7 +315,8 @@ int zmq_send (void *s_, zmq_msg_t *msg_, int flags_)
int zmq_flush (void *s_) int zmq_flush (void *s_)
{ {
return (((zmq::socket_base_t*) s_)->flush ()); errno = ENOTSUP;
return -1;
} }
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment