Commit d5529fe9 authored by caidaojin's avatar caidaojin

sync with master

parent 423fe173
...@@ -118,11 +118,6 @@ BUTIL_SRCS = [ ...@@ -118,11 +118,6 @@ BUTIL_SRCS = [
"src/butil/third_party/snappy/snappy-stubs-internal.cc", "src/butil/third_party/snappy/snappy-stubs-internal.cc",
"src/butil/third_party/snappy/snappy.cc", "src/butil/third_party/snappy/snappy.cc",
"src/butil/third_party/murmurhash3/murmurhash3.cpp", "src/butil/third_party/murmurhash3/murmurhash3.cpp",
"src/butil/third_party/libvbucket/rfc1321/md5c.c",
"src/butil/third_party/libvbucket/cJSON.c",
"src/butil/third_party/libvbucket/crc32.c",
"src/butil/third_party/libvbucket/ketama.c",
"src/butil/third_party/libvbucket/vbucket.c",
"src/butil/arena.cpp", "src/butil/arena.cpp",
"src/butil/at_exit.cc", "src/butil/at_exit.cc",
"src/butil/atomicops_internals_x86_gcc.cc", "src/butil/atomicops_internals_x86_gcc.cc",
......
...@@ -47,11 +47,6 @@ BUTIL_SOURCES = \ ...@@ -47,11 +47,6 @@ BUTIL_SOURCES = \
src/butil/third_party/snappy/snappy-stubs-internal.cc \ src/butil/third_party/snappy/snappy-stubs-internal.cc \
src/butil/third_party/snappy/snappy.cc \ src/butil/third_party/snappy/snappy.cc \
src/butil/third_party/murmurhash3/murmurhash3.cpp \ src/butil/third_party/murmurhash3/murmurhash3.cpp \
src/butil/third_party/libvbucket/rfc1321/md5c.c \
src/butil/third_party/libvbucket/cJSON.c \
src/butil/third_party/libvbucket/crc32.c \
src/butil/third_party/libvbucket/ketama.c \
src/butil/third_party/libvbucket/vbucket.c \
src/butil/arena.cpp \ src/butil/arena.cpp \
src/butil/at_exit.cc \ src/butil/at_exit.cc \
src/butil/atomicops_internals_x86_gcc.cc \ src/butil/atomicops_internals_x86_gcc.cc \
......
...@@ -124,8 +124,6 @@ struct ChannelOptions { ...@@ -124,8 +124,6 @@ struct ChannelOptions {
class Channel : public ChannelBase { class Channel : public ChannelBase {
friend class Controller; friend class Controller;
friend class SelectiveChannel; friend class SelectiveChannel;
friend class CouchbaseChannel;
friend class CouchbaseServerListener;
public: public:
Channel(ProfilerLinker = ProfilerLinker()); Channel(ProfilerLinker = ProfilerLinker());
~Channel(); ~Channel();
......
...@@ -108,8 +108,6 @@ friend class ThriftStub; ...@@ -108,8 +108,6 @@ friend class ThriftStub;
friend class schan::Sender; friend class schan::Sender;
friend class schan::SubDone; friend class schan::SubDone;
friend class policy::OnServerStreamCreated; friend class policy::OnServerStreamCreated;
friend class CouchbaseChannel;
friend class CouchbaseDone;
friend int StreamCreate(StreamId*, Controller&, const StreamOptions*); friend int StreamCreate(StreamId*, Controller&, const StreamOptions*);
friend int StreamAccept(StreamId*, Controller&, const StreamOptions*); friend int StreamAccept(StreamId*, Controller&, const StreamOptions*);
friend void policy::ProcessMongoRequest(InputMessageBase*); friend void policy::ProcessMongoRequest(InputMessageBase*);
......
// Copyright (c) 2018 Qiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Daojin Cai (caidaojin@qiyi.com)
#include "brpc/couchbase.h"
#include "brpc/policy/memcache_binary_header.h"
#include "butil/string_printf.h"
#include "butil/sys_byteorder.h"
namespace brpc {
int CouchbaseRequest::ParseRequest(
std::string* key, policy::MemcacheBinaryCommand* command) const {
const size_t n = _buf.size();
policy::MemcacheRequestHeader header;
if (n < sizeof(header)) {
return -1;
}
_buf.copy_to(&header, sizeof(header));
const uint16_t key_len = butil::NetToHost16(header.key_length);
if (key_len == 0) {
return 1;
}
*command = static_cast<policy::MemcacheBinaryCommand>(header.command);
_buf.copy_to(key, key_len, sizeof(header) + header.extras_length);
return 0;
}
bool CouchbaseRequest::BuildVBucketId(const size_t vbucket_id,
CouchbaseRequest* request) const {
if (this == request) {
return false;
}
const size_t n = _buf.size();
policy::MemcacheRequestHeader header;
if (n < sizeof(header)) {
return false;
}
_buf.copy_to(&header, sizeof(header));
header.vbucket_id = butil::HostToNet16(vbucket_id);
request->Clear();
if (request->_buf.append(&header, sizeof(header)) != 0) {
return false;
}
_buf.append_to(&request->_buf, n - sizeof(header), sizeof(header));
request->_pipelined_count = _pipelined_count;
request->_read_replicas = _read_replicas;
return true;
}
bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key,
const size_t vbucket_id) {
const policy::MemcacheRequestHeader header = {
policy::MC_MAGIC_REQUEST,
0x83,
butil::HostToNet16(key.size()),
0,
policy::MC_BINARY_RAW_BYTES,
butil::HostToNet16(vbucket_id),
butil::HostToNet32(key.size()),
0,
0
};
if (_buf.append(&header, sizeof(header))) {
return false;
}
if (_buf.append(key.data(), key.size())) {
return false;
}
_read_replicas = true;
++_pipelined_count;
return true;
}
bool CouchbaseResponse::RecoverOptCodeForReplicasRead() {
const size_t n = _buf.size();
policy::MemcacheResponseHeader header;
if (n < sizeof(header)) {
butil::string_printf(&_err, "buffer is too small to contain a header");
return false;
}
_buf.copy_to(&header, sizeof(header));
if (header.command != (uint8_t)policy::MC_BINARY_REPLICAS_READ) {
butil::string_printf(&_err, "not a replicas get response");
return false;
}
header.command = (uint8_t)policy::MC_BINARY_GET;
CouchbaseResponse response;
if (response._buf.append(&header, sizeof(header))) {
return false;
}
_buf.append_to(&response._buf, n - sizeof(header), sizeof(header));
Swap(&response);
return true;
}
bool CouchbaseResponse::GetStatus(Status* st) {
const size_t n = _buf.size();
policy::MemcacheResponseHeader header;
if (n < sizeof(header)) {
butil::string_printf(&_err, "buffer is too small to contain a header");
return false;
}
_buf.copy_to(&header, sizeof(header));
if (n < sizeof(header) + header.total_body_length) {
butil::string_printf(&_err, "response=%u < header=%u + body=%u",
(unsigned)n, (unsigned)sizeof(header), header.total_body_length);
return false;
}
*st = static_cast<Status>(header.status);
return true;
}
} // namespace brpc
// Copyright (c) 2018 Qiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Daojin Cai (caidaojin@qiyi.com)
#ifndef BRPC_COUCHBASE_H
#define BRPC_COUCHBASE_H
#include "brpc/memcache.h"
#include "brpc/policy/memcache_binary_header.h"
namespace brpc {
// Request to couchbase.
// Do not support pipeline multiple operations in one request and sent now.
// Do not support Flush/Version
class CouchbaseRequest : public MemcacheRequest {
friend class CouchbaseChannel;
friend class VBucketContext;
public:
void Swap(CouchbaseRequest* other) {
MemcacheRequest::Swap(other);
}
bool Get(const butil::StringPiece& key, bool read_replicas = false) {
MemcacheRequest::Clear();
_read_replicas = read_replicas;
return MemcacheRequest::Get(key);
}
bool Set(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Set(key, value, flags, exptime, cas_value);
}
bool Add(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Add(key, value, flags, exptime, cas_value);
}
bool Replace(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Replace(key, value, flags, exptime, cas_value);
}
bool Append(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Append(key, value, flags, exptime, cas_value);
}
bool Prepend(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Prepend(key, value, flags, exptime, cas_value);
}
bool Delete(const butil::StringPiece& key) {
MemcacheRequest::Clear();
return MemcacheRequest::Delete(key);
}
bool Flush(uint32_t timeout) {
MemcacheRequest::Clear();
return MemcacheRequest::Flush(timeout);
}
bool Increment(const butil::StringPiece& key, uint64_t delta,
uint64_t initial_value, uint32_t exptime) {
MemcacheRequest::Clear();
return MemcacheRequest::Increment(key, delta, initial_value, exptime);
}
bool Decrement(const butil::StringPiece& key, uint64_t delta,
uint64_t initial_value, uint32_t exptime) {
MemcacheRequest::Clear();
return MemcacheRequest::Decrement(key, delta, initial_value, exptime);
}
bool Touch(const butil::StringPiece& key, uint32_t exptime) {
MemcacheRequest::Clear();
return MemcacheRequest::Touch(key, exptime);
}
bool Version() {
MemcacheRequest::Clear();
return MemcacheRequest::Version();
}
CouchbaseRequest* New() const { return new CouchbaseRequest;}
void CopyFrom(const CouchbaseRequest& from) {
MemcacheRequest::CopyFrom(from);
_read_replicas = from._read_replicas;
}
private:
int ParseRequest(std::string* key,
policy::MemcacheBinaryCommand* command) const;
bool BuildVBucketId(const size_t vbucket_id,
CouchbaseRequest* request) const;
bool ReplicasGet(const butil::StringPiece& key, const size_t vbucket_id);
void MergeFrom(const CouchbaseRequest& from);
int pipelined_count();
bool read_replicas() const { return _read_replicas; }
bool _read_replicas = false;
};
// Response from couchbase.
class CouchbaseResponse : public MemcacheResponse {
public:
void Swap(CouchbaseResponse* other) {
MemcacheResponse::Swap(other);
}
CouchbaseResponse* New() const { return new CouchbaseResponse;}
void CopyFrom(const CouchbaseResponse& from) {
MemcacheResponse::CopyFrom(from);
}
bool GetStatus(Status* status);
bool RecoverOptCodeForReplicasRead();
private:
void MergeFrom(const CouchbaseResponse& from);
int pipelined_count();
};
} // namespace brpc
#endif // BRPC_COUCHBASE_H
This diff is collapsed.
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Daojin Cai (caidaojin@qiyi.com)
#ifndef BRPC_COUCHBASE_CHANNEL_H
#define BRPC_COUCHBASE_CHANNEL_H
#include <vector>
#include <unordered_map>
#include "brpc/channel.h"
#include "brpc/couchbase.h"
#include "butil/containers/doubly_buffered_data.h"
namespace brpc {
// It is used to detect the new master server of vbuckets when the lastest
// vbucket mapping has not been received during rebalance.
class DetectedMaster {
public:
DetectedMaster() : _verified(false), _index(-1) {}
butil::atomic<bool> _verified;
butil::atomic<int> _index;
private:
DetectedMaster(const DetectedMaster&) = delete;
DetectedMaster& operator=(const DetectedMaster&) = delete;
};
using CouchbaseChannelMap =
std::unordered_map<std::string, std::unique_ptr<Channel>>;
using DetectedVBucketMap = std::vector<DetectedMaster>;
// Couchbase has two type of distribution used to map keys to servers.
// One is vbucket distribution and other is ketama distribution.
// This struct describes vbucket distribution of couchbase.
// 'num_replicas': the number of copies that will be stored on servers of one
// vbucket. Each vbucket must have this number of servers
// indexes plus one.
// '_vbucket': A zero-based indexed by vBucketId. The entries in the _vbucket
// are arrays of integers, where each integer is a zero-based
// index into the '_servers'.
// '_fvbucket': It is fast forward map with same struct as _vbucket. It is
// used to provide the final vBubcket-to-server map during the
// statrt of the rebalance.
// '_servers': all servers of a bucket.
// '_channel_map': the memcache channel for each server.
// TODO: support ketama vbucket distribution
struct VBucketServerMap {
uint64_t _version = 0;
int _num_replicas = 0;
std::vector<std::vector<int>> _vbucket;
std::vector<std::vector<int>> _fvbucket;
std::vector<std::string> _servers;
CouchbaseChannelMap _channel_map;
};
enum VBucketStatus {
FORWARD_CREATE = 0x00,
FORWARD_FINISH = 0x01,
FORWARD_KEEPING = 0x02,
FORWARD_CHANGE = 0x03,
MASTER_CHANGE_WITHOUT_F = 0x04,
MASTER_KEEPING_WITHOUT_F = 0x05,
NO_CHANGE = 0x06,
};
class CouchbaseServerListener;
class VBucketContext;
// A couchbase channel maps different key to sub memcache channel according to
// current vbuckets mapping. It retrieves current vbuckets mapping by maintain
// an connection for streaming updates from ther couchbase server.
//
// CAUTION:
// ========
// For async rpc, Should not delete this channel until rpc done.
class CouchbaseChannel : public ChannelBase/*non-copyable*/ {
friend class CouchbaseServerListener;
friend class VBucketContext;
friend class CouchbaseDone;
public:
CouchbaseChannel();
~CouchbaseChannel();
// You MUST initialize a couchbasechannel before using it.
// 'Server_addr': address list of couchbase servers. On these addresses, we
// can get vbucket map. Like following: "addr1:port1,addr2:port2"
// 'bucket_name': the bucket name of couchbase server to access.
// 'options': is used for each memcache channel of vbucket. The protocol
// should be PROTOCOL_MEMCACHE. If 'options' is null,
// use default options.
int Init(const char* server_addr, const char* bucket_name,
const ChannelOptions* options);
// 'listen_url': from this url, we can get vbucket map. Usually, it is
// somthing like following:
// "http://host:port/pools/default/bucketsStreaming/bucket_name" or
// "http://host:port/pools/default/buckets/bucket_name"
int Init(const char* listen_url, const ChannelOptions* options);
// TODO: Do not support pipeline mode now.
// Send request to the mapped channel according to the key of request.
void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done);
void Describe(std::ostream& os, const DescribeOptions& options);
private:
int CheckHealth();
Channel* SelectBackupChannel(const VBucketServerMap* vb_map,
const size_t vb_index, const int reason,
VBucketContext* context);
Channel* GetMappedChannel(const std::string* server,
const VBucketServerMap* vb_map);
const VBucketServerMap* vbucket_map();
bool IsNeedRetry(const Controller* cntl, const VBucketContext& context,
CouchbaseResponse* response, int* reason,
std::string* error_text);
bool DoRetry(const int reason, Controller* cntl,
CouchbaseResponse* response, VBucketContext* vb_ct);
int GetDetectedMaster(const VBucketServerMap* vb_map, const size_t vb_index);
void UpdateDetectedMasterIfNeeded(const int reason,
const VBucketContext& context);
bool IsInRebalancing(const VBucketServerMap* vb_map) {
return !vb_map->_fvbucket.empty();
}
const std::string* GetNextRetryServer(
const VBucketStatus change, const int reason,
const VBucketServerMap* vb_map, const size_t vb_index,
VBucketContext* context);
bool UpdateVBucketServerMap(
const int num_replicas,
std::vector<std::vector<int>>& vbucket,
std::vector<std::vector<int>>& fvbucket,
std::vector<std::string>& servers,
const std::vector<std::string>& added_servers,
const std::vector<std::string>& removed_serverss);
static bool Update(VBucketServerMap& vbucket_map,
const ChannelOptions* options,
const int num_replicas,
std::vector<std::vector<int>>& vbucket,
std::vector<std::vector<int>>& fvbucket,
std::vector<std::string>& servers,
const std::vector<std::string>& added_servers,
const std::vector<std::string>& removed_servers);
std::string GetAuthentication() const;
// Options for each memcache channel of real servers.
ChannelOptions _common_options;
// Listener monitor and update vbucket map.
std::unique_ptr<CouchbaseServerListener> _listener;
// We need detect new vbucket map due to current vbucket map is invalid
// during rebalance.
std::unique_ptr<DetectedVBucketMap> _detected_vbucket_map;
butil::DoublyBufferedData<VBucketServerMap> _vbucket_map;
};
} // namespace brpc
#endif // BRPC_COUCHBASE_CHANNEL_H
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
#include "brpc/policy/domain_naming_service.h" #include "brpc/policy/domain_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h" #include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/consul_naming_service.h" #include "brpc/policy/consul_naming_service.h"
#include "brpc/policy/couchbase_naming_service.h"
// Load Balancers // Load Balancers
#include "brpc/policy/round_robin_load_balancer.h" #include "brpc/policy/round_robin_load_balancer.h"
...@@ -120,7 +119,6 @@ struct GlobalExtensions { ...@@ -120,7 +119,6 @@ struct GlobalExtensions {
DomainNamingService dns; DomainNamingService dns;
RemoteFileNamingService rfns; RemoteFileNamingService rfns;
ConsulNamingService cns; ConsulNamingService cns;
CouchbaseNamingService cblns;
RoundRobinLoadBalancer rr_lb; RoundRobinLoadBalancer rr_lb;
WeightedRoundRobinLoadBalancer wrr_lb; WeightedRoundRobinLoadBalancer wrr_lb;
...@@ -339,7 +337,6 @@ static void GlobalInitializeOrDieImpl() { ...@@ -339,7 +337,6 @@ static void GlobalInitializeOrDieImpl() {
NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns); NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns); NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns); NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
NamingServiceExtension()->RegisterOrDie("couchbase_list", &g_ext->cblns);
// Load Balancers // Load Balancers
LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb); LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
......
...@@ -451,8 +451,6 @@ const char* MemcacheResponse::status_str(Status st) { ...@@ -451,8 +451,6 @@ const char* MemcacheResponse::status_str(Status st) {
return "Not stored"; return "Not stored";
case STATUS_DELTA_BADVAL: case STATUS_DELTA_BADVAL:
return "Bad delta"; return "Bad delta";
case STATUS_NOT_MY_VBUCKET:
return "Not my vbucket";
case STATUS_AUTH_ERROR: case STATUS_AUTH_ERROR:
return "authentication error"; return "authentication error";
case STATUS_AUTH_CONTINUE: case STATUS_AUTH_CONTINUE:
......
...@@ -111,7 +111,7 @@ public: ...@@ -111,7 +111,7 @@ public:
butil::IOBuf& raw_buffer() { return _buf; } butil::IOBuf& raw_buffer() { return _buf; }
const butil::IOBuf& raw_buffer() const { return _buf; } const butil::IOBuf& raw_buffer() const { return _buf; }
protected: private:
bool GetOrDelete(uint8_t command, const butil::StringPiece& key); bool GetOrDelete(uint8_t command, const butil::StringPiece& key);
bool Counter(uint8_t command, const butil::StringPiece& key, uint64_t delta, bool Counter(uint8_t command, const butil::StringPiece& key, uint64_t delta,
uint64_t initial_value, uint32_t exptime); uint64_t initial_value, uint32_t exptime);
...@@ -172,7 +172,6 @@ public: ...@@ -172,7 +172,6 @@ public:
STATUS_EINVAL = 0x04, STATUS_EINVAL = 0x04,
STATUS_NOT_STORED = 0x05, STATUS_NOT_STORED = 0x05,
STATUS_DELTA_BADVAL = 0x06, STATUS_DELTA_BADVAL = 0x06,
STATUS_NOT_MY_VBUCKET = 0x07,
STATUS_AUTH_ERROR = 0x20, STATUS_AUTH_ERROR = 0x20,
STATUS_AUTH_CONTINUE = 0x21, STATUS_AUTH_CONTINUE = 0x21,
STATUS_UNKNOWN_COMMAND = 0x81, STATUS_UNKNOWN_COMMAND = 0x81,
...@@ -231,7 +230,7 @@ public: ...@@ -231,7 +230,7 @@ public:
static const char* status_str(Status); static const char* status_str(Status);
protected: private:
bool PopCounter(uint8_t command, uint64_t* new_value, uint64_t* cas_value); bool PopCounter(uint8_t command, uint64_t* new_value, uint64_t* cas_value);
bool PopStore(uint8_t command, uint64_t* cas_value); bool PopStore(uint8_t command, uint64_t* cas_value);
......
...@@ -38,9 +38,6 @@ class CouchbaseAuthenticator : public Authenticator { ...@@ -38,9 +38,6 @@ class CouchbaseAuthenticator : public Authenticator {
brpc::AuthContext*) const { brpc::AuthContext*) const {
return 0; return 0;
} }
const std::string& bucket_name() const { return bucket_name_; }
const std::string& bucket_password() const { return bucket_password_; }
private: private:
const std::string bucket_name_; const std::string bucket_name_;
......
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Cai,Daojin (Caidaojin@qiyi.com)
#include <stdlib.h> // strtol
#include <string> // std::string
#include <set> // std::set
#include "butil/string_splitter.h" // StringSplitter
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/log.h"
#include "brpc/policy/couchbase_naming_service.h"
namespace brpc {
namespace policy {
// Defined in file_naming_service.cpp
bool SplitIntoServerAndTag(const butil::StringPiece& line,
butil::StringPiece* server_addr,
butil::StringPiece* tag);
butil::Mutex CouchbaseNamingService::_mutex;
std::unordered_map<std::string, std::string> CouchbaseNamingService::servers_map;
bool CouchbaseNamingService::ParseListenUrl(
const butil::StringPiece listen_url, std::string* server,
std::string* streaming_uri, std::string* init_uri) {
do {
const size_t pos = listen_url.find("//");
if (pos == listen_url.npos) {
break;
}
const size_t host_pos = listen_url.find('/', pos + 2);
if (host_pos == listen_url.npos) {
break;
}
butil::StringPiece sub_str = listen_url.substr(pos + 2, host_pos - pos - 2);
server->clear();
server->append(sub_str.data(), sub_str.length());
butil::EndPoint point;
if (butil::str2endpoint(server->c_str(), &point) != 0) {
LOG(FATAL) << "Failed to get address and port \'"
<< server << "\'.";
break;
}
butil::StringPiece uri_sub = listen_url;
uri_sub.remove_prefix(host_pos);
size_t uri_pos = uri_sub.find("/bucketsStreaming/");
if (uri_pos != uri_sub.npos) {
streaming_uri->clear();
streaming_uri->append(uri_sub.data(), uri_sub.length());
init_uri->clear();
init_uri->append(uri_sub.data(), uri_pos);
init_uri->append("/buckets/");
butil::StringPiece bucket_name = uri_sub;
bucket_name.remove_prefix(uri_pos + std::strlen("/bucketsStreaming/"));
init_uri->append(bucket_name.data(), bucket_name.length());
return true;
}
uri_pos = uri_sub.find("/buckets/");
if (uri_pos != uri_sub.npos) {
init_uri->clear();
init_uri->append(uri_sub.data(), uri_sub.length());
streaming_uri->clear();
streaming_uri->append(uri_sub.data(), uri_pos);
streaming_uri->append("/bucketsStreaming/");
butil::StringPiece bucket_name = uri_sub;
bucket_name.remove_prefix(uri_pos + std::strlen("/buckets/"));
streaming_uri->append(bucket_name.data(), bucket_name.length());
return true;
}
} while (false);
LOG(FATAL) << "Failed to parse listen url \'" << listen_url << "\'.";
return false;
}
bool CouchbaseNamingService::ParseNamingServiceUrl(const butil::StringPiece ns_url,
std::string* listen_port) {
butil::StringPiece protocol;
std::string server_list;
const size_t pos = ns_url.find("//");
if (pos != ns_url.npos) {
protocol = ns_url.substr(0, pos);
butil::StringPiece sub = ns_url.substr(pos+2);
server_list.append(sub.data(), sub.length());
}
if (protocol != "couchbase_list:" && server_list.empty()) {
LOG(FATAL) << "Invalid couchbase naming service " << ns_url;
return false;
}
std::vector<std::string> server_array;
butil::SplitString(server_list, ',', &server_array);
listen_port->clear();
for (const std::string& addr_port : server_array) {
butil::EndPoint point;
if (butil::str2endpoint(addr_port.c_str(), &point) != 0) {
LOG(FATAL) << "Failed to get endpoint from \'" << addr_port
<< "\' of the naming server url \'" << ns_url << "\'.";
return false;
}
if (listen_port->empty()) {
*listen_port = butil::IntToString(point.port);
}
}
return true;
}
int CouchbaseNamingService::GetServers(const char *service_name,
std::vector<ServerNode>* servers) {
servers->clear();
// Sort/unique the inserted vector is faster, but may have a different order
// of addresses from the file. To make assertions in tests easier, we use
// set to de-duplicate and keep the order.
std::set<ServerNode> presence;
std::string line;
if (!service_name) {
LOG(FATAL) << "Param[service_name] is NULL";
return -1;
}
std::string new_servers(service_name);
{
BAIDU_SCOPED_LOCK(_mutex);
const auto& iter = servers_map.find(new_servers);
if (iter != servers_map.end()) {
new_servers = iter->second;
}
}
RemoveUniqueSuffix(new_servers);
for (butil::StringSplitter sp(new_servers.c_str(), ','); sp != NULL; ++sp) {
line.assign(sp.field(), sp.length());
butil::StringPiece addr;
butil::StringPiece tag;
if (!SplitIntoServerAndTag(line, &addr, &tag)) {
continue;
}
const_cast<char*>(addr.data())[addr.size()] = '\0'; // safe
butil::EndPoint point;
if (str2endpoint(addr.data(), &point) != 0 &&
hostname2endpoint(addr.data(), &point) != 0) {
LOG(ERROR) << "Invalid address=`" << addr << '\'';
continue;
}
ServerNode node;
node.addr = point;
tag.CopyToString(&node.tag);
if (presence.insert(node).second) {
servers->push_back(node);
} else {
RPC_VLOG << "Duplicated server=" << node;
}
}
RPC_VLOG << "Got " << servers->size()
<< (servers->size() > 1 ? " servers" : " server");
return 0;
}
void CouchbaseNamingService::Describe(
std::ostream& os, const DescribeOptions&) const {
os << "Couchbase_list";
return;
}
NamingService* CouchbaseNamingService::New() const {
return new CouchbaseNamingService;
}
void CouchbaseNamingService::Destroy() {
delete this;
}
void CouchbaseNamingService::ResetCouchbaseListenerServers(
const std::string& service_name, std::string& new_servers) {
BAIDU_SCOPED_LOCK(_mutex);
auto iter = servers_map.find(service_name);
if (iter != servers_map.end()) {
iter->second.swap(new_servers);
} else {
servers_map.emplace(service_name, new_servers);
}
}
std::string CouchbaseNamingService::AddUniqueSuffix(
const char* name_url, const char* unique_id) {
std::string couchbase_name_url;
couchbase_name_url.append(name_url);
couchbase_name_url.append(1, '_');
couchbase_name_url.append(unique_id);
return std::move(couchbase_name_url);
}
void CouchbaseNamingService::RemoveUniqueSuffix(std::string& name_service) {
const size_t pos = name_service.find('_');
if (pos != std::string::npos) {
name_service.resize(pos);
}
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Cai,Daojin (caidaojin@qiyi.com)
#ifndef BRPC_POLICY_COUCHBASE_NAMING_SERVICE
#define BRPC_POLICY_COUCHBASE_NAMING_SERVICE
#include <unordered_map>
#include "brpc/periodic_naming_service.h"
namespace brpc {
class CouchbaseServerListener;
}
namespace brpc {
namespace policy {
// It is only used for couchbase channel. It updates servers for listen channel
// of CouchbaseServerListener. The naming service format is like
// "couchbase_list://addr1:port,addr:port_****" where "_****" is a unique id for
// each couchbase channel since we can not share naming service and "addr*:port"
// are avalible servers for initializing.
// After initialization, it get the latest server list periodically from
// 'servers_map' by service name as key.
class CouchbaseNamingService : public PeriodicNamingService {
friend brpc::CouchbaseServerListener;
private:
static butil::Mutex _mutex;
// Store the lastest server list for each couchbase channel.
// Key is service name of each couchbase channel and value is the latest
// server list. It is like following:
// key: addr1:port,addr2:port_****
// value: addr1:port,addr2:port,addr3:port
static std::unordered_map<std::string, std::string> servers_map;
int GetServers(const char *service_name,
std::vector<ServerNode>* servers);
static bool ParseNamingServiceUrl(butil::StringPiece ns_url,
std::string* listen_port);
static bool ParseListenUrl(
const butil::StringPiece listen_url, std::string* server_address,
std::string* streaming_uri, std::string* init_uri);
// Clear naming server data when couchbase channel destroyed.
static void ClearNamingServiceData(const std::string& service_name) {
BAIDU_SCOPED_LOCK(_mutex);
servers_map.erase(service_name);
}
// Called by couchbase listener when vbucekt map changing.
// It set new server list for key 'service_name' in servers_map.
static void ResetCouchbaseListenerServers(const std::string& service_name,
std::string& new_servers);
// For couchbase listeners, we should not share this name service object.
// So we append couchbase listener address to make name_url unique.
// Input: couchbase_list://address1:port1,address2:port2
// Output: couchbase_list://address1:port1,address2:port2_****
static std::string AddUniqueSuffix(const char* name_url,
const char* unique_id);
// Reserve handling to AddPrefixBeforeAddress.
void RemoveUniqueSuffix(std::string& name_service);
void Describe(std::ostream& os, const DescribeOptions& options) const;
NamingService* New() const;
void Destroy();
};
} // namespace policy
} // namespace brpc
#endif //BRPC_POLICY_COUCHBASE_NAMING_SERVICE
...@@ -91,10 +91,7 @@ enum MemcacheBinaryCommand { ...@@ -91,10 +91,7 @@ enum MemcacheBinaryCommand {
MC_BINARY_RINCR = 0x39, MC_BINARY_RINCR = 0x39,
MC_BINARY_RINCRQ = 0x3a, MC_BINARY_RINCRQ = 0x3a,
MC_BINARY_RDECR = 0x3b, MC_BINARY_RDECR = 0x3b,
MC_BINARY_RDECRQ = 0x3c, MC_BINARY_RDECRQ = 0x3c
// Replicas read for couchbase
MC_BINARY_REPLICAS_READ = 0x83
// End Range operations // End Range operations
}; };
......
...@@ -64,7 +64,6 @@ static void InitSupportedCommandMap() { ...@@ -64,7 +64,6 @@ static void InitSupportedCommandMap() {
butil::bit_array_set(supported_cmd_map, MC_BINARY_STAT); butil::bit_array_set(supported_cmd_map, MC_BINARY_STAT);
butil::bit_array_set(supported_cmd_map, MC_BINARY_TOUCH); butil::bit_array_set(supported_cmd_map, MC_BINARY_TOUCH);
butil::bit_array_set(supported_cmd_map, MC_BINARY_SASL_AUTH); butil::bit_array_set(supported_cmd_map, MC_BINARY_SASL_AUTH);
butil::bit_array_set(supported_cmd_map, MC_BINARY_REPLICAS_READ);
} }
inline bool IsSupportedCommand(uint8_t command) { inline bool IsSupportedCommand(uint8_t command) {
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment