Skip to content

Commit

Permalink
aggregated materialized views for big networks
Browse files Browse the repository at this point in the history
  • Loading branch information
angaz committed Mar 19, 2024
1 parent ebad0ff commit bbf93ed
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 1 deletion.
1 change: 1 addition & 0 deletions pkg/database/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (db *DB) Migrate(geoipdb string) error {
migrations.Migrate003GeoIP,
migrations.Migrate004Portal,
migrations.Migrate005PortalStatsViews,
migrations.Migrate007AggregatedStatsViews,
},
map[string]migrationFn{
"insert networks": func(ctx context.Context, tx pgx.Tx) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/database/migrations/002_stats_views.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func createStatsView(
SELECT set_chunk_time_interval(
'%[1]s',
make_interval(secs => %[4]d)
make_interval(secs => %[5]d)
);
`,
tableName,
Expand Down
134 changes: 134 additions & 0 deletions pkg/database/migrations/007_aggregated_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package migrations

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/jackc/pgx/v5"
)

func createAggregatedStatsView(
ctx context.Context,
tx pgx.Tx,
tableName string,
fromTableName string,
networkID uint64,
interval time.Duration,
startOffset time.Duration,
retention time.Duration,
chunkInterval time.Duration,
) error {
_, err := tx.Exec(
ctx,
fmt.Sprintf(
`
CREATE MATERIALIZED VIEW %[1]s
WITH (timescaledb.continuous) AS
SELECT
time_bucket(
make_interval(secs => %[4]d),
%[2]s.bucket
) bucket,
client_name_id,
client_version_id,
fork_id,
next_fork_id,
synced,
dial_success,
sum(total) total
FROM %[2]s
WHERE network_id = %[3]d
GROUP BY
1,
client_name_id,
client_version_id,
fork_id,
next_fork_id,
synced,
dial_success,
total
ORDER BY
bucket ASC
WITH NO DATA;
SELECT add_continuous_aggregate_policy(
'%[1]s',
start_offset => make_interval(secs => %[5]d),
end_offset => INTERVAL '30 minutes',
schedule_interval => make_interval(secs => %[4]d)
);
SELECT add_retention_policy(
'%[1]s',
make_interval(secs => %[6]d)
);
SELECT set_chunk_time_interval(
'%[1]s',
make_interval(secs => %[7]d)
);
`,
tableName,
fromTableName,
networkID,
int(interval.Seconds()),
int(startOffset.Seconds()),
int(retention.Seconds()),
int(chunkInterval.Seconds()),
),
)
if err != nil {
return fmt.Errorf("create materialized view: %w", err)
}

slog.Warn(
"Don't forget to add initial data",
"query", fmt.Sprintf(
"CALL refresh_continuous_aggregate('%s', '%s', INTERVAL '30 minutes');",
tableName,
time.Now().Add(-retention).Format("2006-01-02"),
),
)

return nil
}

func Migrate007AggregatedStatsViews(ctx context.Context, tx pgx.Tx) error {
networkIDs := []uint64{1, 17000, 11155111}

for _, networkID := range networkIDs {
err := createAggregatedStatsView(
ctx,
tx,
fmt.Sprintf("stats.execution_nodes_3h_%d", networkID),
"stats.execution_nodes_3h",
networkID,
3*time.Hour,
9*time.Hour,
14*day,
3*day,
)
if err != nil {
return fmt.Errorf("create view 3 hourly for %d: %w", networkID, err)
}

err = createAggregatedStatsView(
ctx,
tx,
fmt.Sprintf("stats.execution_nodes_24h_%d", networkID),
"stats.execution_nodes_24h",
networkID,
24*time.Hour,
3*day,
32*day,
7*day,
)
if err != nil {
return fmt.Errorf("create view daily for %d: %w", networkID, err)
}
}

return nil
}

0 comments on commit bbf93ed

Please sign in to comment.