From 99f29d65b20690d0b3248c0b18057646e1610b58 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Fri, 2 Feb 2024 18:49:36 +1100 Subject: [PATCH 1/4] feat: glue table creation with some docs on testing --- catalog/catalog.go | 44 ++++++++++++- catalog/catalog_test.go | 123 +++++++++++++++++++++++++++++++++++++ catalog/glue.go | 76 ++++++++++++++++++++++- catalog/glue_test.go | 48 +++++++++++++++ docs/cfn/AWS_TESTING.md | 74 ++++++++++++++++++++++ docs/cfn/glue-catalog.yaml | 70 +++++++++++++++++++++ go.mod | 8 +-- go.sum | 16 ++--- internal/mock_fs.go | 4 ++ io/io.go | 24 ++++++-- io/local.go | 4 ++ table/metadata.go | 30 +++++++++ table/table.go | 27 ++++++++ table/table_test.go | 28 +++++++++ 14 files changed, 556 insertions(+), 20 deletions(-) create mode 100644 catalog/catalog_test.go create mode 100644 docs/cfn/AWS_TESTING.md create mode 100644 docs/cfn/glue-catalog.yaml diff --git a/catalog/catalog.go b/catalog/catalog.go index d6d7f1e..512834e 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -21,11 +21,14 @@ import ( "context" "crypto/tls" "errors" + "fmt" "net/url" + "strings" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/google/uuid" ) type CatalogType string @@ -52,6 +55,14 @@ func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] { } } +// WithDefaultLocation sets the default location for the catalog, this is used +// when a location is not provided in the create table operation. 
+func WithDefaultLocation(location string) Option[GlueCatalog] { + return func(o *options) { + o.defaultLocation = location + } +} + func WithCredential(cred string) Option[RestCatalog] { return func(o *options) { o.credential = cred @@ -117,7 +128,8 @@ func WithPrefix(prefix string) Option[RestCatalog] { type Option[T GlueCatalog | RestCatalog] func(*options) type options struct { - awsConfig aws.Config + awsConfig aws.Config + defaultLocation string tlsConfig *tls.Config credential string @@ -185,3 +197,33 @@ func TableNameFromIdent(ident table.Identifier) string { func NamespaceFromIdent(ident table.Identifier) table.Identifier { return ident[:len(ident)-1] } + +func getMetadataPath(locationPath string, newVersion int) (string, error) { + if newVersion < 0 { + return "", fmt.Errorf("invalid table version: %d must be a non-negative integer", newVersion) + } + + metaDataPath, err := url.JoinPath(strings.TrimLeft(locationPath, "/"), "metadata", fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String())) + if err != nil { + return "", fmt.Errorf("failed to build metadata path: %w", err) + } + + return metaDataPath, nil +} + +func getLocationForTable(location, defaultLocation, database, tableName string) (*url.URL, error) { + if location != "" { + return url.Parse(location) + } + + if defaultLocation == "" { + return nil, fmt.Errorf("no default path is set, please specify a location when creating a table") + } + + u, err := url.Parse(defaultLocation) + if err != nil { + return nil, fmt.Errorf("failed to parse location URL: %w", err) + } + + return u.JoinPath(fmt.Sprintf("%s.db", database), tableName), nil +} diff --git a/catalog/catalog_test.go b/catalog/catalog_test.go new file mode 100644 index 0000000..9e19a3f --- /dev/null +++ b/catalog/catalog_test.go @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package catalog + +import ( + "regexp" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetLocationForTable(t *testing.T) { + type args struct { + location string + defaultLocation string + database string + tableName string + } + + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "should return location if location is provided", + args: args{ + location: "s3://new-bucket/test-table", + defaultLocation: "s3://test-bucket", + database: "test-database", + tableName: "test-table", + }, + want: "s3://new-bucket/test-table", + wantErr: false, + }, + { + name: "should return default location with generated path if location is not provided", + args: args{ + location: "", + defaultLocation: "s3://test-bucket/test-prefix/", + database: "test-database", + tableName: "test-table", + }, + want: "s3://test-bucket/test-prefix/test-database.db/test-table", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getLocationForTable(tt.args.location, tt.args.defaultLocation, tt.args.database, tt.args.tableName) + if (err != nil) != tt.wantErr { + t.Errorf("tableS3Location() error = %v, wantErr %v", err, tt.wantErr) + return + } + + if got.String() != tt.want { + 
t.Errorf("tableS3Location() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestGetMetadataPath(t *testing.T) { + type args struct { + locationPath string + version int + } + + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "should return metadata location with version 0", + args: args{ + locationPath: "/test-table/", + version: 1, + }, + want: "^test-table/metadata/00001-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$", + wantErr: false, + }, + { + name: "should return metadata location with version 1", + args: args{ + locationPath: "/test-table/", + version: 0, + }, + want: "^test-table/metadata/00000-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$", + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getMetadataPath(tt.args.locationPath, tt.args.version) + if (err != nil) != tt.wantErr { + t.Errorf("getMetadataPath() error = %v, wantErr %v", err, tt.wantErr) + return + } + + require.Regexp(t, regexp.MustCompile(tt.want), got) + }) + } +} diff --git a/catalog/glue.go b/catalog/glue.go index b398185..b0a0ece 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -19,6 +19,7 @@ package catalog import ( "context" + "encoding/json" "errors" "fmt" @@ -39,10 +40,12 @@ var ( type glueAPI interface { GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) + CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) } type GlueCatalog struct { - glueSvc glueAPI + glueSvc glueAPI + defaultLocation string } func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog { @@ -53,7 +56,8 @@ func NewGlueCatalog(opts 
...Option[GlueCatalog]) *GlueCatalog { } return &GlueCatalog{ - glueSvc: glue.NewFromConfig(glueOps.awsConfig), + glueSvc: glue.NewFromConfig(glueOps.awsConfig), + defaultLocation: glueOps.defaultLocation, } } @@ -121,6 +125,55 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier return icebergTable, nil } +func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder table.SortOrder, location string, props map[string]string) (*table.Table, error) { + database, tableName, err := identifierToGlueTable(identifier) + if err != nil { + return nil, err + } + + locationURL, err := getLocationForTable(location, c.defaultLocation, database, tableName) + if err != nil { + return nil, err + } + + // append metadata path to location path/to/database.db/tablename/metadata/00000-UUID.metadata.json + metadataPath, err := getMetadataPath(locationURL.Path, 0) + if err != nil { + return nil, err + } + + tbl, err := table.NewTable(identifier, schema, partitionSpec, sortOrder, locationURL.String(), props) + if err != nil { + return nil, err + } + + err = c.writeMetaData(locationURL.String(), metadataPath, tbl.Metadata()) + if err != nil { + return nil, err + } + + params := &glue.CreateTableInput{ + DatabaseName: aws.String(database), + + TableInput: &types.TableInput{ + Name: aws.String(tableName), + TableType: aws.String("EXTERNAL_TABLE"), + Parameters: map[string]string{ + "table_type": glueTableTypeIceberg, + "metadata_location": fmt.Sprintf("%s://%s/%s", locationURL.Scheme, locationURL.Host, metadataPath), + }, + StorageDescriptor: &types.StorageDescriptor{Location: aws.String(locationURL.String())}, + }, + } + + _, err = c.glueSvc.CreateTable(ctx, params) + if err != nil { + return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) + } + + return tbl, nil +} + func (c *GlueCatalog) CatalogType() CatalogType { return Glue } @@ 
-176,6 +229,25 @@ func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) return tblRes.Table.Parameters["metadata_location"], nil } +func (c *GlueCatalog) writeMetaData(location, metadataPath string, metadata table.Metadata) error { + iofs, err := io.LoadFS(map[string]string{}, location) + if err != nil { + return fmt.Errorf("failed to load fs: %w", err) + } + + data, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal table metadata: %w", err) + } + + err = iofs.WriteFile(metadataPath, data, 0644) + if err != nil { + return fmt.Errorf("failed to write metadata file: %w", err) + } + + return nil +} + func identifierToGlueTable(identifier table.Identifier) (string, string, error) { if len(identifier) != 2 { return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 1d3c42f..8e87374 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -22,6 +22,8 @@ import ( "os" "testing" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/glue" @@ -44,6 +46,11 @@ func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesIn return args.Get(0).(*glue.GetTablesOutput), args.Error(1) } +func (m *mockGlueClient) CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.CreateTableOutput), args.Error(1) +} + func TestGlueGetTable(t *testing.T) { assert := require.New(t) @@ -146,3 +153,44 @@ func TestGlueLoadTableIntegration(t *testing.T) { assert.NoError(err) assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) } + +func TestGlueCreateTableIntegration(t *testing.T) { + if os.Getenv("TEST_DATABASE_NAME") == "" { + t.Skip() + 
} + if os.Getenv("TEST_CREATE_TABLE_NAME") == "" { + t.Skip() + } + if os.Getenv("TEST_CREATE_TABLE_LOCATION") == "" { + t.Skip() + } + + assert := require.New(t) + + location := os.Getenv("TEST_CREATE_TABLE_LOCATION") + + schema := iceberg.NewSchemaWithIdentifiers(1, []int{}, + iceberg.NestedField{ + ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz}) + partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) + + props := map[string]string{ + "write.target-file-size-bytes": "536870912", + "write.format.default": "parquet", + } + + awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) + assert.NoError(err) + + catalog := NewGlueCatalog(WithAwsConfig(awscfg)) + + table, err := catalog.CreateTable(context.TODO(), + []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, schema, partSpec, table.UnsortedSortOrder, location, props) + assert.NoError(err) + assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, table.Identifier()) +} diff --git a/docs/cfn/AWS_TESTING.md b/docs/cfn/AWS_TESTING.md new file mode 100644 index 0000000..ad71f0a --- /dev/null +++ b/docs/cfn/AWS_TESTING.md @@ -0,0 +1,74 @@ + + +# AWS integration testing + +To validate the glue catalog you will need to create some test resources. + +# Prerequisites + +1. An AWS account. +2. [AWS CLI](https://aws.amazon.com/cli/) is installed. +3. Exported environment variables for `AWS_DEFAULT_REGION`, `AWS_REGION` and `AWS_PROFILE`, I use [direnv](https://direnv.net/) to maintain these variables in a `.envrc` file. +4. You have logged into an AWS account via the AWS CLI.
+ +The included CloudFormation template can be deployed as follows: + +``` +aws cloudformation deploy --stack-name test-iceberg-glue-catalog --template-file docs/cfn/glue-catalog.yaml +``` + +Once deployed you can retrieve the outputs of the stack. + +``` +aws cloudformation describe-stacks --stack-name test-iceberg-glue-catalog --query 'Stacks[0].Outputs' +``` + +This should output JSON as follows: + +``` +[ + { + "OutputKey": "IcebergBucket", + "OutputValue": "test-iceberg-glue-catalog-icebergbucket-abc123abc123" + }, + { + "OutputKey": "GlueDatabase", + "OutputValue": "iceberg_test" + } +] +``` + +Export the required environment variables. + +``` +# the glue database from the outputs of the stack +export TEST_DATABASE_NAME=iceberg_test + +# the s3 bucket name from the outputs of the stack +export TEST_CREATE_TABLE_LOCATION=s3://test-iceberg-glue-catalog-icebergbucket-abc123abc123/testing + +# the name of the table you will create in the glue catalog +export TEST_CREATE_TABLE_NAME=records +``` + +Run the creation integration test to validate the catalog creation, and provide a table which can be used to validate other integration tests. + +``` +go test -v -run TestGlueCreateTableIntegration ./catalog +``` + diff --git a/docs/cfn/glue-catalog.yaml b/docs/cfn/glue-catalog.yaml new file mode 100644 index 0000000..b058542 --- /dev/null +++ b/docs/cfn/glue-catalog.yaml @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +AWSTemplateFormatVersion: "2010-09-09" +Description: "apache-iceberg: Glue database and S3 bucket for integration testing" + +Parameters: + Stage: + Type: String + Description: The stage where the stack is running in, e.g., dev, test, prod. + Default: test + +Outputs: + GlueDatabase: + Value: !Ref GlueDatabase + IcebergBucket: + Value: !Ref IcebergBucket + +Resources: + GlueDatabase: + Type: AWS::Glue::Database + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseInput: + Name: !Sub iceberg_${Stage} + Description: iceberg database + + IcebergBucket: + Type: AWS::S3::Bucket + Properties: + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: true + BlockPublicPolicy: true + IgnorePublicAcls: true + RestrictPublicBuckets: true + + IcebergBucketPolicy: + Type: AWS::S3::BucketPolicy + Properties: + Bucket: !Ref IcebergBucket + PolicyDocument: + Statement: + - Sid: AllowSSLRequestsOnly + Effect: Deny + Principal: "*" + Action: + - s3:* + Resource: + - Fn::Sub: arn:aws:s3:::${IcebergBucket}/* + - Fn::Sub: arn:aws:s3:::${IcebergBucket} + Condition: + Bool: + aws:SecureTransport: false diff --git a/go.mod b/go.mod index 800fd60..612b8ea 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.16 github.com/aws/aws-sdk-go-v2/credentials v1.17.16 github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 - github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 + github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 
github.com/aws/smithy-go v1.20.2 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 github.com/google/uuid v1.6.0 @@ -47,9 +47,9 @@ require ( github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 // indirect @@ -74,7 +74,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.20.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.20.0 // indirect diff --git a/go.sum b/go.sum index e411441..1fe6b80 100644 --- a/go.sum +++ b/go.sum @@ -40,14 +40,14 @@ github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 h1:z/NBYW8RygzWrDgNWib10fuLUBl github.com/aws/aws-sdk-go-v2/service/glue v1.73.1/go.mod h1:F3B9DC5FsIHAxUtHZdY5KUeqN+tHoGlRPzSSYdXjC38= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 h1:UXqEWQI0n+q0QixzU0yUUQBZXRd5037qdInTIHFTl98= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9/go.mod 
h1:xP6Gq6fzGZT8w/ZN+XvGMZ2RU1LeEs7b2yUP5DN8NY4= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 h1:L0ai8WICYHozIKK+OtPzVJBugL7culcuM4E4JOpIEm8= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10/go.mod h1:byqfyxJBshFk0fF9YmK0M0ugIO8OWjzH2T3bPG4eGuA= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 h1:Wx0rlZoEJR7JwlSZcHnEa7CNjrSIyVxMFWGAaXy4fJY= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9/go.mod h1:aVMHdE0aHO3v+f/iw01fmXV/5DbfQ3Bi9nN7nd9bE9Y= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 h1:uO5XR6QGBcmPyo2gxofYJLFkcVQ4izOoGDNenlZhTEk= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7/go.mod h1:feeeAYfAcwTReM6vbwjEyDmiGho+YgBhaFULuXDW8kc= -github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 h1:PJTdBMsyvra6FtED7JZtDpQrIAflYDHFoZAu/sKYkwU= -github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0/go.mod h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 h1:KOxnQeWy5sXyS37fdKEvAsGHOr9fa/qvwxfJurR/BzE= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10/go.mod h1:jMx5INQFYFYB3lQD9W0D8Ohgq6Wnl7NYOJ2TQndbulI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 h1:5XNlsBsEvBZBMO6p82y+sqpWg8j5aBCe+5C2GBFgqBQ= +github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1/go.mod h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU= github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 h1:aD7AGQhvPuAxlSUfo0CWU7s6FpkbyykMhGYMvlqTjVs= github.com/aws/aws-sdk-go-v2/service/sso v1.20.9/go.mod h1:c1qtZUWtygI6ZdvKppzCSXsDOq5I4luJPZ0Ud3juFCA= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 h1:Pav5q3cA260Zqez42T9UhIlsd9QeypszRPwC9LdSSsQ= @@ -169,8 +169,8 @@ golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/internal/mock_fs.go b/internal/mock_fs.go index 95f6c3f..069d4ce 100644 --- a/internal/mock_fs.go +++ b/internal/mock_fs.go @@ -39,6 +39,10 @@ func (m *MockFS) Remove(name string) error { return m.Called(name).Error(0) } +func (m *MockFS) WriteFile(name string, data []byte, perm fs.FileMode) error { + return m.Called(name, data, perm).Error(0) +} + type MockFSReadFile struct { MockFS } diff --git a/io/io.go b/io/io.go index abe5971..7700570 100644 --- a/io/io.go +++ b/io/io.go @@ -48,6 +48,9 @@ type IO interface { // // If there is an error, it will be of type *PathError. Remove(name string) error + + // WriteFile creates or truncates the named file and writes data to it.
+ WriteFile(name string, data []byte, perm fs.FileMode) error } // ReadFileIO is the interface implemented by a file system that @@ -166,12 +169,23 @@ func (f ioFS) Remove(name string) error { return r.Remove(name) } +func (f ioFS) WriteFile(name string, data []byte, perm fs.FileMode) error { + r, ok := f.fsys.(interface { + WriteFile(name string, data []byte, perm fs.FileMode) error + }) + if !ok { + return errMissingWriteFile + } + return r.WriteFile(name, data, perm) +} + var ( - errMissingReadDir = errors.New("fs.File directory missing ReadDir method") - errMissingSeek = errors.New("fs.File missing Seek method") - errMissingReadAt = errors.New("fs.File missing ReadAt") - errMissingRemove = errors.New("fs.FS missing Remove method") - errMissingReadFile = errors.New("fs.FS missing ReadFile method") + errMissingReadDir = errors.New("fs.File directory missing ReadDir method") + errMissingSeek = errors.New("fs.File missing Seek method") + errMissingReadAt = errors.New("fs.File missing ReadAt") + errMissingRemove = errors.New("fs.FS missing Remove method") + errMissingWriteFile = errors.New("fs.FS missing WriteFile method") + errMissingReadFile = errors.New("fs.FS missing ReadFile method") ) type ioFile struct { diff --git a/io/local.go b/io/local.go index befa831..40d0f4a 100644 --- a/io/local.go +++ b/io/local.go @@ -30,3 +30,7 @@ func (LocalFS) Open(name string) (File, error) { func (LocalFS) Remove(name string) error { return os.Remove(name) } + +func (LocalFS) WriteFile(name string, data []byte, perm os.FileMode) error { + return os.WriteFile(name, data, perm) +} diff --git a/table/metadata.go b/table/metadata.go index 957e163..c92b4cc 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "time" "github.com/apache/iceberg-go" @@ -399,3 +400,32 @@ func (m *MetadataV2) UnmarshalJSON(b []byte) error { m.preValidate() return m.validate() } + +func NewMetadataV2(schema *iceberg.Schema, partitionSpec 
iceberg.PartitionSpec, sortOrder SortOrder, location string, tableUUID uuid.UUID, properties iceberg.Properties) (*MetadataV2, error) { + metadata := &MetadataV2{ + commonMetadata: commonMetadata{ + FormatVersion: 2, + UUID: tableUUID, + Loc: location, + Specs: []iceberg.PartitionSpec{partitionSpec}, + DefaultSpecID: partitionSpec.ID(), + SortOrderList: []SortOrder{sortOrder}, + DefaultSortOrderID: sortOrder.OrderID, + SchemaList: []*iceberg.Schema{schema}, + CurrentSchemaID: schema.ID, + Props: properties, + LastColumnId: schema.HighestFieldID(), + LastPartitionID: intToPtr(partitionSpec.LastAssignedFieldID()), + LastUpdatedMS: time.Now().UnixMilli(), + }, + } + + metadata.preValidate() + + err := metadata.validate() + if err != nil { + return nil, fmt.Errorf("invalid metadata: %w", err) + } + + return metadata, nil +} diff --git a/table/table.go b/table/table.go index 80b68b3..243e85b 100644 --- a/table/table.go +++ b/table/table.go @@ -22,6 +22,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/io" + "github.com/google/uuid" "golang.org/x/exp/slices" ) @@ -95,3 +96,29 @@ func NewFromLocation(ident Identifier, metalocation string, fsys io.IO) (*Table, } return New(ident, meta, metalocation, fsys), nil } + +func NewTable(ident Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder SortOrder, location string, properties iceberg.Properties) (*Table, error) { + if properties == nil { + properties = make(iceberg.Properties) + } + + tableUUID := uuid.New() + + // TODO: we need to "freshen" the sequences in the schema, partition spec, and sort order + + metadata, err := NewMetadataV2(schema, partitionSpec, sortOrder, location, tableUUID, properties) + if err != nil { + return nil, err + } + + return &Table{ + identifier: ident, + metadata: metadata, + metadataLocation: location, + fs: nil, + }, nil +} + +func intToPtr(i int) *int { + return &i +} diff --git a/table/table_test.go b/table/table_test.go index 
cde94ab..80d068b 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -19,6 +19,8 @@ package table_test import ( "bytes" + "encoding/json" + "fmt" "testing" "github.com/apache/iceberg-go" @@ -128,3 +130,29 @@ func (t *TableTestSuite) TestSnapshotByName() { t.True(testSnapshot.Equals(*t.tbl.SnapshotByName("test"))) } + +func (t *TableTestSuite) TestNewTable() { + + identifier := table.Identifier{"records"} + + schema := iceberg.NewSchemaWithIdentifiers(1, []int{1}, + iceberg.NestedField{ + ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ + ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz}) + + partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) + + location := "s3://bucket/test/location" + + tbl, err := table.NewTable(identifier, schema, partSpec, table.UnsortedSortOrder, location, nil) + t.Require().NoError(err) + t.Require().Equal(identifier, tbl.Identifier()) + + data, err := json.Marshal(tbl.Metadata()) + t.Require().NoError(err) + fmt.Println(string(data)) +} From 2578b8389c1240b9ecd9d2b7037a6a625ee8a83e Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Sat, 1 Jun 2024 09:32:27 +1000 Subject: [PATCH 2/4] refactor: update table creation to use builder --- catalog/catalog.go | 2 +- catalog/glue.go | 2 +- table/table.go | 47 +++++++++++++++++++++++++++++++++++++++------ table/table_test.go | 5 ++++- 4 files changed, 47 insertions(+), 9 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 512834e..a6d7eab 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -211,7 +211,7 @@ func getMetadataPath(locationPath string, newVersion int) (string, error) { return metaDataPath, nil } -func getLocationForTable(location, defaultLocation, database, tableName string) (*url.URL, error) { +func 
GetLocationForTable(location, defaultLocation, database, tableName string) (*url.URL, error) { if location != "" { return url.Parse(location) } diff --git a/catalog/glue.go b/catalog/glue.go index b0a0ece..83f52f0 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -142,7 +142,7 @@ func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifi return nil, err } - tbl, err := table.NewTable(identifier, schema, partitionSpec, sortOrder, locationURL.String(), props) + tbl, err := table.NewTableBuilder(identifier, schema, location).WithPartitionSpec(partitionSpec).WithSortOrder(sortOrder).WithProperties(props).Build() if err != nil { return nil, err } diff --git a/table/table.go b/table/table.go index 243e85b..a4874f0 100644 --- a/table/table.go +++ b/table/table.go @@ -97,24 +97,59 @@ func NewFromLocation(ident Identifier, metalocation string, fsys io.IO) (*Table, return New(ident, meta, metalocation, fsys), nil } -func NewTable(ident Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder SortOrder, location string, properties iceberg.Properties) (*Table, error) { - if properties == nil { - properties = make(iceberg.Properties) +type TableBuilder struct { + ident Identifier + schema *iceberg.Schema + partitionSpec iceberg.PartitionSpec + sortOrder SortOrder + location string + properties iceberg.Properties +} + +// NewTableBuilder creates a new TableBuilder for building a Table. +// +// The ident, schema and location parameters are required to create a Table, others +// can be specified with the corresponding builder methods. +func NewTableBuilder(ident Identifier, schema *iceberg.Schema, location string) *TableBuilder { + return &TableBuilder{ + ident: ident, + schema: schema, + location: location, + properties: make(iceberg.Properties), } +} + +// WithPartitionSpec sets the partition spec for the table. The partition spec defines how data is partitioned in the table. 
+func (b *TableBuilder) WithPartitionSpec(spec iceberg.PartitionSpec) *TableBuilder { + b.partitionSpec = spec + return b +} +// WithSortOrder sets the sort order for the table. The sort order defines how data is sorted in the table. +func (b *TableBuilder) WithSortOrder(sortOrder SortOrder) *TableBuilder { + b.sortOrder = sortOrder + return b +} + +func (b *TableBuilder) WithProperties(properties iceberg.Properties) *TableBuilder { + b.properties = properties + return b +} + +func (b *TableBuilder) Build() (*Table, error) { tableUUID := uuid.New() // TODO: we need to "freshen" the sequences in the schema, partition spec, and sort order - metadata, err := NewMetadataV2(schema, partitionSpec, sortOrder, location, tableUUID, properties) + metadata, err := NewMetadataV2(b.schema, b.partitionSpec, b.sortOrder, b.location, tableUUID, b.properties) if err != nil { return nil, err } return &Table{ - identifier: ident, + identifier: b.ident, metadata: metadata, - metadataLocation: location, + metadataLocation: b.location, fs: nil, }, nil } diff --git a/table/table_test.go b/table/table_test.go index 80d068b..fd8ddb4 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -147,8 +147,11 @@ func (t *TableTestSuite) TestNewTable() { SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) location := "s3://bucket/test/location" + tbl, err := table.NewTableBuilder(identifier, schema, location). + WithPartitionSpec(partSpec). + WithSortOrder(table.UnsortedSortOrder). + Build() - tbl, err := table.NewTable(identifier, schema, partSpec, table.UnsortedSortOrder, location, nil) t.Require().NoError(err) t.Require().Equal(identifier, tbl.Identifier()) From 9fd17f64dfacd565a9a1e6c5d827ccd070e67e20 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Sun, 9 Jun 2024 13:06:05 +1000 Subject: [PATCH 3/4] changes based on feedback. 
* moved cfn to root of the project and ditched the docs folder * moved table meta data name generation to table package * added a builder for the table * isolated and generified the metadata related writing * remove the default location as it isn't necessary at the moment --- catalog/catalog.go | 32 +++------ catalog/catalog_test.go | 105 ---------------------------- catalog/glue.go | 60 ++++++++-------- catalog/glue_test.go | 3 - {docs/cfn => cfn}/AWS_TESTING.md | 0 {docs/cfn => cfn}/glue-catalog.yaml | 0 table/table.go | 48 +++++++++---- table/table_test.go | 14 ++-- 8 files changed, 78 insertions(+), 184 deletions(-) rename {docs/cfn => cfn}/AWS_TESTING.md (100%) rename {docs/cfn => cfn}/glue-catalog.yaml (100%) diff --git a/catalog/catalog.go b/catalog/catalog.go index a6d7eab..7aa4a19 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -20,15 +20,15 @@ package catalog import ( "context" "crypto/tls" + "encoding/json" "errors" "fmt" "net/url" - "strings" "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" - "github.com/google/uuid" ) type CatalogType string @@ -198,32 +198,16 @@ func NamespaceFromIdent(ident table.Identifier) table.Identifier { return ident[:len(ident)-1] } -func getMetadataPath(locationPath string, newVersion int) (string, error) { - if newVersion < 0 { - return "", fmt.Errorf("invalid table version: %d must be a non-negative integer", newVersion) - } - - metaDataPath, err := url.JoinPath(strings.TrimLeft(locationPath, "/"), "metadata", fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String())) +func writeTableMetaData(iofs io.IO, metadataPath string, metadata table.Metadata) error { + data, err := json.Marshal(metadata) if err != nil { - return "", fmt.Errorf("failed to build metadata path: %w", err) - } - - return metaDataPath, nil -} - -func GetLocationForTable(location, defaultLocation, database, tableName string) (*url.URL, error) { 
- if location != "" { - return url.Parse(location) - } - - if defaultLocation == "" { - return nil, fmt.Errorf("no default path is set, please specify a location when creating a table") + return fmt.Errorf("failed to marshal table metadata: %w", err) } - u, err := url.Parse(defaultLocation) + err = iofs.WriteFile(metadataPath, data, 0644) if err != nil { - return nil, fmt.Errorf("failed to parse location URL: %w", err) + return fmt.Errorf("failed to write metadata file: %w", err) } - return u.JoinPath(fmt.Sprintf("%s.db", database), tableName), nil + return nil } diff --git a/catalog/catalog_test.go b/catalog/catalog_test.go index 9e19a3f..94989be 100644 --- a/catalog/catalog_test.go +++ b/catalog/catalog_test.go @@ -16,108 +16,3 @@ // under the License. package catalog - -import ( - "regexp" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestGetLocationForTable(t *testing.T) { - type args struct { - location string - defaultLocation string - database string - tableName string - } - - tests := []struct { - name string - args args - want string - wantErr bool - }{ - { - name: "should return location if location is provided", - args: args{ - location: "s3://new-bucket/test-table", - defaultLocation: "s3://test-bucket", - database: "test-database", - tableName: "test-table", - }, - want: "s3://new-bucket/test-table", - wantErr: false, - }, - { - name: "should return default location with generated path if location is not provided", - args: args{ - location: "", - defaultLocation: "s3://test-bucket/test-prefix/", - database: "test-database", - tableName: "test-table", - }, - want: "s3://test-bucket/test-prefix/test-database.db/test-table", - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := getLocationForTable(tt.args.location, tt.args.defaultLocation, tt.args.database, tt.args.tableName) - if (err != nil) != tt.wantErr { - t.Errorf("tableS3Location() error = %v, wantErr %v", err, tt.wantErr) - 
return - } - - if got.String() != tt.want { - t.Errorf("tableS3Location() = %v, want %v", got, tt.want) - } - }) - } -} - -func TestGetMetadataPath(t *testing.T) { - type args struct { - locationPath string - version int - } - - tests := []struct { - name string - args args - want string - wantErr bool - }{ - { - name: "should return metadata location with version 0", - args: args{ - locationPath: "/test-table/", - version: 1, - }, - want: "^test-table/metadata/00001-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$", - wantErr: false, - }, - { - name: "should return metadata location with version 1", - args: args{ - locationPath: "/test-table/", - version: 0, - }, - want: "^test-table/metadata/00000-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$", - wantErr: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := getMetadataPath(tt.args.locationPath, tt.args.version) - if (err != nil) != tt.wantErr { - t.Errorf("getMetadataPath() error = %v, wantErr %v", err, tt.wantErr) - return - } - - require.Regexp(t, regexp.MustCompile(tt.want), got) - }) - } -} diff --git a/catalog/glue.go b/catalog/glue.go index 83f52f0..0114355 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -19,9 +19,10 @@ package catalog import ( "context" - "encoding/json" "errors" "fmt" + "net/url" + "strings" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/io" @@ -44,8 +45,7 @@ type glueAPI interface { } type GlueCatalog struct { - glueSvc glueAPI - defaultLocation string + glueSvc glueAPI } func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog { @@ -56,8 +56,7 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog { } return &GlueCatalog{ - glueSvc: glue.NewFromConfig(glueOps.awsConfig), - defaultLocation: glueOps.defaultLocation, + glueSvc: glue.NewFromConfig(glueOps.awsConfig), } } @@ -125,33 +124,49 @@ func (c *GlueCatalog) 
LoadTable(ctx context.Context, identifier table.Identifier return icebergTable, nil } +// CreateTable creates a new table in the catalog. +// +// The identifier should contain the Glue database name, then glue table name. +// The location should be the S3 URL of the table itself (for example s3://bucket/prefix/database.db/tablename); the metadata directory and metadata file name are appended to it. func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder table.SortOrder, location string, props map[string]string) (*table.Table, error) { database, tableName, err := identifierToGlueTable(identifier) if err != nil { return nil, err } - locationURL, err := getLocationForTable(location, c.defaultLocation, database, tableName) + // s3://bucket/prefix/database.db/tablename + locationURL, err := url.Parse(location) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to parse location URL %s: %w", location, err) } - // append metadata path to location path/to/database.db/tablename/metadata/00000-UUID.metadata.json - metadataPath, err := getMetadataPath(locationURL.Path, 0) + // 00000-UUID.metadata.json + newManifest, err := table.GenerateMetadataFileName(0) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to generate metadata file name: %w", err) } - tbl, err := table.NewTableBuilder(identifier, schema, location).WithPartitionSpec(partitionSpec).WithSortOrder(sortOrder).WithProperties(props).Build() + // s3://bucket/prefix/database.db/tablename/metadata/00000-UUID.metadata.json + metadataURL := locationURL.JoinPath("metadata", newManifest) + + // prefix/database.db/tablename/metadata/00000-UUID.metadata.json + metadataLocation := strings.TrimPrefix(metadataURL.Path, "/") + + tbl, err := table.NewTableBuilder(identifier, schema, location, metadataLocation). + WithPartitionSpec(partitionSpec). + WithSortOrder(sortOrder). + WithProperties(props). 
+ Build() if err != nil { return nil, err } - err = c.writeMetaData(locationURL.String(), metadataPath, tbl.Metadata()) + err = writeTableMetaData(tbl.FS(), tbl.MetadataLocation(), tbl.Metadata()) if err != nil { return nil, err } + // TODO: need to convert the schema to a glue schema and provide that to create table. params := &glue.CreateTableInput{ DatabaseName: aws.String(database), @@ -160,7 +175,7 @@ func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifi TableType: aws.String("EXTERNAL_TABLE"), Parameters: map[string]string{ "table_type": glueTableTypeIceberg, - "metadata_location": fmt.Sprintf("%s://%s/%s", locationURL.Scheme, locationURL.Host, metadataPath), + "metadata_location": metadataURL.String(), }, StorageDescriptor: &types.StorageDescriptor{Location: aws.String(locationURL.String())}, }, @@ -229,25 +244,6 @@ func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string) return tblRes.Table.Parameters["metadata_location"], nil } -func (c *GlueCatalog) writeMetaData(location, metadataPath string, metadata table.Metadata) error { - iofs, err := io.LoadFS(map[string]string{}, location) - if err != nil { - return fmt.Errorf("failed to load fs: %w", err) - } - - data, err := json.Marshal(metadata) - if err != nil { - return fmt.Errorf("failed to marshal table metadata: %w", err) - } - - err = iofs.WriteFile(metadataPath, data, 0644) - if err != nil { - return fmt.Errorf("failed to write metadata file: %w", err) - } - - return nil -} - func identifierToGlueTable(identifier table.Identifier) (string, string, error) { if len(identifier) != 2 { return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier) diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 8e87374..09b28c2 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -138,9 +138,6 @@ func TestGlueLoadTableIntegration(t *testing.T) { if os.Getenv("TEST_TABLE_NAME") == "" { t.Skip() } - if 
os.Getenv("TEST_TABLE_LOCATION") == "" { - t.Skip() - } assert := require.New(t) diff --git a/docs/cfn/AWS_TESTING.md b/cfn/AWS_TESTING.md similarity index 100% rename from docs/cfn/AWS_TESTING.md rename to cfn/AWS_TESTING.md diff --git a/docs/cfn/glue-catalog.yaml b/cfn/glue-catalog.yaml similarity index 100% rename from docs/cfn/glue-catalog.yaml rename to cfn/glue-catalog.yaml diff --git a/table/table.go b/table/table.go index a4874f0..af44a38 100644 --- a/table/table.go +++ b/table/table.go @@ -18,6 +18,7 @@ package table import ( + "fmt" "reflect" "github.com/apache/iceberg-go" @@ -98,24 +99,26 @@ func NewFromLocation(ident Identifier, metalocation string, fsys io.IO) (*Table, } type TableBuilder struct { - ident Identifier - schema *iceberg.Schema - partitionSpec iceberg.PartitionSpec - sortOrder SortOrder - location string - properties iceberg.Properties + ident Identifier + schema *iceberg.Schema + partitionSpec iceberg.PartitionSpec + sortOrder SortOrder + location string + metadataLocation string // this path is relative to the location + properties iceberg.Properties } // NewTableBuilder creates a new TableBuilder for building a Table. // // The ident, schema and location parameters are required to create a Table, others // can be specified with the corresponding builder methods. 
-func NewTableBuilder(ident Identifier, schema *iceberg.Schema, location string) *TableBuilder { +func NewTableBuilder(ident Identifier, schema *iceberg.Schema, location, metadataLocation string) *TableBuilder { return &TableBuilder{ - ident: ident, - schema: schema, - location: location, - properties: make(iceberg.Properties), + ident: ident, + schema: schema, + location: location, + metadataLocation: metadataLocation, + properties: make(iceberg.Properties), } } @@ -146,14 +149,33 @@ func (b *TableBuilder) Build() (*Table, error) { return nil, err } + // location = s3://<bucket>/<prefix> + fs, err := io.LoadFS(map[string]string{}, b.location) + if err != nil { + return nil, fmt.Errorf("failed to load fs: %w", err) + } + return &Table{ identifier: b.ident, metadata: metadata, - metadataLocation: b.location, - fs: nil, + metadataLocation: b.metadataLocation, + fs: fs, }, nil } +// GenerateMetadataFileName generates a filename for a table metadata file based on the provided table version. +// The filename is in the format "<V>-<UUID>.metadata.json", where the V is a 5-digit zero-padded non-negative integer +// and the UUID is a randomly generated UUID string. +// +// If the provided version is negative, an error is returned. 
+func GenerateMetadataFileName(newVersion int) (string, error) { + if newVersion < 0 { + return "", fmt.Errorf("invalid table version: %d must be a non-negative integer", newVersion) + } + + return fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String()), nil +} + func intToPtr(i int) *int { return &i } diff --git a/table/table_test.go b/table/table_test.go index fd8ddb4..6c47071 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -19,8 +19,6 @@ package table_test import ( "bytes" - "encoding/json" - "fmt" "testing" "github.com/apache/iceberg-go" @@ -147,15 +145,17 @@ func (t *TableTestSuite) TestNewTable() { SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) location := "s3://bucket/test/location" - tbl, err := table.NewTableBuilder(identifier, schema, location). + metadataLocation := "metadata/00001-00000-00000-00000-00000.metadata.json" + tbl, err := table.NewTableBuilder(identifier, schema, location, metadataLocation). WithPartitionSpec(partSpec). WithSortOrder(table.UnsortedSortOrder). 
Build() t.Require().NoError(err) t.Require().Equal(identifier, tbl.Identifier()) - - data, err := json.Marshal(tbl.Metadata()) - t.Require().NoError(err) - fmt.Println(string(data)) + t.Require().Equal(schema, tbl.Schema()) + t.Require().Equal(partSpec, tbl.Spec()) + t.Require().Equal(table.UnsortedSortOrder, tbl.SortOrder()) + t.Require().Equal(location, tbl.Location()) + t.Require().Equal(metadataLocation, tbl.MetadataLocation()) } From 04956878001b400bd92f7c57282ed191d9e18692 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Sun, 9 Jun 2024 13:19:06 +1000 Subject: [PATCH 4/4] fix: update go mod changes --- go.mod | 8 ++++---- go.sum | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 612b8ea..800fd60 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.16 github.com/aws/aws-sdk-go-v2/credentials v1.17.16 github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 - github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 + github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 github.com/aws/smithy-go v1.20.2 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 github.com/google/uuid v1.6.0 @@ -47,9 +47,9 @@ require ( github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.28.10 // 
indirect @@ -74,7 +74,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.19.0 // indirect + golang.org/x/sys v0.20.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.20.0 // indirect diff --git a/go.sum b/go.sum index 1fe6b80..e411441 100644 --- a/go.sum +++ b/go.sum @@ -40,14 +40,14 @@ github.com/aws/aws-sdk-go-v2/service/glue v1.73.1 h1:z/NBYW8RygzWrDgNWib10fuLUBl github.com/aws/aws-sdk-go-v2/service/glue v1.73.1/go.mod h1:F3B9DC5FsIHAxUtHZdY5KUeqN+tHoGlRPzSSYdXjC38= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 h1:Ji0DY1xUsUr3I8cHps0G+XM3WWU16lP6yG8qu1GAZAs= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2/go.mod h1:5CsjAbs3NlGQyZNFACh+zztPDI7fU6eW9QsxjfnuBKg= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 h1:L0ai8WICYHozIKK+OtPzVJBugL7culcuM4E4JOpIEm8= -github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10/go.mod h1:byqfyxJBshFk0fF9YmK0M0ugIO8OWjzH2T3bPG4eGuA= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 h1:UXqEWQI0n+q0QixzU0yUUQBZXRd5037qdInTIHFTl98= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9/go.mod h1:xP6Gq6fzGZT8w/ZN+XvGMZ2RU1LeEs7b2yUP5DN8NY4= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 h1:Wx0rlZoEJR7JwlSZcHnEa7CNjrSIyVxMFWGAaXy4fJY= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9/go.mod h1:aVMHdE0aHO3v+f/iw01fmXV/5DbfQ3Bi9nN7nd9bE9Y= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10 h1:KOxnQeWy5sXyS37fdKEvAsGHOr9fa/qvwxfJurR/BzE= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.16.10/go.mod h1:jMx5INQFYFYB3lQD9W0D8Ohgq6Wnl7NYOJ2TQndbulI= -github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1 h1:5XNlsBsEvBZBMO6p82y+sqpWg8j5aBCe+5C2GBFgqBQ= -github.com/aws/aws-sdk-go-v2/service/s3 v1.48.1/go.mod 
h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 h1:uO5XR6QGBcmPyo2gxofYJLFkcVQ4izOoGDNenlZhTEk= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7/go.mod h1:feeeAYfAcwTReM6vbwjEyDmiGho+YgBhaFULuXDW8kc= +github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 h1:PJTdBMsyvra6FtED7JZtDpQrIAflYDHFoZAu/sKYkwU= +github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0/go.mod h1:4qXHrG1Ne3VGIMZPCB8OjH/pLFO94sKABIusjh0KWPU= github.com/aws/aws-sdk-go-v2/service/sso v1.20.9 h1:aD7AGQhvPuAxlSUfo0CWU7s6FpkbyykMhGYMvlqTjVs= github.com/aws/aws-sdk-go-v2/service/sso v1.20.9/go.mod h1:c1qtZUWtygI6ZdvKppzCSXsDOq5I4luJPZ0Ud3juFCA= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.3 h1:Pav5q3cA260Zqez42T9UhIlsd9QeypszRPwC9LdSSsQ= @@ -169,8 +169,8 @@ golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=