Skip to content

Commit f151c49

Browse files
committed
feat: Add extract method to reverse the ingest method.
- Especially useful for extracting code files from LLM Output
1 parent 4e259a0 commit f151c49

File tree

5 files changed

+337
-3
lines changed

5 files changed

+337
-3
lines changed

src/gitingest/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Gitingest: A package for ingesting data from Git repositories."""
22

33
from gitingest.entrypoint import ingest, ingest_async
4+
from gitingest.extract import extract
45

5-
__all__ = ["ingest", "ingest_async"]
6+
__all__ = ["ingest", "ingest_async", "extract"]

src/gitingest/__main__.py

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from gitingest.config import MAX_FILE_SIZE, OUTPUT_FILE_NAME
1313
from gitingest.entrypoint import ingest_async
14+
from gitingest.extract import extract
1415

1516
# Import logging configuration first to intercept all logging
1617
from gitingest.utils.logging_config import get_logger
@@ -31,7 +32,34 @@ class _CLIArgs(TypedDict):
3132
output: str | None
3233

3334

34-
@click.command()
35+
class DefaultGroup(click.Group):
    """Click group that falls back to the ``ingest`` subcommand.

    When the first CLI token does not name a registered subcommand (or no
    tokens are given at all), ``ingest`` is prepended to the argument list.
    """

    def parse_args(self, ctx, args):
        """Prepend ``ingest`` to *args* when no known subcommand leads them.

        A leading ``--help`` / ``-h`` is forwarded to the parent parser
        untouched instead of being routed to the default command.
        """
        help_requested = bool(args) and args[0] in ("--help", "-h")
        names_subcommand = bool(args) and args[0] in self.commands

        if not help_requested and not names_subcommand:
            # Insert the default command in front of whatever the user typed.
            args = ["ingest", *args]

        return super().parse_args(ctx, args)
48+
49+
50+
@click.group(cls=DefaultGroup)
def main() -> None:
    """Gitingest CLI tool.

    The default command is 'ingest', which analyzes a directory or repository.
    Use 'gitingest ingest --help' to see options for the default command.

    To extract files from a digest, use 'gitingest extract'.
    """
    # Intentionally empty: the subcommands are attached with ``@main.command``.
60+
61+
62+
@main.command(name="ingest")
3563
@click.argument("source", type=str, default=".")
3664
@click.option(
3765
"--max-size",
@@ -76,7 +104,7 @@ class _CLIArgs(TypedDict):
76104
default=None,
77105
help="Output file path (default: digest.txt in current directory). Use '-' for stdout.",
78106
)
79-
def main(**cli_kwargs: Unpack[_CLIArgs]) -> None:
107+
def ingest_command(**cli_kwargs: Unpack[_CLIArgs]) -> None:
80108
"""Run the CLI entry point to analyze a repo / directory and dump its contents.
81109
82110
Parameters
@@ -114,6 +142,34 @@ def main(**cli_kwargs: Unpack[_CLIArgs]) -> None:
114142
asyncio.run(_async_main(**cli_kwargs))
115143

116144

145+
@main.command(name="extract")
@click.argument("digest_file", type=click.Path(exists=True, dir_okay=False))
@click.option(
    "--output",
    "-o",
    default=".",
    type=click.Path(file_okay=False, dir_okay=True),
    help="Output directory where files will be extracted.",
)
def extract_command(digest_file: str, output: str) -> None:
    """Extract files from a gitingest digest file.

    Parameters
    ----------
    digest_file : str
        Path to the digest file.
    output : str
        Directory where extracted files will be saved.

    """
    try:
        extract(digest_file, output)
        success_message = f"Successfully extracted files to '{output}'"
        click.echo(success_message)
    except Exception as exc:
        # Top-level CLI boundary: report the failure on stderr, then abort.
        failure_message = f"Error extracting files: {exc}"
        click.echo(failure_message, err=True)
        raise click.Abort from exc
171+
172+
117173
async def _async_main(
118174
source: str,
119175
*,

src/gitingest/extract.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
"""Module for extracting files from a gitingest digest."""
2+
3+
from __future__ import annotations
4+
5+
import os
6+
import re
7+
from pathlib import Path
8+
9+
from gitingest.schemas.filesystem import SEPARATOR
10+
from gitingest.utils.logging_config import get_logger
11+
12+
logger = get_logger(__name__)
13+
14+
15+
def _is_within(root: Path, candidate: Path, *, follow_leaf: bool) -> bool:
    """Return ``True`` when *candidate* resolves to a location strictly inside *root*.

    Parameters
    ----------
    root : Path
        Already-resolved output directory.
    candidate : Path
        Path built from an entry of the digest.
    follow_leaf : bool
        If ``True`` the final component is resolved too, so an existing symlink
        at *candidate* that points elsewhere is taken into account. If ``False``
        only the parent directory is resolved (used for the symlink itself,
        which is replaced rather than followed).

    """
    try:
        if follow_leaf:
            resolved = candidate.resolve()
        else:
            if candidate.name in ("", ".."):
                return False
            resolved = candidate.parent.resolve() / candidate.name
    except (OSError, RuntimeError):
        # Unresolvable paths (e.g. symlink loops) are treated as unsafe.
        return False
    return root in resolved.parents


def _write_file(output_dir: Path, root: Path, relative_path: str, file_content: str) -> None:
    """Write one ``FILE:`` entry below *output_dir*, skipping paths that escape it."""
    target_file_path = output_dir / relative_path

    if not _is_within(root, target_file_path, follow_leaf=True):
        logger.warning(f"Skipping file outside the output directory: {relative_path}")
        return

    try:
        # Ensure parent dir exists
        target_file_path.parent.mkdir(parents=True, exist_ok=True)
        with target_file_path.open("w", encoding="utf-8") as f:
            f.write(file_content)
        logger.debug(f"Extracted: {target_file_path}")
    except OSError as e:
        logger.error(f"Failed to write file {target_file_path}: {e}")


def _create_symlink(output_dir: Path, root: Path, path_info: str) -> None:
    """Recreate one ``SYMLINK: link -> target`` entry below *output_dir*."""
    if " -> " not in path_info:
        logger.warning(f"Invalid symlink format: {path_info}")
        return

    link_path_str, target_path_str = path_info.split(" -> ", 1)
    link_full_path = output_dir / link_path_str

    if not _is_within(root, link_full_path, follow_leaf=False):
        logger.warning(f"Skipping symlink outside the output directory: {link_path_str}")
        return

    try:
        # Ensure parent dir exists
        link_full_path.parent.mkdir(parents=True, exist_ok=True)

        # Replace whatever already sits at the link location (including a dangling link).
        if link_full_path.exists() or link_full_path.is_symlink():
            link_full_path.unlink()

        os.symlink(target_path_str, link_full_path)
        logger.debug(f"Created symlink: {link_full_path} -> {target_path_str}")
    except OSError as e:
        logger.error(f"Failed to create symlink {link_full_path}: {e}")


def extract(digest_path: str | Path, output_dir: str | Path = ".") -> None:
    """Extract files from a gitingest digest file.

    Every ``FILE:`` / ``SYMLINK:`` block found in the digest is recreated below
    ``output_dir``. Entries whose path would land outside ``output_dir``
    (absolute paths, ``..`` components, or writes through a symlink that leaves
    the directory) are skipped with a warning, because the digest may come from
    an untrusted source. Failures on individual entries are logged and do not
    stop the extraction.

    Parameters
    ----------
    digest_path : str | Path
        Path to the digest file.
    output_dir : str | Path
        Directory where extracted files will be saved.

    Raises
    ------
    FileNotFoundError
        If ``digest_path`` does not exist.

    """
    digest_path = Path(digest_path)
    output_dir = Path(output_dir)

    if not digest_path.exists():
        raise FileNotFoundError(f"Digest file not found: {digest_path}")

    logger.info("Reading digest file", extra={"digest_path": str(digest_path)})
    with digest_path.open("r", encoding="utf-8") as f:
        content = f.read()

    # Create the output directory if it doesn't exist
    output_dir.mkdir(parents=True, exist_ok=True)
    # Resolved once; every entry is checked against this root before being written.
    root = output_dir.resolve()

    # Regex to identify file blocks
    # Format:
    # ================================================
    # FILE: path/to/file
    # ================================================
    # content...
    separator_pattern = re.escape(SEPARATOR)
    pattern = re.compile(
        rf"^{separator_pattern}\n(FILE|SYMLINK): (.+)\n{separator_pattern}\n",
        re.MULTILINE,
    )

    matches = list(pattern.finditer(content))

    if not matches:
        logger.warning("No files found in the digest.")
        return

    logger.info(f"Found {len(matches)} files to extract.")

    for i, match in enumerate(matches):
        node_type = match.group(1)
        path_info = match.group(2).strip()

        if node_type == "SYMLINK":
            _create_symlink(output_dir, root, path_info)
            continue

        # The block's content runs up to the next header (or the end of the digest).
        start_idx = match.end()
        end_idx = matches[i + 1].start() if i + 1 < len(matches) else len(content)
        file_content = content[start_idx:end_idx]

        # The ingestion process adds "\n\n" to content_string and joins with "\n", so we expect 3 newlines.
        if file_content.endswith("\n\n\n"):
            file_content = file_content[:-3]

        _write_file(output_dir, root, path_info, file_content)

    logger.info("Extraction complete.")

tests/test_cli.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,34 @@ def test_cli_with_stdout_output() -> None:
9090
output_file.unlink()
9191

9292

93+
def test_cli_extract_file(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None:
    """Run the ``extract`` subcommand on a one-file digest and check the recreated file."""
    monkeypatch.chdir(tmp_path)

    # Minimal digest: a tree header followed by a single FILE block.
    digest_lines = [
        "Directory structure:",
        "└── test_file.py",
        "",
        "================================================",
        "FILE: test_file.py",
        "================================================",
        "print('hello world')",
    ]
    digest_file = tmp_path / "digest.txt"
    digest_file.write_text("\n".join(digest_lines) + "\n\n\n", encoding="utf-8")

    cli_args = ["extract", str(digest_file), "-o", "."]
    result = _invoke_isolated_cli_runner(cli_args)

    assert result.exit_code == 0, result.stderr
    assert "Successfully extracted files" in result.stdout

    # The file must have been recreated in the working directory.
    extracted_file = tmp_path / "test_file.py"
    assert extracted_file.exists()
    extracted_text = extracted_file.read_text(encoding="utf-8")
    assert extracted_text == "print('hello world')"
119+
120+
93121
def _invoke_isolated_cli_runner(args: list[str]) -> Result:
94122
"""Return a ``CliRunner`` that keeps ``stderr`` separate on Click 8.0-8.1."""
95123
kwargs = {}

tests/test_extract.py

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""Unit tests for the gitingest.extract module."""
2+
3+
from __future__ import annotations
4+
5+
import os
6+
import pytest
7+
from pathlib import Path
8+
9+
from gitingest.extract import extract
10+
from gitingest.schemas.filesystem import SEPARATOR
11+
12+
13+
def test_extract_basic_file(tmp_path: Path) -> None:
    """A digest holding one text file is extracted with its exact content."""
    digest_lines = [
        "Directory structure:",
        "└── file1.txt",
        "",
        SEPARATOR,
        "FILE: file1.txt",
        SEPARATOR,
        "Hello, World!",
    ]
    digest_file = tmp_path / "test_digest.txt"
    digest_file.write_text("\n".join(digest_lines) + "\n\n\n", encoding="utf-8")

    destination = tmp_path / "extracted_output"
    extract(digest_file, destination)

    recreated = destination / "file1.txt"
    assert recreated.exists()
    recreated_text = recreated.read_text(encoding="utf-8")
    assert recreated_text == "Hello, World!"
32+
33+
34+
def test_extract_to_specified_directory(tmp_path: Path) -> None:
    """A nested path in the digest is recreated below the chosen output directory."""
    digest_lines = [
        "Directory structure:",
        "└── sub/file.txt",
        "",
        SEPARATOR,
        "FILE: sub/file.txt",
        SEPARATOR,
        "Content in subfolder.",
    ]
    digest_file = tmp_path / "custom_digest.txt"
    digest_file.write_text("\n".join(digest_lines) + "\n\n\n", encoding="utf-8")

    destination = tmp_path / "my_custom_output"
    extract(digest_file, destination)

    recreated = destination / "sub" / "file.txt"
    assert recreated.exists()
    recreated_text = recreated.read_text(encoding="utf-8")
    assert recreated_text == "Content in subfolder."
53+
54+
55+
def test_extract_empty_file(tmp_path: Path) -> None:
    """The ``[Empty file]`` placeholder is written out verbatim."""
    digest_lines = [
        "Directory structure:",
        "└── empty.txt",
        "",
        SEPARATOR,
        "FILE: empty.txt",
        SEPARATOR,
        "[Empty file]",
    ]
    digest_file = tmp_path / "empty_digest.txt"
    digest_file.write_text("\n".join(digest_lines) + "\n\n\n", encoding="utf-8")

    destination = tmp_path / "output_empty"
    extract(digest_file, destination)

    recreated = destination / "empty.txt"
    assert recreated.exists()
    recreated_text = recreated.read_text(encoding="utf-8")
    assert recreated_text == "[Empty file]"
74+
75+
76+
def test_extract_binary_file_placeholder(tmp_path: Path) -> None:
    """The ``[Binary file]`` placeholder is written out verbatim."""
    digest_lines = [
        "Directory structure:",
        "└── image.png",
        "",
        SEPARATOR,
        "FILE: image.png",
        SEPARATOR,
        "[Binary file]",
    ]
    digest_file = tmp_path / "binary_digest.txt"
    digest_file.write_text("\n".join(digest_lines) + "\n\n\n", encoding="utf-8")

    destination = tmp_path / "output_binary"
    extract(digest_file, destination)

    recreated = destination / "image.png"
    assert recreated.exists()
    recreated_text = recreated.read_text(encoding="utf-8")
    assert recreated_text == "[Binary file]"
95+
96+
97+
def test_extract_symlink(tmp_path: Path) -> None:
    """A ``SYMLINK`` entry is recreated as a relative link next to its target file."""
    # Create a target file first
    source_target = tmp_path / "target.txt"
    source_target.write_text("This is the target.", encoding="utf-8")

    digest_lines = [
        "Directory structure:",
        "├── target.txt",
        "└── link.txt -> target.txt",
        "",
        SEPARATOR,
        "FILE: target.txt",
        SEPARATOR,
        "This is the target.",
        "",
        "",
        SEPARATOR,
        "SYMLINK: link.txt -> target.txt",
        SEPARATOR,
    ]
    # Symlinks have empty content in the digest, hence only the trailing newlines.
    digest_file = tmp_path / "symlink_digest.txt"
    digest_file.write_text("\n".join(digest_lines) + "\n\n\n", encoding="utf-8")

    destination = tmp_path / "output_symlink"
    extract(digest_file, destination)

    recreated_target = destination / "target.txt"
    recreated_link = destination / "link.txt"

    assert recreated_target.exists()
    assert recreated_target.read_text() == "This is the target."
    assert recreated_link.is_symlink()
    # Symlink target is relative
    expected_link_target = str(Path("target.txt"))
    assert os.readlink(recreated_link) == expected_link_target
129+
130+
131+
def test_extract_file_not_found(tmp_path: Path) -> None:
    """``extract`` raises ``FileNotFoundError`` when the digest path does not exist."""
    missing_digest = tmp_path / "non_existent.txt"
    destination = tmp_path / "output_error"

    with pytest.raises(FileNotFoundError, match="Digest file not found"):
        extract(missing_digest, destination)

0 commit comments

Comments
 (0)