From 3f4ca041570273fb24a695183f2a5943d12a71fb Mon Sep 17 00:00:00 2001 From: Edouard Schweisguth Date: Wed, 11 Sep 2024 14:13:44 +0200 Subject: [PATCH] [NOJIRA] Use the ParsePath function to gather the cluster name and run ID (#248) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Use the ParsePath function to gather the cluster name and run ID automatically * removing duplicate for run_id and cluster in config struct * Update pkg/cmd/dump.go Co-authored-by: Simon Maréchal <66471981+Minosity-VR@users.noreply.github.com> --------- Co-authored-by: jt-dd Co-authored-by: jt-dd <112463504+jt-dd@users.noreply.github.com> Co-authored-by: Simon Maréchal <66471981+Minosity-VR@users.noreply.github.com> --- cmd/kubehound/ingest.go | 10 ++- cmd/kubehound/root.go | 5 ++ pkg/cmd/config.go | 9 -- pkg/cmd/dump.go | 25 ++++-- pkg/collector/file.go | 18 ++-- pkg/collector/file_test.go | 4 +- pkg/collector/testdata/kubehound-test.yaml | 3 +- pkg/config/collector.go | 7 +- pkg/config/config_test.go | 6 +- pkg/config/dynamic.go | 12 +++ pkg/config/ingestor.go | 2 - .../testdata/kubehound-file-collector.yaml | 5 +- pkg/dump/ingestor.go | 40 +++++++++ pkg/dump/result_test.go | 83 +++++++++++++++++++ pkg/ingestor/api/api.go | 23 +++-- pkg/kubehound/core/core_dump.go | 1 + pkg/kubehound/core/core_ingest_local.go | 17 +++- pkg/kubehound/core/core_live.go | 19 ++++- 18 files changed, 238 insertions(+), 51 deletions(-) diff --git a/cmd/kubehound/ingest.go b/cmd/kubehound/ingest.go index 042a93b55..8605a9ed2 100644 --- a/cmd/kubehound/ingest.go +++ b/cmd/kubehound/ingest.go @@ -10,6 +10,10 @@ import ( "github.com/spf13/viper" ) +var ( + runID string +) + var ( ingestCmd = &cobra.Command{ Use: "ingest", @@ -23,6 +27,8 @@ var ( Long: `Run an ingestion locally using a previous dump (directory or tar.gz)`, Args: cobra.ExactArgs(1), PreRunE: func(cobraCmd *cobra.Command, args []string) error { + cmd.BindFlagCluster(cobraCmd) + return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", true, true) }, RunE: func(cobraCmd *cobra.Command, args []string) error { @@ -41,6 +47,7 @@ var ( Short: "Ingest data remotely on a KHaaS instance", Long: `Run an ingestion on KHaaS from a bucket to build the attack path, by default it will rehydrate the latest snapshot previously dumped on a KHaaS instance from all clusters`, PreRunE: func(cobraCmd *cobra.Command, args []string) error { + cmd.BindFlagCluster(cobraCmd) viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck @@ -62,7 +69,7 @@ var ( return core.CoreClientGRPCRehydrateLatest(khCfg.Ingestor) } - return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID) + return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Dynamic.ClusterName, runID) }, } ) @@ -81,6 +88,7 @@ func init() { ingestCmd.AddCommand(remoteIngestCmd) cmd.InitRemoteIngestCmd(remoteIngestCmd, true) + remoteIngestCmd.Flags().StringVar(&runID, "run_id", "", "KubeHound run id to ingest (e.g.: 01htdgjj34mcmrrksw4bjy2e94)") rootCmd.AddCommand(ingestCmd) } diff --git a/cmd/kubehound/root.go b/cmd/kubehound/root.go index 3bb20e810..7631c2a9b 100644 --- a/cmd/kubehound/root.go +++ b/cmd/kubehound/root.go @@ -51,6 +51,11 @@ var ( return fmt.Errorf("get config: %w", err) } + err = core.CoreInitLive(cobraCmd.Context(), khCfg) + if err != nil { + return err + } + err = core.CoreLive(cobraCmd.Context(), khCfg) if err != nil { return err diff --git a/pkg/cmd/config.go b/pkg/cmd/config.go index eda04c93a..a8de0e239 100644 --- a/pkg/cmd/config.go +++ b/pkg/cmd/config.go @@ -31,16 +31,7 @@ func InitializeKubehoundConfig(ctx context.Context, configPath string, generateR viper.Set(config.DynamicRunID, config.NewRunID()) } - // This code is also used for file ingestion (dump), so it is not needed in this case. So, we can continue if it fails. - clusterName, err := config.GetClusterName(ctx) - if err == nil { - viper.Set(config.DynamicClusterName, clusterName) - } else { - log.I.Errorf("collector cluster info: %v", err) - } - khCfg := config.NewKubehoundConfig(configPath, inline) - // Activate debug mode if needed if khCfg.Debug { log.I.Info("Debug mode activated") diff --git a/pkg/cmd/dump.go b/pkg/cmd/dump.go index a42ef7459..af8f7c462 100644 --- a/pkg/cmd/dump.go +++ b/pkg/cmd/dump.go @@ -44,6 +44,11 @@ func InitRemoteDumpCmd(cmd *cobra.Command) { viper.BindPFlag(config.CollectorFileBlobRegion, cmd.Flags().Lookup("region")) //nolint: errcheck } +func InitLocalIngestCmd(cmd *cobra.Command) { + InitCluster(cmd) + cmd.Flags().MarkDeprecated(flagCluster, "Since v1.4.1, KubeHound dump archive contains a metadata file holding the clustername") //nolint: errcheck +} + func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) { cmd.PersistentFlags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)") @@ -51,16 +56,18 @@ func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) { // IngestorAPIEndpoint if standalone { - cmd.Flags().String("run_id", "", "KubeHound run id to ingest (e.g.: 01htdgjj34mcmrrksw4bjy2e94)") - viper.BindPFlag(config.IngestorRunID, cmd.Flags().Lookup("run_id")) //nolint: errcheck - - cmd.Flags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)") - viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck + InitCluster(cmd) } } -func InitLocalIngestCmd(cmd *cobra.Command) { - cmd.PersistentFlags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)") - viper.BindPFlag(config.IngestorClusterName, cmd.PersistentFlags().Lookup("cluster")) //nolint: errcheck - cmd.MarkFlagRequired("cluster") //nolint: errcheck +const ( + flagCluster = "cluster" +) + +func InitCluster(cmd *cobra.Command) { + cmd.Flags().String(flagCluster, "", "Cluster name to ingest (e.g.: my-cluster-1)") +} + +func BindFlagCluster(cmd *cobra.Command) { + viper.BindPFlag(config.DynamicClusterName, cmd.Flags().Lookup(flagCluster)) //nolint: errcheck } diff --git a/pkg/collector/file.go b/pkg/collector/file.go index 9ad853bb7..64966ea7c 100644 --- a/pkg/collector/file.go +++ b/pkg/collector/file.go @@ -54,9 +54,10 @@ const ( // FileCollector implements a collector based on local K8s API json files generated outside the KubeHound application via e.g kubectl. type FileCollector struct { - cfg *config.FileCollectorConfig - log *log.KubehoundLogger - tags collectorTags + cfg *config.FileCollectorConfig + log *log.KubehoundLogger + tags collectorTags + clusterName string } // NewFileCollector creates a new instance of the file collector from the provided application config. @@ -73,9 +74,10 @@ func NewFileCollector(ctx context.Context, cfg *config.KubehoundConfig) (Collect l.Infof("Creating file collector from directory %s", cfg.Collector.File.Directory) return &FileCollector{ - cfg: cfg.Collector.File, - log: l, - tags: newCollectorTags(), + cfg: cfg.Collector.File, + log: l, + tags: newCollectorTags(), + clusterName: cfg.Dynamic.ClusterName, }, nil } @@ -98,7 +100,7 @@ func (c *FileCollector) HealthCheck(_ context.Context) (bool, error) { return false, fmt.Errorf("file collector base path is not a directory: %s", file.Name()) } - if c.cfg.ClusterName == "" { + if c.clusterName == "" { return false, errors.New("file collector cluster name not provided") } @@ -107,7 +109,7 @@ func (c *FileCollector) HealthCheck(_ context.Context) (bool, error) { func (c *FileCollector) ClusterInfo(ctx context.Context) (*config.ClusterInfo, error) { return &config.ClusterInfo{ - Name: c.cfg.ClusterName, + Name: c.clusterName, }, nil } diff --git a/pkg/collector/file_test.go b/pkg/collector/file_test.go index 4c100586f..e4e464951 100644 --- a/pkg/collector/file_test.go +++ b/pkg/collector/file_test.go @@ -50,9 +50,9 @@ func TestFileCollector_HealthCheck(t *testing.T) { c = &FileCollector{ cfg: &config.FileCollectorConfig{ - Directory: "testdata/test-cluster/", - ClusterName: "test-cluster", + Directory: "testdata/test-cluster/", }, + clusterName: "test-cluster", } ok, err = c.HealthCheck(context.Background()) diff --git a/pkg/collector/testdata/kubehound-test.yaml b/pkg/collector/testdata/kubehound-test.yaml index 9de84d9bb..6a5af916b 100644 --- a/pkg/collector/testdata/kubehound-test.yaml +++ b/pkg/collector/testdata/kubehound-test.yaml @@ -2,4 +2,5 @@ collector: type: file-collector file: directory: testdata/test-cluster/ - cluster_name: test-cluster \ No newline at end of file +dynamic: + cluster_name: test-cluster diff --git a/pkg/config/collector.go b/pkg/config/collector.go index 40d283970..f669d606d 100644 --- a/pkg/config/collector.go +++ b/pkg/config/collector.go @@ -40,10 +40,9 @@ type K8SAPICollectorConfig struct { // FileCollectorConfig configures the file collector. type FileCollectorConfig struct { - ClusterName string `mapstructure:"cluster_name"` // Target cluster (must be specified in config as not present in JSON files) - Directory string `mapstructure:"directory"` // Base directory holding the K8s data JSON files - Archive *FileArchiveConfig `mapstructure:"archive"` // Archive configuration - Blob *BlobConfig `mapstructure:"blob"` // Blob storage configuration + Directory string `mapstructure:"directory"` // Base directory holding the K8s data JSON files + Archive *FileArchiveConfig `mapstructure:"archive"` // Archive configuration + Blob *BlobConfig `mapstructure:"blob"` // Blob storage configuration } type FileArchiveConfig struct { diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 81c2c3f5e..203f92468 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -32,8 +32,7 @@ func TestMustLoadConfig(t *testing.T) { Collector: CollectorConfig{ Type: CollectorTypeFile, File: &FileCollectorConfig{ - Directory: "cluster-data/", - ClusterName: "test-cluster", + Directory: "cluster-data/", Archive: &FileArchiveConfig{ NoCompress: DefaultArchiveNoCompress, }, @@ -89,6 +88,9 @@ func TestMustLoadConfig(t *testing.T) { ArchiveName: "archive.tar.gz", MaxArchiveSize: DefaultMaxArchiveSize, }, + Dynamic: DynamicConfig{ + ClusterName: "test-cluster", + }, }, wantErr: false, }, diff --git a/pkg/config/dynamic.go b/pkg/config/dynamic.go index 5ec1733cd..dbc2938db 100644 --- a/pkg/config/dynamic.go +++ b/pkg/config/dynamic.go @@ -17,6 +17,18 @@ type DynamicConfig struct { ClusterName string `mapstructure:"cluster_name"` } +func (d *DynamicConfig) HealthCheck() error { + if d.ClusterName == "" { + return fmt.Errorf("missing cluster name") + } + + if d.RunID == nil { + return fmt.Errorf("missing run id") + } + + return nil +} + // DynamicOption is a functional option for configuring the dynamic config. type DynamicOption func() (func(*DynamicConfig), error) diff --git a/pkg/config/ingestor.go b/pkg/config/ingestor.go index b0d18f1e7..675c306ed 100644 --- a/pkg/config/ingestor.go +++ b/pkg/config/ingestor.go @@ -26,8 +26,6 @@ type IngestorConfig struct { TempDir string `mapstructure:"temp_dir"` ArchiveName string `mapstructure:"archive_name"` MaxArchiveSize int64 `mapstructure:"max_archive_size"` - ClusterName string `mapstructure:"cluster_name"` - RunID string `mapstructure:"run_id"` } type IngestorAPIConfig struct { diff --git a/pkg/config/testdata/kubehound-file-collector.yaml b/pkg/config/testdata/kubehound-file-collector.yaml index c92f3a31a..06b941225 100644 --- a/pkg/config/testdata/kubehound-file-collector.yaml +++ b/pkg/config/testdata/kubehound-file-collector.yaml @@ -2,9 +2,10 @@ collector: type: file-collector file: directory: cluster-data/ - cluster_name: test-cluster +dynamic: + cluster_name: test-cluster mongodb: url: "mongodb://localhost:27017" telemetry: statsd: - url: "127.0.0.1:8125" \ No newline at end of file + url: "127.0.0.1:8125" diff --git a/pkg/dump/ingestor.go b/pkg/dump/ingestor.go index 78d9cd99a..cc51dbde5 100644 --- a/pkg/dump/ingestor.go +++ b/pkg/dump/ingestor.go @@ -6,11 +6,13 @@ import ( "fmt" "os" "path/filepath" + "regexp" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" "github.com/DataDog/KubeHound/pkg/dump/pipeline" "github.com/DataDog/KubeHound/pkg/dump/writer" + "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/span" "github.com/DataDog/KubeHound/pkg/telemetry/tag" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" @@ -103,3 +105,41 @@ func (d *DumpIngestor) Close(ctx context.Context) error { return d.writer.Close(ctx) } + +// Backward Compatibility: Extracting the metadata from the path +const ( + DumpResultFilenameRegex = DumpResultPrefix + DumpResultClusterNameRegex + "_" + DumpResultRunIDRegex + DumpResultExtensionRegex + DumpResultPathRegex = DumpResultClusterNameRegex + "/" + DumpResultFilenameRegex +) + +func ParsePath(path string) (*DumpResult, error) { + log.I.Warnf("[Backward Compatibility] Extracting the metadata from the path: %s", path) + + // .//kubehound__[.tar.gz] + // re := regexp.MustCompile(`([a-z0-9\.\-_]+)/kubehound_([a-z0-9\.-_]+)_([a-z0-9]{26})\.?([a-z0-9\.]+)?`) + re := regexp.MustCompile(DumpResultPathRegex) + if !re.MatchString(path) { + return nil, fmt.Errorf("Invalid path provided: %q", path) + } + + matches := re.FindStringSubmatch(path) + // The cluster name should match (parent dir and in the filename) + if matches[1] != matches[2] { + return nil, fmt.Errorf("Cluster name does not match in the path provided: %q", path) + } + + clusterName := matches[1] + runID := matches[3] + extension := matches[4] + + isCompressed := false + if extension != "" { + isCompressed = true + } + result, err := NewDumpResult(clusterName, runID, isCompressed) + if err != nil { + return nil, err + } + + return result, nil +} diff --git a/pkg/dump/result_test.go b/pkg/dump/result_test.go index 20e2009ed..0ea57a57a 100644 --- a/pkg/dump/result_test.go +++ b/pkg/dump/result_test.go @@ -18,6 +18,89 @@ const ( nonValidRunID = "01j2qs8TH6yarr5hkafysekn0j" ) +func TestParsePath(t *testing.T) { + t.Parallel() + type args struct { + path string + } + tests := []struct { + name string + args args + want *DumpResult + wantErr bool + }{ + { + name: "valid path with no compression", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: &DumpResult{ + Metadata: collector.Metadata{ + ClusterName: validClusterName, + RunID: validRunID, + }, + isDir: true, + extension: "", + }, + wantErr: false, + }, + { + name: "valid path with compressed data", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + want: &DumpResult{ + Metadata: collector.Metadata{ + ClusterName: validClusterName, + RunID: validRunID, + }, + isDir: false, + extension: "tar.gz", + }, + wantErr: false, + }, + { + name: "invalid path", + args: args{ + path: "/tmp/cluster1.k8s.local/cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + { + name: "not matching clustername ", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + { + name: "invalid runID", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8TH6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := ParsePath(tt.args.path) + if (err != nil) != tt.wantErr { + t.Errorf("ParsePath() error = %v, wantErr %v", err, tt.wantErr) + + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParsePath() = %v, want %v", got, tt.want) + } + }) + } +} + func TestDumpResult_GetFilename(t *testing.T) { t.Parallel() diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index 2207648cf..3e60cdd0f 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -122,17 +122,29 @@ func (g *IngestorAPI) Ingest(_ context.Context, path string) error { metadataFilePath := filepath.Join(filepath.Dir(archivePath), collector.MetadataPath) md, err := dump.ParseMetadata(runCtx, metadataFilePath) //nolint: contextcheck if err != nil { - return err + log.I.Warnf("no metadata has been parsed (old dump format from v1.4.0 or below do not embed metadata information): %v", err) + // Backward Compatibility: Extracting the metadata from the path + dumpMetadata, err := dump.ParsePath(archivePath) + if err != nil { + log.I.Warn("parsing path for metadata", err) + + return err + } + md = dumpMetadata.Metadata } clusterName := md.ClusterName runID := md.RunID + err = g.Cfg.ComputeDynamic(config.WithClusterName(clusterName), config.WithRunID(runID)) + if err != nil { + return err + } + runCfg := g.Cfg runCfg.Collector = config.CollectorConfig{ Type: config.CollectorTypeFile, File: &config.FileCollectorConfig{ - Directory: filepath.Dir(archivePath), - ClusterName: clusterName, + Directory: filepath.Dir(archivePath), }, } @@ -179,11 +191,6 @@ func (g *IngestorAPI) Ingest(_ context.Context, path string) error { }() log.I.Infof("Loaded %s collector client", collect.Name()) - err = g.Cfg.ComputeDynamic(config.WithClusterName(clusterName), config.WithRunID(runID)) - if err != nil { - return err - } - // Run the ingest pipeline log.I.Info("Starting Kubernetes raw data ingest") alreadyIngestedInDB, err := g.isAlreadyIngestedInDB(runCtx, clusterName, runID) //nolint: contextcheck diff --git a/pkg/kubehound/core/core_dump.go b/pkg/kubehound/core/core_dump.go index 9a140ce19..bf9798ad1 100644 --- a/pkg/kubehound/core/core_dump.go +++ b/pkg/kubehound/core/core_dump.go @@ -36,6 +36,7 @@ func DumpCore(ctx context.Context, khCfg *config.KubehoundConfig, upload bool) ( if err != nil { return "", fmt.Errorf("collector cluster info: %w", err) } + khCfg.Dynamic.ClusterName = clusterName events.PushEvent( fmt.Sprintf("Starting KubeHound dump for %s", clusterName), diff --git a/pkg/kubehound/core/core_ingest_local.go b/pkg/kubehound/core/core_ingest_local.go index f2165ebb7..fd1f1ce4d 100644 --- a/pkg/kubehound/core/core_ingest_local.go +++ b/pkg/kubehound/core/core_ingest_local.go @@ -10,6 +10,7 @@ import ( "github.com/DataDog/KubeHound/pkg/config" "github.com/DataDog/KubeHound/pkg/dump" "github.com/DataDog/KubeHound/pkg/ingestor/puller" + "github.com/DataDog/KubeHound/pkg/telemetry/log" ) func CoreLocalIngest(ctx context.Context, khCfg *config.KubehoundConfig, resultPath string) error { @@ -42,9 +43,21 @@ func CoreLocalIngest(ctx context.Context, khCfg *config.KubehoundConfig, resultP // Getting the metadata from the metadata file md, err := dump.ParseMetadata(ctx, metadataFilePath) if err != nil { - return err + // Backward Compatibility: not returning error for now as the metadata feature is new + log.I.Warnf("no metadata has been parsed (old dump format from v1.4.0 or below do not embed metadata information): %v", err) + } else { + khCfg.Dynamic.ClusterName = md.ClusterName + } + + // Backward Compatibility: Extracting the metadata from the path or input args + // If the cluster name is not provided by the command args (deprecated flag), we try to get it from the path + if khCfg.Dynamic.ClusterName == "" { + dumpMetadata, err := dump.ParsePath(resultPath) + if err != nil { + log.I.Warnf("parsing path for metadata: %v", err) + } + khCfg.Dynamic.ClusterName = dumpMetadata.Metadata.ClusterName } - khCfg.Collector.File.ClusterName = md.ClusterName return CoreLive(ctx, khCfg) } diff --git a/pkg/kubehound/core/core_live.go b/pkg/kubehound/core/core_live.go index 949b9ba77..a78c3ec87 100644 --- a/pkg/kubehound/core/core_live.go +++ b/pkg/kubehound/core/core_live.go @@ -12,15 +12,32 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) +// Setting the current cluster targeted for the live run. +func CoreInitLive(ctx context.Context, khCfg *config.KubehoundConfig) error { + clusterName, err := config.GetClusterName(ctx) + if err != nil { + return fmt.Errorf("collector cluster info: %w", err) + } + khCfg.Dynamic.ClusterName = clusterName + + return nil +} + // CoreLive will launch the KubeHound application to ingest data from a collector and create an attack graph. func CoreLive(ctx context.Context, khCfg *config.KubehoundConfig) error { span, ctx := tracer.StartSpanFromContext(ctx, span.Launch, tracer.Measured()) var err error defer func() { span.Finish(tracer.WithError(err)) }() + // Check for run configuration + err = khCfg.Dynamic.HealthCheck() + if err != nil { + return fmt.Errorf("health check: %w", err) + } + // Start the run start := time.Now() - log.I.Infof("Starting KubeHound (run_id: %s)", khCfg.Dynamic.RunID.String()) + log.I.Infof("Starting KubeHound (run_id: %s, cluster: %s)", khCfg.Dynamic.RunID.String(), khCfg.Dynamic.ClusterName) // Initialize the providers (graph, cache, store) log.I.Info("Initializing providers (graph, cache, store)")