From 214b141628a721be8d55c57e6aa763aad411af5d Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 12 Sep 2025 16:52:22 -0700 Subject: [PATCH 01/30] Update contributing guidelines for CLI and Helm --- CONTRIBUTING.md | 149 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..8648d98 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,149 @@ +# Contributing + +# CLI + +## Install + +```sh +➜ git clone git@github.com:calypr/backup-service + +➜ cd backup-service + +➜ python3 -m venv venv && source venv/bin/activate + +➜ pip install -r requirements.txt + +➜ pip install -e . + +➜ which bak +./venv/bin/bak + +➜ bak --help +Usage: bak [OPTIONS] COMMAND [ARGS]... + +Options: + --version Show the version and exit. + -v, --verbose, --debug Enable debug logging. + --help Show this message and exit. + +Commands: + grip (gp) Commands for GRIP backups. + pg (pg) Commands for Postgres backups. + s3 Commands for S3. +``` + +## PostgreSQL + +| Command | Example | +|-------------|------------------| +| List Tables | `bak pg ls` | +| Backup | `bak pg dump` | +| Restore | `bak pg restore` | + +## GRIP + +| Command | Example | +|-------------|--------------------| +| List Graphs | `bak pg ls` | +| Backup | `bak grip backup` | +| Restore | `bak grip restore` | + +## S3 + +| Command | Example | +|--------------|-------------------| +| List backups | `bak pg ls` | +| Upload | `bak s3 upload` | +| Download | `bak s3 download` | + +# Helm + +```sh +➜ helm repo add ohsu https://ohsu-comp-bio.github.io/helm-charts +"ohsu" has been added to your repositories + +➜ helm repo update ohsu +Update Complete. 
⎈Happy Helming!⎈ + +➜ helm search repo ohsu +NAME CHART VERSION APP VERSION DESCRIPTION +ohsu/backups 0.2.5 1.13.0 A Helm chart for Kubernetes + +➜ kubectl config current-context +kind-dev + +➜ kubectl create secret generic postgres-credentials --from-literal=postgres-password= --namespace backups + +➜ kubectl create secret generic s3-credentials --from-literal=AWS_ACCESS_KEY= --from-literal=AWS_SECRET_KEY= --namespace backups + +➜ helm upgrade --install backups ohsu/backups --create-namespace --namespace backups +Release "backups" has been upgraded. Happy Helming! + +➜ kubectl create job example-job --from=cronjob/backup-service-cronjob --namespace backups +job.batch/example-job created + +➜ kubectl get jobs -n backups +NAME STATUS COMPLETIONS DURATION +example-job Complete 1/1 11s + +➜ mc ls cbds/calypr-backups/calypr-dev +2025-09-12T23:10:29/ + +➜ mc ls cbds/calypr-backups/calypr-dev/2025-09-12T23:10:29/ +CALYPR.edges +CALYPR.vertices +CALYPR__schema__.edges +CALYPR__schema__.vertices +arborist_local.sql +fence_local.sql +gecko_local.sql +indexd_local.sql +postgres.sql +requestor_local.sql +``` + +* Steps to confirm backups in S3 bucket with mc + +```sh +➜ brew install minio-mc + +➜ which mc +/opt/homebrew/bin/mc + +➜ mc alias set example https://aced-storage.ohsu.edu +Enter Access Key: +Enter Secret Key: +Added `example` successfully. + +➜ mc alias ls example +cbds + URL : https://aced-storage.ohsu.edu + AccessKey : + SecretKey : + API : s3v4 + Path : auto + Src : $HOME/.mc/config.json + +➜ mc ls cbds/calypr-backups/calypr-dev/ +... 
+2025-09-12T02:00:01/ <---- Last timestamped backup + +➜ mc ls cbds/calypr-backups/calypr-dev/2025-09-12T02:00:01/ +160MiB CALYPR.edges <---- CALYPR edges +1.8GiB CALYPR.vertices <---- CALYPR vertices + 0B CALYPR__schema__.edges <---- Schema edges +1.4MiB CALYPR__schema__.vertices <---- Schema vertices +107KiB arborist_local.sql <---- Arborist +234KiB fence_local.sql <---- Fence +6.0KiB gecko_local.sql <---- Gecko + 21MiB indexd_local.sql <---- Indexd +9.6KiB metadata_local.sql <---- Metadata +2.9KiB postgres.sql <---- Postgres + 64KiB requestor_local.sql <---- Requestor +8.0KiB wts_local.sql <---- Workspace Token Service +``` + +# Known Limitations (Next Steps) ⚠️ + +- [ ] No clear, human-readable output of the path of the backup in S3 after a successful run +- [ ] Always uploads to calypr-dev even when using local k8s cluster From 3f1db206d1b9df7dc7b1ef1324b02c731dfcdbd3 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 12 Sep 2025 16:53:03 -0700 Subject: [PATCH 02/30] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8648d98..76adfeb 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -52,7 +52,7 @@ Commands: | Command | Example | |--------------|-------------------| -| List backups | `bak pg ls` | +| List backups | `bak s3 ls` | | Upload | `bak s3 upload` | | Download | `bak s3 download` | From c1cc3cd7592a7cf054879179723507f3033b6609 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 12 Sep 2025 16:53:15 -0700 Subject: [PATCH 03/30] Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 76adfeb..fe3116c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -44,7 +44,7 @@ Commands: | Command | Example | 
|-------------|--------------------| -| List Graphs | `bak pg ls` | +| List Graphs | `bak grip ls` | | Backup | `bak grip backup` | | Restore | `bak grip restore` | From 4744e0b5542d77c543b2e8a10f06dd6d41debbae Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Wed, 1 Oct 2025 16:21:55 -0700 Subject: [PATCH 04/30] docs: Add deployment Helm url to README.md --- CONTRIBUTING.md | 4 +- README.md | 103 ++++++++++++++++++++++++++---------------------- 2 files changed, 57 insertions(+), 50 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index fe3116c..8648d98 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -44,7 +44,7 @@ Commands: | Command | Example | |-------------|--------------------| -| List Graphs | `bak grip ls` | +| List Graphs | `bak pg ls` | | Backup | `bak grip backup` | | Restore | `bak grip restore` | @@ -52,7 +52,7 @@ Commands: | Command | Example | |--------------|-------------------| -| List backups | `bak s3 ls` | +| List backups | `bak pg ls` | | Upload | `bak s3 upload` | | Download | `bak s3 download` | diff --git a/README.md b/README.md index 3351be9..84368d9 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,26 @@ Data backup and recovery service for the CALYPR systems 🔄 # 2. Quick Start ⚡ +> [!TIP] +> The recommended use of the backup-service is through deploying to a K8s cluster for automated daily backups. + ```sh +➜ helm repo add ohsu https://ohsu-comp-bio.github.io/helm-charts + +➜ helm upgrade --install backups ohsu/backups +``` + +# 3. CLI + +> [!TIP] +> Manual backups (and restorations) can be done through the CLI + +```sh +➜ git clone git@github.com:calypr/backup-service.git +Cloning into 'backup-service'... + +➜ cd backup-service + ➜ python3 -m venv venv && source venv/bin/activate ➜ pip install -r requirements.txt @@ -49,40 +68,7 @@ Commands: upload local ➜ S3 ``` -# 3. 
Design + Examples 📐 - -```mermaid -sequenceDiagram - participant Backup as Backup Service - participant Database - participant S3 as S3 Bucket - - Title: Gen3 Backups - - Backup-->>Database: Database Credentials - - Note over Database: `pg_dump` - - Database-->>Backup: Database Backup - - Backup-->>S3: Database Backup -``` - -| Service | Postgres Database | Database Backup Name | Description | -| ---------------------- | ------------------- | ----------------------------- | ------------------------------------------------ | -| [Arborist][arborist] | `arborist-EXAMPLE` | `arborist-EXAMPLE-TIMESTAMP` | Gen3 policy engine | -| [Fence][fence] | `fence-EXAMPLE` | `fence-EXAMPLE-TIMESTAMP` | AuthN/AuthZ OIDC service | -| [Gecko][gecko] | `gecko-EXAMPLE` | `gecko-EXAMPLE-TIMESTAMP` | Frontend configurations for dynamic data loading | -| [Indexd][indexd] | `indexd-EXAMPLE` | `indexd-EXAMPLE-TIMESTAMP` | Data indexing and tracking service | -| [Requestor][requestor] | `requestor-EXAMPLE` | `requestor-EXAMPLE-TIMESTAMP` | Data access manager | - -[arborist]: https://github.com/uc-cdis/arborist -[fence]: https://github.com/uc-cdis/fence -[gecko]: https://github.com/aced-idp/gecko -[indexd]: https://github.com/uc-cdis/indexd -[requestor]: https://github.com/uc-cdis/requestor - -## Backup ⬆️ +## Backup ⬆ ### Postgres Dump: @@ -95,12 +81,6 @@ sequenceDiagram --dir DIR ``` -## ElasticSearch Backup: - -``` -➜ bak es backup -``` - ## GRIP Backup: ```sh @@ -118,7 +98,7 @@ sequenceDiagram --secret SECRET ``` -## Restore ⬇️ +## Restore ⬇ ### Postgres Restore: @@ -131,12 +111,6 @@ sequenceDiagram --dir DIR ``` -## ElasticSearch Restore: - -``` -➜ bak es restore -``` - ## GRIP Restore: ```sh @@ -154,7 +128,40 @@ sequenceDiagram --secret SECRET ``` -# 4. Alternatives 📖 +# 4. 
Design 📐 + +```mermaid +sequenceDiagram + participant Backup as Backup Service + participant Database + participant S3 as S3 Bucket + + Title: Gen3 Backups + + Backup-->>Database: Database Credentials + + Note over Database: `pg_dump` + + Database-->>Backup: Database Backup + + Backup-->>S3: Database Backup +``` + +| Service | Postgres Database | Database Backup Name | Description | +| ---------------------- | ------------------- | ----------------------------- | ------------------------------------------------ | +| [Arborist][arborist] | `arborist-EXAMPLE` | `arborist-EXAMPLE-TIMESTAMP` | Gen3 policy engine | +| [Fence][fence] | `fence-EXAMPLE` | `fence-EXAMPLE-TIMESTAMP` | AuthN/AuthZ OIDC service | +| [Gecko][gecko] | `gecko-EXAMPLE` | `gecko-EXAMPLE-TIMESTAMP` | Frontend configurations for dynamic data loading | +| [Indexd][indexd] | `indexd-EXAMPLE` | `indexd-EXAMPLE-TIMESTAMP` | Data indexing and tracking service | +| [Requestor][requestor] | `requestor-EXAMPLE` | `requestor-EXAMPLE-TIMESTAMP` | Data access manager | + +[arborist]: https://github.com/uc-cdis/arborist +[fence]: https://github.com/uc-cdis/fence +[gecko]: https://github.com/aced-idp/gecko +[indexd]: https://github.com/uc-cdis/indexd +[requestor]: https://github.com/uc-cdis/requestor + +# 5. Alternatives 📖 > [!TIP] > The alternative options below work on the K8s resources themseleves (e.g. PVC/PV) as opposed to database resources (e.g. 
Postgres tables, ElasticSearch indices) From 8473b3c87609865a8798271f968d990caeae1d23 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Wed, 1 Oct 2025 16:22:26 -0700 Subject: [PATCH 05/30] fix: Remove `limit` parameter from Grip functions --- entrypoint.sh | 1 - src/backup/grip/__init__.py | 14 +++++++------- src/backup/main.py | 14 ++++++++------ src/backup/options.py | 23 +++++++---------------- 4 files changed, 22 insertions(+), 30 deletions(-) diff --git a/entrypoint.sh b/entrypoint.sh index bf53b42..87e5548 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -17,7 +17,6 @@ bak --debug grip backup \ --dir "${DIR}" \ --host "${GRIP_HOST}" \ --graph "${GRIP_GRAPH}" \ - --limit "${GRIP_LIMIT}" \ --vertex \ --edge diff --git a/src/backup/grip/__init__.py b/src/backup/grip/__init__.py index d14bbaa..fff5625 100644 --- a/src/backup/grip/__init__.py +++ b/src/backup/grip/__init__.py @@ -32,7 +32,7 @@ class GripConfig: port: int -def _getEdges(grip: GripConfig, graph: str, limit: int) -> list[str]: +def _getEdges(grip: GripConfig, graph: str) -> list[str]: """ Utility function to connect to Grip and list all edges. """ @@ -45,13 +45,13 @@ def _getEdges(grip: GripConfig, graph: str, limit: int) -> list[str]: G = c.graph(graph) - for i in G.query().E().limit(limit): + for i in G.query().E(): edges.append(i) return edges -def _getVertices(grip: GripConfig, graph: str, limit: int) -> list[str]: +def _getVertices(grip: GripConfig, graph: str) -> list[str]: """ Utility function to connect to Grip and list all vertices. 
""" @@ -62,7 +62,7 @@ def _getVertices(grip: GripConfig, graph: str, limit: int) -> list[str]: G = c.graph(graph) - for i in G.query().V().limit(limit): + for i in G.query().V(): vertices.append(i) return vertices @@ -83,7 +83,7 @@ def _connect(grip: GripConfig) -> gripql.Connection: return client -def _dump(grip: GripConfig, graph: str, limit: int, vertex: bool, edge: bool, out: Path) -> None: +def _dump(grip: GripConfig, graph: str, vertex: bool, edge: bool, out: Path) -> None: # Dump conn = _connect(grip) G = conn.graph(graph) @@ -91,12 +91,12 @@ def _dump(grip: GripConfig, graph: str, limit: int, vertex: bool, edge: bool, ou # write vertex and edge objects from grip DB to file if vertex: with open(out / f"{graph}.vertices", "wb") as f: - for i in G.query().V().limit(limit): + for i in G.query().V(): f.write(orjson.dumps(i, option=orjson.OPT_APPEND_NEWLINE)) if edge: with open(out / f"{graph}.edges", "wb") as f: - for i in G.query().E().limit(limit): + for i in G.query().E(): f.write(orjson.dumps(i, option=orjson.OPT_APPEND_NEWLINE)) # TODO: At this point you will need to reconnect to the new grip instance to load the data that was dumped diff --git a/src/backup/main.py b/src/backup/main.py index b0e1608..a029ad0 100644 --- a/src/backup/main.py +++ b/src/backup/main.py @@ -73,23 +73,25 @@ def grip(): @grip.command(name="ls") @grip_options -def list_grip(host: str, port: int, graph: str, limit: int, vertex: bool, edge: bool): +def list_grip(host: str, port: int, graph: str, vertex: bool, edge: bool): """list GRIP vertices and/or edges""" conf = GripConfig(host=host, port=port) if vertex: - for v in _getVertices(conf, graph, limit): + logging.debug(f"Listing vertices from GRIP graph '{graph}' at {conf.host}:{conf.port}") + for v in _getVertices(conf, graph): click.echo(json.dumps(v, indent=2)) if edge: - for e in _getEdges(conf, graph, limit): + logging.debug(f"Listing edges from GRIP graph '{graph}' at {conf.host}:{conf.port}") + for e in _getEdges(conf, graph): 
click.echo(json.dumps(e, indent=2)) @grip.command(name="backup") @grip_options @dir_options -def backup_grip(host: str, port: int, graph: str, limit: int, vertex: bool, edge: bool, dir: Path): +def backup_grip(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path): """grip ➜ local""" conf = GripConfig(host=host, port=port) @@ -97,12 +99,12 @@ def backup_grip(host: str, port: int, graph: str, limit: int, vertex: bool, edge dir.mkdir(parents=True, exist_ok=True) logging.debug(f"Backing up GRIP graph '{graph}' to directory '{dir}'") - _gripDump(conf, graph, limit, vertex, edge, dir) + _gripDump(conf, graph, vertex, edge, dir) # TODO: Better way to handle GRIP graph schemas? schema = f"{graph}__schema__" logging.debug(f"Backing up GRIP graph '{schema}' to directory '{dir}'") - _gripDump(conf, schema, limit, vertex, edge, dir) + _gripDump(conf, schema, vertex, edge, dir) @grip.command(name="restore") diff --git a/src/backup/options.py b/src/backup/options.py index 4dfc2b2..15808f2 100644 --- a/src/backup/options.py +++ b/src/backup/options.py @@ -6,7 +6,12 @@ def grip_options(fn): options = [ click.option( - "--edge", "--edges", "-e", is_flag=True, help="Output GRIP edges." 
+ "--edge", + "--edges", + "-e", + is_flag=True, + default=True, + help="Output GRIP edges.", ), click.option("--graph", "-g", default="CALYPR", help="Name of the GRIP graph."), click.option( @@ -17,14 +22,6 @@ def grip_options(fn): show_default=True, help="GRIP host ($GRIPHOST)", ), - click.option( - "--limit", - "-l", - envvar="GRIP_LIMIT", - type=int, - default=10000, - help="Limit number of items listed.", - ), click.option( "--port", "-p", @@ -38,9 +35,9 @@ def grip_options(fn): "--vertices", "-v", is_flag=True, + default=True, help="Output GRIP vertices.", ), - ] for option in reversed(options): fn = option(fn) @@ -74,12 +71,6 @@ def pg_options(fn): show_default=True, help="Postgres username ($PGUSER)", ), - click.option( - "--password", - "-P", - envvar="PGPASSWORD", - help="Postgres password ($PGPASSWORD)", - ), ] for option in reversed(options): fn = option(fn) From 483ee12dfc065cd1c63984464efff3f2e459c81b Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Wed, 1 Oct 2025 17:23:26 -0700 Subject: [PATCH 06/30] chore: Update gripql to 0.8.0 --- requirements.txt | 1 - src/backup/grip/__init__.py | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/requirements.txt b/requirements.txt index 51bd166..8422c44 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,5 +8,4 @@ psycopg2-binary pytest # Development dependencies -# TODO: Move to separate file (e.g. 
dev-requirements.txt) testcontainers diff --git a/src/backup/grip/__init__.py b/src/backup/grip/__init__.py index fff5625..6765918 100644 --- a/src/backup/grip/__init__.py +++ b/src/backup/grip/__init__.py @@ -45,7 +45,7 @@ def _getEdges(grip: GripConfig, graph: str) -> list[str]: G = c.graph(graph) - for i in G.query().E(): + for i in G.E(): edges.append(i) return edges @@ -62,7 +62,7 @@ def _getVertices(grip: GripConfig, graph: str) -> list[str]: G = c.graph(graph) - for i in G.query().V(): + for i in G.V(): vertices.append(i) return vertices @@ -91,12 +91,12 @@ def _dump(grip: GripConfig, graph: str, vertex: bool, edge: bool, out: Path) -> # write vertex and edge objects from grip DB to file if vertex: with open(out / f"{graph}.vertices", "wb") as f: - for i in G.query().V(): + for i in G.V(): f.write(orjson.dumps(i, option=orjson.OPT_APPEND_NEWLINE)) if edge: with open(out / f"{graph}.edges", "wb") as f: - for i in G.query().E(): + for i in G.E(): f.write(orjson.dumps(i, option=orjson.OPT_APPEND_NEWLINE)) # TODO: At this point you will need to reconnect to the new grip instance to load the data that was dumped @@ -132,7 +132,7 @@ def _restore(grip: GripConfig, graph: str, dir: Path): _to = data["_to"] _from = data["_from"] del data["_id"], data["_label"], data["_to"], data["_from"] - bulkE.addEdge(_to, _from, _label, data=data, gid=_id) + bulkE.addEdge(_to, _from, _label, data=data, id=_id) count += 1 if count % 10000 == 0: print("loaded %d edges" % count) From 4cddc797260a75e8178e3772ecf3ffd16f418b6e Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 3 Oct 2025 13:45:57 -0700 Subject: [PATCH 07/30] feat: Support env variables for S3 credentials - ref: https://github.com/minio/minio-go/blob/v7.0.95/pkg/credentials/env_aws.go#L27-L28 --- entrypoint.sh | 4 ++-- src/backup/options.py | 13 +++++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/entrypoint.sh b/entrypoint.sh index 87e5548..c0a383a 100644 --- a/entrypoint.sh +++ 
b/entrypoint.sh @@ -25,5 +25,5 @@ bak --debug s3 upload \ --dir "${DIR}" \ --endpoint "${ENDPOINT}" \ --bucket "${BUCKET}" \ - --key "${KEY}" \ - --secret "${SECRET}" + --key "${ACCESS_KEY}" \ + --secret "${SECRET_KEY}" diff --git a/src/backup/options.py b/src/backup/options.py index 15808f2..c9f4e98 100644 --- a/src/backup/options.py +++ b/src/backup/options.py @@ -88,10 +88,15 @@ def s3_options(fn): help="S3 endpoint URL", ), click.option("--bucket", "-b", required=True, help="S3 bucket name"), - # TODO: Support env variables for S3 credentials - # ref: https://github.com/minio/minio-go/blob/v7.0.95/pkg/credentials/env_aws.go#L27-L28 - click.option("--key", "-k", help="S3 access key ID ($AWS_ACCESS_KEY)"), - click.option("--secret", "-s", help="S3 secret access key ($AWS_SECRET_KEY)"), + click.option( + "--key", "-k", envvar="ACCESS_KEY", help="S3 access key ID ($ACCESS_KEY)" + ), + click.option( + "--secret", + "-s", + envvar="SECRET_KEY", + help="S3 secret access key ($SECRET_KEY)", + ), ] for option in options: fn = option(fn) From c650674190ce9e5477a4fe768cb165c1722bbdd5 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 3 Oct 2025 13:54:15 -0700 Subject: [PATCH 08/30] fix: Remove unused PGPASSWORD flag --- entrypoint.sh | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/entrypoint.sh b/entrypoint.sh index c0a383a..20cc004 100644 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -9,8 +9,7 @@ export DIR="${DIR}/${TIMESTAMP}" bak --debug pg dump \ --dir "${DIR}" \ --host "${PGHOST}" \ - --user "${PGUSER}" \ - --password "${PGPASSWORD}" + --user "${PGUSER}" # GRIP Backup bak --debug grip backup \ From 53e2c7fa0deb1d73077ab1783509af1e7916c529 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 3 Oct 2025 14:04:37 -0700 Subject: [PATCH 09/30] fix: Remove PGPASSWORD parameters in favor of env vars --- src/backup/main.py | 14 +++++++------- src/backup/postgres/__init__.py | 6 ------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git 
a/src/backup/main.py b/src/backup/main.py index a029ad0..6da189f 100644 --- a/src/backup/main.py +++ b/src/backup/main.py @@ -56,7 +56,7 @@ def cli(verbose: bool): ) # Avoid INFO and ElasticsearchWarning logging from the elasticsearch logger - # https://stackoverflow.com/a/47157553 + # ref: https://stackoverflow.com/a/47157553 logging.getLogger("elastic_transport.transport").setLevel(logging.CRITICAL) warnings.simplefilter("ignore", ElasticsearchWarning) @@ -125,9 +125,9 @@ def pg(): @pg.command(name="ls") @pg_options -def listDbs(host: str, port: int, user: str, password: str): +def listDbs(host: str, port: int, user: str): """list databases""" - conf = PGConfig(host=host, port=port, user=user, password=password) + conf = PGConfig(host=host, port=port, user=user) dbs = _getDbs(conf) if not dbs: @@ -142,9 +142,9 @@ def listDbs(host: str, port: int, user: str, password: str): @pg.command(name="dump") @pg_options @dir_options -def dump_postgres(host: str, port: int, user: str, password: str, dir: Path): +def dump_postgres(host: str, port: int, user: str, dir: Path): """postgres ➜ local""" - conf = PGConfig(host=host, port=port, user=user, password=password) + conf = PGConfig(host=host, port=port, user=user) # Dump directory dir.mkdir(parents=True, exist_ok=True) @@ -163,9 +163,9 @@ def dump_postgres(host: str, port: int, user: str, password: str, dir: Path): @pg.command(name="restore") @pg_options @dir_options -def restore_postgres(host: str, port: int, user: str, password: str, dir: Path): +def restore_postgres(host: str, port: int, user: str, dir: Path): """local ➜ postgres""" - conf = PGConfig(host=host, port=port, user=user, password=password) + conf = PGConfig(host=host, port=port, user=user) dbs = _getDbs(conf) if not dbs: diff --git a/src/backup/postgres/__init__.py b/src/backup/postgres/__init__.py index d4d80fa..c2a3548 100644 --- a/src/backup/postgres/__init__.py +++ b/src/backup/postgres/__init__.py @@ -14,7 +14,6 @@ class PGConfig: host: str port: int 
user: str - password: str def _connect(pgConfig: PGConfig) -> connection: @@ -24,12 +23,10 @@ def _connect(pgConfig: PGConfig) -> connection: assert pgConfig.host, "Host must not be empty" assert pgConfig.port, "Port must not be empty" assert pgConfig.user, "User must not be empty" - assert pgConfig.password, "Password must not be empty" try: connection = psycopg2.connect( user=pgConfig.user, - password=pgConfig.password, host=pgConfig.host, port=pgConfig.port, ) @@ -80,9 +77,6 @@ def _dump(pgConfig: PGConfig, db: str, dir: Path) -> Path: "--no-password", ] - # Set the environment variable for the password - env = {"PGPASSWORD": pgConfig.password} - # Dump File dump = Path(f"{dir}/{db}.sql") From c9581a35385633aedbeeb9dfcece6ae4806fbebc Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 3 Oct 2025 14:09:35 -0700 Subject: [PATCH 10/30] fix: update `pg_dump` env vars --- src/backup/postgres/__init__.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/backup/postgres/__init__.py b/src/backup/postgres/__init__.py index c2a3548..036d98d 100644 --- a/src/backup/postgres/__init__.py +++ b/src/backup/postgres/__init__.py @@ -2,6 +2,7 @@ from pathlib import Path from psycopg2.extensions import connection import logging +import os import psycopg2 import shutil import subprocess @@ -90,10 +91,12 @@ def _dump(pgConfig: PGConfig, db: str, dir: Path) -> Path: stdout=out, stderr=subprocess.PIPE, check=True, - env=env, + env=os.environ.copy(), ) except subprocess.CalledProcessError as e: - logging.error(f"Error dumping database '{db}': {e}, stderr: {e.stderr.decode() if e.stderr else ''}") + logging.error( + f"Error dumping database '{db}': {e}, stderr: {e.stderr.decode() if e.stderr else ''}" + ) raise return dump @@ -109,7 +112,6 @@ def _restore(pgConfig: PGConfig, db: str, dir: Path) -> Path: logging.error(f"Dump file {dump} does not exist") raise FileNotFoundError(f"Dump file {dump} does not exist") - if not shutil.which("pg_restore"): 
logging.error("pg_restore not found in PATH") From 35c70b46cad2a4858ea596e9d740ecf44686048b Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 7 Oct 2025 14:17:48 -0700 Subject: [PATCH 11/30] feat: Update call to GRIP edges - To `G.V().outE()` from `G.query().E()` --- entrypoint.sh | 21 ++++++++++----------- src/backup/grip/__init__.py | 12 ++++++++++-- src/backup/options.py | 4 ++-- src/backup/postgres/__init__.py | 1 + 4 files changed, 23 insertions(+), 15 deletions(-) mode change 100644 => 100755 entrypoint.sh diff --git a/entrypoint.sh b/entrypoint.sh old mode 100644 new mode 100755 index 20cc004..6096030 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -6,23 +6,22 @@ TIMESTAMP=$(date +"%Y-%m-%dT%H:%M:%S") export DIR="${DIR}/${TIMESTAMP}" # Postgres Dump -bak --debug pg dump \ - --dir "${DIR}" \ - --host "${PGHOST}" \ - --user "${PGUSER}" +#bak --debug pg dump \ +# --dir "${DIR}" \ +# --host "${PGHOST}" \ +# --user "${PGUSER}" # GRIP Backup bak --debug grip backup \ --dir "${DIR}" \ --host "${GRIP_HOST}" \ --graph "${GRIP_GRAPH}" \ - --vertex \ --edge # S3 Upload -bak --debug s3 upload \ - --dir "${DIR}" \ - --endpoint "${ENDPOINT}" \ - --bucket "${BUCKET}" \ - --key "${ACCESS_KEY}" \ - --secret "${SECRET_KEY}" +#bak --debug s3 upload \ +# --dir "${DIR}" \ +# --endpoint "${ENDPOINT}" \ +# --bucket "${BUCKET}" \ +# --key "${ACCESS_KEY}" \ +# --secret "${SECRET_KEY}" diff --git a/src/backup/grip/__init__.py b/src/backup/grip/__init__.py index 6765918..fca86ec 100644 --- a/src/backup/grip/__init__.py +++ b/src/backup/grip/__init__.py @@ -88,15 +88,23 @@ def _dump(grip: GripConfig, graph: str, vertex: bool, edge: bool, out: Path) -> conn = _connect(grip) G = conn.graph(graph) + # Run single query to get all vertices + # Rather than G.V() and G.V().outE() + vertices = G.V() + # write vertex and edge objects from grip DB to file if vertex: with open(out / f"{graph}.vertices", "wb") as f: - for i in G.V(): + for i in vertices: f.write(orjson.dumps(i, 
option=orjson.OPT_APPEND_NEWLINE)) if edge: with open(out / f"{graph}.edges", "wb") as f: - for i in G.E(): + # Note: + # Using G.V().outE() here to return all edges + # G.V().BothE() would return duplicate edges (outbound and inbound) + # Ref: https://github.com/bmeg/grip/blob/0.8.0/conformance/tests/ot_basic.py#L129-L140 + for i in vertices.outE(): f.write(orjson.dumps(i, option=orjson.OPT_APPEND_NEWLINE)) # TODO: At this point you will need to reconnect to the new grip instance to load the data that was dumped diff --git a/src/backup/options.py b/src/backup/options.py index c9f4e98..a59bdbc 100644 --- a/src/backup/options.py +++ b/src/backup/options.py @@ -10,7 +10,7 @@ def grip_options(fn): "--edges", "-e", is_flag=True, - default=True, + default=False, help="Output GRIP edges.", ), click.option("--graph", "-g", default="CALYPR", help="Name of the GRIP graph."), @@ -35,7 +35,7 @@ def grip_options(fn): "--vertices", "-v", is_flag=True, - default=True, + default=False, help="Output GRIP vertices.", ), ] diff --git a/src/backup/postgres/__init__.py b/src/backup/postgres/__init__.py index 036d98d..55d6ba0 100644 --- a/src/backup/postgres/__init__.py +++ b/src/backup/postgres/__init__.py @@ -30,6 +30,7 @@ def _connect(pgConfig: PGConfig) -> connection: user=pgConfig.user, host=pgConfig.host, port=pgConfig.port, + password=os.getenv("PGPASSWORD"), ) except Exception as err: logging.error(f"Error connecting to Postgres: {err}") From 4e819017583aeb7ca39160e9d09a805f8f5ef392 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 7 Oct 2025 14:24:18 -0700 Subject: [PATCH 12/30] fix: Update Dockerfile to include `pg_config` before Python build --- Dockerfile | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/Dockerfile b/Dockerfile index fc946cb..50e0cea 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,28 +1,8 @@ -# GRIP build -# Ref: https://github.com/bmeg/grip/blob/develop/Dockerfile -FROM golang:1.17.2-alpine AS grip - -RUN 
apk add --no-cache make git bash build-base - -ENV GOPATH=/go -ENV PATH="/go/bin:${PATH}" - -WORKDIR /go/src/github.com/bmeg - -RUN git clone https://github.com/bmeg/grip - -WORKDIR /go/src/github.com/bmeg/grip - -# Checkout latest GRIP tag. Example: -# $ git describe --tags --abbrev=0 -# v1.9.0 -RUN git checkout $(git describe --tags --abbrev=0) - -RUN make install - # Backup build FROM python:slim +RUN apt-get update && apt-get install -y --no-install-recommends postgresql-client libpq-dev + WORKDIR /app COPY requirements.txt . @@ -39,9 +19,4 @@ RUN mkdir -p /backups COPY entrypoint.sh ./entrypoint.sh RUN chmod +x ./entrypoint.sh -RUN apt-get update && apt-get install -y --no-install-recommends postgresql-client - -# Copy GRIP binary from build stage -COPY --from=grip /go/bin/grip /usr/local/bin/grip - ENTRYPOINT ["./entrypoint.sh"] From 5c0eebc015c594f9dc39e5109d3fc703284d5984 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 7 Oct 2025 14:26:24 -0700 Subject: [PATCH 13/30] fix: Add `gcc` dependency to Dockerfile for psycopg2 build --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 50e0cea..a9c4125 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,7 @@ # Backup build FROM python:slim -RUN apt-get update && apt-get install -y --no-install-recommends postgresql-client libpq-dev +RUN apt-get update && apt-get install -y --no-install-recommends postgresql-client libpq-dev gcc WORKDIR /app From 2daf67f1046c7768dedcee2befef51f843ad8cf0 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 7 Oct 2025 14:29:07 -0700 Subject: [PATCH 14/30] fix: Update Dockerfile --- Dockerfile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a9c4125..046fff5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,11 @@ # Backup build FROM python:slim -RUN apt-get update && apt-get install -y --no-install-recommends postgresql-client libpq-dev gcc +RUN apt-get update && 
apt-get install -y --no-install-recommends \ + build-essential \ + gcc \ + libpq-dev \ + postgresql-client WORKDIR /app From 8a9f6686b8e430363879354ef9822f8bb9351b98 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 7 Oct 2025 14:33:04 -0700 Subject: [PATCH 15/30] chore: Re-enable Postgres + S3 operations --- entrypoint.sh | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/entrypoint.sh b/entrypoint.sh index 6096030..37545da 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -6,10 +6,10 @@ TIMESTAMP=$(date +"%Y-%m-%dT%H:%M:%S") export DIR="${DIR}/${TIMESTAMP}" # Postgres Dump -#bak --debug pg dump \ -# --dir "${DIR}" \ -# --host "${PGHOST}" \ -# --user "${PGUSER}" +bak --debug pg dump \ + --dir "${DIR}" \ + --host "${PGHOST}" \ + --user "${PGUSER}" # GRIP Backup bak --debug grip backup \ @@ -19,9 +19,9 @@ bak --debug grip backup \ --edge # S3 Upload -#bak --debug s3 upload \ -# --dir "${DIR}" \ -# --endpoint "${ENDPOINT}" \ -# --bucket "${BUCKET}" \ -# --key "${ACCESS_KEY}" \ -# --secret "${SECRET_KEY}" +bak --debug s3 upload \ + --dir "${DIR}" \ + --endpoint "${ENDPOINT}" \ + --bucket "${BUCKET}" \ + --key "${ACCESS_KEY}" \ + --secret "${SECRET_KEY}" From 2b98f35837c6289b0ab7eaa518f01388f8cb8637 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 7 Oct 2025 17:35:47 -0700 Subject: [PATCH 16/30] fix: Update tests + re-add `--vertex` flag to GRIP command --- entrypoint.sh | 6 ++++++ tests/conftest.py | 6 ++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/entrypoint.sh b/entrypoint.sh index 37545da..92ff24e 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -16,6 +16,7 @@ bak --debug grip backup \ --dir "${DIR}" \ --host "${GRIP_HOST}" \ --graph "${GRIP_GRAPH}" \ + --vertex \ --edge # S3 Upload @@ -25,3 +26,8 @@ bak --debug s3 upload \ --bucket "${BUCKET}" \ --key "${ACCESS_KEY}" \ --secret "${SECRET_KEY}" + +echo "Backup Complete:" +echo "- ENDPOINT: ${ENDPOINT}" +echo "- BUCKET: ${BUCKET}" +echo "- DIR: 
${DIR}" diff --git a/tests/conftest.py b/tests/conftest.py index bf7be8a..e3f5865 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import os from backup.postgres import PGConfig from backup.s3 import S3Config from minio import Minio @@ -26,11 +27,12 @@ def testPostgres(): with PostgresContainer("postgres") as postgres: logging.debug(f"Postgres ready at {postgres.get_connection_url}") + # Set PGPASSWORD environment variable for authentication + os.environ["PGPASSWORD"] = postgres.password + yield PGConfig( # Default: test user=postgres.username, - # Default: test - password=postgres.password, host="localhost", port=postgres.get_exposed_port(5432), ) From 0b197fed399c51bb30c7330d9c7f89ab93171815 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 17 Oct 2025 13:37:39 -0700 Subject: [PATCH 17/30] fix: Downgrade PostgreSQL in Backup Service to match Server version --- Dockerfile | 9 ++++++++- src/backup/postgres/__init__.py | 9 ++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 046fff5..6074db0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,18 @@ # Backup build FROM python:slim +# Note: Installing Postgres 14 for now to match the versions used by Gen3-Helm +# +# Gen3-Helm Chart +# https://github.com/uc-cdis/gen3-helm/blob/gen3-0.2.69/helm/gen3/Chart.yaml#L143-L146 +# +# Bitnami PostgreSQL 11.9.13 (14.5.0): +# https://github.com/bitnami/charts/blob/c6076945ecc47791d82e545a20ef690dd93ff662/bitnami/postgresql/Chart.yaml#L4 RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ gcc \ libpq-dev \ - postgresql-client + postgresql-client-14 WORKDIR /app diff --git a/src/backup/postgres/__init__.py b/src/backup/postgres/__init__.py index 55d6ba0..77f3bd5 100644 --- a/src/backup/postgres/__init__.py +++ b/src/backup/postgres/__init__.py @@ -142,4 +142,11 @@ def _restore(pgConfig: PGConfig, db: str, dir: Path) -> Path: return dump except subprocess.CalledProcessError 
as e: - raise + stdout = e.stdout.decode(errors="replace") if e.stdout else "" + stderr = e.stderr.decode(errors="replace") if e.stderr else "" + logging.error( + f"Error restoring database '{db}': returncode={e.returncode}; stdout={stdout}; stderr={stderr}" + ) + raise RuntimeError( + f"pg_restore failed for '{db}': returncode={e.returncode}; stdout={stdout}; stderr={stderr}" + ) from e From 581e39f30407ba63aa82284b812e6863b5a85ab7 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 17 Oct 2025 13:44:43 -0700 Subject: [PATCH 18/30] fix: Update Dockerfile to install postgresql-client-14 --- Dockerfile | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 6074db0..8c081d2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,16 +3,24 @@ FROM python:slim # Note: Installing Postgres 14 for now to match the versions used by Gen3-Helm # -# Gen3-Helm Chart +# Gen3-Helm Chart: # https://github.com/uc-cdis/gen3-helm/blob/gen3-0.2.69/helm/gen3/Chart.yaml#L143-L146 # -# Bitnami PostgreSQL 11.9.13 (14.5.0): +# Bitnami Postgres 11.9.13 (14.5.0): # https://github.com/bitnami/charts/blob/c6076945ecc47791d82e545a20ef690dd93ff662/bitnami/postgresql/Chart.yaml#L4 -RUN apt-get update && apt-get install -y --no-install-recommends \ +# +# Postgres Installation docs: +# https://www.postgresql.org/download/linux/debian/ +RUN apt-get update && apt-get install -y \ build-essential \ gcc \ - libpq-dev \ - postgresql-client-14 + libpq-dev + +RUN apt-get install -y postgresql-common + +RUN YES=true /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh + +RUN apt-get update && apt-get install -y postgresql-client-14 WORKDIR /app From 0cbdd8089e2ae0c47fa503a4354ba05b11aced51 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Mon, 20 Oct 2025 11:51:00 -0700 Subject: [PATCH 19/30] fix: Update call to get edges to match latest syntax (`G.V().outE()`) --- src/backup/grip/__init__.py | 6 +++--- src/backup/main.py | 10 +++++++--- 2 files 
changed, 10 insertions(+), 6 deletions(-) diff --git a/src/backup/grip/__init__.py b/src/backup/grip/__init__.py index fca86ec..1fc24a8 100644 --- a/src/backup/grip/__init__.py +++ b/src/backup/grip/__init__.py @@ -45,7 +45,7 @@ def _getEdges(grip: GripConfig, graph: str) -> list[str]: G = c.graph(graph) - for i in G.E(): + for i in G.V().outE(): edges.append(i) return edges @@ -116,7 +116,7 @@ def _restore(grip: GripConfig, graph: str, dir: Path): G = conn.graph(graph) bulkV = G.bulkAdd() - with open("grip.vertices", "rb") as f: + with open(dir / f"{graph}.vertices", "rb") as f: count = 0 for i in f: data = orjson.loads(i) @@ -131,7 +131,7 @@ def _restore(grip: GripConfig, graph: str, dir: Path): print("Vertices load res: ", str(err)) bulkE = G.bulkAdd() - with open("grip.edges", "rb") as f: + with open(dir / f"{graph}.edges", "rb") as f: count = 0 for i in f: data = orjson.loads(i) diff --git a/src/backup/main.py b/src/backup/main.py index 6da189f..e6be9ba 100644 --- a/src/backup/main.py +++ b/src/backup/main.py @@ -78,12 +78,16 @@ def list_grip(host: str, port: int, graph: str, vertex: bool, edge: bool): conf = GripConfig(host=host, port=port) if vertex: - logging.debug(f"Listing vertices from GRIP graph '{graph}' at {conf.host}:{conf.port}") + logging.debug( + f"Listing vertices from GRIP graph '{graph}' at {conf.host}:{conf.port}" + ) for v in _getVertices(conf, graph): click.echo(json.dumps(v, indent=2)) if edge: - logging.debug(f"Listing edges from GRIP graph '{graph}' at {conf.host}:{conf.port}") + logging.debug( + f"Listing edges from GRIP graph '{graph}' at {conf.host}:{conf.port}" + ) for e in _getEdges(conf, graph): click.echo(json.dumps(e, indent=2)) @@ -110,7 +114,7 @@ def backup_grip(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: @grip.command(name="restore") @grip_options @dir_options -def restore_grip(host: str, port: int, graph: str, dir: Path): +def restore_grip(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: 
Path): """local ➜ grip""" conf = GripConfig(host=host, port=port) From 6db260bb21a115056022b0943e345f85fe86d60a Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 21 Oct 2025 11:22:06 -0700 Subject: [PATCH 20/30] feat: Re-add support for ElasticSearch backups --- requirements.txt | 8 ++ src/backup/elasticsearch/__init__.py | 142 +++++++++++++++++++++++++++ src/backup/main.py | 124 +++++++++++++++++++++++ src/backup/options.py | 62 ++++++++++++ 4 files changed, 336 insertions(+) create mode 100644 src/backup/elasticsearch/__init__.py diff --git a/requirements.txt b/requirements.txt index 8422c44..2d45a1e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,11 @@ +# TODO: Using rc here to avoid psycopg errors: +# ERROR: Failed building wheel for psycopg2-binary +# error: expression is not assignable +# error: call to undeclared function 'gettimeofday'; +# +# Use non-rc version of aced-submission when released: +# https://pypi.org/project/aced-submission/#history +aced-submission>=0.0.10rc18 click click-aliases elasticsearch diff --git a/src/backup/elasticsearch/__init__.py b/src/backup/elasticsearch/__init__.py new file mode 100644 index 0000000..a4e9bf8 --- /dev/null +++ b/src/backup/elasticsearch/__init__.py @@ -0,0 +1,142 @@ +from dataclasses import dataclass +import logging +from pathlib import Path +import subprocess +from elasticsearch import Elasticsearch + + +@dataclass +class ESConfig: + """ElasticSearch config""" + + host: str + port: int + user: str + password: str + # Backup repo + # https://www.elastic.co/docs/deploy-manage/tools/snapshot-and-restore/self-managed + repo: str = "" + bucket: str = "" + endpoint: str = "" + + +def _connect(esConfig: ESConfig) -> Elasticsearch: + """ + Connects to a given ElasticSearch instance. 
+ """ + assert esConfig.host, "Host must not be empty" + assert esConfig.port, "Port must not be empty" + + try: + elastic = Elasticsearch( + hosts=[{"host": esConfig.host, "port": esConfig.port, "scheme": "http"}], + ) + except Exception as err: + logging.error(f"Error connecting to Elasticsearch: {err}") + raise + + return elastic + + +def _getIndices(esConfig: ESConfig) -> list[str]: + """ + Utiltity function to connect to ElasticSearch and list all indices. + """ + elastic = _connect(esConfig) + + # Get all indices using the cat.indices() method + indices = elastic.cat.indices(h="index").splitlines() + + return indices + + +def _getRepos(esConfig: ESConfig) -> list[str] | None: + """ + Utility function to connect to ElasticSearch and list all snapshot repositories. + """ + elastic = _connect(esConfig) + + try: + repos = elastic.snapshot.get_repository(name="_all") # Get all repositories + repo_names = list(repos.keys()) # Extract just the names + return repo_names + except Exception as err: + logging.error(f"Error listing Elasticsearch repositories: {err}") + return None + + +def _initRepo(esConfig: ESConfig) -> bool: + """ + Initializes a snapshot repository in ElasticSearch. + """ + elastic = _connect(esConfig) + + # Create the repository + elastic.snapshot.create_repository( + name=esConfig.repo, + body={ + "type": "s3", + "endpoint": esConfig.endpoint, + "bucket": esConfig.bucket, + "base_path": esConfig.repo, + "access_key": esConfig.user, + "secret_key": esConfig.password, + }, + ) + logging.info(f"Repository '{esConfig.repo}' created successfully.") + return True + + +def _dump(esConfig: ESConfig, index: str): + """ + Creates a snapshot of a single index using Elasticsearch Snapshot API. 
+ """ + elastic = _connect(esConfig) + + # Check if index exists before attempting to snapshot + if not elastic.indices.exists(index=index): + logging.warning(f"Index '{index}' not found") + + response = elastic.snapshot.create( + repository=esConfig.repo, + snapshot=f"{index}", + wait_for_completion=True, + ) + + if response["snapshot"]["state"] == "SUCCESS": + return response["snapshot"]["snapshot_id"] + else: + logging.error(f"Snapshot '{index}' error: {response}") + + +def _restore(esConfig: ESConfig, index: str, snapshot: str) -> bool: + """ + Restores a single index from a snapshot using Elasticsearch Snapshot API. + """ + elastic = _connect(esConfig) + if elastic is None: + return False + + # Check if the snapshot exists + snapshot_info = elastic.snapshot.get(repository=esConfig.repo, snapshot=snapshot) + if not snapshot_info["snapshots"]: + logging.error(f"Snapshot '{snapshot}' not found in repo '{esConfig.repo}'") + return False + + # Close the index before restoring + if elastic.indices.exists(index=index): + elastic.indices.close(index=index) + + response = elastic.snapshot.restore( + repository=esConfig.repo, + snapshot=snapshot, + body={"indices": index}, + wait_for_completion=True, + ) + + if response["snapshot"]["state"] == "SUCCESS": + return True + + else: + logging.error(f"Snapshot '{snapshot}' error: {response}") + return False diff --git a/src/backup/main.py b/src/backup/main.py index e6be9ba..10ee0b2 100644 --- a/src/backup/main.py +++ b/src/backup/main.py @@ -1,3 +1,11 @@ +from backup.elasticsearch import ( + ESConfig, + _getIndices, + _dump as _esDump, + _restore as _esRestore, + _getRepos, + _initRepo, +) from backup.grip import ( GripConfig, _getEdges, @@ -18,6 +26,7 @@ ) from backup.options import ( dir_options, + es_options, grip_options, pg_options, s3_options, @@ -65,6 +74,121 @@ def cli(verbose: bool): cli() + +@cli.group(aliases=["es"]) +def es(): + """Commands for ElasticSearch backups.""" + pass + + +@es.command(name="ls") 
+@es_options +def listIndices(host: str, port: int, user: str, password: str): + """list indices""" + esConfig = ESConfig(host=host, port=port, user=user, password=password) + + indices = _getIndices(esConfig) + if not indices: + logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") + return + + # List indices + for index in indices: + click.echo(index) + + +@es.command(name="ls-repo") # New command for listing repositories +@es_options +def listRepos(host: str, port: int, user: str, password: str): + """list snapshot repositories""" + esConfig = ESConfig(host=host, port=port, user=user, password=password) + + repos = _getRepos(esConfig) + if not repos: + logging.warning( + f"No snapshot repositories found at {esConfig.host}:{esConfig.port}" + ) + return + + # List repositories + for repo in repos: + click.echo(repo) + + +@es.command(name="init-repo") # New command for initializing a repository +@es_options +@s3_options +@click.option( + "--repo-name", + "-r", + required=True, + help="Name of the Elasticsearch snapshot repository to initialize.", +) +def initRepo( + host: str, + port: int, + user: str, + password: str, + repo_name: str, + endpoint: str, + bucket: str, + key: str, + secret: str, +): + """initialize a snapshot repository""" + # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation + esConfig = ESConfig( + host=host, + port=port, + user=user, + password=password, + repo=repo_name, + endpoint=endpoint, + bucket=bucket, + ) + + success = _initRepo(esConfig) + if success: + click.echo(f"Repository '{repo_name}' initialized successfully.") + else: + logging.error(f"Failed to initialize repository '{repo_name}'.") + + +@es.command(name="backup") +@es_options +def backup_es(host: str, port: int, user: str, password: str): + """elasticsearch ➜ local""" + + esConfig = ESConfig(host=host, port=port, user=user, password=password) + indices = _getIndices(esConfig) + if not indices: + logging.warning(f"No indices 
found at {esConfig.host}:{esConfig.port}") + return + + for index in indices: + snapshot = _esDump(esConfig, index) + logging.debug(f"Dumped index '{index}' to '{snapshot}'") + + +@es.command(name="restore") +@es_options +@dir_options +def restore_es(host: str, port: int, user: str, password: str, snapshot: str): + """local ➜ elasticsearch""" + esConfig = ESConfig(host=host, port=port, user=user, password=password) + + indices = _getIndices(esConfig) + if not indices: + logging.warning( + f"No indices found to restore at {esConfig.host}:{esConfig.port}." + ) + return + + # Restore indices + for index in indices: + _ = _esRestore(esConfig, index, snapshot) + + @cli.group(aliases=["gp"]) def grip(): """Commands for GRIP backups.""" diff --git a/src/backup/options.py b/src/backup/options.py index a59bdbc..bc6df9b 100644 --- a/src/backup/options.py +++ b/src/backup/options.py @@ -2,6 +2,68 @@ import click +# ElasticSearch Flags +def es_options(fn): + options = [ + click.option( + "--host", + "-H", + envvar="ES_HOST", + default="localhost", + show_default=True, + help="ElasticSearch host ($ES_HOST)", + ), + click.option( + "--port", + "-p", + envvar="ES_PORT", + default=9200, + show_default=True, + help="ElasticSearch port ($ES_PORT)", + ), + click.option( + "--user", + "-u", + envvar="ES_USER", + default="elastic", + show_default=True, + help="ElasticSearch username ($ES_USER)", + ), + click.option( + "--password", + "-P", + envvar="ES_PASSWORD", + help="ElasticSearch password ($ES_PASSWORD)", + ), + # click.option( + # "--repo", + # "-r", + # envvar="ES_REPO", + # default="backup_repo", + # show_default=True, + # help="ElasticSearch snapshot repository name ($ES_REPO)", + # ), + # click.option( + # "--bucket", + # "-b", + # envvar="ES_BUCKET", + # default="backup_bucket", + # show_default=True, + # help="S3 bucket name for ElasticSearch backups ($ES_BUCKET)", + # ), + # click.option( + # "--endpoint", + # "-e", + # envvar="ES_ENDPOINT", + # 
default="https://s3.amazonaws.com", + # show_default=True, + # help="S3 endpoint URL for ElasticSearch backups ($ES_ENDPOINT)", + # ), + ] + for option in reversed(options): + fn = option(fn) + return fn + # GRIP Flags def grip_options(fn): options = [ From a7d2e676905f871dab6620dd457371c2b8cc7bdf Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 21 Oct 2025 13:17:03 -0700 Subject: [PATCH 21/30] feat: Move CLI functions to respective modules --- src/backup/elasticsearch/cli.py | 129 ++++++++++++++ src/backup/grip/cli.py | 71 ++++++++ src/backup/grip/restore.py | 120 +++++++++++++ src/backup/main.py | 307 ++------------------------------ src/backup/postgres/cli.py | 73 ++++++++ src/backup/s3/cli.py | 55 ++++++ 6 files changed, 460 insertions(+), 295 deletions(-) create mode 100644 src/backup/elasticsearch/cli.py create mode 100644 src/backup/grip/cli.py create mode 100644 src/backup/grip/restore.py create mode 100644 src/backup/postgres/cli.py create mode 100644 src/backup/s3/cli.py diff --git a/src/backup/elasticsearch/cli.py b/src/backup/elasticsearch/cli.py new file mode 100644 index 0000000..f2ed67b --- /dev/null +++ b/src/backup/elasticsearch/cli.py @@ -0,0 +1,129 @@ +from backup.elasticsearch import ( + ESConfig, + _getIndices, + _dump as _esDump, + _restore as _esRestore, + _getRepos, + _initRepo, +) +from backup.options import ( + dir_options, + es_options, + s3_options, +) +import click +import logging + + +@click.group() +def es(): + """Commands for ElasticSearch backups.""" + pass + + +@es.command() +@es_options +def ls(host: str, port: int, user: str, password: str): + """list indices""" + esConfig = ESConfig(host=host, port=port, user=user, password=password) + + indices = _getIndices(esConfig) + if not indices: + logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") + return + + # List indices + for index in indices: + click.echo(index) + + +@es.command(name="ls-repo") # New command for listing repositories +@es_options +def 
listRepos(host: str, port: int, user: str, password: str): + """list snapshot repositories""" + esConfig = ESConfig(host=host, port=port, user=user, password=password) + + repos = _getRepos(esConfig) + if not repos: + logging.warning( + f"No snapshot repositories found at {esConfig.host}:{esConfig.port}" + ) + return + + # List repositories + for repo in repos: + click.echo(repo) + + +@es.command(name="init-repo") # New command for initializing a repository +@es_options +@s3_options +@click.option( + "--repo-name", + "-r", + required=True, + help="Name of the Elasticsearch snapshot repository to initialize.", +) +def initRepo( + host: str, + port: int, + user: str, + password: str, + repo_name: str, + endpoint: str, + bucket: str, + key: str, + secret: str, +): + """initialize a snapshot repository""" + # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation + esConfig = ESConfig( + host=host, + port=port, + user=user, + password=password, + repo=repo_name, + endpoint=endpoint, + bucket=bucket, + ) + + success = _initRepo(esConfig) + if success: + click.echo(f"Repository '{repo_name}' initialized successfully.") + else: + logging.error(f"Failed to initialize repository '{repo_name}'.") + + +@es.command() +@es_options +def backup(host: str, port: int, user: str, password: str): + """elasticsearch ➜ local""" + + esConfig = ESConfig(host=host, port=port, user=user, password=password) + indices = _getIndices(esConfig) + if not indices: + logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") + return + + for index in indices: + snapshot = _esDump(esConfig, index) + logging.debug(f"Dumped index '{index}' to '{snapshot}'") + + +@es.command() +@es_options +@dir_options +def restore(host: str, port: int, user: str, password: str, snapshot: str): + """local ➜ elasticsearch""" + esConfig = ESConfig(host=host, port=port, user=user, password=password) + + indices = _getIndices(esConfig) + if not indices: + logging.warning( + f"No 
indices found to restore at {esConfig.host}:{esConfig.port}." + ) + return + + # Restore indices + for index in indices: + _ = _esRestore(esConfig, index, snapshot) diff --git a/src/backup/grip/cli.py b/src/backup/grip/cli.py new file mode 100644 index 0000000..8f046d1 --- /dev/null +++ b/src/backup/grip/cli.py @@ -0,0 +1,71 @@ +from backup.grip import ( + GripConfig, + _getEdges, + _getVertices, + _dump as _gripDump, + _restore as _gripRestore, +) +from backup.options import ( + dir_options, + grip_options, +) +from pathlib import Path +import click +import logging +import json + + +@click.group() +def grip(): + """Commands for GRIP backups.""" + pass + + +@grip.command() +@grip_options +def ls(host: str, port: int, graph: str, vertex: bool, edge: bool): + """list GRIP vertices and/or edges""" + conf = GripConfig(host=host, port=port) + + if vertex: + logging.debug( + f"Listing vertices from GRIP graph '{graph}' at {conf.host}:{conf.port}" + ) + for v in _getVertices(conf, graph): + click.echo(json.dumps(v, indent=2)) + + if edge: + logging.debug( + f"Listing edges from GRIP graph '{graph}' at {conf.host}:{conf.port}" + ) + for e in _getEdges(conf, graph): + click.echo(json.dumps(e, indent=2)) + + +@grip.command() +@grip_options +@dir_options +def backup(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path): + """grip ➜ local""" + conf = GripConfig(host=host, port=port) + + # Set timestamp + dir.mkdir(parents=True, exist_ok=True) + + logging.debug(f"Backing up GRIP graph '{graph}' to directory '{dir}'") + _gripDump(conf, graph, vertex, edge, dir) + + # TODO: Better way to handle GRIP graph schemas? 
+ schema = f"{graph}__schema__" + logging.debug(f"Backing up GRIP graph '{schema}' to directory '{dir}'") + _gripDump(conf, schema, vertex, edge, dir) + + +@grip.command() +@grip_options +@dir_options +def restore(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path): + """local ➜ grip""" + conf = GripConfig(host=host, port=port) + + _ = _gripRestore(conf, graph, dir) diff --git a/src/backup/grip/restore.py b/src/backup/grip/restore.py new file mode 100644 index 0000000..60744f3 --- /dev/null +++ b/src/backup/grip/restore.py @@ -0,0 +1,120 @@ +# bulk_es_load.py + +import os +import sys +import logging + +from aced_submission.meta_flat_load import delete as meta_flat_delete +from aced_submission.grip_load import get_project_data +from opensearchpy import OpenSearchException + +# Set up logging to see output +logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) +logging.getLogger().setLevel(logging.INFO) + +# --- START REQUIRED NEW HELPER FUNCTIONS --- + +def get_all_projects_to_load(): + """ + In a real scenario, this function would query Gen3 Fence/Arborist + or Indexd to get a full list of all active projects in the data commons. + + For your local test environment, hardcode the projects you know exist + and were restored from Production. + """ + # Based on your PostgreSQL DB names (which align with Production projects) + # The common Gen3 project format is PROGRAM-PROJECT + return [ + "CALYPR-arborist_cbds", + "CALYPR-fence_cbds", + "CALYPR-requestor_cbds", + "CALYPR-indexd_cbds", + "CALYPR-wts_cbds", + # ... add any other projects as needed + ] + +def bulk_load_project(project_id: str): + """ + Re-implements the core ES loading logic from _load_all for a single project. + """ + # For a bulk load, assume we are not dealing with local files (no FHIR import) + # just data already in GRIP. 
+ + program, project = project_id.split('-', 1) # Assumes 'CALYPR-arborist_cbds' + + output = {'logs': []} # Capture internal logs + + try: + # Step 1: Fetch and flatten the data into the local SQLite DB + work_path = pathlib.Path("work") + db_path = (work_path / f"{project_id}_fhir.db") + db_path.unlink(missing_ok=True) # Ensure a fresh database + + db = LocalFHIRDatabase(db_name=db_path) + + logging.info(f"Fetching data from GRIP for project {project_id}...") + # Note: 'CALIPER' is the GRIP graph name from fhir_import_export.py + db.bulk_insert_data( + resources=get_project_data(_get_grip_service(), "CALIPER", project_id, output, _get_token()) + ) + + # Step 2: Define generators for flattened data (same as original script) + index_generator_dict = { + 'researchsubject': db.flattened_research_subjects, + 'specimen': db.flattened_specimens, + 'file': db.flattened_document_references, + "medicationadministration": db.flattened_medication_administrations, + "groupmember": db.flattened_group_members, + } + + # Step 3: Clear and Reload Elasticsearch/OpenSearch indices + for index in index_generator_dict.keys(): + logging.info(f"Clearing old ES index for {project_id}:{index}") + meta_flat_delete(project_id=project_id, index=index) + + for index, generator in index_generator_dict.items(): + logging.info(f"Loading new data into ES index for {project_id}:{index}") + load_flat(project_id=project_id, index=index, + generator=generator(), + limit=None, elastic_url=DEFAULT_ELASTIC, + output_path=None) + + logging.info(f"SUCCESS: Loaded {project_id} into Elasticsearch.") + + except OpenSearchException as e: + logging.error(f"OpenSearch Error on project {project_id}: {str(e)}") + except Exception as e: + logging.error(f"General Error on project {project_id}: {str(e)}") + +# --- END REQUIRED NEW HELPER FUNCTIONS --- + + +def main_bulk(): + """Main function to run bulk load across all projects.""" + projects_to_load = get_all_projects_to_load() + + if not projects_to_load: + 
logging.warning("No projects defined in get_all_projects_to_load. Nothing to do.") + return + + # Check for required environment variables used by the helper functions + if not os.environ.get('GRIP_SERVICE_NAME') or not os.environ.get('ACCESS_TOKEN'): + logging.error("Missing required environment variables (GRIP_SERVICE_NAME or ACCESS_TOKEN).") + logging.error("Please set GRIP_SERVICE_NAME and ACCESS_TOKEN (a Gen3 token with appropriate permissions).") + return + + # Create the work directory if it doesn't exist (it is ignored by git) + pathlib.Path("work").mkdir(parents=True, exist_ok=True) + + logging.info(f"Starting bulk Elasticsearch load for {len(projects_to_load)} projects.") + + for project_id in projects_to_load: + bulk_load_project(project_id) + + logging.info("Bulk Elasticsearch load process complete.") + + +if __name__ == '__main__': + # Ensure you are running this script from the same directory as fhir_import_export.py + # to maintain imports and file paths. + main_bulk() \ No newline at end of file diff --git a/src/backup/main.py b/src/backup/main.py index 10ee0b2..3bb0812 100644 --- a/src/backup/main.py +++ b/src/backup/main.py @@ -1,44 +1,14 @@ -from backup.elasticsearch import ( - ESConfig, - _getIndices, - _dump as _esDump, - _restore as _esRestore, - _getRepos, - _initRepo, -) -from backup.grip import ( - GripConfig, - _getEdges, - _getVertices, - _dump as _gripDump, - _restore as _gripRestore, -) -from backup.postgres import ( - PGConfig, - _getDbs, - _dump as _pgDump, - _restore as _pgRestore, -) -from backup.s3 import ( - S3Config, - _download, - _upload, -) -from backup.options import ( - dir_options, - es_options, - grip_options, - pg_options, - s3_options, -) from click_aliases import ClickAliasedGroup -from datetime import datetime from elasticsearch.exceptions import ElasticsearchWarning -from pathlib import Path import click import logging import warnings -import json + +# Import command groups from subpackages +from .elasticsearch.cli 
import es as es_command +from .grip.cli import grip as grip_command +from .postgres.cli import pg as pg_command +from .s3.cli import s3 as s3_command @click.group(cls=ClickAliasedGroup) @@ -70,264 +40,11 @@ def cli(verbose: bool): warnings.simplefilter("ignore", ElasticsearchWarning) +# register subcommands +cli.add_command(es_command) +cli.add_command(grip_command) +cli.add_command(pg_command) +cli.add_command(s3_command) + if __name__ == "__main__": cli() - - - -@cli.group(aliases=["es"]) -def es(): - """Commands for ElasticSearch backups.""" - pass - - -@es.command(name="ls") -@es_options -def listIndices(host: str, port: int, user: str, password: str): - """list indices""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) - - indices = _getIndices(esConfig) - if not indices: - logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") - return - - # List indices - for index in indices: - click.echo(index) - - -@es.command(name="ls-repo") # New command for listing repositories -@es_options -def listRepos(host: str, port: int, user: str, password: str): - """list snapshot repositories""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) - - repos = _getRepos(esConfig) - if not repos: - logging.warning( - f"No snapshot repositories found at {esConfig.host}:{esConfig.port}" - ) - return - - # List repositories - for repo in repos: - click.echo(repo) - - -@es.command(name="init-repo") # New command for initializing a repository -@es_options -@s3_options -@click.option( - "--repo-name", - "-r", - required=True, - help="Name of the Elasticsearch snapshot repository to initialize.", -) -def initRepo( - host: str, - port: int, - user: str, - password: str, - repo_name: str, - endpoint: str, - bucket: str, - key: str, - secret: str, -): - """initialize a snapshot repository""" - # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation - esConfig = ESConfig( - host=host, - port=port, 
- user=user, - password=password, - repo=repo_name, - endpoint=endpoint, - bucket=bucket, - ) - - success = _initRepo(esConfig) - if success: - click.echo(f"Repository '{repo_name}' initialized successfully.") - else: - logging.error(f"Failed to initialize repository '{repo_name}'.") - - -@es.command(name="backup") -@es_options -def backup_es(host: str, port: int, user: str, password: str): - """elasticsearch ➜ local""" - - esConfig = ESConfig(host=host, port=port, user=user, password=password) - indices = _getIndices(esConfig) - if not indices: - logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") - return - - for index in indices: - snapshot = _esDump(esConfig, index) - logging.debug(f"Dumped index '{index}' to '{snapshot}'") - - -@es.command(name="restore") -@es_options -@dir_options -def restore_es(host: str, port: int, user: str, password: str, snapshot: str): - """local ➜ elasticsearch""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) - - indices = _getIndices(esConfig) - if not indices: - logging.warning( - f"No indices found to restore at {esConfig.host}:{esConfig.port}." 
- ) - return - - # Restore indices - for index in indices: - _ = _esRestore(esConfig, index, snapshot) - - -@cli.group(aliases=["gp"]) -def grip(): - """Commands for GRIP backups.""" - pass - - -@grip.command(name="ls") -@grip_options -def list_grip(host: str, port: int, graph: str, vertex: bool, edge: bool): - """list GRIP vertices and/or edges""" - conf = GripConfig(host=host, port=port) - - if vertex: - logging.debug( - f"Listing vertices from GRIP graph '{graph}' at {conf.host}:{conf.port}" - ) - for v in _getVertices(conf, graph): - click.echo(json.dumps(v, indent=2)) - - if edge: - logging.debug( - f"Listing edges from GRIP graph '{graph}' at {conf.host}:{conf.port}" - ) - for e in _getEdges(conf, graph): - click.echo(json.dumps(e, indent=2)) - - -@grip.command(name="backup") -@grip_options -@dir_options -def backup_grip(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path): - """grip ➜ local""" - conf = GripConfig(host=host, port=port) - - # Set timestamp - dir.mkdir(parents=True, exist_ok=True) - - logging.debug(f"Backing up GRIP graph '{graph}' to directory '{dir}'") - _gripDump(conf, graph, vertex, edge, dir) - - # TODO: Better way to handle GRIP graph schemas? 
- schema = f"{graph}__schema__" - logging.debug(f"Backing up GRIP graph '{schema}' to directory '{dir}'") - _gripDump(conf, schema, vertex, edge, dir) - - -@grip.command(name="restore") -@grip_options -@dir_options -def restore_grip(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path): - """local ➜ grip""" - conf = GripConfig(host=host, port=port) - - _ = _gripRestore(conf, graph, dir) - - -@cli.group(aliases=["pg"]) -def pg(): - """Commands for Postgres backups.""" - pass - - -@pg.command(name="ls") -@pg_options -def listDbs(host: str, port: int, user: str): - """list databases""" - conf = PGConfig(host=host, port=port, user=user) - - dbs = _getDbs(conf) - if not dbs: - logging.warning(f"No databases found at {conf.host}:{conf.port}.") - return - - # List databases - for database in dbs: - click.echo(database) - - -@pg.command(name="dump") -@pg_options -@dir_options -def dump_postgres(host: str, port: int, user: str, dir: Path): - """postgres ➜ local""" - conf = PGConfig(host=host, port=port, user=user) - - # Dump directory - dir.mkdir(parents=True, exist_ok=True) - - dbs = _getDbs(conf) - if not dbs: - logging.warning(f"No databases found to dump at {conf.host}:{conf.port}.") - return - - # Dump databases - for database in dbs: - dump = _pgDump(conf, database, dir) - logging.debug(f"Dumped {database} to {dump}") - - -@pg.command(name="restore") -@pg_options -@dir_options -def restore_postgres(host: str, port: int, user: str, dir: Path): - """local ➜ postgres""" - conf = PGConfig(host=host, port=port, user=user) - - dbs = _getDbs(conf) - if not dbs: - logging.warning(f"No databases found to restore at {conf.host}:{conf.port}.") - return - - # Restore databases - for database in dbs: - _ = _pgRestore(conf, database, dir) - - -@cli.group() -def s3(): - """Commands for S3.""" - pass - - -@s3.command() -@s3_options -@dir_options -def download(endpoint: str, bucket: str, key: str, secret: str, dir: Path): - """s3 ➜ local""" - conf = 
S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) - - # Download from S3 - _ = _download(conf, dir) - - -@s3.command() -@s3_options -@dir_options -def upload(endpoint: str, bucket: str, key: str, secret: str, dir: Path): - """local ➜ s3""" - s3 = S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) - - # Upload to S3 - _ = _upload(s3, dir) diff --git a/src/backup/postgres/cli.py b/src/backup/postgres/cli.py new file mode 100644 index 0000000..3277b70 --- /dev/null +++ b/src/backup/postgres/cli.py @@ -0,0 +1,73 @@ +from backup.postgres import ( + PGConfig, + _getDbs, + _dump as _pgDump, + _restore as _pgRestore, +) +from backup.options import ( + dir_options, + pg_options, +) +import click +import logging +from pathlib import Path + + +@click.group() +def pg(): + """Postgres-related commands (moved from main.py).""" + pass + + +@pg.command() +@pg_options +def ls(host: str, port: int, user: str): + """list databases""" + conf = PGConfig(host=host, port=port, user=user) + + dbs = _getDbs(conf) + if not dbs: + logging.warning(f"No databases found at {conf.host}:{conf.port}.") + return + + # List databases + for database in dbs: + click.echo(database) + + +@pg.command() +@pg_options +@dir_options +def dump(host: str, port: int, user: str, dir: Path): + """postgres ➜ local""" + conf = PGConfig(host=host, port=port, user=user) + + # Dump directory + dir.mkdir(parents=True, exist_ok=True) + + dbs = _getDbs(conf) + if not dbs: + logging.warning(f"No databases found to dump at {conf.host}:{conf.port}.") + return + + # Dump databases + for database in dbs: + dump = _pgDump(conf, database, dir) + logging.debug(f"Dumped {database} to {dump}") + + +@pg.command() +@pg_options +@dir_options +def restore(host: str, port: int, user: str, dir: Path): + """local ➜ postgres""" + conf = PGConfig(host=host, port=port, user=user) + + dbs = _getDbs(conf) + if not dbs: + logging.warning(f"No databases found to restore at {conf.host}:{conf.port}.") + return + + 
# Restore databases + for database in dbs: + _ = _pgRestore(conf, database, dir) diff --git a/src/backup/s3/cli.py b/src/backup/s3/cli.py new file mode 100644 index 0000000..54e5726 --- /dev/null +++ b/src/backup/s3/cli.py @@ -0,0 +1,55 @@ +from backup.s3 import ( + S3Config, + _download, + _upload, +) +from backup.options import ( + dir_options, + s3_options, +) +import click +from pathlib import Path + + +@click.group() +def s3(): + """Commands for S3.""" + pass + + +@s3.command() +@s3_options +def ls(endpoint: str, bucket: str, key: str, secret: str): + """list S3 bucket contents""" + conf = S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) + + objects = _getObjects(conf) + if not objects: + click.echo(f"No objects found in bucket '{bucket}'.") + return + + # List objects + for obj in objects: + click.echo(obj) + + +@s3.command() +@s3_options +@dir_options +def download(endpoint: str, bucket: str, key: str, secret: str, dir: Path): + """s3 ➜ local""" + conf = S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) + + # Download from S3 + _ = _download(conf, dir) + + +@s3.command() +@s3_options +@dir_options +def upload(endpoint: str, bucket: str, key: str, secret: str, dir: Path): + """local ➜ s3""" + s3 = S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) + + # Upload to S3 + _ = _upload(s3, dir) From de54c802862b20986a9e49b4c854f6f643307542 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 21 Oct 2025 15:07:14 -0700 Subject: [PATCH 22/30] chore: Move CLI options to respective subcommand modules --- src/backup/elasticsearch/cli.py | 67 ++++++++++++- src/backup/grip/__init__.py | 17 ++++ src/backup/grip/cli.py | 71 ++++++++++---- src/backup/grip/restore.py | 120 ----------------------- src/backup/options.py | 164 +------------------------------- src/backup/postgres/cli.py | 34 ++++++- src/backup/s3/cli.py | 40 +++++--- 7 files changed, 200 insertions(+), 313 deletions(-) delete mode 100644 
src/backup/grip/restore.py diff --git a/src/backup/elasticsearch/cli.py b/src/backup/elasticsearch/cli.py index f2ed67b..5a733cd 100644 --- a/src/backup/elasticsearch/cli.py +++ b/src/backup/elasticsearch/cli.py @@ -8,13 +8,78 @@ ) from backup.options import ( dir_options, - es_options, +) +from backup.s3.cli import ( s3_options, + S3Config, ) import click import logging +# ElasticSearch Flags +def es_options(fn): + options = [ + click.option( + "--host", + "-H", + envvar="ES_HOST", + default="localhost", + show_default=True, + help="ElasticSearch host ($ES_HOST)", + ), + click.option( + "--port", + "-p", + envvar="ES_PORT", + default=9200, + show_default=True, + help="ElasticSearch port ($ES_PORT)", + ), + click.option( + "--user", + "-u", + envvar="ES_USER", + default="elastic", + show_default=True, + help="ElasticSearch username ($ES_USER)", + ), + click.option( + "--password", + "-P", + envvar="ES_PASSWORD", + help="ElasticSearch password ($ES_PASSWORD)", + ), + # click.option( + # "--repo", + # "-r", + # envvar="ES_REPO", + # default="backup_repo", + # show_default=True, + # help="ElasticSearch snapshot repository name ($ES_REPO)", + # ), + # click.option( + # "--bucket", + # "-b", + # envvar="ES_BUCKET", + # default="backup_bucket", + # show_default=True, + # help="S3 bucket name for ElasticSearch backups ($ES_BUCKET)", + # ), + # click.option( + # "--endpoint", + # "-e", + # envvar="ES_ENDPOINT", + # default="https://s3.amazonaws.com", + # show_default=True, + # help="S3 endpoint URL for ElasticSearch backups ($ES_ENDPOINT)", + # ), + ] + for option in reversed(options): + fn = option(fn) + return fn + + @click.group() def es(): """Commands for ElasticSearch backups.""" diff --git a/src/backup/grip/__init__.py b/src/backup/grip/__init__.py index 1fc24a8..7a29782 100644 --- a/src/backup/grip/__init__.py +++ b/src/backup/grip/__init__.py @@ -32,6 +32,23 @@ class GripConfig: port: int +def _getGraphs(grip: GripConfig) -> list[str]: + """ + Utility function to 
connect to Grip and list all graphs. + """ + + # Connect to Grip + c = _connect(grip) + + # List Graphs + graphs = [] + + for g in c.listGraphs(): + graphs.append(g) + + return graphs + + def _getEdges(grip: GripConfig, graph: str) -> list[str]: """ Utility function to connect to Grip and list all edges. diff --git a/src/backup/grip/cli.py b/src/backup/grip/cli.py index 8f046d1..718bf63 100644 --- a/src/backup/grip/cli.py +++ b/src/backup/grip/cli.py @@ -1,5 +1,6 @@ from backup.grip import ( GripConfig, + _getGraphs, _getEdges, _getVertices, _dump as _gripDump, @@ -7,7 +8,6 @@ ) from backup.options import ( dir_options, - grip_options, ) from pathlib import Path import click @@ -15,6 +15,56 @@ import json +# GRIP Flags +def grip_options(fn): + options = [ + click.option( + "--host", + "-H", + envvar="GRIP_HOST", + default="localhost", + show_default=True, + help="GRIP host ($GRIPHOST)", + ), + click.option( + "--port", + "-p", + envvar="GRIP_PORT", + default=8201, + show_default=True, + help="GRIP port ($GRIP_PORT)", + ), + ] + for option in reversed(options): + fn = option(fn) + return fn + +# GRIP Graph Flags +def grip_graph_options(fn): + options = [ + click.option( + "--edge", + "--edges", + "-e", + is_flag=True, + default=False, + help="Output GRIP edges.", + ), + click.option("--graph", "-g", default="CALYPR", help="Name of the GRIP graph."), + click.option( + "--vertex", + "--vertices", + "-v", + is_flag=True, + default=False, + help="Output GRIP vertices.", + ), + ] + for option in reversed(options): + fn = option(fn) + return fn + + @click.group() def grip(): """Commands for GRIP backups.""" @@ -23,23 +73,12 @@ def grip(): @grip.command() @grip_options -def ls(host: str, port: int, graph: str, vertex: bool, edge: bool): +def ls(host: str, port: int): """list GRIP vertices and/or edges""" conf = GripConfig(host=host, port=port) - if vertex: - logging.debug( - f"Listing vertices from GRIP graph '{graph}' at {conf.host}:{conf.port}" - ) - for v in 
_getVertices(conf, graph): - click.echo(json.dumps(v, indent=2)) - - if edge: - logging.debug( - f"Listing edges from GRIP graph '{graph}' at {conf.host}:{conf.port}" - ) - for e in _getEdges(conf, graph): - click.echo(json.dumps(e, indent=2)) + for v in _getGraphs(conf): + click.echo(json.dumps(v, indent=2)) @grip.command() @@ -64,7 +103,7 @@ def backup(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path @grip.command() @grip_options @dir_options -def restore(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path): +def restore(host: str, port: int, graph: str, dir: Path): """local ➜ grip""" conf = GripConfig(host=host, port=port) diff --git a/src/backup/grip/restore.py b/src/backup/grip/restore.py deleted file mode 100644 index 60744f3..0000000 --- a/src/backup/grip/restore.py +++ /dev/null @@ -1,120 +0,0 @@ -# bulk_es_load.py - -import os -import sys -import logging - -from aced_submission.meta_flat_load import delete as meta_flat_delete -from aced_submission.grip_load import get_project_data -from opensearchpy import OpenSearchException - -# Set up logging to see output -logging.getLogger().addHandler(logging.StreamHandler(sys.stdout)) -logging.getLogger().setLevel(logging.INFO) - -# --- START REQUIRED NEW HELPER FUNCTIONS --- - -def get_all_projects_to_load(): - """ - In a real scenario, this function would query Gen3 Fence/Arborist - or Indexd to get a full list of all active projects in the data commons. - - For your local test environment, hardcode the projects you know exist - and were restored from Production. - """ - # Based on your PostgreSQL DB names (which align with Production projects) - # The common Gen3 project format is PROGRAM-PROJECT - return [ - "CALYPR-arborist_cbds", - "CALYPR-fence_cbds", - "CALYPR-requestor_cbds", - "CALYPR-indexd_cbds", - "CALYPR-wts_cbds", - # ... 
add any other projects as needed - ] - -def bulk_load_project(project_id: str): - """ - Re-implements the core ES loading logic from _load_all for a single project. - """ - # For a bulk load, assume we are not dealing with local files (no FHIR import) - # just data already in GRIP. - - program, project = project_id.split('-', 1) # Assumes 'CALYPR-arborist_cbds' - - output = {'logs': []} # Capture internal logs - - try: - # Step 1: Fetch and flatten the data into the local SQLite DB - work_path = pathlib.Path("work") - db_path = (work_path / f"{project_id}_fhir.db") - db_path.unlink(missing_ok=True) # Ensure a fresh database - - db = LocalFHIRDatabase(db_name=db_path) - - logging.info(f"Fetching data from GRIP for project {project_id}...") - # Note: 'CALIPER' is the GRIP graph name from fhir_import_export.py - db.bulk_insert_data( - resources=get_project_data(_get_grip_service(), "CALIPER", project_id, output, _get_token()) - ) - - # Step 2: Define generators for flattened data (same as original script) - index_generator_dict = { - 'researchsubject': db.flattened_research_subjects, - 'specimen': db.flattened_specimens, - 'file': db.flattened_document_references, - "medicationadministration": db.flattened_medication_administrations, - "groupmember": db.flattened_group_members, - } - - # Step 3: Clear and Reload Elasticsearch/OpenSearch indices - for index in index_generator_dict.keys(): - logging.info(f"Clearing old ES index for {project_id}:{index}") - meta_flat_delete(project_id=project_id, index=index) - - for index, generator in index_generator_dict.items(): - logging.info(f"Loading new data into ES index for {project_id}:{index}") - load_flat(project_id=project_id, index=index, - generator=generator(), - limit=None, elastic_url=DEFAULT_ELASTIC, - output_path=None) - - logging.info(f"SUCCESS: Loaded {project_id} into Elasticsearch.") - - except OpenSearchException as e: - logging.error(f"OpenSearch Error on project {project_id}: {str(e)}") - except Exception as 
e: - logging.error(f"General Error on project {project_id}: {str(e)}") - -# --- END REQUIRED NEW HELPER FUNCTIONS --- - - -def main_bulk(): - """Main function to run bulk load across all projects.""" - projects_to_load = get_all_projects_to_load() - - if not projects_to_load: - logging.warning("No projects defined in get_all_projects_to_load. Nothing to do.") - return - - # Check for required environment variables used by the helper functions - if not os.environ.get('GRIP_SERVICE_NAME') or not os.environ.get('ACCESS_TOKEN'): - logging.error("Missing required environment variables (GRIP_SERVICE_NAME or ACCESS_TOKEN).") - logging.error("Please set GRIP_SERVICE_NAME and ACCESS_TOKEN (a Gen3 token with appropriate permissions).") - return - - # Create the work directory if it doesn't exist (it is ignored by git) - pathlib.Path("work").mkdir(parents=True, exist_ok=True) - - logging.info(f"Starting bulk Elasticsearch load for {len(projects_to_load)} projects.") - - for project_id in projects_to_load: - bulk_load_project(project_id) - - logging.info("Bulk Elasticsearch load process complete.") - - -if __name__ == '__main__': - # Ensure you are running this script from the same directory as fhir_import_export.py - # to maintain imports and file paths. 
- main_bulk() \ No newline at end of file diff --git a/src/backup/options.py b/src/backup/options.py index bc6df9b..718cf6e 100644 --- a/src/backup/options.py +++ b/src/backup/options.py @@ -1,169 +1,7 @@ from pathlib import Path import click - -# ElasticSearch Flags -def es_options(fn): - options = [ - click.option( - "--host", - "-H", - envvar="ES_HOST", - default="localhost", - show_default=True, - help="ElasticSearch host ($ES_HOST)", - ), - click.option( - "--port", - "-p", - envvar="ES_PORT", - default=9200, - show_default=True, - help="ElasticSearch port ($ES_PORT)", - ), - click.option( - "--user", - "-u", - envvar="ES_USER", - default="elastic", - show_default=True, - help="ElasticSearch username ($ES_USER)", - ), - click.option( - "--password", - "-P", - envvar="ES_PASSWORD", - help="ElasticSearch password ($ES_PASSWORD)", - ), - # click.option( - # "--repo", - # "-r", - # envvar="ES_REPO", - # default="backup_repo", - # show_default=True, - # help="ElasticSearch snapshot repository name ($ES_REPO)", - # ), - # click.option( - # "--bucket", - # "-b", - # envvar="ES_BUCKET", - # default="backup_bucket", - # show_default=True, - # help="S3 bucket name for ElasticSearch backups ($ES_BUCKET)", - # ), - # click.option( - # "--endpoint", - # "-e", - # envvar="ES_ENDPOINT", - # default="https://s3.amazonaws.com", - # show_default=True, - # help="S3 endpoint URL for ElasticSearch backups ($ES_ENDPOINT)", - # ), - ] - for option in reversed(options): - fn = option(fn) - return fn - -# GRIP Flags -def grip_options(fn): - options = [ - click.option( - "--edge", - "--edges", - "-e", - is_flag=True, - default=False, - help="Output GRIP edges.", - ), - click.option("--graph", "-g", default="CALYPR", help="Name of the GRIP graph."), - click.option( - "--host", - "-H", - envvar="GRIP_HOST", - default="localhost", - show_default=True, - help="GRIP host ($GRIPHOST)", - ), - click.option( - "--port", - "-p", - envvar="GRIP_PORT", - default=8201, - show_default=True, - 
help="GRIP port ($GRIP_PORT)", - ), - click.option( - "--vertex", - "--vertices", - "-v", - is_flag=True, - default=False, - help="Output GRIP vertices.", - ), - ] - for option in reversed(options): - fn = option(fn) - return fn - - -# Postgres Flags -def pg_options(fn): - options = [ - click.option( - "--host", - "-H", - envvar="PGHOST", - default="localhost", - show_default=True, - help="Postgres host ($PGHOST)", - ), - click.option( - "--port", - "-p", - envvar="PGPORT", - default=5432, - show_default=True, - help="Postgres port ($PGPORT)", - ), - click.option( - "--user", - "-u", - envvar="PGUSER", - default="postgres", - show_default=True, - help="Postgres username ($PGUSER)", - ), - ] - for option in reversed(options): - fn = option(fn) - return fn - - -# S3 Flags -def s3_options(fn): - options = [ - click.option( - "--endpoint", - "-e", - default="https://s3.amazonaws.com", - show_default=True, - help="S3 endpoint URL", - ), - click.option("--bucket", "-b", required=True, help="S3 bucket name"), - click.option( - "--key", "-k", envvar="ACCESS_KEY", help="S3 access key ID ($ACCESS_KEY)" - ), - click.option( - "--secret", - "-s", - envvar="SECRET_KEY", - help="S3 secret access key ($SECRET_KEY)", - ), - ] - for option in options: - fn = option(fn) - return fn - +# Common CLI Options for all subcommand # Output/intput directory flags dir_options = click.option( diff --git a/src/backup/postgres/cli.py b/src/backup/postgres/cli.py index 3277b70..5b3c4d1 100644 --- a/src/backup/postgres/cli.py +++ b/src/backup/postgres/cli.py @@ -6,13 +6,45 @@ ) from backup.options import ( dir_options, - pg_options, ) import click import logging from pathlib import Path +# Postgres Flags +def pg_options(fn): + options = [ + click.option( + "--host", + "-H", + envvar="PGHOST", + default="localhost", + show_default=True, + help="Postgres host ($PGHOST)", + ), + click.option( + "--port", + "-p", + envvar="PGPORT", + default=5432, + show_default=True, + help="Postgres port 
($PGPORT)", + ), + click.option( + "--user", + "-u", + envvar="PGUSER", + default="postgres", + show_default=True, + help="Postgres username ($PGUSER)", + ), + ] + for option in reversed(options): + fn = option(fn) + return fn + + @click.group() def pg(): """Postgres-related commands (moved from main.py).""" diff --git a/src/backup/s3/cli.py b/src/backup/s3/cli.py index 54e5726..42c35a7 100644 --- a/src/backup/s3/cli.py +++ b/src/backup/s3/cli.py @@ -5,12 +5,37 @@ ) from backup.options import ( dir_options, - s3_options, ) import click from pathlib import Path +# S3 Flags +def s3_options(fn): + options = [ + click.option( + "--endpoint", + "-e", + default="https://s3.amazonaws.com", + show_default=True, + help="S3 endpoint URL", + ), + click.option("--bucket", "-b", required=True, help="S3 bucket name"), + click.option( + "--key", "-k", envvar="ACCESS_KEY", help="S3 access key ID ($ACCESS_KEY)" + ), + click.option( + "--secret", + "-s", + envvar="SECRET_KEY", + help="S3 secret access key ($SECRET_KEY)", + ), + ] + for option in options: + fn = option(fn) + return fn + + @click.group() def s3(): """Commands for S3.""" @@ -20,17 +45,8 @@ def s3(): @s3.command() @s3_options def ls(endpoint: str, bucket: str, key: str, secret: str): - """list S3 bucket contents""" - conf = S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) - - objects = _getObjects(conf) - if not objects: - click.echo(f"No objects found in bucket '{bucket}'.") - return - - # List objects - for obj in objects: - click.echo(obj) + # TODO: Implement + pass @s3.command() From 9fcfe05d0c7526cca882ce9212cbc6cc55644c2a Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Tue, 21 Oct 2025 16:54:00 -0700 Subject: [PATCH 23/30] tests: Add initial module test files --- src/backup/elasticsearch/cli.py | 81 ++++++++++++----------- src/backup/s3/cli.py | 1 - tests/elasticsearch/test_elasticsearch.py | 4 ++ tests/grip/test_grip.py | 4 ++ tests/postgres/test_postgres.py | 4 ++ tests/s3/test_s3.py | 4 ++ 
tests/test_backups.py | 2 - 7 files changed, 59 insertions(+), 41 deletions(-) create mode 100644 tests/elasticsearch/test_elasticsearch.py create mode 100644 tests/grip/test_grip.py create mode 100644 tests/postgres/test_postgres.py create mode 100644 tests/s3/test_s3.py diff --git a/src/backup/elasticsearch/cli.py b/src/backup/elasticsearch/cli.py index 5a733cd..032d94a 100644 --- a/src/backup/elasticsearch/cli.py +++ b/src/backup/elasticsearch/cli.py @@ -102,7 +102,48 @@ def ls(host: str, port: int, user: str, password: str): click.echo(index) -@es.command(name="ls-repo") # New command for listing repositories +@es.command() +@es_options +def backup(host: str, port: int, user: str, password: str): + """elasticsearch ➜ local""" + + esConfig = ESConfig(host=host, port=port, user=user, password=password) + indices = _getIndices(esConfig) + if not indices: + logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") + return + + for index in indices: + snapshot = _esDump(esConfig, index) + logging.debug(f"Dumped index '{index}' to '{snapshot}'") + + +@es.command() +@es_options +@dir_options +def restore(host: str, port: int, user: str, password: str, snapshot: str): + """local ➜ elasticsearch""" + esConfig = ESConfig(host=host, port=port, user=user, password=password) + + indices = _getIndices(esConfig) + if not indices: + logging.warning( + f"No indices found to restore at {esConfig.host}:{esConfig.port}." 
+ ) + return + + # Restore indices + for index in indices: + _ = _esRestore(esConfig, index, snapshot) + + +@es.group() +def repo(): + """Commands for managing snapshot repositories.""" + pass + + +@repo.command(name="ls") @es_options def listRepos(host: str, port: int, user: str, password: str): """list snapshot repositories""" @@ -120,7 +161,7 @@ def listRepos(host: str, port: int, user: str, password: str): click.echo(repo) -@es.command(name="init-repo") # New command for initializing a repository +@repo.command(name="init") @es_options @s3_options @click.option( @@ -137,8 +178,6 @@ def initRepo( repo_name: str, endpoint: str, bucket: str, - key: str, - secret: str, ): """initialize a snapshot repository""" # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation @@ -158,37 +197,3 @@ def initRepo( else: logging.error(f"Failed to initialize repository '{repo_name}'.") - -@es.command() -@es_options -def backup(host: str, port: int, user: str, password: str): - """elasticsearch ➜ local""" - - esConfig = ESConfig(host=host, port=port, user=user, password=password) - indices = _getIndices(esConfig) - if not indices: - logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") - return - - for index in indices: - snapshot = _esDump(esConfig, index) - logging.debug(f"Dumped index '{index}' to '{snapshot}'") - - -@es.command() -@es_options -@dir_options -def restore(host: str, port: int, user: str, password: str, snapshot: str): - """local ➜ elasticsearch""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) - - indices = _getIndices(esConfig) - if not indices: - logging.warning( - f"No indices found to restore at {esConfig.host}:{esConfig.port}." 
- ) - return - - # Restore indices - for index in indices: - _ = _esRestore(esConfig, index, snapshot) diff --git a/src/backup/s3/cli.py b/src/backup/s3/cli.py index 42c35a7..0573de1 100644 --- a/src/backup/s3/cli.py +++ b/src/backup/s3/cli.py @@ -16,7 +16,6 @@ def s3_options(fn): click.option( "--endpoint", "-e", - default="https://s3.amazonaws.com", show_default=True, help="S3 endpoint URL", ), diff --git a/tests/elasticsearch/test_elasticsearch.py b/tests/elasticsearch/test_elasticsearch.py new file mode 100644 index 0000000..43fe4d7 --- /dev/null +++ b/tests/elasticsearch/test_elasticsearch.py @@ -0,0 +1,4 @@ +import pytest + +def testExample(): + assert True is not False diff --git a/tests/grip/test_grip.py b/tests/grip/test_grip.py new file mode 100644 index 0000000..43fe4d7 --- /dev/null +++ b/tests/grip/test_grip.py @@ -0,0 +1,4 @@ +import pytest + +def testExample(): + assert True is not False diff --git a/tests/postgres/test_postgres.py b/tests/postgres/test_postgres.py new file mode 100644 index 0000000..43fe4d7 --- /dev/null +++ b/tests/postgres/test_postgres.py @@ -0,0 +1,4 @@ +import pytest + +def testExample(): + assert True is not False diff --git a/tests/s3/test_s3.py b/tests/s3/test_s3.py new file mode 100644 index 0000000..43fe4d7 --- /dev/null +++ b/tests/s3/test_s3.py @@ -0,0 +1,4 @@ +import pytest + +def testExample(): + assert True is not False diff --git a/tests/test_backups.py b/tests/test_backups.py index e1313f2..d1e2573 100644 --- a/tests/test_backups.py +++ b/tests/test_backups.py @@ -20,8 +20,6 @@ ) from backup.options import ( dir_options, - pg_options, - s3_options, ) From 4f2949d65a7ae93ab6596aff2810dd410b6c9ee8 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 24 Oct 2025 11:09:32 -0700 Subject: [PATCH 24/30] feat: Add custom Elasticsearch Docker image with S3 Plugin installed --- Dockerfile.es | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 Dockerfile.es diff --git a/Dockerfile.es b/Dockerfile.es new 
file mode 100644 index 0000000..6d2eeaa --- /dev/null +++ b/Dockerfile.es @@ -0,0 +1,13 @@ +# Creating a custom Docker image to include the s3 snapshot repository plugin +# Ref: https://github.com/elastic/helm-charts/blob/v7.17.3/elasticsearch/README.md#how-to-install-plugins + +# Manual build command: +# docker buildx build --platform=linux/arm64,linux/amd64 -t quay.io/ohsu-comp-bio/elasticsearch-s3:7.17.3 -f Dockerfile.es . --push +# TODO: Add this to GitHub Actions for automatic builds + +# Start from the official Elasticsearch image you are currently using +FROM docker.elastic.co/elasticsearch/elasticsearch:7.17.3 + +# Install the S3 repository plugin +# The 'install' command runs at build time, and is baked into the final image +RUN bin/elasticsearch-plugin install --batch repository-s3 From abeed2574af3d9f4d40b8419581affede5bcba23 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Fri, 24 Oct 2025 18:22:18 -0700 Subject: [PATCH 25/30] feat: Add working ES snapshot repo initialization --- src/backup/elasticsearch/__init__.py | 72 ++++------ src/backup/elasticsearch/cli.py | 164 ++++------------------ src/backup/elasticsearch/repo/__init__.py | 40 ++++++ src/backup/elasticsearch/repo/cli.py | 89 ++++++++++++ src/backup/grip/__init__.py | 3 + src/backup/grip/cli.py | 52 +++---- src/backup/options.py | 2 +- src/backup/postgres/cli.py | 14 +- src/backup/s3/cli.py | 23 +-- tests/elasticsearch/test_elasticsearch.py | 2 - tests/grip/test_grip.py | 2 - tests/postgres/test_postgres.py | 2 - tests/s3/test_s3.py | 2 - tests/test_backups.py | 5 +- 14 files changed, 231 insertions(+), 241 deletions(-) create mode 100644 src/backup/elasticsearch/repo/__init__.py create mode 100644 src/backup/elasticsearch/repo/cli.py diff --git a/src/backup/elasticsearch/__init__.py b/src/backup/elasticsearch/__init__.py index a4e9bf8..0f60aae 100644 --- a/src/backup/elasticsearch/__init__.py +++ b/src/backup/elasticsearch/__init__.py @@ -1,18 +1,41 @@ from dataclasses import dataclass 
import logging -from pathlib import Path -import subprocess +import click from elasticsearch import Elasticsearch +# ElasticSearch Flags +def es_flags(fn): + options = [ + click.option( + "--host", + "-H", + envvar="ES_HOST", + default="localhost", + show_default=True, + help="ElasticSearch host ($ES_HOST)", + ), + click.option( + "--port", + "-p", + envvar="ES_PORT", + default=9200, + show_default=True, + help="ElasticSearch port ($ES_PORT)", + ), + ] + for option in reversed(options): + fn = option(fn) + return fn + + @dataclass class ESConfig: """ElasticSearch config""" host: str port: int - user: str - password: str + # Backup repo # https://www.elastic.co/docs/deploy-manage/tools/snapshot-and-restore/self-managed repo: str = "" @@ -50,44 +73,7 @@ def _getIndices(esConfig: ESConfig) -> list[str]: return indices -def _getRepos(esConfig: ESConfig) -> list[str] | None: - """ - Utility function to connect to ElasticSearch and list all snapshot repositories. - """ - elastic = _connect(esConfig) - - try: - repos = elastic.snapshot.get_repository(name="_all") # Get all repositories - repo_names = list(repos.keys()) # Extract just the names - return repo_names - except Exception as err: - logging.error(f"Error listing Elasticsearch repositories: {err}") - return None - - -def _initRepo(esConfig: ESConfig) -> bool: - """ - Initializes a snapshot repository in ElasticSearch. - """ - elastic = _connect(esConfig) - - # Create the repository - elastic.snapshot.create_repository( - name=esConfig.repo, - body={ - "type": "s3", - "endpoint": esConfig.endpoint, - "bucket": esConfig.bucket, - "base_path": esConfig.repo, - "access_key": esConfig.user, - "secret_key": esConfig.password, - }, - ) - logging.info(f"Repository '{esConfig.repo}' created successfully.") - return True - - -def _dump(esConfig: ESConfig, index: str): +def _dump(esConfig: ESConfig, index: str) -> str | None: """ Creates a snapshot of a single index using Elasticsearch Snapshot API. 
""" @@ -99,7 +85,7 @@ def _dump(esConfig: ESConfig, index: str): response = elastic.snapshot.create( repository=esConfig.repo, - snapshot=f"{index}", + snapshot=index, wait_for_completion=True, ) diff --git a/src/backup/elasticsearch/cli.py b/src/backup/elasticsearch/cli.py index 032d94a..314f88c 100644 --- a/src/backup/elasticsearch/cli.py +++ b/src/backup/elasticsearch/cli.py @@ -3,83 +3,16 @@ _getIndices, _dump as _esDump, _restore as _esRestore, - _getRepos, - _initRepo, ) +from . import ESConfig, es_flags +from .repo.cli import repo, repo_flags from backup.options import ( - dir_options, -) -from backup.s3.cli import ( - s3_options, - S3Config, + dir_flags, ) import click import logging -# ElasticSearch Flags -def es_options(fn): - options = [ - click.option( - "--host", - "-H", - envvar="ES_HOST", - default="localhost", - show_default=True, - help="ElasticSearch host ($ES_HOST)", - ), - click.option( - "--port", - "-p", - envvar="ES_PORT", - default=9200, - show_default=True, - help="ElasticSearch port ($ES_PORT)", - ), - click.option( - "--user", - "-u", - envvar="ES_USER", - default="elastic", - show_default=True, - help="ElasticSearch username ($ES_USER)", - ), - click.option( - "--password", - "-P", - envvar="ES_PASSWORD", - help="ElasticSearch password ($ES_PASSWORD)", - ), - # click.option( - # "--repo", - # "-r", - # envvar="ES_REPO", - # default="backup_repo", - # show_default=True, - # help="ElasticSearch snapshot repository name ($ES_REPO)", - # ), - # click.option( - # "--bucket", - # "-b", - # envvar="ES_BUCKET", - # default="backup_bucket", - # show_default=True, - # help="S3 bucket name for ElasticSearch backups ($ES_BUCKET)", - # ), - # click.option( - # "--endpoint", - # "-e", - # envvar="ES_ENDPOINT", - # default="https://s3.amazonaws.com", - # show_default=True, - # help="S3 endpoint URL for ElasticSearch backups ($ES_ENDPOINT)", - # ), - ] - for option in reversed(options): - fn = option(fn) - return fn - - @click.group() def es(): 
"""Commands for ElasticSearch backups.""" @@ -87,10 +20,10 @@ def es(): @es.command() -@es_options -def ls(host: str, port: int, user: str, password: str): +@es_flags +def ls(host: str, port: int): """list indices""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) + esConfig = ESConfig(host=host, port=port) indices = _getIndices(esConfig) if not indices: @@ -103,27 +36,36 @@ def ls(host: str, port: int, user: str, password: str): @es.command() -@es_options -def backup(host: str, port: int, user: str, password: str): +@es_flags +@repo_flags +def backup(host: str, port: int, repo: str, endpoint: str, bucket: str): """elasticsearch ➜ local""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) + esConfig = ESConfig( + host=host, + port=port, + repo=repo, + endpoint=endpoint, + bucket=bucket, + ) + indices = _getIndices(esConfig) if not indices: logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") return for index in indices: + logging.debug(f"Backing up index '{index}'") snapshot = _esDump(esConfig, index) logging.debug(f"Dumped index '{index}' to '{snapshot}'") @es.command() -@es_options -@dir_options -def restore(host: str, port: int, user: str, password: str, snapshot: str): +@es_flags +@dir_flags +def restore(host: str, port: int, snapshot: str): """local ➜ elasticsearch""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) + esConfig = ESConfig(host=host, port=port) indices = _getIndices(esConfig) if not indices: @@ -137,63 +79,5 @@ def restore(host: str, port: int, user: str, password: str, snapshot: str): _ = _esRestore(esConfig, index, snapshot) -@es.group() -def repo(): - """Commands for managing snapshot repositories.""" - pass - - -@repo.command(name="ls") -@es_options -def listRepos(host: str, port: int, user: str, password: str): - """list snapshot repositories""" - esConfig = ESConfig(host=host, port=port, user=user, password=password) - - repos = 
_getRepos(esConfig) - if not repos: - logging.warning( - f"No snapshot repositories found at {esConfig.host}:{esConfig.port}" - ) - return - - # List repositories - for repo in repos: - click.echo(repo) - - -@repo.command(name="init") -@es_options -@s3_options -@click.option( - "--repo-name", - "-r", - required=True, - help="Name of the Elasticsearch snapshot repository to initialize.", -) -def initRepo( - host: str, - port: int, - user: str, - password: str, - repo_name: str, - endpoint: str, - bucket: str, -): - """initialize a snapshot repository""" - # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation - esConfig = ESConfig( - host=host, - port=port, - user=user, - password=password, - repo=repo_name, - endpoint=endpoint, - bucket=bucket, - ) - - success = _initRepo(esConfig) - if success: - click.echo(f"Repository '{repo_name}' initialized successfully.") - else: - logging.error(f"Failed to initialize repository '{repo_name}'.") - +# Elasticsearch snapshot repository commands +es.add_command(repo) diff --git a/src/backup/elasticsearch/repo/__init__.py b/src/backup/elasticsearch/repo/__init__.py new file mode 100644 index 0000000..9c7776f --- /dev/null +++ b/src/backup/elasticsearch/repo/__init__.py @@ -0,0 +1,40 @@ +import logging +from backup.elasticsearch import ESConfig, _connect + + +def _getRepos(esConfig: ESConfig) -> list[str] | None: + """ + Utility function to connect to ElasticSearch and list all snapshot repositories. + """ + elastic = _connect(esConfig) + + try: + repos = elastic.snapshot.get_repository(name="_all") # Get all repositories + repo_names = list(repos.keys()) # Extract just the names + return repo_names + except Exception as err: + logging.error(f"Error listing Elasticsearch repositories: {err}") + return None + + +def _initRepo(esConfig: ESConfig) -> bool: + """ + Initializes a snapshot repository in ElasticSearch. 
+ """ + elastic = _connect(esConfig) + + # Create the repository + elastic.snapshot.create_repository( + name=esConfig.repo, + repository={ + "type": "s3", + "settings": { + "bucket": esConfig.bucket, + "base_path": esConfig.repo, + }, + }, + ) + + logging.info(f"Repository '{esConfig.repo}' created successfully.") + + return True diff --git a/src/backup/elasticsearch/repo/cli.py b/src/backup/elasticsearch/repo/cli.py new file mode 100644 index 0000000..e1f0f98 --- /dev/null +++ b/src/backup/elasticsearch/repo/cli.py @@ -0,0 +1,89 @@ +import logging +import click +from .. import ESConfig, es_flags +from ..repo import _getRepos, _initRepo +from backup.s3.cli import s3_flags + + +# ElasticSearch Flags +def repo_flags(fn): + options = [ + click.option( + "--repo", + "-r", + envvar="ES_REPO", + default="backup_repo", + show_default=True, + help="ElasticSearch snapshot repository name ($ES_REPO)", + ), + click.option( + "--bucket", + "-b", + envvar="ES_BUCKET", + default="backup_bucket", + show_default=True, + help="S3 bucket name for ElasticSearch backups ($ES_BUCKET)", + ), + click.option( + "--endpoint", + "-e", + envvar="ES_ENDPOINT", + default="https://s3.amazonaws.com", + show_default=True, + help="S3 endpoint URL for ElasticSearch backups ($ES_ENDPOINT)", + ), + ] + for option in reversed(options): + fn = option(fn) + return fn + + +@click.group() +def repo(): + """Commands for managing snapshot repositories.""" + pass + + +@repo.command(name="ls") +@es_flags +def listRepos(host: str, port: int): + """list snapshot repositories""" + esConfig = ESConfig(host=host, port=port) + + repos = _getRepos(esConfig) + if not repos: + logging.warning( + f"No snapshot repositories found at {esConfig.host}:{esConfig.port}" + ) + return + + # List repositories + for repo in repos: + click.echo(repo) + + +@repo.command(name="init") +@es_flags +@repo_flags +def initRepo( + host: str, + port: int, + repo: str, + endpoint: str, + bucket: str, +): + """initialize a snapshot 
repository""" + # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation + esConfig = ESConfig( + host=host, + port=port, + repo=repo, + endpoint=endpoint, + bucket=bucket, + ) + + success = _initRepo(esConfig) + if success: + click.echo(f"Repository '{repo}' initialized successfully.") + else: + logging.error(f"Failed to initialize repository '{repo}'.") diff --git a/src/backup/grip/__init__.py b/src/backup/grip/__init__.py index 7a29782..c483e94 100644 --- a/src/backup/grip/__init__.py +++ b/src/backup/grip/__init__.py @@ -128,6 +128,9 @@ def _dump(grip: GripConfig, graph: str, vertex: bool, edge: bool, out: Path) -> def _restore(grip: GripConfig, graph: str, dir: Path): + ## Clean/Delete existing graph + ## GRIP initdb job (templates/post-install) + ## Load conn = _connect(grip) G = conn.graph(graph) diff --git a/src/backup/grip/cli.py b/src/backup/grip/cli.py index 718bf63..ba6665f 100644 --- a/src/backup/grip/cli.py +++ b/src/backup/grip/cli.py @@ -7,7 +7,7 @@ _restore as _gripRestore, ) from backup.options import ( - dir_options, + dir_flags, ) from pathlib import Path import click @@ -16,7 +16,7 @@ # GRIP Flags -def grip_options(fn): +def grip_host_flags(fn): options = [ click.option( "--host", @@ -40,25 +40,26 @@ def grip_options(fn): return fn # GRIP Graph Flags -def grip_graph_options(fn): +def grip_flags(fn): options = [ - click.option( - "--edge", - "--edges", - "-e", - is_flag=True, - default=False, - help="Output GRIP edges.", - ), - click.option("--graph", "-g", default="CALYPR", help="Name of the GRIP graph."), - click.option( - "--vertex", - "--vertices", - "-v", - is_flag=True, - default=False, - help="Output GRIP vertices.", - ), + click.option("--graph", "-g", default="CALYPR", help="Name of the GRIP graph."), + # click.option( + # "--edge", + # "--edges", + # "-e", + # is_flag=True, + # default=False, + # help="Output GRIP edges.", + # ), + # click.option("--graph", "-g", default="CALYPR", help="Name of the GRIP 
graph."), + # click.option( + # "--vertex", + # "--vertices", + # "-v", + # is_flag=True, + # default=False, + # help="Output GRIP vertices.", + # ), ] for option in reversed(options): fn = option(fn) @@ -72,7 +73,7 @@ def grip(): @grip.command() -@grip_options +@grip_host_flags def ls(host: str, port: int): """list GRIP vertices and/or edges""" conf = GripConfig(host=host, port=port) @@ -82,8 +83,8 @@ def ls(host: str, port: int): @grip.command() -@grip_options -@dir_options +@grip_host_flags +@dir_flags def backup(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path): """grip ➜ local""" conf = GripConfig(host=host, port=port) @@ -101,8 +102,9 @@ def backup(host: str, port: int, graph: str, vertex: bool, edge: bool, dir: Path @grip.command() -@grip_options -@dir_options +@grip_host_flags +@grip_flags +@dir_flags def restore(host: str, port: int, graph: str, dir: Path): """local ➜ grip""" conf = GripConfig(host=host, port=port) diff --git a/src/backup/options.py b/src/backup/options.py index 718cf6e..900a024 100644 --- a/src/backup/options.py +++ b/src/backup/options.py @@ -4,7 +4,7 @@ # Common CLI Options for all subcommand # Output/intput directory flags -dir_options = click.option( +dir_flags = click.option( "--dir", "-d", default=Path("."), diff --git a/src/backup/postgres/cli.py b/src/backup/postgres/cli.py index 5b3c4d1..71371e3 100644 --- a/src/backup/postgres/cli.py +++ b/src/backup/postgres/cli.py @@ -5,7 +5,7 @@ _restore as _pgRestore, ) from backup.options import ( - dir_options, + dir_flags, ) import click import logging @@ -13,7 +13,7 @@ # Postgres Flags -def pg_options(fn): +def pg_flags(fn): options = [ click.option( "--host", @@ -52,7 +52,7 @@ def pg(): @pg.command() -@pg_options +@pg_flags def ls(host: str, port: int, user: str): """list databases""" conf = PGConfig(host=host, port=port, user=user) @@ -68,8 +68,8 @@ def ls(host: str, port: int, user: str): @pg.command() -@pg_options -@dir_options +@pg_flags +@dir_flags def 
dump(host: str, port: int, user: str, dir: Path): """postgres ➜ local""" conf = PGConfig(host=host, port=port, user=user) @@ -89,8 +89,8 @@ def dump(host: str, port: int, user: str, dir: Path): @pg.command() -@pg_options -@dir_options +@pg_flags +@dir_flags def restore(host: str, port: int, user: str, dir: Path): """local ➜ postgres""" conf = PGConfig(host=host, port=port, user=user) diff --git a/src/backup/s3/cli.py b/src/backup/s3/cli.py index 0573de1..248879a 100644 --- a/src/backup/s3/cli.py +++ b/src/backup/s3/cli.py @@ -4,14 +4,14 @@ _upload, ) from backup.options import ( - dir_options, + dir_flags, ) import click from pathlib import Path # S3 Flags -def s3_options(fn): +def s3_flags(fn): options = [ click.option( "--endpoint", @@ -20,15 +20,6 @@ def s3_options(fn): help="S3 endpoint URL", ), click.option("--bucket", "-b", required=True, help="S3 bucket name"), - click.option( - "--key", "-k", envvar="ACCESS_KEY", help="S3 access key ID ($ACCESS_KEY)" - ), - click.option( - "--secret", - "-s", - envvar="SECRET_KEY", - help="S3 secret access key ($SECRET_KEY)", - ), ] for option in options: fn = option(fn) @@ -42,15 +33,15 @@ def s3(): @s3.command() -@s3_options +@s3_flags def ls(endpoint: str, bucket: str, key: str, secret: str): # TODO: Implement pass @s3.command() -@s3_options -@dir_options +@s3_flags +@dir_flags def download(endpoint: str, bucket: str, key: str, secret: str, dir: Path): """s3 ➜ local""" conf = S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) @@ -60,8 +51,8 @@ def download(endpoint: str, bucket: str, key: str, secret: str, dir: Path): @s3.command() -@s3_options -@dir_options +@s3_flags +@dir_flags def upload(endpoint: str, bucket: str, key: str, secret: str, dir: Path): """local ➜ s3""" s3 = S3Config(endpoint=endpoint, bucket=bucket, key=key, secret=secret) diff --git a/tests/elasticsearch/test_elasticsearch.py b/tests/elasticsearch/test_elasticsearch.py index 43fe4d7..7a337e6 100644 --- 
a/tests/elasticsearch/test_elasticsearch.py +++ b/tests/elasticsearch/test_elasticsearch.py @@ -1,4 +1,2 @@ -import pytest - def testExample(): assert True is not False diff --git a/tests/grip/test_grip.py b/tests/grip/test_grip.py index 43fe4d7..7a337e6 100644 --- a/tests/grip/test_grip.py +++ b/tests/grip/test_grip.py @@ -1,4 +1,2 @@ -import pytest - def testExample(): assert True is not False diff --git a/tests/postgres/test_postgres.py b/tests/postgres/test_postgres.py index 43fe4d7..7a337e6 100644 --- a/tests/postgres/test_postgres.py +++ b/tests/postgres/test_postgres.py @@ -1,4 +1,2 @@ -import pytest - def testExample(): assert True is not False diff --git a/tests/s3/test_s3.py b/tests/s3/test_s3.py index 43fe4d7..7a337e6 100644 --- a/tests/s3/test_s3.py +++ b/tests/s3/test_s3.py @@ -1,4 +1,2 @@ -import pytest - def testExample(): assert True is not False diff --git a/tests/test_backups.py b/tests/test_backups.py index d1e2573..518dd93 100644 --- a/tests/test_backups.py +++ b/tests/test_backups.py @@ -19,9 +19,12 @@ _upload, ) from backup.options import ( - dir_options, + dir_flags, ) +# TODO: This "end-to-end" test file still needs to be implemented with: +# - Backups +# - Restores def testConnect(testPostgres): """ From 5ae148f66ee0fd8ffed005447b01de445b345648 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Mon, 27 Oct 2025 17:05:21 -0700 Subject: [PATCH 26/30] feat: Add initial support for Elasticsearch snapshots --- entrypoint.sh | 54 +++++++++++++++---- src/backup/{elasticsearch => es}/__init__.py | 49 +++++++++-------- src/backup/{elasticsearch => es}/cli.py | 34 +++++++----- .../{elasticsearch => es}/repo/__init__.py | 22 +++++++- src/backup/{elasticsearch => es}/repo/cli.py | 54 ++++++++++++------- src/backup/main.py | 2 +- 6 files changed, 144 insertions(+), 71 deletions(-) rename src/backup/{elasticsearch => es}/__init__.py (69%) rename src/backup/{elasticsearch => es}/cli.py (65%) rename src/backup/{elasticsearch => es}/repo/__init__.py (67%) 
rename src/backup/{elasticsearch => es}/repo/cli.py (66%) diff --git a/entrypoint.sh b/entrypoint.sh index 92ff24e..c23a303 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -1,17 +1,39 @@ #!/bin/bash set -e +# Backup Overview/Structure: +# +# ENDPOINT/BUCKET/TIMESTAMP +# │ +# ├─ elastic +# │ ├─ TODO +# │ └─ TODO +# │ +# ├─ grip +# │ ├─ CALYPR.edges +# │ └─ CALYPR.vertices +# │ +# └─ postgres +# ├─ arborist_local.sql +# ├─ fence_local.sql +# ├─ gecko_local.sql +# ├─ indexd_local.sql +# ├─ metadata_local.sql +# ├─ postgres.sql +# ├─ requestor_local.sql +# └─ wts_local.sql + TIMESTAMP=$(date +"%Y-%m-%dT%H:%M:%S") export DIR="${DIR}/${TIMESTAMP}" -# Postgres Dump +# 1. Postgres Dump bak --debug pg dump \ - --dir "${DIR}" \ - --host "${PGHOST}" \ - --user "${PGUSER}" + --dir "${DIR}" \ + --host "${PGHOST}" \ + --user "${PGUSER}" -# GRIP Backup +# 2. GRIP Backup bak --debug grip backup \ --dir "${DIR}" \ --host "${GRIP_HOST}" \ @@ -19,13 +41,23 @@ bak --debug grip backup \ --vertex \ --edge -# S3 Upload +# 3. S3 Upload bak --debug s3 upload \ - --dir "${DIR}" \ - --endpoint "${ENDPOINT}" \ - --bucket "${BUCKET}" \ - --key "${ACCESS_KEY}" \ - --secret "${SECRET_KEY}" + --dir "${DIR}" \ + --endpoint "${ENDPOINT}" \ + --bucket "${BUCKET}" \ + --key "${ACCESS_KEY}" \ + --secret "${SECRET_KEY}" + +# 4. Elasticsearch Snapshot +# We keep the Elasticsearch backups separate from that of Postgres and GRIP +# to conform to the standard snapshot behavior/structure (e.g. 
incremental diffs) +# Ref: https://www.elastic.co/docs/deploy-manage/tools/snapshot-and-restore/self-managed +bak --debug es dump \ + --endpoint "${ES_ENDPOINT}" \ + --bucket "${ES_BUCKET}" \ + --repo "${ES_REPO}" \ + --snapshot "${DIR}" \ echo "Backup Complete:" echo "- ENDPOINT: ${ENDPOINT}" diff --git a/src/backup/elasticsearch/__init__.py b/src/backup/es/__init__.py similarity index 69% rename from src/backup/elasticsearch/__init__.py rename to src/backup/es/__init__.py index 0f60aae..bdd3b67 100644 --- a/src/backup/elasticsearch/__init__.py +++ b/src/backup/es/__init__.py @@ -70,59 +70,58 @@ def _getIndices(esConfig: ESConfig) -> list[str]: # Get all indices using the cat.indices() method indices = elastic.cat.indices(h="index").splitlines() + # Remove unused '.geoip_databases' to avoid `400` error during snapshot + # https://www.elastic.co/docs/reference/enrich-processor/geoip-processor + if ".geoip_databases" in indices: + indices.remove(".geoip_databases") + return indices -def _dump(esConfig: ESConfig, index: str) -> str | None: +def _snapshot(esConfig: ESConfig, indices: list[str], snapshot: str) -> str | None: """ - Creates a snapshot of a single index using Elasticsearch Snapshot API. + Creates a snapshot of indices using Elasticsearch Snapshot API. """ elastic = _connect(esConfig) - # Check if index exists before attempting to snapshot - if not elastic.indices.exists(index=index): - logging.warning(f"Index '{index}' not found") - response = elastic.snapshot.create( + # Snapshot repo repository=esConfig.repo, - snapshot=index, + # Timestamp + snapshot=snapshot, + # Indices to backup + indices=indices, + # Block until complete wait_for_completion=True, ) + logging.debug(f"Snapshot response: {response}") + if response["snapshot"]["state"] == "SUCCESS": - return response["snapshot"]["snapshot_id"] + # TODO: Return more useful info here? 
+ return response["snapshot"]["snapshot"] else: - logging.error(f"Snapshot '{index}' error: {response}") + logging.error(f"Snapshot error: {response}") -def _restore(esConfig: ESConfig, index: str, snapshot: str) -> bool: +def _restore(esConfig: ESConfig, indices: list[str], snapshot: str) -> str | None: """ Restores a single index from a snapshot using Elasticsearch Snapshot API. """ elastic = _connect(esConfig) - if elastic is None: - return False - - # Check if the snapshot exists - snapshot_info = elastic.snapshot.get(repository=esConfig.repo, snapshot=snapshot) - if not snapshot_info["snapshots"]: - logging.error(f"Snapshot '{snapshot}' not found in repo '{esConfig.repo}'") - return False - - # Close the index before restoring - if elastic.indices.exists(index=index): - elastic.indices.close(index=index) response = elastic.snapshot.restore( repository=esConfig.repo, snapshot=snapshot, - body={"indices": index}, + indices=indices, wait_for_completion=True, ) + logging.debug(f"Restore response: {response}") + if response["snapshot"]["state"] == "SUCCESS": - return True + # TODO: Return more useful info here? + return response["snapshot"]["snapshot"] else: logging.error(f"Snapshot '{snapshot}' error: {response}") - return False diff --git a/src/backup/elasticsearch/cli.py b/src/backup/es/cli.py similarity index 65% rename from src/backup/elasticsearch/cli.py rename to src/backup/es/cli.py index 314f88c..0be8ff5 100644 --- a/src/backup/elasticsearch/cli.py +++ b/src/backup/es/cli.py @@ -1,11 +1,12 @@ -from backup.elasticsearch import ( +from datetime import datetime +from backup.es import ( ESConfig, _getIndices, - _dump as _esDump, - _restore as _esRestore, + _snapshot, + _restore, ) from . 
import ESConfig, es_flags -from .repo.cli import repo, repo_flags +from .repo.cli import repo, es_repo_flags from backup.options import ( dir_flags, ) @@ -37,16 +38,21 @@ def ls(host: str, port: int): @es.command() @es_flags -@repo_flags -def backup(host: str, port: int, repo: str, endpoint: str, bucket: str): +@es_repo_flags +@click.option( + "--snapshot", + "-s", + required=True, + default=lambda: datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), + help="Snapshot name (will be created under the repository)", +) +def backup(host: str, port: int, repo: str, snapshot: str): """elasticsearch ➜ local""" esConfig = ESConfig( host=host, port=port, repo=repo, - endpoint=endpoint, - bucket=bucket, ) indices = _getIndices(esConfig) @@ -54,10 +60,12 @@ def backup(host: str, port: int, repo: str, endpoint: str, bucket: str): logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") return - for index in indices: - logging.debug(f"Backing up index '{index}'") - snapshot = _esDump(esConfig, index) - logging.debug(f"Dumped index '{index}' to '{snapshot}'") + logging.debug(f"Backing up indices '{indices}' to snapshot '{snapshot}'") + resp = _snapshot(esConfig, indices, snapshot) + if resp: + logging.info(f"Snapshot created: {resp}") + else: + logging.error("Snapshot creation failed") @es.command() @@ -76,7 +84,7 @@ def restore(host: str, port: int, snapshot: str): # Restore indices for index in indices: - _ = _esRestore(esConfig, index, snapshot) + _ = _restore(esConfig, index, snapshot) # Elasticsearch snapshot repository commands diff --git a/src/backup/elasticsearch/repo/__init__.py b/src/backup/es/repo/__init__.py similarity index 67% rename from src/backup/elasticsearch/repo/__init__.py rename to src/backup/es/repo/__init__.py index 9c7776f..521e68c 100644 --- a/src/backup/elasticsearch/repo/__init__.py +++ b/src/backup/es/repo/__init__.py @@ -1,5 +1,5 @@ import logging -from backup.elasticsearch import ESConfig, _connect +from backup.es import ESConfig, _connect 
def _getRepos(esConfig: ESConfig) -> list[str] | None: @@ -38,3 +38,23 @@ def _initRepo(esConfig: ESConfig) -> bool: logging.info(f"Repository '{esConfig.repo}' created successfully.") return True + + +def _deleteRepo(esConfig: ESConfig, force: bool) -> bool: + """ + Initializes a snapshot repository in ElasticSearch. + """ + if not force: + logging.error("Deletion of repository requires --force flag.") + return False + + elastic = _connect(esConfig) + + # Create the repository + elastic.snapshot.delete_repository( + name=esConfig.repo, + ) + + logging.info(f"Repository '{esConfig.repo}' deleted successfully.") + + return True diff --git a/src/backup/elasticsearch/repo/cli.py b/src/backup/es/repo/cli.py similarity index 66% rename from src/backup/elasticsearch/repo/cli.py rename to src/backup/es/repo/cli.py index e1f0f98..2a29ce3 100644 --- a/src/backup/elasticsearch/repo/cli.py +++ b/src/backup/es/repo/cli.py @@ -1,37 +1,20 @@ import logging import click from .. import ESConfig, es_flags -from ..repo import _getRepos, _initRepo +from ..repo import _deleteRepo, _getRepos, _initRepo from backup.s3.cli import s3_flags # ElasticSearch Flags -def repo_flags(fn): +def es_repo_flags(fn): options = [ click.option( "--repo", "-r", envvar="ES_REPO", - default="backup_repo", show_default=True, help="ElasticSearch snapshot repository name ($ES_REPO)", ), - click.option( - "--bucket", - "-b", - envvar="ES_BUCKET", - default="backup_bucket", - show_default=True, - help="S3 bucket name for ElasticSearch backups ($ES_BUCKET)", - ), - click.option( - "--endpoint", - "-e", - envvar="ES_ENDPOINT", - default="https://s3.amazonaws.com", - show_default=True, - help="S3 endpoint URL for ElasticSearch backups ($ES_ENDPOINT)", - ), ] for option in reversed(options): fn = option(fn) @@ -64,7 +47,8 @@ def listRepos(host: str, port: int): @repo.command(name="init") @es_flags -@repo_flags +@es_repo_flags +@s3_flags def initRepo( host: str, port: int, @@ -87,3 +71,33 @@ def initRepo( 
click.echo(f"Repository '{repo}' initialized successfully.") else: logging.error(f"Failed to initialize repository '{repo}'.") + + +@repo.command(name="rm") +@es_flags +@es_repo_flags +@click.option( + "--force", + is_flag=True, + default=False, + help="Force/confirm deletion of the repository.", +) +def deleteRepo( + host: str, + port: int, + repo: str, + force: bool +): + """initialize a snapshot repository""" + # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation + esConfig = ESConfig( + host=host, + port=port, + repo=repo, + ) + + success = _deleteRepo(esConfig, force) + if success: + click.echo(f"Repository '{repo}' deleted successfully.") + else: + logging.error(f"Failed to delete repository '{repo}'.") diff --git a/src/backup/main.py b/src/backup/main.py index 3bb0812..074888f 100644 --- a/src/backup/main.py +++ b/src/backup/main.py @@ -5,7 +5,7 @@ import warnings # Import command groups from subpackages -from .elasticsearch.cli import es as es_command +from .es.cli import es as es_command from .grip.cli import grip as grip_command from .postgres.cli import pg as pg_command from .s3.cli import s3 as s3_command From 645142df8adc105098fb2dbaafbcbab7d829f5f0 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Mon, 27 Oct 2025 19:58:14 -0700 Subject: [PATCH 27/30] feat: Add initial support for Elastic Snapshot Restore --- src/backup/es/__init__.py | 66 +++++++++++++++++++++++++++- src/backup/es/cli.py | 90 +++++++++++++++++++++++++++------------ src/backup/es/repo/cli.py | 23 +++------- 3 files changed, 135 insertions(+), 44 deletions(-) diff --git a/src/backup/es/__init__.py b/src/backup/es/__init__.py index bdd3b67..3da959c 100644 --- a/src/backup/es/__init__.py +++ b/src/backup/es/__init__.py @@ -1,5 +1,6 @@ from dataclasses import dataclass import logging +import sys import click from elasticsearch import Elasticsearch @@ -63,7 +64,7 @@ def _connect(esConfig: ESConfig) -> Elasticsearch: def _getIndices(esConfig: ESConfig) -> 
list[str]: """ - Utiltity function to connect to ElasticSearch and list all indices. + Utiltity function to list all indices. """ elastic = _connect(esConfig) @@ -78,6 +79,56 @@ def _getIndices(esConfig: ESConfig) -> list[str]: return indices +def _getRepos(esConfig: ESConfig) -> list[str]: + """ + Utiltity function to list all repos + """ + elastic = _connect(esConfig) + + repos = elastic.cat.repositories().splitlines() + + return repos + + +def _getSnapshots(esConfig: ESConfig, repo: str) -> list[str]: + """ + Utiltity function to list all snapshots in a given repository. + """ + elastic = _connect(esConfig) + + snapshots = elastic.snapshot.get( + repository=repo, + snapshot="_all", + )["snapshots"] + + snapshot_names = [snap["snapshot"] for snap in snapshots] + + return snapshot_names + + +def _getSnapshotIndices(esConfig: ESConfig, repo: str, snapshot: str) -> list[str]: + """ + Utiltity function to list all indices in all snapshots in a given repository. + """ + elastic = _connect(esConfig) + + snapshots = elastic.snapshot.get( + repository=repo, + snapshot=snapshot, + )["snapshots"] + + indices = [] + for snap in snapshots: + indices.extend(snap.get("indices", [])) + + # Remove unused '.geoip_databases' to avoid `400` error during snapshot + # https://www.elastic.co/docs/reference/enrich-processor/geoip-processor + if ".geoip_databases" in indices: + indices.remove(".geoip_databases") + + return indices + + def _snapshot(esConfig: ESConfig, indices: list[str], snapshot: str) -> str | None: """ Creates a snapshot of indices using Elasticsearch Snapshot API. @@ -107,9 +158,22 @@ def _snapshot(esConfig: ESConfig, indices: list[str], snapshot: str) -> str | No def _restore(esConfig: ESConfig, indices: list[str], snapshot: str) -> str | None: """ Restores a single index from a snapshot using Elasticsearch Snapshot API. + If the indices do not exist, they will be created before restoring. 
""" elastic = _connect(esConfig) + # Check if indices exist + existing_indices = _getIndices(esConfig) + + for index in indices: + if index not in existing_indices: + # Create the index if it doesn't exist + logging.info(f"Index '{index}' does not exist. Creating it before restore.") + elastic.indices.create(index=index) + + # Close indices before restore + elastic.indices.close(index=",".join(indices)) + response = elastic.snapshot.restore( repository=esConfig.repo, snapshot=snapshot, diff --git a/src/backup/es/cli.py b/src/backup/es/cli.py index 0be8ff5..7b02879 100644 --- a/src/backup/es/cli.py +++ b/src/backup/es/cli.py @@ -2,14 +2,14 @@ from backup.es import ( ESConfig, _getIndices, + _getRepos, + _getSnapshots, + _getSnapshotIndices, _snapshot, _restore, ) from . import ESConfig, es_flags from .repo.cli import repo, es_repo_flags -from backup.options import ( - dir_flags, -) import click import logging @@ -22,18 +22,54 @@ def es(): @es.command() @es_flags -def ls(host: str, port: int): - """list indices""" +@click.option( + "--repos", + is_flag=True, + default=False, + help="List all snapshot repositories instead of indices", +) +@click.option( + "--repo", + default=None, + help="Specify a repository to list its indices", +) +@click.option( + "--snapshot", + default=None, + help="Specify a snapshot to list its indices", +) +# TODO: Fix spaghetti code +def ls(host: str, port: int, repos: bool, repo: str, snapshot: str): + """List live indices, snapshot repositories, indices of a specific repository, or indices in a snapshot""" esConfig = ESConfig(host=host, port=port) - indices = _getIndices(esConfig) - if not indices: - logging.warning(f"No indices found at {esConfig.host}:{esConfig.port}") - return + # List repos in current cluster + if repos: + all_repos = _getRepos(esConfig) + + for repository in all_repos: + click.echo(repository) + + # List indices in given snapshot + elif repo and snapshot: + indices = _getSnapshotIndices(esConfig, repo, snapshot) - # 
List indices - for index in indices: - click.echo(index) + for index in indices: + click.echo(index) + + # List snapshots in given repo + elif repo: + snapshots = _getSnapshots(esConfig, repo) + + for snapshot in snapshots: + click.echo(snapshot) + + # List indices in current cluster + else: + indices = _getIndices(esConfig) + + for index in indices: + click.echo(index) @es.command() @@ -43,17 +79,12 @@ def ls(host: str, port: int): "--snapshot", "-s", required=True, - default=lambda: datetime.now().strftime('%Y-%m-%d_%H-%M-%S'), + default=lambda: datetime.now().strftime("%Y-%m-%d_%H-%M-%S"), help="Snapshot name (will be created under the repository)", ) def backup(host: str, port: int, repo: str, snapshot: str): - """elasticsearch ➜ local""" - - esConfig = ESConfig( - host=host, - port=port, - repo=repo, - ) + """elasticsearch ➜ snapshot""" + esConfig = ESConfig(host=host, port=port, repo=repo) indices = _getIndices(esConfig) if not indices: @@ -70,12 +101,18 @@ def backup(host: str, port: int, repo: str, snapshot: str): @es.command() @es_flags -@dir_flags -def restore(host: str, port: int, snapshot: str): - """local ➜ elasticsearch""" - esConfig = ESConfig(host=host, port=port) +@es_repo_flags +@click.option( + "--snapshot", + "-s", + required=True, + help="Snapshot name to restore from", +) +def restore(host: str, port: int, repo: str, snapshot: str): + """snapshot ➜ elasticsearch""" + esConfig = ESConfig(host=host, port=port, repo=repo) - indices = _getIndices(esConfig) + indices = _getSnapshotIndices(esConfig, repo, snapshot) if not indices: logging.warning( f"No indices found to restore at {esConfig.host}:{esConfig.port}." 
@@ -83,8 +120,7 @@ def restore(host: str, port: int, snapshot: str):
         return
 
     # Restore indices
-    for index in indices:
-        _ = _restore(esConfig, index, snapshot)
+    _ = _restore(esConfig, indices, snapshot)
 
 
 # Elasticsearch snapshot repository commands
diff --git a/src/backup/es/repo/cli.py b/src/backup/es/repo/cli.py
index 2a29ce3..30412a6 100644
--- a/src/backup/es/repo/cli.py
+++ b/src/backup/es/repo/cli.py
@@ -30,7 +30,7 @@ def repo():
 @repo.command(name="ls")
 @es_flags
 def listRepos(host: str, port: int):
-    """list snapshot repositories"""
+    """List snapshot repositories"""
     esConfig = ESConfig(host=host, port=port)
 
     repos = _getRepos(esConfig)
@@ -49,14 +49,9 @@ def listRepos(host: str, port: int):
 @es_flags
 @es_repo_flags
 @s3_flags
-def initRepo(
-    host: str,
-    port: int,
-    repo: str,
-    endpoint: str,
-    bucket: str,
-):
-    """initialize a snapshot repository"""
+def initRepo(host: str, port: int, repo: str, endpoint: str, bucket: str):
+    """Initialize a snapshot repository"""
+
     # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation
     esConfig = ESConfig(
         host=host,
@@ -66,6 +61,7 @@ def initRepo(
         bucket=bucket,
     )
 
+    # TODO: Add readonly flag for restore operations
     success = _initRepo(esConfig)
     if success:
         click.echo(f"Repository '{repo}' initialized successfully.")
@@ -82,13 +78,8 @@ def initRepo(
     default=False,
     help="Force/confirm deletion of the repository.",
 )
-def deleteRepo(
-    host: str,
-    port: int,
-    repo: str,
-    force: bool
-):
-    """initialize a snapshot repository"""
+def deleteRepo(host: str, port: int, repo: str, force: bool):
+    """Delete a snapshot repository"""
 
     # Create ElasticSearchConfig including S3 endpoint and bucket for repository creation
     esConfig = ESConfig(
         host=host,

From 9edb44c1ecae4afaa7b65e0976d0de97596fc793 Mon Sep 17 00:00:00 2001
From: Liam Beckman
Date: Mon, 5 Jan 2026 12:59:41 -0800
Subject: [PATCH 28/30] feat: Update entrypoint.sh

---
 Dockerfile                      | 21 +++++++++++----------
 entrypoint.sh                   |  4 +++-
 src/backup/postgres/__init__.py |  4 ++--
 src/backup/postgres/cli.py      |  3 +++
 4 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 8c081d2..2530f33 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,16 +1,6 @@
 # Backup build
 FROM python:slim
 
-# Note: Installing Postgres 14 for now to match the versions used by Gen3-Helm
-#
-# Gen3-Helm Chart:
-# https://github.com/uc-cdis/gen3-helm/blob/gen3-0.2.69/helm/gen3/Chart.yaml#L143-L146
-#
-# Bitnami Postgres 11.9.13 (14.5.0):
-# https://github.com/bitnami/charts/blob/c6076945ecc47791d82e545a20ef690dd93ff662/bitnami/postgresql/Chart.yaml#L4
-#
-# Postgres Installation docs:
-# https://www.postgresql.org/download/linux/debian/
 RUN apt-get update && apt-get install -y \
     build-essential \
     gcc \
@@ -20,6 +10,17 @@ RUN apt-get install -y postgresql-common
 
 RUN YES=true /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh
 
+# Note: We're using Postgres 14 to match the version set in Gen3-Helm:
+#
+# Gen3-Helm Chart: https://github.com/calypr/gen3-helm/blob/v1.0.0/helm/gen3/Chart.yaml#L92-L94
+#
+# Postgres Chart: https://github.com/bitnami/charts/blob/postgresql/11.9.13/bitnami/postgresql/Chart.yaml#L4
+#
+# ```
+# ➜ kubectl exec --stdin --tty StatefulSets/cbds-postgresql -- /bin/bash
+# $ psql --version
+# psql (PostgreSQL) 14.5
+# ```
 RUN apt-get update && apt-get install -y postgresql-client-14
 
 WORKDIR /app
diff --git a/entrypoint.sh b/entrypoint.sh
index c23a303..d2cd936 100755
--- a/entrypoint.sh
+++ b/entrypoint.sh
@@ -1,5 +1,7 @@
 #!/bin/bash
-set -e
+set -eo pipefail
+
+trap 'echo "Backup failed."; exit 1' ERR
 
 # Backup Overview/Structure:
 #
diff --git a/src/backup/postgres/__init__.py b/src/backup/postgres/__init__.py
index 77f3bd5..3954cf1 100644
--- a/src/backup/postgres/__init__.py
+++ b/src/backup/postgres/__init__.py
@@ -117,7 +117,7 @@ def _restore(pgConfig: PGConfig, db: str, dir: Path) -> Path:
         logging.error("pg_restore not found in PATH")
 
     command = [
-        "pg_restore",
+        "psql",
"-U", pgConfig.user, "-h", @@ -126,7 +126,7 @@ def _restore(pgConfig: PGConfig, db: str, dir: Path) -> Path: str(pgConfig.port), "-d", db, - "--no-password", + "-f", dump.as_posix(), ] diff --git a/src/backup/postgres/cli.py b/src/backup/postgres/cli.py index 71371e3..9065214 100644 --- a/src/backup/postgres/cli.py +++ b/src/backup/postgres/cli.py @@ -102,4 +102,7 @@ def restore(host: str, port: int, user: str, dir: Path): # Restore databases for database in dbs: + if database == "gecko_cbds" or database == "metadata_cbds": + logging.debug("Skipping restore of gecko and metadata databases...") + continue _ = _pgRestore(conf, database, dir) From 7c027bde10f2c6b388e85b8e0b31ecff8c052862 Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Mon, 5 Jan 2026 13:04:37 -0800 Subject: [PATCH 29/30] fix: Replace psycopg2-binary with psycopg2 to fix build errors --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 2d45a1e..93128e6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,8 @@ elasticsearch gripql minio orjson -psycopg2-binary +# psycopg2-binary +psycopg2 pytest # Development dependencies From 887a79cb98c259c40baad5f93978bbc41d89128a Mon Sep 17 00:00:00 2001 From: Liam Beckman Date: Mon, 5 Jan 2026 13:07:55 -0800 Subject: [PATCH 30/30] fix: Remove aced-submission dependency --- requirements.txt | 9 --------- 1 file changed, 9 deletions(-) diff --git a/requirements.txt b/requirements.txt index 93128e6..e431af3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,18 +1,9 @@ -# TODO: Using rc here to avoid psycopg errors: -# ERROR: Failed building wheel for psycopg2-binary -# error: expression is not assignable -# error: call to undeclared function 'gettimeofday'; -# -# Use non-rc version of aced-submission when released: -# https://pypi.org/project/aced-submission/#history -aced-submission>=0.0.10rc18 click click-aliases elasticsearch gripql minio orjson -# psycopg2-binary 
psycopg2 pytest