Skip to content

Commit 563bc06

Browse files
committed
Add file-level result caching system with tests
Signed-off-by: Om Ambole <omambole2007@gmail.com>
1 parent 022ddc8 commit 563bc06

File tree

3 files changed

+310
-0
lines changed

3 files changed

+310
-0
lines changed

src/scancode/cache_manager.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
#
3+
# Copyright (c) nexB Inc. and others. All rights reserved.
4+
# ScanCode is a trademark of nexB Inc.
5+
# Visit https://aboutcode.org and https://github.com/aboutcode-org/scancode-toolkit/
6+
#
7+
"""
8+
File-level result caching for faster repeated scans.
9+
"""
10+
11+
import hashlib
import json
import os
import sys
import tempfile
from pathlib import Path

from commoncode import fileutils
17+
# NOTE(review): hard-coded placeholder. This value is mixed into every cache
# key, so cached results are only invalidated across releases once it is wired
# to the real ScanCode version -- confirm where that should come from.
scancode_version = "dev"


class ResultCache:
    """
    Manages cached scan results for files based on content hash.

    Each entry is a JSON file named ``<sha256>.json`` inside ``cache_dir``.
    The digest covers the file content, ``scancode_version`` and the scan
    options, so a change to any of them produces a different entry.
    ``stats`` counts lookups as ``hits`` and ``misses``.
    """

    # Size of the chunks read while hashing, so that large files are never
    # loaded into memory in one piece.
    _HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, cache_dir=None):
        """
        Initialize cache manager and create the cache directory if needed.

        Args:
            cache_dir: Custom cache directory path. If None (or empty), uses
                ``~/.cache/scancode/file_results``.
        """
        if cache_dir:
            self.cache_dir = Path(cache_dir)
        else:
            # Fixed location under the user's home directory.
            self.cache_dir = Path.home() / '.cache' / 'scancode' / 'file_results'

        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.stats = {'hits': 0, 'misses': 0}

    def _get_file_hash(self, file_path, scan_options):
        """
        Generate unique hash for file + scan configuration.

        Args:
            file_path: Path to file being scanned
            scan_options: Dict of enabled scan options (e.g., {'license': True}).
                None is treated as an empty dict.

        Returns:
            SHA256 hex digest string

        Raises:
            OSError: if the file cannot be opened or read.
        """
        hasher = hashlib.sha256()

        # Hash file content chunk by chunk; the digest is the same as hashing
        # the whole content at once.
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b''):
                hasher.update(chunk)

        # Hash scan configuration to invalidate on option changes. Items are
        # sorted so the key does not depend on dict insertion order.
        options = sorted((scan_options or {}).items())
        config_str = f"{scancode_version}:{options}"
        hasher.update(config_str.encode('utf-8'))

        return hasher.hexdigest()

    def get_cached_result(self, file_path, scan_options):
        """
        Retrieve cached scan result if available.

        A lookup is counted as a hit only once the entry has been read and
        parsed successfully; a missing, unreadable or corrupted entry counts
        as exactly one miss.

        Args:
            file_path: Path to file
            scan_options: Dict of scan options

        Returns:
            Dict with scan results or None if not cached
        """
        file_hash = self._get_file_hash(file_path, scan_options)
        cache_file = self.cache_dir / f"{file_hash}.json"

        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except FileNotFoundError:
            # Nothing cached for this key.
            pass
        except (ValueError, OSError):
            # Corrupted or unreadable entry (JSONDecodeError and
            # UnicodeDecodeError are both ValueError): remove it, but never
            # let a failed cleanup abort the lookup.
            try:
                cache_file.unlink(missing_ok=True)
            except OSError:
                pass
        else:
            self.stats['hits'] += 1
            return result

        self.stats['misses'] += 1
        return None

    def store_result(self, file_path, scan_options, result):
        """
        Store scan result in cache.

        The result is written to a temporary file in the cache directory and
        then moved over the final name, so a reader never sees a partially
        written entry. Failures to serialize or write are reported as a
        warning on stderr and do not raise.

        Args:
            file_path: Path to scanned file
            scan_options: Dict of scan options used
            result: Scan result dict to cache
        """
        file_hash = self._get_file_hash(file_path, scan_options)
        cache_file = self.cache_dir / f"{file_hash}.json"

        tmp_name = None
        try:
            with tempfile.NamedTemporaryFile(
                mode='w',
                encoding='utf-8',
                dir=str(self.cache_dir),
                prefix=f"{file_hash}.",
                suffix='.tmp',
                delete=False,
            ) as tmp:
                tmp_name = tmp.name
                json.dump(result, tmp)
            os.replace(tmp_name, cache_file)
        except (OSError, TypeError, ValueError) as e:
            # Don't fail scan if cache write fails. The warning goes to stderr
            # so it cannot mix with output written to stdout.
            print(f"Warning: Failed to write cache: {e}", file=sys.stderr)
            if tmp_name is not None:
                # Drop the incomplete temporary file, if it is still there.
                try:
                    os.unlink(tmp_name)
                except OSError:
                    pass

    def clear_cache(self):
        """Remove all cached result files (``*.json``) from the cache directory."""
        for entry in self.cache_dir.glob("*.json"):
            try:
                entry.unlink(missing_ok=True)
            except OSError:
                # Best effort: skip entries that cannot be removed.
                continue

    def get_stats(self):
        """
        Return cache statistics.

        Returns:
            Dict with 'hits', 'misses' and 'hit_rate_percent' (rounded to one
            decimal, 0 when no lookup happened yet).
        """
        hits = self.stats['hits']
        misses = self.stats['misses']
        total = hits + misses
        hit_rate = (hits / total * 100) if total > 0 else 0
        return {
            'hits': hits,
            'misses': misses,
            'hit_rate_percent': round(hit_rate, 1)
        }

src/scancode/plugin_cache.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
#
3+
# Copyright (c) nexB Inc. and others. All rights reserved.
4+
#
5+
"""
6+
CLI plugin to enable result caching.
7+
"""
8+
9+
import click
10+
11+
from plugincode.scan import ScanPlugin
12+
from plugincode.scan import scan_impl
13+
from scancode.cache_manager import ResultCache
14+
15+
16+
@scan_impl
class CachePlugin(ScanPlugin):
    """
    Scan plugin declaring the command line options for result caching.

    Adds the ``--cache``, ``--cache-dir`` and ``--force-reindex`` options.
    """

    # NOTE(review): --force-reindex is declared here but nothing in this
    # module reads it -- confirm where it is consumed.
    options = [
        click.Option(
            ['--cache'],
            help='Enable result caching for faster repeated scans.',
            default=False,
            is_flag=True,
        ),
        click.Option(
            ['--cache-dir'],
            help='Custom directory for cache storage. Default: ~/.cache/scancode/file_results',
            metavar='DIR',
            type=click.Path(exists=False, file_okay=False, dir_okay=True),
        ),
        click.Option(
            ['--force-reindex'],
            help='Ignore cache and perform full rescan of all files.',
            default=False,
            is_flag=True,
        ),
    ]

    def is_enabled(self, cache, **kwargs):
        """Return the value of the ``cache`` option; other options are ignored."""
        return cache

    def setup(self, **kwargs):
        """
        Setup hook; currently a no-op (no cache manager is created here).
        """
52+
53+
54+
def get_cache_manager(cache, cache_dir, **kwargs):
    """
    Build the cache manager for a scan, if caching was requested.

    Args:
        cache: Truthy when caching is enabled
        cache_dir: Custom cache directory path, passed through to ResultCache
        **kwargs: Accepted and ignored

    Returns:
        A new ResultCache instance, or None when caching is disabled
    """
    return ResultCache(cache_dir=cache_dir) if cache else None
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# SPDX-License-Identifier: Apache-2.0
2+
#
3+
# Copyright (c) nexB Inc. and others. All rights reserved.
4+
#
5+
"""
6+
Tests for result caching functionality.
7+
"""
8+
9+
import json
10+
import tempfile
11+
from pathlib import Path
12+
13+
import pytest
14+
15+
from scancode.cache_manager import ResultCache
16+
17+
18+
class TestResultCache:
    """Checks for ResultCache: store/lookup, invalidation, stats and clearing."""

    @staticmethod
    def _make_file(directory, content):
        """Write ``content`` to ``test.py`` inside ``directory`` and return its path."""
        sample = Path(directory) / 'test.py'
        sample.write_text(content)
        return sample

    def test_cache_stores_and_retrieves_results(self):
        """A stored result is returned by the next lookup and counted as a hit."""
        with tempfile.TemporaryDirectory() as workdir:
            cache = ResultCache(cache_dir=workdir)
            sample = self._make_file(workdir, '# Test file\nprint("hello")')

            options = {'license': True, 'copyright': False}
            stored = {'licenses': ['MIT'], 'path': str(sample)}
            cache.store_result(sample, options, stored)

            found = cache.get_cached_result(sample, options)

            assert found is not None
            assert found['licenses'] == ['MIT']
            assert cache.stats['hits'] == 1

    def test_cache_miss_on_file_change(self):
        """Changing the file content after storing makes the lookup miss."""
        with tempfile.TemporaryDirectory() as workdir:
            cache = ResultCache(cache_dir=workdir)
            options = {'license': True}

            # Cache a result for the first version of the file
            sample = self._make_file(workdir, '# Version 1')
            cache.store_result(sample, options, {'version': 1})

            # Rewrite the file with different content
            sample.write_text('# Version 2')

            found = cache.get_cached_result(sample, options)
            assert found is None
            assert cache.stats['misses'] == 1

    def test_cache_miss_on_scan_options_change(self):
        """A lookup with different scan options does not reuse the stored entry."""
        with tempfile.TemporaryDirectory() as workdir:
            cache = ResultCache(cache_dir=workdir)
            sample = self._make_file(workdir, '# Test')

            # Stored with license only
            cache.store_result(sample, {'license': True}, {'result': 1})

            # Looked up with copyright added
            found = cache.get_cached_result(
                sample, {'license': True, 'copyright': True}
            )

            assert found is None  # Should miss due to different options

    def test_cache_statistics(self):
        """One miss followed by one hit yields a 50% hit rate."""
        with tempfile.TemporaryDirectory() as workdir:
            cache = ResultCache(cache_dir=workdir)
            sample = self._make_file(workdir, '# Test')
            options = {'license': True}

            # Nothing stored yet: miss
            cache.get_cached_result(sample, options)

            # Store, then look up again: hit
            cache.store_result(sample, options, {'data': 'test'})
            cache.get_cached_result(sample, options)

            stats = cache.get_stats()
            assert stats['hits'] == 1
            assert stats['misses'] == 1
            assert stats['hit_rate_percent'] == 50.0

    def test_clear_cache(self):
        """After clear_cache a previously stored result is no longer found."""
        with tempfile.TemporaryDirectory() as workdir:
            cache = ResultCache(cache_dir=workdir)
            sample = self._make_file(workdir, '# Test')
            options = {'license': True}

            cache.store_result(sample, options, {'data': 'test'})
            cache.clear_cache()

            found = cache.get_cached_result(sample, options)
            assert found is None

0 commit comments

Comments
 (0)