Commit
Devin issue fea 145 (#189)
drobison00 authored Oct 25, 2024
1 parent ba96ad8 commit 58057cc
Showing 44 changed files with 2,746 additions and 838 deletions.
44 changes: 28 additions & 16 deletions client/src/nv_ingest_client/cli/util/click.py
@@ -12,6 +12,13 @@

import click
from nv_ingest_client.cli.util.processing import check_schema
from nv_ingest_client.primitives.tasks.caption import CaptionTaskSchema
from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionSchema
from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionTask
from nv_ingest_client.primitives.tasks.dedup import DedupTaskSchema
from nv_ingest_client.primitives.tasks.embed import EmbedTaskSchema
from nv_ingest_client.primitives.tasks.extract import ExtractTaskSchema
from nv_ingest_client.primitives.tasks.filter import FilterTaskSchema
from nv_ingest_client.primitives.tasks import CaptionTask
from nv_ingest_client.primitives.tasks import DedupTask
from nv_ingest_client.primitives.tasks import EmbedTask
@@ -20,13 +27,10 @@
from nv_ingest_client.primitives.tasks import SplitTask
from nv_ingest_client.primitives.tasks import StoreTask
from nv_ingest_client.primitives.tasks import VdbUploadTask
from nv_ingest_client.primitives.tasks.caption import CaptionTaskSchema
from nv_ingest_client.primitives.tasks.dedup import DedupTaskSchema
from nv_ingest_client.primitives.tasks.embed import EmbedTaskSchema
from nv_ingest_client.primitives.tasks.extract import ExtractTaskSchema
from nv_ingest_client.primitives.tasks.filter import FilterTaskSchema
from nv_ingest_client.primitives.tasks.split import SplitTaskSchema
from nv_ingest_client.primitives.tasks.store import StoreTaskSchema
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionSchema
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionTask
from nv_ingest_client.primitives.tasks.vdb_upload import VdbUploadTaskSchema
from nv_ingest_client.util.util import generate_matching_files

@@ -104,51 +108,59 @@ def click_validate_task(ctx, param, value):
if task_id == "split":
task_options = check_schema(SplitTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = SplitTask(**task_options.dict())
new_task = [(new_task_id, SplitTask(**task_options.dict()))]
elif task_id == "extract":
task_options = check_schema(ExtractTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}_{task_options.document_type}"
new_task = ExtractTask(**task_options.dict())
new_task = [(new_task_id, ExtractTask(**task_options.dict()))]

if (task_options.extract_tables == True):
subtask_options = check_schema(TableExtractionSchema, {}, "table_data_extract", "{}")
new_task.append(("table_data_extract", TableExtractionTask(**subtask_options.dict())))

if (task_options.extract_charts == True):
subtask_options = check_schema(ChartExtractionSchema, {}, "chart_data_extract", "{}")
new_task.append(("chart_data_extract", ChartExtractionTask(**subtask_options.dict())))

elif task_id == "store":
task_options = check_schema(StoreTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = StoreTask(**task_options.dict())
new_task = [(new_task_id, StoreTask(**task_options.dict()))]
elif task_id == "caption":
task_options = check_schema(CaptionTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = CaptionTask(**task_options.dict())
new_task = [(new_task_id, CaptionTask(**task_options.dict()))]
elif task_id == "dedup":
task_options = check_schema(DedupTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = DedupTask(**task_options.dict())
new_task = [(new_task_id, DedupTask(**task_options.dict()))]
elif task_id == "filter":
task_options = check_schema(FilterTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = FilterTask(**task_options.dict())
new_task = [(new_task_id, FilterTask(**task_options.dict()))]
elif task_id == "embed":
task_options = check_schema(EmbedTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = EmbedTask(**task_options.dict())
new_task = [(new_task_id, EmbedTask(**task_options.dict()))]
elif task_id == "vdb_upload":
task_options = check_schema(VdbUploadTaskSchema, options, task_id, json_options)
new_task_id = f"{task_id}"
new_task = VdbUploadTask(**task_options.dict())

new_task = [(new_task_id, VdbUploadTask(**task_options.dict()))]
else:
raise ValueError(f"Unsupported task type: {task_id}")

if new_task_id in validated_tasks:
raise ValueError(f"Duplicate task detected: {new_task_id}")

logger.debug("Adding task: %s", new_task_id)
validated_tasks[new_task_id] = new_task
for task_tuple in new_task:
validated_tasks[task_tuple[0]] = task_tuple[1]
except ValueError as e:
validation_errors.append(str(e))

if validation_errors:
# Aggregate error messages with original values highlighted
error_message = "\n".join(validation_errors)
# logger.error(error_message)
raise click.BadParameter(error_message)

return validated_tasks
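
For orientation, here is a minimal, self-contained sketch (not part of this commit) of the fan-out behavior introduced in the hunk above: a single extract entry can now expand into several (task_id, task) pairs, and the loop at the end of the function registers each pair. The stand-in classes and option values below are hypothetical; the real classes live in nv_ingest_client.primitives.tasks.

# Illustrative sketch only -- stand-in classes in place of the real task types.
class ExtractTask: ...
class TableExtractionTask: ...
class ChartExtractionTask: ...

def expand_extract_task(document_type: str, extract_tables: bool, extract_charts: bool):
    """Mirror the new expansion: one CLI extract entry may become several (task_id, task) pairs."""
    new_task = [(f"extract_{document_type}", ExtractTask())]
    if extract_tables:
        new_task.append(("table_data_extract", TableExtractionTask()))
    if extract_charts:
        new_task.append(("chart_data_extract", ChartExtractionTask()))
    return new_task

validated_tasks = {}
for task_id, task in expand_extract_task("pdf", extract_tables=True, extract_charts=True):
    if task_id in validated_tasks:
        raise ValueError(f"Duplicate task detected: {task_id}")
    validated_tasks[task_id] = task

print(sorted(validated_tasks))  # ['chart_data_extract', 'extract_pdf', 'table_data_extract']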
2 changes: 1 addition & 1 deletion client/src/nv_ingest_client/cli/util/processing.py
@@ -130,7 +130,7 @@ def check_schema(schema: Type[BaseModel], options: dict, task_id: str, original_


def report_stage_statistics(
stage_elapsed_times: defaultdict(list), total_trace_elapsed: float, abs_elapsed: float
stage_elapsed_times: defaultdict, total_trace_elapsed: float, abs_elapsed: float
) -> None:
"""
Reports the statistics for each processing stage, including average, median, total time spent,
64 changes: 32 additions & 32 deletions client/src/nv_ingest_client/client/client.py
@@ -60,13 +60,13 @@ class NvIngestClient:
"""

def __init__(
self,
message_client_allocator: Callable[..., RestClient] = RestClient,
message_client_hostname: Optional[str] = "localhost",
message_client_port: Optional[int] = 7670,
message_client_kwargs: Optional[Dict] = None,
msg_counter_id: Optional[str] = "nv-ingest-message-id",
worker_pool_size: int = 1,
self,
message_client_allocator: Callable[..., RestClient] = RestClient,
message_client_hostname: Optional[str] = "localhost",
message_client_port: Optional[int] = 7670,
message_client_kwargs: Optional[Dict] = None,
msg_counter_id: Optional[str] = "nv-ingest-message-id",
worker_pool_size: int = 1,
) -> None:
"""
Initializes the NvIngestClient with a client allocator, REST configuration, a message counter ID,
@@ -149,9 +149,9 @@ def _pop_job_state(self, job_index: str) -> JobState:
return job_state

def _get_and_check_job_state(
self,
job_index: str,
required_state: Union[JobStateEnum, List[JobStateEnum]] = None,
self,
job_index: str,
required_state: Union[JobStateEnum, List[JobStateEnum]] = None,
) -> JobState:
if required_state and not isinstance(required_state, list):
required_state = [required_state]
@@ -192,13 +192,13 @@ def add_job(self, job_spec: Union[BatchJobSpec, JobSpec]) -> str:
raise ValueError(f"Unexpected type: {type(job_spec)}")

def create_job(
self,
payload: str,
source_id: str,
source_name: str,
document_type: str = None,
tasks: Optional[list] = None,
extended_options: Optional[dict] = None,
self,
payload: str,
source_id: str,
source_name: str,
document_type: str = None,
tasks: Optional[list] = None,
extended_options: Optional[dict] = None,
) -> str:
"""
Creates a new job with the specified parameters and adds it to the job tracking dictionary.
@@ -249,10 +249,10 @@ def add_task(self, job_index: str, task: Task) -> None:
job_state.job_spec.add_task(task)

def create_task(
self,
job_index: Union[str, int],
task_type: TaskType,
task_params: dict = None,
self,
job_index: Union[str, int],
task_type: TaskType,
task_params: dict = None,
) -> None:
"""
Creates a task of the specified type with given parameters and associates it with the existing job.
@@ -345,12 +345,12 @@ def _fetch_job_result_wait(self, job_id: str, timeout: float = 60, data_only: bo
# This is the direct Python approach function for retrieving jobs which handles the timeouts directly
# in the function itself instead of expecting the user to handle it themselves
def fetch_job_result(
self,
job_ids: List[str],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
self,
job_ids: List[str],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
) -> List[Tuple[Optional[Dict], str]]:
"""
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
@@ -437,7 +437,7 @@ def _ensure_submitted(self, job_ids: List[str]):
job_state.future = None

def fetch_job_result_async(
self, job_ids: Union[str, List[str]], timeout: float = 10, data_only: bool = True
self, job_ids: Union[str, List[str]], timeout: float = 10, data_only: bool = True
) -> Dict[Future, str]:
"""
Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.
@@ -467,9 +467,9 @@ def fetch_job_result_async(
return future_to_job_id

def _submit_job(
self,
job_index: str,
job_queue_id: str,
self,
job_index: str,
job_queue_id: str,
) -> Optional[Dict]:
"""
Submits a job to a specified job queue and optionally waits for a response if blocking is True.
@@ -514,7 +514,7 @@ def _submit_job(
raise

def submit_job(
self, job_indices: Union[str, List[str]], job_queue_id: str, batch_size: int = 10
self, job_indices: Union[str, List[str]], job_queue_id: str, batch_size: int = 10
) -> List[Union[Dict, None]]:
if isinstance(job_indices, str):
job_indices = [job_indices]
1 change: 0 additions & 1 deletion client/src/nv_ingest_client/nv_ingest_cli.py
@@ -11,7 +11,6 @@

import click
import pkg_resources
from nv_ingest_client.cli.util.click import ClientType
from nv_ingest_client.cli.util.click import LogLevel
from nv_ingest_client.cli.util.click import click_match_and_validate_files
from nv_ingest_client.cli.util.click import click_validate_batch_size
53 changes: 53 additions & 0 deletions client/src/nv_ingest_client/primitives/tasks/chart_extraction.py
@@ -0,0 +1,53 @@
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments

import logging
from typing import Dict

from pydantic import BaseModel

from .task_base import Task

logger = logging.getLogger(__name__)


class ChartExtractionSchema(BaseModel):
class Config:
extra = "forbid"


class ChartExtractionTask(Task):
"""
Object for chart extraction task
"""

def __init__(
self) -> None:
"""
Setup Chart Extraction Task Config
"""
super().__init__()

def __str__(self) -> str:
"""
Returns a string with the object's config and run time state
"""
info = ""
info += "chart extraction task\n"
return info

def to_dict(self) -> Dict:
"""
Convert to a dict for submission to redis
"""

task_properties = {
"params": {},
}

return {"type": "chart_data_extract", "task_properties": task_properties}
53 changes: 53 additions & 0 deletions client/src/nv_ingest_client/primitives/tasks/table_extraction.py
@@ -0,0 +1,53 @@
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments

import logging
from typing import Dict

from pydantic import BaseModel

from .task_base import Task

logger = logging.getLogger(__name__)


class TableExtractionSchema(BaseModel):
class Config:
extra = "forbid"


class TableExtractionTask(Task):
"""
Object for table extraction tasks
"""

def __init__(
self) -> None:
"""
Setup Table Extraction Task Config
"""
super().__init__()

def __str__(self) -> str:
"""
Returns a string with the object's config and run time state
"""
info = ""
info += "table extraction task\n"
return info

def to_dict(self) -> Dict:
"""
Convert to a dict for submission to redis
"""

task_properties = {
"params": {},
}

return {"type": "table_data_extract", "task_properties": task_properties}
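
As a quick usage sketch (assuming the package layout added in this commit is installed), both new tasks take no constructor arguments and differ only in the "type" field of the payload they serialize:

from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionTask
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionTask

# Neither task carries configuration; "params" is an empty dict in both payloads.
print(ChartExtractionTask().to_dict())
# {'type': 'chart_data_extract', 'task_properties': {'params': {}}}
print(TableExtractionTask().to_dict())
# {'type': 'table_data_extract', 'task_properties': {'params': {}}}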
2 changes: 2 additions & 0 deletions client/src/nv_ingest_client/primitives/tasks/task_base.py
@@ -24,6 +24,8 @@ class TaskType(Enum):
TRANSFORM = auto()
STORE = auto()
VDB_UPLOAD = auto()
TABLE_DATA_EXTRACT = auto()
CHART_DATA_EXTRACT = auto()


def is_valid_task_type(task_type_str: str) -> bool:
6 changes: 3 additions & 3 deletions docker-compose.yaml
@@ -119,11 +119,11 @@ services:
nv-ingest-ms-runtime:
image: nvcr.io/ohlfw0olaadg/ea-participants/nv-ingest:24.08
build:
context: ${NV_INGEST_ROOT}
context: ${NV_INGEST_ROOT:-.}
dockerfile: "./Dockerfile"
target: runtime
volumes:
- ${DATASET_ROOT}:/workspace/data
- ${DATASET_ROOT:-./data}:/workspace/data
ports:
- "7670:7670"
cap_add:
@@ -138,8 +138,8 @@
- DEPLOT_HEALTH_ENDPOINT=deplot:8000
- DEPLOT_HTTP_ENDPOINT=http://deplot:8000/v1/chat/completions
# build.nvidia.com hosted deplot
#- DEPLOT_HTTP_ENDPOINT=https://ai.api.nvidia.com/v1/vlm/google/deplot
- DEPLOT_INFER_PROTOCOL=http
#- DEPLOT_HTTP_ENDPOINT=https://ai.api.nvidia.com/v1/vlm/google/deplot
- DOUGHNUT_GRPC_TRITON=triton-doughnut:8001
- INGEST_LOG_LEVEL=DEFAULT
- MESSAGE_CLIENT_HOST=redis

1 comment on commit 58057cc

@enwaiax

@drobison00 @sosahi The change to client/src/nv_ingest_client/cli/util/click.py makes nv-ingest-cli fail to submit ingestion jobs.
It fails with the error below:

Traceback (most recent call last):
  File "/localhome/local-xiangw/nv-ingest/.venv/bin/nv-ingest-cli", line 33, in <module>
    sys.exit(load_entry_point('nv-ingest-client==2024.10.31.dev0', 'console_scripts', 'nv-ingest-cli')())
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/click/decorators.py", line 33, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/nv_ingest_client/nv_ingest_cli.py", line 236, in main
    (total_files, trace_times, pages_processed) = create_and_process_jobs(
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/nv_ingest_client/cli/util/processing.py", line 612, in create_and_process_jobs
    futures_dict = client.fetch_job_result_async(job_ids, timeout=timeout, data_only=False)
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/nv_ingest_client/client/client.py", line 457, in fetch_job_result_async
    self._ensure_submitted(job_ids)
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/nv_ingest_client/client/client.py", line 436, in _ensure_submitted
    job_state.state = JobStateEnum.SUBMITTED
  File "/localhome/local-xiangw/nv-ingest/.venv/lib/python3.10/site-packages/nv_ingest_client/primitives/jobs/job_state.py", line 113, in state
    raise ValueError(f"Cannot change state from {self._state.name} to {value.name}.")
ValueError: Cannot change state from FAILED to SUBMITTED.

The nv-ingest-cli inside the container works fine. When I copied client/src/nv_ingest_client/cli/util/click.py from the released container and reinstalled from source, it worked. Please fix.
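
For reference, a hedged sketch of the state guard that raises in the traceback above (names follow the trace; the exact state set and terminal-state logic are assumptions, not the library's actual implementation):

# Sketch of the setter in nv_ingest_client/primitives/jobs/job_state.py that raises above.
from enum import Enum, auto

class JobStateEnum(Enum):
    PENDING = auto()
    SUBMITTED = auto()
    FAILED = auto()

class JobState:
    _TERMINAL_STATES = {JobStateEnum.FAILED}  # assumption: FAILED is treated as terminal

    def __init__(self) -> None:
        self._state = JobStateEnum.PENDING

    @property
    def state(self) -> JobStateEnum:
        return self._state

    @state.setter
    def state(self, value: JobStateEnum) -> None:
        # Once a job has failed (e.g. its submission raised), it cannot be re-marked SUBMITTED.
        if self._state in self._TERMINAL_STATES:
            raise ValueError(f"Cannot change state from {self._state.name} to {value.name}.")
        self._state = value

js = JobState()
js.state = JobStateEnum.FAILED
try:
    js.state = JobStateEnum.SUBMITTED
except ValueError as err:
    print(err)  # Cannot change state from FAILED to SUBMITTED.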
