Skip to content

Commit

Permalink
Further changes to the other threads (plugin and core)
Browse files Browse the repository at this point in the history
  • Loading branch information
meetecho committed Dec 18, 2014
1 parent 42be4ea commit dbf63b6
Show file tree
Hide file tree
Showing 15 changed files with 384 additions and 160 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,9 @@ Makefile.in
/plugins/.libs
/postprocessing/*.o

/conf/janus.cfg.sample
/conf/janus.plugin.recordplay.cfg.sample
/conf/janus.plugin.streaming.cfg.sample

.deps
.dirstamp
6 changes: 5 additions & 1 deletion dtls.c
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ void janus_dtls_srtp_incoming_msg(janus_dtls_srtp *dtls, char *buf, uint16_t len
g_thread_try_new("DTLS-SCTP", janus_dtls_sctp_setup_thread, dtls, &error);
if(error != NULL) {
/* Something went wrong... */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Got error %d (%s) trying to launch thread...\n", handle->handle_id, error->code, error->message ? error->message : "??");
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Got error %d (%s) trying to launch the DTLS-SCTP thread...\n", handle->handle_id, error->code, error->message ? error->message : "??");
}
dtls->srtp_valid = 1;
}
Expand Down Expand Up @@ -651,10 +651,14 @@ gboolean janus_dtls_retry(gpointer stack) {
/* Helper thread to create a SCTP association that will use this DTLS stack */
void *janus_dtls_sctp_setup_thread(void *data) {
if(data == NULL) {
JANUS_LOG(LOG_ERR, "No DTLS stack??\n");
g_thread_unref(g_thread_self());
return NULL;
}
janus_dtls_srtp *dtls = (janus_dtls_srtp *)data;
if(dtls->sctp == NULL) {
JANUS_LOG(LOG_ERR, "No SCTP stack??\n");
g_thread_unref(g_thread_self());
return NULL;
}
janus_sctp_association *sctp = (janus_sctp_association *)dtls->sctp;
Expand Down
17 changes: 14 additions & 3 deletions ice.c
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ void *janus_ice_thread(void *data) {
GMainLoop *loop = handle->iceloop;
if(loop == NULL) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Invalid loop...\n", handle->handle_id);
g_thread_unref(g_thread_self());
return NULL;
}
g_usleep (100000);
Expand Down Expand Up @@ -1072,10 +1073,20 @@ int janus_ice_setup_local(janus_ice_handle *handle, int offer, int audio, int vi

handle->icectx = g_main_context_new();
handle->iceloop = g_main_loop_new(handle->icectx, FALSE);
handle->icethread = g_thread_new("ice thread", &janus_ice_thread, handle);
/* We have a dedicated thread for sending packets/messages */
GError *error = NULL;
handle->icethread = g_thread_try_new("ice thread", &janus_ice_thread, handle, &error);
if(error != NULL) {
/* FIXME We should clear some resources... */
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the ICE thread...\n", error->code, error->message ? error->message : "??");
return -1;
}
handle->queued_packets = g_async_queue_new();
handle->send_thread = g_thread_new("ice send thread", &janus_ice_send_thread, handle);
handle->send_thread = g_thread_try_new("ice send thread", &janus_ice_send_thread, handle, &error);
if(error != NULL) {
/* FIXME We should clear some resources... */
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the ICE send thread...\n", error->code, error->message ? error->message : "??");
return -1;
}
/* Note: NICE_COMPATIBILITY_RFC5245 is only available in more recent versions of libnice */
handle->agent = nice_agent_new(handle->icectx, NICE_COMPATIBILITY_DRAFT19);
/* Any STUN server to use? */
Expand Down
37 changes: 28 additions & 9 deletions janus.c
Original file line number Diff line number Diff line change
Expand Up @@ -2562,7 +2562,7 @@ int janus_wss_onopen(libwebsock_client_state *state) {
/* Something went wrong... */
g_free(ws_client);
janus_mutex_unlock(&wss_mutex);
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch pool thread...\n", error->code, error->message ? error->message : "??");
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the pool thread...\n", error->code, error->message ? error->message : "??");
libwebsock_close(state);
return 0;
}
Expand All @@ -2571,7 +2571,15 @@ int janus_wss_onopen(libwebsock_client_state *state) {
ws_client->responses = g_async_queue_new();
ws_client->sessions = NULL;
/* Create a thread for notifications related to this session as well */
ws_client->thread = g_thread_new("wss_client", &janus_wss_thread, ws_client);
ws_client->thread = g_thread_try_new("wss_client", &janus_wss_thread, ws_client, &error);
if(error != NULL) {
/* Something went wrong... */
g_free(ws_client);
janus_mutex_unlock(&wss_mutex);
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the notifications thread...\n", error->code, error->message ? error->message : "??");
libwebsock_close(state);
return 0;
}
ws_client->destroy = 0;
janus_mutex_init(&ws_client->mutex);

Expand Down Expand Up @@ -4280,14 +4288,24 @@ gint main(int argc, char *argv[])
rmq_client->sessions = NULL;
rmq_client->responses = g_async_queue_new();
rmq_client->destroy = 0;
rmq_client->in_thread = g_thread_new("rmq_in_thread", &janus_rmq_in_thread, rmq_client);
rmq_client->out_thread = g_thread_new("rmq_out_thread", &janus_rmq_out_thread, rmq_client);
/* rabbitmq-c is single threaded, we need a thread pool to serve requests */
GError *error = NULL;
rmq_client->in_thread = g_thread_try_new("rmq_in_thread", &janus_rmq_in_thread, rmq_client, &error);
if(error != NULL) {
/* Something went wrong... */
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQ incoming thread...\n", error->code, error->message ? error->message : "??");
exit(1); /* FIXME Should we really give up? */
}
rmq_client->out_thread = g_thread_try_new("rmq_out_thread", &janus_rmq_out_thread, rmq_client, &error);
if(error != NULL) {
/* Something went wrong... */
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the RabbitMQ outgoing thread...\n", error->code, error->message ? error->message : "??");
exit(1); /* FIXME Should we really give up? */
}
/* rabbitmq-c is single threaded, we need a thread pool to serve requests */
rmq_client->thread_pool = g_thread_pool_new(janus_rmq_task, rmq_client, -1, FALSE, &error);
if(error != NULL) {
/* Something went wrong... */
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch pool thread...\n", error->code, error->message ? error->message : "??");
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to launch the pool thread...\n", error->code, error->message ? error->message : "??");
exit(1); /* FIXME Should we really give up? */
}
janus_mutex_init(&rmq_client->mutex);
Expand Down Expand Up @@ -4315,9 +4333,10 @@ gint main(int argc, char *argv[])
/* Start the sessions watchdog */
GMainContext *watchdog_context = g_main_context_new();
GMainLoop *watchdog_loop = g_main_loop_new(watchdog_context, FALSE);
GThread *watchdog = g_thread_new("watchdog", &janus_sessions_watchdog, watchdog_loop);
if(!watchdog) {
JANUS_LOG(LOG_FATAL, "Couldn't start sessions watchdog...\n");
GError *error = NULL;
GThread *watchdog = g_thread_try_new("watchdog", &janus_sessions_watchdog, watchdog_loop, &error);
if(error != NULL) {
JANUS_LOG(LOG_FATAL, "Got error %d (%s) trying to start sessions watchdog...\n", error->code, error->message ? error->message : "??");
exit(1);
}

Expand Down
94 changes: 71 additions & 23 deletions plugins/janus_audiobridge.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ janus_plugin *create(void) {


/* Useful stuff */
static int initialized = 0, stopping = 0;
static gint initialized = 0, stopping = 0;
static janus_callbacks *gateway = NULL;
static GThread *handler_thread;
static GThread *watchdog;
static void *janus_audiobridge_handler(void *data);
static void janus_audiobridge_relay_rtp_packet(gpointer data, gpointer user_data);
static void *janus_audiobridge_mixer_thread(void *data);
Expand Down Expand Up @@ -153,6 +154,8 @@ typedef struct janus_audiobridge_room {
FILE *recording; /* File to record the room into */
gboolean destroy; /* Value to flag the room for destruction */
GHashTable *participants; /* Map of participants */
GThread *thread; /* Mixer thread for this room */
gint64 destroyed; /* When this room has been destroyed */
janus_mutex mutex; /* Mutex to lock this room instance */
} janus_audiobridge_room;
static GHashTable *rooms;
Expand Down Expand Up @@ -258,7 +261,7 @@ void *janus_audiobridge_watchdog(void *data);
void *janus_audiobridge_watchdog(void *data) {
JANUS_LOG(LOG_INFO, "AudioBridge watchdog started\n");
gint64 now = 0;
while(initialized && !stopping) {
while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
janus_mutex_lock(&sessions_mutex);
/* Iterate on all the sessions */
now = janus_get_monotonic_time();
Expand All @@ -267,7 +270,7 @@ void *janus_audiobridge_watchdog(void *data) {
JANUS_LOG(LOG_VERB, "Checking %d old sessions\n", g_list_length(old_sessions));
while(sl) {
janus_audiobridge_session *session = (janus_audiobridge_session *)sl->data;
if(!session || !initialized || stopping) {
if(!session) {
sl = sl->next;
continue;
}
Expand All @@ -294,7 +297,7 @@ void *janus_audiobridge_watchdog(void *data) {

/* Plugin implementation */
int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) {
if(stopping) {
if(g_atomic_int_get(&stopping)) {
/* Still stopping from before */
return -1;
}
Expand Down Expand Up @@ -373,13 +376,20 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) {
audiobridge->recording = NULL;
audiobridge->destroy = 0;
audiobridge->participants = g_hash_table_new(NULL, NULL);
audiobridge->destroyed = 0;
janus_mutex_init(&audiobridge->mutex);
janus_mutex_lock(&rooms_mutex);
g_hash_table_insert(rooms, GUINT_TO_POINTER(audiobridge->room_id), audiobridge);
janus_mutex_unlock(&rooms_mutex);
JANUS_LOG(LOG_VERB, "Created audiobridge: %"SCNu64" (%s, %s, secret: %s)\n", audiobridge->room_id, audiobridge->room_name, audiobridge->is_private ? "private" : "public", audiobridge->room_secret ? audiobridge->room_secret : "no secret");
/* We need a thread for the mix */
g_thread_new("audiobridge mixer thread", &janus_audiobridge_mixer_thread, audiobridge);
GError *error = NULL;
audiobridge->thread = g_thread_try_new("audiobridge mixer thread", &janus_audiobridge_mixer_thread, audiobridge, &error);
if(error != NULL) {
/* FIXME We should clear some resources... */
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the mixer thread...\n", error->code, error->message ? error->message : "??");
} else {
janus_mutex_lock(&rooms_mutex);
g_hash_table_insert(rooms, GUINT_TO_POINTER(audiobridge->room_id), audiobridge);
janus_mutex_unlock(&rooms_mutex);
}
cat = cat->next;
}
/* Done */
Expand All @@ -399,34 +409,39 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) {
}
janus_mutex_unlock(&rooms_mutex);

initialized = 1;
g_atomic_int_set(&initialized, 1);

GError *error = NULL;
/* Start the sessions watchdog */
GThread *watchdog = g_thread_new("abridge watchdog", &janus_audiobridge_watchdog, NULL);
if(!watchdog) {
JANUS_LOG(LOG_FATAL, "Couldn't start AudioBridge watchdog...\n");
watchdog = g_thread_try_new("abridge watchdog", &janus_audiobridge_watchdog, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the AudioBridge watchdog thread...\n", error->code, error->message ? error->message : "??");
return -1;
}
/* Launch the thread that will handle incoming messages */
GError *error = NULL;
handler_thread = g_thread_try_new("janus audiobridge handler", janus_audiobridge_handler, NULL, &error);
if(error != NULL) {
initialized = 0;
/* Something went wrong... */
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch thread...\n", error->code, error->message ? error->message : "??");
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the AudioBridge handler thread...\n", error->code, error->message ? error->message : "??");
return -1;
}
JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_AUDIOBRIDGE_NAME);
return 0;
}

void janus_audiobridge_destroy(void) {
if(!initialized)
if(!g_atomic_int_get(&initialized))
return;
stopping = 1;
g_atomic_int_set(&stopping, 1);
if(handler_thread != NULL) {
g_thread_join(handler_thread);
handler_thread = NULL;
}
if(watchdog != NULL) {
g_thread_join(watchdog);
watchdog = NULL;
}
handler_thread = NULL;
/* FIXME We should destroy the sessions cleanly */
janus_mutex_lock(&sessions_mutex);
g_hash_table_destroy(sessions);
Expand All @@ -437,7 +452,8 @@ void janus_audiobridge_destroy(void) {
g_async_queue_unref(messages);
messages = NULL;
sessions = NULL;
initialized = 0;
g_atomic_int_set(&initialized, 0);
g_atomic_int_set(&stopping, 0);
JANUS_LOG(LOG_INFO, "%s destroyed!\n", JANUS_AUDIOBRIDGE_NAME);
}

Expand Down Expand Up @@ -707,11 +723,29 @@ struct janus_plugin_result *janus_audiobridge_handle_message(janus_plugin_sessio
audiobridge->recording = NULL;
audiobridge->destroy = 0;
audiobridge->participants = g_hash_table_new(NULL, NULL);
audiobridge->destroyed = 0;
janus_mutex_init(&audiobridge->mutex);
g_hash_table_insert(rooms, GUINT_TO_POINTER(audiobridge->room_id), audiobridge);
JANUS_LOG(LOG_VERB, "Created audiobridge: %"SCNu64" (%s, %s, secret: %s)\n", audiobridge->room_id, audiobridge->room_name, audiobridge->is_private ? "private" : "public", audiobridge->room_secret ? audiobridge->room_secret : "no secret");
/* We need a thread for the mix */
g_thread_new("audiobridge mixer thread", &janus_audiobridge_mixer_thread, audiobridge);
GError *error = NULL;
audiobridge->thread = g_thread_try_new("audiobridge mixer thread", &janus_audiobridge_mixer_thread, audiobridge, &error);
if(error != NULL) {
janus_mutex_unlock(&rooms_mutex);
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the mixer thread...\n", error->code, error->message ? error->message : "??");
error_code = JANUS_AUDIOBRIDGE_ERROR_UNKNOWN_ERROR;
g_snprintf(error_cause, 512, "Got error %d (%s) trying to launch the mixer thread", error->code, error->message ? error->message : "??");
g_free(audiobridge->room_name);
g_free(audiobridge->room_secret);
g_free(audiobridge->record_file);
g_hash_table_destroy(audiobridge->participants);
g_free(audiobridge);
goto error;
} else {
janus_mutex_lock(&rooms_mutex);
g_hash_table_insert(rooms, GUINT_TO_POINTER(audiobridge->room_id), audiobridge);
janus_mutex_unlock(&rooms_mutex);
}
/* Show updated rooms list */
GHashTableIter iter;
gpointer value;
Expand Down Expand Up @@ -804,7 +838,11 @@ struct janus_plugin_result *janus_audiobridge_handle_message(janus_plugin_sessio
gateway->end_session(p->session->handle);
}
}
JANUS_LOG(LOG_VERB, "Waiting for the mixer thread to complete...\n");
audiobridge->destroyed = janus_get_monotonic_time();
g_thread_join(audiobridge->thread);
/* Done */
JANUS_LOG(LOG_VERB, "Audiobridge room destroyed\n");
janus_plugin_result *result = janus_plugin_result_new(JANUS_PLUGIN_OK, response_text);
g_free(response_text);
return result;
Expand Down Expand Up @@ -1036,7 +1074,7 @@ static void *janus_audiobridge_handler(void *data) {
return NULL;
}
json_t *root = NULL;
while(initialized && !stopping) {
while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
if(!messages || (msg = g_async_queue_try_pop(messages)) == NULL) {
usleep(50000);
continue;
Expand Down Expand Up @@ -1759,12 +1797,14 @@ static void *janus_audiobridge_mixer_thread(void *data) {
janus_audiobridge_rtp_relay_packet *outpkt = calloc(1, sizeof(janus_audiobridge_rtp_relay_packet));
if(outpkt == NULL) {
JANUS_LOG(LOG_FATAL, "Memory error!\n");
g_thread_unref(g_thread_self());
return NULL;
}
outpkt->data = (rtp_header *)calloc(BUFFER_SAMPLES, sizeof(unsigned char));
if(outpkt->data == NULL) {
JANUS_LOG(LOG_FATAL, "Memory error!\n");
g_free(outpkt);
g_thread_unref(g_thread_self());
return NULL;
}
unsigned char *payload = (unsigned char *)outpkt->data;
Expand All @@ -1774,7 +1814,7 @@ static void *janus_audiobridge_mixer_thread(void *data) {
gint32 ts = 0;
/* Loop */
int i=0;
while(!stopping) { /* FIXME We need a per-mountpoint watchdog as well */
while(!g_atomic_int_get(&stopping) && audiobridge->destroyed == 0) { /* FIXME We need a per-room watchdog as well */
/* See if it's time to prepare a frame */
gettimeofday(&now, NULL);
d_s = now.tv_sec - before.tv_sec;
Expand Down Expand Up @@ -1880,6 +1920,14 @@ static void *janus_audiobridge_mixer_thread(void *data) {
if(audiobridge->recording)
fclose(audiobridge->recording);
JANUS_LOG(LOG_VERB, "Leaving mixer thread for room %"SCNu64" (%s)...\n", audiobridge->room_id, audiobridge->room_name);

/* Free resources */
g_free(audiobridge->room_name);
g_free(audiobridge->room_secret);
g_free(audiobridge->record_file);
g_hash_table_destroy(audiobridge->participants);
g_free(audiobridge);

return NULL;
}

Expand Down
4 changes: 2 additions & 2 deletions plugins/janus_echotest.c
Original file line number Diff line number Diff line change
Expand Up @@ -216,14 +216,14 @@ int janus_echotest_init(janus_callbacks *callback, const char *config_path) {
watchdog = g_thread_try_new("etest watchdog", &janus_echotest_watchdog, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch thread...\n", error->code, error->message ? error->message : "??");
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the EchoTest watchdog thread...\n", error->code, error->message ? error->message : "??");
return -1;
}
/* Launch the thread that will handle incoming messages */
handler_thread = g_thread_try_new("janus echotest handler", janus_echotest_handler, NULL, &error);
if(error != NULL) {
g_atomic_int_set(&initialized, 0);
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch thread...\n", error->code, error->message ? error->message : "??");
JANUS_LOG(LOG_ERR, "Got error %d (%s) trying to launch the EchoTest handler thread...\n", error->code, error->message ? error->message : "??");
return -1;
}
JANUS_LOG(LOG_INFO, "%s initialized!\n", JANUS_ECHOTEST_NAME);
Expand Down
Loading

0 comments on commit dbf63b6

Please sign in to comment.