Commit 65b4f247 authored by Kenton Varda's avatar Kenton Varda

Extend AsyncCapabilityStream to support sending FDs with a message.

Previously, FDs could only be sent separately from data, with the underlying implementation actually sending a dummy one-byte message.

For Cap'n Proto FD passing, it's better if we can attach the FDs to the RPC message they arrived with, because it avoids the need to pre-negotiate where FD passing is supported: if the sender sends FDs but the recipient doesn't arrange to receive them, the FDs will be discarded and closed automatically by the OS. Whereas, if we are sending separate one-byte messages, the recipient needs to know what to do with those.
parent ce27fd77
......@@ -233,6 +233,41 @@ TEST(AsyncIo, CapabilityPipe) {
EXPECT_EQ("bar", result);
EXPECT_EQ("foo", result2);
}
TEST(AsyncIo, CapabilityPipeMultiStreamMessage) {
auto ioContext = setupAsyncIo();
auto pipe = ioContext.provider->newCapabilityPipe();
auto pipe2 = ioContext.provider->newCapabilityPipe();
auto pipe3 = ioContext.provider->newCapabilityPipe();
auto streams = heapArrayBuilder<Own<AsyncCapabilityStream>>(2);
streams.add(kj::mv(pipe2.ends[0]));
streams.add(kj::mv(pipe3.ends[0]));
ArrayPtr<const byte> secondBuf = "bar"_kj.asBytes();
pipe.ends[0]->writeWithStreams("foo"_kj.asBytes(), arrayPtr(&secondBuf, 1), streams.finish())
.wait(ioContext.waitScope);
char receiveBuffer[7];
Own<AsyncCapabilityStream> receiveStreams[3];
auto result = pipe.ends[1]->tryReadWithStreams(receiveBuffer, 6, 7, receiveStreams, 3)
.wait(ioContext.waitScope);
KJ_EXPECT(result.byteCount == 6);
receiveBuffer[6] = '\0';
KJ_EXPECT(kj::StringPtr(receiveBuffer) == "foobar");
KJ_ASSERT(result.capCount == 2);
receiveStreams[0]->write("baz", 3).wait(ioContext.waitScope);
receiveStreams[0] = nullptr;
KJ_EXPECT(pipe2.ends[1]->readAllText().wait(ioContext.waitScope) == "baz");
pipe3.ends[1]->write("qux", 3).wait(ioContext.waitScope);
pipe3.ends[1] = nullptr;
KJ_EXPECT(receiveStreams[1]->readAllText().wait(ioContext.waitScope) == "qux");
}
#endif
TEST(AsyncIo, PipeThread) {
......
This diff is collapsed.
......@@ -1804,6 +1804,25 @@ Tee newTee(Own<AsyncInputStream> input, uint64_t limit) {
return { { mv(branch1), mv(branch2) } };
}
Promise<void> AsyncCapabilityStream::writeWithFds(
ArrayPtr<const byte> data, ArrayPtr<const ArrayPtr<const byte>> moreData,
ArrayPtr<const AutoCloseFd> fds) {
// HACK: AutoCloseFd actually contains an `int` under the hood. We can reinterpret_cast to avoid
// unnecessary memory allocation.
static_assert(sizeof(AutoCloseFd) == sizeof(int));
auto intArray = arrayPtr(reinterpret_cast<const int*>(fds.begin()), fds.size());
// Be extra-paranoid about aliasing rules by injecting a compiler barrier here. Probably
// not necessary but also probably doesn't hurt.
#if _MSC_VER
_ReadWriteBarrier();
#else
__asm__ __volatile__("": : :"memory");
#endif
return writeWithFds(data, moreData, intArray);
}
Promise<Own<AsyncCapabilityStream>> AsyncCapabilityStream::receiveStream() {
return tryReceiveStream()
.then([](Maybe<Own<AsyncCapabilityStream>>&& result)
......@@ -1816,6 +1835,35 @@ Promise<Own<AsyncCapabilityStream>> AsyncCapabilityStream::receiveStream() {
});
}
kj::Promise<Maybe<Own<AsyncCapabilityStream>>> AsyncCapabilityStream::tryReceiveStream() {
struct ResultHolder {
byte b;
Own<AsyncCapabilityStream> stream;
};
auto result = kj::heap<ResultHolder>();
auto promise = tryReadWithStreams(&result->b, 1, 1, &result->stream, 1);
return promise.then([result = kj::mv(result)](ReadResult actual) mutable
-> Maybe<Own<AsyncCapabilityStream>> {
if (actual.byteCount == 0) {
return nullptr;
}
KJ_REQUIRE(actual.capCount == 1,
"expected to receive a capability (e.g. file descirptor via SCM_RIGHTS), but didn't") {
return nullptr;
}
return kj::mv(result->stream);
});
}
Promise<void> AsyncCapabilityStream::sendStream(Own<AsyncCapabilityStream> stream) {
static constexpr byte b = 0;
auto streams = kj::heapArray<Own<AsyncCapabilityStream>>(1);
streams[0] = kj::mv(stream);
return writeWithStreams(arrayPtr(&b, 1), nullptr, kj::mv(streams));
}
Promise<AutoCloseFd> AsyncCapabilityStream::receiveFd() {
return tryReceiveFd().then([](Maybe<AutoCloseFd>&& result) -> Promise<AutoCloseFd> {
KJ_IF_MAYBE(r, result) {
......@@ -1825,11 +1873,35 @@ Promise<AutoCloseFd> AsyncCapabilityStream::receiveFd() {
}
});
}
Promise<Maybe<AutoCloseFd>> AsyncCapabilityStream::tryReceiveFd() {
return KJ_EXCEPTION(UNIMPLEMENTED, "this stream cannot receive file descriptors");
kj::Promise<kj::Maybe<AutoCloseFd>> AsyncCapabilityStream::tryReceiveFd() {
struct ResultHolder {
byte b;
AutoCloseFd fd;
};
auto result = kj::heap<ResultHolder>();
auto promise = tryReadWithFds(&result->b, 1, 1, &result->fd, 1);
return promise.then([result = kj::mv(result)](ReadResult actual) mutable
-> Maybe<AutoCloseFd> {
if (actual.byteCount == 0) {
return nullptr;
}
KJ_REQUIRE(actual.capCount == 1,
"expected to receive a file descriptor (e.g. via SCM_RIGHTS), but didn't") {
return nullptr;
}
return kj::mv(result->fd);
});
}
Promise<void> AsyncCapabilityStream::sendFd(int fd) {
return KJ_EXCEPTION(UNIMPLEMENTED, "this stream cannot send file descriptors");
static constexpr byte b = 0;
auto fds = kj::heapArray<int>(1);
fds[0] = fd;
auto promise = writeWithFds(arrayPtr(&b, 1), nullptr, fds);
return promise.attach(kj::mv(fds));
}
void AsyncIoStream::getsockopt(int level, int option, void* value, uint* length) {
......
......@@ -175,15 +175,57 @@ class AsyncCapabilityStream: public AsyncIoStream {
// broker, or in terms of direct handle passing if at least one process trusts the other.
public:
virtual Promise<void> writeWithFds(ArrayPtr<const byte> data,
ArrayPtr<const ArrayPtr<const byte>> moreData,
ArrayPtr<const int> fds) = 0;
Promise<void> writeWithFds(ArrayPtr<const byte> data,
ArrayPtr<const ArrayPtr<const byte>> moreData,
ArrayPtr<const AutoCloseFd> fds);
// Write some data to the stream with some file descirptors attached to it.
//
// The maximum number of FDs that can be sent at a time is usually subject to an OS-imposed
// limit. On Linux, this is 253. In practice, sending more than a handful of FDs at once is
// probably a bad idea.
struct ReadResult {
size_t byteCount;
size_t capCount;
};
virtual Promise<ReadResult> tryReadWithFds(void* buffer, size_t minBytes, size_t maxBytes,
AutoCloseFd* fdBuffer, size_t maxFds) = 0;
// Read data from the stream that may have file descriptors attached. Any attached descriptors
// will be added to `fds`. If multiple bundles of FDs are encountered in the course of reading
// the amount of data requested by minBytes/maxBytes, then they will be concatenated. If more FDs
// are received than fit in the buffer, then the excess will be discarded and closed -- this
// behavior, while ugly, is important to defend against denial-of-service attacks that may fill
// up the FD table with garbage. Applications must think carefully about how many FDs they really
// need to receive at once and set a well-defined limit.
virtual Promise<void> writeWithStreams(ArrayPtr<const byte> data,
ArrayPtr<const ArrayPtr<const byte>> moreData,
Array<Own<AsyncCapabilityStream>> streams) = 0;
virtual Promise<ReadResult> tryReadWithStreams(
void* buffer, size_t minBytes, size_t maxBytes,
Own<AsyncCapabilityStream>* streamBuffer, size_t maxStreams) = 0;
// Like above, but passes AsyncCapabilityStream objects. The stream implementations must be from
// the same AsyncIoProvider.
// ---------------------------------------------------------------------------
// Helpers for sending individual capabilities.
//
// These are equivalent to the above methods with the constraint that only one FD is
// sent/received at a time and the corresponding data is a single zero-valued byte.
Promise<Own<AsyncCapabilityStream>> receiveStream();
virtual Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream() = 0;
virtual Promise<void> sendStream(Own<AsyncCapabilityStream> stream) = 0;
// Transfer a stream.
Promise<Maybe<Own<AsyncCapabilityStream>>> tryReceiveStream();
Promise<void> sendStream(Own<AsyncCapabilityStream> stream);
// Transfer a single stream.
Promise<AutoCloseFd> receiveFd();
virtual Promise<Maybe<AutoCloseFd>> tryReceiveFd();
virtual Promise<void> sendFd(int fd);
// Transfer a raw file descriptor. Default implementation throws UNIMPLEMENTED.
Promise<Maybe<AutoCloseFd>> tryReceiveFd();
Promise<void> sendFd(int fd);
// Transfer a single raw file descriptor.
};
struct OneWayPipe {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment