diff --git a/src/qoa4ml/collector/amqp_collector.py b/src/qoa4ml/collector/amqp_collector.py index f2b2562..e3f2a40 100644 --- a/src/qoa4ml/collector/amqp_collector.py +++ b/src/qoa4ml/collector/amqp_collector.py @@ -10,18 +10,67 @@ class AmqpCollector(BaseCollector): - # Init an amqp client handling the connection to amqp servier + """ + AmqpCollector handles the connection to an AMQP server for collecting and processing messages. + + Parameters + ---------- + configuration : AMQPCollectorConfig + Configuration settings for connecting to the AMQP server. + host_object : Optional[HostObject], optional + An optional HostObject to process incoming messages, default is None. + + Attributes + ---------- + host_object : Optional[HostObject] + The host object responsible for processing messages. + exchange_name : str + The name of the exchange to connect to. + exchange_type : str + The type of the exchange (e.g., 'direct', 'topic'). + in_routing_key : str + The routing key for incoming messages. + in_connection : pika.BlockingConnection + The connection to the RabbitMQ server. + in_channel : pika.channel.Channel + The channel for communication with RabbitMQ. + queue : pika.spec.Queue.DeclareOk + The queue to receive prediction responses. + queue_name : str + The name of the queue. + + Methods + ------- + on_request(ch, method, props, body) + Process incoming request messages. + start_collecting() + Start collecting messages from the queue. + stop() + Stop collecting messages and close the connection. + get_queue() -> str + Get the name of the queue. + """ + def __init__( self, configuration: AMQPCollectorConfig, host_object: Optional[HostObject] = None, ): + """ + Initialize an instance of AmqpCollector. + + Parameters + ---------- + configuration : AMQPCollectorConfig + Configuration settings for connecting to the AMQP server. + host_object : Optional[HostObject], optional + An optional HostObject to process incoming messages, default is None. + """ self.host_object = host_object self.exchange_name = configuration.exchange_name self.exchange_type = configuration.exchange_type self.in_routing_key = configuration.in_routing_key - # Connect to RabbitMQ host if "amqps://" in configuration.end_point: self.in_connection = pika.BlockingConnection( pika.URLParameters(configuration.end_point) @@ -31,45 +80,80 @@ def __init__( pika.ConnectionParameters(host=configuration.end_point) ) - # Create a channel self.in_channel = self.in_connection.channel() - - # Init an Exchange self.in_channel.exchange_declare( exchange=self.exchange_name, exchange_type=self.exchange_type ) - # Declare a queue to receive prediction response self.queue = self.in_channel.queue_declare( queue=configuration.in_queue, exclusive=False ) self.queue_name = self.queue.method.queue - # Binding the exchange to the queue with specific routing + self.in_channel.queue_bind( exchange=self.exchange_name, queue=self.queue_name, routing_key=self.in_routing_key, ) - def on_request(self, ch, method, props, body): - # Process the data on request: sending back to host object + def on_request(self, ch, method, props, body) -> None: + """ + Process incoming request messages. + + Parameters + ---------- + ch : pika.channel.Channel + The channel object for the communication. + method : pika.spec.Basic.Deliver + The method frame object containing delivery parameters. + props : pika.spec.BasicProperties + The properties frame object containing message properties. + body : bytes + The message body sent from the producer. 
+ + Notes + ----- + If `host_object` is provided, it will handle message processing. Otherwise, the raw message will be logged. + """ if self.host_object is not None: self.host_object.message_processing(ch, method, props, body) else: mess = json.loads(str(body.decode("utf-8"))) qoa_logger.info(mess) - def start_collecting(self): - # Start rabbit MQ + def start_collecting(self) -> None: + """ + Start collecting messages from the queue. + + Notes + ----- + This method starts the RabbitMQ consumer to collect messages from the queue and process them. + The method will block and run indefinitely until `stop` is called. + """ self.in_channel.basic_qos(prefetch_count=1) self.in_channel.basic_consume( queue=self.queue_name, on_message_callback=self.on_request, auto_ack=True ) self.in_channel.start_consuming() - def stop(self): + def stop(self) -> None: + """ + Stop collecting messages and close the connection. + + Notes + ----- + This method stops the consumer from collecting messages and closes the channel. + """ self.in_channel.stop_consuming() self.in_channel.close() - def get_queue(self): + def get_queue(self) -> str: + """ + Get the name of the queue. + + Returns + ------- + str + The name of the queue. + """ return self.queue.method.queue diff --git a/src/qoa4ml/collector/socket_collector.py b/src/qoa4ml/collector/socket_collector.py index 307b8c4..97a56f3 100644 --- a/src/qoa4ml/collector/socket_collector.py +++ b/src/qoa4ml/collector/socket_collector.py @@ -7,7 +7,50 @@ class SocketCollector(BaseCollector): + """ + SocketCollector handles the collection of data over a TCP socket. + + Parameters + ---------- + config : SocketCollectorConfig + Configuration settings for the socket collector. + process_report : Callable + A callable function to process incoming reports. + + Attributes + ---------- + config : SocketCollectorConfig + The socket collector configuration. + host : str + The hostname or IP address to bind the socket. + port : int + The port number to bind the socket. + backlog : int + The maximum number of queued connections. + bufsize : int + The maximum size of data to be received at once. + process_report : Callable + A function to process the received report. + execution_flag : bool + Flag to control the execution loop. + + Methods + ------- + start_collecting() + Start the socket server to collect and process incoming data. + """ + def __init__(self, config: SocketCollectorConfig, process_report: Callable) -> None: + """ + Initialize an instance of SocketCollector. + + Parameters + ---------- + config : SocketCollectorConfig + Configuration settings for the socket collector. + process_report : Callable + A callable function to process incoming reports. + """ self.config = config self.host = config.host self.port = config.port @@ -16,10 +59,20 @@ def __init__(self, config: SocketCollectorConfig, process_report: Callable) -> N self.process_report = process_report self.execution_flag = True - def start_collecting(self): + def start_collecting(self) -> None: + """ + Start the socket server to collect and process incoming data. + + Notes + ----- + - This method starts a TCP socket server that listens for incoming connections. + - Data received from clients is deserialized using pickle and then processed using the `process_report` function. + - The server runs indefinitely until the `execution_flag` is set to False. 
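+
+        Examples
+        --------
+        A minimal sketch; the keyword construction of ``SocketCollectorConfig``
+        is an assumption based on the fields read in ``__init__``.
+
+        >>> config = SocketCollectorConfig(
+        ...     host="127.0.0.1", port=9000, backlog=10, bufsize=4096
+        ... )
+        >>> collector = SocketCollector(config, process_report=print)
+        >>> collector.start_collecting()  # blocks until execution_flag is False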
+ """ server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((self.host, self.port)) server_socket.listen(self.backlog) + while self.execution_flag: client_socket, _ = server_socket.accept() data = b"" diff --git a/src/qoa4ml/connector/amqp_connector.py b/src/qoa4ml/connector/amqp_connector.py index 1e5f9b4..486475e 100644 --- a/src/qoa4ml/connector/amqp_connector.py +++ b/src/qoa4ml/connector/amqp_connector.py @@ -8,12 +8,51 @@ class AmqpConnector(BaseConnector): - # Init an amqp client handling the connection to amqp servier + """ + AmqpConnector handles the connection to an AMQP server for sending messages. + + Parameters + ---------- + config : AMQPConnectorConfig + Configuration settings for the AMQP connector. + log : bool, optional + A flag to enable logging of messages, default is False. + + Attributes + ---------- + conf : AMQPConnectorConfig + The AMQP connector configuration. + exchange_name : str + The name of the exchange to connect to. + exchange_type : str + The type of the exchange (e.g., 'direct', 'topic'). + out_routing_key : str + The routing key for outgoing messages. + log_flag : bool + Flag indicating whether to log messages. + out_connection : pika.BlockingConnection + The connection to the RabbitMQ server. + out_channel : pika.channel.Channel + The channel for communication with RabbitMQ. + + Methods + ------- + send_report(body_message: str, corr_id: Optional[str] = None, routing_key: Optional[str] = None, expiration: int = 1000) + Send data to the desired destination. + get() -> AMQPConnectorConfig + Get the current configuration of the connector. + """ + def __init__(self, config: AMQPConnectorConfig, log: bool = False): """ - AMQP connector - configuration: a dictionary include broker and queue information - log: a bool flag for logging message if being set to True, default is False + Initialize an instance of AmqpConnector. + + Parameters + ---------- + config : AMQPConnectorConfig + Configuration settings for the AMQP connector. + log : bool, optional + A flag to enable logging of messages, default is False. """ self.conf = config self.exchange_name = config.exchange_name @@ -34,7 +73,7 @@ def __init__(self, config: AMQPConnectorConfig, log: bool = False): # Create a channel self.out_channel = self.out_connection.channel() - # Init an Exchange + # Initialize an Exchange self.out_channel.exchange_declare( exchange=self.exchange_name, exchange_type=self.exchange_type ) @@ -42,13 +81,29 @@ def __init__(self, config: AMQPConnectorConfig, log: bool = False): def send_report( self, body_message: str, - corr_id=None, + corr_id: Optional[str] = None, routing_key: Optional[str] = None, - expiration=1000, - ): - # Sending data to desired destination - # if sender is client, it will include the "reply_to" attribute to specify where to reply this message - # if sender is server, it will reply the message to "reply_to" via default exchange + expiration: int = 1000, + ) -> None: + """ + Send data to the desired destination. + + Parameters + ---------- + body_message : str + The message body to be sent. + corr_id : str, optional + The correlation ID for the message, default is None. + routing_key : str, optional + The routing key for the message, default is None. + expiration : int, optional + Message expiration time in milliseconds, default is 1000. + + Notes + ----- + - If `corr_id` is not provided, a new UUID will be generated. + - If `routing_key` is not provided, the default `out_routing_key` will be used. 
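+
+        Examples
+        --------
+        A minimal sketch, assuming ``config`` is a valid ``AMQPConnectorConfig``.
+
+        >>> connector = AmqpConnector(config)
+        >>> connector.send_report('{"response_time": 0.21}', expiration=5000)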
+ """ if corr_id is None: corr_id = str(uuid.uuid4()) if routing_key is None: @@ -62,8 +117,14 @@ def send_report( properties=self.sub_properties, body=body_message, ) - # if self.log_flag: - # self.mess_logging.log_request(body_mess,corr_id) - def get(self): + def get(self) -> AMQPConnectorConfig: + """ + Get the current configuration of the connector. + + Returns + ------- + AMQPConnectorConfig + The AMQP connector configuration. + """ return self.conf diff --git a/src/qoa4ml/connector/debug_connector.py b/src/qoa4ml/connector/debug_connector.py index 70f414a..c3a487c 100644 --- a/src/qoa4ml/connector/debug_connector.py +++ b/src/qoa4ml/connector/debug_connector.py @@ -7,9 +7,48 @@ class DebugConnector(BaseConnector): + """ + DebugConnector provides a connector for debugging purposes, enabling the logging of messages. + + Parameters + ---------- + config : DebugConnectorConfig + Configuration settings for the debug connector. + + Attributes + ---------- + silence : bool + Flag to suppress debugging output if set to True. + + Methods + ------- + send_report(body_message: str) -> None + Send and debug the message. + """ + def __init__(self, config: DebugConnectorConfig): + """ + Initialize an instance of DebugConnector. + + Parameters + ---------- + config : DebugConnectorConfig + Configuration settings for the debug connector. + """ self.silence = config.silence - def send_report(self, body_message: str): + def send_report(self, body_message: str) -> None: + """ + Send and debug the message. + + Parameters + ---------- + body_message : str + The message body to be sent and debugged. + + Notes + ----- + If `silence` is set to False, the message will be logged for debugging purposes. + """ if not self.silence: debug(json.loads(body_message)) diff --git a/src/qoa4ml/connector/socket_connector.py b/src/qoa4ml/connector/socket_connector.py index 0e592b4..be6db25 100644 --- a/src/qoa4ml/connector/socket_connector.py +++ b/src/qoa4ml/connector/socket_connector.py @@ -9,12 +9,64 @@ class SocketConnector(BaseConnector): + """ + SocketConnector handles the connection to a TCP socket for sending serialized messages. + + Parameters + ---------- + config : SocketConnectorConfig + Configuration settings for the socket connector. + + Attributes + ---------- + config : SocketConnectorConfig + The socket connector configuration. + host : str + The hostname or IP address to connect to. + port : int + The port number to connect to on the host. + + Methods + ------- + send_report(body_message: str, log_path: Optional[str] = None) -> None + Send a serialized message over the socket and optionally log the round-trip time. + """ + def __init__(self, config: SocketConnectorConfig): + """ + Initialize an instance of SocketConnector. + + Parameters + ---------- + config : SocketConnectorConfig + Configuration settings for the socket connector. + """ self.config = config self.host = config.host self.port = config.port - def send_report(self, body_message: str, log_path: Optional[str] = None): + def send_report(self, body_message: str, log_path: Optional[str] = None) -> None: + """ + Send a serialized message over the socket and optionally log the round-trip time. + + Parameters + ---------- + body_message : str + The message body to be serialized and sent. + log_path : str, optional + The path to the log file where round-trip time will be recorded, default is None. + + Notes + ----- + - This method serializes the `body_message` using the `pickle` module. 
+ - It then sends the serialized message to the configured host and port. + - If `log_path` is provided, the round-trip time in milliseconds will be recorded in the specified log file. + + Raises + ------ + ConnectionRefusedError + If the connection to the host is refused. + """ try: start = time.time() client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -22,8 +74,9 @@ def send_report(self, body_message: str, log_path: Optional[str] = None): serialized_message = pickle.dumps(body_message) client_socket.sendall(serialized_message) client_socket.close() + if log_path: with open(log_path, "a", encoding="utf-8") as file: - file.write(str((time.time() - start) * 1000) + "\n") + file.write(f"{(time.time() - start) * 1000:.2f} ms\n") except ConnectionRefusedError: logging.error("Connection to aggregator refused") diff --git a/src/qoa4ml/metric.py b/src/qoa4ml/metric.py deleted file mode 100644 index 3dc7aba..0000000 --- a/src/qoa4ml/metric.py +++ /dev/null @@ -1,147 +0,0 @@ -# # Metrics are implemented based on these classes to be compatible with Prometheus -# from .lang.datamodel_enum import MetricNameEnum -# -# -# class PrometheusMetric: -# """ -# This class defines the common attribute and provide basic function for handling a metric -# - Attribute: -# - name: name of the metric -# - description: describe the metric -# - value: value of the metric -# - category: group metric into specific category supporting building QoA_Report -# - others: (Developing) -# -# - Function: -# - set: set specific value -# - get_val: return current value -# - get_name: return metric name -# - get_des: return metric description -# - other: (Developing) -# -# - Category: metrics are categorized into following groups -# 0 - Quality: Performance (metrics for evaluating service performance e.g., response time, throughput) -# 1 - Quality: Data (metrics for evaluating data quality e.g., missing, duplicate, erroneous) -# 2 - Quality: Inference (metrics for evaluating quality of ML inference, measured from inferences e.g., accuracy, confidence) -# 3 - Resource: metrics for evaluating resource utilization e.g. 
CPU, Memory -# Other: To do (extend more categories) -# """ -# -# def __init__( -# self, metric_name: MetricNameEnum, description, default_value=-1, category=0 -# ): -# self.name = metric_name -# self.description = description -# self.default_value = default_value -# self.value = default_value -# self.category = category -# -# def set(self, value): -# self.value = value -# -# def get_val(self): -# return self.value -# -# def get_name(self): -# return self.name -# -# def get_des(self): -# return self.description -# -# def get_category(self): -# return self.category -# -# def reset(self): -# self.value = self.default_value -# -# def __str__(self) -> str: -# return "metric_name: " + self.name + ", " + "value: " + str(self.value) -# -# def to_dict(self): -# mectric_dict = {} -# mectric_dict[self.name] = self.value -# return mectric_dict -# -# -# class Counter(PrometheusMetric): -# """ -# This class inherit all attributes of Metric -# - Attribute: (Developing) -# -# - Function: -# - inc: increase its value by num -# - reset: set the value back to zero -# - others: (Developing) -# """ -# -# def __init__(self, metric_name, description, default_value=0, category=0): -# super().__init__(metric_name, description, default_value, category) -# -# def inc(self, num=1): -# self.value += num -# -# -# class Gauge(PrometheusMetric): -# """ -# This class inherit all attributes of Metric -# - Attribute: (Developing) -# -# - Function: -# - inc: increase its value by num -# - others: (Developing) -# """ -# -# def __init__(self, metric_name, description, default_value=-1, category=0): -# super().__init__(metric_name, description, default_value, category) -# -# def inc(self, num=1): -# self.value += num -# -# # TO DO: -# # implement other functions -# def dec(self, num=1): -# self.value -= num -# -# -# class Summary(PrometheusMetric): -# """ -# This class inherit all attributes of Metric -# - Attribute: (Developing) -# -# - Function: -# - inc: increase its value by num -# - others: (Developing) -# """ -# -# def __init__(self, metric_name, description, default_value=-1, category=0): -# super().__init__(metric_name, description, default_value, category) -# -# def inc(self, num): -# self.value += num -# -# # TO DO: -# # implement other functions -# def dec(self, num): -# self.value -= num -# -# -# class Histogram(PrometheusMetric): -# """ -# This class inherit all attributes of Metric -# - Attribute: (Developing) -# -# - Function: -# - inc: increase its value by num -# - others: (Developing) -# """ -# -# def __init__(self, metric_name, description, default_value=-1, category=0): -# super().__init__(metric_name, description, default_value, category) -# -# def inc(self, num): -# self.value += num -# -# # TO DO: -# # implement other functions -# def dec(self, num): -# self.value -= num diff --git a/src/qoa4ml/probes/docker_monitoring_probe.py b/src/qoa4ml/probes/docker_monitoring_probe.py index 6a7ecc8..db82d33 100644 --- a/src/qoa4ml/probes/docker_monitoring_probe.py +++ b/src/qoa4ml/probes/docker_monitoring_probe.py @@ -13,19 +13,72 @@ class DockerMonitoringProbe(Probe): + """ + DockerMonitoringProbe is responsible for monitoring Docker containers and creating reports. + + Parameters + ---------- + config : DockerProbeConfig + Configuration settings for the Docker monitoring probe. + connector : BaseConnector + Connector to send the report data. + client_info : ClientInfo + Information about the client. + + Attributes + ---------- + config : DockerProbeConfig + The Docker monitoring probe configuration. 
+ obs_service_url : str + The URL of the observation service, if registration is required. + docker_client : docker.DockerClient + The Docker client for communicating with Docker API. + + Methods + ------- + create_report() -> str + Create a report based on Docker container statistics. + """ + def __init__( self, config: DockerProbeConfig, connector: BaseConnector, client_info: ClientInfo, ) -> None: + """ + Initialize an instance of DockerMonitoringProbe. + + Parameters + ---------- + config : DockerProbeConfig + Configuration settings for the Docker monitoring probe. + connector : BaseConnector + Connector to send the report data. + client_info : ClientInfo + Information about the client. + """ super().__init__(config, connector, client_info) self.config = config if self.config.require_register: self.obs_service_url = self.config.obs_service_url self.docker_client = docker.from_env() - def create_report(self): + def create_report(self) -> str: + """ + Create a report based on Docker container statistics. + + Returns + ------- + str + JSON-encoded report containing Docker container statistics. + + Notes + ----- + - This method collects statistics for the specified Docker containers. + - If the report dictionary is empty, it adds a 2-second delay to prevent fast looping. + - In case of a RuntimeError, an error message is returned in a JSON format. + """ try: reports = get_docker_stats(self.docker_client, self.config.container_list) docker_report = DockerReport( @@ -34,12 +87,11 @@ def create_report(self): container_reports=reports, ) reports_dict = docker_report.model_dump() - # NOTE: if the reports dict is empty, the loop will run very fast, so here add 2 seconds as if there is container to report if not reports_dict: time.sleep(2) return json.dumps(reports_dict) except RuntimeError: qoa_logger.exception( - "Maybe running in the background result in this exception!" + "RuntimeError occurred, possibly due to running in the background!" ) return json.dumps({"error": "RuntimeError"}) diff --git a/src/qoa4ml/probes/process_monitoring_probe.py b/src/qoa4ml/probes/process_monitoring_probe.py index a29a2dc..29027dd 100644 --- a/src/qoa4ml/probes/process_monitoring_probe.py +++ b/src/qoa4ml/probes/process_monitoring_probe.py @@ -33,24 +33,72 @@ class ProcessMonitoringProbe(Probe): + """ + ProcessMonitoringProbe is responsible for monitoring system processes and creating reports based on usage statistics. + + Parameters + ---------- + config : ProcessProbeConfig + Configuration settings for the process monitoring probe. + connector : BaseConnector + Connector to send the report data. + client_info : Optional[ClientInfo] + Information about the client, default is None. + + Attributes + ---------- + config : ProcessProbeConfig + The process monitoring probe configuration. + pid : int + The process ID to monitor. + environment : EnvironmentEnum + The environment in which the process is running. + process : psutil.Process + The psutil Process object for the monitored process. + obs_service_url : Optional[str] + The URL of the observation service, if registration is required. + metadata : Union[dict, resources_report_model.ProcessMetadata] + Metadata related to the monitored process. + + Methods + ------- + get_cpu_usage() -> dict + Get the CPU usage of the process. + get_mem_usage() -> dict + Get the memory usage of the process. + create_report() -> str + Create a JSON report based on the process statistics. 
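+
+    Examples
+    --------
+    A minimal sketch, assuming ``config`` and ``connector`` are a valid
+    ``ProcessProbeConfig`` and ``BaseConnector``. Leaving ``config.pid`` as
+    None makes the probe monitor the current process.
+
+    >>> probe = ProcessMonitoringProbe(config, connector)
+    >>> report_json = probe.create_report()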
+ """ + def __init__( self, config: ProcessProbeConfig, connector: BaseConnector, client_info: ClientInfo | None = None, ) -> None: + """ + Initialize an instance of ProcessMonitoringProbe. + + Parameters + ---------- + config : ProcessProbeConfig + Configuration settings for the process monitoring probe. + connector : BaseConnector + Connector to send the report data. + client_info : Optional[ClientInfo] + Information about the client, default is None. + """ super().__init__(config, connector, client_info) self.config = config - if self.config.pid is None: - self.pid = os.getpid() - else: - self.pid = self.config.pid - if not psutil.pid_exists(self.pid): - raise RuntimeError(f"No process with pid {self.pid}") + self.pid = os.getpid() if self.config.pid is None else self.config.pid + if not psutil.pid_exists(self.pid): + raise RuntimeError(f"No process with pid {self.pid}") + self.environment = config.environment self.process = psutil.Process(self.pid) if self.config.require_register: self.obs_service_url = self.config.obs_service_url + if self.environment == EnvironmentEnum.hpc: self.metadata = {"pid": str(self.pid), "user": self.process.username()} else: @@ -58,24 +106,53 @@ def __init__( pid=str(self.pid), user=self.process.username(), client_info=client_info ) - def get_cpu_usage(self): + def get_cpu_usage(self) -> dict: + """ + Get the CPU usage of the process. + + Returns + ------- + dict + Dictionary containing the CPU usage information. + """ process_usage = report_proc_child_cpu(self.process) return process_usage - def get_mem_usage(self): + def get_mem_usage(self) -> dict: + """ + Get the memory usage of the process. + + Returns + ------- + dict + Dictionary containing the memory usage in megabytes. + """ data = report_proc_mem(self.process) return { "rss": {"value": convert_to_mbyte(data["rss"]), "unit": "Mb"}, "vms": {"value": convert_to_mbyte(data["vms"]), "unit": "Mb"}, } - def create_report(self): + def create_report(self) -> str: + """ + Create a JSON report based on the process statistics. + + Returns + ------- + str + JSON-encoded report containing process statistics. + + Notes + ----- + - This method collects CPU and memory usage stats for the specified process. + - Reports are generated differently based on the environment (HPC or other). + """ timestamp = time.time() cpu_usage = self.get_cpu_usage() mem_usage = self.get_mem_usage() allowed_cpu_list = get_process_allowed_cpus() allowed_memory_size = get_process_allowed_memory() - report = None + if self.environment == EnvironmentEnum.hpc: report = { "type": "process", @@ -100,4 +177,5 @@ def create_report(self): cpu=resources_report_model.ResourceReport(usage=cpu_usage), mem=resources_report_model.ResourceReport(usage=mem_usage), ).model_dump() + return json.dumps(report) diff --git a/src/qoa4ml/probes/system_monitoring_probe.py b/src/qoa4ml/probes/system_monitoring_probe.py index 3fa86f8..c2e3c81 100644 --- a/src/qoa4ml/probes/system_monitoring_probe.py +++ b/src/qoa4ml/probes/system_monitoring_probe.py @@ -30,18 +30,78 @@ class SystemMonitoringProbe(Probe): + """ + SystemMonitoringProbe is responsible for monitoring system resources and creating reports based on usage statistics. + + Parameters + ---------- + config : SystemProbeConfig + Configuration settings for the system monitoring probe. + connector : BaseConnector + Connector to send the report data. + client_info : Optional[ClientInfo] + Information about the client, default is None. 
+ + Attributes + ---------- + config : SystemProbeConfig + The system monitoring probe configuration. + node_name : str + The name of the node being monitored. + environment : EnvironmentEnum + The environment in which the node is running. + cpu_metadata : dict + Metadata about the CPU. + gpu_metadata : dict + Metadata about the GPU. + mem_metadata : dict + Metadata about the memory. + metadata : dict + General metadata about the node. + + Methods + ------- + get_cpu_metadata() -> dict + Get metadata about the CPU. + get_cpu_usage() -> dict + Get the CPU usage of the system. + get_gpu_metadata() -> dict + Get metadata about the GPU. + get_gpu_usage() -> dict + Get the GPU usage of the system. + get_mem_metadata() -> dict + Get metadata about the memory. + get_mem_usage() -> dict + Get the memory usage of the system. + create_report() -> str + Create a JSON report based on system resource usage statistics. + """ + def __init__( self, config: SystemProbeConfig, connector: BaseConnector, client_info: ClientInfo | None = None, ) -> None: + """ + Initialize an instance of SystemMonitoringProbe. + + Parameters + ---------- + config : SystemProbeConfig + Configuration settings for the system monitoring probe. + connector : BaseConnector + Connector to send the report data. + client_info : Optional[ClientInfo] + Information about the client, default is None. + """ super().__init__(config, connector, client_info) self.config = config - if self.config.node_name is None: - self.node_name = socket.gethostname().split(".")[0] - else: - self.node_name = self.config.node_name + self.node_name = ( + socket.gethostname().split(".")[0] + if self.config.node_name is None + else self.config.node_name + ) if self.config.require_register: self.obs_service_url = self.config.obs_service_url self.environment = config.environment @@ -50,40 +110,102 @@ def __init__( self.mem_metadata = self.get_mem_metadata() self.metadata = {"node_name": self.node_name} - def get_cpu_metadata(self): + def get_cpu_metadata(self) -> dict: + """ + Get metadata about the CPU. + + Returns + ------- + dict + Dictionary containing metadata about the CPU. + """ return get_sys_cpu_metadata() - def get_cpu_usage(self): + def get_cpu_usage(self) -> dict: + """ + Get the CPU usage of the system. + + Returns + ------- + dict + Dictionary containing the CPU usage information in percentage. + """ value = get_sys_cpu_util() return {"value": value, "unit": "percentage"} - def get_gpu_metadata(self): + def get_gpu_metadata(self) -> dict: + """ + Get metadata about the GPU. + + Returns + ------- + dict + Dictionary containing metadata about the GPU. + """ if self.environment == EnvironmentEnum.edge: report = find_igpu() else: report = get_sys_gpu_metadata() return report - def get_gpu_usage(self): + def get_gpu_usage(self) -> dict: + """ + Get the GPU usage of the system. + + Returns + ------- + dict + Dictionary containing the GPU usage information. + """ if self.environment == EnvironmentEnum.edge: report = get_gpu_load(self.gpu_metadata) else: report = get_sys_gpu_usage() return report - def get_mem_metadata(self): + def get_mem_metadata(self) -> dict: + """ + Get metadata about the memory. + + Returns + ------- + dict + Dictionary containing memory metadata in gigabytes. + """ mem = get_sys_mem() return {"mem": {"capacity": convert_to_gbyte(mem["total"]), "unit": "Gb"}} - def get_mem_usage(self): + def get_mem_usage(self) -> dict: + """ + Get the memory usage of the system. 
+ + Returns + ------- + dict + Dictionary containing the memory usage in megabytes. + """ mem = get_sys_mem() return {"value": convert_to_mbyte(mem["used"]), "unit": "Mb"} - def create_report(self): + def create_report(self) -> str: + """ + Create a JSON report based on system resource usage statistics. + + Returns + ------- + str + JSON-encoded report containing system resource usage statistics. + + Notes + ----- + - This method collects CPU, GPU, and memory usage stats for the system. + - Reports are generated differently based on the environment (HPC or other). + """ timestamp = time.time() cpu_usage = self.get_cpu_usage() gpu_usage = self.get_gpu_usage() mem_usage = self.get_mem_usage() + if self.environment == EnvironmentEnum.hpc: report = { "type": "system", @@ -118,4 +240,5 @@ def create_report(self): metadata=self.mem_metadata, usage=mem_usage ), ).model_dump() + return json.dumps(report) diff --git a/src/qoa4ml/qoa_client.py b/src/qoa4ml/qoa_client.py index 34bb8da..27781b4 100644 --- a/src/qoa4ml/qoa_client.py +++ b/src/qoa4ml/qoa_client.py @@ -55,28 +55,33 @@ class QoaClient(Generic[T]): def __init__( self, - # NOTE: use text, number, enum report_cls: type[T] = MLReport, config_dict: Optional[dict] = None, config_path: Optional[str] = None, registration_url: Optional[str] = None, - logging_level=2, + logging_level: int = 2, ): """ - Initialize the QoA Client with configuration settings and report class. + Initialize the QoA Client with configuration settings and a report class. Parameters ---------- report_cls : type[T], optional The class type for reports, default is MLReport. - config_dict : Optional[dict], optional - A dictionary to load the client's configuration. - config_path : Optional[str], optional + config_dict : dict, optional + A dictionary to load the client's configuration from. + config_path : str, optional Path to a JSON configuration file. - registration_url : Optional[str], optional + registration_url : str, optional URL for registering the client and receiving configuration data. logging_level : int, optional The logging verbosity level (default: 2). + + Notes + ----- + - If both `config_dict` and `config_path` are provided, the `config_dict` will take precedence. + - If neither `config_dict` nor `config_path` is provided, the client may attempt to fetch configurations from the `registration_url`. + - The method will raise an exception if the necessary configuration details are not found. 
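+
+        Examples
+        --------
+        A minimal sketch; the configuration file path is hypothetical.
+
+        >>> client = QoaClient(config_path="config/client.json")
+        >>> client.timer()  # starts the response-time timer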
""" set_logger_level(logging_level) @@ -108,7 +113,6 @@ def __init__( self.client_config.id = str(uuid.uuid4()) self.qoa_report = report_cls(self.client_config) if self.configuration.connector: - # init connectors offline if it's specified connector_conf = self.configuration.connector try: for connector in connector_conf: @@ -118,12 +122,10 @@ def __init__( f"Error {type(e)} when configuring connector in QoaClient" ) elif registration_url or self.configuration.registration_url: - # init connectors using configuration received from monitoring service, if it's specified try: if registration_url: registration_data = self.registration(registration_url) else: - # NOTE: logically true registration_data = self.registration( self.configuration.registration_url ) @@ -154,7 +156,6 @@ def __init__( qoa_logger.warning("No connector initiated") self.default_connector = None else: - # Set default connector for sending monitoring data if not specify self.default_connector = next(iter(self.connector_list.keys())) self.probes_list = None @@ -162,10 +163,9 @@ def __init__( self.probes_list = self.init_probes( self.configuration.probes, self.configuration.client ) - # lock report to guarantee consistency self.lock = threading.Lock() - def registration(self, url: str): + def registration(self, url: str) -> requests.Response: """ Registers the client with the monitoring service and retrieves connector configurations. @@ -178,16 +178,18 @@ def registration(self, url: str): ------- requests.Response The response from the registration service, containing connector configurations. - """ - # get connector configuration by registering with the monitoring service + Notes + ----- + This method sends a POST request to the given URL with the client's configuration in JSON format. + """ return requests.request( "POST", url, headers=headers, data=self.client_config.json() ) def init_probes( self, probe_config_list: list[ProbeConfig], client_info: ClientInfo - ): + ) -> list[Probe]: """ Initialize monitoring probes based on the provided probe configuration list. @@ -202,9 +204,13 @@ def init_probes( ------- list[Probe] A list of initialized probe instances. + + Raises + ------ + ValueError + If an unsupported probe configuration type is provided. """ probes_list: list[Probe] = [] - # TODO: each probe can have their own connector if self.default_connector: selected_connector = self.connector_list[self.default_connector] else: @@ -212,7 +218,6 @@ def init_probes( selected_connector = DebugConnector(DebugConnectorConfig(silence=False)) for probe_config in probe_config_list: - # TODO: can be simplify for less duplicate code if isinstance(probe_config, DockerProbeConfig): probes_list.append( DockerMonitoringProbe(probe_config, selected_connector, client_info) @@ -261,16 +266,9 @@ def init_connector(self, configuration: ConnectorConfig) -> BaseConnector: ): return DebugConnector(configuration.config) - # TODO: MQTT is both connector and collector - # - # if ( - # configuration.connector_class == ServiceAPIEnum.mqtt - # and type(configuration.config) is MQTTConnectorConfig - # ): - # return Mqtt_Connector(configuration.config) raise RuntimeError("Connector config is not of correct type") - def get_client_config(self): + def get_client_config(self) -> ClientConfig: """ Get the current client configuration. @@ -281,7 +279,7 @@ def get_client_config(self): """ return self.client_config - def set_config(self, key, value): + def set_config(self, key: str, value: Any) -> None: """ Update a specific configuration setting by key. 
@@ -305,10 +303,10 @@ def set_config(self, key, value): def observe_metric( self, metric_name: MetricNameEnum, - value, + value: Any, category: int = 0, description: str = "", - ): + ) -> None: """ Observe and report a metric. @@ -343,7 +341,7 @@ def observe_metric( Metric(metric_name=metric_name, records=[value], description=description), ) - def timer(self): + def timer(self) -> dict: """ Start or stop a timer and record the response time. @@ -351,6 +349,11 @@ def timer(self): ------- dict A dictionary containing the start time and response time. + + Notes + ----- + - When called for the first time, it starts the timer. + - When called again, it stops the timer and records the response time as a metric. """ if self.timer_flag is False: self.timer_flag = True @@ -367,7 +370,7 @@ def timer(self): ) return response_time - def import_previous_report(self, reports: Union[dict, list[dict]]): + def import_previous_report(self, reports: Union[dict, list[dict]]) -> None: """ Import and process previous reports. @@ -382,7 +385,7 @@ def import_previous_report(self, reports: Union[dict, list[dict]]): else: self.qoa_report.process_previous_report(reports) - def asyn_report(self, body_mess: str, connectors: Optional[list] = None): + def asyn_report(self, body_mess: str, connectors: Optional[list] = None) -> None: """ Asynchronously send a report through the connectors. @@ -390,12 +393,15 @@ def asyn_report(self, body_mess: str, connectors: Optional[list] = None): ---------- body_mess : str The message body to be sent. - connectors : Optional[list], optional - A list of connectors to send the report through. If None, the default connector is used. + connectors : list, optional + A list of connectors to send the report through. If None, the default connector is used. + + Notes + ----- + Uses threading to send reports asynchronously. """ self.lock.acquire() if connectors is None: - # if connectors are not specify, use default if self.default_connector: chosen_connector = self.connector_list[self.default_connector] if isinstance(chosen_connector, AmqpConnector): @@ -407,42 +413,42 @@ def asyn_report(self, body_mess: str, connectors: Optional[list] = None): "No default connector, please specify the connector to use" ) else: - # iterate connector to send report - # for connector in connectors: - # print(connector) - # Todo: send by multiple connector pass - self.lock.release() def report( self, report: Optional[dict] = None, connectors: Optional[list] = None, - submit=False, - reset=True, - corr_id=None, - ): + submit: bool = False, + reset: bool = True, + corr_id: Optional[str] = None, + ) -> str: """ Generate a report and optionally submit it through the default connector. Parameters ---------- - report : Optional[dict], optional + report : dict, optional The report data to be submitted. If None, a report will be generated. - connectors : Optional[list], optional + connectors : list, optional A list of connectors through which to send the report, default is None. submit : bool, optional Whether to submit the report, default is False. reset : bool, optional Whether to reset the report state after submission, default is True. - corr_id : Optional[str], optional + corr_id : str, optional The correlation ID for the report, default is None. Returns ------- str The JSON-encoded report. + + Notes + ----- + The method will create a report based on the current state if none is provided. + If `submit` is True, the report will be sent through the default or specified connectors. 
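+
+        Examples
+        --------
+        A minimal sketch; the ``MetricNameEnum`` member shown is an assumption.
+
+        >>> client.observe_metric(MetricNameEnum.response_time, 0.21)
+        >>> client.report(submit=True)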
""" if report is None: return_report = self.qoa_report.generate_report(reset, corr_id=corr_id) @@ -470,7 +476,7 @@ def report( qoa_logger.warning("No connector available") return return_report.model_dump(mode="json") - def start_all_probes(self): + def start_all_probes(self) -> None: """ Start all probes for monitoring, running them in the background. @@ -479,7 +485,9 @@ def start_all_probes(self): RuntimeError If no probes have been initialized. - NOTE: if the probe takes long to report, and the main process exit, no report may be sent + Notes + ----- + If the probe takes a long time to report and the main process exits, no report may be sent. """ if not self.probes_list: raise RuntimeError( @@ -488,7 +496,7 @@ def start_all_probes(self): for probe in self.probes_list: probe.start_reporting() - def stop_all_probes(self): + def stop_all_probes(self) -> None: """ Stop all running probes. @@ -496,18 +504,19 @@ def stop_all_probes(self): ------ RuntimeError If no probes have been initialized. + + Notes + ----- + This method stops the background monitoring activities of all active probes. """ if not self.probes_list: raise RuntimeError( - "There is no initiated probes, please recheck the config" + "There are no initiated probes, please recheck the config" ) for probe in self.probes_list: probe.stop_reporting() - def observe_inference( - self, - inference_value, - ): + def observe_inference(self, inference_value: Any) -> None: """ Observe and record inference data. @@ -515,6 +524,10 @@ def observe_inference( ---------- inference_value : Any The value of the inference to be observed. + + Notes + ----- + This method is used to record predictions or inference results for later analysis. """ self.qoa_report.observe_inference(inference_value) @@ -522,7 +535,7 @@ def observe_inference_metric( self, metric_name: MetricNameEnum, value: Any, - ): + ) -> None: """ Observe and report a specific inference metric. @@ -532,9 +545,25 @@ def observe_inference_metric( The name of the inference metric being observed. value : Any The value of the observed metric. + + Notes + ----- + This method can be used to log performance metrics, evaluation scores, etc. during inference. """ metric = Metric(metric_name=metric_name, records=[value]) self.qoa_report.observe_inference_metric(metric) - def __str__(self): + def __str__(self) -> str: + """ + Returns a string representation of the client's configuration and connectors. + + Returns + ------- + str + JSON representation of the client configuration and a string representation of the connector list. + + Notes + ----- + This method is particularly useful for debugging and logging purposes. + """ return self.client_config.model_dump_json() + "\n" + str(self.connector_list) diff --git a/src/qoa4ml/reports/general_application_report.py b/src/qoa4ml/reports/general_application_report.py index 3ce064a..a39b87f 100644 --- a/src/qoa4ml/reports/general_application_report.py +++ b/src/qoa4ml/reports/general_application_report.py @@ -1,5 +1,6 @@ import copy import time +from typing import Any from uuid import UUID from ..config.configs import ClientInfo @@ -14,14 +15,63 @@ class GeneralApplicationReport(AbstractReport): - def __init__(self, client_config: ClientInfo): + """ + GeneralApplicationReport manages the reporting of application metrics and inference data. + + Parameters + ---------- + client_config : ClientInfo + Configuration settings related to the client. + + Attributes + ---------- + client_config : ClientInfo + A deep copy of the client configuration. 
+ init_time : float + The initialization time of the report. + report : GeneralApplicationReportModel + The current state of the report. + execution_instance : MicroserviceInstance + An instance representing the current execution context. + previous_reports : list[MicroserviceInstance] + A list of previous execution instances. + + Methods + ------- + reset() -> None + Reset the report to an initial state. + process_previous_report(previous_report_dict: dict) -> None + Process and incorporate a previous report. + observe_metric(report_type: ReportTypeEnum, stage: str, metric: Metric) -> None + Observe and record a metric. + observe_inference(inference_value: Any) -> None + Observe and record inference data. + observe_inference_metric(metric: Metric) -> None + Observe and record an inference-specific metric. + """ + + def __init__(self, client_config: ClientInfo) -> None: + """ + Initialize an instance of GeneralApplicationReport. + + Parameters + ---------- + client_config : ClientInfo + Configuration settings related to the client. + """ self.client_config = copy.deepcopy(client_config) self.reset() self.init_time = time.time() - def reset(self): - self.report = GeneralApplicationReportModel() + def reset(self) -> None: + """ + Reset the report to an initial state. + Notes + ----- + - This method initializes a new report model and sets up the execution instance and previous reports list. + """ + self.report = GeneralApplicationReportModel() self.execution_instance = MicroserviceInstance( id=UUID(self.client_config.instance_id), name=self.client_config.name, @@ -30,14 +80,40 @@ def reset(self): ) self.previous_reports: list[MicroserviceInstance] = [] - def process_previous_report(self, previous_report_dict: dict): + def process_previous_report(self, previous_report_dict: dict) -> None: + """ + Process and incorporate a previous report. + + Parameters + ---------- + previous_report_dict : dict + Dictionary representation of a previous report. + + Notes + ----- + - This method assumes the last metric in the previous report was observed by the previous instance. + - It appends the metrics from the previous report to the current report. + """ previous_report = GeneralApplicationReportModel(**previous_report_dict) - # NOTE: assume that the last metric is observed by the previous instance self.previous_reports.append(previous_report.metrics[-1].instance) for metric in previous_report.metrics: self.report.metrics.append(metric) - def observe_metric(self, report_type, stage, metric: Metric): + def observe_metric( + self, report_type: ReportTypeEnum, stage: str, metric: Metric + ) -> None: + """ + Observe and record a metric. + + Parameters + ---------- + report_type : ReportTypeEnum + The type of report being generated. + stage : str + The stage of the process in which the metric is recorded. + metric : Metric + The metric to be recorded. + """ flatten_metric = FlattenMetric( metric_name=metric.metric_name, records=metric.records, @@ -50,8 +126,19 @@ def observe_metric(self, report_type, stage, metric: Metric): ) self.report.metrics.append(flatten_metric) - def observe_inference(self, inference_value): - # TODO: may not be a great idea + def observe_inference(self, inference_value: Any) -> None: + """ + Observe and record inference data. + + Parameters + ---------- + inference_value : Any + The value of the inference to be recorded. + + Notes + ----- + - This method records inference values as a metric with the name "Inference" and report type ml_specific. 
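+
+        Examples
+        --------
+        A minimal sketch with an arbitrary prediction payload.
+
+        >>> report.observe_inference([0.07, 0.93])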
+ """ flatten_metric = FlattenMetric( metric_name="Inference", records=inference_value, @@ -62,7 +149,15 @@ def observe_inference(self, inference_value): ) self.report.metrics.append(flatten_metric) - def observe_inference_metric(self, metric: Metric): + def observe_inference_metric(self, metric: Metric) -> None: + """ + Observe and record an inference-specific metric. + + Parameters + ---------- + metric : Metric + The inference-specific metric to be recorded. + """ flatten_metric = FlattenMetric( metric_name=metric.metric_name, records=metric.records, diff --git a/src/qoa4ml/reports/ml_reports.py b/src/qoa4ml/reports/ml_reports.py index 73f3596..97ad0ec 100644 --- a/src/qoa4ml/reports/ml_reports.py +++ b/src/qoa4ml/reports/ml_reports.py @@ -1,6 +1,6 @@ import copy import time -from typing import Optional +from typing import Any, Optional from uuid import UUID, uuid4 from ..config.configs import ClientInfo @@ -16,12 +16,64 @@ class MLReport(AbstractReport): - def __init__(self, client_config: ClientInfo): + """ + MLReport manages the reporting of machine learning metrics and inference data. + + Parameters + ---------- + client_config : ClientInfo + Configuration settings related to the client. + + Attributes + ---------- + client_config : ClientInfo + A deep copy of the client configuration. + init_time : float + The initialization time of the report. + previous_report : list[GeneralMlInferenceReport] + A list of previously processed reports. + report : GeneralMlInferenceReport + The current state of the report. + + Methods + ------- + reset() -> None + Reset the report to an initial state. + combine_stage_report(current_stage_report: dict[str, StageReport], previous_stage_report: dict[str, StageReport]) -> dict[str, StageReport] + Combine metrics from the current and previous stage reports. + process_previous_report(previous_report_dict: dict) -> None + Process and incorporate a previous report. + observe_metric(report_type: ReportTypeEnum, stage: str, metric: Metric) -> None + Observe and record a metric. + observe_inference(inference_value: Any) -> None + Observe and record inference data. + observe_inference_metric(metric: Metric) -> None + Observe and record an inference-specific metric. + generate_report(reset: bool = True, corr_id: Optional[str] = None) -> BaseReport + Generate the report and optionally reset the current report state. + """ + + def __init__(self, client_config: ClientInfo) -> None: + """ + Initialize an instance of MLReport. + + Parameters + ---------- + client_config : ClientInfo + Configuration settings related to the client. + """ self.client_config = copy.deepcopy(client_config) self.reset() self.init_time = time.time() - def reset(self): + def reset(self) -> None: + """ + Reset the report to an initial state. + + Notes + ----- + - This method initializes a new report model and clears the list of previous reports. + """ self.previous_report: list[GeneralMlInferenceReport] = [] self.report = GeneralMlInferenceReport() @@ -29,7 +81,22 @@ def combine_stage_report( self, current_stage_report: dict[str, StageReport], previous_stage_report: dict[str, StageReport], - ): + ) -> dict[str, StageReport]: + """ + Combine metrics from the current and previous stage reports. + + Parameters + ---------- + current_stage_report : dict + The current stage report containing metrics. + previous_stage_report : dict + The previous stage report containing metrics. + + Returns + ------- + dict + Combined stage report containing metrics from both reports. 
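+
+        Notes
+        -----
+        Metrics recorded under the same stage and metric name by different
+        microservice instances are merged into a single per-instance mapping,
+        keyed by instance ID.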
+ """ combined_stage_report: dict[str, StageReport] = {} for stage_name, stage_report in previous_stage_report.items(): new_stage_report = StageReport(name=stage_name, metrics={}) @@ -47,78 +114,141 @@ def combine_stage_report( combined_stage_report[stage_name] = new_stage_report return combined_stage_report - def process_previous_report(self, previous_report_dict: dict): + def process_previous_report(self, previous_report_dict: dict) -> None: + """ + Process and incorporate a previous report. + + Parameters + ---------- + previous_report_dict : dict + Dictionary representation of a previous report. + + Notes + ----- + - Service quality, data quality, and ML inference reports are combined with the current report. + """ previous_report = GeneralMlInferenceReport(**previous_report_dict) self.previous_report.append(previous_report) - # NOTE: service quality report self.report.service = self.combine_stage_report( self.report.service, previous_report.service ) - - # NOTE: data quality report self.report.data = self.combine_stage_report( self.report.data, previous_report.data ) - # NOTE: ml inference report - # self.report.ml_inference |= previous_report.ml_inference - def observe_metric(self, report_type, stage, metric: Metric): + def observe_metric( + self, report_type: ReportTypeEnum, stage: str, metric: Metric + ) -> None: + """ + Observe and record a metric. + + Parameters + ---------- + report_type : ReportTypeEnum + The type of report being generated. + stage : str + The stage of the process in which the metric is recorded. + metric : Metric + The metric to be recorded. + + Raises + ------ + ValueError + If the stage name is empty or the report type is not handled. + """ if stage == "": raise ValueError("Stage name can't be empty") + + report_dict = None if report_type == ReportTypeEnum.service: if stage not in self.report.service: self.report.service[stage] = StageReport(name=stage, metrics={}) - if metric.metric_name not in self.report.service[stage].metrics: - self.report.service[stage].metrics[metric.metric_name] = {} - - self.report.service[stage].metrics[metric.metric_name] |= { - UUID(self.client_config.instance_id): metric - } - + report_dict = self.report.service[stage].metrics elif report_type == ReportTypeEnum.data: if stage not in self.report.data: self.report.data[stage] = StageReport(name=stage, metrics={}) - if metric.metric_name not in self.report.data[stage].metrics: - self.report.data[stage].metrics[metric.metric_name] = {} - - self.report.data[stage].metrics[metric.metric_name] |= { - UUID(self.client_config.instance_id): metric - } + report_dict = self.report.data[stage].metrics else: raise ValueError(f"Can't handle report type {report_type}") - def observe_inference(self, inference_value): - if self.client_config.instance_id in self.report.ml_inference: + if metric.metric_name not in report_dict: + report_dict[metric.metric_name] = {} + + report_dict[metric.metric_name][UUID(self.client_config.instance_id)] = metric + + def observe_inference(self, inference_value: Any) -> None: + """ + Observe and record inference data. + + Parameters + ---------- + inference_value : Any + The value of the inference to be recorded. + + Notes + ----- + - Raises a warning if inference data already exists for the current instance. 
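+
+        Examples
+        --------
+        A minimal sketch; the prediction value can be any serializable payload.
+
+        >>> report.observe_inference({"label": "cat", "score": 0.93})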
+ """ + instance_id = UUID(self.client_config.instance_id) + + if instance_id in self.report.ml_inference: raise RuntimeWarning( "Inference existed, will override the existing inference" ) - self.report.ml_inference[self.client_config.instance_id] = InferenceInstance( + + self.report.ml_inference[instance_id] = InferenceInstance( inference_id=uuid4(), - instance_id=UUID(self.client_config.instance_id), + instance_id=instance_id, functionality=self.client_config.functionality, prediction=inference_value, ) - def observe_inference_metric(self, metric: Metric): - if self.client_config.instance_id in self.report.ml_inference: - self.report.ml_inference[self.client_config.instance_id].metrics.append( - metric - ) + def observe_inference_metric(self, metric: Metric) -> None: + """ + Observe and record an inference-specific metric. + + Parameters + ---------- + metric : Metric + The inference-specific metric to be recorded. + """ + instance_id = UUID(self.client_config.instance_id) + + if instance_id in self.report.ml_inference: + self.report.ml_inference[instance_id].metrics.append(metric) else: - self.report.ml_inference[self.client_config.instance_id] = ( - InferenceInstance( - inference_id=uuid4(), - instance_id=UUID(self.client_config.instance_id), - functionality=self.client_config.functionality, - metrics=[metric], - ) + self.report.ml_inference[instance_id] = InferenceInstance( + inference_id=uuid4(), + instance_id=instance_id, + functionality=self.client_config.functionality, + metrics=[metric], ) def generate_report( self, reset: bool = True, corr_id: Optional[str] = None ) -> BaseReport: + """ + Generate the report and optionally reset the current report state. + + Parameters + ---------- + reset : bool, optional + Whether to reset the report state after generating the report, default is True. + corr_id : Optional[str], optional + Correlation ID for the report, default is None. + + Returns + ------- + BaseReport + The generated report. + + Notes + ----- + - Adds metadata such as client configuration, timestamp, and runtime to the report. + - Deep copies the current state of the report before optionally resetting it. + """ self.report.metadata["client_config"] = copy.deepcopy(self.client_config) self.report.metadata["timestamp"] = time.time() if corr_id is not None: @@ -126,6 +256,7 @@ def generate_report( self.report.metadata["runtime"] = ( self.report.metadata["timestamp"] - self.init_time ) + report = copy.deepcopy(self.report) if reset: self.reset() diff --git a/src/qoa4ml/reports/rohe_reports.py b/src/qoa4ml/reports/rohe_reports.py index 39ff3f1..df9f14a 100644 --- a/src/qoa4ml/reports/rohe_reports.py +++ b/src/qoa4ml/reports/rohe_reports.py @@ -1,6 +1,6 @@ import copy import time -from typing import Optional +from typing import Any, Optional from uuid import UUID, uuid4 from ..config.configs import ClientInfo @@ -21,15 +21,77 @@ class RoheReport(AbstractReport): - def __init__(self, client_config: ClientInfo): + """ + RoheReport manages the reporting of metrics and inference data for a specific application. + + Parameters + ---------- + client_config : ClientInfo + Configuration settings related to the client. + + Attributes + ---------- + client_config : ClientInfo + A deep copy of the client configuration. + init_time : float + The initialization time of the report. + previous_report : list[RoheReportModel] + A list of previously processed reports. + inference_report : EnsembleInferenceReport + The current inference report. 
+ execution_graph : ExecutionGraph + The current execution graph. + report : RoheReportModel + The current state of the report. + previous_microservice_instance : list[MicroserviceInstance] + List of previous microservice instances. + execution_instance : MicroserviceInstance + An instance representing the current execution context. + + Methods + ------- + reset() -> None + Reset the report to an initial state. + import_report_from_file(file_path: str) -> None + Import a report from a specified file path. + combine_stage_report(current_stage_report: dict[str, StageReport], previous_stage_report: dict[str, StageReport]) -> dict[str, StageReport] + Combine metrics from the current and previous stage reports. + process_previous_report(previous_report_dict: dict) -> None + Process and incorporate a previous report. + build_execution_graph() -> None + Build the execution graph for the current report. + observe_metric(report_type: ReportTypeEnum, stage: str, metric: Metric) -> None + Observe and record a metric. + observe_inference(inference_value: Any) -> None + Observe and record inference data. + observe_inference_metric(metric: Metric) -> None + Observe and record an inference-specific metric. + generate_report(reset: bool = True, corr_id: Optional[str] = None) -> RoheReportModel + Generate the report and optionally reset the current report state. + """ + + def __init__(self, client_config: ClientInfo) -> None: + """ + Initialize an instance of RoheReport. + + Parameters + ---------- + client_config : ClientInfo + Configuration settings related to the client. + """ self.client_config = copy.deepcopy(client_config) self.reset() self.init_time = time.time() - # if file_path: - # self.import_report_from_file(file_path) + def reset(self) -> None: + """ + Reset the report to an initial state. - def reset(self): + Notes + ----- + - This method initializes a new report model and clears the list of previous reports. + - It also resets the inference report, execution graph, and execution instance. + """ self.previous_report: list[RoheReportModel] = [] self.inference_report = EnsembleInferenceReport() self.execution_graph = ExecutionGraph(linked_list={}) @@ -42,19 +104,47 @@ def reset(self): stage=self.client_config.stage_id, ) - def import_report_from_file(self, file_path: str): + def import_report_from_file(self, file_path: str) -> None: + """ + Import a report from a specified file path. + + Parameters + ---------- + file_path : str + The path to the file containing the report data. + + Notes + ----- + - The imported report updates the current inference report and execution graph. + """ report = load_config(file_path) self.inference_report = EnsembleInferenceReport(**report["inference_report"]) self.execution_graph = ExecutionGraph(**report["execution_graph"]) self.report = RoheReportModel( - inference_report=self.inference_report, execution_graph=self.execution_graph + inference_report=self.inference_report, + execution_graph=self.execution_graph, ) def combine_stage_report( self, current_stage_report: dict[str, StageReport], previous_stage_report: dict[str, StageReport], - ): + ) -> dict[str, StageReport]: + """ + Combine metrics from the current and previous stage reports. + + Parameters + ---------- + current_stage_report : dict + The current stage report containing metrics. + previous_stage_report : dict + The previous stage report containing metrics. + + Returns + ------- + dict + Combined stage report containing metrics from both reports. 
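+
+    Examples
+    --------
+    An illustrative sketch (hypothetical stage data; assumes an initialized
+    ``RoheReport`` named ``report`` and two ``dict[str, StageReport]``
+    mappings built elsewhere):
+
+    >>> combined = report.combine_stage_report(current_stages, previous_stages)
+    >>> list(combined)  # union of stage names from both reports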
+ """ combined_stage_report: dict[str, StageReport] = {} for stage_name, stage_report in previous_stage_report.items(): new_stage_report = StageReport(name=stage_name, metrics={}) @@ -72,30 +162,34 @@ def combine_stage_report( combined_stage_report[stage_name] = new_stage_report return combined_stage_report - def process_previous_report(self, previous_report_dict: dict): + def process_previous_report(self, previous_report_dict: dict) -> None: + """ + Process and incorporate a previous report. + + Parameters + ---------- + previous_report_dict : dict + Dictionary representation of a previous report. + + Notes + ----- + - Raises a ValueError if the previous report is empty. + - Service quality, data quality, ML-specific quality reports, and execution graphs are combined with the current report. + """ previous_report = RoheReportModel(**previous_report_dict) self.previous_report.append(previous_report) if not previous_report.inference_report or not previous_report.execution_graph: raise ValueError("Can't process empty previous report") - # NOTE: service quality report self.inference_report.service = self.combine_stage_report( self.inference_report.service, previous_report.inference_report.service ) - - # NOTE: data quality report self.inference_report.data = self.combine_stage_report( self.inference_report.data, previous_report.inference_report.data ) - # NOTE: ml-specific quality report - # debug(previous_report.inference_report.ml_specific) if not self.inference_report.ml_specific: - if ( - previous_report.inference_report.ml_specific - and previous_report.inference_report.ml_specific.end_point - and previous_report.inference_report.ml_specific.linked_list - ): + if previous_report.inference_report.ml_specific: self.inference_report.ml_specific = ( previous_report.inference_report.ml_specific ) @@ -105,81 +199,118 @@ def process_previous_report(self, previous_report_dict: dict): instance_id=self.execution_instance.id, ) self.inference_report.ml_specific.end_point = end_point - self.inference_report.ml_specific.linked_list |= { - end_point.instance_id: LinkedInstance[InferenceInstance]( + self.inference_report.ml_specific.linked_list[end_point.instance_id] = ( + LinkedInstance( instance=end_point, previous=[ previous_report.inference_report.ml_specific.end_point ], ) - } + ) + else: + if ( + previous_report.inference_report.ml_specific + and self.inference_report.ml_specific.end_point + ): + self.inference_report.ml_specific.linked_list.update( + previous_report.inference_report.ml_specific.linked_list + ) + current_end_point = self.inference_report.ml_specific.end_point + previous_end_point = ( + previous_report.inference_report.ml_specific.end_point + ) + self.inference_report.ml_specific.linked_list[ + current_end_point.instance_id + ].previous.append(previous_end_point) - elif ( - previous_report.inference_report.ml_specific - and previous_report.inference_report.ml_specific.end_point - and previous_report.inference_report.ml_specific.linked_list - and self.inference_report.ml_specific.end_point - ): - self.inference_report.ml_specific.linked_list |= ( - previous_report.inference_report.ml_specific.linked_list - ) - current_end_point = self.inference_report.ml_specific.end_point - previous_end_point = previous_report.inference_report.ml_specific.end_point - self.inference_report.ml_specific.linked_list[ - current_end_point.instance_id - ].previous.append(previous_end_point) - # NOTE: execution graph if not self.execution_graph: self.execution_graph = previous_report.execution_graph else: 
self.execution_graph.linked_list.update( previous_report.execution_graph.linked_list ) + self.previous_microservice_instance.append( previous_report.execution_graph.end_point ) self.report = RoheReportModel( - inference_report=self.inference_report, execution_graph=self.execution_graph + inference_report=self.inference_report, + execution_graph=self.execution_graph, ) - def build_execution_graph(self): + def build_execution_graph(self) -> None: + """ + Build the execution graph for the current report. + + Notes + ----- + - Creates a new endpoint in the execution graph linking to previous microservice instances. + """ end_point = LinkedInstance( instance=self.execution_instance, - previous=list(self.previous_microservice_instance), + previous=self.previous_microservice_instance, ) self.execution_graph.linked_list[end_point.instance.id] = end_point self.execution_graph.end_point = end_point.instance self.report.execution_graph = self.execution_graph - def observe_metric(self, report_type: ReportTypeEnum, stage: str, metric: Metric): + def observe_metric( + self, report_type: ReportTypeEnum, stage: str, metric: Metric + ) -> None: + """ + Observe and record a metric. + + Parameters + ---------- + report_type : ReportTypeEnum + The type of report being generated. + stage : str + The stage of the process in which the metric is recorded. + metric : Metric + The metric to be recorded. + + Raises + ------ + ValueError + If the stage name is empty or the report type is not handled. + """ if stage == "": raise ValueError("Stage name can't be empty") + + report_dict = None if report_type == ReportTypeEnum.service: if stage not in self.inference_report.service: self.inference_report.service[stage] = StageReport( name=stage, metrics={} ) - if metric.metric_name not in self.inference_report.service[stage].metrics: - self.inference_report.service[stage].metrics[metric.metric_name] = {} - - self.inference_report.service[stage].metrics[metric.metric_name] |= { - UUID(self.client_config.instance_id): metric - } - + report_dict = self.inference_report.service[stage].metrics elif report_type == ReportTypeEnum.data: if stage not in self.inference_report.data: self.inference_report.data[stage] = StageReport(name=stage, metrics={}) - if metric.metric_name not in self.inference_report.data[stage].metrics: - self.inference_report.data[stage].metrics[metric.metric_name] = {} - - self.inference_report.data[stage].metrics[metric.metric_name] |= { - UUID(self.client_config.instance_id): metric - } + report_dict = self.inference_report.data[stage].metrics else: raise ValueError(f"Can't handle report type {report_type}") + + if metric.metric_name not in report_dict: + report_dict[metric.metric_name] = {} + + report_dict[metric.metric_name][UUID(self.client_config.instance_id)] = metric self.report.inference_report = self.inference_report - def observe_inference(self, inference_value): + def observe_inference(self, inference_value: Any) -> None: + """ + Observe and record inference data. + + Parameters + ---------- + inference_value : Any + The value of the inference to be recorded. + + Notes + ----- + - If an inference endpoint already exists, the prediction value is updated. + - Otherwise, a new inference endpoint is created in the inference report. 
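+
+    Examples
+    --------
+    Illustrative only (hypothetical prediction value; assumes an initialized
+    ``RoheReport`` named ``report``):
+
+    >>> report.observe_inference([0.1, 0.7, 0.2])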
+ """ if ( self.inference_report.ml_specific and self.inference_report.ml_specific.end_point @@ -194,39 +325,72 @@ def observe_inference(self, inference_value): prediction=inference_value, ) self.inference_report.ml_specific.end_point = end_point - self.inference_report.ml_specific.linked_list |= { - end_point.instance_id: LinkedInstance[InferenceInstance]( + self.inference_report.ml_specific.linked_list[end_point.instance_id] = ( + LinkedInstance( instance=end_point, previous=[], ) - } + ) - def observe_inference_metric( - self, - metric: Metric, - ): + def observe_inference_metric(self, metric: Metric) -> None: + """ + Observe and record an inference-specific metric. + + Parameters + ---------- + metric : Metric + The inference-specific metric to be recorded. + + Notes + ----- + - If an inference endpoint already exists, the metric is appended to the existing metrics. + - Otherwise, a new inference endpoint is created and the metric is added to it. + """ if ( self.inference_report.ml_specific and self.inference_report.ml_specific.end_point ): self.inference_report.ml_specific.end_point.metrics.append(metric) else: - if self.inference_report.ml_specific is None: + if not self.inference_report.ml_specific: self.inference_report.ml_specific = InferenceGraph() - if self.inference_report.ml_specific.end_point is None: + if not self.inference_report.ml_specific.end_point: end_point = InferenceInstance( inference_id=uuid4(), instance_id=self.execution_instance.id, functionality=self.client_config.functionality, ) - self.inference_report.ml_specific.end_point = end_point self.inference_report.ml_specific.linked_list[end_point.instance_id] = ( - LinkedInstance[InferenceInstance](instance=end_point) + LinkedInstance( + instance=end_point, + ) ) self.inference_report.ml_specific.end_point.metrics.append(metric) - def generate_report(self, reset: bool = True, corr_id: Optional[str] = None): + def generate_report( + self, reset: bool = True, corr_id: Optional[str] = None + ) -> RoheReportModel: + """ + Generate the report and optionally reset the current report state. + + Parameters + ---------- + reset : bool, optional + Whether to reset the report state after generating the report, default is True. + corr_id : Optional[str], optional + Correlation ID for the report, default is None. + + Returns + ------- + RoheReportModel + The generated report. + + Notes + ----- + - Adds metadata such as client configuration, timestamp, and runtime to the report. + - Builds the execution graph and deep copies the current state of the report before optionally resetting it. + """ self.build_execution_graph() self.report.metadata["client_config"] = copy.deepcopy(self.client_config) self.report.metadata["timestamp"] = time.time() @@ -235,6 +399,7 @@ def generate_report(self, reset: bool = True, corr_id: Optional[str] = None): self.report.metadata["runtime"] = ( self.report.metadata["timestamp"] - self.init_time ) + report = copy.deepcopy(self.report) if reset: self.reset() diff --git a/src/qoa4ml/utils/qoa_utils.py b/src/qoa4ml/utils/qoa_utils.py index 9d6211d..1614a8b 100644 --- a/src/qoa4ml/utils/qoa_utils.py +++ b/src/qoa4ml/utils/qoa_utils.py @@ -10,6 +10,7 @@ import time import traceback from threading import Thread +from typing import Any, Optional import numpy as np import psutil @@ -18,11 +19,26 @@ from .logger import qoa_logger -def make_folder(temp_path): +def make_folder(temp_path: str) -> bool: + """ + Create a folder if it doesn't already exist. 
+
+    Parameters
+    ----------
+    temp_path : str
+        The path of the folder to be created.
+
+    Returns
+    -------
+    bool
+        True if the folder exists or is created successfully, False otherwise.
+
+    Notes
+    -----
+    If the folder already exists, nothing is done.
+    """
     try:
-        if os.path.exists(temp_path):
-            pass
-        else:
+        if not os.path.exists(temp_path):
             os.makedirs(temp_path)
         return True
     except Exception:
@@ -30,6 +46,18 @@
 
 
 def get_cgroup_version() -> str:
+    """
+    Retrieve the current cgroup version.
+
+    Returns
+    -------
+    str
+        The cgroup version ("v1" or "v2").
+
+    Notes
+    -----
+    Uses subprocess to execute the `mount` command and grep for cgroup version.
+    """
     proc1 = subprocess.Popen("mount", stdout=subprocess.PIPE)
     proc2 = subprocess.Popen(
         shlex.split("grep cgroup"),
@@ -40,9 +68,7 @@ def get_cgroup_version() -> str:
     if proc1.stdout:
         proc1.stdout.close()
     out, _ = proc2.communicate()
-    if "cgroup2" in out.decode():
-        return "v2"
-    return "v1"
+    return "v2" if "cgroup2" in out.decode() else "v1"
 
 
 if get_cgroup_version() == "v2":
@@ -51,136 +77,239 @@ def get_cgroup_version() -> str:
     CGROUP_VERSION = "v1"
 
 
-def set_logger_level(logging_level):
-    if logging_level == 0:
-        log_level = logging.NOTSET
-    elif logging_level == 1:
-        log_level = logging.DEBUG
-    elif logging_level == 2:
-        log_level = logging.INFO
-    elif logging_level == 3:
-        log_level = logging.WARNING
-    elif logging_level == 4:
-        log_level = logging.ERROR
-    elif logging_level == 5:
-        log_level = logging.CRITICAL
-    else:
+def set_logger_level(logging_level: int) -> None:
+    """
+    Set the logging level for the application logger.
+
+    Parameters
+    ----------
+    logging_level : int
+        The desired logging level:
+        0 - NOTSET
+        1 - DEBUG
+        2 - INFO
+        3 - WARNING
+        4 - ERROR
+        5 - CRITICAL
+
+    Raises
+    ------
+    ValueError
+        If the logging level is not between 0 and 5.
+    """
+    log_levels = [
+        logging.NOTSET,
+        logging.DEBUG,
+        logging.INFO,
+        logging.WARNING,
+        logging.ERROR,
+        logging.CRITICAL,
+    ]
+
+    if not 0 <= logging_level < len(log_levels):
        raise ValueError(f"Error logging level {logging_level}")
-    qoa_logger.setLevel(log_level)
+    qoa_logger.setLevel(log_levels[logging_level])
 
-def load_config(file_path: str) -> dict:
+
+def load_config(file_path: str) -> Optional[dict]:
     """
-    file_path: file path to load config
+    Load a configuration file.
+
+    Parameters
+    ----------
+    file_path : str
+        The path to the configuration file.
 
+    Returns
+    -------
+    dict or None
+        The loaded configuration dictionary, or None if the file cannot be
+        loaded or the format is unsupported.
+
+    Notes
+    -----
+    Supports JSON and YAML file formats. Logs a warning if the format is unsupported.
    """
     try:
-        if "json" in file_path:
-            with open(file_path) as f:
+        with open(file_path) as f:
+            if "json" in file_path:
                 return json.load(f)
-        if ("yaml" in file_path) or ("yml" in file_path):
-            with open(file_path) as f:
+            elif "yaml" in file_path or "yml" in file_path:
                 return yaml.safe_load(f)
-        else:
-            qoa_logger.warning("Unsupported format")
-            return None
+            else:
+                qoa_logger.warning("Unsupported format")
     except Exception:
         qoa_logger.error("Unable to load configuration")
-
-    return None
+    return None
 
 
-def to_json(file_path: str, conf: dict):
+def to_json(file_path: str, conf: dict) -> None:
     """
-    file_path: file path to save config
+    Save a configuration to a JSON file.
+
+    Parameters
+    ----------
+    file_path : str
+        The path to the file where the configuration should be saved.
+    conf : dict
+        The configuration dictionary to save.
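+
+    Examples
+    --------
+    A hypothetical path and payload, shown for illustration only:
+
+    >>> to_json("/tmp/qoa_config.json", {"host": "localhost", "port": 5672})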
""" with open(file_path, "w") as f: json.dump(conf, f) -def to_yaml(file_path: str, conf: dict): +def to_yaml(file_path: str, conf: dict) -> None: """ - file_path: file path to save config + Save a configuration to a YAML file. + + Parameters + ---------- + file_path : str + The path to the file where the configuration should be saved. + conf : dict + The configuration dictionary to save. """ with open(file_path, "w") as f: yaml.dump(conf, f) -def get_sys_cpu(): +def get_sys_cpu() -> dict: + """ + Retrieve system CPU statistics and times. + + Returns + ------- + dict + Dictionary containing CPU stats and times. + + Notes + ----- + Uses psutil to retrieve both CPU stats and times. + """ stats = psutil.cpu_stats() cpu_time = psutil.cpu_times() - info = {} - for key in stats._fields: - info[key] = getattr(stats, key) - for key in cpu_time._fields: - info[key] = getattr(cpu_time, key) - return info + return {key: getattr(stats, key) for key in stats._fields} | { + key: getattr(cpu_time, key) for key in cpu_time._fields + } -def get_sys_cpu_util(): - info = {} +def get_sys_cpu_util() -> dict: + """ + Retrieve system CPU utilization for each core. + + Returns + ------- + dict + Dictionary containing CPU utilization for each core. + """ core_utils = psutil.cpu_percent(percpu=True) - for core_num, core_util in enumerate(core_utils): - info[f"core_{core_num}"] = core_util - return info + return { + f"core_{core_num}": core_util for core_num, core_util in enumerate(core_utils) + } + +def get_sys_cpu_metadata() -> dict: + """ + Retrieve metadata information about the system CPU. -def get_sys_cpu_metadata(): + Returns + ------- + dict + Dictionary containing CPU frequency and thread count. + """ cpu_freq = psutil.cpu_freq() - frequency = {"value": cpu_freq.max / 1000, "unit": "GHz"} - cpu_threads = psutil.cpu_count(logical=True) - return {"frequency": frequency, "thread": cpu_threads} + return { + "frequency": {"value": cpu_freq.max / 1000, "unit": "GHz"}, + "thread": psutil.cpu_count(logical=True), + } -def get_sys_mem(): +def get_sys_mem() -> dict: + """ + Retrieve system memory statistics. + + Returns + ------- + dict + Dictionary containing memory stats. + """ stats = psutil.virtual_memory() - info = {} - for key in stats._fields: - info[key] = getattr(stats, key) - return info + return {key: getattr(stats, key) for key in stats._fields} -def get_sys_net(): - info = {} +def get_sys_net() -> dict: + """ + Retrieve system network I/O statistics. + + Returns + ------- + dict + Dictionary containing network I/O stats. + """ net = psutil.net_io_counters() - for key in net._fields: - info[key] = getattr(net, key) - return info + return {key: getattr(net, key) for key in net._fields} -def report_proc_cpu(process): - report = {} +def report_proc_cpu(process: psutil.Process) -> dict: + """ + Retrieve CPU usage statistics for a given process. + + Parameters + ---------- + process : psutil.Process + The process to retrieve CPU stats for. + + Returns + ------- + dict + Dictionary containing CPU stats for the process. 
+ """ cpu_time = process.cpu_times() context = process.num_ctx_switches() - for key in cpu_time._fields: - report[key] = getattr(cpu_time, key) - for key in context._fields: - report[key] = getattr(context, key) - report["num_thread"] = process.num_threads() + return ( + {key: getattr(cpu_time, key) for key in cpu_time._fields} + | {key: getattr(context, key) for key in context._fields} + | {"num_thread": process.num_threads()} + ) - return report +def report_proc_child_cpu(process: psutil.Process) -> dict: + """ + Retrieve CPU usage statistics for a given process and its children. + + Parameters + ---------- + process : psutil.Process + The process to retrieve CPU stats for. -def report_proc_child_cpu(process: psutil.Process): - # WARNING: this children function takes a lot of time + Returns + ------- + dict + Dictionary containing CPU stats for the process and its children. + + Notes + ----- + This function can be time-consuming as it recursively evaluates all child processes. + """ child_processes = process.children(recursive=True) - child_processes_count = len(child_processes) - child_processes_cpu = {} - process_cpu_time = process.cpu_times() - for id, child_proc in enumerate(child_processes): - cpu_time = child_proc.cpu_times() - child_processes_cpu[f"child_{id}"] = float(cpu_time.user + cpu_time.system) + child_processes_cpu = { + f"child_{id}": float( + child_proc.cpu_times().user + child_proc.cpu_times().system + ) + for id, child_proc in enumerate(child_processes) + } - total_cpu_usage = sum(child_processes_cpu.values()) + process_cpu_time = process.cpu_times() main_process = float( process_cpu_time.user + process_cpu_time.system + process_cpu_time.children_user + process_cpu_time.children_system ) - total_cpu_usage += main_process + total_cpu_usage = sum(child_processes_cpu.values()) + main_process + return { - "child_process": child_processes_count, + "child_process": len(child_processes), "value": child_processes_cpu, "main_process": main_process, "total": total_cpu_usage, @@ -188,7 +317,25 @@ def report_proc_child_cpu(process: psutil.Process): } -def get_proc_cpu(pid=None): +def get_proc_cpu(pid: Optional[int] = None) -> dict: + """ + Retrieve CPU usage statistics for a given process and its children. + + Parameters + ---------- + pid : int, optional + The process ID to retrieve CPU stats for. If None, uses the current process ID. + + Returns + ------- + dict + Dictionary containing CPU stats for the process and its children. + + Notes + ----- + - The main process's stats are keyed by its PID. + - Each child process's stats are keyed by its PID with a "c" suffix. + """ if pid is None: pid = os.getpid() process = psutil.Process(pid) @@ -197,19 +344,47 @@ def get_proc_cpu(pid=None): info[pid] = report_proc_cpu(process) for child in child_list: - info[child.pid + "c"] = report_proc_cpu(child) + info[f"{child.pid}c"] = report_proc_cpu(child) return info -def report_proc_mem(process: psutil.Process): - report = {} +def report_proc_mem(process: psutil.Process) -> dict: + """ + Retrieve memory usage statistics for a given process. + + Parameters + ---------- + process : psutil.Process + The process to retrieve memory stats for. + + Returns + ------- + dict + Dictionary containing memory stats for the process. 
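+
+    Examples
+    --------
+    A sketch against the current process (keys mirror the fields of
+    psutil's ``memory_info()``, e.g. ``rss`` and ``vms``):
+
+    >>> import psutil
+    >>> mem = report_proc_mem(psutil.Process())
+    >>> "rss" in mem
+    True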
+ """ mem_info = process.memory_info() - for key in mem_info._fields: - report[key] = getattr(mem_info, key) - return report + return {key: getattr(mem_info, key) for key in mem_info._fields} -def get_proc_mem(pid=None): +def get_proc_mem(pid: Optional[int] = None) -> dict: + """ + Retrieve memory usage statistics for a given process and its children. + + Parameters + ---------- + pid : int, optional + The process ID to retrieve memory stats for. If None, uses the current process ID. + + Returns + ------- + dict + Dictionary containing memory stats for the process and its children. + + Notes + ----- + - The main process's stats are keyed by its PID. + - Each child process's stats are keyed by its PID with a "c" suffix. + """ if pid is None: pid = os.getpid() process = psutil.Process(pid) @@ -218,19 +393,58 @@ def get_proc_mem(pid=None): info[pid] = report_proc_mem(process) for child in child_list: - info[child.pid + "c"] = report_proc_mem(child) + info[f"{child.pid}c"] = report_proc_mem(child) return info -def convert_to_gbyte(value): +def convert_to_gbyte(value: float) -> float: + """ + Convert a value from bytes to gigabytes. + + Parameters + ---------- + value : float + The value in bytes to be converted. + + Returns + ------- + float + The converted value in gigabytes. + """ return value / 1024.0 / 1024.0 / 1024.0 -def convert_to_mbyte(value): +def convert_to_mbyte(value: float) -> float: + """ + Convert a value from bytes to megabytes. + + Parameters + ---------- + value : float + The value in bytes to be converted. + + Returns + ------- + float + The converted value in megabytes. + """ return value / 1024.0 / 1024.0 -def convert_to_kbyte(value): +def convert_to_kbyte(value: float) -> float: + """ + Convert a value from bytes to kilobytes. + + Parameters + ---------- + value : float + The value in bytes to be converted. + + Returns + ------- + float + The converted value in kilobytes. + """ return value / 1024.0 @@ -241,25 +455,48 @@ def convert_to_kbyte(value): doc_monitor_flag = False -def system_report(client, interval: int, to_mb=True, to_gb=False, to_kb=False): +def system_report( + client, interval: int, to_mb: bool = True, to_gb: bool = False, to_kb: bool = False +) -> None: + """ + Generate a system report and send it to the client at regular intervals. + + Parameters + ---------- + client : Any + The client to send the report to. + interval : int + The time interval in seconds between reports. + to_mb : bool, optional + Convert network I/O values to megabytes, default is True. + to_gb : bool, optional + Convert network I/O values to gigabytes, default is False. + to_kb : bool, optional + Convert network I/O values to kilobytes, default is False. + + Notes + ----- + - Collects CPU, memory, and network I/O statistics. + - Logs errors if any occur during data collection or report sending. + - Sleeps for the specified interval between reports. 
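+
+    Examples
+    --------
+    Because this loop blocks until ``sys_monitor_flag`` is cleared, it is
+    normally launched on a background thread via ``sys_monitor`` (a
+    hypothetical ``client`` exposing a ``report`` method is assumed):
+
+    >>> sys_monitor(client, interval=5)  # doctest: +SKIP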
+    """
     report = {}
     last_net_value = {"sent": 0, "receive": 0}
     while sys_monitor_flag:
         try:
-            report["sys_cpu_stats"] = get_sys_mem()
+            report["sys_cpu_stats"] = get_sys_cpu()
         except Exception as e:
-            qoa_logger.error(f"Error {type(e)} in report CPU stat: {e.__traceback__}")
+            qoa_logger.error(f"Error {type(e)} in reporting CPU stats: {e}")
             traceback.print_exception(*sys.exc_info())
         try:
             report["sys_mem_stats"] = get_sys_mem()
         except Exception as e:
-            qoa_logger.error(
-                f"Error {type(e)} in report memory stat: {e.__traceback__}"
-            )
+            qoa_logger.error(f"Error {type(e)} in reporting memory stats: {e}")
             traceback.print_exception(*sys.exc_info())
         try:
             report["sys_net_stats"] = get_sys_net()
             sent = 0
+            receive = 0
             if to_mb:
                 sent = convert_to_mbyte(psutil.net_io_counters().bytes_sent)
                 receive = convert_to_mbyte(psutil.net_io_counters().bytes_recv)
@@ -272,6 +509,7 @@ def system_report(client, interval: int, to_mb=True, to_gb=False, to_kb=False):
             else:
                 sent = psutil.net_io_counters().bytes_sent
                 receive = psutil.net_io_counters().bytes_recv
+
             curr_net_value = {"sent": sent, "receive": receive}
             report["sys_net_send"] = curr_net_value["sent"] - last_net_value["sent"]
             report["sys_net_receive"] = (
@@ -279,21 +517,31 @@ def system_report(client, interval: int, to_mb=True, to_gb=False, to_kb=False):
             )
             last_net_value = curr_net_value.copy()
         except Exception as e:
-            qoa_logger.error(
-                f"Error {type(e)} in report network stat: {e.__traceback__}"
-            )
+            qoa_logger.error(f"Error {type(e)} in reporting network stats: {e}")
             traceback.print_exception(*sys.exc_info())
         try:
             client.report(report=report)
         except Exception as e:
-            qoa_logger.error(
-                f"Error {type(e)} in sent system report: {e.__traceback__}"
-            )
+            qoa_logger.error(f"Error {type(e)} in sending system report: {e}")
             traceback.print_exception(*sys.exc_info())
         time.sleep(interval)
 
 
-def sys_monitor(client, interval: int):
+def sys_monitor(client, interval: int) -> None:
+    """
+    Start generating system reports on a background thread.
+
+    Parameters
+    ----------
+    client : Any
+        The client to send the report to.
+    interval : int
+        The time interval in seconds between reports.
+
+    Notes
+    -----
+    - Starts a new thread to generate system reports at regular intervals.
+    """
     sub_thread = Thread(target=system_report, args=(client, interval))
     sub_thread.start()
@@ -331,34 +579,85 @@ def sys_monitor(client, interval: int):
 ###################### DOCKER REPORT ######################
 
 
-def get_cpu_stat(stats, key):
+def get_cpu_stat(stats: dict, key: str) -> float:
+    """
+    Retrieve CPU usage statistics from Docker stats.
+
+    Parameters
+    ----------
+    stats : dict
+        The Docker stats dictionary.
+    key : str
+        The key indicating the type of CPU statistic (e.g., "percentage").
+
+    Returns
+    -------
+    float
+        The CPU usage percentage, or -1 if the key is not recognized.
+
+    Notes
+    -----
+    - Calculates the CPU usage percentage based on the difference between the current and previous CPU usage.
+    """
     if key == "percentage":
         usage_delta = (
             stats["cpu_stats"]["cpu_usage"]["total_usage"]
             - stats["precpu_stats"]["cpu_usage"]["total_usage"]
         )
-
         system_delta = (
             stats["cpu_stats"]["system_cpu_usage"]
             - stats["precpu_stats"]["system_cpu_usage"]
         )
-
         len_cpu = stats["cpu_stats"]["online_cpus"]
-
         percentage = (usage_delta / system_delta) * len_cpu * 100
         return round(percentage, 2)
-    else:
-        return -1
+    return -1
 
 
-def get_mem_stat(stats, key):
+def get_mem_stat(stats: dict, key: str) -> int:
+    """
+    Retrieve memory usage statistics from Docker stats.
+
+    Parameters
+    ----------
+    stats : dict
+        The Docker stats dictionary.
+    key : str
+        The key indicating the type of memory statistic (e.g., "used").
+
+    Returns
+    -------
+    int
+        The memory usage in bytes, or -1 if the key is not recognized.
+    """
     if key == "used":
         return stats["memory_stats"]["usage"]
-    else:
-        return -1
+    return -1
 
 
-def merge_report(f_report, i_report, prio=True):
+def merge_report(f_report: dict, i_report: dict, prio: bool = True) -> dict:
+    """
+    Merge two report dictionaries.
+
+    Parameters
+    ----------
+    f_report : dict
+        The first report dictionary.
+    i_report : dict
+        The second report dictionary.
+    prio : bool, optional
+        Flag to determine which report takes priority in case of conflict, default is True.
+
+    Returns
+    -------
+    dict
+        The merged report dictionary.
+
+    Notes
+    -----
+    - If both reports are dictionaries, merges them recursively.
+    - If there is a conflict and prio is True, the value from f_report is used; otherwise, the value from i_report is used.
+    """
     try:
         if isinstance(f_report, dict) and isinstance(i_report, dict):
             key_list = tuple(f_report.keys())
@@ -368,55 +667,138 @@
                     i_report.pop(key)
             f_report.update(i_report)
         elif f_report != i_report:
-            if prio is True:
-                return f_report
-            else:
-                return i_report
+            return f_report if prio else i_report
     except Exception as e:
-        qoa_logger.error(f"Error {type(e)} in mergeReport: {e.__traceback__}")
+        qoa_logger.error(f"Error {type(e)} in merge_report: {e}")
         traceback.print_exception(*sys.exc_info())
     return f_report
 
 
-def get_dict_at(dict, i=0):
+def get_dict_at(dictionary: dict, i: int = 0):
+    """
+    Retrieve the key-value pair at a specific index in a dictionary.
+
+    Parameters
+    ----------
+    dictionary : dict
+        The dictionary from which to retrieve the key-value pair.
+    i : int, optional
+        The index of the key-value pair to retrieve, default is 0.
+
+    Returns
+    -------
+    tuple
+        A tuple containing the key and the value at the specified index.
+
+    Notes
+    -----
+    - Errors (such as an out-of-range index) are caught and logged with a
+      traceback, and None is returned in that case.
+    """
     try:
-        keys = list(dict.keys())
-        return keys[i], dict[keys[i]]
+        keys = list(dictionary.keys())
+        return keys[i], dictionary[keys[i]]
     except Exception as e:
         qoa_logger.error(f"Error {type(e)} in get_dict_at: {e.__traceback__}")
         traceback.print_exception(*sys.exc_info())
 
 
-def get_file_dir(file, to_string=True):
+def get_file_dir(file: str, to_string: bool = True):
+    """
+    Get the directory of a file.
+
+    Parameters
+    ----------
+    file : str
+        The file path.
+    to_string : bool, optional
+        Flag to return the directory as a string, default is True.
+
+    Returns
+    -------
+    str or pathlib.Path
+        The directory of the file as a string or Path object.
+    """
     current_dir = pathlib.Path(file).parent.absolute()
-    if to_string:
-        return str(current_dir)
-    else:
-        return current_dir
+    return str(current_dir) if to_string else current_dir
 
 
-def get_parent_dir(file, parent_level=1, to_string=True):
+def get_parent_dir(file: str, parent_level: int = 1, to_string: bool = True):
+    """
+    Get the parent directory of a file by a specified number of levels.
+
+    Parameters
+    ----------
+    file : str
+        The file path.
+    parent_level : int, optional
+        The number of levels up to retrieve the parent directory, default is 1.
+    to_string : bool, optional
+        Flag to return the directory as a string, default is True.
+
+    Returns
+    -------
+    str or pathlib.Path
+        The parent directory of the file as a string or Path object.
+    """
     current_dir = get_file_dir(file=file, to_string=False)
     for _ in range(parent_level):
         current_dir = current_dir.parent.absolute()
-    if to_string:
-        return str(current_dir)
-    else:
-        return current_dir
+    return str(current_dir) if to_string else current_dir
+
+
+def is_numpyarray(obj: Any) -> bool:
+    """
+    Check if an object is a NumPy array.
 
+    Parameters
+    ----------
+    obj : Any
+        The object to check.
 
-def is_numpyarray(obj):
+    Returns
+    -------
+    bool
+        True if the object is a NumPy array, False otherwise.
+    """
     return isinstance(obj, np.ndarray)
 
 
-def get_process_allowed_cpus():
-    # NOTE: 0 as PID represents the calling process
+def get_process_allowed_cpus() -> list[int]:
+    """
+    Retrieve the list of CPU cores available to the process.
+
+    Returns
+    -------
+    list[int]
+        A list of CPU core indices.
+
+    Notes
+    -----
+    - Uses the calling process's PID (0) to get the CPU affinity.
+    """
     pid = 0
     affinity = os.sched_getaffinity(pid)
     return list(affinity)
 
 
-def get_process_allowed_memory():
+def get_process_allowed_memory() -> Optional[float]:
+    """
+    Retrieve the memory limit allowed to the process.
+
+    Returns
+    -------
+    Optional[float]
+        The memory limit in bytes divided across the detected tasks, or
+        None if it cannot be determined. A non-numeric cgroup limit
+        (e.g. "max") is returned as the raw string.
+
+    Notes
+    -----
+    - Supports both cgroup v1 and v2 formats to get the memory limit.
+    """
     if CGROUP_VERSION == "v1":
         with open("/proc/self/cgroup") as file:
             for line in file:
@@ -425,7 +807,7 @@ def get_process_allowed_memory():
                     cgroup_path = parts[2]
                     memory_limit_file = re.sub(r"/task_\d+", "", cgroup_path)
 
-                    number_of_task = len(
+                    number_of_tasks = len(
                         glob.glob(f"/sys/fs/cgroup/memory{memory_limit_file}/task_*")
                     )
 
@@ -435,11 +817,10 @@ def get_process_allowed_memory():
                     memory_limit_str = limit_file.read().strip()
                     try:
                         memory_limit_int = int(memory_limit_str)
-                        return memory_limit_int / number_of_task
+                        return memory_limit_int / number_of_tasks
                     except ValueError:
                         return memory_limit_str
         return None
-
     else:
         with open("/proc/self/cgroup") as file:
            for line in file:
@@ -448,13 +829,13 @@ def get_process_allowed_memory():
                 pattern = r"/task_\d+"
                 cgroup_path = re.sub(pattern, "", cgroup_path)
                 with open(f"/sys/fs/cgroup{cgroup_path}/memory.max") as limit_file:
-                    number_of_task = len(
+                    number_of_tasks = len(
                         glob.glob(f"/sys/fs/cgroup{cgroup_path}/task_*")
                     )
                     memory_limit_str = limit_file.read().strip()
                     try:
                         memory_limit_int = int(memory_limit_str)
-                        return memory_limit_int / number_of_task
+                        return memory_limit_int / number_of_tasks
                     except ValueError:
                         return memory_limit_str
                 return None