Commit bbced206 authored by wuchengcheng's avatar wuchengcheng

change auth response failure and destroy auth response msg and code style

parent afbee1f0
// Copyright (c) 2014 Baidu, Inc. // Copyright (c) 2014 Baidu, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
// //
// http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
// //
// Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -24,17 +24,15 @@ ...@@ -24,17 +24,15 @@
DEFINE_int32(thread_num, 10, "Number of threads to send requests"); DEFINE_int32(thread_num, 10, "Number of threads to send requests");
DEFINE_bool(use_bthread, false, "Use bthread to send requests"); DEFINE_bool(use_bthread, false, "Use bthread to send requests");
DEFINE_string(connection_type, "", DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
"Connection type. Available values: single, pooled, short");
DEFINE_string(server, "0.0.0.0:11211", "IP Address of server"); DEFINE_string(server, "0.0.0.0:11211", "IP Address of server");
DEFINE_string(bucket_name, "", "Couchbase bucktet name"); DEFINE_string(bucket_name, "", "Couchbase bucktet name");
DEFINE_string(bucket_password, "", "Couchbase bucket password"); DEFINE_string(bucket_password, "", "Couchbase bucket password");
DEFINE_string(load_balancer, "", "The algorithm for load balancing"); DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed"); DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_int32(exptime, 0, DEFINE_int32(exptime, 0, "The to-be-got data will be expired after so many seconds");
"The to-be-got data will be expired after so many seconds");
DEFINE_string(key, "hello", "The key to be get"); DEFINE_string(key, "hello", "The key to be get");
DEFINE_string(value, "world", "The value associated with the key"); DEFINE_string(value, "world", "The value associated with the key");
DEFINE_int32(batch, 1, "Pipelined Operations"); DEFINE_int32(batch, 1, "Pipelined Operations");
...@@ -44,154 +42,150 @@ bvar::Adder<int> g_error_count("client_error_count"); ...@@ -44,154 +42,150 @@ bvar::Adder<int> g_error_count("client_error_count");
butil::static_atomic<int> g_sender_count = BUTIL_STATIC_ATOMIC_INIT(0); butil::static_atomic<int> g_sender_count = BUTIL_STATIC_ATOMIC_INIT(0);
static void* sender(void* arg) { static void* sender(void* arg) {
google::protobuf::RpcChannel* channel = google::protobuf::RpcChannel* channel =
static_cast<google::protobuf::RpcChannel*>(arg); static_cast<google::protobuf::RpcChannel*>(arg);
const int base_index = const int base_index = g_sender_count.fetch_add(1, butil::memory_order_relaxed);
g_sender_count.fetch_add(1, butil::memory_order_relaxed);
std::string value; std::string value;
std::vector<std::pair<std::string, std::string>> kvs; std::vector<std::pair<std::string, std::string> > kvs;
kvs.resize(FLAGS_batch); kvs.resize(FLAGS_batch);
for (int i = 0; i < FLAGS_batch; ++i) { for (int i = 0; i < FLAGS_batch; ++i) {
kvs[i].first = kvs[i].first = butil::string_printf("%s%d", FLAGS_key.c_str(), base_index + i);
butil::string_printf("%s%d", FLAGS_key.c_str(), base_index + i); kvs[i].second = butil::string_printf("%s%d", FLAGS_value.c_str(), base_index + i);
kvs[i].second = }
butil::string_printf("%s%d", FLAGS_value.c_str(), base_index + i); brpc::MemcacheRequest request;
} for (int i = 0; i < FLAGS_batch; ++i) {
brpc::MemcacheRequest request; CHECK(request.Get(kvs[i].first));
for (int i = 0; i < FLAGS_batch; ++i) { }
CHECK(request.Get(kvs[i].first)); while (!brpc::IsAskedToQuit()) {
} // We will receive response synchronously, safe to put variables
while (!brpc::IsAskedToQuit()) { // on stack.
// We will receive response synchronously, safe to put variables brpc::MemcacheResponse response;
// on stack. brpc::Controller cntl;
brpc::MemcacheResponse response;
brpc::Controller cntl;
// Because `done'(last parameter) is NULL, this function waits until // Because `done'(last parameter) is NULL, this function waits until
// the response comes back or error occurs(including timedout). // the response comes back or error occurs(including timedout).
channel->CallMethod(NULL, &cntl, &request, &response, NULL); channel->CallMethod(NULL, &cntl, &request, &response, NULL);
const int64_t elp = cntl.latency_us(); const int64_t elp = cntl.latency_us();
if (!cntl.Failed()) { if (!cntl.Failed()) {
g_latency_recorder << cntl.latency_us(); g_latency_recorder << cntl.latency_us();
for (int i = 0; i < FLAGS_batch; ++i) { for (int i = 0; i < FLAGS_batch; ++i) {
uint32_t flags; uint32_t flags;
if (!response.PopGet(&value, &flags, NULL)) { if (!response.PopGet(&value, &flags, NULL)) {
LOG(INFO) << "Fail to GET the key, " << response.LastError(); LOG(INFO) << "Fail to GET the key, " << response.LastError();
brpc::AskToQuit(); brpc::AskToQuit();
return NULL; return NULL;
}
CHECK(flags == 0xdeadbeef + base_index + i)
<< "flags=" << flags;
CHECK(kvs[i].second == value)
<< "base=" << base_index << " i=" << i << " value=" << value;
}
} else {
g_error_count << 1;
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "error=" << cntl.ErrorText() << " latency=" << elp;
// We can't connect to the server, sleep a while. Notice that this
// is a specific sleeping to prevent this thread from spinning too
// fast. You should continue the business logic in a production
// server rather than sleeping.
bthread_usleep(50000);
} }
CHECK(flags == 0xdeadbeef + base_index + i) << "flags=" << flags;
CHECK(kvs[i].second == value) << "base=" << base_index << " i=" << i
<< " value=" << value;
}
} else {
g_error_count << 1;
CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
<< "error=" << cntl.ErrorText() << " latency=" << elp;
// We can't connect to the server, sleep a while. Notice that this
// is a specific sleeping to prevent this thread from spinning too
// fast. You should continue the business logic in a production
// server rather than sleeping.
bthread_usleep(50000);
} }
} return NULL;
return NULL;
} }
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well. // Parse gflags. We recommend you to use gflags as well.
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_exptime < 0) { if (FLAGS_exptime < 0) {
FLAGS_exptime = 0; FLAGS_exptime = 0;
}
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;
brpc::policy::CouchbaseAuthenticator auth(FLAGS_bucket_name,
FLAGS_bucket_password);
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_MEMCACHE;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms /*milliseconds*/;
options.max_retry = FLAGS_max_retry;
options.auth = &auth;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),
&options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1;
}
// Pipeline #batch * #thread_num SET requests into memcache so that we
// have keys to get.
brpc::MemcacheRequest request;
brpc::MemcacheResponse response;
brpc::Controller cntl;
for (int i = 0; i < FLAGS_batch * FLAGS_thread_num; ++i) {
if (!request.Set(butil::string_printf("%s%d", FLAGS_key.c_str(), i),
butil::string_printf("%s%d", FLAGS_value.c_str(), i),
0xdeadbeef + i, FLAGS_exptime, 0)) {
LOG(ERROR) << "Fail to SET " << i << "th request";
return -1;
} }
}
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access memcache, " << cntl.ErrorText();
return -1;
}
for (int i = 0; i < FLAGS_batch * FLAGS_thread_num; ++i) {
if (!response.PopSet(NULL)) {
LOG(ERROR) << "Fail to SET memcache, i=" << i << ", "
<< response.LastError();
CHECK(response.IsAuthFailure());
return -1;
}
}
if (FLAGS_exptime > 0) {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, expired after " << FLAGS_exptime << " seconds";
} else {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, never expired";
}
std::vector<bthread_t> tids; // A Channel represents a communication line to a Server. Notice that
tids.resize(FLAGS_thread_num); // Channel is thread-safe and can be shared by all threads in your program.
if (!FLAGS_use_bthread) { brpc::Channel channel;
for (int i = 0; i < FLAGS_thread_num; ++i) { brpc::policy::CouchbaseAuthenticator auth(FLAGS_bucket_name,
if (pthread_create(&tids[i], NULL, sender, &channel) != 0) { FLAGS_bucket_password);
LOG(ERROR) << "Fail to create pthread";
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_MEMCACHE;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
options.auth = &auth;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
return -1; return -1;
}
} }
} else {
for (int i = 0; i < FLAGS_thread_num; ++i) { // Pipeline #batch * #thread_num SET requests into memcache so that we
if (bthread_start_background(&tids[i], NULL, sender, &channel) != 0) { // have keys to get.
LOG(ERROR) << "Fail to create bthread"; brpc::MemcacheRequest request;
brpc::MemcacheResponse response;
brpc::Controller cntl;
for (int i = 0; i < FLAGS_batch * FLAGS_thread_num; ++i) {
if (!request.Set(butil::string_printf("%s%d", FLAGS_key.c_str(), i),
butil::string_printf("%s%d", FLAGS_value.c_str(), i),
0xdeadbeef + i, FLAGS_exptime, 0)) {
LOG(ERROR) << "Fail to SET " << i << "th request";
return -1;
}
}
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "Fail to access memcache, " << cntl.ErrorText();
return -1; return -1;
}
} }
} for (int i = 0; i < FLAGS_batch * FLAGS_thread_num; ++i) {
if (!response.PopSet(NULL)) {
while (!brpc::IsAskedToQuit()) { LOG(ERROR) << "Fail to SET memcache, i=" << i
sleep(1); << ", " << response.LastError();
LOG(INFO) << "Accessing memcache server at qps=" return -1;
<< g_latency_recorder.qps(1) }
<< " latency=" << g_latency_recorder.latency(1); }
} if (FLAGS_exptime > 0) {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
LOG(INFO) << "memcache_client is going to quit"; << " values, expired after " << FLAGS_exptime << " seconds";
for (int i = 0; i < FLAGS_thread_num; ++i) { } else {
LOG(INFO) << "Set " << FLAGS_batch * FLAGS_thread_num
<< " values, never expired";
}
std::vector<bthread_t> tids;
tids.resize(FLAGS_thread_num);
if (!FLAGS_use_bthread) { if (!FLAGS_use_bthread) {
pthread_join(tids[i], NULL); for (int i = 0; i < FLAGS_thread_num; ++i) {
if (pthread_create(&tids[i], NULL, sender, &channel) != 0) {
LOG(ERROR) << "Fail to create pthread";
return -1;
}
}
} else { } else {
bthread_join(tids[i], NULL); for (int i = 0; i < FLAGS_thread_num; ++i) {
if (bthread_start_background(
&tids[i], NULL, sender, &channel) != 0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
}
}
while (!brpc::IsAskedToQuit()) {
sleep(1);
LOG(INFO) << "Accessing memcache server at qps=" << g_latency_recorder.qps(1)
<< " latency=" << g_latency_recorder.latency(1);
}
LOG(INFO) << "memcache_client is going to quit";
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (!FLAGS_use_bthread) {
pthread_join(tids[i], NULL);
} else {
bthread_join(tids[i], NULL);
}
} }
}
return 0; return 0;
} }
...@@ -230,12 +230,6 @@ public: ...@@ -230,12 +230,6 @@ public:
static const char* status_str(Status); static const char* status_str(Status);
// Although the implementation of this function looks somewhat weird,
// we did not find a better way to provide auth failure check for user codes.
bool IsAuthFailure() const {
return (_err == "Auth failure");
}
private: private:
bool PopCounter(uint8_t command, uint64_t* new_value, uint64_t* cas_value); bool PopCounter(uint8_t command, uint64_t* new_value, uint64_t* cas_value);
bool PopStore(uint8_t command, uint64_t* cas_value); bool PopStore(uint8_t command, uint64_t* cas_value);
......
...@@ -35,22 +35,21 @@ constexpr char kPadding[1] = {'\0'}; ...@@ -35,22 +35,21 @@ constexpr char kPadding[1] = {'\0'};
// To get the couchbase authentication protocol, see // To get the couchbase authentication protocol, see
// https://developer.couchbase.com/documentation/server/3.x/developer/dev-guide-3.0/sasl.html // https://developer.couchbase.com/documentation/server/3.x/developer/dev-guide-3.0/sasl.html
int CouchbaseAuthenticator::GenerateCredential(std::string* auth_str) const { int CouchbaseAuthenticator::GenerateCredential(std::string* auth_str) const {
const brpc::policy::MemcacheRequestHeader header = { const brpc::policy::MemcacheRequestHeader header = {
brpc::policy::MC_MAGIC_REQUEST, brpc::policy::MC_BINARY_SASL_AUTH, brpc::policy::MC_MAGIC_REQUEST, brpc::policy::MC_BINARY_SASL_AUTH,
butil::HostToNet16(sizeof(kPlainAuthCommand) - 1), 0, 0, 0, butil::HostToNet16(sizeof(kPlainAuthCommand) - 1), 0, 0, 0,
butil::HostToNet32(sizeof(kPlainAuthCommand) + 1 + butil::HostToNet32(sizeof(kPlainAuthCommand) + 1 +
bucket_name_.length() * 2 + bucket_password_.length()), bucket_name_.length() * 2 + bucket_password_.length()),
0, 0}; 0, 0};
butil::IOBuf in; auth_str->clear();
in.append(&header, sizeof(header)); auth_str->append(reinterpret_cast<const char*>(&header), sizeof(header));
in.append(kPlainAuthCommand, sizeof(kPlainAuthCommand) - 1); auth_str->append(kPlainAuthCommand, sizeof(kPlainAuthCommand) - 1);
in.append(bucket_name_.c_str(), bucket_name_.length()); auth_str->append(bucket_name_);
in.append(kPadding, sizeof(kPadding)); auth_str->append(kPadding, sizeof(kPadding));
in.append(bucket_name_.c_str(), bucket_name_.length()); auth_str->append(bucket_name_);
in.append(kPadding, sizeof(kPadding)); auth_str->append(kPadding, sizeof(kPadding));
in.append(bucket_password_.c_str(), bucket_password_.length()); auth_str->append(bucket_password_);
auth_str->assign(in.to_string().c_str(), in.size()); return 0;
return 0;
} }
} // namespace policy } // namespace policy
......
...@@ -130,8 +130,11 @@ ParseResult ParseMemcacheMessage(butil::IOBuf* source, ...@@ -130,8 +130,11 @@ ParseResult ParseMemcacheMessage(butil::IOBuf* source,
if (header->status != 0) { if (header->status != 0) {
LOG(ERROR) << "Failed to authenticate the couchbase bucket." LOG(ERROR) << "Failed to authenticate the couchbase bucket."
<< "All the following commands will result in auth failure."; << "All the following commands will result in auth failure.";
return MakeParseError(PARSE_ERROR_NO_RESOURCE,
"Fail to authenticate with the couchbase bucket");
} }
msg = static_cast<MostCommonMessage*>(socket->release_parsing_context()); DestroyingPtr<MostCommonMessage> auth_msg(
static_cast<MostCommonMessage*>(socket->release_parsing_context()));
socket->GivebackPipelinedInfo(pi); socket->GivebackPipelinedInfo(pi);
} else { } else {
if (++msg->pi.count >= pi.count) { if (++msg->pi.count >= pi.count) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment