Unverified Commit 7c28b568 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #134 from feng-y/master

add redis auth.
parents f28785b1 e7959dca
...@@ -1076,6 +1076,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) { ...@@ -1076,6 +1076,7 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
wopt.id_wait = cid; wopt.id_wait = cid;
wopt.abstime = pabstime; wopt.abstime = pabstime;
wopt.pipelined_count = _pipelined_count; wopt.pipelined_count = _pipelined_count;
wopt.with_auth = has_flag(FLAGS_REQUEST_WITH_AUTH);
wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED); wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED);
int rc; int rc;
size_t packet_size = 0; size_t packet_size = 0;
......
...@@ -116,6 +116,7 @@ friend void policy::ProcessMongoRequest(InputMessageBase*); ...@@ -116,6 +116,7 @@ friend void policy::ProcessMongoRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11); static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11);
static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12); static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13); static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 14);
public: public:
Controller(); Controller();
......
...@@ -125,6 +125,11 @@ public: ...@@ -125,6 +125,11 @@ public:
void set_readable_progressive_attachment(ReadableProgressiveAttachment* s) void set_readable_progressive_attachment(ReadableProgressiveAttachment* s)
{ _cntl->_rpa.reset(s); } { _cntl->_rpa.reset(s); }
ControllerPrivateAccessor &set_with_auth(bool with_auth) {
_cntl->set_flag(Controller::FLAGS_REQUEST_WITH_AUTH, with_auth);
return *this;
}
bool with_auth() const { return _cntl->has_flag(Controller::FLAGS_REQUEST_WITH_AUTH); }
private: private:
Controller* _cntl; Controller* _cntl;
}; };
......
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author(s): Feng Yan <yanfeng@qiyi.com>
#include "brpc/policy/redis_authenticator.h"
#include "butil/base64.h"
#include "butil/iobuf.h"
#include "butil/string_printf.h"
#include "butil/sys_byteorder.h"
#include "brpc/redis_command.h"
namespace brpc {
namespace policy {
int RedisAuthenticator::GenerateCredential(std::string* auth_str) const {
butil::IOBuf buf;
brpc::RedisCommandFormat(&buf, "AUTH %s", passwd_.c_str());
*auth_str = buf.to_string();
return 0;
}
} // namespace policy
} // namespace brpc
// Copyright (c) 2017 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author(s): Feng Yan <yanfeng@qiyi.com>
#ifndef BRPC_POLICY_REDIS_AUTHENTICATOR_H
#define BRPC_POLICY_REDIS_AUTHENTICATOR_H
#include "brpc/authenticator.h"
namespace brpc {
namespace policy {
// Request to redis for authentication.
class RedisAuthenticator : public Authenticator {
public:
RedisAuthenticator(const std::string& passwd)
: passwd_(passwd) {}
int GenerateCredential(std::string* auth_str) const;
int VerifyCredential(const std::string&, const butil::EndPoint&,
brpc::AuthContext*) const {
return 0;
}
private:
const std::string passwd_;
};
} // namespace policy
} // namespace brpc
#endif // BRPC_POLICY_COUCHBASE_AUTHENTICATOR_H
// Copyright (c) 2015 Baidu, Inc. // Copyright (c) 2015 Baidu, Inc.
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
// You may obtain a copy of the License at // You may obtain a copy of the License at
// //
// http://www.apache.org/licenses/LICENSE-2.0 // http://www.apache.org/licenses/LICENSE-2.0
// //
// Unless required by applicable law or agreed to in writing, software // Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, // distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
...@@ -69,20 +69,43 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket, ...@@ -69,20 +69,43 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
LOG(WARNING) << "No corresponding PipelinedInfo in socket"; LOG(WARNING) << "No corresponding PipelinedInfo in socket";
return MakeParseError(PARSE_ERROR_TRY_OTHERS); return MakeParseError(PARSE_ERROR_TRY_OTHERS);
} }
InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
if (msg == NULL) {
msg = new InputResponse;
socket->reset_parsing_context(msg);
}
if (!msg->response.ConsumePartialIOBuf(*source, pi.count)) { do {
socket->GivebackPipelinedInfo(pi); InputResponse* msg = static_cast<InputResponse*>(socket->parsing_context());
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); if (msg == NULL) {
} msg = new InputResponse;
CHECK_EQ((uint32_t)msg->response.reply_size(), pi.count); socket->reset_parsing_context(msg);
msg->id_wait = pi.id_wait; }
socket->release_parsing_context();
return MakeMessage(msg); const int consume_count = (pi.with_auth ? 1 : pi.count);
if (!msg->response.ConsumePartialIOBuf(*source, consume_count)) {
socket->GivebackPipelinedInfo(pi);
return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA);
}
if (pi.with_auth) {
if (msg->response.reply_size() != 1 ||
!(msg->response.reply(0).type() == brpc::REDIS_REPLY_STATUS &&
msg->response.reply(0).data().compare("OK") == 0)) {
LOG(ERROR) << "Redis Auth failed: " << msg->response;
return MakeParseError(PARSE_ERROR_NO_RESOURCE,
"Fail to authenticate with Redis");
}
DestroyingPtr<InputResponse> auth_msg(
static_cast<InputResponse*>(socket->release_parsing_context()));
pi.with_auth = false;
continue;
}
CHECK_EQ((uint32_t)msg->response.reply_size(), pi.count);
msg->id_wait = pi.id_wait;
socket->release_parsing_context();
return MakeMessage(msg);
} while(true);
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
} }
void ProcessRedisResponse(InputMessageBase* msg_base) { void ProcessRedisResponse(InputMessageBase* msg_base) {
...@@ -97,7 +120,7 @@ void ProcessRedisResponse(InputMessageBase* msg_base) { ...@@ -97,7 +120,7 @@ void ProcessRedisResponse(InputMessageBase* msg_base) {
<< "Fail to lock correlation_id=" << cid << ": " << berror(rc); << "Fail to lock correlation_id=" << cid << ": " << berror(rc);
return; return;
} }
ControllerPrivateAccessor accessor(cntl); ControllerPrivateAccessor accessor(cntl);
Span* span = accessor.span(); Span* span = accessor.span();
if (span) { if (span) {
...@@ -125,7 +148,7 @@ void ProcessRedisResponse(InputMessageBase* msg_base) { ...@@ -125,7 +148,7 @@ void ProcessRedisResponse(InputMessageBase* msg_base) {
} }
} }
} // silently ignore the response. } // silently ignore the response.
// Unlocks correlation_id inside. Revert controller's // Unlocks correlation_id inside. Revert controller's
// error code if it version check of `cid' fails // error code if it version check of `cid' fails
msg.reset(); // optional, just release resourse ASAP msg.reset(); // optional, just release resourse ASAP
...@@ -156,9 +179,20 @@ void PackRedisRequest(butil::IOBuf* buf, ...@@ -156,9 +179,20 @@ void PackRedisRequest(butil::IOBuf* buf,
SocketMessage**, SocketMessage**,
uint64_t /*correlation_id*/, uint64_t /*correlation_id*/,
const google::protobuf::MethodDescriptor*, const google::protobuf::MethodDescriptor*,
Controller*, Controller* cntl,
const butil::IOBuf& request, const butil::IOBuf& request,
const Authenticator* /*auth*/) { const Authenticator* auth) {
if (auth) {
std::string auth_str;
if (auth->GenerateCredential(&auth_str) != 0) {
return cntl->SetFailed(EREQUEST, "Fail to generate credential");
}
buf->append(auth_str);
ControllerPrivateAccessor(cntl).set_with_auth(true);
} else {
ControllerPrivateAccessor(cntl).set_with_auth(false);
}
buf->append(request); buf->append(request);
} }
......
...@@ -86,6 +86,7 @@ BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable, ...@@ -86,6 +86,7 @@ BRPC_VALIDATE_GFLAG(connect_timeout_as_unreachable,
validate_connect_timeout_as_unreachable); validate_connect_timeout_as_unreachable);
const int WAIT_EPOLLOUT_TIMEOUT_MS = 50; const int WAIT_EPOLLOUT_TIMEOUT_MS = 50;
static const uint32_t REDIS_AUTH_FLAG = (1ul << 15);
#ifdef BAIDU_INTERNAL #ifdef BAIDU_INTERNAL
#define BRPC_AUXTHREAD_ATTR \ #define BRPC_AUXTHREAD_ATTR \
...@@ -293,7 +294,7 @@ bool Socket::CreatedByConnect() const { ...@@ -293,7 +294,7 @@ bool Socket::CreatedByConnect() const {
} }
SocketMessage* const DUMMY_USER_MESSAGE = (SocketMessage*)0x1; SocketMessage* const DUMMY_USER_MESSAGE = (SocketMessage*)0x1;
const uint32_t MAX_PIPELINED_COUNT = 65536; const uint32_t MAX_PIPELINED_COUNT = 32768;
struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest { struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest {
static WriteRequest* const UNCONNECTED; static WriteRequest* const UNCONNECTED;
...@@ -316,7 +317,10 @@ struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest { ...@@ -316,7 +317,10 @@ struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest {
_pc_and_udmsg &= 0xFFFF000000000000ULL; _pc_and_udmsg &= 0xFFFF000000000000ULL;
} }
void set_pipelined_count_and_user_message( void set_pipelined_count_and_user_message(
uint32_t pc, SocketMessage* msg) { uint32_t pc, SocketMessage* msg, bool with_auth) {
if (pc != 0 && with_auth) {
pc |= REDIS_AUTH_FLAG;
}
_pc_and_udmsg = ((uint64_t)pc << 48) | (uint64_t)(uintptr_t)msg; _pc_and_udmsg = ((uint64_t)pc << 48) | (uint64_t)(uintptr_t)msg;
} }
...@@ -329,7 +333,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest { ...@@ -329,7 +333,7 @@ struct BAIDU_CACHELINE_ALIGNMENT Socket::WriteRequest {
// is already failed. // is already failed.
(void)msg->AppendAndDestroySelf(&dummy_buf, NULL); (void)msg->AppendAndDestroySelf(&dummy_buf, NULL);
} }
set_pipelined_count_and_user_message(0, NULL); set_pipelined_count_and_user_message(0, NULL, false);
return true; return true;
} }
return false; return false;
...@@ -367,7 +371,8 @@ void Socket::WriteRequest::Setup(Socket* s) { ...@@ -367,7 +371,8 @@ void Socket::WriteRequest::Setup(Socket* s) {
// which is common in cache servers: memcache, redis... // which is common in cache servers: memcache, redis...
// The struct will be popped when reading a message from the socket. // The struct will be popped when reading a message from the socket.
PipelinedInfo pi; PipelinedInfo pi;
pi.count = pc; pi.count = pc & (~REDIS_AUTH_FLAG);
pi.with_auth = pc & REDIS_AUTH_FLAG;
pi.id_wait = id_wait; pi.id_wait = id_wait;
clear_pipelined_count(); // avoid being pushed again clear_pipelined_count(); // avoid being pushed again
s->PushPipelinedInfo(pi); s->PushPipelinedInfo(pi);
...@@ -1456,7 +1461,7 @@ int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) { ...@@ -1456,7 +1461,7 @@ int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
req->next = WriteRequest::UNCONNECTED; req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait; req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message( req->set_pipelined_count_and_user_message(
opt.pipelined_count, DUMMY_USER_MESSAGE); opt.pipelined_count, DUMMY_USER_MESSAGE, opt.with_auth);
return StartWrite(req, opt); return StartWrite(req, opt);
} }
...@@ -1491,7 +1496,7 @@ int Socket::Write(SocketMessagePtr<>& msg, const WriteOptions* options_in) { ...@@ -1491,7 +1496,7 @@ int Socket::Write(SocketMessagePtr<>& msg, const WriteOptions* options_in) {
// wait until it points to a valid WriteRequest or NULL. // wait until it points to a valid WriteRequest or NULL.
req->next = WriteRequest::UNCONNECTED; req->next = WriteRequest::UNCONNECTED;
req->id_wait = opt.id_wait; req->id_wait = opt.id_wait;
req->set_pipelined_count_and_user_message(opt.pipelined_count, msg.release()); req->set_pipelined_count_and_user_message(opt.pipelined_count, msg.release(), opt.with_auth);
return StartWrite(req, opt); return StartWrite(req, opt);
} }
......
...@@ -128,9 +128,11 @@ struct PipelinedInfo { ...@@ -128,9 +128,11 @@ struct PipelinedInfo {
PipelinedInfo() { reset(); } PipelinedInfo() { reset(); }
void reset() { void reset() {
count = 0; count = 0;
with_auth = false;
id_wait = INVALID_BTHREAD_ID; id_wait = INVALID_BTHREAD_ID;
} }
uint32_t count; uint32_t count;
bool with_auth;
bthread_id_t id_wait; bthread_id_t id_wait;
}; };
...@@ -217,9 +219,11 @@ public: ...@@ -217,9 +219,11 @@ public:
// Default: false // Default: false
bool ignore_eovercrowded; bool ignore_eovercrowded;
bool with_auth;
WriteOptions() WriteOptions()
: id_wait(INVALID_BTHREAD_ID), abstime(NULL) : id_wait(INVALID_BTHREAD_ID), abstime(NULL)
, pipelined_count(0), ignore_eovercrowded(false) {} , pipelined_count(0), ignore_eovercrowded(false), with_auth(false) {}
}; };
int Write(butil::IOBuf *msg, const WriteOptions* options = NULL); int Write(butil::IOBuf *msg, const WriteOptions* options = NULL);
......
...@@ -8,6 +8,7 @@ ...@@ -8,6 +8,7 @@
#include "butil/logging.h" #include "butil/logging.h"
#include <brpc/redis.h> #include <brpc/redis.h>
#include <brpc/channel.h> #include <brpc/channel.h>
#include <brpc/policy/redis_authenticator.h>
#include <gtest/gtest.h> #include <gtest/gtest.h>
namespace brpc { namespace brpc {
...@@ -297,5 +298,110 @@ TEST_F(RedisTest, by_components) { ...@@ -297,5 +298,110 @@ TEST_F(RedisTest, by_components) {
response2.MergeFrom(response); response2.MergeFrom(response);
AssertResponseEqual(response2, response, 2); AssertResponseEqual(response2, response, 2);
} }
TEST_F(RedisTest, auth) {
// config auth
{
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("0.0.0.0:6479", &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
butil::StringPiece comp1[] = { "set", "passwd", "my_redis" };
butil::StringPiece comp2[] = { "config", "set", "requirepass", "my_redis" };
butil::StringPiece comp3[] = { "auth", "my_redis" };
butil::StringPiece comp4[] = { "get", "passwd" };
request.AddCommandByComponents(comp1, arraysize(comp1));
request.AddCommandByComponents(comp2, arraysize(comp2));
request.AddCommandByComponents(comp3, arraysize(comp3));
request.AddCommandByComponents(comp4, arraysize(comp4));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(4, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(0).type());
ASSERT_STREQ("OK", response.reply(0).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
ASSERT_STREQ("OK", response.reply(1).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(2).type());
ASSERT_STREQ("OK", response.reply(2).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(3).type());
ASSERT_STREQ("my_redis", response.reply(3).c_str());
}
// Auth failed
{
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("0.0.0.0:6479", &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
butil::StringPiece comp1[] = { "get", "passwd" };
request.AddCommandByComponents(comp1, arraysize(comp1));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_ERROR, response.reply(0).type());
}
// Auth with RedisAuthenticator && clear auth
{
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
brpc::policy::RedisAuthenticator* auth =
new brpc::policy::RedisAuthenticator("my_redis");
options.auth = auth;
ASSERT_EQ(0, channel.Init("0.0.0.0:6479", &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
butil::StringPiece comp1[] = { "get", "passwd" };
butil::StringPiece comp2[] = { "config", "set", "requirepass", "" };
request.AddCommandByComponents(comp1, arraysize(comp1));
request.AddCommandByComponents(comp2, arraysize(comp2));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
ASSERT_STREQ("my_redis", response.reply(0).c_str());
ASSERT_EQ(brpc::REDIS_REPLY_STATUS, response.reply(1).type());
ASSERT_STREQ("OK", response.reply(1).c_str());
}
// check noauth.
{
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_REDIS;
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("0.0.0.0:6479", &options));
brpc::RedisRequest request;
brpc::RedisResponse response;
brpc::Controller cntl;
butil::StringPiece comp1[] = { "get", "passwd" };
request.AddCommandByComponents(comp1, arraysize(comp1));
channel.CallMethod(NULL, &cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, response.reply_size());
ASSERT_EQ(brpc::REDIS_REPLY_STRING, response.reply(0).type());
ASSERT_STREQ("my_redis", response.reply(0).c_str());
}
}
} //namespace } //namespace
#endif // BAIDU_INTERNAL #endif // BAIDU_INTERNAL
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment