Commit 11fec367 authored by malosek's avatar malosek

added pending event fd handling by the pgm_sender

parent 472ddf8d
...@@ -54,6 +54,8 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) ...@@ -54,6 +54,8 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
out_buffer_size = pgm_socket.get_max_tsdu_size (); out_buffer_size = pgm_socket.get_max_tsdu_size ();
out_buffer = (unsigned char*) malloc (out_buffer_size); out_buffer = (unsigned char*) malloc (out_buffer_size);
zmq_assert (out_buffer); zmq_assert (out_buffer);
return rc;
} }
void zmq::pgm_sender_t::plug (i_inout *inout_) void zmq::pgm_sender_t::plug (i_inout *inout_)
...@@ -62,20 +64,24 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) ...@@ -62,20 +64,24 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
int downlink_socket_fd = 0; int downlink_socket_fd = 0;
int uplink_socket_fd = 0; int uplink_socket_fd = 0;
int rdata_notify_fd = 0; int rdata_notify_fd = 0;
int pending_notify_fd = 0;
encoder.set_inout (inout_); encoder.set_inout (inout_);
// Fill fds from PGM transport and add them to the poller. // Fill fds from PGM transport and add them to the poller.
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd, pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd,
&rdata_notify_fd); &rdata_notify_fd, &pending_notify_fd);
handle = add_fd (downlink_socket_fd); handle = add_fd (downlink_socket_fd);
uplink_handle = add_fd (uplink_socket_fd); uplink_handle = add_fd (uplink_socket_fd);
rdata_notify_handle = add_fd (rdata_notify_fd); rdata_notify_handle = add_fd (rdata_notify_fd);
pending_notify_handle = add_fd (pending_notify_fd);
// Set POLLIN. We wont never want to stop polling for uplink = we never // Set POLLIN. We wont never want to stop polling for uplink = we never
// want to stop porocess NAKs. // want to stop porocess NAKs.
set_pollin (uplink_handle); set_pollin (uplink_handle);
set_pollin (rdata_notify_handle); set_pollin (rdata_notify_handle);
set_pollin (pending_notify_handle);
// Set POLLOUT for downlink_socket_handle. // Set POLLOUT for downlink_socket_handle.
set_pollout (handle); set_pollout (handle);
...@@ -86,6 +92,7 @@ void zmq::pgm_sender_t::unplug () ...@@ -86,6 +92,7 @@ void zmq::pgm_sender_t::unplug ()
rm_fd (handle); rm_fd (handle);
rm_fd (uplink_handle); rm_fd (uplink_handle);
rm_fd (rdata_notify_handle); rm_fd (rdata_notify_handle);
rm_fd (pending_notify_handle);
encoder.set_inout (NULL); encoder.set_inout (NULL);
} }
......
...@@ -72,6 +72,7 @@ namespace zmq ...@@ -72,6 +72,7 @@ namespace zmq
handle_t handle; handle_t handle;
handle_t uplink_handle; handle_t uplink_handle;
handle_t rdata_notify_handle; handle_t rdata_notify_handle;
handle_t pending_notify_handle;
// Output buffer from pgm_socket. // Output buffer from pgm_socket.
unsigned char *out_buffer; unsigned char *out_buffer;
......
...@@ -375,13 +375,15 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_, ...@@ -375,13 +375,15 @@ void zmq::pgm_socket_t::get_receiver_fds (int *receive_fd_,
// sender_fd is from pgm_transport->send_sock. // sender_fd is from pgm_transport->send_sock.
// receive_fd_ is from transport->recv_sock. // receive_fd_ is from transport->recv_sock.
// rdata_notify_fd_ is from transport->rdata_notify. // rdata_notify_fd_ is from transport->rdata_notify.
// pending_notify_fd_ is from transport->pending_notify.
void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
int *rdata_notify_fd_) int *rdata_notify_fd_, int *pending_notify_fd_)
{ {
zmq_assert (send_fd_); zmq_assert (send_fd_);
zmq_assert (receive_fd_); zmq_assert (receive_fd_);
zmq_assert (rdata_notify_fd_); zmq_assert (rdata_notify_fd_);
zmq_assert (pending_notify_fd_);
// recv_sock2 should not be used - check it. // recv_sock2 should not be used - check it.
zmq_assert (transport->recv_sock2 == -1); zmq_assert (transport->recv_sock2 == -1);
...@@ -390,10 +392,12 @@ void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_, ...@@ -390,10 +392,12 @@ void zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
zmq_assert (transport->can_send_data); zmq_assert (transport->can_send_data);
zmq_assert (!transport->can_recv_data); zmq_assert (!transport->can_recv_data);
// Take FDs directly from transport. // Take FDs from transport.
*send_fd_ = pgm_transport_get_send_fd (transport);
*receive_fd_ = pgm_transport_get_recv_fd (transport); *receive_fd_ = pgm_transport_get_recv_fd (transport);
*rdata_notify_fd_ = pgm_transport_get_repair_fd (transport); *rdata_notify_fd_ = pgm_transport_get_repair_fd (transport);
*send_fd_ = pgm_transport_get_send_fd (transport); *pending_notify_fd_ = pgm_transport_get_pending_fd (transport);
} }
// Send one APDU, transmit window owned memory. // Send one APDU, transmit window owned memory.
......
...@@ -56,7 +56,7 @@ namespace zmq ...@@ -56,7 +56,7 @@ namespace zmq
// Get sender and receiver fds and store it to user allocated // Get sender and receiver fds and store it to user allocated
// memory. Receive fd is used to process NAKs from peers. // memory. Receive fd is used to process NAKs from peers.
void get_sender_fds (int *send_fd_, int *receive_fd_, void get_sender_fds (int *send_fd_, int *receive_fd_,
int *rdata_notify_fd_); int *rdata_notify_fd_, int *pending_notify_fd_);
// Send data as one APDU, transmit window owned memory. // Send data as one APDU, transmit window owned memory.
size_t send (unsigned char *data_, size_t data_len_); size_t send (unsigned char *data_, size_t data_len_);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment