Commit 08b02a43 authored by malosek's avatar malosek

fixed get_sender_fds and get_receiver_fds for openpgm2

parent 7d672d3a
...@@ -269,9 +269,9 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -269,9 +269,9 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_send_data = FALSE. // Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport. // Note that NAKs are still generated by the transport.
#ifdef ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM1
rc = pgm_transport_set_recv_only (g_transport, false); rc = pgm_transport_set_recv_only (g_transport, false);
#elif ZMQ_HAVE_OPENPGM2 #elif defined ZMQ_HAVE_OPENPGM2
rc = pgm_transport_set_recv_only (g_transport, true, false); rc = pgm_transport_set_recv_only (g_transport, true, false);
#endif #endif
if (rc != pgm_ok) { if (rc != pgm_ok) {
...@@ -479,12 +479,13 @@ void zmq::pgm_socket_t::close_transport (void) ...@@ -479,12 +479,13 @@ void zmq::pgm_socket_t::close_transport (void)
// Get receiver fds. recv_fd is from transport->recv_sock // Get receiver fds. recv_fd is from transport->recv_sock
// waiting_pipe_fd is from transport->waiting_pipe [0] // waiting_pipe_fd is from transport->waiting_pipe [0]
int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_, int zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
int *waiting_pipe_fd_) int *waiting_pipe_fd_)
{ {
#ifdef ZMQ_HAVE_WINDOWS zmq_assert (receive_fd_);
zmq_assert (false); zmq_assert (waiting_pipe_fd_);
#else
#if defined ZMQ_HAVE_OPENPGM1
// For POLLIN there are 2 pollfds in pgm_transport. // For POLLIN there are 2 pollfds in pgm_transport.
int fds_array_size = pgm_receiver_fd_count; int fds_array_size = pgm_receiver_fd_count;
pollfd *fds = new pollfd [fds_array_size]; pollfd *fds = new pollfd [fds_array_size];
...@@ -500,39 +501,46 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_, ...@@ -500,39 +501,46 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,
zmq_assert (rc == pgm_receiver_fd_count); zmq_assert (rc == pgm_receiver_fd_count);
// Store pfds into user allocated space. // Store pfds into user allocated space.
*recv_fd_ = fds [0].fd; *receive_fd_ = fds [0].fd;
*waiting_pipe_fd_ = fds [1].fd; *waiting_pipe_fd_ = fds [1].fd;
delete [] fds; delete [] fds;
#elif defined ZMQ_HAVE_OPENPGM2
// recv_sock2 should not be used - check it.
zmq_assert (g_transport->recv_sock2 == -1);
// Check if transport can receive data and can not send.
zmq_assert (g_transport->can_recv_data);
zmq_assert (!g_transport->can_send_data);
// Take FDs directly from transport.
*receive_fd_ = g_transport->recv_sock;
*waiting_pipe_fd_ = pgm_notify_get_fd (&g_transport->pending_notify);
#endif #endif
return pgm_receiver_fd_count; return pgm_receiver_fd_count;
} }
// Get fds and store them into user allocated memory. // Get fds and store them into user allocated memory.
// sender_fd is from pgm_transport->send_sock. // sender_fd is from pgm_transport->send_sock.
// receive_fd_ is from transport->recv_sock. // receive_fd_ is from transport->recv_sock.
// rdata_notify_fd_ is from transport->rdata_notify (PGM2 only).
int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
int *rdata_notify_fd_) int *rdata_notify_fd_)
{ {
#ifdef ZMQ_HAVE_OPENPGM1
zmq_assert (send_fd_); zmq_assert (send_fd_);
zmq_assert (receive_fd_); zmq_assert (receive_fd_);
#if defined ZMQ_HAVE_OPENPGM1
zmq_assert (!rdata_notify_fd_); zmq_assert (!rdata_notify_fd_);
#elif ZMQ_HAVE_OPENPGM2
zmq_assert (send_fd_);
zmq_assert (receive_fd_);
zmq_assert (rdata_notify_fd_);
#endif
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (false);
#else
// Preallocate pollfds array. // Preallocate pollfds array.
int fds_array_size = pgm_sender_fd_count; int fds_array_size = pgm_sender_fd_count;
pollfd *fds = new pollfd [fds_array_size]; pollfd *fds = new pollfd [fds_array_size];
memset (fds, '\0', fds_array_size * sizeof (fds)); memset (fds, '\0', fds_array_size * sizeof (fds));
// Retrieve pollfds from pgm_transport // Retrieve pollfds from pgm_transport.
int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size, int rc = pgm_transport_poll_info (g_transport, fds, &fds_array_size,
POLLOUT | POLLIN); POLLOUT | POLLIN);
...@@ -541,18 +549,27 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, ...@@ -541,18 +549,27 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
// Note that fds_array_size parameter can be // Note that fds_array_size parameter can be
// changed inside pgm_transport_poll_info call. // changed inside pgm_transport_poll_info call.
zmq_assert (rc == pgm_sender_fd_count); zmq_assert (rc == pgm_sender_fd_count);
// Store pfds into user allocated space. // Store pfds into user allocated space.
#ifdef ZMQ_HAVE_OPENPGM1
*receive_fd_ = fds [0].fd; *receive_fd_ = fds [0].fd;
*send_fd_ = fds [1].fd; *send_fd_ = fds [1].fd;
#elif ZMQ_HAVE_OPENPGM2
*receive_fd_ = fds [0].fd;
*rdata_notify_fd_ = fds [1].fd;
*send_fd_ = fds [2].fd;
#endif
delete [] fds; delete [] fds;
#elif defined ZMQ_HAVE_OPENPGM2
zmq_assert (rdata_notify_fd_);
// recv_sock2 should not be used - check it.
zmq_assert (g_transport->recv_sock2 == -1);
// Check if transport can send data and can not receive.
zmq_assert (g_transport->can_send_data);
zmq_assert (!g_transport->can_recv_data);
// Take FDs directly from transport.
*receive_fd_ = g_transport->recv_sock;
*rdata_notify_fd_ = pgm_notify_get_fd (&g_transport->rdata_notify);
*send_fd_ = g_transport->send_sock;
#endif #endif
return pgm_sender_fd_count; return pgm_sender_fd_count;
...@@ -562,7 +579,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, ...@@ -562,7 +579,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{ {
#ifdef ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM1
ssize_t nbytes = 0; ssize_t nbytes = 0;
iovec iov = {data_,data_len_}; iovec iov = {data_,data_len_};
...@@ -580,7 +597,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) ...@@ -580,7 +597,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
// now. We have to call write_one_pkt again. // now. We have to call write_one_pkt again.
nbytes = nbytes == -1 ? 0 : nbytes; nbytes = nbytes == -1 ? 0 : nbytes;
#elif ZMQ_HAVE_OPENPGM2 #elif defined ZMQ_HAVE_OPENPGM2
size_t nbytes = 0; size_t nbytes = 0;
...@@ -642,10 +659,10 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) ...@@ -642,10 +659,10 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
// Store size. // Store size.
*size_ = get_max_tsdu_size (); *size_ = get_max_tsdu_size ();
#ifdef ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM1
// Allocate one packet in tx window. // Allocate one packet in tx window.
return pgm_packetv_alloc (g_transport, false); return pgm_packetv_alloc (g_transport, false);
#elif ZMQ_HAVE_OPENPGM2 #elif defined ZMQ_HAVE_OPENPGM2
// Allocate buffer. // Allocate buffer.
unsigned char *apdu_buff = new unsigned char [*size_]; unsigned char *apdu_buff = new unsigned char [*size_];
zmq_assert (apdu_buff); zmq_assert (apdu_buff);
...@@ -657,9 +674,9 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_) ...@@ -657,9 +674,9 @@ void *zmq::pgm_socket_t::get_buffer (size_t *size_)
// via pgm_packetv_alloc(). // via pgm_packetv_alloc().
void zmq::pgm_socket_t::free_buffer (void *data_) void zmq::pgm_socket_t::free_buffer (void *data_)
{ {
#ifdef ZMQ_HAVE_OPENPGM1 #if defined ZMQ_HAVE_OPENPGM1
pgm_packetv_free1 (g_transport, data_, false); pgm_packetv_free1 (g_transport, data_, false);
#elif ZMQ_HAVE_OPENPGM2 #elif defined ZMQ_HAVE_OPENPGM2
delete [] (unsigned char*) data_; delete [] (unsigned char*) data_;
#endif #endif
} }
......
...@@ -58,7 +58,7 @@ namespace zmq ...@@ -58,7 +58,7 @@ namespace zmq
void close_transport (void); void close_transport (void);
// Get receiver fds and store them into user allocated memory. // Get receiver fds and store them into user allocated memory.
int get_receiver_fds (int *recv_fd_, int *waiting_pipe_fd_); int get_receiver_fds (int *receive_fd_, int *waiting_pipe_fd_);
// Get sender and receiver fds and store it to user allocated // Get sender and receiver fds and store it to user allocated
// memory. Receive fd is used to process NAKs from peers. // memory. Receive fd is used to process NAKs from peers.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment