Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
L
libzmq
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
libzmq
Commits
33f22d0d
Commit
33f22d0d
authored
Mar 15, 2014
by
bebopagogo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
added norm_engine
parent
c91a638a
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
77 additions
and
9 deletions
+77
-9
configure.ac
configure.ac
+22
-0
Makefile.am
src/Makefile.am
+2
-0
session_base.cpp
src/session_base.cpp
+36
-2
socket_base.cpp
src/socket_base.cpp
+17
-7
No files found.
configure.ac
View file @
33f22d0d
...
@@ -463,6 +463,28 @@ fi
...
@@ -463,6 +463,28 @@ fi
AC_SUBST(pgm_basename)
AC_SUBST(pgm_basename)
# This uses "--with-norm" to point to the "norm" directory
# for "norm/include" and "norm/lib"
#(if "--with-norm=yes" is given, then assume installed on system)
AC_ARG_WITH([norm], [AS_HELP_STRING([--with-norm],
[build libzmq with NORM protocol extension, optionally specifying norm path [default=no]])],
[with_norm_ext=$withval], [with_norm_ext=no])
AC_MSG_CHECKING("with_norm_ext = ${with_norm_ext}")
if test "x$with_norm_ext" != "xno"; then
AC_DEFINE(ZMQ_HAVE_NORM, 1, [Have NORM protocol extension])
if test "x$wwith_norm_ext" != "xyes"; then
norm_path="${with_norm_ext}"
LIBZMQ_EXTRA_CXXFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_CXXFLAGS}"
LIBZMQ_EXTRA_LDFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_LDFLAGS}"
fi
LIBS="-lnorm $LIBS"
fi
# Set -Wall, -Werror and -pedantic
# Set -Wall, -Werror and -pedantic
AC_LANG_PUSH([C++])
AC_LANG_PUSH([C++])
...
...
src/Makefile.am
View file @
33f22d0d
...
@@ -45,6 +45,7 @@ libzmq_la_SOURCES = \
...
@@ -45,6 +45,7 @@ libzmq_la_SOURCES = \
msg.hpp
\
msg.hpp
\
mtrie.hpp
\
mtrie.hpp
\
mutex.hpp
\
mutex.hpp
\
norm_engine.hpp
\
null_mechanism.hpp
\
null_mechanism.hpp
\
object.hpp
\
object.hpp
\
options.hpp
\
options.hpp
\
...
@@ -112,6 +113,7 @@ libzmq_la_SOURCES = \
...
@@ -112,6 +113,7 @@ libzmq_la_SOURCES = \
mechanism.cpp
\
mechanism.cpp
\
msg.cpp
\
msg.cpp
\
mtrie.cpp
\
mtrie.cpp
\
norm_engine.cpp
\
null_mechanism.cpp
\
null_mechanism.cpp
\
object.cpp
\
object.cpp
\
options.cpp
\
options.cpp
\
...
...
src/session_base.cpp
View file @
33f22d0d
...
@@ -28,6 +28,7 @@
...
@@ -28,6 +28,7 @@
#include "pgm_sender.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "pgm_receiver.hpp"
#include "address.hpp"
#include "address.hpp"
#include "norm_engine.hpp"
#include "ctx.hpp"
#include "ctx.hpp"
#include "req.hpp"
#include "req.hpp"
...
@@ -449,8 +450,9 @@ void zmq::session_base_t::reconnect ()
...
@@ -449,8 +450,9 @@ void zmq::session_base_t::reconnect ()
{
{
// For delayed connect situations, terminate the pipe
// For delayed connect situations, terminate the pipe
// and reestablish later on
// and reestablish later on
if
(
pipe
&&
options
.
immediate
==
1
if
(
pipe
&&
1
==
options
.
immediate
==
1
&&
addr
->
protocol
!=
"pgm"
&&
addr
->
protocol
!=
"epgm"
)
{
&&
addr
->
protocol
!=
"pgm"
&&
addr
->
protocol
!=
"epgm"
&&
addr
->
protocol
!=
"norm"
)
{
pipe
->
hiccup
();
pipe
->
hiccup
();
pipe
->
terminate
(
false
);
pipe
->
terminate
(
false
);
terminating_pipes
.
insert
(
pipe
);
terminating_pipes
.
insert
(
pipe
);
...
@@ -549,6 +551,38 @@ void zmq::session_base_t::start_connecting (bool wait_)
...
@@ -549,6 +551,38 @@ void zmq::session_base_t::start_connecting (bool wait_)
return
;
return
;
}
}
#endif
#endif
#ifdef ZMQ_HAVE_NORM
if
(
addr
->
protocol
==
"norm"
)
{
// At this point we'll create message pipes to the session straight
// away. There's no point in delaying it as no concept of 'connect'
// exists with NORM anyway.
if
(
options
.
type
==
ZMQ_PUB
||
options
.
type
==
ZMQ_XPUB
)
{
// NORM sender.
norm_engine_t
*
norm_sender
=
new
(
std
::
nothrow
)
norm_engine_t
(
io_thread
,
options
);
alloc_assert
(
norm_sender
);
int
rc
=
norm_sender
->
init
(
addr
->
address
.
c_str
(),
true
,
false
);
errno_assert
(
rc
==
0
);
send_attach
(
this
,
norm_sender
);
}
else
{
// ZMQ_SUB or ZMQ_XSUB
// NORM receiver.
norm_engine_t
*
norm_receiver
=
new
(
std
::
nothrow
)
norm_engine_t
(
io_thread
,
options
);
alloc_assert
(
norm_receiver
);
int
rc
=
norm_receiver
->
init
(
addr
->
address
.
c_str
(),
false
,
true
);
errno_assert
(
rc
==
0
);
send_attach
(
this
,
norm_receiver
);
}
return
;
}
#endif // ZMQ_HAVE_NORM
zmq_assert
(
false
);
zmq_assert
(
false
);
}
}
...
...
src/socket_base.cpp
View file @
33f22d0d
...
@@ -190,11 +190,11 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
...
@@ -190,11 +190,11 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
{
{
// First check out whether the protcol is something we are aware of.
// First check out whether the protcol is something we are aware of.
if
(
protocol_
!=
"inproc"
&&
protocol_
!=
"ipc"
&&
protocol_
!=
"tcp"
&&
if
(
protocol_
!=
"inproc"
&&
protocol_
!=
"ipc"
&&
protocol_
!=
"tcp"
&&
protocol_
!=
"pgm"
&&
protocol_
!=
"epgm"
&&
protocol_
!=
"tipc"
)
{
protocol_
!=
"pgm"
&&
protocol_
!=
"epgm"
&&
protocol_
!=
"tipc"
&&
protocol_
!=
"norm"
)
{
errno
=
EPROTONOSUPPORT
;
errno
=
EPROTONOSUPPORT
;
return
-
1
;
return
-
1
;
}
}
// If 0MQ is not compiled with OpenPGM, pgm and epgm transports
// If 0MQ is not compiled with OpenPGM, pgm and epgm transports
// are not avaialble.
// are not avaialble.
#if !defined ZMQ_HAVE_OPENPGM
#if !defined ZMQ_HAVE_OPENPGM
...
@@ -203,6 +203,13 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
...
@@ -203,6 +203,13 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
return
-
1
;
return
-
1
;
}
}
#endif
#endif
#if !defined ZMQ_HAVE_NORM
if
(
protocol_
==
"norm"
)
{
errno
=
EPROTONOSUPPORT
;
return
-
1
;
}
#endif // !ZMQ_HAVE_NORM
// IPC transport is not available on Windows and OpenVMS.
// IPC transport is not available on Windows and OpenVMS.
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
#if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
...
@@ -224,7 +231,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
...
@@ -224,7 +231,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
// Check whether socket type and transport protocol match.
// Check whether socket type and transport protocol match.
// Specifically, multicast protocols can't be combined with
// Specifically, multicast protocols can't be combined with
// bi-directional messaging patterns (socket types).
// bi-directional messaging patterns (socket types).
if
((
protocol_
==
"pgm"
||
protocol_
==
"epgm"
)
&&
if
((
protocol_
==
"pgm"
||
protocol_
==
"epgm"
||
protocol_
==
"norm"
)
&&
options
.
type
!=
ZMQ_PUB
&&
options
.
type
!=
ZMQ_SUB
&&
options
.
type
!=
ZMQ_PUB
&&
options
.
type
!=
ZMQ_SUB
&&
options
.
type
!=
ZMQ_XPUB
&&
options
.
type
!=
ZMQ_XSUB
)
{
options
.
type
!=
ZMQ_XPUB
&&
options
.
type
!=
ZMQ_XSUB
)
{
errno
=
ENOCOMPATPROTO
;
errno
=
ENOCOMPATPROTO
;
...
@@ -362,9 +369,9 @@ int zmq::socket_base_t::bind (const char *addr_)
...
@@ -362,9 +369,9 @@ int zmq::socket_base_t::bind (const char *addr_)
return
rc
;
return
rc
;
}
}
if
(
protocol
==
"pgm"
||
protocol
==
"epgm"
)
{
if
(
protocol
==
"pgm"
||
protocol
==
"epgm"
||
protocol
==
"norm"
)
{
// For convenience's sake, bind can be used interchageable with
// For convenience's sake, bind can be used interchageable with
// connect for PGM
and EPG
M transports.
// connect for PGM
, EPGM and NOR
M transports.
return
connect
(
addr_
);
return
connect
(
addr_
);
}
}
...
@@ -600,6 +607,9 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -600,6 +607,9 @@ int zmq::socket_base_t::connect (const char *addr_)
}
}
}
}
#endif
#endif
// TBD - Should we check address for ZMQ_HAVE_NORM???
#ifdef ZMQ_HAVE_OPENPGM
#ifdef ZMQ_HAVE_OPENPGM
if
(
protocol
==
"pgm"
||
protocol
==
"epgm"
)
{
if
(
protocol
==
"pgm"
||
protocol
==
"epgm"
)
{
struct
pgm_addrinfo_t
*
res
=
NULL
;
struct
pgm_addrinfo_t
*
res
=
NULL
;
...
@@ -630,8 +640,8 @@ int zmq::socket_base_t::connect (const char *addr_)
...
@@ -630,8 +640,8 @@ int zmq::socket_base_t::connect (const char *addr_)
errno_assert
(
session
);
errno_assert
(
session
);
// PGM does not support subscription forwarding; ask for all data to be
// PGM does not support subscription forwarding; ask for all data to be
// sent to this pipe.
// sent to this pipe.
(same for NORM, currently?)
bool
subscribe_to_all
=
protocol
==
"pgm"
||
protocol
==
"epgm"
;
bool
subscribe_to_all
=
protocol
==
"pgm"
||
protocol
==
"epgm"
||
protocol
==
"norm"
;
pipe_t
*
newpipe
=
NULL
;
pipe_t
*
newpipe
=
NULL
;
if
(
options
.
immediate
!=
1
||
subscribe_to_all
)
{
if
(
options
.
immediate
!=
1
||
subscribe_to_all
)
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment