Commit c1a267bf authored by wuchengcheng's avatar wuchengcheng

support couchbase autentication in memcache client

parent fae306f0
BRPC_PATH = ../../
include $(BRPC_PATH)/config.mk
CXXFLAGS+=$(CPPFLAGS) -std=c++0x -DNDEBUG -O2 -D__const__= -pipe -W -Wall -fPIC -fno-omit-frame-pointer
HDRS+=$(BRPC_PATH)/output/include
LIBS+=$(BRPC_PATH)/output/lib
HDRPATHS = $(addprefix -I, $(HDRS))
LIBPATHS = $(addprefix -L, $(LIBS))
COMMA=,
SOPATHS=$(addprefix -Wl$(COMMA)-rpath=, $(LIBS))
STATIC_LINKINGS += -lbrpc
SOURCES = $(wildcard *.cpp)
OBJS = $(addsuffix .o, $(basename $(SOURCES)))
.PHONY:all
all: couchbase_client
.PHONY:clean
clean:
@echo "Cleaning"
@rm -rf couchbase_client $(OBJS)
couchbase_client:$(OBJS)
@echo "Linking $@"
ifneq ("$(LINK_SO)", "")
@$(CXX) $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(DYNAMIC_LINKINGS) -o $@
else
@$(CXX) $(LIBPATHS) -Xlinker "-(" $^ -Wl,-Bstatic $(STATIC_LINKINGS) -Wl,-Bdynamic -Xlinker "-)" $(DYNAMIC_LINKINGS) -o $@
endif
%.o:%.cpp
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
%.o:%.cc
@echo "Compiling $@"
@$(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A multi-threaded client getting keys from a memcache server constantly.
#include <gflags/gflags.h>
#include <bthread/bthread.h>
#include <butil/logging.h>
#include <butil/string_printf.h>
#include <brpc/channel.h>
#include <brpc/memcache.h>
#include <brpc/policy/couchbase_authenticator.h>
DEFINE_int32(thread_num, 10, "Number of threads to send requests");
DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_string(connection_type, "",
"Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:11211", "IP Address of server");
DEFINE_string(bucket_name, "", "Couchbase bucktet name");
DEFINE_string(bucket_password, "", "Couchbase bucket password");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_int32(exptime, 0,
"The to-be-got data will be expired after so many seconds");
DEFINE_string(key, "hello", "The key to be get");
DEFINE_string(value, "world", "The value associated with the key");
DEFINE_int32(batch, 1, "Pipelined Operations");
bvar::LatencyRecorder g_latency_recorder("client");
bvar::Adder<int> g_error_count("client_error_count");
butil::static_atomic<int> g_sender_count = BUTIL_STATIC_ATOMIC_INIT(0);
static void* sender(void* arg) {
google::protobuf::RpcChannel* channel =
static_cast<google::protobuf::RpcChannel*>(arg);
const int base_index =
g_sender_count.fetch_add(1, butil::memory_order_relaxed);
std::string value;
std::vector<std::pair<std::string, std::string>> kvs;
kvs.resize(FLAGS_batch);
for (int i = 0; i < FLAGS_batch; ++i) {
kvs[i].first =
butil::string_printf("%s%d", FLAGS_key.c_str(), base_index + i);
kvs[i].second =
butil::string_printf("%s%d", FLAGS_value.c_str(), base_index + i);
}
brpc::MemcacheRequest request;
for (int i = 0; i < FLAGS_batch; ++i) {
CHECK(request.Get(kvs[i].first));
}
while (!brpc::IsAskedToQuit()) {
// We will receive response synchronously, safe to put variables
// on stack.
brpc::MemcacheResponse response;
brpc::Controller cntl;
// Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout).
channel->CallMethod(NULL, &cntl, &request, &response, NULL);
const int64_t elp = cntl.latency_us();
if (!cntl.Failed()) {
g_latency_recorder << cntl.latency_us();
for (int i = 0; i < FLAGS_batch; ++i) {
uint32_t flags;
if (!response.PopGet(&value, &flags, NULL)) {
LOG(INFO) << "Fail to GET the key, " << response.LastError();
brpc::AskToQuit();
return NULL;
}
CHECK(flags == 0xdeadbeef + base_index + i) << "flags=" << flags;
CHECK(kvs[i].second == value) << "base=" << base_index << " i=" << i
<< " value=" << value;
}
} else {
g_error_count << 1;
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "error=" << cntl.ErrorText() << " latency=" << elp;
// We can't connect to the server, sleep a while. Notice that this
// is a specific sleeping to prevent this thread from spinning too
// fast. You should continue the business logic in a production
// server rather than sleeping.
bthread_usleep(50000);
}
}
return NULL;
}
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_exptime < 0) {
FLAGS_exptime = 0;
}
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;
brpc::policy::CouchbaseAuthenticator auth(FLAGS_bucket_name,
FLAGS_bucket_password);
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_MEMCACHE;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
options.max_retry = FLAGS_max_retry;
options.auth = &auth;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),
&options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
// Pipeline #batch * #thread_num SET requests into memcache so that we
// have keys to get.
brpc::MemcacheRequest request;
brpc::MemcacheResponse response;
brpc::Controller cntl;
for (int i = 0; i < FLAGS_batch * FLAGS_thread_num; ++i) {
if (!request.Set(butil::string_printf("%s%d", FLAGS_key.c_str(), i),
butil::string_printf("%s%d", FLAGS_value.c_str(), i),
0xdeadbeef + i, FLAGS_exptime, 0)) {
LOG(ERROR) << "Fail to SET " << i << "th request";
return -1;
}
}
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access memcache, " << cntl.ErrorText();
return -1;
}
for (int i = 0; i < FLAGS_batch * FLAGS_thread_num; ++i) {
if (!response.PopSet(NULL)) {
LOG(ERROR) << "Fail to SET memcache, i=" << i << ", "
<< response.LastError();
CHECK(response.IsAuthFailure());
return -1;
}
}
if (FLAGS_exptime > 0) {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, expired after " << FLAGS_exptime << " seconds";
} else {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, never expired";
}
std::vector<bthread_t> tids;
tids.resize(FLAGS_thread_num);
if (!FLAGS_use_bthread) {
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (pthread_create(&tids[i], NULL, sender, &channel) != 0) {
LOG(ERROR) << "Fail to create pthread";
return -1;
}
}
} else {
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (bthread_start_background(&tids[i], NULL, sender, &channel) != 0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
}
}
while (!brpc::IsAskedToQuit()) {
sleep(1);
LOG(INFO) << "Accessing memcache server at qps="
<< g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}
LOG(INFO) << "memcache_client is going to quit";
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (!FLAGS_use_bthread) {
pthread_join(tids[i], NULL);
} else {
bthread_join(tids[i], NULL);
}
}
return 0;
}
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author(s): Chengcheng Wu <wuchengcheng@qiyi.com>
#include "brpc/policy/couchbase_authenticator.h"
#include "butil/base64.h"
#include "butil/iobuf.h"
#include "butil/string_printf.h"
#include "butil/sys_byteorder.h"
#include "brpc/policy/memcache_binary_header.h"
namespace brpc {
namespace policy {
namespace {
constexpr char kPlainAuthCommand[] = "PLAIN";
constexpr char kPadding[1] = {'\0'};
} // namespace
// To get the couchbase authentication protocol, see
// https://developer.couchbase.com/documentation/server/3.x/developer/dev-guide-3.0/sasl.html
int CouchbaseAuthenticator::GenerateCredential(std::string* auth_str) const {
const brpc::policy::MemcacheRequestHeader header = {
brpc::policy::MC_MAGIC_REQUEST, brpc::policy::MC_BINARY_SASL_AUTH,
butil::HostToNet16(sizeof(kPlainAuthCommand) - 1), 0, 0, 0,
butil::HostToNet32(sizeof(kPlainAuthCommand) + 1 +
bucket_name_.length() * 2 + bucket_password_.length()),
0, 0};
butil::IOBuf in;
in.append(&header, sizeof(header));
in.append(kPlainAuthCommand, sizeof(kPlainAuthCommand) - 1);
in.append(bucket_name_.c_str(), bucket_name_.length());
in.append(kPadding, sizeof(kPadding));
in.append(bucket_name_.c_str(), bucket_name_.length());
in.append(kPadding, sizeof(kPadding));
in.append(bucket_password_.c_str(), bucket_password_.length());
auth_str->assign(in.to_string().c_str(), in.size());
return 0;
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author(s): Chengcheng Wu <wuchengcheng@qiyi.com>
#ifndef BRPC_POLICY_COUCHBASE_AUTHENTICATOR_H
#define BRPC_POLICY_COUCHBASE_AUTHENTICATOR_H
#include "brpc/authenticator.h"
namespace brpc {
namespace policy {
// Request to couchbase for authentication.
// Notice that authentication for couchbase in special SASLAuthProtocol.
// Couchbase Server 2.2 provide CRAM-MD5 support for SASL authentication,
// but Couchbase Server prior to 2.2 using PLAIN SASL authentication.
class CouchbaseAuthenticator : public Authenticator {
public:
CouchbaseAuthenticator(const std::string& bucket_name,
const std::string& bucket_password)
: bucket_name_(bucket_name), bucket_password_(bucket_password) {}
int GenerateCredential(std::string* auth_str) const;
int VerifyCredential(const std::string&, const butil::EndPoint&,
brpc::AuthContext*) const {
return 0;
}
private:
const std::string bucket_name_;
const std::string bucket_password_;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_COUCHBASE_AUTHENTICATOR_H
......@@ -63,6 +63,7 @@ static void InitSupportedCommandMap() {
butil::bit_array_set(supported_cmd_map, MC_BINARY_PREPEND);
butil::bit_array_set(supported_cmd_map, MC_BINARY_STAT);
butil::bit_array_set(supported_cmd_map, MC_BINARY_TOUCH);
butil::bit_array_set(supported_cmd_map, MC_BINARY_SASL_AUTH);
}
inline bool IsSupportedCommand(uint8_t command) {
......@@ -125,14 +126,23 @@ ParseResult ParseMemcacheMessage(butil::IOBuf* source,
msg->meta.append(&local_header, sizeof(local_header));
source->pop_front(sizeof(*header));
source->cutn(&msg->meta, total_body_length);
if (++msg->pi.count >= pi.count) {
CHECK_EQ(msg->pi.count, pi.count);
if (header->command == MC_BINARY_SASL_AUTH) {
if (header->status != 0) {
LOG(ERROR) << "Failed to authenticate the couchbase bucket."
<< "All the following commands will result in auth failure.";
}
msg = static_cast<MostCommonMessage*>(socket->release_parsing_context());
msg->pi = pi;
return MakeMessage(msg);
} else {
socket->GivebackPipelinedInfo(pi);
}
} else {
if (++msg->pi.count >= pi.count) {
CHECK_EQ(msg->pi.count, pi.count);
msg = static_cast<MostCommonMessage*>(socket->release_parsing_context());
msg->pi = pi;
return MakeMessage(msg);
} else {
socket->GivebackPipelinedInfo(pi);
}
}
}
}
......@@ -196,9 +206,16 @@ void PackMemcacheRequest(butil::IOBuf* buf,
SocketMessage**,
uint64_t /*correlation_id*/,
const google::protobuf::MethodDescriptor*,
Controller*,
Controller* cntl,
const butil::IOBuf& request,
const Authenticator* /*auth*/) {
const Authenticator* auth) {
if (auth) {
std::string auth_str;
if (auth->GenerateCredential(&auth_str) != 0) {
return cntl->SetFailed(EREQUEST, "Fail to generate credential");
}
buf->append(auth_str);
}
buf->append(request);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment