Skip to content

Commit 6cb048b

Browse files
committed
feat: glue table creation with some docs on testing
1 parent 5db83a0 commit 6cb048b

File tree

14 files changed

+554
-18
lines changed

14 files changed

+554
-18
lines changed

catalog/catalog.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@ import (
2121
"context"
2222
"crypto/tls"
2323
"errors"
24+
"fmt"
2425
"net/url"
26+
"strings"
2527

2628
"github.com/apache/iceberg-go"
2729
"github.com/apache/iceberg-go/table"
2830
"github.com/aws/aws-sdk-go-v2/aws"
31+
"github.com/google/uuid"
2932
)
3033

3134
type CatalogType string
@@ -52,6 +55,14 @@ func WithAwsConfig(cfg aws.Config) Option[GlueCatalog] {
5255
}
5356
}
5457

58+
// WithDefaultLocation sets the default location for the catalog, this is used
59+
// when a location is not provided in the create table operation.
60+
func WithDefaultLocation(location string) Option[GlueCatalog] {
61+
return func(o *options) {
62+
o.defaultLocation = location
63+
}
64+
}
65+
5566
func WithCredential(cred string) Option[RestCatalog] {
5667
return func(o *options) {
5768
o.credential = cred
@@ -117,7 +128,8 @@ func WithPrefix(prefix string) Option[RestCatalog] {
117128
// Option is a functional option that mutates the shared options struct. The
// type parameter T (GlueCatalog or RestCatalog) is not used by the function
// itself; it only tags which catalog type the option is meant for.
type Option[T GlueCatalog | RestCatalog] func(*options)
118129

119130
type options struct {
120-
awsConfig aws.Config
131+
awsConfig aws.Config
132+
defaultLocation string
121133

122134
tlsConfig *tls.Config
123135
credential string
@@ -185,3 +197,33 @@ func TableNameFromIdent(ident table.Identifier) string {
185197
// NamespaceFromIdent returns every element of ident except the last one,
// i.e. the identifier with its trailing element stripped.
//
// ident must contain at least one element; slicing an empty identifier panics.
func NamespaceFromIdent(ident table.Identifier) table.Identifier {
	last := len(ident) - 1

	return ident[:last]
}
200+
201+
func getMetadataPath(locationPath string, newVersion int) (string, error) {
202+
if newVersion < 0 {
203+
return "", fmt.Errorf("invalid table version: %d must be a non-negative integer", newVersion)
204+
}
205+
206+
metaDataPath, err := url.JoinPath(strings.TrimLeft(locationPath, "/"), "metadata", fmt.Sprintf("%05d-%s.metadata.json", newVersion, uuid.New().String()))
207+
if err != nil {
208+
return "", fmt.Errorf("failed to build metadata path: %w", err)
209+
}
210+
211+
return metaDataPath, nil
212+
}
213+
214+
// getLocationForTable resolves the storage location of a table as a URL.
//
// An explicitly supplied location always wins and is parsed as-is. Otherwise
// the location is derived from defaultLocation as
//
//	<defaultLocation>/<database>.db/<tableName>
//
// An error is returned when neither location nor defaultLocation is set, or
// when the selected location cannot be parsed as a URL. Parse failures are
// wrapped the same way for both inputs, so callers can still unwrap the
// underlying *url.Error.
func getLocationForTable(location, defaultLocation, database, tableName string) (*url.URL, error) {
	if location != "" {
		u, err := url.Parse(location)
		if err != nil {
			return nil, fmt.Errorf("failed to parse location URL: %w", err)
		}

		return u, nil
	}

	if defaultLocation == "" {
		return nil, errors.New("no default path is set, please specify a location when creating a table")
	}

	u, err := url.Parse(defaultLocation)
	if err != nil {
		return nil, fmt.Errorf("failed to parse location URL: %w", err)
	}

	return u.JoinPath(fmt.Sprintf("%s.db", database), tableName), nil
}

catalog/catalog_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package catalog
19+
20+
import (
21+
"regexp"
22+
"testing"
23+
24+
"github.com/stretchr/testify/require"
25+
)
26+
27+
func TestGetLocationForTable(t *testing.T) {
28+
type args struct {
29+
location string
30+
defaultLocation string
31+
database string
32+
tableName string
33+
}
34+
35+
tests := []struct {
36+
name string
37+
args args
38+
want string
39+
wantErr bool
40+
}{
41+
{
42+
name: "should return location if location is provided",
43+
args: args{
44+
location: "s3://new-bucket/test-table",
45+
defaultLocation: "s3://test-bucket",
46+
database: "test-database",
47+
tableName: "test-table",
48+
},
49+
want: "s3://new-bucket/test-table",
50+
wantErr: false,
51+
},
52+
{
53+
name: "should return default location with generated path if location is not provided",
54+
args: args{
55+
location: "",
56+
defaultLocation: "s3://test-bucket/test-prefix/",
57+
database: "test-database",
58+
tableName: "test-table",
59+
},
60+
want: "s3://test-bucket/test-prefix/test-database.db/test-table",
61+
wantErr: false,
62+
},
63+
}
64+
65+
for _, tt := range tests {
66+
t.Run(tt.name, func(t *testing.T) {
67+
got, err := getLocationForTable(tt.args.location, tt.args.defaultLocation, tt.args.database, tt.args.tableName)
68+
if (err != nil) != tt.wantErr {
69+
t.Errorf("tableS3Location() error = %v, wantErr %v", err, tt.wantErr)
70+
return
71+
}
72+
73+
if got.String() != tt.want {
74+
t.Errorf("tableS3Location() = %v, want %v", got, tt.want)
75+
}
76+
})
77+
}
78+
}
79+
80+
func TestGetMetadataPath(t *testing.T) {
81+
type args struct {
82+
locationPath string
83+
version int
84+
}
85+
86+
tests := []struct {
87+
name string
88+
args args
89+
want string
90+
wantErr bool
91+
}{
92+
{
93+
name: "should return metadata location with version 0",
94+
args: args{
95+
locationPath: "/test-table/",
96+
version: 1,
97+
},
98+
want: "^test-table/metadata/00001-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$",
99+
wantErr: false,
100+
},
101+
{
102+
name: "should return metadata location with version 1",
103+
args: args{
104+
locationPath: "/test-table/",
105+
version: 0,
106+
},
107+
want: "^test-table/metadata/00000-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}.metadata.json$",
108+
wantErr: false,
109+
},
110+
}
111+
112+
for _, tt := range tests {
113+
t.Run(tt.name, func(t *testing.T) {
114+
got, err := getMetadataPath(tt.args.locationPath, tt.args.version)
115+
if (err != nil) != tt.wantErr {
116+
t.Errorf("getMetadataPath() error = %v, wantErr %v", err, tt.wantErr)
117+
return
118+
}
119+
120+
require.Regexp(t, regexp.MustCompile(tt.want), got)
121+
})
122+
}
123+
}

catalog/glue.go

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package catalog
1919

2020
import (
2121
"context"
22+
"encoding/json"
2223
"errors"
2324
"fmt"
2425

@@ -39,10 +40,12 @@ var (
3940
type glueAPI interface {
4041
GetTable(ctx context.Context, params *glue.GetTableInput, optFns ...func(*glue.Options)) (*glue.GetTableOutput, error)
4142
GetTables(ctx context.Context, params *glue.GetTablesInput, optFns ...func(*glue.Options)) (*glue.GetTablesOutput, error)
43+
CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error)
4244
}
4345

4446
type GlueCatalog struct {
45-
glueSvc glueAPI
47+
glueSvc glueAPI
48+
defaultLocation string
4649
}
4750

4851
func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
@@ -53,7 +56,8 @@ func NewGlueCatalog(opts ...Option[GlueCatalog]) *GlueCatalog {
5356
}
5457

5558
return &GlueCatalog{
56-
glueSvc: glue.NewFromConfig(glueOps.awsConfig),
59+
glueSvc: glue.NewFromConfig(glueOps.awsConfig),
60+
defaultLocation: glueOps.defaultLocation,
5761
}
5862
}
5963

@@ -121,6 +125,55 @@ func (c *GlueCatalog) LoadTable(ctx context.Context, identifier table.Identifier
121125
return icebergTable, nil
122126
}
123127

128+
func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, partitionSpec iceberg.PartitionSpec, sortOrder table.SortOrder, location string, props map[string]string) (*table.Table, error) {
129+
database, tableName, err := identifierToGlueTable(identifier)
130+
if err != nil {
131+
return nil, err
132+
}
133+
134+
locationURL, err := getLocationForTable(location, c.defaultLocation, database, tableName)
135+
if err != nil {
136+
return nil, err
137+
}
138+
139+
// append metadata path to location path/to/database.db/tablename/metadata/00000-UUID.metadata.json
140+
metadataPath, err := getMetadataPath(locationURL.Path, 0)
141+
if err != nil {
142+
return nil, err
143+
}
144+
145+
tbl, err := table.NewTable(identifier, schema, partitionSpec, sortOrder, locationURL.String(), props)
146+
if err != nil {
147+
return nil, err
148+
}
149+
150+
err = c.writeMetaData(locationURL.String(), metadataPath, tbl.Metadata())
151+
if err != nil {
152+
return nil, err
153+
}
154+
155+
params := &glue.CreateTableInput{
156+
DatabaseName: aws.String(database),
157+
158+
TableInput: &types.TableInput{
159+
Name: aws.String(tableName),
160+
TableType: aws.String("EXTERNAL_TABLE"),
161+
Parameters: map[string]string{
162+
"table_type": glueTableTypeIceberg,
163+
"metadata_location": fmt.Sprintf("%s://%s/%s", locationURL.Scheme, locationURL.Host, metadataPath),
164+
},
165+
StorageDescriptor: &types.StorageDescriptor{Location: aws.String(locationURL.String())},
166+
},
167+
}
168+
169+
_, err = c.glueSvc.CreateTable(ctx, params)
170+
if err != nil {
171+
return nil, fmt.Errorf("failed to create table %s.%s: %w", database, tableName, err)
172+
}
173+
174+
return tbl, nil
175+
}
176+
124177
// CatalogType reports the kind of catalog this is; for GlueCatalog that is
// always Glue.
func (c *GlueCatalog) CatalogType() CatalogType {
	return Glue
}
@@ -176,6 +229,25 @@ func (c *GlueCatalog) getTable(ctx context.Context, database, tableName string)
176229
return tblRes.Table.Parameters["metadata_location"], nil
177230
}
178231

232+
func (c *GlueCatalog) writeMetaData(location, metadataPath string, metadata table.Metadata) error {
233+
iofs, err := io.LoadFS(map[string]string{}, location)
234+
if err != nil {
235+
return fmt.Errorf("failed to load fs: %w", err)
236+
}
237+
238+
data, err := json.Marshal(metadata)
239+
if err != nil {
240+
return fmt.Errorf("failed to marshal table metadata: %w", err)
241+
}
242+
243+
err = iofs.WriteFile(metadataPath, data, 0644)
244+
if err != nil {
245+
return fmt.Errorf("failed to write metadata file: %w", err)
246+
}
247+
248+
return nil
249+
}
250+
179251
func identifierToGlueTable(identifier table.Identifier) (string, string, error) {
180252
if len(identifier) != 2 {
181253
return "", "", fmt.Errorf("invalid identifier, missing database name: %v", identifier)

catalog/glue_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"os"
2323
"testing"
2424

25+
"github.com/apache/iceberg-go"
26+
"github.com/apache/iceberg-go/table"
2527
"github.com/aws/aws-sdk-go-v2/aws"
2628
"github.com/aws/aws-sdk-go-v2/config"
2729
"github.com/aws/aws-sdk-go-v2/service/glue"
@@ -44,6 +46,11 @@ func (m *mockGlueClient) GetTables(ctx context.Context, params *glue.GetTablesIn
4446
return args.Get(0).(*glue.GetTablesOutput), args.Error(1)
4547
}
4648

49+
func (m *mockGlueClient) CreateTable(ctx context.Context, params *glue.CreateTableInput, optFns ...func(*glue.Options)) (*glue.CreateTableOutput, error) {
50+
args := m.Called(ctx, params, optFns)
51+
return args.Get(0).(*glue.CreateTableOutput), args.Error(1)
52+
}
53+
4754
func TestGlueGetTable(t *testing.T) {
4855
assert := require.New(t)
4956

@@ -146,3 +153,44 @@ func TestGlueLoadTableIntegration(t *testing.T) {
146153
assert.NoError(err)
147154
assert.Equal([]string{os.Getenv("TEST_TABLE_NAME")}, table.Identifier())
148155
}
156+
157+
func TestGlueCreateTableIntegration(t *testing.T) {
158+
if os.Getenv("TEST_DATABASE_NAME") == "" {
159+
t.Skip()
160+
}
161+
if os.Getenv("TEST_CREATE_TABLE_NAME") == "" {
162+
t.Skip()
163+
}
164+
if os.Getenv("TEST_CREATE_TABLE_LOCATION") == "" {
165+
t.Skip()
166+
}
167+
168+
assert := require.New(t)
169+
170+
location := os.Getenv("TEST_CREATE_TABLE_LOCATION")
171+
172+
schema := iceberg.NewSchemaWithIdentifiers(1, []int{},
173+
iceberg.NestedField{
174+
ID: 1, Name: "vendor_id", Type: iceberg.PrimitiveTypes.String},
175+
iceberg.NestedField{
176+
ID: 2, Name: "name", Type: iceberg.PrimitiveTypes.String},
177+
iceberg.NestedField{
178+
ID: 3, Name: "datetime", Type: iceberg.PrimitiveTypes.TimestampTz})
179+
partSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
180+
SourceID: 3, FieldID: 1000, Name: "datetime", Transform: iceberg.DayTransform{}})
181+
182+
props := map[string]string{
183+
"write.target-file-size-bytes": "536870912",
184+
"write.format.default": "parquet",
185+
}
186+
187+
awscfg, err := config.LoadDefaultConfig(context.TODO(), config.WithClientLogMode(aws.LogRequest|aws.LogResponse))
188+
assert.NoError(err)
189+
190+
catalog := NewGlueCatalog(WithAwsConfig(awscfg))
191+
192+
table, err := catalog.CreateTable(context.TODO(),
193+
[]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, schema, partSpec, table.UnsortedSortOrder, location, props)
194+
assert.NoError(err)
195+
assert.Equal([]string{os.Getenv("TEST_DATABASE_NAME"), os.Getenv("TEST_CREATE_TABLE_NAME")}, table.Identifier())
196+
}

0 commit comments

Comments
 (0)