Commit 73b765e4 authored by Martin Sustrik's avatar Martin Sustrik

PGM transport fixed

parent d5670f34
...@@ -50,12 +50,18 @@ namespace zmq ...@@ -50,12 +50,18 @@ namespace zmq
free (buf); free (buf);
} }
// The function returns a batch of binary data. If offset is not NULL, // The function returns a batch of binary data. The data
// it is filled by offset of the first message in the batch. If there's // are filled to a supplied buffer. If no buffer is supplied (data_
// no beginning of a message in the batch, offset is set to -1. // points to NULL) decoder object will provide buffer of its own.
inline void get_buffer (unsigned char **data_, size_t *size_, // If offset is not NULL, it is filled by offset of the first message
// in the batch.If there's no beginning of a message in the batch,
// offset is set to -1.
inline void get_data (unsigned char **data_, size_t *size_,
int *offset_ = NULL) int *offset_ = NULL)
{ {
unsigned char *buffer = !*data_ ? buf : *data_;
size_t buffersize = !*data_ ? bufsize : *size_;
size_t pos = 0; size_t pos = 0;
if (offset_) if (offset_)
*offset_ = -1; *offset_ = -1;
...@@ -67,7 +73,7 @@ namespace zmq ...@@ -67,7 +73,7 @@ namespace zmq
// in the buffer. // in the buffer.
if (!to_write) { if (!to_write) {
if (!(static_cast <T*> (this)->*next) ()) { if (!(static_cast <T*> (this)->*next) ()) {
*data_ = buf; *data_ = buffer;
*size_ = pos; *size_ = pos;
return; return;
} }
...@@ -91,7 +97,7 @@ namespace zmq ...@@ -91,7 +97,7 @@ namespace zmq
// As a consequence, large messages being sent won't block // As a consequence, large messages being sent won't block
// other engines running in the same I/O thread for excessive // other engines running in the same I/O thread for excessive
// amounts of time. // amounts of time.
if (!pos && to_write >= bufsize) { if (!pos && !*data_ && to_write >= buffersize) {
*data_ = write_pos; *data_ = write_pos;
*size_ = to_write; *size_ = to_write;
write_pos = NULL; write_pos = NULL;
...@@ -100,13 +106,13 @@ namespace zmq ...@@ -100,13 +106,13 @@ namespace zmq
} }
// Copy data to the buffer. If the buffer is full, return. // Copy data to the buffer. If the buffer is full, return.
size_t to_copy = std::min (to_write, bufsize - pos); size_t to_copy = std::min (to_write, buffersize - pos);
memcpy (buf + pos, write_pos, to_copy); memcpy (buffer + pos, write_pos, to_copy);
pos += to_copy; pos += to_copy;
write_pos += to_copy; write_pos += to_copy;
to_write -= to_copy; to_write -= to_copy;
if (pos == bufsize) { if (pos == buffersize) {
*data_ = buf; *data_ = buffer;
*size_ = pos; *size_ = pos;
return; return;
} }
......
...@@ -194,7 +194,7 @@ void zmq::pgm_receiver_t::in_event () ...@@ -194,7 +194,7 @@ void zmq::pgm_receiver_t::in_event ()
it->second.joined = true; it->second.joined = true;
// Create and connect decoder for joined peer. // Create and connect decoder for joined peer.
it->second.decoder = new zmq_decoder_t; it->second.decoder = new zmq_decoder_t (0);
it->second.decoder->set_inout (inout); it->second.decoder->set_inout (inout);
#ifdef ZMQ_HAVE_OPENPGM1 #ifdef ZMQ_HAVE_OPENPGM1
...@@ -209,7 +209,8 @@ void zmq::pgm_receiver_t::in_event () ...@@ -209,7 +209,8 @@ void zmq::pgm_receiver_t::in_event ()
if (nbytes > 0) { if (nbytes > 0) {
// Push all the data to the decoder. // Push all the data to the decoder.
it->second.decoder->write (raw_data, nbytes); // TODO: process_buffer may not process entire buffer!
it->second.decoder->process_buffer (raw_data, nbytes);
} }
} while (nbytes > 0); } while (nbytes > 0);
......
...@@ -49,6 +49,7 @@ ...@@ -49,6 +49,7 @@
zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_,
const options_t &options_, const char *session_name_) : const options_t &options_, const char *session_name_) :
io_object_t (parent_), io_object_t (parent_),
encoder (0),
pgm_socket (false, options_), pgm_socket (false, options_),
options (options_), options (options_),
session_name (session_name_), session_name (session_name_),
...@@ -162,8 +163,9 @@ void zmq::pgm_sender_t::out_event () ...@@ -162,8 +163,9 @@ void zmq::pgm_sender_t::out_event ()
// First two bytes /sizeof (uint16_t)/ are used to store message // First two bytes /sizeof (uint16_t)/ are used to store message
// offset in following steps. // offset in following steps.
write_size = encoder.read (out_buffer + sizeof (uint16_t), unsigned char *bf = out_buffer + sizeof (uint16_t);
out_buffer_size - sizeof (uint16_t), &first_message_offset); write_size = out_buffer_size - sizeof (uint16_t);
encoder.get_data (&bf, &write_size, &first_message_offset);
write_pos = 0; write_pos = 0;
// If there are no data to write stop polling for output. // If there are no data to write stop polling for output.
......
This diff is collapsed.
...@@ -62,7 +62,8 @@ namespace zmq ...@@ -62,7 +62,8 @@ namespace zmq
// Get sender and receiver fds and store it to user allocated // Get sender and receiver fds and store it to user allocated
// memory. Receive fd is used to process NAKs from peers. // memory. Receive fd is used to process NAKs from peers.
int get_sender_fds (int *send_fd_, int *receive_fd_, int *rdata_notify_fd_ = NULL); int get_sender_fds (int *send_fd_, int *receive_fd_,
int *rdata_notify_fd_ = NULL);
// Send data as one APDU, transmit window owned memory. // Send data as one APDU, transmit window owned memory.
size_t send (unsigned char *data_, size_t data_len_); size_t send (unsigned char *data_, size_t data_len_);
...@@ -83,7 +84,7 @@ namespace zmq ...@@ -83,7 +84,7 @@ namespace zmq
protected: protected:
// OpenPGM transport // OpenPGM transport
pgm_transport_t* g_transport; pgm_transport_t* transport;
private: private:
......
...@@ -106,7 +106,9 @@ void zmq::zmq_engine_t::out_event () ...@@ -106,7 +106,9 @@ void zmq::zmq_engine_t::out_event ()
{ {
// If write buffer is empty, try to read new data from the encoder. // If write buffer is empty, try to read new data from the encoder.
if (!outsize) { if (!outsize) {
encoder.get_buffer (&outpos, &outsize);
outpos = NULL;
encoder.get_data (&outpos, &outsize);
// If there is no data to send, stop polling for output. // If there is no data to send, stop polling for output.
if (outsize == 0) { if (outsize == 0) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment