Commit 8d6b39c8 authored by Remy Blank's avatar Remy Blank

Add timer functionality to the async IO framework.

parent 00c5f916
...@@ -4,6 +4,7 @@ under the same BSD license terms as the rest of the library. ...@@ -4,6 +4,7 @@ under the same BSD license terms as the rest of the library.
Kenton Varda <temporal@gmail.com>: Primary Author Kenton Varda <temporal@gmail.com>: Primary Author
Jason Choy <jjwchoy@gmail.com>: kj/threadlocal.h and other iOS tweaks Jason Choy <jjwchoy@gmail.com>: kj/threadlocal.h and other iOS tweaks
Remy Blank <rblank@google.com> (contributions copyright Google Inc.)
This file does not list people who maintain their own Cap'n Proto This file does not list people who maintain their own Cap'n Proto
implementations as separate projects. Those people are awesome too! :) implementations as separate projects. Those people are awesome too! :)
...@@ -739,9 +739,28 @@ public: ...@@ -739,9 +739,28 @@ public:
UnixEventPort& eventPort; UnixEventPort& eventPort;
}; };
class TimerImpl final: public Timer {
public:
TimerImpl(UnixEventPort& eventPort): eventPort(eventPort) {}
virtual Time steadyTime() { return eventPort.steadyTime(); }
virtual Promise<void> atSteadyTime(Time time) {
return eventPort.atSteadyTime(time);
}
virtual Promise<void> atTimeFromNow(Time delay) {
return eventPort.atSteadyTime(eventPort.steadyTime() + delay);
}
private:
UnixEventPort& eventPort;
};
class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider { class LowLevelAsyncIoProviderImpl final: public LowLevelAsyncIoProvider {
public: public:
LowLevelAsyncIoProviderImpl(): eventLoop(eventPort), waitScope(eventLoop) {} LowLevelAsyncIoProviderImpl()
: eventLoop(eventPort), timer(eventPort), waitScope(eventLoop) {}
inline WaitScope& getWaitScope() { return waitScope; } inline WaitScope& getWaitScope() { return waitScope; }
...@@ -771,9 +790,12 @@ public: ...@@ -771,9 +790,12 @@ public:
return heap<FdConnectionReceiver>(eventPort, fd, flags); return heap<FdConnectionReceiver>(eventPort, fd, flags);
} }
Timer& getTimer() override { return timer; }
private: private:
UnixEventPort eventPort; UnixEventPort eventPort;
EventLoop eventLoop; EventLoop eventLoop;
TimerImpl timer;
WaitScope waitScope; WaitScope waitScope;
}; };
...@@ -944,6 +966,8 @@ public: ...@@ -944,6 +966,8 @@ public:
return { kj::mv(thread), kj::mv(pipe) }; return { kj::mv(thread), kj::mv(pipe) };
} }
Timer& getTimer() override { return lowLevel.getTimer(); }
private: private:
LowLevelAsyncIoProvider& lowLevel; LowLevelAsyncIoProvider& lowLevel;
SocketNetwork network; SocketNetwork network;
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "async.h" #include "async.h"
#include "function.h" #include "function.h"
#include "thread.h" #include "thread.h"
#include "time.h"
namespace kj { namespace kj {
...@@ -196,6 +197,9 @@ public: ...@@ -196,6 +197,9 @@ public:
// //
// TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too // TODO(someday): I'm not entirely comfortable with this interface. It seems to be doing too
// much at once but I'm not sure how to cleanly break it down. // much at once but I'm not sure how to cleanly break it down.
virtual Timer& getTimer() = 0;
// Returns a Timer interface for the underlying event loop.
}; };
class LowLevelAsyncIoProvider { class LowLevelAsyncIoProvider {
...@@ -270,6 +274,9 @@ public: ...@@ -270,6 +274,9 @@ public:
// have had `bind()` and `listen()` called on it, so it's ready for `accept()`. // have had `bind()` and `listen()` called on it, so it's ready for `accept()`.
// //
// `flags` is a bitwise-OR of the values of the `Flags` enum. // `flags` is a bitwise-OR of the values of the `Flags` enum.
virtual Timer& getTimer() = 0;
// Returns a Timer interface for the underlying event loop.
}; };
Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel); Own<AsyncIoProvider> newAsyncIoProvider(LowLevelAsyncIoProvider& lowLevel);
......
...@@ -653,7 +653,7 @@ public: ...@@ -653,7 +653,7 @@ public:
void wait() override { KJ_FAIL_ASSERT("Nothing to wait for."); } void wait() override { KJ_FAIL_ASSERT("Nothing to wait for."); }
void poll() override {} void poll() override {}
void setRunnable(bool runnable) { void setRunnable(bool runnable) override {
this->runnable = runnable; this->runnable = runnable;
++callCount; ++callCount;
} }
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <pthread.h> #include <pthread.h>
#include <algorithm>
namespace kj { namespace kj {
...@@ -314,4 +315,37 @@ TEST_F(AsyncUnixTest, PollNoWait) { ...@@ -314,4 +315,37 @@ TEST_F(AsyncUnixTest, PollNoWait) {
EXPECT_EQ(2, receivedCount); EXPECT_EQ(2, receivedCount);
} }
TEST_F(AsyncUnixTest, SteadyTimers) {
UnixEventPort port;
EventLoop loop(port);
WaitScope waitScope(loop);
auto start = port.steadyTime();
std::vector<Time> expected;
std::vector<Time> actual;
auto addTimer = [&](Time delay) {
expected.push_back(start + std::max(delay, Time()));
port.atSteadyTime(start + delay).then([&]() {
actual.push_back(port.steadyTime());
}).detach([](Exception&& e) { ADD_FAILURE() << str(e).cStr(); });
};
addTimer(30 * MILLISECOND);
addTimer(40 * MILLISECOND);
addTimer(20350 * MICROSECOND);
addTimer(30 * MILLISECOND);
addTimer(-10 * MILLISECOND);
std::sort(expected.begin(), expected.end());
port.atSteadyTime(expected.back() + MILLISECOND).wait(waitScope);
ASSERT_EQ(expected.size(), actual.size());
for (int i = 0; i < expected.size(); ++i) {
EXPECT_LE(expected[i], actual[i]) << "Actual time for timer " << i << "("
<< actual[i] / MICROSECOND << ") lower than expected time ("
<< expected[i] / MICROSECOND << ")";
}
}
} // namespace kj } // namespace kj
...@@ -26,6 +26,9 @@ ...@@ -26,6 +26,9 @@
#include "threadlocal.h" #include "threadlocal.h"
#include <setjmp.h> #include <setjmp.h>
#include <errno.h> #include <errno.h>
#include <inttypes.h>
#include <limits>
#include <chrono>
namespace kj { namespace kj {
...@@ -161,8 +164,38 @@ public: ...@@ -161,8 +164,38 @@ public:
PollPromiseAdapter** prev = nullptr; PollPromiseAdapter** prev = nullptr;
}; };
class UnixEventPort::TimerPromiseAdapter {
public:
TimerPromiseAdapter(PromiseFulfiller<void>& fulfiller, UnixEventPort& port, Time time)
: time(time), fulfiller(fulfiller), port(port) {
pos = port.timers.insert(this);
}
~TimerPromiseAdapter() {
if (pos != port.timers.end()) {
port.timers.erase(pos);
}
}
void fulfill() {
fulfiller.fulfill();
port.timers.erase(pos);
pos = port.timers.end();
}
const Time time;
PromiseFulfiller<void>& fulfiller;
UnixEventPort& port;
Timers::const_iterator pos;
};
bool UnixEventPort::TimerBefore::operator()(TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs) {
return lhs->time < rhs->time;
}
UnixEventPort::UnixEventPort() { UnixEventPort::UnixEventPort() {
pthread_once(&registerReservedSignalOnce, &registerReservedSignal); pthread_once(&registerReservedSignalOnce, &registerReservedSignal);
frozenSteadyTime = currentSteadyTime();
} }
UnixEventPort::~UnixEventPort() {} UnixEventPort::~UnixEventPort() {}
...@@ -245,6 +278,10 @@ private: ...@@ -245,6 +278,10 @@ private:
int pollError = 0; int pollError = 0;
}; };
Promise<void> UnixEventPort::atSteadyTime(Time time) {
return newAdaptedPromise<void, TimerPromiseAdapter>(*this, time);
}
void UnixEventPort::wait() { void UnixEventPort::wait() {
sigset_t newMask; sigset_t newMask;
sigemptyset(&newMask); sigemptyset(&newMask);
...@@ -279,13 +316,32 @@ void UnixEventPort::wait() { ...@@ -279,13 +316,32 @@ void UnixEventPort::wait() {
threadCapture = &capture; threadCapture = &capture;
sigprocmask(SIG_UNBLOCK, &newMask, &origMask); sigprocmask(SIG_UNBLOCK, &newMask, &origMask);
pollContext.run(-1); constexpr Time MAX_TIMEOUT =
std::numeric_limits<int>::digits < std::numeric_limits<uint64_t>::digits ?
int64_t(std::numeric_limits<int>::max() - 1) * MILLISECOND :
Time(std::numeric_limits<uint64_t>::max()) - MILLISECOND;
int pollTimeout = -1;
auto timer = timers.begin();
if (timer != timers.end()) {
Time timeout = (*timer)->time - currentSteadyTime();
if (timeout < Time()) {
pollTimeout = 0;
} else if (timeout <= MAX_TIMEOUT) {
// Round up to the next millisecond
pollTimeout = (timeout + MILLISECOND - unit<Time>()) / MILLISECOND;
} else {
pollTimeout = MAX_TIMEOUT / MILLISECOND;
}
}
pollContext.run(pollTimeout);
sigprocmask(SIG_SETMASK, &origMask, nullptr); sigprocmask(SIG_SETMASK, &origMask, nullptr);
threadCapture = nullptr; threadCapture = nullptr;
// Queue events. // Queue events.
pollContext.processResults(); pollContext.processResults();
processTimers();
} }
void UnixEventPort::poll() { void UnixEventPort::poll() {
...@@ -332,6 +388,7 @@ void UnixEventPort::poll() { ...@@ -332,6 +388,7 @@ void UnixEventPort::poll() {
pollContext.run(0); pollContext.run(0);
pollContext.processResults(); pollContext.processResults();
} }
processTimers();
} }
void UnixEventPort::gotSignal(const siginfo_t& siginfo) { void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
...@@ -347,4 +404,20 @@ void UnixEventPort::gotSignal(const siginfo_t& siginfo) { ...@@ -347,4 +404,20 @@ void UnixEventPort::gotSignal(const siginfo_t& siginfo) {
} }
} }
Time UnixEventPort::currentSteadyTime() {
return Time(std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::steady_clock::now().time_since_epoch()).count());
}
void UnixEventPort::processTimers() {
frozenSteadyTime = currentSteadyTime();
for (;;) {
auto front = timers.begin();
if (front == timers.end() || (*front)->time > frozenSteadyTime) {
break;
}
(*front)->fulfill();
}
}
} // namespace kj } // namespace kj
...@@ -25,10 +25,12 @@ ...@@ -25,10 +25,12 @@
#define KJ_ASYNC_UNIX_H_ #define KJ_ASYNC_UNIX_H_
#include "async.h" #include "async.h"
#include "time.h"
#include "vector.h" #include "vector.h"
#include <signal.h> #include <signal.h>
#include <poll.h> #include <poll.h>
#include <pthread.h> #include <pthread.h>
#include <set>
namespace kj { namespace kj {
...@@ -83,6 +85,9 @@ public: ...@@ -83,6 +85,9 @@ public:
// needs to use SIGUSR1, call this at startup (before any calls to `captureSignal()` and before // needs to use SIGUSR1, call this at startup (before any calls to `captureSignal()` and before
// constructing an `UnixEventPort`) to offer a different signal. // constructing an `UnixEventPort`) to offer a different signal.
Time steadyTime() { return frozenSteadyTime; }
Promise<void> atSteadyTime(Time time);
// implements EventPort ------------------------------------------------------ // implements EventPort ------------------------------------------------------
void wait() override; void wait() override;
void poll() override; void poll() override;
...@@ -90,14 +95,27 @@ public: ...@@ -90,14 +95,27 @@ public:
private: private:
class PollPromiseAdapter; class PollPromiseAdapter;
class SignalPromiseAdapter; class SignalPromiseAdapter;
class TimerPromiseAdapter;
class PollContext; class PollContext;
struct TimerBefore {
bool operator()(TimerPromiseAdapter* lhs, TimerPromiseAdapter* rhs);
};
using Timers = std::multiset<TimerPromiseAdapter*, TimerBefore>;
PollPromiseAdapter* pollHead = nullptr; PollPromiseAdapter* pollHead = nullptr;
PollPromiseAdapter** pollTail = &pollHead; PollPromiseAdapter** pollTail = &pollHead;
SignalPromiseAdapter* signalHead = nullptr; SignalPromiseAdapter* signalHead = nullptr;
SignalPromiseAdapter** signalTail = &signalHead; SignalPromiseAdapter** signalTail = &signalHead;
Timers timers;
Time frozenSteadyTime;
void gotSignal(const siginfo_t& siginfo); void gotSignal(const siginfo_t& siginfo);
Time currentSteadyTime();
void processTimers();
friend class TimerPromiseAdapter;
}; };
} // namespace kj } // namespace kj
......
// Copyright (c) 2014, Kenton Varda <temporal@gmail.com>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef KJ_TIME_H_
#define KJ_TIME_H_
#include "async.h"
#include "units.h"
#include <inttypes.h>
namespace kj {
namespace _ { // private
class MicrosecondLabel;
} // namespace _ (private)
using Time = Quantity<int64_t, _::MicrosecondLabel>;
// A time value, in microseconds.
constexpr Time MICROSECOND = unit<Time>();
constexpr Time MILLISECOND = 1000 * MICROSECOND;
constexpr Time SECOND = 1000 * MILLISECOND;
constexpr Time MINUTE = 60 * SECOND;
constexpr Time HOUR = 60 * MINUTE;
constexpr Time DAY = 24 * HOUR;
class Timer {
// Interface to time and timer functionality. The underlying time unit comes from
// a steady clock, i.e. a clock that increments steadily and is independent of
// system (or wall) time.
public:
virtual Time steadyTime() = 0;
// Returns the current value of a clock that moves steadily forward, independent of any
// changes in the wall clock. The value is updated every time the event loop waits,
// and is constant in-between waits.
virtual Promise<void> atSteadyTime(Time time) = 0;
// Schedules a timer that will trigger when the value returned by steadyTime() reaches
// the given time.
virtual Promise<void> atTimeFromNow(Time delay) = 0;
// Schedules a timer that will trigger after the given delay. The timer uses steadyTime()
// and is therefore immune to system clock updates.
};
} // namespace kj
#endif // KJ_TIME_H_
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment