Commit a6886807 authored by gejun's avatar gejun

Add reader and writer for binary records

parent 3b4b0e6a
......@@ -146,6 +146,7 @@ BUTIL_SOURCES = \
src/butil/containers/case_ignored_flat_map.cpp \
src/butil/iobuf.cpp \
src/butil/binary_printer.cpp \
src/butil/recordio.cc \
src/butil/popen.cpp
ifeq ($(SYSTEM), Linux)
......
This diff is collapsed.
// recordio - A binary format to transport data from end to end.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Author: Ge,Jun (jge666@gmail.com)
// Date: Thu Nov 22 13:57:56 CST 2012
#ifndef BUTIL_RECORDIO_H
#define BUTIL_RECORDIO_H
#include "butil/iobuf.h"
#include <memory>
namespace butil {
class Record {
public:
struct NamedMeta {
std::string name;
std::shared_ptr<butil::IOBuf> data;
};
// Number of meta
size_t MetaCount() const { return _metas.size(); }
// Get i-th Meta, out-of-range accesses may crash.
const NamedMeta& MetaAt(size_t i) const { return _metas[i]; }
// Get meta by name.
const butil::IOBuf* Meta(const char* name) const;
// Add meta.
// Returns a modifiable pointer to the meta with the name.
// If null_on_found is true and meta with the name is present, NULL is returned.
butil::IOBuf* MutableMeta(const char* name, bool null_on_found = false);
butil::IOBuf* MutableMeta(const std::string& name, bool null_on_found = false);
// Remove meta with the name.
// Returns true on erased.
bool RemoveMeta(const butil::StringPiece& name);
// Get the payload.
const butil::IOBuf& Payload() const { return _payload; }
// Get a modifiable pointer to the payload.
butil::IOBuf* MutablePayload() { return &_payload; }
// Clear payload and remove all meta.
void Clear();
// Byte size of serialized form of this record.
size_t ByteSize() const;
private:
butil::IOBuf _payload;
std::vector<NamedMeta> _metas;
};
// Parse records from the IReader, corrupted records will be skipped.
// Example:
// RecordReader rd(ireader);
// Record rec;
// while (rd.ReadNext(&rec)) {
// HandleRecord(rec);
// }
// if (rd.last_error() != RecordReader::END_OF_READER) {
// LOG(FATAL) << "Critical error occurred";
// }
class RecordReader {
public:
static const int END_OF_READER = -1;
explicit RecordReader(IReader* reader);
// Returns true on success and `out' is overwritten by the record.
// False otherwise, check last_error() for the error which is treated as permanent.
bool ReadNext(Record* out);
// 0 means no error.
// END_OF_READER means all bytes in the input reader are consumed and
// turned into records.
int last_error() const { return _last_error; }
// Total bytes of all read records.
size_t read_bytes() const { return _ncut; }
private:
bool CutUntilNextRecordCandidate();
int CutRecord(Record* rec);
private:
IReader* _reader;
IOPortal _portal;
IOBufCutter _cutter;
size_t _ncut;
int _last_error;
};
// Write records into the IWriter.
class RecordWriter {
public:
explicit RecordWriter(IWriter* writer);
// Serialize the record into internal buffer and NOT flush into the IWriter.
int WriteWithoutFlush(const Record&);
// Serialize the record into internal buffer and flush into the IWriter.
int Write(const Record&);
// Flush internal buffer into the IWriter.
// Returns 0 on success, error code otherwise.
int Flush();
private:
IOBuf _buf;
IWriter* _writer;
};
} // namespace butil
#endif // BUTIL_RECORDIO_H
......@@ -110,6 +110,7 @@ TEST_BUTIL_SOURCES = \
flat_map_unittest.cpp \
crc32c_unittest.cc \
iobuf_unittest.cpp \
recordio_unittest.cpp \
test_switches.cc \
scoped_locale.cc \
popen_unittest.cpp \
......
#include <gtest/gtest.h>
#include "butil/recordio.h"
#include "butil/fast_rand.h"
#include "butil/string_printf.h"
#include "butil/file_util.h"
namespace {
class StringReader : public butil::IReader {
public:
StringReader(const std::string& str)
: _str(str), _offset(0) {}
ssize_t ReadV(const iovec* iov, int iovcnt) override {
size_t total_nc = 0;
for (int i = 0; i < iovcnt; ++i) {
void* dst = iov[i].iov_base;
size_t len = iov[i].iov_len;
size_t remain = _str.size() - _offset;
size_t nc = std::min(len, remain);
memcpy(dst, _str.data() + _offset, nc);
_offset += nc;
total_nc += nc;
if (_offset == _str.size()) {
break;
}
}
return total_nc;
}
private:
std::string _str;
size_t _offset;
};
class StringWriter : public butil::IWriter {
public:
ssize_t WriteV(const iovec* iov, int iovcnt) override {
const size_t old_size = _str.size();
for (int i = 0; i < iovcnt; ++i) {
_str.append((char*)iov[i].iov_base, iov[i].iov_len);
}
return _str.size() - old_size;
}
const std::string& str() const { return _str; }
private:
std::string _str;
};
TEST(RecordIOTest, empty_record) {
butil::Record r;
ASSERT_EQ((size_t)0, r.MetaCount());
ASSERT_TRUE(r.Meta("foo") == NULL);
ASSERT_FALSE(r.RemoveMeta("foo"));
ASSERT_TRUE(r.Payload().empty());
ASSERT_TRUE(r.MutablePayload()->empty());
}
TEST(RecordIOTest, manipulate_record) {
butil::Record r1;
ASSERT_EQ((size_t)0, r1.MetaCount());
butil::IOBuf* foo_val = r1.MutableMeta("foo");
ASSERT_EQ((size_t)1, r1.MetaCount());
ASSERT_TRUE(foo_val->empty());
foo_val->append("foo_data");
ASSERT_EQ(foo_val, r1.MutableMeta("foo"));
ASSERT_EQ((size_t)1, r1.MetaCount());
ASSERT_EQ("foo_data", *foo_val);
ASSERT_EQ(foo_val, r1.Meta("foo"));
butil::IOBuf* bar_val = r1.MutableMeta("bar");
ASSERT_EQ((size_t)2, r1.MetaCount());
ASSERT_TRUE(bar_val->empty());
bar_val->append("bar_data");
ASSERT_EQ(bar_val, r1.MutableMeta("bar"));
ASSERT_EQ((size_t)2, r1.MetaCount());
ASSERT_EQ("bar_data", *bar_val);
ASSERT_EQ(bar_val, r1.Meta("bar"));
butil::Record r2 = r1;
ASSERT_TRUE(r1.RemoveMeta("foo"));
ASSERT_EQ((size_t)1, r1.MetaCount());
ASSERT_TRUE(r1.Meta("foo") == NULL);
ASSERT_EQ(foo_val, r2.Meta("foo"));
ASSERT_EQ("foo_data", *foo_val);
}
TEST(RecordIOTest, invalid_name) {
char name[258];
for (size_t i = 0; i < sizeof(name); ++i) {
name[i] = 'a';
}
name[sizeof(name) - 1] = 0;
butil::Record r;
ASSERT_EQ(NULL, r.MutableMeta(name));
}
TEST(RecordIOTest, write_read_basic) {
StringWriter sw;
butil::RecordWriter rw(&sw);
butil::Record src;
ASSERT_EQ(0, rw.Write(src));
butil::IOBuf* foo_val = src.MutableMeta("foo");
foo_val->append("foo_data");
ASSERT_EQ(0, rw.Write(src));
butil::IOBuf* bar_val = src.MutableMeta("bar");
bar_val->append("bar_data");
ASSERT_EQ(0, rw.Write(src));
src.MutablePayload()->append("payload_data");
ASSERT_EQ(0, rw.Write(src));
ASSERT_EQ(0, rw.Flush());
std::cout << "len=" << sw.str().size()
<< " content=" << butil::PrintedAsBinary(sw.str(), 256) << std::endl;
StringReader sr(sw.str());
butil::RecordReader rr(&sr);
butil::Record r1;
ASSERT_TRUE(rr.ReadNext(&r1));
ASSERT_EQ(0, rr.last_error());
ASSERT_EQ((size_t)0, r1.MetaCount());
ASSERT_TRUE(r1.Payload().empty());
butil::Record r2;
ASSERT_TRUE(rr.ReadNext(&r2));
ASSERT_EQ(0, rr.last_error());
ASSERT_EQ((size_t)1, r2.MetaCount());
ASSERT_EQ("foo", r2.MetaAt(0).name);
ASSERT_EQ("foo_data", *r2.MetaAt(0).data);
ASSERT_TRUE(r2.Payload().empty());
butil::Record r3;
ASSERT_TRUE(rr.ReadNext(&r3));
ASSERT_EQ(0, rr.last_error());
ASSERT_EQ((size_t)2, r3.MetaCount());
ASSERT_EQ("foo", r3.MetaAt(0).name);
ASSERT_EQ("foo_data", *r3.MetaAt(0).data);
ASSERT_EQ("bar", r3.MetaAt(1).name);
ASSERT_EQ("bar_data", *r3.MetaAt(1).data);
ASSERT_TRUE(r3.Payload().empty());
butil::Record r4;
ASSERT_TRUE(rr.ReadNext(&r4));
ASSERT_EQ(0, rr.last_error());
ASSERT_EQ((size_t)2, r4.MetaCount());
ASSERT_EQ("foo", r4.MetaAt(0).name);
ASSERT_EQ("foo_data", *r4.MetaAt(0).data);
ASSERT_EQ("bar", r4.MetaAt(1).name);
ASSERT_EQ("bar_data", *r4.MetaAt(1).data);
ASSERT_EQ("payload_data", r4.Payload());
ASSERT_FALSE(rr.ReadNext(NULL));
ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error());
ASSERT_EQ(sw.str().size(), rr.read_bytes());
}
static std::string rand_string(int min_len, int max_len) {
const int len = butil::fast_rand_in(min_len, max_len);
std::string str;
str.reserve(len);
for (int i = 0; i < len; ++i) {
str.push_back(butil::fast_rand_in('a', 'Z'));
}
return str;
}
TEST(RecordIOTest, write_read_random) {
StringWriter sw;
butil::RecordWriter rw(&sw);
const int N = 1024;
std::vector<std::pair<std::string, std::string>> name_value_list;
size_t nbytes = 0;
std::map<int, size_t> breaking_offsets;
for (int i = 0; i < N; ++i) {
butil::Record src;
std::string value = rand_string(10, 20);
std::string name = butil::string_printf("name_%d_", i) + value;
src.MutableMeta(name)->append(value);
ASSERT_EQ(0, rw.Write(src));
if (butil::fast_rand_less_than(70) == 0) {
breaking_offsets[i] = nbytes;
} else {
name_value_list.push_back(std::make_pair(name, value));
}
nbytes += src.ByteSize();
}
ASSERT_EQ(0, rw.Flush());
std::string str = sw.str();
ASSERT_EQ(nbytes, str.size());
// break some records
int break_idx = 0;
for (auto it = breaking_offsets.begin(); it != breaking_offsets.end(); ++it) {
switch (break_idx++ % 10) {
case 0:
str[it->second] = 'r';
break;
case 1:
str[it->second + 1] = 'd';
break;
case 2:
str[it->second + 2] = 'i';
break;
case 3:
str[it->second + 3] = 'o';
break;
case 4:
++str[it->second + 4];
break;
case 5:
str[it->second + 4] = 8;
break;
case 6:
++str[it->second + 5];
break;
case 7:
++str[it->second + 6];
break;
case 8:
++str[it->second + 7];
break;
case 9:
++str[it->second + 8];
break;
default:
ASSERT_TRUE(false) << "never";
}
}
ASSERT_EQ((size_t)N - breaking_offsets.size(), name_value_list.size());
std::cout << "sw.size=" << str.size()
<< " nbreak=" << breaking_offsets.size() << std::endl;
StringReader sr(str);
ASSERT_LT(0, butil::WriteFile(butil::FilePath("recordio_ref.io"), str.data(), str.size()));
butil::RecordReader rr(&sr);
size_t j = 0;
butil::Record r;
for (; rr.ReadNext(&r); ++j) {
ASSERT_LT(j, name_value_list.size());
ASSERT_EQ((size_t)1, r.MetaCount());
ASSERT_EQ(name_value_list[j].first, r.MetaAt(0).name) << j;
ASSERT_EQ(name_value_list[j].second, *r.MetaAt(0).data);
}
ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error());
ASSERT_EQ(str.size(), rr.read_bytes());
ASSERT_EQ(j, name_value_list.size());
}
} // namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment