From 85238d218b658e098f6b5a050930a6cb0bbef822 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 15 Jan 2025 01:45:24 -0500 Subject: [PATCH] feat(catalog): Standardize Catalog create table function (#245) * standardize CreateTable * update catalog impl * add test for table.NewMetadata and AssignFresh* functions * add docstrings for the new functions * use proper type for return of With helpers * fix lint, missing func * Update catalog/catalog.go Co-authored-by: Kevin Liu --------- Co-authored-by: Kevin Liu --- catalog/catalog.go | 40 +++++++- catalog/glue.go | 4 + catalog/rest.go | 66 ++++++------- partitions.go | 31 +++++++ schema.go | 160 ++++++++++++++++++++++++++++++++ schema_test.go | 26 ++++++ table/metadata.go | 74 +++++++++++++++ table/metadata_internal_test.go | 115 +++++++++++++++++++++++ table/sorting.go | 35 +++++++ 9 files changed, 511 insertions(+), 40 deletions(-) diff --git a/catalog/catalog.go b/catalog/catalog.go index 0bc8f496..8e01abc6 100644 --- a/catalog/catalog.go +++ b/catalog/catalog.go @@ -24,6 +24,7 @@ import ( "fmt" "maps" "net/url" + "strings" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/table" @@ -156,6 +157,10 @@ type Catalog interface { // CatalogType returns the type of the catalog. CatalogType() CatalogType + // CreateTable creates a new iceberg table in the catalog using the provided identifier + // and schema. Options can be used to optionally provide location, partition spec, sort order, + // and custom properties. + CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) (*table.Table, error) // ListTables returns a list of table identifiers in the catalog, with the returned // identifiers containing the information required to load the table via that catalog. ListTables(ctx context.Context, namespace table.Identifier) ([]table.Identifier, error) @@ -217,7 +222,6 @@ func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals [ if err := checkForOverlap(removals, updates); err != nil { return nil, PropertiesUpdateSummary{}, err } - var ( updatedProps = maps.Clone(currentProps) removed = make([]string, 0, len(removals)) @@ -243,6 +247,38 @@ func getUpdatedPropsAndUpdateSummary(currentProps iceberg.Properties, removals [ Updated: updated, Missing: iceberg.Difference(removals, removed), } - return updatedProps, summary, nil } + +type createTableOpt func(*createTableCfg) + +type createTableCfg struct { + location string + partitionSpec *iceberg.PartitionSpec + sortOrder table.SortOrder + properties iceberg.Properties +} + +func WithLocation(location string) createTableOpt { + return func(cfg *createTableCfg) { + cfg.location = strings.TrimRight(location, "/") + } +} + +func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOpt { + return func(cfg *createTableCfg) { + cfg.partitionSpec = spec + } +} + +func WithSortOrder(order table.SortOrder) createTableOpt { + return func(cfg *createTableCfg) { + cfg.sortOrder = order + } +} + +func WithProperties(props iceberg.Properties) createTableOpt { + return func(cfg *createTableCfg) { + cfg.properties = props + } +} diff --git a/catalog/glue.go b/catalog/glue.go index 245116b2..e628a4f4 100644 --- a/catalog/glue.go +++ b/catalog/glue.go @@ -220,6 +220,10 @@ func (c *GlueCatalog) CatalogType() CatalogType { return Glue } +func (c *GlueCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) (*table.Table, error) { + panic("create table not implemented for Glue Catalog") +} + // DropTable deletes an Iceberg table from the Glue catalog. func (c *GlueCatalog) DropTable(ctx context.Context, identifier table.Identifier) error { database, tableName, err := identifierToGlueTable(identifier) diff --git a/catalog/rest.go b/catalog/rest.go index d068ae59..efb9217f 100644 --- a/catalog/rest.go +++ b/catalog/rest.go @@ -134,38 +134,6 @@ func (t *loadTableResponse) UnmarshalJSON(b []byte) (err error) { return } -type createTableOption func(*createTableRequest) - -func WithLocation(loc string) createTableOption { - return func(req *createTableRequest) { - req.Location = strings.TrimRight(loc, "/") - } -} - -func WithPartitionSpec(spec *iceberg.PartitionSpec) createTableOption { - return func(req *createTableRequest) { - req.PartitionSpec = spec - } -} - -func WithWriteOrder(order *table.SortOrder) createTableOption { - return func(req *createTableRequest) { - req.WriteOrder = order - } -} - -func WithStageCreate() createTableOption { - return func(req *createTableRequest) { - req.StageCreate = true - } -} - -func WithProperties(props iceberg.Properties) createTableOption { - return func(req *createTableRequest) { - req.Props = props - } -} - type createTableRequest struct { Name string `json:"name"` Schema *iceberg.Schema `json:"schema"` @@ -700,18 +668,40 @@ func splitIdentForPath(ident table.Identifier) (string, string, error) { return strings.Join(NamespaceFromIdent(ident), namespaceSeparator), TableNameFromIdent(ident), nil } -func (r *RestCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOption) (*table.Table, error) { +func (r *RestCatalog) CreateTable(ctx context.Context, identifier table.Identifier, schema *iceberg.Schema, opts ...createTableOpt) (*table.Table, error) { ns, tbl, err := splitIdentForPath(identifier) if err != nil { return nil, err } - payload := createTableRequest{ - Name: tbl, - Schema: schema, - } + var cfg createTableCfg for _, o := range opts { - o(&payload) + o(&cfg) + } + + freshSchema, err := iceberg.AssignFreshSchemaIDs(schema, nil) + if err != nil { + return nil, err + } + + freshPartitionSpec, err := iceberg.AssignFreshPartitionSpecIDs(cfg.partitionSpec, schema, freshSchema) + if err != nil { + return nil, err + } + + freshSortOrder, err := table.AssignFreshSortOrderIDs(cfg.sortOrder, schema, freshSchema) + if err != nil { + return nil, err + } + + payload := createTableRequest{ + Name: tbl, + Schema: freshSchema, + Location: cfg.location, + PartitionSpec: &freshPartitionSpec, + WriteOrder: &freshSortOrder, + StageCreate: false, + Props: cfg.properties, } ret, err := doPost[createTableRequest, loadTableResponse](ctx, r.baseURI, []string{"namespaces", ns, "tables"}, payload, diff --git a/partitions.go b/partitions.go index 9416d70b..ebf97ff4 100644 --- a/partitions.go +++ b/partitions.go @@ -235,3 +235,34 @@ func (ps *PartitionSpec) PartitionType(schema *Schema) *StructType { } return &StructType{FieldList: nestedFields} } + +// AssignFreshPartitionSpecIDs creates a new PartitionSpec by reassigning the field IDs +// from the old schema to the corresponding fields in the fresh schema, while re-assigning +// the actual Spec IDs to 1000 + the position of the field in the partition spec. +func AssignFreshPartitionSpecIDs(spec *PartitionSpec, old, fresh *Schema) (PartitionSpec, error) { + if spec == nil { + return PartitionSpec{}, nil + } + + newFields := make([]PartitionField, 0, len(spec.fields)) + for pos, field := range spec.fields { + origCol, ok := old.FindColumnName(field.SourceID) + if !ok { + return PartitionSpec{}, fmt.Errorf("could not find field in old schema: %s", field.Name) + } + + freshField, ok := fresh.FindFieldByName(origCol) + if !ok { + return PartitionSpec{}, fmt.Errorf("could not find field in fresh schema: %s", field.Name) + } + + newFields = append(newFields, PartitionField{ + Name: field.Name, + SourceID: freshField.ID, + FieldID: partitionDataIDStart + pos, + Transform: field.Transform, + }) + } + + return NewPartitionSpec(newFields...), nil +} diff --git a/schema.go b/schema.go index 18014dce..04c29e7f 100644 --- a/schema.go +++ b/schema.go @@ -588,6 +588,79 @@ func visitField[T any](f NestedField, visitor SchemaVisitor[T]) T { } } +type PreOrderSchemaVisitor[T any] interface { + Schema(*Schema, func() T) T + Struct(StructType, []func() T) T + Field(NestedField, func() T) T + List(ListType, func() T) T + Map(MapType, func() T, func() T) T + Primitive(PrimitiveType) T +} + +func PreOrderVisit[T any](sc *Schema, visitor PreOrderSchemaVisitor[T]) (res T, err error) { + if sc == nil { + err = fmt.Errorf("%w: cannot visit nil schema", ErrInvalidArgument) + return + } + + defer func() { + if r := recover(); r != nil { + switch e := r.(type) { + case string: + err = fmt.Errorf("error encountered during schema visitor: %s", e) + case error: + err = fmt.Errorf("error encountered during schema visitor: %w", e) + } + } + }() + + return visitor.Schema(sc, func() T { + return visitStructPreOrder(sc.AsStruct(), visitor) + }), nil +} + +func visitStructPreOrder[T any](obj StructType, visitor PreOrderSchemaVisitor[T]) T { + results := make([]func() T, len(obj.FieldList)) + + for i, f := range obj.FieldList { + results[i] = func() T { + return visitFieldPreOrder(f, visitor) + } + } + + return visitor.Struct(obj, results) +} + +func visitListPreOrder[T any](obj ListType, visitor PreOrderSchemaVisitor[T]) T { + return visitor.List(obj, func() T { + return visitFieldPreOrder(obj.ElementField(), visitor) + }) +} + +func visitMapPreOrder[T any](obj MapType, visitor PreOrderSchemaVisitor[T]) T { + return visitor.Map(obj, func() T { + return visitFieldPreOrder(obj.KeyField(), visitor) + }, func() T { + return visitFieldPreOrder(obj.ValueField(), visitor) + }) +} + +func visitFieldPreOrder[T any](f NestedField, visitor PreOrderSchemaVisitor[T]) T { + var fn func() T + switch typ := f.Type.(type) { + case *StructType: + fn = func() T { return visitStructPreOrder(*typ, visitor) } + case *ListType: + fn = func() T { return visitListPreOrder(*typ, visitor) } + case *MapType: + fn = func() T { return visitMapPreOrder(*typ, visitor) } + default: + fn = func() T { return visitor.Primitive(typ.(PrimitiveType)) } + } + + return visitor.Field(f, fn) +} + // IndexByID performs a post-order traversal of the given schema and // returns a mapping from field ID to field. func IndexByID(schema *Schema) (map[int]NestedField, error) { @@ -1069,6 +1142,93 @@ func buildAccessors(schema *Schema) (map[int]accessor, error) { return Visit(schema, buildPosAccessors{}) } +type setFreshIDs struct { + oldIdToNew map[int]int + nextIDFunc func() int +} + +func (s *setFreshIDs) getAndInc(currentID int) int { + next := s.nextIDFunc() + s.oldIdToNew[currentID] = next + return next +} + +func (s *setFreshIDs) Schema(_ *Schema, structResult func() Type) Type { + return structResult() +} + +func (s *setFreshIDs) Struct(st StructType, fieldResults []func() Type) Type { + newFields := make([]NestedField, len(st.FieldList)) + for idx, f := range st.FieldList { + newFields[idx] = NestedField{ + ID: s.getAndInc(f.ID), + Name: f.Name, + Type: fieldResults[idx](), + Doc: f.Doc, + Required: f.Required, + } + } + return &StructType{FieldList: newFields} +} + +func (s *setFreshIDs) Field(_ NestedField, fieldResult func() Type) Type { + return fieldResult() +} + +func (s *setFreshIDs) List(list ListType, elemResult func() Type) Type { + elemID := s.getAndInc(list.ElementID) + return &ListType{ + ElementID: elemID, + Element: elemResult(), + ElementRequired: list.ElementRequired, + } +} + +func (s *setFreshIDs) Map(mapType MapType, keyResult, valueResult func() Type) Type { + keyID := s.getAndInc(mapType.KeyID) + valueID := s.getAndInc(mapType.ValueID) + return &MapType{ + KeyID: keyID, + ValueID: valueID, + KeyType: keyResult(), + ValueType: valueResult(), + ValueRequired: mapType.ValueRequired, + } +} + +func (s *setFreshIDs) Primitive(p PrimitiveType) Type { + return p +} + +// AssignFreshSchemaIDs creates a new schema with fresh field IDs for all of the +// fields in it. The nextID function is used to iteratively generate the ids, if +// it is nil then a simple incrementing counter is used starting at 1. +func AssignFreshSchemaIDs(sc *Schema, nextID func() int) (*Schema, error) { + if nextID == nil { + var id int = 0 + nextID = func() int { + id++ + return id + } + } + visitor := &setFreshIDs{oldIdToNew: make(map[int]int), nextIDFunc: nextID} + outType, err := PreOrderVisit(sc, visitor) + if err != nil { + return nil, err + } + + fields := outType.(*StructType).FieldList + var newIdentifierIDs []int + if len(sc.IdentifierFieldIDs) != 0 { + newIdentifierIDs = make([]int, len(sc.IdentifierFieldIDs)) + for i, id := range sc.IdentifierFieldIDs { + newIdentifierIDs[i] = visitor.oldIdToNew[id] + } + } + + return NewSchemaWithIdentifiers(0, newIdentifierIDs, fields...), nil +} + type SchemaWithPartnerVisitor[T, P any] interface { Schema(sc *Schema, schemaPartner P, structResult T) T Struct(st StructType, structPartner P, fieldResults []T) T diff --git a/schema_test.go b/schema_test.go index 4e8e7469..d080b9fb 100644 --- a/schema_test.go +++ b/schema_test.go @@ -641,6 +641,32 @@ func TestPruneNilSchema(t *testing.T) { assert.ErrorIs(t, err, iceberg.ErrInvalidArgument) } +func TestAssignFreshSchemaIDs(t *testing.T) { + startID := 100 + sc, err := iceberg.AssignFreshSchemaIDs(tableSchemaNested, func() int { + startID++ + return startID + }) + require.NoError(t, err) + require.NotNil(t, sc) + + startID = 100 + var checkID func(iceberg.NestedField) + checkID = func(f iceberg.NestedField) { + startID++ + assert.Equal(t, startID, f.ID) + if nested, ok := f.Type.(iceberg.NestedType); ok { + for _, nf := range nested.Fields() { + checkID(nf) + } + } + } + + for _, f := range sc.Fields() { + checkID(f) + } +} + func TestSchemaRoundTrip(t *testing.T) { data, err := json.Marshal(tableSchemaNested) require.NoError(t, err) diff --git a/table/metadata.go b/table/metadata.go index 73021c53..bd4896db 100644 --- a/table/metadata.go +++ b/table/metadata.go @@ -25,6 +25,7 @@ import ( "iter" "maps" "slices" + "strconv" "time" "github.com/apache/iceberg-go" @@ -1029,3 +1030,76 @@ func (m *metadataV2) UnmarshalJSON(b []byte) error { m.preValidate() return m.validate() } + +const DefaultFormatVersion = 2 + +// NewMetadata creates a new table metadata object using the provided schema, information, generating a fresh UUID for +// the new table metadata. By default, this will generate a V2 table metadata, but this can be modified +// by adding a "format-version" property to the props map. An error will be returned if the "format-version" +// property exists and is not a valid version number. +func NewMetadata(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrder SortOrder, location string, props iceberg.Properties) (Metadata, error) { + return NewMetadataWithUUID(sc, partitions, sortOrder, location, props, uuid.Nil) +} + +// NewMetadataWithUUID is like NewMetadata, but allows the caller to specify the UUID of the table rather than creating a new one. +func NewMetadataWithUUID(sc *iceberg.Schema, partitions *iceberg.PartitionSpec, sortOrder SortOrder, location string, props iceberg.Properties, tableUuid uuid.UUID) (Metadata, error) { + freshSchema, err := iceberg.AssignFreshSchemaIDs(sc, nil) + if err != nil { + return nil, err + } + + freshPartitions, err := iceberg.AssignFreshPartitionSpecIDs(partitions, sc, freshSchema) + if err != nil { + return nil, err + } + + freshSortOrder, err := AssignFreshSortOrderIDs(sortOrder, sc, freshSchema) + if err != nil { + return nil, err + } + + if tableUuid == uuid.Nil { + tableUuid = uuid.New() + } + + formatVersion := DefaultFormatVersion + if props != nil { + verStr, ok := props["format-version"] + if ok { + if formatVersion, err = strconv.Atoi(verStr); err != nil { + formatVersion = DefaultFormatVersion + } + delete(props, "format-version") + } + } + + lastPartitionID := freshPartitions.LastAssignedFieldID() + common := commonMetadata{ + LastUpdatedMS: time.Now().UnixMilli(), + LastColumnId: freshSchema.HighestFieldID(), + FormatVersion: formatVersion, + UUID: tableUuid, + Loc: location, + SchemaList: []*iceberg.Schema{freshSchema}, + CurrentSchemaID: freshSchema.ID, + Specs: []iceberg.PartitionSpec{freshPartitions}, + DefaultSpecID: freshPartitions.ID(), + LastPartitionID: &lastPartitionID, + Props: props, + SortOrderList: []SortOrder{freshSortOrder}, + DefaultSortOrderID: freshSortOrder.OrderID, + } + + switch formatVersion { + case 1: + return &metadataV1{ + commonMetadata: common, + Schema: freshSchema, + Partition: slices.Collect(freshPartitions.Fields()), + }, nil + case 2: + return &metadataV2{commonMetadata: common}, nil + default: + return nil, fmt.Errorf("invalid format version: %d", formatVersion) + } +} diff --git a/table/metadata_internal_test.go b/table/metadata_internal_test.go index a02ac7f4..44aa0372 100644 --- a/table/metadata_internal_test.go +++ b/table/metadata_internal_test.go @@ -491,3 +491,118 @@ func TestV1WriteMetadataToV2(t *testing.T) { assert.NotContains(t, rawData, "schema") assert.NotContains(t, rawData, "partition-spec") } + +func TestNewMetadataWithExplicitV1Format(t *testing.T) { + schema := iceberg.NewSchemaWithIdentifiers(10, + []int{22}, + iceberg.NestedField{ID: 10, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false}, + iceberg.NestedField{ID: 22, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 33, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false}, + ) + + partitionSpec := iceberg.NewPartitionSpecID(10, + iceberg.PartitionField{SourceID: 22, FieldID: 1022, Transform: iceberg.IdentityTransform{}, Name: "bar"}) + + sortOrder := SortOrder{ + OrderID: 10, + Fields: []SortField{{ + SourceID: 10, + Transform: iceberg.IdentityTransform{}, + Direction: SortASC, NullOrder: NullsLast}}} + + actual, err := NewMetadata(schema, &partitionSpec, sortOrder, "s3://some_v1_location/", iceberg.Properties{"format-version": "1"}) + require.NoError(t, err) + + expectedSchema := iceberg.NewSchemaWithIdentifiers(0, []int{2}, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool}) + + expectedSpec := iceberg.NewPartitionSpec( + iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "bar"}) + + expectedSortOrder := SortOrder{ + OrderID: 1, + Fields: []SortField{{ + SourceID: 1, Transform: iceberg.IdentityTransform{}, + Direction: SortASC, NullOrder: NullsLast}}} + + lastPartitionID := 1000 + expected := &metadataV1{ + commonMetadata: commonMetadata{ + Loc: "s3://some_v1_location/", + UUID: actual.TableUUID(), + LastUpdatedMS: actual.LastUpdatedMillis(), + LastColumnId: 3, + SchemaList: []*iceberg.Schema{expectedSchema}, + CurrentSchemaID: 0, + Specs: []iceberg.PartitionSpec{expectedSpec}, + DefaultSpecID: 0, + LastPartitionID: &lastPartitionID, + SortOrderList: []SortOrder{expectedSortOrder}, + DefaultSortOrderID: 1, + FormatVersion: 1, + }, + Schema: expectedSchema, + Partition: slices.Collect(expectedSpec.Fields()), + } + + assert.Truef(t, expected.Equals(actual), "expected: %s\ngot: %s", expected, actual) +} + +func TestNewMetadataV2Format(t *testing.T) { + schema := iceberg.NewSchemaWithIdentifiers(10, + []int{22}, + iceberg.NestedField{ID: 10, Name: "foo", Type: iceberg.PrimitiveTypes.String, Required: false}, + iceberg.NestedField{ID: 22, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 33, Name: "baz", Type: iceberg.PrimitiveTypes.Bool, Required: false}, + ) + + partitionSpec := iceberg.NewPartitionSpecID(10, + iceberg.PartitionField{SourceID: 22, FieldID: 1022, Transform: iceberg.IdentityTransform{}, Name: "bar"}) + + sortOrder := SortOrder{ + OrderID: 10, + Fields: []SortField{{ + SourceID: 10, + Transform: iceberg.IdentityTransform{}, + Direction: SortASC, NullOrder: NullsLast}}} + + tableUUID := uuid.New() + + actual, err := NewMetadataWithUUID(schema, &partitionSpec, sortOrder, "s3://some_v1_location/", nil, tableUUID) + require.NoError(t, err) + + expectedSchema := iceberg.NewSchemaWithIdentifiers(0, []int{2}, + iceberg.NestedField{ID: 1, Name: "foo", Type: iceberg.PrimitiveTypes.String}, + iceberg.NestedField{ID: 2, Name: "bar", Type: iceberg.PrimitiveTypes.Int32, Required: true}, + iceberg.NestedField{ID: 3, Name: "baz", Type: iceberg.PrimitiveTypes.Bool}) + + expectedSpec := iceberg.NewPartitionSpec( + iceberg.PartitionField{SourceID: 2, FieldID: 1000, Transform: iceberg.IdentityTransform{}, Name: "bar"}) + + expectedSortOrder := SortOrder{ + OrderID: 1, + Fields: []SortField{{ + SourceID: 1, Transform: iceberg.IdentityTransform{}, + Direction: SortASC, NullOrder: NullsLast}}} + + lastPartitionID := 1000 + expected := &metadataV2{ + commonMetadata: commonMetadata{ + Loc: "s3://some_v1_location/", + UUID: tableUUID, + LastUpdatedMS: actual.LastUpdatedMillis(), + LastColumnId: 3, + SchemaList: []*iceberg.Schema{expectedSchema}, + CurrentSchemaID: 0, + Specs: []iceberg.PartitionSpec{expectedSpec}, + DefaultSpecID: 0, + LastPartitionID: &lastPartitionID, + SortOrderList: []SortOrder{expectedSortOrder}, + DefaultSortOrderID: 1, + FormatVersion: 2, + }} + + assert.Truef(t, expected.Equals(actual), "expected: %s\ngot: %s", expected, actual) +} diff --git a/table/sorting.go b/table/sorting.go index 425a92ed..e5104395 100644 --- a/table/sorting.go +++ b/table/sorting.go @@ -175,3 +175,38 @@ func (s *SortOrder) UnmarshalJSON(b []byte) error { return nil } + +// AssignFreshSortOrderIDs updates and reassigns the field source IDs from the old schema +// to the corresponding fields in the fresh schema, while also giving the Sort Order a fresh +// ID of 0 (the initial Sort Order ID). +func AssignFreshSortOrderIDs(sortOrder SortOrder, old, fresh *iceberg.Schema) (SortOrder, error) { + return AssignFreshSortOrderIDsWithID(sortOrder, old, fresh, InitialSortOrderID) +} + +// AssignFreshSortOrderIDsWithID is like AssignFreshSortOrderIDs but allows specifying the id of the +// returned SortOrder. +func AssignFreshSortOrderIDsWithID(sortOrder SortOrder, old, fresh *iceberg.Schema, sortOrderID int) (SortOrder, error) { + if sortOrder.Equals(UnsortedSortOrder) { + return UnsortedSortOrder, nil + } + + fields := make([]SortField, 0, len(sortOrder.Fields)) + for _, field := range sortOrder.Fields { + originalField, ok := old.FindColumnName(field.SourceID) + if !ok { + return SortOrder{}, fmt.Errorf("cannot find source column id %s in old schema", field.String()) + } + freshField, ok := fresh.FindFieldByName(originalField) + if !ok { + return SortOrder{}, fmt.Errorf("cannot find field %s in fresh schema", originalField) + } + + fields = append(fields, SortField{ + SourceID: freshField.ID, + Transform: field.Transform, + Direction: field.Direction, + NullOrder: field.NullOrder, + }) + } + return SortOrder{OrderID: sortOrderID, Fields: fields}, nil +}