Commit d070170e authored by zhujiashun's avatar zhujiashun

make src/brpc/event_dispatcher.cpp support kqueue

parent bc509810
...@@ -114,10 +114,22 @@ if(NOT PROTOC_LIB) ...@@ -114,10 +114,22 @@ if(NOT PROTOC_LIB)
message(FATAL_ERROR "Fail to find protoc lib") message(FATAL_ERROR "Fail to find protoc lib")
endif() endif()
find_path(SSL_INCLUDE_PATH NAMES openssl/ssl.h)
find_library(SSL_LIB NAMES ssl)
if ((NOT SSL_INCLUDE_PATH) OR (NOT SSL_LIB))
message(FATAL_ERROR "Fail to find ssl")
endif()
find_library(CRYPTO_LIB NAMES crypto)
if (NOT CRYPTO_LIB)
message(FATAL_ERROR "Fail to find crypto")
endif()
include_directories( include_directories(
${GFLAGS_INCLUDE_PATH} ${GFLAGS_INCLUDE_PATH}
${PROTOBUF_INCLUDE_DIRS} ${PROTOBUF_INCLUDE_DIRS}
${LEVELDB_INCLUDE_PATH} ${LEVELDB_INCLUDE_PATH}
${SSL_INCLUDE_PATH}
) )
set(DYNAMIC_LIB set(DYNAMIC_LIB
...@@ -126,8 +138,8 @@ set(DYNAMIC_LIB ...@@ -126,8 +138,8 @@ set(DYNAMIC_LIB
${LEVELDB_LIB} ${LEVELDB_LIB}
${PROTOC_LIB} ${PROTOC_LIB}
${CMAKE_THREAD_LIBS_INIT} ${CMAKE_THREAD_LIBS_INIT}
ssl ${SSL_LIB}
crypto ${CRYPTO_LIB}
dl dl
z z
) )
......
...@@ -19,7 +19,12 @@ set_property(TARGET ${BUTIL_LIB} PROPERTY POSITION_INDEPENDENT_CODE 1) ...@@ -19,7 +19,12 @@ set_property(TARGET ${BUTIL_LIB} PROPERTY POSITION_INDEPENDENT_CODE 1)
add_library(brpc SHARED $<TARGET_OBJECTS:BUTIL_LIB> $<TARGET_OBJECTS:OBJ_LIB>) add_library(brpc SHARED $<TARGET_OBJECTS:BUTIL_LIB> $<TARGET_OBJECTS:OBJ_LIB>)
add_library(brpc_static STATIC $<TARGET_OBJECTS:BUTIL_LIB> $<TARGET_OBJECTS:OBJ_LIB>) add_library(brpc_static STATIC $<TARGET_OBJECTS:BUTIL_LIB> $<TARGET_OBJECTS:OBJ_LIB>)
target_link_libraries(brpc ${DYNAMIC_LIB}) target_link_libraries(brpc ${DYNAMIC_LIB}
"-framework CoreFoundation"
"-framework CoreGraphics"
"-framework CoreData"
"-framework CoreText"
"-framework Security")
if(WITH_GLOG) if(WITH_GLOG)
target_link_libraries(brpc ${GLOG_LIB}) target_link_libraries(brpc ${GLOG_LIB})
......
...@@ -127,8 +127,8 @@ void EventDispatcher::Stop() { ...@@ -127,8 +127,8 @@ void EventDispatcher::Stop() {
#if defined(OS_LINUX) #if defined(OS_LINUX)
epoll_event evt = { EPOLLOUT, { NULL } }; epoll_event evt = { EPOLLOUT, { NULL } };
epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt); epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
#elif define(OS_MACOSX) #elif defined(OS_MACOSX)
kevent kqueue_event; struct kevent kqueue_event;
EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
0, 0, NULL); 0, 0, NULL);
kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL); kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
...@@ -168,8 +168,8 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { ...@@ -168,8 +168,8 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
return -1; return -1;
} }
} }
#elif define(OS_MACOSX) #elif defined(OS_MACOSX)
kevent evt; struct kevent evt;
//TODO: add EV_EOF //TODO: add EV_EOF
EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
0, 0, (void*)socket_id); 0, 0, (void*)socket_id);
...@@ -189,6 +189,7 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) { ...@@ -189,6 +189,7 @@ int EventDispatcher::AddEpollOut(SocketId socket_id, int fd, bool pollin) {
int EventDispatcher::RemoveEpollOut(SocketId socket_id, int EventDispatcher::RemoveEpollOut(SocketId socket_id,
int fd, bool pollin) { int fd, bool pollin) {
#if defined(OS_LINUX)
if (pollin) { if (pollin) {
epoll_event evt; epoll_event evt;
evt.data.u64 = socket_id; evt.data.u64 = socket_id;
...@@ -200,6 +201,20 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id, ...@@ -200,6 +201,20 @@ int EventDispatcher::RemoveEpollOut(SocketId socket_id,
} else { } else {
return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL); return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
} }
#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
return -1;
}
if (pollin) {
EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
0, 0, (void*)socket_id);
return kevent(_epfd, &evt, 1, NULL, 0, NULL);
}
return 0;
#endif
return -1;
} }
int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
...@@ -207,6 +222,7 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { ...@@ -207,6 +222,7 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
#if defined(OS_LINUX)
epoll_event evt; epoll_event evt;
evt.events = EPOLLIN | EPOLLET; evt.events = EPOLLIN | EPOLLET;
evt.data.u64 = socket_id; evt.data.u64 = socket_id;
...@@ -214,6 +230,13 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) { ...@@ -214,6 +230,13 @@ int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
evt.events |= has_epollrdhup; evt.events |= has_epollrdhup;
#endif #endif
return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt); return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
0, 0, (void*)socket_id);
return kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
return -1;
} }
int EventDispatcher::RemoveConsumer(int fd) { int EventDispatcher::RemoveConsumer(int fd) {
...@@ -227,10 +250,18 @@ int EventDispatcher::RemoveConsumer(int fd) { ...@@ -227,10 +250,18 @@ int EventDispatcher::RemoveConsumer(int fd) {
// from epoll again! If the fd was level-triggered and there's data left, // from epoll again! If the fd was level-triggered and there's data left,
// epoll_wait will keep returning events of the fd continuously, making // epoll_wait will keep returning events of the fd continuously, making
// program abnormal. // program abnormal.
#if defined(OS_LINUX)
if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) { if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd; PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
return -1; return -1;
} }
#elif defined(OS_MACOSX)
struct kevent evt;
EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
kevent(_epfd, &evt, 1, NULL, 0, NULL);
#endif
return 0; return 0;
} }
...@@ -240,8 +271,9 @@ void* EventDispatcher::RunThis(void* arg) { ...@@ -240,8 +271,9 @@ void* EventDispatcher::RunThis(void* arg) {
} }
void EventDispatcher::Run() { void EventDispatcher::Run() {
epoll_event e[32];
while (!_stop) { while (!_stop) {
#if defined(OS_LINUX)
epoll_event e[32];
#ifdef BRPC_ADDITIONAL_EPOLL #ifdef BRPC_ADDITIONAL_EPOLL
// Performance downgrades in examples. // Performance downgrades in examples.
int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0); int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
...@@ -250,6 +282,10 @@ void EventDispatcher::Run() { ...@@ -250,6 +282,10 @@ void EventDispatcher::Run() {
} }
#else #else
const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1); const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
#endif
#elif defined(OS_MACOSX)
struct kevent e[32];
int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
#endif #endif
if (_stop) { if (_stop) {
// epoll_ctl/epoll_wait should have some sort of memory fencing // epoll_ctl/epoll_wait should have some sort of memory fencing
...@@ -262,10 +298,15 @@ void EventDispatcher::Run() { ...@@ -262,10 +298,15 @@ void EventDispatcher::Run() {
// We've checked _stop, no wake-up will be missed. // We've checked _stop, no wake-up will be missed.
continue; continue;
} }
#if defined(OS_LINUX)
PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd; PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
#elif defined(OS_MACOSX)
PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
#endif
break; break;
} }
for (int i = 0; i < n; ++i) { for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP) if (e[i].events & (EPOLLIN | EPOLLERR | EPOLLHUP)
#ifdef BRPC_SOCKET_HAS_EOF #ifdef BRPC_SOCKET_HAS_EOF
|| (e[i].events & has_epollrdhup) || (e[i].events & has_epollrdhup)
...@@ -275,12 +316,26 @@ void EventDispatcher::Run() { ...@@ -275,12 +316,26 @@ void EventDispatcher::Run() {
Socket::StartInputEvent(e[i].data.u64, e[i].events, Socket::StartInputEvent(e[i].data.u64, e[i].events,
_consumer_thread_attr); _consumer_thread_attr);
} }
#elif defined(OS_MACOSX)
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
// We don't care about the return value.
Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
_consumer_thread_attr);
}
#endif
} }
for (int i = 0; i < n; ++i) { for (int i = 0; i < n; ++i) {
#if defined(OS_LINUX)
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) { if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
// We don't care about the return value. // We don't care about the return value.
Socket::HandleEpollOut(e[i].data.u64); Socket::HandleEpollOut(e[i].data.u64);
} }
#elif defined(OS_MACOSX)
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
// We don't care about the return value.
Socket::HandleEpollOut((SocketId)e[i].udata);
}
#endif
} }
} }
} }
...@@ -302,7 +357,7 @@ void InitializeGlobalDispatchers() { ...@@ -302,7 +357,7 @@ void InitializeGlobalDispatchers() {
CHECK_EQ(0, g_edisp[i].Start(&attr)); CHECK_EQ(0, g_edisp[i].Start(&attr));
} }
// This atexit is will be run before g_task_control.stop() because above // This atexit is will be run before g_task_control.stop() because above
// Start() initializes g_task_control by creating bthread (to run epoll). // Start() initializes g_task_control by creating bthread (to run epoll/kqueue).
CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers)); CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
} }
......
...@@ -40,6 +40,9 @@ ...@@ -40,6 +40,9 @@
#include "brpc/stream_impl.h" #include "brpc/stream_impl.h"
#include "brpc/shared_object.h" #include "brpc/shared_object.h"
#include "brpc/policy/rtmp_protocol.h" // FIXME #include "brpc/policy/rtmp_protocol.h" // FIXME
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
namespace bthread { namespace bthread {
size_t __attribute__((weak)) size_t __attribute__((weak))
...@@ -1865,7 +1868,7 @@ AuthContext* Socket::mutable_auth_context() { ...@@ -1865,7 +1868,7 @@ AuthContext* Socket::mutable_auth_context() {
return _auth_context; return _auth_context;
} }
int Socket::StartInputEvent(SocketId id, uint32_t epoll_events, int Socket::StartInputEvent(SocketId id, uint32_t events,
const bthread_attr_t& thread_attr) { const bthread_attr_t& thread_attr) {
SocketUniquePtr s; SocketUniquePtr s;
if (Address(id, &s) < 0) { if (Address(id, &s) < 0) {
...@@ -1877,11 +1880,15 @@ int Socket::StartInputEvent(SocketId id, uint32_t epoll_events, ...@@ -1877,11 +1880,15 @@ int Socket::StartInputEvent(SocketId id, uint32_t epoll_events,
return 0; return 0;
} }
if (s->fd() < 0) { if (s->fd() < 0) {
CHECK(!(epoll_events & EPOLLIN)) << "epoll_events=" << epoll_events; #if defined(OS_LINUX)
CHECK(!(events & EPOLLIN)) << "epoll_events=" << events;
#elif defined(OS_MACOSX)
CHECK((short)events != EVFILT_READ) << "kqueue filter=" << events;
#endif
return -1; return -1;
} }
// if (epoll_events & has_epollrdhup) { // if (events & has_epollrdhup) {
// s->_eof = 1; // s->_eof = 1;
// } // }
// Passing e[i].events causes complex visibility issues and // Passing e[i].events causes complex visibility issues and
......
...@@ -332,7 +332,7 @@ public: ...@@ -332,7 +332,7 @@ public:
// Start to process edge-triggered events from the fd. // Start to process edge-triggered events from the fd.
// This function does not block caller. // This function does not block caller.
static int StartInputEvent(SocketId id, uint32_t epoll_events, static int StartInputEvent(SocketId id, uint32_t events,
const bthread_attr_t& thread_attr); const bthread_attr_t& thread_attr);
static const int PROGRESS_INIT = 1; static const int PROGRESS_INIT = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment