From c1bdfbb5cbc0552d62822d37c37a1dcc04ae7787 Mon Sep 17 00:00:00 2001
From: Ravi Suhag
Date: Fri, 26 Sep 2025 20:19:50 -0500
Subject: [PATCH] feat: optimus support maxcompute

---
 .gitignore                                    |   1 +
 .gitignore.bak                                |  36 ++
 plugins/extractors/optimus/optimus.go.bak     | 259 ++++++++++
 plugins/extractors/optimus/optimus_test.go    | 144 ++++++
 .../extractors/optimus/testdata/expected.json | 471 +++++++++++-------
 plugins/util.go                               |  31 +-
 plugins/util_test.go                          |  44 ++
 7 files changed, 811 insertions(+), 175 deletions(-)
 create mode 100644 .gitignore.bak
 create mode 100644 plugins/extractors/optimus/optimus.go.bak

diff --git a/.gitignore b/.gitignore
index 997c7e39b..8137292f4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,7 @@ config.yaml
 meteor
 _recipes
 meteor.yaml
+vendor
 
 # plugins
 meteor-plugin-*
diff --git a/.gitignore.bak b/.gitignore.bak
new file mode 100644
index 000000000..997c7e39b
--- /dev/null
+++ b/.gitignore.bak
@@ -0,0 +1,36 @@
+# Binaries for programs and plugins
+*.exe
+*.exe~
+*.dll
+*.so
+*.dylib
+
+.DS_Store
+
+# Test binary, built with `go test -c`
+*.test
+
+# Output of the go coverage tool, specifically when used with LiteIDE
+*.out
+
+# Dependency directories (remove the comment below to include it)
+# vendor/
+
+# IDEs
+.idea
+.vscode
+
+# Project specific ignore
+.env
+config.yaml
+meteor
+_recipes
+meteor.yaml
+
+# plugins
+meteor-plugin-*
+
+# build
+/dist
+
+.playground
diff --git a/plugins/extractors/optimus/optimus.go.bak b/plugins/extractors/optimus/optimus.go.bak
new file mode 100644
index 000000000..fffb2619c
--- /dev/null
+++ b/plugins/extractors/optimus/optimus.go.bak
package optimus

import (
	"context"
	_ "embed" // used to print the embedded assets
	"errors"
	"fmt"
	"strings"

	"github.com/raystack/meteor/models"
	v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
	"github.com/raystack/meteor/plugins"
	"github.com/raystack/meteor/plugins/extractors/optimus/client"
	"github.com/raystack/meteor/registry"
	"github.com/raystack/meteor/utils"
	pb "github.com/raystack/optimus/protos/raystack/optimus/core/v1beta1"
	"github.com/raystack/salt/log"
	"google.golang.org/protobuf/types/known/anypb"
)

const (
	service          = "optimus"
	sampleConfig     = `host: optimus.com:80`
	prefixBigQuery   = "bigquery://"
	prefixMaxcompute = "maxcompute://"
)

var errInvalidDependency = errors.New("invalid dependency")

// Register the extractor to catalog
func init() {
	if err := registry.Extractors.Register("optimus", func() plugins.Extractor {
		return New(plugins.GetLog(), client.New())
	}); err != nil {
		panic(err)
	}
}

//go:embed README.md
var summary string

// Config holds the set of configuration for the optimus extractor
type Config struct {
	Host        string `json:"host" yaml:"host" mapstructure:"host" validate:"required"`
	MaxSizeInMB int    `json:"max_size_in_mb" yaml:"max_size_in_mb" mapstructure:"max_size_in_mb"`
}

var info = plugins.Info{
	Description:  "Optimus' jobs metadata",
	SampleConfig: sampleConfig,
	Summary:      summary,
	Tags:         []string{"optimus", "bigquery"},
}

// Extractor manages the communication with the optimus service
type Extractor struct {
	plugins.BaseExtractor
	logger log.Logger
	config Config
	client client.Client
}

func New(l log.Logger, c client.Client) *Extractor {
	e := &Extractor{
		logger: l,
		client: c,
	}
	e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)

	return e
}
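// The extractor is configured through a regular Meteor recipe. A minimal
// sketch follows (the recipe layout and the 45 MB value are illustrative and
// not part of this patch; the config keys mirror the Config struct above, and
// "scope" becomes the URN scope seen in the test fixtures, e.g. "test-optimus"):
//
//	source:
//	  name: optimus
//	  scope: test-optimus
//	  config:
//	    host: optimus.com:80
//	    max_size_in_mb: 45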
// Init initializes the extractor
func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
	if err := e.BaseExtractor.Init(ctx, config); err != nil {
		return err
	}

	if err := e.client.Connect(ctx, e.config.Host, e.config.MaxSizeInMB); err != nil {
		return fmt.Errorf("connect to host: %w", err)
	}

	return nil
}

// Extract collects projects, namespaces, and job specifications from Optimus
// and emits them as job assets
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
	defer e.client.Close()

	projResp, err := e.client.ListProjects(ctx, &pb.ListProjectsRequest{})
	if err != nil {
		return fmt.Errorf("fetch projects: %w", err)
	}

	for _, project := range projResp.Projects {
		nspaceResp, err := e.client.ListProjectNamespaces(ctx, &pb.ListProjectNamespacesRequest{
			ProjectName: project.Name,
		})
		if err != nil {
			e.logger.Error("error fetching namespace list", "project", project.Name, "err", err)
			continue
		}

		for _, namespace := range nspaceResp.Namespaces {
			jobResp, err := e.client.ListJobSpecification(ctx, &pb.ListJobSpecificationRequest{
				ProjectName:   project.Name,
				NamespaceName: namespace.Name,
			})
			if err != nil {
				e.logger.Error("error fetching job list", "err", err, "project", project.Name, "namespace", namespace.Name)
				continue
			}

			for _, job := range jobResp.Jobs {
				data, err := e.buildJob(ctx, job, project.Name, namespace.Name)
				if err != nil {
					e.logger.Error(
						"error building job model",
						"err", err,
						"project", project.Name,
						"namespace", namespace.Name,
						"job", job.Name)
					continue
				}

				emit(models.NewRecord(data))
			}
		}
	}

	return nil
}

func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification, project, namespace string) (*v1beta2.Asset, error) {
	jobResp, err := e.client.GetJobTask(ctx, &pb.GetJobTaskRequest{
		ProjectName:   project,
		NamespaceName: namespace,
		JobName:       jobSpec.Name,
	})
	if err != nil {
		return nil, fmt.Errorf("fetching task: %w", err)
	}

	task := jobResp.Task
	upstreams, downstreams, err := e.buildLineage(task)
	if err != nil {
		return nil, fmt.Errorf("building lineage: %w", err)
	}

	jobID := fmt.Sprintf("%s.%s.%s", project, namespace, jobSpec.Name)
	urn := models.NewURN(service, e.UrnScope, "job", jobID)

	jobModel, err := anypb.New(&v1beta2.Job{
		Attributes: utils.TryParseMapToProto(map[string]interface{}{
			"version":          jobSpec.Version,
			"project":          project,
			"namespace":        namespace,
			"owner":            jobSpec.Owner,
			"startDate":        strOrNil(jobSpec.StartDate),
			"endDate":          strOrNil(jobSpec.EndDate),
			"interval":         jobSpec.Interval,
			"dependsOnPast":    jobSpec.DependsOnPast,
			"catchUp":          jobSpec.CatchUp,
			"taskName":         jobSpec.TaskName,
			"windowSize":       jobSpec.WindowSize,
			"windowOffset":     jobSpec.WindowOffset,
			"windowTruncateTo": jobSpec.WindowTruncateTo,
			"sql":              jobSpec.Assets["query.sql"],
			"task": map[string]interface{}{
				"name":        task.Name,
				"description": task.Description,
				"image":       task.Image,
			},
		}),
	})
	if err != nil {
		return nil, fmt.Errorf("create Any struct: %w", err)
	}

	return &v1beta2.Asset{
		Urn:         urn,
		Name:        jobSpec.Name,
		Service:     service,
		Description: jobSpec.Description,
		Type:        "job",
		Data:        jobModel,
		Owners: []*v1beta2.Owner{
			{
				Urn:   jobSpec.Owner,
				Email: jobSpec.Owner,
			},
		},
		Lineage: &v1beta2.Lineage{
			Upstreams:   upstreams,
			Downstreams: downstreams,
		},
	}, nil
}

func (e *Extractor) buildLineage(task *pb.JobTask) (upstreams, downstreams []*v1beta2.Resource, err error) {
	upstreams, err = e.buildUpstreams(task)
	if err != nil {
		return nil, nil, fmt.Errorf("build upstreams: %w", err)
	}

	downstreams, err = e.buildDownstreams(task)
	if err != nil {
		return nil, nil, fmt.Errorf("build downstreams: %w", err)
	}

	return upstreams, downstreams, nil
}
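// dependencyToResource maps a dependency or destination URI to a lineage
// resource by its scheme prefix. NOTE: this helper and its name are an
// editorial sketch, not code taken verbatim from the upstream extractor; it is
// shown here so that both bigquery:// and maxcompute:// resources resolve to
// URNs, which is what the mock data and testdata/expected.json in this patch
// require.
func dependencyToResource(dependency string) (*v1beta2.Resource, error) {
	switch {
	case strings.HasPrefix(dependency, prefixBigQuery):
		urn, err := plugins.BigQueryTableFQNToURN(strings.TrimPrefix(dependency, prefixBigQuery))
		if err != nil {
			return nil, err
		}

		return &v1beta2.Resource{Urn: urn, Type: "table", Service: "bigquery"}, nil
	case strings.HasPrefix(dependency, prefixMaxcompute):
		urn, err := plugins.MaxComputeTableFQNToURN(strings.TrimPrefix(dependency, prefixMaxcompute))
		if err != nil {
			return nil, err
		}

		return &v1beta2.Resource{Urn: urn, Type: "table", Service: "maxcompute"}, nil
	default:
		return nil, fmt.Errorf("%w: %s", errInvalidDependency, dependency)
	}
}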
func (e *Extractor) buildUpstreams(task *pb.JobTask) ([]*v1beta2.Resource, error) {
	var upstreams []*v1beta2.Resource
	for _, dependency := range task.Dependencies {
		// Resolve each dependency by its prefix so both BigQuery and
		// MaxCompute sources appear in the lineage.
		resource, err := dependencyToResource(dependency.Dependency)
		if err != nil {
			return nil, err
		}

		upstreams = append(upstreams, resource)
	}

	return upstreams, nil
}

func (e *Extractor) buildDownstreams(task *pb.JobTask) ([]*v1beta2.Resource, error) {
	if task.Destination == nil || task.Destination.Destination == "" {
		return nil, nil
	}

	resource, err := dependencyToResource(task.Destination.Destination)
	if err != nil {
		return nil, err
	}

	return []*v1beta2.Resource{resource}, nil
}

func strOrNil(s string) interface{} {
	if s == "" {
		return nil
	}

	return s
}
diff --git a/plugins/extractors/optimus/optimus_test.go b/plugins/extractors/optimus/optimus_test.go
index 2ee0405f6..a51a056fe 100644
--- a/plugins/extractors/optimus/optimus_test.go
+++ b/plugins/extractors/optimus/optimus_test.go
@@ -309,6 +309,13 @@ func setupExtractExpectation(ctx context.Context, client *mockClient) {
 			},
 			Secrets: []*pb.ProjectSpecification_ProjectSecret{},
 		},
+		{
+			Name: "projectMaxCompute-A",
+			Config: map[string]string{
+				"BAR": "foo",
+			},
+			Secrets: []*pb.ProjectSpecification_ProjectSecret{},
+		},
 		{
 			Name: "project-B",
 			Config: map[string]string{
@@ -330,6 +337,17 @@
 		},
 	}, nil).Once()
 
+	client.On("ListProjectNamespaces", ctx, &pb.ListProjectNamespacesRequest{
+		ProjectName: "projectMaxCompute-A",
+	}, mock.Anything).Return(&pb.ListProjectNamespacesResponse{
+		Namespaces: []*pb.NamespaceSpecification{
+			{
+				Name:   "namespaceMaxCompute-A",
+				Config: map[string]string{},
+			},
+		},
+	}, nil).Once()
+
 	client.On("ListProjectNamespaces", ctx, &pb.ListProjectNamespacesRequest{
 		ProjectName: "project-B",
 	}, mock.Anything).Return(&pb.ListProjectNamespacesResponse{
@@ -473,6 +491,100 @@
 		},
 	}, nil).Once()
 
+	client.On("ListJobSpecification", ctx, &pb.ListJobSpecificationRequest{
+		ProjectName:   "projectMaxCompute-A",
+		NamespaceName: "namespaceMaxCompute-A",
+	}, mock.Anything).Return(&pb.ListJobSpecificationResponse{
+		Jobs: []*pb.JobSpecification{
+			{
+				Version:       1,
+				Name:          "jobMaxCompute-B",
+				Owner:         "jane_doe@example.com",
+				StartDate:     "2021-01-01",
+				EndDate:       "",
+				Interval:      "0 19 1 * *",
+				DependsOnPast: false,
+				CatchUp:       true,
+				TaskName:      "mc2mc",
+				Config: []*pb.JobConfigItem{
+					{
+						Name:  "FOO_B_1",
+						Value: "BAR_B_1",
+					},
+					{
+						Name:  "FOO_B_2",
+						Value: "BAR_B_2",
+					},
+				},
+				WindowSize:       "720h",
+				WindowOffset:     "-720h",
+				WindowTruncateTo: "M",
+				Dependencies:     []*pb.JobDependency{},
+				Assets: map[string]string{
+					"query.sql": "SELECT * FROM projectZ.schemaY.tableX",
+				},
+				Hooks:       []*pb.JobSpecHook{},
+				Description: "sample description for jobMaxCompute-B",
+				Labels: map[string]string{
+					"orchestrator": "optimus",
+				},
+				Behavior: &pb.JobSpecification_Behavior{
+					Retry: &pb.JobSpecification_Behavior_Retry{
+						Count: 0,
+						Delay: &durationpb.Duration{
Seconds: 0, + }, + ExponentialBackoff: false, + }, + Notify: []*pb.JobSpecification_Behavior_Notifiers{}, + }, + }, + { + Version: 1, + Name: "jobMaxCompute-C", + Owner: "jane_doe@example.com", + StartDate: "2021-01-01", + EndDate: "", + Interval: "0 19 1 * *", + DependsOnPast: false, + CatchUp: true, + TaskName: "gcs2mc", + Config: []*pb.JobConfigItem{ + { + Name: "FOO_B_1", + Value: "BAR_B_1", + }, + { + Name: "FOO_B_2", + Value: "BAR_B_2", + }, + }, + WindowSize: "720h", + WindowOffset: "-720h", + WindowTruncateTo: "M", + Dependencies: []*pb.JobDependency{}, + Assets: map[string]string{ + "query.sql": "SELECT * FROM projectZ.schemaY.tableX", + }, + Hooks: []*pb.JobSpecHook{}, + Description: "sample description for jobMaxCompute-C", + Labels: map[string]string{ + "orchestrator": "optimus", + }, + Behavior: &pb.JobSpecification_Behavior{ + Retry: &pb.JobSpecification_Behavior_Retry{ + Count: 0, + Delay: &durationpb.Duration{ + Seconds: 0, + }, + ExponentialBackoff: false, + }, + Notify: []*pb.JobSpecification_Behavior_Notifiers{}, + }, + }, + }, + }, nil).Once() + client.On("GetJobTask", ctx, &pb.GetJobTaskRequest{ ProjectName: "project-A", NamespaceName: "namespace-A", @@ -512,6 +624,25 @@ func setupExtractExpectation(ctx context.Context, client *mockClient) { }, }, nil).Once() + client.On("GetJobTask", ctx, &pb.GetJobTaskRequest{ + ProjectName: "projectMaxCompute-A", + NamespaceName: "namespaceMaxCompute-A", + JobName: "jobMaxCompute-B", + }, mock.Anything).Return(&pb.GetJobTaskResponse{ + Task: &pb.JobTask{ + Name: "taskMaxCompute-B", + Description: "task's description MaxCompute B", + Image: "task's image MaxCompute B", + Destination: &pb.JobTask_Destination{ + Destination: "maxcompute://dst-b-project.dst-b-dataset.dst-b-table", + }, + Dependencies: []*pb.JobTask_Dependency{ + {Dependency: "bigquery://src-b1-project:src-b1-dataset.src-b1-table"}, + {Dependency: "maxcompute://src-b2-project.src-b2-dataset.src-b2-table"}, + }, + }, + }, nil).Once() + client.On("GetJobTask", ctx, &pb.GetJobTaskRequest{ ProjectName: "project-A", NamespaceName: "namespace-A", @@ -524,4 +655,17 @@ func setupExtractExpectation(ctx context.Context, client *mockClient) { Destination: &pb.JobTask_Destination{}, }, }, nil).Once() + + client.On("GetJobTask", ctx, &pb.GetJobTaskRequest{ + ProjectName: "projectMaxCompute-A", + NamespaceName: "namespaceMaxCompute-A", + JobName: "jobMaxCompute-C", + }, mock.Anything).Return(&pb.GetJobTaskResponse{ + Task: &pb.JobTask{ + Name: "taskMaxCompute-C", + Description: "task's description MaxCompute C", + Image: "task's image MaxCompute C", + Destination: &pb.JobTask_Destination{}, + }, + }, nil).Once() } diff --git a/plugins/extractors/optimus/testdata/expected.json b/plugins/extractors/optimus/testdata/expected.json index 3488dd7be..857985675 100644 --- a/plugins/extractors/optimus/testdata/expected.json +++ b/plugins/extractors/optimus/testdata/expected.json @@ -1,183 +1,306 @@ [ - { - "create_time": null, - "data": { - "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", - "attributes": { - "dependsOnPast": false, - "endDate": null, - "interval": "0 19 * * *", - "namespace": "namespace-A", - "owner": "john_doe@example.com", - "project": "project-A", - "sql": "SELECT * FROM projectA.datasetB.tableC", - "startDate": "2019-09-16", - "task": { - "description": "task's description", - "image": "task's image", - "name": "task-A" - }, - "taskName": "bq2bq", - "version": 1, - "windowOffset": "24h", - "windowSize": "48h", - "windowTruncateTo": "d" - }, - "create_time": 
null, - "update_time": null + { + "create_time": null, + "data": { + "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", + "attributes": { + "catchUp": false, + "dependsOnPast": false, + "endDate": null, + "interval": "0 19 * * *", + "namespace": "namespace-A", + "owner": "john_doe@example.com", + "project": "project-A", + "sql": "SELECT * FROM projectA.datasetB.tableC", + "startDate": "2019-09-16", + "task": { + "description": "task's description", + "image": "task's image", + "name": "task-A" }, - "description": "sample description for job-A", - "event": null, - "labels": {}, - "lineage": { - "downstreams": [ - { - "name": "", - "service": "bigquery", - "type": "table", - "urn": "urn:bigquery:dst-project:table:dst-project:dst-dataset.dst-table" - } - ], - "upstreams": [ - { - "name": "", - "service": "bigquery", - "type": "table", - "urn": "urn:bigquery:src-project:table:src-project:src-dataset.src-table" - } - ] + "taskName": "bq2bq", + "version": 1, + "windowOffset": "24h", + "windowSize": "48h", + "windowTruncateTo": "d" + }, + "create_time": null, + "update_time": null + }, + "description": "sample description for job-A", + "event": null, + "labels": {}, + "lineage": { + "downstreams": [ + { + "name": "", + "service": "bigquery", + "type": "table", + "urn": "urn:bigquery:dst-project:table:dst-project:dst-dataset.dst-table" + } + ], + "upstreams": [ + { + "name": "", + "service": "bigquery", + "type": "table", + "urn": "urn:bigquery:src-project:table:src-project:src-dataset.src-table" + } + ] + }, + "name": "job-A", + "owners": [ + { + "email": "john_doe@example.com", + "name": "", + "role": "", + "urn": "john_doe@example.com" + } + ], + "service": "optimus", + "type": "job", + "update_time": null, + "url": "", + "urn": "urn:optimus:test-optimus:job:project-A.namespace-A.job-A" + }, + { + "create_time": null, + "data": { + "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", + "attributes": { + "catchUp": true, + "dependsOnPast": false, + "endDate": null, + "interval": "0 19 1 * *", + "namespace": "namespace-A", + "owner": "jane_doe@example.com", + "project": "project-A", + "sql": "SELECT * FROM projectZ.datasetY.tableX", + "startDate": "2021-01-01", + "task": { + "description": "task's description B", + "image": "task's image B", + "name": "task-B" }, - "name": "job-A", - "owners": [ - { - "email": "john_doe@example.com", - "name": "", - "role": "", - "urn": "john_doe@example.com" - } - ], - "service": "optimus", - "type": "job", - "update_time": null, - "url": "", - "urn": "urn:optimus:test-optimus:job:project-A.namespace-A.job-A" + "taskName": "bq2bq", + "version": 1, + "windowOffset": "-720h", + "windowSize": "720h", + "windowTruncateTo": "M" + }, + "create_time": null, + "update_time": null }, - { - "create_time": null, - "data": { - "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", - "attributes": { - "dependsOnPast": false, - "endDate": null, - "interval": "0 19 1 * *", - "namespace": "namespace-A", - "owner": "jane_doe@example.com", - "project": "project-A", - "sql": "SELECT * FROM projectZ.datasetY.tableX", - "startDate": "2021-01-01", - "task": { - "description": "task's description B", - "image": "task's image B", - "name": "task-B" - }, - "taskName": "bq2bq", - "version": 1, - "windowOffset": "-720h", - "windowSize": "720h", - "windowTruncateTo": "M" - }, - "create_time": null, - "update_time": null + "description": "sample description for job-B", + "event": null, + "labels": {}, + "lineage": { + "downstreams": [ + { + "name": "", + "service": 
"bigquery", + "type": "table", + "urn": "urn:bigquery:dst-b-project:table:dst-b-project:dst-b-dataset.dst-b-table" + } + ], + "upstreams": [ + { + "name": "", + "service": "bigquery", + "type": "table", + "urn": "urn:bigquery:src-b1-project:table:src-b1-project:src-b1-dataset.src-b1-table" }, - "description": "sample description for job-B", - "event": null, - "labels": {}, - "lineage": { - "downstreams": [ - { - "name": "", - "service": "bigquery", - "type": "table", - "urn": "urn:bigquery:dst-b-project:table:dst-b-project:dst-b-dataset.dst-b-table" - } - ], - "upstreams": [ - { - "name": "", - "service": "bigquery", - "type": "table", - "urn": "urn:bigquery:src-b1-project:table:src-b1-project:src-b1-dataset.src-b1-table" - }, - { - "name": "", - "service": "bigquery", - "type": "table", - "urn": "urn:bigquery:src-b2-project:table:src-b2-project:src-b2-dataset.src-b2-table" - } - ] + { + "name": "", + "service": "bigquery", + "type": "table", + "urn": "urn:bigquery:src-b2-project:table:src-b2-project:src-b2-dataset.src-b2-table" + } + ] + }, + "name": "job-B", + "owners": [ + { + "email": "jane_doe@example.com", + "name": "", + "role": "", + "urn": "jane_doe@example.com" + } + ], + "service": "optimus", + "type": "job", + "update_time": null, + "url": "", + "urn": "urn:optimus:test-optimus:job:project-A.namespace-A.job-B" + }, + { + "create_time": null, + "data": { + "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", + "attributes": { + "catchUp": true, + "dependsOnPast": false, + "endDate": null, + "interval": "0 19 1 * *", + "namespace": "namespace-A", + "owner": "jane_doe@example.com", + "project": "project-A", + "sql": "SELECT * FROM projectZ.datasetY.tableX", + "startDate": "2021-01-01", + "task": { + "description": "task's description C", + "image": "task's image C", + "name": "task-C" }, - "name": "job-B", - "owners": [ - { - "email": "jane_doe@example.com", - "name": "", - "role": "", - "urn": "jane_doe@example.com" - } - ], - "service": "optimus", - "type": "job", - "update_time": null, - "url": "", - "urn": "urn:optimus:test-optimus:job:project-A.namespace-A.job-B" + "taskName": "gcs2bq", + "version": 1, + "windowOffset": "-720h", + "windowSize": "720h", + "windowTruncateTo": "M" + }, + "create_time": null, + "update_time": null }, - { - "create_time": null, - "data": { - "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", - "attributes": { - "dependsOnPast": false, - "endDate": null, - "interval": "0 19 1 * *", - "namespace": "namespace-A", - "owner": "jane_doe@example.com", - "project": "project-A", - "sql": "SELECT * FROM projectZ.datasetY.tableX", - "startDate": "2021-01-01", - "task": { - "description": "task's description C", - "image": "task's image C", - "name": "task-C" - }, - "taskName": "gcs2bq", - "version": 1, - "windowOffset": "-720h", - "windowSize": "720h", - "windowTruncateTo": "M" - }, - "create_time": null, - "update_time": null + "description": "sample description for job-C", + "event": null, + "labels": {}, + "lineage": { + "downstreams": [], + "upstreams": [] + }, + "name": "job-C", + "owners": [ + { + "email": "jane_doe@example.com", + "name": "", + "role": "", + "urn": "jane_doe@example.com" + } + ], + "service": "optimus", + "type": "job", + "update_time": null, + "url": "", + "urn": "urn:optimus:test-optimus:job:project-A.namespace-A.job-C" + }, + { + "create_time": null, + "data": { + "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", + "attributes": { + "catchUp": true, + "dependsOnPast": false, + "endDate": null, + 
"interval": "0 19 1 * *", + "namespace": "namespaceMaxCompute-A", + "owner": "jane_doe@example.com", + "project": "projectMaxCompute-A", + "sql": "SELECT * FROM projectZ.schemaY.tableX", + "startDate": "2021-01-01", + "task": { + "description": "task's description MaxCompute C", + "image": "task's image MaxCompute C", + "name": "taskMaxCompute-C" }, - "description": "sample description for job-C", - "event": null, - "labels": {}, - "lineage": { - "downstreams": [], - "upstreams": [] + "taskName": "gcs2mc", + "version": 1, + "windowOffset": "-720h", + "windowSize": "720h", + "windowTruncateTo": "M" + }, + "create_time": null, + "update_time": null + }, + "description": "sample description for jobMaxCompute-C", + "event": null, + "labels": {}, + "lineage": { + "downstreams": [], + "upstreams": [] + }, + "name": "jobMaxCompute-C", + "owners": [ + { + "email": "jane_doe@example.com", + "name": "", + "role": "", + "urn": "jane_doe@example.com" + } + ], + "service": "optimus", + "type": "job", + "update_time": null, + "url": "", + "urn": "urn:optimus:test-optimus:job:projectMaxCompute-A.namespaceMaxCompute-A.jobMaxCompute-C" + }, + { + "create_time": null, + "data": { + "@type": "type.googleapis.com/raystack.assets.v1beta2.Job", + "attributes": { + "catchUp": true, + "dependsOnPast": false, + "endDate": null, + "interval": "0 19 1 * *", + "namespace": "namespaceMaxCompute-A", + "owner": "jane_doe@example.com", + "project": "projectMaxCompute-A", + "sql": "SELECT * FROM projectZ.schemaY.tableX", + "startDate": "2021-01-01", + "task": { + "description": "task's description MaxCompute B", + "image": "task's image MaxCompute B", + "name": "taskMaxCompute-B" + }, + "taskName": "mc2mc", + "version": 1, + "windowOffset": "-720h", + "windowSize": "720h", + "windowTruncateTo": "M" + }, + "create_time": null, + "update_time": null + }, + "description": "sample description for jobMaxCompute-B", + "event": null, + "labels": {}, + "lineage": { + "downstreams": [ + { + "name": "", + "service": "maxcompute", + "type": "table", + "urn": "urn:maxcompute:dst-b-project:table:dst-b-project.dst-b-dataset.dst-b-table" + } + ], + "upstreams": [ + { + "name": "", + "service": "bigquery", + "type": "table", + "urn": "urn:bigquery:src-b1-project:table:src-b1-project:src-b1-dataset.src-b1-table" }, - "name": "job-C", - "owners": [ - { - "email": "jane_doe@example.com", - "name": "", - "role": "", - "urn": "jane_doe@example.com" - } - ], - "service": "optimus", - "type": "job", - "update_time": null, - "url": "", - "urn": "urn:optimus:test-optimus:job:project-A.namespace-A.job-C" - } -] \ No newline at end of file + { + "name": "", + "service": "maxcompute", + "type": "table", + "urn": "urn:maxcompute:src-b2-project:table:src-b2-project.src-b2-dataset.src-b2-table" + } + ] + }, + "name": "jobMaxCompute-B", + "owners": [ + { + "email": "jane_doe@example.com", + "name": "", + "role": "", + "urn": "jane_doe@example.com" + } + ], + "service": "optimus", + "type": "job", + "update_time": null, + "url": "", + "urn": "urn:optimus:test-optimus:job:projectMaxCompute-A.namespaceMaxCompute-A.jobMaxCompute-B" + } +] diff --git a/plugins/util.go b/plugins/util.go index c794178e0..907f4a8bd 100644 --- a/plugins/util.go +++ b/plugins/util.go @@ -17,6 +17,8 @@ import ( "github.com/raystack/meteor/models" ) +const numberOfFqnComponents = 3 + var validate *validator.Validate func init() { @@ -70,6 +72,22 @@ func buildConfig(configMap map[string]interface{}, c interface{}) (err error) { return err } +// MaxComputeTableFQNToURN get URN from 
the FQN (Fully Qualified Name) of a MaxCompute table
+func MaxComputeTableFQNToURN(fqn string) (string, error) {
+	projectName, schemaName, tableName, err := parseMaxComputeTableFQN(fqn)
+	if err != nil {
+		return "", fmt.Errorf("map URN: %w", err)
+	}
+
+	return MaxComputeURN(projectName, schemaName, tableName), nil
+}
+
+func MaxComputeURN(projectName, schemaName, tableName string) string {
+	fqn := fmt.Sprintf("%s.%s.%s", projectName, schemaName, tableName)
+	return models.NewURN("maxcompute", projectName, "table", fqn)
+}
+
+// BigQueryTableFQNToURN get URN from the FQN (Fully Qualified Name) of a BigQuery table
 func BigQueryTableFQNToURN(fqn string) (string, error) {
 	projectID, datasetID, tableID, err := parseBQTableFQN(fqn)
 	if err != nil {
@@ -145,10 +163,21 @@ func parseBQTableFQN(fqn string) (projectID, datasetID, tableID string, err erro
 	ss := strings.FieldsFunc(fqn, func(r rune) bool {
 		return r == ':' || r == '.'
 	})
-	if len(ss) < 3 {
+	if len(ss) != numberOfFqnComponents {
 		return "", "", "", fmt.Errorf(
 			"unexpected BigQuery table FQN '%s', expected in format projectID:datasetID.tableID", fqn,
 		)
 	}
 	return ss[0], ss[1], ss[2], nil
 }
+
+func parseMaxComputeTableFQN(fqn string) (projectName, schemaName, tableName string, err error) { //nolint:revive
+	// fqn is in projectName.schemaName.tableName format.
+	ss := strings.Split(fqn, ".")
+	if len(ss) != numberOfFqnComponents {
+		return "", "", "", fmt.Errorf(
+			"unexpected MaxCompute table FQN '%s', expected in format projectName.schemaName.tableName", fqn,
+		)
+	}
+	return ss[0], ss[1], ss[2], nil
+}
diff --git a/plugins/util_test.go b/plugins/util_test.go
index 01682ed35..c7666b733 100644
--- a/plugins/util_test.go
+++ b/plugins/util_test.go
@@ -11,6 +11,50 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func TestMaxComputeURN(t *testing.T) {
+	t.Run("should create maxcompute URN", func(t *testing.T) {
+		project := "my-project"
+		schema := "my-dataset"
+		table := "my-table"
+
+		actual := plugins.MaxComputeURN(project, schema, table)
+		expected := "urn:maxcompute:my-project:table:my-project.my-dataset.my-table"
+
+		assert.Equal(t, expected, actual)
+	})
+}
+
+func TestMaxComputeTableFQNToURN(t *testing.T) {
+	cases := []struct {
+		name        string
+		fqn         string
+		expected    string
+		expectedErr string
+	}{
+		{
+			name:     "Valid",
+			fqn:      "mc-raw-internal.dagstream.production_feast09_s2id13_30min_demand",
+			expected: "urn:maxcompute:mc-raw-internal:table:mc-raw-internal.dagstream.production_feast09_s2id13_30min_demand",
+		},
+		{
+			name:        "Invalid1",
+			fqn:         "test1.test2",
+			expectedErr: "map URN: unexpected MaxCompute table FQN 'test1.test2', expected in format",
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			urn, err := plugins.MaxComputeTableFQNToURN(tc.fqn)
+			if tc.expectedErr != "" {
+				assert.ErrorContains(t, err, tc.expectedErr)
+			} else {
+				assert.NoError(t, err)
+			}
+			assert.Equal(t, tc.expected, urn)
+		})
+	}
+}
+
 func TestBigQueryURN(t *testing.T) {
 	t.Run("should create bigquery URN", func(t *testing.T) {
 		project := "my-project"