feat: complete pipeline management with encapsulated PrescribedOperations#493
Open
philwinder wants to merge 9 commits intomainfrom
Open
feat: complete pipeline management with encapsulated PrescribedOperations#493philwinder wants to merge 9 commits intomainfrom
philwinder wants to merge 9 commits intomainfrom
Conversation
Contributor
Go Test CoverageTotal coverage: 30.6% Full coverage report |
- Add Pipeline, Step, and StepDependency domain models in domain/repository/pipeline.go with immutable value semantics and constructor patterns matching existing entities - Add PipelineStore interface to domain/repository/store.go - Implement PipelineStore with mapper for models.Pipeline <-> domain.Pipeline conversion including nested Steps and dependency management - Add models.Base custom base type without soft delete (DeletedAt) to enable proper CASCADE delete behavior via database constraints - Register Pipeline models with AutoMigrate for schema management - Add generic Save and Delete methods to database.Repository to reduce boilerplate across stores that don't need custom upsert logic
Redesign pipeline, step, and step dependency as independent entities: - **Pipeline**: Simplified to just id, name (unique), and timestamps - **Step**: Now independent with PipelineID foreign key (no cascade) - **StepDependency**: Explicit join table linking steps without cascade All three entities use simple stores with only the generic base layer (Save, Delete, Find, FindOne from Repository[D, E]). Add WithPipelineID and WithStepID options for filtering in queries. This allows steps and dependencies to exist independently of pipelines, and simplifies the persistence layer by removing custom transaction logic.
Redesign Step as fully independent entity with no pipeline reference: - **Step**: Remove PipelineID, keeping only name and kind - **PipelineStep**: New association table bridging pipelines and steps with composite unique index on (pipeline_id, step_id) and ON DELETE CASCADE on both foreign keys - **StepDependency**: Add explicit foreign key references with ON DELETE CASCADE so dependencies are cleaned up when either step is deleted All entities (Step, PipelineStep, StepDependency) use simple stores with the generic base layer. Database constraints handle cascading deletes automatically. Add PipelineStepStore interface and implementation.
Change Base.ID from uint to int64 to match the rest of the codebase. Remove all int64/uint type casts from mappers since domain and model types are now consistent.
Implement the Pipeline application service to manage pipelines and their steps: - Add DeleteBy(ctx, ...Option) to Store[T] interface (already implemented via Repository) - Add WithStepIDIn query option for batch step filtering - Create PipelineDetail view type combining pipeline, steps, and dependencies - Implement Pipeline service with Create, Detail, Update, Delete methods - Add comprehensive validation (non-empty names, unique steps, no circular dependencies) - Wire pipeline stores and service into Client - Add 10 integration tests covering all CRUD operations and validation Fix composite unique index on PipelineStep and StepDependency models to allow multiple steps per pipeline (was incorrectly preventing this). All tests pass, no regressions.
β¦lation The Pipeline service now owns the prescribed operations configuration, seeding the default pipeline at startup and resolving operations from the database at runtime. This removes PrescribedOperations from scattered services and handlers, centralizing pipeline operation logic. Changes: - Pipeline service: inject PrescribedOperations, add Initialise() and RequiredOperations() methods, change Operations() to accept *int64 with nil fallback to default pipeline - CommitOperationResolver interface: update signature to accept *int64 - Sync handler: remove prescribedOps, use resolver unconditionally - Repository service: replace prescribedOps with resolver, hardcode infrastructure operations (Clone, Sync, Delete) - PeriodicSync: drop prescribedOps, hardcode sync operations - Client: remove prescribedOps field, initialize Pipeline service before Repository to support dependency injection - validateHandlers: accept operations parameter instead of accessing PrescribedOperations directly All tests pass; smoke tests pass except for pre-existing local directory issues unrelated to pipeline persistence.
a7ba58c to
d778e8a
Compare
Implements HTTP endpoints for pipeline CRUD operations and adds pipeline_id to repository responses. Steps are now shared across pipelines via a join table, supporting the two-pipeline architecture (default and RAG-only).
## Key Changes
**Pipeline Endpoints** (`/api/v1/pipelines`):
- GET /pipelines β list all pipelines with pagination
- POST /pipelines β create pipeline with steps and dependencies
- GET /pipelines/{id} β get pipeline detail with steps and dependencies
- PUT /pipelines/{id} β replace pipeline steps and name
- DELETE /pipelines/{id} β delete pipeline and associated steps
**Step Endpoints** (`/api/v1/steps`):
- GET /steps β list all steps with pagination
- GET /steps/{id} β get single step with dependencies
**Repository Updates**:
- Added pipeline_id attribute to repository responses
- Added links.pipeline URL when a repository has an assigned pipeline
- PUT /repositories/{id}/pipeline β assign a pipeline to a repository
**Shared Step Architecture**:
- Steps identified by unique kind (e.g., kodit.commit.scan)
- Steps are shared across pipelines via pipeline_steps join table
- Dependencies are also shared via step_dependencies table
- Find-or-create pattern prevents step duplication
- Orphan cleanup removes steps when no pipelines reference them
**Service Methods**:
- FindSteps(ctx, ...Option) β query top-level steps
- FindStep(ctx, id) β get single step
- CountSteps(ctx, ...Option) β count steps
- DetailStep(ctx, stepID) β load step with dependencies
**Initialization**:
- Refactored Initialise() to seed two pipelines:
- "default": full operation set from FullPrescribedOperations
- "rag": RAG-only subset from RAGOnlyPrescribedOperations
- Both pipelines share the same step rows via kind lookup
- Idempotent: calling Initialise multiple times has no effect
**DTOs** (new):
- pipeline.go: PipelineData, PipelineAttributes, PipelineLinks
- StepData, StepAttributes, StepLinks
- PipelineDetailResponse (data + included steps)
- Request types: PipelineCreateRequest, PipelineUpdateRequest, AssignPipelineRequest
**Query Options**:
- WithKind(string) β filter steps by kind
- WithDependsOnID(int64) β filter dependencies by target step
All IDs are int64. DependsOn in responses uses step IDs, not names. All tests pass.
- name is now the unique namespaced identifier (e.g. "kodit.commit.scan") - kind is now the step category (e.g. "internal") - findOrCreateStep looks up by name instead of kind - topologicalSort uses Name() instead of Kind() for operation conversion - Operations() filters to commit operations only (repo ops resolved separately) - Added 4 missing repository-level steps: create, clone, sync, delete - Both default and rag pipelines now include all repository operations - Added unique index on step name column
Add join_type field to PipelineStep to support OR ("any") semantics
in addition to AND ("all"). Join type is configured per pipeline,
since the same step may combine dependencies differently across
pipelines.
Changes:
- Add JoinType column to PipelineStep GORM model (default: "all")
- Add joinType field to domain PipelineStep with accessor
- Update PipelineStepMapper to map join_type in both directions
- Add JoinType to StepParams for API input
- Add Associations() accessor to PipelineDetail for API output
- Update createSteps to capture and return associations
- Update loadStepsAndDependencies to return associations
- Add join_type validation (must be "", "all", or "any")
- Add JoinType to StepAttributes and StepInput DTOs
- Update stepToDTO to include join_type from associations
- Update API handlers to map and display join_type
- Add three tests: round-trip, default, validation
All tests pass (unit, integration, smoke).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Complete pipeline management implementation with domain models, persistence layer, application service, HTTP API endpoints, shared step architecture, and Airflow-style join semantics for pipeline steps.
Pipeline API Endpoints
Pipeline CRUD (
/api/v1/pipelines):GET /pipelinesβ list all pipelines with paginationPOST /pipelinesβ create pipeline with steps, dependencies, and join typesGET /pipelines/{id}β get pipeline detail with steps, dependencies, and join typesPUT /pipelines/{id}β replace pipeline steps, name, and join typesDELETE /pipelines/{id}β delete pipeline and associated stepsStep Queries (
/api/v1/steps):GET /stepsβ list all steps with paginationGET /steps/{id}β get single step with dependenciesRepository Integration:
pipeline_idattributelinks.pipelineURL added when a repository has an assigned pipelinePUT /repositories/{id}/pipelineβ assign a pipeline to a repositoryShared Step Architecture
Steps are shared across pipelines:
kind(e.g.,kodit.commit.scan)pipeline_stepsjoin tablestep_dependenciestableExample: Both "default" and "rag" pipelines share the same
kodit.commit.scanstep row, but with different dependency chains.Join Type Semantics
Pipeline steps now support Airflow-style trigger rules:
join_type: "all"(default) β fire when all upstream dependencies completejoin_type: "any"β fire when any upstream dependency completesPipelineStepassociation since the same step can combine dependencies differently in different pipelinesjoin_typefield in step attributes (responses and requests)Example:
{ "id": 3, "attributes": { "name": "kodit.commit.create_bm25_index", "kind": "internal", "depends_on": [1, 2], "join_type": "any" } }Domain Models (
domain/repository/pipeline.go)Pipeline: Immutable type with id, name (unique), timestampsStep: Independent type with name and kindPipelineStep: Association joining pipelines and steps, includes joinTypeStepDependency: Links between stepsPersistence Layer
PipelineStore,StepStore,PipelineStepStore,StepDependencyStoredatabase.Repository[D, E]patternPipelineStepGORM model includesJoinTypecolumn (default: "all")Application Service (
application/service/pipeline.go)Core Methods:
Create(ctx, params)β validate and persist pipeline with steps, dependencies, and join typesDetail(ctx, id)β return pipeline with all steps, relationships, and join typesUpdate(ctx, id, params)β replace steps, name, and join typesDelete(ctx, id)β remove pipeline and associated dataOperations(ctx, pipelineID)β resolve pipeline steps in topological order (nil β default)Find/Get/Countβ inherited from Collection interfaceStep Methods:
FindSteps(ctx, ...Option)β query top-level stepsFindStep(ctx, id)β get single stepCountSteps(ctx, ...Option)β count stepsDetailStep(ctx, stepID)β load step with dependenciesInitialization:
Initialise(ctx)β seed two pipelines if none exist:FullPrescribedOperationsRAGOnlyPrescribedOperationsValidation:
API DTOs
Pipeline DTOs (
infrastructure/api/v1/dto/pipeline.go):Request Types:
PipelineCreateRequest/PipelineUpdateRequestβ create/update pipelineStepInputβ name, kind, depends_on (names for creation), join_typeAssignPipelineRequestβ assign pipeline to repositoryRepository DTOs (
infrastructure/api/v1/dto/repository.go):PipelineID *int64toRepositoryAttributesRepositoryLinks { Pipeline *string }for self-referential linksRouters
PipelinesRouter (
infrastructure/api/v1/pipelines.go):pipelineToDTO,pipelineDetailToDTO,stepToDTOStepsRouter (
infrastructure/api/v1/steps.go):client.Pipelines.DetailStep()to load dependenciesRepositoriesRouter (
infrastructure/api/v1/repositories.go):repoToDTOto includePipelineIDand linksAssignPipelinehandler forPUT /{id}/pipelineQuery Options
Domain (
domain/repository/options.go):WithKind(string)β filter steps by kindWithDependsOnID(int64)β filter dependencies by target stepIntegration
/api/v1/pipelinesand/api/v1/steps(write-protected group)Testing