Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
87ccbb9f
Commit
87ccbb9f
authored
Sep 07, 2009
by
Martin Sustrik
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'master' of git@github.com:sustrik/zeromq2
parents
67253f31
d62c7423
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
382 additions
and
640 deletions
+382
-640
local_lat.rb
perf/ruby/local_lat.rb
+16
-50
local_thr.rb
perf/ruby/local_thr.rb
+28
-62
remote_lat.rb
perf/ruby/remote_lat.rb
+28
-36
remote_thr.rb
perf/ruby/remote_thr.rb
+17
-31
Makefile.am
ruby/Makefile.am
+1
-1
rbzmq.cpp
ruby/rbzmq.cpp
+292
-0
zmq.cpp
ruby/zmq.cpp
+0
-460
No files found.
perf/ruby/local_lat.rb
View file @
87ccbb9f
...
...
@@ -18,58 +18,24 @@
require
'librbzmq'
class
AssertionFailure
<
StandardError
if
ARGV
.
length
!=
3
puts
"usage: local_lat <bind-to> <message-size> <roundtrip-count>"
Process
.
exit
end
def
assert
(
bool
,
message
=
'assertion failure'
)
raise
AssertionFailure
.
new
(
message
)
unless
bool
end
if
ARGV
.
length
!=
4
puts
"usage: local_lat <in-interface> <out-interface> <message-size>
<roundtrip-count>"
Process
.
exit
end
in_interface
=
ARGV
[
0
]
out_interface
=
ARGV
[
1
]
message_size
=
ARGV
[
2
]
roundtrip_count
=
ARGV
[
3
]
# Print out the test parameters.
puts
"message size:
#{
message_size
}
[B]"
puts
"roundtrip count:
#{
roundtrip_count
}
"
# Create 0MQ transport.
rb_zmq
=
Zmq
.
new
()
# Create the wiring.
context
=
rb_zmq
.
context
(
1
,
1
)
in_socket
=
rb_zmq
.
socket
(
context
,
ZMQ_SUB
)
out_socket
=
rb_zmq
.
socket
(
context
,
ZMQ_PUB
)
# Bind.
rb_zmq
.
bind
(
in_socket
,
in_interface
.
to_s
)
rb_zmq
.
bind
(
out_socket
,
out_interface
.
to_s
)
# Create message data to send.
out_msg
=
rb_zmq
.
msg_init_size
(
message_size
.
to_i
)
# Get initial timestamp.
start_time
=
Time
.
now
bind_to
=
ARGV
[
0
]
message_size
=
ARGV
[
1
].
to_i
roundtrip_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
s
=
Socket
.
new
(
ctx
,
REP
);
s
.
bind
(
bind_to
);
for
i
in
0
...
roundtrip_count
do
msg
=
s
.
recv
(
0
)
s
.
send
(
msg
,
0
)
end
# The message loop.
for
i
in
0
...
roundtrip_count
.
to_i
do
rb_zmq
.
send
(
out_socket
,
out_msg
,
ZMQ_NOBLOCK
)
in_buf
=
rb_zmq
.
recv
(
in_socket
,
ZMQ_NOBLOCK
)
assert
(
rb_zmq
.
msg_size
(
in_buf
.
msg
)
==
message_size
.
to_i
)
end
sleep
1
# Get final timestamp.
end_time
=
Time
.
now
# Compute and print out the latency.
latency
=
(
end_time
.
to_f
-
start_time
.
to_f
)
*
1000000
/
roundtrip_count
.
to_i
/
2
puts
"Your average latency is "
+
"%0.2f"
%
latency
+
"[us]"
perf/ruby/local_thr.rb
View file @
87ccbb9f
...
...
@@ -17,74 +17,40 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
require
'librbzmq'
class
Context
end
class
Socket
if
ARGV
.
length
!=
3
puts
"usage: local_thr <bind-to> <message-size> <message-count>"
Process
.
exit
end
class
AssertionFailure
<
StandardError
end
bind_to
=
ARGV
[
0
]
message_size
=
ARGV
[
1
].
to_i
message_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
s
=
Socket
.
new
(
ctx
,
SUB
);
s
.
bind
(
bind_to
);
def
assert
(
bool
,
message
=
'assertion failure'
)
raise
AssertionFailure
.
new
(
message
)
unless
bool
msg
=
s
.
recv
(
0
)
start_time
=
Time
.
now
for
i
in
1
...
message_count
.
to_i
do
msg
=
s
.
recv
(
0
)
end
if
ARGV
.
length
!=
3
puts
"usage: local_thr <in-interface> <message-size>"
+
\
" <message-count>"
Process
.
exit
end
end_time
=
Time
.
now
in_interface
=
ARGV
[
0
]
message_size
=
ARGV
[
1
]
message_count
=
ARGV
[
2
]
# Print out the test parameters.
puts
"message size: "
+
message_size
.
to_s
+
" [B]"
puts
"message count: "
+
message_count
.
to_s
# Create 0MQ transport.
rb_zmq
=
Zmq
.
new
();
# Create context.
context
=
rb_zmq
.
context
(
1
,
1
);
# Create the socket.
in_socket
=
rb_zmq
.
socket
(
context
,
ZMQ_SUB
);
# Connect.
rb_zmq
.
connect
(
in_socket
,
in_interface
.
to_s
);
# Receive first message
data
=
rb_zmq
.
recv
(
in_socket
,
ZMQ_NOBLOCK
);
assert
(
rb_zmq
.
msg_size
(
data
.
msg
)
==
message_size
.
to_i
)
# Get initial timestamp.
start_time
=
Time
.
now
# The message loop.
for
i
in
0
...
message_count
.
to_i
-
1
do
data
=
rb_zmq
.
recv
(
in_socket
,
ZMQ_NOBLOCK
);
assert
(
rb_zmq
.
msg_size
(
data
.
msg
)
==
message_size
.
to_i
)
end
# Get terminal timestamp.
end_time
=
Time
.
now
# Compute and print out the throughput.
if
end_time
.
to_f
-
start_time
.
to_f
!=
0
message_throughput
=
message_count
.
to_i
/
(
end_time
.
to_f
-
start_time
.
to_f
);
else
message_throughput
=
message_count
.
to_i
end
megabit_throughput
=
message_throughput
.
to_f
*
message_size
.
to_i
*
8
/
1000000
;
puts
"Your average throughput is "
+
"%0.2f"
%
message_throughput
.
to_s
+
" [msg/s]"
puts
"Your average throughput is "
+
"%0.2f"
%
megabit_throughput
.
to_s
+
" [Mb/s]"
elapsed
=
(
end_time
.
to_f
-
start_time
.
to_f
)
*
1000000
if
elapsed
==
0
elapsed
=
1
end
throughput
=
message_count
*
1000000
/
elapsed
megabits
=
throughput
*
message_size
*
8
/
1000000
puts
"message size: %i [B]"
%
message_size
puts
"message count: %i"
%
message_count
puts
"mean throughput: %i [msg/s]"
%
throughput
puts
"mean throughput: %.3f [Mb/s]"
%
megabits
perf/ruby/remote_lat.rb
View file @
87ccbb9f
...
...
@@ -18,44 +18,36 @@
require
'librbzmq'
class
AssertionFailure
<
StandardError
if
ARGV
.
length
!=
3
puts
"usage: remote_lat <connect-to> <message-size> <roundtrip-count>"
Process
.
exit
end
def
assert
(
bool
,
message
=
'assertion failure'
)
raise
AssertionFailure
.
new
(
message
)
unless
bool
connect_to
=
ARGV
[
0
]
message_size
=
ARGV
[
1
].
to_i
roundtrip_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
s
=
Socket
.
new
(
ctx
,
REQ
);
s
.
connect
(
connect_to
);
msg
=
"
#{
'0'
*
message_size
}
"
start_time
=
Time
.
now
for
i
in
0
...
roundtrip_count
do
s
.
send
(
msg
,
0
)
msg
=
s
.
recv
(
0
)
end
if
ARGV
.
length
!=
4
puts
"usage: remote_lat <in-interface> <out-interface>"
+
\
" <message-size> <roundtrip-count>"
Process
.
exit
end
in_interface
=
ARGV
[
0
]
out_interface
=
ARGV
[
1
]
message_size
=
ARGV
[
2
]
roundtrip_count
=
ARGV
[
3
]
# Create 0MQ transport.
rb_zmq
=
Zmq
.
new
()
# Create the wiring.
context
=
rb_zmq
.
context
(
1
,
1
)
in_socket
=
rb_zmq
.
socket
(
context
,
ZMQ_SUB
)
out_socket
=
rb_zmq
.
socket
(
context
,
ZMQ_PUB
)
# Connect.
rb_zmq
.
connect
(
in_socket
,
in_interface
.
to_s
)
rb_zmq
.
connect
(
out_socket
,
out_interface
.
to_s
)
# The message loop.
for
i
in
0
...
roundtrip_count
.
to_i
do
data
=
rb_zmq
.
recv
(
in_socket
,
ZMQ_NOBLOCK
)
assert
(
rb_zmq
.
msg_size
(
data
.
msg
)
==
message_size
.
to_i
)
rb_zmq
.
send
(
out_socket
,
data
.
msg
,
ZMQ_NOBLOCK
)
end
# Wait till all messages are sent.
sleep
2
end_time
=
Time
.
now
elapsed
=
(
end_time
.
to_f
-
start_time
.
to_f
)
*
1000000
latency
=
elapsed
/
roundtrip_count
/
2
puts
"message size: %i [B]"
%
message_size
puts
"roundtrip count: %i"
%
roundtrip_count
puts
"mean latency: %.3f [us]"
%
latency
perf/ruby/remote_thr.rb
View file @
87ccbb9f
...
...
@@ -18,38 +18,24 @@
require
'librbzmq'
class
AssertionFailure
<
StandardError
if
ARGV
.
length
!=
3
puts
"usage: remote_thr <connect-to> <message-size> <message-count>"
Process
.
exit
end
connect_to
=
ARGV
[
0
]
message_size
=
ARGV
[
1
].
to_i
message_count
=
ARGV
[
2
].
to_i
ctx
=
Context
.
new
(
1
,
1
)
s
=
Socket
.
new
(
ctx
,
PUB
);
s
.
connect
(
connect_to
);
msg
=
"
#{
'0'
*
message_size
}
"
def
assert
(
bool
,
message
=
'assertion failure'
)
raise
AssertionFailure
.
new
(
message
)
unless
bool
for
i
in
0
...
message_count
do
s
.
send
(
msg
,
0
)
end
if
ARGV
.
length
!=
3
puts
"usage: remote_thr <out-interface> <message-size> <message-count>"
Process
.
exit
end
out_interface
=
ARGV
[
0
]
message_size
=
ARGV
[
1
]
message_count
=
ARGV
[
2
]
# Create 0MQ transport.
rb_zmq
=
Zmq
.
new
();
# Create the wiring.
context
=
rb_zmq
.
context
(
1
,
1
);
out_socket
=
rb_zmq
.
socket
(
context
,
ZMQ_PUB
);
rb_zmq
.
bind
(
out_socket
,
out_interface
.
to_s
);
# Create message data to send.
out_msg
=
rb_zmq
.
msg_init_size
(
message_size
.
to_s
);
# The message loop.
for
i
in
0
...
message_count
.
to_i
+
1
do
rb_zmq
.
send
(
out_socket
,
out_msg
,
ZMQ_NOBLOCK
);
end
# Wait till all messages are sent.
sleep
2
sleep
10
ruby/Makefile.am
View file @
87ccbb9f
...
...
@@ -3,7 +3,7 @@ INCLUDES = -I$(top_builddir) -I$(top_srcdir)/include -I$(top_builddir)/include
rblib_LTLIBRARIES
=
librbzmq.la
rblibdir
=
@RUBYDIR@
librbzmq_la_SOURCES
=
zmq.cpp
librbzmq_la_SOURCES
=
rb
zmq.cpp
librbzmq_la_LDFLAGS
=
-version-info
@RBLTVER@
librbzmq_la_CXXFLAGS
=
-Wall
-pedantic
-Werror
-Wno-long-long
...
...
ruby/rbzmq.cpp
0 → 100644
View file @
87ccbb9f
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <zmq.h>
#include <ruby.h>
static
void
context_free
(
void
*
ctx
)
{
if
(
ctx
)
{
int
rc
=
zmq_term
(
ctx
);
assert
(
rc
==
0
);
}
}
static
VALUE
context_alloc
(
VALUE
class_
)
{
return
rb_data_object_alloc
(
class_
,
NULL
,
0
,
context_free
);
}
static
VALUE
context_initialize
(
VALUE
self_
,
VALUE
app_threads_
,
VALUE
io_threads_
)
{
assert
(
!
DATA_PTR
(
self_
));
void
*
ctx
=
zmq_init
(
NUM2INT
(
app_threads_
),
NUM2INT
(
io_threads_
));
if
(
!
ctx
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
return
Qnil
;
}
DATA_PTR
(
self_
)
=
(
void
*
)
ctx
;
return
self_
;
}
static
void
socket_free
(
void
*
s
)
{
if
(
s
)
{
int
rc
=
zmq_close
(
s
);
assert
(
rc
==
0
);
}
}
static
VALUE
socket_alloc
(
VALUE
class_
)
{
return
rb_data_object_alloc
(
class_
,
NULL
,
0
,
socket_free
);
}
static
VALUE
socket_initialize
(
VALUE
self_
,
VALUE
context_
,
VALUE
type_
)
{
assert
(
!
DATA_PTR
(
self_
));
if
(
strcmp
(
rb_obj_classname
(
context_
),
"Context"
)
!=
0
)
{
rb_raise
(
rb_eArgError
,
"expected Context object"
);
return
Qnil
;
}
void
*
s
=
zmq_socket
(
DATA_PTR
(
context_
),
NUM2INT
(
type_
));
if
(
!
s
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
return
Qnil
;
}
DATA_PTR
(
self_
)
=
(
void
*
)
s
;
return
self_
;
}
/*
static VALUE rb_setsockopt (VALUE self_, VALUE socket_, VALUE option_,
VALUE optval_)
{
// Get the socket.
void* socket;
Data_Get_Struct (socket_, void*, socket);
int rc = 0;
if (TYPE (optval_) == T_STRING) {
// Forward the code to native 0MQ library.
rc = zmq_setsockopt (socket, NUM2INT (option_),
(void *) StringValueCStr (optval_), RSTRING_LEN (optval_));
}
else if (TYPE (optval_) == T_FLOAT) {
double optval = NUM2DBL (optval_);
// Forward the code to native 0MQ library.
rc = zmq_setsockopt (socket, NUM2INT (option_),
(void*) &optval, 8);
}
else if (TYPE (optval_) == T_FIXNUM) {
long optval = FIX2LONG (optval_);
// Forward the code to native 0MQ library.
rc = zmq_setsockopt (socket, NUM2INT (option_),
(void *) &optval, 4);
}
else if (TYPE (optval_) == T_BIGNUM) {
long optval = NUM2LONG (optval_);
// Forward the code to native 0MQ library.
rc = zmq_setsockopt (socket, NUM2INT (option_),
(void *) &optval, 4);
}
else if (TYPE (optval_) == T_ARRAY) {
// Forward the code to native 0MQ library.
rc = zmq_setsockopt (socket, NUM2INT (option_),
(void *) RARRAY_PTR (optval_), RARRAY_LEN (optval_));
}
else if (TYPE (optval_) == T_STRUCT) {
// Forward the code to native 0MQ library.
rc = zmq_setsockopt (socket, NUM2INT (option_),
(void *) RSTRUCT_PTR (optval_), RSTRUCT_LEN (optval_));
}
else
rb_raise(rb_eRuntimeError, "Unknown type");
assert (rc == 0);
return self_;
}
*/
static
VALUE
socket_bind
(
VALUE
self_
,
VALUE
addr_
)
{
assert
(
DATA_PTR
(
self_
));
int
rc
=
zmq_bind
(
DATA_PTR
(
self_
),
rb_string_value_cstr
(
&
addr_
));
if
(
rc
!=
0
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
return
Qnil
;
}
return
Qnil
;
}
static
VALUE
socket_connect
(
VALUE
self_
,
VALUE
addr_
)
{
assert
(
DATA_PTR
(
self_
));
int
rc
=
zmq_connect
(
DATA_PTR
(
self_
),
rb_string_value_cstr
(
&
addr_
));
if
(
rc
!=
0
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
return
Qnil
;
}
return
Qnil
;
}
static
VALUE
socket_send
(
VALUE
self_
,
VALUE
msg_
,
VALUE
flags_
)
{
assert
(
DATA_PTR
(
self_
));
Check_Type
(
msg_
,
T_STRING
);
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init_size
(
&
msg
,
RSTRING_LEN
(
msg_
));
if
(
rc
!=
0
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
return
Qnil
;
}
memcpy
(
zmq_msg_data
(
&
msg
),
RSTRING_PTR
(
msg_
),
RSTRING_LEN
(
msg_
));
rc
=
zmq_send
(
DATA_PTR
(
self_
),
&
msg
,
NUM2INT
(
flags_
));
if
(
rc
!=
0
&&
errno
==
EAGAIN
)
{
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
return
Qfalse
;
}
if
(
rc
!=
0
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
return
Qnil
;
}
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
return
Qtrue
;
}
static
VALUE
socket_flush
(
VALUE
self_
)
{
assert
(
DATA_PTR
(
self_
));
int
rc
=
zmq_flush
(
DATA_PTR
(
self_
));
if
(
rc
!=
0
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
return
Qnil
;
}
return
Qnil
;
}
static
VALUE
socket_recv
(
VALUE
self_
,
VALUE
flags_
)
{
assert
(
DATA_PTR
(
self_
));
zmq_msg_t
msg
;
int
rc
=
zmq_msg_init
(
&
msg
);
assert
(
rc
==
0
);
rc
=
zmq_recv
(
DATA_PTR
(
self_
),
&
msg
,
NUM2INT
(
flags_
));
if
(
rc
!=
0
&&
errno
==
EAGAIN
)
{
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
return
Qnil
;
}
if
(
rc
!=
0
)
{
rb_raise
(
rb_eRuntimeError
,
strerror
(
errno
));
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
return
Qnil
;
}
VALUE
message
=
rb_str_new
((
char
*
)
zmq_msg_data
(
&
msg
),
zmq_msg_size
(
&
msg
));
rc
=
zmq_msg_close
(
&
msg
);
assert
(
rc
==
0
);
return
message
;
}
extern
"C"
void
Init_librbzmq
()
{
VALUE
context_type
=
rb_define_class
(
"Context"
,
rb_cObject
);
rb_define_alloc_func
(
context_type
,
context_alloc
);
rb_define_method
(
context_type
,
"initialize"
,
(
VALUE
(
*
)(...))
context_initialize
,
2
);
VALUE
socket_type
=
rb_define_class
(
"Socket"
,
rb_cObject
);
rb_define_alloc_func
(
socket_type
,
socket_alloc
);
rb_define_method
(
socket_type
,
"initialize"
,
(
VALUE
(
*
)(...))
socket_initialize
,
2
);
// rb_define_method (socket_type, "setsockopt",
// (VALUE(*)(...)) socket_setsockopt, 2);
rb_define_method
(
socket_type
,
"bind"
,
(
VALUE
(
*
)(...))
socket_bind
,
1
);
rb_define_method
(
socket_type
,
"connect"
,
(
VALUE
(
*
)(...))
socket_connect
,
1
);
rb_define_method
(
socket_type
,
"send"
,
(
VALUE
(
*
)(...))
socket_send
,
2
);
rb_define_method
(
socket_type
,
"flush"
,
(
VALUE
(
*
)(...))
socket_flush
,
0
);
rb_define_method
(
socket_type
,
"recv"
,
(
VALUE
(
*
)(...))
socket_recv
,
1
);
rb_define_global_const
(
"HWM"
,
INT2NUM
(
ZMQ_HWM
));
rb_define_global_const
(
"LWM"
,
INT2NUM
(
ZMQ_LWM
));
rb_define_global_const
(
"SWAP"
,
INT2NUM
(
ZMQ_SWAP
));
rb_define_global_const
(
"MASK"
,
INT2NUM
(
ZMQ_MASK
));
rb_define_global_const
(
"AFFINITY"
,
INT2NUM
(
ZMQ_AFFINITY
));
rb_define_global_const
(
"IDENTITY"
,
INT2NUM
(
ZMQ_IDENTITY
));
rb_define_global_const
(
"NOBLOCK"
,
INT2NUM
(
ZMQ_NOBLOCK
));
rb_define_global_const
(
"NOFLUSH"
,
INT2NUM
(
ZMQ_NOFLUSH
));
rb_define_global_const
(
"P2P"
,
INT2NUM
(
ZMQ_P2P
));
rb_define_global_const
(
"SUB"
,
INT2NUM
(
ZMQ_SUB
));
rb_define_global_const
(
"PUB"
,
INT2NUM
(
ZMQ_PUB
));
rb_define_global_const
(
"REQ"
,
INT2NUM
(
ZMQ_REQ
));
rb_define_global_const
(
"REP"
,
INT2NUM
(
ZMQ_REP
));
}
ruby/zmq.cpp
deleted
100644 → 0
View file @
67253f31
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <zmq.h>
#include <zmq/err.hpp>
#include <ruby.h>
// Class rb_zmq.
static
VALUE
rb_zmq
;
// Structure to return received data.
static
VALUE
rb_data
;
static
void
rb_free
(
void
*
p
)
{
}
static
VALUE
rb_alloc
(
VALUE
self_
)
{
VALUE
obj
;
obj
=
Data_Wrap_Struct
(
self_
,
0
,
rb_free
,
NULL
);
return
obj
;
}
static
VALUE
rb_msg_init
(
VALUE
self_
)
{
zmq_msg_t
*
msg
;
msg
=
new
zmq_msg_t
;
VALUE
obj
;
int
rc
=
zmq_msg_init
(
msg
);
if
(
rc
==
-
1
)
{
assert
(
errno
==
ENOMEM
);
rb_raise
(
rb_eRuntimeError
,
"Out of memory"
);
}
obj
=
Data_Wrap_Struct
(
rb_zmq
,
0
,
rb_free
,
msg
);
return
obj
;
}
static
VALUE
rb_msg_init_size
(
VALUE
self_
,
VALUE
size_
)
{
zmq_msg_t
*
msg
;
msg
=
new
zmq_msg_t
;
VALUE
obj
;
// Forward the code to zmq library.
int
rc
=
zmq_msg_init_size
(
msg
,
NUM2INT
(
size_
));
if
(
rc
==
-
1
)
{
assert
(
errno
==
ENOMEM
);
rb_raise
(
rb_eRuntimeError
,
"Out of memory"
);
}
obj
=
Data_Wrap_Struct
(
rb_zmq
,
0
,
rb_free
,
msg
);
return
obj
;
}
static
VALUE
rb_msg_init_data
(
VALUE
self_
,
VALUE
data_
,
VALUE
size_
)
{
// Get the message.
zmq_msg_t
*
msg
;
VALUE
obj
;
msg
=
new
zmq_msg_t
;
// Forward the code to zmq library.
int
rc
=
zmq_msg_init_data
(
msg
,
StringValueCStr
(
data_
),
NUM2INT
(
size_
),
rb_free
);
assert
(
rc
==
0
);
obj
=
Data_Wrap_Struct
(
rb_zmq
,
0
,
rb_free
,
msg
);
return
obj
;
}
static
VALUE
rb_msg_close
(
VALUE
self_
,
VALUE
msg_
)
{
// Get the message.
zmq_msg_t
*
msg
;
Data_Get_Struct
(
msg_
,
zmq_msg_t
,
msg
);
// Forward the code to zmq library.
int
rc
=
zmq_close
(
msg
);
assert
(
rc
==
0
);
return
self_
;
}
static
VALUE
rb_msg_move
(
VALUE
self_
,
VALUE
src_
)
{
// Get the message.
zmq_msg_t
*
src
;
Data_Get_Struct
(
src_
,
zmq_msg_t
,
src
);
zmq_msg_t
*
dest
;
dest
=
new
zmq_msg_t
;
VALUE
obj
;
// Forward the code to zmq library.
int
rc
=
zmq_msg_move
(
dest
,
src
);
assert
(
rc
==
0
);
obj
=
Data_Wrap_Struct
(
rb_zmq
,
0
,
rb_free
,
dest
);
return
obj
;
}
static
VALUE
rb_msg_copy
(
VALUE
self_
,
VALUE
src_
)
{
// Get the message.
zmq_msg_t
*
src
;
Data_Get_Struct
(
src_
,
zmq_msg_t
,
src
);
zmq_msg_t
*
dest
;
dest
=
new
zmq_msg_t
;
VALUE
obj
;
// Forward the code to zmq library.
int
rc
=
zmq_msg_copy
(
dest
,
src
);
assert
(
rc
==
0
);
obj
=
Data_Wrap_Struct
(
rb_zmq
,
0
,
rb_free
,
dest
);
return
obj
;
}
static
VALUE
rb_msg_data
(
VALUE
self_
,
VALUE
msg_
)
{
// Get the message.
zmq_msg_t
*
msg
;
Data_Get_Struct
(
msg_
,
zmq_msg_t
,
msg
);
const
char
*
data
;
// Forward the code to zmq library.
data
=
(
const
char
*
)
zmq_msg_data
(
msg
);
return
rb_str_new
(
data
,
zmq_msg_size
(
msg
));
}
static
VALUE
rb_msg_size
(
VALUE
self_
,
VALUE
msg_
)
{
// Get the message.
zmq_msg_t
*
msg
;
Data_Get_Struct
(
msg_
,
zmq_msg_t
,
msg
);
// Forward the code to zmq library.
return
INT2NUM
(
zmq_msg_size
(
msg
));
}
static
VALUE
rb_msg_type
(
VALUE
self_
,
VALUE
msg_
)
{
// Get the message.
zmq_msg_t
*
msg
;
Data_Get_Struct
(
msg_
,
zmq_msg_t
,
msg
);
// Forward the code to zmq library.
return
INT2NUM
(
zmq_msg_type
(
msg
));
}
static
VALUE
rb_init
(
VALUE
self_
)
{
return
self_
;
}
static
VALUE
rb_context
(
VALUE
self_
,
VALUE
app_threads_
,
VALUE
io_threads_
)
{
void
*
context
;
VALUE
obj
;
// Forward the code to zmq library.
context
=
zmq_init
(
NUM2INT
(
app_threads_
),
NUM2INT
(
io_threads_
));
if
(
context
==
NULL
)
{
assert
(
errno
==
EINVAL
);
rb_raise
(
rb_eRuntimeError
,
"Invalid argument"
);
}
obj
=
Data_Wrap_Struct
(
rb_zmq
,
0
,
free
,
context
);
return
self_
;
}
static
VALUE
rb_term
(
VALUE
self_
,
VALUE
context_
)
{
// Get the context.
void
*
context
;
Data_Get_Struct
(
context_
,
void
*
,
context
);
// Forward the code to zmq library.
int
rc
=
zmq_term
((
void
*
)
context
);
assert
(
rc
==
0
);
return
self_
;
}
static
VALUE
rb_socket
(
VALUE
self_
,
VALUE
context_
,
VALUE
type_
)
{
// Get the context.
void
*
context
;
Data_Get_Struct
(
context_
,
void
*
,
context
);
void
*
socket
=
NULL
;
VALUE
obj
;
// Forward the call to native 0MQ library.
socket
=
zmq_socket
(
context
,
NUM2INT
(
type_
));
if
(
socket
==
NULL
)
{
assert
(
errno
==
EMFILE
||
errno
==
EINVAL
);
if
(
errno
==
EMFILE
)
rb_raise
(
rb_eRuntimeError
,
"Too many threads"
);
else
rb_raise
(
rb_eRuntimeError
,
"Invalid argument"
);
}
obj
=
Data_Wrap_Struct
(
rb_zmq
,
0
,
free
,
socket
);
return
obj
;
}
static
VALUE
rb_close
(
VALUE
self_
,
VALUE
socket_
)
{
// Get the message.
void
*
socket
;
Data_Get_Struct
(
socket_
,
void
*
,
socket
);
// Forward the call to native 0MQ library.
int
rc
=
zmq_close
(
socket
);
assert
(
rc
==
0
);
return
self_
;
}
static
VALUE
rb_setsockopt
(
VALUE
self_
,
VALUE
socket_
,
VALUE
option_
,
VALUE
optval_
)
{
// Get the socket.
void
*
socket
;
Data_Get_Struct
(
socket_
,
void
*
,
socket
);
int
rc
=
0
;
if
(
TYPE
(
optval_
)
==
T_STRING
)
{
// Forward the code to native 0MQ library.
rc
=
zmq_setsockopt
(
socket
,
NUM2INT
(
option_
),
(
void
*
)
StringValueCStr
(
optval_
),
RSTRING_LEN
(
optval_
));
}
else
if
(
TYPE
(
optval_
)
==
T_FLOAT
)
{
double
optval
=
NUM2DBL
(
optval_
);
// Forward the code to native 0MQ library.
rc
=
zmq_setsockopt
(
socket
,
NUM2INT
(
option_
),
(
void
*
)
&
optval
,
8
);
}
else
if
(
TYPE
(
optval_
)
==
T_FIXNUM
)
{
long
optval
=
FIX2LONG
(
optval_
);
// Forward the code to native 0MQ library.
rc
=
zmq_setsockopt
(
socket
,
NUM2INT
(
option_
),
(
void
*
)
&
optval
,
4
);
}
else
if
(
TYPE
(
optval_
)
==
T_BIGNUM
)
{
long
optval
=
NUM2LONG
(
optval_
);
// Forward the code to native 0MQ library.
rc
=
zmq_setsockopt
(
socket
,
NUM2INT
(
option_
),
(
void
*
)
&
optval
,
4
);
}
else
if
(
TYPE
(
optval_
)
==
T_ARRAY
)
{
// Forward the code to native 0MQ library.
rc
=
zmq_setsockopt
(
socket
,
NUM2INT
(
option_
),
(
void
*
)
RARRAY_PTR
(
optval_
),
RARRAY_LEN
(
optval_
));
}
else
if
(
TYPE
(
optval_
)
==
T_STRUCT
)
{
// Forward the code to native 0MQ library.
rc
=
zmq_setsockopt
(
socket
,
NUM2INT
(
option_
),
(
void
*
)
RSTRUCT_PTR
(
optval_
),
RSTRUCT_LEN
(
optval_
));
}
else
rb_raise
(
rb_eRuntimeError
,
"Unknown type"
);
assert
(
rc
==
0
);
return
self_
;
}
static
VALUE
rb_bind
(
VALUE
self_
,
VALUE
socket_
,
VALUE
addr_
)
{
// Get the socket.
void
*
socket
;
Data_Get_Struct
(
socket_
,
void
*
,
socket
);
// Forward the code to native 0MQ library.
int
rc
=
zmq_bind
(
socket
,
StringValueCStr
(
addr_
));
if
(
rc
==
-
1
)
{
assert
(
errno
==
EINVAL
||
errno
==
EADDRINUSE
);
if
(
errno
==
EINVAL
)
rb_raise
(
rb_eRuntimeError
,
"Invalid argument"
);
else
rb_raise
(
rb_eRuntimeError
,
"Address in use"
);
}
return
self_
;
}
static
VALUE
rb_connect
(
VALUE
self_
,
VALUE
socket_
,
VALUE
addr_
)
{
// Get the socket.
void
*
socket
;
Data_Get_Struct
(
socket_
,
void
*
,
socket
);
// Forward the code to native 0MQ library.
int
rc
=
zmq_connect
(
socket
,
StringValueCStr
(
addr_
));
if
(
rc
==
-
1
)
{
assert
(
errno
==
EINVAL
||
errno
==
EADDRINUSE
);
if
(
errno
==
EINVAL
)
rb_raise
(
rb_eRuntimeError
,
"Invalid argument"
);
else
rb_raise
(
rb_eRuntimeError
,
"Address in use"
);
}
return
self_
;
}
static
VALUE
rb_send
(
VALUE
self_
,
VALUE
socket_
,
VALUE
msg_
,
VALUE
flags_
)
{
// Get the socket.
void
*
socket
;
Data_Get_Struct
(
socket_
,
void
*
,
socket
);
// Get the message.
zmq_msg_t
*
msg
;
Data_Get_Struct
(
msg_
,
zmq_msg_t
,
msg
);
// Forward the code to native 0MQ library.
int
rc
=
zmq_send
(
socket
,
msg
,
NUM2INT
(
flags_
));
assert
(
rc
==
0
||
(
rc
==
-
1
&&
errno
==
EAGAIN
));
return
INT2NUM
(
rc
);
}
static
VALUE
rb_flush
(
VALUE
self_
,
VALUE
socket_
)
{
// Get the socket.
void
*
socket
;
Data_Get_Struct
(
socket_
,
void
*
,
socket
);
// Forward the code to native 0MQ library.
int
rc
=
zmq_flush
(
socket
);
assert
(
rc
==
0
);
return
self_
;
}
static
VALUE
rb_recv
(
VALUE
self_
,
VALUE
socket_
,
VALUE
flags_
)
{
// Get the socket.
void
*
socket
;
Data_Get_Struct
(
socket_
,
void
*
,
socket
);
// Get the message.
zmq_msg_t
*
msg
;
msg
=
new
zmq_msg_t
;
VALUE
obj
;
// Forward the code to native 0MQ library.
int
rc
=
zmq_recv
(
socket
,
msg
,
NUM2INT
(
flags_
));
assert
(
rc
==
0
||
(
rc
==
-
1
&&
errno
==
EAGAIN
));
obj
=
Data_Wrap_Struct
(
rb_data
,
0
,
rb_free
,
msg
);
return
rb_struct_new
(
obj
,
rc
,
NULL
);
}
extern
"C"
void
Init_librbzmq
()
{
// Define the rb_zmq class.
rb_zmq
=
rb_define_class
(
"Zmq"
,
rb_cObject
);
// Define allocation function for rb_zmq class.
rb_define_alloc_func
(
rb_zmq
,
rb_alloc
);
// Parameters: <name_of_class> <name_of_method_aaccessible_from_ruby>
// <name_of_method_in_the_class> <number_of_arguments>
// number of arguments is alqays except for the 'VALUE self_' argument
// (this pointer).
rb_define_method
(
rb_zmq
,
"msg_init"
,
(
VALUE
(
*
)(...))
rb_msg_init
,
0
);
rb_define_method
(
rb_zmq
,
"msg_init_size"
,
(
VALUE
(
*
)(...))
rb_msg_init_size
,
1
);
rb_define_method
(
rb_zmq
,
"msg_init_data"
,
(
VALUE
(
*
)(...))
rb_msg_init_data
,
3
);
rb_define_method
(
rb_zmq
,
"msg_close"
,
(
VALUE
(
*
)(...))
rb_msg_close
,
1
);
rb_define_method
(
rb_zmq
,
"msg_move"
,
(
VALUE
(
*
)(...))
rb_msg_move
,
1
);
rb_define_method
(
rb_zmq
,
"msg_copy"
,
(
VALUE
(
*
)(...))
rb_msg_copy
,
1
);
rb_define_method
(
rb_zmq
,
"msg_data"
,
(
VALUE
(
*
)(...))
rb_msg_data
,
1
);
rb_define_method
(
rb_zmq
,
"msg_size"
,
(
VALUE
(
*
)(...))
rb_msg_size
,
1
);
rb_define_method
(
rb_zmq
,
"msg_type"
,
(
VALUE
(
*
)(...))
rb_msg_type
,
1
);
rb_define_method
(
rb_zmq
,
"initialize"
,
(
VALUE
(
*
)(...))
rb_init
,
0
);
rb_define_method
(
rb_zmq
,
"term"
,
(
VALUE
(
*
)(...))
rb_term
,
1
);
rb_define_method
(
rb_zmq
,
"free"
,
(
VALUE
(
*
)(...))
rb_free
,
0
);
rb_define_method
(
rb_zmq
,
"context"
,
(
VALUE
(
*
)(...))
rb_context
,
2
);
rb_define_method
(
rb_zmq
,
"socket"
,
(
VALUE
(
*
)(...))
rb_socket
,
2
);
rb_define_method
(
rb_zmq
,
"close"
,
(
VALUE
(
*
)(...))
rb_close
,
1
);
rb_define_method
(
rb_zmq
,
"setsockopt"
,
(
VALUE
(
*
)(...))
rb_setsockopt
,
3
);
rb_define_method
(
rb_zmq
,
"bind"
,
(
VALUE
(
*
)(...))
rb_bind
,
2
);
rb_define_method
(
rb_zmq
,
"connect"
,
(
VALUE
(
*
)(...))
rb_connect
,
2
);
rb_define_method
(
rb_zmq
,
"send"
,
(
VALUE
(
*
)(...))
rb_send
,
3
);
rb_define_method
(
rb_zmq
,
"flush"
,
(
VALUE
(
*
)(...))
rb_flush
,
1
);
rb_define_method
(
rb_zmq
,
"recv"
,
(
VALUE
(
*
)(...))
rb_recv
,
2
);
// Define structure to hold data that are returned from receive function.
rb_data
=
rb_struct_define
(
NULL
,
"msg"
,
"rc"
,
NULL
);
rb_define_const
(
rb_zmq
,
"DATA"
,
rb_data
);
// Define global constants.
rb_define_global_const
(
"ZMQ_MAX_VSM_SIZE"
,
INT2NUM
(
ZMQ_MAX_VSM_SIZE
));
rb_define_global_const
(
"ZMQ_GAP"
,
INT2NUM
(
ZMQ_GAP
));
rb_define_global_const
(
"ZMQ_DELIMITER"
,
INT2NUM
(
ZMQ_DELIMITER
));
rb_define_global_const
(
"ZMQ_VSM"
,
INT2NUM
(
ZMQ_VSM
));
rb_define_global_const
(
"ZMQ_HWM"
,
INT2NUM
(
ZMQ_HWM
));
rb_define_global_const
(
"ZMQ_SWAP"
,
INT2NUM
(
ZMQ_SWAP
));
rb_define_global_const
(
"ZMQ_MASK"
,
INT2NUM
(
ZMQ_MASK
));
rb_define_global_const
(
"ZMQ_AFFINITY"
,
INT2NUM
(
ZMQ_AFFINITY
));
rb_define_global_const
(
"ZMQ_IDENTITY"
,
INT2NUM
(
ZMQ_IDENTITY
));
rb_define_global_const
(
"ZMQ_NOBLOCK"
,
INT2NUM
(
ZMQ_NOBLOCK
));
rb_define_global_const
(
"ZMQ_NOFLUSH"
,
INT2NUM
(
ZMQ_NOFLUSH
));
rb_define_global_const
(
"ZMQ_P2P"
,
INT2NUM
(
ZMQ_P2P
));
rb_define_global_const
(
"ZMQ_SUB"
,
INT2NUM
(
ZMQ_SUB
));
rb_define_global_const
(
"ZMQ_PUB"
,
INT2NUM
(
ZMQ_PUB
));
rb_define_global_const
(
"ZMQ_REQ"
,
INT2NUM
(
ZMQ_REQ
));
rb_define_global_const
(
"ZMQ_REP"
,
INT2NUM
(
ZMQ_REP
));
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment