Skip to content

Commit

Permalink
feat(catalog): Propagate ctx from catalog interface through call stack (
Browse files Browse the repository at this point in the history
#276)

In general we should pass `context.Context` through call path rather
than creating `context.Background` at random places.

I made this change in response to [this
comment](#275 (comment)).
I'll fix the original problem in a subsequent PR.
  • Loading branch information
curtisr7 authored Jan 30, 2025
1 parent 48d094d commit 97a9566
Show file tree
Hide file tree
Showing 17 changed files with 149 additions and 139 deletions.
10 changes: 5 additions & 5 deletions catalog/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ var (
)

func init() {
catalog.Register("glue", catalog.RegistrarFunc(func(_ string, props iceberg.Properties) (catalog.Catalog, error) {
awsConfig, err := toAwsConfig(props)
catalog.Register("glue", catalog.RegistrarFunc(func(ctx context.Context, _ string, props iceberg.Properties) (catalog.Catalog, error) {
awsConfig, err := toAwsConfig(ctx, props)
if err != nil {
return nil, err
}
Expand All @@ -78,7 +78,7 @@ func init() {
}))
}

func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
func toAwsConfig(ctx context.Context, p iceberg.Properties) (aws.Config, error) {
opts := make([]func(*config.LoadOptions) error, 0)

for k, v := range p {
Expand Down Expand Up @@ -108,7 +108,7 @@ func toAwsConfig(p iceberg.Properties) (aws.Config, error) {
credentials.NewStaticCredentialsProvider(key, secret, token)))
}

return config.LoadDefaultConfig(context.Background(), opts...)
return config.LoadDefaultConfig(ctx, opts...)
}

type glueAPI interface {
Expand Down Expand Up @@ -205,7 +205,7 @@ func (c *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, pr
}

// TODO: consider providing a way to directly access the S3 iofs to enable testing of the catalog.
iofs, err := io.LoadFS(props, location)
iofs, err := io.LoadFS(ctx, props, location)
if err != nil {
return nil, fmt.Errorf("failed to load table %s.%s: %w", database, tableName, err)
}
Expand Down
5 changes: 3 additions & 2 deletions catalog/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package internal

import (
"context"
"encoding/json"
"fmt"

Expand All @@ -39,8 +40,8 @@ func GetMetadataLoc(location string, newVersion uint) string {
location, newVersion, uuid.New().String())
}

func WriteMetadata(metadata table.Metadata, loc string, props iceberg.Properties) error {
fs, err := io.LoadFS(props, loc)
func WriteMetadata(ctx context.Context, metadata table.Metadata, loc string, props iceberg.Properties) error {
fs, err := io.LoadFS(ctx, props, loc)
if err != nil {
return err
}
Expand Down
13 changes: 7 additions & 6 deletions catalog/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package catalog

import (
"context"
"fmt"
"maps"
"net/url"
Expand Down Expand Up @@ -64,13 +65,13 @@ var (
// Registrar is a factory for creating Catalog instances, used for registering to use
// with LoadCatalog.
type Registrar interface {
GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error)
GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error)
}

type RegistrarFunc func(string, iceberg.Properties) (Catalog, error)
type RegistrarFunc func(context.Context, string, iceberg.Properties) (Catalog, error)

func (f RegistrarFunc) GetCatalog(catalogURI string, props iceberg.Properties) (Catalog, error) {
return f(catalogURI, props)
func (f RegistrarFunc) GetCatalog(ctx context.Context, catalogName string, props iceberg.Properties) (Catalog, error) {
return f(ctx, catalogName, props)
}

// Register adds the new catalog type to the registry. If the catalog type is already registered, it will be replaced.
Expand Down Expand Up @@ -125,7 +126,7 @@ func GetRegisteredCatalogs() []string {
// as the REST endpoint, otherwise the URI is used as the endpoint. The REST catalog also
// registers "http" and "https" so that Load with a http/s URI will automatically
// load the REST Catalog.
func Load(name string, props iceberg.Properties) (Catalog, error) {
func Load(ctx context.Context, name string, props iceberg.Properties) (Catalog, error) {
if name == "" {
name = config.EnvConfig.DefaultCatalog
}
Expand Down Expand Up @@ -159,5 +160,5 @@ func Load(name string, props iceberg.Properties) (Catalog, error) {
return nil, fmt.Errorf("%w: %s", ErrCatalogNotFound, catalogType)
}

return cat.GetCatalog(name, props)
return cat.GetCatalog(ctx, name, props)
}
31 changes: 18 additions & 13 deletions catalog/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package catalog_test

import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
Expand All @@ -33,14 +34,15 @@ import (
)

func TestCatalogRegistry(t *testing.T) {
ctx := context.Background()
assert.ElementsMatch(t, []string{
"rest",
"http",
"https",
"glue",
}, catalog.GetRegisteredCatalogs())

catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("foobar", catalog.RegistrarFunc(func(ctx context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "foobar", s)
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
Expand All @@ -54,28 +56,28 @@ func TestCatalogRegistry(t *testing.T) {
"glue",
}, catalog.GetRegisteredCatalogs())

c, err := catalog.Load("foobar", iceberg.Properties{"foo": "baz"})
c, err := catalog.Load(ctx, "foobar", iceberg.Properties{"foo": "baz"})
assert.Nil(t, c)
assert.ErrorIs(t, err, catalog.ErrCatalogNotFound)

catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("foobar", catalog.RegistrarFunc(func(ctx context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "not found", s)
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
}))

c, err = catalog.Load("not found", iceberg.Properties{"type": "foobar", "foo": "baz"})
c, err = catalog.Load(ctx, "not found", iceberg.Properties{"type": "foobar", "foo": "baz"})
assert.Nil(t, c)
assert.NoError(t, err)

catalog.Register("foobar", catalog.RegistrarFunc(func(s string, p iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("foobar", catalog.RegistrarFunc(func(ctx context.Context, s string, p iceberg.Properties) (catalog.Catalog, error) {
assert.Equal(t, "not found", s)
assert.Equal(t, "foobar://helloworld", p.Get("uri", ""))
assert.Equal(t, "baz", p.Get("foo", ""))
return nil, nil
}))

c, err = catalog.Load("not found", iceberg.Properties{
c, err = catalog.Load(ctx, "not found", iceberg.Properties{
"uri": "foobar://helloworld",
"foo": "baz"})
assert.Nil(t, c)
Expand All @@ -95,6 +97,7 @@ func TestRegistryPanic(t *testing.T) {
}

func TestCatalogWithEmptyName(t *testing.T) {
ctx := context.Background()
config.EnvConfig.DefaultCatalog = "test-default"
config.EnvConfig.Catalogs = map[string]config.CatalogConfig{
"test-default": {
Expand All @@ -104,15 +107,15 @@ func TestCatalogWithEmptyName(t *testing.T) {
CatalogType: "mock",
},
}
catalog.Register("mock", catalog.RegistrarFunc(func(name string, props iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("mock", catalog.RegistrarFunc(func(ctx context.Context, name string, props iceberg.Properties) (catalog.Catalog, error) {
// Ensure the correct name and properties are passed
assert.Equal(t, "test-default", name)
assert.Equal(t, "http://localhost:8181/", props.Get("uri", ""))
assert.Equal(t, "default-credential", props.Get("credential", ""))
assert.Equal(t, "/default/warehouse", props.Get("warehouse", ""))
return nil, nil
}))
c, err := catalog.Load("", nil)
c, err := catalog.Load(ctx, "", nil)
assert.Nil(t, c)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{
Expand All @@ -127,6 +130,7 @@ func TestCatalogWithEmptyName(t *testing.T) {
}

func TestCatalogLoadInvalidURI(t *testing.T) {
ctx := context.Background()
config.EnvConfig.DefaultCatalog = "default"
config.EnvConfig.Catalogs = map[string]config.CatalogConfig{
"default": {
Expand All @@ -137,13 +141,13 @@ func TestCatalogLoadInvalidURI(t *testing.T) {
},
}

catalog.Register("mock", catalog.RegistrarFunc(func(name string, props iceberg.Properties) (catalog.Catalog, error) {
catalog.Register("mock", catalog.RegistrarFunc(func(ctx context.Context, name string, props iceberg.Properties) (catalog.Catalog, error) {
return nil, nil
}))
props := iceberg.Properties{
"uri": "://invalid-uri", // This will cause url.Parse to fail
}
c, err := catalog.Load("mock", props)
c, err := catalog.Load(ctx, "mock", props)

assert.Nil(t, c)
assert.Error(t, err)
Expand All @@ -152,6 +156,7 @@ func TestCatalogLoadInvalidURI(t *testing.T) {
}

func TestRegistryFromConfig(t *testing.T) {
ctx := context.Background()
var params url.Values

mux := http.NewServeMux()
Expand All @@ -178,13 +183,13 @@ func TestRegistryFromConfig(t *testing.T) {
},
}

c, err := catalog.Load("foobar", nil)
c, err := catalog.Load(ctx, "foobar", nil)
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
assert.Equal(t, "catalog_name", params.Get("warehouse"))

c, err = catalog.Load("foobar", iceberg.Properties{"warehouse": "overriden"})
c, err = catalog.Load(ctx, "foobar", iceberg.Properties{"warehouse": "overriden"})
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
Expand All @@ -195,7 +200,7 @@ func TestRegistryFromConfig(t *testing.T) {
srv2 := httptest.NewServer(mux)
defer srv2.Close()

c, err = catalog.Load("foobar", iceberg.Properties{"uri": srv2.URL})
c, err = catalog.Load(ctx, "foobar", iceberg.Properties{"uri": srv2.URL})
assert.NoError(t, err)
assert.IsType(t, &rest.Catalog{}, c)
assert.Equal(t, "foobar", c.(*rest.Catalog).Name())
Expand Down
38 changes: 19 additions & 19 deletions catalog/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var (
)

func init() {
reg := catalog.RegistrarFunc(func(name string, p iceberg.Properties) (catalog.Catalog, error) {
return newCatalogFromProps(name, p.Get("uri", ""), p)
reg := catalog.RegistrarFunc(func(ctx context.Context, name string, p iceberg.Properties) (catalog.Catalog, error) {
return newCatalogFromProps(ctx, name, p.Get("uri", ""), p)
})

catalog.Register(string(catalog.REST), reg)
Expand Down Expand Up @@ -439,39 +439,39 @@ type Catalog struct {
props iceberg.Properties
}

func newCatalogFromProps(name string, uri string, p iceberg.Properties) (*Catalog, error) {
func newCatalogFromProps(ctx context.Context, name string, uri string, p iceberg.Properties) (*Catalog, error) {
ops := fromProps(p)

r := &Catalog{name: name}
if err := r.init(ops, uri); err != nil {
if err := r.init(ctx, ops, uri); err != nil {
return nil, err
}

return r, nil
}

func NewCatalog(name, uri string, opts ...Option) (*Catalog, error) {
func NewCatalog(ctx context.Context, name, uri string, opts ...Option) (*Catalog, error) {
ops := &options{}
for _, o := range opts {
o(ops)
}

r := &Catalog{name: name}
if err := r.init(ops, uri); err != nil {
if err := r.init(ctx, ops, uri); err != nil {
return nil, err
}

return r, nil
}

func (r *Catalog) init(ops *options, uri string) error {
func (r *Catalog) init(ctx context.Context, ops *options, uri string) error {
baseuri, err := url.Parse(uri)
if err != nil {
return err
}

r.baseURI = baseuri.JoinPath("v1")
if r.cl, ops, err = r.fetchConfig(ops); err != nil {
if r.cl, ops, err = r.fetchConfig(ctx, ops); err != nil {
return err
}

Expand Down Expand Up @@ -535,7 +535,7 @@ func (r *Catalog) fetchAccessToken(cl *http.Client, creds string, opts *options)
}
}

func (r *Catalog) createSession(opts *options) (*http.Client, error) {
func (r *Catalog) createSession(ctx context.Context, opts *options) (*http.Client, error) {
session := &sessionTransport{
Transport: http.Transport{TLSClientConfig: opts.tlsConfig},
defaultHeaders: http.Header{},
Expand All @@ -560,7 +560,7 @@ func (r *Catalog) createSession(opts *options) (*http.Client, error) {
session.defaultHeaders.Set("X-Iceberg-Access-Delegation", "vended-credentials")

if opts.enableSigv4 {
cfg, err := config.LoadDefaultConfig(context.Background())
cfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -576,7 +576,7 @@ func (r *Catalog) createSession(opts *options) (*http.Client, error) {
return cl, nil
}

func (r *Catalog) fetchConfig(opts *options) (*http.Client, *options, error) {
func (r *Catalog) fetchConfig(ctx context.Context, opts *options) (*http.Client, *options, error) {
params := url.Values{}
if opts.warehouseLocation != "" {
params.Set(keyWarehouseLocation, opts.warehouseLocation)
Expand All @@ -585,12 +585,12 @@ func (r *Catalog) fetchConfig(opts *options) (*http.Client, *options, error) {
route := r.baseURI.JoinPath("config")
route.RawQuery = params.Encode()

sess, err := r.createSession(opts)
sess, err := r.createSession(ctx, opts)
if err != nil {
return nil, nil, err
}

rsp, err := doGet[configResponse](context.Background(), route, []string{}, sess, nil)
rsp, err := doGet[configResponse](ctx, route, []string{}, sess, nil)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -627,13 +627,13 @@ func checkValidNamespace(ident table.Identifier) error {
return nil
}

func (r *Catalog) tableFromResponse(identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
func (r *Catalog) tableFromResponse(ctx context.Context, identifier []string, metadata table.Metadata, loc string, config iceberg.Properties) (*table.Table, error) {
id := identifier
if r.name != "" {
id = append([]string{r.name}, identifier...)
}

iofs, err := iceio.LoadFS(config, loc)
iofs, err := iceio.LoadFS(ctx, config, loc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -719,7 +719,7 @@ func (r *Catalog) CreateTable(ctx context.Context, identifier table.Identifier,
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirements []table.Requirement, updates []table.Update) (table.Metadata, string, error) {
Expand Down Expand Up @@ -774,7 +774,7 @@ func (r *Catalog) RegisterTable(ctx context.Context, identifier table.Identifier
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())
maps.Copy(config, ret.Config)
return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, props iceberg.Properties) (*table.Table, error) {
Expand All @@ -796,7 +796,7 @@ func (r *Catalog) LoadTable(ctx context.Context, identifier table.Identifier, pr
config[k] = v
}

return r.tableFromResponse(identifier, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, identifier, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier, requirements []table.Requirement, updates []table.Update) (*table.Table, error) {
Expand Down Expand Up @@ -824,7 +824,7 @@ func (r *Catalog) UpdateTable(ctx context.Context, ident table.Identifier, requi
config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())

return r.tableFromResponse(ident, ret.Metadata, ret.MetadataLoc, config)
return r.tableFromResponse(ctx, ident, ret.Metadata, ret.MetadataLoc, config)
}

func (r *Catalog) DropTable(ctx context.Context, identifier table.Identifier) error {
Expand Down
Loading

0 comments on commit 97a9566

Please sign in to comment.