Commit 45222e8d authored by root's avatar root Committed by caidaojin

fix bugs

parent c2d4f76c
...@@ -29,15 +29,36 @@ int CouchbaseRequest::ParseRequest( ...@@ -29,15 +29,36 @@ int CouchbaseRequest::ParseRequest(
return -1; return -1;
} }
_buf.copy_to(&header, sizeof(header)); _buf.copy_to(&header, sizeof(header));
// TODO: need check header.total_body_length const uint16_t key_len = butil::NetToHost16(header.key_length);
if (header.key_length == 0) { if (key_len == 0) {
return 1; return 1;
} }
*command = static_cast<policy::MemcacheBinaryCommand>(header.command); *command = static_cast<policy::MemcacheBinaryCommand>(header.command);
_buf.copy_to(key, header.key_length, sizeof(header) + header.extras_length); _buf.copy_to(key, key_len, sizeof(header) + header.extras_length);
return 0; return 0;
} }
bool CouchbaseRequest::BuildNewWithVBucketId(CouchbaseRequest* request,
const size_t vbucket_id) const {
if (this == request) {
return false;
}
const size_t n = _buf.size();
policy::MemcacheRequestHeader header;
if (n < sizeof(header)) {
return false;
}
_buf.copy_to(&header, sizeof(header));
header.vbucket_id = butil::HostToNet16(vbucket_id);
request->Clear();
if (request->_buf.append(&header, sizeof(header)) != 0) {
return false;
}
_buf.append_to(&request->_buf, n - sizeof(header), sizeof(header));
request->_pipelined_count = _pipelined_count;
return true;
}
bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key) { bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key) {
const policy::MemcacheRequestHeader header = { const policy::MemcacheRequestHeader header = {
policy::MC_MAGIC_REQUEST, policy::MC_MAGIC_REQUEST,
......
...@@ -106,6 +106,9 @@ public: ...@@ -106,6 +106,9 @@ public:
int ParseRequest(std::string* key, int ParseRequest(std::string* key,
policy::MemcacheBinaryCommand* command) const; policy::MemcacheBinaryCommand* command) const;
bool BuildNewWithVBucketId(CouchbaseRequest* request,
const size_t vbucket_id) const;
bool ReplicasGet(const butil::StringPiece& key); bool ReplicasGet(const butil::StringPiece& key);
private: private:
......
...@@ -140,8 +140,8 @@ butil::Status VBucketMapReader::OnReadOnePart(const void* data, size_t length) { ...@@ -140,8 +140,8 @@ butil::Status VBucketMapReader::OnReadOnePart(const void* data, size_t length) {
std::string complete = _buf.substr(pos, new_pos); std::string complete = _buf.substr(pos, new_pos);
butil::VBUCKET_CONFIG_HANDLE vb = butil::VBUCKET_CONFIG_HANDLE vb =
butil::vbucket_config_parse_string(complete.c_str()); butil::vbucket_config_parse_string(complete.c_str());
if (vb != nullptr) {
_listener->UpdateVBucketMap(vb); _listener->UpdateVBucketMap(vb);
if (vb != nullptr) {
butil::vbucket_config_destroy(vb); butil::vbucket_config_destroy(vb);
} }
pos = new_pos + kSeparator.size(); pos = new_pos + kSeparator.size();
...@@ -206,8 +206,8 @@ CouchbaseServerListener::~CouchbaseServerListener() { ...@@ -206,8 +206,8 @@ CouchbaseServerListener::~CouchbaseServerListener() {
void CouchbaseServerListener::InitVBucketMap(const std::string& str) { void CouchbaseServerListener::InitVBucketMap(const std::string& str) {
butil::VBUCKET_CONFIG_HANDLE vb = butil::VBUCKET_CONFIG_HANDLE vb =
butil::vbucket_config_parse_string(str.c_str()); butil::vbucket_config_parse_string(str.c_str());
if (vb != nullptr) {
UpdateVBucketMap(vb); UpdateVBucketMap(vb);
if (vb != nullptr) {
butil::vbucket_config_destroy(vb); butil::vbucket_config_destroy(vb);
} }
} }
...@@ -399,7 +399,12 @@ void CouchbaseChannel::CallMethod(const google::protobuf::MethodDescriptor* meth ...@@ -399,7 +399,12 @@ void CouchbaseChannel::CallMethod(const google::protobuf::MethodDescriptor* meth
cntl->SetFailed(ENODATA,"failed to get mapped channel"); cntl->SetFailed(ENODATA,"failed to get mapped channel");
break; break;
} }
channel->CallMethod(nullptr, cntl, request, response, done); CouchbaseRequest new_req;
if (!req->BuildNewWithVBucketId(&new_req, vb_index)) {
cntl->SetFailed("failed to add vbucket id");
break;
}
channel->CallMethod(nullptr, cntl, &new_req, response, done);
} }
while(FLAGS_retry_during_rebalance) { while(FLAGS_retry_during_rebalance) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment