Skip to content

Commit 1253c3e

Browse files
FIX: More generic way of using different (de)-serializers
Signed-off-by: Sebastian Waldbauer <waldbauer@cert.at>
1 parent e97db41 commit 1253c3e

32 files changed

+270
-188
lines changed

intelmq/bin/intelmqdump.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ def main():
349349
if queue_name in pipeline_pipes:
350350
if runtime_config[pipeline_pipes[queue_name]]['group'] == 'Parser' and json.loads(msg)['__type'] == 'Event':
351351
print('Event converted to Report automatically.')
352-
msg = message.Report(message.MessageFactory.unserialize(msg)).serialize()
352+
msg = message.Report(message.MessageFactory.deserialize(msg)).serialize()
353353
else:
354354
print(red(f"The given queue '{queue_name}' is not configured. Please retry with a valid queue."))
355355
break

intelmq/bots/collectors/amqp/collector_amqp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def process(self):
7979
self.logger.exception('Error receiving messages.')
8080
else:
8181
if self.expect_intelmq_message:
82-
message = MessageFactory.unserialize(body.decode())
82+
message = MessageFactory.deserialize(body.decode())
8383
self.send_message(message, auto_add=False)
8484
else:
8585
report = self.new_report()

intelmq/bots/collectors/microsoft/collector_azure.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,7 @@ class MicrosoftAzureCollectorBot(CollectorBot, CacheMixin):
3636

3737
def init(self):
3838
if ContainerClient is None or create_configuration is None:
39-
<<<<<<< HEAD
4039
raise MissingDependencyError("azure-storage-blob", version='>=12.0.0')
41-
=======
42-
raise MissingDependencyError("azure.storage", version='>=12.0.0')
43-
>>>>>>> c78494bb6 (DOC: azure collector: document minimum azure version)
4440

4541
self.config = create_configuration(storage_sdk='blob')
4642
if hasattr(self, 'https_proxy'):

intelmq/bots/outputs/redis/output.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ def process(self):
4141
event = self.receive_message()
4242

4343
try:
44-
self.output.lpush(self.queue,
45-
event.to_json(hierarchical=self.hierarchical_output,
46-
with_type=self.with_type))
44+
self.output.lpush(self.queue, event.to_pack(use_packer=self.use_packer, hierarchical=self.hierarchical, with_type=self.with_type))
4745
except Exception:
4846
self.logger.exception('Failed to send message. Reconnecting.')
4947
self.connect()

intelmq/bots/outputs/udp/output.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def process(self):
3737
del event['raw']
3838

3939
if self.format == 'json':
40-
self.send(self.header + event.to_json())
40+
self.send(self.header + event.to_pack(use_packer=self.format))
4141
elif self.format == 'delimited':
4242
self.send(self.delimited(event))
4343

intelmq/bots/parsers/json/parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def process(self):
2626
lines = [base64_decode(report['raw'])]
2727

2828
for line in lines:
29-
new_event = MessageFactory.unserialize(line,
29+
new_event = MessageFactory.deserialize(line,
3030
harmonization=self.harmonization,
3131
default_type='Event',
3232
use_packer="json")

intelmq/lib/bot.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import inspect
1717
import io
1818
import json
19-
import msgpack
2019
import logging
2120
import os
2221
import re
@@ -102,6 +101,7 @@ class Bot:
102101
statistics_host: str = "127.0.0.1"
103102
statistics_password: Optional[str] = None
104103
statistics_port: int = 6379
104+
use_packer: str = os.environ.get('INTELMQ_USE_PACKER', 'MsgPack')
105105

106106
_message_processed_verb: str = 'Processed'
107107

@@ -330,8 +330,8 @@ def start(self, starting: bool = True, error_on_pipeline: bool = True,
330330
self.logger.error('Pipeline failed.')
331331
self.__disconnect_pipelines()
332332

333-
except exceptions.UnserializationError as exc:
334-
self.logger.exception('Could not unserialize message from pipeline. No retries useful.')
333+
except exceptions.DeserializationError as exc:
334+
self.logger.exception('Could not deserialize message from pipeline. No retries useful.')
335335

336336
# ensure that we do not re-process the faulty message
337337
self.__error_retries_counter = self.error_max_retries + 1
@@ -662,7 +662,7 @@ def receive_message(self) -> libmessage.Message:
662662
return self.receive_message()
663663

664664
try:
665-
self.__current_message = libmessage.MessageFactory.unserialize(message,
665+
self.__current_message = libmessage.MessageFactory.deserialize(message,
666666
harmonization=self.harmonization)
667667
except exceptions.InvalidKey as exc:
668668
# In case a incoming message is malformed an does not conform with the currently
@@ -821,7 +821,7 @@ def __init_logger(self):
821821

822822
def __log_configuration_parameter(self, config_name: str, option: str, value: Any):
823823
if "password" in option or "token" in option:
824-
value = "HIDDEN"
824+
value = "<redacted>"
825825

826826
message = "{} configuration: parameter {!r} loaded with value {!r}." \
827827
.format(config_name.title(), option, value)
@@ -1319,9 +1319,8 @@ def export_event(self, event: libmessage.Event,
13191319
if 'raw' in event:
13201320
del event['raw']
13211321
if return_type is str:
1322-
return event.to_json(hierarchical=self.hierarchical,
1323-
with_type=self.with_type,
1324-
jsondict_as_string=self.jsondict_as_string)
1322+
return event.to_pack(use_packer=self.use_packer, hierarchical=self.hierarchical,
1323+
with_type=self.with_type)
13251324
else:
13261325
retval = event.to_dict(hierarchical=self.hierarchical,
13271326
with_type=self.with_type,

intelmq/lib/bot_debugger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ def outputappend(self, msg):
169169
def arg2msg(self, msg):
170170
default_type = "Report" if (self.runtime_configuration.get("group", None) == "Parser" or isinstance(self.instance, ParserBot)) else "Event"
171171
try:
172-
msg = MessageFactory.unserialize(msg, default_type=default_type)
172+
msg = MessageFactory.deserialize(msg, default_type=default_type)
173173
except (Exception, KeyError, TypeError, ValueError) as exc:
174174
if exists(msg):
175175
with open(msg) as f:

intelmq/lib/exceptions.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,10 +169,24 @@ def __init__(self, encodings=None, exception: UnicodeDecodeError = None,
169169
super().__init__("Could not decode string%s." % suffix)
170170

171171

172-
class UnserializationError(IntelMQException, ValueError):
172+
class DeserializationError(IntelMQException, ValueError):
173173
"""
174-
Unrecoverable error during message unserialization
174+
Unrecoverable error during message deserialization
175175
"""
176176
def __init__(self, exception: Exception = None, object: bytes = None):
177177
self.object = object
178-
super().__init__("Could not unserialize message%s." % exception)
178+
super().__init__("Could not deserialize message, %s." % exception)
179+
180+
181+
class SerializationError(IntelMQException, ValueError):
182+
"""
183+
Unrecoverable error during message serialization
184+
"""
185+
def __init__(self, exception: Exception = None, object: bytes = None):
186+
self.object = object
187+
super().__init__("Could not serialize message, %s." % exception)
188+
189+
190+
class MissingPackerError(IntelMQException):
191+
def __init__(self, packer: str):
192+
super().__init__(f"Could not load '{packer}' as packer, please check intelmq.lib.packers.{packer.lower()} and documentation")

intelmq/lib/message.py

Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,19 @@
99
Use MessageFactory to get a Message object (types Report and Event).
1010
"""
1111
import hashlib
12+
import importlib
13+
import inspect
1214
import json
1315
import re
1416
import warnings
1517
from collections import defaultdict
1618
from typing import Any, Dict, Iterable, Optional, Sequence, Union
17-
import msgpack
1819

1920
import intelmq.lib.exceptions as exceptions
2021
import intelmq.lib.harmonization
2122
from intelmq import HARMONIZATION_CONF_FILE
2223
from intelmq.lib import utils
24+
from intelmq.lib.packers.packer import Packer
2325

2426
__all__ = ['Event', 'Message', 'MessageFactory', 'Report']
2527
VALID_MESSSAGE_TYPES = ('Event', 'Message', 'Report')
@@ -29,8 +31,8 @@
2931

3032
class MessageFactory:
3133
"""
32-
unserialize: JSON encoded message to object
33-
serialize: object to JSON encoded object
34+
deserialize: packed message to object
35+
serialize: object to packed
3436
"""
3537

3638
@staticmethod
@@ -45,7 +47,7 @@ def from_dict(message: dict, harmonization=None,
4547
default_type: If '__type' is not present in message, the given type will be used
4648
4749
See also:
48-
MessageFactory.unserialize
50+
MessageFactory.deserialize
4951
MessageFactory.serialize
5052
"""
5153
if default_type and "__type" not in message:
@@ -61,8 +63,8 @@ def from_dict(message: dict, harmonization=None,
6163
return class_reference(message, auto=True, harmonization=harmonization)
6264

6365
@staticmethod
64-
def unserialize(raw_message: bytes, harmonization: dict = None,
65-
default_type: Optional[str] = None, use_packer: str = "msgpack") -> dict:
66+
def deserialize(raw_message: bytes, harmonization: dict = None,
67+
default_type: Optional[str] = None, use_packer: str = "MsgPack", **kwargs) -> dict:
6668
"""
6769
Takes JSON-encoded Message object, returns instance of correct class.
6870
@@ -75,19 +77,18 @@ def unserialize(raw_message: bytes, harmonization: dict = None,
7577
MessageFactory.from_dict
7678
MessageFactory.serialize
7779
"""
78-
message = Message.unserialize(raw_message, use_packer=use_packer)
80+
message = Message.deserialize(raw_message, use_packer=use_packer, **kwargs)
7981
return MessageFactory.from_dict(message, harmonization=harmonization,
8082
default_type=default_type)
8183

8284
@staticmethod
83-
def serialize(message) -> bytes:
85+
def serialize(message, use_packer: str = 'MsgPack', **kwargs) -> bytes:
8486
"""
85-
Takes instance of message-derived class and makes JSON-encoded Message.
87+
Takes instance of message-derived class and makes packed Message.
8688
8789
The class is saved in __type attribute.
8890
"""
89-
raw_message = Message.serialize(message)
90-
return raw_message
91+
return Message.serialize(message, use_packer=use_packer, **kwargs)
9192

9293

9394
class Message(dict):
@@ -307,36 +308,43 @@ def copy(self):
307308
return retval
308309

309310
def deep_copy(self):
310-
return MessageFactory.unserialize(MessageFactory.serialize(self),
311+
return MessageFactory.deserialize(MessageFactory.serialize(self),
311312
harmonization={self.__class__.__name__.lower(): self.harmonization_config})
312313

313314
def __str__(self):
314-
return self.serialize(use_packer="json")
315+
return self.serialize(use_packer="JSON")
315316

316-
def serialize(self, use_packer: str = "msgpack"):
317+
def serialize(self, use_packer: str = "MsgPack", **kwargs):
317318
delete_type = False
318319
if '__type' not in self:
319320
delete_type = True
320321
self['__type'] = self.__class__.__name__
321322

322-
if use_packer == "json":
323-
packed = json.dumps(self)
324-
else:
325-
packed = msgpack.packb(self)
323+
try:
324+
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'))[0][1]()
325+
except:
326+
raise exceptions.MissingPackerError(packer=use_packer)
327+
328+
try:
329+
packed = packer.serialize(data=self, **kwargs)
330+
except Exception as exc:
331+
raise exceptions.SerializationError(exception=exc, object=self)
326332

327333
if delete_type:
328334
del self['__type']
329335
return packed
330336

331337
@staticmethod
332-
def unserialize(message: bytes, use_packer: str = "msgpack"):
338+
def deserialize(message: bytes, use_packer: str = "MsgPack", **kwargs):
333339
try:
334-
if use_packer == "json":
335-
return json.loads(message)
336-
else:
337-
return msgpack.unpackb(message, raw=False)
340+
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'))[0][1]()
341+
except:
342+
raise exceptions.MissingPackerError(packer=use_packer)
343+
344+
try:
345+
return packer.deserialize(data=message, **kwargs)
338346
except Exception as exc:
339-
raise exceptions.UnserializationError(exception=exc, object=message)
347+
raise exceptions.DeserializationError(exception=exc, object=message)
340348

341349
def __is_valid_key(self, key: str):
342350
try:
@@ -489,13 +497,17 @@ def to_dict(self, hierarchical: bool = False, with_type: bool = False,
489497

490498
return new_dict
491499

492-
def to_json(self, hierarchical=False, with_type=False, jsondict_as_string=False):
493-
json_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
494-
return json.dumps(json_dict, ensure_ascii=False, sort_keys=True)
500+
def to_pack(self, use_packer="MsgPack", hierarchical=False, with_type=False, **kwargs):
501+
try:
502+
packer: Packer = inspect.getmembers(importlib.import_module(f'intelmq.lib.packers.{use_packer.lower()}.packer'))[0][1]()
503+
except:
504+
raise exceptions.MissingPackerError(packer=use_packer)
495505

496-
def to_msgpack(self, hierarchical=False, with_type=False):
497-
msgpack_dict = self.to_dict(hierarchical=hierarchical, with_type=with_type)
498-
return msgpack.packb(msgpack_dict)
506+
try:
507+
data = self.to_dict(hierarchical=hierarchical, with_type=with_type)
508+
return packer.serialize(data, **kwargs)
509+
except Exception as exc:
510+
raise exceptions.SerializationError(exception=exc, object=self)
499511

500512
def __eq__(self, other: dict) -> bool:
501513
"""

0 commit comments

Comments
 (0)