
Refactor/proof generation #398

Open
ilyasymbiotic wants to merge 10 commits into main from refactor/proof-generation

Conversation

ilyasymbiotic (Collaborator) commented Mar 19, 2026

PR Type

Enhancement, Tests, Documentation


Description

  • Queue aggregation requests for concurrent workers

  • Add configurable proof catch-up scanning

  • Expose aggregation settings in CLI/config

  • Improve role logging and catch-up tests


Diagram Walkthrough

flowchart LR
  sig["Signature processed message"]
  queue["Aggregation request queue"]
  workers["Concurrent aggregation workers"]
  proof["Proof generation and P2P broadcast"]
  catchup["Periodic missing-proof catch-up"]
  config["Aggregation worker and catch-up config"]
  sig -- "enqueue request ID" --> queue
  queue -- "dispatch work" --> workers
  workers -- "aggregate proofs" --> proof
  catchup -- "retry missing proofs" --> proof
  config -- "controls" --> workers
  config -- "controls" --> catchup

File Walkthrough

Relevant files

Enhancement (5 files)
  • app.go — Start aggregation workers and catch-up loop (+19/-1)
  • helpers.go — Show validator aggregator and committer roles (+12/-2)
  • aggregator_app.go — Queue requests and scan for missing proofs (+171/-36)
  • valset_generator_handle_agg_proof.go — Log committer validator index for debugging (+1/-0)
  • entity.go — Add validator index and typed role checks (+12/-2)

Configuration changes (1 file)
  • config.go — Add aggregation worker and catch-up settings (+43/-0)

Miscellaneous (3 files)
  • info.go — Update validator printer call signatures (+2/-2)
  • printers.go — Use `ValidatorSet` directly in network printers (+5/-9)
  • info.go — Pass `ValidatorSet` into operator tree printer (+1/-1)

Tests (1 file)
  • aggregator_app_test.go — Test catch-up flow and direct aggregation (+186/-9)

Documentation (1 file)
  • example.config.yaml — Document aggregation workers and catch-up options (+25/-0)

@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Dropped retries

Aggregation is now fully asynchronous, but worker failures are only logged and never requeued or surfaced to the caller. Transient prover, repository, or P2P errors can therefore drop a live aggregation attempt until the periodic catch-up runs, or indefinitely if catch-up is disabled.

	s.queue.Add(msg.RequestID())
	return nil
}

func (s *AggregatorApp) EnqueueRequestID(ctx context.Context, requestID common.Hash) {
	s.queue.Add(requestID)
	slog.DebugContext(ctx, "Enqueued aggregation request", "requestId", requestID.Hex())
}

func (s *AggregatorApp) HandleAggregationRequests(ctx context.Context, workerCount int) error {
	ctx = log.WithComponent(ctx, "aggregator")

	for i := range workerCount {
		go s.worker(ctx, i+1)
	}

	<-ctx.Done()
	s.queue.ShutDown()
	slog.InfoContext(ctx, "Aggregation workers stopped")
	return nil
}

func (s *AggregatorApp) worker(ctx context.Context, id int) {
	slog.InfoContext(ctx, "Aggregation worker started", "workerId", id)
	for {
		requestID, shutdown := s.queue.Get()
		if shutdown {
			slog.InfoContext(ctx, "Aggregation worker shutting down", "workerId", id)
			return
		}

		func() {
			defer s.queue.Done(requestID)

			if _, err := s.TryAggregateProofForRequestID(ctx, requestID); err != nil {
				slog.ErrorContext(ctx, "Failed to aggregate proof for request",
					"requestId", requestID.Hex(),
					"error", err,
				)
			}
		}()
	}
}
Duplicate proving

The catch-up path calls TryAggregateProofForRequestID directly instead of going through the shared queue. That allows the same request to be processed concurrently by a catch-up cycle and a queue worker, which can duplicate expensive proof generation and rebroadcast the same proof.

proof, err := s.TryAggregateProofForRequestID(ctx, req.RequestID)
requestsChecked++
if err != nil {
	if ctx.Err() != nil {
		return ctx.Err()
	}
	slog.ErrorContext(ctx, "Failed to aggregate proof for request, skipping",
		"requestId", req.RequestID.Hex(),
		"epoch", epoch,
		"error", err,
	)
	continue
}

if len(proof.Proof) > 0 {
	proofsGenerated++
	if catchupCfg.MaxProofsPerCycle > 0 && proofsGenerated >= catchupCfg.MaxProofsPerCycle {
		slog.InfoContext(ctx, "Aggregation catch-up reached max proofs per cycle",
			"requestsChecked", requestsChecked,
			"proofsGenerated", proofsGenerated,
		)
		return nil
	}
Invalid workers

There is no guard against a non-positive workerCount. If the new CLI/config value is set to 0 or a negative number, the app starts no aggregation workers while requests continue to be enqueued, which can silently stall live proof generation and grow memory usage.

func (s *AggregatorApp) HandleAggregationRequests(ctx context.Context, workerCount int) error {
	ctx = log.WithComponent(ctx, "aggregator")

	for i := range workerCount {
		go s.worker(ctx, i+1)
	}

	<-ctx.Done()
	s.queue.ShutDown()
	slog.InfoContext(ctx, "Aggregation workers stopped")
	return nil
}

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: Possible issue

Validate worker count

Reject non-positive workerCount before starting the loop. As written, 0 or negative
values leave the queue running with no workers, so HandleSignatureProcessedMessage
keeps accepting requests that will never be processed.

internal/usecase/aggregator-app/aggregator_app.go [135-146]

 func (s *AggregatorApp) HandleAggregationRequests(ctx context.Context, workerCount int) error {
 	ctx = log.WithComponent(ctx, "aggregator")
+
+	if workerCount <= 0 {
+		return errors.New("aggregation worker count must be greater than zero")
+	}
 
 	for i := range workerCount {
 		go s.worker(ctx, i+1)
 	}
 
 	<-ctx.Done()
 	s.queue.ShutDown()
 	slog.InfoContext(ctx, "Aggregation workers stopped")
 	return nil
 }
Suggestion importance[1-10]: 7


Why: HandleAggregationRequests currently allows workerCount <= 0, which can leave s.queue accepting work with no workers ever draining it. This is a valid functional safeguard, although it mainly affects misconfiguration rather than the normal default path.

Impact: Medium

Category: General
Honor cancellation during scans

Check ctx.Err() at the top of the epoch/page loops and before each request. Right
now cancellation is only honored on the error path, so shutdown can keep scanning
and enqueueing old work for a long time after the service is supposed to stop.

internal/usecase/aggregator-app/aggregator_app.go [342-357]

 for epoch := scanFrom; ; epoch-- {
+	if ctx.Err() != nil {
+		return ctx.Err()
+	}
+
 	var lastHash common.Hash
 	for {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+
 		requests, err := s.cfg.Repo.GetSignatureRequestsWithoutAggregationProof(ctx, epoch, 10, lastHash)
 		if err != nil {
 			return errors.Errorf("failed to get requests without proof for epoch %d: %w", epoch, err)
 		}
 
 		if len(requests) == 0 {
 			break
 		}
 
 		for _, req := range requests {
+			if ctx.Err() != nil {
+				return ctx.Err()
+			}
 			if !req.KeyTag.Type().AggregationKey() {
 				continue
 			}
Suggestion importance[1-10]: 6


Why: tryAggregateRequestsWithoutProof currently only reacts to cancellation indirectly, so long epoch/page scans may continue after shutdown is requested. Adding explicit ctx.Err() checks in these loops is accurate and improves responsiveness without conflicting with the PR's logic.

Impact: Low


chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f0ebde8912


 }

-	err := s.TryAggregateProofForRequestID(ctx, req.RequestID)
+	proof, err := s.TryAggregateProofForRequestID(ctx, req.RequestID)


P1: Route catch-up aggregation through the deduplicating queue

When aggregation.catchup.enabled is on and proof generation is slow (for example, ZK proofs taking longer than the 1m default interval), the same request can now be aggregated twice in parallel. HandleAggregationRequests is already consuming live requests via the workqueue (cmd/relay/root/app.go:557-569), but catch-up scans the latest epoch and calls TryAggregateProofForRequestID directly here, bypassing that de-duplication/in-flight tracking. Because a request remains visible in GetSignatureRequestsWithoutAggregationProof until the proof is eventually saved, the catch-up loop can start a second expensive proof for a request that a worker is already processing.


Comment on lines +138 to +139
for i := range workerCount {
go s.worker(ctx, i+1)


P2: Reject non-positive aggregation worker counts

If an operator sets aggregation.worker-count to 0 or a negative value, this loop starts no workers and the app silently stops consuming queued aggregation requests. HandleSignatureProcessedMessage still enqueues request IDs, so live proof generation stalls unless the separate catch-up loop happens to rescue it later. Unlike signal.worker-count, the new aggregation setting is not validated anywhere in cmd/relay/root/config.go, so an invalid config is accepted at startup.


@github-actions

github-actions bot commented Mar 19, 2026

🧪 Test Coverage Report

Coverage: 48.9%

