Commit 2b748f82 authored by gejun's avatar gejun

Rename read_bytes() in RecordReader to offset() to avoid misusage and adjust the UT

parent 50aaa985
...@@ -210,7 +210,7 @@ int RecordReader::CutRecord(Record* rec) { ...@@ -210,7 +210,7 @@ int RecordReader::CutRecord(Record* rec) {
if (*(const uint32_t*)dummy != *(const uint32_t*)BRPC_RECORDIO_MAGIC) { if (*(const uint32_t*)dummy != *(const uint32_t*)BRPC_RECORDIO_MAGIC) {
LOG(ERROR) << "Invalid magic_num=" LOG(ERROR) << "Invalid magic_num="
<< butil::PrintedAsBinary(std::string((char*)headbuf, 4)) << butil::PrintedAsBinary(std::string((char*)headbuf, 4))
<< ", offset=" << read_bytes(); << ", offset=" << offset();
return -1; return -1;
} }
uint32_t tmp = NetToHost32(*(const uint32_t*)(headbuf + 4)); uint32_t tmp = NetToHost32(*(const uint32_t*)(headbuf + 4));
...@@ -224,7 +224,7 @@ int RecordReader::CutRecord(Record* rec) { ...@@ -224,7 +224,7 @@ int RecordReader::CutRecord(Record* rec) {
<< std::hex << tmp << std::dec << std::hex << tmp << std::dec
<< "(metabit=" << has_meta << "(metabit=" << has_meta
<< " size=" << data_size << " size=" << data_size
<< " offset=" << read_bytes() << " offset=" << offset()
<< "), expected=" << (unsigned)headbuf[8] << "), expected=" << (unsigned)headbuf[8]
<< " actual=" << (unsigned)checksum; << " actual=" << (unsigned)checksum;
return -1; return -1;
...@@ -233,7 +233,7 @@ int RecordReader::CutRecord(Record* rec) { ...@@ -233,7 +233,7 @@ int RecordReader::CutRecord(Record* rec) {
LOG(ERROR) << "data_size=" << data_size LOG(ERROR) << "data_size=" << data_size
<< " is larger than -recordio_max_record_size=" << " is larger than -recordio_max_record_size="
<< FLAGS_recordio_max_record_size << FLAGS_recordio_max_record_size
<< ", offset=" << read_bytes(); << ", offset=" << offset();
return -1; return -1;
} }
if (_cutter.remaining_bytes() < data_size) { if (_cutter.remaining_bytes() < data_size) {
...@@ -257,13 +257,13 @@ int RecordReader::CutRecord(Record* rec) { ...@@ -257,13 +257,13 @@ int RecordReader::CutRecord(Record* rec) {
if (consumed_bytes + 5 + name_size + meta_size > data_size) { if (consumed_bytes + 5 + name_size + meta_size > data_size) {
LOG(ERROR) << name << ".meta_size=" << meta_size LOG(ERROR) << name << ".meta_size=" << meta_size
<< " is inconsistent with its data_size=" << data_size << " is inconsistent with its data_size=" << data_size
<< ", offset=" << read_bytes(); << ", offset=" << offset();
return -1; return -1;
} }
butil::IOBuf* meta = rec->MutableMeta(name, true/*null_on_found*/); butil::IOBuf* meta = rec->MutableMeta(name, true/*null_on_found*/);
if (meta == NULL) { if (meta == NULL) {
LOG(ERROR) << "Fail to add meta=" << name LOG(ERROR) << "Fail to add meta=" << name
<< ", offset=" << read_bytes(); << ", offset=" << offset();
return -1; return -1;
} }
_cutter.cutn(meta, meta_size); _cutter.cutn(meta, meta_size);
......
...@@ -102,8 +102,10 @@ public: ...@@ -102,8 +102,10 @@ public:
// END_OF_READER means all data in the IReader are successfully consumed. // END_OF_READER means all data in the IReader are successfully consumed.
int last_error() const { return _last_error; } int last_error() const { return _last_error; }
// Total bytes of all read records. // Total bytes consumed.
size_t read_bytes() const { return _ncut; } // NOTE: this value may not equal to read bytes from the IReader even if
// the reader runs out, due to parsing errors.
size_t offset() const { return _ncut; }
private: private:
bool CutUntilNextRecordCandidate(); bool CutUntilNextRecordCandidate();
......
...@@ -164,7 +164,7 @@ TEST(RecordIOTest, write_read_basic) { ...@@ -164,7 +164,7 @@ TEST(RecordIOTest, write_read_basic) {
ASSERT_FALSE(rr.ReadNext(NULL)); ASSERT_FALSE(rr.ReadNext(NULL));
ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error()); ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error());
ASSERT_EQ(sw.str().size(), rr.read_bytes()); ASSERT_EQ(sw.str().size(), rr.offset());
} }
TEST(RecordIOTest, incomplete_reader) { TEST(RecordIOTest, incomplete_reader) {
...@@ -208,7 +208,7 @@ TEST(RecordIOTest, incomplete_reader) { ...@@ -208,7 +208,7 @@ TEST(RecordIOTest, incomplete_reader) {
ASSERT_FALSE(rr.ReadNext(NULL)); ASSERT_FALSE(rr.ReadNext(NULL));
ASSERT_EQ(EAGAIN, rr.last_error()); ASSERT_EQ(EAGAIN, rr.last_error());
ASSERT_EQ(sw.str().size(), rr.read_bytes()); ASSERT_EQ(sw.str().size(), rr.offset());
} }
static std::string rand_string(int min_len, int max_len) { static std::string rand_string(int min_len, int max_len) {
...@@ -299,8 +299,8 @@ TEST(RecordIOTest, write_read_random) { ...@@ -299,8 +299,8 @@ TEST(RecordIOTest, write_read_random) {
ASSERT_EQ(name_value_list[j].second, *r.MetaAt(0).data); ASSERT_EQ(name_value_list[j].second, *r.MetaAt(0).data);
} }
ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error()); ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error());
ASSERT_EQ(str.size(), rr.read_bytes());
ASSERT_EQ(j, name_value_list.size()); ASSERT_EQ(j, name_value_list.size());
ASSERT_LE(str.size() - rr.offset(), 3);
} }
} // namespace } // namespace
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment