diff --git a/api/compose/api.go b/api/compose/api.go index b572c4a58..5efeb3700 100644 --- a/api/compose/api.go +++ b/api/compose/api.go @@ -184,6 +184,7 @@ type Stack struct { type LogConsumer interface { Log(service, container, message string) Status(service, container, msg string) + Register(service string, source string) } // ContainerEventListener is a callback to process ContainerEvent from services @@ -201,6 +202,8 @@ type ContainerEvent struct { const ( // ContainerEventLog is a ContainerEvent of type log. Line is set ContainerEventLog = iota + // ContainerEventAttach is a ContainerEvent of type attach. First event sent about a container + ContainerEventAttach // ContainerEventExit is a ContainerEvent of type exit. ExitCode is set ContainerEventExit ) diff --git a/cli/cmd/compose/logs.go b/cli/cmd/compose/logs.go index 684504359..b831b18f7 100644 --- a/cli/cmd/compose/logs.go +++ b/cli/cmd/compose/logs.go @@ -31,8 +31,10 @@ import ( type logsOptions struct { *projectOptions composeOptions - follow bool - tail string + follow bool + tail string + noColor bool + noPrefix bool } func logsCommand(p *projectOptions, contextType string) *cobra.Command { @@ -46,9 +48,13 @@ func logsCommand(p *projectOptions, contextType string) *cobra.Command { return runLogs(cmd.Context(), opts, args) }, } - logsCmd.Flags().BoolVar(&opts.follow, "follow", false, "Follow log output.") + flags := logsCmd.Flags() + flags.BoolVar(&opts.follow, "follow", false, "Follow log output.") + flags.BoolVar(&opts.noColor, "no-color", false, "Produce monochrome output.") + flags.BoolVar(&opts.noPrefix, "no-log-prefix", false, "Don't print prefix in logs.") + if contextType == store.DefaultContextType { - logsCmd.Flags().StringVar(&opts.tail, "tail", "all", "Number of lines to show from the end of the logs for each container.") + flags.StringVar(&opts.tail, "tail", "all", "Number of lines to show from the end of the logs for each container.") } return logsCmd } @@ -63,7 +69,7 @@ func runLogs(ctx context.Context, opts logsOptions, services []string) error { if err != nil { return err } - consumer := formatter.NewLogConsumer(ctx, os.Stdout) + consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix) return c.ComposeService().Logs(ctx, projectName, consumer, compose.LogOptions{ Services: services, Follow: opts.follow, diff --git a/cli/cmd/compose/start.go b/cli/cmd/compose/start.go index 1b0abb604..ed17ced89 100644 --- a/cli/cmd/compose/start.go +++ b/cli/cmd/compose/start.go @@ -28,7 +28,6 @@ import ( type startOptions struct { *projectOptions - Detach bool } func startCommand(p *projectOptions) *cobra.Command { @@ -42,8 +41,6 @@ func startCommand(p *projectOptions) *cobra.Command { return runStart(cmd.Context(), opts, args) }, } - - startCmd.Flags().BoolVarP(&opts.Detach, "detach", "d", false, "Detached mode: Run containers in the background") return startCmd } @@ -58,32 +55,8 @@ func runStart(ctx context.Context, opts startOptions, services []string) error { return err } - if opts.Detach { - _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { - return "", c.ComposeService().Start(ctx, project, compose.StartOptions{}) - }) - return err - } - - queue := make(chan compose.ContainerEvent) - printer := printer{ - queue: queue, - } - err = c.ComposeService().Start(ctx, project, compose.StartOptions{ - Attach: func(event compose.ContainerEvent) { - queue <- event - }, - }) - if err != nil { - return err - } - - _, err = printer.run(ctx, false, "", func() error { - ctx := context.Background() - _, err := progress.Run(ctx, func(ctx context.Context) (string, error) { - return "", c.ComposeService().Stop(ctx, project) - }) - return err + _, err = progress.Run(ctx, func(ctx context.Context) (string, error) { + return "", c.ComposeService().Start(ctx, project, compose.StartOptions{}) }) return err } diff --git a/cli/cmd/compose/up.go b/cli/cmd/compose/up.go index fc4f99dd0..4da5e1bc1 100644 --- a/cli/cmd/compose/up.go +++ b/cli/cmd/compose/up.go @@ -36,6 +36,7 @@ import ( "github.com/compose-spec/compose-go/types" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" ) // composeOptions hold options common to `up` and `run` to run compose project @@ -57,6 +58,8 @@ type upOptions struct { cascadeStop bool exitCodeFrom string scale []string + noColor bool + noPrefix bool } func (o upOptions) recreateStrategy() string { @@ -102,6 +105,8 @@ func upCommand(p *projectOptions, contextType string) *cobra.Command { flags.BoolVar(&opts.Build, "build", false, "Build images before starting containers.") flags.BoolVar(&opts.removeOrphans, "remove-orphans", false, "Remove containers for services not defined in the Compose file.") flags.StringArrayVar(&opts.scale, "scale", []string{}, "Scale SERVICE to NUM instances. Overrides the `scale` setting in the Compose file if present.") + flags.BoolVar(&opts.noColor, "no-color", false, "Produce monochrome output.") + flags.BoolVar(&opts.noPrefix, "no-log-prefix", false, "Don't print prefix in logs.") switch contextType { case store.AciContextType: @@ -199,6 +204,16 @@ func runCreateStart(ctx context.Context, opts upOptions, services []string) erro stopFunc() // nolint:errcheck }() + consumer := formatter.NewLogConsumer(ctx, os.Stdout, !opts.noColor, !opts.noPrefix) + + var exitCode int + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + code, err := printer.run(ctx, opts.cascadeStop, opts.exitCodeFrom, consumer, stopFunc) + exitCode = code + return err + }) + err = c.ComposeService().Start(ctx, project, compose.StartOptions{ Attach: func(event compose.ContainerEvent) { queue <- event @@ -208,7 +223,7 @@ func runCreateStart(ctx context.Context, opts upOptions, services []string) erro return err } - exitCode, err := printer.run(ctx, opts.cascadeStop, opts.exitCodeFrom, stopFunc) + err = eg.Wait() if exitCode != 0 { return cmd.ExitCodeError{ExitCode: exitCode} } @@ -298,27 +313,37 @@ type printer struct { queue chan compose.ContainerEvent } -func (p printer) run(ctx context.Context, cascadeStop bool, exitCodeFrom string, stopFn func() error) (int, error) { //nolint:unparam - consumer := formatter.NewLogConsumer(ctx, os.Stdout) +func (p printer) run(ctx context.Context, cascadeStop bool, exitCodeFrom string, consumer compose.LogConsumer, stopFn func() error) (int, error) { //nolint:unparam var aborting bool + var count int for { event := <-p.queue switch event.Type { + case compose.ContainerEventAttach: + consumer.Register(event.Service, event.Source) + count++ case compose.ContainerEventExit: if !aborting { consumer.Status(event.Service, event.Source, fmt.Sprintf("exited with code %d", event.ExitCode)) } - if cascadeStop && !aborting { - aborting = true - fmt.Println("Aborting on container exit...") - err := stopFn() - if err != nil { - return 0, err + if cascadeStop { + if !aborting { + aborting = true + fmt.Println("Aborting on container exit...") + err := stopFn() + if err != nil { + return 0, err + } + } + if exitCodeFrom == "" || exitCodeFrom == event.Service { + logrus.Error(event.ExitCode) + return event.ExitCode, nil } } - if exitCodeFrom == "" || exitCodeFrom == event.Service { - logrus.Error(event.ExitCode) - return event.ExitCode, nil + count-- + if count == 0 { + // Last container terminated, done + return 0, nil } case compose.ContainerEventLog: if !aborting { diff --git a/cli/formatter/colors.go b/cli/formatter/colors.go index 090a396b1..cf19f9a2e 100644 --- a/cli/formatter/colors.go +++ b/cli/formatter/colors.go @@ -35,6 +35,10 @@ var names = []string{ // colorFunc use ANSI codes to render colored text on console type colorFunc func(s string) string +var monochrome = func(s string) string { + return s +} + func ansiColor(code, s string) string { return fmt.Sprintf("%s%s%s", ansi(code), s, ansi("0")) } diff --git a/cli/formatter/logs.go b/cli/formatter/logs.go index 83eb4d4df..c5e2e0ab5 100644 --- a/cli/formatter/logs.go +++ b/cli/formatter/logs.go @@ -17,7 +17,6 @@ package formatter import ( - "bytes" "context" "fmt" "io" @@ -28,59 +27,91 @@ import ( ) // NewLogConsumer creates a new LogConsumer -func NewLogConsumer(ctx context.Context, w io.Writer) compose.LogConsumer { +func NewLogConsumer(ctx context.Context, w io.Writer, color bool, prefix bool) compose.LogConsumer { return &logConsumer{ - ctx: ctx, - colors: map[string]colorFunc{}, - width: 0, - writer: w, + ctx: ctx, + presenters: map[string]*presenter{}, + width: 0, + writer: w, + color: color, + prefix: prefix, } } +func (l *logConsumer) Register(service string, source string) { + l.register(service, source) +} + +func (l *logConsumer) register(service string, source string) *presenter { + cf := monochrome + if l.color { + cf = <-loop + } + p := &presenter{ + colors: cf, + service: service, + container: source, + } + l.presenters[source] = p + if l.prefix { + l.computeWidth() + for _, p := range l.presenters { + p.setPrefix(l.width) + } + } + return p +} + // Log formats a log message as received from service/container func (l *logConsumer) Log(service, container, message string) { if l.ctx.Err() != nil { return } - cf := l.getColorFunc(service) - prefix := fmt.Sprintf("%-"+strconv.Itoa(l.width)+"s |", container) - + p, ok := l.presenters[container] + if !ok { // should have been registered, but ¯\_(ツ)_/¯ + p = l.register(service, container) + } for _, line := range strings.Split(message, "\n") { - buf := bytes.NewBufferString(fmt.Sprintf("%s %s\n", cf(prefix), line)) - l.writer.Write(buf.Bytes()) // nolint:errcheck + fmt.Fprintf(l.writer, "%s %s\n", p.prefix, line) // nolint:errcheck } } func (l *logConsumer) Status(service, container, msg string) { - cf := l.getColorFunc(service) - buf := bytes.NewBufferString(cf(fmt.Sprintf("%s %s\n", container, msg))) - l.writer.Write(buf.Bytes()) // nolint:errcheck -} - -func (l *logConsumer) getColorFunc(service string) colorFunc { - cf, ok := l.colors[service] + p, ok := l.presenters[container] if !ok { - cf = <-loop - l.colors[service] = cf - l.computeWidth() + p = l.register(service, container) } - return cf + s := p.colors(fmt.Sprintf("%s %s\n", container, msg)) + l.writer.Write([]byte(s)) // nolint:errcheck } func (l *logConsumer) computeWidth() { width := 0 - for n := range l.colors { + for n := range l.presenters { if len(n) > width { width = len(n) } } - l.width = width + 3 + l.width = width + 1 } // LogConsumer consume logs from services and format them type logConsumer struct { - ctx context.Context - colors map[string]colorFunc - width int - writer io.Writer + ctx context.Context + presenters map[string]*presenter + width int + writer io.Writer + color bool + prefix bool +} + +type presenter struct { + colors colorFunc + service string + container string + prefix string +} + +func (p *presenter) setPrefix(width int) { + p.prefix = p.colors(fmt.Sprintf("%-"+strconv.Itoa(width)+"s |", p.container)) } diff --git a/ecs/aws.go b/ecs/aws.go index b9e85990b..b5fafbcab 100644 --- a/ecs/aws.go +++ b/ecs/aws.go @@ -69,7 +69,7 @@ type API interface { getURLWithPortMapping(ctx context.Context, targetGroupArns []string) ([]compose.PortPublisher, error) ListTasks(ctx context.Context, cluster string, family string) ([]string, error) GetPublicIPs(ctx context.Context, interfaces ...string) (map[string]string, error) - ResolveLoadBalancer(ctx context.Context, nameOrArn string) (awsResource, string, string, []awsResource, error) + ResolveLoadBalancer(ctx context.Context, nameOrArn string) (awsResource, string, string, vpcSubNets, error) GetLoadBalancerURL(ctx context.Context, arn string) (string, error) GetParameter(ctx context.Context, name string) (string, error) SecurityGroupExists(ctx context.Context, sg string) (bool, error) diff --git a/ecs/awsResources.go b/ecs/awsResources.go index 71d9d477e..bcea3dc43 100644 --- a/ecs/awsResources.go +++ b/ecs/awsResources.go @@ -37,10 +37,16 @@ import ( "github.com/sirupsen/logrus" ) +// vpcSubNets classification +type vpcSubNets struct { + public []awsResource + private []awsResource +} + // awsResources hold the AWS component being used or created to support services definition type awsResources struct { vpc string // shouldn't this also be an awsResource ? - subnets []awsResource + subnets vpcSubNets cluster awsResource loadBalancer awsResource loadBalancerType string @@ -66,7 +72,7 @@ func (r *awsResources) allSecurityGroups() []string { func (r *awsResources) subnetsIDs() []string { var ids []string - for _, r := range r.subnets { + for _, r := range append(r.subnets.private, r.subnets.public...) { ids = append(ids, r.ID()) } return ids @@ -207,6 +213,7 @@ func (b *ecsAPIService) parseVPCExtension(ctx context.Context, project *types.Pr } var publicSubNets []awsResource + var privateSubNets []awsResource for _, subNet := range subNets { isPublic, err := b.aws.IsPublicSubnet(ctx, subNet.ID()) if err != nil { @@ -214,6 +221,8 @@ func (b *ecsAPIService) parseVPCExtension(ctx context.Context, project *types.Pr } if isPublic { publicSubNets = append(publicSubNets, subNet) + } else { + privateSubNets = append(privateSubNets, subNet) } } @@ -222,7 +231,8 @@ func (b *ecsAPIService) parseVPCExtension(ctx context.Context, project *types.Pr } r.vpc = vpc - r.subnets = subNets + r.subnets.public = publicSubNets + r.subnets.private = privateSubNets return nil } @@ -315,7 +325,10 @@ func (b *ecsAPIService) ensureResources(resources *awsResources, project *types. if err != nil { return err } - b.ensureLoadBalancer(resources, project, template) + err = b.ensureLoadBalancer(resources, project, template) + if err != nil { + return err + } return nil } @@ -421,15 +434,15 @@ func (b *ecsAPIService) ensureVolumes(r *awsResources, project *types.Project, t return nil } -func (b *ecsAPIService) ensureLoadBalancer(r *awsResources, project *types.Project, template *cloudformation.Template) { +func (b *ecsAPIService) ensureLoadBalancer(r *awsResources, project *types.Project, template *cloudformation.Template) error { if r.loadBalancer != nil { - return + return nil } if allServices(project.Services, func(it types.ServiceConfig) bool { return len(it.Ports) == 0 }) { logrus.Debug("Application does not expose any public port, so no need for a LoadBalancer") - return + return nil } balancerType := getRequiredLoadBalancerType(project) @@ -450,10 +463,15 @@ func (b *ecsAPIService) ensureLoadBalancer(r *awsResources, project *types.Proje }) } + var publicSubNetIDs []string + for _, subNetID := range r.subnets.public { + publicSubNetIDs = append(publicSubNetIDs, subNetID.ID()) + } + template.Resources["LoadBalancer"] = &elasticloadbalancingv2.LoadBalancer{ Scheme: elbv2.LoadBalancerSchemeEnumInternetFacing, SecurityGroups: securityGroups, - Subnets: r.subnetsIDs(), + Subnets: publicSubNetIDs, Tags: projectTags(project), Type: balancerType, LoadBalancerAttributes: loadBalancerAttributes, @@ -463,6 +481,7 @@ func (b *ecsAPIService) ensureLoadBalancer(r *awsResources, project *types.Proje nameProperty: "LoadBalancerName", } r.loadBalancerType = balancerType + return nil } func (r *awsResources) getLoadBalancerSecurityGroups(project *types.Project) []string { diff --git a/ecs/aws_mock.go b/ecs/aws_mock.go index d857cda1c..78f881390 100644 --- a/ecs/aws_mock.go +++ b/ecs/aws_mock.go @@ -604,13 +604,13 @@ func (mr *MockAPIMockRecorder) ResolveFileSystem(arg0, arg1 interface{}) *gomock } // ResolveLoadBalancer mocks base method -func (m *MockAPI) ResolveLoadBalancer(arg0 context.Context, arg1 string) (awsResource, string, string, []awsResource, error) { +func (m *MockAPI) ResolveLoadBalancer(arg0 context.Context, arg1 string) (awsResource, string, string, vpcSubNets, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ResolveLoadBalancer", arg0, arg1) ret0, _ := ret[0].(awsResource) ret1, _ := ret[1].(string) ret2, _ := ret[2].(string) - ret3, _ := ret[3].([]awsResource) + ret3, _ := ret[3].(vpcSubNets) ret4, _ := ret[4].(error) return ret0, ret1, ret2, ret3, ret4 } diff --git a/ecs/sdk.go b/ecs/sdk.go index 34ba88be6..167cfe25c 100644 --- a/ecs/sdk.go +++ b/ecs/sdk.go @@ -1045,7 +1045,7 @@ func (s sdk) GetPublicIPs(ctx context.Context, interfaces ...string) (map[string } } -func (s sdk) ResolveLoadBalancer(ctx context.Context, nameOrArn string) (awsResource, string, string, []awsResource, error) { +func (s sdk) ResolveLoadBalancer(ctx context.Context, nameOrArn string) (awsResource, string, string, vpcSubNets, error) { logrus.Debug("Check if LoadBalancer exists: ", nameOrArn) var arns []*string var names []*string @@ -1060,17 +1060,27 @@ func (s sdk) ResolveLoadBalancer(ctx context.Context, nameOrArn string) (awsReso Names: names, }) if err != nil { - return nil, "", "", nil, err + return nil, "", "", vpcSubNets{}, err } if len(lbs.LoadBalancers) == 0 { - return nil, "", "", nil, errors.Wrapf(errdefs.ErrNotFound, "load balancer %q does not exist", nameOrArn) + return nil, "", "", vpcSubNets{}, errors.Wrapf(errdefs.ErrNotFound, "load balancer %q does not exist", nameOrArn) } it := lbs.LoadBalancers[0] - var subNets []awsResource + var subNets vpcSubNets for _, az := range it.AvailabilityZones { - subNets = append(subNets, existingAWSResource{ - id: aws.StringValue(az.SubnetId), - }) + isPublic, err := s.IsPublicSubnet(ctx, aws.StringValue(az.SubnetId)) + if err != nil { + return nil, "", "", subNets, err + } + if isPublic { + subNets.public = append(subNets.public, existingAWSResource{ + id: aws.StringValue(az.SubnetId), + }) + } else { + subNets.private = append(subNets.private, existingAWSResource{ + id: aws.StringValue(az.SubnetId), + }) + } } return existingAWSResource{ arn: aws.StringValue(it.LoadBalancerArn), diff --git a/ecs/volumes.go b/ecs/volumes.go index df1a296f4..9d046dc36 100644 --- a/ecs/volumes.go +++ b/ecs/volumes.go @@ -32,7 +32,7 @@ import ( func (b *ecsAPIService) createNFSMountTarget(project *types.Project, resources awsResources, template *cloudformation.Template) { for volume := range project.Volumes { - for _, subnet := range resources.subnets { + for _, subnet := range append(resources.subnets.public, resources.subnets.private...) { name := fmt.Sprintf("%sNFSMountTargetOn%s", normalizeResourceName(volume), normalizeResourceName(subnet.ID())) template.Resources[name] = &efs.MountTarget{ FileSystemId: resources.filesystems[volume].ID(), @@ -45,7 +45,7 @@ func (b *ecsAPIService) createNFSMountTarget(project *types.Project, resources a func (b *ecsAPIService) mountTargets(volume string, resources awsResources) []string { var refs []string - for _, subnet := range resources.subnets { + for _, subnet := range append(resources.subnets.public, resources.subnets.private...) { refs = append(refs, fmt.Sprintf("%sNFSMountTargetOn%s", normalizeResourceName(volume), normalizeResourceName(subnet.ID()))) } return refs diff --git a/local/compose/attach.go b/local/compose/attach.go index 5c3096059..3dd1c09a5 100644 --- a/local/compose/attach.go +++ b/local/compose/attach.go @@ -36,13 +36,21 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con return nil, err } + containers.sorted() // This enforce predictable colors assignment + var names []string for _, c := range containers { names = append(names, getCanonicalContainerName(c)) } + fmt.Printf("Attaching to %s\n", strings.Join(names, ", ")) for _, container := range containers { + consumer(compose.ContainerEvent{ + Type: compose.ContainerEventAttach, + Source: getContainerNameWithoutProject(container), + Service: container.Labels[serviceLabel], + }) err := s.attachContainer(ctx, container, consumer, project) if err != nil { return nil, err diff --git a/local/compose/containers.go b/local/compose/containers.go index 0e995b812..c31cd4202 100644 --- a/local/compose/containers.go +++ b/local/compose/containers.go @@ -18,6 +18,7 @@ package compose import ( "context" + "sort" "github.com/compose-spec/compose-go/types" moby "github.com/docker/docker/api/types" @@ -83,3 +84,10 @@ func (containers Containers) forEach(fn func(moby.Container)) { fn(c) } } + +func (containers Containers) sorted() Containers { + sort.Slice(containers, func(i, j int) bool { + return getCanonicalContainerName(containers[i]) < getCanonicalContainerName(containers[j]) + }) + return containers +} diff --git a/utils/logconsumer.go b/utils/logconsumer.go index 8caf6ba22..276d4c550 100644 --- a/utils/logconsumer.go +++ b/utils/logconsumer.go @@ -64,6 +64,12 @@ func (a *allowListLogConsumer) Status(service, container, message string) { } } +func (a *allowListLogConsumer) Register(service string, source string) { + if a.allowList[service] { + a.delegate.Register(service, source) + } +} + type splitBuffer struct { service string container string