diff --git a/catalog/catalog.go b/catalog/catalog.go index d6d7f1e..7aa4a19 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -20,10 +20,13 @@ package catalog import ( "context" "crypto/tls" + "encoding/json" "errors" + "fmt" "net/url" "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" ) @@ -52,6 +55,14 @@ func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] { } } +// WithDefaultLocation sets the default location for the catalog, this is used +// when a location is not provided in the create table operation. +func WithDefaultLocation(location string) Option[GlueCatalog] { + return func(o *options) { + o.defaultLocation = location + } +} + func WithCredential(cred string) Option[RestCatalog] { return func(o *options) { o.credential = cred @@ -117,7 +128,8 @@ func WithPrefix(prefix string) Option[RestCatalog] { type Option[T GlueCatalog | RestCatalog] func(*options) type options struct { - awsConfig aws.Config + awsConfig aws.Config + defaultLocation string tlsConfig *tls.Config credential string @@ -185,3 +197,17 @@ func TableNameFromIdent(ident table.Identifier) string { func NamespaceFromIdent(ident table.Identifier) table.Identifier { return ident[:len(ident)-1] } + +func writeTableMetaData(iofs io.IO, metadataPath string, metadata table.Metadata) error { + data, err := json.Marshal(metadata) + if err != nil { + return fmt.Errorf("failed to marshal table metadata: %w", err) + } + + err = iofs.WriteFile(metadataPath, data, 0644) + if err != nil { + return fmt.Errorf("failed to write metadata file: %w", err) + } + + return nil +} diff --git a/catalog/catalog_test.go b/catalog/catalog_test.go new file mode 100644 index 0000000..94989be --- /dev/null +++ b/catalog/catalog_test.go @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package catalog diff --git a/catalog/glue.go b/catalog/glue.go index b398185..0114355 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -21,6 +21,8 @@ import ( "context" "errors" "fmt" + "net/url" + "strings" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/io" @@ -39,6 +41,7 @@ var ( type glueAPI interface { GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error) GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error) + CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) } type GlueCatalog struct { @@ -121,6 +124,71 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier return icebergTable, nil } +// CreateTable creates a new table in the catalog. +// +// The identifier should contain the Glue database name, then glue table name. +// The location should be the S3 prefix for the table, which will have the database name, table name, and metadata file appended to it. 
+func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder table.SortOrder, location string, props map[string]string) (*table.Table, error) { + database, tableName, err := identifierToGlueTable(identifier) + if err != nil { + return nil, err + } + + // s3://bucket/prefix/database.db/tablename + locationURL, err := url.Parse(location) + if err != nil { + return nil, fmt.Errorf("failed to parse location URL %s: %w", location, err) + } + + // 00000-UUID.metadata.json + newManifest, err := table.GenerateMetadataFileName(0) + if err != nil { + return nil, fmt.Errorf("failed to generate metadata file name: %w", err) + } + + // s3://bucket/prefix/database.db/tablename/manifest/00000-UUID.metadata.json + metadataURL := locationURL.JoinPath("metadata", newManifest) + + // prefix/database.db/tablename/manifest/00000-UUID.metadata.json + metadataLocation := strings.TrimPrefix(metadataURL.Path, "/") + + tbl, err := table.NewTableBuilder(identifier, schema, location, metadataLocation). + WithPartitionSpec(partitionSpec). + WithSortOrder(sortOrder). + WithProperties(props). + Build() + if err != nil { + return nil, err + } + + err = writeTableMetaData(tbl.FS(), tbl.MetadataLocation(), tbl.Metadata()) + if err != nil { + return nil, err + } + + // TODO: need to convert the schema to a glue schema and provide that to create table. 
+ params := &glue.CreateTableInput{ + DatabaseName: aws.String(database), + + TableInput: &types.TableInput{ + Name: aws.String(tableName), + TableType: aws.String("EXTERNAL_TABLE"), + Parameters: map[string]string{ + "table_type": glueTableTypeIceberg, + "metadata_location": metadataURL.String(), + }, + StorageDescriptor: &types.StorageDescriptor{Location: aws.String(locationURL.String())}, + }, + } + + _, err = c.glueSvc.CreateTable(ctx, params) + if err != nil { + return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err) + } + + return tbl, nil +} + func (c *GlueCatalog) CatalogType() CatalogType { return Glue } diff --git a/catalog/glue_test.go b/catalog/glue_test.go index 1d3c42f..09b28c2 100644 --- a/catalog/glue_test.go +++ b/catalog/glue_test.go @@ -22,6 +22,8 @@ import ( "os" "testing" + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/glue" @@ -44,6 +46,11 @@ func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesIn return args.Get(0).(*glue.GetTablesOutput), args.Error(1) } +func (m *mockGlueClient) CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) { + args := m.Called(ctx, params, optFns) + return args.Get(0).(*glue.CreateTableOutput), args.Error(1) +} + func TestGlueGetTable(t *testing.T) { assert := require.New(t) @@ -131,9 +138,6 @@ func TestGlueLoadTableIntegration(t *testing.T) { if os.Getenv("TEST_TABLE_NAME") == "" { t.Skip() } - if os.Getenv("TEST_TABLE_LOCATION") == "" { - t.Skip() - } assert := require.New(t) @@ -146,3 +150,44 @@ func TestGlueLoadTableIntegration(t *testing.T) { assert.NoError(err) assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier()) } + +func TestGlueCreateTableIntegration(t *testing.T) { + if os.Getenv("TEST_DATABASE_NAME") == "" { + t.Skip() + } + 
if os.Getenv("TEST_CREATE_TABLE_NAME") == "" { + t.Skip() + } + if os.Getenv("TEST_CREATE_TABLE_LOCATION") == "" { + t.Skip() + } + + assert := require.New(t) + + location := os.Getenv("TEST_CREATE_TABLE_LOCATION") + + schema := iceberg.NewSchemaWithIdentifiers(1, []int{}, + iceberg.NestedField{ + ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz}) + partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) + + props := map[string]string{ + "write.target-file-size-bytes": "536870912", + "write.format.default": "parquet", + } + + awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse)) + assert.NoError(err) + + catalog := NewGlueCatalog(WithAwsConfig(awscfg)) + + table, err := catalog.CreateTable(context.TODO(), + []string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, schema, partSpec, table.UnsortedSortOrder, location, props) + assert.NoError(err) + assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, table.Identifier()) +} diff --git a/cfn/AWS_TESTING.md b/cfn/AWS_TESTING.md new file mode 100644 index 0000000..ad71f0a --- /dev/null +++ b/cfn/AWS_TESTING.md @@ -0,0 +1,74 @@ + + +# AWS integration testing + +To validate the glue catalog you will need to create some test resources. + +# Prerequisites + +1. An AWS account. +2. [AWS CLI](https://aws.amazon.com/cli/) is installed. +2. Exported environment variables for `AWS_DEFAULT_REGION`, `AWS_REGION` and `AWS_PROFILE`, I use [direnv](https://direnv.net/) to maintain these variables in a `.envrc` file. +3. Your have logged into an AWS account via the AWS CLI. 
+
+Deploy the included CloudFormation template as follows:
+
+```
+aws cloudformation deploy --stack-name test-iceberg-glue-catalog --template-file cfn/glue-catalog.yaml
+```
+
+Once deployed you can retrieve the outputs of the stack.
+
+```
+aws cloudformation describe-stacks --stack-name test-iceberg-glue-catalog --query 'Stacks[0].Outputs'
+```
+
+This should output JSON as follows:
+
+```
+[
+    {
+        "OutputKey": "IcebergBucket",
+        "OutputValue": "test-iceberg-glue-catalog-icebergbucket-abc123abc123"
+    },
+    {
+        "OutputKey": "GlueDatabase",
+        "OutputValue": "iceberg_test"
+    }
+]
+```
+
+Export the required environment variables.
+
+```
+# the glue database from the outputs of the stack
+export TEST_DATABASE_NAME=iceberg_test
+
+# the s3 bucket name from the outputs of the stack
+export TEST_CREATE_TABLE_LOCATION=s3://test-iceberg-glue-catalog-icebergbucket-abc123abc123/testing
+
+# the name of the table you will create in the glue catalog
+export TEST_CREATE_TABLE_NAME=records
+```
+
+Run the creation integration test to validate the catalog creation, and provide a table which can be used to validate other integration tests.
+
+```
+go test -v -run TestGlueCreateTableIntegration ./catalog
+```
+
diff --git a/cfn/glue-catalog.yaml b/cfn/glue-catalog.yaml
new file mode 100644
index 0000000..b058542
--- /dev/null
+++ b/cfn/glue-catalog.yaml
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +AWSTemplateFormatVersion: "2010-09-09" +Description: "apache-iceberg: Glue database and S3 bucket for integration testing" + +Parameters: + Stage: + Type: String + Description: The stage where the stack is running in, e.g., dev, test, prod. + Default: test + +Outputs: + GlueDatabase: + Value: !Ref GlueDatabase + IcebergBucket: + Value: !Ref IcebergBucket + +Resources: + GlueDatabase: + Type: AWS::Glue::Database + Properties: + CatalogId: !Ref AWS::AccountId + DatabaseInput: + Name: !Sub iceberg_${Stage} + Description: iceberg database + + IcebergBucket: + Type: AWS::S3::Bucket + Properties: + BucketEncryption: + ServerSideEncryptionConfiguration: + - ServerSideEncryptionByDefault: + SSEAlgorithm: AES256 + PublicAccessBlockConfiguration: + BlockPublicAcls: true + BlockPublicPolicy: true + IgnorePublicAcls: true + RestrictPublicBuckets: true + + IcebergBucketPolicy: + Type: AWS::S3::BucketPolicy + Properties: + Bucket: !Ref IcebergBucket + PolicyDocument: + Statement: + - Sid: AllowSSLRequestsOnly + Effect: Deny + Principal: "*" + Action: + - s3:* + Resource: + - Fn::Sub: arn:aws:s3:::${IcebergBucket}/* + - Fn::Sub: arn:aws:s3:::${IcebergBucket} + Condition: + Bool: + aws:SecureTransport: false diff --git a/internal/mock_fs.go b/internal/mock_fs.go index 95f6c3f..069d4ce 100644 --- a/internal/mock_fs.go +++ b/internal/mock_fs.go @@ -39,6 +39,10 @@ func (m *MockFS) Remove(name string) error { return m.Called(name).Error(0) } +func (m *MockFS) WriteFile(name string, data []byte, perm fs.FileMode) error { + return m.Called(name, data, perm).Error(0) +} + type 
MockFSReadFile struct { MockFS } diff --git a/io/io.go b/io/io.go index abe5971..7700570 100644 --- a/io/io.go +++ b/io/io.go @@ -48,6 +48,9 @@ type IO interface { // // If there is an error, it will be of type *PathError. Remove(name string) error + + // Create creates or truncates the named file. + WriteFile(name string, data []byte, perm fs.FileMode) error } // ReadFileIO is the interface implemented by a file system that @@ -166,12 +169,23 @@ func (f ioFS) Remove(name string) error { return r.Remove(name) } +func (f ioFS) WriteFile(name string, data []byte, perm fs.FileMode) error { + r, ok := f.fsys.(interface { + WriteFile(name string, data []byte, perm fs.FileMode) error + }) + if !ok { + return errMissingWriteFile + } + return r.WriteFile(name, data, perm) +} + var ( - errMissingReadDir = errors.New("fs.File directory missing ReadDir method") - errMissingSeek = errors.New("fs.File missing Seek method") - errMissingReadAt = errors.New("fs.File missing ReadAt") - errMissingRemove = errors.New("fs.FS missing Remove method") - errMissingReadFile = errors.New("fs.FS missing ReadFile method") + errMissingReadDir = errors.New("fs.File directory missing ReadDir method") + errMissingSeek = errors.New("fs.File missing Seek method") + errMissingReadAt = errors.New("fs.File missing ReadAt") + errMissingRemove = errors.New("fs.FS missing Remove method") + errMissingWriteFile = errors.New("fs.FS missing WriteFile method") + errMissingReadFile = errors.New("fs.FS missing ReadFile method") ) type ioFile struct { diff --git a/io/local.go b/io/local.go index befa831..40d0f4a 100644 --- a/io/local.go +++ b/io/local.go @@ -30,3 +30,7 @@ func (LocalFS) Open(name string) (File, error) { func (LocalFS) Remove(name string) error { return os.Remove(name) } + +func (LocalFS) WriteFile(name string, data []byte, perm os.FileMode) error { + return os.WriteFile(name, data, perm) +} diff --git a/table/metadata.go b/table/metadata.go index 957e163..c92b4cc 100644 --- a/table/metadata.go 
+++ b/table/metadata.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "io" + "time" "github.com/apache/iceberg-go" @@ -399,3 +400,32 @@ func (m *MetadataV2) UnmarshalJSON(b []byte) error { m.preValidate() return m.validate() } + +func NewMetadataV2(schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder SortOrder, location string, tableUUID uuid.UUID, properties iceberg.Properties) (*MetadataV2, error) { + metadata := &MetadataV2{ + commonMetadata: commonMetadata{ + FormatVersion: 2, + UUID: tableUUID, + Loc: location, + Specs: []iceberg.PartitionSpec{partitionSpec}, + DefaultSpecID: partitionSpec.ID(), + SortOrderList: []SortOrder{sortOrder}, + DefaultSortOrderID: sortOrder.OrderID, + SchemaList: []*iceberg.Schema{schema}, + CurrentSchemaID: schema.ID, + Props: properties, + LastColumnId: schema.HighestFieldID(), + LastPartitionID: intToPtr(partitionSpec.LastAssignedFieldID()), + LastUpdatedMS: time.Now().UnixMilli(), + }, + } + + metadata.preValidate() + + err := metadata.validate() + if err != nil { + return nil, fmt.Errorf("invalid metadata: %w", err) + } + + return metadata, nil +} diff --git a/table/table.go b/table/table.go index 80b68b3..af44a38 100644 --- a/table/table.go +++ b/table/table.go @@ -18,10 +18,12 @@ package table import ( + "fmt" "reflect" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/io" + "github.com/google/uuid" "golang.org/x/exp/slices" ) @@ -95,3 +97,85 @@ func NewFromLocation(ident Identifier, metalocation string, fsys io.IO) (*Table, } return New(ident, meta, metalocation, fsys), nil } + +type TableBuilder struct { + ident Identifier + schema *iceberg.Schema + partitionSpec iceberg.PartitionSpec + sortOrder SortOrder + location string + metadataLocation string // this path is relative to the location + properties iceberg.Properties +} + +// NewTableBuilder creates a new TableBuilder for building a Table. 
+// +// The ident, schema and location parameters are required to create a Table, others +// can be specified with the corresponding builder methods. +func NewTableBuilder(ident Identifier, schema *iceberg.Schema, location, metadataLocation string) *TableBuilder { + return &TableBuilder{ + ident: ident, + schema: schema, + location: location, + metadataLocation: metadataLocation, + properties: make(iceberg.Properties), + } +} + +// WithPartitionSpec sets the partition spec for the table. The partition spec defines how data is partitioned in the table. +func (b *TableBuilder) WithPartitionSpec(spec iceberg.PartitionSpec) *TableBuilder { + b.partitionSpec = spec + return b +} + +// WithSortOrder sets the sort order for the table. The sort order defines how data is sorted in the table. +func (b *TableBuilder) WithSortOrder(sortOrder SortOrder) *TableBuilder { + b.sortOrder = sortOrder + return b +} + +func (b *TableBuilder) WithProperties(properties iceberg.Properties) *TableBuilder { + b.properties = properties + return b +} + +func (b *TableBuilder) Build() (*Table, error) { + tableUUID := uuid.New() + + // TODO: we need to "freshen" the sequences in the schema, partition spec, and sort order + + metadata, err := NewMetadataV2(b.schema, b.partitionSpec, b.sortOrder, b.location, tableUUID, b.properties) + if err != nil { + return nil, err + } + + // location = s3:/// + fs, err := io.LoadFS(map[string]string{}, b.location) + if err != nil { + return nil, fmt.Errorf("failed to load fs: %w", err) + } + + return &Table{ + identifier: b.ident, + metadata: metadata, + metadataLocation: b.metadataLocation, + fs: fs, + }, nil +} + +// GenerateMetadataFileName generates a filename for a table metadata file based on the provided table version. +// The filename is in the format "-.metadata.json", where the V is a 5-digit zero-padded non-negative integer +// and the UUID is a randomly generated UUID string. +// +// If the provided version is negative, an error is returned. 
+func GenerateMetadataFileName(newVersion int) (string, error) { + if newVersion < 0 { + return "", fmt.Errorf("invalid table version: %d must be a non-negative integer", newVersion) + } + + return fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String()), nil +} + +func intToPtr(i int) *int { + return &i +} diff --git a/table/table_test.go b/table/table_test.go index cde94ab..6c47071 100644 --- a/table/table_test.go +++ b/table/table_test.go @@ -128,3 +128,34 @@ func (t *TableTestSuite) TestSnapshotByName() { t.True(testSnapshot.Equals(*t.tbl.SnapshotByName("test"))) } + +func (t *TableTestSuite) TestNewTable() { + + identifier := table.Identifier{"records"} + + schema := iceberg.NewSchemaWithIdentifiers(1, []int{1}, + iceberg.NestedField{ + ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.Int64, Required: true}, + iceberg.NestedField{ + ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ + ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz}) + + partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}}) + + location := "s3://bucket/test/location" + metadataLocation := "metadata/00001-00000-00000-00000-00000.metadata.json" + tbl, err := table.NewTableBuilder(identifier, schema, location, metadataLocation). + WithPartitionSpec(partSpec). + WithSortOrder(table.UnsortedSortOrder). + Build() + + t.Require().NoError(err) + t.Require().Equal(identifier, tbl.Identifier()) + t.Require().Equal(schema, tbl.Schema()) + t.Require().Equal(partSpec, tbl.Spec()) + t.Require().Equal(table.UnsortedSortOrder, tbl.SortOrder()) + t.Require().Equal(location, tbl.Location()) + t.Require().Equal(metadataLocation, tbl.MetadataLocation()) +}