Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
50a8b9ea
Commit
50a8b9ea
authored
Sep 20, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
'flags' parameter added to zmq_init
parent
edecf75b
Hide whitespace changes
Inline
Side-by-side
Showing
26 changed files
with
65 additions
and
41 deletions
+65
-41
zmq.h
bindings/c/zmq.h
+5
-1
zmq.hpp
bindings/cpp/zmq.hpp
+2
-2
Context.cpp
bindings/java/Context.cpp
+2
-2
Context.java
bindings/java/org/zmq/Context.java
+5
-3
pyzmq.cpp
bindings/python/pyzmq.cpp
+8
-5
rbzmq.cpp
bindings/ruby/rbzmq.cpp
+7
-4
local_lat.c
perf/c/local_lat.c
+1
-1
local_thr.c
perf/c/local_thr.c
+1
-1
remote_lat.c
perf/c/remote_lat.c
+1
-1
remote_thr.c
perf/c/remote_thr.c
+1
-1
local_lat.java
perf/java/local_lat.java
+1
-1
local_thr.java
perf/java/local_thr.java
+1
-1
remote_lat.java
perf/java/remote_lat.java
+1
-1
remote_thr.java
perf/java/remote_thr.java
+1
-1
local_lat.rb
perf/ruby/local_lat.rb
+1
-1
local_thr.rb
perf/ruby/local_thr.rb
+1
-1
remote_lat.rb
perf/ruby/remote_lat.rb
+1
-1
remote_thr.rb
perf/ruby/remote_thr.rb
+1
-1
app_thread.cpp
src/app_thread.cpp
+2
-1
app_thread.hpp
src/app_thread.hpp
+2
-1
dispatcher.cpp
src/dispatcher.cpp
+5
-3
dispatcher.hpp
src/dispatcher.hpp
+1
-1
fd_signaler.cpp
src/fd_signaler.cpp
+6
-0
io_thread.cpp
src/io_thread.cpp
+2
-1
io_thread.hpp
src/io_thread.hpp
+2
-1
zmq.cpp
src/zmq.cpp
+4
-4
No files found.
bindings/c/zmq.h
View file @
50a8b9ea
...
...
@@ -91,6 +91,10 @@ extern "C" {
// the peer that the previous recv delivered message from.
#define ZMQ_REP 4
// Option specifying that the sockets should be pollable. This may be a little
// less efficient that raw non-pollable sockets.
#define ZMQ_POLL 1
// Prototype for the message body deallocation functions.
// It is deliberately defined in the way to comply with standard C free.
typedef
void
(
zmq_free_fn
)
(
void
*
data
);
...
...
@@ -150,7 +154,7 @@ ZMQ_EXPORT int zmq_msg_type (struct zmq_msg_t *msg);
//
// Errors: EINVAL - one of the arguments is less than zero or there are no
// threads declared at all.
ZMQ_EXPORT
void
*
zmq_init
(
int
app_threads
,
int
io_threads
);
ZMQ_EXPORT
void
*
zmq_init
(
int
app_threads
,
int
io_threads
,
int
flags
);
// Deinitialise 0MQ context including all the open sockets. Closing
// sockets after zmq_term has been called will result in undefined behaviour.
...
...
bindings/cpp/zmq.hpp
View file @
50a8b9ea
...
...
@@ -180,9 +180,9 @@ namespace zmq
public
:
inline
context_t
(
int
app_threads_
,
int
io_threads_
)
inline
context_t
(
int
app_threads_
,
int
io_threads_
,
int
flags_
=
0
)
{
ptr
=
zmq_init
(
app_threads_
,
io_threads_
);
ptr
=
zmq_init
(
app_threads_
,
io_threads_
,
flags_
);
if
(
ptr
==
NULL
)
throw
error_t
();
}
...
...
bindings/java/Context.cpp
View file @
50a8b9ea
...
...
@@ -52,7 +52,7 @@ static void raise_exception (JNIEnv *env, int err)
}
JNIEXPORT
void
JNICALL
Java_org_zmq_Context_construct
(
JNIEnv
*
env
,
jobject
obj
,
jint
app_threads
,
jint
io_threads
)
jint
app_threads
,
jint
io_threads
,
jint
flags
)
{
if
(
ctx_handle_fid
==
NULL
)
{
jclass
cls
=
env
->
GetObjectClass
(
obj
);
...
...
@@ -62,7 +62,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Context_construct (JNIEnv *env, jobject obj,
env
->
DeleteLocalRef
(
cls
);
}
void
*
ctx
=
zmq_init
(
app_threads
,
io_threads
);
void
*
ctx
=
zmq_init
(
app_threads
,
io_threads
,
flags
);
if
(
ctx
==
NULL
)
{
raise_exception
(
env
,
errno
);
return
;
...
...
bindings/java/org/zmq/Context.java
View file @
50a8b9ea
...
...
@@ -24,14 +24,16 @@ public class Context {
System
.
loadLibrary
(
"jzmq"
);
}
public
static
final
int
POLL
=
1
;
/**
* Class constructor.
*
* @param appThreads maximum number of application threads.
* @param ioThreads size of the threads pool to handle I/O operations.
*/
public
Context
(
int
appThreads
,
int
ioThreads
)
{
construct
(
appThreads
,
ioThreads
);
public
Context
(
int
appThreads
,
int
ioThreads
,
int
flags
)
{
construct
(
appThreads
,
ioThreads
,
flags
);
}
/**
...
...
@@ -40,7 +42,7 @@ public class Context {
public
native
long
createSocket
(
int
type
);
/** Initialize the JNI interface */
protected
native
void
construct
(
int
appThreads
,
int
ioThreads
);
protected
native
void
construct
(
int
appThreads
,
int
ioThreads
,
int
flags
);
/** Free resources used by JNI driver. */
protected
native
void
finalize
();
...
...
bindings/python/pyzmq.cpp
View file @
50a8b9ea
...
...
@@ -53,15 +53,16 @@ int context_init (context_t *self, PyObject *args, PyObject *kwdict)
{
int
app_threads
;
int
io_threads
;
static
const
char
*
kwlist
[]
=
{
"app_threads"
,
"io_threads"
,
NULL
};
if
(
!
PyArg_ParseTupleAndKeywords
(
args
,
kwdict
,
"ii"
,
(
char
**
)
kwlist
,
&
app_threads
,
&
io_threads
))
{
int
flags
=
0
;
static
const
char
*
kwlist
[]
=
{
"app_threads"
,
"io_threads"
,
"flags"
,
NULL
};
if
(
!
PyArg_ParseTupleAndKeywords
(
args
,
kwdict
,
"ii|i"
,
(
char
**
)
kwlist
,
&
app_threads
,
&
io_threads
,
&
flags
))
{
PyErr_SetString
(
PyExc_SystemError
,
"invalid arguments"
);
return
-
1
;
}
assert
(
!
self
->
handle
);
self
->
handle
=
zmq_init
(
app_threads
,
io_threads
);
self
->
handle
=
zmq_init
(
app_threads
,
io_threads
,
flags
);
if
(
!
self
->
handle
)
{
PyErr_SetString
(
PyExc_SystemError
,
strerror
(
errno
));
return
-
1
;
...
...
@@ -522,7 +523,9 @@ PyMODINIT_FUNC initlibpyzmq ()
t
=
PyInt_FromLong
(
ZMQ_MCAST_LOOP
);
PyDict_SetItemString
(
dict
,
"MCAST_LOOP"
,
t
);
Py_DECREF
(
t
);
t
=
PyInt_FromLong
(
ZMQ_POLL
);
PyDict_SetItemString
(
dict
,
"POLL"
,
t
);
Py_DECREF
(
t
);
}
#if defined _MSC_VER
...
...
bindings/ruby/rbzmq.cpp
View file @
50a8b9ea
...
...
@@ -38,10 +38,11 @@ static VALUE context_alloc (VALUE class_)
}
static
VALUE
context_initialize
(
VALUE
self_
,
VALUE
app_threads_
,
VALUE
io_threads_
)
VALUE
io_threads_
,
VALUE
flags_
)
{
assert
(
!
DATA_PTR
(
self_
));
void
*
ctx
=
zmq_init
(
NUM2INT
(
app_threads_
),
NUM2INT
(
io_threads_
));
void
*
ctx
=
zmq_init
(
NUM2INT
(
app_threads_
),
NUM2INT
(
io_threads_
),
NUM2INT
(
flags_
));
if
(
!
ctx
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
return
Qnil
;
...
...
@@ -105,8 +106,8 @@ static VALUE socket_setsockopt (VALUE self_, VALUE option_,
rc
=
zmq_setsockopt
(
DATA_PTR
(
self_
),
NUM2INT
(
option_
),
(
void
*
)
&
optval
,
4
);
}
break
;
case
ZMQ_IDENTITY
:
case
ZMQ_SUBSCRIBE
:
case
ZMQ_UNSUBSCRIBE
:
...
...
@@ -236,7 +237,7 @@ extern "C" void Init_librbzmq ()
VALUE
context_type
=
rb_define_class
(
"Context"
,
rb_cObject
);
rb_define_alloc_func
(
context_type
,
context_alloc
);
rb_define_method
(
context_type
,
"initialize"
,
(
VALUE
(
*
)(...))
context_initialize
,
2
);
(
VALUE
(
*
)(...))
context_initialize
,
3
);
VALUE
socket_type
=
rb_define_class
(
"Socket"
,
rb_cObject
);
rb_define_alloc_func
(
socket_type
,
socket_alloc
);
...
...
@@ -274,4 +275,6 @@ extern "C" void Init_librbzmq ()
rb_define_global_const
(
"PUB"
,
INT2NUM
(
ZMQ_PUB
));
rb_define_global_const
(
"REQ"
,
INT2NUM
(
ZMQ_REQ
));
rb_define_global_const
(
"REP"
,
INT2NUM
(
ZMQ_REP
));
rb_define_global_const
(
"POLL"
,
INT2NUM
(
ZMQ_POLL
));
}
perf/c/local_lat.c
View file @
50a8b9ea
...
...
@@ -42,7 +42,7 @@ int main (int argc, char *argv [])
message_size
=
atoi
(
argv
[
2
]);
roundtrip_count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
ctx
=
zmq_init
(
1
,
1
,
0
);
assert
(
ctx
);
s
=
zmq_socket
(
ctx
,
ZMQ_REP
);
...
...
perf/c/local_thr.c
View file @
50a8b9ea
...
...
@@ -45,7 +45,7 @@ int main (int argc, char *argv [])
message_size
=
atoi
(
argv
[
2
]);
message_count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
ctx
=
zmq_init
(
1
,
1
,
0
);
assert
(
ctx
);
s
=
zmq_socket
(
ctx
,
ZMQ_SUB
);
...
...
perf/c/remote_lat.c
View file @
50a8b9ea
...
...
@@ -46,7 +46,7 @@ int main (int argc, char *argv [])
message_size
=
atoi
(
argv
[
2
]);
roundtrip_count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
ctx
=
zmq_init
(
1
,
1
,
0
);
assert
(
ctx
);
s
=
zmq_socket
(
ctx
,
ZMQ_REQ
);
...
...
perf/c/remote_thr.c
View file @
50a8b9ea
...
...
@@ -42,7 +42,7 @@ int main (int argc, char *argv [])
message_size
=
atoi
(
argv
[
2
]);
message_count
=
atoi
(
argv
[
3
]);
ctx
=
zmq_init
(
1
,
1
);
ctx
=
zmq_init
(
1
,
1
,
0
);
assert
(
ctx
);
s
=
zmq_socket
(
ctx
,
ZMQ_PUB
);
...
...
perf/java/local_lat.java
View file @
50a8b9ea
...
...
@@ -33,7 +33,7 @@ class local_lat
int
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
int
roundtripCount
=
Integer
.
parseInt
(
args
[
2
]);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
,
0
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
REP
);
s
.
bind
(
bindTo
);
...
...
perf/java/local_thr.java
View file @
50a8b9ea
...
...
@@ -33,7 +33,7 @@ class local_thr
long
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
long
messageCount
=
Integer
.
parseInt
(
args
[
2
]);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
,
0
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
SUB
);
...
...
perf/java/remote_lat.java
View file @
50a8b9ea
...
...
@@ -33,7 +33,7 @@ class remote_lat
int
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
int
roundtripCount
=
Integer
.
parseInt
(
args
[
2
]);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
,
0
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
REQ
);
s
.
connect
(
connectTo
);
...
...
perf/java/remote_thr.java
View file @
50a8b9ea
...
...
@@ -34,7 +34,7 @@ class remote_thr
int
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
int
messageCount
=
Integer
.
parseInt
(
args
[
2
]);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
,
0
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
PUB
);
...
...
perf/ruby/local_lat.rb
View file @
50a8b9ea
...
...
@@ -27,7 +27,7 @@ bind_to = ARGV[0]
message_size
=
ARGV
[
1
].
to_i
roundtrip_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
ctx
=
Context
.
new
(
1
,
1
,
0
)
s
=
Socket
.
new
(
ctx
,
REP
);
s
.
bind
(
bind_to
);
...
...
perf/ruby/local_thr.rb
View file @
50a8b9ea
...
...
@@ -27,7 +27,7 @@ bind_to = ARGV[0]
message_size
=
ARGV
[
1
].
to_i
message_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
ctx
=
Context
.
new
(
1
,
1
,
0
)
s
=
Socket
.
new
(
ctx
,
SUB
);
s
.
setsockopt
(
SUBSCRIBE
,
"*"
);
...
...
perf/ruby/remote_lat.rb
View file @
50a8b9ea
...
...
@@ -27,7 +27,7 @@ connect_to = ARGV[0]
message_size
=
ARGV
[
1
].
to_i
roundtrip_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
ctx
=
Context
.
new
(
1
,
1
,
0
)
s
=
Socket
.
new
(
ctx
,
REQ
);
s
.
connect
(
connect_to
);
...
...
perf/ruby/remote_thr.rb
View file @
50a8b9ea
...
...
@@ -27,7 +27,7 @@ connect_to = ARGV[0]
message_size
=
ARGV
[
1
].
to_i
message_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
ctx
=
Context
.
new
(
1
,
1
,
0
)
s
=
Socket
.
new
(
ctx
,
PUB
);
# Add your socket options here.
...
...
src/app_thread.cpp
View file @
50a8b9ea
...
...
@@ -46,7 +46,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
zmq
::
app_thread_t
::
app_thread_t
(
dispatcher_t
*
dispatcher_
,
int
thread_slot_
)
:
zmq
::
app_thread_t
::
app_thread_t
(
dispatcher_t
*
dispatcher_
,
int
thread_slot_
,
int
flags_
)
:
object_t
(
dispatcher_
,
thread_slot_
),
associated
(
false
),
last_processing_time
(
0
)
...
...
src/app_thread.hpp
View file @
50a8b9ea
...
...
@@ -34,7 +34,8 @@ namespace zmq
{
public
:
app_thread_t
(
class
dispatcher_t
*
dispatcher_
,
int
thread_slot_
);
app_thread_t
(
class
dispatcher_t
*
dispatcher_
,
int
thread_slot_
,
int
flags_
);
~
app_thread_t
();
...
...
src/dispatcher.cpp
View file @
50a8b9ea
...
...
@@ -30,7 +30,8 @@
#include "windows.h"
#endif
zmq
::
dispatcher_t
::
dispatcher_t
(
int
app_threads_
,
int
io_threads_
)
:
zmq
::
dispatcher_t
::
dispatcher_t
(
int
app_threads_
,
int
io_threads_
,
int
flags_
)
:
sockets
(
0
),
terminated
(
false
)
{
...
...
@@ -47,7 +48,7 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
// Create application thread proxies.
for
(
int
i
=
0
;
i
!=
app_threads_
;
i
++
)
{
app_thread_t
*
app_thread
=
new
app_thread_t
(
this
,
i
);
app_thread_t
*
app_thread
=
new
app_thread_t
(
this
,
i
,
flags_
);
zmq_assert
(
app_thread
);
app_threads
.
push_back
(
app_thread
);
signalers
.
push_back
(
app_thread
->
get_signaler
());
...
...
@@ -55,7 +56,8 @@ zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_) :
// Create I/O thread objects.
for
(
int
i
=
0
;
i
!=
io_threads_
;
i
++
)
{
io_thread_t
*
io_thread
=
new
io_thread_t
(
this
,
i
+
app_threads_
);
io_thread_t
*
io_thread
=
new
io_thread_t
(
this
,
i
+
app_threads_
,
flags_
);
zmq_assert
(
io_thread
);
io_threads
.
push_back
(
io_thread
);
signalers
.
push_back
(
io_thread
->
get_signaler
());
...
...
src/dispatcher.hpp
View file @
50a8b9ea
...
...
@@ -50,7 +50,7 @@ namespace zmq
// Create the dispatcher object. Matrix of pipes to communicate between
// each socket and each I/O thread is created along with appropriate
// signalers.
dispatcher_t
(
int
app_threads_
,
int
io_threads_
);
dispatcher_t
(
int
app_threads_
,
int
io_threads_
,
int
flags_
);
// This function is called when user invokes zmq_term. If there are
// no more sockets open it'll cause all the infrastructure to be shut
...
...
src/fd_signaler.cpp
View file @
50a8b9ea
...
...
@@ -85,6 +85,12 @@ zmq::fd_t zmq::fd_signaler_t::get_fd ()
zmq
::
fd_signaler_t
::
fd_signaler_t
()
{
// Windows have no 'socketpair' function.
// Here we create the socketpair by hand.
// TODO: Check Windows pipe (CreatePipe). It'll presumably be more
// efficient than the socketpair.
struct
sockaddr_in
addr
;
SOCKET
listener
;
int
addrlen
=
sizeof
(
addr
);
...
...
src/io_thread.cpp
View file @
50a8b9ea
...
...
@@ -32,7 +32,8 @@
#include "dispatcher.hpp"
#include "simple_semaphore.hpp"
zmq
::
io_thread_t
::
io_thread_t
(
dispatcher_t
*
dispatcher_
,
int
thread_slot_
)
:
zmq
::
io_thread_t
::
io_thread_t
(
dispatcher_t
*
dispatcher_
,
int
thread_slot_
,
int
flags_
)
:
object_t
(
dispatcher_
,
thread_slot_
)
{
#if defined ZMQ_FORCE_SELECT
...
...
src/io_thread.hpp
View file @
50a8b9ea
...
...
@@ -37,7 +37,8 @@ namespace zmq
{
public
:
io_thread_t
(
class
dispatcher_t
*
dispatcher_
,
int
thread_slot_
);
io_thread_t
(
class
dispatcher_t
*
dispatcher_
,
int
thread_slot_
,
int
flags_
);
// Clean-up. If the thread was started, it's neccessary to call 'stop'
// before invoking destructor. Otherwise the destructor would hang up.
...
...
src/zmq.cpp
View file @
50a8b9ea
...
...
@@ -173,17 +173,17 @@ int zmq_msg_type (zmq_msg_t *msg_)
return
(((
const
unsigned
char
*
)
msg_
->
content
)
-
offset
);
}
void
*
zmq_init
(
int
app_threads_
,
int
io_threads_
)
void
*
zmq_init
(
int
app_threads_
,
int
io_threads_
,
int
flags_
)
{
// There should be at least a single thread managed by the dispatcher.
if
(
app_threads_
<
0
||
io_threads_
<
0
||
app_threads_
+
io_threads_
==
0
)
{
if
(
app_threads_
<
=
0
||
io_threads_
<=
0
||
app_threads_
>
63
||
io_threads_
>
63
)
{
errno
=
EINVAL
;
return
NULL
;
}
zmq
::
dispatcher_t
*
dispatcher
=
new
zmq
::
dispatcher_t
(
app_threads_
,
io_threads_
);
io_threads_
,
flags_
);
zmq_assert
(
dispatcher
);
return
(
void
*
)
dispatcher
;
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment