Commit 8b69ecfb authored by gejun's avatar gejun

Patch svn r35189 & patch_from_svn interprets test files better

parent 38ec549c
...@@ -184,7 +184,7 @@ bthread_t bthread_self(void) __THROW { ...@@ -184,7 +184,7 @@ bthread_t bthread_self(void) __THROW {
if (g != NULL && !g->is_current_main_task()/*note*/) { if (g != NULL && !g->is_current_main_task()/*note*/) {
return g->current_tid(); return g->current_tid();
} }
return 0; return INVALID_BTHREAD;
} }
int bthread_equal(bthread_t t1, bthread_t t2) __THROW { int bthread_equal(bthread_t t1, bthread_t t2) __THROW {
...@@ -314,7 +314,8 @@ int bthread_usleep(uint64_t microseconds) __THROW { ...@@ -314,7 +314,8 @@ int bthread_usleep(uint64_t microseconds) __THROW {
int bthread_yield(void) __THROW { int bthread_yield(void) __THROW {
bthread::TaskGroup* g = bthread::tls_task_group; bthread::TaskGroup* g = bthread::tls_task_group;
if (NULL != g && !g->is_current_pthread_task()) { if (NULL != g && !g->is_current_pthread_task()) {
return bthread::TaskGroup::yield(&g); bthread::TaskGroup::yield(&g);
return 0;
} }
return pthread_yield(); return pthread_yield();
} }
......
...@@ -522,7 +522,8 @@ static void wait_for_butex(void* arg) { ...@@ -522,7 +522,8 @@ static void wait_for_butex(void* arg) {
// // Value unmatched or waiter is already woken up by TimerThread, jump // // Value unmatched or waiter is already woken up by TimerThread, jump
// // back to original bthread. // // back to original bthread.
// TaskGroup* g = tls_task_group; // TaskGroup* g = tls_task_group;
// g->set_remained(TaskGroup::ready_to_run_in_worker, (void*)g->current_tid()); // ReadyToRunArgs args = { g->current_tid(), false };
// g->set_remained(TaskGroup::ready_to_run_in_worker, &args);
// // 2: Don't run remained because we're already in a remained function // // 2: Don't run remained because we're already in a remained function
// // otherwise stack may overflow. // // otherwise stack may overflow.
// TaskGroup::sched_to(&g, bw->tid, false/*2*/); // TaskGroup::sched_to(&g, bw->tid, false/*2*/);
......
...@@ -274,7 +274,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) { ...@@ -274,7 +274,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
if (!skip_remained) { if (!skip_remained) {
while (g->_last_context_remained) { while (g->_last_context_remained) {
void (*fn)(void*) = g->_last_context_remained; RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL; g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg); fn(g->_last_context_remained_arg);
g = tls_task_group; g = tls_task_group;
...@@ -362,7 +362,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) { ...@@ -362,7 +362,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
} }
void TaskGroup::_release_last_context(void* arg) { void TaskGroup::_release_last_context(void* arg) {
TaskMeta* m = (TaskMeta*)arg; TaskMeta* m = static_cast<TaskMeta*>(arg);
if (m->stack_type() != STACK_TYPE_PTHREAD) { if (m->stack_type() != STACK_TYPE_PTHREAD) {
return_stack(m->release_stack()/*may be NULL*/); return_stack(m->release_stack()/*may be NULL*/);
} else { } else {
...@@ -411,10 +411,17 @@ int TaskGroup::start_foreground(TaskGroup** pg, ...@@ -411,10 +411,17 @@ int TaskGroup::start_foreground(TaskGroup** pg,
g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL)); g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else { } else {
// NOSIGNAL affects current task, not the new task. // NOSIGNAL affects current task, not the new task.
g->set_remained(((using_attr.flags & BTHREAD_NOSIGNAL) RemainedFn fn = NULL;
? ready_to_run_in_worker_nosignal if (g->current_task()->about_to_quit) {
: ready_to_run_in_worker), fn = ready_to_run_in_worker_ignoresignal;
(void*)g->current_tid()); } else {
fn = ready_to_run_in_worker;
}
ReadyToRunArgs args = {
g->current_tid(),
(bool)(using_attr.flags & BTHREAD_NOSIGNAL)
};
g->set_remained(fn, &args);
TaskGroup::sched_to(pg, m->tid); TaskGroup::sched_to(pg, m->tid);
} }
return 0; return 0;
...@@ -636,7 +643,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) { ...@@ -636,7 +643,7 @@ void TaskGroup::sched_to(TaskGroup** pg, TaskMeta* next_meta) {
} }
while (g->_last_context_remained) { while (g->_last_context_remained) {
void (*fn)(void*) = g->_last_context_remained; RemainedFn fn = g->_last_context_remained;
g->_last_context_remained = NULL; g->_last_context_remained = NULL;
fn(g->_last_context_remained_arg); fn(g->_last_context_remained_arg);
g = tls_task_group; g = tls_task_group;
...@@ -661,24 +668,16 @@ void TaskGroup::destroy_self() { ...@@ -661,24 +668,16 @@ void TaskGroup::destroy_self() {
} }
} }
void TaskGroup::ready_to_run(bthread_t tid) {
push_rq(tid);
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
_control->signal_task(1 + additional_signal);
}
void TaskGroup::ready_to_run_nosignal(bthread_t tid) {
push_rq(tid);
++_num_nosignal;
}
void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) { void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
push_rq(tid);
if (nosignal) { if (nosignal) {
return ready_to_run_nosignal(tid); ++_num_nosignal;
} else {
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
_control->signal_task(1 + additional_signal);
} }
return ready_to_run(tid);
} }
void TaskGroup::flush_nosignal_tasks() { void TaskGroup::flush_nosignal_tasks() {
...@@ -737,16 +736,14 @@ void TaskGroup::flush_nosignal_tasks_general() { ...@@ -737,16 +736,14 @@ void TaskGroup::flush_nosignal_tasks_general() {
return flush_nosignal_tasks_remote(); return flush_nosignal_tasks_remote();
} }
void TaskGroup::ready_to_run_in_worker(void* arg) { void TaskGroup::ready_to_run_in_worker(void* args_in) {
return tls_task_group->ready_to_run((bthread_t)arg); ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
} return tls_task_group->ready_to_run(args->tid, args->nosignal);
void TaskGroup::ready_to_run_in_worker_nosignal(void* arg) {
return tls_task_group->ready_to_run_nosignal((bthread_t)arg);
} }
void TaskGroup::ready_to_run_in_worker_ignoresignal(void* arg) { void TaskGroup::ready_to_run_in_worker_ignoresignal(void* args_in) {
return tls_task_group->push_rq((bthread_t)arg); ReadyToRunArgs* args = static_cast<ReadyToRunArgs*>(args_in);
return tls_task_group->push_rq(args->tid);
} }
struct SleepArgs { struct SleepArgs {
...@@ -775,7 +772,7 @@ void TaskGroup::_add_sleep_event(void* arg) { ...@@ -775,7 +772,7 @@ void TaskGroup::_add_sleep_event(void* arg) {
base::microseconds_from_now(e.timeout_us)); base::microseconds_from_now(e.timeout_us));
if (!sleep_id) { if (!sleep_id) {
// TimerThread is stopping, schedule previous thread. // fail to schedule timer, go back to previous thread.
// TODO(gejun): Need error? // TODO(gejun): Need error?
g->ready_to_run(e.tid); g->ready_to_run(e.tid);
return; return;
...@@ -810,12 +807,8 @@ void TaskGroup::_add_sleep_event(void* arg) { ...@@ -810,12 +807,8 @@ void TaskGroup::_add_sleep_event(void* arg) {
// To be consistent with sys_usleep, set errno and return -1 on error. // To be consistent with sys_usleep, set errno and return -1 on error.
int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) { int TaskGroup::usleep(TaskGroup** pg, uint64_t timeout_us) {
if (0 == timeout_us) { if (0 == timeout_us) {
int rc = yield(pg); yield(pg);
if (rc == 0) { return 0;
return 0;
}
errno = rc;
return -1;
} }
TaskGroup* g = *pg; TaskGroup* g = *pg;
// We have to schedule timer after we switched to next bthread otherwise // We have to schedule timer after we switched to next bthread otherwise
...@@ -856,12 +849,11 @@ int TaskGroup::stop_usleep(bthread_t tid) { ...@@ -856,12 +849,11 @@ int TaskGroup::stop_usleep(bthread_t tid) {
return 0; return 0;
} }
int TaskGroup::yield(TaskGroup** pg) { void TaskGroup::yield(TaskGroup** pg) {
TaskGroup* g = *pg; TaskGroup* g = *pg;
g->set_remained(ready_to_run_in_worker_ignoresignal, ReadyToRunArgs args = { g->current_tid(), true };
(void*)g->current_tid()); g->set_remained(ready_to_run_in_worker_ignoresignal, &args);
sched(pg); sched(pg);
return 0;
} }
void print_task(std::ostream& os, bthread_t tid) { void print_task(std::ostream& os, bthread_t tid) {
......
...@@ -64,9 +64,8 @@ public: ...@@ -64,9 +64,8 @@ public:
static void ending_sched(TaskGroup** pg); static void ending_sched(TaskGroup** pg);
// Suspend caller and run bthread `next_tid' in TaskGroup *pg. // Suspend caller and run bthread `next_tid' in TaskGroup *pg.
// Purpose of this function is to avoid pushing `next_tid' to local // Purpose of this function is to avoid pushing `next_tid' to _rq and
// runqueue and then calling sched(pg), which has similar effect but // then being popped by sched(pg), which is not necessary.
// slower.
static void sched_to(TaskGroup** pg, TaskMeta* next_meta); static void sched_to(TaskGroup** pg, TaskMeta* next_meta);
static void sched_to(TaskGroup** pg, bthread_t next_tid); static void sched_to(TaskGroup** pg, bthread_t next_tid);
static void exchange(TaskGroup** pg, bthread_t next_tid); static void exchange(TaskGroup** pg, bthread_t next_tid);
...@@ -74,7 +73,8 @@ public: ...@@ -74,7 +73,8 @@ public:
// The callback will be run in the beginning of next-run bthread. // The callback will be run in the beginning of next-run bthread.
// Can't be called by current bthread directly because it often needs // Can't be called by current bthread directly because it often needs
// the target to be suspended already. // the target to be suspended already.
void set_remained(void (*cb)(void*), void* arg) { typedef void (*RemainedFn)(void*);
void set_remained(RemainedFn cb, void* arg) {
_last_context_remained = cb; _last_context_remained = cb;
_last_context_remained_arg = arg; _last_context_remained_arg = arg;
} }
...@@ -88,8 +88,7 @@ public: ...@@ -88,8 +88,7 @@ public:
// Suspend caller and run another bthread. When the caller will resume // Suspend caller and run another bthread. When the caller will resume
// is undefined. // is undefined.
// Returns 0 on success, -1 otherwise and errno is set. static void yield(TaskGroup** pg);
static int yield(TaskGroup** pg);
// Suspend caller until bthread `tid' terminates. // Suspend caller until bthread `tid' terminates.
static int join(bthread_t tid, void** return_value); static int join(bthread_t tid, void** return_value);
...@@ -132,9 +131,7 @@ public: ...@@ -132,9 +131,7 @@ public:
int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; } int64_t cumulated_cputime_ns() const { return _cumulated_cputime_ns; }
// Push a bthread into the runqueue // Push a bthread into the runqueue
void ready_to_run(bthread_t tid); void ready_to_run(bthread_t tid, bool nosignal = false);
void ready_to_run(bthread_t tid, bool nosignal);
void ready_to_run_nosignal(bthread_t tid);
// Flush tasks pushed to rq but signalled. // Flush tasks pushed to rq but signalled.
void flush_nosignal_tasks(); void flush_nosignal_tasks();
...@@ -182,8 +179,11 @@ friend class TaskControl; ...@@ -182,8 +179,11 @@ friend class TaskControl;
// Callbacks for set_remained() // Callbacks for set_remained()
static void _release_last_context(void*); static void _release_last_context(void*);
static void _add_sleep_event(void*); static void _add_sleep_event(void*);
struct ReadyToRunArgs {
bthread_t tid;
bool nosignal;
};
static void ready_to_run_in_worker(void*); static void ready_to_run_in_worker(void*);
static void ready_to_run_in_worker_nosignal(void*);
static void ready_to_run_in_worker_ignoresignal(void*); static void ready_to_run_in_worker_ignoresignal(void*);
// Wait for a task to run. // Wait for a task to run.
...@@ -216,7 +216,7 @@ friend class TaskControl; ...@@ -216,7 +216,7 @@ friend class TaskControl;
int64_t _cumulated_cputime_ns; int64_t _cumulated_cputime_ns;
size_t _nswitch; size_t _nswitch;
void (*_last_context_remained)(void*); RemainedFn _last_context_remained;
void* _last_context_remained_arg; void* _last_context_remained_arg;
ParkingLot* _pl; ParkingLot* _pl;
......
...@@ -36,10 +36,11 @@ inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid) { ...@@ -36,10 +36,11 @@ inline void TaskGroup::exchange(TaskGroup** pg, bthread_t next_tid) {
if (g->is_current_pthread_task()) { if (g->is_current_pthread_task()) {
return g->ready_to_run(next_tid); return g->ready_to_run(next_tid);
} }
ReadyToRunArgs args = { g->current_tid(), false };
g->set_remained((g->current_task()->about_to_quit g->set_remained((g->current_task()->about_to_quit
? ready_to_run_in_worker_ignoresignal ? ready_to_run_in_worker_ignoresignal
: ready_to_run_in_worker), : ready_to_run_in_worker),
(void*)g->current_tid()); &args);
TaskGroup::sched_to(pg, next_tid); TaskGroup::sched_to(pg, next_tid);
} }
...@@ -73,6 +74,10 @@ inline void TaskGroup::push_rq(bthread_t tid) { ...@@ -73,6 +74,10 @@ inline void TaskGroup::push_rq(bthread_t tid) {
// baidu-rpc) // baidu-rpc)
flush_nosignal_tasks(); flush_nosignal_tasks();
LOG_EVERY_SECOND(ERROR) << "_rq is full, capacity=" << _rq.capacity(); LOG_EVERY_SECOND(ERROR) << "_rq is full, capacity=" << _rq.capacity();
// TODO(gejun): May cause deadlock when all workers are spinning here.
// A better solution is to pop and run existing bthreads, however which
// make set_remained()-callbacks do context switches and need extensive
// reviews on related code.
::usleep(1000); ::usleep(1000);
} }
} }
......
...@@ -424,24 +424,46 @@ void* wait_cond_thread(void* arg) { ...@@ -424,24 +424,46 @@ void* wait_cond_thread(void* arg) {
return NULL; return NULL;
} }
TEST(CondTest, too_many_bthreads) { static void launch_many_bthreads() {
std::vector<bthread_t> th; g_stop = false;
th.resize(32768); bthread_t tid;
BthreadCond c; BthreadCond c;
c.Init(); c.Init();
bthread_t tid; base::Timer tm;
bthread_start_urgent(&tid, &BTHREAD_ATTR_PTHREAD, wait_cond_thread, &c); bthread_start_urgent(&tid, &BTHREAD_ATTR_PTHREAD, wait_cond_thread, &c);
for (size_t i = 0; i < th.size(); ++i) { std::vector<bthread_t> tids;
bthread_start_background(&th[i], NULL, usleep_thread, NULL); tids.reserve(32768);
} tm.start();
c.Signal(); for (size_t i = 0; i < 32768; ++i) {
bthread_t t0;
ASSERT_EQ(0, bthread_start_background(&t0, NULL, usleep_thread, NULL));
tids.push_back(t0);
}
tm.stop();
LOG(INFO) << "Creating bthreads took " << tm.u_elapsed() << " us";
usleep(3 * 1000 * 1000L); usleep(3 * 1000 * 1000L);
c.Signal();
g_stop = true; g_stop = true;
bthread_join(tid, NULL); bthread_join(tid, NULL);
ASSERT_TRUE(started_wait); for (size_t i = 0; i < tids.size(); ++i) {
ASSERT_TRUE(ended_wait); LOG_EVERY_SECOND(INFO) << "Joined " << i << " threads";
for (size_t i = 0; i < th.size(); ++i) { bthread_join(tids[i], NULL);
bthread_join(th[i], NULL);
} }
LOG_EVERY_SECOND(INFO) << "Joined " << tids.size() << " threads";
}
TEST(CondTest, too_many_bthreads_from_pthread) {
launch_many_bthreads();
}
static void* run_launch_many_bthreads(void*) {
launch_many_bthreads();
return NULL;
}
TEST(CondTest, too_many_bthreads_from_bthread) {
bthread_t th;
ASSERT_EQ(0, bthread_start_urgent(&th, NULL, run_launch_many_bthreads, NULL));
bthread_join(th, NULL);
} }
} // namespace } // namespace
...@@ -45,8 +45,6 @@ TEST(FDTest, read_kernel_version) { ...@@ -45,8 +45,6 @@ TEST(FDTest, read_kernel_version) {
volatile bool stop = false; volatile bool stop = false;
const size_t NCLIENT = 30;
struct SocketMeta { struct SocketMeta {
int fd; int fd;
int epfd; int epfd;
...@@ -62,6 +60,7 @@ struct EpollMeta { ...@@ -62,6 +60,7 @@ struct EpollMeta {
int epfd; int epfd;
}; };
const size_t NCLIENT = 30;
void* process_thread(void* arg) { void* process_thread(void* arg) {
SocketMeta* m = (SocketMeta*)arg; SocketMeta* m = (SocketMeta*)arg;
size_t count; size_t count;
...@@ -148,9 +147,7 @@ void* client_thread(void* arg) { ...@@ -148,9 +147,7 @@ void* client_thread(void* arg) {
ssize_t rc; ssize_t rc;
do { do {
const int wait_rc = bthread_fd_wait(m->fd, EPOLLIN); const int wait_rc = bthread_fd_wait(m->fd, EPOLLIN);
if (__builtin_expect(wait_rc != 0, 0)) { EXPECT_EQ(0, wait_rc) << berror();
EXPECT_EQ(0, wait_rc) << berror();
}
rc = read(m->fd, &m->count, sizeof(m->count)); rc = read(m->fd, &m->count, sizeof(m->count));
} while (rc < 0 && errno == EAGAIN); } while (rc < 0 && errno == EAGAIN);
#else #else
......
...@@ -19,11 +19,23 @@ if [ -d "$1/.svn" ]; then ...@@ -19,11 +19,23 @@ if [ -d "$1/.svn" ]; then
cd $CURRENT_DIR cd $CURRENT_DIR
fi fi
MODIFIED_PATCHFILE="$(basename $PATCHFILE).brpc_os" MODIFIED_PATCHFILE="$(dirname $PATCHFILE)/brpc_os.$(basename $PATCHFILE)"
# guess prefix of test files
TEST_PREFIX="test_"
TEST_SUFFIX=
if fgrep -q " bthread/" $PATCHFILE; then
TEST_PREFIX="bthread_"
TEST_SUFFIX="_unittest"
elif fgrep -q " bvar/" $PATCHFILE; then
TEST_PREFIX="bvar_"
TEST_SUFFIX="_unittest"
fi
cat $PATCHFILE | sed -e 's/src\/baidu\/rpc\//src\/brpc\//g' \ cat $PATCHFILE | sed -e 's/src\/baidu\/rpc\//src\/brpc\//g' \
-e 's/\<baidu\/rpc\//brpc\//g' \ -e 's/\<baidu\/rpc\//brpc\//g' \
-e 's/src\/brpc\/test\/test_\(.*\)\.cpp/test\/brpc_\1_unittest.cpp/g' \ -e 's/\<src\/brpc\/test\/test_\(.*\)\.cpp/test\/brpc_\1_unittest.cpp/g' \
-e "s/\<test\/test_\(.*\)\.cpp/test\/${TEST_PREFIX}\1${TEST_SUFFIX}.cpp/g" \
-e 's/\<namespace \+baidu *{/namespace brpc {/g' \ -e 's/\<namespace \+baidu *{/namespace brpc {/g' \
-e 's/\<namespace \+rpc *{//g' \ -e 's/\<namespace \+rpc *{//g' \
-e 's/} *\/\/ \+namespace \+baidu/} \/\/ namespace brpc/g' \ -e 's/} *\/\/ \+namespace \+baidu/} \/\/ namespace brpc/g' \
...@@ -34,10 +46,10 @@ cat $PATCHFILE | sed -e 's/src\/baidu\/rpc\//src\/brpc\//g' \ ...@@ -34,10 +46,10 @@ cat $PATCHFILE | sed -e 's/src\/baidu\/rpc\//src\/brpc\//g' \
-e 's/\<protocol\/\(.*\)\.proto/src\/\1.proto/g' \ -e 's/\<protocol\/\(.*\)\.proto/src\/\1.proto/g' \
-e 's/TEST_F(HttpMessageTest/TEST(HttpMessageTest/g' \ -e 's/TEST_F(HttpMessageTest/TEST(HttpMessageTest/g' \
-e 's/TEST_F(URITest/TEST(URITest/g' \ -e 's/TEST_F(URITest/TEST(URITest/g' \
-e 's/bthread_cond\.cpp/src\/bthread\/condition_variable.cpp/g' \ -e 's/ bthread_cond\.cpp/ src\/bthread\/condition_variable.cpp/g' \
-e 's/bthread_\([^.]*\)\.cpp/src\/bthread\/\1.cpp/g' \ -e 's/ bthread_\([^.]*\)\.cpp/ src\/bthread\/\1.cpp/g' \
-e 's/bthread_\([^.]*\)\.h/src\/bthread\/\1.h/g' \ -e 's/ bthread_\([^.]*\)\.h/ src\/bthread\/\1.h/g' \
-e 's/bthread\.h/src\/bthread\/bthread.h/g' \ -e 's/ bthread\.\(h\|cpp\)/ src\/bthread\/bthread.\1/g' \
-e 's/<\(brpc\/[^>]*\)>/"\1"/g' \ -e 's/<\(brpc\/[^>]*\)>/"\1"/g' \
-e 's/<\(bvar\/[^>]*\)>/"\1"/g' \ -e 's/<\(bvar\/[^>]*\)>/"\1"/g' \
-e 's/<\(base\/[^>]*\)>/"\1"/g' \ -e 's/<\(base\/[^>]*\)>/"\1"/g' \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment