Commit 8bfe482a authored by Kenton Varda


Allow SchemaLoader to use a callback to lazily load schema nodes, including dependencies of loaded nodes.
parent 2ca6d90c
@@ -147,11 +147,16 @@ struct RawSchema {
//
// This is an internal structure which could change in the future.
uint64_t id;
const word* encodedNode;
// Encoded SchemaNode, readable via readMessageUnchecked<schema::Node>(encodedNode).
const RawSchema* const* dependencies;
// Pointers to other types on which this one depends, sorted by ID.
// Pointers to other types on which this one depends, sorted by ID. The schemas in this table
// may be uninitialized -- you must call ensureInitialized() on the one you wish to use before
// using it.
//
// TODO(someday): Make this a hashtable.
struct MemberInfo {
@@ -171,6 +176,23 @@ struct RawSchema {
// Points to the RawSchema of a compiled-in type to which it is safe to cast any DynamicValue
// with this schema. This is null for all compiled-in types; it is only set by SchemaLoader on
// dynamically-loaded types.
class Initializer {
public:
virtual void init(const RawSchema* schema) const = 0;
};
const Initializer* lazyInitializer;
// Lazy initializer, invoked by ensureInitialized().
inline void ensureInitialized() const {
// Lazy initialization support. Invoke to ensure that initialization has taken place. This
// is required in particular when traversing the dependency list. RawSchemas for compiled-in
// types are always initialized; only dynamically-loaded schemas may be lazy.
const Initializer* i = __atomic_load_n(&lazyInitializer, __ATOMIC_ACQUIRE);
if (i != nullptr) i->init(this);
}
};
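
For context, the acquire load in ensureInitialized() pairs with a release store of nullptr performed once initialization completes (see SchemaLoader::Impl::load() later in this diff). The following is a minimal, hypothetical sketch of the same publication pattern, written with std::atomic rather than the GCC __atomic builtins the real code uses; LazyExample and its fields are illustrative only:

#include <atomic>

struct LazyExample {
  struct Initializer {
    virtual void init(const LazyExample* obj) const = 0;
  };

  std::atomic<const Initializer*> lazyInitializer;
  int contents;  // stands in for the schema's real fields

  void ensureInitialized() const {
    // Acquire: a thread that observes nullptr here also observes every
    // write the initializing thread made before its release store.
    const Initializer* i = lazyInitializer.load(std::memory_order_acquire);
    if (i != nullptr) i->init(this);
  }
};

// An initializer fills in `contents` first, then publishes the object:
//   obj->lazyInitializer.store(nullptr, std::memory_order_release);
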
template <typename T>
@@ -208,6 +208,8 @@ TEST(SchemaLoader, Incompatible) {
loadUnderAlternateTypeId<test::TestAllTypes>(loader, typeId<test::TestListDefaults>()));
}
// TODO(test): More extensively test upgrade/downgrade checks.
TEST(SchemaLoader, Enumerate) {
SchemaLoader loader;
loader.loadCompiledTypeAndDependencies<TestAllTypes>();
@@ -222,7 +224,88 @@ TEST(SchemaLoader, Enumerate) {
}
}
// TODO(test): More extensively test upgrade/downgrade checks.
TEST(SchemaLoader, EnumerateNoPlaceholders) {
SchemaLoader loader;
Schema schema = loader.load(Schema::from<TestDefaults>().getProto());
{
auto list = loader.getAllLoaded();
ASSERT_EQ(1u, list.size());
EXPECT_TRUE(list[0] == schema);
}
Schema dep = schema.getDependency(typeId<TestAllTypes>());
{
auto list = loader.getAllLoaded();
ASSERT_EQ(2u, list.size());
if (list[0] == schema) {
EXPECT_TRUE(list[1] == dep);
} else {
EXPECT_TRUE(list[0] == dep);
EXPECT_TRUE(list[1] == schema);
}
}
}
class FakeLoaderCallback: public SchemaLoader::LazyLoadCallback {
public:
FakeLoaderCallback(const schema::Node::Reader node): node(node), loaded(false) {}
bool isLoaded() { return loaded; }
void load(const SchemaLoader& loader, uint64_t id) const override {
if (id == 1234) {
// Magic "not found" ID.
return;
}
EXPECT_EQ(node.getId(), id);
EXPECT_FALSE(loaded);
loaded = true;
loader.loadIfNew(node);
}
private:
const schema::Node::Reader node;
mutable bool loaded = false;
};
TEST(SchemaLoader, LazyLoad) {
FakeLoaderCallback callback(Schema::from<TestAllTypes>().getProto());
SchemaLoader loader(callback);
EXPECT_TRUE(loader.tryGet(1234) == nullptr);
EXPECT_FALSE(callback.isLoaded());
Schema schema = loader.get(typeId<TestAllTypes>());
EXPECT_TRUE(callback.isLoaded());
EXPECT_EQ(schema.getProto().getDisplayName(),
Schema::from<TestAllTypes>().getProto().getDisplayName());
EXPECT_EQ(schema, schema.getDependency(typeId<TestAllTypes>()));
EXPECT_EQ(schema, loader.get(typeId<TestAllTypes>()));
}
TEST(SchemaLoader, LazyLoadGetDependency) {
FakeLoaderCallback callback(Schema::from<TestAllTypes>().getProto());
SchemaLoader loader(callback);
Schema schema = loader.load(Schema::from<TestDefaults>().getProto());
EXPECT_FALSE(callback.isLoaded());
Schema dep = schema.getDependency(typeId<TestAllTypes>());
EXPECT_TRUE(callback.isLoaded());
EXPECT_EQ(dep.getProto().getDisplayName(),
Schema::from<TestAllTypes>().getProto().getDisplayName());
EXPECT_EQ(dep, schema.getDependency(typeId<TestAllTypes>()));
EXPECT_EQ(dep, loader.get(typeId<TestAllTypes>()));
}
} // namespace
} // namespace _ (private)
@@ -33,22 +33,50 @@
namespace capnp {
class SchemaLoader::InitializerImpl: public _::RawSchema::Initializer {
public:
inline explicit InitializerImpl(const SchemaLoader& loader): loader(loader), callback(nullptr) {}
inline InitializerImpl(const SchemaLoader& loader, const LazyLoadCallback& callback)
: loader(loader), callback(callback) {}
inline kj::Maybe<const LazyLoadCallback&> getCallback() const { return callback; }
void init(const _::RawSchema* schema) const override;
inline bool operator==(decltype(nullptr)) const { return callback == nullptr; }
private:
const SchemaLoader& loader;
kj::Maybe<const LazyLoadCallback&> callback;
};
class SchemaLoader::Impl {
public:
_::RawSchema* load(const schema::Node::Reader& reader);
inline explicit Impl(const SchemaLoader& loader): initializer(loader) {}
inline Impl(const SchemaLoader& loader, const LazyLoadCallback& callback)
: initializer(loader, callback) {}
_::RawSchema* load(const schema::Node::Reader& reader, bool isPlaceholder);
_::RawSchema* loadNative(const _::RawSchema* nativeSchema);
_::RawSchema* loadEmpty(uint64_t id, kj::StringPtr name, schema::Node::Body::Which kind);
// Create a dummy empty schema of the given kind for the given id and load it.
_::RawSchema* tryGet(uint64_t typeId) const;
struct TryGetResult {
_::RawSchema* schema;
kj::Maybe<const LazyLoadCallback&> callback;
};
TryGetResult tryGet(uint64_t typeId) const;
kj::Array<Schema> getAllLoaded() const;
kj::Arena arena;
private:
std::unordered_map<uint64_t, _::RawSchema*> schemas;
InitializerImpl initializer;
};
// =======================================================================================
@@ -396,7 +424,7 @@ private:
}
void validateTypeId(uint64_t id, schema::Node::Body::Which expectedKind) {
_::RawSchema* existing = loader.tryGet(id);
_::RawSchema* existing = loader.tryGet(id).schema;
if (existing != nullptr) {
auto node = readMessageUnchecked<schema::Node>(existing->encodedNode);
VALIDATE_SCHEMA(node.getBody().which() == expectedKind,
@@ -406,9 +434,8 @@ private:
return;
}
// TODO(cleanup): str() really needs to return something NUL-terminated...
dependencies.insert(std::make_pair(id, loader.loadEmpty(
id, kj::str("(unknown type used by ", nodeName , ")", '\0').begin(), expectedKind)));
id, kj::str("(unknown type used by ", nodeName , ")"), expectedKind)));
}
#undef VALIDATE_SCHEMA
@@ -423,7 +450,7 @@ public:
bool shouldReplace(const schema::Node::Reader& existingNode,
const schema::Node::Reader& replacement,
bool replacementIsNative) {
bool preferReplacementIfEquivalent) {
KJ_CONTEXT("checking compatibility with previously-loaded node of the same id",
existingNode.getDisplayName());
@@ -434,9 +461,8 @@ public:
checkCompatibility(existingNode, replacement);
// Prefer the newer schema. If neither is newer, prefer native types, otherwise prefer the
// existing type.
return replacementIsNative ? compatibility != OLDER : compatibility == NEWER;
// Prefer the newer schema.
return preferReplacementIfEquivalent ? compatibility != OLDER : compatibility == NEWER;
}
private:
@@ -798,8 +824,7 @@ private:
MallocMessageBuilder builder(kj::arrayPtr(scratch, sizeof(scratch)));
auto node = builder.initRoot<schema::Node>();
node.setId(structTypeId);
// TODO(cleanup): str() really needs to return something NUL-terminated...
node.setDisplayName(kj::str("(unknown type used in ", nodeName, ")", '\0').begin());
node.setDisplayName(kj::str("(unknown type used in ", nodeName, ")"));
auto structNode = node.getBody().initStructNode();
switch (type.getBody().which()) {
@@ -864,7 +889,7 @@ private:
member.setCodeOrder(0);
member.getBody().initFieldMember().setType(type);
loader.load(node);
loader.load(node, true);
}
bool canUpgradeToData(const schema::Type::Reader& type) {
@@ -959,7 +984,7 @@ private:
// =======================================================================================
_::RawSchema* SchemaLoader::Impl::load(const schema::Node::Reader& reader) {
_::RawSchema* SchemaLoader::Impl::load(const schema::Node::Reader& reader, bool isPlaceholder) {
// Make a copy of the node which can be used unchecked.
size_t size = reader.totalSizeInWords() + 1;
kj::ArrayPtr<word> validated = arena.allocateArray<word>(size);
@@ -979,68 +1004,110 @@ _::RawSchema* SchemaLoader::Impl::load(const schema::Node::Reader& reader) {
// Check if we already have a schema for this ID.
_::RawSchema*& slot = schemas[validatedReader.getId()];
bool shouldReplace;
if (slot == nullptr) {
// Nope, allocate a new RawSchema.
slot = &arena.allocate<_::RawSchema>();
slot->id = validatedReader.getId();
slot->canCastTo = nullptr;
shouldReplace = true;
} else {
// Yes, check if it is compatible and figure out which schema is newer.
if (slot->lazyInitializer == nullptr) {
// The existing slot is not a placeholder, so whether we overwrite it or not, we cannot
// end up with a placeholder.
isPlaceholder = false;
}
auto existing = readMessageUnchecked<schema::Node>(slot->encodedNode);
CompatibilityChecker checker(*this);
if (!checker.shouldReplace(existing, validatedReader, false)) {
// The new schema does not appear to be any newer than the existing one, so keep the existing.
return slot;
}
// Prefer to replace the existing schema if the existing schema is a placeholder. Otherwise,
// prefer to keep the existing schema.
shouldReplace = checker.shouldReplace(
existing, validatedReader, slot->lazyInitializer != nullptr);
}
// Initialize the RawSchema.
slot->encodedNode = validated.begin();
slot->dependencies = validator.makeDependencyArray(&slot->dependencyCount);
slot->membersByName = validator.makeMemberInfoArray(&slot->memberCount);
if (shouldReplace) {
// Initialize the RawSchema.
slot->encodedNode = validated.begin();
slot->dependencies = validator.makeDependencyArray(&slot->dependencyCount);
slot->membersByName = validator.makeMemberInfoArray(&slot->memberCount);
}
if (isPlaceholder) {
slot->lazyInitializer = &initializer;
} else {
// If this schema is not newly-allocated, it may already be in the wild, specifically in the
// dependency list of other schemas. Once the initializer is null, it is live, so we must do
// a release-store here.
__atomic_store_n(&slot->lazyInitializer, nullptr, __ATOMIC_RELEASE);
}
return slot;
}
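
To make the isPlaceholder flow above concrete, here is a hypothetical usage sequence (the reader variables are illustrative, not from this commit):

SchemaLoader loader;
// Loading a node that refers to a not-yet-loaded type makes the validator
// call loadEmpty(), which calls load(node, true): the dependency gets a
// placeholder RawSchema whose lazyInitializer stays set, so it is excluded
// from getAllLoaded().
loader.load(nodeUsingUnknownDep);  // hypothetical schema::Node::Reader
// Loading the real dependency later finds the placeholder slot, replaces
// its contents, and release-stores lazyInitializer = nullptr, after which
// the schema counts as fully loaded.
loader.load(realDepNode);          // hypothetical schema::Node::Reader
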
_::RawSchema* SchemaLoader::Impl::loadNative(const _::RawSchema* nativeSchema) {
auto reader = readMessageUnchecked<schema::Node>(nativeSchema->encodedNode);
_::RawSchema*& slot = schemas[reader.getId()];
_::RawSchema*& slot = schemas[nativeSchema->id];
bool shouldReplace;
if (slot == nullptr) {
slot = &arena.allocate<_::RawSchema>();
shouldReplace = true;
} else if (slot->canCastTo != nullptr) {
// Already loaded natively, or we're currently in the process of loading natively and there
// was a dependency cycle.
KJ_REQUIRE(slot->canCastTo == nativeSchema,
"two different compiled-in type have the same type ID",
reader.getId(), reader.getDisplayName(),
nativeSchema->id,
readMessageUnchecked<schema::Node>(nativeSchema->encodedNode).getDisplayName(),
readMessageUnchecked<schema::Node>(slot->canCastTo->encodedNode).getDisplayName());
// Already loaded.
return slot;
} else {
auto existing = readMessageUnchecked<schema::Node>(slot->encodedNode);
auto native = readMessageUnchecked<schema::Node>(nativeSchema->encodedNode);
CompatibilityChecker checker(*this);
if (!checker.shouldReplace(existing, native, true)) {
// The existing schema is newer, so just make sure the dependencies are loaded.
slot->canCastTo = nativeSchema;
for (uint i = 0; i < nativeSchema->dependencyCount; i++) {
loadNative(nativeSchema->dependencies[i]);
}
return slot;
}
shouldReplace = checker.shouldReplace(existing, native, true);
}
// Set the slot to a copy of the native schema.
*slot = *nativeSchema;
// Since we recurse below, the slot in the hash map could move around. Copy out the pointer
// for subsequent use.
_::RawSchema* result = slot;
if (shouldReplace) {
// Set the schema to a copy of the native schema.
*kj::implicitCast<_::RawSchema*>(result) = *nativeSchema;
// Indicate that casting is safe. Note that it's important to set this before recursively
// loading dependencies, so that cycles don't cause infinite loops!
result->canCastTo = nativeSchema;
// Except that we need to set the dependency list to point at other loader-owned RawSchemas.
kj::ArrayPtr<const _::RawSchema*> dependencies =
arena.allocateArray<const _::RawSchema*>(result->dependencyCount);
for (uint i = 0; i < nativeSchema->dependencyCount; i++) {
dependencies[i] = loadNative(nativeSchema->dependencies[i]);
}
result->dependencies = dependencies.begin();
} else {
// The existing schema is newer.
// Indicate that casting is safe.
slot->canCastTo = nativeSchema;
// Indicate that casting is safe. Note that it's important to set this before recursively
// loading dependencies, so that cycles don't cause infinite loops!
result->canCastTo = nativeSchema;
// Except that we need to set the dependency list to point at other loader-owned RawSchemas.
kj::ArrayPtr<const _::RawSchema*> dependencies =
arena.allocateArray<const _::RawSchema*>(slot->dependencyCount);
for (uint i = 0; i < nativeSchema->dependencyCount; i++) {
dependencies[i] = loadNative(nativeSchema->dependencies[i]);
// Make sure the dependencies are loaded and compatible.
for (uint i = 0; i < nativeSchema->dependencyCount; i++) {
loadNative(nativeSchema->dependencies[i]);
}
}
slot->dependencies = dependencies.begin();
return slot;
// If this schema is not newly-allocated, it may already be in the wild, specifically in the
// dependency list of other schemas. Once the initializer is null, it is live, so we must do
// a release-store here.
__atomic_store_n(&result->lazyInitializer, nullptr, __ATOMIC_RELEASE);
return result;
}
_::RawSchema* SchemaLoader::Impl::loadEmpty(
@@ -1063,57 +1130,107 @@ _::RawSchema* SchemaLoader::Impl::loadEmpty(
break;
}
return load(node);
return load(node, true);
}
_::RawSchema* SchemaLoader::Impl::tryGet(uint64_t typeId) const {
SchemaLoader::Impl::TryGetResult SchemaLoader::Impl::tryGet(uint64_t typeId) const {
auto iter = schemas.find(typeId);
if (iter == schemas.end()) {
return nullptr;
return {nullptr, initializer.getCallback()};
} else {
return iter->second;
return {iter->second, initializer.getCallback()};
}
}
kj::Array<Schema> SchemaLoader::Impl::getAllLoaded() const {
kj::Array<Schema> result = kj::heapArray<Schema>(schemas.size());
size_t count = 0;
for (auto& schema: schemas) {
if (schema.second->lazyInitializer == nullptr) ++count;
}
kj::Array<Schema> result = kj::heapArray<Schema>(count);
size_t i = 0;
for (auto& schema: schemas) {
result[i++] = Schema(schema.second);
if (schema.second->lazyInitializer == nullptr) result[i++] = Schema(schema.second);
}
return result;
}
void SchemaLoader::InitializerImpl::init(const _::RawSchema* schema) const {
KJ_IF_MAYBE(c, callback) {
c->load(loader, schema->id);
}
if (schema->lazyInitializer != nullptr) {
// The callback declined to load a schema. We need to disable the initializer so that it
// doesn't get invoked again later, as we can no longer modify this schema once it is in use.
// Lock the loader for read to make sure no one is concurrently loading a replacement for this
// schema node.
auto lock = loader.impl.lockForRead();
// Get the mutable version of the schema.
_::RawSchema* mutableSchema = lock->get()->tryGet(schema->id).schema;
KJ_ASSERT(mutableSchema == schema,
"A schema not belonging to this loader used its initializer.");
// Disable the initializer.
__atomic_store_n(&mutableSchema->lazyInitializer, nullptr, __ATOMIC_RELEASE);
}
}
// =======================================================================================
SchemaLoader::SchemaLoader(): impl(kj::heap<Impl>()) {}
SchemaLoader::SchemaLoader(): impl(kj::heap<Impl>(*this)) {}
SchemaLoader::SchemaLoader(const LazyLoadCallback& callback)
: impl(kj::heap<Impl>(*this, callback)) {}
SchemaLoader::~SchemaLoader() noexcept(false) {}
Schema SchemaLoader::get(uint64_t id) const {
_::RawSchema* raw = impl->tryGet(id);
KJ_REQUIRE(raw != nullptr, "no schema node loaded for id", id);
return Schema(raw);
KJ_IF_MAYBE(result, tryGet(id)) {
return *result;
} else {
KJ_FAIL_REQUIRE("no schema node loaded for id", id);
}
}
kj::Maybe<Schema> SchemaLoader::tryGet(uint64_t id) const {
_::RawSchema* raw = impl->tryGet(id);
if (raw == nullptr) {
return nullptr;
auto getResult = impl.lockForRead()->get()->tryGet(id);
if (getResult.schema == nullptr || getResult.schema->lazyInitializer != nullptr) {
KJ_IF_MAYBE(c, getResult.callback) {
c->load(*this, id);
}
getResult = impl.lockForRead()->get()->tryGet(id);
}
if (getResult.schema != nullptr && getResult.schema->lazyInitializer == nullptr) {
return Schema(getResult.schema);
} else {
return Schema(raw);
return nullptr;
}
}
Schema SchemaLoader::load(const schema::Node::Reader& reader) {
return Schema(impl->load(reader));
return Schema(impl.lock()->get()->load(reader, false));
}
Schema SchemaLoader::loadIfNew(const schema::Node::Reader& reader) const {
auto locked = impl.lock();
auto getResult = locked->get()->tryGet(reader.getId());
if (getResult.schema == nullptr || getResult.schema->lazyInitializer != nullptr) {
// Doesn't exist yet, or the existing schema is a placeholder and therefore has not yet been
// seen publicly. Go ahead and load the incoming reader.
return Schema(locked->get()->load(reader, false));
} else {
return Schema(getResult.schema);
}
}
kj::Array<Schema> SchemaLoader::getAllLoaded() const {
return impl->getAllLoaded();
return impl.lockForRead()->get()->getAllLoaded();
}
void SchemaLoader::loadNative(const _::RawSchema* nativeSchema) {
impl->loadNative(nativeSchema);
impl.lock()->get()->loadNative(nativeSchema);
}
} // namespace capnp
@@ -26,12 +26,34 @@
#include "schema.h"
#include <kj/memory.h>
#include <kj/mutex.h>
namespace capnp {
class SchemaLoader {
public:
class LazyLoadCallback {
public:
virtual void load(const SchemaLoader& loader, uint64_t id) const = 0;
// Request that the schema node with the given ID be loaded into the given SchemaLoader. If
// the callback is able to find a schema for this ID, it should invoke `loadIfNew()` on
// `loader` to load it. If no such node exists, it should simply do nothing and return.
//
// The callback is allowed to load schema nodes other than the one requested, e.g. because it
// expects they will be needed soon.
//
// If the `SchemaLoader` is used from multiple threads, the callback must be thread-safe.
// In particular, it's possible for multiple threads to invoke `load()` with the same ID.
// If the callback performs a large amount of work to look up IDs, it should be sure to
// de-dup these requests.
};
SchemaLoader();
SchemaLoader(const LazyLoadCallback& callback);
// Construct a SchemaLoader which will invoke the given callback when a schema node is requested
// that isn't already loaded.
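
As a sketch of intended usage, a callback might look up serialized nodes in a pre-built index and hand them to loadIfNew(). IndexedLoaderCallback and nodesById are hypothetical names, and a real implementation must be thread-safe as described above:

#include <map>

class IndexedLoaderCallback: public SchemaLoader::LazyLoadCallback {
public:
  void load(const SchemaLoader& loader, uint64_t id) const override {
    // The index is built once before the loader is used, so concurrent
    // lookups need no locking.
    auto iter = nodesById.find(id);
    if (iter != nodesById.end()) {
      // loadIfNew() is safe to call even while other threads use the
      // loader; duplicate calls for the same ID just return the
      // already-loaded schema.
      loader.loadIfNew(iter->second);
    }
    // If the ID is unknown, do nothing; the loader reports it as missing.
  }

private:
  std::map<uint64_t, schema::Node::Reader> nodesById;  // hypothetical index
};

A loader constructed as SchemaLoader loader(callback) then invokes load() whenever get(), tryGet(), or Schema::getDependency() misses.
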
~SchemaLoader() noexcept(false);
KJ_DISALLOW_COPY(SchemaLoader);
@@ -79,6 +101,11 @@ public:
// Also note that unknown types are not considered invalid. Instead, the dynamic API returns
// a DynamicValue with type UNKNOWN for these.
Schema loadIfNew(const schema::Node::Reader& reader) const;
// Like `load()` but does nothing if a schema with the same ID is already loaded. In contrast,
// `load()` would attempt to compare the schemas and take the newer one. `loadIfNew()` is safe
// to call even while concurrently using schemas from this loader.
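
A hypothetical contrast with load(), where nodeV1 and nodeV2 are illustrative readers for two versions of the same node and nodeV1 was loaded first:

Schema a = loader.load(nodeV2);       // compares versions; may replace the
                                      // previously loaded nodeV1 schema
Schema b = loader.loadIfNew(nodeV2);  // returns the existing schema
                                      // untouched, since the ID is loaded
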
template <typename T>
void loadCompiledTypeAndDependencies();
// Load the schema for the given compiled-in type and all of its dependencies.
@@ -96,7 +123,8 @@ private:
class Validator;
class CompatibilityChecker;
class Impl;
kj::Own<Impl> impl;
class InitializerImpl;
kj::MutexGuarded<kj::Own<Impl>> impl;
void loadNative(const _::RawSchema* nativeSchema);
};
@@ -38,11 +38,12 @@ Schema Schema::getDependency(uint64_t id) const {
while (lower < upper) {
uint mid = (lower + upper) / 2;
Schema candidate(raw->dependencies[mid]);
const _::RawSchema* candidate = raw->dependencies[mid];
uint64_t candidateId = candidate.getProto().getId();
uint64_t candidateId = candidate->id;
if (candidateId == id) {
return candidate;
candidate->ensureInitialized();
return Schema(candidate);
} else if (candidateId < id) {
lower = mid + 1;
} else {
@@ -86,7 +86,10 @@ public:
private:
const _::RawSchema* raw;
inline explicit Schema(const _::RawSchema* raw): raw(raw) {}
inline explicit Schema(const _::RawSchema* raw): raw(raw) {
KJ_IREQUIRE(raw->lazyInitializer == nullptr,
"Must call ensureInitialized() on RawSchema before constructing Schema.");
}
template <typename T> static inline Schema fromImpl() {
return Schema(&_::rawSchema<T>());
@@ -50,15 +50,15 @@ Mutex::~Mutex() {
KJ_PTHREAD_CLEANUP(pthread_rwlock_destroy(&mutex));
}
void Mutex::lock() noexcept {
void Mutex::lock() {
KJ_PTHREAD_CALL(pthread_rwlock_wrlock(&mutex));
}
void Mutex::readLock() noexcept {
void Mutex::readLock() {
KJ_PTHREAD_CALL(pthread_rwlock_rdlock(&mutex));
}
void Mutex::unlock(bool lockedForRead) noexcept {
void Mutex::unlock(bool lockedForRead) {
KJ_PTHREAD_CALL(pthread_rwlock_unlock(&mutex));
}
@@ -47,9 +47,9 @@ public:
~Mutex();
KJ_DISALLOW_COPY(Mutex);
void lock() noexcept;
void readLock() noexcept;
void unlock(bool lockedForRead) noexcept;
void lock();
void readLock();
void unlock(bool lockedForRead);
private:
mutable pthread_rwlock_t mutex;
@@ -69,8 +69,8 @@ static const ::capnp::_::RawSchema::MemberInfo m_{{schemaId}}[] = {
{{/schemaMembersByName}}
};
const ::capnp::_::RawSchema s_{{schemaId}} = {
b_{{schemaId}}.words, d_{{schemaId}}, m_{{schemaId}},
{{schemaDependencyCount}}, {{schemaMemberCount}}, nullptr
0x{{schemaId}}, b_{{schemaId}}.words, d_{{schemaId}}, m_{{schemaId}},
{{schemaDependencyCount}}, {{schemaMemberCount}}, nullptr, nullptr
};
{{/typeSchema}}
{{/fileTypes}}