Commit d14be624 authored by Steven McCoy's avatar Steven McCoy Committed by Martin Sustrik

more fixes to (e)pgm transport

parent 96d85b20
...@@ -135,6 +135,11 @@ void zmq::pgm_receiver_t::in_event () ...@@ -135,6 +135,11 @@ void zmq::pgm_receiver_t::in_event ()
zmq_assert (pending_bytes == 0); zmq_assert (pending_bytes == 0);
if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
}
// TODO: This loop can effectively block other engines in the same I/O // TODO: This loop can effectively block other engines in the same I/O
// thread in the case of high load. // thread in the case of high load.
while (true) { while (true) {
...@@ -144,8 +149,15 @@ void zmq::pgm_receiver_t::in_event () ...@@ -144,8 +149,15 @@ void zmq::pgm_receiver_t::in_event ()
// No data to process. This may happen if the packet received is // No data to process. This may happen if the packet received is
// neither ODATA nor ODATA. // neither ODATA nor ODATA.
if (received < 0) if (received == 0) {
const int last_errno = errno;
if (last_errno == ENOMEM || last_errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
add_timer (timeout, rx_timer_id);
has_rx_timer = true;
}
break; break;
}
// Find the peer based on its TSI. // Find the peer based on its TSI.
peers_t::iterator it = peers.find (*tsi); peers_t::iterator it = peers.find (*tsi);
...@@ -219,5 +231,12 @@ void zmq::pgm_receiver_t::in_event () ...@@ -219,5 +231,12 @@ void zmq::pgm_receiver_t::in_event ()
inout->flush (); inout->flush ();
} }
void zmq::pgm_receiver_t::timer_event (int token)
{
zmq_assert (token == rx_timer_id);
in_event ();
}
#endif #endif
...@@ -59,9 +59,16 @@ namespace zmq ...@@ -59,9 +59,16 @@ namespace zmq
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
void timer_event (int token);
private: private:
// RX timeout timer ID.
enum {rx_timer_id = 0xa1};
// RX timer is running.
bool has_rx_timer;
// If joined is true we are already getting messages from the peer. // If joined is true we are already getting messages from the peer.
// It it's false, we are getting data but still we haven't seen // It it's false, we are getting data but still we haven't seen
// beginning of a message. // beginning of a message.
......
...@@ -123,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t () ...@@ -123,8 +123,19 @@ zmq::pgm_sender_t::~pgm_sender_t ()
void zmq::pgm_sender_t::in_event () void zmq::pgm_sender_t::in_event ()
{ {
if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
}
// In event on sender side means NAK or SPMR receiving from some peer. // In event on sender side means NAK or SPMR receiving from some peer.
pgm_socket.process_upstream (); pgm_socket.process_upstream ();
const int last_errno = errno;
if (last_errno == ENOMEM || last_errno == EBUSY) {
const long timeout = pgm_socket.get_rx_timeout ();
add_timer (timeout, rx_timer_id);
has_rx_timer = true;
}
} }
void zmq::pgm_sender_t::out_event () void zmq::pgm_sender_t::out_event ()
...@@ -152,14 +163,36 @@ void zmq::pgm_sender_t::out_event () ...@@ -152,14 +163,36 @@ void zmq::pgm_sender_t::out_event ()
put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset); put_uint16 (out_buffer, offset == -1 ? 0xffff : (uint16_t) offset);
} }
if (has_rx_timer) {
cancel_timer (rx_timer_id);
has_rx_timer = false;
}
// Send the data. // Send the data.
size_t nbytes = pgm_socket.send (out_buffer, write_size); size_t nbytes = pgm_socket.send (out_buffer, write_size);
// We can write either all data or 0 which means rate limit reached. // We can write either all data or 0 which means rate limit reached.
if (nbytes == write_size) if (nbytes == write_size) {
write_size = 0; write_size = 0;
else } else {
zmq_assert (nbytes == 0); zmq_assert (nbytes == 0);
if (errno == ENOMEM) {
const long timeout = pgm_socket.get_tx_timeout ();
add_timer (timeout, tx_timer_id);
has_tx_timer = true;
} else
zmq_assert (errno == EBUSY);
}
}
void zmq::pgm_sender_t::timer_event (int token)
{
if (token == rx_timer_id)
in_event ();
zmq_assert (token == tx_timer_id);
out_event ();
} }
#endif #endif
......
...@@ -58,9 +58,16 @@ namespace zmq ...@@ -58,9 +58,16 @@ namespace zmq
// i_poll_events interface implementation. // i_poll_events interface implementation.
void in_event (); void in_event ();
void out_event (); void out_event ();
void timer_event (int token);
private: private:
// TX and RX timeout timer ID's.
enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1};
// Timers are running.
bool has_tx_timer, has_rx_timer;
// Message encoder. // Message encoder.
encoder_t encoder; encoder_t encoder;
......
...@@ -417,18 +417,55 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) ...@@ -417,18 +417,55 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
const int status = pgm_send (sock, data_, data_len_, &nbytes); const int status = pgm_send (sock, data_, data_len_, &nbytes);
if (nbytes != data_len_) { // We have to write all data as one packet.
if (nbytes > 0) {
zmq_assert (status == PGM_IO_STATUS_NORMAL);
zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
} else {
zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK);
zmq_assert (nbytes == 0);
if (status == PGM_IO_STATUS_RATE_LIMITED)
errno = ENOMEM;
else
errno = EBUSY;
} }
// We have to write all data as one packet. // Save return value.
if (nbytes > 0) last_tx_status = status;
zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_);
return nbytes; return nbytes;
} }
long zmq::pgm_socket_t::get_rx_timeout ()
{
if (last_rx_status != PGM_IO_STATUS_RATE_LIMITED && last_rx_status != PGM_IO_STATUS_TIMER_PENDING)
return -1;
struct timeval tv;
socklen_t optlen = sizeof (tv);
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, last_rx_status == PGM_IO_STATUS_RATE_LIMITED ? PGM_RATE_REMAIN : PGM_TIME_REMAIN, &tv, &optlen);
zmq_assert (rc);
const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
return timeout;
}
long zmq::pgm_socket_t::get_tx_timeout ()
{
if (last_tx_status != PGM_IO_STATUS_RATE_LIMITED)
return -1;
struct timeval tv;
socklen_t optlen = sizeof (tv);
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
zmq_assert (rc);
const long timeout = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
return timeout;
}
// Return max TSDU size without fragmentation from current PGM transport. // Return max TSDU size without fragmentation from current PGM transport.
size_t zmq::pgm_socket_t::get_max_tsdu_size () size_t zmq::pgm_socket_t::get_max_tsdu_size ()
{ {
...@@ -457,7 +494,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -457,7 +494,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
nbytes_processed = 0; nbytes_processed = 0;
pgm_msgv_processed = 0; pgm_msgv_processed = 0;
errno = EAGAIN; errno = EAGAIN;
return -1; return 0;
} }
// If we have are going first time or if we have processed all pgm_msgv_t // If we have are going first time or if we have processed all pgm_msgv_t
...@@ -479,23 +516,19 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -479,23 +516,19 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// Invalid parameters // Invalid parameters
zmq_assert (status != PGM_IO_STATUS_ERROR); zmq_assert (status != PGM_IO_STATUS_ERROR);
last_rx_status = status;
// In a case when no ODATA/RDATA fired POLLIN event (SPM...) // In a case when no ODATA/RDATA fired POLLIN event (SPM...)
// pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING. // pgm_recvmsg returns PGM_IO_STATUS_TIMER_PENDING.
if (status == PGM_IO_STATUS_TIMER_PENDING) { if (status == PGM_IO_STATUS_TIMER_PENDING) {
zmq_assert (nbytes_rec == 0); zmq_assert (nbytes_rec == 0);
struct timeval tv;
socklen_t optlen = sizeof (tv);
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_TIME_REMAIN, &tv, &optlen);
zmq_assert (rc);
// In case if no RDATA/ODATA caused POLLIN 0 is // In case if no RDATA/ODATA caused POLLIN 0 is
// returned. // returned.
nbytes_rec = 0; nbytes_rec = 0;
errno = EBUSY; errno = EBUSY;
return -1; return 0;
} }
// Send SPMR, NAK, ACK is rate limited. // Send SPMR, NAK, ACK is rate limited.
...@@ -503,15 +536,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -503,15 +536,11 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (nbytes_rec == 0); zmq_assert (nbytes_rec == 0);
struct timeval tv;
socklen_t optlen = sizeof (tv);
const bool rc = pgm_getsockopt (sock, IPPROTO_PGM, PGM_RATE_REMAIN, &tv, &optlen);
// In case if no RDATA/ODATA caused POLLIN 0 is // In case if no RDATA/ODATA caused POLLIN 0 is
// returned. // returned.
nbytes_rec = 0; nbytes_rec = 0;
errno = EBUSY; errno = ENOMEM;
return -1; return 0;
} }
// No peers and hence no incoming packets. // No peers and hence no incoming packets.
...@@ -523,7 +552,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -523,7 +552,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
// returned. // returned.
nbytes_rec = 0; nbytes_rec = 0;
errno = EAGAIN; errno = EAGAIN;
return -1; return 0;
} }
// Data loss. // Data loss.
...@@ -548,6 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) ...@@ -548,6 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_)
zmq_assert (pgm_msgv_processed <= pgm_msgv_len); zmq_assert (pgm_msgv_processed <= pgm_msgv_len);
} }
// Zero byte payloads are valid in PGM, but not 0MQ protocol.
zmq_assert (nbytes_rec > 0); zmq_assert (nbytes_rec > 0);
// Only one APDU per pgm_msgv_t structure is allowed. // Only one APDU per pgm_msgv_t structure is allowed.
...@@ -587,6 +617,15 @@ void zmq::pgm_socket_t::process_upstream () ...@@ -587,6 +617,15 @@ void zmq::pgm_socket_t::process_upstream ()
// No data should be returned. // No data should be returned.
zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING ||
status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK)); status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK));
last_rx_status = status;
if (status == PGM_IO_STATUS_TIMER_PENDING)
errno = EBUSY;
else if (status == PGM_IO_STATUS_RATE_LIMITED)
errno = ENOMEM;
else
errno = EAGAIN;
} }
#endif #endif
......
...@@ -67,6 +67,9 @@ namespace zmq ...@@ -67,6 +67,9 @@ namespace zmq
// Receive data from pgm socket. // Receive data from pgm socket.
ssize_t receive (void **data_, const pgm_tsi_t **tsi_); ssize_t receive (void **data_, const pgm_tsi_t **tsi_);
long get_rx_timeout ();
long get_tx_timeout ();
// POLLIN on sender side should mean NAK or SPMR receiving. // POLLIN on sender side should mean NAK or SPMR receiving.
// process_upstream function is used to handle such a situation. // process_upstream function is used to handle such a situation.
void process_upstream (); void process_upstream ();
...@@ -76,6 +79,8 @@ namespace zmq ...@@ -76,6 +79,8 @@ namespace zmq
// OpenPGM transport // OpenPGM transport
pgm_sock_t* sock; pgm_sock_t* sock;
int last_rx_status, last_tx_status;
// Associated socket options. // Associated socket options.
options_t options; options_t options;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment