Commit 39d915de authored by malosek's avatar malosek

PGM2 sender

parent cf6cc012
...@@ -153,7 +153,6 @@ void zmq::pgm_receiver_t::in_event () ...@@ -153,7 +153,6 @@ void zmq::pgm_receiver_t::in_event ()
// information (sizeof uint16_t). // information (sizeof uint16_t).
raw_data += sizeof (uint16_t); raw_data += sizeof (uint16_t);
nbytes -= sizeof (uint16_t); nbytes -= sizeof (uint16_t);
zmq_assert (apdu_offset <= nbytes);
// New peer. // New peer.
if (it == peers.end ()) { if (it == peers.end ()) {
...@@ -174,6 +173,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -174,6 +173,7 @@ void zmq::pgm_receiver_t::in_event ()
// Now is the possibility to join the stream. // Now is the possibility to join the stream.
if (!it->second.joined) { if (!it->second.joined) {
zmq_assert (apdu_offset <= nbytes);
zmq_assert (it->second.decoder == NULL); zmq_assert (it->second.decoder == NULL);
// We have to move data to the begining of the first message. // We have to move data to the begining of the first message.
......
...@@ -68,13 +68,21 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) ...@@ -68,13 +68,21 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
{ {
// Alocate 2 fds for PGM socket. // Alocate 2 fds for PGM socket.
int downlink_socket_fd; int downlink_socket_fd = 0;
int uplink_socket_fd; int uplink_socket_fd = 0;
#ifdef ZMQ_HAVE_OPENPGM2
int rdata_notify_fd = 0;
#endif
encoder.set_inout (inout_); encoder.set_inout (inout_);
// Fill fds from PGM transport. // Fill fds from PGM transport.
#ifdef ZMQ_HAVE_OPENPGM1
pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd); pgm_socket.get_sender_fds (&downlink_socket_fd, &uplink_socket_fd);
#elif ZMQ_HAVE_OPENPGM2
pgm_socket.get_sender_fds
(&downlink_socket_fd, &uplink_socket_fd, &rdata_notify_fd);
#endif
// Add downlink_socket_fd into poller. // Add downlink_socket_fd into poller.
handle = add_fd (downlink_socket_fd); handle = add_fd (downlink_socket_fd);
...@@ -82,9 +90,17 @@ void zmq::pgm_sender_t::plug (i_inout *inout_) ...@@ -82,9 +90,17 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
// Add uplink_socket_fd into the poller. // Add uplink_socket_fd into the poller.
uplink_handle = add_fd (uplink_socket_fd); uplink_handle = add_fd (uplink_socket_fd);
// Add rdata_notify_fd into the poller.
#ifdef ZMQ_HAVE_OPENPGM2
rdata_notify_handle = add_fd (rdata_notify_fd);
#endif
// Set POLLIN. We wont never want to stop polling for uplink = we never // Set POLLIN. We wont never want to stop polling for uplink = we never
// want to stop porocess NAKs. // want to stop porocess NAKs.
set_pollin (uplink_handle); set_pollin (uplink_handle);
#ifdef ZMQ_HAVE_OPENPGM2
set_pollin (rdata_notify_handle);
#endif
// Set POLLOUT for downlink_socket_handle. // Set POLLOUT for downlink_socket_handle.
set_pollout (handle); set_pollout (handle);
...@@ -96,6 +112,9 @@ void zmq::pgm_sender_t::unplug () ...@@ -96,6 +112,9 @@ void zmq::pgm_sender_t::unplug ()
{ {
rm_fd (handle); rm_fd (handle);
rm_fd (uplink_handle); rm_fd (uplink_handle);
#ifdef ZMQ_HAVE_OPENPGM2
rm_fd (rdata_notify_handle);
#endif
encoder.set_inout (NULL); encoder.set_inout (NULL);
inout = NULL; inout = NULL;
} }
...@@ -167,11 +186,12 @@ void zmq::pgm_sender_t::out_event () ...@@ -167,11 +186,12 @@ void zmq::pgm_sender_t::out_event ()
zmq_log (1, "pgm rate limit reached, %s(%i)\n", __FILE__, __LINE__); zmq_log (1, "pgm rate limit reached, %s(%i)\n", __FILE__, __LINE__);
} }
#ifdef ZMQ_HAVE_OPENPGM1
// After sending data slice is owned by tx window. // After sending data slice is owned by tx window.
if (nbytes) { if (nbytes) {
out_buffer = NULL; out_buffer = NULL;
} }
#endif
write_pos += nbytes; write_pos += nbytes;
} }
......
...@@ -76,6 +76,9 @@ namespace zmq ...@@ -76,6 +76,9 @@ namespace zmq
// Poll handle associated with PGM socket. // Poll handle associated with PGM socket.
handle_t handle; handle_t handle;
handle_t uplink_handle; handle_t uplink_handle;
#ifdef ZMQ_HAVE_OPENPGM2
handle_t rdata_notify_handle;
#endif
// Parent session. // Parent session.
i_inout *inout; i_inout *inout;
......
...@@ -39,7 +39,7 @@ ...@@ -39,7 +39,7 @@
#include "uuid.hpp" #include "uuid.hpp"
//#define PGM_SOCKET_DEBUG //#define PGM_SOCKET_DEBUG
//#define PGM_SOCKET_DEBUG_LEVEL 1 //#define PGM_SOCKET_DEBUG_LEVEL 4
// level 1 = key behaviour // level 1 = key behaviour
// level 2 = processing flow // level 2 = processing flow
...@@ -275,7 +275,7 @@ int zmq::pgm_socket_t::open_transport (void) ...@@ -275,7 +275,7 @@ int zmq::pgm_socket_t::open_transport (void)
// Set transport->can_send_data = FALSE. // Set transport->can_send_data = FALSE.
// Note that NAKs are still generated by the transport. // Note that NAKs are still generated by the transport.
rc = pgm_transport_set_recv_only (g_transport, false); rc = pgm_transport_set_recv_only (g_transport, true, false);
if (rc != pgm_ok) { if (rc != pgm_ok) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
...@@ -511,8 +511,18 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_, ...@@ -511,8 +511,18 @@ int zmq::pgm_socket_t::get_receiver_fds (int *recv_fd_,
// Get fds and store them into user allocated memory. // Get fds and store them into user allocated memory.
// sender_fd is from pgm_transport->send_sock. // sender_fd is from pgm_transport->send_sock.
// receive_fd_ is from transport->recv_sock. // receive_fd_ is from transport->recv_sock.
int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_,
int *rdata_notify_fd_)
{ {
#ifdef ZMQ_HAVE_OPENPGM1
zmq_assert (send_fd_);
zmq_assert (receive_fd_);
zmq_assert (!rdata_notify_fd_);
#elif ZMQ_HAVE_OPENPGM2
zmq_assert (send_fd_);
zmq_assert (receive_fd_);
zmq_assert (rdata_notify_fd_);
#endif
// Preallocate pollfds array. // Preallocate pollfds array.
int fds_array_size = pgm_sender_fd_count; int fds_array_size = pgm_sender_fd_count;
...@@ -530,8 +540,14 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) ...@@ -530,8 +540,14 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
zmq_assert (rc == pgm_sender_fd_count); zmq_assert (rc == pgm_sender_fd_count);
// Store pfds into user allocated space. // Store pfds into user allocated space.
#ifdef ZMQ_HAVE_OPENPGM1
*receive_fd_ = fds [0].fd; *receive_fd_ = fds [0].fd;
*send_fd_ = fds [1].fd; *send_fd_ = fds [1].fd;
#elif ZMQ_HAVE_OPENPGM2
*receive_fd_ = fds [0].fd;
*rdata_notify_fd_ = fds [1].fd;
*send_fd_ = fds [2].fd;
#endif
delete [] fds; delete [] fds;
...@@ -542,10 +558,9 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_) ...@@ -542,10 +558,9 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{ {
ssize_t nbytes = 0;
#ifdef ZMQ_HAVE_OPENPGM1 #ifdef ZMQ_HAVE_OPENPGM1
ssize_t nbytes = 0;
iovec iov = {data_,data_len_}; iovec iov = {data_,data_len_};
nbytes = pgm_transport_send_packetv (g_transport, &iov, 1, nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,
...@@ -561,13 +576,30 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) ...@@ -561,13 +576,30 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
// now. We have to call write_one_pkt again. // now. We have to call write_one_pkt again.
nbytes = nbytes == -1 ? 0 : nbytes; nbytes = nbytes == -1 ? 0 : nbytes;
zmq_log (4, "wrote %iB, %s(%i)\n", (int)nbytes, __FILE__, __LINE__); #elif ZMQ_HAVE_OPENPGM2
size_t nbytes = 0;
PGMIOStatus status = pgm_send (g_transport, data_, data_len_, &nbytes);
if (nbytes != data_len_) {
zmq_log (1, "status %i, data_len %i, wrote %iB, %s(%i)\n",
(int) status, (int) data_len_, (int) nbytes, __FILE__, __LINE__);
zmq_assert (status == PGM_IO_STATUS_AGAIN2);
zmq_assert (nbytes == 0);
}
#endif
zmq_log (4, "wrote %i/%iB, %s(%i)\n", (int) nbytes, (int) data_len_,
__FILE__, __LINE__);
// We have to write all data as one packet. // We have to write all data as one packet.
if (nbytes > 0) { if (nbytes > 0) {
zmq_assert (nbytes == (ssize_t)data_len_); zmq_log (1, "data sent %i, %s(%i)\n", (int) nbytes,
__FILE__, __LINE__);
zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
} }
#endif
return nbytes; return nbytes;
} }
...@@ -603,15 +635,17 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_) ...@@ -603,15 +635,17 @@ size_t zmq::pgm_socket_t::get_max_apdu_at_once (size_t readbuf_size_)
// content via pgm_transport_send() calls or unused with pgm_packetv_free1(). // content via pgm_transport_send() calls or unused with pgm_packetv_free1().
void *zmq::pgm_socket_t::get_buffer (size_t *size_) void *zmq::pgm_socket_t::get_buffer (size_t *size_)
{ {
#ifdef ZMQ_HAVE_OPENPGM1
// Store size. // Store size.
*size_ = get_max_tsdu_size (); *size_ = get_max_tsdu_size ();
// Allocate one packet. #ifdef ZMQ_HAVE_OPENPGM1
// Allocate one packet in tx window.
return pgm_packetv_alloc (g_transport, false); return pgm_packetv_alloc (g_transport, false);
#elif ZMQ_HAVE_OPENPGM2 #elif ZMQ_HAVE_OPENPGM2
zmq_assert (false); // Allocate buffer.
unsigned char *apdu_buff = new unsigned char [*size_];
zmq_assert (apdu_buff);
return apdu_buff;
#endif #endif
} }
...@@ -622,7 +656,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_) ...@@ -622,7 +656,7 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
#ifdef ZMQ_HAVE_OPENPGM1 #ifdef ZMQ_HAVE_OPENPGM1
pgm_packetv_free1 (g_transport, data_, false); pgm_packetv_free1 (g_transport, data_, false);
#elif ZMQ_HAVE_OPENPGM2 #elif ZMQ_HAVE_OPENPGM2
zmq_assert (false); delete [] (unsigned char*) data_;
#endif #endif
} }
...@@ -718,6 +752,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -718,6 +752,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n", zmq_log (1, "PGMIOStatus %i, nbytes_rec %i, %s(%i).\n",
status, (int) nbytes_rec, __FILE__, __LINE__); status, (int) nbytes_rec, __FILE__, __LINE__);
zmq_assert (false);
nbytes_rec = 0; nbytes_rec = 0;
return -1; return -1;
} }
...@@ -767,21 +803,34 @@ void zmq::pgm_socket_t::process_upstream (void) ...@@ -767,21 +803,34 @@ void zmq::pgm_socket_t::process_upstream (void)
{ {
zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__); zmq_log (1, "On upstream packet, %s(%i)\n", __FILE__, __LINE__);
ssize_t dummy_bytes = 0; pgm_msgv_t dummy_msg;
#ifdef ZMQ_HAVE_OPENPGM1 #ifdef ZMQ_HAVE_OPENPGM1
ssize_t dummy_bytes = 0;
// We acctually do not want to read any data here we are going to // We acctually do not want to read any data here we are going to
// process NAK. // process NAK.
pgm_msgv_t dummy_msg;
dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg, dummy_bytes = pgm_transport_recvmsgv (g_transport, &dummy_msg,
1, MSG_DONTWAIT); 1, MSG_DONTWAIT);
// No data should be returned.
zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
#elif defined ZMQ_HAVE_OPENPGM2 #elif defined ZMQ_HAVE_OPENPGM2
zmq_assert (false); size_t dummy_bytes = 0;
GError *pgm_error = NULL;
PGMIOStatus status = pgm_recvmsgv (g_transport, &dummy_msg,
1, MSG_DONTWAIT, &dummy_bytes, &pgm_error);
zmq_log (1, "upstream status %i, nbytes %i, %s(%i)\n",
(int) status, (int) dummy_bytes, __FILE__, __LINE__);
// No data should be returned.
zmq_assert (dummy_bytes == 0 && status == PGM_IO_STATUS_AGAIN);
#endif #endif
// No data should be returned.
zmq_assert (dummy_bytes == -1 && errno == EAGAIN);
} }
#endif #endif
......
...@@ -65,7 +65,7 @@ namespace zmq ...@@ -65,7 +65,7 @@ namespace zmq
// Get sender and receiver fds and store it to user allocated // Get sender and receiver fds and store it to user allocated
// memory. Receive fd is used to process NAKs from peers. // memory. Receive fd is used to process NAKs from peers.
int get_sender_fds (int *send_fd_, int *receive_fd_); int get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_ = NULL);
// Send data as one APDU, transmit window owned memory. // Send data as one APDU, transmit window owned memory.
size_t send (unsigned char *data_, size_t data_len_); size_t send (unsigned char *data_, size_t data_len_);
...@@ -143,8 +143,12 @@ namespace zmq ...@@ -143,8 +143,12 @@ namespace zmq
size_t pgm_msgv_len; size_t pgm_msgv_len;
// Sender transport uses 2 fd. // Sender transport uses 2 fd.
#ifdef ZMQ_HAVE_OPENPGM1
enum {pgm_sender_fd_count = 2}; enum {pgm_sender_fd_count = 2};
#elif ZMQ_HAVE_OPENPGM2
enum {pgm_sender_fd_count = 3};
#endif
// Receiver transport uses 2 fd. // Receiver transport uses 2 fd.
enum {pgm_receiver_fd_count = 2}; enum {pgm_receiver_fd_count = 2};
#endif #endif
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment