Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
C
capnproto
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
capnproto
Commits
bfe76435
Commit
bfe76435
authored
Oct 07, 2014
by
Kenton Varda
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #120 from maurer/ezrpc-readeroptions
Add ReaderOptions configurability to EzRpc
parents
ec3ecb7b
f0335e7b
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
75 additions
and
45 deletions
+75
-45
ez-rpc.c++
c++/src/capnp/ez-rpc.c++
+46
-38
ez-rpc.h
c++/src/capnp/ez-rpc.h
+29
-7
No files found.
c++/src/capnp/ez-rpc.c++
View file @
bfe76435
...
@@ -80,9 +80,9 @@ struct EzRpcClient::Impl {
...
@@ -80,9 +80,9 @@ struct EzRpcClient::Impl {
TwoPartyVatNetwork
network
;
TwoPartyVatNetwork
network
;
RpcSystem
<
rpc
::
twoparty
::
SturdyRefHostId
>
rpcSystem
;
RpcSystem
<
rpc
::
twoparty
::
SturdyRefHostId
>
rpcSystem
;
ClientContext
(
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
)
ClientContext
(
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
,
ReaderOptions
readerOpts
)
:
stream
(
kj
::
mv
(
stream
)),
:
stream
(
kj
::
mv
(
stream
)),
network
(
*
this
->
stream
,
rpc
::
twoparty
::
Side
::
CLIENT
),
network
(
*
this
->
stream
,
rpc
::
twoparty
::
Side
::
CLIENT
,
readerOpts
),
rpcSystem
(
makeRpcClient
(
network
))
{}
rpcSystem
(
makeRpcClient
(
network
))
{}
Capability
::
Client
restore
(
kj
::
StringPtr
name
)
{
Capability
::
Client
restore
(
kj
::
StringPtr
name
)
{
...
@@ -102,39 +102,44 @@ struct EzRpcClient::Impl {
...
@@ -102,39 +102,44 @@ struct EzRpcClient::Impl {
kj
::
Maybe
<
kj
::
Own
<
ClientContext
>>
clientContext
;
kj
::
Maybe
<
kj
::
Own
<
ClientContext
>>
clientContext
;
// Filled in before `setupPromise` resolves.
// Filled in before `setupPromise` resolves.
Impl
(
kj
::
StringPtr
serverAddress
,
uint
defaultPort
)
Impl
(
kj
::
StringPtr
serverAddress
,
uint
defaultPort
,
ReaderOptions
readerOpts
)
:
context
(
EzRpcContext
::
getThreadLocal
()),
:
context
(
EzRpcContext
::
getThreadLocal
()),
setupPromise
(
context
->
getIoProvider
().
getNetwork
()
setupPromise
(
context
->
getIoProvider
().
getNetwork
()
.
parseAddress
(
serverAddress
,
defaultPort
)
.
parseAddress
(
serverAddress
,
defaultPort
)
.
then
([](
kj
::
Own
<
kj
::
NetworkAddress
>&&
addr
)
{
.
then
([
readerOpts
](
kj
::
Own
<
kj
::
NetworkAddress
>&&
addr
)
{
return
addr
->
connect
();
return
addr
->
connect
();
}).
then
([
this
](
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
)
{
}).
then
([
this
,
readerOpts
](
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
)
{
clientContext
=
kj
::
heap
<
ClientContext
>
(
kj
::
mv
(
stream
));
clientContext
=
kj
::
heap
<
ClientContext
>
(
kj
::
mv
(
stream
),
readerOpts
);
}).
fork
())
{}
}).
fork
())
{}
Impl
(
const
struct
sockaddr
*
serverAddress
,
uint
addrSize
)
Impl
(
const
struct
sockaddr
*
serverAddress
,
uint
addrSize
,
ReaderOptions
readerOpts
)
:
context
(
EzRpcContext
::
getThreadLocal
()),
:
context
(
EzRpcContext
::
getThreadLocal
()),
setupPromise
(
context
->
getIoProvider
().
getNetwork
()
setupPromise
(
context
->
getIoProvider
().
getNetwork
()
.
getSockaddr
(
serverAddress
,
addrSize
)
->
connect
()
.
getSockaddr
(
serverAddress
,
addrSize
)
->
connect
()
.
then
([
this
](
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
)
{
.
then
([
this
,
readerOpts
](
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
)
{
clientContext
=
kj
::
heap
<
ClientContext
>
(
kj
::
mv
(
stream
));
clientContext
=
kj
::
heap
<
ClientContext
>
(
kj
::
mv
(
stream
),
readerOpts
);
}).
fork
())
{}
}).
fork
())
{}
Impl
(
int
socketFd
)
Impl
(
int
socketFd
,
ReaderOptions
readerOpts
)
:
context
(
EzRpcContext
::
getThreadLocal
()),
:
context
(
EzRpcContext
::
getThreadLocal
()),
setupPromise
(
kj
::
Promise
<
void
>
(
kj
::
READY_NOW
).
fork
()),
setupPromise
(
kj
::
Promise
<
void
>
(
kj
::
READY_NOW
).
fork
()),
clientContext
(
kj
::
heap
<
ClientContext
>
(
clientContext
(
kj
::
heap
<
ClientContext
>
(
context
->
getLowLevelIoProvider
().
wrapSocketFd
(
socketFd
)))
{}
context
->
getLowLevelIoProvider
().
wrapSocketFd
(
socketFd
),
readerOpts
))
{}
};
};
EzRpcClient
::
EzRpcClient
(
kj
::
StringPtr
serverAddress
,
uint
defaultPort
)
EzRpcClient
::
EzRpcClient
(
kj
::
StringPtr
serverAddress
,
uint
defaultPort
,
ReaderOptions
readerOpts
)
:
impl
(
kj
::
heap
<
Impl
>
(
serverAddress
,
defaultPort
))
{}
:
impl
(
kj
::
heap
<
Impl
>
(
serverAddress
,
defaultPort
,
readerOpts
))
{}
EzRpcClient
::
EzRpcClient
(
const
struct
sockaddr
*
serverAddress
,
uint
addrSize
)
EzRpcClient
::
EzRpcClient
(
const
struct
sockaddr
*
serverAddress
,
uint
addrSize
,
ReaderOptions
readerOpts
)
:
impl
(
kj
::
heap
<
Impl
>
(
serverAddress
,
addrSize
))
{}
:
impl
(
kj
::
heap
<
Impl
>
(
serverAddress
,
addrSize
,
readerOpts
))
{}
EzRpcClient
::
EzRpcClient
(
int
socketFd
)
EzRpcClient
::
EzRpcClient
(
int
socketFd
,
ReaderOptions
readerOpts
)
:
impl
(
kj
::
heap
<
Impl
>
(
socketFd
))
{}
:
impl
(
kj
::
heap
<
Impl
>
(
socketFd
,
readerOpts
))
{}
EzRpcClient
::~
EzRpcClient
()
noexcept
(
false
)
{}
EzRpcClient
::~
EzRpcClient
()
noexcept
(
false
)
{}
...
@@ -192,50 +197,51 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
...
@@ -192,50 +197,51 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
TwoPartyVatNetwork
network
;
TwoPartyVatNetwork
network
;
RpcSystem
<
rpc
::
twoparty
::
SturdyRefHostId
>
rpcSystem
;
RpcSystem
<
rpc
::
twoparty
::
SturdyRefHostId
>
rpcSystem
;
ServerContext
(
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
,
SturdyRefRestorer
<
Text
>&
restorer
)
ServerContext
(
kj
::
Own
<
kj
::
AsyncIoStream
>&&
stream
,
SturdyRefRestorer
<
Text
>&
restorer
,
ReaderOptions
readerOpts
)
:
stream
(
kj
::
mv
(
stream
)),
:
stream
(
kj
::
mv
(
stream
)),
network
(
*
this
->
stream
,
rpc
::
twoparty
::
Side
::
SERVER
),
network
(
*
this
->
stream
,
rpc
::
twoparty
::
Side
::
SERVER
,
readerOpts
),
rpcSystem
(
makeRpcServer
(
network
,
restorer
))
{}
rpcSystem
(
makeRpcServer
(
network
,
restorer
))
{}
};
};
Impl
(
kj
::
StringPtr
bindAddress
,
uint
defaultPort
)
Impl
(
kj
::
StringPtr
bindAddress
,
uint
defaultPort
,
ReaderOptions
readerOpts
)
:
context
(
EzRpcContext
::
getThreadLocal
()),
portPromise
(
nullptr
),
tasks
(
*
this
)
{
:
context
(
EzRpcContext
::
getThreadLocal
()),
portPromise
(
nullptr
),
tasks
(
*
this
)
{
auto
paf
=
kj
::
newPromiseAndFulfiller
<
uint
>
();
auto
paf
=
kj
::
newPromiseAndFulfiller
<
uint
>
();
portPromise
=
paf
.
promise
.
fork
();
portPromise
=
paf
.
promise
.
fork
();
tasks
.
add
(
context
->
getIoProvider
().
getNetwork
().
parseAddress
(
bindAddress
,
defaultPort
)
tasks
.
add
(
context
->
getIoProvider
().
getNetwork
().
parseAddress
(
bindAddress
,
defaultPort
)
.
then
(
kj
::
mvCapture
(
paf
.
fulfiller
,
.
then
(
kj
::
mvCapture
(
paf
.
fulfiller
,
[
this
](
kj
::
Own
<
kj
::
PromiseFulfiller
<
uint
>>&&
portFulfiller
,
[
this
,
readerOpts
](
kj
::
Own
<
kj
::
PromiseFulfiller
<
uint
>>&&
portFulfiller
,
kj
::
Own
<
kj
::
NetworkAddress
>&&
addr
)
{
kj
::
Own
<
kj
::
NetworkAddress
>&&
addr
)
{
auto
listener
=
addr
->
listen
();
auto
listener
=
addr
->
listen
();
portFulfiller
->
fulfill
(
listener
->
getPort
());
portFulfiller
->
fulfill
(
listener
->
getPort
());
acceptLoop
(
kj
::
mv
(
listener
));
acceptLoop
(
kj
::
mv
(
listener
)
,
readerOpts
);
})));
})));
}
}
Impl
(
struct
sockaddr
*
bindAddress
,
uint
addrSize
)
Impl
(
struct
sockaddr
*
bindAddress
,
uint
addrSize
,
ReaderOptions
readerOpts
)
:
context
(
EzRpcContext
::
getThreadLocal
()),
portPromise
(
nullptr
),
tasks
(
*
this
)
{
:
context
(
EzRpcContext
::
getThreadLocal
()),
portPromise
(
nullptr
),
tasks
(
*
this
)
{
auto
listener
=
context
->
getIoProvider
().
getNetwork
()
auto
listener
=
context
->
getIoProvider
().
getNetwork
()
.
getSockaddr
(
bindAddress
,
addrSize
)
->
listen
();
.
getSockaddr
(
bindAddress
,
addrSize
)
->
listen
();
portPromise
=
kj
::
Promise
<
uint
>
(
listener
->
getPort
()).
fork
();
portPromise
=
kj
::
Promise
<
uint
>
(
listener
->
getPort
()).
fork
();
acceptLoop
(
kj
::
mv
(
listener
));
acceptLoop
(
kj
::
mv
(
listener
)
,
readerOpts
);
}
}
Impl
(
int
socketFd
,
uint
port
)
Impl
(
int
socketFd
,
uint
port
,
ReaderOptions
readerOpts
)
:
context
(
EzRpcContext
::
getThreadLocal
()),
:
context
(
EzRpcContext
::
getThreadLocal
()),
portPromise
(
kj
::
Promise
<
uint
>
(
port
).
fork
()),
portPromise
(
kj
::
Promise
<
uint
>
(
port
).
fork
()),
tasks
(
*
this
)
{
tasks
(
*
this
)
{
acceptLoop
(
context
->
getLowLevelIoProvider
().
wrapListenSocketFd
(
socketFd
));
acceptLoop
(
context
->
getLowLevelIoProvider
().
wrapListenSocketFd
(
socketFd
)
,
readerOpts
);
}
}
void
acceptLoop
(
kj
::
Own
<
kj
::
ConnectionReceiver
>&&
listener
)
{
void
acceptLoop
(
kj
::
Own
<
kj
::
ConnectionReceiver
>&&
listener
,
ReaderOptions
readerOpts
)
{
auto
ptr
=
listener
.
get
();
auto
ptr
=
listener
.
get
();
tasks
.
add
(
ptr
->
accept
().
then
(
kj
::
mvCapture
(
kj
::
mv
(
listener
),
tasks
.
add
(
ptr
->
accept
().
then
(
kj
::
mvCapture
(
kj
::
mv
(
listener
),
[
this
](
kj
::
Own
<
kj
::
ConnectionReceiver
>&&
listener
,
[
this
,
readerOpts
](
kj
::
Own
<
kj
::
ConnectionReceiver
>&&
listener
,
kj
::
Own
<
kj
::
AsyncIoStream
>&&
connection
)
{
kj
::
Own
<
kj
::
AsyncIoStream
>&&
connection
)
{
acceptLoop
(
kj
::
mv
(
listener
));
acceptLoop
(
kj
::
mv
(
listener
)
,
readerOpts
);
auto
server
=
kj
::
heap
<
ServerContext
>
(
kj
::
mv
(
connection
),
*
this
);
auto
server
=
kj
::
heap
<
ServerContext
>
(
kj
::
mv
(
connection
),
*
this
,
readerOpts
);
// Arrange to destroy the server context when all references are gone, or when the
// Arrange to destroy the server context when all references are gone, or when the
// EzRpcServer is destroyed (which will destroy the TaskSet).
// EzRpcServer is destroyed (which will destroy the TaskSet).
...
@@ -258,14 +264,16 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
...
@@ -258,14 +264,16 @@ struct EzRpcServer::Impl final: public SturdyRefRestorer<Text>, public kj::TaskS
}
}
};
};
EzRpcServer
::
EzRpcServer
(
kj
::
StringPtr
bindAddress
,
uint
defaultPort
)
EzRpcServer
::
EzRpcServer
(
kj
::
StringPtr
bindAddress
,
uint
defaultPort
,
:
impl
(
kj
::
heap
<
Impl
>
(
bindAddress
,
defaultPort
))
{}
ReaderOptions
readerOpts
)
:
impl
(
kj
::
heap
<
Impl
>
(
bindAddress
,
defaultPort
,
readerOpts
))
{}
EzRpcServer
::
EzRpcServer
(
struct
sockaddr
*
bindAddress
,
uint
addrSize
)
EzRpcServer
::
EzRpcServer
(
struct
sockaddr
*
bindAddress
,
uint
addrSize
,
:
impl
(
kj
::
heap
<
Impl
>
(
bindAddress
,
addrSize
))
{}
ReaderOptions
readerOpts
)
:
impl
(
kj
::
heap
<
Impl
>
(
bindAddress
,
addrSize
,
readerOpts
))
{}
EzRpcServer
::
EzRpcServer
(
int
socketFd
,
uint
port
)
EzRpcServer
::
EzRpcServer
(
int
socketFd
,
uint
port
,
ReaderOptions
readerOpts
)
:
impl
(
kj
::
heap
<
Impl
>
(
socketFd
,
port
))
{}
:
impl
(
kj
::
heap
<
Impl
>
(
socketFd
,
port
,
readerOpts
))
{}
EzRpcServer
::~
EzRpcServer
()
noexcept
(
false
)
{}
EzRpcServer
::~
EzRpcServer
()
noexcept
(
false
)
{}
...
...
c++/src/capnp/ez-rpc.h
View file @
bfe76435
...
@@ -23,6 +23,7 @@
...
@@ -23,6 +23,7 @@
#define CAPNP_EZ_RPC_H_
#define CAPNP_EZ_RPC_H_
#include "rpc.h"
#include "rpc.h"
#include "message.h"
struct
sockaddr
;
struct
sockaddr
;
...
@@ -91,7 +92,8 @@ class EzRpcClient {
...
@@ -91,7 +92,8 @@ class EzRpcClient {
// - `TwoPartyVatNetwork` in `capnp/rpc-twoparty.h`.
// - `TwoPartyVatNetwork` in `capnp/rpc-twoparty.h`.
public
:
public
:
explicit
EzRpcClient
(
kj
::
StringPtr
serverAddress
,
uint
defaultPort
=
0
);
explicit
EzRpcClient
(
kj
::
StringPtr
serverAddress
,
uint
defaultPort
=
0
,
ReaderOptions
readerOpts
=
ReaderOptions
());
// Construct a new EzRpcClient and connect to the given address. The connection is formed in
// Construct a new EzRpcClient and connect to the given address. The connection is formed in
// the background -- if it fails, calls to capabilities returned by importCap() will fail with an
// the background -- if it fails, calls to capabilities returned by importCap() will fail with an
// appropriate exception.
// appropriate exception.
...
@@ -101,13 +103,23 @@ public:
...
@@ -101,13 +103,23 @@ public:
//
//
// The address is parsed by `kj::Network` in `kj/async-io.h`. See that interface for more info
// The address is parsed by `kj::Network` in `kj/async-io.h`. See that interface for more info
// on the address format, but basically it's what you'd expect.
// on the address format, but basically it's what you'd expect.
//
// `readerOpts` is the ReaderOptions structure used by capnproto to limit the traversal of
// messages. If not specified, a default value of 8M max word traversal and a nesting limit of 64
// deep.
// You should only need to set this if you are receiving errors about messages being too large or
// too deep in normal operation, and should consider restructuring your protocol to use simpler
// or smaller messages if this is an issue for you.
EzRpcClient
(
const
struct
sockaddr
*
serverAddress
,
uint
addrSize
);
EzRpcClient
(
const
struct
sockaddr
*
serverAddress
,
uint
addrSize
,
ReaderOptions
readerOpts
=
ReaderOptions
());
// Like the above constructor, but connects to an already-resolved socket address. Any address
// Like the above constructor, but connects to an already-resolved socket address. Any address
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
explicit
EzRpcClient
(
int
socketFd
);
explicit
EzRpcClient
(
int
socketFd
,
ReaderOptions
readerOpts
=
ReaderOptions
()
);
// Create a client on top of an already-connected socket.
// Create a client on top of an already-connected socket.
// `readerOpts` acts as in the first constructor.
~
EzRpcClient
()
noexcept
(
false
);
~
EzRpcClient
()
noexcept
(
false
);
...
@@ -138,7 +150,8 @@ class EzRpcServer {
...
@@ -138,7 +150,8 @@ class EzRpcServer {
// The server counterpart to `EzRpcClient`. See `EzRpcClient` for an example.
// The server counterpart to `EzRpcClient`. See `EzRpcClient` for an example.
public
:
public
:
explicit
EzRpcServer
(
kj
::
StringPtr
bindAddress
,
uint
defaultPort
=
0
);
explicit
EzRpcServer
(
kj
::
StringPtr
bindAddress
,
uint
defaultPort
=
0
,
ReaderOptions
readerOpts
=
ReaderOptions
());
// Construct a new `EzRpcServer` that binds to the given address. An address of "*" means to
// Construct a new `EzRpcServer` that binds to the given address. An address of "*" means to
// bind to all local addresses.
// bind to all local addresses.
//
//
...
@@ -152,14 +165,23 @@ public:
...
@@ -152,14 +165,23 @@ public:
// The server might not begin listening immediately, especially if `bindAddress` needs to be
// The server might not begin listening immediately, especially if `bindAddress` needs to be
// resolved. If you need to wait until the server is definitely up, wait on the promise returned
// resolved. If you need to wait until the server is definitely up, wait on the promise returned
// by `getPort()`.
// by `getPort()`.
//
EzRpcServer
(
struct
sockaddr
*
bindAddress
,
uint
addrSize
);
// `readerOpts` is the ReaderOptions structure used by capnproto to limit the traversal of
// messages. If not specified, a default value of 8M max word traversal and a nesting limit of 64
// deep.
// You should only need to set this if you are receiving errors about messages being too large or
// too deep in normal operation, and should consider restructuring your protocol to use simpler
// or smaller messages if this is an issue for you.
EzRpcServer
(
struct
sockaddr
*
bindAddress
,
uint
addrSize
,
ReaderOptions
readerOpts
=
ReaderOptions
());
// Like the above constructor, but binds to an already-resolved socket address. Any address
// Like the above constructor, but binds to an already-resolved socket address. Any address
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
// format supported by `kj::Network` in `kj/async-io.h` is accepted.
EzRpcServer
(
int
socketFd
,
uint
port
);
EzRpcServer
(
int
socketFd
,
uint
port
,
ReaderOptions
readerOpts
=
ReaderOptions
()
);
// Create a server on top of an already-listening socket (i.e. one on which accept() may be
// Create a server on top of an already-listening socket (i.e. one on which accept() may be
// called). `port` is returned by `getPort()` -- it serves no other purpose.
// called). `port` is returned by `getPort()` -- it serves no other purpose.
// `readerOpts` acts as in the other two above constructors.
~
EzRpcServer
()
noexcept
(
false
);
~
EzRpcServer
()
noexcept
(
false
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment