Skip to content

Commit

Permalink
add docstring generated from chatgpt
Browse files Browse the repository at this point in the history
  • Loading branch information
nguu0123 committed Sep 18, 2024
1 parent c45a52b commit 3aa99b2
Show file tree
Hide file tree
Showing 14 changed files with 1,716 additions and 519 deletions.
110 changes: 97 additions & 13 deletions src/qoa4ml/collector/amqp_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,67 @@


class AmqpCollector(BaseCollector):
# Init an amqp client handling the connection to amqp servier
"""
AmqpCollector handles the connection to an AMQP server for collecting and processing messages.
Parameters
----------
configuration : AMQPCollectorConfig
Configuration settings for connecting to the AMQP server.
host_object : Optional[HostObject], optional
An optional HostObject to process incoming messages, default is None.
Attributes
----------
host_object : Optional[HostObject]
The host object responsible for processing messages.
exchange_name : str
The name of the exchange to connect to.
exchange_type : str
The type of the exchange (e.g., 'direct', 'topic').
in_routing_key : str
The routing key for incoming messages.
in_connection : pika.BlockingConnection
The connection to the RabbitMQ server.
in_channel : pika.channel.Channel
The channel for communication with RabbitMQ.
queue : pika.spec.Queue.DeclareOk
The queue to receive prediction responses.
queue_name : str
The name of the queue.
Methods
-------
on_request(ch, method, props, body)
Process incoming request messages.
start_collecting()
Start collecting messages from the queue.
stop()
Stop collecting messages and close the connection.
get_queue() -> str
Get the name of the queue.
"""

def __init__(
self,
configuration: AMQPCollectorConfig,
host_object: Optional[HostObject] = None,
):
"""
Initialize an instance of AmqpCollector.
Parameters
----------
configuration : AMQPCollectorConfig
Configuration settings for connecting to the AMQP server.
host_object : Optional[HostObject], optional
An optional HostObject to process incoming messages, default is None.
"""
self.host_object = host_object
self.exchange_name = configuration.exchange_name
self.exchange_type = configuration.exchange_type
self.in_routing_key = configuration.in_routing_key

# Connect to RabbitMQ host
if "amqps://" in configuration.end_point:
self.in_connection = pika.BlockingConnection(
pika.URLParameters(configuration.end_point)
Expand All @@ -31,45 +80,80 @@ def __init__(
pika.ConnectionParameters(host=configuration.end_point)
)

# Create a channel
self.in_channel = self.in_connection.channel()

# Init an Exchange
self.in_channel.exchange_declare(
exchange=self.exchange_name, exchange_type=self.exchange_type
)

# Declare a queue to receive prediction response
self.queue = self.in_channel.queue_declare(
queue=configuration.in_queue, exclusive=False
)
self.queue_name = self.queue.method.queue
# Binding the exchange to the queue with specific routing

self.in_channel.queue_bind(
exchange=self.exchange_name,
queue=self.queue_name,
routing_key=self.in_routing_key,
)

def on_request(self, ch, method, props, body):
# Process the data on request: sending back to host object
def on_request(self, ch, method, props, body) -> None:
"""
Process incoming request messages.
Parameters
----------
ch : pika.channel.Channel
The channel object for the communication.
method : pika.spec.Basic.Deliver
The method frame object containing delivery parameters.
props : pika.spec.BasicProperties
The properties frame object containing message properties.
body : bytes
The message body sent from the producer.
Notes
-----
If `host_object` is provided, it will handle message processing. Otherwise, the raw message will be logged.
"""
if self.host_object is not None:
self.host_object.message_processing(ch, method, props, body)
else:
mess = json.loads(str(body.decode("utf-8")))
qoa_logger.info(mess)

def start_collecting(self):
# Start rabbit MQ
def start_collecting(self) -> None:
"""
Start collecting messages from the queue.
Notes
-----
This method starts the RabbitMQ consumer to collect messages from the queue and process them.
The method will block and run indefinitely until `stop` is called.
"""
self.in_channel.basic_qos(prefetch_count=1)
self.in_channel.basic_consume(
queue=self.queue_name, on_message_callback=self.on_request, auto_ack=True
)
self.in_channel.start_consuming()

def stop(self):
def stop(self) -> None:
"""
Stop collecting messages and close the connection.
Notes
-----
This method stops the consumer from collecting messages and closes the channel.
"""
self.in_channel.stop_consuming()
self.in_channel.close()

def get_queue(self):
def get_queue(self) -> str:
"""
Get the name of the queue.
Returns
-------
str
The name of the queue.
"""
return self.queue.method.queue
55 changes: 54 additions & 1 deletion src/qoa4ml/collector/socket_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,50 @@


class SocketCollector(BaseCollector):
"""
SocketCollector handles the collection of data over a TCP socket.
Parameters
----------
config : SocketCollectorConfig
Configuration settings for the socket collector.
process_report : Callable
A callable function to process incoming reports.
Attributes
----------
config : SocketCollectorConfig
The socket collector configuration.
host : str
The hostname or IP address to bind the socket.
port : int
The port number to bind the socket.
backlog : int
The maximum number of queued connections.
bufsize : int
The maximum size of data to be received at once.
process_report : Callable
A function to process the received report.
execution_flag : bool
Flag to control the execution loop.
Methods
-------
start_collecting()
Start the socket server to collect and process incoming data.
"""

def __init__(self, config: SocketCollectorConfig, process_report: Callable) -> None:
"""
Initialize an instance of SocketCollector.
Parameters
----------
config : SocketCollectorConfig
Configuration settings for the socket collector.
process_report : Callable
A callable function to process incoming reports.
"""
self.config = config
self.host = config.host
self.port = config.port
Expand All @@ -16,10 +59,20 @@ def __init__(self, config: SocketCollectorConfig, process_report: Callable) -> N
self.process_report = process_report
self.execution_flag = True

def start_collecting(self):
def start_collecting(self) -> None:
"""
Start the socket server to collect and process incoming data.
Notes
-----
- This method starts a TCP socket server that listens for incoming connections.
- Data received from clients is deserialized using pickle and then processed using the `process_report` function.
- The server runs indefinitely until the `execution_flag` is set to False.
"""
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((self.host, self.port))
server_socket.listen(self.backlog)

while self.execution_flag:
client_socket, _ = server_socket.accept()
data = b""
Expand Down
89 changes: 75 additions & 14 deletions src/qoa4ml/connector/amqp_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,51 @@


class AmqpConnector(BaseConnector):
# Init an amqp client handling the connection to amqp servier
"""
AmqpConnector handles the connection to an AMQP server for sending messages.
Parameters
----------
config : AMQPConnectorConfig
Configuration settings for the AMQP connector.
log : bool, optional
A flag to enable logging of messages, default is False.
Attributes
----------
conf : AMQPConnectorConfig
The AMQP connector configuration.
exchange_name : str
The name of the exchange to connect to.
exchange_type : str
The type of the exchange (e.g., 'direct', 'topic').
out_routing_key : str
The routing key for outgoing messages.
log_flag : bool
Flag indicating whether to log messages.
out_connection : pika.BlockingConnection
The connection to the RabbitMQ server.
out_channel : pika.channel.Channel
The channel for communication with RabbitMQ.
Methods
-------
send_report(body_message: str, corr_id: Optional[str] = None, routing_key: Optional[str] = None, expiration: int = 1000)
Send data to the desired destination.
get() -> AMQPConnectorConfig
Get the current configuration of the connector.
"""

def __init__(self, config: AMQPConnectorConfig, log: bool = False):
"""
AMQP connector
configuration: a dictionary include broker and queue information
log: a bool flag for logging message if being set to True, default is False
Initialize an instance of AmqpConnector.
Parameters
----------
config : AMQPConnectorConfig
Configuration settings for the AMQP connector.
log : bool, optional
A flag to enable logging of messages, default is False.
"""
self.conf = config
self.exchange_name = config.exchange_name
Expand All @@ -34,21 +73,37 @@ def __init__(self, config: AMQPConnectorConfig, log: bool = False):
# Create a channel
self.out_channel = self.out_connection.channel()

# Init an Exchange
# Initialize an Exchange
self.out_channel.exchange_declare(
exchange=self.exchange_name, exchange_type=self.exchange_type
)

def send_report(
self,
body_message: str,
corr_id=None,
corr_id: Optional[str] = None,
routing_key: Optional[str] = None,
expiration=1000,
):
# Sending data to desired destination
# if sender is client, it will include the "reply_to" attribute to specify where to reply this message
# if sender is server, it will reply the message to "reply_to" via default exchange
expiration: int = 1000,
) -> None:
"""
Send data to the desired destination.
Parameters
----------
body_message : str
The message body to be sent.
corr_id : str, optional
The correlation ID for the message, default is None.
routing_key : str, optional
The routing key for the message, default is None.
expiration : int, optional
Message expiration time in milliseconds, default is 1000.
Notes
-----
- If `corr_id` is not provided, a new UUID will be generated.
- If `routing_key` is not provided, the default `out_routing_key` will be used.
"""
if corr_id is None:
corr_id = str(uuid.uuid4())
if routing_key is None:
Expand All @@ -62,8 +117,14 @@ def send_report(
properties=self.sub_properties,
body=body_message,
)
# if self.log_flag:
# self.mess_logging.log_request(body_mess,corr_id)

def get(self):
def get(self) -> AMQPConnectorConfig:
"""
Get the current configuration of the connector.
Returns
-------
AMQPConnectorConfig
The AMQP connector configuration.
"""
return self.conf
Loading

0 comments on commit 3aa99b2

Please sign in to comment.