Skip to content

Commit

Permalink
async compression
Browse files Browse the repository at this point in the history
  • Loading branch information
jstzwj committed Feb 25, 2025
1 parent 819d7e6 commit 62ded75
Showing 1 changed file with 43 additions and 22 deletions.
65 changes: 43 additions & 22 deletions src/olah/cache/olah_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.

import asyncio
import lzma
import mmap
import os
Expand Down Expand Up @@ -266,18 +267,28 @@ async def read_block(self, block_index: int) -> Optional[bytes]:
with portalocker.Lock(block_path, "rb", flags=portalocker.LOCK_SH) as fh:
async with aiofiles.open(block_path, mode='rb') as f:
raw_block = await f.read(self._get_block_size())

# compression
if self.header.compression_algo == 0:
pass
elif self.header.compression_algo == 1:
raw_block = gzip.decompress(raw_block)
elif self.header.compression_algo == 2:
lzma_dec = lzma.LZMADecompressor()
raw_block = lzma_dec.decompress(raw_block)
else:
raise Exception("Unsupported compression algorithm.")


def decompression(block_data: bytes, compression_algo: int):
# compression
if compression_algo == 0:
return block_data
elif compression_algo == 1:
block_data = gzip.decompress(block_data)
elif compression_algo == 2:
lzma_dec = lzma.LZMADecompressor()
block_data = lzma_dec.decompress(block_data)
else:
raise Exception("Unsupported compression algorithm.")
return block_data

loop = asyncio.get_running_loop()
# Run in the default thread pool executor
raw_block = await loop.run_in_executor(
None, # Uses the default executor
decompression,
raw_block,
self.header.compression_algo
)

block = self._pad_block(raw_block)
return block
Expand Down Expand Up @@ -306,17 +317,27 @@ async def write_block(self, block_index: int, block_bytes: bytes) -> None:
else:
real_block_bytes = block_bytes

# Compression
if self.header.compression_algo == 0:
pass
elif self.header.compression_algo == 1:
real_block_bytes = gzip.compress(real_block_bytes, compresslevel=4)
elif self.header.compression_algo == 2:
lzma_enc = lzma.LZMACompressor()
real_block_bytes = lzma_enc.compress(real_block_bytes)
else:
raise Exception("Unsupported compression algorithm.")
def compression(block_data: bytes, compression_algo: int):
if compression_algo == 0:
return block_data
elif compression_algo == 1:
block_data = gzip.compress(block_data, compresslevel=4)
elif compression_algo == 2:
lzma_enc = lzma.LZMACompressor()
block_data = lzma_enc.compress(block_data)
else:
raise Exception("Unsupported compression algorithm.")
return block_data

loop = asyncio.get_running_loop()
# Run in the default thread pool executor
real_block_bytes = await loop.run_in_executor(
None, # Uses the default executor
compression,
real_block_bytes,
self.header.compression_algo
)

block_path = string.Template(self._data_path).substitute(block_index=f"{block_index:0>8}")

with portalocker.Lock(block_path, 'wb+', flags=portalocker.LOCK_EX) as fh:
Expand Down

0 comments on commit 62ded75

Please sign in to comment.