Commit c5c94016 authored by root's avatar root Committed by caidaojin

couchbase proposal

parent 0b4117d8
// Copyright (c) 2018 Qiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Daojin Cai (caidaojin@qiyi.com)
#ifndef BRPC_COUCHBASE_H
#define BRPC_COUCHBASE_H
#include "brpc/memcache.h"
namespace brpc {
// Request to couchbase.
// Do not support pipeline multiple operations in one request and sent now.
class CouchbaseRequest : public MemcacheRequest {
public:
void Swap(CouchbaseRequest* other) {
MemcacheRequest::Swap(other);
}
bool Get(const butil::StringPiece& key) {
MemcacheRequest::Clear();
return MemcacheRequest::Get(key);
}
bool Set(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Set(key, value, flags, exptime, cas_value);
}
bool Add(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Add(key, value, flags, exptime, cas_value);
}
bool Replace(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Replace(key, value, flags, exptime, cas_value);
}
bool Append(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Append(key, value, flags, exptime, cas_value);
}
bool Prepend(const butil::StringPiece& key, const butil::StringPiece& value,
uint32_t flags, uint32_t exptime, uint64_t cas_value) {
MemcacheRequest::Clear();
return MemcacheRequest::Prepend(key, value, flags, exptime, cas_value);
}
bool Delete(const butil::StringPiece& key) {
MemcacheRequest::Clear();
return MemcacheRequest::Delete(key);
}
bool Flush(uint32_t timeout) {
MemcacheRequest::Clear();
return MemcacheRequest::Flush(timeout);
}
bool Increment(const butil::StringPiece& key, uint64_t delta,
uint64_t initial_value, uint32_t exptime) {
MemcacheRequest::Clear();
return MemcacheRequest::Increment(key, delta, initial_value, exptime);
}
bool Decrement(const butil::StringPiece& key, uint64_t delta,
uint64_t initial_value, uint32_t exptime) {
MemcacheRequest::Clear();
return MemcacheRequest::Decrement(key, delta, initial_value, exptime);
}
bool Touch(const butil::StringPiece& key, uint32_t exptime) {
MemcacheRequest::Clear();
return MemcacheRequest::Touch(key, exptime);
}
bool Version() {
MemcacheRequest::Clear();
return MemcacheRequest::Version();
}
CouchbaseRequest* New() const { return new CouchbaseRequest;}
void CopyFrom(const CouchbaseRequest& from) {
MemcacheRequest::CopyFrom(from);
}
private:
void MergeFrom(const CouchbaseRequest& from);
int pipelined_count();
};
// Request to couchbase.
// Do not support pipeline multiple operations in one request and sent now.
class CouchbaseResponse : public MemcacheResponse {
public:
void Swap(CouchbaseResponse* other) {
MemcacheResponse::Swap(other);
}
CouchbaseResponse* New() const { return new CouchbaseResponse;}
void CopyFrom(const CouchbaseResponse& from) {
MemcacheResponse::CopyFrom(from);
}
private:
void MergeFrom(const CouchbaseResponse& from);
int pipelined_count();
};
} // namespace brpc
#endif // BRPC_COUCHBASE_H
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Daojin Cai (caidaojin@qiyi.com)
#include "brpc/couchbase_channel.h"
#include "bthread/bthread.h"
namespace brpc {
class CouchbaseServerListener {
public:
CouchbaseServerListener(const char* server_addr,
const CouchbaseChannel* channel)
: _server_addr(server_addr), _channel(channel) {
//TODO: Init vbucket map for first time.
CHECK(bthread_start_background(
&_bthread_id, nullptr, ListenThread, this) == 0)
<< "Failed to start ListenThread.";
}
~CouchbaseServerListener() {
bthread_stop(_bthread_id);
bthread_join(_bthread_id, nullptr);
};
private:
CouchbaseServerListener(const CouchbaseServerListener&) = delete;
CouchbaseServerListener& operator=(const CouchbaseServerListener&) = delete;
static void* ListenThread(void* arg);
bthread_t _bthread_id;
const std::string _server_addr;
const CouchbaseChannel* _channel;
std::vector<std::string> _vbucket_servers;
};
//TODO: Get current vbucket map of couchbase server
void* CouchbaseServerListener::ListenThread(void* arg) {
return nullptr;
}
CouchbaseChannel::~CouchbaseChannel() {
_listener.reset(nullptr);
}
int CouchbaseChannel::Init(const char* server_addr,
const ChannelOptions* options) {
if (options != nullptr) {
if (options->protocol != PROTOCOL_UNKNOWN &&
options->protocol != PROTOCOL_MEMCACHE) {
LOG(FATAL) << "Failed to init channel due to invalid protoc "
<< options->protocol.name() << '.';
return -1;
}
_common_options = *options;
_common_options.protocol = PROTOCOL_MEMCACHE;
} else {
// TODO: use a default options.
}
auto ptr = new CouchbaseServerListener(server_addr, this);
if (ptr == nullptr) {
LOG(FATAL) << "Failed to init CouchbaseChannel to " << server_addr << '.';
return -1;
}
_listener.reset(ptr);
return 0;
}
void CouchbaseChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) {
bool success = false;
butil::StringPiece key;
if (GetKeyFromRequest(request, &key)) {
butil::DoublyBufferedData<VBucketServerMap>::ScopedPtr vbucket_map;
if(_vbucket_map.Read(&vbucket_map) == 0) {
Channel* mapped_channel = SelectChannel(key, vbucket_map.get());
if (mapped_channel != nullptr) {
mapped_channel->CallMethod(nullptr,
controller,
request,
response,
done);
success = true;
}
} else {
LOG(ERROR) << "Failed to read vbucket map.";
}
} else {
LOG(ERROR) << "Failed to get key from request.";
}
if (!success) {
controller->SetFailed("Failed to send request");
}
}
bool CouchbaseChannel::GetKeyFromRequest(const google::protobuf::Message* request,
butil::StringPiece* key) {
return true;
}
Channel* CouchbaseChannel::SelectChannel(
const butil::StringPiece& key, const VBucketServerMap* vbucket_map) {
size_t index = Hash(vbucket_map->_hash_algorithm,
key, vbucket_map->_vbucket_servers.size());
auto iter = vbucket_map->_channel_map.find(
vbucket_map->_vbucket_servers[index]);
if (iter != vbucket_map->_channel_map.end()) {
return iter->second.get();
} else {
LOG(ERROR) << "Failed to find mapped channel.";
}
return nullptr;
}
//TODO: Get different hash algorithm if needed.
size_t CouchbaseChannel::Hash(const std::string& type,
const butil::StringPiece& key,
const size_t size) {
return 0;
}
bool CouchbaseChannel::UpdateVBucketServerMap(
const std::string* hash_algo,
std::vector<std::string>* vbucket_servers,
const std::vector<std::string>* added_vbuckets,
const std::vector<std::string>* removed_vbuckets) {
auto fn = std::bind(Update,
std::placeholders::_1,
&_common_options,
hash_algo,
vbucket_servers,
added_vbuckets,
removed_vbuckets);
return _vbucket_map.Modify(fn);
}
bool CouchbaseChannel::Update(VBucketServerMap& vbucket_map,
const ChannelOptions* options,
const std::string* hash_algo,
std::vector<std::string>* vbucket_servers,
const std::vector<std::string>* added_vbuckets,
const std::vector<std::string>* removed_vbuckets) {
bool ret = true;
if (hash_algo != nullptr) {
vbucket_map._hash_algorithm = *hash_algo;
}
if (vbucket_servers != nullptr) {
vbucket_map._vbucket_servers.swap(*vbucket_servers);
}
if (added_vbuckets != nullptr) {
for (const auto& servers: *added_vbuckets) {
std::unique_ptr<Channel> p(new Channel());
if (p == nullptr) {
LOG(FATAL) << "Failed to init channel.";
return false;
}
if (p->Init(servers.c_str(), "rr", options) != 0) {
LOG(FATAL) << "Failed to init channel.";
return false;
}
auto pair = vbucket_map._channel_map.emplace(servers, std::move(p));
if (!pair.second) {
LOG(ERROR) << "Failed to add new channel to server: " << servers;
ret = false;
}
}
}
if (removed_vbuckets != nullptr) {
for (const auto& servers: *removed_vbuckets) {
auto n = vbucket_map._channel_map.erase(servers);
if (n == 0) {
LOG(ERROR) << "Failed to remove channel to server: " << servers;
ret = false;
}
}
}
return ret;
}
} // namespace brpc
// Copyright (c) 2018 Iqiyi, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Authors: Daojin Cai (caidaojin@qiyi.com)
#ifndef BRPC_COUCHBASE_CHANNEL_H
#define BRPC_COUCHBASE_CHANNEL_H
#include <vector>
#include <unordered_map>
#include "brpc/channel.h"
#include "butil/containers/doubly_buffered_data.h"
namespace brpc {
class CouchbaseServerListener;
class CouchbaseChannel : public ChannelBase/*non-copyable*/ {
public:
CouchbaseChannel() = default;
~CouchbaseChannel();
// You MUST initialize a couchbasechannel before using it. 'Server_addr'
// is address of couchbase server. 'options' is used for each channel to
// real servers of bucket. The protocol should be PROTOCOL_MEMCACHE.
// If 'options' is null, use default options.
int Init(const char* server_addr, const ChannelOptions* options);
// TODO: Do not support pipeline mode now.
// Send request to the mapped channel according to the key of request.
void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done);
void Describe(std::ostream& os, const DescribeOptions& options) const;
private:
// TODO: This struct describes map between vbucket and real memcache server.
// '_hash_algorithm': The hash algorithm couchbase used.
// '_vbucket_servers': server list of vbuckets, like "list://addr1:port1,
// addr2:port2...".
// '_channel_map': the channel for each vbucket.
struct VBucketServerMap {
std::string _hash_algorithm;
std::vector<std::string> _vbucket_servers;
std::unordered_map<std::string, std::unique_ptr<Channel>> _channel_map;
};
int CheckHealth();
bool GetKeyFromRequest(const google::protobuf::Message* request,
butil::StringPiece* key);
Channel* SelectChannel(const butil::StringPiece& key,
const VBucketServerMap* vbucket_map);
//TODO: Get different hash algorithm if needed.
size_t Hash(const std::string& type,
const butil::StringPiece& key,
const size_t size);
bool UpdateVBucketServerMap(
const std::string* hash_algo,
std::vector<std::string>* vbucket_servers,
const std::vector<std::string>* added_vbuckets,
const std::vector<std::string>* removed_vbuckets);
static bool Update(VBucketServerMap& vbucket_map,
const ChannelOptions* options,
const std::string* hash_algo,
std::vector<std::string>* vbucket_servers,
const std::vector<std::string>* added_vbuckets,
const std::vector<std::string>* removed_vbuckets);
// Options for each memcache channel of vbucket.
ChannelOptions _common_options;
std::unique_ptr<CouchbaseServerListener> _listener;
// Memcache channel of each vbucket of couchbase. The key is the server list
// of this vbucket, like 'list://addr1:port1,addr2:port2...'.
butil::DoublyBufferedData<VBucketServerMap> _vbucket_map;
};
} // namespace brpc
#endif // BRPC_COUCHBASE_CHANNEL_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment