Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
C
capnproto
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
capnproto
Commits
ecab2520
Commit
ecab2520
authored
Nov 07, 2013
by
Kenton Varda
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Add code for serializing messages to/parsing from asynchronous streams.
parent
9bb74859
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
550 additions
and
4 deletions
+550
-4
serialize-async-test.c++
c++/src/capnp/serialize-async-test.c++
+253
-0
serialize-async.c++
c++/src/capnp/serialize-async.c++
+182
-0
serialize-async.h
c++/src/capnp/serialize-async.h
+57
-0
async-io.c++
c++/src/kj/async-io.c++
+31
-3
async-io.h
c++/src/kj/async-io.h
+27
-1
No files found.
c++/src/capnp/serialize-async-test.c++
0 → 100644
View file @
ecab2520
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "serialize-async.h"
#include "serialize.h"
#include <kj/debug.h>
#include <kj/thread.h>
#include <kj/async-unix.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include "test-util.h"
#include <gtest/gtest.h>
namespace
capnp
{
namespace
_
{
// private
namespace
{
class
FragmentingOutputStream
:
public
kj
::
OutputStream
{
public
:
FragmentingOutputStream
(
kj
::
OutputStream
&
inner
)
:
inner
(
inner
)
{}
void
write
(
const
void
*
buffer
,
size_t
size
)
override
{
while
(
size
>
0
)
{
size_t
n
=
rand
()
%
size
+
1
;
inner
.
write
(
buffer
,
n
);
usleep
(
5
);
buffer
=
reinterpret_cast
<
const
byte
*>
(
buffer
)
+
n
;
size
-=
n
;
}
}
private
:
kj
::
OutputStream
&
inner
;
};
class
TestMessageBuilder
:
public
MallocMessageBuilder
{
// A MessageBuilder that tries to allocate an exact number of total segments, by allocating
// minimum-size segments until it reaches the number, then allocating one large segment to
// finish.
public
:
explicit
TestMessageBuilder
(
uint
desiredSegmentCount
)
:
MallocMessageBuilder
(
0
,
AllocationStrategy
::
FIXED_SIZE
),
desiredSegmentCount
(
desiredSegmentCount
)
{}
~
TestMessageBuilder
()
{
EXPECT_EQ
(
0u
,
desiredSegmentCount
);
}
kj
::
ArrayPtr
<
word
>
allocateSegment
(
uint
minimumSize
)
override
{
if
(
desiredSegmentCount
<=
1
)
{
if
(
desiredSegmentCount
<
1
)
{
ADD_FAILURE
()
<<
"Allocated more segments than desired."
;
}
else
{
--
desiredSegmentCount
;
}
return
MallocMessageBuilder
::
allocateSegment
(
8192
);
}
else
{
--
desiredSegmentCount
;
return
MallocMessageBuilder
::
allocateSegment
(
minimumSize
);
}
}
private
:
uint
desiredSegmentCount
;
};
class
SerializeAsyncTest
:
public
testing
::
Test
{
protected
:
int
fds
[
2
];
SerializeAsyncTest
()
{
KJ_SYSCALL
(
pipe
(
fds
));
#ifdef F_SETPIPE_SZ
// Force pipe to be small, to test what happens when the write buffer fills up. Note that
// Linux rounds this up to the page size, so we'll still need a large message to hit the limit.
KJ_SYSCALL
(
fcntl
(
fds
[
1
],
F_SETPIPE_SZ
,
(
int
)
64
));
#endif
}
~
SerializeAsyncTest
()
{
close
(
fds
[
0
]);
close
(
fds
[
1
]);
}
};
TEST_F
(
SerializeAsyncTest
,
ParseAsync
)
{
kj
::
UnixEventLoop
loop
;
auto
input
=
kj
::
AsyncInputStream
::
wrapFd
(
fds
[
0
]);
kj
::
FdOutputStream
rawOutput
(
fds
[
1
]);
FragmentingOutputStream
output
(
rawOutput
);
TestMessageBuilder
message
(
1
);
initTestMessage
(
message
.
getRoot
<
TestAllTypes
>
());
auto
promise
=
loop
.
evalLater
([
&
]()
{
return
readMessage
(
*
input
);
});
kj
::
Thread
thread
([
&
]()
{
writeMessage
(
output
,
message
);
});
auto
received
=
loop
.
wait
(
kj
::
mv
(
promise
));
checkTestMessage
(
received
->
getRoot
<
TestAllTypes
>
());
}
TEST_F
(
SerializeAsyncTest
,
ParseAsyncOddSegmentCount
)
{
kj
::
UnixEventLoop
loop
;
auto
input
=
kj
::
AsyncInputStream
::
wrapFd
(
fds
[
0
]);
kj
::
FdOutputStream
rawOutput
(
fds
[
1
]);
FragmentingOutputStream
output
(
rawOutput
);
TestMessageBuilder
message
(
7
);
initTestMessage
(
message
.
getRoot
<
TestAllTypes
>
());
auto
promise
=
loop
.
evalLater
([
&
]()
{
return
readMessage
(
*
input
);
});
kj
::
Thread
thread
([
&
]()
{
writeMessage
(
output
,
message
);
});
auto
received
=
loop
.
wait
(
kj
::
mv
(
promise
));
checkTestMessage
(
received
->
getRoot
<
TestAllTypes
>
());
}
TEST_F
(
SerializeAsyncTest
,
ParseAsyncEvenSegmentCount
)
{
kj
::
UnixEventLoop
loop
;
auto
input
=
kj
::
AsyncInputStream
::
wrapFd
(
fds
[
0
]);
kj
::
FdOutputStream
rawOutput
(
fds
[
1
]);
FragmentingOutputStream
output
(
rawOutput
);
TestMessageBuilder
message
(
10
);
initTestMessage
(
message
.
getRoot
<
TestAllTypes
>
());
auto
promise
=
loop
.
evalLater
([
&
]()
{
return
readMessage
(
*
input
);
});
kj
::
Thread
thread
([
&
]()
{
writeMessage
(
output
,
message
);
});
auto
received
=
loop
.
wait
(
kj
::
mv
(
promise
));
checkTestMessage
(
received
->
getRoot
<
TestAllTypes
>
());
}
TEST_F
(
SerializeAsyncTest
,
WriteAsync
)
{
kj
::
UnixEventLoop
loop
;
auto
output
=
kj
::
AsyncOutputStream
::
wrapFd
(
fds
[
1
]);
TestMessageBuilder
message
(
1
);
auto
root
=
message
.
getRoot
<
TestAllTypes
>
();
auto
list
=
root
.
initStructList
(
16
);
for
(
auto
element
:
list
)
{
initTestMessage
(
element
);
}
kj
::
Thread
thread
([
&
]()
{
StreamFdMessageReader
reader
(
fds
[
0
]);
auto
listReader
=
reader
.
getRoot
<
TestAllTypes
>
().
getStructList
();
EXPECT_EQ
(
list
.
size
(),
listReader
.
size
());
for
(
auto
element
:
listReader
)
{
checkTestMessage
(
element
);
}
});
loop
.
wait
(
loop
.
evalLater
([
&
]()
{
return
writeMessage
(
*
output
,
message
);
}));
}
TEST_F
(
SerializeAsyncTest
,
WriteAsyncOddSegmentCount
)
{
kj
::
UnixEventLoop
loop
;
auto
output
=
kj
::
AsyncOutputStream
::
wrapFd
(
fds
[
1
]);
TestMessageBuilder
message
(
7
);
auto
root
=
message
.
getRoot
<
TestAllTypes
>
();
auto
list
=
root
.
initStructList
(
16
);
for
(
auto
element
:
list
)
{
initTestMessage
(
element
);
}
kj
::
Thread
thread
([
&
]()
{
StreamFdMessageReader
reader
(
fds
[
0
]);
auto
listReader
=
reader
.
getRoot
<
TestAllTypes
>
().
getStructList
();
EXPECT_EQ
(
list
.
size
(),
listReader
.
size
());
for
(
auto
element
:
listReader
)
{
checkTestMessage
(
element
);
}
});
loop
.
wait
(
loop
.
evalLater
([
&
]()
{
return
writeMessage
(
*
output
,
message
);
}));
}
TEST_F
(
SerializeAsyncTest
,
WriteAsyncEvenSegmentCount
)
{
kj
::
UnixEventLoop
loop
;
auto
output
=
kj
::
AsyncOutputStream
::
wrapFd
(
fds
[
1
]);
TestMessageBuilder
message
(
10
);
auto
root
=
message
.
getRoot
<
TestAllTypes
>
();
auto
list
=
root
.
initStructList
(
16
);
for
(
auto
element
:
list
)
{
initTestMessage
(
element
);
}
kj
::
Thread
thread
([
&
]()
{
StreamFdMessageReader
reader
(
fds
[
0
]);
auto
listReader
=
reader
.
getRoot
<
TestAllTypes
>
().
getStructList
();
EXPECT_EQ
(
list
.
size
(),
listReader
.
size
());
for
(
auto
element
:
listReader
)
{
checkTestMessage
(
element
);
}
});
loop
.
wait
(
loop
.
evalLater
([
&
]()
{
return
writeMessage
(
*
output
,
message
);
}));
}
}
// namespace
}
// namespace _ (private)
}
// namespace capnp
c++/src/capnp/serialize-async.c++
0 → 100644
View file @
ecab2520
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "serialize-async.h"
#include <kj/debug.h>
namespace
capnp
{
namespace
{
class
AsyncMessageReader
:
public
MessageReader
{
public
:
inline
AsyncMessageReader
(
ReaderOptions
options
)
:
MessageReader
(
options
)
{}
~
AsyncMessageReader
()
noexcept
(
false
)
{}
kj
::
Promise
<
void
>
read
(
kj
::
AsyncInputStream
&
inputStream
,
kj
::
ArrayPtr
<
word
>
scratchSpace
);
// implements MessageReader ----------------------------------------
kj
::
ArrayPtr
<
const
word
>
getSegment
(
uint
id
)
override
{
if
(
id
>=
segmentCount
())
{
return
nullptr
;
}
else
{
uint32_t
size
=
id
==
0
?
segment0Size
()
:
moreSizes
[
id
-
1
].
get
();
return
kj
::
arrayPtr
(
segmentStarts
[
id
],
size
);
}
}
private
:
_
::
WireValue
<
uint32_t
>
firstWord
[
2
];
kj
::
Array
<
_
::
WireValue
<
uint32_t
>>
moreSizes
;
kj
::
Array
<
const
word
*>
segmentStarts
;
kj
::
Array
<
word
>
ownedSpace
;
// Only if scratchSpace wasn't big enough.
inline
uint
segmentCount
()
{
return
firstWord
[
0
].
get
()
+
1
;
}
inline
uint
segment0Size
()
{
return
firstWord
[
1
].
get
();
}
};
kj
::
Promise
<
void
>
AsyncMessageReader
::
read
(
kj
::
AsyncInputStream
&
inputStream
,
kj
::
ArrayPtr
<
word
>
scratchSpace
)
{
return
inputStream
.
read
(
firstWord
,
sizeof
(
firstWord
))
.
then
([
this
,
&
inputStream
]()
->
kj
::
Promise
<
void
>
{
if
(
segmentCount
()
==
0
)
{
firstWord
[
1
].
set
(
0
);
}
// Reject messages with too many segments for security reasons.
KJ_REQUIRE
(
segmentCount
()
<
512
,
"Message has too many segments."
)
{
return
kj
::
READY_NOW
;
// exception will be propagated
}
if
(
segmentCount
()
>
1
)
{
// Read sizes for all segments except the first. Include padding if necessary.
moreSizes
=
kj
::
heapArray
<
_
::
WireValue
<
uint32_t
>>
(
segmentCount
()
&
~
1
);
return
inputStream
.
read
(
moreSizes
.
begin
(),
moreSizes
.
size
()
*
sizeof
(
moreSizes
[
0
]));
}
else
{
return
kj
::
READY_NOW
;
}
}).
then
([
this
,
&
inputStream
,
scratchSpace
]()
mutable
->
kj
::
Promise
<
void
>
{
size_t
totalWords
=
segment0Size
();
if
(
segmentCount
()
>
1
)
{
for
(
uint
i
=
0
;
i
<
segmentCount
()
-
1
;
i
++
)
{
totalWords
+=
moreSizes
[
i
].
get
();
}
}
// Don't accept a message which the receiver couldn't possibly traverse without hitting the
// traversal limit. Without this check, a malicious client could transmit a very large segment
// size to make the receiver allocate excessive space and possibly crash.
KJ_REQUIRE
(
totalWords
<=
getOptions
().
traversalLimitInWords
,
"Message is too large. To increase the limit on the receiving end, see "
"capnp::ReaderOptions."
)
{
return
kj
::
READY_NOW
;
// exception will be propagated
}
if
(
scratchSpace
.
size
()
<
totalWords
)
{
// TODO(perf): Consider allocating each segment as a separate chunk to reduce memory
// fragmentation.
ownedSpace
=
kj
::
heapArray
<
word
>
(
totalWords
);
scratchSpace
=
ownedSpace
;
}
segmentStarts
=
kj
::
heapArray
<
const
word
*>
(
segmentCount
());
segmentStarts
[
0
]
=
scratchSpace
.
begin
();
if
(
segmentCount
()
>
1
)
{
size_t
offset
=
segment0Size
();
for
(
uint
i
=
1
;
i
<
segmentCount
();
i
++
)
{
segmentStarts
[
i
]
=
scratchSpace
.
begin
()
+
offset
;
offset
+=
moreSizes
[
i
-
1
].
get
();
}
}
return
inputStream
.
read
(
scratchSpace
.
begin
(),
totalWords
*
sizeof
(
word
));
});
}
}
// namespace
kj
::
Promise
<
kj
::
Own
<
MessageReader
>>
readMessage
(
kj
::
AsyncInputStream
&
input
,
ReaderOptions
options
,
kj
::
ArrayPtr
<
word
>
scratchSpace
)
{
auto
reader
=
kj
::
heap
<
AsyncMessageReader
>
(
options
);
auto
promise
=
reader
->
read
(
input
,
scratchSpace
);
return
promise
.
then
(
kj
::
mvCapture
(
reader
,
[](
kj
::
Own
<
MessageReader
>&&
reader
)
{
return
kj
::
mv
(
reader
);
}));
}
// =======================================================================================
namespace
{
struct
WriteArrays
{
// Holds arrays that must remain valid until a write completes.
kj
::
Array
<
_
::
WireValue
<
uint32_t
>>
table
;
kj
::
Array
<
kj
::
ArrayPtr
<
const
byte
>>
pieces
;
};
}
// namespace
kj
::
Promise
<
void
>
writeMessage
(
kj
::
AsyncOutputStream
&
output
,
kj
::
ArrayPtr
<
const
kj
::
ArrayPtr
<
const
word
>>
segments
)
{
KJ_REQUIRE
(
segments
.
size
()
>
0
,
"Tried to serialize uninitialized message."
);
WriteArrays
arrays
;
arrays
.
table
=
kj
::
heapArray
<
_
::
WireValue
<
uint32_t
>>
((
segments
.
size
()
+
2
)
&
~
size_t
(
1
));
// We write the segment count - 1 because this makes the first word zero for single-segment
// messages, improving compression. We don't bother doing this with segment sizes because
// one-word segments are rare anyway.
arrays
.
table
[
0
].
set
(
segments
.
size
()
-
1
);
for
(
uint
i
=
0
;
i
<
segments
.
size
();
i
++
)
{
arrays
.
table
[
i
+
1
].
set
(
segments
[
i
].
size
());
}
if
(
segments
.
size
()
%
2
==
0
)
{
// Set padding byte.
arrays
.
table
[
segments
.
size
()
+
1
].
set
(
0
);
}
arrays
.
pieces
=
kj
::
heapArray
<
kj
::
ArrayPtr
<
const
byte
>>
(
segments
.
size
()
+
1
);
arrays
.
pieces
[
0
]
=
kj
::
arrayPtr
(
reinterpret_cast
<
byte
*>
(
arrays
.
table
.
begin
()),
arrays
.
table
.
size
()
*
sizeof
(
arrays
.
table
[
0
]));
for
(
uint
i
=
0
;
i
<
segments
.
size
();
i
++
)
{
arrays
.
pieces
[
i
+
1
]
=
kj
::
arrayPtr
(
reinterpret_cast
<
const
byte
*>
(
segments
[
i
].
begin
()),
reinterpret_cast
<
const
byte
*>
(
segments
[
i
].
end
()));
}
auto
promise
=
output
.
write
(
arrays
.
pieces
);
// Make sure the arrays aren't freed until the write completes.
return
promise
.
then
(
kj
::
mvCapture
(
arrays
,
[](
WriteArrays
&&
)
{}));
}
}
// namespace capnp
c++/src/capnp/serialize-async.h
0 → 100644
View file @
ecab2520
// Copyright (c) 2013, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef CAPNP_SERIALIZE_ASYNC_H_
#define CAPNP_SERIALIZE_ASYNC_H_
#include <kj/async-io.h>
#include "message.h"
namespace
capnp
{
kj
::
Promise
<
kj
::
Own
<
MessageReader
>>
readMessage
(
kj
::
AsyncInputStream
&
input
,
ReaderOptions
options
=
ReaderOptions
(),
kj
::
ArrayPtr
<
word
>
scratchSpace
=
nullptr
);
// Read a message asynchronously.
//
// `input` must remain valid until the returned promise resolves (or is canceled).
//
// `scratchSpace`, if provided, must remain valid until the returned MessageReader is destroyed.
kj
::
Promise
<
void
>
writeMessage
(
kj
::
AsyncOutputStream
&
output
,
kj
::
ArrayPtr
<
const
kj
::
ArrayPtr
<
const
word
>>
segments
)
KJ_WARN_UNUSED_RESULT
;
kj
::
Promise
<
void
>
writeMessage
(
kj
::
AsyncOutputStream
&
output
,
MessageBuilder
&
builder
)
KJ_WARN_UNUSED_RESULT
;
// Write asynchronously. The parameters must remain valid until the returned promise resolves.
// =======================================================================================
// inline implementation details
inline
kj
::
Promise
<
void
>
writeMessage
(
kj
::
AsyncOutputStream
&
output
,
MessageBuilder
&
builder
)
{
return
writeMessage
(
output
,
builder
.
getSegmentsForOutput
());
}
}
// namespace capnp
#endif // CAPNP_SERIALIZE_ASYNC_H_
c++/src/kj/async-io.c++
View file @
ecab2520
...
...
@@ -50,6 +50,14 @@ UnixEventLoop& eventLoop() {
return
downcast
<
UnixEventLoop
>
(
EventLoop
::
current
());
}
void
setNonblocking
(
int
fd
)
{
int
flags
;
KJ_SYSCALL
(
flags
=
fcntl
(
fd
,
F_GETFL
));
if
((
flags
&
O_NONBLOCK
)
==
0
)
{
KJ_SYSCALL
(
fcntl
(
fd
,
F_SETFL
,
flags
|
O_NONBLOCK
));
}
}
class
OwnedFileDescriptor
{
public
:
OwnedFileDescriptor
(
int
fd
)
:
fd
(
fd
)
{
...
...
@@ -60,8 +68,8 @@ public:
KJ_DREQUIRE
(
fcntl
(
fd
,
F_GETFL
)
&
O_NONBLOCK
,
"You forgot to set NONBLOCK."
);
#else
// On non-Linux, we have to set the flags non-atomically.
fcntl
(
newFd
,
F_SETFD
,
fcntl
(
newF
d
,
F_GETFD
)
|
FD_CLOEXEC
);
fcntl
(
newFd
,
F_SETFL
,
fcntl
(
newF
d
,
F_GETFL
)
|
O_NONBLOCK
);
fcntl
(
fd
,
F_SETFD
,
fcntl
(
f
d
,
F_GETFD
)
|
FD_CLOEXEC
);
fcntl
(
fd
,
F_SETFL
,
fcntl
(
f
d
,
F_GETFL
)
|
O_NONBLOCK
);
#endif
}
...
...
@@ -84,6 +92,7 @@ protected:
class
AsyncStreamFd
:
public
AsyncIoStream
{
public
:
AsyncStreamFd
(
int
readFd
,
int
writeFd
)
:
readFd
(
readFd
),
writeFd
(
writeFd
)
{}
virtual
~
AsyncStreamFd
()
noexcept
(
false
)
{}
Promise
<
size_t
>
read
(
void
*
buffer
,
size_t
minBytes
,
size_t
maxBytes
)
override
{
return
tryReadInternal
(
buffer
,
minBytes
,
maxBytes
,
0
).
then
([
=
](
size_t
result
)
{
...
...
@@ -214,7 +223,7 @@ private:
});
}
else
if
(
morePieces
.
size
()
==
0
)
{
// First piece was fully-consumed and there are no more pieces, so we're done.
KJ_DASSERT
(
n
==
0
);
KJ_DASSERT
(
n
==
firstPiece
.
size
(),
n
);
return
READY_NOW
;
}
else
{
// First piece was fully consumed, so move on to the next piece.
...
...
@@ -621,6 +630,25 @@ private:
}
// namespace
Promise
<
void
>
AsyncInputStream
::
read
(
void
*
buffer
,
size_t
bytes
)
{
return
read
(
buffer
,
bytes
,
bytes
).
thenInAnyThread
([](
size_t
)
{});
}
Own
<
AsyncInputStream
>
AsyncInputStream
::
wrapFd
(
int
fd
)
{
setNonblocking
(
fd
);
return
heap
<
AsyncStreamFd
>
(
fd
,
-
1
);
}
Own
<
AsyncOutputStream
>
AsyncOutputStream
::
wrapFd
(
int
fd
)
{
setNonblocking
(
fd
);
return
heap
<
AsyncStreamFd
>
(
-
1
,
fd
);
}
Own
<
AsyncIoStream
>
AsyncIoStream
::
wrapFd
(
int
fd
)
{
setNonblocking
(
fd
);
return
heap
<
AsyncStreamFd
>
(
fd
,
fd
);
}
OperatingSystem
&
getOperatingSystemSingleton
()
{
static
UnixKernel
os
;
return
os
;
...
...
c++/src/kj/async-io.h
View file @
ecab2520
...
...
@@ -29,26 +29,50 @@
namespace
kj
{
class
AsyncInputStream
{
// Asynchronous equivalent of InputStream (from io.h).
public
:
virtual
Promise
<
size_t
>
read
(
void
*
buffer
,
size_t
minBytes
,
size_t
maxBytes
)
=
0
;
virtual
Promise
<
size_t
>
tryRead
(
void
*
buffer
,
size_t
minBytes
,
size_t
maxBytes
)
=
0
;
Promise
<
size_t
>
read
(
void
*
buffer
,
size_t
bytes
);
Promise
<
void
>
read
(
void
*
buffer
,
size_t
bytes
);
static
Own
<
AsyncInputStream
>
wrapFd
(
int
fd
);
// Create an AsyncInputStream wrapping a file descriptor.
//
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
};
class
AsyncOutputStream
{
// Asynchronous equivalent of OutputStream (from io.h).
public
:
virtual
Promise
<
void
>
write
(
const
void
*
buffer
,
size_t
size
)
=
0
;
virtual
Promise
<
void
>
write
(
ArrayPtr
<
const
ArrayPtr
<
const
byte
>>
pieces
)
=
0
;
static
Own
<
AsyncOutputStream
>
wrapFd
(
int
fd
);
// Create an AsyncOutputStream wrapping a file descriptor.
//
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
};
class
AsyncIoStream
:
public
AsyncInputStream
,
public
AsyncOutputStream
{
// A combination input and output stream.
public
:
static
Own
<
AsyncIoStream
>
wrapFd
(
int
fd
);
// Create an AsyncIoStream wrapping a file descriptor.
//
// This will set `fd` to non-blocking mode (i.e. set O_NONBLOCK) if it isn't set already.
};
class
ConnectionReceiver
{
// Represents a server socket listening on a port.
public
:
virtual
Promise
<
Own
<
AsyncIoStream
>>
accept
()
=
0
;
// Accept the next incoming connection.
virtual
uint
getPort
()
=
0
;
// Gets the port number, if applicable (i.e. if listening on IP). This is useful if you didn't
...
...
@@ -60,6 +84,7 @@ class RemoteAddress {
public
:
virtual
Promise
<
Own
<
AsyncIoStream
>>
connect
()
=
0
;
// Make a new connection to this address.
virtual
String
toString
()
=
0
;
// Produce a human-readable string which hopefully can be passed to Network::parseRemoteAddress()
...
...
@@ -73,6 +98,7 @@ class LocalAddress {
public
:
virtual
Own
<
ConnectionReceiver
>
listen
()
=
0
;
// Listen for incoming connections on this address.
virtual
String
toString
()
=
0
;
// Produce a human-readable string which hopefully can be passed to Network::parseRemoteAddress()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment