Commit f5268fb0 authored by Kenton Varda's avatar Kenton Varda

Implement Windows IOCP-based EventPort.

parent 517e1e4d
// Copyright (c) 2013-2014 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
#include "async-win32.h"
#include "thread.h"
#include "test.h"
namespace kj {
namespace {
KJ_TEST("Win32IocpEventPort I/O operations") {
  // Exercises overlapped I/O through the IOCP event port: queue an overlapped read on a named
  // pipe, verify it stays pending while the event loop spins, then write to the pipe and verify
  // the read completes with the written bytes.
  Win32IocpEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  // The process ID is appended to the pipe name.
  auto pipeName = kj::str("\\\\.\\Pipe\\kj-async-win32-test.", GetCurrentProcessId());

  // Read end: overlapped, inbound, byte-mode named pipe.
  HANDLE readEnd_;
  KJ_WIN32(readEnd_ = CreateNamedPipeA(pipeName.cStr(),
      PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
      PIPE_TYPE_BYTE | PIPE_WAIT,
      1, 0, 0, 0, NULL));
  AutoCloseHandle readEnd(readEnd_);

  // Write end: plain (non-overlapped) client handle to the same pipe.
  HANDLE writeEnd_;
  KJ_WIN32(writeEnd_ = CreateFileA(pipeName.cStr(), GENERIC_WRITE, 0, NULL, OPEN_EXISTING,
      FILE_ATTRIBUTE_NORMAL, NULL));
  AutoCloseHandle writeEnd(writeEnd_);

  auto observer = port.observeIo(readEnd);
  auto op = observer->newOperation(0);

  // Start the read. Nothing has been written yet, so the call must report failure with
  // ERROR_IO_PENDING, i.e. the operation was queued.
  byte buffer[256];
  KJ_ASSERT(!ReadFile(readEnd, buffer, sizeof(buffer), NULL, op->getOverlapped()));
  DWORD error = GetLastError();
  if (error != ERROR_IO_PENDING) {
    KJ_FAIL_WIN32("ReadFile()", error);
  }

  bool done = false;
  auto promise = op->onComplete().then([&done](Win32EventPort::IoResult result) {
    done = true;
    return result;
  }).eagerlyEvaluate(nullptr);

  // Turning the event loop a few times must not complete the read while the pipe is empty.
  KJ_EXPECT(!done);
  for (int turn = 0; turn < 5; ++turn) {
    evalLater([]() {}).wait(waitScope);
  }
  KJ_EXPECT(!done);

  // Writing to the other end must complete the pending read.
  DWORD bytesWritten;
  KJ_WIN32(WriteFile(writeEnd, "foo", 3, &bytesWritten, NULL));
  KJ_EXPECT(bytesWritten == 3);

  auto result = promise.wait(waitScope);
  KJ_EXPECT(result.errorCode == ERROR_SUCCESS);
  KJ_EXPECT(result.bytesTransferred == 3);
  KJ_EXPECT(kj::str(kj::arrayPtr(buffer, 3).asChars()) == "foo");
}
KJ_TEST("Win32IocpEventPort::wake()") {
  // A wake() issued from a second thread must unblock wait() on this thread, and wait() must
  // report the wake by returning true.
  Win32IocpEventPort port;

  Thread waker([&port]() {
    // Short delay before waking the port from this second thread.
    Sleep(10);
    port.wake();
  });

  KJ_EXPECT(port.wait());
}
KJ_TEST("Win32IocpEventPort::wake() on poll()") {
  // poll() must return false while no wake() has been issued, and true once another thread has
  // called wake().
  Win32IocpEventPort port;

  // Flag shared with the helper thread. It is accessed only through the __atomic builtins (the
  // same primitives Win32IocpEventPort uses for its own wake flag) rather than through a
  // `volatile` variable: volatile gives no inter-thread ordering guarantee, so the previous
  // plain read/write pair was a data race.
  bool woken = false;

  Thread thread([&]() {
    Sleep(10);
    port.wake();
    // Release store: publishes that wake() has finished before the main thread polls again.
    __atomic_store_n(&woken, true, __ATOMIC_RELEASE);
  });

  // The helper thread sleeps before waking, so an immediate poll is expected to see nothing.
  KJ_EXPECT(!port.poll());

  // Wait until the helper thread has definitely called wake().
  while (!__atomic_load_n(&woken, __ATOMIC_ACQUIRE)) Sleep(10);

  KJ_EXPECT(port.poll());
}
KJ_TEST("Win32IocpEventPort timer") {
  // A 10ms timer must not fire merely because the event loop turns, and once it has fired at
  // least 10ms must have elapsed on the port's timer.
  Win32IocpEventPort port;
  EventLoop loop(port);
  WaitScope waitScope(loop);

  Timer& timer = port.getTimer();
  auto start = timer.now();

  bool done = false;
  auto promise = timer.afterDelay(10 * MILLISECONDS).then([&done]() {
    done = true;
  }).eagerlyEvaluate(nullptr);

  // Turn the loop a few times without blocking on the port; the timer must still be pending.
  KJ_EXPECT(!done);
  for (int turn = 0; turn < 5; ++turn) {
    evalLater([]() {}).wait(waitScope);
  }
  KJ_EXPECT(!done);

  // Now actually wait for the timer.
  promise.wait(waitScope);
  KJ_EXPECT(done);
  KJ_EXPECT(port.getTimer().now() - start >= 10 * MILLISECONDS);
}
} // namespace
} // namespace kj
#endif // _WIN32
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#if _WIN32
// Request Vista-level APIs.
#define WINVER 0x0600
#define _WIN32_WINNT 0x0600
#include "async-win32.h"
#include "debug.h"
#include <chrono>
#include "refcount.h"
#undef ERROR // dammit windows.h
namespace kj {
// Creates a fresh I/O completion port, records a handle to the constructing thread, and starts
// the timer at the current clock reading.
Win32IocpEventPort::Win32IocpEventPort()
    : iocp(newIocpHandle()),
      thread(openCurrentThread()),
      timerImpl(readClock()) {}
// Nothing to tear down explicitly; the body is empty. Declared noexcept(false) to match the
// declaration in the header.
// NOTE(review): presumably the AutoCloseHandle members close `iocp` and `thread` in their own
// destructors -- confirm against AutoCloseHandle.
Win32IocpEventPort::~Win32IocpEventPort() noexcept(false) {}
class Win32IocpEventPort::IoPromiseAdapter final: public OVERLAPPED {
// Promise adapter for a single overlapped I/O operation.
//
// The adapter derives from OVERLAPPED so that the LPOVERLAPPED returned by the completion port
// in waitIocp() can be static_cast back to this object; done() then fulfills the promise with
// the operation's IoResult.
public:
IoPromiseAdapter(PromiseFulfiller<IoResult>& fulfiller, Win32IocpEventPort& port,
uint64_t offset, IoPromiseAdapter** selfPtr)
: fulfiller(fulfiller), port(port) {
// Publish our address through `selfPtr`; IoOperationImpl uses it to hand out the OVERLAPPED and
// to call start().
*selfPtr = this;
// Zero only the OVERLAPPED base, then split the 64-bit offset into its low and high 32-bit
// halves.
memset(implicitCast<OVERLAPPED*>(this), 0, sizeof(OVERLAPPED));
this->Offset = offset & 0x00000000FFFFFFFFull;
this->OffsetHigh = offset >> 32;
}
~IoPromiseAdapter() {
// `handle` is valid only between start() and done(), i.e. while an operation is outstanding.
if (handle != INVALID_HANDLE_VALUE) {
// Need to cancel the I/O.
//
// Note: Even if HasOverlappedIoCompleted(this) is true, CancelIoEx() still seems needed to
// force the completion event.
if (!CancelIoEx(handle, this)) {
DWORD error = GetLastError();
// ERROR_NOT_FOUND probably means the operation already completed and is enqueued on the
// IOCP.
//
// ERROR_INVALID_HANDLE probably means that, amid a mass of destructors, the HANDLE was
// closed before all of the I/O promises were destroyed. We tolerate this so long as the
// I/O promises are also destroyed before returning to the event loop, hence the I/O
// tasks won't actually continue on a dead handle.
//
// TODO(cleanup): ERROR_INVALID_HANDLE really shouldn't be allowed. Unfortunately, the
// refcounted nature of capabilities and the RPC system seems to mean that objects
// are unwound in the wrong order in several of Cap'n Proto's tests. So we live with this
// for now. Note that even if a new handle is opened with the same numeric value, it
// should be harmless to call CancelIoEx() on it because it couldn't possibly be using
// the same OVERLAPPED structure.
if (error != ERROR_NOT_FOUND && error != ERROR_INVALID_HANDLE) {
KJ_FAIL_WIN32("CancelIoEx()", error, handle);
}
}
// We have to wait for the IOCP to deliver the completion event, so that we can safely destroy
// the OVERLAPPED. Each waitIocp() call processes one completion; the loop ends once ours has
// been delivered and done() has reset `handle`.
while (handle != INVALID_HANDLE_VALUE) {
port.waitIocp(INFINITE);
}
}
}
void start(HANDLE handle) {
// Records that an operation using this OVERLAPPED is now outstanding on `handle`. May only be
// called while no operation is outstanding.
KJ_ASSERT(this->handle == INVALID_HANDLE_VALUE);
this->handle = handle;
}
void done(IoResult result) {
// Called from waitIocp() when the completion event for this OVERLAPPED is dequeued. Marks the
// operation as no longer outstanding, then fulfills the promise.
KJ_ASSERT(handle != INVALID_HANDLE_VALUE);
handle = INVALID_HANDLE_VALUE;
fulfiller.fulfill(kj::mv(result));
}
private:
PromiseFulfiller<IoResult>& fulfiller;
Win32IocpEventPort& port;
HANDLE handle = INVALID_HANDLE_VALUE;
// If an I/O operation is currently enqueued, the handle on which it is enqueued.
};
class Win32IocpEventPort::IoOperationImpl final: public Win32EventPort::IoOperation {
  // IoOperation backed by an IoPromiseAdapter. The adapter (which is the OVERLAPPED) is created
  // together with the promise in the constructor; `promiseAdapter` points at it until
  // onComplete() has been called.
public:
  explicit IoOperationImpl(Win32IocpEventPort& port, HANDLE handle, uint64_t offset)
      : handle(handle),
        // The adapter's constructor stores its own address into `promiseAdapter`.
        promise(newAdaptedPromise<IoResult, IoPromiseAdapter>(port, offset, &promiseAdapter)) {}

  LPOVERLAPPED getOverlapped() override {
    KJ_REQUIRE(promiseAdapter != nullptr, "already called onComplete()");
    LPOVERLAPPED overlapped = promiseAdapter;
    return overlapped;
  }

  Promise<IoResult> onComplete() override {
    KJ_REQUIRE(promiseAdapter != nullptr, "can only call onComplete() once");
    // Tell the adapter which handle the operation is outstanding on, then forget the adapter so
    // that neither method can be used again.
    IoPromiseAdapter* adapter = promiseAdapter;
    adapter->start(handle);
    promiseAdapter = nullptr;
    return kj::mv(promise);
  }

private:
  HANDLE handle;
  // The handle the operation is issued on.

  IoPromiseAdapter* promiseAdapter;
  // Set by the adapter's constructor while `promise` is being initialized; must stay declared
  // before `promise`. Null once onComplete() has been called.

  Promise<IoResult> promise;
  // The promise handed to the caller by onComplete().
};
class Win32IocpEventPort::IoObserverImpl final: public Win32EventPort::IoObserver {
  // IoObserver for a handle whose completion events are delivered through the owning port's I/O
  // completion port.
public:
  IoObserverImpl(Win32IocpEventPort& port, HANDLE handle)
      : port(port), handle(handle) {
    // Associate `handle` with the port's completion port (completion key 0), so that overlapped
    // operations on it are reported via GetQueuedCompletionStatus() in waitIocp().
    KJ_WIN32(CreateIoCompletionPort(handle, port.iocp, 0, 1), handle, port.iocp.get());
  }

  Own<IoOperation> newOperation(uint64_t offset) override {
    // Creates a new operation on the observed handle starting at `offset`. Marked `override` so
    // the compiler verifies it keeps matching IoObserver::newOperation().
    return heap<IoOperationImpl>(port, handle, offset);
  }

private:
  Win32IocpEventPort& port;
  HANDLE handle;
};
// Associates `handle` with this port's completion port and returns the observer through which
// overlapped operations on it are started.
Own<Win32EventPort::IoObserver> Win32IocpEventPort::observeIo(HANDLE handle) {
  Own<Win32EventPort::IoObserver> observer = heap<IoObserverImpl>(*this, handle);
  return observer;
}
// Delegates signal-state observation to the wait-object thread pool.
Own<Win32EventPort::SignalObserver> Win32IocpEventPort::observeSignalState(HANDLE handle) {
  Own<Win32EventPort::SignalObserver> observer = waitThreads.observeSignalState(handle);
  return observer;
}
// Reads std::chrono::steady_clock and converts it to a kj TimePoint, expressed as nanoseconds
// since the clock's epoch added to origin<TimePoint>().
TimePoint Win32IocpEventPort::readClock() {
  auto sinceEpoch = std::chrono::steady_clock::now().time_since_epoch();
  auto nanos = std::chrono::duration_cast<std::chrono::nanoseconds>(sinceEpoch).count();
  return origin<TimePoint>() + nanos * NANOSECONDS;
}
bool Win32IocpEventPort::wait() {
waitIocp(timerImpl.timeoutToNextEvent(readClock(), MILLISECONDS, INFINITE - 1)
.map([](uint64_t t) -> DWORD { return t; })
.orDefault(INFINITE));
timerImpl.advanceTo(readClock());
return receivedWake();
}
bool Win32IocpEventPort::poll() {
waitIocp(0);
return receivedWake();
}
// Sets the wake flag and posts an empty completion packet (null OVERLAPPED) so that a thread
// inside waitIocp() returns. Does nothing if the flag is already set.
void Win32IocpEventPort::wake() const {
  if (__atomic_load_n(&sentWake, __ATOMIC_ACQUIRE)) {
    // A wake is already pending; no need to post another packet.
    return;
  }

  __atomic_store_n(&sentWake, true, __ATOMIC_RELEASE);
  KJ_WIN32(PostQueuedCompletionStatus(iocp, 0, 0, nullptr));
}
// Dequeues at most one packet from the completion port, waiting up to `timeoutMs`. An I/O
// completion is dispatched to its IoPromiseAdapter; a wake packet or a timeout is a no-op here.
void Win32IocpEventPort::waitIocp(DWORD timeoutMs) {
  DWORD bytesTransferred;
  ULONG_PTR completionKey;
  LPOVERLAPPED overlapped = nullptr;

  // TODO(someday): Should we use GetQueuedCompletionStatusEx()? It would allow us to read multiple
  //   events in one call and would let us wait in an alertable state, which would allow users to
  //   use APCs. However, it currently isn't implemented on Wine (as of 1.9.22).
  BOOL success = GetQueuedCompletionStatus(
      iocp, &bytesTransferred, &completionKey, &overlapped, timeoutMs);

  if (overlapped != nullptr) {
    // An I/O operation finished, successfully or not. Hand the outcome to the adapter that owns
    // this OVERLAPPED.
    DWORD error = success ? ERROR_SUCCESS : GetLastError();
    static_cast<IoPromiseAdapter*>(overlapped)->done(IoResult { error, bytesTransferred });
    return;
  }

  if (success) {
    // wake() called in another thread.
    return;
  }

  DWORD error = GetLastError();
  if (error != WAIT_TIMEOUT) {
    KJ_FAIL_WIN32("GetQueuedCompletionStatus()", error, error, overlapped);
  }
  // Otherwise: great, nothing to do. (Why this is WAIT_TIMEOUT and not ERROR_TIMEOUT I'm not
  // sure.)
}
// Reports whether wake() has been called since the last check, clearing the flag if so.
bool Win32IocpEventPort::receivedWake() {
  if (!__atomic_load_n(&sentWake, __ATOMIC_ACQUIRE)) {
    return false;
  }

  __atomic_store_n(&sentWake, false, __ATOMIC_RELEASE);
  return true;
}
// Creates a new I/O completion port not yet associated with any file handle (the file-handle
// argument is INVALID_HANDLE_VALUE and the existing-port argument is NULL), with a concurrency
// value of 1, and returns it wrapped in an AutoCloseHandle.
AutoCloseHandle Win32IocpEventPort::newIocpHandle() {
HANDLE h;
// KJ_WIN32 reports a failure of the wrapped call.
KJ_WIN32(h = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1));
return AutoCloseHandle(h);
}
// Returns a handle to the calling thread, obtained by duplicating GetCurrentThread() within the
// current process with the same access rights (DUPLICATE_SAME_ACCESS) and without inheritance
// (FALSE).
AutoCloseHandle Win32IocpEventPort::openCurrentThread() {
HANDLE process = GetCurrentProcess();
HANDLE result;
// The current process is both the source and the target of the duplication.
KJ_WIN32(DuplicateHandle(process, GetCurrentThread(), process, &result,
0, FALSE, DUPLICATE_SAME_ACCESS));
return AutoCloseHandle(result);
}
// =======================================================================================
// Stub constructor: `mainThreadCount` is accepted but not used, since the body is empty.
Win32WaitObjectThreadPool::Win32WaitObjectThreadPool(uint mainThreadCount) {}
// Not implemented yet: the body only reports KJ_UNIMPLEMENTED and never produces an observer.
Own<Win32EventPort::SignalObserver> Win32WaitObjectThreadPool::observeSignalState(HANDLE handle) {
KJ_UNIMPLEMENTED("wait for win32 handles");
}
// Not implemented yet: the body only reports KJ_UNIMPLEMENTED; `handles` is never filled in.
uint Win32WaitObjectThreadPool::prepareMainThreadWait(HANDLE* handles[]) {
KJ_UNIMPLEMENTED("wait for win32 handles");
}
// Not implemented yet: the body only reports KJ_UNIMPLEMENTED; `returnCode` is never inspected.
bool Win32WaitObjectThreadPool::finishedMainThreadWait(DWORD returnCode) {
KJ_UNIMPLEMENTED("wait for win32 handles");
}
} // namespace kj
#endif // _WIN32
// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
// Licensed under the MIT License:
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef KJ_ASYNC_WIN32_H_
#define KJ_ASYNC_WIN32_H_
#if !_WIN32
#error "This file is Windows-specific. On Unix, include async-unix.h instead."
#endif
#include "async.h"
#include "time.h"
#include "io.h"
#include <inttypes.h>
// Include windows.h as lean as possible. (If you need more of the Windows API for your app,
// #include windows.h yourself before including this header.)
#define WIN32_LEAN_AND_MEAN 1
#define NOSERVICE 1
#define NOMCX 1
#define NOIME 1
#include <windows.h>
#include "windows-sanity.h"
namespace kj {
class Win32EventPort: public EventPort {
// Abstract base interface for EventPorts that can listen on Win32 event types. Due to the
// absurd complexity of the Win32 API, it's not possible to standardize on a single
// implementation of EventPort. In particular, there is no way for a single thread to use I/O
// completion ports (the most efficient way of handling I/O) while at the same time waiting for
// signalable handles or UI messages.
//
// Note that UI messages are not supported at all by this interface because the message queue
// is implemented by user32.dll and we want libkj to depend only on kernel32.dll. A separate
// compat library could provide a Win32EventPort implementation that works with the UI message
// queue.
public:
// ---------------------------------------------------------------------------
// overlapped I/O
struct IoResult {
// Outcome of a completed overlapped operation, as delivered by IoOperation::onComplete().
DWORD errorCode;
// Win32 error code for the operation; ERROR_SUCCESS when it succeeded.
DWORD bytesTransferred;
// Number of bytes transferred by the operation.
};
class IoOperation {
public:
virtual LPOVERLAPPED getOverlapped() = 0;
// Gets the OVERLAPPED structure to pass to the Win32 I/O call. Do NOT modify it; just pass it
// on.
virtual Promise<IoResult> onComplete() = 0;
// After making the Win32 call, if the return value indicates that the operation was
// successfully queued (i.e. the completion event will definitely occur), call this to wait
// for completion.
//
// You MUST call this if the operation was successfully queued, and you MUST NOT call this
// otherwise. If the Win32 call failed (without queuing any operation or event) then you should
// simply drop the IoOperation object.
//
// Dropping the returned Promise cancels the operation via Win32's CancelIoEx(). The destructor
// will wait for the cancellation to complete, such that after dropping the promise it is safe
// to free the buffer that the operation was reading from / writing to.
//
// You may safely drop the `IoOperation` while still waiting for this promise. You may not,
// however, drop the `IoObserver`.
};
class IoObserver {
public:
virtual Own<IoOperation> newOperation(uint64_t offset) = 0;
// Begin an I/O operation. For file operations, `offset` is the offset within the file at
// which the operation will start. For stream operations, `offset` is ignored.
};
virtual Own<IoObserver> observeIo(HANDLE handle) = 0;
// Given a handle which supports overlapped I/O, arrange to receive I/O completion events via
// this EventPort.
//
// Different Win32EventPort implementations may handle this in different ways, such as by using
// completion routines (APCs) or by using I/O completion ports. The caller should not assume
// any particular technique.
//
// WARNING: It is only safe to call observeIo() on a particular handle once during its lifetime.
// You cannot observe the same handle from multiple Win32EventPorts, even if not at the same
// time. This is because the Win32 API provides no way to disassociate a handle from an I/O
// completion port once it is associated.
// ---------------------------------------------------------------------------
// signalable handles
//
// Warning: Due to limitations in the Win32 API, implementations of EventPort may be forced to
// spawn additional threads to wait for signaled objects. This is necessary if the EventPort
// implementation is based on I/O completion ports, or if you need to wait on more than 64
// handles at once.
class SignalObserver {
public:
virtual Promise<void> onSignaled() = 0;
// Returns a promise that completes the next time the handle enters the signaled state.
//
// Depending on the type of handle, the handle may automatically be reset to a non-signaled
// state before the promise resolves. The underlying implementation uses WaitForSingleObject()
// or an equivalent wait call, so check the documentation for that to understand the semantics.
//
// If the handle is a mutex and it is abandoned without being unlocked, the promise breaks with
// an exception.
virtual Promise<bool> onSignaledOrAbandoned() = 0;
// Like onSignaled(), but instead of throwing when a mutex is abandoned, resolves to `true`.
// Resolves to `false` for non-abandoned signals.
};
virtual Own<SignalObserver> observeSignalState(HANDLE handle) = 0;
// Given a handle that supports waiting for it to become "signaled" via WaitForSingleObject(),
// return an object that can wait for this state using the EventPort.
// ---------------------------------------------------------------------------
// time
virtual Timer& getTimer() = 0;
// Returns the Timer associated with this EventPort.
};
class Win32WaitObjectThreadPool {
// Helper class that implements Win32EventPort::observeSignalState() by spawning additional
// threads as needed to perform the actual waiting.
//
// This class is intended to be used to assist in building Win32EventPort implementations.
public:
Win32WaitObjectThreadPool(uint mainThreadCount = 0);
// `mainThreadCount` indicates the number of objects the main thread is able to listen on
// directly. Typically this would be zero (e.g. if the main thread watches an I/O completion
// port) or MAXIMUM_WAIT_OBJECTS (e.g. if the main thread is a UI thread but can use
// MsgWaitForMultipleObjectsEx() to wait on some handles at the same time as messages).
Own<Win32EventPort::SignalObserver> observeSignalState(HANDLE handle);
// Implements Win32EventPort::observeSignalState().
uint prepareMainThreadWait(HANDLE* handles[]);
// Call immediately before invoking WaitForMultipleObjects() or similar in the main thread.
// Fills in `handles` with the handle pointers to wait on, and returns the number of handles
// in this array. (The array should be allocated to be at least the size passed to the
// constructor).
//
// There's no need to call this if `mainThreadCount` as passed to the constructor was zero.
bool finishedMainThreadWait(DWORD returnCode);
// Call immediately after invoking WaitForMultipleObjects() or similar in the main thread,
// passing the value returned by that call. Returns true if the event indicated by `returnCode`
// has been handled (i.e. it was WAIT_OBJECT_n or WAIT_ABANDONED_n where n is in-range for the
// last call to prepareMainThreadWait()).
};
class Win32IocpEventPort final: public Win32EventPort {
// An EventPort implementation which uses Windows I/O completion ports to listen for events.
//
// With this implementation, observeSignalState() requires spawning a separate thread.
public:
Win32IocpEventPort();
~Win32IocpEventPort() noexcept(false);
// implements EventPort ------------------------------------------------------
bool wait() override;
bool poll() override;
void wake() const override;
// implements Win32EventPort -------------------------------------------------
Own<IoObserver> observeIo(HANDLE handle) override;
Own<SignalObserver> observeSignalState(HANDLE handle) override;
Timer& getTimer() override { return timerImpl; }
private:
class IoPromiseAdapter;
class IoOperationImpl;
class IoObserverImpl;
AutoCloseHandle iocp;
// The I/O completion port on which this EventPort waits.
AutoCloseHandle thread;
// Handle to the thread that constructed this port, obtained via openCurrentThread().
Win32WaitObjectThreadPool waitThreads;
// Implements observeSignalState().
TimerImpl timerImpl;
// Backs getTimer(); advanced to the current clock reading by wait().
mutable bool sentWake = false;
// Set by wake() and cleared by receivedWake(). `mutable` because wake() is const.
static TimePoint readClock();
// Returns the current time as a TimePoint.
void waitIocp(DWORD timeoutMs);
// Wait on the I/O completion port for up to timeoutMs and pump events. Does not advance the
// timer; caller must do that.
bool receivedWake();
// Returns true, and clears the wake flag, if wake() has been called since the last check.
static AutoCloseHandle newIocpHandle();
// Creates the I/O completion port for a new event port.
static AutoCloseHandle openCurrentThread();
// Returns a duplicated handle to the calling thread.
};
} // namespace kj
#endif // KJ_ASYNC_WIN32_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment