Commit cc813689 authored by Martin Sustrik's avatar Martin Sustrik

documentation in zmq.h improved

parent e6ca5da1
...@@ -26,83 +26,35 @@ extern "C" { ...@@ -26,83 +26,35 @@ extern "C" {
#include <stddef.h> #include <stddef.h>
// Microsoft Visual Studio uses non-standard way to export/import symbols.
#if defined ZMQ_BUILDING_LIBZMQ_WITH_MSVC #if defined ZMQ_BUILDING_LIBZMQ_WITH_MSVC
#define ZMQ_EXPORT __declspec(dllexport) #define ZMQ_EXPORT __declspec(dllexport)
#elif defined _MSC_VER
#define ZMQ_EXPORT __declspec(dllimport)
#else #else
#define ZMQ_EXPORT #define ZMQ_EXPORT
#endif #endif
////////////////////////////////////////////////////////////////////////////////
// 0MQ message definition.
////////////////////////////////////////////////////////////////////////////////
// Maximal size of "Very Small Message". VSMs are passed by value // Maximal size of "Very Small Message". VSMs are passed by value
// to avoid excessive memory allocation/deallocation. // to avoid excessive memory allocation/deallocation.
// If VMSs larger than 255 bytes are required, type of 'vsm_size' // If VMSs larger than 255 bytes are required, type of 'vsm_size'
// field in zmq_msg_t structure should be modified accordingly. // field in zmq_msg_t structure should be modified accordingly.
#define ZMQ_MAX_VSM_SIZE 30 #define ZMQ_MAX_VSM_SIZE 30
// Message & notification types. // Message types. These integers may be stored in 'content' member of the
#define ZMQ_GAP 1 // message instead of regular pointer to the data.
#define ZMQ_DELIMITER 31 #define ZMQ_DELIMITER 31
#define ZMQ_VSM 32 #define ZMQ_VSM 32
// Socket options.
#define ZMQ_HWM 1 // int64_t
#define ZMQ_LWM 2 // int64_t
#define ZMQ_SWAP 3 // int64_t
#define ZMQ_AFFINITY 4 // int64_t
#define ZMQ_IDENTITY 5 // string
#define ZMQ_SUBSCRIBE 6 // string
#define ZMQ_UNSUBSCRIBE 7 // string
#define ZMQ_RATE 8 // int64_t
#define ZMQ_RECOVERY_IVL 9 // int64_t
#define ZMQ_MCAST_LOOP 10 // int64_t
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
#define ZMQ_NOBLOCK 1
// zmq_send should not flush the message downstream immediately. Instead, it
// should batch ZMQ_NOFLUSH messages and send them downstream only if zmq_flush
// is invoked. This is an optimisation for cases where several messages are
// sent in a single business transaction. However, the effect is measurable
// only in extremely high-perf scenarios (million messages a second or so).
// If that's not your case, use standard flushing send instead. See exchange
// example for illustration of ZMQ_NOFLUSH functionality.
#define ZMQ_NOFLUSH 2
// Socket to communicate with a single peer. Allows for a singe connect or a
// single accept. There's no message routing or message filtering involved.
#define ZMQ_P2P 0
// Socket to distribute data. Recv fuction is not implemented for this socket
// type. Messages are distributed in fanout fashion to all peers.
#define ZMQ_PUB 1
// Socket to subscribe to distributed data. Send function is not implemented
// for this socket type. However, subscribe function can be used to modify the
// message filter.
#define ZMQ_SUB 2
// Socket to send requests on and receive replies from. Requests are
// load-balanced among all the peers. This socket type doesn't allow for more
// recv's that there were send's.
#define ZMQ_REQ 3
// Socket to receive requests from and send replies to. This socket type allows
// only an alternated sequence of recv's and send's. Each send is routed to
// the peer that the previous recv delivered message from.
#define ZMQ_REP 4
// Option specifying that the sockets should be pollable. This may be a little
// less efficient that raw non-pollable sockets.
#define ZMQ_POLL 1
// Prototype for the message body deallocation functions.
// It is deliberately defined in the way to comply with standard C free.
typedef void (zmq_free_fn) (void *data);
// A message. If 'shared' is true, message content pointed to by 'content' // A message. If 'shared' is true, message content pointed to by 'content'
// is shared, i.e. reference counting is used to manage its lifetime // is shared, i.e. reference counting is used to manage its lifetime
// rather than straighforward malloc/free. struct zmq_msg_content is // rather than straighforward malloc/free. Not that 'content' is not a pointer
// not declared in the API. // to the raw data. Rather it is pointer to zmq::msg_content_t structure
// (see src/msg_content.hpp for its definition).
struct zmq_msg_t struct zmq_msg_t
{ {
void *content; void *content;
...@@ -116,12 +68,15 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg); ...@@ -116,12 +68,15 @@ ZMQ_EXPORT int zmq_msg_init (struct zmq_msg_t *msg);
// Initialise a message 'size' bytes long. // Initialise a message 'size' bytes long.
// //
// Errors: ENOMEM - the size is too large to allocate. // Errors: ENOMEM - message is too big to fit into memory.
ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size); ZMQ_EXPORT int zmq_msg_init_size (struct zmq_msg_t *msg, size_t size);
// Initialise a message from an existing buffer. Message isn't copied, // Initialise a message from an existing buffer. Message isn't copied,
// instead 0MQ infrastructure take ownership of the buffer and call // instead 0MQ infrastructure takes ownership of the buffer and
// deallocation functio (ffn) once it's not needed anymore. // deallocation function (ffn) will be called once the data are not
// needed anymore. Note that deallocation function prototype is designed
// so that it complies with standard C 'free' function.
typedef void (zmq_free_fn) (void *data);
ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data, ZMQ_EXPORT int zmq_msg_init_data (struct zmq_msg_t *msg, void *data,
size_t size, zmq_free_fn *ffn); size_t size, zmq_free_fn *ffn);
...@@ -145,35 +100,180 @@ ZMQ_EXPORT void *zmq_msg_data (struct zmq_msg_t *msg); ...@@ -145,35 +100,180 @@ ZMQ_EXPORT void *zmq_msg_data (struct zmq_msg_t *msg);
// Return size of message data (in bytes). // Return size of message data (in bytes).
ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg); ZMQ_EXPORT size_t zmq_msg_size (struct zmq_msg_t *msg);
// Returns type of the message. ////////////////////////////////////////////////////////////////////////////////
ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg); // 0MQ infrastructure (a.k.a. context) initialisation & termination.
////////////////////////////////////////////////////////////////////////////////
// Flag specifying that the sockets within this context should be pollable.
// This may be a little less efficient that raw non-pollable sockets.
#define ZMQ_POLL 1
// Initialise 0MQ context. 'app_threads' specifies maximal number // Initialise 0MQ context. 'app_threads' specifies maximal number
// of application threads that can have open sockets at the same time. // of application threads that can own open sockets at the same time.
// 'io_threads' specifies the size of thread pool to handle I/O operations. // 'io_threads' specifies the size of thread pool to handle I/O operations.
// 'flags' argument is a bitmap composed of the flags defined above.
// //
// Errors: EINVAL - one of the arguments is less than zero or there are no // Errors: EINVAL - one of the arguments is less than zero or there are no
// threads declared at all. // threads declared at all.
ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags); ZMQ_EXPORT void *zmq_init (int app_threads, int io_threads, int flags);
// Deinitialise 0MQ context including all the open sockets. Closing // Deinitialise 0MQ context. If there are still open sockets, actual
// sockets after zmq_term has been called will result in undefined behaviour. // deinitialisation of the context is delayed till all the sockets are closed.
ZMQ_EXPORT int zmq_term (void *context); ZMQ_EXPORT int zmq_term (void *context);
// Open a socket. ////////////////////////////////////////////////////////////////////////////////
// 0MQ socket definition.
////////////////////////////////////////////////////////////////////////////////
// Creating a 0MQ socket.
// **********************
// Socket to communicate with a single peer. Allows for a singe connect or a
// single accept. There's no message routing or message filtering involved.
#define ZMQ_P2P 0
// Socket to distribute data. Recv fuction is not implemented for this socket
// type. Messages are distributed in fanout fashion to all the peers.
#define ZMQ_PUB 1
// Socket to subscribe for data. Send function is not implemented for this
// socket type. However, subscribe function can be used to modify the
// message filter (see ZMQ_SUBSCRIBE socket option).
#define ZMQ_SUB 2
// Socket to send requests and receive replies. Requests are
// load-balanced among all the peers. This socket type allows
// only an alternated sequence of send's and recv's
#define ZMQ_REQ 3
// Socket to receive requests and send replies. This socket type allows
// only an alternated sequence of recv's and send's. Each send is routed to
// the peer that issued the last received request.
#define ZMQ_REP 4
// Open a socket. 'type' is one of the socket types defined above.
// //
// Errors: EINVAL - invalid socket type. // Errors: EINVAL - invalid socket type.
// EMFILE - the number of application threads entitled to hold open // EMFILE - the number of application threads entitled to hold open
// sockets at the same time was exceeded. // sockets at the same time was exceeded.
ZMQ_EXPORT void *zmq_socket (void *context, int type); ZMQ_EXPORT void *zmq_socket (void *context, int type);
// Destroying the socket.
// **********************
// Close the socket. // Close the socket.
ZMQ_EXPORT int zmq_close (void *s); ZMQ_EXPORT int zmq_close (void *s);
// Sets an option on the socket. // Manipulating socket options.
// EINVAL - unknown option, a value with incorrect length or an invalid value. // ****************************
ZMQ_EXPORT int zmq_setsockopt (void *s, int option_, const void *optval_,
size_t optvallen_); // Available socket options, their types and default values.
// High watermark for the message pipes associated with the socket. The water
// mark cannot be exceeded. If the messages don't fit into the pipe emergency
// mechanisms of the particular socket type are used (block, drop etc.) If HWM
// is set to zero, there are no limits for the content of the pipe.
// Type: int64_t Unit: bytes Default: 0
#define ZMQ_HWM 1
// Low watermark makes sense only if high watermark is defined (is non-zero).
// When the emergency state is reached when messages overflow the pipe, the
// emergency lasts till the size of the pipe decreases to low watermark.
// At that point normal state is resumed.
// Type: int64_t Unit: bytes Default: 0
#define ZMQ_LWM 2
// Swap allows the pipe to exceed high watermark. However, the data are written
// to the disk rather than held in the memory. While the high watermark is not
// exceeded there is no disk activity involved though. The value of the option
// defines maximal size of the swap file.
// Type: int64_t Unit: bytes Default: 0
#define ZMQ_SWAP 3
// Affinity defines which threads in the thread pool will be used to handle
// newly created sockets. This way you can dedicate some of the threads (CPUs)
// to a specific work. Value of 0 means no affinity, work is distributed
// fairly among the threads in the thread pool. For non-zero values, the lowest
// bit corresponds to the thread 1, second lowest bit to the thread 2 etc.
// Thus, value of 3 means that from now on newly created sockets will handle
// I/O activity exclusively using threads no. 1 and 2.
// Type: int64_t Unit: N/A (bitmap) Default: 0
#define ZMQ_AFFINITY 4
// Identity of the socket. Identity is important when restarting applications.
// If the socket has no identity, each run of the application is completely
// separated from other runs. However, with identity application reconnects to
// existing infrastructure left by the previous run. Thus it may receive
// messages that were sent in the meantime, it shares pipe limits with the
// previous run etc.
// Type: string Unit: N/A Default: NULL
#define ZMQ_IDENTITY 5
// Applicable only to 'sub' socket type. Eastablishes new message filter.
// When 'sub' socket is created all the incoming messages are filtered out.
// This option allows you to subscribe for all messages ("*"), messages with
// specific topic ("x.y.z") and/or messages with specific topic prefix
// ("x.y.*"). Topic is one-byte-size-prefixed string located at
// the very beginning of the message. Multiple filters can be attached to
// a single 'sub' socket. In that case message passes if it matches at least
// one of the filters.
// Type: string Unit: N/A Default: N/A
#define ZMQ_SUBSCRIBE 6
// Applicable only to 'sub' socket type. Removes existing message filter.
// The filter specified must match the string passed to ZMQ_SUBSCRIBE options
// exactly. If there were several instances of the same filter created,
// this options removes only one of them, leaving the rest in place
// and functional.
// Type: string Unit: N/A Default: N/A
#define ZMQ_UNSUBSCRIBE 7
// This option applies only to multicast transports (pgm & udp). It specifies
// maximal outgoing data rate that an individual sender socket can send.
// Type: uint64_t Unit: kilobits/second Default: 100
#define ZMQ_RATE 8
// This option applies only to multicast transports (pgm & udp). It specifies
// how long can the receiver socket survive when the sender is inaccessible.
// Keep in mind that large recovery intervals at high data rates result in
// very large recovery buffers, meaning that you can easily overload your box
// by setting say 1 minute recovery interval at 1Gb/s rate (requires
// 7GB in-memory buffer).
// Type: uint64_t Unit: seconds Default: 10
#define ZMQ_RECOVERY_IVL 9
// This option applies only to multicast transports (pgm & udp). Value of 1
// means that the mutlicast packets can be received on the box they were sent
// from. Setting the value to 0 disables the loopback functionality which
// can have negative impact on the performance. if possible, disable
// the loopback in production environments.
// Type: uint64_t Unit: N/A (boolean value) Default: 1
#define ZMQ_MCAST_LOOP 10
// Sets an option on the socket. 'option' argument specifies the option (see
// the option list above). 'optval' is a pointer to the value to set,
// 'optvallen' is the size of the value in bytes.
//
// Errors: EINVAL - unknown option, a value with incorrect length
// or invalid value.
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
size_t optvallen);
// Creating connections.
// *********************
// Addresses are composed of the name of the protocol to use followed by ://
// and a protocol-specific address. Available protocols:
//
// tcp - the address is composed of IP address and port delimited by colon
// sign (:). The IP address can be a hostname (with 'connect') or
// a network interface name (with 'bind'). Examples "tcp://eth0:5555",
// "tcp://192.168.0.1:20000", "tcp://hq.mycompany.com:80".
//
// pgm & udp - both protocols have same address format. It's network interface
// to use, semicolon (;), multicast group IP address, colon (:) and
// port. Examples: "pgm://eth2;224.0.0.1:8000",
// "udp://192.168.0.111;224.1.1.1:5555".
// Bind the socket to a particular address. // Bind the socket to a particular address.
ZMQ_EXPORT int zmq_bind (void *s, const char *addr); ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
...@@ -181,12 +281,25 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr); ...@@ -181,12 +281,25 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
// Connect the socket to a particular address. // Connect the socket to a particular address.
ZMQ_EXPORT int zmq_connect (void *s, const char *addr); ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
// Sending and receiving messages.
// *******************************
// The flag specifying that the operation should be performed in
// non-blocking mode. I.e. if it cannot be processed immediately,
// error should be returned with errno set to EAGAIN.
#define ZMQ_NOBLOCK 1
// The flag specifying that zmq_send should not flush the message downstream
// immediately. Instead, it should batch ZMQ_NOFLUSH messages and send them
// downstream only if zmq_flush is invoked. This is an optimisation for cases
// where several messages are sent in a single business transaction. However,
// the effect is measurable only in extremely high-perf scenarios
// (million messages a second or so). If that's not your case, use standard
// flushing send instead.
#define ZMQ_NOFLUSH 2
// Send the message 'msg' to the socket 's'. 'flags' argument can be // Send the message 'msg' to the socket 's'. 'flags' argument can be
// combination of following values: // combination the flags described above.
// ZMQ_NOBLOCK - if message cannot be sent, return immediately.
// ZMQ_NOFLUSH - message won't be sent immediately. It'll be sent with either
// subsequent flushing send or explicit call to zmq_flush
// function.
// //
// Errors: EAGAIN - message cannot be sent at the moment (applies only to // Errors: EAGAIN - message cannot be sent at the moment (applies only to
// non-blocking send). // non-blocking send).
...@@ -199,18 +312,28 @@ ZMQ_EXPORT int zmq_send (void *s, struct zmq_msg_t *msg, int flags); ...@@ -199,18 +312,28 @@ ZMQ_EXPORT int zmq_send (void *s, struct zmq_msg_t *msg, int flags);
ZMQ_EXPORT int zmq_flush (void *s); ZMQ_EXPORT int zmq_flush (void *s);
// Send a message from the socket 's'. 'flags' argument can be combination // Send a message from the socket 's'. 'flags' argument can be combination
// of following values: // of the flags described above.
// ZMQ_NOBLOCK - if message cannot be received, return immediately.
// //
// Errors: EAGAIN - message cannot be received at the moment (applies only to // Errors: EAGAIN - message cannot be received at the moment (applies only to
// non-blocking receive). // non-blocking receive).
// EFAULT - function isn't supported by particular socket type. // EFAULT - function isn't supported by particular socket type.
ZMQ_EXPORT int zmq_recv (void *s, struct zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recv (void *s, struct zmq_msg_t *msg, int flags);
////////////////////////////////////////////////////////////////////////////////
// Helper functions.
////////////////////////////////////////////////////////////////////////////////
// Helper functions used by perf tests so that they don't have to care // Helper functions used by perf tests so that they don't have to care
// about minutiae of time-related functions on different OS platforms. // about minutiae of time-related functions on different OS platforms.
// Starts the stopwatch. Returns the handle to the watch.
ZMQ_EXPORT void *zmq_stopwatch_start (); ZMQ_EXPORT void *zmq_stopwatch_start ();
// Stops the stopwatch. Returns the number of microseconds elapsed since
// the stopwatch was started.
ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_);
// Sleeps for specified number of seconds.
ZMQ_EXPORT void zmq_sleep (int seconds_); ZMQ_EXPORT void zmq_sleep (int seconds_);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -32,13 +32,6 @@ namespace zmq ...@@ -32,13 +32,6 @@ namespace zmq
typedef zmq_free_fn free_fn; typedef zmq_free_fn free_fn;
enum message_type_t
{
message_data = 1 << 0,
message_gap = 1 << ZMQ_GAP,
message_delimiter = 1 << ZMQ_DELIMITER
};
// The class masquerades POSIX-style errno error as a C++ exception. // The class masquerades POSIX-style errno error as a C++ exception.
class error_t : public std::exception class error_t : public std::exception
{ {
...@@ -148,12 +141,6 @@ namespace zmq ...@@ -148,12 +141,6 @@ namespace zmq
throw error_t (); throw error_t ();
} }
// Returns message type.
inline message_type_t type ()
{
return (message_type_t) (1 << zmq_msg_type (this));
}
// Returns pointer to message's data buffer. // Returns pointer to message's data buffer.
inline void *data () inline void *data ()
{ {
......
...@@ -84,8 +84,7 @@ int zmq_msg_close (zmq_msg_t *msg_) ...@@ -84,8 +84,7 @@ int zmq_msg_close (zmq_msg_t *msg_)
{ {
// For VSMs and delimiters there are no resources to free. // For VSMs and delimiters there are no resources to free.
if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER || if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER ||
msg_->content == (zmq::msg_content_t*) ZMQ_VSM || msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0; return 0;
// If the content is not shared, or if it is shared and the reference. // If the content is not shared, or if it is shared and the reference.
...@@ -118,10 +117,8 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_) ...@@ -118,10 +117,8 @@ int zmq_msg_copy (zmq_msg_t *dest_, zmq_msg_t *src_)
zmq_msg_close (dest_); zmq_msg_close (dest_);
// VSMs and delimiters require no special handling. // VSMs and delimiters require no special handling.
if (src_->content != if (src_->content != (zmq::msg_content_t*) ZMQ_DELIMITER &&
(zmq::msg_content_t*) ZMQ_DELIMITER && src_->content != (zmq::msg_content_t*) ZMQ_VSM) {
src_->content != (zmq::msg_content_t*) ZMQ_VSM &&
src_->content != (zmq::msg_content_t*) ZMQ_GAP) {
// One reference is added to shared messages. Non-shared messages // One reference is added to shared messages. Non-shared messages
// are turned into shared messages and reference count is set to 2. // are turned into shared messages and reference count is set to 2.
...@@ -142,9 +139,7 @@ void *zmq_msg_data (zmq_msg_t *msg_) ...@@ -142,9 +139,7 @@ void *zmq_msg_data (zmq_msg_t *msg_)
{ {
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_data; return msg_->vsm_data;
if (msg_->content == if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
(zmq::msg_content_t*) ZMQ_DELIMITER ||
msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return NULL; return NULL;
return ((zmq::msg_content_t*) msg_->content)->data; return ((zmq::msg_content_t*) msg_->content)->data;
...@@ -154,25 +149,12 @@ size_t zmq_msg_size (zmq_msg_t *msg_) ...@@ -154,25 +149,12 @@ size_t zmq_msg_size (zmq_msg_t *msg_)
{ {
if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM) if (msg_->content == (zmq::msg_content_t*) ZMQ_VSM)
return msg_->vsm_size; return msg_->vsm_size;
if (msg_->content == if (msg_->content == (zmq::msg_content_t*) ZMQ_DELIMITER)
(zmq::msg_content_t*) ZMQ_DELIMITER ||
msg_->content == (zmq::msg_content_t*) ZMQ_GAP)
return 0; return 0;
return ((zmq::msg_content_t*) msg_->content)->size; return ((zmq::msg_content_t*) msg_->content)->size;
} }
int zmq_msg_type (zmq_msg_t *msg_)
{
// If it's a genuine message, return 0.
if (msg_->content >= (zmq::msg_content_t*) ZMQ_VSM)
return 0;
// Trick the compiler to believe that content is an integer.
unsigned char *offset = 0;
return (((const unsigned char*) msg_->content) - offset);
}
void *zmq_init (int app_threads_, int io_threads_, int flags_) void *zmq_init (int app_threads_, int io_threads_, int flags_)
{ {
// There should be at least a single thread managed by the dispatcher. // There should be at least a single thread managed by the dispatcher.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment