Commit d10f8eab authored by Kenton Varda's avatar Kenton Varda

Don't queue reads/writes in TLS stream.

This is definitely wrong, because it makes the read/write tasks impossible to cancel and possibly makes them outlive the buffers they are reading from / writing to.

I have no idea why I wrote the code this way, TBQH.
parent fd7a63ca
...@@ -162,40 +162,31 @@ public: ...@@ -162,40 +162,31 @@ public:
} }
kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override { kj::Promise<size_t> tryRead(void* buffer, size_t minBytes, size_t maxBytes) override {
readQueue = readQueue.addBranch().then([this,buffer,minBytes,maxBytes](size_t) { return tryReadInternal(buffer, minBytes, maxBytes, 0);
return tryReadInternal(buffer, minBytes, maxBytes, 0);
}).fork();
return readQueue.addBranch();
} }
Promise<void> write(const void* buffer, size_t size) override { Promise<void> write(const void* buffer, size_t size) override {
writeQueue = writeQueue.addBranch().then([this,buffer,size]() { return writeInternal(kj::arrayPtr(reinterpret_cast<const byte*>(buffer), size), nullptr);
return writeInternal(kj::arrayPtr(reinterpret_cast<const byte*>(buffer), size), nullptr);
}).fork();
return writeQueue.addBranch();
} }
Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override { Promise<void> write(ArrayPtr<const ArrayPtr<const byte>> pieces) override {
if (pieces.size() > 0) { return writeInternal(pieces[0], pieces.slice(1, pieces.size()));
writeQueue = writeQueue.addBranch().then([this,pieces]() {
return writeInternal(pieces[0], pieces.slice(1, pieces.size()));
}).fork();
}
return writeQueue.addBranch();
} }
void shutdownWrite() override { void shutdownWrite() override {
KJ_REQUIRE(shutdownTask == nullptr, "already called shutdownWrite()");
// TODO(soon): shutdownWrite() is problematic because it doesn't return a promise. It was // TODO(soon): shutdownWrite() is problematic because it doesn't return a promise. It was
// designed to assume that it would only be called after all writes are finished and that // designed to assume that it would only be called after all writes are finished and that
// there was no reason to block at that point, but SSL sessions don't fit this since they // there was no reason to block at that point, but SSL sessions don't fit this since they
// actually have to send a shutdown message. // actually have to send a shutdown message.
writeQueue = writeQueue.addBranch().then([this]() { shutdownTask = sslCall([this]() {
sslCall([this]() { // The first SSL_shutdown() call is expected to return 0 and may flag a misleading error.
// The first SSL_shutdown() call is expected to return 0 and may flag a misleading error. int result = SSL_shutdown(ssl);
int result = SSL_shutdown(ssl); return result == 0 ? 1 : result;
return result == 0 ? 1 : result; }).ignoreResult().eagerlyEvaluate([](kj::Exception&& e) {
}); KJ_LOG(ERROR, e);
}).fork(); });
} }
void abortRead() override { void abortRead() override {
...@@ -222,8 +213,7 @@ private: ...@@ -222,8 +213,7 @@ private:
kj::Own<kj::AsyncIoStream> ownInner; kj::Own<kj::AsyncIoStream> ownInner;
bool disconnected = false; bool disconnected = false;
kj::ForkedPromise<size_t> readQueue = kj::Promise<size_t>(size_t(0)).fork(); kj::Maybe<kj::Promise<void>> shutdownTask;
kj::ForkedPromise<void> writeQueue = kj::Promise<void>(kj::READY_NOW).fork();
ReadyInputStreamWrapper readBuffer; ReadyInputStreamWrapper readBuffer;
ReadyOutputStreamWrapper writeBuffer; ReadyOutputStreamWrapper writeBuffer;
...@@ -245,6 +235,8 @@ private: ...@@ -245,6 +235,8 @@ private:
Promise<void> writeInternal(kj::ArrayPtr<const byte> first, Promise<void> writeInternal(kj::ArrayPtr<const byte> first,
kj::ArrayPtr<const kj::ArrayPtr<const byte>> rest) { kj::ArrayPtr<const kj::ArrayPtr<const byte>> rest) {
KJ_REQUIRE(shutdownTask == nullptr, "already called shutdownWrite()");
return sslCall([this,first]() { return SSL_write(ssl, first.begin(), first.size()); }) return sslCall([this,first]() { return SSL_write(ssl, first.begin(), first.size()); })
.then([this,first,rest](size_t n) -> kj::Promise<void> { .then([this,first,rest](size_t n) -> kj::Promise<void> {
if (n < first.size()) { if (n < first.size()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment