Commit 5e8b892a authored by Ge Jun's avatar Ge Jun

IOBuf supports appending and owning user allocated buffer

parent 00d85f46
......@@ -180,33 +180,84 @@ size_t IOBuf::new_bigview_count() {
return iobuf::g_newbigview.load(butil::memory_order_relaxed);
}
const uint16_t IOBUF_BLOCK_FLAGS_USER_DATA = 0x1;
typedef void (*UserDataDeleter)(void*);
struct UserDataExtension {
UserDataDeleter deleter;
};
struct IOBuf::Block {
butil::atomic<int> nshared;
uint16_t size;
uint16_t cap;
uint16_t flags;
uint16_t abi_check; // original cap, never be zero.
uint32_t size;
uint32_t cap;
Block* portal_next;
char data[0];
// When flag is 0, data points to `size` bytes starting at `(char*)this+sizeof(Block)'
// When flag & IOBUF_BLOCK_FLAGS_USER_DATA is not 0, data points the user data and
// there's a UserDataExtension object put at `(char*)this+sizeof(Block)'
char* data;
explicit Block(size_t block_size)
: nshared(1), size(0), cap(block_size - offsetof(Block, data))
, portal_next(NULL) {
assert(block_size <= MAX_BLOCK_SIZE);
Block(char* data_in, uint32_t data_size)
: nshared(1)
, flags(0)
, abi_check(0)
, size(0)
, cap(data_size)
, portal_next(NULL)
, data(data_in) {
iobuf::g_nblock.fetch_add(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_add(block_size, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_add(data_size + sizeof(Block),
butil::memory_order_relaxed);
}
Block(char* data_in, uint32_t data_size, UserDataDeleter deleter)
: nshared(1)
, flags(IOBUF_BLOCK_FLAGS_USER_DATA)
, abi_check(0)
, size(data_size)
, cap(data_size)
, portal_next(NULL)
, data(data_in) {
get_user_data_extension()->deleter = deleter;
}
// Undefined behavior when (flags & IOBUF_BLOCK_FLAGS_USER_DATA) is 0.
UserDataExtension* get_user_data_extension() {
char* p = (char*)this;
return (UserDataExtension*)(p + sizeof(Block));
}
inline void check_abi() {
#ifndef NDEBUG
if (abi_check != 0) {
LOG(FATAL) << "Your program seems to wrongly contain two "
"ABI-incompatible implementations of IOBuf";
}
#endif
}
void inc_ref() {
check_abi();
nshared.fetch_add(1, butil::memory_order_relaxed);
}
void dec_ref() {
check_abi();
if (nshared.fetch_sub(1, butil::memory_order_release) == 1) {
butil::atomic_thread_fence(butil::memory_order_acquire);
iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_sub(cap + offsetof(Block, data),
butil::memory_order_relaxed);
this->~Block();
iobuf::blockmem_deallocate(this);
if (!flags) {
iobuf::g_nblock.fetch_sub(1, butil::memory_order_relaxed);
iobuf::g_blockmem.fetch_sub(cap + sizeof(Block),
butil::memory_order_relaxed);
this->~Block();
iobuf::blockmem_deallocate(this);
} else if (flags & IOBUF_BLOCK_FLAGS_USER_DATA) {
get_user_data_extension()->deleter(data);
this->~Block();
free(this);
}
}
}
......@@ -227,16 +278,21 @@ IOBuf::Block* get_portal_next(IOBuf::Block const* b) {
return b->portal_next;
}
uint16_t block_cap(IOBuf::Block const *b) {
uint32_t block_cap(IOBuf::Block const *b) {
return b->cap;
}
inline IOBuf::Block* create_block(const size_t block_size) {
void* mem = iobuf::blockmem_allocate(block_size);
if (BAIDU_LIKELY(mem != NULL)) {
return new (mem) IOBuf::Block(block_size);
if (block_size > 0xFFFFFFFFULL) {
LOG(FATAL) << "block_size=" << block_size << " is too large";
return NULL;
}
return NULL;
char* mem = (char*)iobuf::blockmem_allocate(block_size);
if (mem == NULL) {
return NULL;
}
return new (mem) IOBuf::Block(mem + sizeof(IOBuf::Block),
block_size - sizeof(IOBuf::Block));
}
inline IOBuf::Block* create_block() {
......@@ -460,7 +516,6 @@ BAIDU_CASSERT(IOBuf::DEFAULT_BLOCK_SIZE/4096*4096 == IOBuf::DEFAULT_BLOCK_SIZE,
sizeof_block_should_be_multiply_of_4096);
const IOBuf::Area IOBuf::INVALID_AREA;
const size_t IOBuf::DEFAULT_PAYLOAD;
IOBuf::IOBuf(const IOBuf& rhs) {
if (rhs._small()) {
......@@ -1158,6 +1213,24 @@ int IOBuf::appendv(const const_iovec* vec, size_t n) {
return 0;
}
int IOBuf::append_user_data(void* data, size_t size, void (*deleter)(void*)) {
if (size > 0xFFFFFFFFULL - 100) {
LOG(FATAL) << "data_size=" << size << " is too large";
return -1;
}
char* mem = (char*)malloc(sizeof(IOBuf::Block) + sizeof(UserDataExtension));
if (mem == NULL) {
return -1;
}
if (deleter == NULL) {
deleter = ::free;
}
IOBuf::Block* b = new (mem) IOBuf::Block((char*)data, size, deleter);
const IOBuf::BlockRef r = { 0, b->cap, b };
_move_back_ref(r);
return 0;
}
int IOBuf::resize(size_t n, char c) {
const size_t saved_len = length();
if (n < saved_len) {
......@@ -1947,7 +2020,7 @@ void IOBufAsSnappySink::Append(const char* bytes, size_t n) {
char* IOBufAsSnappySink::GetAppendBuffer(size_t length, char* scratch) {
// TODO: butil::IOBuf supports dynamic sized blocks.
if (length <= _buf->DEFAULT_PAYLOAD) {
if (length <= 8000/*just a hint*/) {
if (_buf_stream.Next(reinterpret_cast<void**>(&_cur_buf), &_cur_len)) {
if (_cur_len >= static_cast<int>(length)) {
return _cur_buf;
......
......@@ -55,9 +55,6 @@ friend class IOBufAsZeroCopyInputStream;
friend class IOBufAsZeroCopyOutputStream;
public:
static const size_t DEFAULT_BLOCK_SIZE = 8192;
static const size_t DEFAULT_PAYLOAD = DEFAULT_BLOCK_SIZE - 16/*impl dependent*/;
static const size_t MAX_BLOCK_SIZE = (1 << 16);
static const size_t MAX_PAYLOAD = MAX_BLOCK_SIZE - 16/*impl dependent*/;
static const size_t INITIAL_CAP = 32; // must be power of 2
struct Block;
......@@ -236,6 +233,11 @@ public:
// NOTE: Returns 0 when `s' is empty.
int append(const std::string& s);
// Append the user-data to back side WITHOUT copying.
// The user-data can be split and shared by smaller IOBufs and will be
// deleted using the deleter func when no IOBuf references it anymore.
int append_user_data(void* data, size_t size, void (*deleter)(void*));
// Resizes the buf to a length of n characters.
// If n is smaller than the current length, all bytes after n will be
// truncated.
......
......@@ -38,6 +38,10 @@ IOBuf::Block* get_portal_next(IOBuf::Block const* b);
}
namespace {
const size_t BLOCK_OVERHEAD = 32; //impl dependent
const size_t DEFAULT_PAYLOAD = butil::IOBuf::DEFAULT_BLOCK_SIZE - BLOCK_OVERHEAD;
void check_tls_block() {
ASSERT_EQ((butil::IOBuf::Block*)NULL, butil::iobuf::get_tls_block_head());
printf("tls_block of butil::IOBuf was deleted\n");
......@@ -146,7 +150,7 @@ TEST_F(IOBufTest, pop_front) {
ASSERT_EQ(0UL, buf.length());
ASSERT_TRUE(buf.empty());
for (size_t i = 0; i < butil::IOBuf::DEFAULT_PAYLOAD * 3/2; ++i) {
for (size_t i = 0; i < DEFAULT_PAYLOAD * 3/2; ++i) {
s.push_back(i);
}
buf.append(s);
......@@ -187,7 +191,7 @@ TEST_F(IOBufTest, pop_back) {
ASSERT_EQ(0UL, buf.length());
ASSERT_TRUE(buf.empty());
for (size_t i = 0; i < butil::IOBuf::DEFAULT_PAYLOAD * 3/2; ++i) {
for (size_t i = 0; i < DEFAULT_PAYLOAD * 3/2; ++i) {
s.push_back(i);
}
buf.append(s);
......@@ -250,15 +254,15 @@ TEST_F(IOBufTest, appendv) {
b.to_string());
// Append some long stuff.
const size_t full_len = butil::IOBuf::DEFAULT_PAYLOAD * 9;
const size_t full_len = DEFAULT_PAYLOAD * 9;
char* str = (char*)malloc(full_len);
ASSERT_TRUE(str);
const size_t len1 = full_len / 6;
const size_t len2 = full_len / 3;
const size_t len3 = full_len - len1 - len2;
ASSERT_GT(len1, (size_t)butil::IOBuf::DEFAULT_PAYLOAD);
ASSERT_GT(len2, (size_t)butil::IOBuf::DEFAULT_PAYLOAD);
ASSERT_GT(len3, (size_t)butil::IOBuf::DEFAULT_PAYLOAD);
ASSERT_GT(len1, (size_t)DEFAULT_PAYLOAD);
ASSERT_GT(len2, (size_t)DEFAULT_PAYLOAD);
ASSERT_GT(len3, (size_t)DEFAULT_PAYLOAD);
ASSERT_EQ(full_len, len1 + len2 + len3);
for (size_t i = 0; i < full_len; ++i) {
......@@ -293,11 +297,11 @@ TEST_F(IOBufTest, reserve) {
ASSERT_EQ("goodohello blahblahfoobar", b.to_string());
// append a long string and assign again.
std::string s1(butil::IOBuf::DEFAULT_PAYLOAD * 3, '\0');
std::string s1(DEFAULT_PAYLOAD * 3, '\0');
for (size_t i = 0; i < s1.size(); ++i) {
s1[i] = i * 7;
}
ASSERT_EQ(butil::IOBuf::DEFAULT_PAYLOAD * 3, s1.size());
ASSERT_EQ(DEFAULT_PAYLOAD * 3, s1.size());
// remove everything after reserved area
ASSERT_GE(b.size(), NRESERVED1);
b.pop_back(b.size() - NRESERVED1);
......@@ -309,7 +313,7 @@ TEST_F(IOBufTest, reserve) {
// Reserve long
b.pop_back(b.size() - NRESERVED1);
ASSERT_EQ(NRESERVED1, b.size());
const size_t NRESERVED2 = butil::IOBuf::DEFAULT_PAYLOAD * 3;
const size_t NRESERVED2 = DEFAULT_PAYLOAD * 3;
const butil::IOBuf::Area a2 = b.reserve(NRESERVED2);
ASSERT_EQ(NRESERVED1 + NRESERVED2, b.size());
b.append(s1);
......@@ -589,7 +593,7 @@ TEST_F(IOBufTest, copy_to) {
src.append(seed);
}
b.append(src);
ASSERT_GT(b.size(), butil::IOBuf::DEFAULT_PAYLOAD);
ASSERT_GT(b.size(), DEFAULT_PAYLOAD);
std::string s1;
ASSERT_EQ(src.size(), b.copy_to(&s1));
ASSERT_EQ(src, s1);
......@@ -599,11 +603,11 @@ TEST_F(IOBufTest, copy_to) {
ASSERT_EQ(src.substr(0, 32), s2);
std::string s3;
const std::string expected = src.substr(butil::IOBuf::DEFAULT_PAYLOAD - 1, 33);
ASSERT_EQ(33u, b.copy_to(&s3, 33, butil::IOBuf::DEFAULT_PAYLOAD - 1));
const std::string expected = src.substr(DEFAULT_PAYLOAD - 1, 33);
ASSERT_EQ(33u, b.copy_to(&s3, 33, DEFAULT_PAYLOAD - 1));
ASSERT_EQ(expected, s3);
ASSERT_EQ(33u, b.append_to(&s3, 33, butil::IOBuf::DEFAULT_PAYLOAD - 1));
ASSERT_EQ(33u, b.append_to(&s3, 33, DEFAULT_PAYLOAD - 1));
ASSERT_EQ(expected + expected, s3);
butil::IOBuf b1;
......@@ -615,10 +619,10 @@ TEST_F(IOBufTest, copy_to) {
ASSERT_EQ(src.substr(0, 32), b2.to_string());
butil::IOBuf b3;
ASSERT_EQ(33u, b.append_to(&b3, 33, butil::IOBuf::DEFAULT_PAYLOAD - 1));
ASSERT_EQ(33u, b.append_to(&b3, 33, DEFAULT_PAYLOAD - 1));
ASSERT_EQ(expected, b3.to_string());
ASSERT_EQ(33u, b.append_to(&b3, 33, butil::IOBuf::DEFAULT_PAYLOAD - 1));
ASSERT_EQ(33u, b.append_to(&b3, 33, DEFAULT_PAYLOAD - 1));
ASSERT_EQ(expected + expected, b3.to_string());
}
......@@ -1088,7 +1092,7 @@ TEST_F(IOBufTest, extended_backup) {
butil::iobuf::remove_tls_block_chain();
butil::IOBuf src;
const int BLKSIZE = (i == 0 ? 1024 : butil::IOBuf::DEFAULT_BLOCK_SIZE);
const int PLDSIZE = BLKSIZE - 16; // impl dependent.
const int PLDSIZE = BLKSIZE - BLOCK_OVERHEAD;
butil::IOBufAsZeroCopyOutputStream out_stream1(&src, BLKSIZE);
butil::IOBufAsZeroCopyOutputStream out_stream2(&src);
butil::IOBufAsZeroCopyOutputStream & out_stream =
......@@ -1146,14 +1150,14 @@ TEST_F(IOBufTest, backup_iobuf_never_called_next) {
ASSERT_TRUE(dummy_stream.Next(&dummy_data, &dummy_size));
}
butil::IOBuf src;
const size_t N = butil::IOBuf::DEFAULT_PAYLOAD * 2;
const size_t N = DEFAULT_PAYLOAD * 2;
src.resize(N);
ASSERT_EQ(2u, src.backing_block_num());
ASSERT_EQ(N, src.size());
butil::IOBufAsZeroCopyOutputStream out_stream(&src);
out_stream.BackUp(1); // also succeed.
ASSERT_EQ(-1, out_stream.ByteCount());
ASSERT_EQ(butil::IOBuf::DEFAULT_PAYLOAD * 2 - 1, src.size());
ASSERT_EQ(DEFAULT_PAYLOAD * 2 - 1, src.size());
ASSERT_EQ(2u, src.backing_block_num());
void* data0 = NULL;
int size0 = 0;
......@@ -1232,7 +1236,7 @@ TEST_F(IOBufTest, own_block) {
}
ASSERT_EQ(static_cast<size_t>(alloc_size), buf.length());
ASSERT_EQ(saved_tls_block, butil::iobuf::get_tls_block_head());
ASSERT_EQ(butil::iobuf::block_cap(buf._front_ref().block), BLOCK_SIZE - 16);
ASSERT_EQ(butil::iobuf::block_cap(buf._front_ref().block), BLOCK_SIZE - BLOCK_OVERHEAD);
}
struct Foo1 {
......@@ -1555,4 +1559,84 @@ TEST_F(IOBufTest, copy_to_string_from_iterator) {
}
ASSERT_EQ(nc, b0.length());
}
static void* my_free_params = NULL;
static void my_free(void* m) {
free(m);
my_free_params = m;
}
TEST_F(IOBufTest, append_user_data_and_consume) {
butil::IOBuf b0;
const int REP = 16;
const int len = REP * 256;
char* data = (char*)malloc(len);
for (int i = 0; i < 256; ++i) {
for (int j = 0; j < REP; ++j) {
data[i * REP + j] = (char)i;
}
}
my_free_params = NULL;
ASSERT_EQ(0, b0.append_user_data(data, len, my_free));
ASSERT_EQ(1UL, b0._ref_num());
butil::IOBuf::BlockRef r = b0._front_ref();
ASSERT_EQ(1, butil::iobuf::block_shared_count(r.block));
ASSERT_EQ(len, b0.size());
std::string out;
ASSERT_EQ(len, b0.cutn(&out, len));
ASSERT_TRUE(b0.empty());
ASSERT_EQ(data, my_free_params);
ASSERT_EQ(len, out.size());
// note: cannot memcmp with data which is already free-ed
for (int i = 0; i < 256; ++i) {
for (int j = 0; j < REP; ++j) {
ASSERT_EQ((char)i, out[i * REP + j]);
}
}
}
TEST_F(IOBufTest, append_user_data_and_share) {
butil::IOBuf b0;
const int REP = 16;
const int len = REP * 256;
char* data = (char*)malloc(len);
for (int i = 0; i < 256; ++i) {
for (int j = 0; j < REP; ++j) {
data[i * REP + j] = (char)i;
}
}
my_free_params = NULL;
ASSERT_EQ(0, b0.append_user_data(data, len, my_free));
ASSERT_EQ(1UL, b0._ref_num());
butil::IOBuf::BlockRef r = b0._front_ref();
ASSERT_EQ(1, butil::iobuf::block_shared_count(r.block));
ASSERT_EQ(len, b0.size());
{
butil::IOBuf bufs[256];
for (int i = 0; i < 256; ++i) {
ASSERT_EQ(REP, b0.cutn(&bufs[i], REP));
ASSERT_EQ(len - (i+1) * REP, b0.size());
if (i != 255) {
ASSERT_EQ(1UL, b0._ref_num());
butil::IOBuf::BlockRef r = b0._front_ref();
ASSERT_EQ(i + 2, butil::iobuf::block_shared_count(r.block));
} else {
ASSERT_EQ(0UL, b0._ref_num());
ASSERT_TRUE(b0.empty());
}
}
ASSERT_EQ(NULL, my_free_params);
for (int i = 0; i < 256; ++i) {
std::string out = bufs[i].to_string();
ASSERT_EQ(REP, out.size());
for (int j = 0; j < REP; ++j) {
ASSERT_EQ((char)i, out[j]);
}
}
}
ASSERT_EQ(data, my_free_params);
}
} // namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment