diff --git a/README.md b/README.md index 31fbb2349..ca383d7ec 100644 --- a/README.md +++ b/README.md @@ -188,6 +188,19 @@ Key environment variables (server) from packages/platform-server/.env.example an - DOCKER_RUNNER_BASE_URL (required; default http://docker-runner:7071) - DOCKER_RUNNER_SHARED_SECRET (required HMAC credential) - DOCKER_RUNNER_TIMEOUT_MS (optional request timeout; default 30000) + - DOCKER_RUNNER_OPTIONAL (default true; set to false to keep fail-fast bootstrap) + - DOCKER_RUNNER_CONNECT_RETRY_BASE_DELAY_MS (default 500) + - DOCKER_RUNNER_CONNECT_RETRY_MAX_DELAY_MS (default 30000) + - DOCKER_RUNNER_CONNECT_RETRY_JITTER_MS (default 250) + - DOCKER_RUNNER_CONNECT_PROBE_INTERVAL_MS (default 30000 while runner is healthy) + - DOCKER_RUNNER_CONNECT_MAX_RETRIES (default 0 → unlimited background retries) +- Volume GC: + - VOLUME_GC_ENABLED (default true) + - VOLUME_GC_INTERVAL_MS (default 60000) + - VOLUME_GC_MAX_PER_SWEEP (default 100) + - VOLUME_GC_CONCURRENCY (default 3) + - VOLUME_GC_COOLDOWN_MS (default 600000) + - VOLUME_GC_SWEEP_TIMEOUT_MS (default 15000) - Nix/NCPS: - NCPS_ENABLED (default false) - NCPS_URL_SERVER, NCPS_URL_CONTAINER (default http://ncps:8501) diff --git a/docs/product-spec.md b/docs/product-spec.md index 023fcd8bb..e2a0e29e7 100644 --- a/docs/product-spec.md +++ b/docs/product-spec.md @@ -116,7 +116,7 @@ Configuration matrix (server env vars) - VAULT_ENABLED: true|false (default false) - VAULT_ADDR, VAULT_TOKEN - DOCKER_MIRROR_URL (default http://registry-mirror:5000) - - DOCKER_RUNNER_BASE_URL, DOCKER_RUNNER_SHARED_SECRET (required for docker-runner), plus optional DOCKER_RUNNER_TIMEOUT_MS (default 30000). 
+ - DOCKER_RUNNER_BASE_URL, DOCKER_RUNNER_SHARED_SECRET (required for docker-runner), plus optional DOCKER_RUNNER_TIMEOUT_MS (default 30000), DOCKER_RUNNER_OPTIONAL (default true; set to false to fail fast), and DOCKER_RUNNER_CONNECT_* knobs (RETRY_BASE_DELAY_MS=500, RETRY_MAX_DELAY_MS=30000, RETRY_JITTER_MS=250, PROBE_INTERVAL_MS=30000, MAX_RETRIES=0 for unlimited background retries). - MCP_TOOLS_STALE_TIMEOUT_MS - LANGGRAPH_CHECKPOINTER: postgres (default) - POSTGRES_URL (postgres connection string) diff --git a/docs/technical-overview.md b/docs/technical-overview.md index e9a7ac0fa..b74849df0 100644 --- a/docs/technical-overview.md +++ b/docs/technical-overview.md @@ -71,11 +71,15 @@ Remote Docker runner - The runner exposes authenticated Fastify HTTP/SSE/WebSocket endpoints with HMAC headers derived solely from `DOCKER_RUNNER_SHARED_SECRET`. - Only the docker-runner service mounts `/var/run/docker.sock` in default stacks; platform-server and auxiliary services talk to it over the internal network (default http://docker-runner:7071). - Container events are forwarded via SSE so the existing watcher pipeline (ContainerEventProcessor, cleanup jobs, metrics) remains unchanged. +- Connectivity is tracked by a background `DockerRunnerConnectivityMonitor` that polls `/v1/ready` with exponential backoff (base-delay, max-delay, jitter, probe interval, and optional retry cap are configurable via DOCKER_RUNNER_CONNECT_* env vars). +- When `DOCKER_RUNNER_OPTIONAL=true` (default), the server continues booting even if the runner is unreachable; when set to `false`, the first failed probe aborts bootstrap (legacy fail-fast mode). +- The monitor streams status into `DockerRunnerStatusService`, which feeds `/health`, Volume GC, REST guards, and terminal/websocket gating. Terminals and container APIs short-circuit with `docker_runner_not_ready` until the status returns to `up`. Defaults and toggles - LiveGraphRuntime serializes apply operations by default. 
- PRTrigger intervalMs default 60000; includeAuthored default false. - MCP restart defaults: maxAttempts 5; backoffMs 2000. +- Docker runner monitor defaults: optional=true, retry base delay 500ms, max delay 30s, jitter 250ms, probe interval 30s when healthy, max retries 0 (infinite). How to Develop & Test - Prereqs: Node.js 20+, pnpm 9+, Docker, Postgres diff --git a/packages/platform-server/.env.example b/packages/platform-server/.env.example index eaa936249..5ccce040b 100644 --- a/packages/platform-server/.env.example +++ b/packages/platform-server/.env.example @@ -58,6 +58,7 @@ DOCKER_RUNNER_SHARED_SECRET=dev-shared-secret # VOLUME_GC_MAX_PER_SWEEP=100 # VOLUME_GC_CONCURRENCY=3 # VOLUME_GC_COOLDOWN_MS=600000 +# VOLUME_GC_SWEEP_TIMEOUT_MS=15000 # Container retention window (in days). Set to 0 to retain indefinitely. CONTAINERS_RETENTION_DAYS=14 diff --git a/packages/platform-server/README.md b/packages/platform-server/README.md index 9a24f5c34..adb5cbc97 100644 --- a/packages/platform-server/README.md +++ b/packages/platform-server/README.md @@ -28,6 +28,7 @@ Graph persistence - `VOLUME_GC_MAX_PER_SWEEP` (default `100`) - `VOLUME_GC_CONCURRENCY` (default `3`) - `VOLUME_GC_COOLDOWN_MS` (default `600000`) + - `VOLUME_GC_SWEEP_TIMEOUT_MS` (default `15000`) - ## MCP environment configuration diff --git a/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts b/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts index 113e5c99f..f121615a6 100644 --- a/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts +++ b/packages/platform-server/__e2e__/app.bootstrap.smoke.test.ts @@ -1,8 +1,11 @@ +import 'reflect-metadata'; import { FastifyAdapter } from '@nestjs/platform-fastify'; import type { PrismaClient } from '@prisma/client'; import type { LLM } from '@agyn/llm'; -import { Test } from '@nestjs/testing'; +import { Test, type TestingModuleBuilder } from '@nestjs/testing'; import { describe, expect, it, vi } from 'vitest'; +import { AddressInfo } from 
'net'; +import { fetch as undiciFetch } from 'undici'; import { PrismaService } from '../src/core/services/prisma.service'; import { StartupRecoveryService } from '../src/core/services/startupRecovery.service'; @@ -24,6 +27,13 @@ import { LiveGraphRuntime } from '../src/graph-core/liveGraph.manager'; import { ConfigService, configSchema } from '../src/core/services/config.service'; import { LLMSettingsService } from '../src/settings/llm/llmSettings.service'; import { runnerConfigDefaults } from '../__tests__/helpers/config'; +import { DOCKER_CLIENT, type DockerClient } from '../src/infra/container/dockerClient.token'; +import { DockerWorkspaceEventsWatcher } from '../src/infra/container/containerEvent.watcher'; +import { VolumeGcService } from '../src/infra/container/volumeGc.job'; +import { DockerRunnerConnectivityMonitor } from '../src/infra/container/dockerRunnerConnectivity.monitor'; +import { DockerRunnerStatusService } from '../src/infra/container/dockerRunnerStatus.service'; +import { DockerRunnerRequestError } from '../src/infra/container/httpDockerRunner.client'; +import { HealthController } from '../src/infra/health/health.controller'; process.env.LLM_PROVIDER = process.env.LLM_PROVIDER || 'litellm'; process.env.AGENTS_DATABASE_URL = process.env.AGENTS_DATABASE_URL || 'postgres://localhost:5432/test'; @@ -35,132 +45,248 @@ process.env.LITELLM_MASTER_KEY = process.env.LITELLM_MASTER_KEY || 'sk-dev-maste const TEST_TIMEOUT_MS = 15_000; const agentProbeToken = Symbol('agent_node_probe'); +type BootstrapStubs = { + prismaServiceStub: Pick; + containerRegistryStub: Partial; + containerServiceStub: Partial; + cleanupStub: Partial; + ncpsStub: Partial; + runEventsStub: Partial; + agentsPersistenceStub: Partial; + threadsMetricsStub: Partial; + vaultStub: Partial; + graphRepositoryStub: Record; + templateRegistryStub: TemplateRegistry; + liveGraphRuntimeStub: LiveGraphRuntime; + llmProvisionerStub: LLMProvisioner; + workspaceWatcherStub: Partial; + 
dockerClientStub: Partial; + volumeGcStub: Partial; + connectivityMonitorStub: Partial; +}; + +const createBootstrapStubs = (): BootstrapStubs => { + const transactionClientStub = { + $queryRaw: vi.fn().mockResolvedValue([{ acquired: true }]), + run: { + findMany: vi.fn().mockResolvedValue([]), + updateMany: vi.fn().mockResolvedValue({ count: 0 }), + }, + reminder: { + findMany: vi.fn().mockResolvedValue([]), + updateMany: vi.fn().mockResolvedValue({ count: 0 }), + }, + } satisfies Partial; + + const prismaClientStub = { + $queryRaw: transactionClientStub.$queryRaw, + $transaction: vi.fn(async (cb: (tx: typeof transactionClientStub) => Promise) => { + await cb(transactionClientStub); + return undefined; + }), + } satisfies Partial; + + const prismaServiceStub = { + getClient: vi.fn(() => prismaClientStub as PrismaClient), + } satisfies Pick; + + const containerRegistryStub = { + registerStart: vi.fn(), + updateLastUsed: vi.fn(), + claimForTermination: vi.fn().mockResolvedValue(false), + markStopped: vi.fn(), + getExpired: vi.fn().mockResolvedValue([]), + ensureIndexes: vi.fn(), + recordTerminationFailure: vi.fn(), + findMany: vi.fn().mockResolvedValue([]), + touchLastUsed: vi.fn(), + } satisfies Partial; + + const containerServiceStub = { + start: vi.fn(), + stopContainer: vi.fn(), + removeContainer: vi.fn(), + findContainersByLabels: vi.fn().mockResolvedValue([]), + findContainerByLabels: vi.fn().mockResolvedValue(undefined), + execContainer: vi.fn().mockResolvedValue({ exitCode: 0, stderr: '', stdout: '' }), + putArchive: vi.fn().mockResolvedValue(undefined), + touchLastUsed: vi.fn(), + ensureDinD: vi.fn().mockResolvedValue(undefined), + cleanupDinDSidecars: vi.fn().mockResolvedValue(undefined), + } satisfies Partial; + + const cleanupStub = { start: vi.fn(), stop: vi.fn() } satisfies Partial; + + const ncpsStub = { + init: vi.fn(), + getKey: vi.fn(), + getKeysForInjection: vi.fn().mockReturnValue([]), + } satisfies Partial; + + const runEventsStub = { + 
recordInvocationMessage: vi.fn(), + recordInjection: vi.fn(), + startLLMCall: vi.fn(), + completeLLMCall: vi.fn(), + startToolExecution: vi.fn(), + completeToolExecution: vi.fn(), + recordSummarization: vi.fn(), + publishEvent: vi.fn(), + listRunEvents: vi.fn().mockResolvedValue({ items: [], nextCursor: null }), + getRunSummary: vi.fn().mockResolvedValue(null), + getEventSnapshot: vi.fn().mockResolvedValue(null), + } satisfies Partial; + + const agentsPersistenceStub = { + getOrCreateThreadByAlias: vi.fn().mockResolvedValue('thread'), + updateThreadChannelDescriptor: vi.fn(), + getOrCreateSubthreadByAlias: vi.fn().mockResolvedValue('thread-child'), + beginRunThread: vi.fn().mockResolvedValue({ runId: 'run' }), + recordInjected: vi.fn().mockResolvedValue({ messageIds: [] }), + completeRun: vi.fn(), + resolveThreadId: vi.fn().mockResolvedValue('thread'), + } satisfies Partial; + + const threadsMetricsStub = { + getThreadsMetrics: vi.fn().mockResolvedValue({}), + } satisfies Partial; + + const vaultStub = { + getSecret: vi.fn().mockResolvedValue(undefined), + listKvV2Mounts: vi.fn().mockResolvedValue([]), + } satisfies Partial; + + const graphRepositoryStub = { + initIfNeeded: vi.fn().mockResolvedValue(undefined), + get: vi.fn().mockResolvedValue(null), + upsert: vi.fn().mockResolvedValue({ + name: 'main', + version: 0, + updatedAt: new Date(0).toISOString(), + nodes: [], + edges: [], + variables: [], + }), + upsertNodeState: vi.fn().mockResolvedValue(undefined), + } satisfies Record; + + const templateRegistryStub = { + register: vi.fn().mockReturnThis(), + getClass: vi.fn(), + getMeta: vi.fn(), + toSchema: vi.fn().mockResolvedValue([]), + } as unknown as TemplateRegistry; + + const liveGraphRuntimeStub = { + load: vi.fn().mockResolvedValue({ applied: false }), + subscribe: vi.fn().mockReturnValue(() => undefined), + } as unknown as LiveGraphRuntime; + + class StubProvisioner extends LLMProvisioner { + async init(): Promise {} + getLLM = vi.fn(async () => ({} as 
LLM)); + async teardown(): Promise {} + } + const llmProvisionerStub = new StubProvisioner(); + + const workspaceWatcherStub = { + start: vi.fn(), + stop: vi.fn(), + } satisfies Partial; + + const dockerClientStub = { + checkConnectivity: vi.fn().mockResolvedValue({ status: 200 }), + getBaseUrl: vi.fn(() => runnerConfigDefaults.dockerRunnerBaseUrl), + listContainersByVolume: vi.fn().mockResolvedValue([]), + removeVolume: vi.fn().mockResolvedValue(undefined), + } satisfies Partial; + + const volumeGcStub = { start: vi.fn(), stop: vi.fn() } satisfies Partial; + const connectivityMonitorStub = { + onModuleInit: vi.fn(), + onModuleDestroy: vi.fn(), + } satisfies Partial; + + return { + prismaServiceStub, + containerRegistryStub, + containerServiceStub, + cleanupStub, + ncpsStub, + runEventsStub, + agentsPersistenceStub, + threadsMetricsStub, + vaultStub, + graphRepositoryStub, + templateRegistryStub, + liveGraphRuntimeStub, + llmProvisionerStub, + workspaceWatcherStub, + dockerClientStub, + volumeGcStub, + connectivityMonitorStub, + }; +}; + +const applyBootstrapOverrides = ( + moduleBuilder: TestingModuleBuilder, + stubs: BootstrapStubs, + configService: ConfigService, + options: { stubConnectivityMonitor?: boolean; stubVolumeGc?: boolean; stubDockerClient?: boolean } = {}, +): TestingModuleBuilder => { + moduleBuilder + .overrideProvider(ConfigService) + .useValue(configService) + .overrideProvider(PrismaService) + .useValue(stubs.prismaServiceStub as PrismaService) + .overrideProvider(ContainerRegistry) + .useValue(stubs.containerRegistryStub as ContainerRegistry) + .overrideProvider(ContainerService) + .useValue(stubs.containerServiceStub as ContainerService) + .overrideProvider(ContainerCleanupService) + .useValue(stubs.cleanupStub as ContainerCleanupService) + .overrideProvider(NcpsKeyService) + .useValue(stubs.ncpsStub as NcpsKeyService) + .overrideProvider(RunEventsService) + .useValue(stubs.runEventsStub as RunEventsService) + 
.overrideProvider(AgentsPersistenceService) + .useValue(stubs.agentsPersistenceStub as AgentsPersistenceService) + .overrideProvider(ThreadsMetricsService) + .useValue(stubs.threadsMetricsStub as ThreadsMetricsService) + .overrideProvider(VaultService) + .useValue(stubs.vaultStub as VaultService) + .overrideProvider(GraphRepository) + .useValue(stubs.graphRepositoryStub as unknown as GraphRepository) + .overrideProvider(TemplateRegistry) + .useValue(stubs.templateRegistryStub) + .overrideProvider(LiveGraphRuntime) + .useValue(stubs.liveGraphRuntimeStub) + .overrideProvider(LLMProvisioner) + .useValue(stubs.llmProvisionerStub) + .overrideProvider(LLMSettingsService) + .useValue({}) + .overrideProvider(DockerWorkspaceEventsWatcher) + .useValue(stubs.workspaceWatcherStub as DockerWorkspaceEventsWatcher); + + if (options.stubDockerClient !== false) { + moduleBuilder.overrideProvider(DOCKER_CLIENT).useValue(stubs.dockerClientStub as DockerClient); + } + if (options.stubVolumeGc !== false) { + moduleBuilder.overrideProvider(VolumeGcService).useValue(stubs.volumeGcStub as VolumeGcService); + } + if (options.stubConnectivityMonitor !== false) { + moduleBuilder + .overrideProvider(DockerRunnerConnectivityMonitor) + .useValue(stubs.connectivityMonitorStub as DockerRunnerConnectivityMonitor); + } + + return moduleBuilder; +}; + describe('App bootstrap smoke test', () => { it('initializes Nest application and wires critical dependencies', async () => { const { AppModule } = await import('../src/bootstrap/app.module'); - const transactionClientStub = { - $queryRaw: vi.fn().mockResolvedValue([{ acquired: true }]), - run: { - findMany: vi.fn().mockResolvedValue([]), - updateMany: vi.fn().mockResolvedValue({ count: 0 }), - }, - reminder: { - findMany: vi.fn().mockResolvedValue([]), - updateMany: vi.fn().mockResolvedValue({ count: 0 }), - }, - } satisfies Partial; - - const prismaClientStub = { - $queryRaw: transactionClientStub.$queryRaw, - $transaction: vi.fn(async (cb: (tx: 
typeof transactionClientStub) => Promise) => { - await cb(transactionClientStub); - return undefined; - }), - } satisfies Partial; - - const prismaServiceStub = { - getClient: vi.fn(() => prismaClientStub as PrismaClient), - } satisfies Pick; - - const containerRegistryStub = { - registerStart: vi.fn(), - updateLastUsed: vi.fn(), - claimForTermination: vi.fn().mockResolvedValue(false), - markStopped: vi.fn(), - getExpired: vi.fn().mockResolvedValue([]), - ensureIndexes: vi.fn(), - recordTerminationFailure: vi.fn(), - findMany: vi.fn().mockResolvedValue([]), - touchLastUsed: vi.fn(), - } satisfies Partial; - - const containerServiceStub = { - start: vi.fn(), - stopContainer: vi.fn(), - removeContainer: vi.fn(), - findContainersByLabels: vi.fn().mockResolvedValue([]), - findContainerByLabels: vi.fn().mockResolvedValue(undefined), - execContainer: vi.fn().mockResolvedValue({ exitCode: 0, stderr: '', stdout: '' }), - putArchive: vi.fn().mockResolvedValue(undefined), - touchLastUsed: vi.fn(), - ensureDinD: vi.fn().mockResolvedValue(undefined), - cleanupDinDSidecars: vi.fn().mockResolvedValue(undefined), - } satisfies Partial; - - const cleanupStub = { start: vi.fn(), stop: vi.fn() } satisfies Partial; - - const ncpsStub = { - init: vi.fn(), - getKey: vi.fn(), - getKeysForInjection: vi.fn().mockReturnValue([]), - } satisfies Partial; - - const runEventsStub = { - recordInvocationMessage: vi.fn(), - recordInjection: vi.fn(), - startLLMCall: vi.fn(), - completeLLMCall: vi.fn(), - startToolExecution: vi.fn(), - completeToolExecution: vi.fn(), - recordSummarization: vi.fn(), - publishEvent: vi.fn(), - listRunEvents: vi.fn().mockResolvedValue({ items: [], nextCursor: null }), - getRunSummary: vi.fn().mockResolvedValue(null), - getEventSnapshot: vi.fn().mockResolvedValue(null), - } satisfies Partial; - - const agentsPersistenceStub = { - getOrCreateThreadByAlias: vi.fn().mockResolvedValue('thread'), - updateThreadChannelDescriptor: vi.fn(), - getOrCreateSubthreadByAlias: 
vi.fn().mockResolvedValue('thread-child'), - beginRunThread: vi.fn().mockResolvedValue({ runId: 'run' }), - recordInjected: vi.fn().mockResolvedValue({ messageIds: [] }), - completeRun: vi.fn(), - resolveThreadId: vi.fn().mockResolvedValue('thread'), - } satisfies Partial; - - const threadsMetricsStub = { - getThreadsMetrics: vi.fn().mockResolvedValue({}), - } satisfies Partial; - - const vaultStub = { - getSecret: vi.fn().mockResolvedValue(undefined), - listKvV2Mounts: vi.fn().mockResolvedValue([]), - } satisfies Partial; - - const graphRepositoryStub = { - initIfNeeded: vi.fn().mockResolvedValue(undefined), - get: vi.fn().mockResolvedValue(null), - upsert: vi.fn().mockResolvedValue({ - name: 'main', - version: 0, - updatedAt: new Date(0).toISOString(), - nodes: [], - edges: [], - variables: [], - }), - upsertNodeState: vi.fn().mockResolvedValue(undefined), - } satisfies Record; - - const templateRegistryStub = { - register: vi.fn().mockReturnThis(), - getClass: vi.fn(), - getMeta: vi.fn(), - toSchema: vi.fn().mockResolvedValue([]), - } as unknown as TemplateRegistry; - - const liveGraphRuntimeStub = { - load: vi.fn().mockResolvedValue({ applied: false }), - subscribe: vi.fn().mockReturnValue(() => undefined), - } as unknown as LiveGraphRuntime; - - class StubProvisioner extends LLMProvisioner { - async init(): Promise {} - getLLM = vi.fn(async () => ({} as LLM)); - async teardown(): Promise {} - } - const llmProvisionerStub = new StubProvisioner(); - + const stubs = createBootstrapStubs(); const subscriptionSpy = vi.spyOn(EventsBusService.prototype, 'subscribeToRunEvents'); const configService = new ConfigService().init( @@ -173,6 +299,9 @@ describe('App bootstrap smoke test', () => { }), ); + ConfigService.register(configService); + const dockerRunnerStatus = new DockerRunnerStatusService(configService); + const moduleBuilder = Test.createTestingModule({ imports: [AppModule], providers: [ @@ -182,40 +311,14 @@ describe('App bootstrap smoke test', () => { inject: 
[AgentNode, LLMProvisioner], }, ], - }) - .overrideProvider(ConfigService) - .useValue(configService) - .overrideProvider(PrismaService) - .useValue(prismaServiceStub) - .overrideProvider(ContainerRegistry) - .useValue(containerRegistryStub) - .overrideProvider(ContainerService) - .useValue(containerServiceStub) - .overrideProvider(ContainerCleanupService) - .useValue(cleanupStub) - .overrideProvider(NcpsKeyService) - .useValue(ncpsStub) - .overrideProvider(RunEventsService) - .useValue(runEventsStub) - .overrideProvider(AgentsPersistenceService) - .useValue(agentsPersistenceStub) - .overrideProvider(ThreadsMetricsService) - .useValue(threadsMetricsStub) - .overrideProvider(VaultService) - .useValue(vaultStub) - .overrideProvider(GraphRepository) - .useValue(graphRepositoryStub as unknown as GraphRepository) - .overrideProvider(TemplateRegistry) - .useValue(templateRegistryStub) - .overrideProvider(LiveGraphRuntime) - .useValue(liveGraphRuntimeStub) - .overrideProvider(LLMProvisioner) - .useValue(llmProvisionerStub) - .overrideProvider(LLMSettingsService) - .useValue({}); + }); + + applyBootstrapOverrides(moduleBuilder, stubs, configService); + moduleBuilder.overrideProvider(DockerRunnerStatusService).useValue(dockerRunnerStatus); const moduleRef = await moduleBuilder.compile(); expect(moduleRef.get(ConfigService)).toBe(configService); + expect(moduleRef.get(DockerRunnerStatusService)).toBe(dockerRunnerStatus); const adapter = new FastifyAdapter(); const fastifyInstance = adapter.getInstance() as { addresses?: () => Array> }; if (typeof fastifyInstance.addresses !== 'function') { @@ -234,7 +337,7 @@ describe('App bootstrap smoke test', () => { expect(Reflect.get(startupRecovery as object, 'eventsBus')).toBe(eventsBus); const llmProvisioner = app.get(LLMProvisioner); - expect(llmProvisioner).toBe(llmProvisionerStub); + expect(llmProvisioner).toBe(stubs.llmProvisionerStub); const agentProbe = app.get<{ agent: AgentNode; llm: LLMProvisioner }>(agentProbeToken); 
expect(agentProbe.llm).toBe(llmProvisioner); @@ -254,6 +357,135 @@ describe('App bootstrap smoke test', () => { subscriptionSpy.mockRestore(); await app.close(); await moduleRef.close(); + ConfigService.clearInstanceForTest(); } }, TEST_TIMEOUT_MS); + + it( + 'serves health and guards docker endpoints when runner is optional and unreachable', + async () => { + const { AppModule } = await import('../src/bootstrap/app.module'); + const stubs = createBootstrapStubs(); + const previousEnv = { + DOCKER_RUNNER_BASE_URL: process.env.DOCKER_RUNNER_BASE_URL, + DOCKER_RUNNER_SHARED_SECRET: process.env.DOCKER_RUNNER_SHARED_SECRET, + DOCKER_RUNNER_OPTIONAL: process.env.DOCKER_RUNNER_OPTIONAL, + VOLUME_GC_ENABLED: process.env.VOLUME_GC_ENABLED, + VOLUME_GC_INTERVAL_MS: process.env.VOLUME_GC_INTERVAL_MS, + VOLUME_GC_SWEEP_TIMEOUT_MS: process.env.VOLUME_GC_SWEEP_TIMEOUT_MS, + } as const; + + process.env.DOCKER_RUNNER_BASE_URL = 'http://127.0.0.1:59999'; + process.env.DOCKER_RUNNER_SHARED_SECRET = 'shared-secret'; + process.env.DOCKER_RUNNER_OPTIONAL = 'true'; + process.env.VOLUME_GC_ENABLED = 'true'; + process.env.VOLUME_GC_INTERVAL_MS = '25'; + process.env.VOLUME_GC_SWEEP_TIMEOUT_MS = '10'; + + ConfigService.clearInstanceForTest(); + const configService = ConfigService.register( + new ConfigService().init( + configSchema.parse({ + llmProvider: process.env.LLM_PROVIDER || 'litellm', + litellmBaseUrl: process.env.LITELLM_BASE_URL || 'http://127.0.0.1:4000', + litellmMasterKey: process.env.LITELLM_MASTER_KEY || 'sk-dev-master-1234', + agentsDatabaseUrl: process.env.AGENTS_DATABASE_URL || 'postgres://localhost:5432/test', + dockerRunnerOptional: true, + dockerRunnerConnectProbeIntervalMs: 5, + dockerRunnerConnectRetryBaseDelayMs: 2, + dockerRunnerConnectRetryMaxDelayMs: 10, + dockerRunnerConnectRetryJitterMs: 0, + ...runnerConfigDefaults, + }), + ), + ); + + stubs.dockerClientStub.checkConnectivity = vi + .fn() + .mockRejectedValue( + new DockerRunnerRequestError(503, 
'runner_unreachable', true, 'runner offline'), + ); + stubs.dockerClientStub.getBaseUrl = vi.fn(() => process.env.DOCKER_RUNNER_BASE_URL || ''); + const volumeGcStarted: string[] = []; + stubs.volumeGcStub.start = vi.fn((intervalMs?: number) => { + volumeGcStarted.push(`started:${intervalMs}`); + }); + + const dockerRunnerStatus = new DockerRunnerStatusService(configService); + const connectivityMonitor = new DockerRunnerConnectivityMonitor( + stubs.dockerClientStub as DockerClient, + configService, + dockerRunnerStatus, + ); + + const moduleBuilder = Test.createTestingModule({ + imports: [AppModule], + }); + + applyBootstrapOverrides(moduleBuilder, stubs, configService, { + stubConnectivityMonitor: false, + }); + moduleBuilder.overrideProvider(DockerRunnerStatusService).useValue(dockerRunnerStatus); + moduleBuilder + .overrideProvider(DockerRunnerConnectivityMonitor) + .useValue(connectivityMonitor); + const moduleRef = await moduleBuilder.compile(); + expect(moduleRef.get(DockerRunnerStatusService)).toBe(dockerRunnerStatus); + const app = moduleRef.createNestApplication(new FastifyAdapter()); + const fastify = app.getHttpAdapter().getInstance(); + + try { + await app.init(); + + const healthController = app.get(HealthController); + expect(healthController).toBeInstanceOf(HealthController); + Reflect.set(healthController as object, 'dockerRunnerStatus', dockerRunnerStatus); + + const listenStartedAt = Date.now(); + await fastify.listen({ port: 0, host: '127.0.0.1' }); + const listenDuration = Date.now() - listenStartedAt; + expect(listenDuration).toBeLessThan(5_000); + + const volumeGc = app.get(VolumeGcService); + setImmediate(() => { + const interval = Number(process.env.VOLUME_GC_INTERVAL_MS ?? 
'') || 60_000; + volumeGc.start(interval); + }); + await new Promise((resolve) => setImmediate(resolve)); + expect(volumeGcStarted.length).toBeGreaterThanOrEqual(1); + + await new Promise((resolve) => setTimeout(resolve, 20)); + + const addressInfo = fastify.server.address() as AddressInfo; + const host = addressInfo.address === '::' ? '127.0.0.1' : addressInfo.address; + const baseUrl = `http://${host}:${addressInfo.port}`; + + const healthResponse = await undiciFetch(`${baseUrl}/health`); + expect(healthResponse.status).toBe(200); + const healthBody = (await healthResponse.json()) as { + dependencies?: { dockerRunner?: { status?: string; optional?: boolean } }; + }; + expect(healthBody.dependencies?.dockerRunner?.status).toBe('down'); + expect(healthBody.dependencies?.dockerRunner?.optional).toBe(true); + + const containersResponse = await undiciFetch(`${baseUrl}/api/containers`); + expect(containersResponse.status).toBe(503); + const containersBody = (await containersResponse.json()) as { error?: { code?: string } }; + expect(containersBody.error?.code).toBe('docker_runner_not_ready'); + } finally { + await fastify.close(); + await app.close(); + await moduleRef.close(); + for (const [key, value] of Object.entries(previousEnv)) { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } + } + ConfigService.clearInstanceForTest(); + } + }, + TEST_TIMEOUT_MS, + ); }); diff --git a/packages/platform-server/__e2e__/bootstrap.di.test.ts b/packages/platform-server/__e2e__/bootstrap.di.test.ts new file mode 100644 index 000000000..736bc3344 --- /dev/null +++ b/packages/platform-server/__e2e__/bootstrap.di.test.ts @@ -0,0 +1,136 @@ +import 'reflect-metadata'; +import { FastifyAdapter } from '@nestjs/platform-fastify'; +import { describe, it, beforeEach, afterEach, expect } from 'vitest'; +import { NestFactory } from '@nestjs/core'; +import { mkdtempSync, rmSync } from 'node:fs'; +import { createServer, type IncomingMessage, type 
Server } from 'node:http'; +import os from 'node:os'; +import path from 'node:path'; + +import { AppModule } from '../src/bootstrap/app.module'; +import { ConfigService } from '../src/core/services/config.service'; + +const LITELLM_PORT = 4000; + +const REQUIRED_ENV = { + NODE_ENV: 'production', + AGENTS_ENV: 'production', + LOG_LEVEL: 'error', + LLM_PROVIDER: 'litellm', + LITELLM_BASE_URL: `http://127.0.0.1:${LITELLM_PORT}`, + LITELLM_MASTER_KEY: 'sk-test-master-key', + AGENTS_DATABASE_URL: 'postgresql://postgres:postgres@127.0.0.1:55432/agents', + DOCKER_RUNNER_BASE_URL: 'http://127.0.0.1:59999', + DOCKER_RUNNER_SHARED_SECRET: 'dev-shared-secret', + DOCKER_RUNNER_OPTIONAL: 'true', + CONTAINERS_CLEANUP_ENABLED: 'false', + VOLUME_GC_ENABLED: 'false', + NCPS_ENABLED: 'false', + WORKSPACE_NETWORK_NAME: 'agents_net', + SKIP_LLM_PROVISIONER: '1', + SKIP_DB_BOOTSTRAP: '1', +} as const; + +const TEST_TIMEOUT_MS = 20_000; + +describe('Production bootstrap DI', () => { + let savedEnv: Record = {}; + let graphRepoPath: string; + let liteLLMServer: Server | undefined; + + beforeEach(async () => { + savedEnv = {}; + graphRepoPath = mkdtempSync(path.join(os.tmpdir(), 'platform-bootstrap-di-')); + const overrides = { ...REQUIRED_ENV, GRAPH_REPO_PATH: graphRepoPath } as Record; + for (const [key, value] of Object.entries(overrides)) { + if (!(key in savedEnv)) { + savedEnv[key] = process.env[key]; + } + process.env[key] = value; + } + liteLLMServer = await startLiteLLMServer(); + ConfigService.fromEnv(); + }); + + afterEach(async () => { + ConfigService.clearInstanceForTest(); + if (liteLLMServer) { + await new Promise((resolve, reject) => { + liteLLMServer?.close((error) => { + if (error) { + reject(error); + return; + } + resolve(); + }); + }); + liteLLMServer = undefined; + } + for (const [key, value] of Object.entries(savedEnv)) { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } + } + savedEnv = {}; + rmSync(graphRepoPath, { 
recursive: true, force: true }); + }); + + it( + 'initializes the production bootstrap path when docker runner is optional', + async () => { + const adapter = new FastifyAdapter(); + const app = await NestFactory.create(AppModule, adapter); + try { + await app.init(); + expect(true).toBe(true); + } finally { + await app.close().catch(() => undefined); + } + }, + TEST_TIMEOUT_MS, + ); +}); + +function startLiteLLMServer(): Promise { + return new Promise((resolve) => { + const server = createServer(async (req, res) => { + if (!req.url) { + res.statusCode = 404; + res.end(); + return; + } + if (req.method === 'POST' && req.url === '/key/generate') { + const payload = await readJsonBody(req); + const keyAlias = typeof payload?.key_alias === 'string' ? payload.key_alias : 'test-alias'; + const key = `virtual-key-${keyAlias}`; + const expiresAt = new Date(Date.now() + 60 * 60 * 1000).toISOString(); + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ key, key_alias: keyAlias, expires_at: expiresAt })); + return; + } + if (req.method === 'POST' && req.url === '/key/delete') { + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ ok: true })); + return; + } + res.statusCode = 404; + res.end(); + }); + server.listen(LITELLM_PORT, '127.0.0.1', () => resolve(server)); + }); +} + +async function readJsonBody(req: IncomingMessage): Promise { + const chunks: Buffer[] = []; + for await (const chunk of req) { + chunks.push(typeof chunk === 'string' ? 
Buffer.from(chunk) : chunk); + } + if (!chunks.length) return undefined; + try { + return JSON.parse(Buffer.concat(chunks).toString('utf8')) as T; + } catch { + return undefined; + } +} diff --git a/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts b/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts index ff5ea278a..5aa2143a4 100644 --- a/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts +++ b/packages/platform-server/__e2e__/graph.socket.gateway.e2e.test.ts @@ -16,6 +16,7 @@ import { ThreadsMetricsService } from '../src/agents/threads.metrics.service'; import { PrismaService } from '../src/core/services/prisma.service'; import { ContainerTerminalGateway } from '../src/infra/container/terminal.gateway'; import { TerminalSessionsService, type TerminalSessionRecord } from '../src/infra/container/terminal.sessions.service'; +import { DockerRunnerStatusService } from '../src/infra/container/dockerRunnerStatus.service'; import { WorkspaceProvider, type WorkspaceKey, @@ -309,6 +310,9 @@ describe('Socket gateway real server handshakes', () => { let graphGateway: GraphSocketGateway; let eventsBusService: EventsBusService; let prismaStub: PrismaServiceStub; + const runnerStatusStub = { + getSnapshot: () => ({ status: 'up', optional: false, consecutiveFailures: 0 }), + } satisfies Pick; beforeAll(async () => { const moduleRef = await Test.createTestingModule({ @@ -320,6 +324,7 @@ describe('Socket gateway real server handshakes', () => { ContainerTerminalGateway, { provide: TerminalSessionsService, useClass: TerminalSessionsServiceStub }, { provide: WorkspaceProvider, useClass: WorkspaceProviderStub }, + { provide: DockerRunnerStatusService, useValue: runnerStatusStub }, EventsBusService, RunEventsService, ], diff --git a/packages/platform-server/__tests__/app.module.smoke.test.ts b/packages/platform-server/__tests__/app.module.smoke.test.ts index d368556c8..42828133b 100644 --- 
a/packages/platform-server/__tests__/app.module.smoke.test.ts +++ b/packages/platform-server/__tests__/app.module.smoke.test.ts @@ -1,3 +1,4 @@ +import 'reflect-metadata'; import { Test } from '@nestjs/testing'; import { describe, expect, it, vi } from 'vitest'; import { AppModule } from '../src/bootstrap/app.module'; @@ -21,6 +22,8 @@ import { StartupRecoveryService } from '../src/core/services/startupRecovery.ser import { LiveGraphRuntime } from '../src/graph-core/liveGraph.manager'; import { LLMProvisioner } from '../src/llm/provisioners/llm.provisioner'; import { clearTestConfig, registerTestConfig } from './helpers/config'; +import { DockerRunnerStatusService } from '../src/infra/container/dockerRunnerStatus.service'; +import { DockerRunnerConnectivityMonitor } from '../src/infra/container/dockerRunnerConnectivity.monitor'; process.env.LLM_PROVIDER = process.env.LLM_PROVIDER || 'litellm'; process.env.LITELLM_BASE_URL = process.env.LITELLM_BASE_URL || 'http://127.0.0.1:4000'; @@ -147,6 +150,17 @@ describe('AppModule bootstrap smoke test', () => { init: vi.fn().mockResolvedValue(undefined), getLLM: vi.fn().mockResolvedValue({ call: vi.fn() }), } satisfies Partial; + const dockerRunnerStatusStub = { + getSnapshot: vi.fn(() => ({ status: 'up', optional: false })), + setBaseUrl: vi.fn(), + setOptional: vi.fn(), + markUp: vi.fn(), + markDown: vi.fn(), + } as unknown as DockerRunnerStatusService; + const connectivityMonitorStub = { + onModuleInit: vi.fn(), + onModuleDestroy: vi.fn(), + } as unknown as DockerRunnerConnectivityMonitor; const config = registerTestConfig({ llmProvider: process.env.LLM_PROVIDER === 'openai' ? 
'openai' : 'litellm', @@ -188,6 +202,10 @@ describe('AppModule bootstrap smoke test', () => { .useValue(llmProvisionerStub) .overrideProvider(ConfigService) .useValue(config) + .overrideProvider(DockerRunnerStatusService) + .useValue(dockerRunnerStatusStub) + .overrideProvider(DockerRunnerConnectivityMonitor) + .useValue(connectivityMonitorStub) .compile(); const adapter = new FastifyAdapter(); diff --git a/packages/platform-server/__tests__/containers.delete.integration.test.ts b/packages/platform-server/__tests__/containers.delete.integration.test.ts index 2d4ea4367..a9eea3d83 100644 --- a/packages/platform-server/__tests__/containers.delete.integration.test.ts +++ b/packages/platform-server/__tests__/containers.delete.integration.test.ts @@ -28,6 +28,8 @@ import { ContainerThreadTerminationService } from '../src/infra/container/contai import { ContainerEventProcessor } from '../src/infra/container/containerEvent.processor'; import { registerTestConfig, clearTestConfig } from './helpers/config'; import type { Prisma, PrismaClient } from '@prisma/client'; +import { DockerRunnerStatusService } from '../src/infra/container/dockerRunnerStatus.service'; +import { DockerRunnerConnectivityMonitor } from '../src/infra/container/dockerRunnerConnectivity.monitor'; // Vitest compiles controllers without emitDecoratorMetadata, so manually register constructor param metadata. 
Reflect.defineMetadata('design:paramtypes', [PrismaService, ContainerAdminService, ConfigService], ContainersController); @@ -359,6 +361,17 @@ describe('ContainersController wiring via InfraModule', () => { } as Partial; const prismaServiceStub = { getClient: () => prismaClientStub } as PrismaService; const adminMock = { deleteContainer: vi.fn().mockResolvedValue(undefined) } as unknown as ContainerAdminService; + const dockerRunnerStatusStub = { + getSnapshot: vi.fn(() => ({ status: 'up', optional: false })), + setBaseUrl: vi.fn(), + setOptional: vi.fn(), + markUp: vi.fn(), + markDown: vi.fn(), + } as unknown as DockerRunnerStatusService; + const connectivityMonitorStub = { + onModuleInit: vi.fn(), + onModuleDestroy: vi.fn(), + } as unknown as DockerRunnerConnectivityMonitor; beforeAll(async () => { registerTestConfig({ @@ -400,6 +413,10 @@ describe('ContainersController wiring via InfraModule', () => { .useValue(createDockerClientStub()) .overrideProvider(ContainerAdminService) .useValue(adminMock) + .overrideProvider(DockerRunnerStatusService) + .useValue(dockerRunnerStatusStub) + .overrideProvider(DockerRunnerConnectivityMonitor) + .useValue(connectivityMonitorStub) .compile(); app = moduleRef.createNestApplication(new FastifyAdapter()); diff --git a/packages/platform-server/__tests__/e2e/terminal.gateway.e2e.test.ts b/packages/platform-server/__tests__/e2e/terminal.gateway.e2e.test.ts index e3fdcd99a..4294aca97 100644 --- a/packages/platform-server/__tests__/e2e/terminal.gateway.e2e.test.ts +++ b/packages/platform-server/__tests__/e2e/terminal.gateway.e2e.test.ts @@ -1,13 +1,14 @@ import { Injectable } from '@nestjs/common'; import { Test } from '@nestjs/testing'; import { FastifyAdapter, NestFastifyApplication } from '@nestjs/platform-fastify'; -import { describe, it, expect, beforeAll, afterAll } from 'vitest'; +import { describe, it, expect, beforeAll, afterAll, beforeEach } from 'vitest'; import { PassThrough } from 'node:stream'; import type { AddressInfo 
} from 'net'; import WebSocket from 'ws'; import { ContainerTerminalController } from '../../src/infra/container/containerTerminal.controller'; import { ContainerTerminalGateway } from '../../src/infra/container/terminal.gateway'; import { TerminalSessionsService } from '../../src/infra/container/terminal.sessions.service'; +import { DockerRunnerStatusService, type DockerRunnerStatusSnapshot } from '../../src/infra/container/dockerRunnerStatus.service'; import { WorkspaceProvider, type WorkspaceKey, @@ -132,6 +133,16 @@ describe('ContainerTerminalGateway E2E', () => { let app: NestFastifyApplication; let workspaceProvider: TestWorkspaceProvider; let baseUrl: string; + const runnerStatusStub = { + snapshot: { status: 'up', optional: false, consecutiveFailures: 0 } as DockerRunnerStatusSnapshot, + getSnapshot() { + return this.snapshot; + }, + } satisfies Pick & { snapshot: DockerRunnerStatusSnapshot }; + + beforeEach(() => { + runnerStatusStub.snapshot = { status: 'up', optional: false, consecutiveFailures: 0 }; + }); beforeAll(async () => { const moduleRef = await Test.createTestingModule({ @@ -140,6 +151,7 @@ describe('ContainerTerminalGateway E2E', () => { ContainerTerminalGateway, TerminalSessionsService, { provide: WorkspaceProvider, useClass: TestWorkspaceProvider }, + { provide: DockerRunnerStatusService, useValue: runnerStatusStub }, ], }).compile(); @@ -233,4 +245,60 @@ describe('ContainerTerminalGateway E2E', () => { expect(messages.some((msg) => msg.type === 'error')).toBe(false); expect(workspaceProvider.closeCalls).toBeGreaterThan(0); }); + + it('denies websocket upgrades when docker runner is down', async () => { + runnerStatusStub.snapshot = { ...runnerStatusStub.snapshot, status: 'down' }; + const workspaceId = 'f'.repeat(64); + const wsUrl = new URL(baseUrl); + wsUrl.protocol = wsUrl.protocol === 'https:' ? 
'wss:' : 'ws:'; + wsUrl.pathname = `/api/containers/${workspaceId}/terminal/ws`; + wsUrl.search = new URLSearchParams({ + sessionId: '00000000-0000-4000-8000-000000000000', + token: 'stub-token', + }).toString(); + + await new Promise((resolve, reject) => { + const ws = new WebSocket(wsUrl.toString()); + let settled = false; + const complete = (err?: Error) => { + if (settled) return; + settled = true; + if (err) { + reject(err); + } else { + resolve(); + } + }; + + ws.on('open', () => complete(new Error('websocket unexpectedly opened while runner is down'))); + ws.on('unexpected-response', (_req, res) => { + let body = ''; + res.setEncoding('utf8'); + res.on('data', (chunk) => { + body += chunk; + }); + res.on('end', () => { + try { + expect(res.statusCode).toBe(503); + const parsed = JSON.parse(body || '{}'); + expect(parsed).toEqual({ + error: { code: 'docker_runner_not_ready', message: 'docker-runner not ready' }, + }); + } catch (assertErr) { + complete(assertErr as Error); + return; + } + complete(); + }); + }); + ws.on('error', (err) => { + if (settled) return; + if (err instanceof Error && err.message.includes('unexpected server response: 503')) { + complete(); + return; + } + complete(err as Error); + }); + }); + }); }); diff --git a/packages/platform-server/__tests__/terminal.gateway.test.ts b/packages/platform-server/__tests__/terminal.gateway.test.ts index 7812e8845..3bc7ffd16 100644 --- a/packages/platform-server/__tests__/terminal.gateway.test.ts +++ b/packages/platform-server/__tests__/terminal.gateway.test.ts @@ -9,6 +9,7 @@ import { DockerRunnerRequestError } from '../src/infra/container/httpDockerRunne import type { TerminalSessionsService, TerminalSessionRecord } from '../src/infra/container/terminal.sessions.service'; import { WorkspaceProvider } from '../src/workspace/providers/workspace.provider'; import { waitFor, waitForWsClose } from './helpers/ws'; +import { DockerRunnerStatusService, type DockerRunnerStatusSnapshot } from 
'../src/infra/container/dockerRunnerStatus.service'; const createSessionRecord = (overrides: Partial = {}): TerminalSessionRecord => { const now = Date.now(); @@ -63,6 +64,10 @@ const createSessionServiceHarness = (overrides: Partial = }; }; +const createRunnerStatusStub = (override?: Partial) => ({ + getSnapshot: vi.fn(() => ({ status: 'up', optional: false, consecutiveFailures: 0, ...override })), +}) satisfies Pick; + const listenFastify = async (app: ReturnType): Promise => { await app.listen({ port: 0, host: '127.0.0.1' }); const address = app.server.address() as AddressInfo | null; @@ -90,6 +95,7 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { const gateway = new ContainerTerminalGateway( sessionMocks as unknown as TerminalSessionsService, providerMocks as unknown as WorkspaceProvider, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, ); const app = Fastify(); @@ -140,6 +146,7 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { const gateway = new ContainerTerminalGateway( sessionMocks as unknown as TerminalSessionsService, providerMocks as unknown as WorkspaceProvider, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, ); const handleSpy = vi @@ -175,6 +182,7 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { const gateway = new ContainerTerminalGateway( service as unknown as TerminalSessionsService, providerMocks as unknown as WorkspaceProvider, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, ); const app = Fastify(); @@ -248,6 +256,7 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { const gateway = new ContainerTerminalGateway( sessionMocks as unknown as TerminalSessionsService, providerMocks as unknown as WorkspaceProvider, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, ); const app = Fastify(); @@ -305,6 +314,7 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { 
const gateway = new ContainerTerminalGateway( sessionMocks as unknown as TerminalSessionsService, providerMocks as unknown as WorkspaceProvider, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, ); const app = Fastify(); @@ -354,6 +364,7 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { const gateway = new ContainerTerminalGateway( sessionService as unknown as TerminalSessionsService, providerMocks as unknown as WorkspaceProvider, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, ); const app = Fastify(); @@ -441,6 +452,7 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { const gateway = new ContainerTerminalGateway( sessionService as unknown as TerminalSessionsService, providerMocks as unknown as WorkspaceProvider, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, ); const app = Fastify(); @@ -537,7 +549,11 @@ describe('ContainerTerminalGateway (custom websocket server)', () => { resize: vi.fn().mockResolvedValue(undefined), } as unknown as WorkspaceProvider; - const gateway = new ContainerTerminalGateway(sessionMocks, providerMocks); + const gateway = new ContainerTerminalGateway( + sessionMocks, + providerMocks, + createRunnerStatusStub() as unknown as DockerRunnerStatusService, + ); const app = Fastify(); gateway.registerRoutes(app); diff --git a/packages/platform-server/__tests__/volumeGc.job.test.ts b/packages/platform-server/__tests__/volumeGc.job.test.ts index 3c9808780..eba96fbde 100644 --- a/packages/platform-server/__tests__/volumeGc.job.test.ts +++ b/packages/platform-server/__tests__/volumeGc.job.test.ts @@ -1,5 +1,7 @@ import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; import { VolumeGcService } from '../src/infra/container/volumeGc.job'; +import type { DockerRunnerStatusService } from '../src/infra/container/dockerRunnerStatus.service'; +import type { ConfigService } from '../src/core/services/config.service'; const envKeys = [ 
'VOLUME_GC_ENABLED', @@ -7,6 +9,7 @@ const envKeys = [ 'VOLUME_GC_CONCURRENCY', 'VOLUME_GC_COOLDOWN_MS', 'VOLUME_GC_INTERVAL_MS', + 'VOLUME_GC_SWEEP_TIMEOUT_MS', ]; describe('VolumeGcService', () => { @@ -19,7 +22,7 @@ describe('VolumeGcService', () => { for (const key of envKeys) delete process.env[key]; }); - const makeService = () => { + const makeService = (options: { runnerStatus?: 'up' | 'down'; sweepTimeoutMs?: number } = {}) => { const prisma = { thread: { findMany: vi.fn(async () => [] as Array<{ id: string }>), @@ -30,8 +33,24 @@ describe('VolumeGcService', () => { listContainersByVolume: vi.fn(async () => [] as string[]), removeVolume: vi.fn(async () => undefined), }; - const service = new VolumeGcService(prismaService as any, containerService as any); - return { service, prisma, containerService }; + const dockerRunnerStatus = { + getSnapshot: vi.fn(() => ({ + status: options.runnerStatus ?? 'up', + optional: true, + baseUrl: 'http://docker-runner', + consecutiveFailures: 0, + })), + } satisfies Partial; + const configService = { + getVolumeGcSweepTimeoutMs: vi.fn(() => options.sweepTimeoutMs ?? 
15_000), + } satisfies Partial; + const service = new VolumeGcService( + prismaService as any, + containerService as any, + dockerRunnerStatus as DockerRunnerStatusService, + configService as ConfigService, + ); + return { service, prisma, containerService, dockerRunnerStatus }; }; it('removes volumes with no live references', async () => { @@ -93,4 +112,29 @@ describe('VolumeGcService', () => { expect(containerService.listContainersByVolume).toHaveBeenCalledTimes(1); expect(containerService.removeVolume).toHaveBeenCalledTimes(1); }); + + it('skips sweep entirely when docker runner is down', async () => { + const { service, containerService } = makeService({ runnerStatus: 'down' }); + + await service.sweep(new Date('2024-01-01T00:00:00Z')); + + expect(containerService.listContainersByVolume).not.toHaveBeenCalled(); + }); + + it('enforces sweep timeout', async () => { + vi.useFakeTimers(); + const { service } = makeService({ sweepTimeoutMs: 10 }); + const sweepSpy = vi.spyOn(service, 'sweep').mockImplementation(async () => { + await new Promise(() => {}); + }); + + const resultPromise = (service as unknown as { sweepWithTimeout: () => Promise }).sweepWithTimeout(); + await vi.advanceTimersByTimeAsync(15); + const result = await resultPromise; + + expect(result).toBe(false); + expect(sweepSpy).toHaveBeenCalledTimes(1); + sweepSpy.mockRestore(); + vi.useRealTimers(); + }); }); diff --git a/packages/platform-server/src/bootstrap/app.module.ts b/packages/platform-server/src/bootstrap/app.module.ts index 5a7f243b4..9ef8df440 100644 --- a/packages/platform-server/src/bootstrap/app.module.ts +++ b/packages/platform-server/src/bootstrap/app.module.ts @@ -72,6 +72,9 @@ export class AppModule implements OnModuleInit { constructor(@Inject(LLMProvisioner) private readonly llmProvisioner: LLMProvisioner) {} async onModuleInit(): Promise { + if (process.env.SKIP_LLM_PROVISIONER === '1') { + return; + } await this.llmProvisioner.init(); } } diff --git 
a/packages/platform-server/src/core/services/config.service.ts b/packages/platform-server/src/core/services/config.service.ts index d6080b05e..ee9376573 100644 --- a/packages/platform-server/src/core/services/config.service.ts +++ b/packages/platform-server/src/core/services/config.service.ts @@ -67,6 +67,45 @@ export const configSchema = z.object({ const num = typeof v === 'number' ? v : Number(v); return Number.isFinite(num) ? num : 30_000; }), + dockerRunnerOptional: z + .union([z.boolean(), z.string()]) + .default('true') + .transform((v) => (typeof v === 'string' ? v.toLowerCase() === 'true' : !!v)), + dockerRunnerConnectRetryBaseDelayMs: z + .union([z.string(), z.number()]) + .default('500') + .transform((v) => { + const num = typeof v === 'number' ? v : Number(v); + return Number.isFinite(num) && num > 0 ? num : 500; + }), + dockerRunnerConnectRetryMaxDelayMs: z + .union([z.string(), z.number()]) + .default('30000') + .transform((v) => { + const num = typeof v === 'number' ? v : Number(v); + return Number.isFinite(num) && num > 0 ? num : 30_000; + }), + dockerRunnerConnectRetryJitterMs: z + .union([z.string(), z.number()]) + .default('250') + .transform((v) => { + const num = typeof v === 'number' ? v : Number(v); + return Number.isFinite(num) && num >= 0 ? num : 250; + }), + dockerRunnerConnectProbeIntervalMs: z + .union([z.string(), z.number()]) + .default('30000') + .transform((v) => { + const num = typeof v === 'number' ? v : Number(v); + return Number.isFinite(num) && num > 0 ? num : 30_000; + }), + dockerRunnerConnectMaxRetries: z + .union([z.string(), z.number()]) + .default('0') + .transform((v) => { + const num = typeof v === 'number' ? v : Number(v); + return Number.isFinite(num) && num >= 0 ? 
num : 0; + }), // Workspace container network name workspaceNetworkName: z.string().min(1).default('agents_net'), // Nix search/proxy settings @@ -188,6 +227,13 @@ export const configSchema = z.object({ .map((x) => x.trim()) .filter((x) => !!x), ), + volumeGcSweepTimeoutMs: z + .union([z.string(), z.number()]) + .default('15000') + .transform((v) => { + const num = typeof v === 'number' ? v : Number(v); + return Number.isFinite(num) && num >= 0 ? num : 15_000; + }), }); export type Config = z.infer; @@ -325,6 +371,34 @@ export class ConfigService implements Config { return this.params.dockerRunnerTimeoutMs; } + get dockerRunnerOptional(): boolean { + return this.params.dockerRunnerOptional; + } + + get dockerRunnerConnectRetryBaseDelayMs(): number { + return this.params.dockerRunnerConnectRetryBaseDelayMs; + } + + get dockerRunnerConnectRetryMaxDelayMs(): number { + return this.params.dockerRunnerConnectRetryMaxDelayMs; + } + + get dockerRunnerConnectRetryJitterMs(): number { + return this.params.dockerRunnerConnectRetryJitterMs; + } + + get dockerRunnerConnectProbeIntervalMs(): number { + return this.params.dockerRunnerConnectProbeIntervalMs; + } + + get dockerRunnerConnectMaxRetries(): number { + return this.params.dockerRunnerConnectMaxRetries; + } + + get volumeGcSweepTimeoutMs(): number { + return this.params.volumeGcSweepTimeoutMs; + } + getDockerRunnerBaseUrl(): string { return this.dockerRunnerBaseUrl; } @@ -337,6 +411,34 @@ export class ConfigService implements Config { return this.dockerRunnerTimeoutMs; } + getDockerRunnerOptional(): boolean { + return this.dockerRunnerOptional; + } + + getDockerRunnerConnectRetryBaseDelayMs(): number { + return this.dockerRunnerConnectRetryBaseDelayMs; + } + + getDockerRunnerConnectRetryMaxDelayMs(): number { + return this.dockerRunnerConnectRetryMaxDelayMs; + } + + getDockerRunnerConnectRetryJitterMs(): number { + return this.dockerRunnerConnectRetryJitterMs; + } + + getDockerRunnerConnectProbeIntervalMs(): number { + 
return this.dockerRunnerConnectProbeIntervalMs; + } + + getDockerRunnerConnectMaxRetries(): number { + return this.dockerRunnerConnectMaxRetries; + } + + getVolumeGcSweepTimeoutMs(): number { + return this.volumeGcSweepTimeoutMs; + } + get workspaceNetworkName(): string { return this.params.workspaceNetworkName; } @@ -446,6 +548,12 @@ export class ConfigService implements Config { dockerRunnerBaseUrl: process.env.DOCKER_RUNNER_BASE_URL, dockerRunnerSharedSecret: process.env.DOCKER_RUNNER_SHARED_SECRET, dockerRunnerTimeoutMs: process.env.DOCKER_RUNNER_TIMEOUT_MS, + dockerRunnerOptional: process.env.DOCKER_RUNNER_OPTIONAL, + dockerRunnerConnectRetryBaseDelayMs: process.env.DOCKER_RUNNER_CONNECT_RETRY_BASE_DELAY_MS, + dockerRunnerConnectRetryMaxDelayMs: process.env.DOCKER_RUNNER_CONNECT_RETRY_MAX_DELAY_MS, + dockerRunnerConnectRetryJitterMs: process.env.DOCKER_RUNNER_CONNECT_RETRY_JITTER_MS, + dockerRunnerConnectProbeIntervalMs: process.env.DOCKER_RUNNER_CONNECT_PROBE_INTERVAL_MS, + dockerRunnerConnectMaxRetries: process.env.DOCKER_RUNNER_CONNECT_MAX_RETRIES, workspaceNetworkName: process.env.WORKSPACE_NETWORK_NAME, nixAllowedChannels: process.env.NIX_ALLOWED_CHANNELS, nixHttpTimeoutMs: process.env.NIX_HTTP_TIMEOUT_MS, @@ -471,6 +579,7 @@ export class ConfigService implements Config { ncpsAuthToken: process.env.NCPS_AUTH_TOKEN, agentsDatabaseUrl: process.env.AGENTS_DATABASE_URL, corsOrigins: process.env.CORS_ORIGINS, + volumeGcSweepTimeoutMs: process.env.VOLUME_GC_SWEEP_TIMEOUT_MS, }); const config = new ConfigService().init(parsed); ConfigService.register(config); diff --git a/packages/platform-server/src/index.ts b/packages/platform-server/src/index.ts index 06398476c..198ac2c17 100644 --- a/packages/platform-server/src/index.ts +++ b/packages/platform-server/src/index.ts @@ -20,6 +20,7 @@ import { ConfigService } from './core/services/config.service'; import { GraphSocketGateway } from './gateway/graph.socket.gateway'; import { LiveGraphRuntime } from './graph'; 
import { ContainerTerminalGateway } from './infra/container/terminal.gateway'; +import { VolumeGcService } from './infra/container/volumeGc.job'; const bootstrapLogger = new Logger('Bootstrap'); @@ -82,6 +83,13 @@ async function bootstrap() { await fastifyInstance.listen({ port: PORT, host: '0.0.0.0' }); bootstrapLogger.log(`HTTP server listening on :${PORT}`); + const volumeGc = app.get(VolumeGcService); + setImmediate(() => { + const interval = Number(process.env.VOLUME_GC_INTERVAL_MS ?? '') || 60_000; + bootstrapLogger.log(`Starting Volume GC background job interval=${interval}`); + volumeGc.start(interval); + }); + fastifyInstance.server.on('upgrade', (req, _socket, _head) => { bootstrapLogger.log( `HTTP upgrade received ${JSON.stringify({ diff --git a/packages/platform-server/src/infra/container/containerTerminal.controller.ts b/packages/platform-server/src/infra/container/containerTerminal.controller.ts index 4f033f9ab..afb659c65 100644 --- a/packages/platform-server/src/infra/container/containerTerminal.controller.ts +++ b/packages/platform-server/src/infra/container/containerTerminal.controller.ts @@ -1,7 +1,8 @@ -import { BadRequestException, Body, Controller, Inject, Param, Post } from '@nestjs/common'; +import { BadRequestException, Body, Controller, Inject, Param, Post, UseGuards } from '@nestjs/common'; import { IsInt, IsOptional, IsString, Max, Min } from 'class-validator'; import { Type } from 'class-transformer'; import { TerminalSessionsService } from './terminal.sessions.service'; +import { RequireDockerRunnerGuard } from './requireDockerRunner.guard'; class CreateTerminalSessionDto { @IsOptional() @@ -24,6 +25,7 @@ class CreateTerminalSessionDto { } @Controller('api/containers/:workspaceId/terminal') +@UseGuards(RequireDockerRunnerGuard) export class ContainerTerminalController { constructor(@Inject(TerminalSessionsService) private readonly sessions: TerminalSessionsService) {} diff --git 
a/packages/platform-server/src/infra/container/containers.controller.ts b/packages/platform-server/src/infra/container/containers.controller.ts index 4854b6433..9da933d44 100644 --- a/packages/platform-server/src/infra/container/containers.controller.ts +++ b/packages/platform-server/src/infra/container/containers.controller.ts @@ -12,6 +12,7 @@ import { HttpStatus, HttpException, Req, + UseGuards, } from '@nestjs/common'; import { PrismaService } from '../../core/services/prisma.service'; import { Prisma, type PrismaClient, type ContainerStatus } from '@prisma/client'; @@ -21,6 +22,7 @@ import { sanitizeContainerMounts, type ContainerMount } from '@agyn/docker-runne import { ContainerAdminService } from './containerAdmin.service'; import { DockerRunnerRequestError } from './httpDockerRunner.client'; import { ConfigService } from '../../core/services/config.service'; +import { RequireDockerRunnerGuard } from './requireDockerRunner.guard'; // Allowed sort columns for containers list enum SortBy { @@ -185,6 +187,7 @@ export class ListContainerEventsQueryDto { } @Controller('api/containers') +@UseGuards(RequireDockerRunnerGuard) export class ContainersController { private prisma: PrismaClient; private readonly logger = new Logger(ContainersController.name); diff --git a/packages/platform-server/src/infra/container/dockerClient.token.ts b/packages/platform-server/src/infra/container/dockerClient.token.ts index 18d8eee48..794642acb 100644 --- a/packages/platform-server/src/infra/container/dockerClient.token.ts +++ b/packages/platform-server/src/infra/container/dockerClient.token.ts @@ -2,4 +2,6 @@ import type { DockerClientPort } from '@agyn/docker-runner'; export const DOCKER_CLIENT = Symbol('DOCKER_CLIENT'); -export type DockerClient = DockerClientPort; +export interface DockerClient extends DockerClientPort { + checkConnectivity(): Promise<{ status: string }>; +} diff --git a/packages/platform-server/src/infra/container/dockerRunnerConnectivity.monitor.ts 
b/packages/platform-server/src/infra/container/dockerRunnerConnectivity.monitor.ts new file mode 100644 index 000000000..62b46faef --- /dev/null +++ b/packages/platform-server/src/infra/container/dockerRunnerConnectivity.monitor.ts @@ -0,0 +1,173 @@ +import { Inject, Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '../../core/services/config.service'; +import { DOCKER_CLIENT, type DockerClient } from './dockerClient.token'; +import { DockerRunnerRequestError } from './httpDockerRunner.client'; +import { DockerRunnerStatusService } from './dockerRunnerStatus.service'; + +@Injectable() +export class DockerRunnerConnectivityMonitor implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(DockerRunnerConnectivityMonitor.name); + private timer?: NodeJS.Timeout; + private stopped = false; + private retriesExhausted = false; + private failureCount = 0; + + constructor( + @Inject(DOCKER_CLIENT) private readonly dockerClient: DockerClient, + @Inject(ConfigService) private readonly configService: ConfigService, + private readonly statusService: DockerRunnerStatusService, + ) { + ConfigService.assertInitialized(configService); + } + + async onModuleInit(): Promise { + const baseUrl = this.configService.getDockerRunnerBaseUrl(); + this.statusService.setBaseUrl(baseUrl); + const optional = this.configService.getDockerRunnerOptional(); + this.statusService.setOptional(optional); + this.logger.log( + `Docker runner monitor initialized ${JSON.stringify({ baseUrl, optional })}`, + ); + + if (!optional) { + await this.verifyRequiredRunner(baseUrl); + this.scheduleProbe(this.configService.getDockerRunnerConnectProbeIntervalMs()); + return; + } + + this.scheduleProbe(0); + } + + onModuleDestroy(): void { + this.stopped = true; + if (this.timer) { + clearTimeout(this.timer); + this.timer = undefined; + } + } + + private async verifyRequiredRunner(baseUrl: string): Promise { + const startedAt = 
Date.now(); + try { + await this.dockerClient.checkConnectivity(); + const durationMs = Date.now() - startedAt; + this.failureCount = 0; + this.retriesExhausted = false; + this.statusService.markUp({ checkedAt: startedAt, durationMs }); + this.logger.log( + `Docker runner connectivity ok ${JSON.stringify({ dependency: 'docker-runner', baseUrl, durationMs })}`, + ); + } catch (error) { + this.statusService.markDown({ checkedAt: startedAt, error }); + const payload = this.buildErrorPayload({ + baseUrl, + error, + consecutiveFailures: 1, + }); + const stack = error instanceof Error ? error.stack : undefined; + this.logger.error( + `Docker runner required connectivity failed ${JSON.stringify(payload)}`, + stack, + ); + throw error; + } + } + + private scheduleProbe(delayMs: number): void { + if (this.stopped || this.retriesExhausted) { + return; + } + if (this.timer) { + clearTimeout(this.timer); + } + this.timer = setTimeout(() => { + void this.runProbe(); + }, Math.max(delayMs, 0)); + } + + private async runProbe(): Promise { + if (this.stopped || this.retriesExhausted) { + return; + } + + const startedAt = Date.now(); + const baseUrl = this.configService.getDockerRunnerBaseUrl(); + + try { + await this.dockerClient.checkConnectivity(); + const durationMs = Date.now() - startedAt; + this.failureCount = 0; + this.retriesExhausted = false; + this.statusService.markUp({ checkedAt: startedAt, durationMs }); + this.logger.log( + `Docker runner connectivity ok ${JSON.stringify({ dependency: 'docker-runner', baseUrl, durationMs })}`, + ); + this.scheduleProbe(this.configService.getDockerRunnerConnectProbeIntervalMs()); + } catch (error) { + this.failureCount += 1; + const maxRetries = this.configService.getDockerRunnerConnectMaxRetries(); + const exhausted = maxRetries > 0 && this.failureCount >= maxRetries; + const retryDelay = exhausted ? undefined : this.calculateRetryDelay(); + const nextRetryAt = typeof retryDelay === 'number' ? 
Date.now() + retryDelay : undefined; + + this.statusService.markDown({ checkedAt: startedAt, error, nextRetryAt }); + const payload = this.buildErrorPayload({ + baseUrl, + error, + consecutiveFailures: this.failureCount, + retryInMs: retryDelay, + nextRetryAt, + }); + const stack = error instanceof Error ? error.stack : undefined; + + if (exhausted) { + this.retriesExhausted = true; + this.logger.error( + `Docker runner connectivity retries exhausted ${JSON.stringify(payload)}`, + stack, + ); + return; + } + + this.logger.warn( + `Docker runner connectivity failed ${JSON.stringify(payload)}`, + stack, + ); + if (typeof retryDelay === 'number') { + this.scheduleProbe(retryDelay); + } + } + } + + private calculateRetryDelay(): number { + const baseDelay = this.configService.getDockerRunnerConnectRetryBaseDelayMs(); + const maxDelay = this.configService.getDockerRunnerConnectRetryMaxDelayMs(); + const jitter = this.configService.getDockerRunnerConnectRetryJitterMs(); + const exponent = Math.max(this.failureCount - 1, 0); + const exponential = baseDelay * 2 ** exponent; + const clamped = Math.min(exponential, maxDelay); + const jitterValue = jitter > 0 ? Math.round(Math.random() * jitter) : 0; + return clamped + jitterValue; + } + + private buildErrorPayload(details: { + baseUrl: string; + error: unknown; + consecutiveFailures: number; + retryInMs?: number; + nextRetryAt?: number; + }): Record { + const { baseUrl, error, consecutiveFailures, retryInMs, nextRetryAt } = details; + const code = error instanceof DockerRunnerRequestError ? error.errorCode : undefined; + const message = error instanceof Error ? error.message : String(error); + return { + dependency: 'docker-runner', + baseUrl, + errorCode: code, + message, + retryInMs, + nextRetryAt: nextRetryAt ? 
new Date(nextRetryAt).toISOString() : undefined, + consecutiveFailures, + }; + } +} diff --git a/packages/platform-server/src/infra/container/dockerRunnerConnectivity.probe.ts b/packages/platform-server/src/infra/container/dockerRunnerConnectivity.probe.ts deleted file mode 100644 index d1fc7bf15..000000000 --- a/packages/platform-server/src/infra/container/dockerRunnerConnectivity.probe.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common'; - -import { ConfigService } from '../../core/services/config.service'; -import { DOCKER_CLIENT, type DockerClient } from './dockerClient.token'; -import { DockerRunnerRequestError, HttpDockerRunnerClient } from './httpDockerRunner.client'; - -@Injectable() -export class DockerRunnerConnectivityProbe implements OnModuleInit { - private readonly logger = new Logger(DockerRunnerConnectivityProbe.name); - - constructor( - @Inject(DOCKER_CLIENT) private readonly dockerClient: DockerClient, - private readonly config?: ConfigService, - ) {} - - async onModuleInit(): Promise { - const config = this.resolveConfig(); - if (!config) { - this.logger.log('Skipping docker runner connectivity probe (config unavailable)'); - return; - } - const baseUrl = config.getDockerRunnerBaseUrl(); - if (process.env.SKIP_DOCKER_RUNNER_PROBE === '1') { - this.logger.log('Skipping docker runner connectivity probe (explicit skip)', { baseUrl }); - return; - } - if (process.env.NODE_ENV === 'test' && process.env.ENABLE_DOCKER_RUNNER_PROBE !== '1') { - this.logger.log('Skipping docker runner connectivity probe in test environment', { baseUrl }); - return; - } - if (!(this.dockerClient instanceof HttpDockerRunnerClient)) { - this.logger.log('Skipping docker runner connectivity probe (non-HTTP client)', { baseUrl }); - return; - } - - try { - const response = await this.dockerClient.checkConnectivity(); - this.logger.log('Docker runner connectivity established', { baseUrl, status: response.status }); - } 
catch (error) { - const payload: Record = { baseUrl }; - if (error instanceof DockerRunnerRequestError) { - payload.statusCode = error.statusCode; - payload.runnerErrorCode = error.errorCode; - payload.retryable = error.retryable; - payload.message = error.message; - } else if (error instanceof Error) { - payload.message = error.message; - } else { - payload.error = error; - } - this.logger.error('Docker runner connectivity check failed', error instanceof Error ? error.stack : undefined, payload); - throw error; - } - } - - private resolveConfig(): ConfigService | undefined { - if (this.config) { - return this.config; - } - if (ConfigService.isRegistered()) { - return ConfigService.getInstance(); - } - return undefined; - } -} diff --git a/packages/platform-server/src/infra/container/dockerRunnerStatus.service.ts b/packages/platform-server/src/infra/container/dockerRunnerStatus.service.ts new file mode 100644 index 000000000..6267a41fc --- /dev/null +++ b/packages/platform-server/src/infra/container/dockerRunnerStatus.service.ts @@ -0,0 +1,113 @@ +import { Inject, Injectable } from '@nestjs/common'; +import { ConfigService } from '../../core/services/config.service'; +import { DockerRunnerRequestError } from './httpDockerRunner.client'; + +export type DockerRunnerStatusState = 'unknown' | 'up' | 'down'; + +export type DockerRunnerStatusSnapshot = { + status: DockerRunnerStatusState; + optional: boolean; + baseUrl?: string; + lastCheckedAt?: string; + lastSuccessAt?: string; + lastFailureAt?: string; + lastDurationMs?: number; + consecutiveFailures: number; + nextRetryAt?: string; + error?: { + name?: string; + message?: string; + statusCode?: number; + errorCode?: string; + retryable?: boolean; + }; +}; + +@Injectable() +export class DockerRunnerStatusService { + private snapshot: DockerRunnerStatusSnapshot; + + constructor(@Inject(ConfigService) private readonly configService: ConfigService) { + ConfigService.assertInitialized?.(this.configService); + 
this.snapshot = { + status: 'unknown', + optional: this.configService.getDockerRunnerOptional(), + baseUrl: this.configService.getDockerRunnerBaseUrl(), + consecutiveFailures: 0, + }; + } + + setBaseUrl(baseUrl: string): void { + this.snapshot = { ...this.snapshot, baseUrl }; + } + + setOptional(optional: boolean): void { + this.snapshot = { ...this.snapshot, optional }; + } + + markUnknown(): void { + this.snapshot = { + ...this.snapshot, + status: 'unknown', + nextRetryAt: undefined, + }; + } + + markUp(details: { checkedAt: number; durationMs: number }): void { + this.snapshot = { + ...this.snapshot, + status: 'up', + lastCheckedAt: this.toIso(details.checkedAt), + lastSuccessAt: this.toIso(Date.now()), + lastDurationMs: details.durationMs, + consecutiveFailures: 0, + nextRetryAt: undefined, + error: undefined, + }; + } + + markDown(details: { checkedAt: number; error: unknown; nextRetryAt?: number }): void { + this.snapshot = { + ...this.snapshot, + status: 'down', + lastCheckedAt: this.toIso(details.checkedAt), + lastFailureAt: this.toIso(Date.now()), + consecutiveFailures: this.snapshot.consecutiveFailures + 1, + nextRetryAt: this.toIso(details.nextRetryAt), + error: this.serializeError(details.error), + }; + } + + getSnapshot(): DockerRunnerStatusSnapshot { + const { error, ...rest } = this.snapshot; + return { + ...rest, + error: error ? 
{ ...error } : undefined, + }; + } + + private toIso(timestamp: number | undefined): string | undefined { + if (typeof timestamp !== 'number') return undefined; + return new Date(timestamp).toISOString(); + } + + private serializeError(error: unknown): DockerRunnerStatusSnapshot['error'] | undefined { + if (!error) return undefined; + if (error instanceof DockerRunnerRequestError) { + return { + name: error.name, + message: error.message, + statusCode: error.statusCode, + errorCode: error.errorCode, + retryable: error.retryable, + }; + } + if (error instanceof Error) { + return { + name: error.name, + message: error.message, + }; + } + return { message: String(error) }; + } +} diff --git a/packages/platform-server/src/infra/container/requireDockerRunner.guard.ts b/packages/platform-server/src/infra/container/requireDockerRunner.guard.ts new file mode 100644 index 000000000..61aa243f1 --- /dev/null +++ b/packages/platform-server/src/infra/container/requireDockerRunner.guard.ts @@ -0,0 +1,29 @@ +import { CanActivate, ExecutionContext, HttpException, HttpStatus, Inject, Injectable, Optional } from '@nestjs/common'; +import { DockerRunnerStatusService } from './dockerRunnerStatus.service'; + +@Injectable() +export class RequireDockerRunnerGuard implements CanActivate { + constructor(@Optional() @Inject(DockerRunnerStatusService) private readonly statusService?: DockerRunnerStatusService) {} + + canActivate(_context: ExecutionContext): boolean { + const snapshot = this.statusService?.getSnapshot(); + if (!snapshot) { + return true; + } + if (snapshot.status === 'up') { + return true; + } + + throw new HttpException( + { + statusCode: HttpStatus.SERVICE_UNAVAILABLE, + message: 'docker-runner not ready', + error: { + code: 'docker_runner_not_ready', + status: snapshot.status, + }, + }, + HttpStatus.SERVICE_UNAVAILABLE, + ); + } +} diff --git a/packages/platform-server/src/infra/container/terminal.gateway.ts 
b/packages/platform-server/src/infra/container/terminal.gateway.ts index 3114d79fb..f90183fc5 100644 --- a/packages/platform-server/src/infra/container/terminal.gateway.ts +++ b/packages/platform-server/src/infra/container/terminal.gateway.ts @@ -9,6 +9,7 @@ import { WorkspaceProvider } from '../../workspace/providers/workspace.provider' import { WorkspaceHandle } from '../../workspace/workspace.handle'; import type { WorkspaceExecResult } from '../../workspace/runtime/workspace.runtime.provider'; import { DockerRunnerRequestError } from './httpDockerRunner.client'; +import { DockerRunnerStatusService } from './dockerRunnerStatus.service'; const QuerySchema = z .object({ sessionId: z.string().uuid(), token: z.string().min(1) }) @@ -142,6 +143,7 @@ export class ContainerTerminalGateway { constructor( @Inject(TerminalSessionsService) private readonly sessions: TerminalSessionsService, @Inject(WorkspaceProvider) private readonly workspaceProvider: WorkspaceProvider, + @Inject(DockerRunnerStatusService) private readonly dockerRunnerStatus: DockerRunnerStatusService, ) {} private wss: WebSocketServer | null = null; @@ -174,6 +176,31 @@ export class ContainerTerminalGateway { }); }); + const runnerSnapshot = this.dockerRunnerStatus.getSnapshot(); + if (runnerSnapshot.status !== 'up') { + this.logger.warn('Terminal WS denied because docker runner is not ready', { + path: parsedUrl.pathname, + workspaceId, + status: runnerSnapshot.status, + }); + try { + const payload = JSON.stringify({ + error: { code: 'docker_runner_not_ready', message: 'docker-runner not ready' }, + }); + const headers = [ + 'HTTP/1.1 503 Service Unavailable', + 'Content-Type: application/json; charset=utf-8', + `Content-Length: ${Buffer.byteLength(payload)}`, + 'Connection: close', + ].join('\r\n'); + socket.end(`${headers}\r\n\r\n${payload}`); + } catch { + // ignore + } + socket.destroy(); + return; + } + wss.handleUpgrade(req, socket, head, (ws) => { const stream: SocketStream = { socket: ws }; 
const fakeReq = { diff --git a/packages/platform-server/src/infra/container/volumeGc.job.ts b/packages/platform-server/src/infra/container/volumeGc.job.ts index c9d876867..2b5a51ff6 100644 --- a/packages/platform-server/src/infra/container/volumeGc.job.ts +++ b/packages/platform-server/src/infra/container/volumeGc.job.ts @@ -2,12 +2,16 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import pLimit from 'p-limit'; import { PrismaService } from '../../core/services/prisma.service'; import { DOCKER_CLIENT, type DockerClient } from './dockerClient.token'; +import { DockerRunnerStatusService } from './dockerRunnerStatus.service'; +import { ConfigService } from '../../core/services/config.service'; const DEFAULT_ENABLED = true; const DEFAULT_INTERVAL_MS = 60_000; const DEFAULT_MAX_PER_SWEEP = 100; const DEFAULT_CONCURRENCY = 3; const DEFAULT_COOLDOWN_MS = 10 * 60 * 1000; +const DEFAULT_SWEEP_TIMEOUT_MS = 15_000; +const SWEEP_TIMEOUT_ERROR = 'VOLUME_GC_SWEEP_TIMEOUT'; type SweepOutcome = 'removed' | 'referenced' | 'cooldown' | 'error' | 'not_found'; @@ -18,17 +22,25 @@ export class VolumeGcService { private readonly maxPerSweep: number; private readonly concurrency: number; private readonly cooldownMs: number; + private readonly sweepTimeoutMs: number; private readonly lastAttempt = new Map(); private timer?: NodeJS.Timeout; constructor( private readonly prismaService: PrismaService, @Inject(DOCKER_CLIENT) private readonly containerService: DockerClient, + private readonly dockerRunnerStatus: DockerRunnerStatusService, + @Inject(ConfigService) private readonly configService: ConfigService, ) { this.enabled = this.resolveBoolean(process.env.VOLUME_GC_ENABLED, DEFAULT_ENABLED); this.maxPerSweep = this.resolveInteger(process.env.VOLUME_GC_MAX_PER_SWEEP, DEFAULT_MAX_PER_SWEEP); this.concurrency = this.resolveInteger(process.env.VOLUME_GC_CONCURRENCY, DEFAULT_CONCURRENCY, 1); this.cooldownMs = this.resolveInteger(process.env.VOLUME_GC_COOLDOWN_MS, 
DEFAULT_COOLDOWN_MS, 0); + this.sweepTimeoutMs = this.resolveInteger( + process.env.VOLUME_GC_SWEEP_TIMEOUT_MS, + this.configService.getVolumeGcSweepTimeoutMs() ?? DEFAULT_SWEEP_TIMEOUT_MS, + 0, + ); } start(intervalMs = DEFAULT_INTERVAL_MS): void { @@ -39,7 +51,12 @@ export class VolumeGcService { const run = async () => { try { - await this.sweep(); + const completed = await this.sweepWithTimeout(); + if (!completed) { + this.logger.warn( + `VolumeGC: sweep timed out after ${this.sweepTimeoutMs}ms; scheduling next run`, + ); + } } catch (error) { this.logger.error('VolumeGC: sweep failed', error as Error); } finally { @@ -60,6 +77,14 @@ export class VolumeGcService { async sweep(now: Date = new Date()): Promise { if (!this.enabled) return; + const runnerSnapshot = this.dockerRunnerStatus.getSnapshot(); + if (runnerSnapshot.status !== 'up') { + this.logger.warn( + `VolumeGC: skipping sweep because docker runner is ${runnerSnapshot.status ?? 'unknown'}`, + ); + return; + } + const prisma = this.prisma; const candidates = await prisma.thread.findMany({ where: { status: 'closed' }, @@ -97,6 +122,32 @@ export class VolumeGcService { ); } + private async sweepWithTimeout(): Promise { + if (this.sweepTimeoutMs <= 0) { + await this.sweep(); + return true; + } + + let timeoutHandle: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, reject) => { + timeoutHandle = setTimeout(() => reject(new Error(SWEEP_TIMEOUT_ERROR)), this.sweepTimeoutMs); + }); + + try { + await Promise.race([this.sweep(), timeoutPromise]); + return true; + } catch (error) { + if (error instanceof Error && error.message === SWEEP_TIMEOUT_ERROR) { + return false; + } + throw error; + } finally { + if (timeoutHandle) { + clearTimeout(timeoutHandle); + } + } + } + private async handleThread(threadId: string, nowMs: number): Promise { const last = this.lastAttempt.get(threadId); if (typeof last === 'number' && nowMs - last < this.cooldownMs) { diff --git 
a/packages/platform-server/src/infra/health/health.controller.ts b/packages/platform-server/src/infra/health/health.controller.ts new file mode 100644 index 000000000..3059e0d1e --- /dev/null +++ b/packages/platform-server/src/infra/health/health.controller.ts @@ -0,0 +1,18 @@ +import { Controller, Get } from '@nestjs/common'; +import { DockerRunnerStatusService } from '../container/dockerRunnerStatus.service'; + +@Controller() +export class HealthController { + constructor(private readonly dockerRunnerStatus: DockerRunnerStatusService) {} + + @Get('health') + getHealth() { + return { + status: 'ok', + timestamp: new Date().toISOString(), + dependencies: { + dockerRunner: this.dockerRunnerStatus.getSnapshot(), + }, + }; + } +} diff --git a/packages/platform-server/src/infra/infra.module.ts b/packages/platform-server/src/infra/infra.module.ts index c43bd5233..6781f84f9 100644 --- a/packages/platform-server/src/infra/infra.module.ts +++ b/packages/platform-server/src/infra/infra.module.ts @@ -24,7 +24,10 @@ import { WorkspaceProvider } from '../workspace/providers/workspace.provider'; import { DockerWorkspaceRuntimeProvider } from '../workspace/providers/docker.workspace.provider'; import { DOCKER_CLIENT, type DockerClient } from './container/dockerClient.token'; import { HttpDockerRunnerClient } from './container/httpDockerRunner.client'; -import { DockerRunnerConnectivityProbe } from './container/dockerRunnerConnectivity.probe'; +import { DockerRunnerConnectivityMonitor } from './container/dockerRunnerConnectivity.monitor'; +import { DockerRunnerStatusService } from './container/dockerRunnerStatus.service'; +import { RequireDockerRunnerGuard } from './container/requireDockerRunner.guard'; +import { HealthController } from './health/health.controller'; @Module({ imports: [CoreModule, VaultModule], @@ -34,7 +37,9 @@ import { DockerRunnerConnectivityProbe } from './container/dockerRunnerConnectiv provide: ContainerRegistry, useFactory: async (prismaSvc: PrismaService) 
=> { const svc = new ContainerRegistry(prismaSvc.getClient()); - await svc.ensureIndexes(); + if (process.env.SKIP_DB_BOOTSTRAP !== '1') { + await svc.ensureIndexes(); + } return svc; }, inject: [PrismaService], @@ -49,7 +54,18 @@ import { DockerRunnerConnectivityProbe } from './container/dockerRunnerConnectiv }), inject: [ConfigService], }, - DockerRunnerConnectivityProbe, + { + provide: DockerRunnerStatusService, + useFactory: (config: ConfigService) => new DockerRunnerStatusService(config), + inject: [ConfigService], + }, + { + provide: DockerRunnerConnectivityMonitor, + useFactory: (docker: DockerClient, config: ConfigService, status: DockerRunnerStatusService) => + new DockerRunnerConnectivityMonitor(docker, config, status), + inject: [DOCKER_CLIENT, ConfigService, DockerRunnerStatusService], + }, + RequireDockerRunnerGuard, { provide: ContainerCleanupService, useFactory: (registry: ContainerRegistry, containers: DockerClient) => { @@ -62,13 +78,13 @@ import { DockerRunnerConnectivityProbe } from './container/dockerRunnerConnectiv }, { provide: VolumeGcService, - useFactory: (prisma: PrismaService, containers: DockerClient) => { - const svc = new VolumeGcService(prisma, containers); - const interval = Number(process.env.VOLUME_GC_INTERVAL_MS ?? 
'') || 60_000; - svc.start(interval); - return svc; - }, - inject: [PrismaService, DOCKER_CLIENT], + useFactory: ( + prisma: PrismaService, + containers: DockerClient, + status: DockerRunnerStatusService, + config: ConfigService, + ) => new VolumeGcService(prisma, containers, status, config), + inject: [PrismaService, DOCKER_CLIENT, DockerRunnerStatusService, ConfigService], }, { provide: WorkspaceProvider, @@ -106,7 +122,7 @@ import { DockerRunnerConnectivityProbe } from './container/dockerRunnerConnectiv GithubService, PRService, ], - controllers: [NixController, NixRepoController, ContainersController, ContainerTerminalController], + controllers: [NixController, NixRepoController, ContainersController, ContainerTerminalController, HealthController], exports: [ VaultModule, DOCKER_CLIENT, @@ -123,6 +139,7 @@ import { DockerRunnerConnectivityProbe } from './container/dockerRunnerConnectiv ContainerRegistry, ArchiveService, WorkspaceProvider, + DockerRunnerStatusService, ], }) export class InfraModule {} diff --git a/packages/platform-server/src/llm/llm.module.ts b/packages/platform-server/src/llm/llm.module.ts index f8c818d1c..238898640 100644 --- a/packages/platform-server/src/llm/llm.module.ts +++ b/packages/platform-server/src/llm/llm.module.ts @@ -32,6 +32,9 @@ import { LiteLLMKeyStore } from './provisioners/litellm.key.store'; if (cfg.llmProvider === 'openai') { return openaiProvisioner; } + if (process.env.SKIP_LLM_PROVISIONER === '1') { + return liteProvisioner; + } await liteProvisioner.init(); return liteProvisioner; },