Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in / Register
Toggle navigation
C
capnproto
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Packages
Packages
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
submodule
capnproto
Commits
000e2a99
Unverified
Commit
000e2a99
authored
Dec 06, 2019
by
Kenton Varda
Committed by
GitHub
Dec 06, 2019
Browse files
Options
Browse Files
Download
Plain Diff
Merge pull request #913 from capnproto/fibers
Implement fibers
parents
935d5216
53d21772
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
552 additions
and
53 deletions
+552
-53
async-inl.h
c++/src/kj/async-inl.h
+119
-13
async-prelude.h
c++/src/kj/async-prelude.h
+3
-10
async-test.c++
c++/src/kj/async-test.c++
+88
-0
async.c++
c++/src/kj/async.c++
+289
-9
async.h
c++/src/kj/async.h
+53
-21
No files found.
c++/src/kj/async-inl.h
View file @
000e2a99
...
...
@@ -194,6 +194,22 @@ public:
// If this node wraps some other PromiseNode, get the wrapped node. Used for debug tracing.
// Default implementation returns nullptr.
template
<
typename
T
>
static
Own
<
PromiseNode
>
from
(
T
&&
promise
)
{
// Given a Promise, extract the PromiseNode.
return
kj
::
mv
(
promise
.
node
);
}
template
<
typename
T
>
static
PromiseNode
&
from
(
T
&
promise
)
{
// Given a Promise, extract the PromiseNode.
return
*
promise
.
node
;
}
template
<
typename
T
>
static
T
to
(
Own
<
PromiseNode
>&&
node
)
{
// Construct a Promise from a PromiseNode. (T should be a Promise type.)
return
T
(
false
,
kj
::
mv
(
node
));
}
protected
:
class
OnReadyEvent
{
// Helper class for implementing onReady().
...
...
@@ -213,6 +229,13 @@ protected:
// -------------------------------------------------------------------
template
<
typename
T
>
inline
NeverDone
::
operator
Promise
<
T
>
()
const
{
return
PromiseNode
::
to
<
Promise
<
T
>>
(
neverDone
());
}
// -------------------------------------------------------------------
class
ImmediatePromiseNodeBase
:
public
PromiseNode
{
public
:
ImmediatePromiseNodeBase
();
...
...
@@ -557,7 +580,7 @@ public:
ForkHub
(
Own
<
PromiseNode
>&&
inner
)
:
ForkHubBase
(
kj
::
mv
(
inner
),
result
)
{}
Promise
<
_
::
UnfixVoid
<
T
>>
addBranch
()
{
return
Promise
<
_
::
UnfixVoid
<
T
>>
(
false
,
kj
::
heap
<
ForkBranch
<
T
>>
(
addRef
(
*
this
)));
return
_
::
PromiseNode
::
to
<
Promise
<
_
::
UnfixVoid
<
T
>>>
(
kj
::
heap
<
ForkBranch
<
T
>>
(
addRef
(
*
this
)));
}
_
::
SplitTuplePromise
<
T
>
split
()
{
...
...
@@ -574,8 +597,8 @@ private:
template
<
size_t
index
>
ReducePromises
<
typename
SplitBranch
<
T
,
index
>::
Element
>
addSplit
()
{
return
ReducePromises
<
typename
SplitBranch
<
T
,
index
>::
Element
>
(
false
,
maybeChain
(
kj
::
heap
<
SplitBranch
<
T
,
index
>>
(
addRef
(
*
this
)),
return
_
::
PromiseNode
::
to
<
ReducePromises
<
typename
SplitBranch
<
T
,
index
>::
Element
>
>
(
maybeChain
(
kj
::
heap
<
SplitBranch
<
T
,
index
>>
(
addRef
(
*
this
)),
implicitCast
<
typename
SplitBranch
<
T
,
index
>::
Element
*>
(
nullptr
)));
}
};
...
...
@@ -848,6 +871,81 @@ private:
}
};
// -------------------------------------------------------------------
class
FiberBase
:
public
PromiseNode
,
private
Event
{
// Base class for the outer PromiseNode representing a fiber.
public
:
FiberBase
(
size_t
stackSize
,
_
::
ExceptionOrValue
&
result
);
~
FiberBase
()
noexcept
(
false
);
void
start
()
{
armDepthFirst
();
}
// Call immediately after construction to begin executing the fiber.
class
WaitDoneEvent
;
void
onReady
(
_
::
Event
*
event
)
noexcept
override
;
PromiseNode
*
getInnerForTrace
()
override
;
protected
:
bool
isFinished
()
{
return
state
==
FINISHED
;
}
private
:
enum
{
WAITING
,
RUNNING
,
CANCELED
,
FINISHED
}
state
;
size_t
stackSize
;
#if _WIN32 || __CYGWIN__
void
*
osFiber
;
#else
struct
Impl
;
Impl
&
impl
;
#endif
_
::
PromiseNode
*
currentInner
=
nullptr
;
OnReadyEvent
onReadyEvent
;
_
::
ExceptionOrValue
&
result
;
void
run
();
virtual
void
runImpl
(
WaitScope
&
waitScope
)
=
0
;
struct
StartRoutine
;
void
switchToFiber
();
void
switchToMain
();
Maybe
<
Own
<
Event
>>
fire
()
override
;
// Implements Event. Each time the event is fired, switchToFiber() is called.
friend
class
WaitScope
;
friend
void
_
::
waitImpl
(
Own
<
_
::
PromiseNode
>&&
node
,
_
::
ExceptionOrValue
&
result
,
WaitScope
&
waitScope
);
friend
bool
_
::
pollImpl
(
_
::
PromiseNode
&
node
,
WaitScope
&
waitScope
);
};
template
<
typename
Func
>
class
Fiber
final
:
public
FiberBase
{
public
:
Fiber
(
size_t
stackSize
,
Func
&&
func
)
:
FiberBase
(
stackSize
,
result
),
func
(
kj
::
fwd
<
Func
>
(
func
))
{}
typedef
FixVoid
<
decltype
(
kj
::
instance
<
Func
&>
()(
kj
::
instance
<
WaitScope
&>
()))
>
ResultType
;
void
get
(
ExceptionOrValue
&
output
)
noexcept
override
{
KJ_IREQUIRE
(
isFinished
());
output
.
as
<
ResultType
>
()
=
kj
::
mv
(
result
);
}
private
:
Func
func
;
ExceptionOr
<
ResultType
>
result
;
void
runImpl
(
WaitScope
&
waitScope
)
override
{
result
.
template
as
<
ResultType
>
()
=
MaybeVoidCaller
<
WaitScope
&
,
ResultType
>::
apply
(
func
,
waitScope
);
}
};
}
// namespace _ (private)
// =======================================================================================
...
...
@@ -868,7 +966,7 @@ PromiseForResult<Func, T> Promise<T>::then(Func&& func, ErrorFunc&& errorHandler
Own
<
_
::
PromiseNode
>
intermediate
=
heap
<
_
::
TransformPromiseNode
<
ResultT
,
_
::
FixVoid
<
T
>
,
Func
,
ErrorFunc
>>
(
kj
::
mv
(
node
),
kj
::
fwd
<
Func
>
(
func
),
kj
::
fwd
<
ErrorFunc
>
(
errorHandler
));
auto
result
=
_
::
ChainPromises
<
_
::
ReturnType
<
Func
,
T
>>
(
false
,
auto
result
=
_
::
PromiseNode
::
to
<
_
::
ChainPromises
<
_
::
ReturnType
<
Func
,
T
>>>
(
_
::
maybeChain
(
kj
::
mv
(
intermediate
),
implicitCast
<
ResultT
*>
(
nullptr
)));
return
_
::
maybeReduce
(
kj
::
mv
(
result
),
false
);
}
...
...
@@ -991,6 +1089,17 @@ inline PromiseForResult<Func, void> evalNow(Func&& func) {
return
result
;
}
template
<
typename
Func
>
inline
PromiseForResult
<
Func
,
WaitScope
&>
startFiber
(
size_t
stackSize
,
Func
&&
func
)
{
typedef
_
::
FixVoid
<
_
::
ReturnType
<
Func
,
WaitScope
&>>
ResultT
;
Own
<
_
::
FiberBase
>
intermediate
=
kj
::
heap
<
_
::
Fiber
<
Func
>>
(
stackSize
,
kj
::
fwd
<
Func
>
(
func
));
intermediate
->
start
();
auto
result
=
_
::
PromiseNode
::
to
<
_
::
ChainPromises
<
_
::
ReturnType
<
Func
,
WaitScope
&>>>
(
_
::
maybeChain
(
kj
::
mv
(
intermediate
),
implicitCast
<
ResultT
*>
(
nullptr
)));
return
_
::
maybeReduce
(
kj
::
mv
(
result
),
false
);
}
template
<
typename
T
>
template
<
typename
ErrorFunc
>
void
Promise
<
T
>::
detach
(
ErrorFunc
&&
errorHandler
)
{
...
...
@@ -1005,8 +1114,8 @@ void Promise<void>::detach(ErrorFunc&& errorHandler) {
template
<
typename
T
>
Promise
<
Array
<
T
>>
joinPromises
(
Array
<
Promise
<
T
>>&&
promises
)
{
return
Promise
<
Array
<
T
>>
(
false
,
kj
::
heap
<
_
::
ArrayJoinPromiseNode
<
T
>>
(
KJ_MAP
(
p
,
promises
)
{
return
kj
::
mv
(
p
.
node
);
},
return
_
::
PromiseNode
::
to
<
Promise
<
Array
<
T
>>>
(
kj
::
heap
<
_
::
ArrayJoinPromiseNode
<
T
>>
(
KJ_MAP
(
p
,
promises
)
{
return
_
::
PromiseNode
::
from
(
kj
::
mv
(
p
)
);
},
heapArray
<
_
::
ExceptionOr
<
T
>>
(
promises
.
size
())));
}
...
...
@@ -1134,7 +1243,7 @@ _::ReducePromises<T> newAdaptedPromise(Params&&... adapterConstructorParams) {
Own
<
_
::
PromiseNode
>
intermediate
(
heap
<
_
::
AdapterPromiseNode
<
_
::
FixVoid
<
T
>
,
Adapter
>>
(
kj
::
fwd
<
Params
>
(
adapterConstructorParams
)...));
return
_
::
ReducePromises
<
T
>
(
false
,
return
_
::
PromiseNode
::
to
<
_
::
ReducePromises
<
T
>>
(
_
::
maybeChain
(
kj
::
mv
(
intermediate
),
implicitCast
<
T
*>
(
nullptr
)));
}
...
...
@@ -1144,7 +1253,7 @@ PromiseFulfillerPair<T> newPromiseAndFulfiller() {
Own
<
_
::
PromiseNode
>
intermediate
(
heap
<
_
::
AdapterPromiseNode
<
_
::
FixVoid
<
T
>
,
_
::
PromiseAndFulfillerAdapter
<
T
>>>
(
*
wrapper
));
_
::
ReducePromises
<
T
>
promise
(
false
,
auto
promise
=
_
::
PromiseNode
::
to
<
_
::
ReducePromises
<
T
>>
(
_
::
maybeChain
(
kj
::
mv
(
intermediate
),
implicitCast
<
T
*>
(
nullptr
)));
return
PromiseFulfillerPair
<
T
>
{
kj
::
mv
(
promise
),
kj
::
mv
(
wrapper
)
};
...
...
@@ -1171,9 +1280,6 @@ protected:
// Run the function. If the function returns a promise, returns the inner PromiseNode, otherwise
// returns null.
template
<
typename
T
>
Own
<
PromiseNode
>
extractNode
(
Promise
<
T
>
promise
)
{
return
kj
::
mv
(
promise
.
node
);
}
// implements PromiseNode ----------------------------------------------------
void
onReady
(
Event
*
event
)
noexcept
override
;
...
...
@@ -1271,7 +1377,7 @@ public:
typedef
_
::
FixVoid
<
_
::
UnwrapPromise
<
PromiseForResult
<
Func
,
void
>>>
ResultT
;
kj
::
Maybe
<
Own
<
_
::
PromiseNode
>>
execute
()
override
{
auto
result
=
extractNode
(
func
());
auto
result
=
_
::
PromiseNode
::
from
(
func
());
KJ_IREQUIRE
(
result
.
get
()
!=
nullptr
);
return
kj
::
mv
(
result
);
}
...
...
@@ -1300,7 +1406,7 @@ template <typename Func>
PromiseForResult
<
Func
,
void
>
Executor
::
executeAsync
(
Func
&&
func
)
const
{
auto
event
=
kj
::
heap
<
_
::
XThreadEventImpl
<
Func
>>
(
kj
::
fwd
<
Func
>
(
func
),
*
this
);
send
(
*
event
,
false
);
return
PromiseForResult
<
Func
,
void
>
(
false
,
kj
::
mv
(
event
));
return
_
::
PromiseNode
::
to
<
PromiseForResult
<
Func
,
void
>>
(
kj
::
mv
(
event
));
}
}
// namespace kj
...
...
c++/src/kj/async-prelude.h
View file @
000e2a99
...
...
@@ -188,6 +188,7 @@ class PromiseNode;
class
ChainPromiseNode
;
template
<
typename
T
>
class
ForkHub
;
class
FiberBase
;
class
Event
;
class
XThreadEvent
;
...
...
@@ -203,15 +204,9 @@ private:
PromiseBase
()
=
default
;
PromiseBase
(
Own
<
PromiseNode
>&&
node
)
:
node
(
kj
::
mv
(
node
))
{}
friend
class
kj
::
EventLoop
;
friend
class
ChainPromiseNode
;
template
<
typename
>
friend
class
kj
::
Promise
;
friend
class
kj
::
TaskSet
;
template
<
typename
U
>
friend
Promise
<
Array
<
U
>>
kj
::
joinPromises
(
Array
<
Promise
<
U
>>&&
promises
);
friend
Promise
<
void
>
kj
::
joinPromises
(
Array
<
Promise
<
void
>>&&
promises
);
friend
class
XThreadEvent
;
friend
class
PromiseNode
;
};
void
detach
(
kj
::
Promise
<
void
>&&
promise
);
...
...
@@ -224,9 +219,7 @@ Own<PromiseNode> neverDone();
class
NeverDone
{
public
:
template
<
typename
T
>
operator
Promise
<
T
>
()
const
{
return
Promise
<
T
>
(
false
,
neverDone
());
}
operator
Promise
<
T
>
()
const
;
KJ_NORETURN
(
void
wait
(
WaitScope
&
waitScope
)
const
);
};
...
...
c++/src/kj/async-test.c++
View file @
000e2a99
...
...
@@ -844,5 +844,93 @@ KJ_TEST("exclusiveJoin both events complete simultaneously") {
KJ_EXPECT
(
!
joined
.
poll
(
waitScope
));
}
KJ_TEST
(
"start a fiber"
)
{
EventLoop
loop
;
WaitScope
waitScope
(
loop
);
auto
paf
=
newPromiseAndFulfiller
<
int
>
();
Promise
<
StringPtr
>
fiber
=
startFiber
(
65536
,
[
promise
=
kj
::
mv
(
paf
.
promise
)](
WaitScope
&
fiberScope
)
mutable
{
int
i
=
promise
.
wait
(
fiberScope
);
KJ_EXPECT
(
i
==
123
);
return
"foo"
_kj
;
});
KJ_EXPECT
(
!
fiber
.
poll
(
waitScope
));
paf
.
fulfiller
->
fulfill
(
123
);
KJ_ASSERT
(
fiber
.
poll
(
waitScope
));
KJ_EXPECT
(
fiber
.
wait
(
waitScope
)
==
"foo"
);
}
KJ_TEST
(
"fiber promise chaining"
)
{
EventLoop
loop
;
WaitScope
waitScope
(
loop
);
auto
paf
=
newPromiseAndFulfiller
<
int
>
();
bool
ran
=
false
;
Promise
<
int
>
fiber
=
startFiber
(
65536
,
[
promise
=
kj
::
mv
(
paf
.
promise
),
&
ran
](
WaitScope
&
fiberScope
)
mutable
{
ran
=
true
;
return
kj
::
mv
(
promise
);
});
KJ_EXPECT
(
!
ran
);
KJ_EXPECT
(
!
fiber
.
poll
(
waitScope
));
KJ_EXPECT
(
ran
);
paf
.
fulfiller
->
fulfill
(
123
);
KJ_ASSERT
(
fiber
.
poll
(
waitScope
));
KJ_EXPECT
(
fiber
.
wait
(
waitScope
)
==
123
);
}
KJ_TEST
(
"throw from a fiber"
)
{
EventLoop
loop
;
WaitScope
waitScope
(
loop
);
auto
paf
=
newPromiseAndFulfiller
<
void
>
();
Promise
<
void
>
fiber
=
startFiber
(
65536
,
[
promise
=
kj
::
mv
(
paf
.
promise
)](
WaitScope
&
fiberScope
)
mutable
{
promise
.
wait
(
fiberScope
);
KJ_FAIL_EXPECT
(
"wait() should have thrown"
);
});
KJ_EXPECT
(
!
fiber
.
poll
(
waitScope
));
paf
.
fulfiller
->
reject
(
KJ_EXCEPTION
(
FAILED
,
"test exception"
));
KJ_ASSERT
(
fiber
.
poll
(
waitScope
));
KJ_EXPECT_THROW_MESSAGE
(
"test exception"
,
fiber
.
wait
(
waitScope
));
}
KJ_TEST
(
"cancel a fiber"
)
{
EventLoop
loop
;
WaitScope
waitScope
(
loop
);
auto
paf
=
newPromiseAndFulfiller
<
int
>
();
bool
exited
=
false
;
{
Promise
<
StringPtr
>
fiber
=
startFiber
(
65536
,
[
promise
=
kj
::
mv
(
paf
.
promise
),
&
exited
](
WaitScope
&
fiberScope
)
mutable
{
KJ_DEFER
(
exited
=
true
);
int
i
=
promise
.
wait
(
fiberScope
);
KJ_EXPECT
(
i
==
123
);
return
"foo"
_kj
;
});
KJ_EXPECT
(
!
fiber
.
poll
(
waitScope
));
KJ_EXPECT
(
!
exited
);
}
KJ_EXPECT
(
exited
);
}
}
// namespace
}
// namespace kj
c++/src/kj/async.c++
View file @
000e2a99
...
...
@@ -19,8 +19,18 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#if _WIN32
|| __CYGWIN__
#define WIN32_LEAN_AND_MEAN 1 // lolz
#elif __APPLE__
// getcontext() and friends are marked deprecated on MacOS but seemingly no replacement is
// provided. It appears as if they deprecated it solely because the standards bodies deprecated it,
// which they seemingly did mainly because the proper sematics are too difficult for them to
// define. I doubt MacOS would actually remove these functions as they are widely used. But if they
// do, then I guess we'll need to fall back to using setjmp()/longjmp(), and some sort of hack
// involving sigaltstack() (and generating a fake signal I guess) in order to initialize the fiber
// in the first place. Or we could use assembly, I suppose. Either way, ick.
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#define _XOPEN_SOURCE // Must be defined to see getcontext() on MacOS.
#endif
#include "async.h"
...
...
@@ -29,10 +39,17 @@
#include "threadlocal.h"
#include "mutex.h"
#if _WIN32
#include <windows.h> //
just for Sleep(0)
#if _WIN32
|| __CYGWIN__
#include <windows.h> //
for Sleep(0) and fibers
#include "windows-sanity.h"
#else
#include <ucontext.h>
#include <sys/mman.h> // mmap(), for allocating new stacks
#include <unistd.h> // sysconf()
#include <errno.h>
#endif
#if !_WIN32
#include <sched.h> // just for sched_yield()
#endif
...
...
@@ -234,7 +251,7 @@ private:
};
void
TaskSet
::
add
(
Promise
<
void
>&&
promise
)
{
auto
task
=
heap
<
Task
>
(
*
this
,
kj
::
mv
(
promise
.
node
));
auto
task
=
heap
<
Task
>
(
*
this
,
_
::
PromiseNode
::
from
(
kj
::
mv
(
promise
)
));
KJ_IF_MAYBE
(
head
,
tasks
)
{
head
->
get
()
->
prev
=
&
task
->
next
;
task
->
next
=
kj
::
mv
(
tasks
);
...
...
@@ -691,6 +708,220 @@ const Executor& getCurrentThreadExecutor() {
return
currentEventLoop
().
getExecutor
();
}
// =======================================================================================
// Fiber implementation.
namespace
_
{
// private
#if !(_WIN32 || __CYGWIN__)
struct
FiberBase
::
Impl
{
// This struct serves two purposes:
// - It contains OS-specific state that we don't want to declare in the header.
// - It is allocated at the top of the fiber's stack area, so the Impl pointer also serves to
// track where the stack was allocated.
ucontext_t
fiberContext
;
ucontext_t
originalContext
;
static
Impl
&
alloc
(
size_t
stackSize
)
{
// TODO(perf): Freelist stacks to avoid TLB flushes.
#ifndef MAP_ANONYMOUS
#define MAP_ANONYMOUS MAP_ANON
#endif
#ifndef MAP_STACK
#define MAP_STACK 0
#endif
size_t
pageSize
=
getPageSize
();
size_t
allocSize
=
stackSize
+
pageSize
;
// size plus guard page
// Allocate virtual address space for the stack but make it inaccessible initially.
// TODO(someday): Does it make sense to use MAP_GROWSDOWN on Linux? It's a kind of bizarre flag
// that causes the mapping to automatically allocate extra pages (beyond the range specified)
// until it hits something...
void
*
stack
=
mmap
(
nullptr
,
allocSize
,
PROT_NONE
,
MAP_ANONYMOUS
|
MAP_PRIVATE
|
MAP_STACK
,
-
1
,
0
);
if
(
stack
==
MAP_FAILED
)
{
KJ_FAIL_SYSCALL
(
"mmap(new stack)"
,
errno
);
}
KJ_ON_SCOPE_FAILURE
({
KJ_SYSCALL
(
munmap
(
stack
,
allocSize
))
{
break
;
}
});
// Now mark everything except the guard page as read-write. We assume the stack grows down, so
// the guard page is at the beginning. No modern architecture uses stacks that grow up.
KJ_SYSCALL
(
mprotect
(
reinterpret_cast
<
byte
*>
(
stack
)
+
pageSize
,
stackSize
,
PROT_READ
|
PROT_WRITE
));
// Stick `Impl` at the top of the stack.
Impl
&
impl
=
*
(
reinterpret_cast
<
Impl
*>
(
reinterpret_cast
<
byte
*>
(
stack
)
+
allocSize
)
-
1
);
// Note: mmap() allocates zero'd pages so we don't have to memset() anything here.
KJ_SYSCALL
(
getcontext
(
&
impl
.
fiberContext
));
impl
.
fiberContext
.
uc_stack
.
ss_size
=
allocSize
-
sizeof
(
Impl
);
impl
.
fiberContext
.
uc_stack
.
ss_sp
=
reinterpret_cast
<
char
*>
(
stack
);
impl
.
fiberContext
.
uc_stack
.
ss_flags
=
0
;
impl
.
fiberContext
.
uc_link
=
&
impl
.
originalContext
;
return
impl
;
}
static
void
free
(
Impl
&
impl
,
size_t
stackSize
)
{
size_t
allocSize
=
stackSize
+
getPageSize
();
void
*
stack
=
reinterpret_cast
<
byte
*>
(
&
impl
+
1
)
-
allocSize
;
KJ_SYSCALL
(
munmap
(
stack
,
allocSize
))
{
break
;
}
}
static
size_t
getPageSize
()
{
#ifndef _SC_PAGESIZE
#define _SC_PAGESIZE _SC_PAGE_SIZE
#endif
static
size_t
result
=
sysconf
(
_SC_PAGE_SIZE
);
return
result
;
}
};
#endif
struct
FiberBase
::
StartRoutine
{
#if _WIN32 || __CYGWIN__
static
void
WINAPI
run
(
LPVOID
ptr
)
{
// This is the static C-style function we pass to CreateFiber().
auto
&
fiber
=
*
reinterpret_cast
<
FiberBase
*>
(
ptr
);
fiber
.
run
();
// On Windows, if the fiber main function returns, the thread exits. We need to explicitly switch
// back to the main stack.
fiber
.
switchToMain
();
}
#else
static
void
run
(
int
arg1
,
int
arg2
)
{
// This is the static C-style function we pass to makeContext().
// POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to
// work correctly on 64-bit machines. Gross.
uintptr_t
ptr
=
static_cast
<
uint
>
(
arg1
);
ptr
|=
static_cast
<
uintptr_t
>
(
static_cast
<
uint
>
(
arg2
))
<<
(
sizeof
(
ptr
)
*
4
);
reinterpret_cast
<
FiberBase
*>
(
ptr
)
->
run
();
}
#endif
};
FiberBase
::
FiberBase
(
size_t
stackSizeParam
,
_
::
ExceptionOrValue
&
result
)
:
state
(
WAITING
),
// Force stackSize to a reasonable minimum.
stackSize
(
kj
::
max
(
stackSizeParam
,
65536
)),
#if !(_WIN32 || __CYGWIN__)
impl
(
Impl
::
alloc
(
stackSize
)),
#endif
result
(
result
)
{
#if _WIN32 || __CYGWIN__
auto
&
eventLoop
=
currentEventLoop
();
if
(
eventLoop
.
mainFiber
==
nullptr
)
{
// First time we've created a fiber. We need to convert the main stack into a fiber as well
// before we can start switching.
eventLoop
.
mainFiber
=
ConvertThreadToFiber
(
nullptr
);
}
KJ_WIN32
(
osFiber
=
CreateFiber
(
stackSize
,
&
StartRoutine
::
run
,
this
));
#else
// Note: Nothing below here can throw. If that changes then we need to call Impl::free(impl)
// on exceptions...
// POSIX says the arguments are ints, not pointers. So we split our pointer in half in order to
// work correctly on 64-bit machines. Gross.
uintptr_t
ptr
=
reinterpret_cast
<
uintptr_t
>
(
this
);
int
arg1
=
ptr
&
((
uintptr_t
(
1
)
<<
(
sizeof
(
ptr
)
*
4
))
-
1
);
int
arg2
=
ptr
>>
(
sizeof
(
ptr
)
*
4
);
makecontext
(
&
impl
.
fiberContext
,
reinterpret_cast
<
void
(
*
)()
>
(
&
StartRoutine
::
run
),
2
,
arg1
,
arg2
);
#endif
}
FiberBase
::~
FiberBase
()
noexcept
(
false
)
{
#if _WIN32 || __CYGWIN__
KJ_DEFER
(
DeleteFiber
(
osFiber
));
#else
KJ_DEFER
(
Impl
::
free
(
impl
,
stackSize
));
#endif
switch
(
state
)
{
case
WAITING
:
// We can't just free the stack while the fiber is running. We need to force it to execute
// until finished, so we cause it to throw an exception.
state
=
CANCELED
;
switchToFiber
();
// The fiber should only switch back to the main stack on completion, because any further
// calls to wait() would throw before trying to switch.
KJ_ASSERT
(
state
==
FINISHED
);
break
;
case
RUNNING
:
case
CANCELED
:
// Bad news.
KJ_LOG
(
FATAL
,
"fiber tried to destroy itself"
);
::
abort
();
break
;
case
FINISHED
:
// Normal completion, yay.
break
;
}
}
Maybe
<
Own
<
Event
>>
FiberBase
::
fire
()
{
KJ_ASSERT
(
state
==
WAITING
);
state
=
RUNNING
;
switchToFiber
();
return
nullptr
;
}
void
FiberBase
::
switchToFiber
()
{
// Switch from the main stack to the fiber. Returns once the fiber either calls switchToMain()
// or returns from its main function.
#if _WIN32 || __CYGWIN__
SwitchToFiber
(
osFiber
);
#else
KJ_SYSCALL
(
swapcontext
(
&
impl
.
originalContext
,
&
impl
.
fiberContext
));
#endif
}
void
FiberBase
::
switchToMain
()
{
// Switch from the fiber to the main stack. Returns the next time the main stack calls
// switchToFiber().
#if _WIN32 || __CYGWIN__
SwitchToFiber
(
currentEventLoop
().
mainFiber
);
#else
KJ_SYSCALL
(
swapcontext
(
&
impl
.
fiberContext
,
&
impl
.
originalContext
));
#endif
}
void
FiberBase
::
run
()
{
state
=
RUNNING
;
KJ_DEFER
(
state
=
FINISHED
);
WaitScope
waitScope
(
currentEventLoop
(),
*
this
);
KJ_IF_MAYBE
(
exception
,
kj
::
runCatchingExceptions
([
&
]()
{
runImpl
(
waitScope
);
}))
{
result
.
addException
(
kj
::
mv
(
*
exception
));
}
onReadyEvent
.
arm
();
}
void
FiberBase
::
onReady
(
_
::
Event
*
event
)
noexcept
{
onReadyEvent
.
init
(
event
);
}
PromiseNode
*
FiberBase
::
getInnerForTrace
()
{
return
currentInner
;
}
}
// namespace _ (private)
// =======================================================================================
void
EventPort
::
setRunnable
(
bool
runnable
)
{}
...
...
@@ -708,6 +939,15 @@ EventLoop::EventLoop(EventPort& port)
daemons
(
kj
::
heap
<
TaskSet
>
(
_
::
LoggingErrorHandler
::
instance
))
{}
EventLoop
::~
EventLoop
()
noexcept
(
false
)
{
#if _WIN32 || __CYGWIN__
KJ_DEFER
({
if
(
mainFiber
!=
nullptr
)
{
// We converted the thread to a fiber, need to convert it back.
KJ_WIN32
(
ConvertFiberToThread
());
}
});
#endif
// Destroy all "daemon" tasks, noting that their destructors might try to access the EventLoop
// some more.
daemons
=
nullptr
;
...
...
@@ -866,9 +1106,47 @@ void WaitScope::poll() {
namespace
_
{
// private
static
kj
::
Exception
fiberCanceledException
()
{
// Construct the exception to throw from wait() when the fiber has been canceled (because the
// promise returned by startFiber() was dropped before completion).
//
// TODO(someday): Should we have a dedicated exception type for cancellation? Do we even want
// to build stack traces and such for these?
return
KJ_EXCEPTION
(
FAILED
,
"This fiber is being canceled."
);
};
void
waitImpl
(
Own
<
_
::
PromiseNode
>&&
node
,
_
::
ExceptionOrValue
&
result
,
WaitScope
&
waitScope
)
{
EventLoop
&
loop
=
waitScope
.
loop
;
KJ_REQUIRE
(
&
loop
==
threadLocalEventLoop
,
"WaitScope not valid for this thread."
);
KJ_IF_MAYBE
(
fiber
,
waitScope
.
fiber
)
{
if
(
fiber
->
state
==
FiberBase
::
CANCELED
)
{
result
.
addException
(
fiberCanceledException
());
return
;
}
KJ_REQUIRE
(
fiber
->
state
==
FiberBase
::
RUNNING
,
"This WaitScope can only be used within the fiber that created it."
);
node
->
setSelfPointer
(
&
node
);
node
->
onReady
(
fiber
);
fiber
->
currentInner
=
node
;
KJ_DEFER
(
fiber
->
currentInner
=
nullptr
);
// Switch to the main stack to run the event loop.
fiber
->
state
=
FiberBase
::
WAITING
;
fiber
->
switchToMain
();
// The main stack switched back to us, meaning either the event we registered with
// node->onReady() fired, or we are being canceled by FiberBase's destructor.
if
(
fiber
->
state
==
FiberBase
::
CANCELED
)
{
result
.
addException
(
fiberCanceledException
());
return
;
}
KJ_ASSERT
(
fiber
->
state
==
FiberBase
::
RUNNING
);
}
else
{
KJ_REQUIRE
(
!
loop
.
running
,
"wait() is not allowed from within event callbacks."
);
BoolEvent
doneEvent
;
...
...
@@ -892,6 +1170,7 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
}
loop
.
setRunnable
(
loop
.
isRunnable
());
}
node
->
get
(
result
);
KJ_IF_MAYBE
(
exception
,
kj
::
runCatchingExceptions
([
&
]()
{
...
...
@@ -904,6 +1183,7 @@ void waitImpl(Own<_::PromiseNode>&& node, _::ExceptionOrValue& result, WaitScope
bool
pollImpl
(
_
::
PromiseNode
&
node
,
WaitScope
&
waitScope
)
{
EventLoop
&
loop
=
waitScope
.
loop
;
KJ_REQUIRE
(
&
loop
==
threadLocalEventLoop
,
"WaitScope not valid for this thread."
);
KJ_REQUIRE
(
waitScope
.
fiber
==
nullptr
,
"poll() is not supported in fibers."
);
KJ_REQUIRE
(
!
loop
.
running
,
"poll() is not allowed from within event callbacks."
);
BoolEvent
doneEvent
;
...
...
@@ -931,11 +1211,11 @@ bool pollImpl(_::PromiseNode& node, WaitScope& waitScope) {
}
Promise
<
void
>
yield
()
{
return
Promise
<
void
>
(
false
,
kj
::
heap
<
YieldPromiseNode
>
());
return
_
::
PromiseNode
::
to
<
Promise
<
void
>>
(
kj
::
heap
<
YieldPromiseNode
>
());
}
Promise
<
void
>
yieldHarder
()
{
return
Promise
<
void
>
(
false
,
kj
::
heap
<
YieldHarderPromiseNode
>
());
return
_
::
PromiseNode
::
to
<
Promise
<
void
>>
(
kj
::
heap
<
YieldHarderPromiseNode
>
());
}
Own
<
PromiseNode
>
neverDone
()
{
...
...
@@ -1379,7 +1659,7 @@ Maybe<Own<Event>> ChainPromiseNode::fire() {
}
else
KJ_IF_MAYBE
(
value
,
intermediate
.
value
)
{
// There is a value and no exception. The value is itself a promise. Adopt it as our
// step2.
inner
=
kj
::
mv
(
value
->
node
);
inner
=
_
::
PromiseNode
::
from
(
kj
::
mv
(
*
value
)
);
}
else
{
// We can only get here if inner->get() returned neither an exception nor a
// value, which never actually happens.
...
...
@@ -1551,8 +1831,8 @@ void ArrayJoinPromiseNode<void>::getNoError(ExceptionOrValue& output) noexcept {
}
// namespace _ (private)
Promise
<
void
>
joinPromises
(
Array
<
Promise
<
void
>>&&
promises
)
{
return
Promise
<
void
>
(
false
,
kj
::
heap
<
_
::
ArrayJoinPromiseNode
<
void
>>
(
KJ_MAP
(
p
,
promises
)
{
return
kj
::
mv
(
p
.
node
);
},
return
_
::
PromiseNode
::
to
<
Promise
<
void
>>
(
kj
::
heap
<
_
::
ArrayJoinPromiseNode
<
void
>>
(
KJ_MAP
(
p
,
promises
)
{
return
_
::
PromiseNode
::
from
(
kj
::
mv
(
p
)
);
},
heapArray
<
_
::
ExceptionOr
<
_
::
Void
>>
(
promises
.
size
())));
}
...
...
c++/src/kj/async.h
View file @
000e2a99
...
...
@@ -229,8 +229,16 @@ public:
// around them in arbitrary ways. Therefore, callers really need to know if a function they
// are calling might wait(), and the `WaitScope&` parameter makes this clear.
//
// TODO(someday): Implement fibers, and let them call wait() even when they are handling an
// event.
// Usually, there is only one `WaitScope` for each `EventLoop`, and it can only be used at the
// top level of the thread owning the loop. Calling `wait()` with this `WaitScope` is what
// actually causes the event loop to run at all. This top-level `WaitScope` cannot be used
// recursively, so cannot be used within an event callback.
//
// However, it is possible to obtain a `WaitScope` in lower-level code by using fibers. Use
// kj::startFiber() to start some code executing on an alternate call stack. That code will get
// its own `WaitScope` allowing it to operate in a synchronous style. In this case, `wait()`
// switches back to the main stack in order to run the event loop, returning to the fiber's stack
// once the awaited promise resolves.
bool
poll
(
WaitScope
&
waitScope
);
// Returns true if a call to wait() would complete without blocking, false if it would block.
...
...
@@ -244,6 +252,8 @@ public:
// The first poll() verifies that the promise doesn't resolve early, which would otherwise be
// hard to do deterministically. The second poll() allows you to check that the promise has
// resolved and avoid a wait() that might deadlock in the case that it hasn't.
//
// poll() is not supported in fibers; it will throw an exception.
ForkedPromise
<
T
>
fork
()
KJ_WARN_UNUSED_RESULT
;
// Forks the promise, so that multiple different clients can independently wait on the result.
...
...
@@ -305,24 +315,7 @@ private:
Promise
(
bool
,
Own
<
_
::
PromiseNode
>&&
node
)
:
PromiseBase
(
kj
::
mv
(
node
))
{}
// Second parameter prevent ambiguity with immediate-value constructor.
template
<
typename
>
friend
class
Promise
;
friend
class
EventLoop
;
template
<
typename
U
,
typename
Adapter
,
typename
...
Params
>
friend
_
::
ReducePromises
<
U
>
newAdaptedPromise
(
Params
&&
...
adapterConstructorParams
);
template
<
typename
U
>
friend
PromiseFulfillerPair
<
U
>
newPromiseAndFulfiller
();
template
<
typename
>
friend
class
_
::
ForkHub
;
friend
class
TaskSet
;
friend
Promise
<
void
>
_
::
yield
();
friend
Promise
<
void
>
_
::
yieldHarder
();
friend
class
_
::
NeverDone
;
template
<
typename
U
>
friend
Promise
<
Array
<
U
>>
joinPromises
(
Array
<
Promise
<
U
>>&&
promises
);
friend
Promise
<
void
>
joinPromises
(
Array
<
Promise
<
void
>>&&
promises
);
friend
class
_
::
XThreadEvent
;
friend
class
Executor
;
friend
class
_
::
PromiseNode
;
};
template
<
typename
T
>
...
...
@@ -397,6 +390,29 @@ PromiseForResult<Func, void> evalLast(Func&& func) KJ_WARN_UNUSED_RESULT;
// callback enqueues new events, then latter callbacks will not execute until those events are
// drained.
template
<
typename
Func
>
PromiseForResult
<
Func
,
WaitScope
&>
startFiber
(
size_t
stackSize
,
Func
&&
func
)
KJ_WARN_UNUSED_RESULT
;
// Executes `func()` in a fiber, returning a promise for the eventual reseult. `func()` will be
// passed a `WaitScope&` as its parameter, allowing it to call `.wait()` on promises. Thus, `func()`
// can be written in a synchronous, blocking style, instead of using `.then()`. This is often much
// easier to write and read, and may even be significantly faster if it allows the use of stack
// allocation rather than heap allocation.
//
// However, fibers have a major disadvantage: memory must be allocated for the fiber's call stack.
// The entire stack must be allocated at once, making it necessary to choose a stack size upfront
// that is big enough for whatever the fiber needs to do. Estimating this is often difficult. That
// said, over-estimating is not too terrible since pages of the stack will actually be allocated
// lazily when first accessed; actual memory usage will correspond to the "high watermark" of the
// actual stack usage. That said, this lazy allocation forces page faults, which can be quite slow.
// Worse, freeing a stack forces a TLB flush and shootdown -- all currently-executing threads will
// have to be interrupted to flush their CPU cores' TLB caches.
//
// In short, when performance matters, you should try to avoid creating fibers very frequently.
//
// TODO(perf): We should add a mechanism for freelisting stacks. However, this improves CPU usage
// at the expense of memory usage: stacks on the freelist will consume however many pages they
// used at their high watermark, forever.
template
<
typename
T
>
Promise
<
Array
<
T
>>
joinPromises
(
Array
<
Promise
<
T
>>&&
promises
);
// Join an array of promises into a promise for an array.
...
...
@@ -891,6 +907,10 @@ private:
Own
<
TaskSet
>
daemons
;
#if _WIN32 || __CYGWIN__
void
*
mainFiber
=
nullptr
;
#endif
bool
turn
();
void
setRunnable
(
bool
runnable
);
void
enterScope
();
...
...
@@ -907,6 +927,7 @@ private:
friend
class
WaitScope
;
friend
class
Executor
;
friend
class
_
::
XThreadEvent
;
friend
class
_
::
FiberBase
;
};
class
WaitScope
{
...
...
@@ -920,20 +941,31 @@ class WaitScope {
public
:
inline
explicit
WaitScope
(
EventLoop
&
loop
)
:
loop
(
loop
)
{
loop
.
enterScope
();
}
inline
~
WaitScope
()
{
loop
.
leaveScope
();
}
inline
~
WaitScope
()
{
if
(
fiber
==
nullptr
)
loop
.
leaveScope
();
}
KJ_DISALLOW_COPY
(
WaitScope
);
void
poll
();
// Pumps the event queue and polls for I/O until there's nothing left to do (without blocking).
//
// Not supported in fibers.
void
setBusyPollInterval
(
uint
count
)
{
busyPollInterval
=
count
;
}
// Set the maximum number of events to run in a row before calling poll() on the EventPort to
// check for new I/O.
//
// This has no effect when used in a fiber.
private
:
EventLoop
&
loop
;
uint
busyPollInterval
=
kj
::
maxValue
;
kj
::
Maybe
<
_
::
FiberBase
&>
fiber
;
explicit
WaitScope
(
EventLoop
&
loop
,
_
::
FiberBase
&
fiber
)
:
loop
(
loop
),
fiber
(
fiber
)
{}
friend
class
EventLoop
;
friend
class
_
::
FiberBase
;
friend
void
_
::
waitImpl
(
Own
<
_
::
PromiseNode
>&&
node
,
_
::
ExceptionOrValue
&
result
,
WaitScope
&
waitScope
);
friend
bool
_
::
pollImpl
(
_
::
PromiseNode
&
node
,
WaitScope
&
waitScope
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment