From cfeb127b65a7730ed4dd913bce696efe1ea99319 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Tue, 26 Sep 2023 12:17:43 +0530 Subject: [PATCH 01/16] Implement UTF-8 Decoder --- floss/language/rust/decode_utf8.py | 105 +++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 floss/language/rust/decode_utf8.py diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py new file mode 100644 index 000000000..d2e8253ea --- /dev/null +++ b/floss/language/rust/decode_utf8.py @@ -0,0 +1,105 @@ +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +import pefile +import logging +import argparse +import pathlib +import sys + +MIN_STR_LEN = 4 + +logger = logging.getLogger(__name__) + +def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: + for section in pe.sections: + if section.Name.startswith(b".rdata\x00"): + return section + + raise ValueError("no .rdata section found") + +def extract_utf8_strings(pe, min_length=MIN_STR_LEN): + try: + rdata_section = get_rdata_section(pe) + except ValueError as e: + print("cannot extract rust strings: %s", e) + return [] + + strings = rdata_section.get_data() + + character_and_index = [] + + # Reference: https://en.wikipedia.org/wiki/UTF-8 + + for i in range(0, len(strings)): + # for 1 byte + if strings[i] & 0x80 == 0x00: + character = strings[i].to_bytes(1, "big").decode("utf-8", "ignore") + character_and_index.append([character, i, 1]) + + # for 2 bytes + elif strings[i] & 0xE0 == 0xC0: + temp = strings[i] << 8 | strings[i+1] + character = temp.to_bytes(2, "big").decode("utf-8", "ignore") + i += 1 + character_and_index.append([character, i, 2]) + + # for 3 bytes + elif strings[i] & 0xF0 == 0xE0: + temp = strings[i] << 16 | strings[i+1] << 8 | strings[i+2] + character = temp.to_bytes(3, "big").decode("utf-8", "ignore") + i += 2 + character_and_index.append([character, i, 3]) + + # for 4 bytes + elif strings[i] & 0xF8 == 0xF0: + temp = strings[i] << 24 | strings[i+1] << 16 | strings[i+2] << 8 | strings[i+3] + character = temp.to_bytes(4, "big").decode("utf-8", "ignore") + i += 3 + character_and_index.append([character, i, 4]) + + + strings = [] # string, start index, end index + + # check for consecutive characters and convert to string + for i in range(0, len(character_and_index)): + if i == 0: + strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) + else: + if character_and_index[i-1][1] + character_and_index[i-1][2] == character_and_index[i][1] and character_and_index[i][0].isprintable() == True: + strings[-1][0] += character_and_index[i][0] + strings[-1][2] = character_and_index[i][1] + else: + if character_and_index[i][0].isprintable() == True: + strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) + + # filter strings less than min length + strings = [string for string in strings if len(string[0]) >= min_length] + + return strings + + +def main(argv=None): + parser = argparse.ArgumentParser(description="Get Rust strings") + parser.add_argument("path", help="file or path to analyze") + parser.add_argument( + "-n", + "--minimum-length", + dest="min_length", + type=int, + default=MIN_STR_LEN, + help="minimum string length", + ) + args = parser.parse_args(args=argv) + + logging.basicConfig(level=logging.DEBUG) + + pe = pathlib.Path(args.path) + buf = pe.read_bytes() + pe = pefile.PE(data=buf, fast_load=True) + + strings = extract_utf8_strings(pe, args.min_length) + for string in strings: + print(string[0]) + + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file From e083376da2bd41b52e512e0546b3d7e3b66b66ab Mon Sep 17 00:00:00 2001 From: Arker123 Date: Tue, 26 Sep 2023 12:31:39 +0530 Subject: [PATCH 02/16] Tweaks --- floss/language/rust/decode_utf8.py | 32 +++++++++++++++++++----------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index d2e8253ea..3637e97de 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -1,14 +1,16 @@ # Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. -import pefile +import sys import logging -import argparse import pathlib -import sys +import argparse + +import pefile MIN_STR_LEN = 4 logger = logging.getLogger(__name__) + def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: for section in pe.sections: if section.Name.startswith(b".rdata\x00"): @@ -16,7 +18,11 @@ def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: raise ValueError("no .rdata section found") -def extract_utf8_strings(pe, min_length=MIN_STR_LEN): + +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN): + """ + Extracts UTF-8 strings from the .rdata section of a PE file. + """ try: rdata_section = get_rdata_section(pe) except ValueError as e: @@ -27,7 +33,7 @@ def extract_utf8_strings(pe, min_length=MIN_STR_LEN): character_and_index = [] - # Reference: https://en.wikipedia.org/wiki/UTF-8 + # Reference: https://en.wikipedia.org/wiki/UTF-8 for i in range(0, len(strings)): # for 1 byte @@ -37,34 +43,36 @@ def extract_utf8_strings(pe, min_length=MIN_STR_LEN): # for 2 bytes elif strings[i] & 0xE0 == 0xC0: - temp = strings[i] << 8 | strings[i+1] + temp = strings[i] << 8 | strings[i + 1] character = temp.to_bytes(2, "big").decode("utf-8", "ignore") i += 1 character_and_index.append([character, i, 2]) # for 3 bytes elif strings[i] & 0xF0 == 0xE0: - temp = strings[i] << 16 | strings[i+1] << 8 | strings[i+2] + temp = strings[i] << 16 | strings[i + 1] << 8 | strings[i + 2] character = temp.to_bytes(3, "big").decode("utf-8", "ignore") i += 2 character_and_index.append([character, i, 3]) # for 4 bytes elif strings[i] & 0xF8 == 0xF0: - temp = strings[i] << 24 | strings[i+1] << 16 | strings[i+2] << 8 | strings[i+3] + temp = strings[i] << 24 | strings[i + 1] << 16 | strings[i + 2] << 8 | strings[i + 3] character = temp.to_bytes(4, "big").decode("utf-8", "ignore") i += 3 character_and_index.append([character, i, 4]) - - strings = [] # string, start index, end index + strings = [] # string, start index, end index # check for consecutive characters and convert to string for i in range(0, len(character_and_index)): if i == 0: strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) else: - if character_and_index[i-1][1] + character_and_index[i-1][2] == character_and_index[i][1] and character_and_index[i][0].isprintable() == True: + if ( + character_and_index[i - 1][1] + character_and_index[i - 1][2] == character_and_index[i][1] + and character_and_index[i][0].isprintable() == True + ): strings[-1][0] += character_and_index[i][0] strings[-1][2] = character_and_index[i][1] else: @@ -102,4 +110,4 @@ def main(argv=None): if __name__ == "__main__": - sys.exit(main()) \ No newline at end of file + sys.exit(main()) From 4a54532bef8137a28341d3cf65f13970dd7a5458 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Mon, 2 Oct 2023 09:04:22 +0530 Subject: [PATCH 03/16] Minor changes --- floss/language/rust/decode_utf8.py | 23 +++++++++++------------ floss/language/rust/extract.py | 13 +++++-------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index 3637e97de..40b2b9c86 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -3,6 +3,7 @@ import logging import pathlib import argparse +from typing import List, Tuple, Iterable, Optional import pefile @@ -19,7 +20,7 @@ def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: raise ValueError("no .rdata section found") -def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN): +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[str, int, int]]: """ Extracts UTF-8 strings from the .rdata section of a PE file. """ @@ -64,20 +65,18 @@ def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN): strings = [] # string, start index, end index - # check for consecutive characters and convert to string + prev = False + for i in range(0, len(character_and_index)): - if i == 0: - strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) - else: - if ( - character_and_index[i - 1][1] + character_and_index[i - 1][2] == character_and_index[i][1] - and character_and_index[i][0].isprintable() == True - ): + if character_and_index[i][0].isprintable() == True: + if prev == False: + strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) + prev = True + else: strings[-1][0] += character_and_index[i][0] strings[-1][2] = character_and_index[i][1] - else: - if character_and_index[i][0].isprintable() == True: - strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) + else: + prev = False # filter strings less than min length strings = [string for string in strings if len(string[0]) >= min_length] diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index f67e57c82..b8b773fb7 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -11,6 +11,7 @@ from floss.results import StaticString, StringEncoding from floss.language.utils import find_lea_xrefs, find_mov_xrefs, find_push_xrefs, get_struct_string_candidates +from floss.language.rust.decode_utf8 import extract_utf8_strings logger = logging.getLogger(__name__) @@ -26,18 +27,14 @@ def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: def filter_and_transform_utf8_strings( - strings: List[Tuple[str, str, Tuple[int, int], bool]], + strings: List[Tuple[str, int, int]], start_rdata: int, ) -> List[StaticString]: transformed_strings = [] for string in strings: s = string[0] - string_type = string[1] - start = string[2][0] + start_rdata - - if string_type != "UTF8": - continue + start = string[1] + start_rdata # our static algorithm does not extract new lines either s = s.replace("\n", "") @@ -98,8 +95,8 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt virtual_address = rdata_section.VirtualAddress pointer_to_raw_data = rdata_section.PointerToRawData - # extract utf-8 and wide strings, latter not needed here - strings = b2s.extract_all_strings(rdata_section.get_data(), min_length) + # extract utf-8 strings + strings = extract_utf8_strings(pe, min_length) # select only UTF-8 strings and adjust offset static_strings = filter_and_transform_utf8_strings(strings, start_rdata) From 775f1cec9835ff0a62c5dcb6ea953fd3c3733582 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Mon, 2 Oct 2023 10:42:21 +0530 Subject: [PATCH 04/16] Discovered more i386 xrefs --- floss/language/rust/extract.py | 11 +++++++++-- floss/language/utils.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index b8b773fb7..107cce6b6 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -10,7 +10,13 @@ import binary2strings as b2s from floss.results import StaticString, StringEncoding -from floss.language.utils import find_lea_xrefs, find_mov_xrefs, find_push_xrefs, get_struct_string_candidates +from floss.language.utils import ( + find_lea_xrefs, + find_mov_xrefs, + find_push_xrefs, + get_raw_xrefs_rdata_i386, + get_struct_string_candidates, +) from floss.language.rust.decode_utf8 import extract_utf8_strings logger = logging.getLogger(__name__) @@ -107,7 +113,8 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt xrefs_lea = find_lea_xrefs(pe) xrefs_push = find_push_xrefs(pe) xrefs_mov = find_mov_xrefs(pe) - xrefs = itertools.chain(struct_string_addrs, xrefs_lea, xrefs_push, xrefs_mov) + xrefs_raw_rdata = get_raw_xrefs_rdata_i386(pe, rdata_section.get_data()) + xrefs = itertools.chain(struct_string_addrs, xrefs_lea, xrefs_push, xrefs_mov, xrefs_raw_rdata) elif pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_AMD64"]: xrefs_lea = find_lea_xrefs(pe) diff --git a/floss/language/utils.py b/floss/language/utils.py index e97c4fa47..101ccb35c 100644 --- a/floss/language/utils.py +++ b/floss/language/utils.py @@ -465,6 +465,34 @@ def get_struct_string_candidates(pe: pefile.PE) -> Iterable[StructString]: # dozens of seconds or more (suspect many minutes). +def get_raw_xrefs_rdata_i386(pe: pefile.PE, buf: bytes) -> Iterable[VA]: + """ + scan for raw xrefs in .rdata section + """ + format = "I" + + if not buf: + return + + low, high = get_image_range(pe) + + # using array module as a high-performance way to access the data as fixed-sized words. + words = iter(array.array(format, buf)) + + last = next(words) + for current in words: + address = last + last = current + + if address == 0x0: + continue + + if not (low <= address < high): + continue + + yield address + + def get_extract_stats( pe: pefile, all_ss_strings: List[StaticString], lang_strings: List[StaticString], min_len: int, min_blob_len=0 ) -> float: From 18e60803f77f73c08b743d95b0c55adcd78e9065 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Mon, 2 Oct 2023 10:45:18 +0530 Subject: [PATCH 05/16] =?UTF-8?q?Enhance=20percentage=20extraction=20to=20?= =?UTF-8?q?91%=20=F0=9F=98=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_language_rust_coverage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_language_rust_coverage.py b/tests/test_language_rust_coverage.py index f6cc25bba..4ac226c42 100644 --- a/tests/test_language_rust_coverage.py +++ b/tests/test_language_rust_coverage.py @@ -54,4 +54,4 @@ def test_language_detection_64(binary_file): out = get_extract_stats(pe, all_ss_strings, rust_strings, n) # check that the output percentage is greater than 88% - assert float(out) > 88 + assert float(out) > 91 From 851589997171b1f7dc92c656c3b027791fd6188f Mon Sep 17 00:00:00 2001 From: Arker123 Date: Fri, 10 Nov 2023 17:09:21 +0530 Subject: [PATCH 06/16] Clean up --- floss/language/rust/extract.py | 13 +++---------- floss/language/utils.py | 28 ---------------------------- tests/test_language_rust_coverage.py | 4 ++-- 3 files changed, 5 insertions(+), 40 deletions(-) diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index 9059d69a3..78d26337b 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -10,13 +10,7 @@ import binary2strings as b2s from floss.results import StaticString, StringEncoding -from floss.language.utils import ( - find_lea_xrefs, - find_mov_xrefs, - find_push_xrefs, - get_raw_xrefs_rdata_i386, - get_struct_string_candidates, -) +from floss.language.utils import find_lea_xrefs, find_mov_xrefs, find_push_xrefs, get_struct_string_candidates from floss.language.rust.decode_utf8 import extract_utf8_strings logger = logging.getLogger(__name__) @@ -145,7 +139,7 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt strings = extract_utf8_strings(pe, min_length) # select only UTF-8 strings and adjust offset - static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata) + static_strings = filter_and_transform_utf8_strings(strings, start_rdata) struct_string_addrs = map(lambda c: c.address, get_struct_string_candidates(pe)) @@ -153,8 +147,7 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt xrefs_lea = find_lea_xrefs(pe) xrefs_push = find_push_xrefs(pe) xrefs_mov = find_mov_xrefs(pe) - xrefs_raw_rdata = get_raw_xrefs_rdata_i386(pe, rdata_section.get_data()) - xrefs = itertools.chain(struct_string_addrs, xrefs_lea, xrefs_push, xrefs_mov, xrefs_raw_rdata) + xrefs = itertools.chain(struct_string_addrs, xrefs_lea, xrefs_push, xrefs_mov) elif pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_AMD64"]: xrefs_lea = find_lea_xrefs(pe) diff --git a/floss/language/utils.py b/floss/language/utils.py index 101ccb35c..e97c4fa47 100644 --- a/floss/language/utils.py +++ b/floss/language/utils.py @@ -465,34 +465,6 @@ def get_struct_string_candidates(pe: pefile.PE) -> Iterable[StructString]: # dozens of seconds or more (suspect many minutes). -def get_raw_xrefs_rdata_i386(pe: pefile.PE, buf: bytes) -> Iterable[VA]: - """ - scan for raw xrefs in .rdata section - """ - format = "I" - - if not buf: - return - - low, high = get_image_range(pe) - - # using array module as a high-performance way to access the data as fixed-sized words. - words = iter(array.array(format, buf)) - - last = next(words) - for current in words: - address = last - last = current - - if address == 0x0: - continue - - if not (low <= address < high): - continue - - yield address - - def get_extract_stats( pe: pefile, all_ss_strings: List[StaticString], lang_strings: List[StaticString], min_len: int, min_blob_len=0 ) -> float: diff --git a/tests/test_language_rust_coverage.py b/tests/test_language_rust_coverage.py index 4ac226c42..b8668e5a7 100644 --- a/tests/test_language_rust_coverage.py +++ b/tests/test_language_rust_coverage.py @@ -53,5 +53,5 @@ def test_language_detection_64(binary_file): with contextlib.redirect_stdout(None): out = get_extract_stats(pe, all_ss_strings, rust_strings, n) - # check that the output percentage is greater than 88% - assert float(out) > 91 + # check that the output percentage is greater than 86% + assert float(out) > 86 # increase to 91 after merging PR #899 From 51525ae36dc76bad8b9b3cf4cfee77f145d0827b Mon Sep 17 00:00:00 2001 From: ark Date: Sun, 23 Jun 2024 18:57:26 +0530 Subject: [PATCH 07/16] Added extract_utf8_strings_from_buffer --- floss/language/rust/decode_utf8.py | 68 ++++++++++++++++-------------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index 40b2b9c86..fb5f7402d 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -20,61 +20,52 @@ def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: raise ValueError("no .rdata section found") -def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[str, int, int]]: +def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[Tuple[str, int]]: """ - Extracts UTF-8 strings from the .rdata section of a PE file. + Extracts UTF-8 strings from a buffer. """ - try: - rdata_section = get_rdata_section(pe) - except ValueError as e: - print("cannot extract rust strings: %s", e) - return [] - - strings = rdata_section.get_data() - - character_and_index = [] # Reference: https://en.wikipedia.org/wiki/UTF-8 - for i in range(0, len(strings)): + strings = [] + + for i in range(0, len(buf)): # for 1 byte - if strings[i] & 0x80 == 0x00: - character = strings[i].to_bytes(1, "big").decode("utf-8", "ignore") - character_and_index.append([character, i, 1]) + if buf[i] & 0x80 == 0x00: + character = buf[i].to_bytes(1, "big").decode("utf-8", "ignore") + strings.append([character, i]) # for 2 bytes - elif strings[i] & 0xE0 == 0xC0: - temp = strings[i] << 8 | strings[i + 1] + elif buf[i] & 0xE0 == 0xC0: + temp = buf[i] << 8 | buf[i + 1] character = temp.to_bytes(2, "big").decode("utf-8", "ignore") i += 1 - character_and_index.append([character, i, 2]) + strings.append([character, i]) # for 3 bytes - elif strings[i] & 0xF0 == 0xE0: - temp = strings[i] << 16 | strings[i + 1] << 8 | strings[i + 2] + elif buf[i] & 0xF0 == 0xE0: + temp = buf[i] << 16 | buf[i + 1] << 8 | buf[i + 2] character = temp.to_bytes(3, "big").decode("utf-8", "ignore") i += 2 - character_and_index.append([character, i, 3]) + strings.append([character, i]) # for 4 bytes - elif strings[i] & 0xF8 == 0xF0: - temp = strings[i] << 24 | strings[i + 1] << 16 | strings[i + 2] << 8 | strings[i + 3] + elif buf[i] & 0xF8 == 0xF0: + temp = buf[i] << 24 | buf[i + 1] << 16 | buf[i + 2] << 8 | buf[i + 3] character = temp.to_bytes(4, "big").decode("utf-8", "ignore") i += 3 - character_and_index.append([character, i, 4]) - - strings = [] # string, start index, end index + strings.append([character, i]) prev = False - for i in range(0, len(character_and_index)): - if character_and_index[i][0].isprintable() == True: + for i in range(0, len(strings)): + if strings[i][0].isprintable() == True: if prev == False: - strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) + strings.append([strings[i][0], strings[i][1]]) prev = True else: - strings[-1][0] += character_and_index[i][0] - strings[-1][2] = character_and_index[i][1] + strings[-1][0] += strings[i][0] + strings[-1][1] = strings[i][1] else: prev = False @@ -84,6 +75,21 @@ def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[st return strings +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[str, int, int]]: + """ + Extracts UTF-8 strings from the .rdata section of a PE file. + """ + try: + rdata_section = get_rdata_section(pe) + except ValueError as e: + print("cannot extract rust strings: %s", e) + return [] + + buf = pe.get_memory_mapped_image()[rdata_section.VirtualAddress : rdata_section.VirtualAddress + rdata_section.SizeOfRawData] + strings = extract_utf8_strings_from_buffer(buf, min_length) + return strings + + def main(argv=None): parser = argparse.ArgumentParser(description="Get Rust strings") parser.add_argument("path", help="file or path to analyze") From 1f5f3eb40a2a43ce826cb23052a1280d2cc0753c Mon Sep 17 00:00:00 2001 From: ark Date: Sun, 23 Jun 2024 18:58:52 +0530 Subject: [PATCH 08/16] Code style --- floss/language/rust/decode_utf8.py | 8 +++++--- floss/main.py | 24 +++++++++++++++--------- floss/render/default.py | 16 ++++++++++------ 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index fb5f7402d..9f0672e21 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -75,7 +75,7 @@ def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[Tuple[ return strings -def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[str, int, int]]: +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[str, int, int]]: """ Extracts UTF-8 strings from the .rdata section of a PE file. """ @@ -84,8 +84,10 @@ def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[st except ValueError as e: print("cannot extract rust strings: %s", e) return [] - - buf = pe.get_memory_mapped_image()[rdata_section.VirtualAddress : rdata_section.VirtualAddress + rdata_section.SizeOfRawData] + + buf = pe.get_memory_mapped_image()[ + rdata_section.VirtualAddress : rdata_section.VirtualAddress + rdata_section.SizeOfRawData + ] strings = extract_utf8_strings_from_buffer(buf, min_length) return strings diff --git a/floss/main.py b/floss/main.py index acd288acf..0793c4e3b 100644 --- a/floss/main.py +++ b/floss/main.py @@ -213,9 +213,11 @@ def make_parser(argv): type=lambda x: int(x, 0x10), default=None, nargs="+", - help="only analyze the specified functions, hex-encoded like 0x401000, space-separate multiple functions" - if show_all_options - else argparse.SUPPRESS, + help=( + "only analyze the specified functions, hex-encoded like 0x401000, space-separate multiple functions" + if show_all_options + else argparse.SUPPRESS + ), ) advanced_group.add_argument( "--disable-progress", @@ -226,17 +228,21 @@ def make_parser(argv): "--signatures", type=str, default=SIGNATURES_PATH_DEFAULT_STRING, - help="path to .sig/.pat file or directory used to identify library functions, use embedded signatures by default" - if show_all_options - else argparse.SUPPRESS, + help=( + "path to .sig/.pat file or directory used to identify library functions, use embedded signatures by default" + if show_all_options + else argparse.SUPPRESS + ), ) advanced_group.add_argument( "-L", "--large-file", action="store_true", - help="allow processing files larger than {} MB".format(int(MAX_FILE_SIZE / MEGABYTE)) - if show_all_options - else argparse.SUPPRESS, + help=( + "allow processing files larger than {} MB".format(int(MAX_FILE_SIZE / MEGABYTE)) + if show_all_options + else argparse.SUPPRESS + ), ) advanced_group.add_argument( "--version", diff --git a/floss/render/default.py b/floss/render/default.py index b145539cd..780228099 100644 --- a/floss/render/default.py +++ b/floss/render/default.py @@ -80,15 +80,19 @@ def render_string_type_rows(results: ResultDocument) -> List[Tuple[str, str]]: return [ ( " static strings", - f"{len_ss:>{len(str(len_ss))}} ({len_chars_ss:>{len(str(len_chars_ss))}d} characters)" - if results.analysis.enable_static_strings - else DISABLED, + ( + f"{len_ss:>{len(str(len_ss))}} ({len_chars_ss:>{len(str(len_chars_ss))}d} characters)" + if results.analysis.enable_static_strings + else DISABLED + ), ), ( " language strings", - f"{len_ls:>{len(str(len_ss))}} ({len_chars_ls:>{len(str(len_chars_ss))}d} characters)" - if results.metadata.language - else DISABLED, + ( + f"{len_ls:>{len(str(len_ss))}} ({len_chars_ls:>{len(str(len_chars_ss))}d} characters)" + if results.metadata.language + else DISABLED + ), ), ( " stack strings", From a5e46ae98dd6e9b2d6b7d3425fc27a28f36e1dae Mon Sep 17 00:00:00 2001 From: Arker123 Date: Sun, 23 Jun 2024 19:36:08 +0530 Subject: [PATCH 09/16] Tweaks --- floss/language/rust/decode_utf8.py | 8 +++++--- floss/language/rust/extract.py | 6 +++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index 9f0672e21..d2992524f 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -3,7 +3,7 @@ import logging import pathlib import argparse -from typing import List, Tuple, Iterable, Optional +from typing import Any, List, Tuple, Iterable, Optional import pefile @@ -20,7 +20,7 @@ def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: raise ValueError("no .rdata section found") -def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[Tuple[str, int]]: +def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[Any]]: """ Extracts UTF-8 strings from a buffer. """ @@ -72,10 +72,12 @@ def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[Tuple[ # filter strings less than min length strings = [string for string in strings if len(string[0]) >= min_length] + print(strings) + return strings -def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Tuple[str, int, int]]: +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Any]]: """ Extracts UTF-8 strings from the .rdata section of a PE file. """ diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index b56a10a92..543633227 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -4,7 +4,7 @@ import pathlib import argparse import itertools -from typing import List, Tuple, Iterable, Optional +from typing import Any, List, Tuple, Iterable, Optional import pefile import binary2strings as b2s @@ -60,7 +60,7 @@ def fix_b2s_wide_strings( def filter_and_transform_utf8_strings( - strings: List[Tuple[str, int, int]], + strings: list[list[Any]], start_rdata: int, ) -> List[StaticString]: transformed_strings = [] @@ -148,7 +148,7 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt buffer_rdata = rdata_section.get_data() # extract utf-8 strings - strings = extract_utf8_strings(pe, min_length) + fixed_strings = extract_utf8_strings(pe, min_length) # select only UTF-8 strings and adjust offset static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata) From 310584384feb7f727386ced8757a6b9d816981c7 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Sun, 23 Jun 2024 19:39:20 +0530 Subject: [PATCH 10/16] Minor bug --- floss/language/rust/extract.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/floss/language/rust/extract.py b/floss/language/rust/extract.py index 543633227..7c1037041 100644 --- a/floss/language/rust/extract.py +++ b/floss/language/rust/extract.py @@ -60,7 +60,7 @@ def fix_b2s_wide_strings( def filter_and_transform_utf8_strings( - strings: list[list[Any]], + strings: List[List[Any]], start_rdata: int, ) -> List[StaticString]: transformed_strings = [] From 7481274b0de1561fae8b0dc5802dffe0ac35ee18 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Sun, 23 Jun 2024 20:33:23 +0530 Subject: [PATCH 11/16] Tweaks --- floss/language/rust/decode_utf8.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index d2992524f..11983fe62 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -27,53 +27,52 @@ def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[A # Reference: https://en.wikipedia.org/wiki/UTF-8 + character_and_index = [] strings = [] for i in range(0, len(buf)): # for 1 byte if buf[i] & 0x80 == 0x00: character = buf[i].to_bytes(1, "big").decode("utf-8", "ignore") - strings.append([character, i]) + character_and_index.append([character, i, 1]) # for 2 bytes elif buf[i] & 0xE0 == 0xC0: temp = buf[i] << 8 | buf[i + 1] character = temp.to_bytes(2, "big").decode("utf-8", "ignore") i += 1 - strings.append([character, i]) + character_and_index.append([character, i, 2]) # for 3 bytes elif buf[i] & 0xF0 == 0xE0: temp = buf[i] << 16 | buf[i + 1] << 8 | buf[i + 2] character = temp.to_bytes(3, "big").decode("utf-8", "ignore") i += 2 - strings.append([character, i]) + character_and_index.append([character, i, 3]) # for 4 bytes elif buf[i] & 0xF8 == 0xF0: temp = buf[i] << 24 | buf[i + 1] << 16 | buf[i + 2] << 8 | buf[i + 3] character = temp.to_bytes(4, "big").decode("utf-8", "ignore") i += 3 - strings.append([character, i]) + character_and_index.append([character, i, 4]) prev = False - for i in range(0, len(strings)): - if strings[i][0].isprintable() == True: + for i in range(0, len(character_and_index)): + if character_and_index[i][0].isprintable() == True: if prev == False: - strings.append([strings[i][0], strings[i][1]]) + strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) prev = True else: - strings[-1][0] += strings[i][0] - strings[-1][1] = strings[i][1] + strings[-1][0] += character_and_index[i][0] + strings[-1][2] = character_and_index[i][1] else: prev = False # filter strings less than min length strings = [string for string in strings if len(string[0]) >= min_length] - print(strings) - return strings From 60b3ca6089c4d7fdfea3b81bda86fd4ef41a5669 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Sun, 23 Jun 2024 20:44:14 +0530 Subject: [PATCH 12/16] Add tests --- tests/test_utf8_decoder.py | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 tests/test_utf8_decoder.py diff --git a/tests/test_utf8_decoder.py b/tests/test_utf8_decoder.py new file mode 100644 index 000000000..963607b73 --- /dev/null +++ b/tests/test_utf8_decoder.py @@ -0,0 +1,30 @@ +import pathlib + +import pytest + +from floss.results import StaticString, StringEncoding +from floss.language.rust.extract import extract_rust_strings + + +@pytest.fixture(scope="module") +def rust_strings64(): + n = 1 + path = pathlib.Path(__file__).parent / "data" / "language" / "rust" / "rust-hello" / "bin" / "rust-hello64.exe" + return extract_rust_strings(path, n) + + +@pytest.mark.parametrize( + "string,offset,encoding,rust_strings", + [ + # For 1 character strings + pytest.param("Hello, world!", 0xBB030, StringEncoding.UTF8, "rust_strings64"), + # For 2 character strings + pytest.param("۶ж̶ƶ", 0xC73E3, StringEncoding.UTF8, "rust_strings64"), + # For 3 character strings + pytest.param("jd8n8n헧??", 0xD3CE2, StringEncoding.UTF8, "rust_strings64"), + # For 4 character strings + pytest.param("&ޓޓttt", 0xD41F8, StringEncoding.UTF8, "rust_strings64"), + ], +) +def test_utf8_decoder(request, string, offset, encoding, rust_strings): + assert StaticString(string=string, offset=offset, encoding=encoding) in request.getfixturevalue(rust_strings) From 272770df5d49a4c9f54902e29ff1f6f780753ed6 Mon Sep 17 00:00:00 2001 From: Arnav Kharbanda <94680887+Arker123@users.noreply.github.com> Date: Mon, 24 Jun 2024 10:22:29 +0530 Subject: [PATCH 13/16] Update floss/language/rust/decode_utf8.py Co-authored-by: Vasco Schiavo <115561717+VascoSch92@users.noreply.github.com> --- floss/language/rust/decode_utf8.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index 11983fe62..8d4922d1b 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -76,7 +76,7 @@ def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[A return strings -def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Any]]: +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Optional[List[Any]]]: """ Extracts UTF-8 strings from the .rdata section of a PE file. """ From 770955c58c9bcdceea8c4ae69a35b0183da85060 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Mon, 24 Jun 2024 10:31:02 +0530 Subject: [PATCH 14/16] Tweaks --- floss/language/rust/decode_utf8.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index 8d4922d1b..530b51e9d 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -76,14 +76,14 @@ def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[A return strings -def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[Optional[List[Any]]]: +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Any]]: """ Extracts UTF-8 strings from the .rdata section of a PE file. """ try: rdata_section = get_rdata_section(pe) except ValueError as e: - print("cannot extract rust strings: %s", e) + logger.error("cannot extract rust strings: %s", e) return [] buf = pe.get_memory_mapped_image()[ From a354b30f7e764bfb2ffe741ab7cb117f169149d5 Mon Sep 17 00:00:00 2001 From: Arnav Kharbanda <94680887+Arker123@users.noreply.github.com> Date: Wed, 26 Jun 2024 09:39:21 +0530 Subject: [PATCH 15/16] Update tests/test_language_rust_coverage.py Co-authored-by: Willi Ballenthin --- tests/test_language_rust_coverage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_language_rust_coverage.py b/tests/test_language_rust_coverage.py index b8668e5a7..b149dd740 100644 --- a/tests/test_language_rust_coverage.py +++ b/tests/test_language_rust_coverage.py @@ -54,4 +54,4 @@ def test_language_detection_64(binary_file): out = get_extract_stats(pe, all_ss_strings, rust_strings, n) # check that the output percentage is greater than 86% - assert float(out) > 86 # increase to 91 after merging PR #899 + assert float(out) > 86 # TODO(Arker123): increase to 91 after merging PR #899 From 960f2c09fd30298c2f870e52c0891a08ab4b3ad7 Mon Sep 17 00:00:00 2001 From: Arker123 Date: Wed, 26 Jun 2024 09:55:11 +0530 Subject: [PATCH 16/16] Several Refinements --- floss/language/rust/decode_utf8.py | 56 ++++++++++++++++++------------ 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/floss/language/rust/decode_utf8.py b/floss/language/rust/decode_utf8.py index 530b51e9d..124ae6495 100644 --- a/floss/language/rust/decode_utf8.py +++ b/floss/language/rust/decode_utf8.py @@ -1,72 +1,75 @@ # Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. import sys -import logging import pathlib import argparse from typing import Any, List, Tuple, Iterable, Optional +from collections import namedtuple import pefile -MIN_STR_LEN = 4 - -logger = logging.getLogger(__name__) +import floss.logging_ +from floss.language.utils import get_rdata_section +MIN_STR_LEN = 4 -def get_rdata_section(pe: pefile.PE) -> pefile.SectionStructure: - for section in pe.sections: - if section.Name.startswith(b".rdata\x00"): - return section - - raise ValueError("no .rdata section found") +logger = floss.logging_.getLogger(__name__) -def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[Any]]: +def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[Tuple[str, int, int]]]: """ Extracts UTF-8 strings from a buffer. """ # Reference: https://en.wikipedia.org/wiki/UTF-8 + character_info = namedtuple("character_info", ["character", "position", "length"]) character_and_index = [] - strings = [] for i in range(0, len(buf)): # for 1 byte if buf[i] & 0x80 == 0x00: + # ignore is used below because decode function throws an exception + # when there is an character where the if condition is satisfied but it is not a valid utf-8 character character = buf[i].to_bytes(1, "big").decode("utf-8", "ignore") - character_and_index.append([character, i, 1]) + character_and_index.append(character_info(character, i, 1)) # for 2 bytes elif buf[i] & 0xE0 == 0xC0: temp = buf[i] << 8 | buf[i + 1] character = temp.to_bytes(2, "big").decode("utf-8", "ignore") i += 1 - character_and_index.append([character, i, 2]) + character_and_index.append(character_info(character, i, 2)) # for 3 bytes elif buf[i] & 0xF0 == 0xE0: temp = buf[i] << 16 | buf[i + 1] << 8 | buf[i + 2] character = temp.to_bytes(3, "big").decode("utf-8", "ignore") i += 2 - character_and_index.append([character, i, 3]) + character_and_index.append(character_info(character, i, 3)) # for 4 bytes elif buf[i] & 0xF8 == 0xF0: temp = buf[i] << 24 | buf[i + 1] << 16 | buf[i + 2] << 8 | buf[i + 3] character = temp.to_bytes(4, "big").decode("utf-8", "ignore") i += 3 - character_and_index.append([character, i, 4]) + character_and_index.append(character_info(character, i, 4)) + + else: + logger.trace("Invalid UTF-8 character at offset %d", i) prev = False + strings = [] for i in range(0, len(character_and_index)): - if character_and_index[i][0].isprintable() == True: + if character_and_index[i].character.isprintable(): if prev == False: - strings.append([character_and_index[i][0], character_and_index[i][1], character_and_index[i][1]]) + strings.append( + [character_and_index[i].character, character_and_index[i].position, character_and_index[i].position] + ) prev = True else: - strings[-1][0] += character_and_index[i][0] - strings[-1][2] = character_and_index[i][1] + strings[-1][0] += character_and_index[i].character + strings[-1][2] = character_and_index[i].position else: prev = False @@ -76,7 +79,7 @@ def extract_utf8_strings_from_buffer(buf, min_length=MIN_STR_LEN) -> List[List[A return strings -def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Any]]: +def extract_rdata_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Tuple[str, int, int]]]: """ Extracts UTF-8 strings from the .rdata section of a PE file. """ @@ -93,6 +96,14 @@ def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Any return strings +def extract_utf8_strings(pe: pefile.PE, min_length=MIN_STR_LEN) -> List[List[Tuple[str, int, int]]]: + """ + Extracts UTF-8 strings from a PE file. + """ + # Can be extended to extract strings from other sections + return extract_rdata_utf8_strings(pe, min_length) + + def main(argv=None): parser = argparse.ArgumentParser(description="Get Rust strings") parser.add_argument("path", help="file or path to analyze") @@ -106,13 +117,12 @@ def main(argv=None): ) args = parser.parse_args(args=argv) - logging.basicConfig(level=logging.DEBUG) - pe = pathlib.Path(args.path) buf = pe.read_bytes() pe = pefile.PE(data=buf, fast_load=True) strings = extract_utf8_strings(pe, args.min_length) + print(strings) for string in strings: print(string[0])