Commit 51259c0b authored by gejun's avatar gejun

Move reuse_addr from server.cpp to endpoint.cpp and make the code more robust

parent 1abd943a
...@@ -104,9 +104,6 @@ const char* status_str(Server::Status s) { ...@@ -104,9 +104,6 @@ const char* status_str(Server::Status s) {
butil::static_atomic<int> g_running_server_count = BUTIL_STATIC_ATOMIC_INIT(0); butil::static_atomic<int> g_running_server_count = BUTIL_STATIC_ATOMIC_INIT(0);
DEFINE_bool(reuse_addr, true, "Bind to ports in TIME_WAIT state");
BRPC_VALIDATE_GFLAG(reuse_addr, PassValidate);
// Following services may have security issues and are disabled by default. // Following services may have security issues and are disabled by default.
DEFINE_bool(enable_dir_service, false, "Enable /dir"); DEFINE_bool(enable_dir_service, false, "Enable /dir");
DEFINE_bool(enable_threads_service, false, "Enable /threads"); DEFINE_bool(enable_threads_service, false, "Enable /threads");
...@@ -939,7 +936,7 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -939,7 +936,7 @@ int Server::StartInternal(const butil::ip_t& ip,
_listen_addr.ip = ip; _listen_addr.ip = ip;
for (int port = port_range.min_port; port <= port_range.max_port; ++port) { for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
_listen_addr.port = port; _listen_addr.port = port;
butil::fd_guard sockfd(tcp_listen(_listen_addr, FLAGS_reuse_addr)); butil::fd_guard sockfd(tcp_listen(_listen_addr));
if (sockfd < 0) { if (sockfd < 0) {
if (port != port_range.max_port) { // not the last port, try next if (port != port_range.max_port) { // not the last port, try next
continue; continue;
...@@ -999,7 +996,7 @@ int Server::StartInternal(const butil::ip_t& ip, ...@@ -999,7 +996,7 @@ int Server::StartInternal(const butil::ip_t& ip,
} }
butil::EndPoint internal_point = _listen_addr; butil::EndPoint internal_point = _listen_addr;
internal_point.port = _options.internal_port; internal_point.port = _options.internal_port;
butil::fd_guard sockfd(tcp_listen(internal_point, FLAGS_reuse_addr)); butil::fd_guard sockfd(tcp_listen(internal_point));
if (sockfd < 0) { if (sockfd < 0) {
LOG(ERROR) << "Fail to listen " << internal_point << " (internal)"; LOG(ERROR) << "Fail to listen " << internal_point << " (internal)";
return -1; return -1;
......
...@@ -29,12 +29,12 @@ ...@@ -29,12 +29,12 @@
#include "butil/logging.h" #include "butil/logging.h"
#include "butil/memory/singleton_on_pthread_once.h" #include "butil/memory/singleton_on_pthread_once.h"
#include "butil/strings/string_piece.h" #include "butil/strings/string_piece.h"
#include <sys/socket.h> // SO_REUSEADDR SO_REUSEPORT
#ifndef SO_REUSEPORT //supported since Linux 3.9.
#define SO_REUSEPORT 15 DEFINE_bool(reuse_port, false, "Enable SO_REUSEPORT for all listened sockets");
#endif
//This option is supported since Linux 3.9. DEFINE_bool(reuse_addr, true, "Enable SO_REUSEADDR for all listened sockets");
DEFINE_bool(reuse_port, false, "turn on support for SO_REUSEPORT socket option.");
__BEGIN_DECLS __BEGIN_DECLS
int BAIDU_WEAK bthread_connect( int BAIDU_WEAK bthread_connect(
...@@ -308,25 +308,36 @@ int tcp_connect(EndPoint point, int* self_port) { ...@@ -308,25 +308,36 @@ int tcp_connect(EndPoint point, int* self_port) {
return sockfd.release(); return sockfd.release();
} }
int tcp_listen(EndPoint point, bool reuse_addr) { int tcp_listen(EndPoint point) {
fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0)); fd_guard sockfd(socket(AF_INET, SOCK_STREAM, 0));
if (sockfd < 0) { if (sockfd < 0) {
return -1; return -1;
} }
if (reuse_addr) {
if (FLAGS_reuse_addr) {
#if defined(SO_REUSEADDR)
const int on = 1; const int on = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR,
&on, sizeof(on)) != 0) { &on, sizeof(on)) != 0) {
return -1; return -1;
} }
#else
LOG(ERROR) << "Missing def of SO_REUSEADDR while -reuse_addr is on";
return -1;
#endif
} }
if (FLAGS_reuse_port) { if (FLAGS_reuse_port) {
#if defined(SO_REUSEPORT)
const int on = 1; const int on = 1;
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT,
&on, sizeof(on)) != 0) { &on, sizeof(on)) != 0) {
LOG(WARNING) << "Fail to setsockopt SO_REUSEPORT of sockfd=" << sockfd; LOG(WARNING) << "Fail to setsockopt SO_REUSEPORT of sockfd=" << sockfd;
} }
#else
LOG(ERROR) << "Missing def of SO_REUSEPORT while -reuse_port is on";
return -1;
#endif
} }
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
......
...@@ -119,10 +119,11 @@ int endpoint2hostname(const EndPoint& point, std::string* host); ...@@ -119,10 +119,11 @@ int endpoint2hostname(const EndPoint& point, std::string* host);
// Returns the socket descriptor, -1 otherwise and errno is set. // Returns the socket descriptor, -1 otherwise and errno is set.
int tcp_connect(EndPoint server, int* self_port); int tcp_connect(EndPoint server, int* self_port);
// Create and listen to a TCP socket bound with `ip_and_port'. If `reuse_addr' // Create and listen to a TCP socket bound with `ip_and_port'.
// is true, ports in TIME_WAIT will be bound as well. // To enable SO_REUSEADDR for the whole program, enable gflag -reuse_addr
// To enable SO_REUSEPORT for the whole program, enable gflag -reuse_port
// Returns the socket descriptor, -1 otherwise and errno is set. // Returns the socket descriptor, -1 otherwise and errno is set.
int tcp_listen(EndPoint ip_and_port, bool reuse_addr); int tcp_listen(EndPoint ip_and_port);
// Get the local end of a socket connection // Get the local end of a socket connection
int get_local_side(int fd, EndPoint *out); int get_local_side(int fd, EndPoint *out);
......
...@@ -240,7 +240,7 @@ protected: ...@@ -240,7 +240,7 @@ protected:
int StartAccept(butil::EndPoint ep) { int StartAccept(butil::EndPoint ep) {
int listening_fd = -1; int listening_fd = -1;
while ((listening_fd = tcp_listen(ep, true)) < 0) { while ((listening_fd = tcp_listen(ep)) < 0) {
if (errno == EADDRINUSE) { if (errno == EADDRINUSE) {
bthread_usleep(1000); bthread_usleep(1000);
} else { } else {
......
...@@ -148,7 +148,7 @@ TEST_F(MessengerTest, dispatch_tasks) { ...@@ -148,7 +148,7 @@ TEST_F(MessengerTest, dispatch_tasks) {
snprintf(buf, sizeof(buf), "input_messenger.socket%lu", i); snprintf(buf, sizeof(buf), "input_messenger.socket%lu", i);
int listening_fd = butil::unix_socket_listen(buf); int listening_fd = butil::unix_socket_listen(buf);
#else #else
int listening_fd = tcp_listen(butil::EndPoint(butil::IP_ANY, 7878), false); int listening_fd = tcp_listen(butil::EndPoint(butil::IP_ANY, 7878));
#endif #endif
ASSERT_TRUE(listening_fd > 0); ASSERT_TRUE(listening_fd > 0);
butil::make_non_blocking(listening_fd); butil::make_non_blocking(listening_fd);
......
...@@ -1178,7 +1178,7 @@ TEST_F(ServerTest, range_start) { ...@@ -1178,7 +1178,7 @@ TEST_F(ServerTest, range_start) {
butil::EndPoint point; butil::EndPoint point;
for (int i = START_PORT; i < END_PORT; ++i) { for (int i = START_PORT; i < END_PORT; ++i) {
point.port = i; point.port = i;
listen_fds[i - START_PORT].reset(butil::tcp_listen(point, true)); listen_fds[i - START_PORT].reset(butil::tcp_listen(point));
} }
brpc::Server server; brpc::Server server;
......
...@@ -303,7 +303,7 @@ TEST_F(SocketTest, single_threaded_connect_and_write) { ...@@ -303,7 +303,7 @@ TEST_F(SocketTest, single_threaded_connect_and_write) {
}; };
butil::EndPoint point(butil::IP_ANY, 7878); butil::EndPoint point(butil::IP_ANY, 7878);
int listening_fd = tcp_listen(point, false); int listening_fd = tcp_listen(point);
ASSERT_TRUE(listening_fd > 0); ASSERT_TRUE(listening_fd > 0);
butil::make_non_blocking(listening_fd); butil::make_non_blocking(listening_fd);
ASSERT_EQ(0, messenger->AddHandler(pairs[0])); ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
...@@ -606,7 +606,7 @@ TEST_F(SocketTest, health_check) { ...@@ -606,7 +606,7 @@ TEST_F(SocketTest, health_check) {
EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" } EchoProcessHuluRequest, NULL, NULL, "dummy_hulu" }
}; };
int listening_fd = tcp_listen(point, false); int listening_fd = tcp_listen(point);
ASSERT_TRUE(listening_fd > 0); ASSERT_TRUE(listening_fd > 0);
butil::make_non_blocking(listening_fd); butil::make_non_blocking(listening_fd);
ASSERT_EQ(0, messenger->AddHandler(pairs[0])); ASSERT_EQ(0, messenger->AddHandler(pairs[0]));
......
...@@ -312,7 +312,7 @@ void* ssl_perf_server(void* arg) { ...@@ -312,7 +312,7 @@ void* ssl_perf_server(void* arg) {
TEST_F(SSLTest, ssl_perf) { TEST_F(SSLTest, ssl_perf) {
const butil::EndPoint ep(butil::IP_ANY, 5961); const butil::EndPoint ep(butil::IP_ANY, 5961);
butil::fd_guard listenfd(butil::tcp_listen(ep, false)); butil::fd_guard listenfd(butil::tcp_listen(ep));
ASSERT_GT(listenfd, 0); ASSERT_GT(listenfd, 0);
int clifd = tcp_connect(ep, NULL); int clifd = tcp_connect(ep, NULL);
ASSERT_GT(clifd, 0); ASSERT_GT(clifd, 0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment