Commit ec082414 authored by Pieter Hintjens's avatar Pieter Hintjens

Merge pull request #462 from hurtonm/raw_fixes

Fix raw mode on reconnect
parents 4e028ecb c1e960b3
...@@ -120,11 +120,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, ...@@ -120,11 +120,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
identity_received (false), identity_received (false),
addr (addr_) addr (addr_)
{ {
// Identities are not exchanged for raw sockets
if (options.raw_sock) {
identity_sent = true;
identity_received = true;
}
} }
zmq::session_base_t::~session_base_t () zmq::session_base_t::~session_base_t ()
...@@ -156,8 +151,9 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) ...@@ -156,8 +151,9 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::pull_msg (msg_t *msg_) int zmq::session_base_t::pull_msg (msg_t *msg_)
{ {
// First message to send is identity // Unless the socket is in raw mode, the first
if (unlikely (!identity_sent)) { // message we send is its identity.
if (unlikely (!identity_sent && !options.raw_sock)) {
int rc = msg_->init_size (options.identity_size); int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0); errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size); memcpy (msg_->data (), options.identity, options.identity_size);
...@@ -177,8 +173,9 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) ...@@ -177,8 +173,9 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
int zmq::session_base_t::push_msg (msg_t *msg_) int zmq::session_base_t::push_msg (msg_t *msg_)
{ {
// First message to receive is identity // Unless the socket is in raw mode, the first
if (unlikely (!identity_received)) { // message we receive is its identity.
if (unlikely (!identity_received && !options.raw_sock)) {
msg_->set_flags (msg_t::identity); msg_->set_flags (msg_t::identity);
identity_received = true; identity_received = true;
if (!options.recv_identity) { if (!options.recv_identity) {
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include <fcntl.h> #include <fcntl.h>
#include <zmq.h> #include <zmq.h>
#include <unistd.h> #include <unistd.h>
#include <poll.h>
//ToDo: Windows? //ToDo: Windows?
const char *test_str = "TEST-STRING"; const char *test_str = "TEST-STRING";
...@@ -63,6 +64,42 @@ int tcp_client () ...@@ -63,6 +64,42 @@ int tcp_client ()
return sockfd; return sockfd;
} }
int tcp_server ()
{
int listenfd = socket (AF_INET, SOCK_STREAM, 0);
assert (listenfd != -1);
int flag = 1;
int rc = setsockopt (listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag);
assert (rc == 0);
struct sockaddr_in serv_addr;
bzero (&serv_addr, sizeof serv_addr);
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl (INADDR_ANY);
serv_addr.sin_port = htons (5555);
rc = bind (listenfd, (struct sockaddr *) &serv_addr, sizeof serv_addr);
assert (rc == 0);
rc = listen (listenfd, 8);
assert (rc == 0);
int sockfd = accept (listenfd, NULL, NULL);
assert (sockfd != -1);
rc = close (listenfd);
assert (rc == 0);
int flags = fcntl (sockfd, F_GETFL, 0);
if (flags == -1)
flags = 0;
rc = fcntl (sockfd, F_SETFL, flags | O_NONBLOCK);
assert (rc != -1);
return sockfd;
}
void tcp_client_write (int sockfd, const void *buf, int buf_len) void tcp_client_write (int sockfd, const void *buf, int buf_len)
{ {
assert (buf); assert (buf);
...@@ -90,11 +127,75 @@ void tcp_client_read (int sockfd) ...@@ -90,11 +127,75 @@ void tcp_client_read (int sockfd)
assert (memcmp (buffer, test_str, strlen (test_str)) == 0); assert (memcmp (buffer, test_str, strlen (test_str)) == 0);
} }
size_t tcp_read (int s, char *buf, size_t bufsize)
{
size_t bytes_read = 0;
struct pollfd pfd = {s, POLLIN};
int rc = poll (&pfd, 1, 100);
while (rc > 0 && bytes_read < bufsize) {
int n = read (s, buf + bytes_read, bufsize - bytes_read);
if (n <= 0)
return bytes_read;
bytes_read += n;
rc = poll (&pfd, 1, 100);
}
return bytes_read;
}
void tcp_client_close (int sockfd) void tcp_client_close (int sockfd)
{ {
close (sockfd); close (sockfd);
} }
void test_zmq_connect ()
{
void *ctx = zmq_init (1);
assert (ctx);
void *zs = zmq_socket (ctx, ZMQ_ROUTER);
assert (zs);
int rc = zmq_setsockopt (zs, ZMQ_IDENTITY, "X", 1);
assert (rc == 0);
int raw_sock = 1;
rc = zmq_setsockopt (zs, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock);
assert (rc == 0);
rc = zmq_connect (zs, "tcp://127.0.0.1:5555");
assert (rc == 0);
int i;
for (i = 0; i < 8; i++) {
int server_fd = tcp_server ();
assert (server_fd != -1);
zmq_msg_t msg;
rc = zmq_msg_init_size (&msg, strlen (test_str));
assert (rc == 0);
memcpy (zmq_msg_data (&msg), test_str, strlen (test_str));
rc = zmq_msg_send (&msg, zs, 0);
char buffer [128];
size_t bytes_read = tcp_read (server_fd, buffer, sizeof buffer);
assert (bytes_read == strlen (test_str)
|| memcmp (buffer, test_str, bytes_read) == 0);
rc = close (server_fd);
assert (rc == 0);
}
rc = zmq_close (zs);
assert (rc == 0);
rc = zmq_term (ctx);
assert (rc == 0);
}
int main () int main ()
{ {
fprintf (stderr, "test_raw_sock running...\n"); fprintf (stderr, "test_raw_sock running...\n");
...@@ -148,6 +249,8 @@ int main () ...@@ -148,6 +249,8 @@ int main ()
zmq_close (sb); zmq_close (sb);
zmq_term (ctx); zmq_term (ctx);
test_zmq_connect ();
fprintf (stderr, "test_raw_sock PASSED.\n"); fprintf (stderr, "test_raw_sock PASSED.\n");
return 0; return 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment