Update pipeline_runners to use smarter creation schema defaults (#321)
drobison00 authored Jan 10, 2025
1 parent c3bc572 commit d17e4c3
Showing 2 changed files with 26 additions and 39 deletions.
28 changes: 4 additions & 24 deletions examples/bare_metal_example.py
@@ -44,7 +44,7 @@ def run_ingestor():
             max_character_length=5000,
             sentence_window_size=0,
         )
-        .embed()
+        .embed(text=True, tables=True)
     )
 
     try:
@@ -59,28 +59,8 @@ def run_ingestor():
 
 def main():
     try:
-        # Collect environment parameters, falling back to schema defaults if not set
-        config_data = {
-            "cached_grpc_endpoint": os.getenv("CACHED_GRPC_ENDPOINT"),
-            "cached_infer_protocol": os.getenv("CACHED_INFER_PROTOCOL"),
-            "embedding_nim_endpoint": os.getenv("EMBEDDING_NIM_ENDPOINT"),
-            "deplot_http_endpoint": os.getenv("DEPLOT_HTTP_ENDPOINT"),
-            "deplot_infer_protocol": os.getenv("DEPLOT_INFER_PROTOCOL"),
-            "ingest_log_level": os.getenv("INGEST_LOG_LEVEL"),
-            "message_client_host": os.getenv("MESSAGE_CLIENT_HOST"),
-            "message_client_port": os.getenv("MESSAGE_CLIENT_PORT"),
-            "message_client_type": os.getenv("MESSAGE_CLIENT_TYPE"),
-            "minio_bucket": os.getenv("MINIO_BUCKET"),
-            "mrc_ignore_numa_check": os.getenv("MRC_IGNORE_NUMA_CHECK"),
-            "otel_exporter_otlp_endpoint": os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"),
-            "paddle_grpc_endpoint": os.getenv("PADDLE_GRPC_ENDPOINT"),
-            "paddle_http_endpoint": os.getenv("PADDLE_HTTP_ENDPOINT"),
-            "paddle_infer_protocol": os.getenv("PADDLE_INFER_PROTOCOL"),
-            "redis_morpheus_task_queue": os.getenv("REDIS_MORPHEUS_TASK_QUEUE"),
-            "yolox_infer_protocol": os.getenv("YOLOX_INFER_PROTOCOL"),
-            "yolox_grpc_endpoint": os.getenv("YOLOX_GRPC_ENDPOINT"),
-            "vlm_caption_endpoint": os.getenv("VLM_CAPTION_ENDPOINT"),
-        }
+        # Possibly override config parameters
+        config_data = {}
 
         # Filter out None values to let the schema defaults handle them
         config_data = {key: value for key, value in config_data.items() if value is not None}
@@ -89,7 +69,7 @@ def main():
         config = PipelineCreationSchema(**config_data)
 
         # Start the pipeline subprocess
-        pipeline_process = start_pipeline_subprocess(config)
+        pipeline_process = start_pipeline_subprocess(config, stderr=sys.stderr, stdout=sys.stdout)
 
         # Optionally, wait a bit before starting the ingestor to ensure the pipeline is ready
         time.sleep(10)
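Note: with the getenv-based defaults now living on PipelineCreationSchema itself (see the schema diff below), the example only needs an empty config_data dict. A minimal sketch of the resulting startup flow — the import path is inferred from this repository's src layout, and the commented-out override is purely illustrative:

import sys
import time

from nv_ingest.util.pipeline.pipeline_runners import (
    PipelineCreationSchema,
    start_pipeline_subprocess,
)

# With no arguments, every field falls back to the schema's os.getenv(...) default.
config = PipelineCreationSchema()

# Explicit keyword overrides still take precedence when needed, e.g.:
# config = PipelineCreationSchema(ingest_log_level="DEBUG")

pipeline_process = start_pipeline_subprocess(config, stderr=sys.stderr, stdout=sys.stdout)
time.sleep(10)  # give the pipeline subprocess a moment to come up before ingesting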
37 changes: 22 additions & 15 deletions src/nv_ingest/util/pipeline/pipeline_runners.py
@@ -31,28 +31,35 @@
 logger = logging.getLogger(__name__)
 
 
 # TODO(Devin) Name this function something more descriptive
 class PipelineCreationSchema(BaseModel):
-    cached_grpc_endpoint: str = "localhost:8007"
-    cached_infer_protocol: str = "grpc"
-    deplot_http_endpoint: str = "http://localhost:8003/v1/chat/completions"
+    cached_http_endpoint: str = os.getenv(
+        "CACHED_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/university-at-buffalo/cached"
+    )
+    cached_infer_protocol: str = "http"
+    deplot_http_endpoint: str = os.getenv("DEPLOT_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/vlm/google/deplot")
     deplot_infer_protocol: str = "http"
-    embedding_nim_endpoint: str = "http://localhost:8012/v1"
-    embedding_nim_model_name: str = "nvidia/nv-embedqa-e5-v5"
-    ingest_log_level: str = "INFO"
-    doughnut_grpc_triton: str = "triton-doughnut:8001"
+    embedding_nim_endpoint: str = os.getenv("EMBEDDING_NIM_ENDPOINT", "https://integrate.api.nvidia.com/v1")
+    embedding_nim_model_name: str = os.getenv("EMBEDDING_NIM_MODEL_NAME", "nvidia/nv-embedqa-e5-v5")
+    ingest_log_level: str = os.getenv("INGEST_LOG_LEVEL", "INFO")
     message_client_host: str = "localhost"
     message_client_port: str = "7671"
     message_client_type: str = "simple"
     minio_bucket: str = "nv-ingest"
     mrc_ignore_numa_check: str = "1"
+    ngc_api_key: str = os.getenv("NGC_API_KEY", "")
+    nvidia_build_api_key: str = os.getenv("NVIDIA_BUILD_API_KEY", "")
     otel_exporter_otlp_endpoint: str = "localhost:4317"
-    paddle_grpc_endpoint: str = "localhost:8010"
-    paddle_http_endpoint: str = "http://localhost:8009/v1/infer"
-    paddle_infer_protocol: str = "grpc"
+    paddle_http_endpoint: str = os.getenv("PADDLE_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/baidu/paddleocr")
+    paddle_infer_protocol: str = "http"
     redis_morpheus_task_queue: str = "morpheus_task_queue"
-    yolox_infer_protocol: str = "grpc"
-    yolox_grpc_endpoint: str = "localhost:8001"
-    vlm_caption_endpoint: str = "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
+    vlm_caption_endpoint: str = os.getenv(
+        "VLM_CAPTION_ENDPOINT", "https://ai.api.nvidia.com/v1/gr/meta/llama-3.2-90b-vision-instruct/chat/completions"
+    )
+    yolox_http_endpoint: str = os.getenv(
+        "YOLOX_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/cv/nvidia/nv-yolox-page-elements-v1"
+    )
+    yolox_infer_protocol: str = "http"
 
     model_config = ConfigDict(extra="forbid")


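A note on the os.getenv(...) defaults above: each call runs when the class body executes, i.e. at import time, so environment overrides must be in place before pipeline_runners is imported (or be passed as keyword arguments at construction). A minimal sketch of that behavior, using a hypothetical one-field stand-in rather than the real schema:

import os

from pydantic import BaseModel

os.environ["INGEST_LOG_LEVEL"] = "DEBUG"  # must happen before the class body runs


class DemoSchema(BaseModel):
    # Hypothetical stand-in for PipelineCreationSchema.
    ingest_log_level: str = os.getenv("INGEST_LOG_LEVEL", "INFO")


print(DemoSchema().ingest_log_level)  # "DEBUG": env var beats the fallback
print(DemoSchema(ingest_log_level="WARNING").ingest_log_level)  # kwargs beat both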
@@ -311,7 +318,7 @@ def start_pipeline_subprocess(config: PipelineCreationSchema, stdout=None, stderr=None):
 
     # Prepare environment variables from the config
     env = os.environ.copy()
-    env.update({key.upper(): val for key, val in config.dict().items()})
+    env.update({key.upper(): val for key, val in config.model_dump().items()})
 
     logger.info("Starting pipeline subprocess...")
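For reference, model_dump() is the Pydantic v2 replacement for the deprecated .dict(); the surrounding code upper-cases each field name to form the subprocess's environment variables. A small self-contained sketch of that mapping, with MiniSchema as a hypothetical trimmed stand-in for PipelineCreationSchema:

import os

from pydantic import BaseModel, ConfigDict


class MiniSchema(BaseModel):
    # Hypothetical two-field stand-in for PipelineCreationSchema.
    message_client_host: str = "localhost"
    message_client_port: str = "7671"

    model_config = ConfigDict(extra="forbid")


env = os.environ.copy()
# Same mapping as start_pipeline_subprocess: field name -> UPPER_CASE env var.
env.update({key.upper(): val for key, val in MiniSchema().model_dump().items()})

assert env["MESSAGE_CLIENT_HOST"] == "localhost"
assert env["MESSAGE_CLIENT_PORT"] == "7671"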
