Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
f40ce4e5
Commit
f40ce4e5
authored
May 15, 2010
by
Jon Dyte
Committed by
Martin Sustrik
May 15, 2010
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
single 0MQ daemon (zmqd) - initial version
parent
6705a3d5
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
379 additions
and
3 deletions
+379
-3
Makefile.am
Makefile.am
+2
-2
configure.in
configure.in
+1
-1
Makefile.am
zmqd/Makefile.am
+8
-0
zmqd.cpp
zmqd/zmqd.cpp
+368
-0
No files found.
Makefile.am
View file @
f40ce4e5
ACLOCAL_AMFLAGS
=
-I
config
SUBDIRS
=
src doc perf devices
DIST_SUBDIRS
=
src doc perf devices builds/msvc
SUBDIRS
=
src doc perf devices
zmqd
DIST_SUBDIRS
=
src doc perf devices
zmqd
builds/msvc
EXTRA_DIST
=
\
$(top_srcdir)
/foreign/openpgm/@pgm_basename@.tar.gz
\
...
...
configure.in
View file @
f40ce4e5
...
...
@@ -382,7 +382,7 @@ AC_OUTPUT(Makefile src/Makefile doc/Makefile
perf/Makefile src/libzmq.pc \
devices/Makefile devices/zmq_forwarder/Makefile \
devices/zmq_streamer/Makefile devices/zmq_queue/Makefile \
builds/msvc/Makefile)
zmqd/Makefile
builds/msvc/Makefile)
# On Linux patch libtool to delete hardcoded paths (rpath).
case "${host_os}" in
...
...
zmqd/Makefile.am
0 → 100644
View file @
f40ce4e5
INCLUDES
=
-I
$(top_srcdir)
/include
bin_PROGRAMS
=
zmqd
zmqd_LDADD
=
$(top_builddir)
/src/libzmq.la
zmqd_SOURCES
=
zmqd.cpp
zmqd/zmqd.cpp
0 → 100644
View file @
f40ce4e5
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <vector>
#include <string>
#include <memory>
#include "../include/zmq.hpp"
#include "../foreign/xmlParser/xmlParser.cpp"
namespace
{
class
device_cfg_t
{
enum
endpoint_direction
{
connect
,
bind
};
typedef
std
::
pair
<
endpoint_direction
,
std
::
string
>
sock_details_t
;
typedef
std
::
vector
<
sock_details_t
>
vsock_dets_t
;
public
:
explicit
device_cfg_t
(
int
type
)
:
device_type
(
type
)
,
context
(
0
)
,
in_socket
(
0
),
out_socket
(
0
)
{
}
virtual
~
device_cfg_t
()
{
delete
out_socket
;
delete
in_socket
;
}
bool
init
(
XMLNode
&
device
)
{
XMLNode
in_node
=
device
.
getChildNode
(
"in"
);
if
(
in_node
.
isEmpty
())
{
fprintf
(
stderr
,
"'in' node is missing in the configuration file
\n
"
);
return
false
;
}
XMLNode
out_node
=
device
.
getChildNode
(
"out"
);
if
(
out_node
.
isEmpty
())
{
fprintf
(
stderr
,
"'out' node is missing in the configuration file
\n
"
);
return
false
;
}
if
(
!
process_node
(
in_node
,
true
,
device_cfg_t
::
bind
))
return
false
;
if
(
!
process_node
(
in_node
,
true
,
device_cfg_t
::
connect
))
return
false
;
if
(
!
process_node
(
out_node
,
false
,
device_cfg_t
::
bind
))
return
false
;
if
(
!
process_node
(
out_node
,
false
,
device_cfg_t
::
connect
))
return
false
;
return
true
;
}
void
set_context
(
zmq
::
context_t
*
context_
)
{
context
=
context_
;
}
zmq
::
context_t
*
get_context
()
const
{
return
context
;
}
virtual
bool
make_sockets
()
=
0
;
bool
set_up_connections
()
{
for
(
vsock_dets_t
::
const_iterator
i
=
in
.
begin
()
;
i
!=
in
.
end
();
++
i
)
{
switch
(
i
->
first
)
{
case
device_cfg_t
:
:
connect
:
in_socket
->
connect
(
i
->
second
.
c_str
());
break
;
case
device_cfg_t
:
:
bind
:
in_socket
->
bind
(
i
->
second
.
c_str
());
}
}
for
(
vsock_dets_t
::
const_iterator
i
=
out
.
begin
()
;
i
!=
out
.
end
();
++
i
)
{
switch
(
i
->
first
)
{
case
device_cfg_t
:
:
connect
:
out_socket
->
connect
(
i
->
second
.
c_str
());
break
;
case
device_cfg_t
:
:
bind
:
out_socket
->
bind
(
i
->
second
.
c_str
());
}
}
return
true
;
}
void
run
()
{
zmq
::
device
(
device_type
,
*
in_socket
,
*
out_socket
);
}
protected
:
bool
make_sockets
(
int
in_type
,
int
out_type
)
{
in_socket
=
new
(
std
::
nothrow
)
zmq
::
socket_t
(
*
context
,
in_type
);
if
(
!
in_socket
)
return
false
;
out_socket
=
new
(
std
::
nothrow
)
zmq
::
socket_t
(
*
context
,
out_type
);
if
(
!
out_socket
)
{
return
false
;
}
return
true
;
}
int
process_node
(
XMLNode
&
target_
,
bool
in_
,
device_cfg_t
::
endpoint_direction
ept_
)
{
const
char
*
name
=
(
ept_
==
device_cfg_t
::
connect
)
?
"connect"
:
"bind"
;
int
n
=
0
;
while
(
true
)
{
XMLNode
connect
=
target_
.
getChildNode
(
name
,
n
);
if
(
connect
.
isEmpty
())
break
;
const
char
*
addr
=
connect
.
getAttribute
(
"addr"
);
if
(
!
addr
)
{
fprintf
(
stderr
,
"'%s' node is missing 'addr' attribute
\n
"
,
name
);
return
0
;
}
if
(
in_
)
in
.
push_back
(
sock_details_t
(
ept_
,
addr
));
else
out
.
push_back
(
sock_details_t
(
ept_
,
addr
));
n
++
;
}
return
1
;
}
protected
:
int
device_type
;
zmq
::
context_t
*
context
;
vsock_dets_t
in
;
vsock_dets_t
out
;
zmq
::
socket_t
*
in_socket
;
zmq
::
socket_t
*
out_socket
;
private
:
void
operator
=
(
device_cfg_t
const
&
);
device_cfg_t
(
device_cfg_t
const
&
);
};
class
queue_device_cfg_t
:
public
device_cfg_t
{
public
:
queue_device_cfg_t
()
:
device_cfg_t
(
ZMQ_QUEUE
)
{}
virtual
bool
make_sockets
(){
return
device_cfg_t
::
make_sockets
(
ZMQ_XREP
,
ZMQ_XREQ
);
}
};
class
streamer_device_cfg_t
:
public
device_cfg_t
{
public
:
streamer_device_cfg_t
()
:
device_cfg_t
(
ZMQ_STREAMER
)
{}
virtual
bool
make_sockets
()
{
return
device_cfg_t
::
make_sockets
(
ZMQ_UPSTREAM
,
ZMQ_DOWNSTREAM
);
}
};
class
forwarder_device_cfg_t
:
public
device_cfg_t
{
public
:
forwarder_device_cfg_t
()
:
device_cfg_t
(
ZMQ_FORWARDER
)
{}
virtual
bool
make_sockets
()
{
if
(
!
device_cfg_t
::
make_sockets
(
ZMQ_SUB
,
ZMQ_PUB
)
)
{
return
false
;
}
in_socket
->
setsockopt
(
ZMQ_SUBSCRIBE
,
""
,
0
);
return
true
;
}
};
device_cfg_t
*
make_device_config
(
XMLNode
&
device
)
{
const
char
*
dev_type
=
device
.
getAttribute
(
"type"
);
if
(
!
dev_type
)
{
fprintf
(
stderr
,
"'device' node is missing 'type' attribute
\n
"
);
return
NULL
;
}
if
(
strcmp
(
dev_type
,
"forwarder"
)
==
0
)
{
return
new
(
std
::
nothrow
)
forwarder_device_cfg_t
;
}
else
if
(
strcmp
(
dev_type
,
"streamer"
)
==
0
)
{
return
new
(
std
::
nothrow
)
streamer_device_cfg_t
;
}
else
if
(
strcmp
(
dev_type
,
"queue"
)
==
0
)
{
return
new
(
std
::
nothrow
)
queue_device_cfg_t
;
}
fprintf
(
stderr
,
"type attribute in the device configuration file "
"should be named 'forwarder', 'streamer' or 'queue'
\n
"
);
return
NULL
;
}
extern
"C"
void
*
worker_function
(
void
*
arg
)
{
if
(
!
arg
)
{
fprintf
(
stderr
,
"arg is null, returning
\n
"
);
return
0
;
}
std
::
auto_ptr
<
device_cfg_t
>
cfg
(
(
device_cfg_t
*
)
arg
);
zmq
::
context_t
*
ctx
=
cfg
->
get_context
();
if
(
!
ctx
)
{
fprintf
(
stderr
,
"no context, returning
\n
"
);
return
0
;
}
if
(
!
cfg
->
make_sockets
())
{
fprintf
(
stderr
,
"failed to make sockets, returning
\n
"
);
return
0
;
}
if
(
!
cfg
->
set_up_connections
())
{
fprintf
(
stderr
,
"failed to set up connections, returning
\n
"
);
return
0
;
}
cfg
->
run
();
return
0
;
}
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
!=
2
)
{
fprintf
(
stderr
,
"usage: zmqd <config-file>
\n
"
);
return
1
;
}
XMLNode
root
=
XMLNode
::
parseFile
(
argv
[
1
]);
if
(
root
.
isEmpty
())
{
fprintf
(
stderr
,
"configuration file not found or not an XML file
\n
"
);
return
1
;
}
if
(
strcmp
(
root
.
getName
(),
"config"
)
!=
0
)
{
fprintf
(
stderr
,
"root element in the configuration file should be "
"named 'config'
\n
"
);
return
1
;
}
std
::
vector
<
device_cfg_t
*>
vdev
;
while
(
true
)
{
XMLNode
device
=
root
.
getChildNode
(
"device"
,
vdev
.
size
());
if
(
device
.
isEmpty
())
break
;
device_cfg_t
*
dev
=
make_device_config
(
device
);
if
(
!
dev
)
{
fprintf
(
stderr
,
"failed to create device config
\n
"
);
return
1
;
}
if
(
!
dev
->
init
(
device
)
)
{
fprintf
(
stderr
,
"error with initialising device configuration
\n
"
);
delete
dev
;
return
1
;
}
vdev
.
push_back
(
dev
);
}
std
::
vector
<
device_cfg_t
*>::
size_type
num_devices
=
vdev
.
size
();
if
(
num_devices
==
0
)
{
fprintf
(
stderr
,
"no devices in the config file
\n
"
);
return
1
;
}
zmq
::
context_t
ctx
(
num_devices
,
1
);
for
(
unsigned
int
i
=
0
;
i
<
num_devices
;
++
i
)
{
vdev
[
i
]
->
set_context
(
&
ctx
);
if
(
i
)
{
pthread_t
worker
;
int
rc
=
pthread_create
(
&
worker
,
NULL
,
&
worker_function
,
(
void
*
)
vdev
[
i
]);
assert
(
rc
==
0
);
}
}
worker_function
((
void
*
)
vdev
[
0
]);
return
0
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment