From d335216490d424e65523ddd91ea314cb0c9df5b4 Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Mon, 5 Aug 2024 14:30:51 +0200 Subject: [PATCH 1/2] flatten command field types for the jsonpacker --- flow/record/jsonpacker.py | 4 ++++ tests/test_json_packer.py | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/flow/record/jsonpacker.py b/flow/record/jsonpacker.py index 96c646e..18ddd19 100644 --- a/flow/record/jsonpacker.py +++ b/flow/record/jsonpacker.py @@ -55,6 +55,10 @@ def pack_obj(self, obj): elif field_type == "boolean" and isinstance(serial[field_name], int): serial[field_name] = bool(serial[field_name]) + # Flatten command type + elif field_type == "command" and isinstance(serial[field_name], fieldtypes.command): + serial[field_name] = serial[field_name]._join() + return serial if isinstance(obj, RecordDescriptor): serial = { diff --git a/tests/test_json_packer.py b/tests/test_json_packer.py index 2ea2883..7609467 100644 --- a/tests/test_json_packer.py +++ b/tests/test_json_packer.py @@ -90,3 +90,20 @@ def test_record_pack_bool_regression() -> None: # pack the json string back to a record and make sure it is the same as before assert packer.unpack(data) == record + + +def test_record_pack_command_type() -> None: + TestRecord = RecordDescriptor( + "test/record_with_commands", + [ + ("command", "win_command"), + ("command", "nix_command"), + ], + ) + + record = TestRecord(win_command="foo.exe /H /E /L /O", nix_command="/bin/bash -c 'echo hello'") + packer = JsonRecordPacker() + data = packer.pack(record) + + assert data.startswith('{"win_command": "foo.exe /H /E /L /O", "nix_command": "/bin/bash -c \'echo hello\'", ') + assert packer.unpack(data) == record From 09dda7922628a96f7c4d776abf40586035c8500f Mon Sep 17 00:00:00 2001 From: JSCU-CNI <121175071+JSCU-CNI@users.noreply.github.com> Date: Wed, 14 Aug 2024 11:31:47 +0200 Subject: [PATCH 2/2] correctly pack records for elastic adapter --- flow/record/adapter/elastic.py | 4 +-- tests/test_elastic_adapter.py | 45 +++++++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/flow/record/adapter/elastic.py b/flow/record/adapter/elastic.py index f5469e3..23e70ef 100644 --- a/flow/record/adapter/elastic.py +++ b/flow/record/adapter/elastic.py @@ -59,7 +59,7 @@ def __init__( api_key=api_key, ) - self.json_packer = JsonRecordPacker() + self.json_packer = JsonRecordPacker(pack_descriptors=False) self.queue: queue.Queue[Union[Record, StopIteration]] = queue.Queue() self.event = threading.Event() self.thread = threading.Thread(target=self.streaming_bulk_thread) @@ -78,7 +78,7 @@ def __init__( def record_to_document(self, record: Record, index: str) -> dict: """Convert a record to a Elasticsearch compatible document dictionary""" - rdict = record._asdict() + rdict = self.json_packer.pack_obj(record) # Store record metadata under `_record_metadata`. rdict_meta = { diff --git a/tests/test_elastic_adapter.py b/tests/test_elastic_adapter.py index c70012d..d6aaa85 100644 --- a/tests/test_elastic_adapter.py +++ b/tests/test_elastic_adapter.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import json import pytest @@ -10,18 +12,50 @@ [ ("string", "field_one"), ("string", "field_two"), + ("string", "field_three"), + ], +) + +AnotherRecord = RecordDescriptor( + "my/record", + [ + ("command", "field_one"), + ("boolean", "field_two"), + ("bytes", "field_three"), ], ) @pytest.mark.parametrize( - "record", + "record, expected_output", [ - MyRecord("first", "record"), - MyRecord("second", "record"), + ( + MyRecord("first", "record", "!"), + { + "field_one": "first", + "field_two": "record", + "field_three": "!", + }, + ), + ( + MyRecord("second", "record", "!"), + { + "field_one": "second", + "field_two": "record", + "field_three": "!", + }, + ), + ( + AnotherRecord("/bin/bash -c 'echo hello'", False, b"\x01\x02\x03\x04"), + { + "field_one": "/bin/bash -c 'echo hello'", + "field_two": False, + "field_three": "AQIDBA==", + }, + ), ], ) -def test_elastic_writer_metadata(record): +def test_elastic_writer_metadata(record: MyRecord | AnotherRecord, expected_output: dict) -> None: options = { "_meta_foo": "some value", "_meta_bar": "another value", @@ -34,8 +68,7 @@ def test_elastic_writer_metadata(record): "_index": "some-index", "_source": json.dumps( { - "field_one": record.field_one, - "field_two": record.field_two, + **expected_output, "_record_metadata": { "descriptor": { "name": "my/record",