Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(catalog): Propagate ctx from catalog interface through call stack #276

Merged
merged 4 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions catalog/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ var (
)

func init() {
catalog.Register("glue", catalog.RegistrarFunc(func(_ string, props iceberg.Properties) (catalog.Catalog, error) {
awsConfig, err := toAwsConfig(props)
catalog.Register("glue", catalog.RegistrarFunc(func(ctx context.Context, _ string, props iceberg.Properties) (catalog.Catalog, error) {
awsConfig, err := toAwsConfig(ctx, props)
if err != nil {
return nil, err
}
Expand All @@ -78,7 +78,7 @@ func init() {
}))
}

func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
func toAwsConfig(ctx context.Context, p iceberg.Properties) (aws.Config, error) {
opts := make([]func(*config.LoadOptions) error, 0)

for k, v := range p {
Expand Down Expand Up @@ -108,7 +108,7 @@ func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
credentials.NewStaticCredentialsProvider(key, secret, token)))
}

return config.LoadDefaultConfig(context.Background(), opts...)
return config.LoadDefaultConfig(ctx, opts...)
}

type glueAPI interface {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, pr
}

// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
iofs, err := io.LoadFS(props, location)
iofs, err := io.LoadFS(ctx, props, location)
if err != nil {
return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err)
}
Expand Down
5 changes: 3 additions & 2 deletions catalog/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package internal

import (
"context"
"encoding/json"
"fmt"

Expand All @@ -39,8 +40,8 @@ func GetMetadataLoc(location string, newVersion uint) string {
location, newVersion, uuid.New().String())
}

func WriteMetadata(metadata table.Metadata, loc string, props iceberg.Properties) error {
fs, err := io.LoadFS(props, loc)
func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, props iceberg.Properties) error {
fs, err := io.LoadFS(ctx, props, loc)
if err != nil {
return err
}
Expand Down
13 changes: 7 additions & 6 deletions catalog/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package catalog

import (
"context"
"fmt"
"maps"
"net/url"
Expand Down Expand Up @@ -64,13 +65,13 @@ var (
// Registrar is a factory for creating Catalog instances, used for registering to use
// with LoadCatalog.
type Registrar interface {
GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error)
GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error)
}

type RegistrarFunc func(string, iceberg.Properties) (Catalog, error)
type RegistrarFunc func(context.Context, string, iceberg.Properties) (Catalog, error)

func (f RegistrarFunc) GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error) {
return f(catalogURI, props)
func (f RegistrarFunc) GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error) {
return f(ctx, catalogName, props)
}

// Register adds the new catalog type to the registry. If the catalog type is already registered, it will be replaced.
Expand Down Expand Up @@ -125,7 +126,7 @@ func GetRegisteredCatalogs() []string {
// as the REST endpoint, otherwise the URI is used as the endpoint. The REST catalog also
// registers "http" and "https" so that Load with a http/s URI will automatically
// load the REST Catalog.
func Load(name string, props iceberg.Properties) (Catalog, error) {
func Load(ctx context.Context, name string, props iceberg.Properties) (Catalog, error) {
if name == "" {
name = config.EnvConfig.DefaultCatalog
}
Expand Down Expand Up @@ -159,5 +160,5 @@ func Load(name string, props iceberg.Properties) (Catalog, error) {
return nil, fmt.Errorf("%w: %s", ErrCatalogNotFound, catalogType)
}

return cat.GetCatalog(name, props)
return cat.GetCatalog(ctx, name, props)
}
31 changes: 18 additions & 13 deletions catalog/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package catalog_test

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand All @@ -33,14 +34,15 @@ import (
)

func TestCatalogRegistry(t *testing.T) {
ctx := context.Background()
assert.ElementsMatch(t, []string{
"rest",
"http",
"https",
"glue",
}, catalog.GetRegisteredCatalogs())

catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("foobar", catalog.RegistrarFunc(func(ctx context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "foobar", s)
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
Expand All @@ -54,28 +56,28 @@ func TestCatalogRegistry(t *testing.T) {
"glue",
}, catalog.GetRegisteredCatalogs())

c, err := catalog.Load("foobar", iceberg.Properties{"foo": "baz"})
c, err := catalog.Load(ctx, "foobar", iceberg.Properties{"foo": "baz"})
assert.Nil(t, c)
assert.ErrorIs(t, err, catalog.ErrCatalogNotFound)

catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("foobar", catalog.RegistrarFunc(func(ctx context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "not found", s)
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
}))

c, err = catalog.Load("not found", iceberg.Properties{"type": "foobar", "foo": "baz"})
c, err = catalog.Load(ctx, "not found", iceberg.Properties{"type": "foobar", "foo": "baz"})
assert.Nil(t, c)
assert.NoError(t, err)

catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("foobar", catalog.RegistrarFunc(func(ctx context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "not found", s)
assert.Equal(t, "foobar://helloworld", p.Get("uri", ""))
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
}))

c, err = catalog.Load("not found", iceberg.Properties{
c, err = catalog.Load(ctx, "not found", iceberg.Properties{
"uri": "foobar://helloworld",
"foo": "baz"})
assert.Nil(t, c)
Expand All @@ -95,6 +97,7 @@ func TestRegistryPanic(t *testing.T) {
}

func TestCatalogWithEmptyName(t *testing.T) {
ctx := context.Background()
config.EnvConfig.DefaultCatalog = "test-default"
config.EnvConfig.Catalogs = map[string]config.CatalogConfig{
"test-default": {
Expand All @@ -104,15 +107,15 @@ func TestCatalogWithEmptyName(t *testing.T) {
CatalogType: "mock",
},
}
catalog.Register("mock", catalog.RegistrarFunc(func(name string, props iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("mock", catalog.RegistrarFunc(func(ctx context.Context, name string, props iceberg.Properties) (catalog.Catalog, error) {
// Ensure the correct name and properties are passed
assert.Equal(t, "test-default", name)
assert.Equal(t, "http://localhost:8181/", props.Get("uri", ""))
assert.Equal(t, "default-credential", props.Get("credential", ""))
assert.Equal(t, "/default/warehouse", props.Get("warehouse", ""))
return nil, nil
}))
c, err := catalog.Load("", nil)
c, err := catalog.Load(ctx, "", nil)
assert.Nil(t, c)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{
Expand All @@ -127,6 +130,7 @@ func TestCatalogWithEmptyName(t *testing.T) {
}

func TestCatalogLoadInvalidURI(t *testing.T) {
ctx := context.Background()
config.EnvConfig.DefaultCatalog = "default"
config.EnvConfig.Catalogs = map[string]config.CatalogConfig{
"default": {
Expand All @@ -137,13 +141,13 @@ func TestCatalogLoadInvalidURI(t *testing.T) {
},
}

catalog.Register("mock", catalog.RegistrarFunc(func(name string, props iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("mock", catalog.RegistrarFunc(func(ctx context.Context, name string, props iceberg.Properties) (catalog.Catalog, error) {
return nil, nil
}))
props := iceberg.Properties{
"uri": "://invalid-uri", // This will cause url.Parse to fail
}
c, err := catalog.Load("mock", props)
c, err := catalog.Load(ctx, "mock", props)

assert.Nil(t, c)
assert.Error(t, err)
Expand All @@ -152,6 +156,7 @@ func TestCatalogLoadInvalidURI(t *testing.T) {
}

func TestRegistryFromConfig(t *testing.T) {
ctx := context.Background()
var params url.Values

mux := http.NewServeMux()
Expand All @@ -178,13 +183,13 @@ func TestRegistryFromConfig(t *testing.T) {
},
}

c, err := catalog.Load("foobar", nil)
c, err := catalog.Load(ctx, "foobar", nil)
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
assert.Equal(t, "catalog_name", params.Get("warehouse"))

c, err = catalog.Load("foobar", iceberg.Properties{"warehouse": "overriden"})
c, err = catalog.Load(ctx, "foobar", iceberg.Properties{"warehouse": "overriden"})
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
Expand All @@ -195,7 +200,7 @@ func TestRegistryFromConfig(t *testing.T) {
srv2 := httptest.NewServer(mux)
defer srv2.Close()

c, err = catalog.Load("foobar", iceberg.Properties{"uri": srv2.URL})
c, err = catalog.Load(ctx, "foobar", iceberg.Properties{"uri": srv2.URL})
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
Expand Down
38 changes: 19 additions & 19 deletions catalog/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var (
)

func init() {
reg := catalog.RegistrarFunc(func(name string, p iceberg.Properties) (catalog.Catalog, error) {
return newCatalogFromProps(name, p.Get("uri", ""), p)
reg := catalog.RegistrarFunc(func(ctx context.Context, name string, p iceberg.Properties) (catalog.Catalog, error) {
return newCatalogFromProps(ctx, name, p.Get("uri", ""), p)
})

catalog.Register(string(catalog.REST), reg)
Expand Down Expand Up @@ -439,39 +439,39 @@ type Catalog struct {
props iceberg.Properties
}

func newCatalogFromProps(name string, uri string, p iceberg.Properties) (*Catalog, error) {
func newCatalogFromProps(ctx context.Context, name string, uri string, p iceberg.Properties) (*Catalog, error) {
ops := fromProps(p)

r := &Catalog{name: name}
if err := r.init(ops, uri); err != nil {
if err := r.init(ctx, ops, uri); err != nil {
return nil, err
}

return r, nil
}

func NewCatalog(name, uri string, opts ...Option) (*Catalog, error) {
func NewCatalog(ctx context.Context, name, uri string, opts ...Option) (*Catalog, error) {
ops := &options{}
for _, o := range opts {
o(ops)
}

r := &Catalog{name: name}
if err := r.init(ops, uri); err != nil {
if err := r.init(ctx, ops, uri); err != nil {
return nil, err
}

return r, nil
}

func (r *Catalog) init(ops *options, uri string) error {
func (r *Catalog) init(ctx context.Context, ops *options, uri string) error {
baseuri, err := url.Parse(uri)
if err != nil {
return err
}

r.baseURI = baseuri.JoinPath("v1")
if r.cl, ops, err = r.fetchConfig(ops); err != nil {
if r.cl, ops, err = r.fetchConfig(ctx, ops); err != nil {
return err
}

Expand Down Expand Up @@ -535,7 +535,7 @@ func (r *Catalog) fetchAccessToken(cl *http.Client, creds string, opts *options)
}
}

func (r *Catalog) createSession(opts *options) (*http.Client, error) {
func (r *Catalog) createSession(ctx context.Context, opts *options) (*http.Client, error) {
session := &sessionTransport{
Transport: http.Transport{TLSClientConfig: opts.tlsConfig},
defaultHeaders: http.Header{},
Expand All @@ -560,7 +560,7 @@ func (r *Catalog) createSession(opts *options) (*http.Client, error) {
session.defaultHeaders.Set("X-Iceberg-Access-Delegation", "vended-credentials")

if opts.enableSigv4 {
cfg, err := config.LoadDefaultConfig(context.Background())
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -576,7 +576,7 @@ func (r *Catalog) createSession(opts *options) (*http.Client, error) {
return cl, nil
}

func (r *Catalog) fetchConfig(opts *options) (*http.Client, *options, error) {
func (r *Catalog) fetchConfig(ctx context.Context, opts *options) (*http.Client, *options, error) {
params := url.Values{}
if opts.warehouseLocation != "" {
params.Set(keyWarehouseLocation, opts.warehouseLocation)
Expand All @@ -585,12 +585,12 @@ func (r *Catalog) fetchConfig(opts *options) (*http.Client, *options, error) {
route := r.baseURI.JoinPath("config")
route.RawQuery = params.Encode()

sess, err := r.createSession(opts)
sess, err := r.createSession(ctx, opts)
if err != nil {
return nil, nil, err
}

rsp, err := doGet[configResponse](context.Background(), route, []string{}, sess, nil)
rsp, err := doGet[configResponse](ctx, route, []string{}, sess, nil)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -627,13 +627,13 @@ func checkValidNamespace(ident table.Identifier) error {
return nil
}

func (r *Catalog) tableFromResponse(identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
func (r *Catalog) tableFromResponse(ctx context.Context, identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
id := identifier
if r.name != "" {
id = append([]string{r.name}, identifier...)
}

iofs, err := iceio.LoadFS(config, loc)
iofs, err := iceio.LoadFS(ctx, config, loc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -719,7 +719,7 @@ func (r *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
Expand Down Expand Up @@ -774,7 +774,7 @@ func (r *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)
return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
Expand All @@ -796,7 +796,7 @@ func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, pr
config[k] = v
}

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) {
Expand Down Expand Up @@ -824,7 +824,7 @@ func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier, requi
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())

return r.tableFromResponse(ident, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error {
Expand Down
Loading