// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <map> #include <gtest/gtest.h> #include "butil/recordio.h" #include "butil/fast_rand.h" #include "butil/string_printf.h" #include "butil/file_util.h" namespace { class StringReader : public butil::IReader { public: StringReader(const std::string& str, bool report_eagain_on_end = false) : _str(str) , _offset(0) , _report_eagain_on_end(report_eagain_on_end) {} ssize_t ReadV(const iovec* iov, int iovcnt) override { size_t total_nc = 0; for (int i = 0; i < iovcnt; ++i) { void* dst = iov[i].iov_base; size_t len = iov[i].iov_len; size_t remain = _str.size() - _offset; size_t nc = std::min(len, remain); memcpy(dst, _str.data() + _offset, nc); _offset += nc; total_nc += nc; if (_offset == _str.size()) { break; } } if (_report_eagain_on_end && total_nc == 0) { errno = EAGAIN; return -1; } return total_nc; } private: std::string _str; size_t _offset; bool _report_eagain_on_end; }; class StringWriter : public butil::IWriter { public: ssize_t WriteV(const iovec* iov, int iovcnt) override { const size_t old_size = _str.size(); for (int i = 0; i < iovcnt; ++i) { _str.append((char*)iov[i].iov_base, iov[i].iov_len); } return _str.size() - old_size; } const std::string& str() const { return _str; } private: std::string _str; }; TEST(RecordIOTest, empty_record) { butil::Record r; ASSERT_EQ((size_t)0, r.MetaCount()); ASSERT_TRUE(r.Meta("foo") == NULL); ASSERT_FALSE(r.RemoveMeta("foo")); ASSERT_TRUE(r.Payload().empty()); ASSERT_TRUE(r.MutablePayload()->empty()); } TEST(RecordIOTest, manipulate_record) { butil::Record r1; ASSERT_EQ((size_t)0, r1.MetaCount()); butil::IOBuf* foo_val = r1.MutableMeta("foo"); ASSERT_EQ((size_t)1, r1.MetaCount()); ASSERT_TRUE(foo_val->empty()); foo_val->append("foo_data"); ASSERT_EQ(foo_val, r1.MutableMeta("foo")); ASSERT_EQ((size_t)1, r1.MetaCount()); ASSERT_EQ("foo_data", *foo_val); ASSERT_EQ(foo_val, r1.Meta("foo")); butil::IOBuf* bar_val = r1.MutableMeta("bar"); ASSERT_EQ((size_t)2, r1.MetaCount()); ASSERT_TRUE(bar_val->empty()); bar_val->append("bar_data"); ASSERT_EQ(bar_val, r1.MutableMeta("bar")); ASSERT_EQ((size_t)2, r1.MetaCount()); ASSERT_EQ("bar_data", *bar_val); ASSERT_EQ(bar_val, r1.Meta("bar")); butil::Record r2 = r1; ASSERT_TRUE(r1.RemoveMeta("foo")); ASSERT_EQ((size_t)1, r1.MetaCount()); ASSERT_TRUE(r1.Meta("foo") == NULL); ASSERT_EQ(foo_val, r2.Meta("foo")); ASSERT_EQ("foo_data", *foo_val); } TEST(RecordIOTest, invalid_name) { char name[258]; for (size_t i = 0; i < sizeof(name); ++i) { name[i] = 'a'; } name[sizeof(name) - 1] = 0; butil::Record r; ASSERT_EQ(NULL, r.MutableMeta(name)); } TEST(RecordIOTest, write_read_basic) { StringWriter sw; butil::RecordWriter rw(&sw); butil::Record src; ASSERT_EQ(0, rw.Write(src)); butil::IOBuf* foo_val = src.MutableMeta("foo"); foo_val->append("foo_data"); ASSERT_EQ(0, rw.Write(src)); butil::IOBuf* bar_val = src.MutableMeta("bar"); bar_val->append("bar_data"); ASSERT_EQ(0, rw.Write(src)); src.MutablePayload()->append("payload_data"); ASSERT_EQ(0, rw.Write(src)); ASSERT_EQ(0, rw.Flush()); std::cout << "len=" << sw.str().size() << " content=" << butil::PrintedAsBinary(sw.str(), 256) << std::endl; StringReader sr(sw.str()); butil::RecordReader rr(&sr); butil::Record r1; ASSERT_TRUE(rr.ReadNext(&r1)); ASSERT_EQ(0, rr.last_error()); ASSERT_EQ((size_t)0, r1.MetaCount()); ASSERT_TRUE(r1.Payload().empty()); butil::Record r2; ASSERT_TRUE(rr.ReadNext(&r2)); ASSERT_EQ(0, rr.last_error()); ASSERT_EQ((size_t)1, r2.MetaCount()); ASSERT_EQ("foo", r2.MetaAt(0).name); ASSERT_EQ("foo_data", *r2.MetaAt(0).data); ASSERT_TRUE(r2.Payload().empty()); butil::Record r3; ASSERT_TRUE(rr.ReadNext(&r3)); ASSERT_EQ(0, rr.last_error()); ASSERT_EQ((size_t)2, r3.MetaCount()); ASSERT_EQ("foo", r3.MetaAt(0).name); ASSERT_EQ("foo_data", *r3.MetaAt(0).data); ASSERT_EQ("bar", r3.MetaAt(1).name); ASSERT_EQ("bar_data", *r3.MetaAt(1).data); ASSERT_TRUE(r3.Payload().empty()); butil::Record r4; ASSERT_TRUE(rr.ReadNext(&r4)); ASSERT_EQ(0, rr.last_error()); ASSERT_EQ((size_t)2, r4.MetaCount()); ASSERT_EQ("foo", r4.MetaAt(0).name); ASSERT_EQ("foo_data", *r4.MetaAt(0).data); ASSERT_EQ("bar", r4.MetaAt(1).name); ASSERT_EQ("bar_data", *r4.MetaAt(1).data); ASSERT_EQ("payload_data", r4.Payload()); ASSERT_FALSE(rr.ReadNext(NULL)); ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error()); ASSERT_EQ(sw.str().size(), rr.offset()); } TEST(RecordIOTest, incomplete_reader) { StringWriter sw; butil::RecordWriter rw(&sw); butil::Record src; butil::IOBuf* foo_val = src.MutableMeta("foo"); foo_val->append("foo_data"); ASSERT_EQ(0, rw.Write(src)); butil::IOBuf* bar_val = src.MutableMeta("bar"); bar_val->append("bar_data"); ASSERT_EQ(0, rw.Write(src)); ASSERT_EQ(0, rw.Flush()); std::string data = sw.str(); std::cout << "len=" << data.size() << " content=" << butil::PrintedAsBinary(data, 256) << std::endl; StringReader sr(data, true); butil::RecordReader rr(&sr); butil::Record r2; ASSERT_TRUE(rr.ReadNext(&r2)); ASSERT_EQ(0, rr.last_error()); ASSERT_EQ((size_t)1, r2.MetaCount()); ASSERT_EQ("foo", r2.MetaAt(0).name); ASSERT_EQ("foo_data", *r2.MetaAt(0).data); ASSERT_TRUE(r2.Payload().empty()); butil::Record r3; ASSERT_TRUE(rr.ReadNext(&r3)); ASSERT_EQ(0, rr.last_error()); ASSERT_EQ((size_t)2, r3.MetaCount()); ASSERT_EQ("foo", r3.MetaAt(0).name); ASSERT_EQ("foo_data", *r3.MetaAt(0).data); ASSERT_EQ("bar", r3.MetaAt(1).name); ASSERT_EQ("bar_data", *r3.MetaAt(1).data); ASSERT_TRUE(r3.Payload().empty()); ASSERT_FALSE(rr.ReadNext(NULL)); ASSERT_EQ(EAGAIN, rr.last_error()); ASSERT_EQ(sw.str().size(), rr.offset()); } static std::string rand_string(int min_len, int max_len) { const int len = butil::fast_rand_in(min_len, max_len); std::string str; str.reserve(len); for (int i = 0; i < len; ++i) { str.push_back(butil::fast_rand_in('a', 'Z')); } return str; } TEST(RecordIOTest, write_read_random) { StringWriter sw; butil::RecordWriter rw(&sw); const int N = 1024; std::vector<std::pair<std::string, std::string>> name_value_list; size_t nbytes = 0; std::map<int, size_t> breaking_offsets; for (int i = 0; i < N; ++i) { butil::Record src; std::string value = rand_string(10, 20); std::string name = butil::string_printf("name_%d_", i) + value; src.MutableMeta(name)->append(value); ASSERT_EQ(0, rw.Write(src)); if (butil::fast_rand_less_than(70) == 0) { breaking_offsets[i] = nbytes; } else { name_value_list.push_back(std::make_pair(name, value)); } nbytes += src.ByteSize(); } ASSERT_EQ(0, rw.Flush()); std::string str = sw.str(); ASSERT_EQ(nbytes, str.size()); // break some records int break_idx = 0; for (auto it = breaking_offsets.begin(); it != breaking_offsets.end(); ++it) { switch (break_idx++ % 10) { case 0: str[it->second] = 'r'; break; case 1: str[it->second + 1] = 'd'; break; case 2: str[it->second + 2] = 'i'; break; case 3: str[it->second + 3] = 'o'; break; case 4: ++str[it->second + 4]; break; case 5: str[it->second + 4] = 8; break; case 6: ++str[it->second + 5]; break; case 7: ++str[it->second + 6]; break; case 8: ++str[it->second + 7]; break; case 9: ++str[it->second + 8]; break; default: ASSERT_TRUE(false) << "never"; } } ASSERT_EQ((size_t)N - breaking_offsets.size(), name_value_list.size()); std::cout << "sw.size=" << str.size() << " nbreak=" << breaking_offsets.size() << std::endl; StringReader sr(str); ASSERT_LT(0, butil::WriteFile(butil::FilePath("recordio_ref.io"), str.data(), str.size())); butil::RecordReader rr(&sr); size_t j = 0; butil::Record r; for (; rr.ReadNext(&r); ++j) { ASSERT_LT(j, name_value_list.size()); ASSERT_EQ((size_t)1, r.MetaCount()); ASSERT_EQ(name_value_list[j].first, r.MetaAt(0).name) << j; ASSERT_EQ(name_value_list[j].second, *r.MetaAt(0).data); } ASSERT_EQ((int)butil::RecordReader::END_OF_READER, rr.last_error()); ASSERT_EQ(j, name_value_list.size()); ASSERT_LE(str.size() - rr.offset(), 3); } } // namespace