Skip to content

Commit

Permalink
Fix: cache issues (#292)
Browse files Browse the repository at this point in the history
* restrict dev mode cache actions to 1

* Add check to open_tempdir in cache store that recreates tempdir if missing.

* fix cache_server arg tooltip

* add log cache to restart request monitoring task.
  • Loading branch information
saikonen authored Mar 20, 2024
1 parent 93be3c3 commit 0d4ac65
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 6 deletions.
6 changes: 4 additions & 2 deletions docker-compose.development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ services:
- PREFETCH_RUNS_SINCE=2592000 # 30 days in seconds
- PREFETCH_RUNS_LIMIT=1 # Prefetch only one run
- S3_NUM_WORKERS=2
- CACHE_ARTIFACT_MAX_ACTIONS=4
- CACHE_DAG_MAX_ACTIONS=4
- CACHE_ARTIFACT_MAX_ACTIONS=1
- CACHE_DAG_MAX_ACTIONS=1
- CACHE_LOG_MAX_ACTIONS=1
- CACHE_ARTIFACT_STORAGE_LIMIT=16000000
- CACHE_DAG_STORAGE_LIMIT=16000000
- WS_POSTPROCESS_CONCURRENCY_LIMIT=8
Expand Down Expand Up @@ -62,6 +63,7 @@ services:
volumes:
- ./services:/root/services
environment:
- LOGLEVEL=WARNING
- MF_METADATA_DB_HOST=db
- MF_METADATA_DB_PORT=5432
- MF_METADATA_DB_USER=postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def _error_callback(self, worker, res):
help="Maximum number of concurrent cache actions.")
@click.option("--max-size",
default=10000,
help="Maximum amount of disk space to use in MB.")
help="Maximum amount of disk space to use in bytes.")
def cli(root=None,
max_actions=None,
max_size=None):
Expand Down
11 changes: 9 additions & 2 deletions services/ui_backend_service/data/cache/client/cache_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,17 @@ def mark_for_deletion(path, size):
unmarked_size -= size

def ensure_path(self, path):
"Ensures that the directory for a given path exists, creating it if missing."
dirr = os.path.dirname(path)
self.ensure_dir(dirr)

def ensure_dir(self, dirr):
"Ensures that a directory exists, creating it if missing."
if not os.path.isdir(dirr):
try:
makedirs(dirr)
except Exception as ex:
self.warn(ex, "Could not create dir: %s" % path)
self.warn(ex, "Could not create dir: %s" % dirr)

def open_tempdir(self, token, action_name, stream_key):
self._gc_objects()
Expand All @@ -198,7 +203,9 @@ def open_tempdir(self, token, action_name, stream_key):
)

try:
tmp = tempfile.mkdtemp(prefix="cache_action_%s." % token, dir=self.tmproot)
self.ensure_dir(self.tmproot)
tmp = tempfile.mkdtemp(prefix='cache_action_%s.' % token,
dir=self.tmproot)
except Exception as ex:
msg = "Could not create a temp directory for request %s" % token
self.warn(ex, msg)
Expand Down
2 changes: 1 addition & 1 deletion services/ui_backend_service/data/cache/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def start_caches(self, app):

async def _monitor_restart_requests(self):
while True:
for _cache in [self.artifact_cache, self.dag_cache]:
for _cache in [self.artifact_cache, self.dag_cache, self.log_cache]:
if await _cache.restart_requested():
cache_name = type(_cache).__name__
logger.info("[{}] restart requested...".format(cache_name))
Expand Down

0 comments on commit 0d4ac65

Please sign in to comment.