Commit 4ba34c9d authored by Pieter Hintjens's avatar Pieter Hintjens

Whitespace and style fixes

parent 06660632
...@@ -43,7 +43,8 @@ zmq::address_t::~address_t () ...@@ -43,7 +43,8 @@ zmq::address_t::~address_t ()
} }
} }
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else if (protocol == "ipc") { else
if (protocol == "ipc") {
if (resolved.ipc_addr) { if (resolved.ipc_addr) {
delete resolved.ipc_addr; delete resolved.ipc_addr;
resolved.ipc_addr = 0; resolved.ipc_addr = 0;
...@@ -55,15 +56,14 @@ zmq::address_t::~address_t () ...@@ -55,15 +56,14 @@ zmq::address_t::~address_t ()
int zmq::address_t::to_string (std::string &addr_) const int zmq::address_t::to_string (std::string &addr_) const
{ {
if (protocol == "tcp") { if (protocol == "tcp") {
if (resolved.tcp_addr) { if (resolved.tcp_addr)
return resolved.tcp_addr->to_string(addr_); return resolved.tcp_addr->to_string(addr_);
}
} }
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else if (protocol == "ipc") { else
if (resolved.ipc_addr) { if (protocol == "ipc") {
if (resolved.ipc_addr)
return resolved.ipc_addr->to_string(addr_); return resolved.ipc_addr->to_string(addr_);
}
} }
#endif #endif
......
...@@ -145,7 +145,8 @@ void zmq::ipc_connecter_t::start_connecting () ...@@ -145,7 +145,8 @@ void zmq::ipc_connecter_t::start_connecting ()
} }
// Connection establishment may be delayed. Poll for its completion. // Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) { else
if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
......
...@@ -53,7 +53,8 @@ zmq::mtrie_t::~mtrie_t () ...@@ -53,7 +53,8 @@ zmq::mtrie_t::~mtrie_t ()
delete next.node; delete next.node;
next.node = 0; next.node = 0;
} }
else if (count > 1) { else
if (count > 1) {
for (unsigned short i = 0; i != count; ++i) for (unsigned short i = 0; i != count; ++i)
if (next.table [i]) if (next.table [i])
delete next.table [i]; delete next.table [i];
...@@ -90,7 +91,8 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, ...@@ -90,7 +91,8 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
count = 1; count = 1;
next.node = NULL; next.node = NULL;
} }
else if (count == 1) { else
if (count == 1) {
unsigned char oldc = min; unsigned char oldc = min;
mtrie_t *oldp = next.node; mtrie_t *oldp = next.node;
count = (min < c ? c - min : min - c) + 1; count = (min < c ? c - min : min - c) + 1;
...@@ -102,8 +104,8 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, ...@@ -102,8 +104,8 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
min = std::min (min, c); min = std::min (min, c);
next.table [oldc - min] = oldp; next.table [oldc - min] = oldp;
} }
else if (min < c) { else
if (min < c) {
// The new character is above the current character range. // The new character is above the current character range.
unsigned short old_count = count; unsigned short old_count = count;
count = c - min + 1; count = c - min + 1;
...@@ -114,7 +116,6 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, ...@@ -114,7 +116,6 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
next.table [i] = NULL; next.table [i] = NULL;
} }
else { else {
// The new character is below the current character range. // The new character is below the current character range.
unsigned short old_count = count; unsigned short old_count = count;
count = (min + old_count) - c; count = (min + old_count) - c;
...@@ -244,7 +245,8 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, ...@@ -244,7 +245,8 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
count = 0; count = 0;
} }
// Compact the node table if possible // Compact the node table if possible
else if (live_nodes == 1) { else
if (live_nodes == 1) {
// If there's only one live node in the table we can // If there's only one live node in the table we can
// switch to using the more compact single-node // switch to using the more compact single-node
// representation // representation
...@@ -257,7 +259,8 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, ...@@ -257,7 +259,8 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
count = 1; count = 1;
min = new_min; min = new_min;
} }
else if (new_min > min || new_max < min + count - 1) { else
if (new_min > min || new_max < min + count - 1) {
zmq_assert (new_max - new_min + 1 > 1); zmq_assert (new_max - new_min + 1 > 1);
mtrie_t **old_table = next.table; mtrie_t **old_table = next.table;
...@@ -342,7 +345,8 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, ...@@ -342,7 +345,8 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
free (next.table); free (next.table);
next.node = oldp; next.node = oldp;
} }
else if (c == min) { else
if (c == min) {
// We can compact the table "from the left" // We can compact the table "from the left"
unsigned short i; unsigned short i;
for (i = 1; i < count; ++i) for (i = 1; i < count; ++i)
...@@ -358,7 +362,8 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, ...@@ -358,7 +362,8 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
memmove (next.table, old_table + i, sizeof (mtrie_t*) * count); memmove (next.table, old_table + i, sizeof (mtrie_t*) * count);
free (old_table); free (old_table);
} }
else if (c == min + count - 1) { else
if (c == min + count - 1) {
// We can compact the table "from the right" // We can compact the table "from the right"
unsigned short i; unsigned short i;
for (i = 1; i < count; ++i) for (i = 1; i < count; ++i)
......
...@@ -187,16 +187,17 @@ void zmq::pgm_sender_t::out_event () ...@@ -187,16 +187,17 @@ void zmq::pgm_sender_t::out_event ()
size_t nbytes = pgm_socket.send (out_buffer, write_size); size_t nbytes = pgm_socket.send (out_buffer, write_size);
// We can write either all data or 0 which means rate limit reached. // We can write either all data or 0 which means rate limit reached.
if (nbytes == write_size) { if (nbytes == write_size)
write_size = 0; write_size = 0;
} else { else {
zmq_assert (nbytes == 0); zmq_assert (nbytes == 0);
if (errno == ENOMEM) { if (errno == ENOMEM) {
const long timeout = pgm_socket.get_tx_timeout (); const long timeout = pgm_socket.get_tx_timeout ();
add_timer (timeout, tx_timer_id); add_timer (timeout, tx_timer_id);
has_tx_timer = true; has_tx_timer = true;
} else }
else
errno_assert (errno == EBUSY); errno_assert (errno == EBUSY);
} }
} }
...@@ -207,10 +208,13 @@ void zmq::pgm_sender_t::timer_event (int token) ...@@ -207,10 +208,13 @@ void zmq::pgm_sender_t::timer_event (int token)
if (token == rx_timer_id) { if (token == rx_timer_id) {
has_rx_timer = false; has_rx_timer = false;
in_event (); in_event ();
} else if (token == tx_timer_id) { }
else
if (token == tx_timer_id) {
has_tx_timer = false; has_tx_timer = false;
out_event (); out_event ();
} else }
else
zmq_assert (false); zmq_assert (false);
} }
......
...@@ -239,7 +239,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) ...@@ -239,7 +239,8 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES, !pgm_setsockopt (sock, IPPROTO_PGM, PGM_NAK_NCF_RETRIES,
&nak_ncf_retries, sizeof (nak_ncf_retries))) &nak_ncf_retries, sizeof (nak_ncf_retries)))
goto err_abort; goto err_abort;
} else { }
else {
const int send_only = 1, const int send_only = 1,
max_rte = (int) ((options.rate * 1000) / 8), max_rte = (int) ((options.rate * 1000) / 8),
txw_max_tpdu = (int) pgm_max_tpdu, txw_max_tpdu = (int) pgm_max_tpdu,
...@@ -473,7 +474,8 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) ...@@ -473,7 +474,8 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
if (nbytes > 0) { if (nbytes > 0) {
zmq_assert (status == PGM_IO_STATUS_NORMAL); zmq_assert (status == PGM_IO_STATUS_NORMAL);
zmq_assert (nbytes == data_len_); zmq_assert (nbytes == data_len_);
} else { }
else {
zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED ||
status == PGM_IO_STATUS_WOULD_BLOCK); status == PGM_IO_STATUS_WOULD_BLOCK);
...@@ -677,7 +679,8 @@ void zmq::pgm_socket_t::process_upstream () ...@@ -677,7 +679,8 @@ void zmq::pgm_socket_t::process_upstream ()
if (status == PGM_IO_STATUS_TIMER_PENDING) if (status == PGM_IO_STATUS_TIMER_PENDING)
errno = EBUSY; errno = EBUSY;
else if (status == PGM_IO_STATUS_RATE_LIMITED) else
if (status == PGM_IO_STATUS_RATE_LIMITED)
errno = ENOMEM; errno = ENOMEM;
else else
errno = EAGAIN; errno = EAGAIN;
......
...@@ -179,11 +179,11 @@ void zmq::pipe_t::rollback () ...@@ -179,11 +179,11 @@ void zmq::pipe_t::rollback ()
// Remove incomplete message from the outbound pipe. // Remove incomplete message from the outbound pipe.
msg_t msg; msg_t msg;
if (outpipe) { if (outpipe) {
while (outpipe->unwrite (&msg)) { while (outpipe->unwrite (&msg)) {
zmq_assert (msg.flags () & msg_t::more); zmq_assert (msg.flags () & msg_t::more);
int rc = msg.close (); int rc = msg.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
} }
} }
} }
...@@ -324,32 +324,37 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -324,32 +324,37 @@ void zmq::pipe_t::terminate (bool delay_)
// If the pipe is in the final phase of async termination, it's going to // If the pipe is in the final phase of async termination, it's going to
// closed anyway. No need to do anything special here. // closed anyway. No need to do anything special here.
else if (state == terminating) else
if (state == terminating)
return; return;
// The simple sync termination case. Ask the peer to terminate and wait // The simple sync termination case. Ask the peer to terminate and wait
// for the ack. // for the ack.
else if (state == active) { else
if (state == active) {
send_pipe_term (peer); send_pipe_term (peer);
state = terminated; state = terminated;
} }
// There are still pending messages available, but the user calls // There are still pending messages available, but the user calls
// 'terminate'. We can act as if all the pending messages were read. // 'terminate'. We can act as if all the pending messages were read.
else if (state == pending && !delay) { else
if (state == pending && !delay) {
outpipe = NULL; outpipe = NULL;
send_pipe_term_ack (peer); send_pipe_term_ack (peer);
state = terminating; state = terminating;
} }
// If there are pending messages still availabe, do nothing. // If there are pending messages still availabe, do nothing.
else if (state == pending) { else
if (state == pending) {
} }
// We've already got delimiter, but not term command yet. We can ignore // We've already got delimiter, but not term command yet. We can ignore
// the delimiter and ack synchronously terminate as if we were in // the delimiter and ack synchronously terminate as if we were in
// active state. // active state.
else if (state == delimited) { else
if (state == delimited) {
send_pipe_term (peer); send_pipe_term (peer);
state = terminated; state = terminated;
} }
...@@ -363,15 +368,15 @@ void zmq::pipe_t::terminate (bool delay_) ...@@ -363,15 +368,15 @@ void zmq::pipe_t::terminate (bool delay_)
if (outpipe) { if (outpipe) {
// Drop any unfinished outbound messages. // Drop any unfinished outbound messages.
rollback (); rollback ();
// Write the delimiter into the pipe. Note that watermarks are not // Write the delimiter into the pipe. Note that watermarks are not
// checked; thus the delimiter can be written even when the pipe is full. // checked; thus the delimiter can be written even when the pipe is full.
msg_t msg; msg_t msg;
msg.init_delimiter (); msg.init_delimiter ();
outpipe->write (msg, false); outpipe->write (msg, false);
flush (); flush ();
} }
} }
......
...@@ -41,7 +41,8 @@ void zmq::poller_base_t::adjust_load (int amount_) ...@@ -41,7 +41,8 @@ void zmq::poller_base_t::adjust_load (int amount_)
{ {
if (amount_ > 0) if (amount_ > 0)
load.add (amount_); load.add (amount_);
else if (amount_ < 0) else
if (amount_ < 0)
load.sub (-amount_); load.sub (-amount_);
} }
......
...@@ -182,7 +182,8 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -182,7 +182,8 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
bool ok = current_out->write (msg_); bool ok = current_out->write (msg_);
if (unlikely (!ok)) if (unlikely (!ok))
current_out = NULL; current_out = NULL;
else if (!more_out) { else
if (!more_out) {
current_out->flush (); current_out->flush ();
current_out = NULL; current_out = NULL;
} }
......
...@@ -479,7 +479,8 @@ void zmq::session_base_t::start_connecting (bool wait_) ...@@ -479,7 +479,8 @@ void zmq::session_base_t::start_connecting (bool wait_)
send_attach (this, pgm_sender); send_attach (this, pgm_sender);
} }
else if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) { else
if (options.type == ZMQ_SUB || options.type == ZMQ_XSUB) {
// PGM receiver. // PGM receiver.
pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t ( pgm_receiver_t *pgm_receiver = new (std::nothrow) pgm_receiver_t (
......
...@@ -147,7 +147,8 @@ int zmq::signaler_t::wait (int timeout_) ...@@ -147,7 +147,8 @@ int zmq::signaler_t::wait (int timeout_)
errno_assert (errno == EINTR); errno_assert (errno == EINTR);
return -1; return -1;
} }
else if (unlikely (rc == 0)) { else
if (unlikely (rc == 0)) {
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
......
...@@ -507,7 +507,8 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -507,7 +507,8 @@ int zmq::socket_base_t::connect (const char *addr_)
} }
} }
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else if(protocol == "ipc") { else
if (protocol == "ipc") {
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t (); paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
alloc_assert (paddr->resolved.ipc_addr); alloc_assert (paddr->resolved.ipc_addr);
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ()); int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
...@@ -1033,7 +1034,6 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1033,7 +1034,6 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
// Register events to monitor // Register events to monitor
monitor_events = events_; monitor_events = events_;
monitor_socket = zmq_socket( get_ctx (), ZMQ_PAIR); monitor_socket = zmq_socket( get_ctx (), ZMQ_PAIR);
if (monitor_socket == NULL) if (monitor_socket == NULL)
return -1; return -1;
...@@ -1053,112 +1053,123 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) ...@@ -1053,112 +1053,123 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
void zmq::socket_base_t::event_connected (const char *addr_, int fd_) void zmq::socket_base_t::event_connected (const char *addr_, int fd_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_CONNECTED) {
if (!(monitor_events & ZMQ_EVENT_CONNECTED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_CONNECTED; event.event = ZMQ_EVENT_CONNECTED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.connected.fd = fd_; event.data.connected.fd = fd_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_connect_delayed (const char *addr_, int err_) void zmq::socket_base_t::event_connect_delayed (const char *addr_, int err_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) {
if (!(monitor_events & ZMQ_EVENT_CONNECT_DELAYED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_DELAYED; event.event = ZMQ_EVENT_CONNECT_DELAYED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.connect_delayed.err = err_; event.data.connect_delayed.err = err_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_connect_retried (const char *addr_, int interval_) void zmq::socket_base_t::event_connect_retried (const char *addr_, int interval_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) {
if (!(monitor_events & ZMQ_EVENT_CONNECT_RETRIED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_CONNECT_RETRIED; event.event = ZMQ_EVENT_CONNECT_RETRIED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.connect_retried.interval = interval_; event.data.connect_retried.interval = interval_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_listening (const char *addr_, int fd_) void zmq::socket_base_t::event_listening (const char *addr_, int fd_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_LISTENING) {
if (!(monitor_events & ZMQ_EVENT_LISTENING)) return; zmq_event_t event;
event.event = ZMQ_EVENT_LISTENING; event.event = ZMQ_EVENT_LISTENING;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.listening.fd = fd_; event.data.listening.fd = fd_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_bind_failed (const char *addr_, int err_) void zmq::socket_base_t::event_bind_failed (const char *addr_, int err_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_BIND_FAILED) {
if (!(monitor_events & ZMQ_EVENT_BIND_FAILED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_BIND_FAILED; event.event = ZMQ_EVENT_BIND_FAILED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.bind_failed.err = err_; event.data.bind_failed.err = err_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_accepted (const char *addr_, int fd_) void zmq::socket_base_t::event_accepted (const char *addr_, int fd_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_ACCEPTED) {
if (!(monitor_events & ZMQ_EVENT_ACCEPTED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPTED; event.event = ZMQ_EVENT_ACCEPTED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.accepted.fd = fd_; event.data.accepted.fd = fd_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_accept_failed (const char *addr_, int err_) void zmq::socket_base_t::event_accept_failed (const char *addr_, int err_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) {
if (!(monitor_events & ZMQ_EVENT_ACCEPT_FAILED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_ACCEPT_FAILED; event.event = ZMQ_EVENT_ACCEPT_FAILED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.accept_failed.err= err_; event.data.accept_failed.err= err_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_closed (const char *addr_, int fd_) void zmq::socket_base_t::event_closed (const char *addr_, int fd_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_CLOSED) {
if (!(monitor_events & ZMQ_EVENT_CLOSED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_CLOSED; event.event = ZMQ_EVENT_CLOSED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.closed.fd = fd_; event.data.closed.fd = fd_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_close_failed (const char *addr_, int err_) void zmq::socket_base_t::event_close_failed (const char *addr_, int err_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) {
if (!(monitor_events & ZMQ_EVENT_CLOSE_FAILED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_CLOSE_FAILED; event.event = ZMQ_EVENT_CLOSE_FAILED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.close_failed.err = err_; event.data.close_failed.err = err_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::event_disconnected (const char *addr_, int fd_) void zmq::socket_base_t::event_disconnected (const char *addr_, int fd_)
{ {
zmq_event_t event; if (monitor_events & ZMQ_EVENT_DISCONNECTED) {
if (!(monitor_events & ZMQ_EVENT_DISCONNECTED)) return; zmq_event_t event;
event.event = ZMQ_EVENT_DISCONNECTED; event.event = ZMQ_EVENT_DISCONNECTED;
event.data.connected.addr = (char *)addr_; event.data.connected.addr = (char *) addr_;
event.data.disconnected.fd = fd_; event.data.disconnected.fd = fd_;
monitor_event (event); monitor_event (event);
}
} }
void zmq::socket_base_t::monitor_event (zmq_event_t event_) void zmq::socket_base_t::monitor_event (zmq_event_t event_)
{ {
zmq_msg_t msg; if (monitor_socket) {
if (!monitor_socket) return; zmq_msg_t msg;
zmq_msg_init_size (&msg, sizeof (event_)); zmq_msg_init_size (&msg, sizeof (event_));
memcpy (zmq_msg_data (&msg), &event_, sizeof (event_)); memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
zmq_sendmsg (monitor_socket, &msg, 0); zmq_sendmsg (monitor_socket, &msg, 0);
zmq_msg_close (&msg); zmq_msg_close (&msg);
}
} }
void zmq::socket_base_t::stop_monitor() void zmq::socket_base_t::stop_monitor()
...@@ -1169,4 +1180,3 @@ void zmq::socket_base_t::stop_monitor() ...@@ -1169,4 +1180,3 @@ void zmq::socket_base_t::stop_monitor()
monitor_events = 0; monitor_events = 0;
} }
} }
...@@ -51,7 +51,8 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_, ...@@ -51,7 +51,8 @@ int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
unsigned char *data = (unsigned char*) msg.data (); unsigned char *data = (unsigned char*) msg.data ();
if (option_ == ZMQ_SUBSCRIBE) if (option_ == ZMQ_SUBSCRIBE)
*data = 1; *data = 1;
else if (option_ == ZMQ_UNSUBSCRIBE) else
if (option_ == ZMQ_UNSUBSCRIBE)
*data = 0; *data = 0;
memcpy (data + 1, optval_, optvallen_); memcpy (data + 1, optval_, optvallen_);
......
...@@ -159,7 +159,8 @@ void zmq::tcp_connecter_t::start_connecting () ...@@ -159,7 +159,8 @@ void zmq::tcp_connecter_t::start_connecting ()
} }
// Connection establishment may be delayed. Poll for its completion. // Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) { else
if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
......
...@@ -48,7 +48,8 @@ zmq::trie_t::~trie_t () ...@@ -48,7 +48,8 @@ zmq::trie_t::~trie_t ()
delete next.node; delete next.node;
next.node = 0; next.node = 0;
} }
else if (count > 1) { else
if (count > 1) {
for (unsigned short i = 0; i != count; ++i) for (unsigned short i = 0; i != count; ++i)
if (next.table [i]) if (next.table [i])
delete next.table [i]; delete next.table [i];
...@@ -74,7 +75,8 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_) ...@@ -74,7 +75,8 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
count = 1; count = 1;
next.node = NULL; next.node = NULL;
} }
else if (count == 1) { else
if (count == 1) {
unsigned char oldc = min; unsigned char oldc = min;
trie_t *oldp = next.node; trie_t *oldp = next.node;
count = (min < c ? c - min : min - c) + 1; count = (min < c ? c - min : min - c) + 1;
...@@ -86,8 +88,8 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_) ...@@ -86,8 +88,8 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
min = std::min (min, c); min = std::min (min, c);
next.table [oldc - min] = oldp; next.table [oldc - min] = oldp;
} }
else if (min < c) { else
if (min < c) {
// The new character is above the current character range. // The new character is above the current character range.
unsigned short old_count = count; unsigned short old_count = count;
count = c - min + 1; count = c - min + 1;
...@@ -136,121 +138,120 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_) ...@@ -136,121 +138,120 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_) bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
{ {
// TODO: Shouldn't an error be reported if the key does not exist? // TODO: Shouldn't an error be reported if the key does not exist?
if (!size_) {
if (!size_) { if (!refcnt)
if (!refcnt) return false;
return false; refcnt--;
refcnt--; return refcnt == 0;
return refcnt == 0; }
} unsigned char c = *prefix_;
if (!count || c < min || c >= min + count)
unsigned char c = *prefix_; return false;
if (!count || c < min || c >= min + count)
return false; trie_t *next_node =
count == 1 ? next.node : next.table [c - min];
trie_t *next_node =
count == 1 ? next.node : next.table [c - min]; if (!next_node)
return false;
if (!next_node)
return false; bool ret = next_node->rm (prefix_ + 1, size_ - 1);
bool ret = next_node->rm (prefix_ + 1, size_ - 1); // Prune redundant nodes
if (next_node->is_redundant ()) {
// Prune redundant nodes delete next_node;
if (next_node->is_redundant ()) { zmq_assert (count > 0);
delete next_node;
zmq_assert (count > 0); if (count == 1) {
// The just pruned node is was the only live node
if (count == 1) { next.node = 0;
// The just pruned node is was the only live node count = 0;
next.node = 0; --live_nodes;
count = 0; zmq_assert (live_nodes == 0);
--live_nodes; }
zmq_assert (live_nodes == 0); else {
} next.table [c - min] = 0;
else { zmq_assert (live_nodes > 1);
next.table [c - min] = 0; --live_nodes;
zmq_assert (live_nodes > 1);
--live_nodes; // Compact the table if possible
if (live_nodes == 1) {
// Compact the table if possible // We can switch to using the more compact single-node
if (live_nodes == 1) { // representation since the table only contains one live node
// We can switch to using the more compact single-node trie_t *node = 0;
// representation since the table only contains one live node // Since we always compact the table the pruned node must
trie_t *node = 0; // either be the left-most or right-most ptr in the node
// Since we always compact the table the pruned node must // table
// either be the left-most or right-most ptr in the node if (c == min) {
// table // The pruned node is the left-most node ptr in the
if (c == min) { // node table => keep the right-most node
// The pruned node is the left-most node ptr in the node = next.table [count - 1];
// node table => keep the right-most node min += count - 1;
node = next.table [count - 1]; }
min += count - 1; else
} if (c == min + count - 1) {
else if (c == min + count - 1) { // The pruned node is the right-most node ptr in the
// The pruned node is the right-most node ptr in the // node table => keep the left-most node
// node table => keep the left-most node node = next.table [0];
node = next.table [0]; }
} zmq_assert (node);
free (next.table);
zmq_assert (node); next.node = node;
free (next.table); count = 1;
next.node = node; }
count = 1; else
} if (c == min) {
else if (c == min) { // We can compact the table "from the left".
// We can compact the table "from the left". // Find the left-most non-null node ptr, which we'll use as
// Find the left-most non-null node ptr, which we'll use as // our new min
// our new min unsigned char new_min = min;
unsigned char new_min = min; for (unsigned short i = 1; i < count; ++i) {
for (unsigned short i = 1; i < count; ++i) { if (next.table [i]) {
if (next.table [i]) { new_min = i + min;
new_min = i + min; break;
break; }
} }
} zmq_assert (new_min != min);
zmq_assert (new_min != min);
trie_t **old_table = next.table;
trie_t **old_table = next.table; zmq_assert (new_min > min);
zmq_assert (new_min > min); zmq_assert (count > new_min - min);
zmq_assert (count > new_min - min);
count = count - (new_min - min);
count = count - (new_min - min); next.table = (trie_t**) malloc (sizeof (trie_t*) * count);
next.table = (trie_t**) malloc (sizeof (trie_t*) * count); alloc_assert (next.table);
alloc_assert (next.table);
memmove (next.table, old_table + (new_min - min),
memmove (next.table, old_table + (new_min - min), sizeof (trie_t*) * count);
sizeof (trie_t*) * count); free (old_table);
free (old_table);
min = new_min;
min = new_min; }
} else
else if (c == min + count - 1) { if (c == min + count - 1) {
// We can compact the table "from the right". // We can compact the table "from the right".
// Find the right-most non-null node ptr, which we'll use to // Find the right-most non-null node ptr, which we'll use to
// determine the new table size // determine the new table size
unsigned short new_count = count; unsigned short new_count = count;
for (unsigned short i = 1; i < count; ++i) { for (unsigned short i = 1; i < count; ++i) {
if (next.table [count - 1 - i]) { if (next.table [count - 1 - i]) {
new_count = count - i; new_count = count - i;
break; break;
} }
} }
zmq_assert (new_count != count); zmq_assert (new_count != count);
count = new_count; count = new_count;
trie_t **old_table = next.table; trie_t **old_table = next.table;
next.table = (trie_t**) malloc (sizeof (trie_t*) * count); next.table = (trie_t**) malloc (sizeof (trie_t*) * count);
alloc_assert (next.table); alloc_assert (next.table);
memmove (next.table, old_table, sizeof (trie_t*) * count); memmove (next.table, old_table, sizeof (trie_t*) * count);
free (old_table); free (old_table);
} }
} }
} }
return ret;
return ret;
} }
bool zmq::trie_t::check (unsigned char *data_, size_t size_) bool zmq::trie_t::check (unsigned char *data_, size_t size_)
......
...@@ -674,15 +674,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -674,15 +674,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
int nevents = 0; int nevents = 0;
while (true) { while (true) {
// Compute the timeout for the subsequent poll.
// Compute the timeout for the subsequent poll. int timeout;
int timeout; if (first_pass)
if (first_pass) timeout = 0;
timeout = 0; else
else if (timeout_ < 0) if (timeout_ < 0)
timeout = -1; timeout = -1;
else else
timeout = end - now; timeout = end - now;
// Wait for events. // Wait for events.
while (true) { while (true) {
...@@ -694,7 +694,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -694,7 +694,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
errno_assert (rc >= 0); errno_assert (rc >= 0);
break; break;
} }
// Check for the events. // Check for the events.
for (int i = 0; i != nitems_; i++) { for (int i = 0; i != nitems_; i++) {
...@@ -848,7 +847,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) ...@@ -848,7 +847,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
timeout.tv_usec = 0; timeout.tv_usec = 0;
ptimeout = &timeout; ptimeout = &timeout;
} }
else if (timeout_ < 0) else
if (timeout_ < 0)
ptimeout = NULL; ptimeout = NULL;
else { else {
timeout.tv_sec = (long) ((end - now) / 1000); timeout.tv_sec = (long) ((end - now) / 1000);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment