From b2d8fb30a2c50cb48a9c0600e0d9b841ec716241 Mon Sep 17 00:00:00 2001 From: Corey Christous Date: Wed, 4 Feb 2026 20:43:42 -0500 Subject: [PATCH 1/2] feat(mcp): add scheduled tasks tools Add three new MCP tools for managing Semaphore scheduled tasks (periodics): - tasks_list: List scheduled tasks for a project with pagination - tasks_describe: Get task details including recent trigger history - tasks_run: Trigger a task to run immediately Key implementation details: - Integrates PeriodicScheduler gRPC service - Enforces RBAC permissions (project.scheduler.view, project.scheduler.run_manually) - Feature flag gating (mcp_server_read_tools, mcp_server_write_tools) - UUID validation on all ID parameters - Path traversal protection for pipeline_file overrides - Parameter name/value validation for tasks_run --- mcp_server/Makefile | 2 +- mcp_server/README.md | 4 + mcp_server/cmd/mcp_server/main.go | 2 + .../periodic_scheduler.pb.go | 713 ++++++++++++++++++ .../periodic_scheduler_grpc.pb.go | 99 +++ mcp_server/pkg/internalapi/config.go | 6 + mcp_server/pkg/internalapi/manager.go | 15 + mcp_server/pkg/tools/tasks/describe.go | 367 +++++++++ mcp_server/pkg/tools/tasks/list.go | 312 ++++++++ mcp_server/pkg/tools/tasks/register.go | 28 + mcp_server/pkg/tools/tasks/run.go | 364 +++++++++ mcp_server/pkg/tools/tasks/tasks_test.go | 384 ++++++++++ mcp_server/test/support/mock_provider.go | 4 + mcp_server/test/support/stubs.go | 107 +++ 14 files changed, 2406 insertions(+), 1 deletion(-) create mode 100644 mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler.pb.go create mode 100644 mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler_grpc.pb.go create mode 100644 mcp_server/pkg/tools/tasks/describe.go create mode 100644 mcp_server/pkg/tools/tasks/list.go create mode 100644 mcp_server/pkg/tools/tasks/register.go create mode 100644 mcp_server/pkg/tools/tasks/run.go create mode 100644 mcp_server/pkg/tools/tasks/tasks_test.go diff --git a/mcp_server/Makefile b/mcp_server/Makefile index d0a574afa..652cc7032 100644 --- a/mcp_server/Makefile +++ b/mcp_server/Makefile @@ -7,7 +7,7 @@ DOCKER_BUILD_PATH=.. INTERNAL_API_BRANCH?=master TMP_REPO_DIR ?= /tmp/internal_api -INTERNAL_API_MODULES?=include/internal_api/status,include/internal_api/response_status,plumber_w_f.workflow,plumber.pipeline,server_farm.job,loghub,loghub2,user,repository_integrator,rbac,organization,projecthub,feature,artifacthub +INTERNAL_API_MODULES?=include/internal_api/status,include/internal_api/response_status,plumber_w_f.workflow,plumber.pipeline,server_farm.job,loghub,loghub2,user,repository_integrator,rbac,organization,projecthub,feature,artifacthub,periodic_scheduler PROTOC_IMAGE?=golang:1.24-alpine .PHONY: tidy test test.setup lint pb.gen dev.run index diff --git a/mcp_server/README.md b/mcp_server/README.md index 749437a18..d403222ae 100644 --- a/mcp_server/README.md +++ b/mcp_server/README.md @@ -47,6 +47,9 @@ Refer to [`AGENTS.md`](AGENTS.md) for repository guidelines, project structure, | `pipeline_jobs` | List jobs belonging to a specific pipeline. | | `jobs_describe` | Describes a job, surfacing agent details and lifecycle timestamps. | | `jobs_logs` | Fetches job logs. Hosted jobs stream loghub events; self-hosted jobs return a URL to fetch logs. | +| `tasks_list` | List scheduled tasks (periodics) for a project. | +| `tasks_describe` | Get detailed information about a scheduled task including recent trigger history. | +| `tasks_run` | Trigger a scheduled task to run immediately. | ## Requirements @@ -78,6 +81,7 @@ The server dials internal gRPC services based on environment variables. Deployme | RBAC gRPC endpoint | `INTERNAL_API_URL_RBAC`, `MCP_RBAC_GRPC_ENDPOINT` | | Users gRPC endpoint | `INTERNAL_API_URL_USER`, `MCP_USER_GRPC_ENDPOINT` | | Featurehub gRPC endpoint | `INTERNAL_API_URL_FEATURE`, `MCP_FEATURE_GRPC_ENDPOINT` | +| Scheduler gRPC endpoint | `INTERNAL_API_URL_SCHEDULER`, `MCP_SCHEDULER_GRPC_ENDPOINT` | | Dial timeout | `MCP_GRPC_DIAL_TIMEOUT` (default `5s`) | | Call timeout | `MCP_GRPC_CALL_TIMEOUT` (default `15s`) | diff --git a/mcp_server/cmd/mcp_server/main.go b/mcp_server/cmd/mcp_server/main.go index 073be4aaf..a8416156e 100644 --- a/mcp_server/cmd/mcp_server/main.go +++ b/mcp_server/cmd/mcp_server/main.go @@ -27,6 +27,7 @@ import ( "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/organizations" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/pipelines" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/projects" + "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/tasks" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/testresults" "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/workflows" "github.com/semaphoreio/semaphore/mcp_server/pkg/watchman" @@ -117,6 +118,7 @@ func main() { pipelines.Register(srv, provider) jobs.Register(srv, provider) testresults.Register(srv, provider) + tasks.Register(srv, provider) // Register prompts for agent configuration guidance prompts.Register(srv) diff --git a/mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler.pb.go b/mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler.pb.go new file mode 100644 index 000000000..3277bebef --- /dev/null +++ b/mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler.pb.go @@ -0,0 +1,713 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// This is a manually created file to match the PeriodicScheduler internal API. + +package periodic_scheduler + +import ( + status "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/status" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" +) + +const ( + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Periodic represents a scheduled task configuration. +type Periodic struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Description string `protobuf:"bytes,3,opt,name=description,proto3" json:"description,omitempty"` + ProjectId string `protobuf:"bytes,4,opt,name=project_id,json=projectId,proto3" json:"project_id,omitempty"` + OrganizationId string `protobuf:"bytes,5,opt,name=organization_id,json=organizationId,proto3" json:"organization_id,omitempty"` + Branch string `protobuf:"bytes,6,opt,name=branch,proto3" json:"branch,omitempty"` + PipelineFile string `protobuf:"bytes,7,opt,name=pipeline_file,json=pipelineFile,proto3" json:"pipeline_file,omitempty"` + Schedule string `protobuf:"bytes,8,opt,name=schedule,proto3" json:"schedule,omitempty"` + Paused bool `protobuf:"varint,9,opt,name=paused,proto3" json:"paused,omitempty"` + Suspended bool `protobuf:"varint,10,opt,name=suspended,proto3" json:"suspended,omitempty"` + Parameters []*Parameter `protobuf:"bytes,11,rep,name=parameters,proto3" json:"parameters,omitempty"` + CreatedAt *timestamppb.Timestamp `protobuf:"bytes,12,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + UpdatedAt *timestamppb.Timestamp `protobuf:"bytes,13,opt,name=updated_at,json=updatedAt,proto3" json:"updated_at,omitempty"` + RequesterId string `protobuf:"bytes,14,opt,name=requester_id,json=requesterId,proto3" json:"requester_id,omitempty"` +} + +func (x *Periodic) Reset() { + *x = Periodic{} +} + +func (x *Periodic) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Periodic) ProtoMessage() {} + +func (x *Periodic) ProtoReflect() protoreflect.Message { + return nil +} + +func (*Periodic) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *Periodic) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Periodic) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Periodic) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + +func (x *Periodic) GetProjectId() string { + if x != nil { + return x.ProjectId + } + return "" +} + +func (x *Periodic) GetOrganizationId() string { + if x != nil { + return x.OrganizationId + } + return "" +} + +func (x *Periodic) GetBranch() string { + if x != nil { + return x.Branch + } + return "" +} + +func (x *Periodic) GetPipelineFile() string { + if x != nil { + return x.PipelineFile + } + return "" +} + +func (x *Periodic) GetSchedule() string { + if x != nil { + return x.Schedule + } + return "" +} + +func (x *Periodic) GetPaused() bool { + if x != nil { + return x.Paused + } + return false +} + +func (x *Periodic) GetSuspended() bool { + if x != nil { + return x.Suspended + } + return false +} + +func (x *Periodic) GetParameters() []*Parameter { + if x != nil { + return x.Parameters + } + return nil +} + +func (x *Periodic) GetCreatedAt() *timestamppb.Timestamp { + if x != nil { + return x.CreatedAt + } + return nil +} + +func (x *Periodic) GetUpdatedAt() *timestamppb.Timestamp { + if x != nil { + return x.UpdatedAt + } + return nil +} + +func (x *Periodic) GetRequesterId() string { + if x != nil { + return x.RequesterId + } + return "" +} + +// Parameter represents a key-value parameter for the task. +type Parameter struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Required bool `protobuf:"varint,3,opt,name=required,proto3" json:"required,omitempty"` + Description string `protobuf:"bytes,4,opt,name=description,proto3" json:"description,omitempty"` +} + +func (x *Parameter) Reset() { + *x = Parameter{} +} + +func (x *Parameter) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Parameter) ProtoMessage() {} + +func (x *Parameter) ProtoReflect() protoreflect.Message { + return nil +} + +func (*Parameter) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *Parameter) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Parameter) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +func (x *Parameter) GetRequired() bool { + if x != nil { + return x.Required + } + return false +} + +func (x *Parameter) GetDescription() string { + if x != nil { + return x.Description + } + return "" +} + +// Trigger represents a historical trigger of a task. +type Trigger struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TriggeredAt *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=triggered_at,json=triggeredAt,proto3" json:"triggered_at,omitempty"` + WorkflowId string `protobuf:"bytes,2,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` + Status TriggerStatus `protobuf:"varint,3,opt,name=status,proto3,enum=InternalApi.PeriodicScheduler.TriggerStatus" json:"status,omitempty"` + Branch string `protobuf:"bytes,4,opt,name=branch,proto3" json:"branch,omitempty"` + PipelineFile string `protobuf:"bytes,5,opt,name=pipeline_file,json=pipelineFile,proto3" json:"pipeline_file,omitempty"` + ErrorMessage string `protobuf:"bytes,6,opt,name=error_message,json=errorMessage,proto3" json:"error_message,omitempty"` +} + +func (x *Trigger) Reset() { + *x = Trigger{} +} + +func (x *Trigger) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Trigger) ProtoMessage() {} + +func (x *Trigger) ProtoReflect() protoreflect.Message { + return nil +} + +func (*Trigger) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *Trigger) GetTriggeredAt() *timestamppb.Timestamp { + if x != nil { + return x.TriggeredAt + } + return nil +} + +func (x *Trigger) GetWorkflowId() string { + if x != nil { + return x.WorkflowId + } + return "" +} + +func (x *Trigger) GetStatus() TriggerStatus { + if x != nil { + return x.Status + } + return TriggerStatus_TRIGGER_STATUS_UNSPECIFIED +} + +func (x *Trigger) GetBranch() string { + if x != nil { + return x.Branch + } + return "" +} + +func (x *Trigger) GetPipelineFile() string { + if x != nil { + return x.PipelineFile + } + return "" +} + +func (x *Trigger) GetErrorMessage() string { + if x != nil { + return x.ErrorMessage + } + return "" +} + +// TriggerStatus represents the status of a trigger. +type TriggerStatus int32 + +const ( + TriggerStatus_TRIGGER_STATUS_UNSPECIFIED TriggerStatus = 0 + TriggerStatus_TRIGGER_STATUS_SCHEDULED TriggerStatus = 1 + TriggerStatus_TRIGGER_STATUS_RUNNING TriggerStatus = 2 + TriggerStatus_TRIGGER_STATUS_PASSED TriggerStatus = 3 + TriggerStatus_TRIGGER_STATUS_FAILED TriggerStatus = 4 + TriggerStatus_TRIGGER_STATUS_STOPPED TriggerStatus = 5 +) + +var TriggerStatus_name = map[int32]string{ + 0: "TRIGGER_STATUS_UNSPECIFIED", + 1: "TRIGGER_STATUS_SCHEDULED", + 2: "TRIGGER_STATUS_RUNNING", + 3: "TRIGGER_STATUS_PASSED", + 4: "TRIGGER_STATUS_FAILED", + 5: "TRIGGER_STATUS_STOPPED", +} + +var TriggerStatus_value = map[string]int32{ + "TRIGGER_STATUS_UNSPECIFIED": 0, + "TRIGGER_STATUS_SCHEDULED": 1, + "TRIGGER_STATUS_RUNNING": 2, + "TRIGGER_STATUS_PASSED": 3, + "TRIGGER_STATUS_FAILED": 4, + "TRIGGER_STATUS_STOPPED": 5, +} + +func (x TriggerStatus) Enum() *TriggerStatus { + p := new(TriggerStatus) + *p = x + return p +} + +func (x TriggerStatus) String() string { + if name, ok := TriggerStatus_name[int32(x)]; ok { + return name + } + return "TRIGGER_STATUS_UNSPECIFIED" +} + +func (TriggerStatus) Descriptor() protoreflect.EnumDescriptor { + return nil +} + +func (TriggerStatus) Type() protoreflect.EnumType { + return nil +} + +func (x TriggerStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +func (TriggerStatus) EnumDescriptor() ([]byte, []int) { + return nil, nil +} + +// ListKeysetRequest is the request for listing tasks. +type ListKeysetRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProjectId string `protobuf:"bytes,1,opt,name=project_id,json=projectId,proto3" json:"project_id,omitempty"` + OrganizationId string `protobuf:"bytes,2,opt,name=organization_id,json=organizationId,proto3" json:"organization_id,omitempty"` + PageSize int32 `protobuf:"varint,3,opt,name=page_size,json=pageSize,proto3" json:"page_size,omitempty"` + PageToken string `protobuf:"bytes,4,opt,name=page_token,json=pageToken,proto3" json:"page_token,omitempty"` +} + +func (x *ListKeysetRequest) Reset() { + *x = ListKeysetRequest{} +} + +func (x *ListKeysetRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListKeysetRequest) ProtoMessage() {} + +func (x *ListKeysetRequest) ProtoReflect() protoreflect.Message { + return nil +} + +func (*ListKeysetRequest) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *ListKeysetRequest) GetProjectId() string { + if x != nil { + return x.ProjectId + } + return "" +} + +func (x *ListKeysetRequest) GetOrganizationId() string { + if x != nil { + return x.OrganizationId + } + return "" +} + +func (x *ListKeysetRequest) GetPageSize() int32 { + if x != nil { + return x.PageSize + } + return 0 +} + +func (x *ListKeysetRequest) GetPageToken() string { + if x != nil { + return x.PageToken + } + return "" +} + +// ListKeysetResponse is the response for listing tasks. +type ListKeysetResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status *status.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + Periodics []*Periodic `protobuf:"bytes,2,rep,name=periodics,proto3" json:"periodics,omitempty"` + NextPageToken string `protobuf:"bytes,3,opt,name=next_page_token,json=nextPageToken,proto3" json:"next_page_token,omitempty"` +} + +func (x *ListKeysetResponse) Reset() { + *x = ListKeysetResponse{} +} + +func (x *ListKeysetResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ListKeysetResponse) ProtoMessage() {} + +func (x *ListKeysetResponse) ProtoReflect() protoreflect.Message { + return nil +} + +func (*ListKeysetResponse) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *ListKeysetResponse) GetStatus() *status.Status { + if x != nil { + return x.Status + } + return nil +} + +func (x *ListKeysetResponse) GetPeriodics() []*Periodic { + if x != nil { + return x.Periodics + } + return nil +} + +func (x *ListKeysetResponse) GetNextPageToken() string { + if x != nil { + return x.NextPageToken + } + return "" +} + +// DescribeRequest is the request for describing a task. +type DescribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + OrganizationId string `protobuf:"bytes,2,opt,name=organization_id,json=organizationId,proto3" json:"organization_id,omitempty"` +} + +func (x *DescribeRequest) Reset() { + *x = DescribeRequest{} +} + +func (x *DescribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DescribeRequest) ProtoMessage() {} + +func (x *DescribeRequest) ProtoReflect() protoreflect.Message { + return nil +} + +func (*DescribeRequest) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *DescribeRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *DescribeRequest) GetOrganizationId() string { + if x != nil { + return x.OrganizationId + } + return "" +} + +// DescribeResponse is the response for describing a task. +type DescribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status *status.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + Periodic *Periodic `protobuf:"bytes,2,opt,name=periodic,proto3" json:"periodic,omitempty"` + RecentTriggers []*Trigger `protobuf:"bytes,3,rep,name=recent_triggers,json=recentTriggers,proto3" json:"recent_triggers,omitempty"` +} + +func (x *DescribeResponse) Reset() { + *x = DescribeResponse{} +} + +func (x *DescribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DescribeResponse) ProtoMessage() {} + +func (x *DescribeResponse) ProtoReflect() protoreflect.Message { + return nil +} + +func (*DescribeResponse) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *DescribeResponse) GetStatus() *status.Status { + if x != nil { + return x.Status + } + return nil +} + +func (x *DescribeResponse) GetPeriodic() *Periodic { + if x != nil { + return x.Periodic + } + return nil +} + +func (x *DescribeResponse) GetRecentTriggers() []*Trigger { + if x != nil { + return x.RecentTriggers + } + return nil +} + +// RunNowRequest is the request for triggering a task immediately. +type RunNowRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + OrganizationId string `protobuf:"bytes,2,opt,name=organization_id,json=organizationId,proto3" json:"organization_id,omitempty"` + ProjectId string `protobuf:"bytes,3,opt,name=project_id,json=projectId,proto3" json:"project_id,omitempty"` + RequesterId string `protobuf:"bytes,4,opt,name=requester_id,json=requesterId,proto3" json:"requester_id,omitempty"` + Branch string `protobuf:"bytes,5,opt,name=branch,proto3" json:"branch,omitempty"` + PipelineFile string `protobuf:"bytes,6,opt,name=pipeline_file,json=pipelineFile,proto3" json:"pipeline_file,omitempty"` + Parameters []*Parameter `protobuf:"bytes,7,rep,name=parameters,proto3" json:"parameters,omitempty"` +} + +func (x *RunNowRequest) Reset() { + *x = RunNowRequest{} +} + +func (x *RunNowRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RunNowRequest) ProtoMessage() {} + +func (x *RunNowRequest) ProtoReflect() protoreflect.Message { + return nil +} + +func (*RunNowRequest) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *RunNowRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *RunNowRequest) GetOrganizationId() string { + if x != nil { + return x.OrganizationId + } + return "" +} + +func (x *RunNowRequest) GetProjectId() string { + if x != nil { + return x.ProjectId + } + return "" +} + +func (x *RunNowRequest) GetRequesterId() string { + if x != nil { + return x.RequesterId + } + return "" +} + +func (x *RunNowRequest) GetBranch() string { + if x != nil { + return x.Branch + } + return "" +} + +func (x *RunNowRequest) GetPipelineFile() string { + if x != nil { + return x.PipelineFile + } + return "" +} + +func (x *RunNowRequest) GetParameters() []*Parameter { + if x != nil { + return x.Parameters + } + return nil +} + +// RunNowResponse is the response for triggering a task immediately. +type RunNowResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Status *status.Status `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + WorkflowId string `protobuf:"bytes,2,opt,name=workflow_id,json=workflowId,proto3" json:"workflow_id,omitempty"` + PeriodicId string `protobuf:"bytes,3,opt,name=periodic_id,json=periodicId,proto3" json:"periodic_id,omitempty"` + PeriodicName string `protobuf:"bytes,4,opt,name=periodic_name,json=periodicName,proto3" json:"periodic_name,omitempty"` + Branch string `protobuf:"bytes,5,opt,name=branch,proto3" json:"branch,omitempty"` + PipelineFile string `protobuf:"bytes,6,opt,name=pipeline_file,json=pipelineFile,proto3" json:"pipeline_file,omitempty"` + TriggeredAt *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=triggered_at,json=triggeredAt,proto3" json:"triggered_at,omitempty"` +} + +func (x *RunNowResponse) Reset() { + *x = RunNowResponse{} +} + +func (x *RunNowResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RunNowResponse) ProtoMessage() {} + +func (x *RunNowResponse) ProtoReflect() protoreflect.Message { + return nil +} + +func (*RunNowResponse) Descriptor() ([]byte, []int) { + return nil, nil +} + +func (x *RunNowResponse) GetStatus() *status.Status { + if x != nil { + return x.Status + } + return nil +} + +func (x *RunNowResponse) GetWorkflowId() string { + if x != nil { + return x.WorkflowId + } + return "" +} + +func (x *RunNowResponse) GetPeriodicId() string { + if x != nil { + return x.PeriodicId + } + return "" +} + +func (x *RunNowResponse) GetPeriodicName() string { + if x != nil { + return x.PeriodicName + } + return "" +} + +func (x *RunNowResponse) GetBranch() string { + if x != nil { + return x.Branch + } + return "" +} + +func (x *RunNowResponse) GetPipelineFile() string { + if x != nil { + return x.PipelineFile + } + return "" +} + +func (x *RunNowResponse) GetTriggeredAt() *timestamppb.Timestamp { + if x != nil { + return x.TriggeredAt + } + return nil +} + +// Ensure unused imports are referenced. +var _ = reflect.TypeOf diff --git a/mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler_grpc.pb.go b/mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler_grpc.pb.go new file mode 100644 index 000000000..5ba117d58 --- /dev/null +++ b/mcp_server/pkg/internal_api/periodic_scheduler/periodic_scheduler_grpc.pb.go @@ -0,0 +1,99 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// This is a manually created file to match the PeriodicScheduler internal API. + +package periodic_scheduler + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// Ensure unused imports are referenced +var _ context.Context +var _ grpc.ClientConnInterface +var _ codes.Code +var _ = status.Code + +const ( + PeriodicScheduler_ListKeyset_FullMethodName = "/InternalApi.PeriodicScheduler/ListKeyset" + PeriodicScheduler_Describe_FullMethodName = "/InternalApi.PeriodicScheduler/Describe" + PeriodicScheduler_RunNow_FullMethodName = "/InternalApi.PeriodicScheduler/RunNow" +) + +// PeriodicSchedulerClient is the client API for PeriodicScheduler service. +type PeriodicSchedulerClient interface { + // ListKeyset returns a list of tasks for a project with keyset pagination. + ListKeyset(ctx context.Context, in *ListKeysetRequest, opts ...grpc.CallOption) (*ListKeysetResponse, error) + // Describe returns details about a specific task including recent triggers. + Describe(ctx context.Context, in *DescribeRequest, opts ...grpc.CallOption) (*DescribeResponse, error) + // RunNow triggers a task to run immediately. + RunNow(ctx context.Context, in *RunNowRequest, opts ...grpc.CallOption) (*RunNowResponse, error) +} + +type periodicSchedulerClient struct { + cc grpc.ClientConnInterface +} + +// NewPeriodicSchedulerClient creates a new PeriodicScheduler client. +func NewPeriodicSchedulerClient(cc grpc.ClientConnInterface) PeriodicSchedulerClient { + return &periodicSchedulerClient{cc} +} + +func (c *periodicSchedulerClient) ListKeyset(ctx context.Context, in *ListKeysetRequest, opts ...grpc.CallOption) (*ListKeysetResponse, error) { + out := new(ListKeysetResponse) + err := c.cc.Invoke(ctx, PeriodicScheduler_ListKeyset_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *periodicSchedulerClient) Describe(ctx context.Context, in *DescribeRequest, opts ...grpc.CallOption) (*DescribeResponse, error) { + out := new(DescribeResponse) + err := c.cc.Invoke(ctx, PeriodicScheduler_Describe_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *periodicSchedulerClient) RunNow(ctx context.Context, in *RunNowRequest, opts ...grpc.CallOption) (*RunNowResponse, error) { + out := new(RunNowResponse) + err := c.cc.Invoke(ctx, PeriodicScheduler_RunNow_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// PeriodicSchedulerServer is the server API for PeriodicScheduler service. +type PeriodicSchedulerServer interface { + ListKeyset(context.Context, *ListKeysetRequest) (*ListKeysetResponse, error) + Describe(context.Context, *DescribeRequest) (*DescribeResponse, error) + RunNow(context.Context, *RunNowRequest) (*RunNowResponse, error) + mustEmbedUnimplementedPeriodicSchedulerServer() +} + +// UnimplementedPeriodicSchedulerServer must be embedded to have forward compatible implementations. +type UnimplementedPeriodicSchedulerServer struct{} + +func (UnimplementedPeriodicSchedulerServer) ListKeyset(context.Context, *ListKeysetRequest) (*ListKeysetResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method ListKeyset not implemented") +} + +func (UnimplementedPeriodicSchedulerServer) Describe(context.Context, *DescribeRequest) (*DescribeResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Describe not implemented") +} + +func (UnimplementedPeriodicSchedulerServer) RunNow(context.Context, *RunNowRequest) (*RunNowResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RunNow not implemented") +} + +func (UnimplementedPeriodicSchedulerServer) mustEmbedUnimplementedPeriodicSchedulerServer() {} + +// UnsafePeriodicSchedulerServer may be embedded to opt out of forward compatibility for this service. +type UnsafePeriodicSchedulerServer interface { + mustEmbedUnimplementedPeriodicSchedulerServer() +} diff --git a/mcp_server/pkg/internalapi/config.go b/mcp_server/pkg/internalapi/config.go index 7caa1fc87..73541a7c7 100644 --- a/mcp_server/pkg/internalapi/config.go +++ b/mcp_server/pkg/internalapi/config.go @@ -64,6 +64,10 @@ var ( "INTERNAL_API_URL_FEATURE", "MCP_FEATURE_GRPC_ENDPOINT", } + schedulerEndpointEnvs = []string{ + "INTERNAL_API_URL_SCHEDULER", + "MCP_SCHEDULER_GRPC_ENDPOINT", + } ) // Config captures the connection settings for talking to internal API services. @@ -79,6 +83,7 @@ type Config struct { UserEndpoint string RBACEndpoint string FeatureHubEndpoint string + SchedulerEndpoint string BaseURL string @@ -109,6 +114,7 @@ func LoadConfig() (Config, error) { UserEndpoint: endpointFromEnv(userEndpointEnvs...), RBACEndpoint: endpointFromEnv(rbacEndpointEnvs...), FeatureHubEndpoint: endpointFromEnv(featureHubEndpointEnvs...), + SchedulerEndpoint: endpointFromEnv(schedulerEndpointEnvs...), BaseURL: baseURLFromEnv(), DialTimeout: dialTimeout, CallTimeout: callTimeout, diff --git a/mcp_server/pkg/internalapi/manager.go b/mcp_server/pkg/internalapi/manager.go index d89fda00e..012cc3f9e 100644 --- a/mcp_server/pkg/internalapi/manager.go +++ b/mcp_server/pkg/internalapi/manager.go @@ -11,6 +11,7 @@ import ( loghubpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/loghub" loghub2pb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/loghub2" orgpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/organization" + schedulerpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/periodic_scheduler" pipelinepb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber.pipeline" workflowpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber_w_f.workflow" projecthubpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/projecthub" @@ -37,6 +38,7 @@ type Provider interface { Users() userpb.UserServiceClient RBAC() rbacpb.RBACClient Features() featuresvc.FeatureClient + Scheduler() schedulerpb.PeriodicSchedulerClient } // Manager owns gRPC connections to internal API services and exposes typed clients. @@ -53,6 +55,7 @@ type Manager struct { loghub2Conn *grpc.ClientConn userConn *grpc.ClientConn rbacConn *grpc.ClientConn + schedulerConn *grpc.ClientConn workflowClient workflowpb.WorkflowServiceClient organizationClient orgpb.OrganizationServiceClient @@ -64,6 +67,7 @@ type Manager struct { loghub2Client loghub2pb.Loghub2Client userClient userpb.UserServiceClient rbacClient rbacpb.RBACClient + schedulerClient schedulerpb.PeriodicSchedulerClient featuresService featuresvc.FeatureClient } @@ -132,6 +136,9 @@ func NewManager(ctx context.Context, cfg Config) (*Manager, error) { if m.rbacConn, err = dial(cfg.RBACEndpoint); err != nil { return nil, handleDialError("rbac", err) } + if m.schedulerConn, err = dial(cfg.SchedulerEndpoint); err != nil { + return nil, handleDialError("scheduler", err) + } if m.workflowConn != nil { m.workflowClient = workflowpb.NewWorkflowServiceClient(m.workflowConn) @@ -163,6 +170,9 @@ func NewManager(ctx context.Context, cfg Config) (*Manager, error) { if m.rbacConn != nil { m.rbacClient = rbacpb.NewRBACClient(m.rbacConn) } + if m.schedulerConn != nil { + m.schedulerClient = schedulerpb.NewPeriodicSchedulerClient(m.schedulerConn) + } cacheService := featuresvc.NewCacheService() @@ -185,6 +195,7 @@ func (m *Manager) Close() error { m.loghub2Conn, m.userConn, m.rbacConn, + m.schedulerConn, } for _, conn := range closers { if conn == nil { @@ -255,6 +266,10 @@ func (m *Manager) Features() featuresvc.FeatureClient { return m.featuresService } +func (m *Manager) Scheduler() schedulerpb.PeriodicSchedulerClient { + return m.schedulerClient +} + func joinErrors(errs []error) error { if len(errs) == 1 { return errs[0] diff --git a/mcp_server/pkg/tools/tasks/describe.go b/mcp_server/pkg/tools/tasks/describe.go new file mode 100644 index 000000000..0cab18028 --- /dev/null +++ b/mcp_server/pkg/tools/tasks/describe.go @@ -0,0 +1,367 @@ +package tasks + +import ( + "context" + "fmt" + "strings" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" + "github.com/sirupsen/logrus" + + "github.com/semaphoreio/semaphore/mcp_server/pkg/authz" + schedulerpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/periodic_scheduler" + "github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi" + "github.com/semaphoreio/semaphore/mcp_server/pkg/logging" + "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/internal/shared" +) + +func describeFullDescription() string { + return `Get detailed information about a scheduled task (periodic). + +Use this when you need to answer: +- "Show me the details of task X" +- "What parameters does this task have?" +- "When did this task last run?" +- "What is the trigger history for this task?" + +- organization_id: identify which organization context (required) +- task_id: the UUID of the task to describe (required) + +Response modes: +- summary (default): task details, status, schedule +- detailed: adds recent trigger history and parameters + +Examples: +1. Get task details: + tasks_describe(task_id="...", organization_id="...") + +2. Get detailed trigger history: + tasks_describe(task_id="...", organization_id="...", mode="detailed") + +Next steps: +- Call tasks_run(task_id="...") to trigger this task immediately +- Call workflows_search() to find workflows triggered by this task` +} + +func newDescribeTool(name, description string) mcp.Tool { + return mcp.NewTool( + name, + mcp.WithDescription(description), + mcp.WithString("task_id", + mcp.Required(), + mcp.Description("Task UUID to describe. Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx."), + mcp.Pattern(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`), + ), + mcp.WithString("organization_id", + mcp.Required(), + mcp.Description("Organization UUID associated with the task. Keep this consistent across subsequent tool calls."), + mcp.Pattern(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`), + ), + mcp.WithString("mode", + mcp.Description("Response detail. Use 'summary' for compact output; 'detailed' adds trigger history and parameters."), + mcp.Enum("summary", "detailed"), + mcp.DefaultString("summary"), + ), + mcp.WithReadOnlyHintAnnotation(true), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithOpenWorldHintAnnotation(true), + ) +} + +type taskParameter struct { + Name string `json:"name"` + Value string `json:"value,omitempty"` + Required bool `json:"required"` + Description string `json:"description,omitempty"` +} + +type taskDetail struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + ProjectID string `json:"project_id"` + Branch string `json:"branch"` + PipelineFile string `json:"pipeline_file"` + Schedule string `json:"schedule,omitempty"` + Parameters []taskParameter `json:"parameters,omitempty"` + Paused bool `json:"paused"` + Suspended bool `json:"suspended"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +type trigger struct { + TriggeredAt string `json:"triggered_at"` + WorkflowID string `json:"workflow_id,omitempty"` + Status string `json:"status"` + Branch string `json:"branch"` + PipelineFile string `json:"pipeline_file"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type describeResult struct { + Task taskDetail `json:"task"` + RecentTriggers []trigger `json:"recent_triggers,omitempty"` +} + +func describeHandler(api internalapi.Provider) server.ToolHandlerFunc { + return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + client := api.Scheduler() + if client == nil { + return mcp.NewToolResultError(missingSchedulerError), nil + } + + taskIDRaw, err := req.RequireString("task_id") + if err != nil { + return mcp.NewToolResultError(`Missing required argument: task_id. Provide the task UUID (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).`), nil + } + taskID := strings.TrimSpace(taskIDRaw) + if err := shared.ValidateUUID(taskID, "task_id"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + orgIDRaw, err := req.RequireString("organization_id") + if err != nil { + return mcp.NewToolResultError(`Missing required argument: organization_id. Provide the organization UUID returned by organizations_list.`), nil + } + orgID := strings.TrimSpace(orgIDRaw) + if err := shared.ValidateUUID(orgID, "organization_id"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + tracker := shared.TrackToolExecution(ctx, describeToolName, orgID) + defer tracker.Cleanup() + + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + mode, err := shared.NormalizeMode(req.GetString("mode", "summary")) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Invalid mode parameter: %v", err)), nil + } + + userID := strings.ToLower(strings.TrimSpace(req.Header.Get("X-Semaphore-User-ID"))) + if err := shared.ValidateUUID(userID, "x-semaphore-user-id header"); err != nil { + return mcp.NewToolResultError(fmt.Sprintf(`%v + +The authentication layer must inject the X-Semaphore-User-ID header so we can authorize task access.`, err)), nil + } + + request := &schedulerpb.DescribeRequest{ + Id: taskID, + OrganizationId: orgID, + } + + callCtx, cancel := context.WithTimeout(ctx, api.CallTimeout()) + defer cancel() + + resp, err := client.Describe(callCtx, request) + if err != nil { + logging.ForComponent("rpc"). + WithFields(logrus.Fields{ + "rpc": "scheduler.Describe", + "taskId": taskID, + "mode": mode, + }). + WithError(err). + Error("scheduler describe RPC failed") + return mcp.NewToolResultError(fmt.Sprintf(`Task describe RPC failed: %v + +Possible causes: +- Task does not exist or you lack access rights +- Internal scheduler service is unavailable (retry shortly) +- Network connectivity issues between MCP server and scheduler service`, err)), nil + } + + if err := shared.CheckStatus(resp.GetStatus()); err != nil { + logging.ForComponent("rpc"). + WithFields(logrus.Fields{ + "rpc": "scheduler.Describe", + "taskId": taskID, + }). + WithError(err). + Warn("scheduler describe returned non-OK status") + return mcp.NewToolResultError(fmt.Sprintf(`Request failed: %v + +Double-check that: +- task_id is correct +- You have permission to view this task +- The organization is active and not suspended`, err)), nil + } + + periodic := resp.GetPeriodic() + if periodic == nil { + return mcp.NewToolResultError("Task not found or access denied."), nil + } + + // Check permission on the project that owns this task + projectID := periodic.GetProjectId() + if projectID != "" { + if err := authz.CheckProjectPermission(ctx, api, userID, orgID, projectID, schedulerViewPermission); err != nil { + return shared.ProjectAuthorizationError(err, orgID, projectID, schedulerViewPermission), nil + } + } + + // Build parameters list + var params []taskParameter + for _, p := range periodic.GetParameters() { + if p == nil { + continue + } + params = append(params, taskParameter{ + Name: p.GetName(), + Value: p.GetValue(), + Required: p.GetRequired(), + Description: p.GetDescription(), + }) + } + + detail := taskDetail{ + ID: periodic.GetId(), + Name: periodic.GetName(), + Description: periodic.GetDescription(), + ProjectID: periodic.GetProjectId(), + Branch: periodic.GetBranch(), + PipelineFile: periodic.GetPipelineFile(), + Schedule: periodic.GetSchedule(), + Parameters: params, + Paused: periodic.GetPaused(), + Suspended: periodic.GetSuspended(), + CreatedAt: shared.FormatTimestamp(periodic.GetCreatedAt()), + UpdatedAt: shared.FormatTimestamp(periodic.GetUpdatedAt()), + } + + // Build triggers list + var triggers []trigger + for _, t := range resp.GetRecentTriggers() { + if t == nil { + continue + } + triggers = append(triggers, trigger{ + TriggeredAt: shared.FormatTimestamp(t.GetTriggeredAt()), + WorkflowID: t.GetWorkflowId(), + Status: triggerStatusToString(t.GetStatus()), + Branch: t.GetBranch(), + PipelineFile: t.GetPipelineFile(), + ErrorMessage: t.GetErrorMessage(), + }) + } + + result := describeResult{ + Task: detail, + RecentTriggers: triggers, + } + + markdown := formatTaskDescribeMarkdown(result, mode) + markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + + tracker.MarkSuccess() + return &mcp.CallToolResult{ + Content: []mcp.Content{ + mcp.NewTextContent(markdown), + }, + StructuredContent: result, + }, nil + } +} + +func triggerStatusToString(status schedulerpb.TriggerStatus) string { + switch status { + case schedulerpb.TriggerStatus_TRIGGER_STATUS_SCHEDULED: + return "scheduled" + case schedulerpb.TriggerStatus_TRIGGER_STATUS_RUNNING: + return "running" + case schedulerpb.TriggerStatus_TRIGGER_STATUS_PASSED: + return "passed" + case schedulerpb.TriggerStatus_TRIGGER_STATUS_FAILED: + return "failed" + case schedulerpb.TriggerStatus_TRIGGER_STATUS_STOPPED: + return "stopped" + default: + return "unknown" + } +} + +func formatTaskDescribeMarkdown(result describeResult, mode string) string { + mb := shared.NewMarkdownBuilder() + + task := result.Task + mb.H1(fmt.Sprintf("Task: %s", task.Name)) + + mb.KeyValue("ID", fmt.Sprintf("`%s`", task.ID)) + if task.ProjectID != "" { + mb.KeyValue("Project ID", fmt.Sprintf("`%s`", task.ProjectID)) + } + + status := "Active" + if task.Paused { + status = "Paused" + } + if task.Suspended { + status = "Suspended" + } + mb.KeyValue("Status", status) + + if task.Branch != "" { + mb.KeyValue("Branch", task.Branch) + } + if task.PipelineFile != "" { + mb.KeyValue("Pipeline File", task.PipelineFile) + } + if task.Schedule != "" { + mb.KeyValue("Schedule", fmt.Sprintf("`%s`", task.Schedule)) + } + + if task.Description != "" { + mb.Line() + mb.Paragraph(fmt.Sprintf("**Description:** %s", task.Description)) + } + + if mode == "detailed" { + if task.CreatedAt != "" { + mb.KeyValue("Created At", task.CreatedAt) + } + if task.UpdatedAt != "" { + mb.KeyValue("Updated At", task.UpdatedAt) + } + + if len(task.Parameters) > 0 { + mb.Line() + mb.H2("Parameters") + for _, p := range task.Parameters { + reqStr := "" + if p.Required { + reqStr = " (required)" + } + if p.Description != "" { + mb.ListItem(fmt.Sprintf("`%s`%s: %s", p.Name, reqStr, p.Description)) + } else { + mb.ListItem(fmt.Sprintf("`%s`%s", p.Name, reqStr)) + } + } + } + + if len(result.RecentTriggers) > 0 { + mb.Line() + mb.H2("Recent Triggers") + for _, t := range result.RecentTriggers { + mb.Line() + mb.KeyValue("Triggered At", t.TriggeredAt) + mb.KeyValue("Status", t.Status) + if t.WorkflowID != "" { + mb.KeyValue("Workflow ID", fmt.Sprintf("`%s`", t.WorkflowID)) + } + if t.Branch != "" { + mb.KeyValue("Branch", t.Branch) + } + if t.ErrorMessage != "" { + mb.KeyValue("Error", t.ErrorMessage) + } + } + } + } + + return mb.String() +} diff --git a/mcp_server/pkg/tools/tasks/list.go b/mcp_server/pkg/tools/tasks/list.go new file mode 100644 index 000000000..b6603ab0d --- /dev/null +++ b/mcp_server/pkg/tools/tasks/list.go @@ -0,0 +1,312 @@ +package tasks + +import ( + "context" + "fmt" + "strings" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" + "github.com/sirupsen/logrus" + + "github.com/semaphoreio/semaphore/mcp_server/pkg/authz" + schedulerpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/periodic_scheduler" + "github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi" + "github.com/semaphoreio/semaphore/mcp_server/pkg/logging" + "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/internal/shared" + "github.com/semaphoreio/semaphore/mcp_server/pkg/utils" +) + +func listFullDescription() string { + return `List scheduled tasks (periodics) for a project. + +Use this when you need to answer: +- "Show me all scheduled tasks for project X" +- "What recurring jobs are configured?" +- "List tasks that run on a schedule" + +- organization_id: identify which organization's project you are querying (required) +- project_id: identify which project to list tasks from (required) +- cursor: paginate through results using the previous response's next_cursor +- limit: number of tasks to return (default 20, max 100) + +Response modes: +- summary (default): task ID, name, branch, schedule, paused/suspended status +- detailed: adds description, pipeline file, parameters, timestamps + +Examples: +1. List all tasks for a project: + tasks_list(project_id="...", organization_id="...") + +2. Paginate through more tasks: + tasks_list(project_id="...", organization_id="...", cursor="opaque-token") + +Next steps: +- Call tasks_describe(task_id="...") to get more details about a specific task +- Call tasks_run(task_id="...") to trigger a task immediately` +} + +func newListTool(name, description string) mcp.Tool { + return mcp.NewTool( + name, + mcp.WithDescription(description), + mcp.WithString("project_id", + mcp.Required(), + mcp.Description("Project UUID that scopes the task search. Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx."), + mcp.Pattern(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`), + ), + mcp.WithString("organization_id", + mcp.Required(), + mcp.Description("Organization UUID associated with the project. Keep this consistent across subsequent tool calls."), + mcp.Pattern(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`), + ), + mcp.WithString("cursor", + mcp.Description("Pagination token from a prior call's next_cursor. Use to fetch more tasks."), + ), + mcp.WithNumber("limit", + mcp.Description("Number of tasks to return (1-100). Defaults to 20."), + mcp.Min(1), + mcp.Max(maxLimit), + mcp.DefaultNumber(defaultLimit), + ), + mcp.WithString("mode", + mcp.Description("Response detail. Use 'summary' for compact output; 'detailed' adds description, parameters, and timestamps."), + mcp.Enum("summary", "detailed"), + mcp.DefaultString("summary"), + ), + mcp.WithReadOnlyHintAnnotation(true), + mcp.WithIdempotentHintAnnotation(true), + mcp.WithOpenWorldHintAnnotation(true), + ) +} + +type taskSummary struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Branch string `json:"branch"` + PipelineFile string `json:"pipeline_file"` + Schedule string `json:"schedule,omitempty"` + Paused bool `json:"paused"` + Suspended bool `json:"suspended"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +type listResult struct { + Tasks []taskSummary `json:"tasks"` + NextCursor string `json:"next_cursor,omitempty"` +} + +func listHandler(api internalapi.Provider) server.ToolHandlerFunc { + return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + client := api.Scheduler() + if client == nil { + return mcp.NewToolResultError(missingSchedulerError), nil + } + + projectIDRaw, err := req.RequireString("project_id") + if err != nil { + return mcp.NewToolResultError(`Missing required argument: project_id. Provide the project UUID (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).`), nil + } + projectID := strings.TrimSpace(projectIDRaw) + if err := shared.ValidateUUID(projectID, "project_id"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + orgIDRaw, err := req.RequireString("organization_id") + if err != nil { + return mcp.NewToolResultError(`Missing required argument: organization_id. Provide the organization UUID returned by organizations_list.`), nil + } + orgID := strings.TrimSpace(orgIDRaw) + if err := shared.ValidateUUID(orgID, "organization_id"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + tracker := shared.TrackToolExecution(ctx, listToolName, orgID) + defer tracker.Cleanup() + + if err := shared.EnsureReadToolsFeature(ctx, api, orgID); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + mode, err := shared.NormalizeMode(req.GetString("mode", "summary")) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Invalid mode parameter: %v", err)), nil + } + + cursor, err := shared.SanitizeCursorToken(req.GetString("cursor", ""), "cursor") + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + userID := strings.ToLower(strings.TrimSpace(req.Header.Get("X-Semaphore-User-ID"))) + if err := shared.ValidateUUID(userID, "x-semaphore-user-id header"); err != nil { + return mcp.NewToolResultError(fmt.Sprintf(`%v + +The authentication layer must inject the X-Semaphore-User-ID header so we can scope task searches to the authenticated caller. + +Troubleshooting: +- Ensure requests pass through the auth proxy +- Verify the header value is a UUID (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx) +- Retry once the header is present`, err)), nil + } + + if err := authz.CheckProjectPermission(ctx, api, userID, orgID, projectID, schedulerViewPermission); err != nil { + return shared.ProjectAuthorizationError(err, orgID, projectID, schedulerViewPermission), nil + } + + limit := req.GetInt("limit", defaultLimit) + if limit <= 0 { + limit = defaultLimit + } else if limit > maxLimit { + limit = maxLimit + } + + pageSize, err := utils.IntToInt32(limit, "limit") + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + request := &schedulerpb.ListKeysetRequest{ + ProjectId: projectID, + OrganizationId: orgID, + PageSize: pageSize, + PageToken: cursor, + } + + callCtx, cancel := context.WithTimeout(ctx, api.CallTimeout()) + defer cancel() + + resp, err := client.ListKeyset(callCtx, request) + if err != nil { + logging.ForComponent("rpc"). + WithFields(logrus.Fields{ + "rpc": "scheduler.ListKeyset", + "projectId": projectID, + "limit": limit, + "cursor": cursor, + "mode": mode, + }). + WithError(err). + Error("scheduler list RPC failed") + return mcp.NewToolResultError(fmt.Sprintf(`Task list RPC failed: %v + +Possible causes: +- Project does not exist or you lack access rights +- Internal scheduler service is unavailable (retry shortly) +- Network connectivity issues between MCP server and scheduler service + +Try reducing the limit or removing filters to see if results return.`, err)), nil + } + + if err := shared.CheckStatus(resp.GetStatus()); err != nil { + logging.ForComponent("rpc"). + WithFields(logrus.Fields{ + "rpc": "scheduler.ListKeyset", + "projectId": projectID, + }). + WithError(err). + Warn("scheduler list returned non-OK status") + return mcp.NewToolResultError(fmt.Sprintf(`Request failed: %v + +Double-check that: +- project_id is correct +- You have permission to view tasks for this project +- The organization is active and not suspended`, err)), nil + } + + tasks := make([]taskSummary, 0, len(resp.GetPeriodics())) + for _, p := range resp.GetPeriodics() { + if p == nil { + continue + } + tasks = append(tasks, taskSummary{ + ID: p.GetId(), + Name: p.GetName(), + Description: p.GetDescription(), + Branch: p.GetBranch(), + PipelineFile: p.GetPipelineFile(), + Schedule: p.GetSchedule(), + Paused: p.GetPaused(), + Suspended: p.GetSuspended(), + UpdatedAt: shared.FormatTimestamp(p.GetUpdatedAt()), + }) + } + + result := listResult{Tasks: tasks} + if token := strings.TrimSpace(resp.GetNextPageToken()); token != "" { + result.NextCursor = token + } + + markdown := formatTasksListMarkdown(result, mode, projectID, orgID, limit) + markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + + tracker.MarkSuccess() + return &mcp.CallToolResult{ + Content: []mcp.Content{ + mcp.NewTextContent(markdown), + }, + StructuredContent: result, + }, nil + } +} + +func formatTasksListMarkdown(result listResult, mode, projectID, orgID string, limit int) string { + mb := shared.NewMarkdownBuilder() + + header := fmt.Sprintf("Tasks (%d returned)", len(result.Tasks)) + mb.H1(header) + + if len(result.Tasks) == 0 { + mb.Paragraph("No scheduled tasks found for this project.") + mb.Paragraph("**Suggestions:**") + mb.ListItem("Verify the project_id is correct") + mb.ListItem("Check if the project has any scheduled tasks configured") + mb.ListItem("Confirm the authenticated user has permission to view this project") + return mb.String() + } + + for idx, task := range result.Tasks { + if idx > 0 { + mb.Line() + } + + mb.H2(fmt.Sprintf("Task: %s", task.Name)) + mb.KeyValue("ID", fmt.Sprintf("`%s`", task.ID)) + + if task.Branch != "" { + mb.KeyValue("Branch", task.Branch) + } + if task.Schedule != "" { + mb.KeyValue("Schedule", fmt.Sprintf("`%s`", task.Schedule)) + } + + status := "Active" + if task.Paused { + status = "Paused" + } + if task.Suspended { + status = "Suspended" + } + mb.KeyValue("Status", status) + + if mode == "detailed" { + if task.Description != "" { + mb.KeyValue("Description", task.Description) + } + if task.PipelineFile != "" { + mb.KeyValue("Pipeline File", task.PipelineFile) + } + if task.UpdatedAt != "" { + mb.KeyValue("Updated At", task.UpdatedAt) + } + } + } + + mb.Line() + if result.NextCursor != "" { + mb.Paragraph(fmt.Sprintf("More available. Use `cursor=\"%s\"`", result.NextCursor)) + } + + return mb.String() +} diff --git a/mcp_server/pkg/tools/tasks/register.go b/mcp_server/pkg/tools/tasks/register.go new file mode 100644 index 000000000..c39c12131 --- /dev/null +++ b/mcp_server/pkg/tools/tasks/register.go @@ -0,0 +1,28 @@ +package tasks + +import ( + "github.com/mark3labs/mcp-go/server" + + "github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi" +) + +const ( + listToolName = "tasks_list" + describeToolName = "tasks_describe" + runToolName = "tasks_run" + + defaultLimit = 20 + maxLimit = 100 + + missingSchedulerError = "scheduler gRPC endpoint is not configured" + + schedulerViewPermission = "project.scheduler.view" + schedulerRunPermission = "project.scheduler.run_manually" +) + +// Register wires task tooling into the MCP server. +func Register(s *server.MCPServer, api internalapi.Provider) { + s.AddTool(newListTool(listToolName, listFullDescription()), listHandler(api)) + s.AddTool(newDescribeTool(describeToolName, describeFullDescription()), describeHandler(api)) + s.AddTool(newRunTool(runToolName, runFullDescription()), runHandler(api)) +} diff --git a/mcp_server/pkg/tools/tasks/run.go b/mcp_server/pkg/tools/tasks/run.go new file mode 100644 index 000000000..979ed99e1 --- /dev/null +++ b/mcp_server/pkg/tools/tasks/run.go @@ -0,0 +1,364 @@ +package tasks + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "unicode/utf8" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/server" + "github.com/sirupsen/logrus" + + "github.com/semaphoreio/semaphore/mcp_server/pkg/authz" + schedulerpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/periodic_scheduler" + "github.com/semaphoreio/semaphore/mcp_server/pkg/internalapi" + "github.com/semaphoreio/semaphore/mcp_server/pkg/logging" + "github.com/semaphoreio/semaphore/mcp_server/pkg/tools/internal/shared" +) + +func runFullDescription() string { + return `Trigger a scheduled task to run immediately. + +Use this when you need to: +- Manually trigger a scheduled task without waiting for the schedule +- Run a task with custom branch or parameters + +Required inputs: +- organization_id: Organization UUID that owns the project (required) +- project_id: Project UUID containing the task (required) +- task_id: Task UUID to trigger (required) + +Optional inputs: +- branch: Override the task's default branch +- pipeline_file: Override the task's default pipeline file +- parameters: A key/value map of parameters to override (values convert to strings) + +The authenticated user must have the 'project.scheduler.run_manually' permission. + +Examples: +1. Trigger a task: + tasks_run(task_id="...", project_id="...", organization_id="...") + +2. Trigger with a different branch: + tasks_run(task_id="...", project_id="...", organization_id="...", branch="develop") + +3. Trigger with custom parameters: + tasks_run(task_id="...", project_id="...", organization_id="...", parameters={"ENV": "staging"})` +} + +func newRunTool(name, description string) mcp.Tool { + return mcp.NewTool( + name, + mcp.WithDescription(description), + mcp.WithString("task_id", + mcp.Required(), + mcp.Description("Task UUID to trigger. Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx."), + mcp.Pattern(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`), + ), + mcp.WithString("project_id", + mcp.Required(), + mcp.Description("Project UUID containing the task. Format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx."), + mcp.Pattern(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`), + ), + mcp.WithString("organization_id", + mcp.Required(), + mcp.Description("Organization UUID that owns the project."), + mcp.Pattern(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`), + ), + mcp.WithString("branch", + mcp.Description("Optional branch to override the task's default branch."), + ), + mcp.WithString("pipeline_file", + mcp.Description("Optional pipeline file path to override the task's default."), + ), + mcp.WithObject("parameters", + mcp.Description("Optional key/value parameters to pass to the task run."), + mcp.AdditionalProperties(map[string]any{ + "oneOf": []any{ + map[string]any{"type": "string"}, + map[string]any{"type": "number"}, + map[string]any{"type": "boolean"}, + map[string]any{"type": "null"}, + }, + }), + ), + mcp.WithIdempotentHintAnnotation(false), + ) +} + +type runResult struct { + TaskID string `json:"task_id"` + TaskName string `json:"task_name"` + WorkflowID string `json:"workflow_id"` + Branch string `json:"branch"` + PipelineFile string `json:"pipeline_file"` + TriggeredAt string `json:"triggered_at"` +} + +func runHandler(api internalapi.Provider) server.ToolHandlerFunc { + return func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + orgIDRaw, err := req.RequireString("organization_id") + if err != nil { + return mcp.NewToolResultError(`Missing required argument: organization_id. Provide the organization UUID.`), nil + } + orgID := strings.TrimSpace(orgIDRaw) + if err := shared.ValidateUUID(orgID, "organization_id"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + if err := shared.EnsureWriteToolsFeature(ctx, api, orgID); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + tracker := shared.TrackToolExecution(ctx, runToolName, orgID) + defer tracker.Cleanup() + + client := api.Scheduler() + if client == nil { + return mcp.NewToolResultError(missingSchedulerError), nil + } + + taskIDRaw, err := req.RequireString("task_id") + if err != nil { + return mcp.NewToolResultError(`Missing required argument: task_id. Provide the task UUID (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).`), nil + } + taskID := strings.TrimSpace(taskIDRaw) + if err := shared.ValidateUUID(taskID, "task_id"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + projectIDRaw, err := req.RequireString("project_id") + if err != nil { + return mcp.NewToolResultError(`Missing required argument: project_id. Provide the project UUID (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).`), nil + } + projectID := strings.TrimSpace(projectIDRaw) + if err := shared.ValidateUUID(projectID, "project_id"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + userID := strings.ToLower(strings.TrimSpace(req.Header.Get("X-Semaphore-User-ID"))) + if err := shared.ValidateUUID(userID, "x-semaphore-user-id header"); err != nil { + return mcp.NewToolResultError(fmt.Sprintf(`%v + +The authentication layer must inject the X-Semaphore-User-ID header so we can authorize task runs.`, err)), nil + } + + if err := authz.CheckProjectPermission(ctx, api, userID, orgID, projectID, schedulerRunPermission); err != nil { + return shared.ProjectAuthorizationError(err, orgID, projectID, schedulerRunPermission), nil + } + + branch, err := shared.SanitizeBranch(req.GetString("branch", ""), "branch") + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + pipelineFile := strings.TrimSpace(req.GetString("pipeline_file", "")) + if err := validatePipelineFile(pipelineFile, "pipeline_file"); err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + parameters, err := extractParameters(req.GetArguments()["parameters"]) + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + pbParams, err := buildParameters(parameters) + if err != nil { + return mcp.NewToolResultError(err.Error()), nil + } + + request := &schedulerpb.RunNowRequest{ + Id: taskID, + OrganizationId: orgID, + ProjectId: projectID, + RequesterId: userID, + Branch: branch, + PipelineFile: pipelineFile, + Parameters: pbParams, + } + + callCtx, cancel := context.WithTimeout(ctx, api.CallTimeout()) + defer cancel() + + resp, err := client.RunNow(callCtx, request) + if err != nil { + logging.ForComponent("rpc"). + WithFields(logrus.Fields{ + "rpc": "scheduler.RunNow", + "taskId": taskID, + "projectId": projectID, + "orgId": orgID, + }). + WithError(err). + Error("scheduler RunNow RPC failed") + return mcp.NewToolResultError(fmt.Sprintf(`Task run failed: %v + +Possible causes: +- Task does not exist or is suspended +- Project repository configuration is invalid +- Internal scheduler service is unavailable (retry shortly)`, err)), nil + } + + if err := shared.CheckStatus(resp.GetStatus()); err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Task run failed: %v", err)), nil + } + + result := runResult{ + TaskID: resp.GetPeriodicId(), + TaskName: resp.GetPeriodicName(), + WorkflowID: resp.GetWorkflowId(), + Branch: resp.GetBranch(), + PipelineFile: resp.GetPipelineFile(), + TriggeredAt: shared.FormatTimestamp(resp.GetTriggeredAt()), + } + + markdown := formatRunMarkdown(result) + markdown = shared.TruncateResponse(markdown, shared.MaxResponseChars) + + tracker.MarkSuccess() + return &mcp.CallToolResult{ + Content: []mcp.Content{mcp.NewTextContent(markdown)}, + StructuredContent: result, + }, nil + } +} + +func validatePipelineFile(value, field string) error { + value = strings.TrimSpace(value) + if value == "" { + return nil + } + length := utf8.RuneCountInString(value) + if length > 512 { + return fmt.Errorf("%s must not exceed 512 characters", field) + } + for _, r := range value { + if r < 32 || r == 127 { + return fmt.Errorf("%s contains control characters", field) + } + if r == '\\' { + return fmt.Errorf("%s must not contain backslashes", field) + } + } + if strings.Contains(value, "..") { + return fmt.Errorf("%s must not contain '..' sequences", field) + } + if strings.HasPrefix(value, "/") { + return fmt.Errorf("%s must be a relative path", field) + } + return nil +} + +func extractParameters(raw any) (map[string]any, error) { + if raw == nil { + return nil, nil + } + params, ok := raw.(map[string]any) + if !ok { + return nil, fmt.Errorf("parameters must be a key/value map with string keys") + } + return params, nil +} + +func buildParameters(params map[string]any) ([]*schedulerpb.Parameter, error) { + if len(params) == 0 { + return nil, nil + } + names := make([]string, 0, len(params)) + for name := range params { + names = append(names, name) + } + sort.Strings(names) + result := make([]*schedulerpb.Parameter, 0, len(names)) + for _, name := range names { + clean := strings.TrimSpace(name) + if clean == "" { + return nil, fmt.Errorf("parameter names must not be empty") + } + if err := validateParameterName(clean); err != nil { + return nil, err + } + value, err := parameterValueToString(params[name]) + if err != nil { + return nil, err + } + result = append(result, &schedulerpb.Parameter{Name: clean, Value: value}) + } + return result, nil +} + +func validateParameterName(name string) error { + if utf8.RuneCountInString(name) > 128 { + return fmt.Errorf("parameter names must not exceed 128 characters") + } + for _, r := range name { + if r < 32 || r == 127 { + return fmt.Errorf("parameter %q contains control characters", name) + } + } + // Parameter names should start with a letter or underscore + if len(name) > 0 { + first := rune(name[0]) + if !((first >= 'a' && first <= 'z') || (first >= 'A' && first <= 'Z') || first == '_') { + return fmt.Errorf("parameter %q must start with a letter or underscore", name) + } + } + return nil +} + +func parameterValueToString(value any) (string, error) { + switch v := value.(type) { + case nil: + return "", nil + case string: + return v, nil + case bool: + if v { + return "true", nil + } + return "false", nil + case float64: + return strconv.FormatFloat(v, 'f', -1, 64), nil + case int: + return strconv.Itoa(v), nil + case int32: + return strconv.FormatInt(int64(v), 10), nil + case int64: + return strconv.FormatInt(v, 10), nil + case uint32: + return strconv.FormatUint(uint64(v), 10), nil + case uint64: + return strconv.FormatUint(v, 10), nil + default: + return "", fmt.Errorf("parameters values must be strings, numbers, booleans, or null") + } +} + +func formatRunMarkdown(result runResult) string { + mb := shared.NewMarkdownBuilder() + mb.H1("Task Triggered") + + if result.TaskName != "" { + mb.KeyValue("Task Name", result.TaskName) + } + if result.TaskID != "" { + mb.KeyValue("Task ID", fmt.Sprintf("`%s`", result.TaskID)) + } + if result.WorkflowID != "" { + mb.KeyValue("Workflow ID", fmt.Sprintf("`%s`", result.WorkflowID)) + } + if result.Branch != "" { + mb.KeyValue("Branch", result.Branch) + } + if result.PipelineFile != "" { + mb.KeyValue("Pipeline File", result.PipelineFile) + } + if result.TriggeredAt != "" { + mb.KeyValue("Triggered At", result.TriggeredAt) + } + + return mb.String() +} diff --git a/mcp_server/pkg/tools/tasks/tasks_test.go b/mcp_server/pkg/tools/tasks/tasks_test.go new file mode 100644 index 000000000..851f3d628 --- /dev/null +++ b/mcp_server/pkg/tools/tasks/tasks_test.go @@ -0,0 +1,384 @@ +package tasks + +import ( + "context" + "net/http" + "strings" + "testing" + "time" + + "github.com/mark3labs/mcp-go/mcp" + "github.com/semaphoreio/semaphore/mcp_server/pkg/feature" + schedulerpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/periodic_scheduler" + statuspb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/status" + support "github.com/semaphoreio/semaphore/mcp_server/test/support" + + "google.golang.org/genproto/googleapis/rpc/code" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestListTasks_FeatureFlagDisabled(t *testing.T) { + req := mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "organization_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + "project_id": "11111111-2222-3333-4444-555555555555", + }}} + header := http.Header{} + header.Set("X-Semaphore-User-ID", "99999999-aaaa-bbbb-cccc-dddddddddddd") + req.Header = header + + provider := &support.MockProvider{ + FeaturesService: support.FeatureClientStub{State: feature.Hidden}, + Timeout: time.Second, + SchedulerClient: &support.SchedulerClientStub{}, + RBACClient: support.NewRBACStub("project.scheduler.view"), + } + + res, err := listHandler(provider)(context.Background(), req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + msg := requireErrorText(t, res) + if !strings.Contains(strings.ToLower(msg), "disabled") { + t.Fatalf("expected disabled feature error, got %q", msg) + } +} + +func TestListTasks(t *testing.T) { + projectID := "11111111-2222-3333-4444-555555555555" + orgID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + + client := &support.SchedulerClientStub{ + ListResp: &schedulerpb.ListKeysetResponse{ + Status: &statuspb.Status{Code: code.Code_OK}, + Periodics: []*schedulerpb.Periodic{ + { + Id: "task-123", + Name: "Nightly Build", + Description: "Runs every night", + ProjectId: projectID, + OrganizationId: orgID, + Branch: "main", + PipelineFile: ".semaphore/nightly.yml", + Schedule: "0 0 * * *", + Paused: false, + Suspended: false, + UpdatedAt: timestamppb.New(time.Unix(1700000000, 0)), + }, + }, + NextPageToken: "cursor", + }, + } + + provider := &support.MockProvider{ + SchedulerClient: client, + Timeout: time.Second, + RBACClient: support.NewRBACStub("project.scheduler.view"), + } + + handler := listHandler(provider) + req := mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Arguments: map[string]any{ + "project_id": projectID, + "organization_id": orgID, + "limit": 10, + }, + }, + } + header := http.Header{} + header.Set("X-Semaphore-User-ID", "99999999-aaaa-bbbb-cccc-dddddddddddd") + req.Header = header + + res, err := handler(context.Background(), req) + if err != nil { + t.Fatalf("handler error: %v", err) + } + + result, ok := res.StructuredContent.(listResult) + if !ok { + t.Fatalf("unexpected structured content type: %T", res.StructuredContent) + } + + if len(result.Tasks) != 1 { + t.Fatalf("expected 1 task, got %d", len(result.Tasks)) + } + + task := result.Tasks[0] + if task.ID != "task-123" { + t.Fatalf("expected task ID 'task-123', got %q", task.ID) + } + if task.Name != "Nightly Build" { + t.Fatalf("expected task name 'Nightly Build', got %q", task.Name) + } + if task.Schedule != "0 0 * * *" { + t.Fatalf("expected schedule '0 0 * * *', got %q", task.Schedule) + } + + if result.NextCursor != "cursor" { + t.Fatalf("expected next cursor 'cursor', got %q", result.NextCursor) + } + + if client.LastList == nil { + t.Fatal("expected list request to be recorded") + } + if client.LastList.GetPageSize() != 10 { + t.Fatalf("expected page size 10, got %d", client.LastList.GetPageSize()) + } +} + +func TestDescribeTask(t *testing.T) { + taskID := "11111111-2222-3333-4444-555555555555" + orgID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + projectID := "66666666-7777-8888-9999-aaaaaaaaaaaa" + + client := &support.SchedulerClientStub{ + DescribeResp: &schedulerpb.DescribeResponse{ + Status: &statuspb.Status{Code: code.Code_OK}, + Periodic: &schedulerpb.Periodic{ + Id: taskID, + Name: "Nightly Build", + Description: "Runs every night", + ProjectId: projectID, + OrganizationId: orgID, + Branch: "main", + PipelineFile: ".semaphore/nightly.yml", + Schedule: "0 0 * * *", + CreatedAt: timestamppb.New(time.Unix(1700000000, 0)), + UpdatedAt: timestamppb.New(time.Unix(1700000000, 0)), + }, + RecentTriggers: []*schedulerpb.Trigger{ + { + TriggeredAt: timestamppb.New(time.Unix(1700000000, 0)), + WorkflowId: "wf-456", + Status: schedulerpb.TriggerStatus_TRIGGER_STATUS_PASSED, + Branch: "main", + PipelineFile: ".semaphore/nightly.yml", + }, + }, + }, + } + + provider := &support.MockProvider{ + SchedulerClient: client, + Timeout: time.Second, + RBACClient: support.NewRBACStub("project.scheduler.view"), + } + + handler := describeHandler(provider) + req := mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Arguments: map[string]any{ + "task_id": taskID, + "organization_id": orgID, + "mode": "detailed", + }, + }, + } + header := http.Header{} + header.Set("X-Semaphore-User-ID", "99999999-aaaa-bbbb-cccc-dddddddddddd") + req.Header = header + + res, err := handler(context.Background(), req) + if err != nil { + t.Fatalf("handler error: %v", err) + } + + if res.IsError { + msg := "" + if len(res.Content) > 0 { + if text, ok := res.Content[0].(mcp.TextContent); ok { + msg = text.Text + } + } + t.Fatalf("unexpected error result: %s", msg) + } + + result, ok := res.StructuredContent.(describeResult) + if !ok { + t.Fatalf("unexpected structured content type: %T", res.StructuredContent) + } + + if result.Task.ID != taskID { + t.Fatalf("expected task ID %q, got %q", taskID, result.Task.ID) + } + if result.Task.Name != "Nightly Build" { + t.Fatalf("expected task name 'Nightly Build', got %q", result.Task.Name) + } + + if len(result.RecentTriggers) != 1 { + t.Fatalf("expected 1 trigger, got %d", len(result.RecentTriggers)) + } + + if result.RecentTriggers[0].WorkflowID != "wf-456" { + t.Fatalf("expected workflow ID 'wf-456', got %q", result.RecentTriggers[0].WorkflowID) + } +} + +func TestRunTask(t *testing.T) { + taskID := "11111111-2222-3333-4444-555555555555" + projectID := "66666666-7777-8888-9999-aaaaaaaaaaaa" + orgID := "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" + + client := &support.SchedulerClientStub{ + RunNowResp: &schedulerpb.RunNowResponse{ + Status: &statuspb.Status{Code: code.Code_OK}, + WorkflowId: "bbbbbbbb-cccc-dddd-eeee-ffffffffffff", + PeriodicId: taskID, + PeriodicName: "Nightly Build", + Branch: "main", + PipelineFile: ".semaphore/nightly.yml", + TriggeredAt: timestamppb.New(time.Unix(1700000000, 0)), + }, + } + + provider := &support.MockProvider{ + SchedulerClient: client, + Timeout: time.Second, + FeaturesService: support.FeatureClientStub{State: feature.Enabled}, + RBACClient: support.NewRBACStub("project.scheduler.run_manually"), + } + + handler := runHandler(provider) + req := mcp.CallToolRequest{ + Params: mcp.CallToolParams{ + Arguments: map[string]any{ + "task_id": taskID, + "project_id": projectID, + "organization_id": orgID, + }, + }, + } + header := http.Header{} + header.Set("X-Semaphore-User-ID", "99999999-aaaa-bbbb-cccc-dddddddddddd") + req.Header = header + + res, err := handler(context.Background(), req) + if err != nil { + t.Fatalf("handler error: %v", err) + } + + if res.IsError { + msg := "" + if len(res.Content) > 0 { + if text, ok := res.Content[0].(mcp.TextContent); ok { + msg = text.Text + } + } + t.Fatalf("unexpected error result: %s", msg) + } + + result, ok := res.StructuredContent.(runResult) + if !ok { + t.Fatalf("unexpected structured content type: %T", res.StructuredContent) + } + + if result.TaskID != taskID { + t.Fatalf("expected task ID %q, got %q", taskID, result.TaskID) + } + if result.WorkflowID != "bbbbbbbb-cccc-dddd-eeee-ffffffffffff" { + t.Fatalf("expected workflow ID 'bbbbbbbb-cccc-dddd-eeee-ffffffffffff', got %q", result.WorkflowID) + } + + if client.LastRunNow == nil { + t.Fatal("expected RunNow request to be recorded") + } + if client.LastRunNow.GetId() != taskID { + t.Fatalf("expected task ID %q in request, got %q", taskID, client.LastRunNow.GetId()) + } +} + +func TestRunTask_WriteFeatureDisabled(t *testing.T) { + req := mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "task_id": "11111111-2222-3333-4444-555555555555", + "project_id": "66666666-7777-8888-9999-aaaaaaaaaaaa", + "organization_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + }}} + header := http.Header{} + header.Set("X-Semaphore-User-ID", "99999999-aaaa-bbbb-cccc-dddddddddddd") + req.Header = header + + provider := &support.MockProvider{ + FeaturesService: support.FeatureClientStub{State: feature.Hidden}, + Timeout: time.Second, + SchedulerClient: &support.SchedulerClientStub{}, + RBACClient: support.NewRBACStub("project.scheduler.run_manually"), + } + + res, err := runHandler(provider)(context.Background(), req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + msg := requireErrorText(t, res) + if !strings.Contains(strings.ToLower(msg), "disabled") { + t.Fatalf("expected disabled feature error, got %q", msg) + } +} + +func TestListTasks_MissingProjectID(t *testing.T) { + req := mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "organization_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + }}} + header := http.Header{} + header.Set("X-Semaphore-User-ID", "99999999-aaaa-bbbb-cccc-dddddddddddd") + req.Header = header + + provider := &support.MockProvider{ + Timeout: time.Second, + SchedulerClient: &support.SchedulerClientStub{}, + RBACClient: support.NewRBACStub("project.scheduler.view"), + } + + res, err := listHandler(provider)(context.Background(), req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + msg := requireErrorText(t, res) + if !strings.Contains(msg, "project_id") { + t.Fatalf("expected error mentioning project_id, got %q", msg) + } +} + +func TestDescribeTask_InvalidTaskID(t *testing.T) { + req := mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "task_id": "invalid-uuid", + "organization_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + }}} + header := http.Header{} + header.Set("X-Semaphore-User-ID", "99999999-aaaa-bbbb-cccc-dddddddddddd") + req.Header = header + + provider := &support.MockProvider{ + Timeout: time.Second, + SchedulerClient: &support.SchedulerClientStub{}, + RBACClient: support.NewRBACStub("project.scheduler.view"), + } + + res, err := describeHandler(provider)(context.Background(), req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + msg := requireErrorText(t, res) + if !strings.Contains(msg, "UUID") { + t.Fatalf("expected UUID validation error, got %q", msg) + } +} + +// --- test helpers --- + +func requireErrorText(t *testing.T, res *mcp.CallToolResult) string { + t.Helper() + if res == nil { + t.Fatalf("expected tool result") + } + if !res.IsError { + t.Fatalf("expected error result, got success") + } + if len(res.Content) == 0 { + t.Fatalf("expected error content") + } + text, ok := res.Content[0].(mcp.TextContent) + if !ok { + t.Fatalf("expected text content, got %T", res.Content[0]) + } + return text.Text +} diff --git a/mcp_server/test/support/mock_provider.go b/mcp_server/test/support/mock_provider.go index d8de00ebe..bdfa90c6c 100644 --- a/mcp_server/test/support/mock_provider.go +++ b/mcp_server/test/support/mock_provider.go @@ -8,6 +8,7 @@ import ( loghubpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/loghub" loghub2pb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/loghub2" orgpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/organization" + schedulerpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/periodic_scheduler" pipelinepb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber.pipeline" workflowpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber_w_f.workflow" projecthubpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/projecthub" @@ -29,6 +30,7 @@ type MockProvider struct { Loghub2Client loghub2pb.Loghub2Client UserClient userpb.UserServiceClient RBACClient rbacpb.RBACClient + SchedulerClient schedulerpb.PeriodicSchedulerClient FeaturesService featuresvc.FeatureClient Timeout time.Duration } @@ -66,6 +68,8 @@ func (m *MockProvider) Users() userpb.UserServiceClient { return m.UserClient } func (m *MockProvider) RBAC() rbacpb.RBACClient { return m.RBACClient } +func (m *MockProvider) Scheduler() schedulerpb.PeriodicSchedulerClient { return m.SchedulerClient } + func (m *MockProvider) Features() featuresvc.FeatureClient { if m.FeaturesService == nil { return alwaysEnabledFeatureClient{} diff --git a/mcp_server/test/support/stubs.go b/mcp_server/test/support/stubs.go index 069311c59..1c3d929a7 100644 --- a/mcp_server/test/support/stubs.go +++ b/mcp_server/test/support/stubs.go @@ -13,6 +13,7 @@ import ( loghubpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/loghub" loghub2pb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/loghub2" orgpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/organization" + schedulerpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/periodic_scheduler" pipelinepb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber.pipeline" workflowpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/plumber_w_f.workflow" projecthubpb "github.com/semaphoreio/semaphore/mcp_server/pkg/internal_api/projecthub" @@ -42,6 +43,7 @@ func New() internalapi.Provider { loghub2: &loghub2Stub{}, users: &UserClientStub{}, rbac: &RBACStub{}, + scheduler: &SchedulerClientStub{}, features: &featureStub{}, } } @@ -58,6 +60,7 @@ type provider struct { loghub2 loghub2pb.Loghub2Client users userpb.UserServiceClient rbac rbacpb.RBACClient + scheduler schedulerpb.PeriodicSchedulerClient features featuresvc.FeatureClient } @@ -85,6 +88,8 @@ func (p *provider) Users() userpb.UserServiceClient { return p.users } func (p *provider) RBAC() rbacpb.RBACClient { return p.rbac } +func (p *provider) Scheduler() schedulerpb.PeriodicSchedulerClient { return p.scheduler } + func (p *provider) Features() featuresvc.FeatureClient { return p.features } // --- pipeline stub --- @@ -644,3 +649,105 @@ func (u *UserClientStub) DescribeByRepositoryProvider(ctx context.Context, in *u } return &userpb.User{Id: fmt.Sprintf("user-%s", login)}, nil } + +// --- scheduler stub --- + +// SchedulerClientStub records scheduler RPC requests and returns configurable responses. +type SchedulerClientStub struct { + schedulerpb.PeriodicSchedulerClient + + ListResp *schedulerpb.ListKeysetResponse + ListErr error + LastList *schedulerpb.ListKeysetRequest + DescribeResp *schedulerpb.DescribeResponse + DescribeErr error + LastDescribe *schedulerpb.DescribeRequest + RunNowResp *schedulerpb.RunNowResponse + RunNowErr error + LastRunNow *schedulerpb.RunNowRequest +} + +func (s *SchedulerClientStub) ListKeyset(ctx context.Context, in *schedulerpb.ListKeysetRequest, opts ...grpc.CallOption) (*schedulerpb.ListKeysetResponse, error) { + s.LastList = in + if s.ListErr != nil { + return nil, s.ListErr + } + if s.ListResp != nil { + return s.ListResp, nil + } + return &schedulerpb.ListKeysetResponse{ + Status: &statuspb.Status{Code: code.Code_OK}, + Periodics: []*schedulerpb.Periodic{ + { + Id: "task-local", + Name: "Nightly Build", + Description: "Runs nightly builds", + ProjectId: orDefault(in.GetProjectId(), "project-local"), + OrganizationId: orDefault(in.GetOrganizationId(), "org-local"), + Branch: "main", + PipelineFile: ".semaphore/nightly.yml", + Schedule: "0 0 * * *", + Paused: false, + Suspended: false, + CreatedAt: timestamppb.New(time.Unix(1_700_000_000, 0)), + UpdatedAt: timestamppb.New(time.Unix(1_700_000_000, 0)), + }, + }, + NextPageToken: "", + }, nil +} + +func (s *SchedulerClientStub) Describe(ctx context.Context, in *schedulerpb.DescribeRequest, opts ...grpc.CallOption) (*schedulerpb.DescribeResponse, error) { + s.LastDescribe = in + if s.DescribeErr != nil { + return nil, s.DescribeErr + } + if s.DescribeResp != nil { + return s.DescribeResp, nil + } + return &schedulerpb.DescribeResponse{ + Status: &statuspb.Status{Code: code.Code_OK}, + Periodic: &schedulerpb.Periodic{ + Id: orDefault(in.GetId(), "task-local"), + Name: "Nightly Build", + Description: "Runs nightly builds", + ProjectId: "project-local", + OrganizationId: orDefault(in.GetOrganizationId(), "org-local"), + Branch: "main", + PipelineFile: ".semaphore/nightly.yml", + Schedule: "0 0 * * *", + Paused: false, + Suspended: false, + CreatedAt: timestamppb.New(time.Unix(1_700_000_000, 0)), + UpdatedAt: timestamppb.New(time.Unix(1_700_000_000, 0)), + }, + RecentTriggers: []*schedulerpb.Trigger{ + { + TriggeredAt: timestamppb.New(time.Unix(1_700_000_000, 0)), + WorkflowId: "wf-local", + Status: schedulerpb.TriggerStatus_TRIGGER_STATUS_PASSED, + Branch: "main", + PipelineFile: ".semaphore/nightly.yml", + }, + }, + }, nil +} + +func (s *SchedulerClientStub) RunNow(ctx context.Context, in *schedulerpb.RunNowRequest, opts ...grpc.CallOption) (*schedulerpb.RunNowResponse, error) { + s.LastRunNow = in + if s.RunNowErr != nil { + return nil, s.RunNowErr + } + if s.RunNowResp != nil { + return s.RunNowResp, nil + } + return &schedulerpb.RunNowResponse{ + Status: &statuspb.Status{Code: code.Code_OK}, + WorkflowId: "wf-triggered", + PeriodicId: orDefault(in.GetId(), "task-local"), + PeriodicName: "Nightly Build", + Branch: orDefault(in.GetBranch(), "main"), + PipelineFile: orDefault(in.GetPipelineFile(), ".semaphore/nightly.yml"), + TriggeredAt: timestamppb.New(time.Now()), + }, nil +} From 099e04b7b6d87f3a5f9817cdab2b5830235e0818 Mon Sep 17 00:00:00 2001 From: Corey Christous Date: Wed, 4 Feb 2026 20:57:22 -0500 Subject: [PATCH 2/2] fix: use utf8.DecodeRuneInString for parameter name validation The previous code used rune(name[0]) which only reads the first byte, incorrectly interpreting multi-byte UTF-8 characters. This fix properly decodes the first rune using utf8.DecodeRuneInString. Added comprehensive tests for validateParameterName covering valid names, invalid starts (numbers, special chars, multi-byte UTF-8), control characters, and length validation. --- mcp_server/pkg/tools/tasks/run.go | 2 +- mcp_server/pkg/tools/tasks/tasks_test.go | 48 ++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/mcp_server/pkg/tools/tasks/run.go b/mcp_server/pkg/tools/tasks/run.go index 979ed99e1..78c70dad3 100644 --- a/mcp_server/pkg/tools/tasks/run.go +++ b/mcp_server/pkg/tools/tasks/run.go @@ -301,7 +301,7 @@ func validateParameterName(name string) error { } // Parameter names should start with a letter or underscore if len(name) > 0 { - first := rune(name[0]) + first, _ := utf8.DecodeRuneInString(name) if !((first >= 'a' && first <= 'z') || (first >= 'A' && first <= 'Z') || first == '_') { return fmt.Errorf("parameter %q must start with a letter or underscore", name) } diff --git a/mcp_server/pkg/tools/tasks/tasks_test.go b/mcp_server/pkg/tools/tasks/tasks_test.go index 851f3d628..5d2a55bd1 100644 --- a/mcp_server/pkg/tools/tasks/tasks_test.go +++ b/mcp_server/pkg/tools/tasks/tasks_test.go @@ -363,6 +363,54 @@ func TestDescribeTask_InvalidTaskID(t *testing.T) { } } +func TestValidateParameterName(t *testing.T) { + tests := []struct { + name string + input string + wantErr bool + }{ + // Valid names + {name: "lowercase letter start", input: "myParam", wantErr: false}, + {name: "uppercase letter start", input: "MyParam", wantErr: false}, + {name: "underscore start", input: "_myParam", wantErr: false}, + {name: "single letter", input: "x", wantErr: false}, + {name: "single underscore", input: "_", wantErr: false}, + + // Invalid: starts with number + {name: "number start", input: "1param", wantErr: true}, + + // Invalid: starts with special character + {name: "hyphen start", input: "-param", wantErr: true}, + {name: "dot start", input: ".param", wantErr: true}, + + // Invalid: starts with multi-byte UTF-8 character + // These test the utf8.DecodeRuneInString fix - the old code using + // rune(name[0]) would incorrectly interpret multi-byte characters + {name: "emoji start", input: "🚀param", wantErr: true}, + {name: "chinese char start", input: "中param", wantErr: true}, + {name: "accented char start", input: "éparam", wantErr: true}, + + // Invalid: control characters + {name: "tab in name", input: "my\tparam", wantErr: true}, + {name: "newline in name", input: "my\nparam", wantErr: true}, + + // Invalid: too long (over 128 characters) + {name: "too long", input: strings.Repeat("a", 129), wantErr: true}, + + // Valid: exactly 128 characters + {name: "max length", input: strings.Repeat("a", 128), wantErr: false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := validateParameterName(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("validateParameterName(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + } + }) + } +} + // --- test helpers --- func requireErrorText(t *testing.T, res *mcp.CallToolResult) string {