
Commit

rw file by mmap
jstzwj committed Feb 25, 2025
1 parent cb4c880 commit f000f59
Showing 1 changed file with 63 additions and 41 deletions.
104 changes: 63 additions & 41 deletions src/olah/cache/olah_cache.py
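The change replaces direct f.seek/f.read/f.write calls on the cache file with access through a shared memory map. Below is a minimal standalone sketch of that pattern (the helpers read_region and write_region are illustrative, not part of olah); it assumes a Unix platform, where the MAP_SHARED/PROT_* constants used in the commit exist, and a file that already exists and is non-empty, since mmap with length=0 maps the whole file and rejects an empty one.

import mmap

# Illustrative helpers (not part of olah): read or overwrite a byte range of an
# existing, non-empty file through a shared memory map instead of f.read()/f.write().
def read_region(path: str, offset: int, size: int) -> bytes:
    with open(path, "rb") as f:
        # length=0 maps the whole file; the file must not be empty.
        with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ) as mm:
            mm.seek(offset)
            return mm.read(size)

def write_region(path: str, offset: int, data: bytes) -> None:
    with open(path, "rb+") as f:
        with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED,
                       mmap.PROT_READ | mmap.PROT_WRITE) as mm:
            mm.seek(offset)
            mm.write(data)  # visible to other MAP_SHARED mappings of the file
            mm.flush()      # push dirty pages back to the file now (optional)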
@@ -5,6 +5,7 @@
 # license that can be found in the LICENSE file or at
 # https://opensource.org/licenses/MIT.
 
+import mmap
 import os
 import struct
 import threading
@@ -139,19 +140,21 @@ def open(self, path: str, block_size: int = DEFAULT_BLOCK_SIZE):
         if os.path.exists(path):
             with self._header_lock:
                 with open(path, "rb") as f:
-                    f.seek(0)
-                    self.header = OlahCacheHeader.read(f)
+                    with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ) as mm:
+                        mm.seek(0)
+                        self.header = OlahCacheHeader.read(mm)
         else:
             with self._header_lock:
                 # Create new file
                 with open(path, "wb") as f:
-                    f.seek(0)
-                    self.header = OlahCacheHeader(
-                        version=CURRENT_OLAH_CACHE_VERSION,
-                        block_size=block_size,
-                        file_size=0,
-                    )
-                    self.header.write(f)
+                    with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_WRITE) as mm:
+                        mm.seek(0)
+                        self.header = OlahCacheHeader(
+                            version=CURRENT_OLAH_CACHE_VERSION,
+                            block_size=block_size,
+                            file_size=0,
+                        )
+                        self.header.write(mm)
 
         self.is_open = True
 
@@ -169,10 +172,15 @@ def close(self):
         self.is_open = False
 
     def _flush_header(self):
+        if self.header is None:
+            raise Exception("The header of cache file is None")
+        if self.path is None:
+            raise Exception("The path of cache file is None")
         with self._header_lock:
             with open(self.path, "rb+") as f:
-                f.seek(0)
-                self.header.write(f)
+                with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_WRITE) as mm:
+                    mm.seek(0)
+                    self.header.write(mm)
 
     def _get_file_size(self) -> int:
         with self._header_lock:
@@ -252,6 +260,9 @@ def read_block(self, block_index: int) -> Optional[bytes]:
         if not self.is_open:
             raise Exception("This file has been closed.")
 
+        if self.path is None:
+            raise Exception("The path of the cache file is None.")
+
         if block_index >= self._get_block_number():
             raise Exception("Invalid block index.")
 
@@ -264,26 +275,30 @@ def read_block(self, block_index: int) -> Optional[bytes]:
 
         offset = self._get_header_size() + (block_index * self._get_block_size())
         with open(self.path, "rb") as f:
-            f.seek(offset, 0)
-            raw_block = f.read(self._get_block_size())
-            # Prefetch blocks
-            for block_offset in range(1, self._prefech_blocks + 1):
-                if block_index + block_offset >= self._get_block_number():
-                    break
-                if not self.has_block(block_index=block_index):
-                    self._write_cache(block_index + block_offset, None)
-                else:
-                    byte_offset = self._get_header_size() + ((block_index + block_offset) * self._get_block_size())
-                    f.seek(byte_offset, 0)
-                    prefetch_raw_block = f.read(self._get_block_size())
-                    self._write_cache(block_index + block_offset, self._pad_block(prefetch_raw_block))
+            with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ) as mm:
+                mm.seek(offset, 0)
+                raw_block = mm.read(self._get_block_size())
+                # Prefetch blocks
+                for block_offset in range(1, self._prefech_blocks + 1):
+                    if block_index + block_offset >= self._get_block_number():
+                        break
+                    if not self.has_block(block_index=block_index):
+                        self._write_cache(block_index + block_offset, None)
+                    else:
+                        byte_offset = self._get_header_size() + ((block_index + block_offset) * self._get_block_size())
+                        mm.seek(byte_offset, 0)
+                        prefetch_raw_block = mm.read(self._get_block_size())
+                        self._write_cache(block_index + block_offset, self._pad_block(prefetch_raw_block))
 
         block = self._pad_block(raw_block)
         return block
 
     def write_block(self, block_index: int, block_bytes: bytes) -> None:
         if not self.is_open:
             raise Exception("This file has been closed.")
 
+        if self.path is None:
+            raise Exception("The path of the cache file is None. ")
+
         if block_index >= self._get_block_number():
             raise Exception("Invalid block index.")
@@ -293,14 +308,15 @@ def write_block(self, block_index: int, block_bytes: bytes) -> None:
 
         offset = self._get_header_size() + (block_index * self._get_block_size())
         with open(self.path, "rb+") as f:
-            f.seek(offset)
-            if (block_index + 1) * self._get_block_size() > self._get_file_size():
-                real_block_bytes = block_bytes[
-                    : self._get_file_size() - block_index * self._get_block_size()
-                ]
-            else:
-                real_block_bytes = block_bytes
-            f.write(real_block_bytes)
+            with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_WRITE) as mm:
+                mm.seek(offset)
+                if (block_index + 1) * self._get_block_size() > self._get_file_size():
+                    real_block_bytes = block_bytes[
+                        : self._get_file_size() - block_index * self._get_block_size()
+                    ]
+                else:
+                    real_block_bytes = block_bytes
+                mm.write(real_block_bytes)
 
         self._set_header_block(block_index)
         self._flush_header()
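The clipping in write_block above keeps the last block from writing past the logical end of the cached file. A small standalone illustration of that arithmetic (the helper clip_tail_block is hypothetical and only restates the condition from the diff):

def clip_tail_block(block_bytes: bytes, block_index: int,
                    block_size: int, file_size: int) -> bytes:
    # Only the tail block may be shorter than block_size.
    if (block_index + 1) * block_size > file_size:
        return block_bytes[: file_size - block_index * block_size]
    return block_bytes

# Example: with block_size=4096 and file_size=10000, block 2 keeps only
# 10000 - 2 * 4096 = 1808 of its 4096 bytes.
assert len(clip_tail_block(b"\x00" * 4096, 2, 4096, 10000)) == 1808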
@@ -312,6 +328,10 @@ def write_block(self, block_index: int, block_bytes: bytes) -> None:
     def _resize_file_size(self, file_size: int):
         if not self.is_open:
             raise Exception("This file has been closed.")
+
+        if self.path is None:
+            raise Exception("The path of the cache file is None. ")
+
         if file_size == self._get_file_size():
             return
         if file_size < self._get_file_size():
@@ -320,19 +340,21 @@ def _resize_file_size(self, file_size: int):
             )
 
         with open(self.path, "rb") as f:
-            f.seek(0, os.SEEK_END)
-            bin_size = f.tell()
+            with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ) as mm:
+                mm.seek(0, os.SEEK_END)
+                bin_size = mm.tell()
 
         # FIXME: limit the resize method, because it may influence the _block_mask
         new_bin_size = self._get_header_size() + file_size
         with open(self.path, "rb+") as f:
-            f.seek(new_bin_size - 1)
-            f.write(b'\0')
-            f.truncate()
-
-            # Extend file size (slow)
-            # f.seek(0, os.SEEK_END)
-            # f.write(b"\x00" * (new_bin_size - bin_size))
+            with mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_WRITE) as mm:
+                mm.seek(new_bin_size - 1)
+                mm.write(b'\0')
+                mm.truncate()
+
+                # Extend file size (slow)
+                # mm.seek(0, os.SEEK_END)
+                # mm.write(b"\x00" * (new_bin_size - bin_size))
 
     def resize(self, file_size: int):
         if not self.is_open:
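_resize_file_size grows the cache file before any block at the new size can be written: a MAP_SHARED mapping's length is fixed when it is created, so the file itself has to be extended first, either by writing a byte at the new end (as above) or by truncating the file up to the target size. A standalone sketch of the same idea (the helper name ensure_file_size is illustrative, not part of olah):

import os

def ensure_file_size(path: str, new_size: int) -> None:
    # Grow (never shrink) a file to new_size so that mapping and writing the
    # new region is valid; extending via truncate() fills with zero bytes and
    # typically produces a sparse file.
    if os.path.getsize(path) >= new_size:
        return
    with open(path, "rb+") as f:
        f.truncate(new_size)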
