Commit dfe19080 authored by Jens Auer's avatar Jens Auer

Fixed wrong buffer end detection in v2_decoder.

zero-copy msg_t::init cannot be used when the message exceeds either
the buffer end or the last received byte. To detect this, the buffer
is now resized to the numnber of received bytes.
parent b3f2acf7
......@@ -73,6 +73,10 @@ namespace zmq
return bufsize;
}
void resize(size_t new_size)
{
bufsize = new_size;
}
private:
size_t bufsize;
unsigned char* buf;
......@@ -190,6 +194,11 @@ namespace zmq
return 0;
}
virtual void resize_buffer(size_t new_size)
{
allocator->resize(new_size);
}
protected:
// Prototype of state machine action. Action should return false if
......
......@@ -46,6 +46,7 @@ namespace zmq
virtual void get_buffer (unsigned char **data_, size_t *size_) = 0;
virtual void resize_buffer(size_t) = 0;
// Decodes data pointed to by data_.
// When a message is decoded, 1 is returned.
// When the decoder needs more data, 0 is returnd.
......@@ -54,6 +55,8 @@ namespace zmq
size_t &processed) = 0;
virtual msg_t *msg () = 0;
};
}
......
......@@ -56,7 +56,7 @@ namespace zmq
virtual msg_t *msg () { return &in_progress; }
virtual void resize_buffer(size_t) {}
private:
......
......@@ -295,6 +295,7 @@ void zmq::stream_engine_t::in_event ()
decoder->get_buffer (&inpos, &bufsize);
const int rc = tcp_read (s, inpos, bufsize);
if (rc == 0) {
error (connection_error);
return;
......@@ -307,6 +308,8 @@ void zmq::stream_engine_t::in_event ()
// Adjust input size
insize = static_cast <size_t> (rc);
// Adjust buffer size to received bytes
decoder->resize_buffer(insize);
}
int rc = 0;
......
......@@ -43,7 +43,8 @@
zmq::shared_message_memory_allocator::shared_message_memory_allocator(size_t bufsize_):
buf(NULL),
bufsize( bufsize_ )
bufsize( 0 ),
maxsize( bufsize_ )
{
}
......@@ -71,6 +72,7 @@ unsigned char* zmq::shared_message_memory_allocator::allocate()
new(buf) atomic_counter_t(1);
}
bufsize = maxsize;
return buf + sizeof( zmq::atomic_counter_t);
}
......@@ -78,12 +80,15 @@ void zmq::shared_message_memory_allocator::deallocate()
{
free(buf);
buf = NULL;
bufsize = 0;
}
unsigned char* zmq::shared_message_memory_allocator::release()
{
unsigned char* b = buf;
buf = NULL;
bufsize = 0;
return b;
}
......
......@@ -80,9 +80,16 @@ namespace zmq
return buf;
}
void resize(size_t new_size)
{
bufsize = new_size;
}
private:
unsigned char* buf;
size_t bufsize;
size_t maxsize;
zmq::atomic_counter_t* msg_refcnt;
};
// Decoder for ZMTP/2.x framing protocol. Converts data stream into messages.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment