Commit 8049aeb5 authored by gejun's avatar gejun

Add Socket.GetAgentSocket & Print socket differently

parent 528bc8cb
......@@ -648,6 +648,7 @@ int Socket::Create(const SocketOptions& options, SocketId* id) {
m->_recycle_flag.store(false, butil::memory_order_relaxed);
m->_error_code = 0;
m->_error_text.clear();
m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
if (rc) {
......@@ -773,7 +774,7 @@ void Socket::Revive() {
if (_user) {
_user->AfterRevived(this);
} else {
LOG(INFO) << "Revived " << *this;
LOG(INFO) << "Revived socket=" << *this;
}
return;
}
......@@ -978,7 +979,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
if (_first_time) { // Only check at first time.
_first_time = false;
if (ptr->WaitAndReset(2/*note*/) != 0) {
LOG(INFO) << "Cancel checking " << *ptr;
LOG(INFO) << "Cancel checking socket=" << *ptr;
return false;
}
}
......@@ -998,7 +999,7 @@ bool HealthCheckTask::OnTriggeringTask(timespec* next_abstime) {
ptr->_hc_count = 0;
return false;
} else if (hc == ESTOP) {
LOG(INFO) << "Cancel checking " << *ptr;
LOG(INFO) << "Cancel checking socket=" << *ptr;
return false;
}
++ ptr->_hc_count;
......@@ -1061,9 +1062,12 @@ void Socket::OnRecycle() {
delete _stream_set;
_stream_set = NULL;
if (_agent_socket) {
_agent_socket->ReleaseAdditionalReference();
_agent_socket.reset(NULL);
const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) {
SocketUniquePtr ptr;
if (Socket::Address(asid, &ptr) == 0) {
ptr->ReleaseAdditionalReference();
}
}
s_vars->nsocket << -1;
......@@ -1392,7 +1396,7 @@ void Socket::AfterAppConnected(int err, void* data) {
}
}
s->SetFailed(err, "Fail to connect %s: %s",
s->SetFailed(err, "Fail to connect socket=%s: %s",
s->description().c_str(), berror(err));
s->ReleaseAllFailedWriteRequests(req);
}
......@@ -1585,7 +1589,7 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
int ret = ConnectIfNot(opt.abstime, req);
if (ret < 0) {
saved_errno = errno;
SetFailed(errno, "Fail to connect %s directly: %m", description().c_str());
SetFailed(errno, "Fail to connect socket=%s directly: %m", description().c_str());
goto FAIL_TO_WRITE;
} else if (ret == 1) {
// We are doing connection. Callback `KeepWriteIfConnected'
......@@ -1617,8 +1621,8 @@ int Socket::StartWrite(WriteRequest* req, const WriteOptions& opt) {
if (errno != EAGAIN && errno != EOVERCROWDED) {
saved_errno = errno;
// EPIPE is common in pooled connections + backup requests.
PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into " << *this;
SetFailed(saved_errno, "Fail to write into %s: %s",
PLOG_IF(WARNING, errno != EPIPE) << "Fail to write into socket=" << *this;
SetFailed(saved_errno, "Fail to write into socket=%s: %s",
description().c_str(), berror(saved_errno));
goto FAIL_TO_WRITE;
}
......@@ -1671,8 +1675,8 @@ void* Socket::KeepWrite(void* void_arg) {
if (nw < 0) {
if (errno != EAGAIN && errno != EOVERCROWDED) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to keep-write into " << *s;
s->SetFailed(saved_errno, "Fail to keep-write into %s: %s",
PLOG(WARNING) << "Fail to keep-write into socket=" << *s;
s->SetFailed(saved_errno, "Fail to keep-write into socket=%s: %s",
s->description().c_str(), berror(saved_errno));
break;
}
......@@ -1703,8 +1707,8 @@ void* Socket::KeepWrite(void* void_arg) {
const int rc = s->WaitEpollOut(s->fd(), pollin, &duetime);
if (rc < 0 && errno != ETIMEDOUT) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to wait epollout of " << *s;
s->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
PLOG(WARNING) << "Fail to wait epollout of socket=" << *s;
s->SetFailed(saved_errno, "Fail to wait epollout of socket=%s: %s",
s->description().c_str(), berror(saved_errno));
break;
}
......@@ -1963,7 +1967,7 @@ void Socket::SetAuthentication(int error_code) {
butil::memory_order_relaxed)) {
// As expected
if (error_code != 0) {
SetFailed(error_code, "Fail to authenticate %s", description().c_str());
SetFailed(error_code, "Fail to authenticate socket=%s", description().c_str());
}
CHECK_EQ(0, bthread_id_unlock_and_destroy(_auth_id));
}
......@@ -2158,11 +2162,12 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
Destroyable* const parsing_context = ptr->parsing_context();
Describable* parsing_context_desc = dynamic_cast<Describable*>(parsing_context);
if (parsing_context_desc) {
os << "\nparsing_context=";
os << "\nparsing_context=" << butil::class_name_str(*parsing_context) << '{';
DescribeOptions opt;
opt.verbose = true;
IndentingOStream os2(os, 2);
parsing_context_desc->Describe(os2, opt);
os << '}';
} else {
os << "\nparsing_context=" << ShowObject(parsing_context);
}
......@@ -2175,7 +2180,14 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nauth_context=" << ptr->_auth_context
<< "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\nrecycle_flag=" << ptr->_recycle_flag.load(butil::memory_order_relaxed)
<< "\ncid=" << ptr->_correlation_id
<< "\nagent_socket_id=";
const SocketId asid = ptr->_agent_socket_id.load(butil::memory_order_relaxed);
if (asid != INVALID_SOCKET_ID) {
os << asid;
} else {
os << "(none)";
}
os << "\ncid=" << ptr->_correlation_id
<< "\nwrite_head=" << ptr->_write_head.load(butil::memory_order_relaxed)
<< "\nssl_state=" << SSLStateToString(ssl_state);
const SocketSSLContext* ssl_ctx = ptr->_ssl_ctx.get();
......@@ -2250,7 +2262,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
int Socket::CheckHealth() {
if (_hc_count == 0) {
LOG(INFO) << "Checking " << *this;
LOG(INFO) << "Checking socket=" << *this;
}
// Note: No timeout. Timeout setting is given to Write() which
// we don't know. A drawback is that if a connection takes long
......@@ -2312,7 +2324,7 @@ int SocketUser::CheckHealth(Socket* ptr) {
}
void SocketUser::AfterRevived(Socket* ptr) {
LOG(INFO) << "Revived " << *ptr;
LOG(INFO) << "Revived socket=" << *ptr;
}
////////// SocketPool //////////////
......@@ -2451,27 +2463,26 @@ void Socket::ShareStats(Socket* main_socket) {
}
}
int Socket::GetPooledSocket(Socket* main_socket,
SocketUniquePtr* pooled_socket) {
if (main_socket == NULL || pooled_socket == NULL) {
LOG(ERROR) << "main_socket or pooled_socket is NULL";
int Socket::GetPooledSocket(SocketUniquePtr* pooled_socket) {
if (pooled_socket == NULL) {
LOG(ERROR) << "pooled_socket is NULL";
return -1;
}
SharedPart* main_sp = main_socket->GetOrNewSharedPart();
SharedPart* main_sp = GetOrNewSharedPart();
if (main_sp == NULL) {
LOG(ERROR) << "main_socket->_shared_part is NULL";
LOG(ERROR) << "_shared_part is NULL";
return -1;
}
// Create socket_pool optimistically.
SocketPool* socket_pool = main_sp->socket_pool.load(butil::memory_order_consume);
if (socket_pool == NULL) {
SocketOptions opt;
opt.remote_side = main_socket->remote_side();
opt.user = main_socket->user();
opt.on_edge_triggered_events = main_socket->_on_edge_triggered_events;
opt.initial_ssl_ctx = main_socket->_ssl_ctx;
opt.keytable_pool = main_socket->_keytable_pool;
opt.app_connect = main_socket->_app_connect;
opt.remote_side = remote_side();
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
opt.keytable_pool = _keytable_pool;
opt.app_connect = _app_connect;
socket_pool = new SocketPool(opt);
SocketPool* expected = NULL;
if (!main_sp->socket_pool.compare_exchange_strong(
......@@ -2484,7 +2495,7 @@ int Socket::GetPooledSocket(Socket* main_socket,
if (socket_pool->GetSocket(pooled_socket) != 0) {
return -1;
}
(*pooled_socket)->ShareStats(main_socket);
(*pooled_socket)->ShareStats(this);
CHECK((*pooled_socket)->parsing_context() == NULL)
<< "context=" << (*pooled_socket)->parsing_context()
<< " is not NULL when socket={" << *(*pooled_socket) << "} is got from"
......@@ -2520,6 +2531,14 @@ int Socket::ReturnToPool() {
return 0;
}
bool Socket::HasSocketPool() const {
SharedPart* sp = GetSharedPart();
if (sp != NULL) {
return sp->socket_pool.load(butil::memory_order_consume) != NULL;
}
return false;
}
void Socket::ListPooledSockets(std::vector<SocketId>* out, size_t max_count) {
out->clear();
SharedPart* sp = GetSharedPart();
......@@ -2547,28 +2566,65 @@ bool Socket::GetPooledSocketStats(int* numfree, int* numinflight) {
return true;
}
int Socket::GetShortSocket(Socket* main_socket,
SocketUniquePtr* short_socket) {
if (main_socket == NULL || short_socket == NULL) {
LOG(ERROR) << "main_socket or short_socket is NULL";
int Socket::GetShortSocket(SocketUniquePtr* short_socket) {
if (short_socket == NULL) {
LOG(ERROR) << "short_socket is NULL";
return -1;
}
SocketId id;
SocketOptions opt;
opt.remote_side = main_socket->remote_side();
opt.user = main_socket->user();
opt.on_edge_triggered_events = main_socket->_on_edge_triggered_events;
opt.initial_ssl_ctx = main_socket->_ssl_ctx;
opt.keytable_pool = main_socket->_keytable_pool;
opt.app_connect = main_socket->_app_connect;
if (get_client_side_messenger()->Create(opt, &id) != 0) {
opt.remote_side = remote_side();
opt.user = user();
opt.on_edge_triggered_events = _on_edge_triggered_events;
opt.initial_ssl_ctx = _ssl_ctx;
opt.keytable_pool = _keytable_pool;
opt.app_connect = _app_connect;
if (get_client_side_messenger()->Create(opt, &id) != 0 ||
Socket::Address(id, short_socket) != 0) {
return -1;
}
if (Socket::Address(id, short_socket) != 0) {
(*short_socket)->ShareStats(this);
return 0;
}
int Socket::GetAgentSocket(SocketUniquePtr* out, bool (*checkfn)(Socket*)) {
SocketId id = _agent_socket_id.load(butil::memory_order_relaxed);
SocketUniquePtr tmp_sock;
do {
if (Socket::Address(id, &tmp_sock) == 0) {
if (checkfn == NULL || checkfn(tmp_sock.get())) {
out->swap(tmp_sock);
return 0;
}
tmp_sock->ReleaseAdditionalReference();
}
do {
if (GetShortSocket(&tmp_sock) != 0) {
LOG(ERROR) << "Fail to get short socket from " << *this;
return -1;
}
if (checkfn == NULL || checkfn(tmp_sock.get())) {
break;
}
tmp_sock->ReleaseAdditionalReference();
} while (1);
if (_agent_socket_id.compare_exchange_strong(
id, tmp_sock->id(), butil::memory_order_acq_rel)) {
out->swap(tmp_sock);
return 0;
}
tmp_sock->ReleaseAdditionalReference();
// id was updated, re-address
} while (1);
}
int Socket::PeekAgentSocket(SocketUniquePtr* out) const {
SocketId id = _agent_socket_id.load(butil::memory_order_relaxed);
if (id == INVALID_SOCKET_ID) {
return -1;
}
(*short_socket)->ShareStats(main_socket);
return 0;
return Address(id, out);
}
void Socket::GetStat(SocketStat* s) const {
......@@ -2611,7 +2667,7 @@ SocketId Socket::main_socket_id() const {
if (sp) {
return sp->creator_socket_id;
}
return (SocketId)-1;
return INVALID_SOCKET_ID;
}
void Socket::OnProgressiveReadCompleted() {
......@@ -2631,17 +2687,18 @@ std::string Socket::description() const {
// NOTE: The output should be consistent with operator<<()
std::string result;
result.reserve(64);
butil::string_appendf(&result, "{id=%" PRIu64, id());
const int saved_fd = fd();
if (saved_fd >= 0) {
butil::string_appendf(&result, "fd=%d ", saved_fd);
butil::string_appendf(&result, " fd=%d", saved_fd);
}
butil::string_appendf(&result, "SocketId=%" PRIu64 "@%s", id(),
butil::endpoint2str(remote_side()).c_str());
butil::string_appendf(&result, " addr=%s",
butil::endpoint2str(remote_side()).c_str());
const int local_port = local_side().port;
if (local_port > 0) {
butil::string_appendf(&result, "@%d", local_port);
butil::string_appendf(&result, ":%d", local_port);
}
butil::string_appendf(&result, " (0x%p)", this);
butil::string_appendf(&result, "} (0x%p)", this);
return result;
}
......@@ -2661,16 +2718,17 @@ SocketSSLContext::~SocketSSLContext() {
namespace std {
ostream& operator<<(ostream& os, const brpc::Socket& sock) {
// NOTE: The output should be consistent with Socket::description()
os << "{id=" << sock.id();
const int fd = sock.fd();
if (fd >= 0) {
os << "fd=" << fd << ' ';
os << " fd=" << fd;
}
os << "SocketId=" << sock.id() << '@' << sock.remote_side();
os << " addr=" << sock.remote_side();
const int local_port = sock.local_side().port;
if (local_port > 0) {
os << '@' << local_port;
os << ':' << local_port;
}
os << " (0x" << (void*)&sock << ')';
os << "} (" << (void*)&sock << ')';
return os;
}
}
......@@ -412,24 +412,42 @@ public:
// True if this socket was created by Connect.
bool CreatedByConnect() const;
/////////////// Pooled sockets ////////////////
// Get a (unused) socket from _shared_part->socket_pool, address it into
// `poole_socket'.
static int GetPooledSocket(Socket* main_socket,
SocketUniquePtr* pooled_socket);
// Return the socket (which must be got from GetPooledSocket) to its
// _main_socket's pool and reset _main_socket to NULL.
// Get an UNUSED socket connecting to the same place as this socket
// from the SocketPool of this socket.
int GetPooledSocket(SocketUniquePtr* pooled_socket);
// Return this socket which MUST be got from GetPooledSocket to its
// main_socket's pool.
int ReturnToPool();
// True if this socket has SocketPool
bool HasSocketPool() const;
// Put all sockets in _shared_part->socket_pool into `list'.
void ListPooledSockets(std::vector<SocketId>* list, size_t max_count = 0);
// Return true on success
bool GetPooledSocketStats(int* numfree, int* numinflight);
// Create a socket connecting to the same place of main_socket.
static int GetShortSocket(Socket* main_socket,
SocketUniquePtr* short_socket);
// Create a socket connecting to the same place as this socket.
int GetShortSocket(SocketUniquePtr* short_socket);
// Get and persist a socket connecting to the same place as this socket.
// If an agent socket was already created and persisted, it's returned
// directly (provided other constraints are satisfied)
// If `checkfn' is not NULL, and the checking result on the socket that
// would be returned is false, the socket is abadoned and the getting
// process is restarted.
// For example, http2 connections may run out of stream_id after long time
// running and a new socket should be created. In order not to affect
// LoadBalancers or NamingServices that may reference the Socket, agent
// socket can be used for the communication and replaced periodically but
// the main socket is unchanged.
int GetAgentSocket(SocketUniquePtr* out, bool (*checkfn)(Socket*));
// Take a peek at existing agent socket (no creation).
// Returns 0 on success.
int PeekAgentSocket(SocketUniquePtr* out) const;
// Where the stats of this socket are accumulated to.
SocketId main_socket_id() const;
......@@ -741,6 +759,8 @@ private:
int _error_code;
std::string _error_text;
butil::atomic<SocketId> _agent_socket_id;
butil::Mutex _pipeline_mutex;
std::deque<PipelinedInfo>* _pipeline_q;
......@@ -761,13 +781,6 @@ private:
butil::Mutex _stream_mutex;
std::set<StreamId> *_stream_set;
// In some protocols, certain resources may run out according to
// protocol spec. For example, http2 streamId would run out after
// long time running and a new socket should be created. In order
// not to affect main socket, _agent_socket are introduced to
// represent the communication socket.
SocketUniquePtr _agent_socket;
};
} // namespace brpc
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment