Skip to content

Commit

Permalink
working hatchet integration (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
emrgnt-cmplxty authored Sep 4, 2024
1 parent bb08b60 commit b5fb31b
Show file tree
Hide file tree
Showing 9 changed files with 524 additions and 1,242 deletions.
12 changes: 8 additions & 4 deletions py/compose.hatchet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ services:
SERVER_AUTH_COOKIE_INSECURE: "t"
SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
SERVER_GRPC_INSECURE: "t"
SERVER_GRPC_BROADCAST_ADDRESS: localhost:7077
SERVER_GRPC_BROADCAST_ADDRESS: "host.docker.internal:7077"
SERVER_GRPC_MAX_MSG_SIZE: 13421772800

volumes:
Expand All @@ -88,7 +88,7 @@ services:
hatchet-setup-config:
condition: service_completed_successfully
ports:
- "7077:7070"
- "7077:7077"
environment:
POSTGRES_USER: "${POSTGRES_USER:-postgres}"
POSTGRES_PASSWORD: "${POSTGRES_PASSWORD:-postgres}"
Expand All @@ -97,15 +97,19 @@ services:
POSTGRES_DBNAME: "${POSTGRES_DBNAME:-postgres}"
DATABASE_URL: "postgres://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@postgres:5432/${POSTGRES_DBNAME:-postgres}?sslmode=disable"
SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
SERVER_GRPC_BROADCAST_ADDRESS: "host.docker.internal:7077"
SERVER_GRPC_PORT: 7077
SERVER_GRPC_INSECURE: "t"
SERVER_GRPC_MAX_MSG_SIZE: 104857600
volumes:
- hatchet_certs:/hatchet/certs
- hatchet_config:/hatchet/config
networks:
- r2r-network
r2r-network:
aliases:
- host.docker.internal
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:7070/health"]
test: ["CMD", "curl", "-f", "http://localhost:7077/health"]
interval: 10s
timeout: 5s
retries: 5
Expand Down
57 changes: 50 additions & 7 deletions py/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,64 @@ networks:
- "com.docker.compose.recreate=always"



services:

setup-token:
image: ghcr.io/hatchet-dev/hatchet/hatchet-admin:latest

command: >
sh -c "
if [ ! -f /hatchet_api_key/api_key.txt ]; then
echo 'API key does not exist. Creating new key...'
/hatchet/hatchet-admin token create --config /hatchet/config --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52 > /tmp/hatchet_api_key &&
cat /tmp/hatchet_api_key | tr -d '\n' > /hatchet_api_key/api_key.txt &&
echo 'Hatchet API key has been saved to /hatchet_api_key/api_key.txt'
set -e
echo 'Starting token creation process...'
# Attempt to create token and capture both stdout and stderr
TOKEN_OUTPUT=$$(/hatchet/hatchet-admin token create --config /hatchet/config --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52 2>&1)
# Extract the token (assuming it's the only part that looks like a JWT)
TOKEN=$$(echo "$$TOKEN_OUTPUT" | grep -Eo 'eyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*')
if [ -z "$$TOKEN" ]; then
echo 'Error: Failed to extract token. Full command output:' >&2
echo "$$TOKEN_OUTPUT" >&2
exit 1
fi
echo "$$TOKEN" > /tmp/hatchet_api_key
echo 'Token created and saved to /tmp/hatchet_api_key'
# Copy token to final destination
echo -n "$$TOKEN" > /hatchet_api_key/api_key.txt
echo 'Token copied to /hatchet_api_key/api_key.txt'
# Verify token was copied correctly
if [ "$$(cat /tmp/hatchet_api_key)" != "$$(cat /hatchet_api_key/api_key.txt)" ]; then
echo 'Error: Token copy failed, files do not match' >&2
echo 'Content of /tmp/hatchet_api_key:'
cat /tmp/hatchet_api_key
echo 'Content of /hatchet_api_key/api_key.txt:'
cat /hatchet_api_key/api_key.txt
exit 1
fi
echo 'Hatchet API key has been saved successfully'
echo 'Token length:' $${#TOKEN}
echo 'Token (first 20 chars):' $${TOKEN:0:20}
echo 'Token structure:' $$(echo $$TOKEN | awk -F. '{print NF-1}') 'parts'
# Check each part of the token
for i in 1 2 3; do
PART=$$(echo $$TOKEN | cut -d. -f$$i)
echo 'Part' $$i 'length:' $${#PART}
echo 'Part' $$i 'base64 check:' $$(echo $$PART | base64 -d >/dev/null 2>&1 && echo 'Valid' || echo 'Invalid')
done
# Final validation attempt
if ! echo $$TOKEN | awk -F. '{print $$2}' | base64 -d 2>/dev/null | jq . >/dev/null 2>&1; then
echo 'Warning: Token payload is not valid JSON when base64 decoded' >&2
else
echo 'API key already exists. Skipping creation.'
echo 'Token payload appears to be valid JSON'
fi
"
networks:
- r2r-network
environment:
Expand Down Expand Up @@ -128,12 +171,12 @@ services:
- UNSTRUCTURED_API_URL=${UNSTRUCTURED_API_URL:-https://api.unstructured.io/general/v0/general}

# Hatchet
- HATCHET_CLIENT_TLS_STRATEGY=none
- HATCHET_CLIENT_TOKEN

command: >
sh -c '
export HATCHET_CLIENT_TOKEN=$(cat /hatchet_api_key/api_key.txt) &&
echo "HATCHET_CLIENT_TOKEN has been set successfully. Value: $$HATCHET_CLIENT_TOKEN" &&
exec uvicorn core.main.app_entry:app --host $$HOST --port $$PORT
'
networks:
Expand Down
7 changes: 4 additions & 3 deletions py/core/main/api/ingestion_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from core.base.api.models.ingestion.responses import WrappedIngestionResponse
from core.base.providers import OrchestrationProvider

from ...main.hatchet import r2r_hatchet
from ..hatchet import IngestFilesWorkflow, UpdateFilesWorkflow
from ..services.ingestion_service import IngestionService
from .base_router import BaseRouter, RunType
Expand Down Expand Up @@ -123,7 +124,7 @@ async def ingest_files_app(
"is_update": False,
}

task_id = self.orchestration_provider.workflow(
task_id = r2r_hatchet.client.admin.run_workflow(
"ingest-file", {"request": workflow_input}
)
messages.append(
Expand All @@ -132,6 +133,7 @@ async def ingest_files_app(
"task_id": str(task_id),
}
)
print("messages = ", messages)

return messages

Expand Down Expand Up @@ -200,10 +202,9 @@ async def update_files_app(
"user": auth_user.json(),
}

task_id = self.orchestration_provider.workflow(
task_id = r2r_hatchet.client.admin.run_workflow(
"update-files", {"request": workflow_input}
)

return {
"message": f"Update task queued successfully.",
"task_id": str(task_id),
Expand Down
2 changes: 1 addition & 1 deletion py/core/main/api/management_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async def logs_app(
run_type_filter: Optional[str] = Query(""),
offset: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
auth_user=Depends(self.engine.providers.auth.auth_wrapper),
auth_user=Depends(self.service.providers.auth.auth_wrapper),
) -> WrappedLogResponse:
if not auth_user.is_superuser:
raise R2RException(
Expand Down
5 changes: 3 additions & 2 deletions py/core/main/api/restructure_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
)
from core.base.providers import OrchestrationProvider

from ...main.hatchet import r2r_hatchet
from ..hatchet import (
CreateGraphWorkflow,
EnrichGraphWorkflow,
Expand Down Expand Up @@ -81,7 +82,7 @@ async def create_graph(
"user": auth_user.json(),
}

task_id = self.orchestration_provider.workflow(
task_id = r2r_hatchet.client.admin.run_workflow(
"create-graph", {"request": workflow_input}
)

Expand Down Expand Up @@ -124,7 +125,7 @@ async def enrich_graph(
"user": auth_user.json(),
}

task_id = self.orchestration_provider.workflow(
task_id = r2r_hatchet.client.admin.run_workflow(
"enrich-graph", {"request": workflow_input}
)

Expand Down
12 changes: 3 additions & 9 deletions py/core/main/app_entry.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
import logging
import os
import threading
from typing import Optional

from fastapi import FastAPI
from hatchet.base import worker

from .assembly import R2RBuilder, R2RConfig

logger = logging.getLogger(__name__)


def start_hatchet_worker():
worker.start()


def r2r_app(
config_name: Optional[str] = "default",
config_path: Optional[str] = None,
Expand All @@ -30,12 +24,12 @@ def r2r_app(
)

# Build the FastAPI app
app = R2RBuilder(config=config).build().app
app = R2RBuilder(config=config).build()

# Start the Hatchet worker in a separate thread
r2r_app.orchestration_provider.start_worker()
app.orchestration_provider.start_worker()

return app
return app.app


logging.basicConfig(level=logging.INFO)
Expand Down
8 changes: 2 additions & 6 deletions py/core/main/hatchet/ingestion_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@
from .base import r2r_hatchet


@r2r_hatchet.workflow(
name="ingest-file", on_events=["file:ingest"], timeout=3600
)
@r2r_hatchet.workflow(name="ingest-file", timeout=3600)
class IngestFilesWorkflow:
def __init__(self, ingestion_service: IngestionService):
self.ingestion_service = ingestion_service
Expand Down Expand Up @@ -84,9 +82,7 @@ def get_document_info(self, context: Context) -> DocumentInfo:
)


@r2r_hatchet.workflow(
name="update-files", on_events=["file:update"], timeout=3600
)
@r2r_hatchet.workflow(name="update-files", timeout=3600)
class UpdateFilesWorkflow:
def __init__(self, ingestion_service: IngestionService):
self.ingestion_service = ingestion_service
Expand Down
8 changes: 2 additions & 6 deletions py/core/main/hatchet/restructure_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ async def kg_extract_and_store(self, context: Context) -> None:
return {"result": None}


@r2r_hatchet.workflow(
name="create-graph", on_events=["graph:create"], timeout=3600
)
@r2r_hatchet.workflow(name="create-graph", timeout=3600)
class CreateGraphWorkflow:
def __init__(self, restructure_service: RestructureService):
self.restructure_service = restructure_service
Expand Down Expand Up @@ -73,9 +71,7 @@ async def kg_extraction_ingress(self, context: Context) -> None:
return {"result": "success"}


@r2r_hatchet.workflow(
name="enrich-graph", on_events=["graph:enrich"], timeout=3600
)
@r2r_hatchet.workflow(name="enrich-graph", timeout=3600)
class EnrichGraphWorkflow:
def __init__(self, restructure_service: RestructureService):
self.restructure_service = restructure_service
Expand Down
Loading

0 comments on commit b5fb31b

Please sign in to comment.