diff --git a/opencti-integration/connectors b/opencti-integration/connectors index f415ff8e73d2..711c337b1365 160000 --- a/opencti-integration/connectors +++ b/opencti-integration/connectors @@ -1 +1 @@ -Subproject commit f415ff8e73d2da93a5623e59034687ab674d6d9b +Subproject commit 711c337b13650e4b28540df85b20b297d6ee5db7 diff --git a/opencti-worker/worker_export.py b/opencti-worker/worker_export.py index aedb0b46969a..10122d8ffe0b 100644 --- a/opencti-worker/worker_export.py +++ b/opencti-worker/worker_export.py @@ -26,19 +26,6 @@ def __init__(self, verbose=True): self.config['opencti']['verbose'] ) - # Initialize the RabbitMQ connection - credentials = pika.PlainCredentials(self.config['rabbitmq']['username'], self.config['rabbitmq']['password']) - connection = pika.BlockingConnection(pika.ConnectionParameters( - host=self.config['rabbitmq']['hostname'], - port=self.config['rabbitmq']['port'], - virtual_host='/', - credentials=credentials - )) - self.channel = connection.channel() - self.channel.exchange_declare(exchange='opencti', exchange_type='topic', durable=True) - self.channel.queue_declare('opencti-export', durable=True) - self.channel.queue_bind(exchange='opencti', queue='opencti-export', routing_key='export.*.*') - def export_action(self, ch, method, properties, body): try: data = json.loads(body) @@ -57,14 +44,26 @@ def export_action(self, ch, method, properties, body): return False def consume(self): - self.channel.basic_consume(queue='opencti-export', on_message_callback=self.export_action, auto_ack=True) - self.channel.start_consuming() + # Initialize the RabbitMQ connection + credentials = pika.PlainCredentials(self.config['rabbitmq']['username'], self.config['rabbitmq']['password']) + connection = pika.BlockingConnection(pika.ConnectionParameters( + host=self.config['rabbitmq']['hostname'], + port=self.config['rabbitmq']['port'], + virtual_host='/', + credentials=credentials + )) + channel = connection.channel() + channel.exchange_declare(exchange='opencti', exchange_type='topic', durable=True) + channel.queue_declare('opencti-export', durable=True) + channel.queue_bind(exchange='opencti', queue='opencti-export', routing_key='export.*.*') + channel.basic_consume(queue='opencti-export', on_message_callback=self.export_action, auto_ack=True) + channel.start_consuming() if __name__ == '__main__': + worker_export = WorkerExport() while True: try: - worker_export = WorkerExport() worker_export.consume() except Exception as e: print(e) diff --git a/opencti-worker/worker_import.py b/opencti-worker/worker_import.py index 56f1846b000c..97c5a78e8f0f 100644 --- a/opencti-worker/worker_import.py +++ b/opencti-worker/worker_import.py @@ -26,19 +26,6 @@ def __init__(self, verbose=True): self.config['opencti']['verbose'] ) - # Initialize the RabbitMQ connection - credentials = pika.PlainCredentials(self.config['rabbitmq']['username'], self.config['rabbitmq']['password']) - connection = pika.BlockingConnection(pika.ConnectionParameters( - host=self.config['rabbitmq']['hostname'], - port=self.config['rabbitmq']['port'], - virtual_host='/', - credentials=credentials - )) - self.channel = connection.channel() - self.channel.exchange_declare(exchange='opencti', exchange_type='topic', durable=True) - self.channel.queue_declare('opencti-import', durable=True) - self.channel.queue_bind(exchange='opencti', queue='opencti-import', routing_key='import.*.*') - def import_action(self, ch, method, properties, body): try: data = json.loads(body) @@ -50,14 +37,26 @@ def import_action(self, ch, method, properties, body): return False def consume(self): - self.channel.basic_consume(queue='opencti-import', on_message_callback=self.import_action, auto_ack=True) - self.channel.start_consuming() + # Initialize the RabbitMQ connection + credentials = pika.PlainCredentials(self.config['rabbitmq']['username'], self.config['rabbitmq']['password']) + connection = pika.BlockingConnection(pika.ConnectionParameters( + host=self.config['rabbitmq']['hostname'], + port=self.config['rabbitmq']['port'], + virtual_host='/', + credentials=credentials + )) + channel = connection.channel() + channel.exchange_declare(exchange='opencti', exchange_type='topic', durable=True) + channel.queue_declare('opencti-import', durable=True) + channel.queue_bind(exchange='opencti', queue='opencti-import', routing_key='import.*.*') + channel.basic_consume(queue='opencti-import', on_message_callback=self.import_action, auto_ack=True) + channel.start_consuming() if __name__ == '__main__': + worker_import = WorkerImport() while True: try: - worker_import = WorkerImport() worker_import.consume() except Exception as e: print(e)