Commit df243a84 authored by root's avatar root

support rebalance handling

parent 942c8932
......@@ -118,6 +118,7 @@ BUTIL_SRCS = [
"src/butil/third_party/snappy/snappy-stubs-internal.cc",
"src/butil/third_party/snappy/snappy.cc",
"src/butil/third_party/murmurhash3/murmurhash3.cpp",
"src/butil/third_party/libvbucket/rfc1321/md5c.c",
"src/butil/third_party/libvbucket/cJSON.c",
"src/butil/third_party/libvbucket/crc32.c",
"src/butil/third_party/libvbucket/ketama.c",
......
......@@ -207,6 +207,7 @@ set(BUTIL_SOURCES
${CMAKE_SOURCE_DIR}/src/butil/third_party/snappy/snappy-stubs-internal.cc
${CMAKE_SOURCE_DIR}/src/butil/third_party/snappy/snappy.cc
${CMAKE_SOURCE_DIR}/src/butil/third_party/murmurhash3/murmurhash3.cpp
${CMAKE_SOURCE_DIR}/src/butil/third_party/libvbucket/rfc1321/md5c.c
${CMAKE_SOURCE_DIR}/src/butil/third_party/libvbucket/cJSON.c
${CMAKE_SOURCE_DIR}/src/butil/third_party/libvbucket/crc32.c
${CMAKE_SOURCE_DIR}/src/butil/third_party/libvbucket/ketama.c
......
......@@ -47,6 +47,7 @@ BUTIL_SOURCES = \
src/butil/third_party/snappy/snappy-stubs-internal.cc \
src/butil/third_party/snappy/snappy.cc \
src/butil/third_party/murmurhash3/murmurhash3.cpp \
src/butil/third_party/libvbucket/rfc1321/md5c.c \
src/butil/third_party/libvbucket/cJSON.c \
src/butil/third_party/libvbucket/crc32.c \
src/butil/third_party/libvbucket/ketama.c \
......
......@@ -124,6 +124,8 @@ struct ChannelOptions {
class Channel : public ChannelBase {
friend class Controller;
friend class SelectiveChannel;
friend class CouchbaseChannel;
friend class CouchbaseServerListener;
public:
Channel(ProfilerLinker = ProfilerLinker());
~Channel();
......
......@@ -62,8 +62,6 @@ class MongoContext;
class RetryPolicy;
class InputMessageBase;
class ThriftStub;
class CouchbaseChannel;
class CouchbaseDone;
namespace policy {
class OnServerStreamCreated;
void ProcessMongoRequest(InputMessageBase*);
......
......@@ -38,8 +38,8 @@ int CouchbaseRequest::ParseRequest(
return 0;
}
bool CouchbaseRequest::BuildNewWithVBucketId(CouchbaseRequest* request,
const size_t vbucket_id) const {
bool CouchbaseRequest::BuildVBucketId(const size_t vbucket_id,
CouchbaseRequest* request) const {
if (this == request) {
return false;
}
......@@ -56,17 +56,19 @@ bool CouchbaseRequest::BuildNewWithVBucketId(CouchbaseRequest* request,
}
_buf.append_to(&request->_buf, n - sizeof(header), sizeof(header));
request->_pipelined_count = _pipelined_count;
request->_read_replicas = _read_replicas;
return true;
}
bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key) {
bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key,
const size_t vbucket_id) {
const policy::MemcacheRequestHeader header = {
policy::MC_MAGIC_REQUEST,
0x83,
butil::HostToNet16(key.size()),
0,
policy::MC_BINARY_RAW_BYTES,
0,
butil::HostToNet16(vbucket_id),
butil::HostToNet32(key.size()),
0,
0
......@@ -77,10 +79,33 @@ bool CouchbaseRequest::ReplicasGet(const butil::StringPiece& key) {
if (_buf.append(key.data(), key.size())) {
return false;
}
_read_replicas = true;
++_pipelined_count;
return true;
}
bool CouchbaseResponse::RecoverOptCodeForReplicasRead() {
const size_t n = _buf.size();
policy::MemcacheResponseHeader header;
if (n < sizeof(header)) {
butil::string_printf(&_err, "buffer is too small to contain a header");
return false;
}
_buf.copy_to(&header, sizeof(header));
if (header.command != (uint8_t)policy::MC_BINARY_REPLICAS_READ) {
butil::string_printf(&_err, "not a replicas get response");
return false;
}
header.command = (uint8_t)policy::MC_BINARY_GET;
CouchbaseResponse response;
if (response._buf.append(&header, sizeof(header))) {
return false;
}
_buf.append_to(&response._buf, n - sizeof(header), sizeof(header));
Swap(&response);
return true;
}
bool CouchbaseResponse::GetStatus(Status* st) {
const size_t n = _buf.size();
policy::MemcacheResponseHeader header;
......
......@@ -24,14 +24,18 @@ namespace brpc {
// Request to couchbase.
// Do not support pipeline multiple operations in one request and sent now.
// Do not support Flush/Version
class CouchbaseRequest : public MemcacheRequest {
friend class CouchbaseChannel;
friend class VBucketContext;
public:
void Swap(CouchbaseRequest* other) {
MemcacheRequest::Swap(other);
}
bool Get(const butil::StringPiece& key) {
bool Get(const butil::StringPiece& key, bool read_replicas = false) {
MemcacheRequest::Clear();
_read_replicas = read_replicas;
return MemcacheRequest::Get(key);
}
......@@ -101,24 +105,28 @@ public:
void CopyFrom(const CouchbaseRequest& from) {
MemcacheRequest::CopyFrom(from);
_read_replicas = from._read_replicas;
}
private:
int ParseRequest(std::string* key,
policy::MemcacheBinaryCommand* command) const;
bool BuildNewWithVBucketId(CouchbaseRequest* request,
const size_t vbucket_id) const;
bool BuildVBucketId(const size_t vbucket_id,
CouchbaseRequest* request) const;
bool ReplicasGet(const butil::StringPiece& key);
bool ReplicasGet(const butil::StringPiece& key, const size_t vbucket_id);
private:
void MergeFrom(const CouchbaseRequest& from);
int pipelined_count();
bool read_replicas() const { return _read_replicas; }
bool _read_replicas = false;
};
// Request to couchbase.
// Do not support pipeline multiple operations in one request and sent now.
// Response from couchbase.
class CouchbaseResponse : public MemcacheResponse {
public:
void Swap(CouchbaseResponse* other) {
......@@ -133,6 +141,8 @@ public:
bool GetStatus(Status* status);
bool RecoverOptCodeForReplicasRead();
private:
void MergeFrom(const CouchbaseResponse& from);
......
......@@ -16,9 +16,13 @@
#include "brpc/couchbase_channel.h"
#include "brpc/policy/couchbase_authenticator.h"
#include "brpc/policy/couchbase_naming_service.h"
#include "brpc/progressive_reader.h"
#include "bthread/bthread.h"
#include "butil/atomicops.h"
#include "butil/base64.h"
#include "butil/string_splitter.h"
#include "butil/strings/string_number_conversions.h"
#include "butil/third_party/libvbucket/hash.h"
#include "butil/third_party/libvbucket/vbucket.h"
......@@ -26,37 +30,80 @@ namespace brpc {
DEFINE_string(couchbase_authorization_http_basic, "",
"Http basic authorization of couchbase");
DEFINE_string(couchbase_bucket_init_string, "",
"If the string is set, 'CouchbaseServerListener' will build vbucket map"
"directly by parsing from this string in initialization");
DEFINE_string(couchbase_bucket_streaming_url,
"/pools/default/bucketsStreaming/",
"Monitor couchbase vbuckets map through this url");
DEFINE_int32(listener_retry_times, 5,
DEFINE_string(couchbase_bucket_name, "",
"couchbase bucket name to access");
DEFINE_int32(couchbase_listen_retry_times, 5,
"Retry times to create couchbase vbucket map monitoring connection."
"Listen thread will sleep a while when reach this times.");
DEFINE_int32(listener_sleep_interval_ms, 100,
DEFINE_int32(couchbase_listen_interval_ms, 1000,
"Listen thread sleep for the number of milliseconds after creating"
"vbucket map monitoring connection failure.");
DEFINE_bool(retry_during_rebalance, true,
"A swith indicating whether to open retry during rebalance");
DEFINE_bool(replicas_read_flag, false,
"Read replicas for get request in case of master node failure."
"This does not ensure that the data is the most current.");
DEFINE_bool(couchbase_disable_retry_during_rebalance, false,
"A swith indicating whether to open retry during rebalance status");
DEFINE_bool(couchbase_disable_retry_during_active, false,
"A swith indicating whether to open retry during active status");
// Define error_code about retry during rebalance.
enum RetryReason {
// No need retry, dummy value.
DEFAULT_DUMMY = 0,
// No need retry, Rpc failed except cases include in SERVER_DOWN.
RPC_FAILED = 1,
// Server is down, need retry other servers during rebalance.
SERVER_DOWN = 2,
// Server is not mapped to the bucket, need retry other servers during rebalance.
RPC_SUCCESS_BUT_WRONG_SERVER = 3,
// Server is mapped to the bucket, retry the same server.
RPC_SUCCESS_BUT_RESPONSE_FAULT = 4,
// No need retry, response is ok.
RESPONSE_OK = 5,
};
enum ServerType {
// Master server choosed.
MASTER_SERVER = 0x00,
// Detected server choosed during rebalance
DETECTED_SERVER = 0x01,
// Replica server choosed for replicas read.
REPLICA_SERVER = 0x02,
};
namespace {
const butil::StringPiece kSeparator("\n\n\n\n", 4);
const std::string kBucketStreamingUrlPrefix("/pools/default/bucketsStreaming/");
const std::string kBucketUrlPrefix("/pools/default/buckets/");
// The maximum number of vbuckets that couchbase support
const size_t kCouchbaseMaxVBuckets = 65536;
}
class CouchbaseServerListener;
class LoadBalancerWithNaming;
class VBucketContext;
enum RetryReason {
RPC_FAILED = 0,
WRONG_SERVER = 1,
RESPONSE_FAULT = 2,
};
// Get master server address(addr:port) of a vbucket.
// Return pointer to server if found, otherwise nullptr.
// If 'index' is not null, set server index to it.
const std::string* GetMaster(
const VBucketServerMap* vb_map, const size_t vb_index, int* index = nullptr);
// Get forward master server address(addr:port) of a vbucket.
// Return pointer to server if found, otherwise nullptr.
// If 'index' is not null, set index of found server to it.
const std::string* GetForwardMaster(
const VBucketServerMap* vb_map, const size_t vb_index, int* index = nullptr);
// Get replicas server address(addr:port) of a vbucket.
// Return pointer to server if found, otherwise nullptr.
// 'offset': zero-based index of vbucket. 0 indicating the first replica server.
// 'from_fvb': Get replica from fvbucket if true, otherwise get from vbucket.
const std::string* GetReplica(const VBucketServerMap* vb_map,
const size_t vb_index);
// Get vbucket id of key belonged to.
size_t Hash(const butil::StringPiece& key, const size_t vbuckets_num);
class CouchbaseServerListener;
class VBucketMapReader : public ProgressiveReader {
public:
......@@ -84,16 +131,28 @@ public:
butil::Mutex _mutex;
};
// TODO: Inherit from SharedObject. Couchbase channels to connect same server
// can share the same listener.
class CouchbaseServerListener {
public:
CouchbaseServerListener(const char* server_addr, CouchbaseChannel* channel)
: _server_addr(server_addr),
_url(FLAGS_couchbase_bucket_streaming_url),
CouchbaseServerListener(const char* server_list, const char* bucket_name,
CouchbaseChannel* channel)
: _streaming_url(kBucketStreamingUrlPrefix + bucket_name),
_cb_channel(channel),
_reader(new VBucketMapReader(this)) {
Init();
Init(server_list, kBucketUrlPrefix + bucket_name);
}
CouchbaseServerListener(const char* listen_url, CouchbaseChannel* channel)
: _cb_channel(channel),
_reader(new VBucketMapReader(this)) {
std::string init_url;
std::string server;
if (!policy::CouchbaseNamingService::ParseListenUrl(
listen_url, &server, &_streaming_url, &init_url)) {
return;
}
Init(server.c_str(), init_url);
}
~CouchbaseServerListener();
void UpdateVBucketMap(butil::VBUCKET_CONFIG_HANDLE vbucket);
......@@ -104,21 +163,21 @@ private:
CouchbaseServerListener(const CouchbaseServerListener&) = delete;
CouchbaseServerListener& operator=(const CouchbaseServerListener&) = delete;
void Init();
void Init(const char* server_list, const std::string& init_url);
void InitVBucketMap(const std::string& str);
bool InitVBucketMap(const std::string& url);
static void* ListenThread(void* arg);
bthread_t _listen_bth;
// Server address list of couchbase servers. From these servers(host:port),
// we can monitor vbucket map.
const std::string _server_addr;
// REST/JSON url to monitor vbucket map.
const std::string _url;
std::string _streaming_url;
std::string _auth;
std::string _listen_port;
std::string _service_name;
CouchbaseChannel* _cb_channel;
// Monitor couchbase vbuckets map on this channel.
// TODO: Add/removed server due to rebalance/failover.
Channel _listen_channel;
// If _reader is not attached to listen socket, it will be released in
// CouchbaseServerListener desconstruction. Otherwise, it will be released
......@@ -137,11 +196,11 @@ butil::Status VBucketMapReader::OnReadOnePart(const void* data, size_t length) {
size_t pos = 0;
size_t new_pos = _buf.find(kSeparator.data(), pos, kSeparator.size());
while (new_pos != std::string::npos) {
std::string complete = _buf.substr(pos, new_pos);
std::string complete = _buf.substr(pos, new_pos - pos);
butil::VBUCKET_CONFIG_HANDLE vb =
butil::vbucket_config_parse_string(complete.c_str());
_listener->UpdateVBucketMap(vb);
if (vb != nullptr) {
_listener->UpdateVBucketMap(vb);
butil::vbucket_config_destroy(vb);
}
pos = new_pos + kSeparator.size();
......@@ -166,11 +225,26 @@ void VBucketMapReader::OnEndOfMessage(const butil::Status& status) {
return;
}
}
// If '_listener' is desconstructed, release this object.
std::unique_ptr<VBucketMapReader> release(this);
}
void CouchbaseServerListener::Init() {
CouchbaseServerListener::~CouchbaseServerListener() {
std::unique_lock<butil::Mutex> mu(_reader->_mutex);
bthread_stop(_listen_bth);
bthread_join(_listen_bth, nullptr);
if (!_reader->IsAttached()) {
mu.unlock();
std::unique_ptr<VBucketMapReader> p(_reader);
} else {
_reader->Destroy();
}
policy::CouchbaseNamingService::ClearNamingServiceData(_service_name);
}
void CouchbaseServerListener::Init(const char* server_list,
const std::string& init_url) {
if (!FLAGS_couchbase_authorization_http_basic.empty()) {
butil::Base64Encode(FLAGS_couchbase_authorization_http_basic, &_auth);
_auth = "Basic " + _auth;
......@@ -183,33 +257,52 @@ void CouchbaseServerListener::Init() {
}
ChannelOptions options;
options.protocol = PROTOCOL_HTTP;
CHECK(_listen_channel.Init(_server_addr.c_str(), "rr", &options) == 0)
std::string ns_servers;
butil::StringPiece servers(server_list);
if (servers.find("//") == servers.npos) {
ns_servers.append("couchbase_list://");
}
ns_servers.append(server_list);
if (policy::CouchbaseNamingService::ParseNamingServiceUrl(
ns_servers, &_listen_port)) {
std::string unique_id = "_" + butil::Uint64ToString(reinterpret_cast<uint64_t>(this));
ns_servers += unique_id;
_service_name = server_list + unique_id;
CHECK(_listen_channel.Init(ns_servers.c_str(), "rr", &options) == 0)
<< "Failed to init listen channel.";
if (!FLAGS_couchbase_bucket_init_string.empty()) {
InitVBucketMap(FLAGS_couchbase_bucket_init_string);
} else {
LOG(FATAL) << "Failed to init couchbase listener.";
return;
}
if (!InitVBucketMap(init_url)) {
LOG(ERROR) << "Failed to init vbucket map.";
}
CreateListener();
}
CouchbaseServerListener::~CouchbaseServerListener() {
std::unique_lock<butil::Mutex> mu(_reader->_mutex);
bthread_stop(_listen_bth);
bthread_join(_listen_bth, nullptr);
if (!_reader->IsAttached()) {
mu.unlock();
std::unique_ptr<VBucketMapReader> p(_reader);
} else {
_reader->Destroy();
bool CouchbaseServerListener::InitVBucketMap(const std::string& uri) {
Controller cntl;
for (int i = 0; i != FLAGS_couchbase_listen_retry_times; ++i) {
cntl.Reset();
if (!_auth.empty()) {
cntl.http_request().SetHeader("Authorization", _auth);
}
}
void CouchbaseServerListener::InitVBucketMap(const std::string& str) {
cntl.http_request().uri() = uri;
_listen_channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
if (cntl.Failed()) {
LOG(ERROR) << "Failed to get vbucket map: " << cntl.ErrorText();
continue;
}
std::string str = cntl.response_attachment().to_string();
butil::VBUCKET_CONFIG_HANDLE vb =
butil::vbucket_config_parse_string(str.c_str());
UpdateVBucketMap(vb);
if (vb != nullptr) {
UpdateVBucketMap(vb);
butil::vbucket_config_destroy(vb);
return true;
}
}
return false;
}
void* CouchbaseServerListener::ListenThread(void* arg) {
......@@ -219,11 +312,11 @@ void* CouchbaseServerListener::ListenThread(void* arg) {
listener->_reader->Detach();
Controller cntl;
int i = 0;
for (; i != FLAGS_listener_retry_times; ++i) {
for (; i != FLAGS_couchbase_listen_retry_times; ++i) {
if (!listener->_auth.empty()) {
cntl.http_request().SetHeader("Authorization", listener->_auth);
}
cntl.http_request().uri() = listener->_url;
cntl.http_request().uri() = listener->_streaming_url;
cntl.response_will_be_read_progressively();
listener->_listen_channel.CallMethod(nullptr, &cntl,
nullptr, nullptr, nullptr);
......@@ -236,8 +329,8 @@ void* CouchbaseServerListener::ListenThread(void* arg) {
break;
}
if (i == FLAGS_listener_retry_times) {
if (bthread_usleep(FLAGS_listener_sleep_interval_ms * 1000) < 0) {
if (i == FLAGS_couchbase_listen_retry_times) {
if (bthread_usleep(FLAGS_couchbase_listen_interval_ms * 1000) < 0) {
if (errno == ESTOP) {
LOG(INFO) << "ListenThread is stopped.";
break;
......@@ -271,50 +364,197 @@ void CouchbaseServerListener::UpdateVBucketMap(
// TODO: ketama distribution
if (butil::vbucket_config_get_distribution_type(vb_conf)
== butil::VBUCKET_DISTRIBUTION_KETAMA) {
LOG(FATAL) << "Not support ketama distribution.";
!= butil::VBUCKET_DISTRIBUTION_VBUCKET) {
LOG(FATAL) << "Only support vbucket distribution.";
return;
}
const CouchbaseChannelMap& channel_map = _cb_channel->GetChannelMap();
int vb_num = butil::vbucket_config_get_num_vbuckets(vb_conf);
int replicas_num = butil::vbucket_config_get_num_replicas(vb_conf);
int server_num = butil::vbucket_config_get_num_servers(vb_conf);
const VBucketServerMap* vb_map = _cb_channel->vbucket_map();
const size_t vb_num = butil::vbucket_config_get_num_vbuckets(vb_conf);
const size_t replicas_num = butil::vbucket_config_get_num_replicas(vb_conf);
const size_t server_num = butil::vbucket_config_get_num_servers(vb_conf);
std::vector<std::vector<int>> vbuckets(vb_num);
std::vector<std::vector<int>> fvbuckets;
std::vector<std::string> servers(server_num);
std::vector<std::string> added_servers;
std::vector<std::string> removed_servers;
for (int i = 0; i != vb_num; ++i) {
if (butil::vbucket_config_has_forward_vbuckets(vb_conf)) {
fvbuckets.resize(vb_num);
}
for (size_t i = 0; i != vb_num; ++i) {
if (butil::vbucket_config_has_forward_vbuckets(vb_conf)) {
fvbuckets[i].resize(replicas_num + 1, -1);
}
vbuckets[i].resize(replicas_num + 1, -1);
vbuckets[i][0] = butil::vbucket_get_master(vb_conf, i);
for (int j = 1; j <= replicas_num; ++j) {
vbuckets[i][j] = butil::vbucket_get_replica(vb_conf, i, j);
if (butil::vbucket_config_has_forward_vbuckets(vb_conf)) {
fvbuckets[i][0] = butil::fvbucket_get_master(vb_conf, i);
}
for (size_t j = 0; j < replicas_num; ++j) {
vbuckets[i][j+1] = butil::vbucket_get_replica(vb_conf, i, j);
if (butil::vbucket_config_has_forward_vbuckets(vb_conf)) {
fvbuckets[i][j+1] = butil::fvbucket_get_replica(vb_conf, i, j);
}
}
for (int i = 0; i != server_num; ++i) {
}
std::vector<size_t> keeping_servers;
for (size_t i = 0; i != server_num; ++i) {
servers[i] = butil::vbucket_config_get_server(vb_conf, i);
const auto iter = channel_map.find(servers[i]);
if (iter == channel_map.end()) {
const auto iter = vb_map->_channel_map.find(servers[i]);
if (iter == vb_map->_channel_map.end()) {
added_servers.emplace_back(servers[i]);
} else {
keeping_servers.emplace_back(i);
}
}
for (size_t i = 0; i != vb_map->_servers.size(); ++i) {
size_t j = 0;
for (; j != keeping_servers.size(); ++j) {
if (vb_map->_servers[i] == servers[keeping_servers[j]]) {
break;
}
}
if (j == keeping_servers.size()) {
removed_servers.emplace_back(vb_map->_servers[i]);
}
}
// Reset new server list of listen channel.
if (!added_servers.empty() || !removed_servers.empty()) {
std::string server_list;
for (const auto& server : servers) {
const size_t pos = server.find(':');
server_list.append(server.data(), pos);
server_list += ":" + _listen_port + ",";
}
server_list.pop_back();
policy::CouchbaseNamingService::ResetCouchbaseListenerServers(
_service_name, server_list);
}
bool curr_rebalance = _cb_channel->IsInRebalancing(vb_map);
bool update_rebalance = !fvbuckets.empty();
uint64_t version = vb_map->_version;
_cb_channel->UpdateVBucketServerMap(replicas_num, vbuckets, fvbuckets,
servers, added_servers, removed_servers);
if (!curr_rebalance && update_rebalance) {
LOG(ERROR) << "Couchbase enters into rebalance status from version "
<< ++version;
}
if (curr_rebalance && !update_rebalance) {
DetectedVBucketMap& detect = *_cb_channel->_detected_vbucket_map;
for (size_t vb_index = 0; vb_index != vb_num; ++vb_index) {
detect[vb_index]._verified.store(false, butil::memory_order_relaxed);
detect[vb_index]._index.store(-1, butil::memory_order_relaxed);
}
LOG(ERROR) << "Couchbase quit rebalance status from version "
<< ++version;
}
}
class VBucketContext {
public:
VBucketContext() = default;
~VBucketContext() = default;
bool Init(const VBucketServerMap* vb_map, const size_t vb_index,
const int server_type, const int server_index,
const CouchbaseRequest* request, const std::string& key,
const policy::MemcacheBinaryCommand command);
VBucketStatus Update(const VBucketServerMap* vb_map,
const size_t vb_index);
const CouchbaseRequest* GetReplicasReadRequest();
public:
size_t _retried_count = 0;
uint64_t _version = 0;
int _server_type = 0;
int _server_type = MASTER_SERVER;
int _server_index = 0;
size_t _vbucket_index;
policy::MemcacheBinaryCommand _command;
std::string _forward_master;
std::string _master;
std::string _key;
policy::MemcacheBinaryCommand _command;
CouchbaseRequest _request;
CouchbaseRequest _replicas_req;
CouchbaseRequest _replica_request;
};
bool VBucketContext::Init(const VBucketServerMap* vb_map,
const size_t vb_index,
const int server_type,
const int server_index,
const CouchbaseRequest* request,
const std::string& key,
const policy::MemcacheBinaryCommand command) {
if (vb_map->_version == 0) {
return false;
}
_version = vb_map->_version;
_vbucket_index = vb_index;
_server_type = server_type;
_server_index = server_index;
_command = command;
_key = key;
const std::string* fm = GetForwardMaster(vb_map, vb_index);
if (fm != nullptr) {
_forward_master = *fm;
}
const std::string* master = GetMaster(vb_map, vb_index);
_master = *master;
if (!request->BuildVBucketId(vb_index, &_request)) {
return false;
}
return true;
}
VBucketStatus VBucketContext::Update(
const VBucketServerMap* vb_map, const size_t vb_index) {
VBucketStatus change = NO_CHANGE;
if (_version == vb_map->_version) {
change = NO_CHANGE;
return change;
}
_version = vb_map->_version;
const std::string* fm = GetForwardMaster(vb_map, vb_index);
const std::string* master = GetMaster(vb_map, vb_index);
if (_forward_master.empty()) {
if (fm == nullptr) {
if (_master == *master) {
change = MASTER_KEEPING_WITHOUT_F;
} else {
change = MASTER_CHANGE_WITHOUT_F;
}
} else {
change = FORWARD_CREATE;
}
} else {
if (fm == nullptr) {
change = FORWARD_FINISH;
} else {
if (_forward_master == *fm) {
change = FORWARD_KEEPING;
} else {
change = FORWARD_CHANGE;
}
}
}
if (fm != nullptr) {
_forward_master = *fm;
}
_master = *master;
return change;
}
const CouchbaseRequest* VBucketContext::GetReplicasReadRequest() {
if (!_replica_request.IsInitialized()) {
_replica_request.ReplicasGet(_key, _vbucket_index);
}
return &_replica_request;
}
class CouchbaseDone : public google::protobuf::Closure {
friend class CouchbaseChannel;
public:
CouchbaseDone(CouchbaseChannel* cb_channel, brpc::Controller* cntl,
CouchbaseResponse* response, google::protobuf::Closure* done)
......@@ -333,11 +573,66 @@ private:
void CouchbaseDone::Run() {
std::unique_ptr<CouchbaseDone> self_guard(this);
while(FLAGS_retry_during_rebalance) {
//TODO: retry in case of rebalance/failover.
ClosureGuard done_guard(_done);
if (FLAGS_couchbase_disable_retry_during_rebalance) {
return;
}
int reason = 0;
std::string error_text;
bool retry = _cb_channel->IsNeedRetry(_cntl, _vb_context,
_response, &reason, &error_text);
_cb_channel->UpdateDetectedMasterIfNeeded(reason, _vb_context);
if (!retry) {
return;
}
int64_t remain_ms = _cntl->timeout_ms() - _cntl->latency_us() / 1000;
if (remain_ms <= 0) {
_cntl->SetFailed(ERPCTIMEDOUT, "reach timeout, finish retry");
return;
}
if (reason != SERVER_DOWN) {
_cntl->SetFailed(error_text);
}
Controller retry_cntl;
retry_cntl.set_timeout_ms(remain_ms);
// TODO: Inherit other fields except of timeout_ms of _cntl.
retry_cntl.set_log_id(_cntl->log_id());
retry_cntl.set_max_retry(0);
while (true) {
// TODO: _cntl cancel
if (!_cb_channel->DoRetry(reason, &retry_cntl,
_response, &_vb_context)) {
break;
}
reason = 0;
retry = _cb_channel->IsNeedRetry(&retry_cntl, _vb_context,
_response, &reason, &error_text);
_cb_channel->UpdateDetectedMasterIfNeeded(reason, _vb_context);
if (!retry) {
break;
}
remain_ms = retry_cntl.timeout_ms() - retry_cntl.latency_us() / 1000;
if (remain_ms <= 0) {
retry_cntl.SetFailed(ERPCTIMEDOUT, "reach timeout, finish retry");
break;
}
_done->Run();
_cntl->SetFailed(error_text);
retry_cntl.Reset();
retry_cntl.set_timeout_ms(remain_ms);
// TODO: Inherit other fields except of timeout_ms of _cntl.
retry_cntl.set_log_id(_cntl->log_id());
retry_cntl.set_max_retry(0);
}
if (_vb_context._server_type == REPLICA_SERVER
&& retry_cntl.ErrorCode() == 0) {
_response->RecoverOptCodeForReplicasRead();
}
// Fetch result from retry_cntl to _cntl. They share the same response.
if (retry_cntl.Failed()) {
_cntl->SetFailed(retry_cntl.ErrorText());
}
_cntl->_error_code = retry_cntl.ErrorCode();
_cntl->OnRPCEnd(butil::gettimeofday_us());
}
CouchbaseChannel::CouchbaseChannel() {}
......@@ -346,7 +641,29 @@ CouchbaseChannel::~CouchbaseChannel() {
_listener.reset(nullptr);
}
int CouchbaseChannel::Init(const char* server_addr,
int CouchbaseChannel::Init(const char* listen_url, const ChannelOptions* options) {
if (options != nullptr) {
if (options->protocol != PROTOCOL_UNKNOWN &&
options->protocol != PROTOCOL_MEMCACHE) {
LOG(FATAL) << "Failed to init channel due to invalid protocol "
<< options->protocol.name() << '.';
return -1;
}
_common_options = *options;
}
_common_options.protocol = PROTOCOL_MEMCACHE;
_detected_vbucket_map.reset(
new std::vector<DetectedMaster>(kCouchbaseMaxVBuckets));
auto ptr = new CouchbaseServerListener(listen_url, this);
if (ptr == nullptr) {
LOG(FATAL) << "Failed to init CouchbaseChannel to " << listen_url << '.';
return -1;
}
_listener.reset(ptr);
return 0;
}
int CouchbaseChannel::Init(const char* server_addr, const char* bucket_name,
const ChannelOptions* options) {
if (options != nullptr) {
if (options->protocol != PROTOCOL_UNKNOWN &&
......@@ -358,7 +675,9 @@ int CouchbaseChannel::Init(const char* server_addr,
_common_options = *options;
}
_common_options.protocol = PROTOCOL_MEMCACHE;
auto ptr = new CouchbaseServerListener(server_addr, this);
_detected_vbucket_map.reset(
new std::vector<DetectedMaster>(kCouchbaseMaxVBuckets));
auto ptr = new CouchbaseServerListener(server_addr, bucket_name, this);
if (ptr == nullptr) {
LOG(FATAL) << "Failed to init CouchbaseChannel to " << server_addr << '.';
return -1;
......@@ -378,11 +697,11 @@ void CouchbaseChannel::CallMethod(const google::protobuf::MethodDescriptor* meth
ClosureGuard done_guard(done);
std::string key;
policy::MemcacheBinaryCommand command;
// Do not support Flush/Version
if (req->ParseRequest(&key, &command) != 0) {
cntl->SetFailed("failed to parse key and command from request");
return;
}
const CallId call_id = cntl->call_id();
{
butil::DoublyBufferedData<VBucketServerMap>::ScopedPtr vb_map;
if(_vbucket_map.Read(&vb_map) != 0) {
......@@ -393,34 +712,65 @@ void CouchbaseChannel::CallMethod(const google::protobuf::MethodDescriptor* meth
cntl->SetFailed(ENODATA, "vbucket map is not initialize");
return;
}
ServerType type = MASTER_SERVER;
int index = -1;
const size_t vb_index = Hash(key, vb_map->_vbucket.size());
channel = SelectMasterChannel(vb_map.get(), vb_index);
if (!IsInRebalancing(vb_map.get()) ||
FLAGS_couchbase_disable_retry_during_rebalance) {
const std::string* server = GetMaster(vb_map.get(), vb_index, &index);
channel = GetMappedChannel(server, vb_map.get());
} else {
// Close the default retry policy. CouchbaeChannel decide to how to retry.
cntl->set_max_retry(0);
index = GetDetectedMaster(vb_map.get(), vb_index);
if (index >= 0) {
type = DETECTED_SERVER;
channel = GetMappedChannel(&vb_map->_servers[index], vb_map.get());
} else {
const std::string* server = GetMaster(vb_map.get(), vb_index, &index);
channel = GetMappedChannel(server, vb_map.get());
}
}
if (channel == nullptr) {
cntl->SetFailed(ENODATA,"failed to get mapped channel");
return;
}
CouchbaseRequest new_req;
if (!req->BuildNewWithVBucketId(&new_req, vb_index)) {
cntl->SetFailed("failed to add vbucket id");
CouchbaseDone* cb_done = new CouchbaseDone(
this, cntl, static_cast<CouchbaseResponse*>(response), done);
if (!cb_done->_vb_context.Init(vb_map.get(), vb_index, type,
index, req, key, command)) {
cntl->SetFailed(ENOMEM, "failed to init couchbase context");
return;
}
done_guard.release();
channel->CallMethod(nullptr, cntl, &new_req, response, done);
channel->CallMethod(nullptr, cntl, &cb_done->_vb_context._request,
response, cb_done);
}
if (done == nullptr) {
Join(call_id);
}
return;
}
Channel* CouchbaseChannel::SelectMasterChannel(
const VBucketServerMap* vb_map, const size_t vb_index) {
return GetMappedChannel(GetMaster(vb_map, vb_index), vb_map);
Channel* CouchbaseChannel::SelectBackupChannel(
const VBucketServerMap* vb_map, const size_t vb_index,
const int reason, VBucketContext* context) {
VBucketStatus change = VBucketStatus::NO_CHANGE;
if (vb_map->_version != context->_version) {
change = context->Update(vb_map, vb_index);
}
const std::string* server = GetNextRetryServer(change, reason, vb_map,
vb_index, context);
return server ? GetMappedChannel(server, vb_map) : nullptr;
}
const CouchbaseChannelMap& CouchbaseChannel::GetChannelMap() {
const VBucketServerMap* CouchbaseChannel::vbucket_map() {
butil::DoublyBufferedData<VBucketServerMap>::ScopedPtr vbucket_map;
if(_vbucket_map.Read(&vbucket_map) != 0) {
LOG(FATAL) << "Failed to read vbucket map.";
LOG(ERROR) << "Failed to read vbucket map.";
return nullptr;
}
return vbucket_map->_channel_map;
return vbucket_map.get();
}
Channel* CouchbaseChannel::GetMappedChannel(const std::string* server,
......@@ -435,24 +785,219 @@ Channel* CouchbaseChannel::GetMappedChannel(const std::string* server,
return nullptr;
}
const std::string* CouchbaseChannel::GetMaster(
const VBucketServerMap* vb_map, const size_t vb_index, int* index) {
if (vb_index < vb_map->_vbucket.size()) {
const int i = vb_map->_vbucket[vb_index][0];
if (i >= 0 && i < static_cast<int>(vb_map->_servers.size())) {
if (index != nullptr) {
*index = i;
bool CouchbaseChannel::IsNeedRetry(
const Controller* cntl, const VBucketContext& context,
CouchbaseResponse* response, int* reason, std::string* error_text) {
*reason = DEFAULT_DUMMY;
error_text->clear();
const int error_code = cntl->ErrorCode();
if (error_code != 0) {
if (error_code == EHOSTDOWN || error_code == ELOGOFF ||
error_code == EFAILEDSOCKET || error_code == EEOF ||
error_code == ECLOSE || error_code == ECONNRESET) {
*reason = SERVER_DOWN;
error_text->append(cntl->ErrorText());
error_text->append(";");
} else {
*reason = RPC_FAILED;
}
} else {
CouchbaseResponse::Status status = CouchbaseResponse::STATUS_SUCCESS;
const size_t vb_index = context._vbucket_index;
if (response->GetStatus(&status)) {
if (status != CouchbaseResponse::STATUS_SUCCESS) {
*reason = status == CouchbaseResponse::STATUS_NOT_MY_VBUCKET
? RPC_SUCCESS_BUT_WRONG_SERVER
: RPC_SUCCESS_BUT_RESPONSE_FAULT;
error_text->append(CouchbaseResponse::status_str(status));
error_text->append(
"(vbucket_id=" + butil::IntToString(vb_index) + ") latency="
+ butil::Int64ToString(cntl->latency_us()) + "us @");
error_text->append(butil::endpoint2str(cntl->remote_side()).c_str());
error_text->append(";");
} else {
*reason = RESPONSE_OK;
}
}
}
if (IsInRebalancing(vbucket_map())) {
return *reason == SERVER_DOWN ||
*reason == RPC_SUCCESS_BUT_WRONG_SERVER ||
*reason == RPC_SUCCESS_BUT_RESPONSE_FAULT;
} else if(!FLAGS_couchbase_disable_retry_during_active) {
return *reason == RPC_SUCCESS_BUT_WRONG_SERVER ||
(*reason == SERVER_DOWN && context._request.read_replicas());
}
return false;
}
bool CouchbaseChannel::DoRetry(const int reason, Controller* cntl,
CouchbaseResponse* response, VBucketContext* vb_ctx) {
{
butil::DoublyBufferedData<VBucketServerMap>::ScopedPtr vb_map;
if(_vbucket_map.Read(&vb_map) != 0) {
cntl->SetFailed(ENOMEM, "failed to read vbucket map");
return false;
}
if (++(vb_ctx->_retried_count) >= vb_map->_servers.size()) {
cntl->SetFailed("Reach the max couchbase retry count");
return false;
}
const size_t vb_index = vb_ctx->_vbucket_index;
Channel* channel = SelectBackupChannel(vb_map.get(), vb_index,
reason, vb_ctx);
if (channel == nullptr) {
cntl->SetFailed(ENODATA, "no buckup server found");
return false;
}
const CouchbaseRequest* request = &(vb_ctx->_request);
if (vb_ctx->_server_type == REPLICA_SERVER) {
request = vb_ctx->GetReplicasReadRequest();
}
response->Clear();
channel->CallMethod(nullptr, cntl, request, response, DoNothing());
}
Join(cntl->call_id());
return true;
}
const std::string* CouchbaseChannel::GetNextRetryServer(
const VBucketStatus change, const int reason, const VBucketServerMap* vb_map,
const size_t vb_index, VBucketContext* context) {
int curr_index = context->_server_index;
const int server_num = vb_map->_servers.size();
if (IsInRebalancing(vb_map)) {
// keep current server to retry if it is right server of the vbucket.
if (reason != RPC_SUCCESS_BUT_WRONG_SERVER
&& reason != SERVER_DOWN) {
if (curr_index < server_num) {
return &(vb_map->_servers[curr_index]);
}
}
int next_index = GetDetectedMaster(vb_map, vb_index);
if(next_index >= 0) {
context->_server_type = DETECTED_SERVER;
context->_server_index = next_index;
return &(vb_map->_servers[next_index]);
}
int dummy_index = -1;
// Retry forward master as first if having forward master. Otherwise,
// probe other servers.
if(!GetForwardMaster(vb_map, vb_index, &next_index)) {
next_index = (curr_index + 1) % server_num;
}
(*_detected_vbucket_map)[vb_index]._index.compare_exchange_strong(
dummy_index, next_index, butil::memory_order_release);
context->_server_type = DETECTED_SERVER;
context->_server_index = next_index;
return &(vb_map->_servers[next_index]);
} else {
if (change == FORWARD_FINISH || change == MASTER_CHANGE_WITHOUT_F) {
context->_server_type = MASTER_SERVER;
return GetMaster(vb_map, vb_index, &context->_server_index);
} else {
if (reason == SERVER_DOWN && context->_request.read_replicas()) {
context->_server_type = REPLICA_SERVER;
return GetReplica(vb_map, vb_index);
}
if (reason == RPC_SUCCESS_BUT_WRONG_SERVER) {
context->_server_type = DETECTED_SERVER;
context->_server_index = (curr_index + 1) % server_num;
// TODO: need update detect server.
return &(vb_map->_servers[context->_server_index]);
}
return &vb_map->_servers[i];
}
}
return nullptr;
}
size_t CouchbaseChannel::Hash(const butil::StringPiece& key,
const size_t vbuckets_num) {
size_t digest = butil::hash_crc32(key.data(), key.size());
return digest & (vbuckets_num - 1);
void CouchbaseChannel::UpdateDetectedMasterIfNeeded(
const int reason, const VBucketContext& context) {
if (context._server_type == REPLICA_SERVER) {
return;
}
if (reason == DEFAULT_DUMMY || reason == RPC_FAILED) {
return;
}
butil::DoublyBufferedData<VBucketServerMap>::ScopedPtr vb_map;
if(_vbucket_map.Read(&vb_map) != 0) {
LOG(ERROR) << "Failed to read vbucket map.";
return;
}
if (!IsInRebalancing(vb_map.get())) {
return;
}
const int server_num = vb_map->_servers.size();
int curr_index = context._server_index;
if (curr_index >= server_num) {
return;
}
const size_t vb_index = context._vbucket_index;
DetectedMaster& detect_master = (*_detected_vbucket_map)[vb_index];
butil::atomic<bool>& is_verified = detect_master._verified;
butil::atomic<int>& index = detect_master._index;
if (reason != SERVER_DOWN && reason != RPC_SUCCESS_BUT_WRONG_SERVER) {
if (context._server_type == MASTER_SERVER) {
return;
}
// We detected the right new master for vbucket no matter the
// response status is success or not. Record for following request
// during rebalancing.
if (curr_index != index.load(butil::memory_order_acquire)) {
index.store(curr_index, butil::memory_order_relaxed);
}
if (!is_verified.load(butil::memory_order_acquire)) {
is_verified.store(true, butil::memory_order_relaxed);
}
} else {
// Server is down or it is a wrong server of the vbucket. Go on probing
// other servers.
int dummy_index = -1;
int next_index = -1;
// Detect forward master as the first if having forwad master,
// otherwise, probe other servers.
if (dummy_index == index.load(butil::memory_order_acquire)) {
if(!GetForwardMaster(vb_map.get(), vb_index, &next_index)) {
next_index = (curr_index + 1) % server_num;
}
index.compare_exchange_strong(dummy_index, next_index,
butil::memory_order_release,
butil::memory_order_relaxed);
if (is_verified.load(butil::memory_order_acquire)) {
is_verified.store(false, butil::memory_order_relaxed);
}
} else {
next_index = (curr_index + 1) % server_num;
if (is_verified.load(butil::memory_order_acquire)) {
// Verified master server is invalid. Reset to detect again.
if (index.compare_exchange_strong(curr_index, -1,
butil::memory_order_relaxed,
butil::memory_order_relaxed)) {
is_verified.store(false, butil::memory_order_relaxed);
}
} else {
// Probe next servers.
index.compare_exchange_strong(curr_index, next_index,
butil::memory_order_release,
butil::memory_order_relaxed);
}
}
}
}
int CouchbaseChannel::GetDetectedMaster(const VBucketServerMap* vb_map,
const size_t vb_index) {
butil::atomic<int>& detected_index = (*_detected_vbucket_map)[vb_index]._index;
const int server_num = vb_map->_servers.size();
int curr_index = detected_index.load(butil::memory_order_acquire);
if (curr_index >= 0 && curr_index < server_num) {
return curr_index;
}
if (curr_index >= server_num) {
detected_index.compare_exchange_strong(
curr_index, -1, butil::memory_order_relaxed);
}
return -1;
}
bool CouchbaseChannel::UpdateVBucketServerMap(
......@@ -559,4 +1104,51 @@ int CouchbaseChannel::CheckHealth() {
return 0;
}
const std::string* GetMaster(const VBucketServerMap* vb_map,
const size_t vb_index, int* index) {
if (vb_index < vb_map->_vbucket.size()) {
const int i = vb_map->_vbucket[vb_index][0];
if (i >= 0 && i < static_cast<int>(vb_map->_servers.size())) {
if (index != nullptr) {
*index = i;
}
return &vb_map->_servers[i];
}
}
return nullptr;
}
const std::string* GetForwardMaster(const VBucketServerMap* vb_map,
const size_t vb_index, int* index) {
if (vb_index < vb_map->_fvbucket.size()) {
const int i = vb_map->_fvbucket[vb_index][0];
if (i >= 0 && i < static_cast<int>(vb_map->_servers.size())) {
if (index != nullptr) {
*index = i;
}
return &vb_map->_servers[i];
}
}
if (index != nullptr) {
*index = -1;
}
return nullptr;
}
const std::string* GetReplica(const VBucketServerMap* vb_map,
const size_t vb_index) {
if (vb_index < vb_map->_vbucket.size()) {
const int index = vb_map->_vbucket[vb_index][0];
if (index != -1) {
return &vb_map->_servers[index];
}
}
return nullptr;
}
size_t Hash(const butil::StringPiece& key, const size_t vbuckets_num) {
size_t digest = butil::hash_crc32(key.data(), key.size());
return digest & (vbuckets_num - 1);
}
} // namespace brpc
......@@ -25,11 +25,59 @@
#include "butil/containers/doubly_buffered_data.h"
namespace brpc {
// It is used to detect the new master server of vbuckets when the lastest
// vbucket mapping has not been received during rebalance.
class DetectedMaster {
public:
DetectedMaster() : _verified(false), _index(-1) {}
butil::atomic<bool> _verified;
butil::atomic<int> _index;
private:
DetectedMaster(const DetectedMaster&) = delete;
DetectedMaster& operator=(const DetectedMaster&) = delete;
};
using CouchbaseChannelMap =
std::unordered_map<std::string, std::unique_ptr<Channel>>;
using DetectedVBucketMap = std::vector<DetectedMaster>;
// Couchbase has two type of distribution used to map keys to servers.
// One is vbucket distribution and other is ketama distribution.
// This struct describes vbucket distribution of couchbase.
// 'num_replicas': the number of copies that will be stored on servers of one
// vbucket. Each vbucket must have this number of servers
// indexes plus one.
// '_vbucket': A zero-based indexed by vBucketId. The entries in the _vbucket
// are arrays of integers, where each integer is a zero-based
// index into the '_servers'.
// '_fvbucket': It is fast forward map with same struct as _vbucket. It is
// used to provide the final vBubcket-to-server map during the
// statrt of the rebalance.
// '_servers': all servers of a bucket.
// '_channel_map': the memcache channel for each server.
// TODO: support ketama vbucket distribution
struct VBucketServerMap {
uint64_t _version = 0;
int _num_replicas = 0;
std::vector<std::vector<int>> _vbucket;
std::vector<std::vector<int>> _fvbucket;
std::vector<std::string> _servers;
CouchbaseChannelMap _channel_map;
};
enum VBucketStatus {
FORWARD_CREATE = 0x00,
FORWARD_FINISH = 0x01,
FORWARD_KEEPING = 0x02,
FORWARD_CHANGE = 0x03,
MASTER_CHANGE_WITHOUT_F = 0x04,
MASTER_KEEPING_WITHOUT_F = 0x05,
NO_CHANGE = 0x06,
};
class CouchbaseServerListener;
class VBucketContext;
// A couchbase channel maps different key to sub memcache channel according to
// current vbuckets mapping. It retrieves current vbuckets mapping by maintain
......@@ -40,17 +88,27 @@ class CouchbaseServerListener;
// For async rpc, Should not delete this channel until rpc done.
class CouchbaseChannel : public ChannelBase/*non-copyable*/ {
friend class CouchbaseServerListener;
friend class VBucketContext;
friend class CouchbaseDone;
public:
CouchbaseChannel();
~CouchbaseChannel();
// You MUST initialize a couchbasechannel before using it.
// 'Server_addr': address list of couchbase servers. On these addresses, we
// can get vbucket map.
// can get vbucket map. Like following: "addr1:port1,addr2:port2"
// 'bucket_name': the bucket name of couchbase server to access.
// 'options': is used for each memcache channel of vbucket. The protocol
// should be PROTOCOL_MEMCACHE. If 'options' is null,
// use default options.
int Init(const char* server_addr, const ChannelOptions* options);
int Init(const char* server_addr, const char* bucket_name,
const ChannelOptions* options);
// 'listen_url': from this url, we can get vbucket map. Usually, it is
// somthing like following:
// "http://host:port/pools/default/bucketsStreaming/bucket_name" or
// "http://host:port/pools/default/buckets/bucket_name"
int Init(const char* listen_url, const ChannelOptions* options);
// TODO: Do not support pipeline mode now.
// Send request to the mapped channel according to the key of request.
......@@ -62,45 +120,38 @@ public:
void Describe(std::ostream& os, const DescribeOptions& options);
// Couchbase has two type of distribution used to map keys to servers.
// One is vbucket distribution and other is ketama distribution.
// This struct describes vbucket distribution of couchbase.
// 'num_replicas': the number of copies that will be stored on servers of one
// vbucket. Each vbucket must have this number of servers
// indexes plus one.
// '_vbucket': A zero-based indexed by vBucketId. The entries in the _vbucket
// are arrays of integers, where each integer is a zero-based
// index into the '_servers'.
// '_fvbucket': It is fast forward map with same struct as _vbucket. It is
// used to provide the final vBubcket-to-server map during the
// statrt of the rebalance.
// '_servers': all servers of a bucket.
// '_channel_map': the memcache channel for each server.
// TODO: support ketama vbucket distribution
struct VBucketServerMap {
uint64_t _version = 0;
int _num_replicas = 0;
std::vector<std::vector<int>> _vbucket;
std::vector<std::vector<int>> _fvbucket;
std::vector<std::string> _servers;
CouchbaseChannelMap _channel_map;
};
private:
int CheckHealth();
Channel* SelectMasterChannel(const VBucketServerMap* vb_map,
const size_t vb_index);
Channel* SelectBackupChannel(const VBucketServerMap* vb_map,
const size_t vb_index, const int reason,
VBucketContext* context);
Channel* GetMappedChannel(const std::string* server,
const VBucketServerMap* vb_map);
const CouchbaseChannelMap& GetChannelMap();
const VBucketServerMap* vbucket_map();
bool IsNeedRetry(const Controller* cntl, const VBucketContext& context,
CouchbaseResponse* response, int* reason,
std::string* error_text);
bool DoRetry(const int reason, Controller* cntl,
CouchbaseResponse* response, VBucketContext* vb_ct);
int GetDetectedMaster(const VBucketServerMap* vb_map, const size_t vb_index);
void UpdateDetectedMasterIfNeeded(const int reason,
const VBucketContext& context);
const std::string* GetMaster(const VBucketServerMap* vb_map,
const size_t vb_index, int* index = nullptr);
bool IsInRebalancing(const VBucketServerMap* vb_map) {
return !vb_map->_fvbucket.empty();
}
size_t Hash(const butil::StringPiece& key, const size_t vbuckets_num);
const std::string* GetNextRetryServer(
const VBucketStatus change, const int reason,
const VBucketServerMap* vb_map, const size_t vb_index,
VBucketContext* context);
bool UpdateVBucketServerMap(
const int num_replicas,
......@@ -121,10 +172,13 @@ private:
std::string GetAuthentication() const;
// Options for each memcache channel of vbucket.
// Options for each memcache channel of real servers.
ChannelOptions _common_options;
// Listener monitor and update vbucket map information.
// Listener monitor and update vbucket map.
std::unique_ptr<CouchbaseServerListener> _listener;
// We need detect new vbucket map due to current vbucket map is invalid
// during rebalance.
std::unique_ptr<DetectedVBucketMap> _detected_vbucket_map;
butil::DoublyBufferedData<VBucketServerMap> _vbucket_map;
};
......
......@@ -30,6 +30,7 @@
#include "brpc/policy/domain_naming_service.h"
#include "brpc/policy/remote_file_naming_service.h"
#include "brpc/policy/consul_naming_service.h"
#include "brpc/policy/couchbase_naming_service.h"
// Load Balancers
#include "brpc/policy/round_robin_load_balancer.h"
......@@ -112,6 +113,7 @@ struct GlobalExtensions {
DomainNamingService dns;
RemoteFileNamingService rfns;
ConsulNamingService cns;
CouchbaseNamingService cblns;
RoundRobinLoadBalancer rr_lb;
WeightedRoundRobinLoadBalancer wrr_lb;
......@@ -326,6 +328,7 @@ static void GlobalInitializeOrDieImpl() {
NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
NamingServiceExtension()->RegisterOrDie("couchbase_list", &g_ext->cblns);
// Load Balancers
LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb);
......
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Cai,Daojin (Caidaojin@qiyi.com)
#include <stdlib.h> // strtol
#include <string> // std::string
#include <set> // std::set
#include "butil/string_splitter.h" // StringSplitter
#include "butil/strings/string_piece.h"
#include "butil/strings/string_split.h"
#include "butil/strings/string_number_conversions.h"
#include "brpc/log.h"
#include "brpc/policy/couchbase_naming_service.h"
namespace brpc {
namespace policy {
// Defined in file_naming_service.cpp
bool SplitIntoServerAndTag(const butil::StringPiece& line,
butil::StringPiece* server_addr,
butil::StringPiece* tag);
butil::Mutex CouchbaseNamingService::_mutex;
std::unordered_map<std::string, std::string> CouchbaseNamingService::servers_map;
bool CouchbaseNamingService::ParseListenUrl(
const butil::StringPiece listen_url, std::string* server,
std::string* streaming_uri, std::string* init_uri) {
do {
const size_t pos = listen_url.find("//");
if (pos == listen_url.npos) {
break;
}
const size_t host_pos = listen_url.find('/', pos + 2);
if (host_pos == listen_url.npos) {
break;
}
butil::StringPiece sub_str = listen_url.substr(pos + 2, host_pos - pos - 2);
server->clear();
server->append(sub_str.data(), sub_str.length());
butil::EndPoint point;
if (butil::str2endpoint(server->c_str(), &point) != 0) {
LOG(FATAL) << "Failed to get address and port \'"
<< server << "\'.";
break;
}
butil::StringPiece uri_sub = listen_url;
uri_sub.remove_prefix(host_pos);
size_t uri_pos = uri_sub.find("/bucketsStreaming/");
if (uri_pos != uri_sub.npos) {
streaming_uri->clear();
streaming_uri->append(uri_sub.data(), uri_sub.length());
init_uri->clear();
init_uri->append(uri_sub.data(), uri_pos);
init_uri->append("/buckets/");
butil::StringPiece bucket_name = uri_sub;
bucket_name.remove_prefix(uri_pos + std::strlen("/bucketsStreaming/"));
init_uri->append(bucket_name.data(), bucket_name.length());
return true;
}
uri_pos = uri_sub.find("/buckets/");
if (uri_pos != uri_sub.npos) {
init_uri->clear();
init_uri->append(uri_sub.data(), uri_sub.length());
streaming_uri->clear();
streaming_uri->append(uri_sub.data(), uri_pos);
streaming_uri->append("/bucketsStreaming/");
butil::StringPiece bucket_name = uri_sub;
bucket_name.remove_prefix(uri_pos + std::strlen("/buckets/"));
streaming_uri->append(bucket_name.data(), bucket_name.length());
return true;
}
} while (false);
LOG(FATAL) << "Failed to parse listen url \'" << listen_url << "\'.";
return false;
}
bool CouchbaseNamingService::ParseNamingServiceUrl(const butil::StringPiece ns_url,
std::string* listen_port) {
butil::StringPiece protocol;
std::string server_list;
const size_t pos = ns_url.find("//");
if (pos != ns_url.npos) {
protocol = ns_url.substr(0, pos);
butil::StringPiece sub = ns_url.substr(pos+2);
server_list.append(sub.data(), sub.length());
}
if (protocol != "couchbase_list:" && server_list.empty()) {
LOG(FATAL) << "Invalid couchbase naming service " << ns_url;
return false;
}
std::vector<std::string> server_array;
butil::SplitString(server_list, ',', &server_array);
listen_port->clear();
for (const std::string& addr_port : server_array) {
butil::EndPoint point;
if (butil::str2endpoint(addr_port.c_str(), &point) != 0) {
LOG(FATAL) << "Failed to get endpoint from \'" << addr_port
<< "\' of the naming server url \'" << ns_url << "\'.";
return false;
}
if (listen_port->empty()) {
*listen_port = butil::IntToString(point.port);
}
}
return true;
}
int CouchbaseNamingService::GetServers(const char *service_name,
std::vector<ServerNode>* servers) {
servers->clear();
// Sort/unique the inserted vector is faster, but may have a different order
// of addresses from the file. To make assertions in tests easier, we use
// set to de-duplicate and keep the order.
std::set<ServerNode> presence;
std::string line;
if (!service_name) {
LOG(FATAL) << "Param[service_name] is NULL";
return -1;
}
std::string new_servers(service_name);
{
BAIDU_SCOPED_LOCK(_mutex);
const auto& iter = servers_map.find(new_servers);
if (iter != servers_map.end()) {
new_servers = iter->second;
}
}
RemoveUniqueSuffix(new_servers);
for (butil::StringSplitter sp(new_servers.c_str(), ','); sp != NULL; ++sp) {
line.assign(sp.field(), sp.length());
butil::StringPiece addr;
butil::StringPiece tag;
if (!SplitIntoServerAndTag(line, &addr, &tag)) {
continue;
}
const_cast<char*>(addr.data())[addr.size()] = '\0'; // safe
butil::EndPoint point;
if (str2endpoint(addr.data(), &point) != 0 &&
hostname2endpoint(addr.data(), &point) != 0) {
LOG(ERROR) << "Invalid address=`" << addr << '\'';
continue;
}
ServerNode node;
node.addr = point;
tag.CopyToString(&node.tag);
if (presence.insert(node).second) {
servers->push_back(node);
} else {
RPC_VLOG << "Duplicated server=" << node;
}
}
RPC_VLOG << "Got " << servers->size()
<< (servers->size() > 1 ? " servers" : " server");
return 0;
}
void CouchbaseNamingService::Describe(
std::ostream& os, const DescribeOptions&) const {
os << "Couchbase_list";
return;
}
NamingService* CouchbaseNamingService::New() const {
return new CouchbaseNamingService;
}
void CouchbaseNamingService::Destroy() {
delete this;
}
void CouchbaseNamingService::ResetCouchbaseListenerServers(
const std::string& service_name, std::string& new_servers) {
BAIDU_SCOPED_LOCK(_mutex);
auto iter = servers_map.find(service_name);
if (iter != servers_map.end()) {
iter->second.swap(new_servers);
} else {
servers_map.emplace(service_name, new_servers);
}
}
std::string CouchbaseNamingService::AddUniqueSuffix(
const char* name_url, const char* unique_id) {
std::string couchbase_name_url;
couchbase_name_url.append(name_url);
couchbase_name_url.append(1, '_');
couchbase_name_url.append(unique_id);
return std::move(couchbase_name_url);
}
void CouchbaseNamingService::RemoveUniqueSuffix(std::string& name_service) {
const size_t pos = name_service.find('_');
if (pos != std::string::npos) {
name_service.resize(pos);
}
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Cai,Daojin (caidaojin@qiyi.com)
#ifndef BRPC_POLICY_COUCHBASE_NAMING_SERVICE
#define BRPC_POLICY_COUCHBASE_NAMING_SERVICE
#include <unordered_map>
#include "brpc/periodic_naming_service.h"
namespace brpc {
class CouchbaseServerListener;
}
namespace brpc {
namespace policy {
// It is only used for couchbase channel. It updates servers for listen channel
// of CouchbaseServerListener. The naming service format is like
// "couchbase_list://addr1:port,addr:port_****" where "_****" is a unique id for
// each couchbase channel since we can not share naming service and "addr*:port"
// are avalible servers for initializing.
// After initialization, it get the latest server list periodically from
// 'servers_map' by service name as key.
class CouchbaseNamingService : public PeriodicNamingService {
friend brpc::CouchbaseServerListener;
private:
static butil::Mutex _mutex;
// Store the lastest server list for each couchbase channel.
// Key is service name of each couchbase channel and value is the latest
// server list. It is like following:
// key: addr1:port,addr2:port_****
// value: addr1:port,addr2:port,addr3:port
static std::unordered_map<std::string, std::string> servers_map;
int GetServers(const char *service_name,
std::vector<ServerNode>* servers);
static bool ParseNamingServiceUrl(butil::StringPiece ns_url,
std::string* listen_port);
static bool ParseListenUrl(
const butil::StringPiece listen_url, std::string* server_address,
std::string* streaming_uri, std::string* init_uri);
// Clear naming server data when couchbase channel destroyed.
static void ClearNamingServiceData(const std::string& service_name) {
BAIDU_SCOPED_LOCK(_mutex);
servers_map.erase(service_name);
}
// Called by couchbase listener when vbucekt map changing.
// It set new server list for key 'service_name' in servers_map.
static void ResetCouchbaseListenerServers(const std::string& service_name,
std::string& new_servers);
// For couchbase listeners, we should not share this name service object.
// So we append couchbase listener address to make name_url unique.
// Input: couchbase_list://address1:port1,address2:port2
// Output: couchbase_list://address1:port1,address2:port2_****
static std::string AddUniqueSuffix(const char* name_url,
const char* unique_id);
// Reserve handling to AddPrefixBeforeAddress.
void RemoveUniqueSuffix(std::string& name_service);
void Describe(std::ostream& os, const DescribeOptions& options) const;
NamingService* New() const;
void Destroy();
};
} // namespace policy
} // namespace brpc
#endif //BRPC_POLICY_COUCHBASE_NAMING_SERVICE
......@@ -91,7 +91,10 @@ enum MemcacheBinaryCommand {
MC_BINARY_RINCR = 0x39,
MC_BINARY_RINCRQ = 0x3a,
MC_BINARY_RDECR = 0x3b,
MC_BINARY_RDECRQ = 0x3c
MC_BINARY_RDECRQ = 0x3c,
// Replicas read for couchbase
MC_BINARY_REPLICAS_READ = 0x83
// End Range operations
};
......
......@@ -64,6 +64,7 @@ static void InitSupportedCommandMap() {
butil::bit_array_set(supported_cmd_map, MC_BINARY_STAT);
butil::bit_array_set(supported_cmd_map, MC_BINARY_TOUCH);
butil::bit_array_set(supported_cmd_map, MC_BINARY_SASL_AUTH);
butil::bit_array_set(supported_cmd_map, MC_BINARY_REPLICAS_READ);
}
inline bool IsSupportedCommand(uint8_t command) {
......
......@@ -4,7 +4,7 @@
/* This library uses the reference MD5 implementation from [RFC1321] */
#define PROTOTYPES 1
#include "butil/third_party/libvbucket/rfc1321/md5c.c"
#include "butil/third_party/libvbucket/rfc1321/md5.h"
#undef PROTOTYPES
void hash_md5(const char *key, size_t key_length, unsigned char *result)
......
......@@ -24,6 +24,9 @@
*/
/* MD5 context. */
#include "butil/third_party/libvbucket/rfc1321/global.h"
typedef struct {
UINT4 state[4]; /* state (ABCD) */
UINT4 count[2]; /* number of bits, modulo 2^64 (lsb first) */
......
......@@ -23,7 +23,6 @@
documentation and/or software.
*/
#include "butil/third_party/libvbucket/rfc1321/global.h"
#include "butil/third_party/libvbucket/rfc1321/md5.h"
/* Constants for MD5Transform routine.
......
......@@ -741,6 +741,10 @@ const char *vbucket_config_get_rest_api_server(VBUCKET_CONFIG_HANDLE vb, int i)
return vb->servers[i].rest_api_authority;
}
int vbucket_config_has_forward_vbuckets(VBUCKET_CONFIG_HANDLE vb) {
return vb->fvbuckets ? 1 : 0;
}
int vbucket_config_is_config_node(VBUCKET_CONFIG_HANDLE vb, int i) {
return vb->servers[i].config_node;
}
......@@ -782,6 +786,23 @@ int vbucket_get_replica(VBUCKET_CONFIG_HANDLE vb, int vbucket, int i) {
}
}
int fvbucket_get_master(VBUCKET_CONFIG_HANDLE vb, int vbucket) {
if (vb->fvbuckets) {
return vb->fvbuckets[vbucket].servers[0];
}
return -1;
}
int fvbucket_get_replica(VBUCKET_CONFIG_HANDLE vb, int vbucket, int i) {
if (vb->fvbuckets) {
int idx = i + 1;
if (idx < vb->num_servers) {
return vb->fvbuckets[vbucket].servers[idx];
}
}
return -1;
}
int vbucket_found_incorrect_master(VBUCKET_CONFIG_HANDLE vb, int vbucket,
int wrongserver) {
int mappedServer = vb->vbuckets[vbucket].servers[0];
......
......@@ -352,6 +352,39 @@ namespace butil {
LIBVBUCKET_PUBLIC_API
int vbucket_get_replica(VBUCKET_CONFIG_HANDLE h, int id, int n);
/**
* Check whether including forward vbuckets
*
* @param id the fvbucket identifier
*
* @return true if forward vbuckets included.
*/
LIBVBUCKET_PUBLIC_API
int vbucket_config_has_forward_vbuckets(VBUCKET_CONFIG_HANDLE h);
/**
* Get the master server for the given vbucket.
*
* @param h the vbucket config
* @param id the fvbucket identifier
*
* @return the server index
*/
LIBVBUCKET_PUBLIC_API
int fvbucket_get_master(VBUCKET_CONFIG_HANDLE h, int id);
/**
* Get a given replica for a forward vbucket.
*
* @param h the vbucket config
* @param id the vbucket id
* @param n the replica number
*
* @return the server ID
*/
LIBVBUCKET_PUBLIC_API
int fvbucket_get_replica(VBUCKET_CONFIG_HANDLE h, int id, int n);
/**
* @}
*/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment