Commit 3b4b0e6a authored by gejun's avatar gejun

Add IOBufCutter

parent 6840e36d
......@@ -312,12 +312,6 @@ inline IOBuf::Block* create_block() {
// release_tls_block_chain() may exceed this limit sometimes.
const int MAX_BLOCKS_PER_THREAD = 8;
// NOTE: not see differences in examples when CACHE_IOBUF_BLOCKREFS is turned on
// (tcmalloc linked)
#ifdef CACHE_IOBUF_BLOCKREFS
const int MAX_BLOCKREFS_PER_THREAD = 8;
#endif
struct TLSData {
// Head of the TLS block chain.
IOBuf::Block* block_head;
......@@ -327,19 +321,9 @@ struct TLSData {
// True if the remote_tls_block_chain is registered to the thread.
bool registered;
#ifdef CACHE_IOBUF_BLOCKREFS
// Reuse array of BlockRef
int num_blockrefs;
IOBuf::BlockRef* blockrefs[MAX_BLOCKREFS_PER_THREAD];
#endif
};
#ifdef CACHE_IOBUF_BLOCKREFS
static __thread TLSData g_tls_data = { NULL, 0, false, 0, {} };
#else
static __thread TLSData g_tls_data = { NULL, 0, false };
#endif
// Used in UT
IOBuf::Block* get_tls_block_head() { return g_tls_data.block_head; }
......@@ -477,37 +461,16 @@ IOBuf::Block* acquire_tls_block() {
return b;
}
inline IOBuf::BlockRef* acquire_blockref_array() {
#ifdef CACHE_IOBUF_BLOCKREFS
TLSData& tls_data = g_tls_data;
if (tls_data.num_blockrefs) {
return tls_data.blockrefs[--tls_data.num_blockrefs];
}
#endif
iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed);
return new IOBuf::BlockRef[IOBuf::INITIAL_CAP];
}
inline IOBuf::BlockRef* acquire_blockref_array(size_t cap) {
#ifdef CACHE_IOBUF_BLOCKREFS
if (cap == IOBuf::INITIAL_CAP) {
return acquire_blockref_array();
}
#endif
iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed);
return new IOBuf::BlockRef[cap];
}
inline IOBuf::BlockRef* acquire_blockref_array() {
return acquire_blockref_array(IOBuf::INITIAL_CAP);
}
inline void release_blockref_array(IOBuf::BlockRef* refs, size_t cap) {
#ifdef CACHE_IOBUF_BLOCKREFS
if (cap == IOBuf::INITIAL_CAP) {
TLSData& tls_data = g_tls_data;
if (tls_data.num_blockrefs < MAX_BLOCKREFS_PER_THREAD) {
tls_data.blockrefs[tls_data.num_blockrefs++] = refs;
return;
}
}
#endif
delete[] refs;
}
......@@ -668,10 +631,13 @@ void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef& r) {
template void IOBuf::_push_or_move_back_ref_to_bigview<true>(const BlockRef&);
template void IOBuf::_push_or_move_back_ref_to_bigview<false>(const BlockRef&);
int IOBuf::_pop_front_ref() {
template <bool MOVEOUT>
int IOBuf::_pop_or_moveout_front_ref() {
if (_small()) {
if (_sv.refs[0].block != NULL) {
if (!MOVEOUT) {
_sv.refs[0].block->dec_ref();
}
_sv.refs[0] = _sv.refs[1];
reset_block_ref(_sv.refs[1]);
return 0;
......@@ -680,7 +646,9 @@ int IOBuf::_pop_front_ref() {
} else {
// _bv.nref must be greater than 2
const uint32_t start = _bv.start;
if (!MOVEOUT) {
_bv.refs[start].block->dec_ref();
}
if (--_bv.nref > 2) {
_bv.start = (start + 1) & _bv.cap_mask;
_bv.nbytes -= _bv.refs[start].length;
......@@ -694,6 +662,9 @@ int IOBuf::_pop_front_ref() {
return 0;
}
}
// Explicitly initialize templates.
template int IOBuf::_pop_or_moveout_front_ref<true>();
template int IOBuf::_pop_or_moveout_front_ref<false>();
int IOBuf::_pop_back_ref() {
if (_small()) {
......@@ -768,12 +739,12 @@ size_t IOBuf::pop_front(size_t n) {
return saved_n;
}
bool IOBuf::cut1(char* c) {
bool IOBuf::cut1(void* c) {
if (empty()) {
return false;
}
IOBuf::BlockRef &r = _front_ref();
*c = r.block->data[r.offset];
*(char*)c = r.block->data[r.offset];
if (r.length > 1) {
++r.offset;
--r.length;
......@@ -817,9 +788,9 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) {
while (n) { // length() == 0 does not enter
IOBuf::BlockRef &r = _front_ref();
if (r.length <= n) {
out->_push_back_ref(r);
n -= r.length;
_pop_front_ref();
out->_move_back_ref(r);
_moveout_front_ref();
} else {
const IOBuf::BlockRef cr = { r.offset, (uint32_t)n, r.block };
out->_push_back_ref(cr);
......@@ -872,8 +843,7 @@ size_t IOBuf::cutn(std::string* out, size_t n) {
}
const size_t old_size = out->size();
out->resize(out->size() + n);
cutn(&out[0][old_size], n);
return n;
return cutn(&(*out)[old_size], n);
}
int IOBuf::_cut_by_char(IOBuf* out, char d) {
......@@ -1412,10 +1382,10 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const {
size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const {
const size_t len = length();
if (n + pos > len) {
if (len <= pos) {
return 0;
}
if (n > len - pos) { // note: n + pos may overflow
n = len - pos;
}
s->resize(n);
......@@ -1424,10 +1394,10 @@ size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const {
size_t IOBuf::append_to(std::string* s, size_t n, size_t pos) const {
const size_t len = length();
if (n + pos > len) {
if (len <= pos) {
return 0;
}
if (n > len - pos) { // note: n + pos may overflow
n = len - pos;
}
const size_t old_size = s->size();
......@@ -1737,6 +1707,135 @@ void IOPortal::return_cached_blocks_impl(Block* b) {
iobuf::release_tls_block_chain(b);
}
//////////////// IOBufCutter ////////////////
IOBufCutter::IOBufCutter(butil::IOBuf* buf)
: _data(NULL)
, _data_end(NULL)
, _block(NULL)
, _buf(buf) {
}
IOBufCutter::~IOBufCutter() {
if (_block) {
if (_data != _data_end) {
IOBuf::BlockRef& fr = _buf->_front_ref();
CHECK_EQ(fr.block, _block);
fr.offset = (uint32_t)((char*)_data - _block->data);
fr.length = (uint32_t)((char*)_data_end - (char*)_data);
} else {
_buf->_pop_front_ref();
}
}
}
bool IOBufCutter::load_next_ref() {
if (_block) {
_buf->_pop_front_ref();
}
if (!_buf->_ref_num()) {
_data = NULL;
_data_end = NULL;
_block = NULL;
return false;
} else {
const IOBuf::BlockRef& r = _buf->_front_ref();
_data = r.block->data + r.offset;
_data_end = (char*)_data + r.length;
_block = r.block;
return true;
}
}
size_t IOBufCutter::slower_copy_to(void* dst, size_t n) {
size_t size = (char*)_data_end - (char*)_data;
if (size == 0) {
if (!load_next_ref()) {
return 0;
}
size = (char*)_data_end - (char*)_data;
if (n <= size) {
memcpy(dst, _data, n);
return n;
}
}
void* const saved_dst = dst;
memcpy(dst, _data, size);
dst = (char*)dst + size;
n -= size;
const size_t nref = _buf->_ref_num();
for (size_t i = 1; i < nref; ++i) {
const IOBuf::BlockRef& r = _buf->_ref_at(i);
const size_t nc = std::min(n, (size_t)r.length);
memcpy(dst, r.block->data + r.offset, nc);
dst = (char*)dst + nc;
n -= nc;
if (n == 0) {
break;
}
}
return (char*)dst - (char*)saved_dst;
}
size_t IOBufCutter::cutn(butil::IOBuf* out, size_t n) {
if (n == 0) {
return 0;
}
const size_t size = (char*)_data_end - (char*)_data;
if (n <= size) {
const IOBuf::BlockRef r = { (uint32_t)((char*)_data - _block->data),
(uint32_t)n,
_block };
out->_push_back_ref(r);
_data = (char*)_data + n;
return n;
} else if (size != 0) {
const IOBuf::BlockRef r = { (uint32_t)((char*)_data - _block->data),
(uint32_t)size,
_block };
out->_move_back_ref(r);
_buf->_moveout_front_ref();
_data = NULL;
_data_end = NULL;
_block = NULL;
return _buf->cutn(out, n - size) + size;
} else {
if (_block) {
_data = NULL;
_data_end = NULL;
_block = NULL;
_buf->_pop_front_ref();
}
return _buf->cutn(out, n);
}
}
size_t IOBufCutter::cutn(void* out, size_t n) {
if (n == 0) {
return 0;
}
const size_t size = (char*)_data_end - (char*)_data;
if (n <= size) {
memcpy(out, _data, n);
_data = (char*)_data + n;
return n;
} else if (size != 0) {
memcpy(out, _data, size);
_buf->_pop_front_ref();
_data = NULL;
_data_end = NULL;
_block = NULL;
return _buf->cutn((char*)out + size, n - size) + size;
} else {
if (_block) {
_data = NULL;
_data_end = NULL;
_block = NULL;
_buf->_pop_front_ref();
}
return _buf->cutn(out, n);
}
}
IOBufAsZeroCopyInputStream::IOBufAsZeroCopyInputStream(const IOBuf& buf)
: _ref_index(0)
, _add_offset(0)
......
......@@ -59,6 +59,7 @@ class IOBuf {
friend class IOBufAsZeroCopyInputStream;
friend class IOBufAsZeroCopyOutputStream;
friend class IOBufBytesIterator;
friend class IOBufCutter;
public:
static const size_t DEFAULT_BLOCK_SIZE = 8192;
static const size_t INITIAL_CAP = 32; // must be power of 2
......@@ -128,7 +129,7 @@ public:
// Returns bytes popped.
size_t pop_back(size_t n);
// Cut off `n' bytes from front side and APPEND to `out'
// Cut off n bytes from front side and APPEND to `out'
// If n == 0, nothing cut; if n >= length(), all bytes are cut
// Returns bytes cut.
size_t cutn(IOBuf* out, size_t n);
......@@ -136,7 +137,7 @@ public:
size_t cutn(std::string* out, size_t n);
// Cut off 1 byte from the front side and set to *c
// Return true on cut, false otherwise.
bool cut1(char* c);
bool cut1(void* c);
// Cut from front side until the characters matches `delim', append
// data before the matched characters to `out'.
......@@ -319,7 +320,8 @@ public:
// the internal buffer.
// If n == 0 and buffer is empty, return value is undefined.
const void* fetch(void* aux_buffer, size_t n) const;
// Just fetch one character.
// Fetch one character from front side.
// Returns pointer to the character, NULL on empty.
const void* fetch1() const;
// Remove all data
......@@ -373,7 +375,14 @@ protected:
// Pop a BlockRef from front side.
// Returns: 0 on success and -1 on empty.
int _pop_front_ref();
int _pop_front_ref() { return _pop_or_moveout_front_ref<false>(); }
// Move a BlockRef out from front side.
// Returns: 0 on success and -1 on empty.
int _moveout_front_ref() { return _pop_or_moveout_front_ref<true>(); }
template <bool MOVEOUT>
int _pop_or_moveout_front_ref();
// Pop a BlockRef from back side.
// Returns: 0 on success and -1 on empty.
......@@ -467,6 +476,51 @@ private:
Block* _block;
};
// Specialized utility to cut from IOBuf faster than using corresponding
// methods in IOBuf.
// Designed for efficiently parsing data from IOBuf.
// The cut IOBuf can be appended during cutting.
class IOBufCutter {
public:
explicit IOBufCutter(butil::IOBuf* buf);
~IOBufCutter();
// Cut off n bytes and APPEND to `out'
// Returns bytes cut.
size_t cutn(butil::IOBuf* out, size_t n);
size_t cutn(std::string* out, size_t n);
size_t cutn(void* out, size_t n);
// Cut off 1 byte from the front side and set to *c
// Return true on cut, false otherwise.
bool cut1(void* data);
// Copy n bytes into `data'
// Returns bytes copied.
size_t copy_to(void* data, size_t n);
// Fetch one character.
// Returns pointer to the character, NULL on empty
const void* fetch1();
// Pop n bytes from front side
// Returns bytes popped.
size_t pop_front(size_t n);
// Uncut bytes
size_t remaining_bytes() const;
private:
size_t slower_copy_to(void* data, size_t n);
bool load_next_ref();
private:
void* _data;
void* _data_end;
IOBuf::Block* _block;
IOBuf* _buf;
};
// Parse protobuf message from IOBuf. Notice that this wrapper does not change
// source IOBuf, which also should not change during lifetime of the wrapper.
// Even if a IOBufAsZeroCopyInputStream is created but parsed, the source
......@@ -631,6 +685,8 @@ private:
int add_block();
void* _data;
// Saving _data_end instead of _size avoid modifying _data and _size
// in each push_back() which is probably a hotspot.
void* _data_end;
IOBuf _buf;
IOBufAsZeroCopyOutputStream _zc_stream;
......
......@@ -189,22 +189,92 @@ inline void IOBuf::_move_back_ref(const BlockRef& r) {
}
}
//////////////// IOBufCutter ////////////////
inline size_t IOBufCutter::remaining_bytes() const {
if (_block) {
return (char*)_data_end - (char*)_data + _buf->size() - _buf->_front_ref().length;
} else {
return _buf->size();
}
}
inline bool IOBufCutter::cut1(void* c) {
if (_data == _data_end) {
if (!load_next_ref()) {
return false;
}
}
*(char*)c = *(const char*)_data;
_data = (char*)_data + 1;
return true;
}
inline const void* IOBufCutter::fetch1() {
if (_data == _data_end) {
if (!load_next_ref()) {
return NULL;
}
}
return _data;
}
inline size_t IOBufCutter::copy_to(void* out, size_t n) {
size_t size = (char*)_data_end - (char*)_data;
if (n <= size) {
memcpy(out, _data, n);
return n;
}
return slower_copy_to(out, n);
}
inline size_t IOBufCutter::pop_front(size_t n) {
const size_t saved_n = n;
do {
const size_t size = (char*)_data_end - (char*)_data;
if (n <= size) {
_data = (char*)_data + n;
return saved_n;
}
if (size != 0) {
n -= size;
}
if (!load_next_ref()) {
return saved_n;
}
} while (true);
}
inline size_t IOBufCutter::cutn(std::string* out, size_t n) {
if (n == 0) {
return 0;
}
const size_t len = remaining_bytes();
if (n > len) {
n = len;
}
const size_t old_size = out->size();
out->resize(out->size() + n);
return cutn(&(*out)[old_size], n);
}
/////////////// IOBufAppender /////////////////
inline int IOBufAppender::append(const void* src, size_t n) {
do {
const size_t size = (char*)_data_end - (char*)_data;
if (n <= size) {
fast_memcpy(_data, src, n);
memcpy(_data, src, n);
_data = (char*)_data + n;
return 0;
}
if (size != 0) {
fast_memcpy(_data, src, size);
memcpy(_data, src, size);
src = (const char*)src + size;
n -= size;
}
if (add_block() != 0) {
return -1;
}
return append(src, n); // tailr
} while (true);
}
inline int IOBufAppender::append(const StringPiece& str) {
......@@ -292,7 +362,7 @@ inline size_t IOBufBytesIterator::copy_and_forward(void* buf, size_t n) {
while (nc < n && _bytes_left != 0) {
const size_t block_size = _block_end - _block_begin;
const size_t to_copy = std::min(block_size, n - nc);
fast_memcpy((char*)buf + nc, _block_begin, to_copy);
memcpy((char*)buf + nc, _block_begin, to_copy);
_block_begin += to_copy;
_bytes_left -= to_copy;
nc += to_copy;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment