Skip to content

Commit

Permalink
v0.13.0 (#1560)
Browse files Browse the repository at this point in the history
  • Loading branch information
MauroToscano authored Jan 2, 2025
2 parents 96458a3 + 306f72d commit e7c447a
Show file tree
Hide file tree
Showing 136 changed files with 6,358 additions and 2,263 deletions.
56 changes: 39 additions & 17 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CONFIG_FILE?=config-files/config.yaml
export OPERATOR_ADDRESS ?= $(shell yq -r '.operator.address' $(CONFIG_FILE))
AGG_CONFIG_FILE?=config-files/config-aggregator.yaml

OPERATOR_VERSION=v0.12.2
OPERATOR_VERSION=v0.13.0

ifeq ($(OS),Linux)
BUILD_ALL_FFI = $(MAKE) build_all_ffi_linux
Expand Down Expand Up @@ -557,7 +557,13 @@ generate_groth16_ineq_proof: ## Run the gnark_plonk_bn254_script
@go run scripts/test_files/gnark_groth16_bn254_infinite_script/cmd/main.go 1

__METRICS__:
# Prometheus and graphana
# Prometheus and Grafana
# Stop and remove the Prometheus and Grafana containers.
# The `-` prefix lets the recipe keep going when a container is already
# stopped or removed, so repeated runs (and metrics_clean_db) don't abort.
metrics_remove_containers:
	-@docker stop prometheus grafana
	-@docker rm prometheus grafana

# Remove the metrics containers AND their persisted data volumes.
# NOTE(review): volume names assume the compose project is `aligned_layer` — confirm.
metrics_clean_db: metrics_remove_containers
	-@docker volume rm aligned_layer_grafana_data aligned_layer_prometheus_data

# Starts the metrics stack in the foreground (no -d flag): logs stream to the
# terminal and Ctrl-C shuts the stack down.
run_metrics: ## Run metrics using metrics-docker-compose.yaml
	@echo "Running metrics..."
	@docker compose -f metrics-docker-compose.yaml up
Expand Down Expand Up @@ -604,6 +610,16 @@ upgrade_add_aggregator: ## Add Aggregator to Aligned Contracts
@echo "Adding Aggregator to Aligned Contracts..."
@. contracts/scripts/.env && . contracts/scripts/upgrade_add_aggregator_to_service_manager.sh

# Set the aggregator address in the Aligned Service Manager contract.
# Required caller-supplied variables:
#   NETWORK            - selects the env file contracts/scripts/.env.$(NETWORK)
#   AGGREGATOR_ADDRESS - address forwarded to set_aggregator_address.sh
set_aggregator_address:
	@echo "Setting Aggregator Address in Aligned Service Manager Contract on $(NETWORK) network..."
	@echo "Aggregator address: $(AGGREGATOR_ADDRESS)"
	@. contracts/scripts/.env.$(NETWORK) && . contracts/scripts/set_aggregator_address.sh $(AGGREGATOR_ADDRESS)

# Devnet variant of set_aggregator_address: instead of sourcing an env file it
# inlines the local node RPC URL and the well-known Anvil/Hardhat dev account #0
# private key (public test key, not a secret — same key as DOCKER_PROOFS_PRIVATE_KEY).
# NOTE(review): unlike set_aggregator_address, the script is executed (not
# sourced) and the line is not @-silenced, so the full command is echoed — confirm intended.
set_aggregator_address_devnet:
	@echo "Setting Aggregator Address in Aligned Service Manager Contract..."
	@echo "Aggregator address: $(AGGREGATOR_ADDRESS)"
	RPC_URL="http://localhost:8545" PRIVATE_KEY="0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80" OUTPUT_PATH=./script/output/devnet/alignedlayer_deployment_output.json ./contracts/scripts/set_aggregator_address.sh $(AGGREGATOR_ADDRESS)

# Register the disabled-verifiers list in the Aligned Service Manager:
# sources contracts/scripts/.env for credentials, then runs the upgrade script.
upgrade_initialize_disabled_verifiers:
	@echo "Adding disabled verifiers to Aligned Service Manager..."
	@. contracts/scripts/.env && . contracts/scripts/upgrade_disabled_verifiers_in_service_manager.sh
Expand Down Expand Up @@ -906,7 +922,7 @@ docker_down:
@echo "Everything down"
docker ps

DOCKER_BURST_SIZE=2
# Number of proofs each docker_batcher_send_*_burst target submits per run.
DOCKER_BURST_SIZE=1
# Anvil/Hardhat dev account #0 private key — devnet only, publicly known, not a secret.
DOCKER_PROOFS_PRIVATE_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80

docker_batcher_send_sp1_burst:
Expand All @@ -918,7 +934,8 @@ docker_batcher_send_sp1_burst:
--vm_program ./scripts/test_files/sp1/sp1_fibonacci.elf \
--repetitions $(DOCKER_BURST_SIZE) \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--rpc_url $(DOCKER_RPC_URL)
--rpc_url $(DOCKER_RPC_URL) \
--max_fee 0.1ether

docker_batcher_send_risc0_burst:
@echo "Sending Risc0 fibonacci task to Batcher..."
Expand All @@ -930,7 +947,8 @@ docker_batcher_send_risc0_burst:
--public_input ./scripts/test_files/risc_zero/fibonacci_proof_generator/risc_zero_fibonacci.pub \
--repetitions $(DOCKER_BURST_SIZE) \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--rpc_url $(DOCKER_RPC_URL)
--rpc_url $(DOCKER_RPC_URL) \
--max_fee 0.1ether

docker_batcher_send_plonk_bn254_burst:
@echo "Sending Groth16Bn254 1!=0 task to Batcher..."
Expand All @@ -942,7 +960,8 @@ docker_batcher_send_plonk_bn254_burst:
--vk ./scripts/test_files/gnark_plonk_bn254_script/plonk.vk \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--rpc_url $(DOCKER_RPC_URL) \
--repetitions $(DOCKER_BURST_SIZE)
--repetitions $(DOCKER_BURST_SIZE) \
--max_fee 0.1ether

docker_batcher_send_plonk_bls12_381_burst:
@echo "Sending Groth16 BLS12-381 1!=0 task to Batcher..."
Expand All @@ -954,19 +973,21 @@ docker_batcher_send_plonk_bls12_381_burst:
--vk ./scripts/test_files/gnark_plonk_bls12_381_script/plonk.vk \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
--repetitions $(DOCKER_BURST_SIZE) \
--rpc_url $(DOCKER_RPC_URL)
--rpc_url $(DOCKER_RPC_URL) \
--max_fee 0.1ether

# Submit a burst of Groth16 Bn254 "1 != 0" proofs to the batcher container.
# Fix: the progress message previously said "Groth16 BLS12-381" (copy-paste
# from the bls12_381 target) while the task submitted is Groth16Bn254.
docker_batcher_send_groth16_burst:
	@echo "Sending Groth16 Bn254 1!=0 task to Batcher..."
	docker exec $(shell docker ps | grep batcher | awk '{print $$1}') aligned submit \
		--private_key $(DOCKER_PROOFS_PRIVATE_KEY) \
		--proving_system Groth16Bn254 \
		--proof ./scripts/test_files/gnark_groth16_bn254_script/groth16.proof \
		--public_input ./scripts/test_files/gnark_groth16_bn254_script/plonk_pub_input.pub \
		--vk ./scripts/test_files/gnark_groth16_bn254_script/groth16.vk \
		--proof_generator_addr $(PROOF_GENERATOR_ADDRESS) \
		--repetitions $(DOCKER_BURST_SIZE) \
		--rpc_url $(DOCKER_RPC_URL) \
		--max_fee 0.1ether

# Update target as new proofs are supported.
docker_batcher_send_all_proofs_burst:
Expand All @@ -993,6 +1014,7 @@ docker_batcher_send_infinite_groth16:
--public_input scripts/test_files/gnark_groth16_bn254_infinite_script/infinite_proofs/ineq_$${counter}_groth16.pub \
--vk scripts/test_files/gnark_groth16_bn254_infinite_script/infinite_proofs/ineq_$${counter}_groth16.vk \
--proof_generator_addr $(PROOF_GENERATOR_ADDRESS); \
--max_fee 0.1ether
sleep $${timer}; \
counter=$$((counter + 1)); \
done \
Expand All @@ -1010,7 +1032,7 @@ docker_verify_proofs_onchain:
done \
'

DOCKER_PROOFS_WAIT_TIME=30
# Seconds to wait for submitted proofs to be processed before verifying results.
DOCKER_PROOFS_WAIT_TIME=60

docker_verify_proof_submission_success:
@echo "Verifying proofs were successfully submitted..."
Expand All @@ -1032,7 +1054,7 @@ docker_verify_proof_submission_success:
fi; \
echo "---------------------------------------------------------------------------------------------------"; \
done; \
if [ $$(ls -1 ./aligned_verification_data/*.cbor | wc -l) -ne 10 ]; then \
if [ $$(ls -1 ./aligned_verification_data/*.cbor | wc -l) -ne 5 ]; then \
echo "ERROR: Some proofs were verified successfully, but some proofs are missing in the aligned_verification_data/ directory"; \
exit 1; \
fi; \
Expand Down
30 changes: 24 additions & 6 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores the start time for each batch of the aggregator by task index
batchStartTimeByIdx map[uint32]time.Time

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand All @@ -78,6 +81,7 @@ type Aggregator struct {
// - batchCreatedBlockByIdx
// - batchDataByIdentifierHash
// - nextBatchIndex
// - batchStartTimeByIdx
taskMutex *sync.Mutex

// Mutex to protect ethereum wallet
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
batchCreatedBlockByIdx := make(map[uint32]uint64)
batchStartTimeByIdx := make(map[uint32]time.Time)

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
Expand Down Expand Up @@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
batchStartTimeByIdx: batchStartTimeByIdx,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand Down Expand Up @@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex]
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")

Expand Down Expand Up @@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)

// Only observe quorum reached if successful
agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt))

agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

Expand All @@ -285,10 +295,12 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
if err == nil {
// In some cases, we may fail to retrieve the receipt for the transaction.
txHash := "Unknown"
effectiveGasPrice := "Unknown"
if receipt != nil {
txHash = receipt.TxHash.String()
effectiveGasPrice = receipt.EffectiveGasPrice.String()
}
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash)
agg.telemetry.TaskSentToEthereum(batchData.BatchMerkleRoot, txHash, effectiveGasPrice)
agg.logger.Info("Aggregator successfully responded to task",
"taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))
Expand Down Expand Up @@ -316,10 +328,11 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

// This function is a callback that is called when the gas price is bumped on the avsWriter.SendAggregatedResponse
onGasPriceBumped := func(bumpedGasPrice *big.Int) {
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
onSetGasPrice := func(gasPrice *big.Int) {
agg.telemetry.TaskSetGasPrice(batchMerkleRoot, gasPrice.String())
}

startTime := time.Now()
receipt, err := agg.avsWriter.SendAggregatedResponse(
batchIdentifierHash,
batchMerkleRoot,
Expand All @@ -329,15 +342,18 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.AggregatorConfig.Aggregator.GasBumpIncrementalPercentage,
agg.AggregatorConfig.Aggregator.GasBumpPercentageLimit,
agg.AggregatorConfig.Aggregator.TimeToWaitBeforeBump,
onGasPriceBumped,
agg.metrics,
onSetGasPrice,
)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
agg.telemetry.LogTaskError(batchMerkleRoot, err)
return nil, err
}

// We only send the latency metric if the response is successul
agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime))

agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -383,6 +399,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.batchStartTimeByIdx[batchIndex] = time.Now()
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
Expand Down Expand Up @@ -447,6 +464,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
delete(agg.batchStartTimeByIdx, i)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
Expand Down
28 changes: 15 additions & 13 deletions aggregator/pkg/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ type TaskErrorMessage struct {
TaskError string `json:"error"`
}

type TaskGasPriceBumpMessage struct {
MerkleRoot string `json:"merkle_root"`
BumpedGasPrice string `json:"bumped_gas_price"`
type TaskSetGasPriceMessage struct {
MerkleRoot string `json:"merkle_root"`
GasPrice string `json:"gas_price"`
}

type TaskSentToEthereumMessage struct {
MerkleRoot string `json:"merkle_root"`
TxHash string `json:"tx_hash"`
MerkleRoot string `json:"merkle_root"`
TxHash string `json:"tx_hash"`
EffectiveGasPrice string `json:"effective_gas_price"`
}

type Telemetry struct {
Expand Down Expand Up @@ -101,20 +102,21 @@ func (t *Telemetry) LogTaskError(batchMerkleRoot [32]byte, taskError error) {
}
}

func (t *Telemetry) BumpedTaskGasPrice(batchMerkleRoot [32]byte, bumpedGasPrice string) {
body := TaskGasPriceBumpMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
BumpedGasPrice: bumpedGasPrice,
func (t *Telemetry) TaskSetGasPrice(batchMerkleRoot [32]byte, gasPrice string) {
body := TaskSetGasPriceMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
GasPrice: gasPrice,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskGasPriceBump", body); err != nil {
if err := t.sendTelemetryMessage("/api/aggregatorTaskSetGasPrice", body); err != nil {
t.logger.Warn("[Telemetry] Error in LogOperatorResponse", "error", err)
}
}

func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string) {
func (t *Telemetry) TaskSentToEthereum(batchMerkleRoot [32]byte, txHash string, effectiveGasPrice string) {
body := TaskSentToEthereumMessage{
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
TxHash: txHash,
MerkleRoot: fmt.Sprintf("0x%s", hex.EncodeToString(batchMerkleRoot[:])),
TxHash: txHash,
EffectiveGasPrice: effectiveGasPrice,
}
if err := t.sendTelemetryMessage("/api/aggregatorTaskSent", body); err != nil {
t.logger.Warn("[Telemetry] Error in TaskSentToEthereum", "error", err)
Expand Down
Loading

0 comments on commit e7c447a

Please sign in to comment.