Commit 8090fae7 authored by Ge Jun's avatar Ge Jun

Calculate numfree/numinflight in SocketPool

parent 52599384
...@@ -103,6 +103,7 @@ const int WAIT_EPOLLOUT_TIMEOUT_MS = 50; ...@@ -103,6 +103,7 @@ const int WAIT_EPOLLOUT_TIMEOUT_MS = 50;
#endif #endif
class BAIDU_CACHELINE_ALIGNMENT SocketPool { class BAIDU_CACHELINE_ALIGNMENT SocketPool {
friend class Socket;
public: public:
explicit SocketPool(const SocketOptions& opt); explicit SocketPool(const SocketOptions& opt);
~SocketPool(); ~SocketPool();
...@@ -124,8 +125,8 @@ private: ...@@ -124,8 +125,8 @@ private:
butil::Mutex _mutex; butil::Mutex _mutex;
std::vector<SocketId> _pool; std::vector<SocketId> _pool;
butil::EndPoint _remote_side; butil::EndPoint _remote_side;
// #free-sockets in all sub pools. butil::atomic<int> _numfree; // #free sockets in all sub pools.
butil::atomic<int> _count; butil::atomic<int> _numinflight; // #inflight sockets in all sub pools.
}; };
// NOTE: sizeof of this class is 1200 bytes. If we have 10K sockets, total // NOTE: sizeof of this class is 1200 bytes. If we have 10K sockets, total
...@@ -298,6 +299,14 @@ void Socket::CreateVarsOnce() { ...@@ -298,6 +299,14 @@ void Socket::CreateVarsOnce() {
CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars)); CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars));
} }
// Used by ConnectionService
int64_t GetChannelConnectionCount() {
if (s_vars) {
return s_vars->channel_conn.get_value();
}
return 0;
}
bool Socket::CreatedByConnect() const { bool Socket::CreatedByConnect() const {
return _user == static_cast<SocketUser*>(get_client_side_messenger()); return _user == static_cast<SocketUser*>(get_client_side_messenger());
} }
...@@ -2073,7 +2082,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { ...@@ -2073,7 +2082,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
&ptr->_id_wait_list, idsizes, arraysize(idsizes)); &ptr->_id_wait_list, idsizes, arraysize(idsizes));
} }
} }
const int preferred_index = ptr->_preferred_index; const int preferred_index = ptr->preferred_index();
SharedPart* sp = ptr->GetSharedPart(); SharedPart* sp = ptr->GetSharedPart();
os << "version=" << VersionOfVRef(vref); os << "version=" << VersionOfVRef(vref);
if (sp) { if (sp) {
...@@ -2090,7 +2099,10 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) { ...@@ -2090,7 +2099,10 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
} }
os << pooled_sockets[i]; os << pooled_sockets[i];
} }
os << ']'; os << "]\n numfree="
<< pool->_numfree.load(butil::memory_order_relaxed)
<< "\n numinflight="
<< pool->_numinflight.load(butil::memory_order_relaxed);
} else { } else {
os << "null"; os << "null";
} }
...@@ -2289,7 +2301,10 @@ void SocketUser::AfterRevived(Socket* ptr) { ...@@ -2289,7 +2301,10 @@ void SocketUser::AfterRevived(Socket* ptr) {
////////// SocketPool ////////////// ////////// SocketPool //////////////
inline SocketPool::SocketPool(const SocketOptions& opt) inline SocketPool::SocketPool(const SocketOptions& opt)
: _options(opt), _remote_side(opt.remote_side), _count(0) { : _options(opt)
, _remote_side(opt.remote_side)
, _numfree(0)
, _numinflight(0) {
} }
inline SocketPool::~SocketPool() { inline SocketPool::~SocketPool() {
...@@ -2330,10 +2345,11 @@ inline int SocketPool::GetSocket(SocketUniquePtr* ptr) { ...@@ -2330,10 +2345,11 @@ inline int SocketPool::GetSocket(SocketUniquePtr* ptr) {
sid = _pool.back(); sid = _pool.back();
_pool.pop_back(); _pool.pop_back();
} }
_count.fetch_sub(1, butil::memory_order_relaxed); _numfree.fetch_sub(1, butil::memory_order_relaxed);
// Not address inside the lock since at most time the pooled socket // Not address inside the lock since at most time the pooled socket
// is likely to be valid. // is likely to be valid.
if (Socket::Address(sid, ptr) == 0) { if (Socket::Address(sid, ptr) == 0) {
_numinflight.fetch_add(1, butil::memory_order_relaxed);
return 0; return 0;
} }
} }
...@@ -2343,8 +2359,10 @@ inline int SocketPool::GetSocket(SocketUniquePtr* ptr) { ...@@ -2343,8 +2359,10 @@ inline int SocketPool::GetSocket(SocketUniquePtr* ptr) {
// Only main socket can be the owner of ssl_ctx // Only main socket can be the owner of ssl_ctx
opt.owns_ssl_ctx = false; opt.owns_ssl_ctx = false;
opt.health_check_interval_s = -1; opt.health_check_interval_s = -1;
if (get_client_side_messenger()->Create(opt, &sid) == 0) { if (get_client_side_messenger()->Create(opt, &sid) == 0 &&
return Socket::Address(sid, ptr); Socket::Address(sid, ptr) == 0) {
_numinflight.fetch_add(1, butil::memory_order_relaxed);
return 0;
} }
return -1; return -1;
} }
...@@ -2354,16 +2372,17 @@ inline void SocketPool::ReturnSocket(Socket* sock) { ...@@ -2354,16 +2372,17 @@ inline void SocketPool::ReturnSocket(Socket* sock) {
const int connection_pool_size = FLAGS_max_connection_pool_size; const int connection_pool_size = FLAGS_max_connection_pool_size;
// Check if the pool is full. // Check if the pool is full.
if (_count.fetch_add(1, butil::memory_order_relaxed) < if (_numfree.fetch_add(1, butil::memory_order_relaxed) <
connection_pool_size) { connection_pool_size) {
const SocketId sid = sock->id(); const SocketId sid = sock->id();
BAIDU_SCOPED_LOCK(_mutex); BAIDU_SCOPED_LOCK(_mutex);
_pool.push_back(sid); _pool.push_back(sid);
} else { } else {
// Cancel the addition and close the pooled socket. // Cancel the addition and close the pooled socket.
_count.fetch_sub(1, butil::memory_order_relaxed); _numfree.fetch_sub(1, butil::memory_order_relaxed);
sock->SetFailed(EUNUSED, "Close unused pooled socket"); sock->SetFailed(EUNUSED, "Close unused pooled socket");
} }
_numinflight.fetch_sub(1, butil::memory_order_relaxed);
} }
inline void SocketPool::ListSockets(std::vector<SocketId>* out, size_t max_count) { inline void SocketPool::ListSockets(std::vector<SocketId>* out, size_t max_count) {
...@@ -2491,6 +2510,20 @@ void Socket::ListPooledSockets(std::vector<SocketId>* out, size_t max_count) { ...@@ -2491,6 +2510,20 @@ void Socket::ListPooledSockets(std::vector<SocketId>* out, size_t max_count) {
} }
pool->ListSockets(out, max_count); pool->ListSockets(out, max_count);
} }
bool Socket::GetPooledSocketStats(int* numfree, int* numinflight) {
SharedPart* sp = GetSharedPart();
if (sp == NULL) {
return false;
}
SocketPool* pool = sp->socket_pool.load(butil::memory_order_consume);
if (pool == NULL) {
return false;
}
*numfree = pool->_numfree.load(butil::memory_order_relaxed);
*numinflight = pool->_numinflight.load(butil::memory_order_relaxed);
return true;
}
int Socket::GetShortSocket(Socket* main_socket, int Socket::GetShortSocket(Socket* main_socket,
SocketUniquePtr* short_socket) { SocketUniquePtr* short_socket) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment