Commit 507718ee authored by Martin Sustrik's avatar Martin Sustrik

ZMQ_HWM type changed to int

Signed-off-by: 's avatarMartin Sustrik <sustrik@250bpm.com>
parent bd9d7715
...@@ -73,7 +73,7 @@ type. ...@@ -73,7 +73,7 @@ type.
The default 'ZMQ_HWM' value of zero means "no limit". The default 'ZMQ_HWM' value of zero means "no limit".
[horizontal] [horizontal]
Option value type:: uint64_t Option value type:: int
Option value unit:: messages Option value unit:: messages
Default value:: 0 Default value:: 0
Applicable socket types:: all Applicable socket types:: all
...@@ -348,7 +348,7 @@ EXAMPLE ...@@ -348,7 +348,7 @@ EXAMPLE
.Retrieving the high water mark .Retrieving the high water mark
---- ----
/* Retrieve high water mark into hwm */ /* Retrieve high water mark into hwm */
int64_t hwm; int hwm;
size_t hwm_size = sizeof (hwm); size_t hwm_size = sizeof (hwm);
rc = zmq_getsockopt (socket, ZMQ_HWM, &hwm, &hwm_size); rc = zmq_getsockopt (socket, ZMQ_HWM, &hwm, &hwm_size);
assert (rc == 0); assert (rc == 0);
......
...@@ -41,7 +41,7 @@ type. ...@@ -41,7 +41,7 @@ type.
The default 'ZMQ_HWM' value of zero means "no limit". The default 'ZMQ_HWM' value of zero means "no limit".
[horizontal] [horizontal]
Option value type:: uint64_t Option value type:: int
Option value unit:: messages Option value unit:: messages
Default value:: 0 Default value:: 0
Applicable socket types:: all Applicable socket types:: all
......
...@@ -50,11 +50,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ...@@ -50,11 +50,11 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
switch (option_) { switch (option_) {
case ZMQ_HWM: case ZMQ_HWM:
if (optvallen_ != sizeof (uint64_t)) { if (optvallen_ != sizeof (int) || *((int*) optval_) < 0) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
hwm = *((uint64_t*) optval_); hwm = *((int*) optval_);
return 0; return 0;
case ZMQ_AFFINITY: case ZMQ_AFFINITY:
...@@ -169,11 +169,11 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) ...@@ -169,11 +169,11 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
switch (option_) { switch (option_) {
case ZMQ_HWM: case ZMQ_HWM:
if (*optvallen_ < sizeof (uint64_t)) { if (*optvallen_ < sizeof (int)) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
*((uint64_t*) optval_) = hwm; *((int*) optval_) = hwm;
*optvallen_ = sizeof (uint64_t); *optvallen_ = sizeof (uint64_t);
return 0; return 0;
......
...@@ -35,7 +35,9 @@ namespace zmq ...@@ -35,7 +35,9 @@ namespace zmq
int setsockopt (int option_, const void *optval_, size_t optvallen_); int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_); int getsockopt (int option_, void *optval_, size_t *optvallen_);
uint64_t hwm; // High-water mark for messages in pipe.
int hwm;
uint64_t affinity; uint64_t affinity;
blob_t identity; blob_t identity;
......
...@@ -25,8 +25,7 @@ ...@@ -25,8 +25,7 @@
#include "pipe.hpp" #include "pipe.hpp"
#include "likely.hpp" #include "likely.hpp"
zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, int lwm_) :
uint64_t lwm_) :
object_t (parent_), object_t (parent_),
active (true), active (true),
pipe (pipe_), pipe (pipe_),
...@@ -163,7 +162,7 @@ void zmq::reader_t::process_pipe_term_ack () ...@@ -163,7 +162,7 @@ void zmq::reader_t::process_pipe_term_ack ()
} }
zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_, zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, reader_t *reader_,
uint64_t hwm_) : int hwm_) :
object_t (parent_), object_t (parent_),
active (true), active (true),
pipe (pipe_), pipe (pipe_),
...@@ -288,11 +287,11 @@ void zmq::writer_t::process_pipe_term () ...@@ -288,11 +287,11 @@ void zmq::writer_t::process_pipe_term ()
bool zmq::writer_t::pipe_full () bool zmq::writer_t::pipe_full ()
{ {
return hwm > 0 && msgs_written - msgs_read == hwm; return hwm > 0 && msgs_written - msgs_read == uint64_t (hwm);
} }
void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, reader_t **reader_, writer_t **writer_) int hwm_, reader_t **reader_, writer_t **writer_)
{ {
// First compute the low water mark. Following point should be taken // First compute the low water mark. Following point should be taken
// into consideration: // into consideration:
...@@ -314,7 +313,7 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_, ...@@ -314,7 +313,7 @@ void zmq::create_pipe (object_t *reader_parent_, object_t *writer_parent_,
// That done, we still we have to account for the cases where // That done, we still we have to account for the cases where
// HWM < max_wm_delta thus driving LWM to negative numbers. // HWM < max_wm_delta thus driving LWM to negative numbers.
// Let's make LWM 1/2 of HWM in such cases. // Let's make LWM 1/2 of HWM in such cases.
uint64_t lwm = (hwm_ > max_wm_delta * 2) ? int lwm = (hwm_ > max_wm_delta * 2) ?
hwm_ - max_wm_delta : (hwm_ + 1) / 2; hwm_ - max_wm_delta : (hwm_ + 1) / 2;
// Create all three objects pipe consists of: the pipe per se, reader and // Create all three objects pipe consists of: the pipe per se, reader and
......
...@@ -23,19 +23,18 @@ ...@@ -23,19 +23,18 @@
#include "../include/zmq.h" #include "../include/zmq.h"
#include "stdint.hpp"
#include "array.hpp" #include "array.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
#include "config.hpp" #include "config.hpp"
#include "object.hpp" #include "object.hpp"
#include "stdint.hpp"
namespace zmq namespace zmq
{ {
// Creates a pipe. Returns pointer to reader and writer objects. // Creates a pipe. Returns pointer to reader and writer objects.
void create_pipe (object_t *reader_parent_, object_t *writer_parent_, void create_pipe (object_t *reader_parent_, object_t *writer_parent_,
uint64_t hwm_, class reader_t **reader_, int hwm_, class reader_t **reader_, class writer_t **writer_);
class writer_t **writer_);
// The shutdown mechanism for pipe works as follows: Either endpoint // The shutdown mechanism for pipe works as follows: Either endpoint
// (or even both of them) can ask pipe to terminate by calling 'terminate' // (or even both of them) can ask pipe to terminate by calling 'terminate'
...@@ -57,7 +56,7 @@ namespace zmq ...@@ -57,7 +56,7 @@ namespace zmq
class reader_t : public object_t, public array_item_t class reader_t : public object_t, public array_item_t
{ {
friend void create_pipe (object_t*, object_t*, uint64_t, friend void create_pipe (object_t*, object_t*, int,
reader_t**, writer_t**); reader_t**, writer_t**);
friend class writer_t; friend class writer_t;
...@@ -77,7 +76,7 @@ namespace zmq ...@@ -77,7 +76,7 @@ namespace zmq
private: private:
reader_t (class object_t *parent_, pipe_t *pipe_, uint64_t lwm_); reader_t (class object_t *parent_, pipe_t *pipe_, int lwm_);
~reader_t (); ~reader_t ();
// To be called only by writer itself! // To be called only by writer itself!
...@@ -100,7 +99,7 @@ namespace zmq ...@@ -100,7 +99,7 @@ namespace zmq
class writer_t *writer; class writer_t *writer;
// Low watermark for in-memory storage (in bytes). // Low watermark for in-memory storage (in bytes).
uint64_t lwm; int lwm;
// Number of messages read so far. // Number of messages read so far.
uint64_t msgs_read; uint64_t msgs_read;
...@@ -126,7 +125,7 @@ namespace zmq ...@@ -126,7 +125,7 @@ namespace zmq
class writer_t : public object_t, public array_item_t class writer_t : public object_t, public array_item_t
{ {
friend void create_pipe (object_t*, object_t*, uint64_t, friend void create_pipe (object_t*, object_t*, int,
reader_t**, writer_t**); reader_t**, writer_t**);
public: public:
...@@ -155,7 +154,7 @@ namespace zmq ...@@ -155,7 +154,7 @@ namespace zmq
private: private:
writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_, writer_t (class object_t *parent_, pipe_t *pipe_, reader_t *reader_,
uint64_t hwm_); int hwm_);
~writer_t (); ~writer_t ();
// Command handlers. // Command handlers.
...@@ -175,7 +174,7 @@ namespace zmq ...@@ -175,7 +174,7 @@ namespace zmq
reader_t *reader; reader_t *reader;
// High watermark for in-memory storage (in bytes). // High watermark for in-memory storage (in bytes).
uint64_t hwm; int hwm;
// Last confirmed number of messages read from the pipe. // Last confirmed number of messages read from the pipe.
// The actual number can be higher. // The actual number can be higher.
......
...@@ -377,7 +377,7 @@ int zmq::socket_base_t::connect (const char *addr_) ...@@ -377,7 +377,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// The total HWM for an inproc connection should be the sum of // The total HWM for an inproc connection should be the sum of
// the binder's HWM and the connector's HWM. // the binder's HWM and the connector's HWM.
int64_t hwm; int hwm;
if (options.hwm == 0 || peer.options.hwm == 0) if (options.hwm == 0 || peer.options.hwm == 0)
hwm = 0; hwm = 0;
else else
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
#include <assert.h> #include <assert.h>
#include "../src/stdint.hpp"
#include "testutil.hpp" #include "testutil.hpp"
int main (int argc, char *argv []) int main (int argc, char *argv [])
...@@ -33,7 +32,7 @@ int main (int argc, char *argv []) ...@@ -33,7 +32,7 @@ int main (int argc, char *argv [])
// buffer space should be 4 messages. // buffer space should be 4 messages.
void *sb = zmq_socket (ctx, ZMQ_PULL); void *sb = zmq_socket (ctx, ZMQ_PULL);
assert (sb); assert (sb);
uint64_t hwm = 2; int hwm = 2;
int rc = zmq_setsockopt (sb, ZMQ_HWM, &hwm, sizeof (hwm)); int rc = zmq_setsockopt (sb, ZMQ_HWM, &hwm, sizeof (hwm));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind (sb, "inproc://a"); rc = zmq_bind (sb, "inproc://a");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment