Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
a71d0028
Commit
a71d0028
authored
Sep 16, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' of git@github.com:sustrik/zeromq2
parents
4631fde7
7a5db604
Hide whitespace changes
Inline
Side-by-side
Showing
16 changed files
with
149 additions
and
71 deletions
+149
-71
zmq.h
c/zmq.h
+1
-1
Socket.cpp
java/Socket.cpp
+1
-0
Socket.java
java/org/zmq/Socket.java
+1
-0
local_thr.c
perf/c/local_thr.c
+7
-1
remote_thr.c
perf/c/remote_thr.c
+4
-1
local_thr.cpp
perf/cpp/local_thr.cpp
+7
-1
remote_thr.cpp
perf/cpp/remote_thr.cpp
+5
-1
local_thr.java
perf/java/local_thr.java
+37
-31
remote_thr.java
perf/java/remote_thr.java
+32
-28
local_thr.py
perf/python/local_thr.py
+7
-1
remote_thr.py
perf/python/remote_thr.py
+5
-1
local_thr.rb
perf/ruby/local_thr.rb
+6
-1
remote_thr.rb
perf/ruby/remote_thr.rb
+5
-1
pyzmq.cpp
python/pyzmq.cpp
+17
-1
rbzmq.cpp
ruby/rbzmq.cpp
+5
-0
socket_base.cpp
src/socket_base.cpp
+9
-2
No files found.
c/zmq.h
View file @
a71d0028
...
@@ -53,7 +53,7 @@ extern "C" {
...
@@ -53,7 +53,7 @@ extern "C" {
#define ZMQ_UNSUBSCRIBE 7 // string
#define ZMQ_UNSUBSCRIBE 7 // string
#define ZMQ_RATE 8 // int64_t
#define ZMQ_RATE 8 // int64_t
#define ZMQ_RECOVERY_IVL 9 // int64_t
#define ZMQ_RECOVERY_IVL 9 // int64_t
#define ZMQ_MCAST_LOOP 10 //
boolean
#define ZMQ_MCAST_LOOP 10 //
int64_t
// The operation should be performed in non-blocking mode. I.e. if it cannot
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.
// be processed immediately, error should be returned with errno set to EAGAIN.
...
...
java/Socket.cpp
View file @
a71d0028
...
@@ -98,6 +98,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env,
...
@@ -98,6 +98,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env,
case
ZMQ_AFFINITY
:
case
ZMQ_AFFINITY
:
case
ZMQ_RATE
:
case
ZMQ_RATE
:
case
ZMQ_RECOVERY_IVL
:
case
ZMQ_RECOVERY_IVL
:
case
ZMQ_MCAST_LOOP
:
{
{
void
*
s
=
(
void
*
)
env
->
GetLongField
(
obj
,
socket_handle_fid
);
void
*
s
=
(
void
*
)
env
->
GetLongField
(
obj
,
socket_handle_fid
);
assert
(
s
);
assert
(
s
);
...
...
java/org/zmq/Socket.java
View file @
a71d0028
...
@@ -44,6 +44,7 @@ public class Socket
...
@@ -44,6 +44,7 @@ public class Socket
public
static
final
int
UNSUBSCRIBE
=
7
;
public
static
final
int
UNSUBSCRIBE
=
7
;
public
static
final
int
RATE
=
8
;
public
static
final
int
RATE
=
8
;
public
static
final
int
RECOVERY_IVL
=
9
;
public
static
final
int
RECOVERY_IVL
=
9
;
public
static
final
int
MCAST_LOOP
=
10
;
/**
/**
* Class constructor.
* Class constructor.
...
...
perf/c/local_thr.c
View file @
a71d0028
...
@@ -48,9 +48,15 @@ int main (int argc, char *argv [])
...
@@ -48,9 +48,15 @@ int main (int argc, char *argv [])
ctx
=
zmq_init
(
1
,
1
);
ctx
=
zmq_init
(
1
,
1
);
assert
(
ctx
);
assert
(
ctx
);
s
=
zmq_socket
(
ctx
,
ZMQ_
P2P
);
s
=
zmq_socket
(
ctx
,
ZMQ_
SUB
);
assert
(
s
);
assert
(
s
);
rc
=
zmq_setsockopt
(
s
,
ZMQ_SUBSCRIBE
,
"*"
,
1
);
assert
(
rc
==
0
);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
rc
=
zmq_bind
(
s
,
bind_to
);
rc
=
zmq_bind
(
s
,
bind_to
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
...
perf/c/remote_thr.c
View file @
a71d0028
...
@@ -45,9 +45,12 @@ int main (int argc, char *argv [])
...
@@ -45,9 +45,12 @@ int main (int argc, char *argv [])
ctx
=
zmq_init
(
1
,
1
);
ctx
=
zmq_init
(
1
,
1
);
assert
(
ctx
);
assert
(
ctx
);
s
=
zmq_socket
(
ctx
,
ZMQ_P
2P
);
s
=
zmq_socket
(
ctx
,
ZMQ_P
UB
);
assert
(
s
);
assert
(
s
);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
rc
=
zmq_connect
(
s
,
connect_to
);
rc
=
zmq_connect
(
s
,
connect_to
);
assert
(
rc
==
0
);
assert
(
rc
==
0
);
...
...
perf/cpp/local_thr.cpp
View file @
a71d0028
...
@@ -36,7 +36,13 @@ int main (int argc, char *argv [])
...
@@ -36,7 +36,13 @@ int main (int argc, char *argv [])
zmq
::
context_t
ctx
(
1
,
1
);
zmq
::
context_t
ctx
(
1
,
1
);
zmq
::
socket_t
s
(
ctx
,
ZMQ_P2P
);
zmq
::
socket_t
s
(
ctx
,
ZMQ_SUB
);
s
.
setsockopt
(
ZMQ_SUBSCRIBE
,
"*"
,
1
);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s
.
bind
(
bind_to
);
s
.
bind
(
bind_to
);
zmq
::
message_t
msg
;
zmq
::
message_t
msg
;
...
...
perf/cpp/remote_thr.cpp
View file @
a71d0028
...
@@ -36,7 +36,11 @@ int main (int argc, char *argv [])
...
@@ -36,7 +36,11 @@ int main (int argc, char *argv [])
zmq
::
context_t
ctx
(
1
,
1
);
zmq
::
context_t
ctx
(
1
,
1
);
zmq
::
socket_t
s
(
ctx
,
ZMQ_P2P
);
zmq
::
socket_t
s
(
ctx
,
ZMQ_PUB
);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s
.
connect
(
connect_to
);
s
.
connect
(
connect_to
);
for
(
int
i
=
0
;
i
!=
message_count
;
i
++
)
{
for
(
int
i
=
0
;
i
!=
message_count
;
i
++
)
{
...
...
perf/java/local_thr.java
View file @
a71d0028
...
@@ -21,45 +21,51 @@ import org.zmq.*;
...
@@ -21,45 +21,51 @@ import org.zmq.*;
class
local_thr
class
local_thr
{
{
public
static
void
main
(
String
[]
args
)
public
static
void
main
(
String
[]
args
)
{
{
if
(
args
.
length
!=
3
)
{
if
(
args
.
length
!=
3
)
{
System
.
out
.
println
(
"usage: local_thr <bind-to> "
+
System
.
out
.
println
(
"usage: local_thr <bind-to> "
+
"<message size> <message count>"
);
"<message size> <message count>"
);
return
;
return
;
}
}
String
bindTo
=
args
[
0
];
String
bindTo
=
args
[
0
];
long
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
long
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
long
messageCount
=
Integer
.
parseInt
(
args
[
2
]);
long
messageCount
=
Integer
.
parseInt
(
args
[
2
]);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
P2P
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
SUB
);
s
.
bind
(
bindTo
);
byte
[]
data
=
s
.
recv
(
0
);
s
.
setsockopt
(
org
.
zmq
.
Socket
.
SUBSCRIBE
,
"*"
);
assert
(
data
.
length
==
messageSize
);
long
start
=
System
.
currentTimeMillis
();
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
for
(
int
i
=
1
;
i
!=
messageCount
;
i
++)
{
s
.
bind
(
bindTo
);
data
=
s
.
recv
(
0
);
assert
(
data
.
length
==
messageSize
);
}
long
end
=
System
.
currentTimeMillis
();
byte
[]
data
=
s
.
recv
(
0
);
assert
(
data
.
length
==
messageSize
);
long
elapsed
=
(
end
-
start
)
*
1000
;
long
start
=
System
.
currentTimeMillis
();
if
(
elapsed
==
0
)
elapsed
=
1
;
long
throughput
=
messageCount
*
1000000
/
elapsed
;
for
(
int
i
=
1
;
i
!=
messageCount
;
i
++)
{
double
megabits
=
(
double
)
(
throughput
*
messageSize
*
8
)
/
1000000
;
data
=
s
.
recv
(
0
);
assert
(
data
.
length
==
messageSize
);
}
System
.
out
.
println
(
"message size: "
+
messageSize
+
" [B]"
);
long
end
=
System
.
currentTimeMillis
();
System
.
out
.
println
(
"message count: "
+
messageCount
);
System
.
out
.
println
(
"mean throughput: "
+
throughput
+
"[msg/s]"
);
long
elapsed
=
(
end
-
start
)
*
1000
;
System
.
out
.
println
(
"mean throughput: "
+
megabits
+
"[Mb/s]"
);
if
(
elapsed
==
0
)
}
elapsed
=
1
;
long
throughput
=
messageCount
*
1000000
/
elapsed
;
double
megabits
=
(
double
)
(
throughput
*
messageSize
*
8
)
/
1000000
;
System
.
out
.
println
(
"message size: "
+
messageSize
+
" [B]"
);
System
.
out
.
println
(
"message count: "
+
messageCount
);
System
.
out
.
println
(
"mean throughput: "
+
throughput
+
"[msg/s]"
);
System
.
out
.
println
(
"mean throughput: "
+
megabits
+
"[Mb/s]"
);
}
}
}
perf/java/remote_thr.java
View file @
a71d0028
...
@@ -21,33 +21,37 @@ import org.zmq.*;
...
@@ -21,33 +21,37 @@ import org.zmq.*;
class
remote_thr
class
remote_thr
{
{
public
static
void
main
(
String
[]
args
)
public
static
void
main
(
String
[]
args
)
{
{
if
(
args
.
length
!=
3
)
{
if
(
args
.
length
!=
3
)
{
System
.
out
.
println
(
"usage: remote_thr <connect-to> "
+
System
.
out
.
println
(
"usage: remote_thr <connect-to> "
+
"<message-size> <message-count>"
);
"<message-size> <message-count>"
);
return
;
return
;
}
}
// Parse the command line arguments.
// Parse the command line arguments.
String
connectTo
=
args
[
0
];
String
connectTo
=
args
[
0
];
int
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
int
messageSize
=
Integer
.
parseInt
(
args
[
1
]);
int
messageCount
=
Integer
.
parseInt
(
args
[
2
]);
int
messageCount
=
Integer
.
parseInt
(
args
[
2
]);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Context
ctx
=
new
org
.
zmq
.
Context
(
1
,
1
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
P2P
);
org
.
zmq
.
Socket
s
=
new
org
.
zmq
.
Socket
(
ctx
,
org
.
zmq
.
Socket
.
PUB
);
s
.
connect
(
connectTo
);
// Add your socket options here.
byte
msg
[]
=
new
byte
[
messageSize
];
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
for
(
int
i
=
0
;
i
!=
messageCount
;
i
++)
s
.
send
(
msg
,
0
);
s
.
connect
(
connectTo
);
try
{
byte
msg
[]
=
new
byte
[
messageSize
];
Thread
.
sleep
(
10000
);
for
(
int
i
=
0
;
i
!=
messageCount
;
i
++)
}
s
.
send
(
msg
,
0
);
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
try
{
}
Thread
.
sleep
(
10000
);
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
}
}
}
}
perf/python/local_thr.py
View file @
a71d0028
...
@@ -35,7 +35,13 @@ def main ():
...
@@ -35,7 +35,13 @@ def main ():
sys
.
exit
(
1
)
sys
.
exit
(
1
)
ctx
=
libpyzmq
.
Context
(
1
,
1
);
ctx
=
libpyzmq
.
Context
(
1
,
1
);
s
=
libpyzmq
.
Socket
(
ctx
,
libpyzmq
.
P2P
)
s
=
libpyzmq
.
Socket
(
ctx
,
libpyzmq
.
SUB
)
s
.
setsockopt
(
libpyzmq
.
SUBSCRIBE
,
"*"
);
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s
.
bind
(
bind_to
)
s
.
bind
(
bind_to
)
msg
=
s
.
recv
()
msg
=
s
.
recv
()
...
...
perf/python/remote_thr.py
View file @
a71d0028
...
@@ -35,7 +35,11 @@ def main ():
...
@@ -35,7 +35,11 @@ def main ():
sys
.
exit
(
1
)
sys
.
exit
(
1
)
ctx
=
libpyzmq
.
Context
(
1
,
1
);
ctx
=
libpyzmq
.
Context
(
1
,
1
);
s
=
libpyzmq
.
Socket
(
ctx
,
libpyzmq
.
P2P
)
s
=
libpyzmq
.
Socket
(
ctx
,
libpyzmq
.
PUB
)
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s
.
connect
(
connect_to
)
s
.
connect
(
connect_to
)
msg
=
''
.
join
([
' '
for
n
in
range
(
0
,
message_size
)])
msg
=
''
.
join
([
' '
for
n
in
range
(
0
,
message_size
)])
...
...
perf/ruby/local_thr.rb
View file @
a71d0028
...
@@ -28,7 +28,12 @@ message_size = ARGV[1].to_i
...
@@ -28,7 +28,12 @@ message_size = ARGV[1].to_i
message_count
=
ARGV
[
2
].
to_i
message_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
ctx
=
Context
.
new
(
1
,
1
)
s
=
Socket
.
new
(
ctx
,
P2P
);
s
=
Socket
.
new
(
ctx
,
SUB
);
s
.
setsockopt
(
SUBSCRIBE
,
"*"
);
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s
.
bind
(
bind_to
);
s
.
bind
(
bind_to
);
msg
=
s
.
recv
(
0
)
msg
=
s
.
recv
(
0
)
...
...
perf/ruby/remote_thr.rb
View file @
a71d0028
...
@@ -28,7 +28,11 @@ message_size = ARGV[1].to_i
...
@@ -28,7 +28,11 @@ message_size = ARGV[1].to_i
message_count
=
ARGV
[
2
].
to_i
message_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
ctx
=
Context
.
new
(
1
,
1
)
s
=
Socket
.
new
(
ctx
,
P2P
);
s
=
Socket
.
new
(
ctx
,
PUB
);
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s
.
connect
(
connect_to
);
s
.
connect
(
connect_to
);
msg
=
"
#{
'0'
*
message_size
}
"
msg
=
"
#{
'0'
*
message_size
}
"
...
...
python/pyzmq.cpp
View file @
a71d0028
...
@@ -453,7 +453,7 @@ PyMODINIT_FUNC initlibpyzmq ()
...
@@ -453,7 +453,7 @@ PyMODINIT_FUNC initlibpyzmq ()
PyObject
*
dict
=
PyModule_GetDict
(
module
);
PyObject
*
dict
=
PyModule_GetDict
(
module
);
assert
(
dict
);
assert
(
dict
);
PyObject
*
t
;
PyObject
*
t
;
t
=
PyInt_FromLong
(
ZMQ_NOBLOCK
);
t
=
PyInt_FromLong
(
ZMQ_NOBLOCK
);
PyDict_SetItemString
(
dict
,
"NOBLOCK"
,
t
);
PyDict_SetItemString
(
dict
,
"NOBLOCK"
,
t
);
Py_DECREF
(
t
);
Py_DECREF
(
t
);
...
@@ -489,7 +489,23 @@ PyMODINIT_FUNC initlibpyzmq ()
...
@@ -489,7 +489,23 @@ PyMODINIT_FUNC initlibpyzmq ()
Py_DECREF
(
t
);
Py_DECREF
(
t
);
t
=
PyInt_FromLong
(
ZMQ_IDENTITY
);
t
=
PyInt_FromLong
(
ZMQ_IDENTITY
);
PyDict_SetItemString
(
dict
,
"IDENTITY"
,
t
);
PyDict_SetItemString
(
dict
,
"IDENTITY"
,
t
);
Py_DECREF
(
t
);
t
=
PyInt_FromLong
(
ZMQ_SUBSCRIBE
);
PyDict_SetItemString
(
dict
,
"SUBSCRIBE"
,
t
);
Py_DECREF
(
t
);
t
=
PyInt_FromLong
(
ZMQ_UNSUBSCRIBE
);
PyDict_SetItemString
(
dict
,
"UNSUBSCRIBE"
,
t
);
Py_DECREF
(
t
);
t
=
PyInt_FromLong
(
ZMQ_RATE
);
PyDict_SetItemString
(
dict
,
"RATE"
,
t
);
Py_DECREF
(
t
);
Py_DECREF
(
t
);
t
=
PyInt_FromLong
(
ZMQ_RECOVERY_IVL
);
PyDict_SetItemString
(
dict
,
"RECOVERY_IVL"
,
t
);
Py_DECREF
(
t
);
t
=
PyInt_FromLong
(
ZMQ_MCAST_LOOP
);
PyDict_SetItemString
(
dict
,
"MCAST_LOOP"
,
t
);
Py_DECREF
(
t
);
}
}
#if defined _MSC_VER
#if defined _MSC_VER
...
...
ruby/rbzmq.cpp
View file @
a71d0028
...
@@ -282,6 +282,11 @@ extern "C" void Init_librbzmq ()
...
@@ -282,6 +282,11 @@ extern "C" void Init_librbzmq ()
rb_define_global_const
(
"SWAP"
,
INT2NUM
(
ZMQ_SWAP
));
rb_define_global_const
(
"SWAP"
,
INT2NUM
(
ZMQ_SWAP
));
rb_define_global_const
(
"AFFINITY"
,
INT2NUM
(
ZMQ_AFFINITY
));
rb_define_global_const
(
"AFFINITY"
,
INT2NUM
(
ZMQ_AFFINITY
));
rb_define_global_const
(
"IDENTITY"
,
INT2NUM
(
ZMQ_IDENTITY
));
rb_define_global_const
(
"IDENTITY"
,
INT2NUM
(
ZMQ_IDENTITY
));
rb_define_global_const
(
"SUBSCRIBE"
,
INT2NUM
(
ZMQ_SUBSCRIBE
));
rb_define_global_const
(
"UNSUBSCRIBE"
,
INT2NUM
(
ZMQ_UNSUBSCRIBE
));
rb_define_global_const
(
"RATE"
,
INT2NUM
(
ZMQ_RATE
));
rb_define_global_const
(
"RECOVERY_IVL"
,
INT2NUM
(
ZMQ_RECOVERY_IVL
));
rb_define_global_const
(
"MCAST_LOOP"
,
INT2NUM
(
ZMQ_MCAST_LOOP
));
rb_define_global_const
(
"NOBLOCK"
,
INT2NUM
(
ZMQ_NOBLOCK
));
rb_define_global_const
(
"NOBLOCK"
,
INT2NUM
(
ZMQ_NOBLOCK
));
rb_define_global_const
(
"NOFLUSH"
,
INT2NUM
(
ZMQ_NOFLUSH
));
rb_define_global_const
(
"NOFLUSH"
,
INT2NUM
(
ZMQ_NOFLUSH
));
...
...
src/socket_base.cpp
View file @
a71d0028
...
@@ -158,11 +158,18 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
...
@@ -158,11 +158,18 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
return
0
;
return
0
;
case
ZMQ_MCAST_LOOP
:
case
ZMQ_MCAST_LOOP
:
if
(
optvallen_
!=
sizeof
(
bool
))
{
if
(
optvallen_
!=
sizeof
(
int64_t
))
{
errno
=
EINVAL
;
return
-
1
;
}
if
((
int64_t
)
*
((
int64_t
*
)
optval_
)
==
0
||
(
int64_t
)
*
((
int64_t
*
)
optval_
)
==
1
)
{
options
.
use_multicast_loop
=
(
bool
)
*
((
int64_t
*
)
optval_
);
}
else
{
errno
=
EINVAL
;
errno
=
EINVAL
;
return
-
1
;
return
-
1
;
}
}
options
.
use_multicast_loop
=
optval_
;
return
0
;
return
0
;
default
:
default
:
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment