Commit 9275f4d5 authored by Ge Jun's avatar Ge Jun

Simplify IOBufAsZeroCopyInputStream

parent c22b29d0
......@@ -1809,45 +1809,46 @@ void IOPortal::return_cached_blocks_impl(Block* b) {
}
IOBufAsZeroCopyInputStream::IOBufAsZeroCopyInputStream(const IOBuf& buf)
: _nref(buf._ref_num())
, _ref_index(0)
: _ref_index(0)
, _add_offset(0)
, _byte_count(0)
, _buf(&buf) {
_cur_ref = (_nref > 0 ? &buf._ref_at(0) : NULL);
}
bool IOBufAsZeroCopyInputStream::Next(const void** data, int* size) {
if (_cur_ref != NULL) {
*data = _cur_ref->block->data + _cur_ref->offset + _add_offset;
// Impl. of Backup/Skip guarantees that _add_offset < _cur_ref->length.
*size = _cur_ref->length - _add_offset;
_byte_count += _cur_ref->length - _add_offset;
const IOBuf::BlockRef* cur_ref = _buf->_pref_at(_ref_index);
if (cur_ref == NULL) {
return false;
}
*data = cur_ref->block->data + cur_ref->offset + _add_offset;
// Impl. of Backup/Skip guarantees that _add_offset < cur_ref->length.
*size = cur_ref->length - _add_offset;
_byte_count += cur_ref->length - _add_offset;
_add_offset = 0;
++_ref_index;
_cur_ref = (_ref_index < _nref ? &_buf->_ref_at(_ref_index) : NULL);
return true;
}
return false;
}
void IOBufAsZeroCopyInputStream::BackUp(int count) {
if (_ref_index > 0) {
--_ref_index;
_cur_ref = &_buf->_ref_at(_ref_index);
CHECK(_add_offset == 0 && _cur_ref->length >= (uint32_t)count)
const IOBuf::BlockRef* cur_ref = _buf->_pref_at(--_ref_index);
CHECK(_add_offset == 0 && cur_ref->length >= (uint32_t)count)
<< "BackUp() is not after a Next()";
_add_offset = _cur_ref->length - count;
_add_offset = cur_ref->length - count;
_byte_count -= count;
} else {
LOG(FATAL) << "BackUp an empty ZeroCopyInputStream";
}
}
// Skips a number of bytes. Returns false if the end of the stream is
// reached or some input error occurred. In the end-of-stream case, the
// stream is advanced to the end of the stream (so ByteCount() will return
// the total size of the stream).
bool IOBufAsZeroCopyInputStream::Skip(int count) {
if (_cur_ref != NULL) {
do {
const int left_bytes = _cur_ref->length - _add_offset;
const IOBuf::BlockRef* cur_ref = _buf->_pref_at(_ref_index);
while (cur_ref) {
const int left_bytes = cur_ref->length - _add_offset;
if (count < left_bytes) {
_add_offset += count;
_byte_count += count;
......@@ -1856,13 +1857,7 @@ bool IOBufAsZeroCopyInputStream::Skip(int count) {
count -= left_bytes;
_add_offset = 0;
_byte_count += left_bytes;
if (++_ref_index < _nref) {
_cur_ref = &_buf->_ref_at(_ref_index);
continue;
}
_cur_ref = NULL;
return false;
} while (1);
cur_ref = _buf->_pref_at(++_ref_index);
}
return false;
}
......
......@@ -389,6 +389,10 @@ protected:
BlockRef& _ref_at(size_t i);
const BlockRef& _ref_at(size_t i) const;
// Get pointer to n-th BlockRef(counting from front)
// If i is out-of-range, NULL is returned.
const BlockRef* _pref_at(size_t i) const;
private:
union {
BigView _bv;
......@@ -490,18 +494,15 @@ class IOBufAsZeroCopyInputStream
public:
explicit IOBufAsZeroCopyInputStream(const IOBuf&);
// @ZeroCopyInputStream
bool Next(const void** data, int* size);
void BackUp(int count);
bool Skip(int count);
google::protobuf::int64 ByteCount() const;
bool Next(const void** data, int* size) override;
void BackUp(int count) override;
bool Skip(int count) override;
google::protobuf::int64 ByteCount() const override;
private:
int _nref;
int _ref_index;
int _add_offset;
google::protobuf::int64 _byte_count;
const IOBuf::BlockRef* _cur_ref;
const IOBuf* _buf;
};
......@@ -525,10 +526,9 @@ public:
IOBufAsZeroCopyOutputStream(IOBuf*, uint32_t block_size);
~IOBufAsZeroCopyOutputStream();
// @ZeroCopyOutputStream
bool Next(void** data, int* size);
void BackUp(int count); // `count' can be as long as ByteCount()
google::protobuf::int64 ByteCount() const;
bool Next(void** data, int* size) override;
void BackUp(int count) override; // `count' can be as long as ByteCount()
google::protobuf::int64 ByteCount() const override;
private:
void _release_block();
......
......@@ -156,6 +156,14 @@ inline const IOBuf::BlockRef& IOBuf::_ref_at(size_t i) const {
return _small() ? _sv.refs[i] : _bv.ref_at(i);
}
inline const IOBuf::BlockRef* IOBuf::_pref_at(size_t i) const {
if (_small()) {
return i < (!!_sv.refs[0].block + !!_sv.refs[1].block) ? &_sv.refs[i] : NULL;
} else {
return i < _bv.nref ? &_bv.ref_at(i) : NULL;
}
}
inline bool operator==(const IOBuf::BlockRef& r1, const IOBuf::BlockRef& r2) {
return r1.offset == r2.offset && r1.length == r2.length &&
r1.block == r2.block;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment