Commit 0404b3b6 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #454 from hurtonm/code_cleanup

Style fixes
parents abbe34cd 9d8eb1f9
...@@ -143,8 +143,9 @@ namespace zmq ...@@ -143,8 +143,9 @@ namespace zmq
} }
} }
inline bool message_ready_size (size_t msg_sz){ inline bool message_ready_size (size_t msg_sz)
zmq_assert(false); {
zmq_assert (false);
return false; return false;
} }
......
...@@ -41,7 +41,7 @@ namespace zmq ...@@ -41,7 +41,7 @@ namespace zmq
virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0;
virtual bool stalled () const = 0; virtual bool stalled () const = 0;
virtual bool message_ready_size (size_t msg_sz) = 0; virtual bool message_ready_size (size_t msg_sz) = 0;
}; };
......
...@@ -114,7 +114,7 @@ namespace zmq ...@@ -114,7 +114,7 @@ namespace zmq
// If true, the identity message is forwarded to the socket. // If true, the identity message is forwarded to the socket.
bool recv_identity; bool recv_identity;
// if true, router socket accepts non-zmq tcp connections // if true, router socket accepts non-zmq tcp connections
bool raw_sock; bool raw_sock;
......
...@@ -93,7 +93,7 @@ bool zmq::raw_decoder_t::raw_message_ready () ...@@ -93,7 +93,7 @@ bool zmq::raw_decoder_t::raw_message_ready ()
// raw_message_ready should never get called in state machine w/o // raw_message_ready should never get called in state machine w/o
// message_ready_size from stream_engine. // message_ready_size from stream_engine.
next_step (in_progress.data (), 1, next_step (in_progress.data (), 1,
&raw_decoder_t::raw_message_ready); &raw_decoder_t::raw_message_ready);
return true; return true;
} }
...@@ -38,7 +38,7 @@ namespace zmq ...@@ -38,7 +38,7 @@ namespace zmq
{ {
public: public:
raw_decoder_t (size_t bufsize_, raw_decoder_t (size_t bufsize_,
int64_t maxmsgsize_, i_msg_sink *msg_sink_); int64_t maxmsgsize_, i_msg_sink *msg_sink_);
virtual ~raw_decoder_t (); virtual ~raw_decoder_t ();
......
...@@ -58,7 +58,6 @@ bool zmq::raw_encoder_t::raw_message_size_ready () ...@@ -58,7 +58,6 @@ bool zmq::raw_encoder_t::raw_message_size_ready ()
bool zmq::raw_encoder_t::raw_message_ready () bool zmq::raw_encoder_t::raw_message_ready ()
{ {
// Destroy content of the old message. // Destroy content of the old message.
int rc = in_progress.close (); int rc = in_progress.close ();
errno_assert (rc == 0); errno_assert (rc == 0);
......
...@@ -40,7 +40,6 @@ ...@@ -40,7 +40,6 @@
namespace zmq namespace zmq
{ {
// Encoder for 0MQ framing protocol. Converts messages into data batches. // Encoder for 0MQ framing protocol. Converts messages into data batches.
class raw_encoder_t : public encoder_base_t <raw_encoder_t> class raw_encoder_t : public encoder_base_t <raw_encoder_t>
......
...@@ -78,8 +78,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) ...@@ -78,8 +78,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
int zmq::router_t::xsetsockopt (int option_, const void *optval_, int zmq::router_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (option_ != ZMQ_ROUTER_MANDATORY && if (option_ != ZMQ_ROUTER_MANDATORY
option_ != ZMQ_ROUTER_RAW_SOCK) { && option_ != ZMQ_ROUTER_RAW_SOCK) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
...@@ -87,16 +87,15 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, ...@@ -87,16 +87,15 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
if(option_ == ZMQ_ROUTER_RAW_SOCK){ if (option_ == ZMQ_ROUTER_RAW_SOCK) {
raw_sock = *static_cast <const int*> (optval_); raw_sock = *static_cast <const int*> (optval_);
if(raw_sock){ if (raw_sock) {
options.recv_identity = false; options.recv_identity = false;
options.raw_sock = true; options.raw_sock = true;
} }
}else{
mandatory = *static_cast <const int*> (optval_);
} }
else
mandatory = *static_cast <const int*> (optval_);
return 0; return 0;
} }
...@@ -170,8 +169,8 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -170,8 +169,8 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
it->second.active = false; it->second.active = false;
current_out = NULL; current_out = NULL;
} }
} }
else else
if (mandatory) { if (mandatory) {
more_out = false; more_out = false;
errno = EHOSTUNREACH; errno = EHOSTUNREACH;
...@@ -186,9 +185,9 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -186,9 +185,9 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
return 0; return 0;
} }
// ignore the MORE flag for raw-sock or assert? // Ignore the MORE flag for raw-sock or assert?
if(options.raw_sock) if (options.raw_sock)
msg_->reset_flags(msg_t::more); msg_->reset_flags (msg_t::more);
// Check whether this is the last part of the message. // Check whether this is the last part of the message.
more_out = msg_->flags () & msg_t::more ? true : false; more_out = msg_->flags () & msg_t::more ? true : false;
...@@ -199,13 +198,13 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) ...@@ -199,13 +198,13 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
// Close the remote connection if user has asked to do so // Close the remote connection if user has asked to do so
// by sending zero length message. // by sending zero length message.
// Pending messages in the pipe will be dropped (on receiving term- ack) // Pending messages in the pipe will be dropped (on receiving term- ack)
if (raw_sock && msg_->size() == 0){ if (raw_sock && msg_->size() == 0) {
current_out->terminate(false); current_out->terminate (false);
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
current_out = NULL; current_out = NULL;
return 0; return 0;
} }
bool ok = current_out->write (msg_); bool ok = current_out->write (msg_);
if (unlikely (!ok)) if (unlikely (!ok))
...@@ -349,12 +348,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) ...@@ -349,12 +348,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
blob_t identity; blob_t identity;
bool ok; bool ok;
if(options.raw_sock){ // always assign identity for raw-socket if (options.raw_sock) { // Always assign identity for raw-socket
unsigned char buf [5]; unsigned char buf [5];
buf [0] = 0; buf [0] = 0;
put_uint32 (buf + 1, next_peer_id++); put_uint32 (buf + 1, next_peer_id++);
identity = blob_t (buf, sizeof buf); identity = blob_t (buf, sizeof buf);
}else{ }
else {
msg.init (); msg.init ();
ok = pipe_->read (&msg); ok = pipe_->read (&msg);
if (!ok) if (!ok)
......
...@@ -120,10 +120,10 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -120,10 +120,10 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
identity_received (false), identity_received (false),
addr (addr_) addr (addr_)
{ {
// identities are not exchanged for raw sockets // Identities are not exchanged for raw sockets
if(options.raw_sock){ if (options.raw_sock) {
identity_sent = (true); identity_sent = true;
identity_received = (true); identity_received = true;
} }
} }
...@@ -250,12 +250,12 @@ void zmq::session_base_t::terminated (pipe_t *pipe_) ...@@ -250,12 +250,12 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
// Remove the pipe from the detached pipes set // Remove the pipe from the detached pipes set
terminating_pipes.erase (pipe_); terminating_pipes.erase (pipe_);
if (!is_terminating() && options.raw_sock){ if (!is_terminating () && options.raw_sock) {
if(engine){ if (engine) {
engine->terminate (); engine->terminate ();
engine = NULL; engine = NULL;
} }
terminate(); terminate ();
} }
......
...@@ -135,7 +135,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -135,7 +135,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
handle = add_fd (s); handle = add_fd (s);
if(options.raw_sock){ if (options.raw_sock) {
// no handshaking for raw sock, instantiate raw encoder and decoders // no handshaking for raw sock, instantiate raw encoder and decoders
encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session); encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session);
alloc_assert (encoder); alloc_assert (encoder);
...@@ -146,7 +146,8 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, ...@@ -146,7 +146,8 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// disable handshaking for raw socket // disable handshaking for raw socket
handshaking = false; handshaking = false;
}else{ }
else {
// Send the 'length' and 'flags' fields of the identity message. // Send the 'length' and 'flags' fields of the identity message.
// The 'length' field is encoded in the long format. // The 'length' field is encoded in the long format.
outpos = greeting_output_buffer; outpos = greeting_output_buffer;
...@@ -215,13 +216,13 @@ void zmq::stream_engine_t::in_event () ...@@ -215,13 +216,13 @@ void zmq::stream_engine_t::in_event ()
} }
} }
if(options.raw_sock){ if (options.raw_sock) {
if(insize == 0 || !decoder->message_ready_size(insize)){ if (insize == 0 || !decoder->message_ready_size (insize))
processed = 0; processed = 0;
}else{ else
processed = decoder->process_buffer (inpos, insize); processed = decoder->process_buffer (inpos, insize);
} }
}else{ else {
// Push the data to the decoder. // Push the data to the decoder.
processed = decoder->process_buffer (inpos, insize); processed = decoder->process_buffer (inpos, insize);
} }
......
...@@ -31,78 +31,72 @@ ...@@ -31,78 +31,72 @@
#include <assert.h> #include <assert.h>
#include <fcntl.h> #include <fcntl.h>
#include <zmq.h> #include <zmq.h>
#include<unistd.h> #include <unistd.h>
//ToDo: Windows? //ToDo: Windows?
const char *test_str = "TEST-STRING"; const char *test_str = "TEST-STRING";
int tcp_client ()
int tcp_client(){ {
int sockfd, portno;
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
struct hostent *server; struct hostent *server;
portno = 5555; const int portno = 5555;
sockfd = socket(AF_INET, SOCK_STREAM, 0); int sockfd = socket (AF_INET, SOCK_STREAM, 0);
assert(sockfd >=0 ); assert (sockfd >= 0);
server = gethostbyname("localhost"); server = gethostbyname ("localhost");
assert(server); assert (server);
bzero((char *) &serv_addr, sizeof(serv_addr)); bzero (&serv_addr, sizeof serv_addr);
serv_addr.sin_family = AF_INET; serv_addr.sin_family = AF_INET;
bcopy((char *)server->h_addr, bcopy (server->h_addr, &serv_addr.sin_addr.s_addr, server->h_length);
(char *)&serv_addr.sin_addr.s_addr, serv_addr.sin_port = htons (portno);
server->h_length);
serv_addr.sin_port = htons(portno);
if (connect(sockfd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0) int rc = connect (sockfd, (struct sockaddr *) &serv_addr, sizeof serv_addr);
assert(0); assert (rc == 0);
int nodelay = 1; int nodelay = 1;
int rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay, rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
sizeof (int)); sizeof nodelay);
assert(rc == 0); assert (rc == 0);
return sockfd; return sockfd;
} }
void tcp_client_write(int sockfd, const void *buf, int buf_len){ void tcp_client_write (int sockfd, const void *buf, int buf_len)
assert(buf); {
int n = write(sockfd, buf, buf_len); assert (buf);
assert(n >= 0); int n = write (sockfd, buf, buf_len);
assert (n >= 0);
} }
void tcp_client_read(int sockfd){ void tcp_client_read (int sockfd)
{
struct timeval tm; struct timeval tm;
tm.tv_sec = 1; tm.tv_sec = 1;
tm.tv_usec = 0; tm.tv_usec = 0;
fd_set r; fd_set r;
int sr; char buffer [16];
char buffer[16];
FD_ZERO(&r); FD_ZERO (&r);
FD_SET(sockfd, &r); FD_SET (sockfd, &r);
if ((sr = select(sockfd + 1, &r, NULL, NULL, &tm)) <= 0) int sr = select (sockfd + 1, &r, NULL, NULL, &tm);
{ assert (sr > 0);
assert(0);
}
int n = read(sockfd, buffer, 16); int n = read (sockfd, buffer, 16);
assert(n>0); assert (n > 0);
assert(memcmp(buffer, test_str, strlen(test_str)) == 0); assert (memcmp (buffer, test_str, strlen (test_str)) == 0);
} }
void tcp_client_close (int sockfd)
void tcp_client_close(int sockfd){ {
close(sockfd); close (sockfd);
} }
int main ()
int main(){ {
fprintf (stderr, "test_raw_sock running...\n"); fprintf (stderr, "test_raw_sock running...\n");
zmq_msg_t message; zmq_msg_t message;
...@@ -112,56 +106,49 @@ int main(){ ...@@ -112,56 +106,49 @@ int main(){
void *ctx = zmq_init (1); void *ctx = zmq_init (1);
assert (ctx); assert (ctx);
int raw_sock = 1, rc = 0;
void *sb = zmq_socket (ctx, ZMQ_ROUTER); void *sb = zmq_socket (ctx, ZMQ_ROUTER);
assert (sb); assert (sb);
rc = zmq_setsockopt( sb, ZMQ_ROUTER_RAW_SOCK, &raw_sock, sizeof(int));
assert(rc == 0); int raw_sock = 1;
int rc = zmq_setsockopt (sb, ZMQ_ROUTER_RAW_SOCK, &raw_sock, sizeof raw_sock);
assert (rc == 0);
rc = zmq_bind (sb, "tcp://127.0.0.1:5555"); rc = zmq_bind (sb, "tcp://127.0.0.1:5555");
assert (rc == 0); assert (rc == 0);
int sock_fd = tcp_client(); int sock_fd = tcp_client ();
assert(sock_fd >= 0); assert (sock_fd >= 0);
// =================== // ===================
zmq_msg_init(&message); zmq_msg_init (&message);
zmq_msg_init(&id); zmq_msg_init (&id);
assert (rc == 0); assert (rc == 0);
zmq_pollitem_t items [] = { zmq_pollitem_t items [] = {
{ sb, 0, ZMQ_POLLIN, 0 }, { sb, 0, ZMQ_POLLIN, 0 },
}; };
tcp_client_write(sock_fd, test_str, strlen(test_str)); tcp_client_write (sock_fd, test_str, strlen (test_str));
zmq_poll (items, 1, 500); zmq_poll (items, 1, 500);
if (items [0].revents & ZMQ_POLLIN) { assert (items [0].revents & ZMQ_POLLIN);
int n = zmq_msg_recv (&id, sb, 0); int n = zmq_msg_recv (&id, sb, 0);
assert(n > 0); assert (n > 0);
n = zmq_msg_recv (&message, sb, 0); n = zmq_msg_recv (&message, sb, 0);
assert(n > 0); assert (n > 0);
assert(memcmp(zmq_msg_data (&message), test_str, strlen(test_str)) == 0); assert (memcmp (zmq_msg_data (&message), test_str, strlen (test_str)) == 0);
}else{
assert(0);
}
zmq_msg_send (&id, sb, ZMQ_SNDMORE); zmq_msg_send (&id, sb, ZMQ_SNDMORE);
zmq_msg_send (&message, sb, ZMQ_SNDMORE);// SNDMORE option is ignored zmq_msg_send (&message, sb, ZMQ_SNDMORE); // SNDMORE option is ignored
tcp_client_read(sock_fd); tcp_client_read (sock_fd);
tcp_client_close(sock_fd); tcp_client_close (sock_fd);
zmq_msg_close(&id); zmq_msg_close (&id);
zmq_msg_close(&message); zmq_msg_close (&message);
zmq_close (sb);
zmq_close(sb); zmq_term (ctx);
zmq_term(ctx);
fprintf (stderr, "test_raw_sock PASSED.\n"); fprintf (stderr, "test_raw_sock PASSED.\n");
return 0; return 0;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment