From e82a940ebed5f00cdb731210ccd5f06bea517838 Mon Sep 17 00:00:00 2001 From: jsn-sekoia Date: Thu, 29 Aug 2024 18:29:33 +0200 Subject: [PATCH 1/2] feat: upgrade unpacker.py to fix UPX header if needed --- unpacker.py | 58 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 4 deletions(-) diff --git a/unpacker.py b/unpacker.py index 6540ebf..4e04a90 100644 --- a/unpacker.py +++ b/unpacker.py @@ -30,20 +30,70 @@ def execute(self, request): request.result.add_section(ResultSection(f"{os.path.basename(uresult.displayname)} successfully unpacked!", body=caveat_msg)) - def _unpack_upx(self, packedfile, outputpath, displayname): - # Test the file to see if UPX agrees with our identification. + def _check_upx(self, packedfile): + # Test the file to see if UPX agrees with our identification stdout, stderr = Popen((self.upx_exe, '-t', packedfile), stdout=PIPE, stderr=PIPE).communicate() + return (stdout, stderr) + + def _fix_p_info(self, packedfile) + stream = open(packedfile, 'r+b') + buff = stream.read() + upx_index = buff.find(b'UPX!') + if upx_index == -1: + self.log.info('Could not find UPX l_info struct') + return None + l_info_offset = upx_index - 4 + if l_info_offset < 0: + self.log.info('Invalid l_info_offset') + return None + p_filesize = buff[l_info_offset + 16 : l_info_offset + 20] + p_blocksize = buff[l_info_offset + 20 : l_info_offset + 24] + stream.seek(-12, os.SEEK_END) + filesize = stream.read(4) + # check p_filesize and p_blocksize of packed file, if corrupt (set to 0), fix it by set the real filesize value + if p_blocksize == b'\x00\x00\x00\x00' or p_filesize == b'\x00\x00\x00\x00': + self.log.info('UPX altered p_blocksize / p_filesize - try to restore') + stream.seek(l_info_offset + 16) + stream.write(filesize) + stream.seek(l_info_offset + 20) + stream.write(filesize) + stream.close() - if b'[OK]' in stdout and b'Tested 1 file' in stdout: + def _unpack_upx(self, packedfile, outputpath, displayname): + i = 0 + # run check in while to recheck after fix + # call fix function if UPX return p_info corrupted + # used a counter to avoid infinite loop + while i < 2: + check_stdout, check_stderr = self._check_upx(packedfile) + if b'[OK]' in check_stdout and b'Tested 1 file' in check_stdout: + check = True + break + elif b'p_info corrupted' in check_stderr: + check = False + self.log.info('UPX File p_info corrupted') + self._fix_p_info(packedfile) + i = i+1 + else: + check = False + break + + # Check is True when UPX -t validates file - we could try to unpack + if check is True: stdout, stderr = Popen((self.upx_exe, '-d', '-o', outputpath, packedfile), stdout=PIPE, stderr=PIPE).communicate() if b'Unpacked 1 file' in stdout: # successfully unpacked. return UnpackResult(True, outputpath, displayname, {'stdout': stdout[:1024]}) + else: + # add a return if unpacking didn't work + self.log.info(f'UPX extractor failed to unpack file:\n{stderr[:1024]}\n{stderr[:1024]}') + + # Check is True when UPX -t validates file - we could try to unpack else: - self.log.info(f'UPX extractor said this file was not UPX packed:\n{stderr[:1024]}\n{stderr[:1024]}') + self.log.info(f'UPX extractor said this file was not UPX packed:\n{check_stderr[:1024]}\n{check_stderr[:1024]}') # UPX unpacking is failure prone due to the number of samples that are identified as UPX # but are really some minor variant. For that reason we can't really fail the result # every time upx has problems with a file. From dfbe29bca2c84bc59d8d19f2d5802a0255c5f422 Mon Sep 17 00:00:00 2001 From: jsn-sekoia Date: Wed, 4 Sep 2024 16:13:31 +0200 Subject: [PATCH 2/2] fix: missing parenthesis --- unpacker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unpacker.py b/unpacker.py index 4e04a90..2adc33e 100644 --- a/unpacker.py +++ b/unpacker.py @@ -36,7 +36,7 @@ def _check_upx(self, packedfile): stdout=PIPE, stderr=PIPE).communicate() return (stdout, stderr) - def _fix_p_info(self, packedfile) + def _fix_p_info(self, packedfile): stream = open(packedfile, 'r+b') buff = stream.read() upx_index = buff.find(b'UPX!')