Merge pull request #223 from investigativedata/develop

v0.5.2

simonwoerpel authored Mar 14, 2024
2 parents a2028c1 + ac7a43f commit 2a1200e
Showing 32 changed files with 1,512 additions and 1,684 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.5.1
+current_version = 0.5.2
 commit = True
 tag = True
 sign_tags = True
4 changes: 2 additions & 2 deletions .github/workflows/python.yml
@@ -33,15 +33,15 @@ jobs:
       - name: set PY
         run: echo "PY=$(python -VV | sha256sum | cut -d' ' -f1)" >> $GITHUB_ENV
       - name: Set up poetry cache
-        uses: actions/cache@v3
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ env.PY }}-${{ hashFiles('**/poetry.lock') }}
       - name: Ensure cache is healthy
         if: steps.cache.outputs.cache-hit == 'true'
         run: poetry run pip --version >/dev/null 2>&1 || rm -rf .venv
       - name: Set up pre-commit cache
-        uses: actions/cache@v3
+        uses: actions/cache@v4
         with:
           path: ~/.cache/pre-commit
           key: pre-commit-${{ runner.os }}-${{ env.PY }}-${{ hashFiles('.pre-commit-config.yaml') }}
5 changes: 0 additions & 5 deletions Dockerfile
@@ -21,10 +21,6 @@ COPY README.md /investigraph/

 RUN pip install -q /investigraph

-COPY docker-entrypoint.sh /docker-entrypoint.sh
-RUN chmod +x /docker-entrypoint.sh
-
 RUN mkdir -p /data/datasets
 RUN mkdir -p /data/prefect
 RUN chown -R 1000:1000 /data

@@ -37,5 +33,4 @@ ENV DEBUG=0

 USER 1000
 WORKDIR /data
-ENTRYPOINT ["/docker-entrypoint.sh"]
 CMD ["prefect server start"]
14 changes: 1 addition & 13 deletions README.md
@@ -23,7 +23,7 @@ Using [prefect.io](https://www.prefect.io/) for ftm pipeline processing

 ## example datasets

-There is a dedicated [repo](https://github.com/investigativedata/investigraph-datasets) for example datasets that can be used as a [Block](https://docs.prefect.io/2.10.11/concepts/blocks/) within the prefect.io deployment.
+There is a dedicated [repo](https://github.com/investigativedata/investigraph-datasets) for example datasets built with investigraph.

 ## deployment

@@ -49,18 +49,6 @@ Quick run a local dataset definition:

     investigraph run -c ./path/to/config.yml

-Register a local datasets block:
-
-    investigraph add-block -b local-file-system/investigraph-local -u ./datasets
-
-Register github datasets block:
-
-    investigraph add-block -b github/investigraph-datasets -u https://github.com/investigativedata/investigraph-datasets.git
-
-Run a dataset pipeline from a dataset defined in a registered block:
-
-    investigraph run -d ec_meetings -b github/investigraph-datasets
-
 View prefect dashboard:

     make server
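With the block commands removed, a pipeline is addressed by its config URI alone. A minimal sketch of the equivalent programmatic call, assuming the `FlowOptions` and `run` imports from `investigraph/cli.py` below; the config path is a placeholder and the other `FlowOptions` fields are assumed to be optional:

```python
from investigraph.model.flow import FlowOptions
from investigraph.pipeline import run

# Placeholder path: per the new -c help text in cli.py below, any local
# or remote json/yaml uri should work here.
options = FlowOptions(config="./path/to/config.yml")
run(options)
```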
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-0.5.1
+0.5.2
3 changes: 0 additions & 3 deletions benchmark.sh

This file was deleted.

20 changes: 0 additions & 20 deletions docker-entrypoint.sh

This file was deleted.

9 changes: 4 additions & 5 deletions investigraph/cache.py
@@ -1,14 +1,13 @@
 import logging
-from collections.abc import Iterable
 from functools import cache
-from typing import Any, Set
+from typing import Any, Iterable, Set

 import fakeredis
 import redis
+from anystore.util import make_data_checksum
 from cachelib.serializers import RedisSerializer

 from investigraph import settings
-from investigraph.util import data_checksum

 log = logging.getLogger(__name__)

@@ -40,7 +39,7 @@ def __init__(self):
         self.cache = con

     def set(self, data: Any, key: str | None = None) -> str:
-        key = key or data_checksum(data)
+        key = key or make_data_checksum(data)
         data = self.serializer.dumps(data)
         self.cache.set(self.get_key(key), data)
         return key

@@ -56,7 +55,7 @@ def get(self, key: str, delete: bool | None = DELETE) -> Any:

     def sadd(self, *values: Iterable[Any], key: str | None = None) -> str:
         values = [str(v) for v in values]
-        key = key or data_checksum(values)
+        key = key or make_data_checksum(values)
         self.cache.sadd(self.get_key(key) + "#SET", *values)
         return key
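A minimal sketch of the checksum-keyed caching this change switches to, assuming only `make_data_checksum` from `anystore.util` as imported above; the trimmed-down class is hypothetical and fakes the Redis connection for illustration:

```python
import fakeredis
from anystore.util import make_data_checksum
from cachelib.serializers import RedisSerializer


class MiniCache:
    """Hypothetical stand-in for the Cache class above: keys default to
    a checksum of the payload, so identical data maps to the same entry."""

    def __init__(self):
        self.cache = fakeredis.FakeStrictRedis()
        self.serializer = RedisSerializer()

    def set(self, data, key: str | None = None) -> str:
        key = key or make_data_checksum(data)
        self.cache.set(key, self.serializer.dumps(data))
        return key

    def get(self, key: str):
        data = self.cache.get(key)
        if data is not None:
            return self.serializer.loads(data)


cache = MiniCache()
key = cache.set({"dataset": "ec_meetings"})
assert cache.get(key) == {"dataset": "ec_meetings"}
```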
100 changes: 47 additions & 53 deletions investigraph/cli.py
@@ -5,18 +5,20 @@

 import orjson
 import typer
-from ftmq.io import smart_write
+from anystore.io import smart_write
 from ftmq.model import Catalog
 from prefect.settings import PREFECT_HOME
 from rich import print
+from rich.console import Console
+from rich.table import Table

 from investigraph.inspect import inspect_config, inspect_extract, inspect_transform
-from investigraph.model.block import get_block
 from investigraph.model.flow import FlowOptions
 from investigraph.pipeline import run
-from investigraph.settings import DATASETS_BLOCK, DATASETS_REPO, VERSION
+from investigraph.settings import VERSION

 cli = typer.Typer(no_args_is_help=True)
+console = Console()


 @cli.callback(invoke_without_command=True)
@@ -30,9 +32,10 @@ def cli_version(

 @cli.command("run")
 def cli_run(
-    dataset: Annotated[Optional[str], typer.Option("-d")] = None,
-    block: Annotated[Optional[str], typer.Option("-b")] = None,
-    config: Annotated[Optional[str], typer.Option("-c")] = None,
+    config: Annotated[
+        str,
+        typer.Option("-c", help="Any local or remote json or yaml uri"),
+    ],
     index_uri: Annotated[Optional[str], typer.Option(...)] = None,
     fragments_uri: Annotated[Optional[str], typer.Option(...)] = None,
     entities_uri: Annotated[Optional[str], typer.Option(...)] = None,
@@ -43,8 +46,6 @@
     Execute a dataset pipeline
     """
     options = FlowOptions(
-        dataset=dataset,
-        block=block,
         config=config,
         index_uri=index_uri,
         fragments_uri=fragments_uri,
@@ -55,66 +56,59 @@
     run(options)


-@cli.command("add-block")
-def cli_add_block(
-    block: Annotated[
-        str,
-        typer.Option(
-            "-b",
-            prompt=f"Datasets configuration block, for example: {DATASETS_BLOCK}",
-        ),
-    ],
-    uri: Annotated[
-        str, typer.Option("-u", prompt=f"Block source uri, example: {DATASETS_REPO}")
-    ],
-):
-    """
-    Configure a datasets block (currently only github and local filesystem supported.)
-    """
-    block = get_block(block)
-    try:
-        block.register(uri)
-        print(f"[bold green]OK[/bold green] block `{block}` created.")
-    except ValueError as e:
-        if "already in use" in str(e):
-            print(f"[bold red]Error[/bold red] block `{block}` already existing.")
-        else:
-            raise e
-
-
 @cli.command("inspect")
 def cli_inspect(
     config_path: Annotated[Path, typer.Argument()],
-    extract: Annotated[Optional[bool], typer.Option()] = False,
-    transform: Annotated[Optional[bool], typer.Option()] = False,
+    extract: Annotated[Optional[bool], typer.Option("-e", "--extract")] = False,
+    transform: Annotated[Optional[bool], typer.Option("-t", "--transform")] = False,
+    limit: Annotated[Optional[int], typer.Option("-l", "--limit")] = 5,
     to_csv: Annotated[Optional[bool], typer.Option()] = False,
     to_json: Annotated[Optional[bool], typer.Option()] = False,
+    usecols: Annotated[
+        Optional[str],
+        typer.Option(
+            "-c",
+            "--usecols",
+            help="Comma separated list of column names or ix to display",
+        ),
+    ] = None,
 ):
     config = inspect_config(config_path)
-    print(f"[bold green]OK[/bold green] `{config_path}`")
-    print(f"[bold]dataset:[/bold] {config.dataset.name}")
-    print(f"[bold]title:[/bold] {config.dataset.title}")
+    if not to_json and not to_csv:
+        print(f"[bold green]OK[/bold green] `{config_path}`")
+        print(f"[bold]dataset:[/bold] {config.dataset.name}")
+        print(f"[bold]title:[/bold] {config.dataset.title}")

     if extract:
-        for name, df in inspect_extract(config):
-            print(f"[bold green]OK[/bold green] {name}")
+        for name, df in inspect_extract(config, limit):
+            if usecols:
+                df = df[[c for c in usecols.split(",") if c in df.columns]]
+            if not to_json and not to_csv:
+                print(f"[bold green]OK[/bold green] {name}")
             if to_json:
                 for _, row in df.iterrows():
-                    print(
-                        orjson.dumps(
-                            row.to_dict(), option=orjson.OPT_APPEND_NEWLINE
-                        ).decode()
+                    typer.echo(
+                        orjson.dumps(row.to_dict(), option=orjson.OPT_APPEND_NEWLINE)
                     )
             elif to_csv:
                 df.to_csv(sys.stdout, index=False)
             else:
-                print(df.to_markdown(index=False))
+                table = Table(*df.columns.map(str))
+                df = df.fillna("").map(str)
+                for _, row in df.iterrows():
+                    table.add_row(*row.values)
+                console.print(table)

     if transform:
-        for name, proxies in inspect_transform(config):
-            print(f"[bold green]OK[/bold green] {name}")
+        for name, proxies in inspect_transform(config, limit):
+            if not to_json:
+                print(f"[bold green]OK[/bold green] {name}")
             for proxy in proxies:
-                data = orjson.dumps(
-                    proxy.to_dict(), option=orjson.OPT_APPEND_NEWLINE
-                ).decode()
-                print(data)
+                data = proxy.to_dict()
+                if not to_json:
+                    print(data)
+                else:
+                    typer.echo(orjson.dumps(data))


 @cli.command("build-catalog")
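The reworked `inspect` command can be exercised end to end with Typer's test runner; a sketch assuming the `cli` app defined in this file, with a placeholder config path and hypothetical column names:

```python
from typer.testing import CliRunner

from investigraph.cli import cli

runner = CliRunner()
# Preview the first three extracted records as JSON lines, restricted to
# two (hypothetical) columns via the new -l/--limit and -c/--usecols options.
result = runner.invoke(
    cli,
    ["inspect", "./path/to/config.yml", "--extract", "--to-json", "-l", "3", "-c", "name,date"],
)
print(result.stdout)
```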
4 changes: 0 additions & 4 deletions investigraph/exceptions.py
@@ -2,9 +2,5 @@ class ImproperlyConfigured(Exception):
     pass


-class BlockError(Exception):
-    pass
-
-
 class DataError(Exception):
     pass
41 changes: 23 additions & 18 deletions investigraph/inspect.py
@@ -2,38 +2,37 @@
 Inspect dataset pipelines interactively
 """

-
-from typing import Any, Generator
+from typing import Any, Generator, Iterable

 import pandas as pd
 from nomenklatura.entity import CE
 from rich import print

 from investigraph.model import Resolver
 from investigraph.model.config import Config, get_config
-from investigraph.model.context import Context, init_context
+from investigraph.model.context import BaseContext, Context
 from investigraph.util import PathLike


 def print_error(msg: str):
     print(f"[bold red]ERROR[/bold red] {msg}")


-def get_records(ctx: Context) -> list[dict[str, Any]]:
+def get_records(ctx: Context, limit: int | None = 5) -> list[dict[str, Any]]:
     records: list[dict[str, Any]] = []
-    print("Extracting `%s` ..." % ctx.source.uri)
+    # print("Extracting `%s` ..." % ctx.source.uri)
     res = Resolver(source=ctx.source)
     if res.source.is_http and ctx.config.extract.fetch:
         res._resolve_http()
     for rec in ctx.config.extract.handle(ctx, res):
         records.append(rec)
-        if len(records) == 5:
+        if len(records) == limit:
             return records
     return records


 def inspect_config(p: PathLike) -> Config:
-    config = get_config(path=p)
+    config = get_config(p)
     try:
         if not callable(config.extract.get_handler()):
             print_error(f"module not found or not callable: `{config.extract.handler}`")
@@ -54,24 +53,30 @@ def inspect_config(p: PathLike) -> Config:
     return config


-def inspect_extract(config: Config) -> Generator[tuple[str, pd.DataFrame], None, None]:
+def inspect_extract(
+    config: Config, limit: int | None = 5
+) -> Generator[tuple[str, pd.DataFrame], None, None]:
     """
     Preview fetched & extracted records in tabular format
     """
-    for source in config.extract.sources:
-        ctx = init_context(config, source)
-        df = pd.DataFrame(get_records(ctx))
-        yield source.name, df
+    ctx = BaseContext.from_config(config)
+    for ix, sctx in enumerate(ctx.from_sources(), 1):
+        df = pd.DataFrame(get_records(sctx, limit))
+        yield sctx.source.name, df
+        if ix == limit:
+            return


-def inspect_transform(config: Config) -> Generator[tuple[str, CE], None, None]:
+def inspect_transform(
+    config: Config, limit: int | None = 5
+) -> Generator[tuple[str, Iterable[CE]], None, None]:
     """
     Preview first proxies
     """
-    for source in config.extract.sources:
-        ctx = init_context(config, source)
+    ctx = BaseContext.from_config(config)
+    for ix, sctx in enumerate(ctx.from_sources(), 1):
         proxies: list[CE] = []
-        for ix, rec in enumerate(get_records(ctx)):
-            for proxy in ctx.config.transform.handle(ctx, rec, ix):
+        for ix, rec in enumerate(get_records(sctx, limit)):
+            for proxy in sctx.config.transform.handle(sctx, rec, ix):
                 proxies.append(proxy)
-        yield source.name, proxies
+        yield sctx.source.name, proxies
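Driving the refactored helpers directly mirrors what the CLI does; a sketch with a placeholder config path, relying only on the signatures shown above:

```python
from investigraph.inspect import inspect_config, inspect_extract, inspect_transform

config = inspect_config("./path/to/config.yml")  # placeholder path
# Each source yields at most `limit` records (default 5 per this diff).
for name, df in inspect_extract(config, limit=3):
    print(name, df.shape)
for name, proxies in inspect_transform(config, limit=3):
    print(name, len(list(proxies)))
```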