Commit d68cffe5 authored by Kenton Varda's avatar Kenton Varda

Add Win32EventPort::allowApc() to allow people to enable APCs.

Fixes #435
parent c99ade06
...@@ -137,6 +137,31 @@ KJ_TEST("Win32IocpEventPort timer") { ...@@ -137,6 +137,31 @@ KJ_TEST("Win32IocpEventPort timer") {
KJ_EXPECT(port.getTimer().now() - start >= 10 * MILLISECONDS); KJ_EXPECT(port.getTimer().now() - start >= 10 * MILLISECONDS);
} }
VOID CALLBACK testApcProc(ULONG_PTR dwParam) {
reinterpret_cast<kj::PromiseFulfiller<void>*>(dwParam)->fulfill();
}
KJ_TEST("Win32IocpEventPort APC") {
if (GetProcAddress(GetModuleHandle("ntdll.dll"), "wine_get_version") != nullptr) {
// TODO(cleanup): Periodically check if Wine supports this yet.
KJ_LOG(WARNING, "detected that we're running under wine and this test won't work; skipping");
return;
}
Win32IocpEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
port.allowApc();
auto paf = kj::newPromiseAndFulfiller<void>();
KJ_WIN32(QueueUserAPC(&testApcProc, GetCurrentThread(),
reinterpret_cast<ULONG_PTR>(paf.fulfiller.get())));
paf.promise.wait(waitScope);
}
} // namespace } // namespace
} // namespace kj } // namespace kj
......
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
#include "refcount.h" #include "refcount.h"
#include <ntsecapi.h> // NTSTATUS
#include <ntstatus.h> // STATUS_SUCCESS
#undef ERROR // dammit windows.h #undef ERROR // dammit windows.h
...@@ -184,14 +186,41 @@ void Win32IocpEventPort::wake() const { ...@@ -184,14 +186,41 @@ void Win32IocpEventPort::wake() const {
} }
void Win32IocpEventPort::waitIocp(DWORD timeoutMs) { void Win32IocpEventPort::waitIocp(DWORD timeoutMs) {
if (isAllowApc) {
ULONG countReceived = 0;
OVERLAPPED_ENTRY entry;
memset(&entry, 0, sizeof(entry));
if (GetQueuedCompletionStatusEx(iocp, &entry, 1, &countReceived, timeoutMs, TRUE)) {
KJ_ASSERT(countReceived == 1);
if (entry.lpOverlapped == nullptr) {
// wake() called in another thread, or APC queued.
} else {
DWORD error = ERROR_SUCCESS;
if (entry.lpOverlapped->Internal != STATUS_SUCCESS) {
error = LsaNtStatusToWinError(entry.lpOverlapped->Internal);
}
static_cast<IoPromiseAdapter*>(entry.lpOverlapped)
->done(IoResult { error, entry.dwNumberOfBytesTransferred });
}
} else {
// Call failed.
DWORD error = GetLastError();
if (error == WAIT_TIMEOUT || error == WAIT_IO_COMPLETION) {
// WAIT_TIMEOUT = timed out (dunno why this isn't ERROR_TIMEOUT??)
// WAIT_IO_COMPLETION = APC queued
// Either way, nothing to do.
return;
} else {
KJ_FAIL_WIN32("GetQueuedCompletionStatusEx()", error, error, entry.lpOverlapped);
}
}
} else {
DWORD bytesTransferred; DWORD bytesTransferred;
ULONG_PTR completionKey; ULONG_PTR completionKey;
LPOVERLAPPED overlapped = nullptr; LPOVERLAPPED overlapped = nullptr;
// TODO(someday): Should we use GetQueuedCompletionStatusEx()? It would allow us to read multiple
// events in one call and would let us wait in an alertable state, which would allow users to
// use APCs. However, it currently isn't implemented on Wine (as of 1.9.22).
BOOL success = GetQueuedCompletionStatus( BOOL success = GetQueuedCompletionStatus(
iocp, &bytesTransferred, &completionKey, &overlapped, timeoutMs); iocp, &bytesTransferred, &completionKey, &overlapped, timeoutMs);
...@@ -210,6 +239,7 @@ void Win32IocpEventPort::waitIocp(DWORD timeoutMs) { ...@@ -210,6 +239,7 @@ void Win32IocpEventPort::waitIocp(DWORD timeoutMs) {
DWORD error = success ? ERROR_SUCCESS : GetLastError(); DWORD error = success ? ERROR_SUCCESS : GetLastError();
static_cast<IoPromiseAdapter*>(overlapped)->done(IoResult { error, bytesTransferred }); static_cast<IoPromiseAdapter*>(overlapped)->done(IoResult { error, bytesTransferred });
} }
}
} }
bool Win32IocpEventPort::receivedWake() { bool Win32IocpEventPort::receivedWake() {
......
...@@ -136,6 +136,18 @@ public: ...@@ -136,6 +136,18 @@ public:
// Given a handle that supports waiting for it to become "signaled" via WaitForSingleObject(), // Given a handle that supports waiting for it to become "signaled" via WaitForSingleObject(),
// return an object that can wait for this state using the EventPort. // return an object that can wait for this state using the EventPort.
// ---------------------------------------------------------------------------
// APCs
virtual void allowApc() = 0;
// If this is ever called, the Win32EventPort will switch modes so that APCs can be scheduled
// on the thread, e.g. through the Win32 QueueUserAPC() call. In the future, this may be enabled
// by default. However, as of this writing, Wine does not support the necessary
// GetQueuedCompletionStatusEx() call, thus allowApc() breaks Wine support. (Tested on Wine
// 1.8.7.)
//
// If the event port implementation can't support APCs for some reason, this throws.
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// time // time
...@@ -191,6 +203,7 @@ public: ...@@ -191,6 +203,7 @@ public:
Own<IoObserver> observeIo(HANDLE handle) override; Own<IoObserver> observeIo(HANDLE handle) override;
Own<SignalObserver> observeSignalState(HANDLE handle) override; Own<SignalObserver> observeSignalState(HANDLE handle) override;
Timer& getTimer() override { return timerImpl; } Timer& getTimer() override { return timerImpl; }
void allowApc() override { isAllowApc = true; }
private: private:
class IoPromiseAdapter; class IoPromiseAdapter;
...@@ -202,6 +215,7 @@ private: ...@@ -202,6 +215,7 @@ private:
Win32WaitObjectThreadPool waitThreads; Win32WaitObjectThreadPool waitThreads;
TimerImpl timerImpl; TimerImpl timerImpl;
mutable std::atomic<bool> sentWake {false}; mutable std::atomic<bool> sentWake {false};
bool isAllowApc = false;
static TimePoint readClock(); static TimePoint readClock();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment