Skip to content

Nessie REST API Support for Namespace Operations #297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
213 changes: 167 additions & 46 deletions catalog/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,46 @@ func doGet[T any](ctx context.Context, baseURI *url.URL, path []string, cl *http
return do[T](ctx, http.MethodGet, baseURI, path, cl, override, false)
}

func doPut[Result any, Payload any](ctx context.Context, baseURI *url.URL, path []string, payload Payload, cl *http.Client, override map[int]error) (ret Result, err error) {
var (
data []byte
req *http.Request
)

uri := baseURI.JoinPath(path...).String()
data, err = json.Marshal(payload)
if err != nil {
return ret, err
}

req, err = http.NewRequestWithContext(ctx, http.MethodPut, uri, bytes.NewReader(data))
if err != nil {
return ret, err
}

req.Header.Set("Content-Type", "application/json")

rsp, err := cl.Do(req)
if err != nil {
return ret, err
}
defer rsp.Body.Close()

if rsp.StatusCode == http.StatusNoContent {
return ret, nil
}

if rsp.StatusCode != http.StatusOK {
return ret, handleNon200(rsp, override)
}

if err = json.NewDecoder(rsp.Body).Decode(&ret); err != nil {
return ret, err
}

return ret, nil
}

func doDelete[T any](ctx context.Context, baseURI *url.URL, path []string, cl *http.Client, override map[int]error) (ret T, err error) {
return do[T](ctx, http.MethodDelete, baseURI, path, cl, override, true)
}
Expand Down Expand Up @@ -308,10 +348,15 @@ func doPost[Payload, Result any](ctx context.Context, baseURI *url.URL, path []s
return
}

if rsp.StatusCode != http.StatusOK {
if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusNoContent {
return ret, handleNon200(rsp, override)
}

// For 204 No Content, return empty result without error
if rsp.StatusCode == http.StatusNoContent {
return ret, nil
}

if rsp.ContentLength == 0 {
return
}
Expand All @@ -325,44 +370,52 @@ func doPost[Payload, Result any](ctx context.Context, baseURI *url.URL, path []s
}

func handleNon200(rsp *http.Response, override map[int]error) error {
var rawResponse map[string]any
var e errorResponse

dec := json.NewDecoder(rsp.Body)
dec.Decode(&struct {
Error *errorResponse `json:"error"`
}{Error: &e})
body, err := io.ReadAll(rsp.Body)
if err != nil {
return fmt.Errorf("error reading response body: %w", err)
}
rsp.Body.Close()

if override != nil {
if err, ok := override[rsp.StatusCode]; ok {
e.wrapping = err
return e
if err := json.Unmarshal(body, &rawResponse); err != nil {
e = errorResponse{
Message: "Unknown error",
Type: "UnknownError",
Code: rsp.StatusCode,
}
} else {
if errData, exists := rawResponse["error"].(map[string]any); exists {
if msg, ok := errData["message"].(string); ok {
e.Message = msg
}
if errType, ok := errData["type"].(string); ok {
e.Type = errType
}
if code, ok := errData["code"].(float64); ok {
e.Code = int(code)
}
} else {
if msg, ok := rawResponse["message"].(string); ok {
e.Message = msg
}
if errType, ok := rawResponse["type"].(string); ok {
e.Type = errType
}
if code, ok := rawResponse["code"].(float64); ok {
e.Code = int(code)
}
}
}

switch rsp.StatusCode {
case http.StatusBadRequest:
e.wrapping = ErrBadRequest
case http.StatusUnauthorized:
e.wrapping = ErrUnauthorized
case http.StatusForbidden:
e.wrapping = ErrForbidden
case http.StatusUnprocessableEntity:
e.wrapping = ErrRESTError
case 419:
e.wrapping = ErrAuthorizationExpired
case http.StatusNotImplemented:
e.wrapping = iceberg.ErrNotImplemented
case http.StatusServiceUnavailable:
e.wrapping = ErrServiceUnavailable
default:
if 500 <= rsp.StatusCode && rsp.StatusCode < 600 {
e.wrapping = ErrServerError
} else {
e.wrapping = ErrRESTError
if override != nil {
if err, ok := override[rsp.StatusCode]; ok {
return fmt.Errorf("%w: %s", err, e.Message)
}
}

return e
return fmt.Errorf("%w: %s", ErrRESTError, e.Message)
}

func fromProps(props iceberg.Properties) *options {
Expand Down Expand Up @@ -750,6 +803,7 @@ func (r *Catalog) CommitTable(ctx context.Context, tbl *table.Table, requirement

config := maps.Clone(r.props)
maps.Copy(config, ret.Metadata.Properties())

return ret.Metadata, ret.MetadataLoc, nil
}

Expand Down Expand Up @@ -880,59 +934,112 @@ func (r *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) (*
}

func (r *Catalog) CreateNamespace(ctx context.Context, namespace table.Identifier, props iceberg.Properties) error {

if err := checkValidNamespace(namespace); err != nil {
return err
}

_, err := doPost[map[string]any, struct{}](ctx, r.baseURI, []string{"namespaces"},
map[string]any{"namespace": namespace, "properties": props}, r.cl, map[int]error{
http.StatusNotFound: catalog.ErrNoSuchNamespace, http.StatusConflict: catalog.ErrNamespaceAlreadyExists})
return err
ref := table.Identifier{}
if refStr := iceberg.GetRefFromContext(ctx); refStr != "" {
ref = catalog.ToIdentifier(refStr)
}

payload := map[string]any{
"type": "NAMESPACE",
"elements": namespace,
"properties": props,
}

path := []string{"namespaces", "namespace", strings.Join(ref, "."), strings.Join(namespace, ".")}

_, err := doPut[map[string]any, map[string]any](ctx, r.baseURI, path, payload, r.cl, map[int]error{
http.StatusNotFound: catalog.ErrNoSuchNamespace, http.StatusConflict: catalog.ErrNamespaceAlreadyExists})

if err != nil {
return err
}

return nil
}

func (r *Catalog) DropNamespace(ctx context.Context, namespace table.Identifier) error {
if err := checkValidNamespace(namespace); err != nil {
return err
}

_, err := doDelete[struct{}](ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)},
ref := table.Identifier{}
if refStr := iceberg.GetRefFromContext(ctx); refStr != "" {
ref = catalog.ToIdentifier(refStr)
}

path := []string{"namespaces", "namespace", strings.Join(ref, "."), strings.Join(namespace, ".")}
_, err := doDelete[struct{}](ctx, r.baseURI, path,
r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})

return err
}

func (r *Catalog) ListNamespaces(ctx context.Context, parent table.Identifier) ([]table.Identifier, error) {
uri := r.baseURI.JoinPath("namespaces")
ref := table.Identifier{}
if refStr := iceberg.GetRefFromContext(ctx); refStr != "" {
ref = catalog.ToIdentifier(refStr)
}

uri := r.baseURI.JoinPath("namespaces/", strings.Join(ref, "."))

if len(parent) != 0 {
v := url.Values{}
v.Set("parent", strings.Join(parent, namespaceSeparator))
uri.RawQuery = v.Encode()
}

type namespaceItem struct {
Type string `json:"type"`
ID string `json:"id"`
Elements []string `json:"elements"`
}

type rsptype struct {
Namespaces []table.Identifier `json:"namespaces"`
Namespaces []namespaceItem `json:"namespaces"`
}

rsp, err := doGet[rsptype](ctx, uri, []string{}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})
if err != nil {
return nil, err
}

return rsp.Namespaces, nil
var result []table.Identifier
for _, ns := range rsp.Namespaces {
result = append(result, ns.Elements)
}

return result, nil
}

func (r *Catalog) LoadNamespaceProperties(ctx context.Context, namespace table.Identifier) (iceberg.Properties, error) {
if err := checkValidNamespace(namespace); err != nil {
return nil, err
}

ref := table.Identifier{}
if refStr := iceberg.GetRefFromContext(ctx); refStr != "" {
ref = catalog.ToIdentifier(refStr)
}

type namespaceItem struct {
Type string `json:"type"`
ID string `json:"id"`
Elements []string `json:"elements"`
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this something in the actual REST API spec? or is it unique to Nessie's implementation? If this is unique to Nessie, then I would not be in favor of adding this to the REST catalog implementation


type nsresponse struct {
Namespace table.Identifier `json:"namespace"`
Namespace []namespaceItem `json:"namespace"`
Props iceberg.Properties `json:"properties"`
}

rsp, err := doGet[nsresponse](ctx, r.baseURI, []string{"namespaces", strings.Join(namespace, namespaceSeparator)},
r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})
path := []string{"namespaces", "namespace", strings.Join(ref, "."), strings.Join(namespace, ".")}

rsp, err := doGet[nsresponse](ctx, r.baseURI, path, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})
if err != nil {
return nil, err
}
Expand All @@ -947,14 +1054,28 @@ func (r *Catalog) UpdateNamespaceProperties(ctx context.Context, namespace table
return catalog.PropertiesUpdateSummary{}, err
}

ref := table.Identifier{}
if refStr := iceberg.GetRefFromContext(ctx); refStr != "" {
ref = catalog.ToIdentifier(refStr)
}

type payload struct {
Remove []string `json:"removals"`
Updates iceberg.Properties `json:"updates"`
PropertyUpdates iceberg.Properties `json:"propertyUpdates"`
PropertyRemovals []string `json:"propertyRemovals,omitempty"`
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this change incompatible with the REST API?


ns := strings.Join(namespace, namespaceSeparator)
return doPost[payload, catalog.PropertiesUpdateSummary](ctx, r.baseURI, []string{"namespaces", ns, "properties"},
payload{Remove: removals, Updates: updates}, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})
p := payload{PropertyUpdates: updates, PropertyRemovals: removals}

path := []string{"namespaces", "namespace", strings.Join(ref, "."), strings.Join(namespace, ".")}

res, err := doPost[payload, catalog.PropertiesUpdateSummary](ctx, r.baseURI, path,
p, r.cl, map[int]error{http.StatusNotFound: catalog.ErrNoSuchNamespace})

if err != nil {
return catalog.PropertiesUpdateSummary{}, err
}

return res, nil
}

func (r *Catalog) CheckNamespaceExists(ctx context.Context, namespace table.Identifier) (bool, error) {
Expand Down
Loading
Loading