Commit 56ead844 authored by Jon Dyte's avatar Jon Dyte

Experimental function zmq_recviov doesnt work correctly in a couple of cases

1) VSM - you cannot hand out the 'data' address as it was not allocated on the heap
2) for other messages the 'data' address cannot be handed out either, as it not the address
originally returned by malloc and hence cannot be passed to 'free'.
see msg.cpp
u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
....
u.lmsg.content->data = u.lmsg.content + 1;

So the function is changed to always malloc a data buffer and copy the data into it.
There is a possible optimisation using memmove for the non-VSM case but that is not done yet.
parent 0bf5a314
...@@ -478,10 +478,6 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) ...@@ -478,10 +478,6 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
// The iov_base* buffers of each iovec *a_ filled in by this // The iov_base* buffers of each iovec *a_ filled in by this
// function may be freed using free(). // function may be freed using free().
// //
// Implementation note: We assume zmq::msg_t buffer allocated
// by zmq::recvmsg can be freed by free().
// We assume it is safe to steal these buffers by simply
// not closing the zmq::msg_t.
// //
int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
{ {
...@@ -498,8 +494,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) ...@@ -498,8 +494,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
*count_ = 0; *count_ = 0;
for (size_t i = 0; recvmore && i < count; ++i) { for (size_t i = 0; recvmore && i < count; ++i) {
// Cheat! We never close any msg
// because we want to steal the buffer.
zmq_msg_t msg; zmq_msg_t msg;
int rc = zmq_msg_init (&msg); int rc = zmq_msg_init (&msg);
errno_assert (rc == 0); errno_assert (rc == 0);
...@@ -513,15 +508,21 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) ...@@ -513,15 +508,21 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
nread = -1; nread = -1;
break; break;
} }
++*count_;
++nread;
// Cheat: acquire zmq_msg buffer.
a_[i].iov_base = static_cast<char *> (zmq_msg_data (&msg));
a_[i].iov_len = zmq_msg_size (&msg); a_[i].iov_len = zmq_msg_size (&msg);
a_[i].iov_base = malloc(a_[i].iov_len);
if (!a_[i].iov_base) {
errno = ENOMEM;
return -1;
}
memcpy(a_[i].iov_base,static_cast<char *> (zmq_msg_data (&msg)),
a_[i].iov_len);
// Assume zmq_socket ZMQ_RVCMORE is properly set. // Assume zmq_socket ZMQ_RVCMORE is properly set.
recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more; recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more;
rc = zmq_msg_close(&msg);
errno_assert (rc == 0);
++*count_;
++nread;
} }
return nread; return nread;
} }
......
...@@ -20,7 +20,8 @@ noinst_PROGRAMS = test_pair_inproc \ ...@@ -20,7 +20,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_router_mandatory \ test_router_mandatory \
test_raw_sock \ test_raw_sock \
test_disconnect_inproc \ test_disconnect_inproc \
test_ctx_options test_ctx_options \
test_iov
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
......
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#undef NDEBUG
#include <assert.h>
// XSI vector I/O
#if defined ZMQ_HAVE_UIO
#include <sys/uio.h>
#else
struct iovec {
void *iov_base;
size_t iov_len;
};
#endif
void do_check(void* sb, void* sc, unsigned int msgsz)
{
int rc;
int sum =0;
for (int i = 0; i < 10; i++)
{
zmq_msg_t msg;
zmq_msg_init_size(&msg, msgsz);
void * data = zmq_msg_data(&msg);
memcpy(data,&i, sizeof(int));
rc = zmq_msg_send(&msg,sc,i==9 ? 0 :ZMQ_SNDMORE);
assert (rc == (int)msgsz);
zmq_msg_close(&msg);
sum += i;
}
struct iovec ibuffer[32] ;
memset(&ibuffer[0], 0, sizeof(ibuffer));
size_t count = 10;
rc = zmq_recviov(sb,&ibuffer[0],&count,0);
assert (rc == 10);
int rsum=0;
for(;count;--count)
{
int v;
memcpy(&v,ibuffer[count-1].iov_base,sizeof(int));
rsum += v;
assert(ibuffer[count-1].iov_len == msgsz);
// free up the memory
free(ibuffer[count-1].iov_base);
}
assert ( sum == rsum );
}
int main (void)
{
void *ctx = zmq_ctx_new ();
assert (ctx);
int rc;
void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb);
rc = zmq_bind (sb, "inproc://a");
assert (rc == 0);
::sleep(1);
void *sc = zmq_socket (ctx, ZMQ_PUSH);
rc = zmq_connect (sc, "inproc://a");
assert (rc == 0);
// message bigger than vsm max
do_check(sb,sc,100);
// message smaller than vsm max
do_check(sb,sc,10);
rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment