Skip to content

Commit

Permalink
dp: wait till dp thread stops in thread cancel
Browse files Browse the repository at this point in the history
There's a race posiibility in DP when stopping a pipeline:
 - dp starts processing
 - incoming IPC - pause the pipeline. IPC has higher priority
than DP, so DP is preempted
 - pipeline is stopping, module "reset" is called. Some of resources
may be freed here
 - when IPC finishes, DP thread continues processing

Sollution: wait for DP to finish processing and terminate DP thread
before calling "reset" method in module

To do this:
1) call "thread cancel" before calling "reset"reset
2) modify "thread cancel" to mark the thread to terminate and
execute k_thread_join()
3) terminated thread cannot be restarted, so thread creation must be
moved from "init" to "schedule". There's no need to reallocate memory
zephyr guarantees that resources may be re-used when a thread
is terminated.

Signed-off-by: Marcin Szkudlinski <marcin.szkudlinski@intel.com>
  • Loading branch information
marcinszkudlinski authored and kv2019i committed Jun 20, 2024
1 parent a209d8f commit 34957e7
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 48 deletions.
3 changes: 3 additions & 0 deletions src/audio/module_adapter/module/generic.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ int module_reset(struct processing_module *mod)
if (md->state < MODULE_IDLE)
return 0;
#endif
/* cancel task if DP task*/
if (mod->dev->ipc_config.proc_domain == COMP_PROCESSING_DOMAIN_DP && mod->dev->task)
schedule_task_cancel(mod->dev->task);
if (ops->reset) {
ret = ops->reset(mod);
if (ret) {
Expand Down
8 changes: 1 addition & 7 deletions src/audio/pipeline/pipeline-schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ DECLARE_SOF_UUID("dp-task", dp_task_uuid, 0xee755917, 0x96b9, 0x4130,
*/
#define TASK_DP_STACK_SIZE 8192

/**
* \brief a priority of the DP threads in the system.
*/
#define ZEPHYR_DP_THREAD_PRIORITY (CONFIG_NUM_PREEMPT_PRIORITIES - 2)

#endif /* CONFIG_ZEPHYR_DP_SCHEDULER */

static void pipeline_schedule_cancel(struct pipeline *p)
Expand Down Expand Up @@ -404,8 +399,7 @@ int pipeline_comp_dp_task_init(struct comp_dev *comp)
&ops,
mod,
comp->ipc_config.core,
TASK_DP_STACK_SIZE,
ZEPHYR_DP_THREAD_PRIORITY);
TASK_DP_STACK_SIZE);
if (ret < 0)
return ret;
}
Expand Down
4 changes: 1 addition & 3 deletions src/include/sof/schedule/dp_schedule.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ int scheduler_dp_init(void);
* \param[in] mod pointer to the module to be run
* \param[in] core CPU the thread should run on
* \param[in] stack_size size of stack for a zephyr task
* \param[in] task_priority priority of the zephyr task
*/
int scheduler_dp_task_init(struct task **task,
const struct sof_uuid_entry *uid,
const struct task_ops *ops,
struct processing_module *mod,
uint16_t core,
size_t stack_size,
uint32_t task_priority);
size_t stack_size);

/**
* \brief Extract information about scheduler's tasks
Expand Down
97 changes: 59 additions & 38 deletions src/schedule/zephyr_dp_schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,22 @@ DECLARE_SOF_UUID("dp-schedule", dp_sched_uuid, 0x87858bc2, 0xbaa9, 0x40b6,

DECLARE_TR_CTX(dp_tr, SOF_UUID(dp_sched_uuid), LOG_LEVEL_INFO);

/**
* \brief a priority of the DP threads in the system.
*/
#define ZEPHYR_DP_THREAD_PRIORITY (CONFIG_NUM_PREEMPT_PRIORITIES - 2)

struct scheduler_dp_data {
struct list_item tasks; /* list of active dp tasks */
struct task ll_tick_src; /* LL task - source of DP tick */
};

struct task_dp_pdata {
k_tid_t thread_id; /* zephyr thread ID */
struct k_thread thread; /* memory space for a thread */
uint32_t deadline_clock_ticks; /* dp module deadline in Zephyr ticks */
k_thread_stack_t __sparse_cache *p_stack; /* pointer to thread stack */
size_t stack_size; /* size of the stack in bytes */
struct k_sem sem; /* semaphore for task scheduling */
struct processing_module *mod; /* the module to be scheduled */
uint32_t ll_cycles_to_start; /* current number of LL cycles till delayed start */
Expand Down Expand Up @@ -267,6 +274,8 @@ static int scheduler_dp_task_cancel(void *data, struct task *task)
{
unsigned int lock_key;
struct scheduler_dp_data *dp_sch = (struct scheduler_dp_data *)data;
struct task_dp_pdata *pdata = task->priv_data;


/* this is asyn cancel - mark the task as canceled and remove it from scheduling */
lock_key = scheduler_dp_lock();
Expand All @@ -278,8 +287,14 @@ static int scheduler_dp_task_cancel(void *data, struct task *task)
if (list_is_empty(&dp_sch->tasks))
schedule_task_cancel(&dp_sch->ll_tick_src);

/* if the task is waiting on a semaphore - let it run and self-terminate */
k_sem_give(&pdata->sem);
scheduler_dp_unlock(lock_key);

/* wait till the task has finished, if there was any task created */
if (pdata->thread_id)
k_thread_join(pdata->thread_id, K_FOREVER);

return 0;
}

Expand All @@ -289,10 +304,17 @@ static int scheduler_dp_task_free(void *data, struct task *task)

scheduler_dp_task_cancel(data, task);

/* abort the execution of the thread */
k_thread_abort(pdata->thread_id);
/* the thread should be terminated at this moment,
* abort is safe and will ensure no use after free
*/
if (pdata->thread_id) {
k_thread_abort(pdata->thread_id);
pdata->thread_id = NULL;
}

/* free task stack */
rfree((__sparse_force void *)pdata->p_stack);
pdata->p_stack = NULL;

/* all other memory has been allocated as a single malloc, will be freed later by caller */
return 0;
Expand Down Expand Up @@ -345,17 +367,17 @@ static void dp_thread_fn(void *p1, void *p2, void *p3)
}
}

/* call task_complete */
if (task->state == SOF_TASK_STATE_COMPLETED) {
/* call task_complete out of lock, it may eventually call schedule again */
scheduler_dp_unlock(lock_key);
task_complete(task);
} else {
scheduler_dp_unlock(lock_key);
}
};
if (task->state == SOF_TASK_STATE_COMPLETED ||
task->state == SOF_TASK_STATE_CANCEL)
break; /* exit the while loop, terminate the thread */

/* never be here */
scheduler_dp_unlock(lock_key);
}

scheduler_dp_unlock(lock_key);
/* call task_complete */
if (task->state == SOF_TASK_STATE_COMPLETED)
task_complete(task);
}

static int scheduler_dp_task_shedule(void *data, struct task *task, uint64_t start,
Expand All @@ -365,6 +387,7 @@ static int scheduler_dp_task_shedule(void *data, struct task *task, uint64_t sta
struct task_dp_pdata *pdata = task->priv_data;
unsigned int lock_key;
uint64_t deadline_clock_ticks;
int ret;

lock_key = scheduler_dp_lock();

Expand All @@ -375,6 +398,22 @@ static int scheduler_dp_task_shedule(void *data, struct task *task, uint64_t sta
return -EINVAL;
}

/* create a zephyr thread for the task */
pdata->thread_id = k_thread_create(&pdata->thread, (__sparse_force void *)pdata->p_stack,
pdata->stack_size, dp_thread_fn, task, NULL, NULL,
ZEPHYR_DP_THREAD_PRIORITY, K_USER, K_FOREVER);

/* pin the thread to specific core */
ret = k_thread_cpu_pin(pdata->thread_id, task->core);
if (ret < 0) {
tr_err(&dp_tr, "zephyr_dp_task_init(): zephyr task pin to core failed");
goto err;
}

/* start the thread, it should immediately stop at a semaphore, so clean it */
k_sem_reset(&pdata->sem);
k_thread_start(pdata->thread_id);

/* if there's no DP tasks scheduled yet, run ll tick source task */
if (list_is_empty(&dp_sch->tasks))
schedule_task(&dp_sch->ll_tick_src, 0, 0);
Expand All @@ -396,6 +435,12 @@ static int scheduler_dp_task_shedule(void *data, struct task *task, uint64_t sta

tr_dbg(&dp_tr, "DP task scheduled with period %u [us]", (uint32_t)period);
return 0;

err:
/* cleanup - unlock and free all allocated resources */
scheduler_dp_unlock(lock_key);
k_thread_abort(pdata->thread_id);
return ret;
}

static struct scheduler_ops schedule_dp_ops = {
Expand Down Expand Up @@ -436,19 +481,16 @@ int scheduler_dp_task_init(struct task **task,
const struct task_ops *ops,
struct processing_module *mod,
uint16_t core,
size_t stack_size,
uint32_t task_priority)
size_t stack_size)
{
void __sparse_cache *p_stack = NULL;

/* memory allocation helper structure */
struct {
struct task task;
struct task_dp_pdata pdata;
struct k_thread thread;
} *task_memory;

k_tid_t thread_id = NULL;
int ret;

/* must be called on the same core the task will be binded to */
Expand Down Expand Up @@ -478,23 +520,6 @@ int scheduler_dp_task_init(struct task **task,
goto err;
}

/* create a zephyr thread for the task */
thread_id = k_thread_create(&task_memory->thread, (__sparse_force void *)p_stack,
stack_size, dp_thread_fn, &task_memory->task, NULL, NULL,
task_priority, K_USER, K_FOREVER);
if (!thread_id) {
ret = -EFAULT;
tr_err(&dp_tr, "zephyr_dp_task_init(): zephyr thread create failed");
goto err;
}
/* pin the thread to specific core */
ret = k_thread_cpu_pin(thread_id, core);
if (ret < 0) {
ret = -EFAULT;
tr_err(&dp_tr, "zephyr_dp_task_init(): zephyr task pin to core failed");
goto err;
}

/* internal SOF task init */
ret = schedule_task_init(&task_memory->task, uid, SOF_SCHEDULE_DP, 0, ops->run,
mod, core, 0);
Expand All @@ -514,19 +539,15 @@ int scheduler_dp_task_init(struct task **task,

/* success, fill the structures */
task_memory->task.priv_data = &task_memory->pdata;
task_memory->pdata.thread_id = thread_id;
task_memory->pdata.p_stack = p_stack;
task_memory->pdata.stack_size = stack_size;
task_memory->pdata.mod = mod;
*task = &task_memory->task;

/* start the thread - it will immediately stop at a semaphore */
k_thread_start(thread_id);

return 0;
err:
/* cleanup - free all allocated resources */
if (thread_id)
k_thread_abort(thread_id);
rfree((__sparse_force void *)p_stack);
rfree(task_memory);
return ret;
Expand Down

0 comments on commit 34957e7

Please sign in to comment.