Commit f7bd543c authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #351 from hurtonm/code_cleanup

Code cleanup
parents 8152502f 24b79c7e
......@@ -113,7 +113,7 @@ int zmq::ctx_t::terminate ()
int rc = term_mailbox.recv (&cmd, -1);
if (rc == -1 && errno == EINTR)
return -1;
zmq_assert (rc == 0);
errno_assert (rc == 0);
zmq_assert (cmd.type == command_t::done);
slot_sync.lock ();
zmq_assert (sockets.empty ());
......
......@@ -89,7 +89,7 @@ bool zmq::dealer_t::xhas_in ()
int rc = dealer_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
if (rc != 0 && errno == EAGAIN)
return false;
zmq_assert (rc == 0);
errno_assert (rc == 0);
prefetched = true;
return true;
}
......
......@@ -133,7 +133,7 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_)
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
zmq_assert (rc == 0);
errno_assert (rc == 0);
return;
}
......
......@@ -75,7 +75,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
errno_assert (rc == 0);
// Round-robin over the pipes to get the next message.
for (pipes_t::size_type count = active; count != 0; count--) {
while (active > 0) {
// Try to fetch new message. If we've already read part of the message
// subsequent part should be immediately available.
......@@ -84,7 +84,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
// Check the atomicity of the message. If we've already received the
// first part of the message we should get the remaining parts
// without blocking.
zmq_assert (!(more && !fetched));
zmq_assert (!more || fetched);
// Note that when message is not fetched, current pipe is deactivated
// and replaced by another active pipe. Thus we don't have to increase
......@@ -92,21 +92,16 @@ int zmq::fq_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
if (fetched) {
if (pipe_)
*pipe_ = pipes [current];
more =
msg_->flags () & msg_t::more ? true : false;
if (!more) {
current++;
if (current >= active)
current = 0;
}
more = msg_->flags () & msg_t::more? true: false;
if (!more)
current = (current + 1) % active;
return 0;
}
else {
active--;
pipes.swap (current, active);
if (current == active)
current = 0;
}
active--;
pipes.swap (current, active);
if (current == active)
current = 0;
}
// No message is available. Initialise the output parameter
......@@ -127,7 +122,7 @@ bool zmq::fq_t::has_in ()
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available.
for (pipes_t::size_type count = active; count != 0; count--) {
while (active > 0) {
if (pipes [current]->check_read ())
return true;
......
......@@ -84,7 +84,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
zmq_assert (rc == 0);
errno_assert (rc == 0);
return 0;
}
......
......@@ -94,7 +94,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
count = (min < c ? c - min : min - c) + 1;
next.table = (mtrie_t**)
malloc (sizeof (mtrie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
for (unsigned short i = 0; i != count; ++i)
next.table [i] = 0;
min = std::min (min, c);
......@@ -107,7 +107,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
count = c - min + 1;
next.table = (mtrie_t**) realloc ((void*) next.table,
sizeof (mtrie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
for (unsigned short i = old_count; i != count; i++)
next.table [i] = NULL;
}
......@@ -118,7 +118,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
count = (min + old_count) - c;
next.table = (mtrie_t**) realloc ((void*) next.table,
sizeof (mtrie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
memmove (next.table + min - c, next.table,
old_count * sizeof (mtrie_t*));
for (unsigned short i = 0; i != min - c; i++)
......@@ -132,7 +132,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
if (!next.node) {
next.node = new (std::nothrow) mtrie_t;
++live_nodes;
zmq_assert (next.node);
alloc_assert (next.node);
}
return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_);
}
......@@ -140,7 +140,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
if (!next.table [c - min]) {
next.table [c - min] = new (std::nothrow) mtrie_t;
++live_nodes;
zmq_assert (next.table [c - min]);
alloc_assert (next.table [c - min]);
}
return next.table [c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_);
}
......@@ -260,7 +260,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
count = new_max - new_min + 1;
next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
memmove (next.table, old_table + (new_min - min),
sizeof (mtrie_t*) * count);
......@@ -353,7 +353,7 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
count = count - (new_min - min);
next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
memmove (next.table, old_table + (new_min - min),
sizeof (mtrie_t*) * count);
......@@ -375,7 +375,7 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
mtrie_t **old_table = next.table;
next.table = (mtrie_t**) malloc (sizeof (mtrie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
memmove (next.table, old_table, sizeof (mtrie_t*) * count);
free (old_table);
......
......@@ -197,7 +197,7 @@ void zmq::pgm_sender_t::out_event ()
add_timer (timeout, tx_timer_id);
has_tx_timer = true;
} else
zmq_assert (errno == EBUSY);
errno_assert (errno == EBUSY);
}
}
......
......@@ -155,7 +155,8 @@ int zmq::session_base_t::read (msg_t *msg_)
// First message to send is identity (if required).
if (send_identity) {
zmq_assert (!(msg_->flags () & msg_t::more));
msg_->init_size (options.identity_size);
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size);
send_identity = false;
incomplete_in = false;
......@@ -449,7 +450,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
alloc_assert (pgm_sender);
int rc = pgm_sender->init (udp_encapsulation, addr->address.c_str ());
zmq_assert (rc == 0);
errno_assert (rc == 0);
send_attach (this, pgm_sender);
}
......@@ -461,7 +462,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
alloc_assert (pgm_receiver);
int rc = pgm_receiver->init (udp_encapsulation, addr->address.c_str ());
zmq_assert (rc == 0);
errno_assert (rc == 0);
send_attach (this, pgm_receiver);
}
......
......@@ -144,7 +144,7 @@ int zmq::signaler_t::wait (int timeout_)
pfd.events = POLLIN;
int rc = poll (&pfd, 1, timeout_);
if (unlikely (rc < 0)) {
zmq_assert (errno == EINTR);
errno_assert (errno == EINTR);
return -1;
}
else if (unlikely (rc == 0)) {
......@@ -173,7 +173,7 @@ int zmq::signaler_t::wait (int timeout_)
int rc = select (r + 1, &fds, NULL, NULL,
timeout_ >= 0 ? &timeout : NULL);
if (unlikely (rc < 0)) {
zmq_assert (errno == EINTR);
errno_assert (errno == EINTR);
return -1;
}
#endif
......
......@@ -448,7 +448,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (options.send_identity) {
msg_t id;
rc = id.init_size (options.identity_size);
zmq_assert (rc == 0);
errno_assert (rc == 0);
memcpy (id.data (), options.identity, options.identity_size);
id.set_flags (msg_t::identity);
bool written = pipes [0]->write (&id);
......@@ -460,7 +460,7 @@ int zmq::socket_base_t::connect (const char *addr_)
if (peer.options.send_identity) {
msg_t id;
rc = id.init_size (peer.options.identity_size);
zmq_assert (rc == 0);
errno_assert (rc == 0);
memcpy (id.data (), peer.options.identity, peer.options.identity_size);
id.set_flags (msg_t::identity);
bool written = pipes [1]->write (&id);
......@@ -487,12 +487,12 @@ int zmq::socket_base_t::connect (const char *addr_)
}
address_t *paddr = new (std::nothrow) address_t (protocol, address);
zmq_assert (paddr);
alloc_assert (paddr);
// Resolve address (if needed by the protocol)
if (protocol == "tcp") {
paddr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
zmq_assert (paddr->resolved.tcp_addr);
alloc_assert (paddr->resolved.tcp_addr);
int rc = paddr->resolved.tcp_addr->resolve (
address.c_str (), false, options.ipv4only ? true : false);
if (rc != 0) {
......@@ -503,7 +503,7 @@ int zmq::socket_base_t::connect (const char *addr_)
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
else if(protocol == "ipc") {
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
zmq_assert (paddr->resolved.ipc_addr);
alloc_assert (paddr->resolved.ipc_addr);
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
if (rc != 0) {
delete paddr;
......
......@@ -59,14 +59,14 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
// Create a socket.
int fd = open_socket (AF_INET, SOCK_DGRAM, 0);
zmq_assert (fd != -1);
errno_assert (fd != -1);
// Retrieve number of interfaces.
lifnum ifn;
ifn.lifn_family = AF_INET;
ifn.lifn_flags = 0;
int rc = ioctl (fd, SIOCGLIFNUM, (char*) &ifn);
zmq_assert (rc != -1);
errno_assert (rc != -1);
// Allocate memory to get interface names.
size_t ifr_size = sizeof (struct lifreq) * ifn.lifn_count;
......@@ -80,7 +80,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
ifc.lifc_len = ifr_size;
ifc.lifc_buf = ifr;
rc = ioctl (fd, SIOCGLIFCONF, (char*) &ifc);
zmq_assert (rc != -1);
errno_assert (rc != -1);
// Find the interface with the specified name and AF_INET family.
bool found = false;
......@@ -89,7 +89,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
n ++, ifrp ++) {
if (!strcmp (nic_, ifrp->lifr_name)) {
rc = ioctl (fd, SIOCGLIFADDR, (char*) ifrp);
zmq_assert (rc != -1);
errno_assert (rc != -1);
if (ifrp->lifr_addr.ss_family == AF_INET) {
address.ipv4 = *(sockaddr_in*) &ifrp->lifr_addr;
found = true;
......@@ -124,7 +124,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
// Create a socket.
int sd = open_socket (AF_INET, SOCK_DGRAM, 0);
zmq_assert (sd != -1);
errno_assert (sd != -1);
struct ifreq ifr;
......@@ -162,7 +162,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv4only_)
// Get the addresses.
ifaddrs* ifa = NULL;
int rc = getifaddrs (&ifa);
zmq_assert (rc == 0);
errno_assert (rc == 0);
zmq_assert (ifa != NULL);
// Find the corresponding network interface.
......
......@@ -80,7 +80,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
count = (min < c ? c - min : min - c) + 1;
next.table = (trie_t**)
malloc (sizeof (trie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
for (unsigned short i = 0; i != count; ++i)
next.table [i] = 0;
min = std::min (min, c);
......@@ -117,7 +117,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
if (count == 1) {
if (!next.node) {
next.node = new (std::nothrow) trie_t;
zmq_assert (next.node);
alloc_assert (next.node);
++live_nodes;
zmq_assert (live_nodes == 1);
}
......@@ -126,7 +126,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
else {
if (!next.table [c - min]) {
next.table [c - min] = new (std::nothrow) trie_t;
zmq_assert (next.table [c - min]);
alloc_assert (next.table [c - min]);
++live_nodes;
zmq_assert (live_nodes > 1);
}
......@@ -218,7 +218,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
count = count - (new_min - min);
next.table = (trie_t**) malloc (sizeof (trie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
memmove (next.table, old_table + (new_min - min),
sizeof (trie_t*) * count);
......@@ -242,7 +242,7 @@ bool zmq::trie_t::rm (unsigned char *prefix_, size_t size_)
trie_t **old_table = next.table;
next.table = (trie_t**) malloc (sizeof (trie_t*) * count);
zmq_assert (next.table);
alloc_assert (next.table);
memmove (next.table, old_table, sizeof (trie_t*) * count);
free (old_table);
......
......@@ -150,7 +150,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// from the pipe.
while (msg_->flags () & msg_t::more) {
rc = fq.recv (msg_);
zmq_assert (rc == 0);
errno_assert (rc == 0);
}
}
}
......@@ -176,7 +176,7 @@ bool zmq::xsub_t::xhas_in ()
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
zmq_assert (errno == EAGAIN);
errno_assert (errno == EAGAIN);
return false;
}
......@@ -190,7 +190,7 @@ bool zmq::xsub_t::xhas_in ()
// from the pipe.
while (message.flags () & msg_t::more) {
rc = fq.recv (&message);
zmq_assert (rc == 0);
errno_assert (rc == 0);
}
}
}
......@@ -208,7 +208,7 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_,
// Create the subsctription message.
msg_t msg;
int rc = msg.init_size (size_ + 1);
zmq_assert (rc == 0);
errno_assert (rc == 0);
unsigned char *data = (unsigned char*) msg.data ();
data [0] = 1;
memcpy (data + 1, data_, size_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment