Skip to content

Commit

Permalink
feat: refactored maxcompute client with interface and added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Nov 15, 2024
1 parent c1206de commit 03a843b
Show file tree
Hide file tree
Showing 7 changed files with 780 additions and 127 deletions.
11 changes: 1 addition & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ require (
github.com/ClickHouse/clickhouse-go v1.4.5
github.com/IBM/sarama v1.43.2
github.com/MakeNowJust/heredoc v1.0.0
github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.10
github.com/alibabacloud-go/maxcompute-20220104 v1.4.1
github.com/aliyun/aliyun-odps-go-sdk v0.3.14
github.com/aws/aws-sdk-go v1.44.151
github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba
Expand Down Expand Up @@ -85,14 +83,8 @@ require (
github.com/Microsoft/go-winio v0.5.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/alecthomas/chroma v0.10.0 // indirect
github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.5 // indirect
github.com/alibabacloud-go/debug v1.0.1 // indirect
github.com/alibabacloud-go/endpoint-util v1.1.0 // indirect
github.com/alibabacloud-go/openapi-util v0.1.0 // indirect
github.com/alibabacloud-go/tea v1.2.2 // indirect
github.com/alibabacloud-go/tea-utils v1.3.1 // indirect
github.com/alibabacloud-go/tea-utils/v2 v2.0.6 // indirect
github.com/alibabacloud-go/tea-xml v1.1.3 // indirect
github.com/aliyun/credentials-go v1.3.10 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
Expand All @@ -115,7 +107,6 @@ require (
github.com/briandowns/spinner v1.20.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/charmbracelet/glamour v0.6.0 // indirect
github.com/clbanning/mxj/v2 v2.5.5 // indirect
github.com/cli/safeexec v1.0.1 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/containerd/continuity v0.3.0 // indirect
Expand Down Expand Up @@ -155,6 +146,7 @@ require (
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/gorilla/css v1.0.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
Expand Down Expand Up @@ -225,7 +217,6 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/tklauser/go-sysconf v0.3.11 // indirect
github.com/tklauser/numcpus v0.6.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand Down
69 changes: 0 additions & 69 deletions go.sum

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions plugins/extractors/maxcompute/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package maxcompute

import (
"context"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

type MaxComputeClient struct {
client *odps.Odps
project *odps.Project
}

func NewMaxComputeClient(config Config) *MaxComputeClient {
aliAccount := account.NewAliyunAccount(config.AccessKey.ID, config.AccessKey.Secret)
client := odps.NewOdps(aliAccount, config.EndpointProject)
client.SetDefaultProjectName(config.ProjectName)

project := client.Project(config.ProjectName)

return &MaxComputeClient{
client: client,
project: project,
}
}

func (c *MaxComputeClient) ListSchema(ctx context.Context) (schemas []*odps.Schema, err error) {
err = c.project.Schemas().List(func(schema *odps.Schema, err2 error) {
if err2 != nil {
err = err2
return
}
schemas = append(schemas, schema)
})

return
}

func (c *MaxComputeClient) ListTable(ctx context.Context, schemaName string) (tables []*odps.Table, err error) {
t := odps.NewTables(c.client, c.project.Name(), schemaName)
t.List(
func(table *odps.Table, err2 error) {
if err2 != nil {
err = err2
return
}
tables = append(tables, table)
},
)
return
}

func (c *MaxComputeClient) GetTable(ctx context.Context, table *odps.Table) (*odps.Table, error) {
err := table.Load()
if err != nil {
isView := table.Schema().IsVirtualView || table.Schema().IsMaterializedView
isLoaded := table.IsLoaded()
if !isView || (isView && !isLoaded) {
return nil, err
}
}
return table, nil
}
125 changes: 77 additions & 48 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/goto/meteor/models"
Expand Down Expand Up @@ -40,10 +39,13 @@ type Extractor struct {
logger log.Logger
config Config

client *odps.Odps
eg *errgroup.Group
client Client
newClient NewClientFunc
eg *errgroup.Group
}

type NewClientFunc func(ctx context.Context, logger log.Logger, config Config) (Client, error)

//go:embed README.md
var summary string

Expand All @@ -70,9 +72,17 @@ var info = plugins.Info{
Summary: summary,
}

func New(logger log.Logger) *Extractor {
//go:generate mockery --name=Client -r --case underscore --with-expecter --structname MaxComputeClient --filename maxcompute_client_mock.go --output=./mocks
type Client interface {
ListSchema(ctx context.Context) ([]*odps.Schema, error)
ListTable(ctx context.Context, schemaName string) ([]*odps.Table, error)
GetTable(ctx context.Context, table *odps.Table) (*odps.Table, error)
}

func New(logger log.Logger, clientFunc NewClientFunc) *Extractor {
e := &Extractor{
logger: logger,
logger: logger,
newClient: clientFunc,
}
e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
e.ScopeNotRequired = true

Check failure on line 88 in plugins/extractors/maxcompute/maxcompute.go

View workflow job for this annotation

GitHub Actions / golangci

e.ScopeNotRequired undefined (type *Extractor has no field or method ScopeNotRequired) (typecheck)

Check failure on line 88 in plugins/extractors/maxcompute/maxcompute.go

View workflow job for this annotation

GitHub Actions / golangci

e.ScopeNotRequired undefined (type *Extractor has no field or method ScopeNotRequired) (typecheck)
Expand All @@ -85,69 +95,80 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
return err
}

aliAccount := account.NewAliyunAccount(e.config.AccessKey.ID, e.config.AccessKey.Secret)
e.client = odps.NewOdps(aliAccount, e.config.EndpointProject)
e.client.SetDefaultProjectName(e.config.ProjectName)
if e.config.ProjectName == "" {
return fmt.Errorf("project_name is required")
}
if e.config.AccessKey.ID == "" || e.config.AccessKey.Secret == "" {
return fmt.Errorf("access_key is required")
}
if e.config.EndpointProject == "" {
return fmt.Errorf("endpoint_project is required")
}
if e.config.Concurrency == 0 {
e.config.Concurrency = 1
}

var err error
e.client, err = e.newClient(ctx, e.logger, e.config)
if err != nil {
return err
}

e.eg = &errgroup.Group{}
e.eg.SetLimit(e.config.Concurrency)

return nil
}

func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error {
schemas := e.client.Project(e.config.ProjectName).Schemas()
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {

err := schemas.List(func(schema *odps.Schema, err error) {
if err != nil {
e.logger.Error("failed to list schemas", "error", err)
return
}
schemas, err := e.client.ListSchema(ctx)

if err != nil {
return err
}

for _, schema := range schemas {
if e.config.SchemaName != "" && schema.Name() != e.config.SchemaName {
return
continue
}
if contains(e.config.Exclude.Schemas, schema.Name()) {
return
continue
}

tables := odps.NewTables(e.client, e.config.ProjectName, schema.Name())
err := e.fetchTablesFromSchema(ctx, schema, emit)
if err != nil {
return err
}
}

return e.eg.Wait()
}

e.fetchTablesFromSchema(tables, emit)
})
func (e *Extractor) fetchTablesFromSchema(ctx context.Context, schema *odps.Schema, emit plugins.Emit) error {

tables, err := e.client.ListTable(ctx, schema.Name())
if err != nil {
return err
}
return e.eg.Wait()
}

func (e *Extractor) fetchTablesFromSchema(tables *odps.Tables, emit plugins.Emit) {
tables.List(
func(t *odps.Table, err error) {
if err != nil {
e.logger.Error("table list", err)
return
}

if contains(e.config.Exclude.Tables, fmt.Sprintf("%s.%s", t.SchemaName(), t.Name())) {
return
}

e.eg.Go(func() error {
return e.processTable(t, emit)
})
},
)
for _, table := range tables {
if contains(e.config.Exclude.Tables, fmt.Sprintf("%s.%s", table.SchemaName(), table.Name())) {
continue
}

e.eg.Go(func() error {
return e.processTable(ctx, table, emit)
})
}

return nil
}

func (e *Extractor) processTable(table *odps.Table, emit plugins.Emit) error {
err := table.Load()
func (e *Extractor) processTable(ctx context.Context, table *odps.Table, emit plugins.Emit) error {
table, err := e.client.GetTable(ctx, table)
if err != nil {
isView := table.Schema().IsVirtualView || table.Schema().IsMaterializedView
isLoaded := table.IsLoaded()
if !isView || (isView && !isLoaded) {
e.logger.Error("failed to get table info", "schema", table.SchemaName(), "table", table.Name(), "error", err)
return err
}
return err
}

asset, err := e.buildAsset(table)
Expand Down Expand Up @@ -256,6 +277,10 @@ func buildTableAttributesData(tableInfo *odps.Table) map[string]interface{} {
attributesData["resource_url"] = tableInfo.ResourceUrl()
}

// if tableInfo.PartitionColumns() != nil {
// attributesData["partition_field"] = tableInfo.PartitionColumns()
// }

return attributesData
}

Expand Down Expand Up @@ -294,8 +319,12 @@ func contains(slice []string, item string) bool {

func init() {
if err := registry.Extractors.Register("maxcompute", func() plugins.Extractor {
return New(plugins.GetLog())
return New(plugins.GetLog(), CreateClient)
}); err != nil {
panic(err)
}
}

func CreateClient(ctx context.Context, logger log.Logger, config Config) (Client, error) {
return NewMaxComputeClient(config), nil
}
Loading

0 comments on commit 03a843b

Please sign in to comment.