Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for sub aggs #48

Merged
merged 11 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ smoke-test: ## smoke tests
@cd internal/smoke-tests && go run . && cd ../..

test: ## run unit tests
@echo "Running unit tests"
@go test -v -count=1 -cover -race ./xata
TEST_DIRECTORY=./xata go run gotest.tools/gotestsum@latest --format testname

integration-test: ## run integration tests
@echo "Running integration test"
@go test -v -count=1 -cover -race ./internal/integration-tests
TEST_DIRECTORY=./internal/integration-tests go run gotest.tools/gotestsum@latest --format testname
$(MAKE) clean-workspaces

download-openapi-specs: ## download openapi specs
Expand Down
90 changes: 90 additions & 0 deletions internal/integration-tests/search_filter_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,36 @@ func Test_searchAndFilterClient(t *testing.T) {
}, time.Second*10, time.Second)
})

// Buckets rows by day on the datetime column and requests "max" and "avg"
// sub-aggregations on the integer column for each bucket. The request is
// retried until the histogram contains at least one bucket.
t.Run("aggregate table date histogram with nested aggregations", func(t *testing.T) {
	assert.Eventually(t, func() bool {
		aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
			BranchRequestOptional: xata.BranchRequestOptional{
				DatabaseName: xata.String(cfg.databaseName),
			},
			TableName: cfg.tableName,
			Payload: xata.AggregateTableRequestPayload{
				Aggregations: xata.AggExpressionMap{
					"histogram": xata.NewDateHistogramAggExpression(xata.DateHistogramAgg{
						Column:           dateTimeColumn,
						Interval:         xata.String("1d"),
						CalendarInterval: nil,
						Timezone:         nil,
						Aggs: &xata.NestedAggsMap{
							"max": xata.NewMaxAggExpression(integerColumn),
							"avg": xata.NewAverageAggExpression(integerColumn),
						},
					}),
				},
			},
		})
		assert.NoError(t, err)
		// assert.NoError does not stop execution, so guard against a missing
		// response before dereferencing it; otherwise a failed call would panic
		// inside the Eventually condition instead of reporting "not ready yet".
		if err != nil || aggTableRes == nil || aggTableRes.Aggs == nil {
			return false
		}
		histogram := (*aggTableRes.Aggs)["histogram"]
		if histogram == nil || histogram.AggResponseValues == nil {
			return false
		}
		return len(histogram.AggResponseValues.Values.AggResponseValuesValuesItemList) > 0
	}, time.Second*10, time.Second)
})

t.Run("aggregate table top values", func(t *testing.T) {
assert.Eventually(t, func() bool {
aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
Expand All @@ -619,6 +649,34 @@ func Test_searchAndFilterClient(t *testing.T) {
}, time.Second*10, time.Second)
})

// Buckets rows by the unique values of the string column and requests "max"
// and "avg" sub-aggregations on the integer column for each bucket. The
// request is retried until at least one bucket is returned.
t.Run("aggregate table top values with nested aggs", func(t *testing.T) {
	assert.Eventually(t, func() bool {
		aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
			BranchRequestOptional: xata.BranchRequestOptional{
				DatabaseName: xata.String(cfg.databaseName),
			},
			TableName: cfg.tableName,
			Payload: xata.AggregateTableRequestPayload{
				Aggregations: xata.AggExpressionMap{
					"top_values": xata.NewTopValuesAggExpression(xata.TopValuesAgg{
						Column: stringColumn,
						Size:   nil,
						Aggs: &xata.NestedAggsMap{
							"max": xata.NewMaxAggExpression(integerColumn),
							"avg": xata.NewAverageAggExpression(integerColumn),
						},
					}),
				},
			},
		})
		assert.NoError(t, err)
		// assert.NoError does not stop execution, so guard against a missing
		// response before dereferencing it; otherwise a failed call would panic
		// inside the Eventually condition instead of reporting "not ready yet".
		if err != nil || aggTableRes == nil || aggTableRes.Aggs == nil {
			return false
		}
		topValues := (*aggTableRes.Aggs)["top_values"]
		if topValues == nil || topValues.AggResponseValues == nil {
			return false
		}
		return len(topValues.AggResponseValues.Values.AggResponseValuesValuesItemList) > 0
	}, time.Second*10, time.Second)
})

t.Run("aggregate table numeric histogram", func(t *testing.T) {
assert.Eventually(t, func() bool {
aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
Expand All @@ -643,4 +701,36 @@ func Test_searchAndFilterClient(t *testing.T) {
return false
}, time.Second*10, time.Second)
})

// Buckets rows into numeric ranges of width 1 on the integer column and
// requests a "max" and a nested "top_values" sub-aggregation for each bucket.
// The request is retried until at least one bucket is returned.
t.Run("aggregate table numeric histogram with nested aggregations", func(t *testing.T) {
	assert.Eventually(t, func() bool {
		aggTableRes, err := searchFilterCli.Aggregate(ctx, xata.AggregateTableRequest{
			BranchRequestOptional: xata.BranchRequestOptional{
				DatabaseName: xata.String(cfg.databaseName),
			},
			TableName: cfg.tableName,
			Payload: xata.AggregateTableRequestPayload{
				Aggregations: xata.AggExpressionMap{
					"num_histogram": xata.NewNumericHistogramAggExpression(xata.NumericHistogramAgg{
						Column:   integerColumn,
						Interval: 1.0,
						Offset:   nil,
						Aggs: &xata.NestedAggsMap{
							"max": xata.NewMaxAggExpression(integerColumn),
							"top_values": xata.NewTopValuesAggExpression(xata.TopValuesAgg{
								Column: stringColumn,
								Size:   nil,
							}),
						},
					}),
				},
			},
		})
		assert.NoError(t, err)
		// assert.NoError does not stop execution, so guard against a missing
		// response before dereferencing it; otherwise a failed call would panic
		// inside the Eventually condition instead of reporting "not ready yet".
		if err != nil || aggTableRes == nil || aggTableRes.Aggs == nil {
			return false
		}
		numHistogram := (*aggTableRes.Aggs)["num_histogram"]
		if numHistogram == nil || numHistogram.AggResponseValues == nil {
			return false
		}
		return len(numHistogram.AggResponseValues.Values.AggResponseValuesValuesItemList) > 0
	}, time.Second*10, time.Second)
})
}
29 changes: 10 additions & 19 deletions internal/integration-tests/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,21 @@ func setupDatabase() (*config, error) {
if !found {
return nil, fmt.Errorf("%s not found in env vars", "XATA_API_KEY")
}
// require workspace ID to come from the env var
// instead of creating new workspace on each client
wsID, found := os.LookupEnv("XATA_WORKSPACE_ID")
if !found {
return nil, fmt.Errorf("%s not found in env vars", "XATA_WORKSPACE_ID")
}

testID := testIdentifier()

cfg := &config{
apiKey: apiKey,
testID: testID,
wsID: wsID,
httpCli: retryablehttp.NewClient().StandardClient(),
}

workspaceCli, err := xata.NewWorkspacesClient(
xata.WithAPIKey(cfg.apiKey),
xata.WithHTTPClient(cfg.httpCli),
)
if err != nil {
return nil, err
}

ws, err := workspaceCli.Create(ctx, &xata.WorkspaceMeta{Name: "ws" + testID})
if err != nil {
return nil, err
}

cfg.wsID = ws.Id

databaseCli, err := xata.NewDatabasesClient(
xata.WithAPIKey(cfg.apiKey),
xata.WithHTTPClient(cfg.httpCli),
Expand All @@ -63,7 +54,7 @@ func setupDatabase() (*config, error) {
return nil, err
}

listRegionsResponse, err := databaseCli.GetRegionsWithWorkspaceID(ctx, ws.Id)
listRegionsResponse, err := databaseCli.GetRegionsWithWorkspaceID(ctx, cfg.wsID)
if err != nil {
return nil, err
}
Expand All @@ -77,8 +68,8 @@ func setupDatabase() (*config, error) {
)

db, err := databaseCli.Create(ctx, xata.CreateDatabaseRequest{
DatabaseName: "db" + testID,
WorkspaceID: xata.String(ws.Id),
DatabaseName: "db" + cfg.testID,
WorkspaceID: xata.String(cfg.wsID),
Region: &cfg.region,
UI: &xata.UI{Color: xata.String("RED")},
BranchMetaData: &xata.BranchMetadata{
Expand Down
85 changes: 84 additions & 1 deletion xata/search_filter_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,8 @@ func Test_searchAndFilterCli_Summarize(t *testing.T) {
}
}

func Test_searchAndFilterCli_Aggregate(t *testing.T) {
// Agg: Date Histogram
func Test_searchAndFilterCli_Aggregate_DateHistogram(t *testing.T) {
assert := assert.New(t)

type tc struct {
Expand Down Expand Up @@ -779,3 +780,85 @@ func Test_searchAndFilterCli_Aggregate(t *testing.T) {
})
}
}

// Agg: TopValues
// with nested aggregations
// https://github.com/xataio/xata-go/issues/47
func Test_searchAndFilterCli_Aggregate_TopValues_With_NestedAggs(t *testing.T) {
assert := assert.New(t)

type tc struct {
name string
want *xatagenworkspace.AggregateTableResponse
statusCode int
apiErr *xatagencore.APIError
}

aggRes := map[string]*xatagenworkspace.AggResponse{
"test": xatagenworkspace.NewAggResponseFromDoubleOptional(xata.Float64(2)),
}

tests := []tc{
{
name: "should aggregate a table with nested aggregation",
want: &xatagenworkspace.AggregateTableResponse{Aggs: &aggRes},
statusCode: http.StatusOK,
},
}

for _, eTC := range errTestCasesWorkspace {
tests = append(tests, tc{
name: eTC.name,
statusCode: eTC.statusCode,
apiErr: eTC.apiErr,
})
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testSrv := testService(t, http.MethodPost, "/db", tt.statusCode, tt.apiErr != nil, tt.want)

cli, err := xata.NewSearchAndFilterClient(
xata.WithBaseURL(testSrv.URL),
xata.WithAPIKey("test-key"),
)
assert.NoError(err)
assert.NotNil(cli)

got, err := cli.Aggregate(
context.TODO(),
xata.AggregateTableRequest{
BranchRequestOptional: xata.BranchRequestOptional{
DatabaseName: xata.String("my-db"),
},
TableName: "my-table",
Payload: xata.AggregateTableRequestPayload{
Aggregations: xata.AggExpressionMap{
"topKey": xata.NewTopValuesAggExpression(xata.TopValuesAgg{
Column: "my-column",
Size: xata.Int(5),
Aggs: &xata.NestedAggsMap{
"nestedKeyOne": xata.NewAverageAggExpression("my-sub-col_a"),
"nestedKeyTwo": xata.NewMaxAggExpression("my-sub-col_b"),
},
}),
},
},
},
)

if tt.apiErr != nil {
errAPI := tt.apiErr.Unwrap()
if errAPI == nil {
t.Fatal("expected error but got nil")
}
assert.ErrorAs(err, &errAPI)
assert.Equal(err.Error(), tt.apiErr.Error())
assert.Nil(got)
} else {
assert.Equal(tt.want, got)
assert.NoError(err)
}
})
}
}
11 changes: 11 additions & 0 deletions xata/search_filter_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ type AggExpression *xatagenworkspace.AggExpression

type AggExpressionMap = map[string]AggExpression

// NestedAggsMap maps a caller-chosen name to an aggregation expression. It is
// the type of the Aggs field on DateHistogramAgg, TopValuesAgg and
// NumericHistogramAgg, where it holds the nested aggregations.
type NestedAggsMap = map[string]*xatagenworkspace.AggExpression

type AggExpressionCount xatagenworkspace.AggExpressionCount

type CountAggFilter struct {
Expand Down Expand Up @@ -298,6 +300,8 @@ const (

// Split data into buckets by a datetime column. Accepts sub-aggregations for each bucket.
type DateHistogramAgg struct {
// Nested Aggregations
Aggs *NestedAggsMap `json:"aggs,omitempty"`
// The column to use for bucketing. Must be of type datetime.
Column string
// The fixed interval to use when bucketing.
Expand All @@ -315,6 +319,7 @@ type DateHistogramAgg struct {
func NewDateHistogramAggExpression(value DateHistogramAgg) *xatagenworkspace.AggExpression {
return xatagenworkspace.NewAggExpressionFromAggExpressionDateHistogram(&xatagenworkspace.AggExpressionDateHistogram{
DateHistogram: &xatagenworkspace.DateHistogramAgg{
Aggs: value.Aggs,
Column: value.Column,
Interval: value.Interval,
CalendarInterval: (*xatagenworkspace.DateHistogramAggCalendarInterval)(value.CalendarInterval),
Expand All @@ -326,6 +331,8 @@ func NewDateHistogramAggExpression(value DateHistogramAgg) *xatagenworkspace.Agg
// Split data into buckets by the unique values in a column. Accepts sub-aggregations for each bucket.
// The top values as ordered by the number of records (`$count`) are returned.
type TopValuesAgg struct {
// Nested Aggregations
Aggs *NestedAggsMap `json:"aggs,omitempty"`
// The column to use for bucketing. Accepted types are `string`, `email`, `int`, `float`, or `bool`.
Column string
// The maximum number of unique values to return.
Expand All @@ -337,12 +344,15 @@ func NewTopValuesAggExpression(value TopValuesAgg) *xatagenworkspace.AggExpressi
TopValues: &xatagenworkspace.TopValuesAgg{
Column: value.Column,
Size: value.Size,
Aggs: value.Aggs,
},
})
}

// Split data into buckets by dynamic numeric ranges. Accepts sub-aggregations for each bucket.
type NumericHistogramAgg struct {
// Nested Aggregations
Aggs *NestedAggsMap `json:"aggs,omitempty"`
// The column to use for bucketing. Must be of numeric type.
Column string
// The numeric interval to use for bucketing. The resulting buckets will be ranges
Expand All @@ -357,6 +367,7 @@ type NumericHistogramAgg struct {

func NewNumericHistogramAggExpression(value NumericHistogramAgg) *xatagenworkspace.AggExpression {
return xatagenworkspace.NewAggExpressionFromAggExpressionNumericHistogram(&xatagenworkspace.AggExpressionNumericHistogram{NumericHistogram: &xatagenworkspace.NumericHistogramAgg{
Aggs: value.Aggs,
Column: value.Column,
Offset: value.Offset,
Interval: value.Interval,
Expand Down
1 change: 1 addition & 0 deletions xata/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
personalAPIKeyLocation = "~/.config/xata/key"
defaultControlPlaneDomain = "api.xata.io"
xataAPIKeyEnvVar = "XATA_API_KEY"
xataWsIDEnvVar = "XATA_WORKSPACE_ID" // TODO: not in use yet
dbURLFormat = "https://{workspace_id}.{region}.xata.sh/db/{db_name}:{branch_name}"
defaultBranchName = "main"
configFileName = ".xatarc"
Expand Down
Loading