Skip to content

Commit

Permalink
Refactor impl of Thread::getExitStatus()/Thread::isAlive() (#5695)
Browse files Browse the repository at this point in the history
* Refactor impl of thread exit_code/living --filter=[thread]

* fix --filter=[thread]

* optimize code

* optimize code [2]
  • Loading branch information
matyhtf authored Feb 24, 2025
1 parent d647b78 commit 59d652a
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 125 deletions.
7 changes: 2 additions & 5 deletions ext-src/php_swoole_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
#include "php_swoole_cxx.h"

#ifdef SW_THREAD

#include "swoole_lock.h"
#include "swoole_thread.h"

typedef uint32_t ThreadResourceId;
class ThreadResource;
Expand All @@ -37,9 +36,7 @@ extern zend_class_entry *swoole_thread_lock_ce;
extern zend_class_entry *swoole_thread_map_ce;
extern zend_class_entry *swoole_thread_queue_ce;

void php_swoole_thread_start(zend_string *file, ZendArray *argv);
void php_swoole_thread_join(pthread_t ptid);
int php_swoole_thread_get_exit_status(pthread_t ptid);
void php_swoole_thread_start(std::shared_ptr<swoole::Thread> thread, zend_string *file, ZendArray *argv);
void php_swoole_thread_bailout(void);

ThreadResource *php_swoole_thread_arraylist_cast(zval *zobject);
Expand Down
10 changes: 2 additions & 8 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2684,21 +2684,15 @@ static PHP_METHOD(swoole_server, start) {
zval_ptr_dtor(&_thread_argv);
}

serv->worker_thread_start = [bootstrap, thread_argv](const WorkerFn &fn) {
serv->worker_thread_start = [bootstrap, thread_argv](std::shared_ptr<Thread> thread, const WorkerFn &fn) {
worker_thread_fn = fn;
zend_string *bootstrap_copy = zend_string_dup(bootstrap, 1);
if (thread_argv) {
thread_argv->add_ref();
}
php_swoole_thread_start(bootstrap_copy, thread_argv);
php_swoole_thread_start(thread, bootstrap_copy, thread_argv);
};

serv->worker_thread_get_exit_status = [](pthread_t ptid) -> int {
return php_swoole_thread_get_exit_status(ptid);
};

serv->worker_thread_join = [](pthread_t ptid) { php_swoole_thread_join(ptid); };

/**
*The hook must be enabled before creating child threads.
*The stream factory and ops are global variables, not thread-local resources.
Expand Down
103 changes: 43 additions & 60 deletions ext-src/swoole_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include <unordered_map>
#include <atomic>

#include "swoole_thread.h"

BEGIN_EXTERN_C()
#include "stubs/php_swoole_thread_arginfo.h"
END_EXTERN_C()
Expand All @@ -45,11 +43,24 @@ static struct {

TSRMLS_CACHE_EXTERN();

typedef std::thread Thread;
using swoole::Thread;

struct PhpThread {
std::shared_ptr<Thread> thread;

PhpThread() : thread(std::make_shared<Thread>()) {}

bool join() {
if (!thread->joinable()) {
return false;
}
thread->join();
return true;
}
};

struct ThreadObject {
Thread *thread;
pthread_t thread_id;
PhpThread *pt;
zend_object std;
};

Expand All @@ -58,44 +69,35 @@ static void thread_register_stdio_file_handles(bool no_close);
static thread_local zval thread_argv = {};
static thread_local JMP_BUF *thread_bailout = nullptr;
static std::atomic<size_t> thread_num(1);
static zend::ConcurrencyHashMap<pthread_t, int> thread_exit_status(-1);
static zend::ConcurrencyHashMap<pthread_t, bool> thread_living(false);

static sw_inline ThreadObject *thread_fetch_object(zend_object *obj) {
return (ThreadObject *) ((char *) obj - swoole_thread_handlers.offset);
}

static pthread_t thread_get_id(zend_object *object) {
zval *res, rv;
res = zend_read_property(swoole_thread_ce, object, ZEND_STRL("id"), 1, &rv);
return (pthread_t) zval_get_long(res);
static sw_inline ThreadObject *thread_fetch_object(zval *zobj) {
return thread_fetch_object(Z_OBJ_P(zobj));
}

static pthread_t thread_get_id(zval *zobj) {
return thread_get_id(Z_OBJ_P(zobj));
static sw_inline PhpThread *thread_get_php_thread(zend_object *obj) {
return thread_fetch_object(obj)->pt;
}

static void thread_join(zend_object *object) {
ThreadObject *to = thread_fetch_object(object);
if (to->thread && to->thread->joinable()) {
to->thread->join();
php_swoole_thread_join(to->thread->native_handle());
delete to->thread;
to->thread = nullptr;
}
static sw_inline PhpThread *thread_get_php_thread(zval *zobj) {
return thread_fetch_object(zobj)->pt;
}

static void thread_free_object(zend_object *object) {
ThreadObject *to = thread_fetch_object(object);
thread_join(object);
thread_living.del(to->thread_id);
auto pt = thread_get_php_thread(object);
pt->join();
delete pt;
zend_object_std_dtor(object);
}

static zend_object *thread_create_object(zend_class_entry *ce) {
ThreadObject *to = (ThreadObject *) zend_object_alloc(sizeof(ThreadObject), ce);
zend_object_std_init(&to->std, ce);
object_properties_init(&to->std, ce);
to->pt = new PhpThread();
to->std.handlers = &swoole_thread_handlers;
return &to->std;
}
Expand Down Expand Up @@ -196,7 +198,7 @@ static PHP_METHOD(swoole_thread, __construct) {
return;
}

ThreadObject *to = thread_fetch_object(Z_OBJ_P(ZEND_THIS));
auto pt = thread_get_php_thread(ZEND_THIS);
zend_string *file = zend_string_init(script_file, l_script_file, 1);

if (argc > 0) {
Expand All @@ -207,46 +209,36 @@ static PHP_METHOD(swoole_thread, __construct) {
}

try {
to->thread = new std::thread([file, argv]() { php_swoole_thread_start(file, argv); });
pt->thread->start([file, argv, pt]() { php_swoole_thread_start(pt->thread, file, argv); });
} catch (const std::exception &e) {
zend_throw_exception(swoole_exception_ce, e.what(), SW_ERROR_SYSTEM_CALL_FAIL);
return;
}

to->thread_id = to->thread->native_handle();
zend::object_set(ZEND_THIS, ZEND_STRL("id"), (zend_long) to->thread_id);
zend::object_set(ZEND_THIS, ZEND_STRL("id"), (zend_long) pt->thread->get_id());
}

static PHP_METHOD(swoole_thread, isAlive) {
ThreadObject *to = thread_fetch_object(Z_OBJ_P(ZEND_THIS));
RETURN_BOOL(thread_living.get(to->thread_id));
auto pt = thread_get_php_thread(ZEND_THIS);
RETURN_BOOL(pt->thread->is_alive());
}

static PHP_METHOD(swoole_thread, join) {
ThreadObject *to = thread_fetch_object(Z_OBJ_P(ZEND_THIS));
if (!to || !to->thread || !to->thread->joinable()) {
RETURN_FALSE;
}
thread_join(Z_OBJ_P(ZEND_THIS));
RETURN_TRUE;
auto pt = thread_get_php_thread(ZEND_THIS);
RETURN_BOOL(pt->join());
}

static PHP_METHOD(swoole_thread, joinable) {
ThreadObject *to = thread_fetch_object(Z_OBJ_P(ZEND_THIS));
if (to == nullptr || !to->thread) {
RETURN_FALSE;
}
RETURN_BOOL(to->thread->joinable());
auto pt = thread_get_php_thread(ZEND_THIS);
RETURN_BOOL(pt->thread->joinable());
}

static PHP_METHOD(swoole_thread, detach) {
ThreadObject *to = thread_fetch_object(Z_OBJ_P(ZEND_THIS));
if (to == nullptr || !to->thread) {
auto pt = thread_get_php_thread(ZEND_THIS);
if (!pt->thread->joinable()) {
RETURN_FALSE;
}
to->thread->detach();
delete to->thread;
to->thread = nullptr;
pt->thread->detach();
RETURN_TRUE;
}

Expand All @@ -261,7 +253,8 @@ static PHP_METHOD(swoole_thread, getId) {
}

static PHP_METHOD(swoole_thread, getExitStatus) {
RETURN_LONG(php_swoole_thread_get_exit_status(thread_get_id(ZEND_THIS)));
auto pt = thread_get_php_thread(ZEND_THIS);
RETURN_LONG(pt->thread->get_exit_status());
}

static PHP_METHOD(swoole_thread, setName) {
Expand Down Expand Up @@ -417,9 +410,9 @@ static void thread_register_stdio_file_handles(bool no_close) {
zend_register_constant(&ec);
}

void php_swoole_thread_start(zend_string *file, ZendArray *argv) {
void php_swoole_thread_start(std::shared_ptr<Thread> thread, zend_string *file, ZendArray *argv) {
thread_num.fetch_add(1);
thread_living.set(pthread_self(), true);
thread->enter();
ts_resource(0);
#if defined(COMPILE_DL_SWOOLE) && defined(ZTS)
ZEND_TSRMLS_CACHE_UPDATE();
Expand Down Expand Up @@ -475,23 +468,13 @@ void php_swoole_thread_start(zend_string *file, ZendArray *argv) {
file_handle.filename = NULL;

_startup_error:
thread_exit_status.set(pthread_self(), EG(exit_status));

zend_string_release(file);
thread->exit(EG(exit_status));
ts_free_thread();
swoole_thread_clean();
thread_living.set(pthread_self(), false);
thread_num.fetch_sub(1);
}

void php_swoole_thread_join(pthread_t ptid) {
thread_exit_status.del(ptid);
}

int php_swoole_thread_get_exit_status(pthread_t ptid) {
return thread_exit_status.get(ptid);
}

size_t sw_active_thread_count(void) {
return thread_num.load();
}
Expand Down
1 change: 1 addition & 0 deletions include/swoole_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ SW_API bool swoole_timer_clear(long timer_id);
SW_API void swoole_timer_free();
SW_API int swoole_timer_select();
SW_API bool swoole_timer_is_available();
SW_API void swoole_timer_set_scheduler(const swoole::TimerScheduler &scheduler);

SW_API int swoole_event_init(int flags);
SW_API int swoole_event_add(swoole::network::Socket *socket, int events);
Expand Down
13 changes: 3 additions & 10 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
#include "swoole_pipe.h"
#include "swoole_channel.h"
#include "swoole_message_bus.h"
#ifdef SW_THREAD
#include "swoole_lock.h"
#endif

#ifdef SW_USE_OPENSSL
#include "swoole_dtls.h"
Expand Down Expand Up @@ -57,6 +54,7 @@ struct Request;

class Server;
struct Manager;
class Thread;

typedef std::function<void(void)> WorkerFn;

Expand Down Expand Up @@ -450,17 +448,14 @@ class ProcessFactory : public Factory {

class ThreadFactory : public BaseFactory {
private:
std::vector<std::thread> threads_;
std::vector<std::shared_ptr<Thread>> threads_;
std::mutex lock_;
std::condition_variable cv_;
std::queue<Worker *> queue_;
long cv_timeout_ms_;
bool reload_all_workers;
bool reloading;
Worker manager;
template <typename _Callable>
void create_thread(int i, _Callable fn);
void join_thread(std::thread &thread);
void at_thread_exit(Worker *worker);
void create_message_bus();
void destroy_message_bus();
Expand Down Expand Up @@ -1489,9 +1484,7 @@ class Server {
void worker_accept_event(DataHead *info);
void worker_signal_init(void);

std::function<void(const WorkerFn &fn)> worker_thread_start;
std::function<void(pthread_t ptid)> worker_thread_join;
std::function<int(pthread_t ptid)> worker_thread_get_exit_status;
std::function<void(std::shared_ptr<Thread>, const WorkerFn &fn)> worker_thread_start;

/**
* [Master]
Expand Down
49 changes: 49 additions & 0 deletions include/swoole_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,52 @@ static bool swoole_thread_set_name(const char *name) {
return pthread_setname_np(pthread_self(), name) == 0;
#endif
}

namespace swoole {
class Thread {
private:
int exit_status;
bool living;
std::thread thread;

public:
bool is_alive() {
return living;
}

bool joinable() {
return thread.joinable();
}

void join() {
thread.join();
}

void detach() {
thread.detach();
}

int get_exit_status() {
return exit_status;
}

pthread_t get_id() {
return thread.native_handle();
}

template <typename _Callable>
void start(_Callable fn) {
thread = std::thread(fn);
}

void enter() {
exit_status = 0;
living = true;
}

void exit(int status) {
exit_status = status;
living = false;
}
};
} // namespace swoole
Loading

0 comments on commit 59d652a

Please sign in to comment.