Unverified Commit 736f1bc7 authored by Ge Jun's avatar Ge Jun Committed by GitHub

Merge pull request #482 from brpc/ssl_supports_ns

SSL supports NS & connection_group support
parents 0f621581 db844fd9
......@@ -190,7 +190,7 @@ int main(int argc, char* argv[]) {
pthread_mutex_unlock(&g_latency_mutex);
const int64_t avg_latency = (latency_sum - last_latency_sum) /
std::max(nsuccess - last_counter, 1LL);
std::max(nsuccess - last_counter, (int64_t)1);
LOG(INFO) << "Sending EchoRequest at qps=" << nsuccess - last_counter
<< " latency=" << avg_latency;
last_counter = nsuccess;
......
......@@ -184,9 +184,9 @@ int main(int argc, char* argv[]) {
// Start the server.
brpc::ServerOptions options;
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.ssl_options.default_cert.certificate = FLAGS_certificate;
options.ssl_options.default_cert.private_key = FLAGS_private_key;
options.ssl_options.ciphers = FLAGS_ciphers;
options.mutable_ssl_options()->default_cert.certificate = FLAGS_certificate;
options.mutable_ssl_options()->default_cert.private_key = FLAGS_private_key;
options.mutable_ssl_options()->ciphers = FLAGS_ciphers;
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start HttpServer";
return -1;
......
......@@ -95,7 +95,9 @@ int main(int argc, char* argv[]) {
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.ssl_options.enable = FLAGS_enable_ssl;
if (FLAGS_enable_ssl) {
options.mutable_ssl_options();
}
options.protocol = FLAGS_protocol;
options.connection_type = FLAGS_connection_type;
options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
......
......@@ -82,8 +82,8 @@ int main(int argc, char* argv[]) {
// Start the server.
brpc::ServerOptions options;
options.ssl_options.default_cert.certificate = "cert.pem";
options.ssl_options.default_cert.private_key = "key.pem";
options.mutable_ssl_options()->default_cert.certificate = "cert.pem";
options.mutable_ssl_options()->default_cert.private_key = "key.pem";
options.idle_timeout_sec = FLAGS_idle_timeout_s;
options.max_concurrency = FLAGS_max_concurrency;
options.internal_port = FLAGS_internal_port;
......
......@@ -192,7 +192,7 @@ int main(int argc, char* argv[]) {
pthread_mutex_unlock(&g_latency_mutex);
const int64_t avg_latency = (latency_sum - last_latency_sum) /
std::max(nsuccess - last_counter, 1LL);
std::max(nsuccess - last_counter, (int64_t)1);
LOG(INFO) << "Sending EchoRequest at qps=" << nsuccess - last_counter
<< " latency=" << avg_latency;
last_counter = nsuccess;
......
......@@ -33,7 +33,6 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
DEFINE_bool(enable_ssl, false, "Use SSL connection");
DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
std::string g_request;
......@@ -85,7 +84,6 @@ int main(int argc, char* argv[]) {
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.ssl_options.enable = FLAGS_enable_ssl;
options.protocol = brpc::PROTOCOL_THRIFT;
options.connection_type = FLAGS_connection_type;
options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
......
......@@ -44,8 +44,8 @@ Acceptor::~Acceptor() {
Join();
}
int Acceptor::StartAccept(
int listened_fd, int idle_timeout_sec, SSL_CTX* ssl_ctx) {
int Acceptor::StartAccept(int listened_fd, int idle_timeout_sec,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
if (listened_fd < 0) {
LOG(FATAL) << "Invalid listened_fd=" << listened_fd;
return -1;
......@@ -271,7 +271,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* acception) {
options.remote_side = butil::EndPoint(*(sockaddr_in*)&in_addr);
options.user = acception->user();
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
options.ssl_ctx = am->_ssl_ctx;
options.initial_ssl_ctx = am->_ssl_ctx;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
continue;
......
......@@ -53,7 +53,8 @@ public:
// transmission for `idle_timeout_sec' will be closed automatically iff
// `idle_timeout_sec' > 0
// Return 0 on success, -1 otherwise.
int StartAccept(int listened_fd, int idle_timeout_sec, SSL_CTX* ssl_ctx);
int StartAccept(int listened_fd, int idle_timeout_sec,
const std::shared_ptr<SocketSSLContext>& ssl_ctx);
// [thread-safe] Stop accepting connections.
// `closewait_ms' is not used anymore.
......@@ -104,8 +105,7 @@ private:
// The map containing all the accepted sockets
SocketMap _socket_map;
// Not owner
SSL_CTX* _ssl_ctx;
std::shared_ptr<SocketSSLContext> _ssl_ctx;
};
} // namespace brpc
......
......@@ -192,10 +192,13 @@ void ConnectionsService::PrintConnections(
// slow (because we have many connections here).
int pref_index = ptr->preferred_index();
SocketUniquePtr first_sub;
int numfree = 0;
int numinflight = 0;
int pooled_count = -1;
if (ptr->fd() < 0) {
ptr->GetPooledSocketStats(&numfree, &numinflight);
int numfree = 0;
int numinflight = 0;
if (ptr->GetPooledSocketStats(&numfree, &numinflight)) {
pooled_count = numfree + numinflight;
}
// Check preferred_index of any pooled sockets.
ptr->ListPooledSockets(&first_id, 1);
if (!first_id.empty()) {
......@@ -263,11 +266,11 @@ void ConnectionsService::PrintConnections(
}
os << SSLStateToYesNo(ptr->ssl_state(), use_html) << bar;
char protname[32];
if (!ptr->CreatedByConnect()) {
if (pooled_count < 0) {
snprintf(protname, sizeof(protname), "%s", pref_prot);
} else {
snprintf(protname, sizeof(protname), "%s*%d", pref_prot,
numfree + numinflight);
pooled_count);
}
os << min_width(protname, 12) << bar;
if (ptr->fd() >= 0) {
......
......@@ -21,6 +21,8 @@
#include <gflags/gflags.h>
#include "butil/time.h" // milliseconds_from_now
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "butil/strings/string_util.h"
#include "bthread/unstable.h" // bthread_timer_add
#include "brpc/socket_map.h" // SocketMapInsert
#include "brpc/compress.h"
......@@ -32,7 +34,6 @@
#include "brpc/details/usercode_backup_pool.h" // TooManyUserCode
#include "brpc/policy/esp_authenticator.h"
namespace brpc {
DECLARE_bool(enable_rpcz);
......@@ -52,6 +53,77 @@ ChannelOptions::ChannelOptions()
, ns_filter(NULL)
{}
ChannelSSLOptions* ChannelOptions::mutable_ssl_options() {
if (!_ssl_options) {
_ssl_options.reset(new ChannelSSLOptions);
}
return _ssl_options.get();
}
static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) {
if (opt.auth == NULL &&
!opt.has_ssl_options() &&
opt.connection_group.empty()) {
// Returning zeroized result by default is more intuitive for users.
return ChannelSignature();
}
uint32_t seed = 0;
std::string buf;
buf.reserve(1024);
butil::MurmurHash3_x64_128_Context mm_ctx;
do {
buf.clear();
butil::MurmurHash3_x64_128_Init(&mm_ctx, seed);
if (!opt.connection_group.empty()) {
buf.append("|conng=");
buf.append(opt.connection_group);
}
if (opt.auth) {
buf.append("|auth=");
buf.append((char*)&opt.auth, sizeof(opt.auth));
}
if (opt.has_ssl_options()) {
const ChannelSSLOptions& ssl = opt.ssl_options();
buf.push_back('|');
buf.append(ssl.ciphers);
buf.push_back('|');
buf.append(ssl.protocols);
buf.push_back('|');
buf.append(ssl.sni_name);
const VerifyOptions& verify = ssl.verify;
buf.push_back('|');
buf.append((char*)&verify.verify_depth, sizeof(verify.verify_depth));
buf.push_back('|');
buf.append(verify.ca_file_path);
} else {
// All disabled ChannelSSLOptions are the same
}
butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size());
buf.clear();
if (opt.has_ssl_options()) {
const CertInfo& cert = opt.ssl_options().client_cert;
if (!cert.certificate.empty()) {
// Certificate may be too long (PEM string) to fit into `buf'
butil::MurmurHash3_x64_128_Update(
&mm_ctx, cert.certificate.data(), cert.certificate.size());
butil::MurmurHash3_x64_128_Update(
&mm_ctx, cert.private_key.data(), cert.private_key.size());
}
}
// sni_filters has no effect in ChannelSSLOptions
ChannelSignature result;
butil::MurmurHash3_x64_128_Final(result.data, &mm_ctx);
if (result != ChannelSignature()) {
// the empty result is reserved for default case and cannot
// be used, increment the seed and retry.
return result;
}
++seed;
} while (true);
}
Channel::Channel(ProfilerLinker)
: _server_id((SocketId)-1)
, _serialize_request(NULL)
......@@ -62,9 +134,8 @@ Channel::Channel(ProfilerLinker)
Channel::~Channel() {
if (_server_id != (SocketId)-1) {
SocketMapRemove(SocketMapKey(_server_address,
_options.ssl_options,
_options.auth));
const ChannelSignature sig = ComputeChannelSignature(_options);
SocketMapRemove(SocketMapKey(_server_address, sig));
}
}
......@@ -123,17 +194,13 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
if (_options.auth == NULL) {
_options.auth = policy::global_esp_authenticator();
}
} else if (_options.protocol == brpc::PROTOCOL_HTTP) {
if (_raw_server_address.compare(0, 5, "https") == 0) {
_options.ssl_options.enable = true;
if (_options.ssl_options.sni_name.empty()) {
int port;
ParseHostAndPortFromURL(_raw_server_address.c_str(),
&_options.ssl_options.sni_name, &port);
}
}
}
// Normalize connection_group
std::string& cg = _options.connection_group;
if (!cg.empty() && (::isspace(cg.front()) || ::isspace(cg.back()))) {
butil::TrimWhitespace(cg, butil::TRIM_ALL, &cg);
}
return 0;
}
......@@ -163,8 +230,7 @@ int Channel::Init(const char* server_addr_and_port,
return -1;
}
}
_raw_server_address.assign(server_addr_and_port);
return Init(point, options);
return InitSingle(point, server_addr_and_port, options);
}
int Channel::Init(const char* server_addr, int port,
......@@ -186,25 +252,58 @@ int Channel::Init(const char* server_addr, int port,
return -1;
}
}
_raw_server_address.assign(server_addr);
return Init(point, options);
return InitSingle(point, server_addr, options);
}
static int CreateSocketSSLContext(const ChannelOptions& options,
std::shared_ptr<SocketSSLContext>* ssl_ctx) {
if (options.has_ssl_options()) {
SSL_CTX* raw_ctx = CreateClientSSLContext(options.ssl_options());
if (!raw_ctx) {
LOG(ERROR) << "Fail to CreateClientSSLContext";
return -1;
}
*ssl_ctx = std::make_shared<SocketSSLContext>();
(*ssl_ctx)->raw_ctx = raw_ctx;
(*ssl_ctx)->sni_name = options.ssl_options().sni_name;
} else {
(*ssl_ctx) = NULL;
}
return 0;
}
int Channel::Init(butil::EndPoint server_addr_and_port,
const ChannelOptions* options) {
return InitSingle(server_addr_and_port, "", options);
}
int Channel::InitSingle(const butil::EndPoint& server_addr_and_port,
const char* raw_server_address,
const ChannelOptions* options) {
GlobalInitializeOrDie();
if (InitChannelOptions(options) != 0) {
return -1;
}
if (_options.protocol == brpc::PROTOCOL_HTTP &&
::strncmp(raw_server_address, "https://", 8) == 0) {
if (_options.mutable_ssl_options()->sni_name.empty()) {
ParseURL(raw_server_address,
NULL, &_options.mutable_ssl_options()->sni_name, NULL);
}
}
const int port = server_addr_and_port.port;
if (port < 0 || port > 65535) {
LOG(ERROR) << "Invalid port=" << port;
return -1;
}
_server_address = server_addr_and_port;
if (SocketMapInsert(SocketMapKey(server_addr_and_port,
_options.ssl_options,
_options.auth), &_server_id) != 0) {
const ChannelSignature sig = ComputeChannelSignature(_options);
std::shared_ptr<SocketSSLContext> ssl_ctx;
if (CreateSocketSSLContext(_options, &ssl_ctx) != 0) {
return -1;
}
if (SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
&_server_id, ssl_ctx) != 0) {
LOG(ERROR) << "Fail to insert into SocketMap";
return -1;
}
......@@ -222,6 +321,13 @@ int Channel::Init(const char* ns_url,
if (InitChannelOptions(options) != 0) {
return -1;
}
if (_options.protocol == brpc::PROTOCOL_HTTP &&
::strncmp(ns_url, "https://", 8) == 0) {
if (_options.mutable_ssl_options()->sni_name.empty()) {
ParseURL(ns_url,
NULL, &_options.mutable_ssl_options()->sni_name, NULL);
}
}
LoadBalancerWithNaming* lb = new (std::nothrow) LoadBalancerWithNaming;
if (NULL == lb) {
LOG(FATAL) << "Fail to new LoadBalancerWithNaming";
......@@ -230,6 +336,10 @@ int Channel::Init(const char* ns_url,
GetNamingServiceThreadOptions ns_opt;
ns_opt.succeed_without_server = _options.succeed_without_server;
ns_opt.log_succeed_without_server = _options.log_succeed_without_server;
ns_opt.channel_signature = ComputeChannelSignature(_options);
if (CreateSocketSSLContext(_options, &ns_opt.ssl_ctx) != 0) {
return -1;
}
if (lb->Init(ns_url, lb_name, _options.ns_filter, &ns_opt) != 0) {
LOG(ERROR) << "Fail to initialize LoadBalancerWithNaming";
delete lb;
......
......@@ -21,10 +21,11 @@
// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
#include <ostream> // std::ostream
#include "bthread/errno.h" // Redefine errno
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
#include "brpc/ssl_option.h" // ChannelSSLOptions
#include <ostream> // std::ostream
#include "bthread/errno.h" // Redefine errno
#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
#include "butil/ptr_container.h"
#include "brpc/ssl_options.h" // ChannelSSLOptions
#include "brpc/channel_base.h" // ChannelBase
#include "brpc/adaptive_protocol_type.h" // AdaptiveProtocolType
#include "brpc/adaptive_connection_type.h" // AdaptiveConnectionType
......@@ -90,8 +91,10 @@ struct ChannelOptions {
bool log_succeed_without_server;
// SSL related options. Refer to `ChannelSSLOptions' for details
ChannelSSLOptions ssl_options;
bool has_ssl_options() const { return _ssl_options != NULL; }
const ChannelSSLOptions& ssl_options() const { return *_ssl_options.get(); }
ChannelSSLOptions* mutable_ssl_options();
// Turn on authentication for this channel if `auth' is not NULL.
// Note `auth' will not be deleted by channel and must remain valid when
// the channel is being used.
......@@ -99,9 +102,10 @@ struct ChannelOptions {
const Authenticator* auth;
// Customize the error code that should be retried. The interface is
// defined src/brpc/retry_policy.h
// defined in src/brpc/retry_policy.h
// This object is NOT owned by channel and should remain valid when
// channel is used.
// Default: NULL
const RetryPolicy* retry_policy;
// Filter ServerNodes (i.e. based on `tag' field of `ServerNode')
......@@ -109,7 +113,19 @@ struct ChannelOptions {
// in src/brpc/naming_service_filter.h
// This object is NOT owned by channel and should remain valid when
// channel is used.
// Default: NULL
const NamingServiceFilter* ns_filter;
// Channels with same connection_group share connections.
// In other words, set to a different value to stop sharing connections.
// Case-sensitive, leading and trailing spaces are ignored.
// Default: ""
std::string connection_group;
private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ChannelOptions from being bloated in most cases.
butil::PtrContainer<ChannelSSLOptions> _ssl_options;
};
// A Channel represents a communication line to one server or multiple servers
......@@ -188,8 +204,10 @@ protected:
static void CallMethodImpl(Controller* controller, SharedLoadBalancer* lb);
int InitChannelOptions(const ChannelOptions* options);
int InitSingle(const butil::EndPoint& server_addr_and_port,
const char* raw_server_address,
const ChannelOptions* options);
std::string _raw_server_address;
butil::EndPoint _server_address;
SocketId _server_id;
Protocol::SerializeRequest _serialize_request;
......
......@@ -28,17 +28,28 @@
namespace brpc {
struct NSKey {
const NamingService* ns;
std::string protocol;
std::string service_name;
ChannelSignature channel_signature;
NSKey(const std::string& prot_in,
const std::string& service_in,
const ChannelSignature& sig)
: protocol(prot_in), service_name(service_in), channel_signature(sig) {
}
};
struct NSKeyHasher {
size_t operator()(const NSKey& nskey) const {
return butil::DefaultHasher<std::string>()(nskey.service_name)
* 101 + (uintptr_t)nskey.ns;
size_t h = butil::DefaultHasher<std::string>()(nskey.protocol);
h = h * 101 + butil::DefaultHasher<std::string>()(nskey.service_name);
h = h * 101 + nskey.channel_signature.data[1];
return h;
}
};
inline bool operator==(const NSKey& k1, const NSKey& k2) {
return (k1.ns == k2.ns && k1.service_name == k2.service_name);
return k1.protocol == k2.protocol &&
k1.service_name == k2.service_name &&
k1.channel_signature == k2.channel_signature;
}
typedef butil::FlatMap<NSKey, NamingServiceThread*, NSKeyHasher> NamingServiceMap;
......@@ -58,7 +69,8 @@ NamingServiceThread::Actions::~Actions() {
// Remove all sockets from SocketMap
for (std::vector<ServerNode>::const_iterator it = _last_servers.begin();
it != _last_servers.end(); ++it) {
SocketMapRemove(SocketMapKey(it->addr));
const SocketMapKey key(*it, _owner->_options.channel_signature);
SocketMapRemove(key);
}
EndWait(0);
}
......@@ -110,7 +122,8 @@ void NamingServiceThread::Actions::ResetServers(
// TODO: For each unique SocketMapKey (i.e. SSL settings), insert a new
// Socket. SocketMapKey may be passed through AddWatcher. Make sure
// to pick those Sockets with the right settings during OnAddedServers
CHECK_EQ(SocketMapInsert(SocketMapKey(_added[i].addr), &tagged_id.id), 0);
const SocketMapKey key(_added[i], _owner->_options.channel_signature);
CHECK_EQ(0, SocketMapInsert(key, &tagged_id.id, _owner->_options.ssl_ctx));
_added_sockets.push_back(tagged_id);
}
......@@ -118,7 +131,8 @@ void NamingServiceThread::Actions::ResetServers(
for (size_t i = 0; i < _removed.size(); ++i) {
ServerNodeWithId tagged_id;
tagged_id.node = _removed[i];
CHECK_EQ(0, SocketMapFind(SocketMapKey(_removed[i].addr), &tagged_id.id));
const SocketMapKey key(_removed[i], _owner->_options.channel_signature);
CHECK_EQ(0, SocketMapFind(key, &tagged_id.id));
_removed_sockets.push_back(tagged_id);
}
......@@ -169,7 +183,8 @@ void NamingServiceThread::Actions::ResetServers(
for (size_t i = 0; i < _removed.size(); ++i) {
// TODO: Remove all Sockets that have the same address in SocketMapKey.peer
// We may need another data structure to avoid linear cost
SocketMapRemove(SocketMapKey(_removed[i].addr));
const SocketMapKey key(_removed[i], _owner->_options.channel_signature);
SocketMapRemove(key);
}
if (!_removed.empty() || !_added.empty()) {
......@@ -207,7 +222,6 @@ int NamingServiceThread::Actions::WaitForFirstBatchOfServers() {
NamingServiceThread::NamingServiceThread()
: _tid(0)
, _source_ns(NULL)
, _ns(NULL)
, _actions(this) {
}
......@@ -215,8 +229,8 @@ NamingServiceThread::NamingServiceThread()
NamingServiceThread::~NamingServiceThread() {
RPC_VLOG << "~NamingServiceThread(" << *this << ')';
// Remove from g_nsthread_map first
if (_source_ns != NULL) {
const NSKey key = { _source_ns, _service_name };
if (!_protocol.empty()) {
const NSKey key(_protocol, _service_name, _options.channel_signature);
std::unique_lock<pthread_mutex_t> mu(g_nsthread_map_mutex);
if (g_nsthread_map != NULL) {
NamingServiceThread** ptr = g_nsthread_map->seek(key);
......@@ -255,15 +269,16 @@ void* NamingServiceThread::RunThis(void* arg) {
return NULL;
}
int NamingServiceThread::Start(const NamingService* naming_service,
int NamingServiceThread::Start(NamingService* naming_service,
const std::string& protocol,
const std::string& service_name,
const GetNamingServiceThreadOptions* opt_in) {
if (naming_service == NULL) {
LOG(ERROR) << "Param[naming_service] is NULL";
return -1;
}
_source_ns = naming_service;
_ns = naming_service->New();
_ns = naming_service;
_protocol = protocol;
_service_name = service_name;
if (opt_in) {
_options = *opt_in;
......@@ -400,14 +415,13 @@ int GetNamingServiceThread(
LOG(ERROR) << "Invalid naming service url=" << url;
return -1;
}
const NamingService* ns = NamingServiceExtension()->Find(protocol);
if (ns == NULL) {
const NamingService* source_ns = NamingServiceExtension()->Find(protocol);
if (source_ns == NULL) {
LOG(ERROR) << "Unknown protocol=" << protocol;
return -1;
}
NSKey key;
key.ns = ns;
key.service_name = service_name;
const NSKey key(protocol, service_name,
(options ? options->channel_signature : ChannelSignature()));
bool new_thread = false;
butil::intrusive_ptr<NamingServiceThread> nsthread;
{
......@@ -452,7 +466,7 @@ int GetNamingServiceThread(
}
}
if (new_thread) {
if (nsthread->Start(ns, key.service_name, options) != 0) {
if (nsthread->Start(source_ns->New(), key.protocol, key.service_name, options) != 0) {
LOG(ERROR) << "Fail to start NamingServiceThread";
std::unique_lock<pthread_mutex_t> mu(g_nsthread_map_mutex);
g_nsthread_map->erase(key);
......
......@@ -24,7 +24,7 @@
#include "brpc/shared_object.h" // SharedObject
#include "brpc/naming_service.h" // NamingService
#include "brpc/naming_service_filter.h" // NamingServiceFilter
#include "brpc/socket_map.h"
namespace brpc {
......@@ -46,6 +46,8 @@ struct GetNamingServiceThreadOptions {
bool succeed_without_server;
bool log_succeed_without_server;
ChannelSignature channel_signature;
std::shared_ptr<SocketSSLContext> ssl_ctx;
};
// A dedicated thread to map a name to ServerIds
......@@ -86,7 +88,9 @@ public:
NamingServiceThread();
~NamingServiceThread();
int Start(const NamingService* ns, const std::string& service_name,
int Start(NamingService* ns,
const std::string& protocol,
const std::string& service_name,
const GetNamingServiceThreadOptions* options);
int WaitForFirstBatchOfServers();
......@@ -106,9 +110,8 @@ private:
butil::Mutex _mutex;
bthread_t _tid;
// TODO: better use a name.
const NamingService* _source_ns;
NamingService* _ns;
std::string _protocol;
std::string _service_name;
GetNamingServiceThreadOptions _options;
std::vector<ServerNodeWithId> _last_sockets;
......
......@@ -437,10 +437,6 @@ static int SetSSLOptions(SSL_CTX* ctx, const std::string& ciphers,
}
SSL_CTX* CreateClientSSLContext(const ChannelSSLOptions& options) {
if (!options.enable) {
return NULL;
}
std::unique_ptr<SSL_CTX, FreeSSLCTX> ssl_ctx(
SSL_CTX_new(SSLv23_client_method()));
if (!ssl_ctx) {
......@@ -770,51 +766,66 @@ int SSLDHInit() {
return 0;
}
} // namespace brpc
std::ostream& operator<<(std::ostream& os, SSL* ssl) {
os << "[SSL HANDSHAKE]"
<< "\n* cipher: " << SSL_get_cipher(ssl)
<< "\n* protocol: " << SSL_get_version(ssl)
<< "\n* verify: " << (SSL_get_verify_mode(ssl) & SSL_VERIFY_PEER
? "success" : "none")
<< "\n";
static std::string GetNextLevelSeparator(const char* sep) {
if (sep[0] != '\n') {
return sep;
}
const size_t left_len = strlen(sep + 1);
if (left_len == 0) {
return "\n ";
}
std::string new_sep;
new_sep.reserve(left_len * 2 + 1);
new_sep.append(sep, left_len + 1);
new_sep.append(sep + 1, left_len);
return new_sep;
}
void Print(std::ostream& os, SSL* ssl, const char* sep) {
os << "cipher=" << SSL_get_cipher(ssl) << sep
<< "protocol=" << SSL_get_version(ssl) << sep
<< "verify=" << (SSL_get_verify_mode(ssl) & SSL_VERIFY_PEER
? "success" : "none");
X509* cert = SSL_get_peer_certificate(ssl);
if (cert) {
os << "\n" << cert;
os << sep << "peer_certificate={";
const std::string new_sep = GetNextLevelSeparator(sep);
if (sep[0] == '\n') {
os << new_sep;
}
Print(os, cert, new_sep.c_str());
if (sep[0] == '\n') {
os << sep;
}
os << '}';
}
return os;
}
std::ostream& operator<<(std::ostream& os, X509* cert) {
void Print(std::ostream& os, X509* cert, const char* sep) {
BIO* buf = BIO_new(BIO_s_mem());
if (buf == NULL) {
return os;
return;
}
BIO_printf(buf, "[CERTIFICATE]");
BIO_printf(buf, "\n* subject: ");
BIO_printf(buf, "subject=");
X509_NAME_print(buf, X509_get_subject_name(cert), 0);
BIO_printf(buf, "\n* start date: ");
BIO_printf(buf, "%sstart_date=", sep);
ASN1_TIME_print(buf, X509_get_notBefore(cert));
BIO_printf(buf, "\n* expire date: ");
BIO_printf(buf, "%sexpire_date=", sep);
ASN1_TIME_print(buf, X509_get_notAfter(cert));
BIO_printf(buf, "\n* common name: ");
BIO_printf(buf, "%scommon_name=", sep);
std::vector<std::string> hostnames;
brpc::ExtractHostnames(cert, &hostnames);
for (size_t i = 0; i < hostnames.size(); ++i) {
BIO_printf(buf, "%s; ", hostnames[i].c_str());
BIO_printf(buf, "%s;", hostnames[i].c_str());
}
BIO_printf(buf, "\n* issuer: ");
BIO_printf(buf, "%sissuer=", sep);
X509_NAME_print(buf, X509_get_issuer_name(cert), 0);
BIO_printf(buf, "\n");
char* bufp = NULL;
int len = BIO_get_mem_data(buf, &bufp);
os << butil::StringPiece(bufp, len);
return os;
}
} // namespace brpc
......@@ -22,8 +22,7 @@
// For some versions of openssl, SSL_* are defined inside this header
#include <openssl/ossl_typ.h>
#include "brpc/socket_id.h" // SocketId
#include "brpc/ssl_option.h" // SSLOptions
#include "brpc/ssl_options.h" // ServerSSLOptions
namespace brpc {
......@@ -76,7 +75,7 @@ SSL_CTX* CreateClientSSLContext(const ChannelSSLOptions& options);
// fields into `hostnames'
SSL_CTX* CreateServerSSLContext(const std::string& certificate_file,
const std::string& private_key_file,
const SSLOptions& options,
const ServerSSLOptions& options,
std::vector<std::string>* hostnames);
// Create a new SSL (per connection object) using configurations in `ctx'.
......@@ -92,9 +91,9 @@ void AddBIOBuffer(SSL* ssl, int fd, int bufsize);
// set to indicate the reason (0 for EOF)
SSLState DetectSSLState(int fd, int* error_code);
} // namespace brpc
void Print(std::ostream& os, SSL* ssl, const char* sep);
void Print(std::ostream& os, X509* cert, const char* sep);
std::ostream& operator<<(std::ostream& os, SSL* ssl);
std::ostream& operator<<(std::ostream& os, X509* cert);
} // namespace brpc
#endif // BRPC_SSL_HELPER_H
......@@ -333,8 +333,9 @@ static void GlobalInitializeOrDieImpl() {
#endif
NamingServiceExtension()->RegisterOrDie("file", &g_ext->fns);
NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns);
NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("https", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("redis", &g_ext->dns);
NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns);
NamingServiceExtension()->RegisterOrDie("consul", &g_ext->cns);
......
......@@ -20,29 +20,15 @@
#include <vector> // std::vector
#include <string> // std::string
#include <ostream> // std::ostream
#include "butil/endpoint.h" // butil::EndPoint
#include "butil/macros.h" // BAIDU_CONCAT
#include "butil/endpoint.h" // butil::EndPoint
#include "butil/macros.h" // BAIDU_CONCAT
#include "brpc/describable.h"
#include "brpc/destroyable.h"
#include "brpc/extension.h" // Extension<T>
#include "brpc/extension.h" // Extension<T>
#include "brpc/server_node.h" // ServerNode
namespace brpc {
// Representing a server inside a NamingService.
struct ServerNode {
ServerNode() {}
ServerNode(butil::ip_t ip, int port, const std::string& tag2)
: addr(ip, port), tag(tag2) {}
ServerNode(const butil::EndPoint& pt, const std::string& tag2)
: addr(pt), tag(tag2) {}
ServerNode(butil::ip_t ip, int port) : addr(ip, port) {}
explicit ServerNode(const butil::EndPoint& pt) : addr(pt) {}
butil::EndPoint addr;
std::string tag;
};
// Continuing actions to added/removed servers.
// NOTE: You don't have to implement this class.
class NamingServiceActions {
......@@ -84,21 +70,6 @@ inline Extension<const NamingService>* NamingServiceExtension() {
return Extension<const NamingService>::instance();
}
inline bool operator<(const ServerNode& n1, const ServerNode& n2)
{ return n1.addr != n2.addr ? (n1.addr < n2.addr) : (n1.tag < n2.tag); }
inline bool operator==(const ServerNode& n1, const ServerNode& n2)
{ return n1.addr == n2.addr && n1.tag == n2.tag; }
inline bool operator!=(const ServerNode& n1, const ServerNode& n2)
{ return !(n1 == n2); }
inline std::ostream& operator<<(std::ostream& os, const ServerNode& n) {
os << n.addr;
if (!n.tag.empty()) {
os << "(tag=" << n.tag << ')';
}
return os;
}
} // namespace brpc
#endif // BRPC_NAMING_SERVICE_H
......@@ -1297,14 +1297,28 @@ void ProcessHttpRequest(InputMessageBase *msg) {
}
bool ParseHttpServerAddress(butil::EndPoint* point, const char* server_addr_and_port) {
std::string schema;
std::string host;
int port = -1;
if (ParseHostAndPortFromURL(server_addr_and_port, &host, &port) != 0) {
if (ParseURL(server_addr_and_port, &schema, &host, &port) != 0) {
LOG(ERROR) << "Invalid address=`" << server_addr_and_port << '\'';
return false;
}
if (schema.empty() || schema == "http") {
if (port < 0) {
port = 80;
}
} else if (schema == "https") {
if (port < 0) {
port = 443;
}
} else {
LOG(ERROR) << "Invalid schema=`" << schema << '\'';
return false;
}
if (str2endpoint(host.c_str(), port, point) != 0 &&
hostname2endpoint(host.c_str(), port, point) != 0) {
LOG(ERROR) << "Invalid address=`" << host << '\'';
LOG(ERROR) << "Invalid host=" << host << " port=" << port;
return false;
}
return true;
......
......@@ -488,7 +488,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
if (!method_status->OnRequested()) {
cntl->SetFailed(ELIMIT, "Reached %s's max_concurrency=%d",
cntl->thrift_method_name().c_str(),
method_status->max_concurrency());
method_status->MaxConcurrency());
return thrift_done->Run();
}
}
......@@ -522,7 +522,7 @@ void ProcessThriftRequest(InputMessageBase* msg_base) {
}
if (!server_accessor.AddConcurrency(cntl)) {
cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d",
server->max_concurrency());
server->options().max_concurrency);
break;
}
if (FLAGS_usercode_in_pthread && TooManyUserCode()) {
......
......@@ -1683,7 +1683,7 @@ void RtmpClientStream::CleanupSocketForStream(
Socket* prev_sock, Controller*, int /*error_code*/) {
if (prev_sock) {
if (_from_socketmap) {
_client_impl->socket_map().Remove(prev_sock->remote_side(),
_client_impl->socket_map().Remove(SocketMapKey(prev_sock->remote_side()),
prev_sock->id());
} else {
prev_sock->SetFailed(); // not necessary, already failed.
......@@ -1888,7 +1888,7 @@ void RtmpClientStream::OnStopInternal() {
LOG(FATAL) << "RtmpContext of " << *_rtmpsock << " is NULL";
}
if (_from_socketmap) {
_client_impl->socket_map().Remove(_rtmpsock->remote_side(),
_client_impl->socket_map().Remove(SocketMapKey(_rtmpsock->remote_side()),
_rtmpsock->id());
} else {
_rtmpsock->ReleaseAdditionalReference();
......
......@@ -145,6 +145,13 @@ ServerOptions::ServerOptions()
}
}
ServerSSLOptions* ServerOptions::mutable_ssl_options() {
if (!_ssl_options) {
_ssl_options.reset(new ServerSSLOptions);
}
return _ssl_options.get();
}
Server::MethodProperty::OpaqueParams::OpaqueParams()
: is_tabbed(false)
, allow_http_body_to_pb(true)
......@@ -840,14 +847,18 @@ int Server::StartInternal(const butil::ip_t& ip,
// Free last SSL contexts
FreeSSLContexts();
CertInfo& default_cert = _options.ssl_options.default_cert;
if (!default_cert.certificate.empty()) {
if (_options.has_ssl_options()) {
CertInfo& default_cert = _options.mutable_ssl_options()->default_cert;
if (default_cert.certificate.empty()) {
LOG(ERROR) << "default_cert is empty";
return -1;
}
if (AddCertificate(default_cert) != 0) {
return -1;
}
_default_ssl_ctx = _ssl_ctx_map.begin()->second.ctx;
const std::vector<CertInfo>& certs = _options.ssl_options.certs;
const std::vector<CertInfo>& certs = _options.mutable_ssl_options()->certs;
for (size_t i = 0; i < certs.size(); ++i) {
if (AddCertificate(certs[i]) != 0) {
return -1;
......@@ -1791,6 +1802,10 @@ Server::FindServicePropertyByName(const butil::StringPiece& name) const {
}
int Server::AddCertificate(const CertInfo& cert) {
if (!_options.has_ssl_options()) {
LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
return -1;
}
std::string cert_key(cert.certificate);
cert_key.append(cert.private_key);
if (_ssl_ctx_map.seek(cert_key) != NULL) {
......@@ -1800,15 +1815,17 @@ int Server::AddCertificate(const CertInfo& cert) {
SSLContext ssl_ctx;
ssl_ctx.filters = cert.sni_filters;
ssl_ctx.ctx = CreateServerSSLContext(cert.certificate, cert.private_key,
_options.ssl_options, &ssl_ctx.filters);
if (ssl_ctx.ctx == NULL) {
ssl_ctx.ctx = std::make_shared<SocketSSLContext>();
SSL_CTX* raw_ctx = CreateServerSSLContext(cert.certificate, cert.private_key,
_options.ssl_options(), &ssl_ctx.filters);
if (raw_ctx == NULL) {
return -1;
}
ssl_ctx.ctx->raw_ctx = raw_ctx;
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx, SSLSwitchCTXByHostname);
SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx, this);
SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx->raw_ctx, SSLSwitchCTXByHostname);
SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx->raw_ctx, this);
#endif
if (!_reload_cert_maps.Modify(AddCertMapping, ssl_ctx)) {
......@@ -1850,6 +1867,10 @@ bool Server::AddCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
}
int Server::RemoveCertificate(const CertInfo& cert) {
if (!_options.has_ssl_options()) {
LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
return -1;
}
std::string cert_key(cert.certificate);
cert_key.append(cert.private_key);
SSLContext* ssl_ctx = _ssl_ctx_map.seek(cert_key);
......@@ -1868,8 +1889,6 @@ int Server::RemoveCertificate(const CertInfo& cert) {
return -1;
}
// After a successful Modify, now it's safe to erase SSLContext
SSL_CTX_free(ssl_ctx->ctx);
_ssl_ctx_map.erase(cert_key);
return 0;
}
......@@ -1884,7 +1903,7 @@ bool Server::RemoveCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
} else {
cmap = &(bg.cert_map);
}
SSL_CTX** ctx = cmap->seek(hostname);
std::shared_ptr<SocketSSLContext>* ctx = cmap->seek(hostname);
if (ctx != NULL && *ctx == ssl_ctx.ctx) {
cmap->erase(hostname);
}
......@@ -1893,6 +1912,11 @@ bool Server::RemoveCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
}
int Server::ResetCertificates(const std::vector<CertInfo>& certs) {
if (!_options.has_ssl_options()) {
LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
return -1;
}
SSLContextMap tmp_map;
if (tmp_map.init(INITIAL_CERT_MAP) != 0) {
LOG(ERROR) << "Fail to initialize tmp_map";
......@@ -1901,8 +1925,8 @@ int Server::ResetCertificates(const std::vector<CertInfo>& certs) {
// Add default certficiate into tmp_map first since it can't be reloaded
std::string default_cert_key =
_options.ssl_options.default_cert.certificate
+ _options.ssl_options.default_cert.private_key;
_options.ssl_options().default_cert.certificate
+ _options.ssl_options().default_cert.private_key;
tmp_map[default_cert_key] = _ssl_ctx_map[default_cert_key];
for (size_t i = 0; i < certs.size(); ++i) {
......@@ -1915,28 +1939,26 @@ int Server::ResetCertificates(const std::vector<CertInfo>& certs) {
SSLContext ssl_ctx;
ssl_ctx.filters = certs[i].sni_filters;
ssl_ctx.ctx = CreateServerSSLContext(
ssl_ctx.ctx = std::make_shared<SocketSSLContext>();
ssl_ctx.ctx->raw_ctx = CreateServerSSLContext(
certs[i].certificate, certs[i].private_key,
_options.ssl_options, &ssl_ctx.filters);
if (ssl_ctx.ctx == NULL) {
FreeSSLContextMap(tmp_map, true);
_options.ssl_options(), &ssl_ctx.filters);
if (ssl_ctx.ctx->raw_ctx == NULL) {
return -1;
}
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx, SSLSwitchCTXByHostname);
SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx, this);
SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx->raw_ctx, SSLSwitchCTXByHostname);
SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx->raw_ctx, this);
#endif
tmp_map[cert_key] = ssl_ctx;
}
if (!_reload_cert_maps.Modify(ResetCertMappings, tmp_map)) {
FreeSSLContextMap(tmp_map, true);
return -1;
}
_ssl_ctx_map.swap(tmp_map);
FreeSSLContextMap(tmp_map, true);
return 0;
}
......@@ -1976,19 +1998,8 @@ bool Server::ResetCertMappings(CertMaps& bg, const SSLContextMap& ctx_map) {
return true;
}
void Server::FreeSSLContextMap(SSLContextMap& ctx_map, bool keep_default) {
for (SSLContextMap::iterator it =
ctx_map.begin(); it != ctx_map.end(); ++it) {
if (keep_default && it->second.ctx == _default_ssl_ctx) {
continue;
}
SSL_CTX_free(it->second.ctx);
}
ctx_map.clear();
}
void Server::FreeSSLContexts() {
FreeSSLContextMap(_ssl_ctx_map, false);
_ssl_ctx_map.clear();
_reload_cert_maps.Modify(ClearCertMapping);
_default_ssl_ctx = NULL;
}
......@@ -2082,7 +2093,7 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
int* al, Server* server) {
(void)al;
const char* hostname = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name);
bool strict_sni = server->_options.ssl_options.strict_sni;
bool strict_sni = server->_options.ssl_options().strict_sni;
if (hostname == NULL) {
return strict_sni ? SSL_TLSEXT_ERR_ALERT_FATAL : SSL_TLSEXT_ERR_NOACK;
}
......@@ -2092,7 +2103,7 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
return SSL_TLSEXT_ERR_ALERT_FATAL;
}
SSL_CTX** pctx = s->cert_map.seek(hostname);
std::shared_ptr<SocketSSLContext>* pctx = s->cert_map.seek(hostname);
if (pctx == NULL) {
const char* dot = hostname;
for (; *dot != '\0'; ++dot) {
......@@ -2114,7 +2125,7 @@ int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
}
// Switch SSL_CTX to the one with correct hostname
SSL_set_SSL_CTX(ssl, *pctx);
SSL_set_SSL_CTX(ssl, (*pctx)->raw_ctx);
return SSL_TLSEXT_ERR_OK;
}
#endif // SSL_CTRL_SET_TLSEXT_HOSTNAME
......
......@@ -24,13 +24,14 @@
#include "bthread/errno.h" // Redefine errno
#include "bthread/bthread.h" // Server may need some bthread functions,
// e.g. bthread_usleep
#include <google/protobuf/service.h> // google::protobuf::Service
#include <google/protobuf/service.h> // google::protobuf::Service
#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "butil/containers/doubly_buffered_data.h" // DoublyBufferedData
#include "bvar/bvar.h"
#include "butil/containers/case_ignored_flat_map.h" // [CaseIgnored]FlatMap
#include "butil/ptr_container.h"
#include "brpc/controller.h" // brpc::Controller
#include "brpc/ssl_option.h" // ServerSSLOptions
#include "brpc/ssl_options.h" // ServerSSLOptions
#include "brpc/describable.h" // User often needs this
#include "brpc/data_factory.h" // DataFactory
#include "brpc/builtin/tabbed.h"
......@@ -38,10 +39,6 @@
#include "brpc/health_reporter.h"
#include "brpc/adaptive_max_concurrency.h"
extern "C" {
struct ssl_ctx_st;
}
namespace brpc {
class Acceptor;
......@@ -52,6 +49,7 @@ class SimpleDataPool;
class MongoServiceAdaptor;
class RestfulMap;
class RtmpService;
class SocketSSLContext;
struct ServerOptions {
// Constructed with default options.
......@@ -202,7 +200,9 @@ struct ServerOptions {
bool security_mode() const { return internal_port >= 0 || !has_builtin_services; }
// SSL related options. Refer to `ServerSSLOptions' for details
ServerSSLOptions ssl_options;
bool has_ssl_options() const { return _ssl_options != NULL; }
const ServerSSLOptions& ssl_options() const { return *_ssl_options.get(); }
ServerSSLOptions* mutable_ssl_options();
// [CAUTION] This option is for implementing specialized http proxies,
// most users don't need it. Don't change this option unless you fully
......@@ -228,6 +228,11 @@ struct ServerOptions {
// All names inside must be valid, check protocols name in global.cpp
// Default: empty (all protocols)
std::string enabled_protocols;
private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases.
butil::PtrContainer<ServerSSLOptions> _ssl_options;
};
// This struct is originally designed to contain basic statistics of the
......@@ -573,21 +578,20 @@ friend class Controller;
std::string ServerPrefix() const;
// Mapping from hostname to corresponding SSL_CTX
typedef butil::CaseIgnoredFlatMap<struct ssl_ctx_st*> CertMap;
typedef butil::CaseIgnoredFlatMap<std::shared_ptr<SocketSSLContext> > CertMap;
struct CertMaps {
CertMap cert_map;
CertMap wildcard_cert_map;
};
struct SSLContext {
struct ssl_ctx_st* ctx;
std::shared_ptr<SocketSSLContext> ctx;
std::vector<std::string> filters;
};
// Mapping from [certficate + private-key] to SSLContext
typedef butil::FlatMap<std::string, SSLContext> SSLContextMap;
void FreeSSLContexts();
void FreeSSLContextMap(SSLContextMap& ctx_map, bool keep_default);
static int SSLSwitchCTXByHostname(struct ssl_st* ssl,
int* al, Server* server);
......@@ -636,7 +640,7 @@ friend class Controller;
RestfulMap* _global_restful_map;
// Default certficate which can't be reloaded
struct ssl_ctx_st* _default_ssl_ctx;
std::shared_ptr<SocketSSLContext> _default_ssl_ctx;
// Reloadable SSL mappings
butil::DoublyBufferedData<CertMaps> _reload_cert_maps;
......
// Copyright (c) 2014 Baidu, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Authors: Ge,Jun (gejun@baidu.com)
#ifndef BRPC_SERVER_NODE_H
#define BRPC_SERVER_NODE_H
#include <string>
#include "butil/endpoint.h"
namespace brpc {
// Representing a server inside a NamingService.
struct ServerNode {
ServerNode() {}
explicit ServerNode(const butil::EndPoint& pt) : addr(pt) {}
ServerNode(butil::ip_t ip, int port, const std::string& tag2)
: addr(ip, port), tag(tag2) {}
ServerNode(const butil::EndPoint& pt, const std::string& tag2)
: addr(pt), tag(tag2) {}
ServerNode(butil::ip_t ip, int port) : addr(ip, port) {}
butil::EndPoint addr;
std::string tag;
};
inline bool operator<(const ServerNode& n1, const ServerNode& n2)
{ return n1.addr != n2.addr ? (n1.addr < n2.addr) : (n1.tag < n2.tag); }
inline bool operator==(const ServerNode& n1, const ServerNode& n2)
{ return n1.addr == n2.addr && n1.tag == n2.tag; }
inline bool operator!=(const ServerNode& n1, const ServerNode& n2)
{ return !(n1 == n2); }
inline std::ostream& operator<<(std::ostream& os, const ServerNode& n) {
os << n.addr;
if (!n.tag.empty()) {
os << "(tag=" << n.tag << ')';
}
return os;
}
} // namespace brpc
#endif // BRPC_SERVER_NODE_H
This diff is collapsed.
......@@ -38,7 +38,6 @@
#include "brpc/socket_id.h" // SocketId
#include "brpc/socket_message.h" // SocketMessagePtr
namespace brpc {
namespace policy {
class ConsistentHashingLoadBalancer;
......@@ -137,6 +136,14 @@ struct PipelinedInfo {
bthread_id_t id_wait;
};
struct SocketSSLContext {
SocketSSLContext();
~SocketSSLContext();
SSL_CTX* raw_ctx; // owned
std::string sni_name; // useful for clients
};
// TODO: Comment fields
struct SocketOptions {
SocketOptions();
......@@ -155,9 +162,7 @@ struct SocketOptions {
// one thread at any time.
void (*on_edge_triggered_events)(Socket*);
int health_check_interval_s;
bool owns_ssl_ctx;
SSL_CTX* ssl_ctx;
std::string sni_name;
std::shared_ptr<SocketSSLContext> initial_ssl_ctx;
bthread_keytable_pool_t* keytable_pool;
SocketConnection* conn;
AppConnect* app_connect;
......
......@@ -54,8 +54,6 @@ inline SocketOptions::SocketOptions()
, user(NULL)
, on_edge_triggered_events(NULL)
, health_check_interval_s(-1)
, owns_ssl_ctx(false)
, ssl_ctx(NULL)
, keytable_pool(NULL)
, conn(NULL)
, app_connect(NULL)
......
......@@ -21,14 +21,11 @@
#include "butil/time.h"
#include "butil/scoped_lock.h"
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
#include "brpc/log.h"
#include "brpc/protocol.h"
#include "brpc/input_messenger.h"
#include "brpc/reloadable_flags.h"
#include "brpc/socket_map.h"
#include "brpc/details/ssl_helper.h" // CreateClientSSLContext
namespace brpc {
......@@ -88,62 +85,9 @@ SocketMap* get_or_new_client_side_socket_map() {
return g_socket_map.load(butil::memory_order_consume);
}
void ComputeSocketMapKeyChecksum(const SocketMapKey& key,
unsigned char* checksum) {
butil::MurmurHash3_x64_128_Context mm_ctx;
butil::MurmurHash3_x64_128_Init(&mm_ctx, 0);
const int BUFSIZE = 1024; // Should be enough
char buf[BUFSIZE];
int cur_len = 0;
#define SAFE_MEMCOPY(dst, cur_len, src, size) \
do { \
int copy_len = std::min((int)size, BUFSIZE - cur_len); \
if (copy_len > 0) { \
memcpy(dst + cur_len, src, copy_len); \
cur_len += copy_len; \
} \
} while (0);
std::size_t ephash = butil::DefaultHasher<butil::EndPoint>()(key.peer);
SAFE_MEMCOPY(buf, cur_len, &ephash, sizeof(ephash));
SAFE_MEMCOPY(buf, cur_len, &key.auth, sizeof(key.auth));
const ChannelSSLOptions& ssl = key.ssl_options;
SAFE_MEMCOPY(buf, cur_len, &ssl.enable, sizeof(ssl.enable));
if (ssl.enable) {
SAFE_MEMCOPY(buf, cur_len, ssl.ciphers.data(), ssl.ciphers.size());
SAFE_MEMCOPY(buf, cur_len, ssl.protocols.data(), ssl.protocols.size());
SAFE_MEMCOPY(buf, cur_len, ssl.sni_name.data(), ssl.sni_name.size());
const VerifyOptions& verify = ssl.verify;
SAFE_MEMCOPY(buf, cur_len, &verify.verify_depth,
sizeof(verify.verify_depth));
if (verify.verify_depth > 0) {
SAFE_MEMCOPY(buf, cur_len, verify.ca_file_path.data(),
verify.ca_file_path.size());
}
} else {
// All disabled ChannelSSLOptions are the same
}
#undef SAFE_MEMCOPY
butil::MurmurHash3_x64_128_Update(&mm_ctx, buf, cur_len);
const CertInfo& cert = ssl.client_cert;
if (ssl.enable && !cert.certificate.empty()) {
// Certificate may be too long (PEM string) to fit into `buf'
butil::MurmurHash3_x64_128_Update(
&mm_ctx, cert.certificate.data(), cert.certificate.size());
butil::MurmurHash3_x64_128_Update(
&mm_ctx, cert.private_key.data(), cert.private_key.size());
// sni_filters has no effect in ChannelSSLOptions
}
butil::MurmurHash3_x64_128_Final(checksum, &mm_ctx);
}
int SocketMapInsert(const SocketMapKey& key, SocketId* id) {
return get_or_new_client_side_socket_map()->Insert(key, id);
int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
return get_or_new_client_side_socket_map()->Insert(key, id, ssl_ctx);
}
int SocketMapFind(const SocketMapKey& key, SocketId* id) {
......@@ -264,10 +208,10 @@ void SocketMap::PrintSocketMap(std::ostream& os, void* arg) {
static_cast<SocketMap*>(arg)->Print(os);
}
int SocketMap::Insert(const SocketMapKey& key, SocketId* id) {
SocketMapKeyChecksum ck(key);
int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx) {
std::unique_lock<butil::Mutex> mu(_mutex);
SingleConnection* sc = _map.seek(ck);
SingleConnection* sc = _map.seek(key);
if (sc) {
if (!sc->socket->Failed() ||
sc->socket->health_check_interval() > 0/*HC enabled*/) {
......@@ -277,30 +221,20 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id) {
}
// A socket w/o HC is failed (permanently), replace it.
SocketUniquePtr ptr(sc->socket); // Remove the ref added at insertion.
_map.erase(ck); // in principle, we can override the entry in map w/o
_map.erase(key); // in principle, we can override the entry in map w/o
// removing and inserting it again. But this would make error branches
// below have to remove the entry before returning, which is
// error-prone. We prefer code maintainability here.
sc = NULL;
}
std::unique_ptr<SSL_CTX, FreeSSLCTX> ssl_ctx(
CreateClientSSLContext(key.ssl_options));
if (key.ssl_options.enable && !ssl_ctx) {
return -1;
}
SocketId tmp_id;
SocketOptions opt;
opt.remote_side = key.peer;
// Can't save SSL_CTX in SocketMap since SingleConnection's desctruction
// may happen before Socket's destruction (remove Channel before RPC complete)
opt.owns_ssl_ctx = true;
opt.ssl_ctx = ssl_ctx.get();
opt.sni_name = key.ssl_options.sni_name;
opt.remote_side = key.peer.addr;
opt.initial_ssl_ctx = ssl_ctx;
if (_options.socket_creator->CreateSocket(opt, &tmp_id) != 0) {
PLOG(FATAL) << "Fail to create socket to " << key.peer;
return -1;
}
ssl_ctx.release();
// Add a reference to make sure that sc->socket is always accessible. Not
// use SocketUniquePtr which cannot put into containers before c++11.
// The ref will be removed at entry's removal.
......@@ -310,7 +244,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id) {
return -1;
}
SingleConnection new_sc = { 1, ptr.release(), 0 };
_map[ck] = new_sc;
_map[key] = new_sc;
*id = tmp_id;
bool need_to_create_bvar = false;
if (FLAGS_show_socketmap_in_vars && !_exposed_in_bvar) {
......@@ -334,9 +268,8 @@ void SocketMap::Remove(const SocketMapKey& key, SocketId expected_id) {
void SocketMap::RemoveInternal(const SocketMapKey& key,
SocketId expected_id,
bool remove_orphan) {
SocketMapKeyChecksum ck(key);
std::unique_lock<butil::Mutex> mu(_mutex);
SingleConnection* sc = _map.seek(ck);
SingleConnection* sc = _map.seek(key);
if (!sc) {
return;
}
......@@ -354,7 +287,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
sc->no_ref_us = butil::cpuwide_time_us();
} else {
Socket* const s = sc->socket;
_map.erase(ck);
_map.erase(key);
bool need_to_create_bvar = false;
if (FLAGS_show_socketmap_in_vars && !_exposed_in_bvar) {
_exposed_in_bvar = true;
......@@ -374,9 +307,8 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
}
int SocketMap::Find(const SocketMapKey& key, SocketId* id) {
SocketMapKeyChecksum ck(key);
BAIDU_SCOPED_LOCK(_mutex);
SingleConnection* sc = _map.seek(ck);
SingleConnection* sc = _map.seek(key);
if (sc) {
*id = sc->socket->id();
return 0;
......@@ -400,14 +332,14 @@ void SocketMap::List(std::vector<butil::EndPoint>* pts) {
}
}
void SocketMap::ListOrphans(int64_t defer_us, std::vector<butil::EndPoint>* out) {
void SocketMap::ListOrphans(int64_t defer_us, std::vector<SocketMapKey>* out) {
out->clear();
const int64_t now = butil::cpuwide_time_us();
BAIDU_SCOPED_LOCK(_mutex);
for (Map::iterator it = _map.begin(); it != _map.end(); ++it) {
SingleConnection& sc = it->second;
if (sc.ref_count == 0 && now - sc.no_ref_us >= defer_us) {
out->push_back(it->first.peer);
out->push_back(it->first);
}
}
}
......@@ -420,7 +352,7 @@ void* SocketMap::RunWatchConnections(void* arg) {
void SocketMap::WatchConnections() {
std::vector<SocketId> main_sockets;
std::vector<SocketId> pooled_sockets;
std::vector<butil::EndPoint> orphan_sockets;
std::vector<SocketMapKey> orphan_sockets;
const uint64_t CHECK_INTERVAL_US = 1000000UL;
while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
// NOTE: save the gflag which may be reloaded at any time.
......
......@@ -17,42 +17,73 @@
#ifndef BRPC_SOCKET_MAP_H
#define BRPC_SOCKET_MAP_H
#include <vector> // std::vector
#include "bvar/bvar.h" // bvar::PassiveStatus
#include "butil/containers/flat_map.h" // FlatMap
#include <vector> // std::vector
#include "bvar/bvar.h" // bvar::PassiveStatus
#include "butil/containers/flat_map.h" // FlatMap
#include "brpc/socket_id.h" // SockdetId
#include "brpc/options.pb.h" // ProtocolType
#include "brpc/ssl_option.h" // ChannelSSLOptions
#include "brpc/input_messenger.h" // InputMessageHandler
#include "brpc/server_node.h" // ServerNode
namespace brpc {
// Global mapping from remote-side to out-going sockets created by Channels.
// Different signature means that the Channel needs separate sockets.
struct ChannelSignature {
uint64_t data[2];
ChannelSignature() { Reset(); }
void Reset() { data[0] = data[1] = 0; }
};
inline bool operator==(const ChannelSignature& s1, const ChannelSignature& s2) {
return s1.data[0] == s2.data[0] && s1.data[1] == s2.data[1];
}
inline bool operator!=(const ChannelSignature& s1, const ChannelSignature& s2) {
return !(s1 == s2);
}
// The following fields uniquely define a Socket. In other word,
// Socket can't be shared between 2 different SocketMapKeys
struct SocketMapKey {
SocketMapKey(const butil::EndPoint& pt,
ChannelSSLOptions ssl = ChannelSSLOptions(),
const Authenticator* auth2 = NULL)
: peer(pt), ssl_options(ssl), auth(auth2)
explicit SocketMapKey(const butil::EndPoint& pt)
: peer(pt)
{}
SocketMapKey(const butil::EndPoint& pt, const ChannelSignature& cs)
: peer(pt), channel_signature(cs)
{}
SocketMapKey(const ServerNode& sn, const ChannelSignature& cs)
: peer(sn), channel_signature(cs)
{}
ServerNode peer;
ChannelSignature channel_signature;
};
butil::EndPoint peer;
ChannelSSLOptions ssl_options;
const Authenticator* auth;
inline bool operator==(const SocketMapKey& k1, const SocketMapKey& k2) {
return k1.peer == k2.peer && k1.channel_signature == k2.channel_signature;
};
struct SocketMapKeyHasher {
size_t operator()(const SocketMapKey& key) const {
size_t h = butil::DefaultHasher<butil::EndPoint>()(key.peer.addr);
h = h * 101 + butil::DefaultHasher<std::string>()(key.peer.tag);
h = h * 101 + key.channel_signature.data[1];
return h;
}
};
// Calculate an 128-bit hashcode for SocketMapKey
void ComputeSocketMapKeyChecksum(const SocketMapKey& key,
unsigned char* checksum);
// Try to share the Socket to `key'. If the Socket does not exist, create one.
// The corresponding SocketId is written to `*id'. If this function returns
// successfully, SocketMapRemove() MUST be called when the Socket is not needed.
// Return 0 on success, -1 otherwise.
int SocketMapInsert(const SocketMapKey& key, SocketId* id);
int SocketMapInsert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx);
inline int SocketMapInsert(const SocketMapKey& key, SocketId* id) {
std::shared_ptr<SocketSSLContext> empty_ptr;
return SocketMapInsert(key, id, empty_ptr);
}
// Find the SocketId associated with `key'.
// Return 0 on found, -1 otherwise.
......@@ -110,7 +141,13 @@ public:
SocketMap();
~SocketMap();
int Init(const SocketMapOptions&);
int Insert(const SocketMapKey& key, SocketId* id);
int Insert(const SocketMapKey& key, SocketId* id,
const std::shared_ptr<SocketSSLContext>& ssl_ctx);
int Insert(const SocketMapKey& key, SocketId* id) {
std::shared_ptr<SocketSSLContext> empty_ptr;
return Insert(key, id, empty_ptr);
}
void Remove(const SocketMapKey& key, SocketId expected_id);
int Find(const SocketMapKey& key, SocketId* id);
void List(std::vector<SocketId>* ids);
......@@ -120,7 +157,7 @@ public:
private:
void RemoveInternal(const SocketMapKey& key, SocketId id,
bool remove_orphan);
void ListOrphans(int64_t defer_us, std::vector<butil::EndPoint>* out);
void ListOrphans(int64_t defer_us, std::vector<SocketMapKey>* out);
void WatchConnections();
static void* RunWatchConnections(void*);
void Print(std::ostream& os);
......@@ -133,39 +170,10 @@ private:
int64_t no_ref_us;
};
// Store checksum of SocketMapKey instead of itself in order to:
// 1. Save precious space of key field in FlatMap
// 2. Simplify equivalence logic between SocketMapKeys
// (regard the hash collision to be zero)
struct SocketMapKeyChecksum {
explicit SocketMapKeyChecksum(const SocketMapKey& key)
: peer(key.peer) {
ComputeSocketMapKeyChecksum(key, checksum);
}
butil::EndPoint peer;
unsigned char checksum[16];
inline bool operator==(const SocketMapKeyChecksum& rhs) const {
return this->peer == rhs.peer
&& memcmp(this->checksum, rhs.checksum, sizeof(checksum)) == 0;
}
};
struct Checksum2Hash {
std::size_t operator()(const SocketMapKeyChecksum& key) const {
// Slice a subset of checksum over an evenly distributed hash
// won't affect the overall balance
std::size_t hash;
memcpy(&hash, key.checksum, sizeof(hash));
return hash;
}
};
// TODO: When RpcChannels connecting to one EndPoint are frequently created
// and destroyed, a single map+mutex may become hot-spots.
typedef butil::FlatMap<SocketMapKeyChecksum,
SingleConnection, Checksum2Hash> Map;
typedef butil::FlatMap<SocketMapKey, SingleConnection,
SocketMapKeyHasher> Map;
SocketMapOptions _options;
butil::Mutex _mutex;
Map _map;
......
......@@ -14,15 +14,14 @@
// Authors: Rujie Jiang (jiangrujie@baidu.com)
#include "brpc/ssl_option.h"
#include "brpc/ssl_options.h"
namespace brpc {
VerifyOptions::VerifyOptions() : verify_depth(0) {}
ChannelSSLOptions::ChannelSSLOptions()
: enable(false)
, ciphers("DEFAULT")
: ciphers("DEFAULT")
, protocols("TLSv1, TLSv1.1, TLSv1.2")
{}
......
......@@ -59,10 +59,6 @@ struct ChannelSSLOptions {
// Constructed with default options
ChannelSSLOptions();
// Whether to enable SSL on the channel.
// Default: false
bool enable;
// Cipher suites used for SSL handshake.
// The format of this string should follow that in `man 1 cipers'.
// Default: "DEFAULT"
......
......@@ -224,8 +224,8 @@ int URI::SetHttpURL(const char* url) {
return 0;
}
int ParseHostAndPortFromURL(const char* url, std::string* host_out,
int* port_out) {
int ParseURL(const char* url,
std::string* schema_out, std::string* host_out, int* port_out) {
const char* p = url;
// skip heading blanks
if (*p == ' ') {
......@@ -235,7 +235,6 @@ int ParseHostAndPortFromURL(const char* url, std::string* host_out,
// Find end of host, locate schema and user_info during the searching
bool need_schema = true;
bool need_user_info = true;
butil::StringPiece schema;
for (; true; ++p) {
const char action = g_url_parsing_fast_action_map[(int)*p];
if (action == URI_PARSE_CONTINUE) {
......@@ -247,7 +246,9 @@ int ParseHostAndPortFromURL(const char* url, std::string* host_out,
if (*p == ':') {
if (p[1] == '/' && p[2] == '/' && need_schema) {
need_schema = false;
schema.set(start, p - start);
if (schema_out) {
schema_out->assign(start, p - start);
}
p += 2;
start = p + 1;
}
......@@ -266,15 +267,12 @@ int ParseHostAndPortFromURL(const char* url, std::string* host_out,
}
int port = -1;
const char* host_end = SplitHostAndPort(start, p, &port);
if (port < 0) {
if (schema.empty() || schema == "http") {
port = 80;
} else if (schema == "https") {
port = 443;
}
if (host_out) {
host_out->assign(start, host_end - start);
}
if (port_out) {
*port_out = port;
}
host_out->assign(start, host_end - start);
*port_out = port;
return 0;
}
......
......@@ -155,10 +155,9 @@ friend class HttpMessage;
mutable QueryMap _query_map;
};
// Parse host and port from `url'.
// When port is absent, it's set to 80 for http and 443 for https.
// Parse host/port/schema from `url' if the corresponding parameter is not NULL.
// Returns 0 on success, -1 otherwise.
int ParseHostAndPortFromURL(const char* url, std::string* host, int* port);
int ParseURL(const char* url, std::string* schema, std::string* host, int* port);
inline void URI::SetQuery(const std::string& key, const std::string& value) {
get_query_map()[key] = value;
......
// Copyright (c) 2018 brpc authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Author: Ge,Jun (jge666@gmail.com)
// Date: Fri Sep 7 12:15:23 CST 2018
#ifndef BUTIL_PTR_CONTAINER_H
#define BUTIL_PTR_CONTAINER_H
namespace butil {
// Manage lifetime of a pointer. The key difference between PtrContainer and
// unique_ptr is that PtrContainer can be copied and the pointer inside is
// deeply copied or constructed on-demand.
template <typename T>
class PtrContainer {
public:
PtrContainer() : _ptr(NULL) {}
explicit PtrContainer(T* obj) : _ptr(obj) {}
~PtrContainer() {
delete _ptr;
}
PtrContainer(const PtrContainer& rhs)
: _ptr(rhs._ptr ? new T(*rhs._ptr) : NULL) {}
void operator=(const PtrContainer& rhs) {
if (rhs._ptr) {
if (_ptr) {
*_ptr = *rhs._ptr;
} else {
_ptr = new T(*rhs._ptr);
}
} else {
delete _ptr;
_ptr = NULL;
}
}
T* get() const { return _ptr; }
void reset(T* ptr) {
delete _ptr;
_ptr = ptr;
}
operator void*() const { return _ptr; }
private:
T* _ptr;
};
} // namespace butil
#endif // BUTIL_PTR_CONTAINER_H
......@@ -141,6 +141,7 @@ class MyEchoService : public ::test::EchoService {
if (req->code() != 0) {
res->add_code_list(req->code());
}
res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
}
};
......@@ -258,14 +259,17 @@ protected:
}
void SetUpChannel(brpc::Channel* channel,
bool single_server, bool short_connection,
const brpc::Authenticator* auth = NULL) {
bool single_server,
bool short_connection,
const brpc::Authenticator* auth = NULL,
std::string connection_group = std::string()) {
brpc::ChannelOptions opt;
if (short_connection) {
opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
}
opt.auth = auth;
opt.max_retry = 0;
opt.connection_group = connection_group;
if (single_server) {
EXPECT_EQ(0, channel->Init(_ep, &opt));
} else {
......@@ -405,6 +409,7 @@ protected:
EXPECT_EQ(0, cntl.ErrorCode())
<< single_server << ", " << async << ", " << short_connection;
const uint64_t receiving_socket_id = res.receiving_socket_id();
EXPECT_EQ(0, cntl.sub_count());
EXPECT_TRUE(NULL == cntl.sub(-1));
EXPECT_TRUE(NULL == cntl.sub(0));
......@@ -419,7 +424,48 @@ protected:
}
} else {
EXPECT_GE(1ul, _messenger.ConnectionCount());
}
}
if (single_server && !short_connection) {
// Reuse the connection
brpc::Channel channel2;
SetUpChannel(&channel2, single_server, short_connection);
cntl.Reset();
req.Clear();
res.Clear();
req.set_message(__FUNCTION__);
CallMethod(&channel2, &cntl, &req, &res, async);
EXPECT_EQ(0, cntl.ErrorCode())
<< single_server << ", " << async << ", " << short_connection;
EXPECT_EQ(receiving_socket_id, res.receiving_socket_id());
// A different connection_group does not reuse the connection
brpc::Channel channel3;
SetUpChannel(&channel3, single_server, short_connection,
NULL, "another_group");
cntl.Reset();
req.Clear();
res.Clear();
req.set_message(__FUNCTION__);
CallMethod(&channel3, &cntl, &req, &res, async);
EXPECT_EQ(0, cntl.ErrorCode())
<< single_server << ", " << async << ", " << short_connection;
const uint64_t receiving_socket_id2 = res.receiving_socket_id();
EXPECT_NE(receiving_socket_id, receiving_socket_id2);
// Channel in the same connection_group reuses the connection
// note that the leading/trailing spaces should be trimed.
brpc::Channel channel4;
SetUpChannel(&channel4, single_server, short_connection,
NULL, " another_group ");
cntl.Reset();
req.Clear();
res.Clear();
req.set_message(__FUNCTION__);
CallMethod(&channel4, &cntl, &req, &res, async);
EXPECT_EQ(0, cntl.ErrorCode())
<< single_server << ", " << async << ", " << short_connection;
EXPECT_EQ(receiving_socket_id2, res.receiving_socket_id());
}
StopAndJoin();
}
......@@ -1547,6 +1593,10 @@ protected:
void TestAuthentication(bool single_server,
bool async, bool short_connection) {
std::cout << " *** single=" << single_server
<< " async=" << async
<< " short=" << short_connection << std::endl;
ASSERT_EQ(0, StartAccept(_ep));
MyAuthenticator auth;
brpc::Channel channel;
......@@ -1809,7 +1859,7 @@ TEST_F(ChannelTest, init_as_single_server) {
ASSERT_EQ(ep, channel._server_address);
brpc::SocketId id;
ASSERT_EQ(0, brpc::SocketMapFind(ep, &id));
ASSERT_EQ(0, brpc::SocketMapFind(brpc::SocketMapKey(ep), &id));
ASSERT_EQ(id, channel._server_id);
const int NUM = 10;
......
......@@ -127,7 +127,7 @@ TEST_F(SocketMapTest, max_pool_size) {
} //namespace
int main(int argc, char* argv[]) {
butil::str2endpoint("127.0.0.1:12345", &g_key.peer);
butil::str2endpoint("127.0.0.1:12345", &g_key.peer.addr);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -95,7 +95,7 @@ TEST_F(SSLTest, sanity) {
brpc::CertInfo cert;
cert.certificate = "cert1.crt";
cert.private_key = "cert1.key";
options.ssl_options.default_cert = cert;
options.mutable_ssl_options()->default_cert = cert;
EchoServiceImpl echo_svc;
ASSERT_EQ(0, server.AddService(
......@@ -108,7 +108,7 @@ TEST_F(SSLTest, sanity) {
{
brpc::Channel channel;
brpc::ChannelOptions coptions;
coptions.ssl_options.enable = true;
coptions.mutable_ssl_options();
ASSERT_EQ(0, channel.Init("localhost", port, &coptions));
brpc::Controller cntl;
......@@ -124,7 +124,7 @@ TEST_F(SSLTest, sanity) {
{
brpc::Channel channel;
brpc::ChannelOptions coptions;
coptions.ssl_options.enable = true;
coptions.mutable_ssl_options();
ASSERT_EQ(0, channel.Init("127.0.0.1", port, &coptions));
for (int i = 0; i < NUM; ++i) {
google::protobuf::Closure* thrd_func =
......@@ -140,7 +140,7 @@ TEST_F(SSLTest, sanity) {
brpc::Channel channel;
brpc::ChannelOptions coptions;
coptions.protocol = "http";
coptions.ssl_options.enable = true;
coptions.mutable_ssl_options();
ASSERT_EQ(0, channel.Init("127.0.0.1", port, &coptions));
for (int i = 0; i < NUM; ++i) {
google::protobuf::Closure* thrd_func =
......@@ -160,8 +160,7 @@ void CheckCert(const char* cname, const char* cert) {
const int port = 8613;
brpc::Channel channel;
brpc::ChannelOptions coptions;
coptions.ssl_options.enable = true;
coptions.ssl_options.sni_name = cname;
coptions.mutable_ssl_options()->sni_name = cname;
ASSERT_EQ(0, channel.Init("127.0.0.1", port, &coptions));
SendMultipleRPC(&channel, 1);
......@@ -199,14 +198,14 @@ TEST_F(SSLTest, ssl_sni) {
cert.certificate = "cert1.crt";
cert.private_key = "cert1.key";
cert.sni_filters.push_back("cert1.com");
options.ssl_options.default_cert = cert;
options.mutable_ssl_options()->default_cert = cert;
}
{
brpc::CertInfo cert;
cert.certificate = GetRawPemString("cert2.crt");
cert.private_key = GetRawPemString("cert2.key");
cert.sni_filters.push_back("*.cert2.com");
options.ssl_options.certs.push_back(cert);
options.mutable_ssl_options()->certs.push_back(cert);
}
EchoServiceImpl echo_svc;
ASSERT_EQ(0, server.AddService(
......@@ -230,7 +229,7 @@ TEST_F(SSLTest, ssl_reload) {
cert.certificate = "cert1.crt";
cert.private_key = "cert1.key";
cert.sni_filters.push_back("cert1.com");
options.ssl_options.default_cert = cert;
options.mutable_ssl_options()->default_cert = cert;
}
EchoServiceImpl echo_svc;
ASSERT_EQ(0, server.AddService(
......@@ -318,7 +317,6 @@ TEST_F(SSLTest, ssl_perf) {
ASSERT_GT(servfd, 0);
brpc::ChannelSSLOptions opt;
opt.enable = true;
SSL_CTX* cli_ctx = brpc::CreateClientSSLContext(opt);
SSL_CTX* serv_ctx =
brpc::CreateServerSSLContext("cert1.crt", "cert1.key",
......
......@@ -20,9 +20,11 @@ TEST(URITest, everything) {
ASSERT_EQ(*uri.GetQuery("wd"), "uri");
ASSERT_FALSE(uri.GetQuery("nonkey"));
std::string schema;
std::string host_out;
int port_out = -1;
brpc::ParseHostAndPortFromURL(uri_str.c_str(), &host_out, &port_out);
brpc::ParseURL(uri_str.c_str(), &schema, &host_out, &port_out);
ASSERT_EQ("foobar", schema);
ASSERT_EQ("www.baidu.com", host_out);
ASSERT_EQ(80, port_out);
}
......
......@@ -15,6 +15,7 @@ message EchoRequest {
message EchoResponse {
required string message = 1;
repeated int32 code_list = 2;
optional uint64 receiving_socket_id = 3;
};
message ComboRequest {
......
saved_pwd_before_making=$PWD
for file in `find tools example -name CMakeFiles -type d -prune -o -name CMakeLists.txt -print`; do
for file in `find tools example -name Makefile`; do
cd $(dirname $file)
echo
echo "[$file]"
if ! ( rm -rf build && mkdir build && cd build && cmake .. && make -sj4 ); then
if ! make -sj4; then
exit 1
fi
cd $saved_pwd_before_making
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment