Skip to content

Commit

Permalink
only-one-copy-in-graph-per-cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
jt-dd committed Nov 13, 2024
1 parent c76e63f commit 8a18333
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 6 deletions.
5 changes: 4 additions & 1 deletion pkg/ingestor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notif
}
}

func (g *IngestorAPI) Close(ctx context.Context) {
g.providers.Close(ctx)
}

// RehydrateLatest is just a GRPC wrapper around the Ingest method from the API package
func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedCluster, error) {
l := log.Logger(ctx)
l.Error("id123")
// first level key are cluster names
directories, errRet := g.puller.ListFiles(ctx, "", false)
if errRet != nil {
Expand Down
21 changes: 16 additions & 5 deletions pkg/kubehound/core/core_grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
"github.com/DataDog/KubeHound/pkg/ingestor/notifier/noop"
"github.com/DataDog/KubeHound/pkg/ingestor/puller/blob"
"github.com/DataDog/KubeHound/pkg/kubehound/providers"
"github.com/DataDog/KubeHound/pkg/telemetry/events"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
func initCoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) (*api.IngestorAPI, error) {
l := log.Logger(ctx)
l.Info("Starting KubeHound Distributed Ingestor Service")
span, ctx := span.SpanRunFromContext(ctx, span.IngestorLaunch)
Expand All @@ -28,22 +29,32 @@ func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
l.Info("Initializing providers (graph, cache, store)")
p, err := providers.NewProvidersFactoryConfig(ctx, khCfg)
if err != nil {
return fmt.Errorf("factory config creation: %w", err)
return nil, fmt.Errorf("factory config creation: %w", err)
}
defer p.Close(ctx)

l.Info("Creating Blob Storage provider")
puller, err := blob.NewBlobStorage(khCfg, khCfg.Ingestor.Blob)
if err != nil {
return err
return nil, err
}

l.Info("Creating Noop Notifier")
noopNotifier := noop.NewNoopNotifier()

l.Info("Creating Ingestor API")
ingestorApi := api.NewIngestorAPI(khCfg, puller, noopNotifier, p)
return api.NewIngestorAPI(khCfg, puller, noopNotifier, p), nil
}

func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
ingestorApi, err := initCoreGrpcApi(ctx, khCfg)
defer ingestorApi.Close(ctx)
if err != nil {
events.PushEventIngestorFailed(ctx)

Check failure on line 52 in pkg/kubehound/core/core_grpc_api.go

View workflow job for this annotation

GitHub Actions / system-test

undefined: events.PushEventIngestorFailed

Check failure on line 52 in pkg/kubehound/core/core_grpc_api.go

View workflow job for this annotation

GitHub Actions / linter

undefined: events.PushEventIngestorFailed

Check failure on line 52 in pkg/kubehound/core/core_grpc_api.go

View workflow job for this annotation

GitHub Actions / linter

undefined: events.PushEventIngestorFailed
return err
}
events.PushEventIngestorInit(ctx)

Check failure on line 55 in pkg/kubehound/core/core_grpc_api.go

View workflow job for this annotation

GitHub Actions / system-test

undefined: events.PushEventIngestorInit

Check failure on line 55 in pkg/kubehound/core/core_grpc_api.go

View workflow job for this annotation

GitHub Actions / linter

undefined: events.PushEventIngestorInit (typecheck)

Check failure on line 55 in pkg/kubehound/core/core_grpc_api.go

View workflow job for this annotation

GitHub Actions / linter

undefined: events.PushEventIngestorInit) (typecheck)

l := log.Logger(ctx)
l.Info("Starting Ingestor API")
err = grpc.Listen(ctx, ingestorApi)
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions pkg/kubehound/storage/graphdb/janusgraph_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,29 @@ func (jgp *JanusGraphProvider) Close(ctx context.Context) error {

return nil
}

// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error {
l := log.Trace(ctx)
l.Infof("Cleaning cluster", log.FieldClusterKey, cluster)
g := gremlin.Traversal_().WithRemote(jgp.drc)
tx := g.Tx()
defer tx.Close()

gtx, err := tx.Begin()
if err != nil {
return err
}

err = <-gtx.V().Has("cluster", cluster).Drop().Iterate()
if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return err
}

return nil
}
43 changes: 43 additions & 0 deletions pkg/kubehound/storage/graphdb/mocks/graph_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/kubehound/storage/graphdb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Provider interface {
// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
Raw() any

// Droping all assets from the graph database from a cluster name
Clean(ctx context.Context, cluster string) error

// VertexWriter creates a new AsyncVertexWriter instance to enable asynchronous bulk inserts of vertices.
VertexWriter(ctx context.Context, v vertex.Builder, c cache.CacheProvider, opts ...WriterOption) (AsyncVertexWriter, error)

Expand Down

0 comments on commit 8a18333

Please sign in to comment.