Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
C
capnproto
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
capnproto
Commits
d690ae52
Commit
d690ae52
authored
May 30, 2019
by
Kenton Varda
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Fix typos.
parent
77f20b46
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
27 additions
and
25 deletions
+27
-25
rpc-twoparty.c++
c++/src/capnp/rpc-twoparty.c++
+17
-16
rpc-twoparty.h
c++/src/capnp/rpc-twoparty.h
+9
-8
async-io.h
c++/src/kj/async-io.h
+1
-1
No files found.
c++/src/capnp/rpc-twoparty.c++
View file @
d690ae52
...
@@ -28,7 +28,7 @@ namespace capnp {
...
@@ -28,7 +28,7 @@ namespace capnp {
TwoPartyVatNetwork
::
TwoPartyVatNetwork
(
kj
::
AsyncIoStream
&
stream
,
rpc
::
twoparty
::
Side
side
,
TwoPartyVatNetwork
::
TwoPartyVatNetwork
(
kj
::
AsyncIoStream
&
stream
,
rpc
::
twoparty
::
Side
side
,
ReaderOptions
receiveOptions
)
ReaderOptions
receiveOptions
)
:
stream
(
&
stream
),
maxFdsPerMesage
(
0
),
side
(
side
),
peerVatId
(
4
),
:
stream
(
&
stream
),
maxFdsPerMes
s
age
(
0
),
side
(
side
),
peerVatId
(
4
),
receiveOptions
(
receiveOptions
),
previousWrite
(
kj
::
READY_NOW
)
{
receiveOptions
(
receiveOptions
),
previousWrite
(
kj
::
READY_NOW
)
{
peerVatId
.
initRoot
<
rpc
::
twoparty
::
VatId
>
().
setSide
(
peerVatId
.
initRoot
<
rpc
::
twoparty
::
VatId
>
().
setSide
(
side
==
rpc
::
twoparty
::
Side
::
CLIENT
?
rpc
::
twoparty
::
Side
::
SERVER
side
==
rpc
::
twoparty
::
Side
::
CLIENT
?
rpc
::
twoparty
::
Side
::
SERVER
...
@@ -39,11 +39,11 @@ TwoPartyVatNetwork::TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty:
...
@@ -39,11 +39,11 @@ TwoPartyVatNetwork::TwoPartyVatNetwork(kj::AsyncIoStream& stream, rpc::twoparty:
disconnectFulfiller
.
fulfiller
=
kj
::
mv
(
paf
.
fulfiller
);
disconnectFulfiller
.
fulfiller
=
kj
::
mv
(
paf
.
fulfiller
);
}
}
TwoPartyVatNetwork
::
TwoPartyVatNetwork
(
kj
::
AsyncCapabilityStream
&
stream
,
uint
maxFdsPerMesage
,
TwoPartyVatNetwork
::
TwoPartyVatNetwork
(
kj
::
AsyncCapabilityStream
&
stream
,
uint
maxFdsPerMes
s
age
,
rpc
::
twoparty
::
Side
side
,
ReaderOptions
receiveOptions
)
rpc
::
twoparty
::
Side
side
,
ReaderOptions
receiveOptions
)
:
TwoPartyVatNetwork
(
stream
,
side
,
receiveOptions
)
{
:
TwoPartyVatNetwork
(
stream
,
side
,
receiveOptions
)
{
this
->
stream
=
&
stream
;
this
->
stream
=
&
stream
;
this
->
maxFdsPerMes
age
=
maxFdsPerMe
sage
;
this
->
maxFdsPerMes
sage
=
maxFdsPerMes
sage
;
}
}
void
TwoPartyVatNetwork
::
FulfillerDisposer
::
disposeImpl
(
void
*
pointer
)
const
{
void
TwoPartyVatNetwork
::
FulfillerDisposer
::
disposeImpl
(
void
*
pointer
)
const
{
...
@@ -182,7 +182,7 @@ kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> TwoPartyVatNetwork::receiveI
...
@@ -182,7 +182,7 @@ kj::Promise<kj::Maybe<kj::Own<IncomingRpcMessage>>> TwoPartyVatNetwork::receiveI
});
});
}
}
KJ_CASE_ONEOF
(
capStream
,
kj
::
AsyncCapabilityStream
*
)
{
KJ_CASE_ONEOF
(
capStream
,
kj
::
AsyncCapabilityStream
*
)
{
auto
fdSpace
=
kj
::
heapArray
<
kj
::
AutoCloseFd
>
(
maxFdsPerMesage
);
auto
fdSpace
=
kj
::
heapArray
<
kj
::
AutoCloseFd
>
(
maxFdsPerMes
s
age
);
auto
promise
=
tryReadMessage
(
*
capStream
,
fdSpace
,
receiveOptions
);
auto
promise
=
tryReadMessage
(
*
capStream
,
fdSpace
,
receiveOptions
);
return
promise
.
then
([
fdSpace
=
kj
::
mv
(
fdSpace
)]
return
promise
.
then
([
fdSpace
=
kj
::
mv
(
fdSpace
)]
(
kj
::
Maybe
<
MessageReaderAndFds
>&&
messageAndFds
)
mutable
(
kj
::
Maybe
<
MessageReaderAndFds
>&&
messageAndFds
)
mutable
...
@@ -237,10 +237,10 @@ struct TwoPartyServer::AcceptedConnection {
...
@@ -237,10 +237,10 @@ struct TwoPartyServer::AcceptedConnection {
explicit
AcceptedConnection
(
Capability
::
Client
bootstrapInterface
,
explicit
AcceptedConnection
(
Capability
::
Client
bootstrapInterface
,
kj
::
Own
<
kj
::
AsyncCapabilityStream
>&&
connectionParam
,
kj
::
Own
<
kj
::
AsyncCapabilityStream
>&&
connectionParam
,
uint
maxFdsPerMesage
)
uint
maxFdsPerMes
s
age
)
:
connection
(
kj
::
mv
(
connectionParam
)),
:
connection
(
kj
::
mv
(
connectionParam
)),
network
(
kj
::
downcast
<
kj
::
AsyncCapabilityStream
>
(
*
connection
),
network
(
kj
::
downcast
<
kj
::
AsyncCapabilityStream
>
(
*
connection
),
maxFdsPerMesage
,
rpc
::
twoparty
::
Side
::
SERVER
),
maxFdsPerMes
s
age
,
rpc
::
twoparty
::
Side
::
SERVER
),
rpcSystem
(
makeRpcServer
(
network
,
kj
::
mv
(
bootstrapInterface
)))
{}
rpcSystem
(
makeRpcServer
(
network
,
kj
::
mv
(
bootstrapInterface
)))
{}
};
};
...
@@ -252,9 +252,10 @@ void TwoPartyServer::accept(kj::Own<kj::AsyncIoStream>&& connection) {
...
@@ -252,9 +252,10 @@ void TwoPartyServer::accept(kj::Own<kj::AsyncIoStream>&& connection) {
tasks
.
add
(
promise
.
attach
(
kj
::
mv
(
connectionState
)));
tasks
.
add
(
promise
.
attach
(
kj
::
mv
(
connectionState
)));
}
}
void
TwoPartyServer
::
accept
(
kj
::
Own
<
kj
::
AsyncCapabilityStream
>&&
connection
,
uint
maxFdsPerMesage
)
{
void
TwoPartyServer
::
accept
(
kj
::
Own
<
kj
::
AsyncCapabilityStream
>&&
connection
,
uint
maxFdsPerMessage
)
{
auto
connectionState
=
kj
::
heap
<
AcceptedConnection
>
(
auto
connectionState
=
kj
::
heap
<
AcceptedConnection
>
(
bootstrapInterface
,
kj
::
mv
(
connection
),
maxFdsPerMesage
);
bootstrapInterface
,
kj
::
mv
(
connection
),
maxFdsPerMes
s
age
);
// Run the connection until disconnect.
// Run the connection until disconnect.
auto
promise
=
connectionState
->
network
.
onDisconnect
();
auto
promise
=
connectionState
->
network
.
onDisconnect
();
...
@@ -270,11 +271,11 @@ kj::Promise<void> TwoPartyServer::listen(kj::ConnectionReceiver& listener) {
...
@@ -270,11 +271,11 @@ kj::Promise<void> TwoPartyServer::listen(kj::ConnectionReceiver& listener) {
}
}
kj
::
Promise
<
void
>
TwoPartyServer
::
listenCapStreamReceiver
(
kj
::
Promise
<
void
>
TwoPartyServer
::
listenCapStreamReceiver
(
kj
::
ConnectionReceiver
&
listener
,
uint
maxFdsPerMesage
)
{
kj
::
ConnectionReceiver
&
listener
,
uint
maxFdsPerMes
s
age
)
{
return
listener
.
accept
()
return
listener
.
accept
()
.
then
([
this
,
&
listener
,
maxFdsPerMesage
](
kj
::
Own
<
kj
::
AsyncIoStream
>&&
connection
)
mutable
{
.
then
([
this
,
&
listener
,
maxFdsPerMes
s
age
](
kj
::
Own
<
kj
::
AsyncIoStream
>&&
connection
)
mutable
{
accept
(
connection
.
downcast
<
kj
::
AsyncCapabilityStream
>
(),
maxFdsPerMesage
);
accept
(
connection
.
downcast
<
kj
::
AsyncCapabilityStream
>
(),
maxFdsPerMes
s
age
);
return
listenCapStreamReceiver
(
listener
,
maxFdsPerMesage
);
return
listenCapStreamReceiver
(
listener
,
maxFdsPerMes
s
age
);
});
});
}
}
...
@@ -287,8 +288,8 @@ TwoPartyClient::TwoPartyClient(kj::AsyncIoStream& connection)
...
@@ -287,8 +288,8 @@ TwoPartyClient::TwoPartyClient(kj::AsyncIoStream& connection)
rpcSystem
(
makeRpcClient
(
network
))
{}
rpcSystem
(
makeRpcClient
(
network
))
{}
TwoPartyClient
::
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMesage
)
TwoPartyClient
::
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMes
s
age
)
:
network
(
connection
,
maxFdsPerMesage
,
rpc
::
twoparty
::
Side
::
CLIENT
),
:
network
(
connection
,
maxFdsPerMes
s
age
,
rpc
::
twoparty
::
Side
::
CLIENT
),
rpcSystem
(
makeRpcClient
(
network
))
{}
rpcSystem
(
makeRpcClient
(
network
))
{}
TwoPartyClient
::
TwoPartyClient
(
kj
::
AsyncIoStream
&
connection
,
TwoPartyClient
::
TwoPartyClient
(
kj
::
AsyncIoStream
&
connection
,
...
@@ -297,10 +298,10 @@ TwoPartyClient::TwoPartyClient(kj::AsyncIoStream& connection,
...
@@ -297,10 +298,10 @@ TwoPartyClient::TwoPartyClient(kj::AsyncIoStream& connection,
:
network
(
connection
,
side
),
:
network
(
connection
,
side
),
rpcSystem
(
network
,
bootstrapInterface
)
{}
rpcSystem
(
network
,
bootstrapInterface
)
{}
TwoPartyClient
::
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMesage
,
TwoPartyClient
::
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMes
s
age
,
Capability
::
Client
bootstrapInterface
,
Capability
::
Client
bootstrapInterface
,
rpc
::
twoparty
::
Side
side
)
rpc
::
twoparty
::
Side
side
)
:
network
(
connection
,
maxFdsPerMesage
,
side
),
:
network
(
connection
,
maxFdsPerMes
s
age
,
side
),
rpcSystem
(
network
,
bootstrapInterface
)
{}
rpcSystem
(
network
,
bootstrapInterface
)
{}
Capability
::
Client
TwoPartyClient
::
bootstrap
()
{
Capability
::
Client
TwoPartyClient
::
bootstrap
()
{
...
...
c++/src/capnp/rpc-twoparty.h
View file @
d690ae52
...
@@ -54,11 +54,11 @@ class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
...
@@ -54,11 +54,11 @@ class TwoPartyVatNetwork: public TwoPartyVatNetworkBase,
public
:
public
:
TwoPartyVatNetwork
(
kj
::
AsyncIoStream
&
stream
,
rpc
::
twoparty
::
Side
side
,
TwoPartyVatNetwork
(
kj
::
AsyncIoStream
&
stream
,
rpc
::
twoparty
::
Side
side
,
ReaderOptions
receiveOptions
=
ReaderOptions
());
ReaderOptions
receiveOptions
=
ReaderOptions
());
TwoPartyVatNetwork
(
kj
::
AsyncCapabilityStream
&
stream
,
uint
maxFdsPerMesage
,
TwoPartyVatNetwork
(
kj
::
AsyncCapabilityStream
&
stream
,
uint
maxFdsPerMes
s
age
,
rpc
::
twoparty
::
Side
side
,
ReaderOptions
receiveOptions
=
ReaderOptions
());
rpc
::
twoparty
::
Side
side
,
ReaderOptions
receiveOptions
=
ReaderOptions
());
// To support FD passing, pass an AsyncCapabilityStream and `maxFdsPerMesage`, which specifies
// To support FD passing, pass an AsyncCapabilityStream and `maxFdsPerMes
s
age`, which specifies
// the maximum number of file descriptors to accept from the peer in any one RPC message. It is
// the maximum number of file descriptors to accept from the peer in any one RPC message. It is
// important to keep maxFdsPerMesage low in order to stop DoS attacks that fill up your FD table.
// important to keep maxFdsPerMes
s
age low in order to stop DoS attacks that fill up your FD table.
//
//
// Note that this limit applies only to incoming messages; outgoing messages are allowed to have
// Note that this limit applies only to incoming messages; outgoing messages are allowed to have
// more FDs. Sometimes it makes sense to enforce a limit of zero in one direction while having
// more FDs. Sometimes it makes sense to enforce a limit of zero in one direction while having
...
@@ -85,7 +85,7 @@ private:
...
@@ -85,7 +85,7 @@ private:
class
IncomingMessageImpl
;
class
IncomingMessageImpl
;
kj
::
OneOf
<
kj
::
AsyncIoStream
*
,
kj
::
AsyncCapabilityStream
*>
stream
;
kj
::
OneOf
<
kj
::
AsyncIoStream
*
,
kj
::
AsyncCapabilityStream
*>
stream
;
uint
maxFdsPerMesage
;
uint
maxFdsPerMes
s
age
;
rpc
::
twoparty
::
Side
side
;
rpc
::
twoparty
::
Side
side
;
MallocMessageBuilder
peerVatId
;
MallocMessageBuilder
peerVatId
;
ReaderOptions
receiveOptions
;
ReaderOptions
receiveOptions
;
...
@@ -135,7 +135,7 @@ public:
...
@@ -135,7 +135,7 @@ public:
explicit
TwoPartyServer
(
Capability
::
Client
bootstrapInterface
);
explicit
TwoPartyServer
(
Capability
::
Client
bootstrapInterface
);
void
accept
(
kj
::
Own
<
kj
::
AsyncIoStream
>&&
connection
);
void
accept
(
kj
::
Own
<
kj
::
AsyncIoStream
>&&
connection
);
void
accept
(
kj
::
Own
<
kj
::
AsyncCapabilityStream
>&&
connection
,
uint
maxFdsPerMesage
);
void
accept
(
kj
::
Own
<
kj
::
AsyncCapabilityStream
>&&
connection
,
uint
maxFdsPerMes
s
age
);
// Accepts the connection for servicing.
// Accepts the connection for servicing.
kj
::
Promise
<
void
>
listen
(
kj
::
ConnectionReceiver
&
listener
);
kj
::
Promise
<
void
>
listen
(
kj
::
ConnectionReceiver
&
listener
);
...
@@ -143,7 +143,8 @@ public:
...
@@ -143,7 +143,8 @@ public:
// exception is thrown while trying to accept. You may discard the returned promise to cancel
// exception is thrown while trying to accept. You may discard the returned promise to cancel
// listening.
// listening.
kj
::
Promise
<
void
>
listenCapStreamReceiver
(
kj
::
ConnectionReceiver
&
listener
,
uint
maxFdsPerMesage
);
kj
::
Promise
<
void
>
listenCapStreamReceiver
(
kj
::
ConnectionReceiver
&
listener
,
uint
maxFdsPerMessage
);
// Listen with support for FD transfers. `listener.accept()` must return instances of
// Listen with support for FD transfers. `listener.accept()` must return instances of
// AsyncCapabilityStream, otherwise this will crash.
// AsyncCapabilityStream, otherwise this will crash.
...
@@ -161,10 +162,10 @@ class TwoPartyClient {
...
@@ -161,10 +162,10 @@ class TwoPartyClient {
public
:
public
:
explicit
TwoPartyClient
(
kj
::
AsyncIoStream
&
connection
);
explicit
TwoPartyClient
(
kj
::
AsyncIoStream
&
connection
);
explicit
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMesage
);
explicit
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMes
s
age
);
TwoPartyClient
(
kj
::
AsyncIoStream
&
connection
,
Capability
::
Client
bootstrapInterface
,
TwoPartyClient
(
kj
::
AsyncIoStream
&
connection
,
Capability
::
Client
bootstrapInterface
,
rpc
::
twoparty
::
Side
side
=
rpc
::
twoparty
::
Side
::
CLIENT
);
rpc
::
twoparty
::
Side
side
=
rpc
::
twoparty
::
Side
::
CLIENT
);
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMesage
,
TwoPartyClient
(
kj
::
AsyncCapabilityStream
&
connection
,
uint
maxFdsPerMes
s
age
,
Capability
::
Client
bootstrapInterface
,
Capability
::
Client
bootstrapInterface
,
rpc
::
twoparty
::
Side
side
=
rpc
::
twoparty
::
Side
::
CLIENT
);
rpc
::
twoparty
::
Side
side
=
rpc
::
twoparty
::
Side
::
CLIENT
);
...
...
c++/src/kj/async-io.h
View file @
d690ae52
...
@@ -181,7 +181,7 @@ public:
...
@@ -181,7 +181,7 @@ public:
Promise
<
void
>
writeWithFds
(
ArrayPtr
<
const
byte
>
data
,
Promise
<
void
>
writeWithFds
(
ArrayPtr
<
const
byte
>
data
,
ArrayPtr
<
const
ArrayPtr
<
const
byte
>>
moreData
,
ArrayPtr
<
const
ArrayPtr
<
const
byte
>>
moreData
,
ArrayPtr
<
const
AutoCloseFd
>
fds
);
ArrayPtr
<
const
AutoCloseFd
>
fds
);
// Write some data to the stream with some file desc
ir
ptors attached to it.
// Write some data to the stream with some file desc
ri
ptors attached to it.
//
//
// The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed
// The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed
// limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is
// limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment