// iobuf - A non-continuous zero-copied buffer
// Copyright (c) 2012 Baidu, Inc.
// 
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// 
//     http://www.apache.org/licenses/LICENSE-2.0
// 
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Author: Ge,Jun (gejun@baidu.com)
// Date: Thu Nov 22 13:57:56 CST 2012

#include <openssl/ssl.h>                   // SSL_*
#include <sys/syscall.h>                   // syscall
#include <fcntl.h>                         // O_RDONLY
#include <errno.h>                         // errno
#include <limits.h>                        // CHAR_BIT
#include <stdexcept>                       // std::invalid_argument
#include "butil/build_config.h"             // ARCH_CPU_X86_64
#include "butil/atomicops.h"                // butil::atomic
#include "butil/thread_local.h"             // thread_atexit
#include "butil/macros.h"                   // BAIDU_CASSERT
#include "butil/logging.h"                  // CHECK, LOG
#include "butil/fd_guard.h"                 // butil::fd_guard
#include "butil/iobuf.h"

namespace butil {
namespace iobuf {

typedef ssize_t (*iov_function)(int fd, const struct iovec *vector,
                                   int count, off_t offset);

// Userpsace preadv
static ssize_t user_preadv(int fd, const struct iovec *vector, 
                           int count, off_t offset) {
    ssize_t total_read = 0;
    for (int i = 0; i < count; ++i) {
        const ssize_t rc = ::pread(fd, vector[i].iov_base, vector[i].iov_len, offset);
        if (rc <= 0) {
            return total_read > 0 ? total_read : rc;
        }
        total_read += rc;
        offset += rc;
        if (rc < (ssize_t)vector[i].iov_len) {
            break;
        }
    }
    return total_read;
}

static ssize_t user_pwritev(int fd, const struct iovec* vector,
                            int count, off_t offset) {
    ssize_t total_write = 0;
    for (int i = 0; i < count; ++i) {
        const ssize_t rc = ::pwrite(fd, vector[i].iov_base, vector[i].iov_len, offset);
        if (rc <= 0) {
            return total_write > 0 ? total_write : rc;
        }
        total_write += rc;
        offset += rc;
        if (rc < (ssize_t)vector[i].iov_len) {
            break;
        }
    }
    return total_write;
}

#if ARCH_CPU_X86_64

#ifndef SYS_preadv
#define SYS_preadv 295
#endif  // SYS_preadv

#ifndef SYS_pwritev
#define SYS_pwritev 296
#endif // SYS_pwritev

// SYS_preadv/SYS_pwritev is available since Linux 2.6.30
static ssize_t sys_preadv(int fd, const struct iovec *vector, 
                          int count, off_t offset) {
    return syscall(SYS_preadv, fd, vector, count, offset);
}

static ssize_t sys_pwritev(int fd, const struct iovec *vector, 
                           int count, off_t offset) {
    return syscall(SYS_pwritev, fd, vector, count, offset);
}

inline iov_function get_preadv_func() {
    butil::fd_guard fd(open("/dev/zero", O_RDONLY));
    if (fd < 0) {
        PLOG(WARNING) << "Fail to open /dev/zero";
        return user_preadv;
    }
#if defined(OS_MACOSX)
    return user_preadv;
#endif
    char dummy[1];
    iovec vec = { dummy, sizeof(dummy) };
    const int rc = syscall(SYS_preadv, (int)fd, &vec, 1, 0);
    if (rc < 0) {
        PLOG(WARNING) << "The kernel doesn't support SYS_preadv, "
                         " use user_preadv instead";
        return user_preadv;
    }
    return sys_preadv;
}

inline iov_function get_pwritev_func() {
    butil::fd_guard fd(open("/dev/null", O_WRONLY));
    if (fd < 0) {
        PLOG(ERROR) << "Fail to open /dev/null";
        return user_pwritev;
    }
#if defined(OS_MACOSX)
    return user_pwritev;
#endif
    char dummy[1];
    iovec vec = { dummy, sizeof(dummy) };
    const int rc = syscall(SYS_pwritev, (int)fd, &vec, 1, 0);
    if (rc < 0) {
        PLOG(WARNING) << "The kernel doesn't support SYS_pwritev, "
                         " use user_pwritev instead";
        return user_pwritev;
    }
    return sys_pwritev;
}

#else   // ARCH_CPU_X86_64

#warning "We don't check whether the kernel supports SYS_preadv or SYS_pwritev " \
         "when the arch is not X86_64, use user space preadv/pwritev directly"

inline iov_function get_preadv_func() {
    return user_preadv;
}

inline iov_function get_pwritev_func() {
    return user_pwritev;
}

#endif  // ARCH_CPU_X86_64

static const size_t FAST_MEMCPY_MAXSIZE = 123;
template <size_t size> struct FastMemcpyBlock {
    int data[size];
};
template <> struct FastMemcpyBlock<0> { };

template <size_t size> class FastMemcpy {
public:
    typedef FastMemcpyBlock<size / sizeof(int)> Block;

    static void* copy(void *dest, const void *src) {
        *(Block*)dest = *(Block*)src;
        if ((size % sizeof(int)) > 2) {
            ((char*)dest)[size-3] = ((char*)src)[size-3];
        }
        if ((size % sizeof(int)) > 1) {
            ((char*)dest)[size-2] = ((char*)src)[size-2];
        }
        if ((size % sizeof(int)) > 0) {
            ((char*)dest)[size-1] = ((char*)src)[size-1];
        }
        return dest;
    }
};

typedef void* (*CopyFn)(void*, const void*);
static CopyFn g_fast_memcpy_fn[FAST_MEMCPY_MAXSIZE + 1];

template <size_t size>
struct InitFastMemcpy : public InitFastMemcpy<size-1> {
    InitFastMemcpy() {
        g_fast_memcpy_fn[size] = FastMemcpy<size>::copy;
    }
};
template <>
class InitFastMemcpy<0> {
public:
    InitFastMemcpy() {
        g_fast_memcpy_fn[0] = FastMemcpy<0>::copy;
    }
};

inline void* cp(void *__restrict dest, const void *__restrict src, size_t n) {
#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8))
    // memcpy in gcc 4.8 seems to be faster.
    return memcpy(dest, src, n);
#else
    if (n <= FAST_MEMCPY_MAXSIZE) {
        static InitFastMemcpy<FAST_MEMCPY_MAXSIZE> _init_cp_dummy;
        return g_fast_memcpy_fn[n](dest, src);
    }
    return memcpy(dest, src, n);
#endif
}

// Function pointers to allocate or deallocate memory for a IOBuf::Block
void* (*blockmem_allocate)(size_t) = ::malloc;
void  (*blockmem_deallocate)(void*) = ::free;

// Use default function pointers
void reset_blockmem_allocate_and_deallocate() {
    blockmem_allocate = ::malloc;
    blockmem_deallocate = ::free;
}

butil::static_atomic<size_t> g_nblock = BUTIL_STATIC_ATOMIC_INIT(0);
butil::static_atomic<size_t> g_blockmem = BUTIL_STATIC_ATOMIC_INIT(0);
butil::static_atomic<size_t> g_newbigview = BUTIL_STATIC_ATOMIC_INIT(0);

}  // namespace iobuf

size_t IOBuf::block_count() {
    return iobuf::g_nblock.load(butil::memory_order_relaxed);
}

size_t IOBuf::block_memory() {
    return iobuf::g_blockmem.load(butil::memory_order_relaxed);
}

size_t IOBuf::new_bigview_count() {
    return iobuf::g_newbigview.load(butil::memory_order_relaxed);
}

struct IOBuf::Block {
    butil::atomic<int> nshared;
    uint16_t size;
    uint16_t cap;
    Block* portal_next;
    char data[0];
        
    explicit Block(size_t block_size)
        : nshared(1), size(0), cap(block_size - offsetof(Block, data))
        , portal_next(NULL) {
        assert(block_size <= MAX_BLOCK_SIZE);
        iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
        iobuf::g_blockmem.fetch_add(block_size, butil::memory_order_relaxed);
    }

    void inc_ref() {
        nshared.fetch_add(1, butil::memory_order_relaxed);
    }
        
    void dec_ref() {
        if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
            butil::atomic_thread_fence(butil::memory_order_acquire);
            iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
            iobuf::g_blockmem.fetch_sub(cap + offsetof(Block, data),
                                        butil::memory_order_relaxed);
            this->~Block();
            iobuf::blockmem_deallocate(this);
        }
    }

    int ref_count() const {
        return nshared.load(butil::memory_order_relaxed);
    }

    bool full() const { return size >= cap; }
    size_t left_space() const { return cap - size; }
};

namespace iobuf {

// for unit test
int block_shared_count(IOBuf::Block const* b) { return b->ref_count(); }

IOBuf::Block* get_portal_next(IOBuf::Block const* b) {
    return b->portal_next;
}

uint16_t block_cap(IOBuf::Block const *b) {
    return b->cap;
}

inline IOBuf::Block* create_block(const size_t block_size) {
    void* mem = iobuf::blockmem_allocate(block_size);
    if (BAIDU_LIKELY(mem != NULL)) {
        return new (mem) IOBuf::Block(block_size);
    }
    return NULL;
}

inline IOBuf::Block* create_block() {
    return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
}

// === Share TLS blocks between appending operations ===
// Max number of blocks in each TLS. This is a soft limit namely
// release_tls_block_chain() may exceed this limit sometimes.
const int MAX_BLOCKS_PER_THREAD = 8;

// NOTE: not see differences in examples when CACHE_IOBUF_BLOCKREFS is turned on
// (tcmalloc linked)
#ifdef CACHE_IOBUF_BLOCKREFS
const int MAX_BLOCKREFS_PER_THREAD = 8;
#endif

struct TLSData {
    // Head of the TLS block chain.
    IOBuf::Block* block_head;
    
    // Number of TLS blocks
    int num_blocks;
    
    // True if the remote_tls_block_chain is registered to the thread.
    bool registered;

#ifdef CACHE_IOBUF_BLOCKREFS
    // Reuse array of BlockRef
    int num_blockrefs;
    IOBuf::BlockRef* blockrefs[MAX_BLOCKREFS_PER_THREAD];
#endif
};

#ifdef CACHE_IOBUF_BLOCKREFS
static __thread TLSData g_tls_data = { NULL, 0, false, 0, {} };
#else
static __thread TLSData g_tls_data = { NULL, 0, false };
#endif

// Used in UT
IOBuf::Block* get_tls_block_head() { return g_tls_data.block_head; }
int get_tls_block_count() { return g_tls_data.num_blocks; }

// Number of blocks that can't be returned to TLS which has too many block
// already. This counter should be 0 in most scenarios, otherwise performance
// of appending functions in IOPortal may be lowered.
static butil::static_atomic<size_t> g_num_hit_tls_threshold = BUTIL_STATIC_ATOMIC_INIT(0);

// Called in UT.
void remove_tls_block_chain() {
    TLSData& tls_data = g_tls_data;
    IOBuf::Block* b = tls_data.block_head;
    if (!b) {
        return;
    }
    tls_data.block_head = NULL;
    int n = 0;
    do {
        IOBuf::Block* const saved_next = b->portal_next;
        b->dec_ref();
        b = saved_next;
        ++n;
    } while (b);
    CHECK_EQ(n, tls_data.num_blocks);
    tls_data.num_blocks = 0;
}

// Get a (non-full) block from TLS.
// Notice that the block is not removed from TLS.
inline IOBuf::Block* share_tls_block() {
    TLSData& tls_data = g_tls_data;
    IOBuf::Block* const b = tls_data.block_head;
    if (b != NULL && !b->full()) {
        return b;
    }
    IOBuf::Block* new_block = NULL;
    if (b) {
        new_block = b->portal_next;
        b->dec_ref();
        --tls_data.num_blocks;
    } else if (!tls_data.registered) {
        tls_data.registered = true;
        // Only register atexit at the first time
        butil::thread_atexit(remove_tls_block_chain);
    }
    if (!new_block) {
        new_block = create_block(); // may be NULL
        if (new_block) {
            ++tls_data.num_blocks;
        }
    }
    tls_data.block_head = new_block;
    return new_block;
}

// Return one block to TLS.
inline void release_tls_block(IOBuf::Block *b) {
    if (!b) {
        return;
    }
    TLSData& tls_data = g_tls_data;
    if (b->full()) {
        b->dec_ref();
    } else if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) {
        b->dec_ref();
        g_num_hit_tls_threshold.fetch_add(1, butil::memory_order_relaxed);
    } else {
        b->portal_next = tls_data.block_head;
        tls_data.block_head = b;
        ++tls_data.num_blocks;
        if (!tls_data.registered) {
            tls_data.registered = true;
            butil::thread_atexit(remove_tls_block_chain);
        }
    }
}

// Return chained blocks to TLS.
// NOTE: b MUST be non-NULL and all blocks linked SHOULD not be full.
inline void release_tls_block_chain(IOBuf::Block* b) {
    TLSData& tls_data = g_tls_data;
    size_t n = 0;
    if (tls_data.num_blocks >= MAX_BLOCKS_PER_THREAD) {
        do {
            ++n;
            IOBuf::Block* const saved_next = b->portal_next;
            b->dec_ref();
            b = saved_next;
        } while (b);
        g_num_hit_tls_threshold.fetch_add(n, butil::memory_order_relaxed);
        return;
    }
    IOBuf::Block* first_b = b;
    IOBuf::Block* last_b = NULL;
    do {
        ++n;
        CHECK(!b->full());
        if (b->portal_next == NULL) {
            last_b = b;
            break;
        }
        b = b->portal_next;
    } while (true);
    last_b->portal_next = tls_data.block_head;
    tls_data.block_head = first_b;
    tls_data.num_blocks += n;
    if (!tls_data.registered) {
        tls_data.registered = true;
        butil::thread_atexit(remove_tls_block_chain);
    }
}

// Get and remove one (non-full) block from TLS. If TLS is empty, create one.
inline IOBuf::Block* acquire_tls_block() {
    TLSData& tls_data = g_tls_data;
    IOBuf::Block* b = tls_data.block_head;
    if (!b) {
        return create_block();
    }
    if (b->full()) {
        IOBuf::Block* const saved_next = b->portal_next;
        b->dec_ref();
        tls_data.block_head = saved_next;
        --tls_data.num_blocks;
        b = saved_next;
        if (!b) {
            return create_block();
        }
    }
    tls_data.block_head = b->portal_next;
    --tls_data.num_blocks;
    b->portal_next = NULL;
    return b;
}

inline IOBuf::BlockRef* acquire_blockref_array() {
#ifdef CACHE_IOBUF_BLOCKREFS
    TLSData& tls_data = g_tls_data;
    if (tls_data.num_blockrefs) {
        return tls_data.blockrefs[--tls_data.num_blockrefs];
    }
#endif
    iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed);
    return new IOBuf::BlockRef[IOBuf::INITIAL_CAP];
}

inline IOBuf::BlockRef* acquire_blockref_array(size_t cap) {
#ifdef CACHE_IOBUF_BLOCKREFS
    if (cap == IOBuf::INITIAL_CAP) {
        return acquire_blockref_array();
    }
#endif
    iobuf::g_newbigview.fetch_add(1, butil::memory_order_relaxed);
    return new IOBuf::BlockRef[cap];
}

inline void release_blockref_array(IOBuf::BlockRef* refs, size_t cap) {
#ifdef CACHE_IOBUF_BLOCKREFS
    if (cap == IOBuf::INITIAL_CAP) {
        TLSData& tls_data = g_tls_data;
        if (tls_data.num_blockrefs < MAX_BLOCKREFS_PER_THREAD) {
            tls_data.blockrefs[tls_data.num_blockrefs++] = refs;
            return;
        }
    }
#endif
    delete[] refs;
}

}  // namespace iobuf

size_t IOBuf::block_count_hit_tls_threshold() {
    return iobuf::g_num_hit_tls_threshold.load(butil::memory_order_relaxed);
}

BAIDU_CASSERT(sizeof(IOBuf::SmallView) == sizeof(IOBuf::BigView),
              sizeof_small_and_big_view_should_equal);

BAIDU_CASSERT(IOBuf::DEFAULT_BLOCK_SIZE/4096*4096 == IOBuf::DEFAULT_BLOCK_SIZE,
              sizeof_block_should_be_multiply_of_4096);

const IOBuf::Area IOBuf::INVALID_AREA;
const size_t IOBuf::DEFAULT_PAYLOAD;

IOBuf::IOBuf(const IOBuf& rhs) {
    if (rhs._small()) {
        _sv = rhs._sv;
        if (_sv.refs[0].block) {
            _sv.refs[0].block->inc_ref();
        }
        if (_sv.refs[1].block) {
            _sv.refs[1].block->inc_ref();
        }
    } else {
        _bv.magic = -1;
        _bv.start = 0;
        _bv.nref = rhs._bv.nref;
        _bv.cap_mask = rhs._bv.cap_mask;
        _bv.nbytes = rhs._bv.nbytes;
        _bv.refs = iobuf::acquire_blockref_array(_bv.capacity());
        for (size_t i = 0; i < _bv.nref; ++i) {
            _bv.refs[i] = rhs._bv.ref_at(i);
            _bv.refs[i].block->inc_ref();
        }
    }
}

void IOBuf::operator=(const IOBuf& rhs) {
    if (this == &rhs) {
        return;
    }
    if (!rhs._small() && !_small() && _bv.cap_mask == rhs._bv.cap_mask) {
        // Reuse array of refs
        // Remove references to previous blocks.
        for (size_t i = 0; i < _bv.nref; ++i) {
            _bv.ref_at(i).block->dec_ref();
        }
        // References blocks in rhs.
        _bv.start = 0;
        _bv.nref = rhs._bv.nref;
        _bv.nbytes = rhs._bv.nbytes;
        for (size_t i = 0; i < _bv.nref; ++i) {
            _bv.refs[i] = rhs._bv.ref_at(i);
            _bv.refs[i].block->inc_ref();
        }
    } else {
        this->~IOBuf();
        new (this) IOBuf(rhs);
    }
}

template <bool MOVE>
void IOBuf::_push_or_move_back_ref_to_smallview(const BlockRef& r) {
    BlockRef* const refs = _sv.refs;
    if (NULL == refs[0].block) {
        refs[0] = r;
        if (!MOVE) {
            r.block->inc_ref();
        }
        return;
    }
    if (NULL == refs[1].block) {
        if (refs[0].block == r.block &&
            refs[0].offset + refs[0].length == r.offset) { // Merge ref
            refs[0].length += r.length;
            if (MOVE) {
                r.block->dec_ref();
            }
            return;
        }
        refs[1] = r;
        if (!MOVE) {
            r.block->inc_ref();
        }
        return;
    }
    if (refs[1].block == r.block &&
        refs[1].offset + refs[1].length == r.offset) { // Merge ref
        refs[1].length += r.length;
        if (MOVE) {
            r.block->dec_ref();
        }
        return;
    }
    // Convert to BigView
    BlockRef* new_refs = iobuf::acquire_blockref_array();
    new_refs[0] = refs[0];
    new_refs[1] = refs[1];
    new_refs[2] = r;
    const size_t new_nbytes = refs[0].length + refs[1].length + r.length;
    if (!MOVE) {
        r.block->inc_ref();
    }
    _bv.magic = -1;
    _bv.start = 0;
    _bv.refs = new_refs;
    _bv.nref = 3;
    _bv.cap_mask = INITIAL_CAP - 1;
    _bv.nbytes = new_nbytes;
}
// Explicitly initialize templates.
template void IOBuf::_push_or_move_back_ref_to_smallview<true>(const BlockRef&);
template void IOBuf::_push_or_move_back_ref_to_smallview<false>(const BlockRef&);

template <bool MOVE>
void IOBuf::_push_or_move_back_ref_to_bigview(const BlockRef& r) {
    BlockRef& back = _bv.ref_at(_bv.nref - 1);
    if (back.block == r.block && back.offset + back.length == r.offset) {
        // Merge ref
        back.length += r.length;
        _bv.nbytes += r.length;
        if (MOVE) {
            r.block->dec_ref();
        }
        return;
    }

    if (_bv.nref != _bv.capacity()) {
        _bv.ref_at(_bv.nref++) = r;
        _bv.nbytes += r.length;
        if (!MOVE) {
            r.block->inc_ref();
        }
        return;
    }
    // resize, don't modify bv until new_refs is fully assigned
    const uint32_t new_cap = _bv.capacity() * 2;
    BlockRef* new_refs = iobuf::acquire_blockref_array(new_cap);
    for (uint32_t i = 0; i < _bv.nref; ++i) {
        new_refs[i] = _bv.ref_at(i);
    }
    new_refs[_bv.nref++] = r;

    // Change other variables
    _bv.start = 0;
    iobuf::release_blockref_array(_bv.refs, _bv.capacity());
    _bv.refs = new_refs;
    _bv.cap_mask = new_cap - 1;
    _bv.nbytes += r.length;
    if (!MOVE) {
        r.block->inc_ref();
    }
}
// Explicitly initialize templates.
template void IOBuf::_push_or_move_back_ref_to_bigview<true>(const BlockRef&);
template void IOBuf::_push_or_move_back_ref_to_bigview<false>(const BlockRef&);

int IOBuf::_pop_front_ref() {
    if (_small()) {
        if (_sv.refs[0].block != NULL) {
            _sv.refs[0].block->dec_ref();
            _sv.refs[0] = _sv.refs[1];
            reset_block_ref(_sv.refs[1]);
            return 0;
        }
        return -1;
    } else {
        // _bv.nref must be greater than 2
        const uint32_t start = _bv.start;
        _bv.refs[start].block->dec_ref();
        if (--_bv.nref > 2) {
            _bv.start = (start + 1) & _bv.cap_mask;
            _bv.nbytes -= _bv.refs[start].length;
        } else {  // count==2, fall back to SmallView
            BlockRef* const saved_refs = _bv.refs;
            const uint32_t saved_cap_mask = _bv.cap_mask;
            _sv.refs[0] = saved_refs[(start + 1) & saved_cap_mask];
            _sv.refs[1] = saved_refs[(start + 2) & saved_cap_mask];
            iobuf::release_blockref_array(saved_refs, saved_cap_mask + 1);
        }
        return 0;
    }
}

int IOBuf::_pop_back_ref() {
    if (_small()) {
        if (_sv.refs[1].block != NULL) {
            _sv.refs[1].block->dec_ref();
            reset_block_ref(_sv.refs[1]);
            return 0;
        } else if (_sv.refs[0].block != NULL) {
            _sv.refs[0].block->dec_ref();
            reset_block_ref(_sv.refs[0]);
            return 0;
        }
        return -1;
    } else {
        // _bv.nref must be greater than 2
        const uint32_t start = _bv.start;
        IOBuf::BlockRef& back = _bv.refs[(start + _bv.nref - 1) & _bv.cap_mask];
        back.block->dec_ref();
        if (--_bv.nref > 2) {
            _bv.nbytes -= back.length;
        } else {  // count==2, fall back to SmallView
            BlockRef* const saved_refs = _bv.refs;
            const uint32_t saved_cap_mask = _bv.cap_mask;
            _sv.refs[0] = saved_refs[start];
            _sv.refs[1] = saved_refs[(start + 1) & saved_cap_mask];
            iobuf::release_blockref_array(saved_refs, saved_cap_mask + 1);
        }
        return 0;
    }
}

void IOBuf::clear() {
    if (_small()) {
        if (_sv.refs[0].block != NULL) {
            _sv.refs[0].block->dec_ref();
            reset_block_ref(_sv.refs[0]);
                        
            if (_sv.refs[1].block != NULL) {
                _sv.refs[1].block->dec_ref();
                reset_block_ref(_sv.refs[1]);
            }
        }
    } else {
        for (uint32_t i = 0; i < _bv.nref; ++i) { 
            _bv.ref_at(i).block->dec_ref();
        }
        iobuf::release_blockref_array(_bv.refs, _bv.capacity());
        new (this) IOBuf;
    }
}

size_t IOBuf::pop_front(size_t n) {
    const size_t len = length();
    if (n >= len) {
        clear();
        return len;
    }
    const size_t saved_n = n;
    while (n) {  // length() == 0 does not enter
        IOBuf::BlockRef &r = _front_ref();
        if (r.length > n) {
            r.offset += n;
            r.length -= n;
            if (!_small()) {
                _bv.nbytes -= n;
            }
            return saved_n;
        }
        n -= r.length;
        _pop_front_ref();
    }
    return saved_n;
}

bool IOBuf::cut1(char* c) {
    if (empty()) {
        return false;
    }
    IOBuf::BlockRef &r = _front_ref();
    *c = r.block->data[r.offset];
    if (r.length > 1) {
        ++r.offset;
        --r.length;
        if (!_small()) {
            --_bv.nbytes;
        }
    } else {
        _pop_front_ref();
    }
    return true;
}

size_t IOBuf::pop_back(size_t n) {
    const size_t len = length();
    if (n >= len) {
        clear();
        return len;
    }
    const size_t saved_n = n;
    while (n) {  // length() == 0 does not enter
        IOBuf::BlockRef &r = _back_ref();
        if (r.length > n) {
            r.length -= n;
            if (!_small()) {
                _bv.nbytes -= n;
            }
            return saved_n;
        }
        n -= r.length;
        _pop_back_ref();
    }
    return saved_n;
}

size_t IOBuf::cutn(IOBuf* out, size_t n) {
    const size_t len = length();
    if (n > len) {
        n = len;
    }
    const size_t saved_n = n;
    while (n) {   // length() == 0 does not enter
        IOBuf::BlockRef &r = _front_ref();
        if (r.length <= n) {
            out->_push_back_ref(r);
            n -= r.length;
            _pop_front_ref();
        } else {
            const IOBuf::BlockRef cr = { r.offset, (uint32_t)n, r.block };
            out->_push_back_ref(cr);
            
            r.offset += n;
            r.length -= n;
            if (!_small()) {
                _bv.nbytes -= n;
            }
            return saved_n;
        }
    }
    return saved_n;
}

size_t IOBuf::cutn(void* out, size_t n) {
    const size_t len = length();
    if (n > len) {
        n = len;
    }
    const size_t saved_n = n;
    while (n) {   // length() == 0 does not enter
        IOBuf::BlockRef &r = _front_ref();
        if (r.length <= n) {
            iobuf::cp(out, r.block->data + r.offset, r.length);
            out = (char*)out + r.length;
            n -= r.length;
            _pop_front_ref();
        } else {
            iobuf::cp(out, r.block->data + r.offset, n);
            out = (char*)out + n;
            r.offset += n;
            r.length -= n;
            if (!_small()) {
                _bv.nbytes -= n;
            }
            return saved_n;
        }
    }
    return saved_n;
}

size_t IOBuf::cutn(std::string* out, size_t n) {
    if (n == 0) {
        return 0;
    }
    const size_t len = length();
    if (n > len) {
        n = len;
    }
    const size_t old_size = out->size();
    out->resize(out->size() + n);
    cutn(&out[0][old_size], n);
    return n;
}

int IOBuf::_cut_by_char(IOBuf* out, char d) {
    const size_t nref = _ref_num();
    size_t n = 0;
    
    for (size_t i = 0; i < nref; ++i) {
        IOBuf::BlockRef const& r = _ref_at(i);
        char const* const s = r.block->data + r.offset;
        for (uint32_t j = 0; j < r.length; ++j, ++n) {
            if (s[j] == d) {
                // There's no way cutn/pop_front fails
                cutn(out, n);
                pop_front(1);
                return 0;
            }
        }
    }

    return -1;
}

int IOBuf::_cut_by_delim(IOBuf* out, char const* dbegin, size_t ndelim) {
    typedef unsigned long SigType;
    const size_t NMAX = sizeof(SigType);
    
    if (ndelim > NMAX || ndelim > length()) {
        return -1;
    }
    
    SigType dsig = 0;
    for (size_t i = 0; i < ndelim; ++i) {
        dsig = (dsig << CHAR_BIT) | static_cast<SigType>(dbegin[i]);
    }
    
    const SigType SIGMASK =
        (ndelim == NMAX ? (SigType)-1 : (((SigType)1 << (ndelim * CHAR_BIT)) - 1));

    const size_t nref = _ref_num();
    SigType sig = 0;
    size_t n = 0;

    for (size_t i = 0; i < nref; ++i) {
        IOBuf::BlockRef const& r = _ref_at(i);
        char const* const s = r.block->data + r.offset;
        
        for (uint32_t j = 0; j < r.length; ++j, ++n) {
            sig = ((sig << CHAR_BIT) | static_cast<SigType>(s[j])) & SIGMASK;
            if (sig == dsig) {
                // There's no way cutn/pop_front fails
                cutn(out, n + 1 - ndelim);
                pop_front(ndelim);
                return 0;
            }
        }
    }

    return -1;
}

// Since cut_into_file_descriptor() allocates iovec on stack, IOV_MAX=1024
// is too large(in the worst case) for bthreads with small stacks.
static const size_t IOBUF_IOV_MAX = 256;

ssize_t IOBuf::pcut_into_file_descriptor(int fd, off_t offset, size_t size_hint) {
    if (empty()) {
        return 0;
    }
    
    const size_t nref = std::min(_ref_num(), IOBUF_IOV_MAX);
    struct iovec vec[nref];
    size_t nvec = 0;
    size_t cur_len = 0;

    do {
        IOBuf::BlockRef const& r = _ref_at(nvec);
        vec[nvec].iov_base = r.block->data + r.offset;
        vec[nvec].iov_len = r.length;
        ++nvec;
        cur_len += r.length;
    } while (nvec < nref && cur_len < size_hint);

    ssize_t nw = 0;

    if (offset >= 0) {
        static iobuf::iov_function pwritev_func = iobuf::get_pwritev_func();
        nw = pwritev_func(fd, vec, nvec, offset);
    } else {
        nw = ::writev(fd, vec, nvec);
    }
    if (nw > 0) {
        pop_front(nw);
    }
    return nw;
}

ssize_t IOBuf::cut_into_SSL_channel(SSL* ssl, int* ssl_error) {
    *ssl_error = SSL_ERROR_NONE;
    if (empty()) {
        return 0;
    }
    
    IOBuf::BlockRef const& r = _ref_at(0);
    const int nw = SSL_write(ssl, r.block->data + r.offset, r.length);
    if (nw > 0) {
        pop_front(nw);
    }
    *ssl_error = SSL_get_error(ssl, nw);
    return nw;
}

ssize_t IOBuf::cut_multiple_into_SSL_channel(SSL* ssl, IOBuf* const* pieces,
                                             size_t count, int* ssl_error) {
    ssize_t nw = 0;
    *ssl_error = SSL_ERROR_NONE;
    for (size_t i = 0; i < count; ) {
        if (pieces[i]->empty()) {
            ++i;
            continue;
        }

        ssize_t rc = pieces[i]->cut_into_SSL_channel(ssl, ssl_error);
        if (rc > 0) {
            nw += rc;
        } else {
            if (rc < 0) {
                if (*ssl_error == SSL_ERROR_WANT_WRITE
                    || (*ssl_error == SSL_ERROR_SYSCALL
                        && BIO_fd_non_fatal_error(errno) == 1)) {
                    // Non fatal error, tell caller to write again
                    *ssl_error = SSL_ERROR_WANT_WRITE;
                } else {
                    // Other errors are fatal
                    return rc;
                }
            }
            if (nw == 0) {
                nw = rc;    // Nothing written yet, overwrite nw
            }
            break;
        }
    }

    // Flush remaining data inside the BIO buffer layer
    BIO* wbio = SSL_get_wbio(ssl);
    if (BIO_wpending(wbio) > 0) {
        int rc = BIO_flush(wbio);
        if (rc <= 0 && BIO_fd_non_fatal_error(errno) == 0) {
            // Fatal error during BIO_flush
            *ssl_error = SSL_ERROR_SYSCALL;
            return rc;
        }
    }
    return nw;
}

ssize_t IOBuf::pcut_multiple_into_file_descriptor(
    int fd, off_t offset, IOBuf* const* pieces, size_t count) {
    if (BAIDU_UNLIKELY(count == 0)) {
        return 0;
    }
    if (1UL == count) {
        return pieces[0]->pcut_into_file_descriptor(fd, offset);
    }
    struct iovec vec[IOBUF_IOV_MAX];
    size_t nvec = 0;
    for (size_t i = 0; i < count; ++i) {
        const IOBuf* p = pieces[i];
        const size_t nref = p->_ref_num();
        for (size_t j = 0; j < nref && nvec < IOBUF_IOV_MAX; ++j, ++nvec) {
            IOBuf::BlockRef const& r = p->_ref_at(j);
            vec[nvec].iov_base = r.block->data + r.offset;
            vec[nvec].iov_len = r.length;
        }
    }

    ssize_t nw = 0;
    if (offset >= 0) {
        static iobuf::iov_function pwritev_func = iobuf::get_pwritev_func();
        nw = pwritev_func(fd, vec, nvec, offset);
    } else {
        nw = ::writev(fd, vec, nvec);
    }
    if (nw <= 0) {
        return nw;
    }
    size_t npop_all = nw;
    for (size_t i = 0; i < count; ++i) {
        npop_all -= pieces[i]->pop_front(npop_all);
        if (npop_all == 0) {
            break;
        }
    }
    return nw;
}

void IOBuf::append(const IOBuf& other) {
    const size_t nref = other._ref_num();
    for (size_t i = 0; i < nref; ++i) {
        _push_back_ref(other._ref_at(i));
    }
}

void IOBuf::append(const Movable& movable_other) {
    if (empty()) {
        swap(movable_other.value());
    } else {
        butil::IOBuf& other = movable_other.value();
        const size_t nref = other._ref_num();
        for (size_t i = 0; i < nref; ++i) {
            _move_back_ref(other._ref_at(i));
        }
        if (!other._small()) {
            iobuf::release_blockref_array(other._bv.refs, other._bv.capacity());
        }
        new (&other) IOBuf;
    }
}

int IOBuf::push_back(char c) {
    IOBuf::Block* b = iobuf::share_tls_block();
    if (BAIDU_UNLIKELY(!b)) {
        return -1;
    }
    b->data[b->size] = c;
    const IOBuf::BlockRef r = { b->size, 1, b };
    ++b->size;
    _push_back_ref(r);
    return 0;
}

int IOBuf::append(char const* s) {
    if (BAIDU_LIKELY(s != NULL)) {
        return append(s, strlen(s));
    }
    return -1;
}

int IOBuf::append(void const* data, size_t count) {
    if (BAIDU_UNLIKELY(!data)) {
        return -1;
    }
    if (count == 1) {
        return push_back(*((char const*)data));
    }
    size_t total_nc = 0;
    while (total_nc < count) {  // excluded count == 0
        IOBuf::Block* b = iobuf::share_tls_block();
        if (BAIDU_UNLIKELY(!b)) {
            return -1;
        }
        const size_t nc = std::min(count - total_nc, b->left_space());
        iobuf::cp(b->data + b->size, (char*)data + total_nc, nc);
        
        const IOBuf::BlockRef r = { (uint32_t)b->size, (uint32_t)nc, b };
        _push_back_ref(r);
        b->size += nc;
        total_nc += nc;
    }
    return 0;
}

int IOBuf::appendv(const const_iovec* vec, size_t n) {
    size_t offset = 0;
    for (size_t i = 0; i < n;) {
        IOBuf::Block* b = iobuf::share_tls_block();
        if (BAIDU_UNLIKELY(!b)) {
            return -1;
        }
        uint32_t total_cp = 0;
        for (; i < n; ++i, offset = 0) {
            const const_iovec & vec_i = vec[i];
            const size_t nc = std::min(vec_i.iov_len - offset, b->left_space() - total_cp);
            iobuf::cp(b->data + b->size + total_cp, (char*)vec_i.iov_base + offset, nc);
            total_cp += nc;
            offset += nc;
            if (offset != vec_i.iov_len) {
                break;
            }
        }
        
        const IOBuf::BlockRef r = { (uint32_t)b->size, total_cp, b };
        b->size += total_cp;
        _push_back_ref(r);
    }
    return 0;
}

int IOBuf::resize(size_t n, char c) {
    const size_t saved_len = length();
    if (n < saved_len) {
        pop_back(saved_len - n);
        return 0;
    }
    const size_t count = n - saved_len;
    size_t total_nc = 0;
    while (total_nc < count) {  // excluded count == 0
        IOBuf::Block* b = iobuf::share_tls_block();
        if (BAIDU_UNLIKELY(!b)) {
            return -1;
        }
        const size_t nc = std::min(count - total_nc, b->left_space());
        memset(b->data + b->size, c, nc);
        
        const IOBuf::BlockRef r = { (uint32_t)b->size, (uint32_t)nc, b };
        _push_back_ref(r);
        b->size += nc;
        total_nc += nc;
    }
    return 0;
}

// NOTE: We don't use C++ bitwise fields which make copying slower.
static const int REF_INDEX_BITS = 19;
static const int REF_OFFSET_BITS = 15;
static const int AREA_SIZE_BITS = 30;
static const uint32_t MAX_REF_INDEX = (((uint32_t)1) << REF_INDEX_BITS) - 1;
static const uint32_t MAX_REF_OFFSET = (((uint32_t)1) << REF_OFFSET_BITS) - 1;
static const uint32_t MAX_AREA_SIZE = (((uint32_t)1) << AREA_SIZE_BITS) - 1;

inline IOBuf::Area make_area(uint32_t ref_index, uint32_t ref_offset,
                             uint32_t size) {
    if (ref_index > MAX_REF_INDEX ||
        ref_offset > MAX_REF_OFFSET ||
        size > MAX_AREA_SIZE) {
        LOG(ERROR) << "Too big parameters!";
        return IOBuf::INVALID_AREA;
    }
    return (((uint64_t)ref_index) << (REF_OFFSET_BITS + AREA_SIZE_BITS))
        | (((uint64_t)ref_offset) << AREA_SIZE_BITS)
        | size;
}
inline uint32_t get_area_ref_index(IOBuf::Area c) {
    return (c >> (REF_OFFSET_BITS + AREA_SIZE_BITS)) & MAX_REF_INDEX;
}
inline uint32_t get_area_ref_offset(IOBuf::Area c) {
    return (c >> AREA_SIZE_BITS) & MAX_REF_OFFSET;
}
inline uint32_t get_area_size(IOBuf::Area c) {
    return (c & MAX_AREA_SIZE);
}

IOBuf::Area IOBuf::reserve(size_t count) {
    IOBuf::Area result = INVALID_AREA;
    size_t total_nc = 0;
    while (total_nc < count) {  // excluded count == 0
        IOBuf::Block* b = iobuf::share_tls_block();
        if (BAIDU_UNLIKELY(!b)) {
            return INVALID_AREA;
        }
        const size_t nc = std::min(count - total_nc, b->left_space());
        const IOBuf::BlockRef r = { (uint32_t)b->size, (uint32_t)nc, b };
        _push_back_ref(r);
        if (total_nc == 0) {
            // Encode the area at first time. Notice that the pushed ref may
            // be merged with existing ones.
            result = make_area(_ref_num() - 1, _back_ref().length - nc, count);
        }
        total_nc += nc;
        b->size += nc;
    }
    return result;
}

int IOBuf::unsafe_assign(Area area, const void* data) {
    if (area == INVALID_AREA || data == NULL) {
        LOG(ERROR) << "Invalid parameters";
        return -1;
    }
    const uint32_t ref_index = get_area_ref_index(area);
    uint32_t ref_offset = get_area_ref_offset(area);
    uint32_t length = get_area_size(area);
    const size_t nref = _ref_num();
    for (size_t i = ref_index; i < nref; ++i) {
        IOBuf::BlockRef& r = _ref_at(i);
        // NOTE: we can't check if the block is shared with another IOBuf or
        // not since even a single IOBuf may reference a block multiple times
        // (by different BlockRef-s)
        
        const size_t nc = std::min(length, r.length - ref_offset);
        iobuf::cp(r.block->data + r.offset + ref_offset, data, nc);
        if (length == nc) {
            return 0;
        }
        ref_offset = 0;
        length -= nc;
        data = (char*)data + nc;
    }
    
    // Use check because we need to see the stack here.
    CHECK(false) << "IOBuf(" << size() << ", nref=" << _ref_num()
                 << ") is shorter than what we reserved("
                 << "ref=" << get_area_ref_index(area)
                 << " off=" << get_area_ref_offset(area)
                 << " size=" << get_area_size(area)
                 << "), this assignment probably corrupted something...";
    return -1;
}

size_t IOBuf::append_to(IOBuf* buf, size_t n, size_t pos) const {
    const size_t nref = _ref_num();
    // Skip `pos' bytes. `offset' is the starting position in starting BlockRef.
    size_t offset = pos;
    size_t i = 0;
    for (; offset != 0 && i < nref; ++i) {
        IOBuf::BlockRef const& r = _ref_at(i);
        if (offset < (size_t)r.length) {
            break;
        }
        offset -= r.length;
    }
    size_t m = n;
    for (; m != 0 && i < nref; ++i) {
        IOBuf::BlockRef const& r = _ref_at(i);
        const size_t nc = std::min(m, (size_t)r.length - offset);
        const IOBuf::BlockRef r2 = { (uint32_t)(r.offset + offset),
                                     (uint32_t)nc, r.block };
        buf->_push_back_ref(r2);
        offset = 0;
        m -= nc;
    }
    // If nref == 0, here returns 0 correctly
    return n - m;
}

size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const {
    const size_t nref = _ref_num();
    // Skip `pos' bytes. `offset' is the starting position in starting BlockRef.
    size_t offset = pos;
    size_t i = 0;
    for (; offset != 0 && i < nref; ++i) {
        IOBuf::BlockRef const& r = _ref_at(i);
        if (offset < (size_t)r.length) {
            break;
        }
        offset -= r.length;
    }
    size_t m = n;
    for (; m != 0 && i < nref; ++i) {
        IOBuf::BlockRef const& r = _ref_at(i);
        const size_t nc = std::min(m, (size_t)r.length - offset);
        iobuf::cp(d, r.block->data + r.offset + offset, nc);
        offset = 0;
        d = (char*)d + nc;
        m -= nc;
    }
    // If nref == 0, here returns 0 correctly
    return n - m;
}

size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const {
    const size_t len = length();
    if (n + pos > len) {
        if (len <= pos) {
            return 0;
        }
        n = len - pos;
    }
    s->resize(n);
    return copy_to(&(*s)[0], n, pos);
}

size_t IOBuf::append_to(std::string* s, size_t n, size_t pos) const {
    const size_t len = length();
    if (n + pos > len) {
        if (len <= pos) {
            return 0;
        }
        n = len - pos;
    }
    const size_t old_size = s->size();
    s->resize(old_size + n);
    return copy_to(&(*s)[old_size], n, pos);
}


size_t IOBuf::copy_to_cstr(char* s, size_t n, size_t pos) const {
    const size_t nc = copy_to(s, n, pos);
    s[nc] = '\0';
    return nc;
}

void const* IOBuf::fetch(void* d, size_t n) const {
    if (n <= length()) {
        IOBuf::BlockRef const& r0 = _ref_at(0);
        if (n <= r0.length) {
            return r0.block->data + r0.offset;
        }
    
        iobuf::cp(d, r0.block->data + r0.offset, r0.length);
        size_t total_nc = r0.length;
        const size_t nref = _ref_num();
        for (size_t i = 1; i < nref; ++i) {
            IOBuf::BlockRef const& r = _ref_at(i);
            if (n <= r.length + total_nc) {
                iobuf::cp((char*)d + total_nc,
                            r.block->data + r.offset, n - total_nc);
                return d;
            }
            iobuf::cp((char*)d + total_nc, r.block->data + r.offset, r.length);
            total_nc += r.length;
        }
    }
    return NULL;
}

const void* IOBuf::fetch1() const {
    if (!empty()) {
        const IOBuf::BlockRef& r0 = _front_ref();
        return r0.block->data + r0.offset;
    }
    return NULL;
}

std::ostream& operator<<(std::ostream& os, const IOBuf& buf) {
    const size_t n = buf.backing_block_num();
    for (size_t i = 0; i < n; ++i) {
        StringPiece blk = buf.backing_block(i);
        os.write(blk.data(), blk.size());
    }
    return os;
}

static char s_binary_char_map[] = {
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
    'A', 'B', 'C', 'D', 'E', 'F'
};

class BinaryCharPrinter {
public:
    static const size_t BUF_SIZE = 127;
    explicit BinaryCharPrinter(std::ostream& os)
        : _n(0), _os(&os) {}
    ~BinaryCharPrinter() { flush(); }
    void operator<<(unsigned char c);
    void flush();
private:
    uint32_t _n;
    std::ostream* _os;
    char _buf[BUF_SIZE];
};

void BinaryCharPrinter::flush() {
    if (_n > 0) {
        _os->write(_buf, _n);
        _n = 0;
    }
}

inline void BinaryCharPrinter::operator<<(unsigned char c) {
    if (_n > BUF_SIZE - 3) {
        _os->write(_buf, _n);
        _n = 0;
    }
    if (c >= 32 && c <= 126) { // displayable ascii characters
        if (c != '\\') {
            _buf[_n++] = c;
        } else {
            _buf[_n++] = '\\';
            _buf[_n++] = '\\';
        }
    } else {
        _buf[_n++] = '\\';
        switch (c) {
        case '\b': _buf[_n++] = 'b'; break;
        case '\t': _buf[_n++] = 't'; break;
        case '\n': _buf[_n++] = 'n'; break;
        case '\r': _buf[_n++] = 'r'; break;
        default: 
            _buf[_n++] = s_binary_char_map[c >> 4];
            _buf[_n++] = s_binary_char_map[c & 0xF];
            break;
        }
    }
}

void PrintedAsBinary::print(std::ostream& os) const {
    if (_iobuf) {
        const size_t n = _iobuf->backing_block_num();
        size_t nw = 0;
        BinaryCharPrinter printer(os);
        for (size_t i = 0; i < n; ++i) {
            StringPiece blk = _iobuf->backing_block(i);
            for (size_t j = 0; j < blk.size(); ++j) {
                if (nw >= _max_length) {
                    printer.flush();
                    os << "...<skipping " << _iobuf->size() - nw << " bytes>";
                    return;
                }
                ++nw;
                printer << blk[j];
            }
        }
    } else if (!_data.empty()) {
        BinaryCharPrinter printer(os);
        for (size_t i = 0; i < _data.size(); ++i) {
            if (i >= _max_length) {
                printer.flush();
                os << "...<skipping " << _data.size() - i << " bytes>";
                return;
            }
            printer << _data[i];
        }
    }
}

std::ostream& operator<<(std::ostream& os, const PrintedAsBinary& binary_buf) {
    binary_buf.print(os);
    return os;
}

bool IOBuf::equals(const butil::StringPiece& s) const {
    if (size() != s.size()) {
        return false;
    }
    const size_t nref = _ref_num();
    size_t soff = 0;
    for (size_t i = 0; i < nref; ++i) {
        const BlockRef& r = _ref_at(i);
        if (memcmp(r.block->data + r.offset, s.data() + soff, r.length) != 0) {
            return false;
        }
        soff += r.length;
    }
    return true;
}

StringPiece IOBuf::backing_block(size_t i) const {
    if (i < _ref_num()) {
        const BlockRef& r = _ref_at(i);
        return StringPiece(r.block->data + r.offset, r.length);
    }
    return StringPiece();
}

bool IOBuf::equals(const butil::IOBuf& other) const {
    const size_t sz1 = size();
    if (sz1 != other.size()) {
        return false;
    }
    if (!sz1) {
        return true;
    }
    const BlockRef& r1 = _ref_at(0);
    const char* d1 = r1.block->data + r1.offset;
    size_t len1 = r1.length;
    const BlockRef& r2 = other._ref_at(0);
    const char* d2 = r2.block->data + r2.offset;
    size_t len2 = r2.length;
    const size_t nref1 = _ref_num();
    const size_t nref2 = other._ref_num();
    size_t i = 1;
    size_t j = 1;
    do {
        const size_t cmplen = std::min(len1, len2);
        if (memcmp(d1, d2, cmplen) != 0) {
            return false;
        }
        len1 -= cmplen;
        if (!len1) {
            if (i >= nref1) {
                return true;
            }
            const BlockRef& r = _ref_at(i++);
            d1 = r.block->data + r.offset;
            len1 = r.length;
        } else {
            d1 += cmplen;
        }
        len2 -= cmplen;
        if (!len2) {
            if (j >= nref2) {
                return true;
            }
            const BlockRef& r = other._ref_at(j++);
            d2 = r.block->data + r.offset;
            len2 = r.length;
        } else {
            d2 += cmplen;
        }
    } while (true);
    return true;
}

////////////////////////////// IOPortal //////////////////
IOPortal::~IOPortal() { return_cached_blocks(); }

IOPortal& IOPortal::operator=(const IOPortal& rhs) {
    IOBuf::operator=(rhs);
    return *this;
}

void IOPortal::clear() {
    IOBuf::clear();
    return_cached_blocks();
}

const int MAX_APPEND_IOVEC = 64;

ssize_t IOPortal::pappend_from_file_descriptor(
    int fd, off_t offset, size_t max_count) {
    iovec vec[MAX_APPEND_IOVEC];
    int nvec = 0;
    size_t space = 0;
    Block* prev_p = NULL;
    Block* p = _block;
    // Prepare at most MAX_APPEND_IOVEC blocks or space of blocks >= max_count
    do {
        if (p == NULL) {
            p = iobuf::acquire_tls_block();
            if (BAIDU_UNLIKELY(!p)) {
                errno = ENOMEM;
                return -1;
            }
            if (prev_p != NULL) {
                prev_p->portal_next = p;
            } else {
                _block = p;
            }
        }
        vec[nvec].iov_base = p->data + p->size;
        vec[nvec].iov_len = std::min(p->left_space(), max_count - space);
        space += vec[nvec].iov_len;
        ++nvec;
        if (space >= max_count || nvec >= MAX_APPEND_IOVEC) {
            break;
        }
        prev_p = p;
        p = p->portal_next;
    } while (1);

    ssize_t nr = 0;
    if (offset < 0) {
        nr = readv(fd, vec, nvec);
    } else {
        static iobuf::iov_function preadv_func = iobuf::get_preadv_func();
        nr = preadv_func(fd, vec, nvec, offset);
    }
    if (nr <= 0) {  // -1 or 0
        if (empty()) {
            return_cached_blocks();
        }
        return nr;
    }

    size_t total_len = nr;
    do {
        const size_t len = std::min(total_len, _block->left_space());
        total_len -= len;
        const IOBuf::BlockRef r = { _block->size, (uint32_t)len, _block };
        _push_back_ref(r);
        _block->size += len;
        if (_block->full()) {
            Block* const saved_next = _block->portal_next;
            _block->dec_ref();  // _block may be deleted
            _block = saved_next;
        }
    } while (total_len);
    return nr;
}

ssize_t IOPortal::append_from_SSL_channel(
    SSL* ssl, int* ssl_error, size_t max_count) {
    size_t nr = 0;
    do {
        if (!_block) {
            _block = iobuf::acquire_tls_block();
            if (BAIDU_UNLIKELY(!_block)) {
                errno = ENOMEM;
                *ssl_error = SSL_ERROR_SYSCALL;
                return -1;
            }
        }

        const size_t read_len = std::min(_block->left_space(), max_count - nr);
        const int rc = SSL_read(ssl, _block->data + _block->size, read_len);
        *ssl_error = SSL_get_error(ssl, rc);
        if (rc > 0) {
            const IOBuf::BlockRef r = { (uint32_t)_block->size, (uint32_t)rc, _block };
            _push_back_ref(r);
            _block->size += rc;
            if (_block->full()) {
                Block* const saved_next = _block->portal_next;
                _block->dec_ref();  // _block may be deleted
                _block = saved_next;
            }
            nr += rc;
        } else {
            if (rc < 0) {
                if (*ssl_error == SSL_ERROR_WANT_READ
                    || (*ssl_error == SSL_ERROR_SYSCALL
                        && BIO_fd_non_fatal_error(errno) == 1)) {
                    // Non fatal error, tell caller to read again
                    *ssl_error = SSL_ERROR_WANT_READ;
                } else {
                    // Other errors are fatal
                    return rc;
                }
            }
            return (nr > 0 ? nr : rc);
        }
    } while (nr < max_count);
    return nr;
}

void IOPortal::return_cached_blocks_impl(Block* b) {
    iobuf::release_tls_block_chain(b);
}

IOBufAsZeroCopyInputStream::IOBufAsZeroCopyInputStream(const IOBuf& buf)
    : _nref(buf._ref_num())
    , _ref_index(0)
    , _add_offset(0)
    , _byte_count(0)
    , _buf(&buf) {
    _cur_ref = (_nref > 0 ? &buf._ref_at(0) : NULL);
}

bool IOBufAsZeroCopyInputStream::Next(const void** data, int* size) {
    if (_cur_ref != NULL) {
        *data = _cur_ref->block->data + _cur_ref->offset + _add_offset;
        // Impl. of Backup/Skip guarantees that _add_offset < _cur_ref->length.
        *size = _cur_ref->length - _add_offset;
        _byte_count += _cur_ref->length - _add_offset;
        _add_offset = 0;
        ++_ref_index;
        _cur_ref = (_ref_index < _nref ? &_buf->_ref_at(_ref_index) : NULL);
        return true;
    }
    return false;
}

void IOBufAsZeroCopyInputStream::BackUp(int count) {
    if (_ref_index > 0) {
        --_ref_index;
        _cur_ref = &_buf->_ref_at(_ref_index);
        CHECK(_add_offset == 0 && _cur_ref->length >= (uint32_t)count)
            << "BackUp() is not after a Next()";
        _add_offset = _cur_ref->length - count;
        _byte_count -= count;
    } else {
        LOG(FATAL) << "BackUp an empty ZeroCopyInputStream";
    }
}

bool IOBufAsZeroCopyInputStream::Skip(int count) {
    if (_cur_ref != NULL) {
        do {
            const int left_bytes = _cur_ref->length - _add_offset;
            if (count < left_bytes) {
                _add_offset += count;
                _byte_count += count;
                return true;
            }
            count -= left_bytes;
            _add_offset = 0;
            _byte_count += left_bytes; 
            if (++_ref_index < _nref) {
                _cur_ref = &_buf->_ref_at(_ref_index);
                continue;
            }
            _cur_ref = NULL;
            return false;
        } while (1);
    }
    return false;
}

google::protobuf::int64 IOBufAsZeroCopyInputStream::ByteCount() const {
    return _byte_count;
}

IOBufAsZeroCopyOutputStream::IOBufAsZeroCopyOutputStream(IOBuf* buf)
    : _buf(buf), _block_size(0), _cur_block(NULL), _byte_count(0) {
}

IOBufAsZeroCopyOutputStream::IOBufAsZeroCopyOutputStream(
    IOBuf *buf, uint32_t block_size)
    : _buf(buf)
    , _block_size(block_size)
    , _cur_block(NULL)
    , _byte_count(0) {
    
    if (_block_size <= offsetof(IOBuf::Block, data)) {
        throw std::invalid_argument("block_size is too small");
    }
}

IOBufAsZeroCopyOutputStream::~IOBufAsZeroCopyOutputStream() {
    _release_block();
}

bool IOBufAsZeroCopyOutputStream::Next(void** data, int* size) {
    if (_cur_block == NULL || _cur_block->full()) {
        _release_block();
        if (_block_size > 0) {
            _cur_block = iobuf::create_block(_block_size);
        } else {
            _cur_block = iobuf::acquire_tls_block();
        }
        if (_cur_block == NULL) {
            return false;
        }
    }
    const IOBuf::BlockRef r = { _cur_block->size, 
                                (uint32_t)_cur_block->left_space(),
                                _cur_block };
    *data = _cur_block->data + r.offset;
    *size = r.length;
    _cur_block->size = _cur_block->cap;
    _buf->_push_back_ref(r);
    _byte_count += r.length;
    return true;
}

void IOBufAsZeroCopyOutputStream::BackUp(int count) {
    while (!_buf->empty()) {
        IOBuf::BlockRef& r = _buf->_back_ref();
        if (_cur_block) {  
            // A ordinary BackUp that should be supported by all ZeroCopyOutputStream
            // _cur_block must match end of the IOBuf
            if (r.block != _cur_block) {
                LOG(FATAL) << "r.block=" << r.block
                           << " does not match _cur_block=" << _cur_block;
                return;
            }
            if (r.offset + r.length != _cur_block->size) {
                LOG(FATAL) << "r.offset(" << r.offset << ") + r.length("
                           << r.length << ") != _cur_block->size("
                           << _cur_block->size << ")";
                return;
            }
        } else {
            // An extended BackUp which is undefined in regular 
            // ZeroCopyOutputStream. The `count' given by user is larger than 
            // size of last _cur_block (already released in last iteration).
            if (r.block->ref_count() == 1) {
                // A special case: the block is only referenced by last
                // BlockRef of _buf. Safe to allocate more on the block.
                if (r.offset + r.length != r.block->size) {
                    LOG(FATAL) << "r.offset(" << r.offset << ") + r.length("
                               << r.length << ") != r.block->size("
                               << r.block->size << ")";
                    return;
                }
            } else if (r.offset + r.length != r.block->size) {
                // Last BlockRef does not match end of the block (which is
                // used by other IOBuf already). Unsafe to re-reference
                // the block and allocate more, just pop the bytes.
                _byte_count -= _buf->pop_back(count);
                return;
            } // else Last BlockRef matches end of the block. Even if the
            // block is shared by other IOBuf, it's safe to allocate bytes
            // after block->size.
            _cur_block = r.block;
            _cur_block->inc_ref();
        }
        if (BAIDU_LIKELY(r.length > (uint32_t)count)) {
            r.length -= count;
            if (!_buf->_small()) {
                _buf->_bv.nbytes -= count;
            }
            _cur_block->size -= count;
            _byte_count -= count;
            // Release block for TLS before quiting BackUp() for other
            // code to reuse the block even if this wrapper object is
            // not destructed. Example:
            //    IOBufAsZeroCopyOutputStream wrapper(...);
            //    ParseFromZeroCopyStream(&wrapper, ...); // Calls BackUp
            //    IOBuf buf;
            //    buf.append("foobar");  // can reuse the TLS block.
            if (_block_size == 0) {
                iobuf::release_tls_block(_cur_block);
                _cur_block = NULL;
            }
            return;
        }
        _cur_block->size -= r.length;
        _byte_count -= r.length;
        count -= r.length;
        _buf->_pop_back_ref();
        _release_block();
        if (count == 0) {
            return;
        }
    }
    LOG_IF(FATAL, count != 0) << "BackUp an empty IOBuf";
}

google::protobuf::int64 IOBufAsZeroCopyOutputStream::ByteCount() const {
    return _byte_count;
}

void IOBufAsZeroCopyOutputStream::_release_block() {
    if (_block_size > 0) {
        if (_cur_block) {
            _cur_block->dec_ref();
        }
    } else {
        iobuf::release_tls_block(_cur_block);
    }
    _cur_block = NULL;
}

IOBufAsSnappySink::IOBufAsSnappySink(butil::IOBuf& buf)
    : _cur_buf(NULL), _cur_len(0), _buf(&buf), _buf_stream(&buf) {
}

void IOBufAsSnappySink::Append(const char* bytes, size_t n) {
    if (_cur_len > 0) {
        CHECK(bytes == _cur_buf && static_cast<int>(n) <= _cur_len)
            << "bytes must be _cur_buf";
        _buf_stream.BackUp(_cur_len - n);
        _cur_len = 0;
    } else {
        _buf->append(bytes, n);
    }
}

char* IOBufAsSnappySink::GetAppendBuffer(size_t length, char* scratch) {
    // TODO: butil::IOBuf supports dynamic sized blocks.
    if (length <= _buf->DEFAULT_PAYLOAD) {
        if (_buf_stream.Next(reinterpret_cast<void**>(&_cur_buf), &_cur_len)) { 
            if (_cur_len >= static_cast<int>(length)) {
                return _cur_buf;
            } else {
                _buf_stream.BackUp(_cur_len);
            }
        } else {
            LOG(FATAL) << "Fail to alloc buffer";
        }
    } // else no need to try.
    _cur_buf = NULL;
    _cur_len = 0;
    return scratch;
}

size_t IOBufAsSnappySource::Available() const {
    return _buf->length() - _stream.ByteCount();
}

void IOBufAsSnappySource::Skip(size_t n) {
    _stream.Skip(n);
}

const char* IOBufAsSnappySource::Peek(size_t* len) {
    const char* buffer = NULL;
    int res = 0;
    if (_stream.Next((const void**)&buffer, &res)) {
        *len = res;
        // Source::Peek requires no reposition.
        _stream.BackUp(*len);
        return buffer;
    } else {
        *len = 0;
        return NULL;
    }
}

IOBufAppender::IOBufAppender()
    : _data(NULL)
    , _data_end(NULL)
    , _zc_stream(&_buf) {
}

}  // namespace butil

void* fast_memcpy(void *__restrict dest, const void *__restrict src, size_t n) {
    return butil::iobuf::cp(dest, src, n);
}