Commit d9e688ac authored by gejun's avatar gejun

IOBuf supports cut into IWriter and append from IReader

parent de3dcc2e
......@@ -957,6 +957,30 @@ ssize_t IOBuf::pcut_into_file_descriptor(int fd, off_t offset, size_t size_hint)
return nw;
}
ssize_t IOBuf::cut_into_writer(IWriter* writer, size_t size_hint) {
if (empty()) {
return 0;
}
const size_t nref = std::min(_ref_num(), IOBUF_IOV_MAX);
struct iovec vec[nref];
size_t nvec = 0;
size_t cur_len = 0;
do {
IOBuf::BlockRef const& r = _ref_at(nvec);
vec[nvec].iov_base = r.block->data + r.offset;
vec[nvec].iov_len = r.length;
++nvec;
cur_len += r.length;
} while (nvec < nref && cur_len < size_hint);
const ssize_t nw = writer->WriteV(vec, nvec);
if (nw > 0) {
pop_front(nw);
}
return nw;
}
ssize_t IOBuf::cut_into_SSL_channel(SSL* ssl, int* ssl_error) {
*ssl_error = SSL_ERROR_NONE;
if (empty()) {
......@@ -1057,6 +1081,41 @@ ssize_t IOBuf::pcut_multiple_into_file_descriptor(
return nw;
}
ssize_t IOBuf::cut_multiple_into_writer(
IWriter* writer, IOBuf* const* pieces, size_t count) {
if (BAIDU_UNLIKELY(count == 0)) {
return 0;
}
if (1UL == count) {
return pieces[0]->cut_into_writer(writer);
}
struct iovec vec[IOBUF_IOV_MAX];
size_t nvec = 0;
for (size_t i = 0; i < count; ++i) {
const IOBuf* p = pieces[i];
const size_t nref = p->_ref_num();
for (size_t j = 0; j < nref && nvec < IOBUF_IOV_MAX; ++j, ++nvec) {
IOBuf::BlockRef const& r = p->_ref_at(j);
vec[nvec].iov_base = r.block->data + r.offset;
vec[nvec].iov_len = r.length;
}
}
const ssize_t nw = writer->WriteV(vec, nvec);
if (nw <= 0) {
return nw;
}
size_t npop_all = nw;
for (size_t i = 0; i < count; ++i) {
npop_all -= pieces[i]->pop_front(npop_all);
if (npop_all == 0) {
break;
}
}
return nw;
}
void IOBuf::append(const IOBuf& other) {
const size_t nref = other._ref_num();
for (size_t i = 0; i < nref; ++i) {
......@@ -1622,6 +1681,62 @@ ssize_t IOPortal::pappend_from_file_descriptor(
return nr;
}
ssize_t IOPortal::append_from_reader(IReader* reader, size_t max_count) {
iovec vec[MAX_APPEND_IOVEC];
int nvec = 0;
size_t space = 0;
Block* prev_p = NULL;
Block* p = _block;
// Prepare at most MAX_APPEND_IOVEC blocks or space of blocks >= max_count
do {
if (p == NULL) {
p = iobuf::acquire_tls_block();
if (BAIDU_UNLIKELY(!p)) {
errno = ENOMEM;
return -1;
}
if (prev_p != NULL) {
prev_p->portal_next = p;
} else {
_block = p;
}
}
vec[nvec].iov_base = p->data + p->size;
vec[nvec].iov_len = std::min(p->left_space(), max_count - space);
space += vec[nvec].iov_len;
++nvec;
if (space >= max_count || nvec >= MAX_APPEND_IOVEC) {
break;
}
prev_p = p;
p = p->portal_next;
} while (1);
const ssize_t nr = reader->ReadV(vec, nvec);
if (nr <= 0) { // -1 or 0
if (empty()) {
return_cached_blocks();
}
return nr;
}
size_t total_len = nr;
do {
const size_t len = std::min(total_len, _block->left_space());
total_len -= len;
const IOBuf::BlockRef r = { _block->size, (uint32_t)len, _block };
_push_back_ref(r);
_block->size += len;
if (_block->full()) {
Block* const saved_next = _block->portal_next;
_block->dec_ref(); // _block may be deleted
_block = saved_next;
}
} while (total_len);
return nr;
}
ssize_t IOPortal::append_from_SSL_channel(
SSL* ssl, int* ssl_error, size_t max_count) {
size_t nr = 0;
......
......@@ -28,6 +28,7 @@
#include "butil/third_party/snappy/snappy-sinksource.h"
#include "butil/zero_copy_stream_as_streambuf.h"
#include "butil/macros.h"
#include "butil/reader_writer.h"
// For IOBuf::appendv(const const_iovec*, size_t). The only difference of this
// struct from iovec (defined in sys/uio.h) is that iov_base is `const void*'
......@@ -143,6 +144,10 @@ public:
// std::string version, `delim' could be binary
int cut_until(IOBuf* out, const std::string& delim);
// Cut at most `size_hint' bytes(approximately) into the writer
// Returns bytes cut on success, -1 otherwise and errno is set.
ssize_t cut_into_writer(IWriter* writer, size_t size_hint = 1024*1024);
// Cut at most `size_hint' bytes(approximately) into the file descriptor
// Returns bytes cut on success, -1 otherwise and errno is set.
ssize_t cut_into_file_descriptor(int fd, size_t size_hint = 1024*1024);
......@@ -163,12 +168,12 @@ public:
// and the ssl error code will be filled into `ssl_error'
ssize_t cut_into_SSL_channel(struct ssl_st* ssl, int* ssl_error);
// Cut `count' number of `pieces' into SSL channel `ssl'.
// Cut `count' number of `pieces' into the writer.
// Returns bytes cut on success, -1 otherwise and errno is set.
static ssize_t cut_multiple_into_SSL_channel(
struct ssl_st* ssl, IOBuf* const* pieces, size_t count, int* ssl_error);
static ssize_t cut_multiple_into_writer(
IWriter* writer, IOBuf* const* pieces, size_t count);
// Cut `count' number of `pieces' into file descriptor `fd'.
// Cut `count' number of `pieces' into the file descriptor.
// Returns bytes cut on success, -1 otherwise and errno is set.
static ssize_t cut_multiple_into_file_descriptor(
int fd, IOBuf* const* pieces, size_t count);
......@@ -182,6 +187,11 @@ public:
static ssize_t pcut_multiple_into_file_descriptor(
int fd, off_t offset, IOBuf* const* pieces, size_t count);
// Cut `count' number of `pieces' into SSL channel `ssl'.
// Returns bytes cut on success, -1 otherwise and errno is set.
static ssize_t cut_multiple_into_SSL_channel(
struct ssl_st* ssl, IOBuf* const* pieces, size_t count, int* ssl_error);
// Append another IOBuf to back side, payload of the IOBuf is shared
// rather than copied.
void append(const IOBuf& other);
......@@ -427,6 +437,9 @@ public:
~IOPortal();
IOPortal& operator=(const IOPortal& rhs);
// Read at most `max_count' bytes from the reader and append to self.
ssize_t append_from_reader(IReader* reader, size_t max_count);
// Read at most `max_count' bytes from file descriptor `fd' and
// append to self.
ssize_t append_from_file_descriptor(int fd, size_t max_count);
......
// Copyright (c) 2018 Bilibili, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Author: Ge,Jun (gejun@bilibili.com)
// Date: Wed Aug 8 05:51:33 PDT 2018
#ifndef BUTIL_READER_WRITER_H
#define BUTIL_READER_WRITER_H
#include <sys/uio.h> // iovec
namespace butil {
// Abstraction for reading data.
// The simplest implementation is to embed a file descriptor and read from it.
class IReader {
public:
// Semantics of parameters and return value are same as readv(2) except that
// there's no `fd'.
virtual ssize_t ReadV(const iovec* iov, int iovcnt) = 0;
};
// Abstraction for writing data.
// The simplest implementation is to embed a file descriptor and writev into it.
class IWriter {
public:
// Semantics of parameters and return value are same as writev(2) except that
// there's no `fd'.
// WriteV is required to submit data gathered by multiple appends in one
// run and enable the possibility of atomic writes.
virtual ssize_t WriteV(const iovec* iov, int iovcnt) = 0;
};
} // namespace butil
#endif // BUTIL_READER_WRITER_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment