Skip to content

feat: add concurrent task processing to run() for long_running mode #4

@kitsune-hash

Description

@kitsune-hash

Problem

Currently run() processes tasks sequentially in a for line in sys.stdin loop. Even when the Go worker sends multiple tasks concurrently via stdin (StdioHandler supports multiplexing by task_id), the Python side blocks on each handler call before reading the next task.

This means concurrency > 1 in worker config has no real effect for long_running workers — tasks are just pre-fetched into the stdin buffer.

Proposed Solution

Add a concurrent mode to run() that dispatches incoming tasks to a thread pool:

def run(workers=1):
    # ... load, ready signal ...
    if workers <= 1:
        # Current sequential behavior
        for line in sys.stdin:
            process_single(line, handler, ctx)
    else:
        from concurrent.futures import ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for line in sys.stdin:
                pool.submit(process_single, line, handler, ctx)

Stdout writes need to be thread-safe (add a lock around print/flush).

Why Threads (not Processes)

  • The @load context (ML models) must be shared across tasks — threads share memory
  • ML inference (torch, numpy, llama.cpp) releases the GIL during C operations
  • Existing workers (prompt-classifier, image-classifier) already use threading.Lock()
  • GPU inference naturally releases GIL during CUDA calls

Configuration

  • Parameter: run(workers=N)
  • Env var: RUNQY_PYTHON_WORKERS (runtime override without code changes)
  • Default workers=1 = current sequential behavior (backward compatible)

Related

  • Go StdioHandler already supports concurrent multiplexing via pending[taskID] map
  • Workers that benefit: prompt-classifier, image-classifier (both thread-safe)
  • Workers to keep sequential: dolphin (GPU VRAM bound)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions