-
Notifications
You must be signed in to change notification settings - Fork 2
AudYoFlo: Behavior: Dynamic Birth and Death of Devices
In many cases, the AudYoFlo media engine must be able to handle incoming and outgoing data from dynamic input/output streams. A phone call is a good example for a dynamic input/output stream: as long as nobody calls, there is no incoming and outgoing data. However, if a call starts to be connected, the corresponding input/output streams should be integrated into the overall processing scheme as soon as the data starts to flow. And also, multiple input output streams should be able to be handled in parallel.
In these kinds of cases, a good approach is to let devices of a technology be born and killed given signals from a specific underlying technology. Often, the signals to indicate new streams occur asynchronously - hence, the the origin of spawning devices lies in a thread other than the AudYoFlo main tread. This requires special attention.
In this chapter, the typical procedures to integrate spawning devices into the signal processing chain for processing and their way back to the initial state will be explained in detail in the following. The description will start from the very first indication to report a new stream all the way to the last indication that indicates that the device has been killed and is no longer present in the signal processing main functional graph.
Note that the dynamic functionality is closely coupled to an implementation of the class of interface type IjvxAutomation
which is in detail described here.
The steps to be principally taken to handle dynamic media streams by spawning new devices are the following:
- Step (1): Accept an event to indicate that a new stream is pending - typically reported within a dedicated thread otther that the main thread.
- Step (2): Map the pending stream indication to be logically linked to a new device in the main thread.
- Step (3): Activate the device in the main thread.
- Step (4): Connect the processing chain with the new device as the master.
- Step (5): Test the processing chain to distribute the processing parameters towards all involved processing components.
- Step (6): Trigger the sequencer to start and run data processing.
The data processing operates as long as the i/o process is connected. Typically, another event indicates that data processing is over at a later time. If that is the case, the following steps are required to allow the new device to be deactivated and to be removed to achieve the initial state.
- Step (7): Accept incoming event to indicate that the i/o streaming shall no longer be active in a dedicated thread other than he main thread.
- Step (8): Stop processing of the data processing chain.
- Step (9): Disconnect the data processing chain.
- Step (10): Deactivate the device.
- Step (11): Cleanup the device and remove it from the devices list in the technology component.
The description of the steps in detail will be subject to the remainder of this chapter. We will guide the reader based on code fragments taken mostly from the example of the technology jvxSPTSocketSignal
which is located in the folder
<AudYoFlo>/sources/jvxComponents/jvxSignalProcessingTechnologies/jvxSpTSocketSignal
The following logic steps as listed previously, Step (1)-(6), are involved if a new data stream is reported to be started:
Typically, incoming i/o streams are detected in threads which operate outside of the main thread. Once this happens, a device shall be created to logically map the i/o stream. Each i/o stream is stored in a list of containers in the technology component, and the logic access to the i/o stream is granted by exposing a device. This is demonstrated by the following figure:
In the example application as mentioned in the beginning, the first indication of an incoming i/o stream is reported whenever there is incoming data on the socket which is operated in a thread bound to the message receive loop of the sub module CjvxSocketsServer
waiting for incoming connections. In that case the following callback is triggered:
jvxErrorType
CjvxSignalProcessingTechnologySocket::report_server_connect(const char* description, IjvxSocketsConnection* newConnection)
{
jvxCBitField prio = 0;
std::string nm = deviceNamePrefix + "#" + jvx_size2String(uId);
uId++;
CjvxSignalProcessingDeviceSocket* newDevice =
new CjvxSignalProcessingDeviceSocket(
nm.c_str(), false,
description,
_common_set.theFeatureClass,
_common_set.theModuleName.c_str(),
JVX_COMPONENT_ACCESS_SUB_COMPONENT,
(jvxComponentType)(_common_set.theComponentType.tp + 1),
"", NULL);
newDevice->init_connection(newConnection, this);
JVX_LOCK_MUTEX(safeAccessConnections);
pendig_activate[newConnection] = newDevice;
JVX_UNLOCK_MUTEX(safeAccessConnections);
return JVX_NO_ERROR;
}
We already allocate a device in this thread but we store it in a private list pending_active
at this moment. Naturally, the allocation of the device must be secured by a mutex since the list will be accessed from within the dedicated as well as from within the main thread in the future. The devices in the pending_active
list are not yet ready for exposure.
After the first allocation steps, in the object to represent an i/o stream in the jvxSpTSocketSignal
project, additional network traffic and handshake communication happens to establish the underlying network connection for the i/o stream. Finally, a callback report_new_connection
indicates that the connection is fully established. At this moment, the new device can be reported to be available for operation in the main thread in AudYoFlo. Just for the sake of completeness: the thread in which report_new_connection
is triggered is bound to the message receive thread of the sub module CjvxSocketsServer
which waits for incoming connections on a socket. The report_new_connection
is the member function of the interface class IjvxSocketsServer_report
.
All object state switches and other AudYoFlo specific issues are required to happen within the (single) main thread. So far, the incoming i/o streams, however, do not yet live in the main thread. Therefore, in the callback, the event of the new connection is mapped to the main thread by emitting an object of type CjvxReportCommandRequest_rm
. Emitting this object will trigger a callback which is re-scheduled to return in the main thread:
jvxErrorType
CjvxSignalProcessingTechnologySocket::report_new_connection(IjvxSocketsConnection* connection)
{
// Reschedule this information into the main thread
CjvxReportCommandRequest_rm theRequest(_common_set.theComponentType, JVX_SCHEDULE_ID_NEW_CONNECTION,
jvxReportCommandBroadcastType::JVX_REPORT_COMMAND_BROADCAST_NO_FURTHER,
connection);
_request_command(theRequest);
return JVX_NO_ERROR;
}
In this re-schedule operation the connection pointer is passed as an argument that will transfer from the dedicated to the main thread. The id JVX_SCHEDULE_ID_NEW_CONNECTION
can be used to allow multiple re-schedule operation which is not necessary here. The callback to actually run the re-scheduled operation is function schedule_main_loop
. The re-scheduling is outlined in the following figure.
The callback schedule_main_loop(jvxSize rescheduleId, jvxHandle* user_data)
is part of the technology class but the call happens in the main thread now. The callback receives the passed id and the connection pointer in arguments rescheduleId
and user_data
, respectively. At this time, the pending device is moved into the list of regular devices by adding an entry to the device list of the technology:
jvxErrorType
CjvxSignalProcessingTechnologySocket::schedule_main_loop(jvxSize rescheduleId, jvxHandle* user_data)
{
std::string ident;
jvxErrorType res = JVX_ERROR_ELEMENT_NOT_FOUND;
switch (rescheduleId)
{
case JVX_SCHEDULE_ID_NEW_CONNECTION:
{
// New Connection established
JVX_LOCK_MUTEX(safeAccessConnections);
auto elm = pendig_activate.begin();
for (; elm != pendig_activate.end();elm++)
{
if (elm->second->theSMachine.localState == jvxAudioSocketsState::JVX_AUDIOSTATE_XCHANGE_INIT)
{
jvxBool nmSet = false;
jvxSize cnt = 0;
std::string uniqueName;
jvxAudioSocketParameters params;
elm->second->data_slave(¶ms);
uniqueName = params.nameConnection;
while (1)
{
uniqueName = params.nameConnection;
if (cnt != 0)
{
uniqueName += "<" + jvx_size2String(cnt) + ">";
}
cnt++;
jvxBool foundIt = false;
auto elmD = _common_tech_set.lstDevices.begin();
for (;elmD != _common_tech_set.lstDevices.end(); elmD++)
{
jvxApiString astr;
elmD->hdlDev->description(&astr);
if(astr.std_str() == uniqueName)
{
foundIt = true;
break;
}
}
if (!foundIt)
{
params.nameConnection = uniqueName;
break;
}
}
res = elm->second->accept_slave(¶ms);
assert(res == JVX_NO_ERROR); // <- this must be observed :-)
oneDeviceWrapper elmAdd;
elmAdd.hdlDev = static_cast<IjvxDevice*>(elm->second);
ident = elm->second->_common_set_min.theDescription;
_common_tech_set.lstDevices.push_back(elmAdd);
}
}
JVX_UNLOCK_MUTEX(safeAccessConnections);
report_device_was_born(ident);
res = JVX_NO_ERROR;
}
break;
}
return res;
}
The process involves adding the device pointer from the pending_active
list into the list stored in variable _common_tech_set.lstDevices
. From here, the AudYoFlo host now can access the exposed devices via the technology/device API.
Even though principally accessible, in the final step, the call of member function report_device_was_born(ident)
informs the system that there is a new device available. The call to report_device_was_born
emits an object of type CjvxReportCommandRequest
to the host with request type jvxReportCommandRequest::JVX_REPORT_COMMAND_REQUEST_UPDATE_AVAILABLE_COMPONENT_LIST
.
A user could at this point activate the new device manually. However, in most cases, the automation should be used from here on to procede with all steps required to bring the new device into the processing state. Therefore, in report_device_was_born
another object of type CjvxReportCommandRequest_id
is emitted with the request type jvxReportCommandRequest::JVX_REPORT_COMMAND_REQUEST_REPORT_BORN_SUBDEVICE
. This emission, however, with the object of type IjvxAutomation
, has the clear target.
The new born device hence shows up in the implementation of the class for interface IjvxAutomation
, in member function
jvxErrorType IjvxReportSystem::request_command(const CjvxReportCommandRequest& request);
The implementation of the class IjvxAutomation
in most cases involves the class CjvxAutomation
. This class is defined in the library jvx-automate
and provides default implementations. In the case of the report of a new born device, the class maps the request_command
call to a call of the function
jvxErrorType handle_report_ident(jvxReportCommandRequest req,
jvxComponentIdentification tp, const std::string& ident,
CjvxAutomation::callSpecificParameters* params);
In that function, at the end, the call to the implementation of handle_report_ident
must be called,
jvxErrorType
classDerived::handle_report_ident(jvxReportCommandRequest req,
jvxComponentIdentification tp, const std::string& ident,
CjvxAutomation::callSpecificParameters* params)
{
// ===================================================================
...
// ===================================================================
return CjvxAutomation::handle_report_ident(req, tp, ident, params);
}
The involved default implementation automatically activates the new born device.
If the lifetime of a device ends, all active structures must be removed. Typically, again, this is triggered by an event in a thread other than the main thread. What will be required first is that the condition of the end-of-lifecycle will bring the device in a state in which there is no more real data io happening. An approach could be to set a flag to indicate the state of shutting down the device.
Then, a command request is emitted to synchronize the event that the device will no longer be active in the main thread. By emitting this event, the next steps will be under control of the object of type IjvxAutomation
. In the mentioned example, the indication about the lifecycle of the device is handled as follows:
jvxErrorType
CjvxSignalProcessingTechnologySocket::report_server_disconnect(const char* description, IjvxSocketsConnection* newConnection)
{
std::string ident;
JVX_LOCK_MUTEX(safeAccessConnections);
auto elm = pendig_activate.find(newConnection);
if (elm != pendig_activate.end())
{
assert(elm->second->connection);
ident = elm->second->_common_set_min.theDescription;
// Indicate that this connection is no longer in use after this function call
elm->second->terminate_connection();
pendig_activate.erase(elm);
}
JVX_UNLOCK_MUTEX(safeAccessConnections);
report_device_died(ident);
return JVX_NO_ERROR;
}
The last call to the function report_device_died(ident)
will emit the event to trigger the automation object to take care of all following steps.
When the class CjvxAutomation
sees a device to be removed, it finds the list of process tasks the device was involved with as obtained at startup time. Each task is flagged to be completed immediately and the sequencer is triggered to run the task immediately by emitting an object with request type jvxReportCommandRequest::JVX_REPORT_COMMAND_REQUEST_TRIGGER_SEQUENCER_IMMEDIATE
. This object has the immediate flag set and will force the sequencer to run immediately until the task has been completed. After being run to complete, the task itself is removed from the list of open tasks. Since the task was initially created to startup the processing chain, once the immediate sequencer operation is done the device is still active but no longer in operation.
Disconnecting the Processing Chain, Deactivating the Device and Removal from the Technology Devices List
In the next step, the device will be deactivated. While doing so, the data connection sub system will automatically disconnect the connections that the device was connected to earlier. In the last step, the device will be unselected. When doing so, the member function return_device(IjvxDevice* dev)
will be called. Here is now the right place to remove all structures which were allocated before:
jvxErrorType
CjvxSignalProcessingTechnologySocket::return_device(IjvxDevice* dev)
{
jvxErrorType res = CjvxTemplateTechnology<CjvxSignalProcessingDeviceSocket>::return_device(dev);
if(res == JVX_NO_ERROR)
{
jvxBool stillWaiting = true;
while (stillWaiting)
{
jvxBool stillAlive = false;
JVX_LOCK_MUTEX(safeAccessConnections);
auto elm = pendig_activate.begin();
for (; elm != pendig_activate.end(); elm++)
{
if (static_cast<IjvxDevice*>(elm->second) == dev)
{
stillAlive = true;
break;
}
}
JVX_UNLOCK_MUTEX(safeAccessConnections);
if (stillAlive)
{
elm->second->connection->disconnect();
JVX_SLEEP_MS(200);
}
else
{
stillWaiting = false;
}
}
auto elm = _common_tech_set.lstDevices.begin();
for (; elm != _common_tech_set.lstDevices.end(); elm++)
{
if (static_cast<IjvxDevice*>(elm->hdlDev) == dev)
{
break;
}
}
if (elm != _common_tech_set.lstDevices.end())
{
delete elm->hdlDev;
_common_tech_set.lstDevices.erase(elm);
}
}
return res;
};
The shown function here plays an important role as it needs to wait for the connection to complete successfully.
A special consideration is required to properly deal with the end of program executaion with an active dynamic device. In this case, we need to shutdown the device and the associated i/o stream properly. This is typically required in the device deactivate
function. This function is typically part of the main thread and is the location at which we trigger the end of the i/o stream. The shutdown will then completely finish in the thread related to the involved technology but has some contact requests to structures from the main struct and is always in danger to produce a deadlock.
Every connected technology provides different constraints and demands a good design to operate without deadlock. An example implementation of the deactivate
function of a sip device is given as follows:
jvxErrorType
CayfBaresipDevice::deactivate()
{
jvxErrorType res = _pre_check_deactivate();
if (res == JVX_NO_ERROR)
{
...
if(theCall)
{
parentTech->baresip_ayfctrl_trigger_command(CayfBaresipTechnology::ayfBaresipSyncCommands::BARESIP_SYNC_COMMAND_HUP, theCall);
while (theCall)
{
JVX_SLEEP_MS(200);
}
}
}
return (res);
}
The i/o streaming technology is shutdown by calling the function baresip_ayfctrl_trigger_command
in which the streaming thread is driven to hang up the connection. The important handle is the argument theCall
: It indicates that a call is up and running when entering the loop to drive the hangup. The end of the connection actually happens in the specific thread and indicates that the connection has gone if the variable theCall
becomes a nullptr
. This, however, happens in the parallel non-main thread! In the case of this example, it is very important that the trigger function is outside the protecting lock - otherwise, the non-main thread would not be able to take the lock to protect theCall
and a deadlock would be the result.