Commit 281f2e2

Query the current aggregation bucket (#6293)
* graph, graphql: apply default order on the build_order call

  This makes sure that when the default order is applied the column used in ORDER BY is also included in the SELECT projection

* graph, graphql, store: support querying the current bucket

* docs: update the aggregation docs

* store: add unit tests for select_current_sql generation

  Add expected SQL constants and check_eqv assertions for select_current_sql in the existing rollup() test function, covering all 5 aggregation types: Stats (with dimensions), TotalStats (no dimensions), OpenClose (first/last), Lifetime (cumulative), and CountOnly (count-only).

* store: add GROUP BY timestamp to select_current_bucket

  Fix select_current_bucket to always include GROUP BY timestamp in the generated SQL, regardless of whether dimensions are present.

* store: add store-level tests for root current bucket queries

  Added 4 test functions exercising the current aggregation bucket feature through the store's find method: current_include (verifies 6 rows with rolled-up + on-the-fly current bucket data), current_exclude (4 rolled-up rows only), current_include_with_filter (dimension filter on TOKEN1), and current_include_cumulative (totalValue cumulative aggregates).

  Added TestEnv helper methods (aggregation_query, find_aggregation) for constructing EntityQuery with explicit column selection and configurable AggregationCurrent.

  Adjusted TIMES[3] from minutes(120) to minutes(121) to ensure source data falls past the last_rollup boundary (max_agg_ts + interval + 1s).

* store, graphql: extend current bucket support to nested aggregation queries

  Extend the current aggregation bucket querying to work for nested aggregation fields accessed through parent entities (SingleWindow path).

  Changes:
  - Remove forced aggregation_current = None for nested queries in prefetch.rs
  - Extend find_rollup to handle FilterCollection::SingleWindow
  - Extend query_window_one_entity to UNION ALL current bucket data with windowed aggregation data, using the dimension/link column to map current bucket rows to parent IDs

* store: add store-level tests for nested current bucket queries

  Added 4 test functions for nested aggregation current bucket queries using EntityCollection::Window (Token parent, Stats_hour child):
  - nested_current_include: 6 rows with aggregate value verification
  - nested_current_exclude: 4 rolled-up rows only
  - nested_current_include_empty_bucket: TOKEN3 with no data returns 0 rows
  - nested_current_include_count: 3 rows per token

  Extended SCHEMA with Token entity, changed Data.token and Stats.token from Bytes! to Token!, added insert_entities helper for multi-type entity insertion, and inserted TOKEN1/TOKEN2/TOKEN3 entities in test data.

* tests: add runner test for current aggregation bucket end-to-end

* docs: update aggregation docs to fully document current bucket feature

  Documentation does not cover Parent-linked nested query limitation or SELECT * constraint

* chore: ignore .pnpm-store directory

* store: update introspection mock

* store: fix indentation of example SQL

  They were treated as doctests

* store: use inline FILTER comment, do not group by timestamp
1 parent 197dc5a commit 281f2e2

21 files changed

+3289
-85
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ lcov.info
 
 # Node dependencies
 node_modules/
+.pnpm-store/
 
 # Docker volumes and debug logs
 .postgres

docs/aggregations.md

Lines changed: 53 additions & 3 deletions
@@ -178,8 +178,8 @@ accepts the following arguments:
   dimension
 - A mandatory `interval`
 - An optional `current` to indicate whether to include the current,
-  partially filled bucket in the response. Can be either `ignore` (the
-  default) or `include` (still **TODO** and not implemented)
+  partially filled bucket in the response. Can be either `exclude` (the
+  default) or `include`
 - Optional `timestamp_{gte|gt|lt|lte|eq|in}` filters to restrict the range
   of timestamps to return. The timestamp to filter by must be a string
   containing microseconds since the epoch. The value `"1704164640000000"`
@@ -189,7 +189,7 @@ accepts the following arguments:
 
 ```graphql
 token_stats(interval: "hour",
-      current: ignore,
+      current: exclude,
       where: {
         token: "0x1234",
         timestamp_gte: 1234567890,
@@ -201,3 +201,53 @@ token_stats(interval: "hour",
   avgVolume
 }
 ```
+
+### Current Bucket
+
+By default, aggregation queries return only completed, rolled-up buckets
+(`current: exclude`). These are buckets whose time interval has ended and
+whose data has been fully aggregated by `graph-node`'s rollup process.
+
+Setting `current: include` adds an additional, partially filled bucket that
+is computed on-the-fly from the unrolled timeseries data that has been
+inserted since the last rollup. This current bucket aggregates raw data
+points from the source timeseries table that have not yet been rolled up
+into the aggregation table. It covers the time period from the end of the
+last completed bucket up to the most recent data point.
+
+- `current: exclude` (default) — return only completed, rolled-up buckets
+- `current: include` — also return the in-progress bucket computed from
+  unrolled source data
+
+The current bucket is useful when you need near-real-time aggregation data
+without waiting for the next rollup cycle to complete.
+
+#### Nested Aggregation Queries
+
+The `current` argument also works on nested aggregation fields accessed
+through a parent entity. For example, if a `Token` entity has a derived
+aggregation field `tokenStats`, you can query the current bucket for each
+token:
+
+```graphql
+{
+  tokens {
+    id
+    name
+    tokenStats(interval: "hour", current: include) {
+      timestamp
+      totalVolume
+    }
+  }
+}
+```
+
+This returns both the completed rolled-up hourly buckets and the current
+in-progress bucket for each token's stats.
+
+#### Limitations
+
+Current bucket support for nested aggregation fields is only available when
+the field references a single aggregation type. It is not supported when the
+aggregation field is accessed through an interface with multiple
+implementations.
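
Note on the documented semantics: the following is a minimal sketch of what `current: include` adds to a response, assuming a single dimension-free aggregation and that every unrolled point belongs to the same in-progress bucket. `Bucket`, `Current`, and `buckets_for_query` are hypothetical names for illustration only; `graph-node` itself computes the current bucket in SQL, not in code like this.

```rust
/// Hypothetical, simplified model of one aggregation bucket
/// (not graph-node's actual types).
#[derive(Debug, Clone, PartialEq)]
pub struct Bucket {
    /// Start of the bucket, in seconds since the epoch.
    pub timestamp: i64,
    /// Sum of all values that fell into the bucket.
    pub total: i64,
}

/// Mirrors the two documented values of the `current` argument.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Current {
    #[default]
    Exclude,
    Include,
}

/// Returns the rolled-up buckets and, for `Current::Include`, one extra
/// bucket summed from raw points that have not been rolled up yet.
///
/// `unrolled` holds `(timestamp, value)` pairs newer than the last rollup.
/// If there are no such points, no current bucket is added.
pub fn buckets_for_query(
    rolled_up: &[Bucket],
    unrolled: &[(i64, i64)],
    interval: i64,
    current: Current,
) -> Vec<Bucket> {
    let mut result = rolled_up.to_vec();
    if current == Current::Include {
        if let Some(&(first_ts, _)) = unrolled.first() {
            // Align the current bucket to the start of its interval.
            let start = first_ts - first_ts.rem_euclid(interval);
            let total = unrolled.iter().map(|&(_, value)| value).sum();
            result.push(Bucket { timestamp: start, total });
        }
    }
    result
}
```

With `Current::Exclude` the function returns the rolled-up buckets unchanged, which corresponds to the documented default.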

graph/src/components/store/mod.rs

Lines changed: 16 additions & 0 deletions
@@ -466,6 +466,8 @@ pub struct EntityQuery {
     pub query_id: Option<String>,
 
     pub trace: bool,
+
+    pub aggregation_current: Option<AggregationCurrent>,
 }
 
 impl EntityQuery {
@@ -484,6 +486,7 @@ impl EntityQuery {
             logger: None,
             query_id: None,
             trace: false,
+            aggregation_current: None,
         }
     }
 
@@ -543,6 +546,19 @@ impl EntityQuery {
     }
 }
 
+/// Indicates whether the current, partially filled bucket should be included in the response.
+///
+/// This is only relevant for aggregation entity queries.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum AggregationCurrent {
+    /// Exclude the current, partially filled bucket from the response.
+    #[default]
+    Exclude,
+
+    /// Include the current, partially filled bucket in the response.
+    Include,
+}
+
 /// Operation types that lead to changes in assignments
 #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
 #[serde(rename_all = "lowercase")]

graph/src/schema/api.rs

Lines changed: 7 additions & 0 deletions
@@ -703,6 +703,11 @@ impl FilterOps {
                 "",
                 s::Type::NamedType("OrderDirection".to_string()),
             ),
+            input_value(
+                "current",
+                "",
+                s::Type::NamedType("Aggregation_current".to_string()),
+            ),
         ],
     };
 
@@ -2231,6 +2236,8 @@ type Gravatar @entity {
         assert_eq!("Aggregation_interval", interval.value_type.get_base_type());
         let filter = field.argument("where").unwrap();
         assert_eq!(&filter_type, filter.value_type.get_base_type());
+        let current = field.argument("current").unwrap();
+        assert_eq!("Aggregation_current", current.value_type.get_base_type());
 
         let s::TypeDefinition::InputObject(filter) = schema
             .get_type_definition_from_type(&filter.value_type)

graph/src/schema/input/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ pub mod kw {
     pub const INTERVALS: &str = "intervals";
     pub const INTERVAL: &str = "interval";
     pub const CUMULATIVE: &str = "cumulative";
+    pub const CURRENT: &str = "current";
 }
 
 /// The internal representation of a subgraph schema, i.e., the

graph/src/schema/meta.graphql

Lines changed: 9 additions & 0 deletions
@@ -106,3 +106,12 @@ enum Aggregation_interval {
   hour
   day
 }
+
+"Indicates whether the current, partially filled bucket should be included in the response. Defaults to `exclude`"
+enum Aggregation_current {
+  "Exclude the current, partially filled bucket from the response"
+  exclude
+
+  "Include the current, partially filled bucket in the response"
+  include
+}

graphql/src/execution/ast.rs

Lines changed: 21 additions & 1 deletion
@@ -1,7 +1,7 @@
 use std::collections::{BTreeSet, HashSet};
 
 use graph::{
-    components::store::{AttributeNames, ChildMultiplicity, EntityOrder},
+    components::store::{AggregationCurrent, AttributeNames, ChildMultiplicity, EntityOrder},
     data::{graphql::ObjectOrInterface, store::ID},
     env::ENV_VARS,
     prelude::{anyhow, q, r, s, QueryExecutionError, ValueMap},
@@ -364,6 +364,26 @@ impl Field {
             })
             .transpose()
     }
+
+    pub fn aggregation_current(&self) -> Result<Option<AggregationCurrent>, QueryExecutionError> {
+        let Some(value) = self.argument_value(kw::CURRENT) else {
+            return Ok(None);
+        };
+
+        if let r::Value::Enum(current) = value {
+            match current.as_str() {
+                "exclude" => return Ok(Some(AggregationCurrent::Exclude)),
+                "include" => return Ok(Some(AggregationCurrent::Include)),
+                _ => {}
+            }
+        }
+
+        Err(QueryExecutionError::InvalidArgumentError(
+            self.position,
+            kw::CURRENT.to_string(),
+            q::Value::from(value.clone()),
+        ))
+    }
 }
 
 impl ValueMap for Field {
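
Note on `aggregation_current`: the parsing logic added above can be tried out in isolation with the sketch below. It assumes a hypothetical `ArgValue` enum in place of `r::Value` and a plain `String` error in place of `QueryExecutionError::InvalidArgumentError`; only the control flow is meant to match the diff.

```rust
/// Local copy of the enum from graph/src/components/store/mod.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AggregationCurrent {
    #[default]
    Exclude,
    Include,
}

/// Hypothetical stand-in for a resolved GraphQL argument value.
#[derive(Debug, Clone, PartialEq)]
pub enum ArgValue {
    Enum(String),
    String(String),
}

/// Maps the optional `current` argument to `AggregationCurrent`.
///
/// - A missing argument yields `Ok(None)`; the caller picks the default.
/// - The enum values `exclude` and `include` map to their variants.
/// - Anything else, including a non-enum value, is an error.
pub fn parse_current(arg: Option<&ArgValue>) -> Result<Option<AggregationCurrent>, String> {
    let Some(value) = arg else {
        return Ok(None);
    };

    if let ArgValue::Enum(name) = value {
        match name.as_str() {
            "exclude" => return Ok(Some(AggregationCurrent::Exclude)),
            "include" => return Ok(Some(AggregationCurrent::Include)),
            _ => {}
        }
    }

    Err(format!("invalid value for `current`: {value:?}"))
}
```

In this sketch the old spelling `ignore` falls through to the error branch, as does the string `"include"`, because only enum values are accepted.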

graphql/src/store/prefetch.rs

Lines changed: 12 additions & 7 deletions
@@ -1,6 +1,7 @@
 //! Run a GraphQL query and fetch all the entitied needed to build the
 //! final result
 
+use graph::components::store::AggregationCurrent;
 use graph::data::graphql::ObjectTypeExt;
 use graph::data::query::Trace;
 use graph::data::store::Id;
@@ -9,7 +10,6 @@ use graph::data::store::IdType;
 use graph::data::store::QueryObject;
 use graph::data::value::{Object, Word};
 use graph::prelude::{r, CacheWeight, CheapClone};
-use graph::schema::kw;
 use graph::schema::AggregationInterval;
 use graph::schema::Field;
 use graph::slog::warn;
@@ -626,6 +626,7 @@ impl<'a> Loader<'a> {
             let child_type = input_schema
                 .object_or_interface(field_type.field_type.get_base_type(), child_interval)
                 .expect("we only collect fields that are objects or interfaces");
+            let aggregation_current = field.aggregation_current()?;
 
             let join = if at_root {
                 MaybeJoin::Root { child_type }
@@ -644,6 +645,7 @@ impl<'a> Loader<'a> {
                 let field_type = object_type
                     .field(&field.name)
                     .expect("field names are valid");
+
                 MaybeJoin::Nested(Join::new(
                     &input_schema,
                     object_type.cheap_clone(),
@@ -652,7 +654,10 @@ impl<'a> Loader<'a> {
                 ))
             };
 
-            match self.fetch(&parents, &join, field).await {
+            match self
+                .fetch(&parents, &join, field, aggregation_current)
+                .await
+            {
                 Ok((children, trace)) => {
                     let exec_fut = Box::pin(self.execute_selection_set(
                         children,
@@ -696,6 +701,7 @@ impl<'a> Loader<'a> {
         parents: &[&mut Node],
         join: &MaybeJoin<'_>,
         field: &a::Field,
+        aggregation_current: Option<AggregationCurrent>,
     ) -> Result<(Vec<Node>, Trace), QueryExecutionError> {
         let input_schema = self.resolver.store.input_schema().await?;
         let child_type = join.child_type();
@@ -715,11 +721,6 @@ impl<'a> Loader<'a> {
             // that causes unnecessary work in the database
             query.order = EntityOrder::Unordered;
         }
-        // Apply default timestamp ordering for aggregations if no custom order is specified
-        if child_type.is_aggregation() && matches!(query.order, EntityOrder::Default) {
-            let ts = child_type.field(kw::TIMESTAMP).unwrap();
-            query.order = EntityOrder::Descending(ts.name.to_string(), ts.value_type);
-        }
         query.logger = Some(self.ctx.logger.cheap_clone());
         if let Some(r::Value::String(id)) = field.argument_value(ARG_ID) {
             query.filter = Some(
@@ -728,6 +729,10 @@ impl<'a> Loader<'a> {
             );
         }
 
+        if child_type.is_aggregation() {
+            query.aggregation_current = Some(aggregation_current.unwrap_or_default());
+        }
+
         if let MaybeJoin::Nested(join) = join {
             // For anything but the root node, restrict the children we select
             // by the parent list
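
Note on the last hunk: `query.aggregation_current` is only set for aggregation child types, and a missing `current` argument falls back to the enum's `#[default]` variant through `unwrap_or_default()`. The sketch below isolates that rule; `resolve_current` is a hypothetical helper written for illustration, not a function in `graph-node`.

```rust
/// Local copy of the enum from graph/src/components/store/mod.rs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AggregationCurrent {
    #[default]
    Exclude,
    Include,
}

/// Decides what ends up in `EntityQuery::aggregation_current`.
///
/// Non-aggregation entities keep `None`. Aggregations always get
/// `Some(..)`, using `Exclude` when the query did not pass `current`.
pub fn resolve_current(
    is_aggregation: bool,
    requested: Option<AggregationCurrent>,
) -> Option<AggregationCurrent> {
    if is_aggregation {
        Some(requested.unwrap_or_default())
    } else {
        None
    }
}
```

Under these assumptions, a `current` argument on a non-aggregation field has no effect on the resulting query.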

graphql/src/store/query.rs

Lines changed: 9 additions & 0 deletions
@@ -12,6 +12,7 @@ use graph::data::value::Object;
 use graph::data::value::Value as DataValue;
 use graph::prelude::{r, TryFromValue, ENV_VARS};
 use graph::schema::ast::{self as sast, FilterOp};
+use graph::schema::kw;
 use graph::schema::{EntityType, InputSchema, ObjectOrInterface};
 
 use crate::execution::ast as a;
@@ -552,6 +553,14 @@ fn build_order(
                 }
             }
         }
+        // Apply a default ordering to the aggregations so that the most recent buckets are returned first
+        (None, _) if entity.is_aggregation() => {
+            let ts = entity
+                .field(kw::TIMESTAMP)
+                .expect("aggregation entities have timestamps");
+
+            EntityOrder::Descending(ts.name.to_string(), ts.value_type)
+        }
         (None, _) => EntityOrder::Default,
     };
     Ok(order)
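
Note on the new match arm: when no explicit order is requested, aggregations now default to descending timestamp order inside `build_order` rather than in `prefetch.rs`. The sketch below shows that precedence with a hypothetical, much reduced `Order` enum and function signature; the real `build_order` takes different arguments and also carries the column's value type.

```rust
/// Hypothetical, reduced version of `EntityOrder`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Order {
    Default,
    Ascending(String),
    Descending(String),
}

/// `requested` is the explicit `(column, descending)` pair, if any.
pub fn build_order(requested: Option<(&str, bool)>, is_aggregation: bool) -> Order {
    match requested {
        // An explicit order always wins.
        Some((column, true)) => Order::Descending(column.to_string()),
        Some((column, false)) => Order::Ascending(column.to_string()),
        // No explicit order on an aggregation: newest buckets first.
        None if is_aggregation => Order::Descending("timestamp".to_string()),
        // Everything else keeps the store's default order.
        None => Order::Default,
    }
}
```

The guard arm has to come before the plain `None` arm, as it does in the diff, because match arms are tried in order.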
