AudYoFlo: Behavior: Dynamic Birth and Death of Devices

jvxgit edited this page Apr 4, 2023 · 21 revisions

In many cases, the AudYoFlo media engine must handle incoming and outgoing data from dynamic input/output streams. A phone call is a good example: as long as nobody calls, there is no incoming or outgoing data. Once a call is connected, however, the corresponding input/output streams should be integrated into the overall processing scheme as soon as data starts to flow. In addition, it should be possible to handle multiple input/output streams in parallel.

In such cases, a good approach is to let the devices of a technology be born and killed in response to signals from the underlying technology. The signals that indicate new streams often occur asynchronously - hence, the spawning of devices originates in a thread other than the AudYoFlo main thread. This requires special attention.

This chapter explains in detail the typical procedure for integrating spawned devices into the signal processing chain and for returning them to the initial state. The description starts at the very first indication of a new stream and ends at the last indication that the device has been killed and is no longer present in the main functional graph of the signal processing.

Note that the dynamic functionality is closely coupled to an implementation of the interface type IjvxAutomation, which is described in detail here.

Principal Steps Involved

The principal steps to handle dynamic media streams by spawning new devices are the following:

  • Step (1): Accept an event indicating that a new stream is pending - typically reported within a dedicated thread other than the main thread.
  • Step (2): Link the pending stream indication logically to a new device in the main thread.
  • Step (3): Activate the device in the main thread.
  • Step (4): Connect the processing chain with the new device as the master.
  • Step (5): Test the processing chain to distribute the processing parameters towards all involved processing components.
  • Step (6): Trigger the sequencer to start and run data processing.

Once processing is over, the new device must be deactivated and killed. In detail, removing the device involves the following steps:

  1. Stop processing of the chain,
  2. disconnect the chain,
  3. deactivate the device and
  4. clean up the device and remove it from the devices list.
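
The startup and teardown steps listed above can be read as a small state machine. The following is a minimal sketch of that reading, not code from AudYoFlo: the enum DeviceStage and the functions advance and retreat are hypothetical names introduced purely for illustration.

```cpp
#include <cassert>

// Hypothetical lifecycle stages; these names are illustrative only and do
// not appear in the AudYoFlo sources.
enum class DeviceStage {
    Pending,     // step (1): stream reported by a worker thread
    Mapped,      // step (2): linked to a device in the main thread
    Active,      // step (3): device activated
    Connected,   // step (4): chain connected with the device as master
    Tested,      // step (5): parameters distributed along the chain
    Processing,  // step (6): sequencer is running
    Removed      // device cleaned up and dropped from the devices list
};

// Advance one stage towards processing; Processing and Removed stay put.
inline DeviceStage advance(DeviceStage s) {
    switch (s) {
    case DeviceStage::Pending:   return DeviceStage::Mapped;
    case DeviceStage::Mapped:    return DeviceStage::Active;
    case DeviceStage::Active:    return DeviceStage::Connected;
    case DeviceStage::Connected: return DeviceStage::Tested;
    case DeviceStage::Tested:    return DeviceStage::Processing;
    default:                     return s;
    }
}

// Take one teardown step: stop, disconnect, deactivate, clean up.
inline DeviceStage retreat(DeviceStage s) {
    switch (s) {
    case DeviceStage::Processing: return DeviceStage::Connected; // 1. stop
    case DeviceStage::Tested:     return DeviceStage::Connected; // never started
    case DeviceStage::Connected:  return DeviceStage::Active;    // 2. disconnect
    case DeviceStage::Active:     return DeviceStage::Mapped;    // 3. deactivate
    case DeviceStage::Mapped:     return DeviceStage::Removed;   // 4. clean up
    case DeviceStage::Pending:    return DeviceStage::Removed;   // never mapped
    default:                      return s;
    }
}
```

Starting from Pending, five calls to advance reach Processing, and four calls to retreat from there reach Removed, which matches the number of steps in the two lists.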

The remainder of this section covers these steps in detail. All code fragments are taken from the example technology jvxSPTSocketSignal, which is located in the folder

<AudYoFlo>/sources/jvxComponents/jvxSignalProcessingTechnologies/jvxSpTSocketSignal

New Devices Appearing

The following logical steps are involved when a new device comes to life:

Reporting a Pending Device in a Dedicated Thread

Typically, pending new devices are detected in threads other than the main thread. On the very first sign of activity, a new connection is spawned in this thread and stored in a list of objects in state pending. In the example application, the first indication of a new device is reported whenever there is incoming data on the socket. The socket is operated in the message receive thread of the sub module CjvxSocketsServer, which waits for incoming connections. In that case the following callback is triggered:

jvxErrorType 
CjvxSignalProcessingTechnologySocket::report_server_connect(const char* description, IjvxSocketsConnection* newConnection)
{
	jvxCBitField prio = 0;
	std::string nm = deviceNamePrefix + "#" + jvx_size2String(uId);
	uId++;
	CjvxSignalProcessingDeviceSocket* newDevice =
		new CjvxSignalProcessingDeviceSocket(
			nm.c_str(), false,
			description,
			_common_set.theFeatureClass,
			_common_set.theModuleName.c_str(),
			JVX_COMPONENT_ACCESS_SUB_COMPONENT,
			(jvxComponentType)(_common_set.theComponentType.tp + 1),
			"", NULL);
	newDevice->init_connection(newConnection, this);

	JVX_LOCK_MUTEX(safeAccessConnections);
	pendig_activate[newConnection] = newDevice;
	JVX_UNLOCK_MUTEX(safeAccessConnections);

	return JVX_NO_ERROR;
}

This function allocates a device in that thread and stores it in the list of pending devices. Access to the list must be secured by a mutex since the list will later also be accessed from the main thread. The devices in the pendig_activate list are not yet ready for exposure.
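
The locking pattern can be isolated from the AudYoFlo types. The sketch below is a minimal illustration under assumptions: Connection, Device and PendingRegistry are hypothetical stand-ins, and std::mutex replaces the JVX_LOCK_MUTEX / JVX_UNLOCK_MUTEX macros used in the example.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-ins for the connection and device types.
struct Connection {};
struct Device { std::string name; };

// A list of pending devices that is written by the receive thread and
// read by the main thread; every access takes the same mutex.
class PendingRegistry {
public:
    // Called from the receive thread when a connection first shows activity.
    void addPending(const Connection* c, std::shared_ptr<Device> d) {
        std::lock_guard<std::mutex> lock(mtx_);
        pending_[c] = std::move(d);
    }

    // Called from any thread; returns an empty pointer for unknown connections.
    std::shared_ptr<Device> find(const Connection* c) const {
        std::lock_guard<std::mutex> lock(mtx_);
        auto it = pending_.find(c);
        if (it == pending_.end()) {
            return std::shared_ptr<Device>();
        }
        return it->second;
    }

    // Removes the entry; returns true if an entry was actually removed.
    bool erase(const Connection* c) {
        std::lock_guard<std::mutex> lock(mtx_);
        return pending_.erase(c) == 1;
    }

    std::size_t size() const {
        std::lock_guard<std::mutex> lock(mtx_);
        return pending_.size();
    }

private:
    mutable std::mutex mtx_;
    std::map<const Connection*, std::shared_ptr<Device>> pending_;
};
```

Using std::lock_guard releases the mutex on every return path, which is one reason a scoped lock may be preferable to paired lock and unlock calls.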

After this first allocation step, additional network traffic and handshake communication are expected. Finally, the callback report_new_connection is where a new device is reported to be ready for operation. Again, the underlying thread is the message receive thread of the sub module CjvxSocketsServer, which waits for incoming connections on a socket. report_new_connection is a member function of the interface class IjvxSocketsServer_report.

By the time report_new_connection is called, enough messages have been exchanged that all connection communication can be expected to be complete. In the callback, the new-connection event is mapped to the main thread by emitting an object of type CjvxReportCommandRequest_rm. The purpose of this object is to re-schedule the event so that it returns in the main thread:

jvxErrorType
CjvxSignalProcessingTechnologySocket::report_new_connection(IjvxSocketsConnection* connection)
{
	// Reschedule this information into the main thread
	CjvxReportCommandRequest_rm theRequest(_common_set.theComponentType, JVX_SCHEDULE_ID_NEW_CONNECTION, 
		jvxReportCommandBroadcastType::JVX_REPORT_COMMAND_BROADCAST_NO_FURTHER,
		connection);
	_request_command(theRequest);
	return JVX_NO_ERROR;
}

The purpose of the message is to pass the connection pointer together with the id JVX_SCHEDULE_ID_NEW_CONNECTION.
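
How such a re-scheduling mechanism may work in principle can be sketched with a mutex-protected queue: a worker thread posts an id and a user data pointer, and the main thread drains the queue and dispatches each entry. This is an assumption about the general technique, not a description of how _request_command is implemented; RescheduledEvent and MainLoopQueue are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical event record: an id plus an opaque user data pointer,
// analogous to the arguments of schedule_main_loop.
struct RescheduledEvent {
    std::size_t id;
    void* userData;
};

class MainLoopQueue {
public:
    // Called from a worker thread: enqueue the event and return immediately.
    void post(std::size_t id, void* userData) {
        std::lock_guard<std::mutex> lock(mtx_);
        queue_.push(RescheduledEvent{id, userData});
    }

    // Called from the main thread: take all queued events and dispatch them.
    // The handler runs outside the lock, so it may post new events itself.
    template <typename Handler>
    std::size_t drain(Handler&& handler) {
        std::queue<RescheduledEvent> local;
        {
            std::lock_guard<std::mutex> lock(mtx_);
            std::swap(local, queue_);
        }
        std::size_t handled = 0;
        while (!local.empty()) {
            handler(local.front().id, local.front().userData);
            local.pop();
            ++handled;
        }
        return handled;
    }

private:
    std::mutex mtx_;
    std::queue<RescheduledEvent> queue_;
};
```

The worker thread never touches main-thread state directly; it only hands over the id and the pointer, which is the same division of labor as in report_new_connection.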

Mapping the Pending Device to Become a Valid Device in the Main Thread

The re-scheduled event then pops up in the main thread in the callback schedule_main_loop(jvxSize rescheduleId, jvxHandle* user_data), which is part of the same class and receives the passed id and the connection pointer in the arguments rescheduleId and user_data. In the rescheduling callback, the pending device is moved into the list of regular devices:

jvxErrorType 
CjvxSignalProcessingTechnologySocket::schedule_main_loop(jvxSize rescheduleId, jvxHandle* user_data)
{
	std::string ident;
	jvxErrorType res = JVX_ERROR_ELEMENT_NOT_FOUND;
	switch (rescheduleId)
	{
	case JVX_SCHEDULE_ID_NEW_CONNECTION:
	{
		// New Connection established
		JVX_LOCK_MUTEX(safeAccessConnections);
		auto elm = pendig_activate.begin();
		for (; elm != pendig_activate.end();elm++)
		{
			if (elm->second->theSMachine.localState == jvxAudioSocketsState::JVX_AUDIOSTATE_XCHANGE_INIT)
			{
				jvxBool nmSet = false;
				jvxSize cnt = 0;
				std::string uniqueName;
				jvxAudioSocketParameters params;
				elm->second->data_slave(&params);
				uniqueName = params.nameConnection;
				while (1)
				{
					uniqueName = params.nameConnection;
					if (cnt != 0)
					{
						uniqueName += "<" + jvx_size2String(cnt) + ">";					
					}
					cnt++;
					jvxBool foundIt = false;
					auto elmD = _common_tech_set.lstDevices.begin();
					for (;elmD != _common_tech_set.lstDevices.end(); elmD++)
					{
						jvxApiString astr;
						elmD->hdlDev->description(&astr);
						if(astr.std_str() == uniqueName)
						{
							foundIt = true;
							break;
						}
					}
					if (!foundIt)
					{
						params.nameConnection = uniqueName;
						break;
					}
				}

				res = elm->second->accept_slave(&params);
				assert(res == JVX_NO_ERROR); // <- this must be observed :-)

				oneDeviceWrapper elmAdd;
				elmAdd.hdlDev = static_cast<IjvxDevice*>(elm->second);
				ident = elm->second->_common_set_min.theDescription;
				_common_tech_set.lstDevices.push_back(elmAdd);
			}
		}
		JVX_UNLOCK_MUTEX(safeAccessConnections);

		report_device_was_born(ident);

		res = JVX_NO_ERROR;
	}
	break;	
	}
	return res;
}

The move is done by adding the pointer to the list stored in the variable _common_tech_set.lstDevices. Then, in the final step, the call of the member function report_device_was_born(ident) informs the system that a new device is available. The call to report_device_was_born emits an object of type CjvxReportCommandRequest to the host with request type jvxReportCommandRequest::JVX_REPORT_COMMAND_REQUEST_UPDATE_AVAILABLE_COMPONENT_LIST and emits another object of type CjvxReportCommandRequest_id with the request type jvxReportCommandRequest::JVX_REPORT_COMMAND_REQUEST_REPORT_BORN_SUBDEVICE towards the object of type IjvxAutomation. The latter request arrives at the receiving object in the function

jvxErrorType IjvxReportSystem::request_command(const CjvxReportCommandRequest& request);
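
The name de-duplication performed in schedule_main_loop can be looked at in isolation. The following minimal sketch reproduces the same scheme (append "<1>", "<2>", ... until the name is unused) with standard library types only; makeUniqueName is a hypothetical helper name, and a std::vector of strings stands in for the descriptions of the devices in _common_tech_set.lstDevices.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Returns base if it is not yet in use, otherwise the first of
// base<1>, base<2>, ... that does not collide with an existing name.
inline std::string makeUniqueName(const std::string& base,
                                  const std::vector<std::string>& existing) {
    for (std::size_t cnt = 0;; ++cnt) {
        std::string candidate = base;
        if (cnt != 0) {
            candidate += "<" + std::to_string(cnt) + ">";
        }
        if (std::find(existing.begin(), existing.end(), candidate) == existing.end()) {
            return candidate;
        }
    }
}
```

The loop always terminates because the list of existing names is finite, so some suffix is eventually free.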

Activation of the Newly Born Device

In most cases, the implementation of the interface IjvxAutomation builds on the class CjvxAutomation. This class is defined in the library jvx-automate and provides default implementations. When a newly born device is reported, the class maps the request_command call to a call of the function

jvxErrorType handle_report_ident(jvxReportCommandRequest req,
	jvxComponentIdentification tp, const std::string& ident,
	CjvxAutomation::callSpecificParameters* params);

A derived class that overrides this function must call the base class implementation of handle_report_ident at the end:

jvxErrorType 
classDerived::handle_report_ident(jvxReportCommandRequest req,
	jvxComponentIdentification tp, const std::string& ident,
	CjvxAutomation::callSpecificParameters* params)
{
   // ===================================================================
   ...
   // ===================================================================

   return CjvxAutomation::handle_report_ident(req, tp, ident, params);
}

The default implementation involved here automatically activates the newly born device.

Connecting the New Device

Testing the Data Processing Chain Involving the New Device

Starting the New Device

End of Lifetime of a Dynamic Device

When the lifetime of a device ends, all active structures must be removed. Typically, this is again triggered by an event in a thread other than the main thread. First, the end-of-lifecycle condition must bring the device into a state in which no more real data I/O takes place. One approach is to set a flag that indicates that the device is shutting down.
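
Such a shutdown flag could look as follows. This is a minimal sketch under assumptions: StreamGate and its member functions are hypothetical and not part of AudYoFlo, and std::atomic is used so that the flag can be set in one thread and checked in another without a data race.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical gate in front of the data path of a device.
class StreamGate {
public:
    // Called from the thread that detects the end of the stream.
    void beginShutdown() {
        shuttingDown_.store(true, std::memory_order_release);
    }

    bool isShuttingDown() const {
        return shuttingDown_.load(std::memory_order_acquire);
    }

    // Called from the processing thread. Returns the number of samples
    // accepted; no data is accepted once shutdown has begun.
    std::size_t process(const float* samples, std::size_t count) {
        if (samples == nullptr || isShuttingDown()) {
            return 0;
        }
        processed_ += count;
        return count;
    }

    // Only meaningful when read from the processing thread.
    std::size_t totalProcessed() const { return processed_; }

private:
    std::atomic<bool> shuttingDown_{false};
    std::size_t processed_ = 0;
};
```

The flag only stops new data from entering; removing the device itself still has to happen in the main thread.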

Once data I/O has ceased, a command request is emitted to synchronize the event that the device is no longer active with the main thread. From this event on, the next steps are under control of the object of type IjvxAutomation. In the example, the end of the device's lifecycle is handled as follows:

jvxErrorType
CjvxSignalProcessingTechnologySocket::report_server_disconnect(const char* description, IjvxSocketsConnection* newConnection)
{
	std::string ident;
	JVX_LOCK_MUTEX(safeAccessConnections);
	auto elm = pendig_activate.find(newConnection);
	if (elm != pendig_activate.end())
	{
		assert(elm->second->connection);
		ident = elm->second->_common_set_min.theDescription;
		
		// Indicate that this connection is no longer in use after this function call
		elm->second->terminate_connection();		
		pendig_activate.erase(elm);
	}
	JVX_UNLOCK_MUTEX(safeAccessConnections);

	report_device_died(ident);

	return JVX_NO_ERROR;
}

The final call to report_device_died(ident) emits the event that triggers the automation object to take care of all subsequent steps.

Stopping the Processing Chain

When the class CjvxAutomation sees that a device is to be removed, it looks up the list of process tasks in which the device is involved, as recorded at startup time. Each task is flagged to be completed immediately, and the sequencer is triggered by emitting an object with request type jvxReportCommandRequest::JVX_REPORT_COMMAND_REQUEST_TRIGGER_SEQUENCER_IMMEDIATE. This object has the immediate flag set and forces the sequencer to run immediately until the task has been completed. Once it has run to completion, the task is removed from the list of open tasks. Since the task was initially created to start up the processing chain, the device is still active but no longer in operation once the immediate sequencer operation is done.

Disconnecting the Processing Chain, Deactivating the Device and Removal from the Technology Devices List

In the next step, the device is deactivated. While doing so, the data connection sub system automatically disconnects the connections that the device was previously part of. In the last step, the device is unselected, which calls the member function return_device(IjvxDevice* dev). This is the right place to remove all structures that were allocated before:

jvxErrorType
CjvxSignalProcessingTechnologySocket::return_device(IjvxDevice* dev)
{
	jvxErrorType res = CjvxTemplateTechnology<CjvxSignalProcessingDeviceSocket>::return_device(dev);
	if(res == JVX_NO_ERROR)
	{
		jvxBool stillWaiting = true;
		while (stillWaiting)
		{
			jvxBool stillAlive = false;

			JVX_LOCK_MUTEX(safeAccessConnections);
			auto elm = pendig_activate.begin();
			for (; elm != pendig_activate.end(); elm++)
			{
				if (static_cast<IjvxDevice*>(elm->second) == dev)
				{
					stillAlive = true;
					break;
				}
			}
			JVX_UNLOCK_MUTEX(safeAccessConnections);
			if (stillAlive)
			{
				elm->second->connection->disconnect();
				JVX_SLEEP_MS(200);
			}
			else
			{
				stillWaiting = false;
			}
		}
		
		auto elm = _common_tech_set.lstDevices.begin();
		for (; elm != _common_tech_set.lstDevices.end(); elm++)
		{
			if (static_cast<IjvxDevice*>(elm->hdlDev) == dev)
			{
				break;
			}
		}
		if (elm != _common_tech_set.lstDevices.end())
		{
			delete elm->hdlDev;
			_common_tech_set.lstDevices.erase(elm);
		}
	}
	return res;
}

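The function shown here plays an important role: before the device is deleted, it waits until the disconnect has completed, that is, until report_server_disconnect has removed the device from the pendig_activate list.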
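
The waiting loop in return_device polls without an upper bound. A bounded variant of the same polling idea is sketched below as a possible alternative, not as what the example does; pollUntil is a hypothetical helper, and std::this_thread::sleep_for stands in for JVX_SLEEP_MS.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>

// Polls done() up to maxAttempts times, sleeping for interval between
// attempts, and checks one final time after the last sleep.
// Returns true as soon as done() reports true, false if it never does.
template <typename Predicate>
bool pollUntil(Predicate done,
               std::chrono::milliseconds interval,
               std::size_t maxAttempts) {
    for (std::size_t attempt = 0; attempt < maxAttempts; ++attempt) {
        if (done()) {
            return true;
        }
        std::this_thread::sleep_for(interval);
    }
    return done();
}
```

With a bound, the caller can decide what to do if the receive thread never reports the disconnect, instead of blocking the main thread indefinitely.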
