// Copyright (c) 2014 Baidu, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <algorithm> #include <butil/logging.h> #include <json2pb/pb_to_json.h> #include <json2pb/json_to_pb.h> #include "pb_util.h" #include "json_loader.h" #include <errno.h> namespace brpc { class JsonLoader::Reader { public: explicit Reader(int fd2) : _fd(fd2) , _brace_depth(0) , _quoted(false) , _quote_char(0) {} explicit Reader(const std::string& jsons) : _fd(-1) , _brace_depth(0) , _quoted(false) , _quote_char(0) { _file_buf.append(jsons); } bool get_next_json(butil::IOBuf* json1); bool read_some(); private: int _fd; int _brace_depth; bool _quoted; // quoted by " or ' char _quote_char; butil::IOPortal _file_buf; }; // Load data from the file. bool JsonLoader::Reader::read_some() { if (_fd < 0) { // loading from a string. return false; } ssize_t nr = _file_buf.append_from_file_descriptor(_fd, 65536); if (nr < 0) { if (errno == EINTR) { return read_some(); } PLOG(ERROR) << "Fail to read fd=" << _fd; return false; } else if (nr == 0) { return false; } else { return true; } } // Ignore json only with spaces and newline static bool possibly_valid_json(const butil::IOBuf& json) { butil::IOBufAsZeroCopyInputStream it(json); const void* data = NULL; int size = 0; int total_size = 0; for (; it.Next(&data, &size); total_size += size) { for (int i = 0; i < size; ++i) { char c = ((const char*)data)[i]; if (!isspace(c) && c != '\n') { return true; } } } return false; } // Separate jsons with closed braces. bool JsonLoader::Reader::get_next_json(butil::IOBuf* json1) { if (_file_buf.empty()) { if (!read_some()) { return false; } } json1->clear(); while (1) { butil::IOBufAsZeroCopyInputStream it(_file_buf); const void* data = NULL; int size = 0; int total_size = 0; int skipped = 0; for (; it.Next(&data, &size); total_size += size) { for (int i = 0; i < size; ++i) { const char c = ((const char*)data)[i]; if (_brace_depth == 0) { // skip any character until the first left brace is found. if (c != '{') { ++skipped; continue; } } switch (c) { case '{': if (!_quoted) { ++_brace_depth; } else { VLOG(1) << "Quoted left brace"; } break; case '}': if (!_quoted) { --_brace_depth; if (_brace_depth == 0) { // the braces are closed, a complete object. _file_buf.cutn(json1, total_size + i + 1); json1->pop_front(skipped); return possibly_valid_json(*json1); } else if (_brace_depth < 0) { LOG(ERROR) << "More right braces than left braces"; return false; } } else { VLOG(1) << "Quoted right brace"; } break; case '"': if (_quoted) { if (_quote_char == '"') { _quoted = false; } } else { _quoted = true; _quote_char = '"'; } break; case '\'': if (_quoted) { if (_quote_char == '\'') { _quoted = false; } } else { _quoted = true; _quote_char = '\''; } break; default: break; // just skip } } } if (!_file_buf.empty()) { json1->append(_file_buf); _file_buf.clear(); } if (!read_some()) { json1->pop_front(skipped); if (!json1->empty()) { return possibly_valid_json(*json1); } return false; } } return false; } JsonLoader::JsonLoader(google::protobuf::compiler::Importer* importer, google::protobuf::DynamicMessageFactory* factory, const std::string& service_name, const std::string& method_name) : _importer(importer) , _factory(factory) , _service_name(service_name) , _method_name(method_name) { _request_prototype = pbrpcframework::get_prototype_by_name( _service_name, _method_name, true, _importer, _factory); } void JsonLoader::load_messages( JsonLoader::Reader* ctx, std::deque<google::protobuf::Message*>* out_msgs) { out_msgs->clear(); butil::IOBuf request_json; while (ctx->get_next_json(&request_json)) { VLOG(1) << "Load " << out_msgs->size() + 1 << "-th json=`" << request_json << '\''; std::string error; google::protobuf::Message* request = _request_prototype->New(); butil::IOBufAsZeroCopyInputStream wrapper(request_json); if (!json2pb::JsonToProtoMessage(&wrapper, request, &error)) { LOG(WARNING) << "Fail to convert to pb: " << error << ", json=`" << request_json << '\''; delete request; continue; } out_msgs->push_back(request); LOG_IF(INFO, (out_msgs->size() % 10000) == 0) << "Loaded " << out_msgs->size() << " jsons"; } } void JsonLoader::load_messages( int fd, std::deque<google::protobuf::Message*>* out_msgs) { JsonLoader::Reader ctx(fd); load_messages(&ctx, out_msgs); } void JsonLoader::load_messages( const std::string& jsons, std::deque<google::protobuf::Message*>* out_msgs) { JsonLoader::Reader ctx(jsons); load_messages(&ctx, out_msgs); } } // namespace brpc