From 27a8f7a14125074fbb7dd54c01d08cf3f8d260e0 Mon Sep 17 00:00:00 2001 From: raghavyuva Date: Thu, 18 Sep 2025 00:43:39 +0530 Subject: [PATCH] feat: cluster based deployment, rollback, restart across services, and more methods wrapper for future integrations for multi server management --- .../features/deploy/docker/cluster.go | 188 ++++++++++++ api/internal/features/deploy/docker/init.go | 85 ++++-- api/internal/features/deploy/tasks/delete.go | 32 ++- api/internal/features/deploy/tasks/restart.go | 137 +++++---- .../features/deploy/tasks/rollback.go | 146 +++++----- api/internal/features/deploy/tasks/run.go | 270 +++++++++--------- 6 files changed, 553 insertions(+), 305 deletions(-) create mode 100644 api/internal/features/deploy/docker/cluster.go diff --git a/api/internal/features/deploy/docker/cluster.go b/api/internal/features/deploy/docker/cluster.go new file mode 100644 index 000000000..606f93163 --- /dev/null +++ b/api/internal/features/deploy/docker/cluster.go @@ -0,0 +1,188 @@ +package docker + +import ( + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/api/types/volume" + "github.com/raghavyuva/nixopus-api/internal/config" +) + +func (s *DockerService) InitCluster() error { + config := config.AppConfig + + // Use localhost as default advertise address if SSH host is not configured + // Useful during development + advertiseAddr := "127.0.0.1:2377" + if config.SSH.Host != "" { + advertiseAddr = config.SSH.Host + ":2377" + } + + _, err := s.Cli.SwarmInit(s.Ctx, swarm.InitRequest{ + ListenAddr: "0.0.0.0:2377", + // Address that Hosts can use to reach the master node + AdvertiseAddr: advertiseAddr, + }) + if err != nil { + return err + } + + return nil +} + +func (s *DockerService) JoinCluster() error { + config := config.AppConfig + + // Use localhost as default advertise address if SSH host is not configured + advertiseAddr := "127.0.0.1:2377" + if config.SSH.Host != "" { + advertiseAddr = config.SSH.Host + ":2377" + } + + err := s.Cli.SwarmJoin(s.Ctx, swarm.JoinRequest{ + ListenAddr: "0.0.0.0:2377", + AdvertiseAddr: advertiseAddr, + }) + return err +} + +func (s *DockerService) LeaveCluster(force bool) error { + err := s.Cli.SwarmLeave(s.Ctx, force) + return err +} + +func (s *DockerService) GetClusterInfo() (swarm.ClusterInfo, error) { + clusterInfo, err := s.Cli.SwarmInspect(s.Ctx) + return clusterInfo.ClusterInfo, err +} + +func (s *DockerService) GetClusterNodes() ([]swarm.Node, error) { + nodes, err := s.Cli.NodeList(s.Ctx, types.NodeListOptions{}) + return nodes, err +} + +func (s *DockerService) GetClusterServices() ([]swarm.Service, error) { + services, err := s.Cli.ServiceList(s.Ctx, types.ServiceListOptions{}) + return services, err +} + +func (s *DockerService) GetClusterTasks() ([]swarm.Task, error) { + tasks, err := s.Cli.TaskList(s.Ctx, types.TaskListOptions{}) + return tasks, err +} + +func (s *DockerService) GetClusterSecrets() ([]swarm.Secret, error) { + secrets, err := s.Cli.SecretList(s.Ctx, types.SecretListOptions{}) + return secrets, err +} + +func (s *DockerService) GetClusterConfigs() ([]swarm.Config, error) { + configs, err := s.Cli.ConfigList(s.Ctx, types.ConfigListOptions{}) + return configs, err +} + +func (s *DockerService) GetClusterVolumes() ([]*volume.Volume, error) { + volumes, err := s.Cli.VolumeList(s.Ctx, volume.ListOptions{}) + return volumes.Volumes, err +} + +func (s *DockerService) GetClusterNetworks() ([]network.Summary, error) { + networks, err := s.Cli.NetworkList(s.Ctx, network.ListOptions{}) + return networks, err +} + +func (s *DockerService) UpdateNodeAvailability(nodeID string, availability swarm.NodeAvailability) error { + node, _, err := s.Cli.NodeInspectWithRaw(s.Ctx, nodeID) + if err != nil { + return err + } + spec := node.Spec + spec.Availability = availability + return s.Cli.NodeUpdate(s.Ctx, nodeID, node.Version, spec) +} + +func (s *DockerService) ListenEvents(opts events.ListOptions) (<-chan events.Message, <-chan error) { + return s.Cli.Events(s.Ctx, opts) +} + +func (s *DockerService) ScaleService(serviceID string, replicas uint64, rollback string) error { + svc, _, err := s.Cli.ServiceInspectWithRaw(s.Ctx, serviceID, types.ServiceInspectOptions{}) + if err != nil { + return err + } + spec := svc.Spec + spec.Mode.Replicated.Replicas = &replicas + _, err = s.Cli.ServiceUpdate(s.Ctx, serviceID, svc.Version, spec, types.ServiceUpdateOptions{ + Rollback: rollback, + }) + return err +} + +func (s *DockerService) GetServiceHealth(service swarm.Service) (int, int, error) { + tasks, err := s.Cli.TaskList(s.Ctx, types.TaskListOptions{ + Filters: filters.NewArgs( + filters.Arg("service", service.ID), + ), + }) + if err != nil { + return 0, 0, err + } + + running := 0 + for _, t := range tasks { + if t.Status.State == swarm.TaskStateRunning { + running++ + } + } + desired := 0 + if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil { + desired = int(*service.Spec.Mode.Replicated.Replicas) + } + return running, desired, nil +} + +func (s *DockerService) GetTaskHealth(task swarm.Task) swarm.TaskState { + if task.Status.State != "" { + return task.Status.State + } + return swarm.TaskState("") +} + +func (s *DockerService) CreateService(service swarm.Service) error { + _, err := s.Cli.ServiceCreate(s.Ctx, service.Spec, types.ServiceCreateOptions{}) + if err != nil { + return err + } + return nil +} + +func (s *DockerService) UpdateService(serviceID string, serviceSpec swarm.ServiceSpec, rollback string) error { + svc, _, err := s.Cli.ServiceInspectWithRaw(s.Ctx, serviceID, types.ServiceInspectOptions{}) + if err != nil { + return err + } + + _, err = s.Cli.ServiceUpdate(s.Ctx, serviceID, svc.Version, serviceSpec, types.ServiceUpdateOptions{ + Rollback: rollback, + }) + return err +} + +func (s *DockerService) DeleteService(serviceID string) error { + err := s.Cli.ServiceRemove(s.Ctx, serviceID) + return err +} + +func (s *DockerService) RollbackService(serviceID string) error { + _, err := s.Cli.ServiceUpdate(s.Ctx, serviceID, swarm.Version{}, swarm.ServiceSpec{}, types.ServiceUpdateOptions{ + Rollback: "previous", + }) + return err +} + +func (s *DockerService) GetServiceByID(serviceID string) (swarm.Service, error) { + service, _, err := s.Cli.ServiceInspectWithRaw(s.Ctx, serviceID, types.ServiceInspectOptions{}) + return service, err +} diff --git a/api/internal/features/deploy/docker/init.go b/api/internal/features/deploy/docker/init.go index 9b25b9c9b..c40c61899 100644 --- a/api/internal/features/deploy/docker/init.go +++ b/api/internal/features/deploy/docker/init.go @@ -7,9 +7,12 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/image" "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/api/types/volume" "github.com/docker/docker/client" v1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/raghavyuva/nixopus-api/internal/features/logger" @@ -23,9 +26,9 @@ type DockerService struct { } type DockerRepository interface { - ListAllContainers() ([]container.Summary, error) - ListContainers(opts container.ListOptions) ([]container.Summary, error) - ListAllImages(opts image.ListOptions) []image.Summary + ListAllContainers() ([]container.Summary, error) + ListContainers(opts container.ListOptions) ([]container.Summary, error) + ListAllImages(opts image.ListOptions) []image.Summary StopContainer(containerID string, opts container.StopOptions) error RemoveContainer(containerID string, opts container.RemoveOptions) error @@ -46,6 +49,28 @@ type DockerRepository interface { RemoveImage(imageName string, opts image.RemoveOptions) error PruneBuildCache(opts types.BuildCachePruneOptions) error PruneImages(opts filters.Args) (image.PruneReport, error) + + InitCluster() error + JoinCluster() error + LeaveCluster(force bool) error + GetClusterInfo() (swarm.ClusterInfo, error) + GetClusterNodes() ([]swarm.Node, error) + GetClusterServices() ([]swarm.Service, error) + GetClusterTasks() ([]swarm.Task, error) + GetClusterSecrets() ([]swarm.Secret, error) + GetClusterConfigs() ([]swarm.Config, error) + GetClusterVolumes() ([]*volume.Volume, error) + GetClusterNetworks() ([]network.Summary, error) + UpdateNodeAvailability(nodeID string, availability swarm.NodeAvailability) error + ScaleService(serviceID string, replicas uint64, rollback string) error + ListenEvents(opts events.ListOptions) (<-chan events.Message, <-chan error) + GetServiceHealth(service swarm.Service) (int, int, error) + GetTaskHealth(task swarm.Task) swarm.TaskState + CreateService(service swarm.Service) error + UpdateService(serviceID string, serviceSpec swarm.ServiceSpec, rollback string) error + DeleteService(serviceID string) error + RollbackService(serviceID string) error + GetServiceByID(serviceID string) (swarm.Service, error) } type DockerClient struct { @@ -54,11 +79,35 @@ type DockerClient struct { // NewDockerService creates a new instance of DockerService using the default docker client. func NewDockerService() *DockerService { - return &DockerService{ - Cli: NewDockerClient(), + client := NewDockerClient() + service := &DockerService{ + Cli: client, Ctx: context.Background(), logger: logger.NewLogger(), } + + // Initialize cluster if not already initialized, this should be run on master node only + // TODO: Add a check to see if the node is the master node + // WARNING: This should be thought again during multi-server architecture feature + if !isClusterInitialized(client) { + if err := service.InitCluster(); err != nil { + service.logger.Log(logger.Warning, "Failed to initialize cluster", err.Error()) + } else { + service.logger.Log(logger.Info, "Cluster initialized successfully", "") + } + } else { + service.logger.Log(logger.Info, "Cluster already initialized", "") + } + + return service +} + +func isClusterInitialized(cli *client.Client) bool { + info, err := cli.Info(context.Background()) + if err != nil { + return false + } + return info.Swarm.LocalNodeState == swarm.LocalNodeStateActive } func NewDockerServiceWithClient(cli *client.Client, ctx context.Context, logger logger.Logger) *DockerService { @@ -101,24 +150,24 @@ func NewDockerClient() *client.Client { // // If an error occurs while listing the containers, it returns the error (no panic). func (s *DockerService) ListAllContainers() ([]container.Summary, error) { - containers, err := s.Cli.ContainerList(s.Ctx, container.ListOptions{ - All: true, - }) - if err != nil { - return nil, err - } - - return containers, nil + containers, err := s.Cli.ContainerList(s.Ctx, container.ListOptions{ + All: true, + }) + if err != nil { + return nil, err + } + + return containers, nil } // ListContainers returns containers using the provided docker list options // (including native filters like name/status/ancestor and optional limits). func (s *DockerService) ListContainers(opts container.ListOptions) ([]container.Summary, error) { - containers, err := s.Cli.ContainerList(s.Ctx, opts) - if err != nil { - return nil, err - } - return containers, nil + containers, err := s.Cli.ContainerList(s.Ctx, opts) + if err != nil { + return nil, err + } + return containers, nil } // StopContainer stops the container with the given ID. If the container does not exist, diff --git a/api/internal/features/deploy/tasks/delete.go b/api/internal/features/deploy/tasks/delete.go index afb9f10a8..5ba604fe5 100644 --- a/api/internal/features/deploy/tasks/delete.go +++ b/api/internal/features/deploy/tasks/delete.go @@ -5,7 +5,6 @@ import ( "os" "path/filepath" - "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/image" "github.com/google/uuid" "github.com/raghavyuva/nixopus-api/internal/features/deploy/types" @@ -13,7 +12,7 @@ import ( ) // DeleteDeployment deletes a deployment and its associated resources. -// It stops and removes the container, image, and repository. +// It stops and removes the service, image, and repository. // It returns an error if any operation fails. func (s *TaskService) DeleteDeployment(deployment *types.DeleteDeploymentRequest, userID uuid.UUID, organizationID uuid.UUID) error { application, err := s.Storage.GetApplicationById(deployment.ID.String(), organizationID) @@ -23,23 +22,28 @@ func (s *TaskService) DeleteDeployment(deployment *types.DeleteDeploymentRequest domain := application.Domain - deployments, err := s.Storage.GetApplicationDeployments(application.ID) + services, err := s.DockerRepo.GetClusterServices() if err != nil { - s.Logger.Log(logger.Error, "Failed to get application deployments", err.Error()) + s.Logger.Log(logger.Error, "Failed to get services", err.Error()) } else { - for _, dep := range deployments { - if dep.ContainerID != "" { - s.Logger.Log(logger.Info, "Stopping container", dep.ContainerID) - if err := s.DockerRepo.StopContainer(dep.ContainerID, container.StopOptions{}); err != nil { - s.Logger.Log(logger.Error, "Failed to stop container", err.Error()) - } - - s.Logger.Log(logger.Info, "Removing container", dep.ContainerID) - if err := s.DockerRepo.RemoveContainer(dep.ContainerID, container.RemoveOptions{Force: true}); err != nil { - s.Logger.Log(logger.Error, "Failed to remove container", err.Error()) + for _, service := range services { + if service.Spec.Annotations.Name == application.Name { + s.Logger.Log(logger.Info, "Deleting service", service.ID) + if err := s.DockerRepo.DeleteService(service.ID); err != nil { + s.Logger.Log(logger.Error, "Failed to delete service", err.Error()) + } else { + s.Logger.Log(logger.Info, "Service deleted successfully", service.ID) } + break } + } + } + deployments, err := s.Storage.GetApplicationDeployments(application.ID) + if err != nil { + s.Logger.Log(logger.Error, "Failed to get application deployments", err.Error()) + } else { + for _, dep := range deployments { if dep.ContainerImage != "" { s.Logger.Log(logger.Info, "Removing image", dep.ContainerImage) if err := s.DockerRepo.RemoveImage(dep.ContainerImage, image.RemoveOptions{Force: true}); err != nil { diff --git a/api/internal/features/deploy/tasks/restart.go b/api/internal/features/deploy/tasks/restart.go index 921455fbd..521d61300 100644 --- a/api/internal/features/deploy/tasks/restart.go +++ b/api/internal/features/deploy/tasks/restart.go @@ -1,70 +1,91 @@ package tasks import ( - "context" + "context" - "github.com/docker/docker/api/types/container" - "github.com/google/uuid" - "github.com/raghavyuva/nixopus-api/internal/features/deploy/types" - shared_types "github.com/raghavyuva/nixopus-api/internal/types" + "github.com/docker/docker/api/types/swarm" + "github.com/google/uuid" + "github.com/raghavyuva/nixopus-api/internal/features/deploy/types" + shared_types "github.com/raghavyuva/nixopus-api/internal/types" ) // RestartDeployment enqueues a restart task for an application deployment func (t *TaskService) RestartDeployment(request *types.RestartDeploymentRequest, userID uuid.UUID, organizationID uuid.UUID) error { - // Load target deployment and owning application - dep, err := t.Storage.GetApplicationDeploymentById(request.ID.String()) - if err != nil { - return err - } - - app, err := t.Storage.GetApplicationById(dep.ApplicationID.String(), organizationID) - if err != nil { - return err - } - - ctxTask := ContextTask{ - TaskService: t, - ContextConfig: request, - UserId: userID, - OrganizationId: organizationID, - Application: &app, - } - - payload, err := ctxTask.PrepareRestartContext() - if err != nil { - return err - } - - payload.CorrelationID = uuid.NewString() - - return RestartQueue.Add(TaskRestart.WithArgs(context.Background(), payload)) + dep, err := t.Storage.GetApplicationDeploymentById(request.ID.String()) + if err != nil { + return err + } + + app, err := t.Storage.GetApplicationById(dep.ApplicationID.String(), organizationID) + if err != nil { + return err + } + + ctxTask := ContextTask{ + TaskService: t, + ContextConfig: request, + UserId: userID, + OrganizationId: organizationID, + Application: &app, + } + + payload, err := ctxTask.PrepareRestartContext() + if err != nil { + return err + } + + payload.CorrelationID = uuid.NewString() + + return RestartQueue.Add(TaskRestart.WithArgs(context.Background(), payload)) } -// HandleRestart restarts currently running containers for the application and updates status/logs +// HandleRestart restarts currently running swarm service for the application and updates status/logs func (s *TaskService) HandleRestart(ctx context.Context, TaskPayload shared_types.TaskPayload) error { - taskCtx := s.NewTaskContext(TaskPayload) - - taskCtx.LogAndUpdateStatus("Restarting application containers", shared_types.Deploying) - - currentContainers, err := s.getRunningContainers(TaskPayload, taskCtx) - if err != nil { - taskCtx.LogAndUpdateStatus("Failed to list running containers: "+err.Error(), shared_types.Failed) - return err - } - - if len(currentContainers) == 0 { - taskCtx.LogAndUpdateStatus("No running containers found for application", shared_types.Failed) - return types.ErrContainerNotRunning - } - - for _, ctr := range currentContainers { - taskCtx.AddLog("Restarting container " + ctr.ID) - if err := s.DockerRepo.RestartContainer(ctr.ID, container.StopOptions{}); err != nil { - taskCtx.LogAndUpdateStatus("Failed to restart container: "+err.Error(), shared_types.Failed) - return err - } - } - - taskCtx.LogAndUpdateStatus("Application containers restarted", shared_types.Running) - return nil + taskCtx := s.NewTaskContext(TaskPayload) + + taskCtx.LogAndUpdateStatus("Restarting application service", shared_types.Deploying) + + // Find the existing service + existingService, err := s.getExistingService(TaskPayload, taskCtx) + if err != nil { + taskCtx.LogAndUpdateStatus("Failed to find service: "+err.Error(), shared_types.Failed) + return err + } + + if existingService == nil { + taskCtx.LogAndUpdateStatus("No running service found for application", shared_types.Failed) + return types.ErrContainerNotRunning + } + + taskCtx.AddLog("Restarting service " + existingService.ID) + + // Get current service spec + services, err := s.DockerRepo.GetClusterServices() + if err != nil { + taskCtx.LogAndUpdateStatus("Failed to get service details: "+err.Error(), shared_types.Failed) + return err + } + + var currentService swarm.Service + for _, service := range services { + if service.ID == existingService.ID { + currentService = service + break + } + } + + if currentService.ID == "" { + taskCtx.LogAndUpdateStatus("Service not found", shared_types.Failed) + return types.ErrContainerNotRunning + } + + // Note : Restart service by updating it with the same spec will restart the service so we don't need to specifically restart the services + err = s.DockerRepo.UpdateService(existingService.ID, currentService.Spec, "") + if err != nil { + taskCtx.LogAndUpdateStatus("Failed to restart service: "+err.Error(), shared_types.Failed) + return err + } + + taskCtx.LogAndUpdateStatus("Application service restarted", shared_types.Running) + return nil } diff --git a/api/internal/features/deploy/tasks/rollback.go b/api/internal/features/deploy/tasks/rollback.go index 5ada3d7ad..a505278d4 100644 --- a/api/internal/features/deploy/tasks/rollback.go +++ b/api/internal/features/deploy/tasks/rollback.go @@ -1,86 +1,84 @@ package tasks import ( - "context" + "context" + "time" - "github.com/google/uuid" - "github.com/raghavyuva/nixopus-api/internal/features/deploy/types" - shared_types "github.com/raghavyuva/nixopus-api/internal/types" + "github.com/google/uuid" + "github.com/raghavyuva/nixopus-api/internal/features/deploy/types" + shared_types "github.com/raghavyuva/nixopus-api/internal/types" ) // RollbackDeployment enqueues a rollback task to rebuild and deploy a previous commit func (t *TaskService) RollbackDeployment(request *types.RollbackDeploymentRequest, userID uuid.UUID, organizationID uuid.UUID) error { - // Find the target deployment and owning application - dep, err := t.Storage.GetApplicationDeploymentById(request.ID.String()) - if err != nil { - return err - } - - app, err := t.Storage.GetApplicationById(dep.ApplicationID.String(), organizationID) - if err != nil { - return err - } - - ctxTask := ContextTask{ - TaskService: t, - ContextConfig: request, - UserId: userID, - OrganizationId: organizationID, - Application: &app, - } - - payload, err := ctxTask.PrepareRollbackContext() - if err != nil { - return err - } - - payload.CorrelationID = uuid.NewString() - - return RollbackQueue.Add(TaskRollback.WithArgs(context.Background(), payload)) + dep, err := t.Storage.GetApplicationDeploymentById(request.ID.String()) + if err != nil { + return err + } + + app, err := t.Storage.GetApplicationById(dep.ApplicationID.String(), organizationID) + if err != nil { + return err + } + + ctxTask := ContextTask{ + TaskService: t, + ContextConfig: request, + UserId: userID, + OrganizationId: organizationID, + Application: &app, + } + + payload, err := ctxTask.PrepareRollbackContext() + if err != nil { + return err + } + + payload.CorrelationID = uuid.NewString() + + return RollbackQueue.Add(TaskRollback.WithArgs(context.Background(), payload)) } -// HandleRollback clones checkout at target commit, rebuilds and atomically updates container +// HandleRollback uses Docker Swarm's native rollback capability for instant rollback func (s *TaskService) HandleRollback(ctx context.Context, TaskPayload shared_types.TaskPayload) error { - taskCtx := s.NewTaskContext(TaskPayload) - - taskCtx.LogAndUpdateStatus("Starting rollback process", shared_types.Cloning) - - repoPath, err := s.Clone(CloneConfig{ - TaskPayload: TaskPayload, - DeploymentType: string(shared_types.DeploymentTypeRollback), - TaskContext: taskCtx, - }) - if err != nil { - taskCtx.LogAndUpdateStatus("Failed to clone repository: "+err.Error(), shared_types.Failed) - return err - } - - taskCtx.LogAndUpdateStatus("Repository cloned successfully", shared_types.Building) - taskCtx.AddLog("Building image from Dockerfile " + repoPath + " for application " + TaskPayload.Application.Name) - - buildImageResult, err := s.BuildImage(BuildConfig{ - TaskPayload: TaskPayload, - ContextPath: repoPath, - Force: false, - ForceWithoutCache: false, - TaskContext: taskCtx, - }) - if err != nil { - taskCtx.LogAndUpdateStatus("Failed to build image: "+err.Error(), shared_types.Failed) - return err - } - - taskCtx.AddLog("Image built successfully: " + buildImageResult + " for application " + TaskPayload.Application.Name) - taskCtx.UpdateStatus(shared_types.Deploying) - - containerResult, err := s.AtomicUpdateContainer(TaskPayload, taskCtx) - if err != nil { - taskCtx.LogAndUpdateStatus("Failed to update container: "+err.Error(), shared_types.Failed) - return err - } - - taskCtx.AddLog("Container updated successfully for application " + TaskPayload.Application.Name + " with container id " + containerResult.ContainerID) - taskCtx.LogAndUpdateStatus("Rollback completed successfully", shared_types.Deployed) - - return nil + taskCtx := s.NewTaskContext(TaskPayload) + + taskCtx.LogAndUpdateStatus("Starting native swarm rollback", shared_types.Deploying) + + serviceID := TaskPayload.ApplicationDeployment.ContainerID + if serviceID == "" { + taskCtx.LogAndUpdateStatus("No service ID found in deployment record", shared_types.Failed) + return types.ErrContainerNotRunning + } + + taskCtx.AddLog("Rolling back service " + serviceID + " using Docker Swarm native rollback") + + err := s.DockerRepo.RollbackService(serviceID) + if err != nil { + taskCtx.LogAndUpdateStatus("Failed to rollback service: "+err.Error(), shared_types.Failed) + return err + } + + // Wait for rollback to complete + time.Sleep(time.Second * 5) + + serviceInfo, err := s.DockerRepo.GetServiceByID(serviceID) + if err != nil { + taskCtx.LogAndUpdateStatus("Failed to get service info after rollback: "+err.Error(), shared_types.Failed) + return err + } + + // Check service's health + if serviceInfo.Spec.Mode.Replicated != nil && serviceInfo.Spec.Mode.Replicated.Replicas != nil { + running, _, err := s.DockerRepo.GetServiceHealth(serviceInfo) + if err != nil || running < int(*serviceInfo.Spec.Mode.Replicated.Replicas) { + taskCtx.LogAndUpdateStatus("Service health check failed after rollback", shared_types.Failed) + return types.ErrFailedToUpdateContainer + } + } + + taskCtx.AddLog("Service rolled back successfully using native swarm rollback") + taskCtx.LogAndUpdateStatus("Rollback completed successfully", shared_types.Deployed) + + return nil } diff --git a/api/internal/features/deploy/tasks/run.go b/api/internal/features/deploy/tasks/run.go index d5d8eabb7..95390db64 100644 --- a/api/internal/features/deploy/tasks/run.go +++ b/api/internal/features/deploy/tasks/run.go @@ -2,12 +2,11 @@ package tasks import ( "fmt" + "strconv" "strings" "time" - "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/network" - "github.com/docker/go-connections/nat" + "github.com/docker/docker/api/types/swarm" "github.com/raghavyuva/nixopus-api/internal/features/deploy/types" "github.com/raghavyuva/nixopus-api/internal/features/logger" "github.com/raghavyuva/nixopus-api/internal/features/ssh" @@ -53,45 +52,6 @@ func (s *TaskService) sanitizeEnvVars(envVars map[string]string) []string { return logEnvVars } -// prepareContainerConfig creates Docker container configuration -func (s *TaskService) prepareContainerConfig( - imageName string, - port nat.Port, - envVars []string, - applicationID string, -) container.Config { - return container.Config{ - Image: imageName, - Hostname: "nixopus", - ExposedPorts: nat.PortSet{ - port: struct{}{}, - }, - Env: envVars, - Labels: map[string]string{ - "com.docker.compose.project": "nixopus", - "com.docker.compose.version": "0.0.1", - "com.project.name": imageName, - "com.application.id": applicationID, - }, - } -} - -// prepareHostConfig creates Docker host configuration with port bindings -func (s *TaskService) prepareHostConfig(port nat.Port, availablePort string) container.HostConfig { - return container.HostConfig{ - NetworkMode: "bridge", - PortBindings: map[nat.Port][]nat.PortBinding{ - port: { - { - HostIP: "0.0.0.0", - HostPort: availablePort, - }, - }, - }, - PublishAllPorts: true, - } -} - func (s *TaskService) getAvailablePort() (string, error) { ssh := ssh.NewSSH() client, err := ssh.Connect() @@ -119,64 +79,6 @@ func (s *TaskService) getAvailablePort() (string, error) { return port, nil } -// prepareNetworkConfig creates Docker network configuration -func (s *TaskService) prepareNetworkConfig() network.NetworkingConfig { - return network.NetworkingConfig{ - EndpointsConfig: map[string]*network.EndpointSettings{ - "bridge": {}, - }, - } -} - -func (s *TaskService) getRunningContainers(r shared_types.TaskPayload, taskContext *TaskContext) ([]container.Summary, error) { - all_containers, err := s.DockerRepo.ListAllContainers() - if err != nil { - return nil, types.ErrFailedToListContainers - } - - var currentContainers []container.Summary - for _, ctr := range all_containers { - if ctr.Labels["com.application.id"] == r.Application.ID.String() { - currentContainers = append(currentContainers, ctr) - } - } - - s.formatLog(taskContext, "Found %d running containers", len(currentContainers)) - return currentContainers, nil -} - -func (s *TaskService) createContainerConfigs(r shared_types.TaskPayload, taskContext *TaskContext) (container.Config, container.HostConfig, network.NetworkingConfig, string) { - port_str := fmt.Sprintf("%d", r.Application.Port) - port, _ := nat.NewPort("tcp", port_str) - - var env_vars []string - for k, v := range GetMapFromString(r.Application.EnvironmentVariables) { - env_vars = append(env_vars, fmt.Sprintf("%s=%s", k, v)) - } - - logEnvVars := s.sanitizeEnvVars(GetMapFromString(r.Application.EnvironmentVariables)) - s.formatLog(taskContext, types.LogEnvironmentVariables, logEnvVars) - s.formatLog(taskContext, types.LogContainerExposingPort, port_str) - - container_config := s.prepareContainerConfig( - fmt.Sprintf("%s:latest", r.Application.Name), - port, - env_vars, - r.Application.ID.String(), - ) - - availablePort, err := s.getAvailablePort() - if err != nil { - taskContext.LogAndUpdateStatus("Failed to get available port: "+err.Error(), shared_types.Failed) - return container.Config{}, container.HostConfig{}, network.NetworkingConfig{}, "" - } - - host_config := s.prepareHostConfig(port, availablePort) - network_config := s.prepareNetworkConfig() - - return container_config, host_config, network_config, availablePort -} - // AtomicUpdateContainer performs a zero-downtime update of a running container func (s *TaskService) AtomicUpdateContainer(r shared_types.TaskPayload, taskContext *TaskContext) (AtomicUpdateContainerResult, error) { if r.Application.Name == "" { @@ -188,76 +90,162 @@ func (s *TaskService) AtomicUpdateContainer(r shared_types.TaskPayload, taskCont s.Logger.Log(logger.Info, types.LogUpdatingContainer, r.Application.Name) s.formatLog(taskContext, types.LogPreparingToUpdateContainer, r.Application.Name) - currentContainers, err := s.getRunningContainers(r, taskContext) + // Check if service already exists + existingService, err := s.getExistingService(r, taskContext) if err != nil { - taskContext.LogAndUpdateStatus("Failed to get running containers: "+err.Error(), shared_types.Failed) - return AtomicUpdateContainerResult{}, err + s.formatLog(taskContext, "No existing service found, creating new service", "") } - container_config, host_config, network_config, availablePort := s.createContainerConfigs(r, taskContext) + // Create service spec + serviceSpec, availablePort := s.createServiceSpec(r, taskContext) if availablePort == "" { taskContext.LogAndUpdateStatus("Failed to get available port", shared_types.Failed) return AtomicUpdateContainerResult{}, types.ErrFailedToGetAvailablePort } - s.formatLog(taskContext, types.LogCreatingNewContainer) - resp, err := s.DockerRepo.CreateContainer(container_config, host_config, network_config, "") - if err != nil { - taskContext.LogAndUpdateStatus("Failed to create container: "+err.Error(), shared_types.Failed) - return AtomicUpdateContainerResult{}, types.ErrFailedToCreateContainer - } - s.formatLog(taskContext, types.LogNewContainerCreated+" %s", resp.ID) - - for _, ctr := range currentContainers { - s.formatLog(taskContext, types.LogStoppingOldContainer+" %s", ctr.ID) - err = s.DockerRepo.StopContainer(ctr.ID, container.StopOptions{Timeout: intPtr(10)}) + if existingService != nil { + // Update existing service + s.formatLog(taskContext, "Updating existing service: %s", existingService.ID) + err = s.DockerRepo.UpdateService(existingService.ID, serviceSpec, "") if err != nil { - s.formatLog(taskContext, types.LogFailedToStopOldContainer, err.Error()) + taskContext.LogAndUpdateStatus("Failed to update service: "+err.Error(), shared_types.Failed) + return AtomicUpdateContainerResult{}, err } + s.formatLog(taskContext, "Service updated successfully: %s", existingService.ID) + } else { + // Create new service + s.formatLog(taskContext, "Creating new service") + err = s.DockerRepo.CreateService(swarm.Service{ + Spec: serviceSpec, + }) + if err != nil { + taskContext.LogAndUpdateStatus("Failed to create service: "+err.Error(), shared_types.Failed) + return AtomicUpdateContainerResult{}, err + } + s.formatLog(taskContext, "Service created successfully") } - s.formatLog(taskContext, types.LogStartingNewContainer) - err = s.DockerRepo.StartContainer(resp.ID, container.StartOptions{}) + // Wait for service to be ready + time.Sleep(time.Second * 10) + + // Get updated service info + serviceInfo, err := s.getServiceInfo(r, taskContext) if err != nil { - taskContext.LogAndUpdateStatus("Failed to start container: "+err.Error(), shared_types.Failed) - s.DockerRepo.RemoveContainer(resp.ID, container.RemoveOptions{Force: true}) - return AtomicUpdateContainerResult{}, types.ErrFailedToStartNewContainer + taskContext.LogAndUpdateStatus("Failed to get service info: "+err.Error(), shared_types.Failed) + return AtomicUpdateContainerResult{}, err } - s.formatLog(taskContext, types.LogNewContainerStartedSuccessfully) - - time.Sleep(time.Second * 5) - containerInfo, err := s.DockerRepo.GetContainerById(resp.ID) - if err != nil || containerInfo.State.Status != "running" { - taskContext.LogAndUpdateStatus("Container health check failed", shared_types.Failed) - s.DockerRepo.StopContainer(resp.ID, container.StopOptions{}) - s.DockerRepo.RemoveContainer(resp.ID, container.RemoveOptions{Force: true}) - return AtomicUpdateContainerResult{}, types.ErrFailedToUpdateContainer + // Check service health + if serviceInfo.Spec.Mode.Replicated != nil && serviceInfo.Spec.Mode.Replicated.Replicas != nil { + running, _, err := s.DockerRepo.GetServiceHealth(serviceInfo) + if err != nil || running < int(*serviceInfo.Spec.Mode.Replicated.Replicas) { + taskContext.LogAndUpdateStatus("Service health check failed", shared_types.Failed) + return AtomicUpdateContainerResult{}, types.ErrFailedToUpdateContainer + } } - taskContext.LogAndUpdateStatus("Container update completed successfully", shared_types.Deployed) + taskContext.LogAndUpdateStatus("Service update completed successfully", shared_types.Deployed) - r.ApplicationDeployment.ContainerID = resp.ID - r.ApplicationDeployment.ContainerName = containerInfo.Name - r.ApplicationDeployment.ContainerImage = containerInfo.Image + // Update deployment record + r.ApplicationDeployment.ContainerID = serviceInfo.ID + r.ApplicationDeployment.ContainerName = serviceInfo.Spec.Annotations.Name + r.ApplicationDeployment.ContainerImage = serviceInfo.Spec.TaskTemplate.ContainerSpec.Image r.ApplicationDeployment.ContainerStatus = "running" r.ApplicationDeployment.UpdatedAt = time.Now() taskContext.UpdateDeployment(&r.ApplicationDeployment) return AtomicUpdateContainerResult{ - ContainerID: resp.ID, - ContainerName: containerInfo.Name, - ContainerImage: containerInfo.Image, - ContainerStatus: containerInfo.State.Status, + ContainerID: serviceInfo.ID, + ContainerName: serviceInfo.Spec.Annotations.Name, + ContainerImage: serviceInfo.Spec.TaskTemplate.ContainerSpec.Image, + ContainerStatus: "running", UpdatedAt: time.Now(), AvailablePort: availablePort, }, nil } -// Helper function to create a pointer to an integer -func intPtr(i int) *int { - return &i +// getExistingService finds an existing swarm service for the application +func (s *TaskService) getExistingService(r shared_types.TaskPayload, taskContext *TaskContext) (*swarm.Service, error) { + services, err := s.DockerRepo.GetClusterServices() + if err != nil { + return nil, err + } + + for _, service := range services { + if service.Spec.Annotations.Name == r.Application.Name { + return &service, nil + } + } + return nil, nil +} + +// createServiceSpec creates a swarm service specification +func (s *TaskService) createServiceSpec(r shared_types.TaskPayload, taskContext *TaskContext) (swarm.ServiceSpec, string) { + availablePort, err := s.getAvailablePort() + if err != nil { + taskContext.LogAndUpdateStatus("Failed to get available port: "+err.Error(), shared_types.Failed) + return swarm.ServiceSpec{}, "" + } + + var env_vars []string + for k, v := range GetMapFromString(r.Application.EnvironmentVariables) { + env_vars = append(env_vars, fmt.Sprintf("%s=%s", k, v)) + } + + replicas := uint64(1) + port, _ := strconv.Atoi(availablePort) + + serviceSpec := swarm.ServiceSpec{ + Annotations: swarm.Annotations{ + Name: r.Application.Name, + }, + Mode: swarm.ServiceMode{ + Replicated: &swarm.ReplicatedService{ + Replicas: &replicas, + }, + }, + TaskTemplate: swarm.TaskSpec{ + ContainerSpec: &swarm.ContainerSpec{ + Image: fmt.Sprintf("%s:latest", r.Application.Name), + Env: env_vars, + Labels: map[string]string{ + "com.application.id": r.Application.ID.String(), + }, + }, + RestartPolicy: &swarm.RestartPolicy{ + Condition: swarm.RestartPolicyConditionAny, + }, + }, + EndpointSpec: &swarm.EndpointSpec{ + Mode: swarm.ResolutionModeVIP, + Ports: []swarm.PortConfig{ + { + Protocol: swarm.PortConfigProtocolTCP, + TargetPort: uint32(r.Application.Port), + PublishedPort: uint32(port), + PublishMode: swarm.PortConfigPublishModeHost, + }, + }, + }, + } + + return serviceSpec, availablePort +} + +// getServiceInfo retrieves service information +func (s *TaskService) getServiceInfo(r shared_types.TaskPayload, taskContext *TaskContext) (swarm.Service, error) { + services, err := s.DockerRepo.GetClusterServices() + if err != nil { + return swarm.Service{}, err + } + + for _, service := range services { + if service.Spec.Annotations.Name == r.Application.Name { + return service, nil + } + } + return swarm.Service{}, fmt.Errorf("service not found: %s", r.Application.Name) } // containsSensitiveKeyword checks if a key likely contains sensitive information