From f5b768db09f620ef17a785d20343b4c83045f3a5 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Fri, 25 Oct 2024 15:19:34 +0700 Subject: [PATCH] seg: split `segments` field - on 2 fields guarded by 2 different mutexes (#12380) FYI: Next step will be: replace `visibleMutex` by `atomic.Pointer` --- README.md | 355 ++++++++-------- .../bls_to_execution_change_service_test.go | 4 + erigon-lib/downloader/snaptype/type.go | 2 + .../snapshotsync/freezeblocks/block_reader.go | 38 +- .../freezeblocks/block_snapshots.go | 397 +++++++----------- .../freezeblocks/block_snapshots_test.go | 69 ++- .../freezeblocks/bor_snapshots.go | 12 +- .../freezeblocks/caplin_snapshots.go | 102 ++--- 8 files changed, 443 insertions(+), 536 deletions(-) diff --git a/README.md b/README.md index 04db750445f..28cd3700d6e 100644 --- a/README.md +++ b/README.md @@ -12,99 +12,114 @@ by default. +- [Erigon](#erigon) - [System Requirements](#system-requirements) - [Usage](#usage) - + [Getting Started](#getting-started) - + [Logging](#logging) - + [Testnets](#testnets) - + [Block Production](#block-production-pow-miner-or-pos-validator) - + [Windows](#windows) - + [GoDoc](https://godoc.org/github.com/erigontech/erigon) - + [Beacon Chain](#beacon-chain-consensus-layer) - + [Dev Chain](#dev-chain) - + [Caplin (Internal Consensus Layer)](#caplin) - + - [Getting Started](#getting-started) + - [Datadir structure](#datadir-structure) + - [History on cheap disk](#history-on-cheap-disk) + - [Erigon3 datadir size](#erigon3-datadir-size) + - [Erigon3 changes from Erigon2](#erigon3-changes-from-erigon2) + - [Logging](#logging) + - [Modularity](#modularity) + - [Embedded Consensus Layer](#embedded-consensus-layer) + - [Testnets](#testnets) + - [Block Production (PoS Validator)](#block-production-pos-validator) + - [Windows](#windows) + - [Using TOML or YAML Config Files](#using-toml-or-yaml-config-files) + - [Example](#example) + - [TOML](#toml) + - [YAML](#yaml) + - [Beacon Chain (Consensus Layer)](#beacon-chain-consensus-layer) + - [Caplin](#caplin) + - [Caplin's Usage.](#caplins-usage) + - [Multiple Instances / One Machine](#multiple-instances--one-machine) + - [Dev Chain](#dev-chain) - [Key features](#key-features) - + [More Efficient State Storage](#more-efficient-state-storage) - + [Faster Initial Sync](#faster-initial-sync) - + [JSON-RPC daemon](#json-rpc-daemon) - + [Run all components by docker-compose](#run-all-components-by-docker-compose) - + [Grafana dashboard](#grafana-dashboard) - + [Internal Consensus Layer](#caplin) + - [More Efficient State Storage](#more-efficient-state-storage) + - [Faster Initial Sync](#faster-initial-sync) + - [JSON-RPC daemon](#json-rpc-daemon) + - [**For remote DB**](#for-remote-db) + - [**gRPC ports**](#grpc-ports) + - [Run all components by docker-compose](#run-all-components-by-docker-compose) + - [Optional: Setup dedicated user](#optional-setup-dedicated-user) + - [Environment Variables](#environment-variables) + - [Check: Permissions](#check-permissions) + - [Run](#run) + - [Grafana dashboard](#grafana-dashboard) + - [](#) - [Documentation](#documentation) - [FAQ](#faq) + - [How much RAM do I need](#how-much-ram-do-i-need) + - [Default Ports and Firewalls](#default-ports-and-firewalls) + - [`erigon` ports](#erigon-ports) + - [`caplin` ports](#caplin-ports) + - [`beaconAPI` ports](#beaconapi-ports) + - [`shared` ports](#shared-ports) + - [`other` ports](#other-ports) + - [Hetzner expecting strict firewall rules](#hetzner-expecting-strict-firewall-rules) + - [How to run erigon as a separate user? 
(e.g. as a
+      `systemd` daemon)](#how-to-run-erigon-as-a-separate-user-eg-as-a-systemd-daemon)
+    - [How to get diagnostic for bug report?](#how-to-get-diagnostic-for-bug-report)
+    - [How to run local devnet?](#how-to-run-local-devnet)
+    - [Docker permissions error](#docker-permissions-error)
+    - [How to run public RPC api](#how-to-run-public-rpc-api)
+    - [Run RaspberyPI](#run-raspberypi)
+    - [How to change db pagesize](#how-to-change-db-pagesize)
 - [Getting in touch](#getting-in-touch)
-    + [Erigon Discord Server](#erigon-discord-server)
-    + [Reporting security issues/concerns](#reporting-security-issues/concerns)
-    + [Team](#team)
+    - [Erigon Discord Server](#erigon-discord-server)
+    - [Reporting security issues/concerns](#reporting-security-issuesconcerns)
 - [Known issues](#known-issues)
-    + [`htop` shows incorrect memory usage](#htop-shows-incorrect-memory-usage)
+    - [`htop` shows incorrect memory usage](#htop-shows-incorrect-memory-usage)
+    - [Blocks Execution is slow on cloud-network-drives](#blocks-execution-is-slow-on-cloud-network-drives)
+    - [Filesystem's background features are expensive](#filesystems-background-features-are-expensive)
+    - [Gnome Tracker can kill Erigon](#gnome-tracker-can-kill-erigon)
+    - [the --mount option requires BuildKit error](#the---mount-option-requires-buildkit-error)
+    - [Erigon3 perf tricks](#erigon3-perf-tricks)
 
-**Disclaimer**: this software is currently a tech preview. We will do our best to keep it stable and make no breaking
-changes but we don't guarantee anything. Things can and will break.
-
-**Important defaults**: Erigon is an Archive Node by default (to remove history see: `--prune` flags
-in `erigon --help`). We don't allow change this flag after first start.
+**Important defaults**: Erigon is an Archive Node by default: use `--prune.mode` if you need to make it smaller (it cannot
+be changed after the first start).
 
 In-depth links are marked by the microscope sign (🔬)
 
 System Requirements
 ===================
 
-* For an Archive node of Ethereum Mainnet we recommend >=3.5TB storage space: 2.3TiB state (as of March 2024),
-  643GiB snapshots (can symlink or mount folder `/snapshots` to another disk), 200GB temp files (can symlink or
-  mount folder `/temp` to another disk).
-  Ethereum Mainnet Full node (see [Pruned Node][pruned_node]): 1.1TiB not including temp files (June 2024).
-
-* Gnosis Chain Archive: 1.7TiB (March 2024).
-  Gnosis Chain Full node (see [Pruned Node][pruned_node]): 300GiB (June 2024).
+RAM: >=32GB, [Golang >= 1.22](https://golang.org/doc/install); GCC 10+ or Clang; On Linux: kernel > v4. 64-bit
+architecture.
 
-* Polygon Mainnet Archive: 8.5TiB (December 2023).
-  Polygon Mainnet Full node (see [Pruned Node][pruned_node]) with `--prune.*.older 15768000`: 5.1Tb (September 2023).
+- ArchiveNode Ethereum Mainnet: 2TB (April 2024). FullNode: 1.1TB (June 2024)
+- ArchiveNode Gnosis: 1.7TB (March 2024). FullNode: 300GB (June 2024)
+- ArchiveNode Polygon Mainnet: 4.1TB (April 2024). FullNode: 2TB (April 2024)
 
 SSD or NVMe. Do not recommend HDD - on HDD Erigon will always stay N blocks behind chain tip, but not fall behind.
-Bear in mind that SSD performance deteriorates when close to capacity.
+Bear in mind that SSD performance deteriorates when close to capacity. Cloud drives (like gp3) have high latency and are
+not a good fit for Erigon.
 
-RAM: >=16GB, 64-bit architecture. 
+
+🔬 More details on [Erigon3 datadir size](#erigon3-datadir-size)
 
-[Golang version >= 1.22](https://golang.org/doc/install); GCC 10+ or Clang; On Linux: kernel > v4
-
-🔬 more details on disk storage [here](https://erigon.substack.com/p/disk-footprint-changes-in-new-erigon?s=r)
-and [here](https://ledgerwatch.github.io/turbo_geth_release.html#Disk-space).
-
-[pruned_node]: https://erigon.gitbook.io/erigon/basic-usage/usage/type-of-node#full-node-or-pruned-node
+🔬 More details on what type of data is stored [here](https://ledgerwatch.github.io/turbo_geth_release.html#Disk-space)
 
 Usage
 =====
 
 ### Getting Started
 
-For building the latest release (this will be suitable for most users just wanting to run a node):
-
-```sh
-git clone --branch release/ --single-branch https://github.com/erigontech/erigon.git
-cd erigon
-make erigon
-./build/bin/erigon
-```
-
-You can check [the list of releases](https://github.com/erigontech/erigon/releases) for release notes.
+[Release Notes and Binaries](https://github.com/erigontech/erigon/releases)
 
-For building the bleeding edge development branch:
+Build the latest release (this will be suitable for most users who just want to run a node):
 
 ```sh
-git clone --recurse-submodules https://github.com/erigontech/erigon.git
+git clone --branch release/ --single-branch https://github.com/erigontech/erigon.git
 cd erigon
-git checkout main
 make erigon
 ./build/bin/erigon
 ```
 
-Default `--snapshots` for `mainnet`, `gnosis`, `chiado`. Other networks now have default `--snapshots=false`.
-Increase
-download speed by flag `--torrent.download.rate=20mb`. 🔬 See [Downloader docs](./cmd/downloader/readme.md)
+Increase download speed with the flag `--torrent.download.rate=20mb`. 🔬
+See [Downloader docs](./cmd/downloader/readme.md)
 
 Use `--datadir` to choose where to store data.
 
@@ -117,12 +132,95 @@ Running `make help` will list and describe the convenience commands available in
 
 ### Datadir structure
 
-- chaindata: recent blocks, state, recent state history. low-latency disk recommended.
-- snapshots: old blocks, old state history. can symlink/mount it to cheaper disk. mostly immutable. must have ~100gb
-  free space (for merge recent files to bigger one).
-- temp: can grow to ~100gb, but usually empty. can symlink/mount it to cheaper disk.
-- txpool: pending transactions. safe to remove.
-- nodes: p2p peers. safe to remove.
+```sh
+datadir
+    chaindata   # "Recently-updated Latest State", "Recent History", "Recent Blocks"
+    snapshots   # contains `.seg` files - old blocks
+        domain    # Latest State
+        history   # Historical values
+        idx       # InvertedIndices: can search/filter/union/intersect them - to find historical data, like eth_getLogs or trace_transaction
+        accessors # Additional (generated) indices of history - have "random-touch" read-pattern. They can serve only `Get` requests (no search/filters).
+    txpool      # pending transactions. safe to remove.
+    nodes       # p2p peers. safe to remove.
+    temp        # used to sort data bigger than RAM. can grow to ~100gb. cleaned at startup.
+
+# There are 4 domains: account, storage, code, commitment
+```
+
+### History on cheap disk
+
+If you can afford to store the datadir on one NVMe RAID - great. If not - it's possible to store history on a cheaper drive.
+
+```sh
+# place (or ln -s) `datadir` on the slow disk. link some sub-folders to the fast (low-latency) disk.
+# Example: what to link to the fast disk to speed up execution
+datadir
+    chaindata   # link to fast disk
+    snapshots
+        domain    # link to fast disk
+        history
+        idx
+        accessors
+    temp        # buffers to sort data >> RAM. sequential-buffered IO is slow-disk-friendly
+
+# Example: how to speed up history access:
+#  - go step-by-step - first try storing `accessors` on the fast disk
+#  - if speed is not good enough: `idx`
+#  - if still not enough: `history`
+```
+
+### Erigon3 datadir size
+
+```sh
+# eth-mainnet - archive - April 2024
+
+du -hsc /erigon/*
+6G      /erigon/caplin
+50G     /erigon/chaindata
+1.8T    /erigon/snapshots
+1.9T    total
+
+du -hsc /erigon/snapshots/*
+100G    /erigon/snapshots/accessor
+240G    /erigon/snapshots/domain
+260G    /erigon/snapshots/history
+410G    /erigon/snapshots/idx
+1.7T    /erigon/snapshots
+```
+
+```sh
+# bor-mainnet - archive - Jun 2024
+
+du -hsc /erigon/*
+
+160M    /erigon/bor
+50G     /erigon/chaindata
+3.7T    /erigon/snapshots
+3.8T    total
+
+du -hsc /erigon/snapshots/*
+260G    /erigon-data/snapshots/accessor
+850G    /erigon-data/snapshots/domain
+650G    /erigon-data/snapshots/history
+1.4T    /erigon-data/snapshots/idx
+4.1T    /erigon/snapshots
+```
+
+### Erigon3 changes from Erigon2
+
+- Sync from scratch doesn't require re-executing all history. The latest state and its history are in snapshots - they can be downloaded.
+- ExecutionStage - now includes many E2 stages: stage_hash_state, stage_trie, stage_log_index, stage_history_index,
+  stage_trace_index
+- E3 can execute 1 historical transaction - without executing its block - because history/indices have
+  transaction-granularity, instead of block-granularity.
+- E3 doesn't store Logs (aka Receipts) - it always re-executes the historical txn (but it's cheaper than in E2 - see the
+  point above).
+- `--sync.loop.block.limit` is enabled by default. (Default: `5_000`.
+  Set `--sync.loop.block.limit=10_000 --batchSize=2g` to increase sync speed on good hardware).
+- datadir/chaindata is small now - to prevent its growth we recommend setting `--batchSize <= 2G`. And it's fine to
+  `rm -rf chaindata`
+- can symlink/mount the latest state to a fast drive and history to a cheap drive
+- Archive Node is default. Full Node: `--prune.mode=full`, Minimal Node (EIP-4444): `--prune.mode=minimal`
 
 ### Logging
 
@@ -168,26 +266,23 @@ How to start Erigon's services as separated processes, see in [docker-compose.ym
 
 ### Embedded Consensus Layer
 
-On Ethereum Mainnet and Sepolia, the Engine API can be disabled in favour of the Erigon native Embedded
-Consensus Layer.
-If you want to use the internal Consensus Layer, run Erigon with flag `--internalcl`.
-_Warning:_ Staking (block production) is not possible with the embedded CL.
+Built-in consensus for Ethereum Mainnet, Sepolia, Holesky, Gnosis.
+To use an external Consensus Layer: `--externalcl`.
 
 ### Testnets
 
-If you would like to give Erigon a try, but do not have spare 2TB on your drive, a good option is to start syncing one
-of the public testnets, Sepolia. It syncs much quicker, and does not take so much disk space:
+If you would like to give Erigon a try, a good option is to start syncing one of the public testnets, Holesky (or Amoy).
+It syncs much quicker and does not take so much disk space:
 
 ```sh
-git clone --recurse-submodules -j8 https://github.com/erigontech/erigon.git
+git clone https://github.com/erigontech/erigon.git
 cd erigon
 make erigon
-./build/bin/erigon --datadir= --chain=sepolia
+./build/bin/erigon --datadir= --chain=holesky --prune.mode=full
 ```
 
-Please note the `--datadir` option that allows you to store Erigon files in a non-default location, in this example,
-in `sepolia` subdirectory of the current directory. Name of the directory `--datadir` does not have to match the name of
-the chain in `--chain`. 
+Please note the `--datadir` option that allows you to store Erigon files in a non-default location. Name of the +directory `--datadir` does not have to match the name of the chain in `--chain`. ### Block Production (PoS Validator) @@ -752,113 +847,7 @@ XDG_DATA_HOME=/preferred/data/folder DOCKER_BUILDKIT=1 COMPOSE_DOCKER_CLI_BUILD= --------- -## Erigon3 user's guide - -Git branch `main`. Just start erigon as you usually do. - -RAM requirement is higher: 32gb and better 64gb. We will work on this topic a bit later. - -Golang 1.22 - -Almost all RPC methods are implemented - if something doesn't work - just drop it on our head. - -Supported networks: all. - -### E3 changes from E2: - -- Sync from scratch doesn't require re-exec all history. Latest state and it's history are in snapshots - can download. -- ExecutionStage - now including many E2 stages: stage_hash_state, stage_trie, stage_log_index, stage_history_index, - stage_trace_index -- E3 can execute 1 historical transaction - without executing it's block - because history/indices have - transaction-granularity, instead of block-granularity. -- E3 doesn't store Logs (aka Receipts) - it always re-executing historical txn (but it's cheaper then in E2 - see point - above). Known perf issues: https://github.com/erigontech/erigon/issues/10747 -- `--sync.loop.block.limit` is enabled by default. (Default: `5_000`. - Set `--sync.loop.block.limit=10_000 --batchSize=2g` to increase sync speed on good hardware). -- datadir/chaindata is small now - to prevent it's grow: we recommend set `--batchSize <= 2G`. And it's fine - to `rm -rf chaindata` -- can symlink/mount latest state to fast drive and history to cheap drive -- Archive Node is default. Full Node: `--prune.mode=full`, Minimal Node (EIP-4444): `--prune.mode=minimal` - -### Known Problems of E3: - -- don't `rm -rf downloader` - it will cause re-downloading of files: https://github.com/erigontech/erigon/issues/10976 - -### E3 datadir structure - -```sh -datadir - chaindata # "Recently-updated Latest State" and "Recent History" - snapshots - domain # Latest State: link to fast disk - history # Historical values - idx # InvertedIndices: can search/filtering/union/intersect them - to find historical data. like eth_getLogs or trace_transaction - accessors # Additional (generated) indices of history - have "random-touch" read-pattern. They can serve only `Get` requests (no search/filters). - temp # buffers to sort data >> RAM. sequential-buffered IO - is slow-disk-friendly - -# There is 4 domains: account, storage, code, commitment -``` - -### E3 can store state on fast disk and history on cheap disk - -If you can afford store datadir on 1 nvme-raid - great. If can't - it's possible to store history on cheap drive. - -```sh -# place (or ln -s) `datadir` on slow disk. link some sub-folders to fast disk. 
-# Example: what need link to fast disk to speedup execution -datadir - chaindata # link to fast disk - snapshots - domain # link to fast disk - history - idx - accessors - temp - -# Example: how to speedup history access: -# - go step-by-step - first try store `accessors` on fast disk -# - if speed is not good enough: `idx` -# - if still not enough: `history` -``` - -### E3 datadir size - -``` -# eth-mainnet - archive - April 2024 - -du -hsc /erigon/* -6G /erigon/caplin -50G /erigon/chaindata -1.8T /erigon/snapshots -1.9T total - -du -hsc /erigon/snapshots/* -100G /erigon/snapshots/accessor -240G /erigon/snapshots/domain -260G /erigon/snapshots/history -410G /erigon/snapshots/idx -1.7T /erigon/snapshots -``` - -``` -# bor-mainnet - archive - Jun 2024 - -du -hsc /erigon/* - -160M /erigon/bor -50G /erigon/chaindata -3.7T /erigon/snapshots -3.8T total - -du -hsc /erigon/snapshots/* -260G /erigon-data/snapshots/accessor -850G /erigon-data/snapshots/domain -650G /erigon-data/snapshots/history -1.4T /erigon-data/snapshots/idx -4.1T /erigon/snapshots -``` - -### E3 other perf tricks +### Erigon3 perf tricks - `--sync.loop.block.limit=10_000 --batchSize=2g` - likely will help for sync speed. - on cloud-drives (good throughput, bad latency) - can enable OS's brain to pre-fetch: `SNAPSHOT_MADV_RND=false` @@ -872,4 +861,4 @@ ls /mnt/erigon/snapshots/domain/*.kv | parallel vmtouch -vdlw # if it failing with "can't allocate memory", try: sync && sudo sysctl vm.drop_caches=3 echo 1 > /proc/sys/vm/compact_memory -``` +``` \ No newline at end of file diff --git a/cl/phase1/network/services/bls_to_execution_change_service_test.go b/cl/phase1/network/services/bls_to_execution_change_service_test.go index d6314b44241..becf3807829 100644 --- a/cl/phase1/network/services/bls_to_execution_change_service_test.go +++ b/cl/phase1/network/services/bls_to_execution_change_service_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "log" + "runtime" "testing" "time" @@ -231,5 +232,8 @@ func (t *blsToExecutionChangeTestSuite) TestProcessMessage() { } func TestBlsToExecutionChangeTestSuite(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("fix me on win please") + } suite.Run(t, new(blsToExecutionChangeTestSuite)) } diff --git a/erigon-lib/downloader/snaptype/type.go b/erigon-lib/downloader/snaptype/type.go index 966a67ddf78..adc33604e02 100644 --- a/erigon-lib/downloader/snaptype/type.go +++ b/erigon-lib/downloader/snaptype/type.go @@ -339,6 +339,8 @@ const MinCoreEnum = 1 const MinBorEnum = 4 const MinCaplinEnum = 8 +const MaxEnum = 11 + var CaplinEnums = struct { Enums BeaconBlocks, diff --git a/turbo/snapshotsync/freezeblocks/block_reader.go b/turbo/snapshotsync/freezeblocks/block_reader.go index a6a25bb9179..dd2b50e099e 100644 --- a/turbo/snapshotsync/freezeblocks/block_reader.go +++ b/turbo/snapshotsync/freezeblocks/block_reader.go @@ -527,7 +527,7 @@ func (r *BlockReader) HeaderByHash(ctx context.Context, tx kv.Getter, hash commo defer segmentRotx.Close() buf := make([]byte, 128) - segments := segmentRotx.VisibleSegments + segments := segmentRotx.segments for i := len(segments) - 1; i >= 0; i-- { h, err = r.headerFromSnapshotByHash(hash, segments[i], buf) if err != nil { @@ -851,7 +851,7 @@ func (r *BlockReader) blockWithSenders(ctx context.Context, tx kv.Getter, hash c txnSeg, ok, release := r.sn.ViewSingleFile(coresnaptype.Transactions, blockHeight) if !ok { if dbgLogs { - log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable(), "r.sn.indicesReady", 
r.sn.indicesReady.Load()) + log.Info(dbgPrefix+"no transactions file for this block num", "r.sn.BlocksAvailable()", r.sn.BlocksAvailable()) } return } @@ -1170,7 +1170,7 @@ func (r *BlockReader) TxnLookup(_ context.Context, tx kv.Getter, txnHash common. txns := r.sn.ViewType(coresnaptype.Transactions) defer txns.Close() - _, blockNum, ok, err := r.txnByHash(txnHash, txns.VisibleSegments, nil) + _, blockNum, ok, err := r.txnByHash(txnHash, txns.segments, nil) if err != nil { return 0, false, err } @@ -1355,7 +1355,7 @@ func (r *BlockReader) EventLookup(ctx context.Context, tx kv.Getter, txnHash com segs := r.borSn.ViewType(borsnaptype.BorEvents) defer segs.Close() - blockNum, ok, err := r.borBlockByEventHash(txnHash, segs.VisibleSegments, nil) + blockNum, ok, err := r.borBlockByEventHash(txnHash, segs.segments, nil) if err != nil { return 0, false, err } @@ -1414,8 +1414,8 @@ func (r *BlockReader) BorStartEventID(ctx context.Context, tx kv.Tx, hash common segments := r.borSn.ViewType(borsnaptype.BorEvents) defer segments.Close() - for i := len(segments.VisibleSegments) - 1; i >= 0; i-- { - sn := segments.VisibleSegments[i] + for i := len(segments.segments) - 1; i >= 0; i-- { + sn := segments.segments[i] if sn.from > blockHeight { continue } @@ -1494,8 +1494,8 @@ func (r *BlockReader) EventsByBlock(ctx context.Context, tx kv.Tx, hash common.H var buf []byte result := []rlp.RawValue{} - for i := len(segments.VisibleSegments) - 1; i >= 0; i-- { - sn := segments.VisibleSegments[i] + for i := len(segments.segments) - 1; i >= 0; i-- { + sn := segments.segments[i] if sn.from > blockHeight { continue } @@ -1536,7 +1536,7 @@ func (r *BlockReader) EventsByIdFromSnapshot(from uint64, to time.Time, limit in var result []*heimdall.EventRecordWithTime maxTime := false - for _, sn := range segments.VisibleSegments { + for _, sn := range segments.segments { idxBorTxnHash := sn.src.Index() if idxBorTxnHash == nil || idxBorTxnHash.KeyCount() == 0 { @@ -1609,12 +1609,12 @@ func (r *BlockReader) LastFrozenEventId() uint64 { segments := r.borSn.ViewType(borsnaptype.BorEvents) defer segments.Close() - if len(segments.VisibleSegments) == 0 { + if len(segments.segments) == 0 { return 0 } // find the last segment which has a built index var lastSegment *VisibleSegment - visibleSegments := segments.VisibleSegments + visibleSegments := segments.segments for i := len(visibleSegments) - 1; i >= 0; i-- { if visibleSegments[i].src.Index() != nil { gg := visibleSegments[i].src.MakeGetter() @@ -1648,7 +1648,7 @@ func (r *BlockReader) LastFrozenEventBlockNum() uint64 { segmentsRotx := r.borSn.ViewType(borsnaptype.BorEvents) defer segmentsRotx.Close() - segments := segmentsRotx.VisibleSegments + segments := segmentsRotx.segments if len(segments) == 0 { return 0 } @@ -1710,14 +1710,14 @@ func (r *BlockReader) LastFrozenSpanId() uint64 { segments := r.borSn.ViewType(borsnaptype.BorSpans) defer segments.Close() - if len(segments.VisibleSegments) == 0 { + if len(segments.segments) == 0 { return 0 } // find the last segment which has a built index var lastSegment *VisibleSegment - for i := len(segments.VisibleSegments) - 1; i >= 0; i-- { - if segments.VisibleSegments[i].src.Index() != nil { - lastSegment = segments.VisibleSegments[i] + for i := len(segments.segments) - 1; i >= 0; i-- { + if segments.segments[i].src.Index() != nil { + lastSegment = segments.segments[i] break } } @@ -1754,7 +1754,7 @@ func (r *BlockReader) Span(ctx context.Context, tx kv.Getter, spanId uint64) ([] segmentsRotx := 
r.borSn.ViewType(borsnaptype.BorSpans) defer segmentsRotx.Close() - segments := segmentsRotx.VisibleSegments + segments := segmentsRotx.segments for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] idx := sn.src.Index() @@ -1863,7 +1863,7 @@ func (r *BlockReader) Checkpoint(ctx context.Context, tx kv.Getter, checkpointId segmentsRotx := r.borSn.ViewType(borsnaptype.BorCheckpoints) defer segmentsRotx.Close() - segments := segmentsRotx.VisibleSegments + segments := segmentsRotx.segments for i := len(segments) - 1; i >= 0; i-- { sn := segments[i] index := sn.src.Index() @@ -1895,7 +1895,7 @@ func (r *BlockReader) LastFrozenCheckpointId() uint64 { defer segmentsRotx.Close() - segments := segmentsRotx.VisibleSegments + segments := segmentsRotx.segments if len(segments) == 0 { return 0 } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 0593d8453e7..c4b5f54e511 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -421,42 +421,23 @@ func (sn *DirtySegment) mappedTxnSnapshot() *silkworm.MappedTxnSnapshot { // transaction_hash -> transactions_segment_offset // transaction_hash -> block_number -type segments struct { - DirtySegments *btree.BTreeG[*DirtySegment] - VisibleSegments []*VisibleSegment - maxVisibleBlock atomic.Uint64 -} - -func (s *segments) View(f func(segments []*VisibleSegment) error) error { - return f(s.VisibleSegments) -} +type VisibleSegments []*VisibleSegment -// no caller yet -func (s *segments) Segment(blockNum uint64, f func(*VisibleSegment) error) (found bool, err error) { - for _, seg := range s.VisibleSegments { - if !(blockNum >= seg.from && blockNum < seg.to) { - continue - } - return true, f(seg) - } - return false, nil -} - -func (s *segments) BeginRotx() *segmentsRotx { - for _, seg := range s.VisibleSegments { +func (s VisibleSegments) BeginRotx() *segmentsRotx { + for _, seg := range s { if !seg.src.frozen { seg.src.refcount.Add(1) } } - return &segmentsRotx{segments: s, VisibleSegments: s.VisibleSegments} + return &segmentsRotx{segments: s} } func (s *segmentsRotx) Close() { - if s == nil || s.VisibleSegments == nil { + if s == nil || s.segments == nil { return } - VisibleSegments := s.VisibleSegments - s.VisibleSegments = nil + VisibleSegments := s.segments + s.segments = nil for i := range VisibleSegments { src := VisibleSegments[i].src @@ -471,20 +452,20 @@ func (s *segmentsRotx) Close() { } type segmentsRotx struct { - segments *segments - VisibleSegments []*VisibleSegment + segments []*VisibleSegment } type RoSnapshots struct { - indicesReady atomic.Bool segmentsReady atomic.Bool - types []snaptype.Type + types []snaptype.Type //immutable + enums []snaptype.Enum //immutable - dirtySegmentsLock sync.RWMutex - visibleSegmentsLock sync.RWMutex + dirtyLock sync.RWMutex // guards `dirty` field + dirty []*btree.BTreeG[*DirtySegment] // ordered map `type.Enum()` -> DirtySegments - segments btree.Map[snaptype.Enum, *segments] + visibleLock sync.RWMutex // guards `visible` field + visible []VisibleSegments // ordered map `type.Enum()` -> VisbileSegments dir string segmentsMax atomic.Uint64 // all types of .seg files are available - up to this number @@ -506,14 +487,17 @@ func NewRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, segmentsMin ui } func newRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, types []snaptype.Type, segmentsMin uint64, logger log.Logger) *RoSnapshots { - var segs 
btree.Map[snaptype.Enum, *segments] + enums := make([]snaptype.Enum, len(types)) + for i, t := range types { + enums[i] = t.Enum() + } + s := &RoSnapshots{dir: snapDir, cfg: cfg, logger: logger, + types: types, enums: enums, + dirty: make([]*btree.BTreeG[*DirtySegment], snaptype.MaxEnum), + } for _, snapType := range types { - segs.Set(snapType.Enum(), &segments{ - DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), - }) + s.dirty[snapType.Enum()] = btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}) } - - s := &RoSnapshots{dir: snapDir, cfg: cfg, segments: segs, logger: logger, types: types} s.segmentsMin.Store(segmentsMin) s.recalcVisibleFiles() @@ -523,7 +507,6 @@ func newRoSnapshots(cfg ethconfig.BlocksFreezing, snapDir string, types []snapty func (s *RoSnapshots) Cfg() ethconfig.BlocksFreezing { return s.cfg } func (s *RoSnapshots) Dir() string { return s.dir } func (s *RoSnapshots) SegmentsReady() bool { return s.segmentsReady.Load() } -func (s *RoSnapshots) IndicesReady() bool { return s.indicesReady.Load() } func (s *RoSnapshots) IndicesMax() uint64 { return s.idxMax.Load() } func (s *RoSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() } func (s *RoSnapshots) SegmentsMin() uint64 { return s.segmentsMin.Load() } @@ -552,8 +535,8 @@ func (s *RoSnapshots) EnsureExpectedBlocksAreAvailable(cfg *snapcfg.Cfg) error { func (s *RoSnapshots) Types() []snaptype.Type { return s.types } func (s *RoSnapshots) HasType(in snaptype.Type) bool { - for _, t := range s.types { - if t.Enum() == in.Enum() { + for _, t := range s.enums { + if t == in.Enum() { return true } } @@ -565,12 +548,11 @@ func (s *RoSnapshots) DisableReadAhead() *RoSnapshots { v := s.View() defer v.Close() - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - for _, sn := range value.VisibleSegments { + for _, t := range s.enums { + for _, sn := range v.segments[t].segments { sn.src.DisableReadAhead() } - return true - }) + } return s } @@ -579,12 +561,11 @@ func (s *RoSnapshots) EnableReadAhead() *RoSnapshots { v := s.View() defer v.Close() - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - for _, sn := range value.VisibleSegments { + for _, t := range s.enums { + for _, sn := range v.segments[t].segments { sn.src.EnableReadAhead() } - return true - }) + } return s } @@ -593,30 +574,30 @@ func (s *RoSnapshots) EnableMadvWillNeed() *RoSnapshots { v := s.View() defer v.Close() - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - for _, sn := range value.VisibleSegments { + for _, t := range s.enums { + for _, sn := range v.segments[t].segments { sn.src.EnableMadvWillNeed() } - return true - }) + } return s } func (s *RoSnapshots) recalcVisibleFiles() { defer func() { s.idxMax.Store(s.idxAvailability()) - s.indicesReady.Store(true) }() - s.visibleSegmentsLock.Lock() - defer s.visibleSegmentsLock.Unlock() + s.visibleLock.Lock() + defer s.visibleLock.Unlock() - s.dirtySegmentsLock.RLock() - defer s.dirtySegmentsLock.RUnlock() + s.dirtyLock.RLock() + defer s.dirtyLock.RUnlock() - var maxVisibleBlocks []uint64 - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - dirtySegments := value.DirtySegments + visible := make([]VisibleSegments, snaptype.MaxEnum) // create new pointer - only new readers will see it. 
old-alive readers will continue use previous pointer + + maxVisibleBlocks := make([]uint64, 0, len(s.types)) + for _, t := range s.enums { + dirtySegments := s.dirty[t] newVisibleSegments := make([]*VisibleSegment, 0, dirtySegments.Len()) dirtySegments.Walk(func(segs []*DirtySegment) bool { for _, seg := range segs { @@ -654,31 +635,31 @@ func (s *RoSnapshots) recalcVisibleFiles() { } } - value.VisibleSegments = newVisibleSegments + visible[t] = newVisibleSegments var to uint64 if len(newVisibleSegments) > 0 { to = newVisibleSegments[len(newVisibleSegments)-1].to - 1 } maxVisibleBlocks = append(maxVisibleBlocks, to) - return true - }) + } // all types must have same hight minMaxVisibleBlock := slices.Min(maxVisibleBlocks) - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { + for _, t := range s.enums { if minMaxVisibleBlock == 0 { - value.VisibleSegments = []*VisibleSegment{} + visible[t] = []*VisibleSegment{} } else { - for i, seg := range value.VisibleSegments { + visibleSegmentsOfType := visible[t] + for i, seg := range visibleSegmentsOfType { if seg.to > minMaxVisibleBlock+1 { - value.VisibleSegments = value.VisibleSegments[:i] + visible[t] = visibleSegmentsOfType[:i] break } } } - value.maxVisibleBlock.Store(minMaxVisibleBlock) - return true - }) + } + + s.visible = visible } // minimax of existing indices @@ -690,17 +671,15 @@ func (s *RoSnapshots) idxAvailability() uint64 { // 4. user can manually remove all .idx files of given type: `rm snapshots/*type1*.idx` // 5. file-types may have different height: 10 headers, 10 bodies, 9 transactions (for example if `kill -9` came during files building/merge). still need index all 3 types. - var maxIdx uint64 - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - if !s.HasType(segtype.Type()) { - return true - } - if len(value.VisibleSegments) > 0 { - maxIdx = value.VisibleSegments[len(value.VisibleSegments)-1].to - 1 - } - return false // all types of visible-segments have the same height. 
stop here - }) + if len(s.types) == 0 { + return 0 + } + var maxIdx uint64 + visible := s.visible[s.types[0].Enum()] + if len(visible) > 0 { + maxIdx = visible[len(visible)-1].to - 1 + } return maxIdx } @@ -708,39 +687,37 @@ func (s *RoSnapshots) LS() { view := s.View() defer view.Close() - view.VisibleSegments.Scan(func(segtype snaptype.Enum, value *segmentsRotx) bool { - for _, seg := range value.VisibleSegments { + for _, t := range s.enums { + for _, seg := range s.visible[t] { if seg.src.Decompressor == nil { continue } log.Info("[snapshots] ", "f", seg.src.Decompressor.FileName(), "from", seg.from, "to", seg.to) } - return true - }) + } } func (s *RoSnapshots) Files() (list []string) { view := s.View() defer view.Close() - view.VisibleSegments.Scan(func(segtype snaptype.Enum, value *segmentsRotx) bool { - for _, seg := range value.VisibleSegments { + for _, t := range s.enums { + for _, seg := range s.visible[t] { list = append(list, seg.src.FileName()) } - return true - }) + } return } func (s *RoSnapshots) OpenFiles() (list []string) { - s.dirtySegmentsLock.RLock() - defer s.dirtySegmentsLock.RUnlock() + s.dirtyLock.RLock() + defer s.dirtyLock.RUnlock() log.Warn("[dbg] OpenFiles") defer log.Warn("[dbg] OpenFiles end") - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, t := range s.types { + s.dirty[t.Enum()].Walk(func(segs []*DirtySegment) bool { for _, seg := range segs { if seg.Decompressor == nil { continue @@ -749,8 +726,7 @@ func (s *RoSnapshots) OpenFiles() (list []string) { } return true }) - return true - }) + } return list } @@ -759,8 +735,8 @@ func (s *RoSnapshots) OpenFiles() (list []string) { func (s *RoSnapshots) OpenList(fileNames []string, optimistic bool) error { defer s.recalcVisibleFiles() - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() s.closeWhatNotInList(fileNames) if err := s.openSegments(fileNames, true, optimistic); err != nil { @@ -772,8 +748,8 @@ func (s *RoSnapshots) OpenList(fileNames []string, optimistic bool) error { func (s *RoSnapshots) InitSegments(fileNames []string) error { defer s.recalcVisibleFiles() - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() s.closeWhatNotInList(fileNames) if err := s.openSegments(fileNames, false, true); err != nil { @@ -795,17 +771,15 @@ func (s *RoSnapshots) openSegments(fileNames []string, open bool, optimistic boo continue } - segtype, ok := s.segments.Get(f.Type.Enum()) - if !ok { - segtype = &segments{ - DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), - } - s.segments.Set(f.Type.Enum(), segtype) + segtype := s.dirty[f.Type.Enum()] + if segtype == nil { + log.Debug("[snapshot] rebuildSegments: unknown type", "t", f.Type.Enum().String()) + continue } var sn *DirtySegment var exists bool - segtype.DirtySegments.Walk(func(segs []*DirtySegment) bool { + segtype.Walk(func(segs []*DirtySegment) bool { for _, sn2 := range segs { if sn2.Decompressor == nil { // it's ok if some segment was not able to open continue @@ -843,7 +817,7 @@ func (s *RoSnapshots) openSegments(fileNames []string, open bool, optimistic boo if !exists { // it's possible to iterate over .seg file even if you don't have index // then make segment available even if index open may fail - segtype.DirtySegments.Set(sn) + segtype.Set(sn) } if open { @@ -876,8 +850,8 @@ 
func (s *RoSnapshots) OptimisticalyOpenFolder() { _ = s.OpenFolder() } func (s *RoSnapshots) OpenFolder() error { defer s.recalcVisibleFiles() - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() files, _, err := typedSegments(s.dir, s.Types(), false) if err != nil { @@ -899,8 +873,8 @@ func (s *RoSnapshots) OpenFolder() error { func (s *RoSnapshots) OpenSegments(types []snaptype.Type, allowGaps bool) error { defer s.recalcVisibleFiles() - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() files, _, err := typedSegments(s.dir, types, allowGaps) @@ -926,8 +900,8 @@ func (s *RoSnapshots) Close() { // defer to preserve lock order defer s.recalcVisibleFiles() - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() s.closeWhatNotInList(nil) } @@ -938,27 +912,27 @@ func (s *RoSnapshots) closeWhatNotInList(l []string) { protectFiles[f] = struct{}{} } toClose := make(map[snaptype.Enum][]*DirtySegment, 0) - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, t := range s.enums { + s.dirty[t].Walk(func(segs []*DirtySegment) bool { + for _, seg := range segs { if _, ok := protectFiles[seg.FileName()]; ok { continue } if _, ok := toClose[seg.segType.Enum()]; !ok { - toClose[segtype] = make([]*DirtySegment, 0) + toClose[t] = make([]*DirtySegment, 0) } - toClose[segtype] = append(toClose[segtype], seg) + toClose[t] = append(toClose[t], seg) } return true }) - return true - }) + } for segtype, delSegments := range toClose { - segs, _ := s.segments.Get(segtype) + dirtyFiles := s.dirty[segtype] for _, delSeg := range delSegments { delSeg.close() - segs.DirtySegments.Delete(delSeg) + dirtyFiles.Delete(delSeg) } } } @@ -1015,17 +989,17 @@ func (s *RoSnapshots) buildMissedIndicesIfNeed(ctx context.Context, logPrefix st } func (s *RoSnapshots) delete(fileName string) error { - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() var err error var delSeg *DirtySegment var dirtySegments *btree.BTreeG[*DirtySegment] _, fName := filepath.Split(fileName) - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { + for _, t := range s.enums { findDelSeg := false - value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + s.dirty[t].Walk(func(segs []*DirtySegment) bool { for _, sn := range segs { if sn.Decompressor == nil { continue @@ -1038,14 +1012,16 @@ func (s *RoSnapshots) delete(fileName string) error { sn.closeAndRemoveFiles() } delSeg = sn - dirtySegments = value.DirtySegments + dirtySegments = s.dirty[t] findDelSeg = false return true } return true }) - return !findDelSeg - }) + if findDelSeg { + break + } + } dirtySegments.Delete(delSeg) return err } @@ -1103,12 +1079,12 @@ func (s *RoSnapshots) buildMissedIndices(logPrefix string, ctx context.Context, var fmu sync.Mutex failedIndexes := make(map[string]error, 0) - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - value.DirtySegments.Walk(func(segs []*DirtySegment) bool { + for _, t := range s.enums { + s.dirty[t].Walk(func(segs []*DirtySegment) bool { for _, segment := range segs { info := segment.FileInfo(dir) - if segtype.HasIndexFiles(info, logger) { + if t.HasIndexFiles(info, logger) { continue } @@ -1119,7 +1095,7 @@ func (s *RoSnapshots) buildMissedIndices(logPrefix string, ctx 
context.Context, ps.Add(p) defer notifySegmentIndexingFinished(info.Name()) defer ps.Delete(p) - if err := segtype.BuildIndexes(gCtx, info, chainConfig, tmpDir, p, log.LvlInfo, logger); err != nil { + if err := t.BuildIndexes(gCtx, info, chainConfig, tmpDir, p, log.LvlInfo, logger); err != nil { // unsuccessful indexing should allow other indexing to finish fmu.Lock() failedIndexes[info.Name()] = err @@ -1130,8 +1106,7 @@ func (s *RoSnapshots) buildMissedIndices(logPrefix string, ctx context.Context, } return true }) - return true - }) + } var ie error @@ -1162,8 +1137,9 @@ func (s *RoSnapshots) buildMissedIndices(logPrefix string, ctx context.Context, func (s *RoSnapshots) PrintDebug() { v := s.View() defer v.Close() - s.segments.Scan(func(key snaptype.Enum, value *segments) bool { - fmt.Println(" == [dbg] Snapshots,", key.String()) + + for _, t := range s.types { + fmt.Println(" == [dbg] Snapshots,", t.Enum().String()) printDebug := func(sn *DirtySegment) { args := make([]any, 0, len(sn.Type().Indexes())+1) args = append(args, sn.from) @@ -1172,31 +1148,30 @@ func (s *RoSnapshots) PrintDebug() { } fmt.Println(args...) } - value.DirtySegments.Scan(func(sn *DirtySegment) bool { + s.dirty[t.Enum()].Scan(func(sn *DirtySegment) bool { printDebug(sn) return true }) - return true - }) + } } func (s *RoSnapshots) AddSnapshotsToSilkworm(silkwormInstance *silkworm.Silkworm) error { v := s.View() defer v.Close() - s.visibleSegmentsLock.RLock() - defer s.visibleSegmentsLock.RUnlock() + s.visibleLock.RLock() + defer s.visibleLock.RUnlock() mappedHeaderSnapshots := make([]*silkworm.MappedHeaderSnapshot, 0) - if headers, ok := v.VisibleSegments.Get(coresnaptype.Enums.Headers); ok { - for _, headerSegment := range headers.VisibleSegments { + if vis := v.segments[coresnaptype.Enums.Headers]; vis != nil { + for _, headerSegment := range vis.segments { mappedHeaderSnapshots = append(mappedHeaderSnapshots, headerSegment.src.mappedHeaderSnapshot()) } } mappedBodySnapshots := make([]*silkworm.MappedBodySnapshot, 0) - if bodies, ok := v.VisibleSegments.Get(coresnaptype.Enums.Bodies); ok { - for _, bodySegment := range bodies.VisibleSegments { + if vis := v.segments[coresnaptype.Enums.Bodies]; vis != nil { + for _, bodySegment := range vis.segments { mappedBodySnapshots = append(mappedBodySnapshots, bodySegment.src.mappedBodySnapshot()) } return nil @@ -1204,8 +1179,8 @@ func (s *RoSnapshots) AddSnapshotsToSilkworm(silkwormInstance *silkworm.Silkworm } mappedTxnSnapshots := make([]*silkworm.MappedTxnSnapshot, 0) - if txs, ok := v.VisibleSegments.Get(coresnaptype.Enums.Transactions); ok { - for _, txnSegment := range txs.VisibleSegments { + if txs := v.segments[coresnaptype.Enums.Transactions]; txs != nil { + for _, txnSegment := range txs.segments { mappedTxnSnapshots = append(mappedTxnSnapshots, txnSegment.src.mappedTxnSnapshot()) } } @@ -2221,16 +2196,14 @@ func (m *Merger) FindMergeRanges(currentRanges []Range, maxBlockNum uint64) (toM func (m *Merger) filesByRange(v *View, from, to uint64) (map[snaptype.Enum][]*DirtySegment, error) { toMerge := map[snaptype.Enum][]*DirtySegment{} - v.VisibleSegments.Scan(func(key snaptype.Enum, value *segmentsRotx) bool { - toMerge[key.Type().Enum()] = m.filesByRangeOfType(v, from, to, key.Type()) - return true - }) - + for _, t := range v.s.types { + toMerge[t.Enum()] = m.filesByRangeOfType(v, from, to, t) + } return toMerge, nil } func (m *Merger) filesByRangeOfType(view *View, from, to uint64, snapshotType snaptype.Type) (out []*DirtySegment) { - for _, sn := range 
view.segments(snapshotType) { + for _, sn := range view.segmentsByType(snapshotType) { if sn.from < from { continue } @@ -2354,17 +2327,12 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, snapTypes [] func (m *Merger) integrateMergedDirtyFiles(snapshots *RoSnapshots, in, out map[snaptype.Enum][]*DirtySegment) { defer snapshots.recalcVisibleFiles() - snapshots.dirtySegmentsLock.Lock() - defer snapshots.dirtySegmentsLock.Unlock() + snapshots.dirtyLock.Lock() + defer snapshots.dirtyLock.Unlock() // add new segments for enum, newSegs := range in { - segs, b := snapshots.segments.Get(enum) - if !b { - m.logger.Error("[snapshots] Merge: segment not found", "enum", enum) - continue - } - dirtySegments := segs.DirtySegments + dirtySegments := snapshots.dirty[enum] for _, newSeg := range newSegs { dirtySegments.Set(newSeg) if newSeg.frozen { @@ -2386,12 +2354,7 @@ func (m *Merger) integrateMergedDirtyFiles(snapshots *RoSnapshots, in, out map[s // delete old sub segments for enum, delSegs := range out { - segs, b := snapshots.segments.Get(enum) - if !b { - m.logger.Error("[snapshots] Merge: segment not found", "enum", enum) - continue - } - dirtySegments := segs.DirtySegments + dirtySegments := snapshots.dirty[enum] for _, delSeg := range delSegs { dirtySegments.Delete(delSeg) delSeg.canDelete.Store(true) @@ -2480,75 +2443,47 @@ func removeOldFiles(toDel []string, snapDir string) { } type View struct { - s *RoSnapshots - VisibleSegments btree.Map[snaptype.Enum, *segmentsRotx] - baseSegType snaptype.Type + s *RoSnapshots + segments []*segmentsRotx + baseSegType snaptype.Type } func (s *RoSnapshots) View() *View { - s.visibleSegmentsLock.RLock() - defer s.visibleSegmentsLock.RUnlock() - - var sgs btree.Map[snaptype.Enum, *segmentsRotx] - s.segments.Scan(func(segtype snaptype.Enum, value *segments) bool { - // BeginRo increments refcount - which is contended - s.dirtySegmentsLock.RLock() - defer s.dirtySegmentsLock.RUnlock() - sgs.Set(segtype, value.BeginRotx()) - return true - }) - return &View{s: s, VisibleSegments: sgs, baseSegType: coresnaptype.Transactions} // Transactions is the last segment to be processed, so it's the most reliable. + s.visibleLock.RLock() + defer s.visibleLock.RUnlock() + + sgs := make([]*segmentsRotx, snaptype.MaxEnum) + for _, t := range s.enums { + sgs[t] = s.visible[t].BeginRotx() + } + return &View{s: s, segments: sgs, baseSegType: coresnaptype.Transactions} // Transactions is the last segment to be processed, so it's the most reliable. 
} func (v *View) Close() { if v == nil || v.s == nil { return } + for _, t := range v.s.enums { + v.segments[t].Close() + } v.s = nil - - v.VisibleSegments.Scan(func(segtype snaptype.Enum, value *segmentsRotx) bool { - value.Close() - return true - }) } var noop = func() {} func (s *RoSnapshots) ViewType(t snaptype.Type) *segmentsRotx { - s.visibleSegmentsLock.RLock() - defer s.visibleSegmentsLock.RUnlock() - - seg, ok := s.segments.Get(t.Enum()) - if !ok { - return nil - } - // BeginRo increments refcount - which is contended - s.dirtySegmentsLock.RLock() - defer s.dirtySegmentsLock.RUnlock() - return seg.BeginRotx() + s.visibleLock.RLock() + defer s.visibleLock.RUnlock() + return s.visible[t.Enum()].BeginRotx() } func (s *RoSnapshots) ViewSingleFile(t snaptype.Type, blockNum uint64) (segment *VisibleSegment, ok bool, close func()) { - s.visibleSegmentsLock.RLock() - defer s.visibleSegmentsLock.RUnlock() + s.visibleLock.RLock() + defer s.visibleLock.RUnlock() - segs, ok := s.segments.Get(t.Enum()) - if !ok { - return nil, false, noop - } + segmentRotx := s.visible[t.Enum()].BeginRotx() - if blockNum > segs.maxVisibleBlock.Load() { - return nil, false, noop - } - - segmentRotx := func() *segmentsRotx { - // BeginRo increments refcount - which is contended - s.dirtySegmentsLock.RLock() - defer s.dirtySegmentsLock.RUnlock() - return segs.BeginRotx() - }() - - for _, seg := range segmentRotx.VisibleSegments { + for _, seg := range segmentRotx.segments { if !(blockNum >= seg.from && blockNum < seg.to) { continue } @@ -2558,34 +2493,26 @@ func (s *RoSnapshots) ViewSingleFile(t snaptype.Type, blockNum uint64) (segment return nil, false, noop } -func (v *View) segments(t snaptype.Type) []*VisibleSegment { - if s, ok := v.s.segments.Get(t.Enum()); ok { - return s.VisibleSegments - } - return nil +func (v *View) segmentsByType(t snaptype.Type) []*VisibleSegment { + return v.segments[t.Enum()].segments } -func (v *View) Headers() []*VisibleSegment { return v.segments(coresnaptype.Headers) } -func (v *View) Bodies() []*VisibleSegment { return v.segments(coresnaptype.Bodies) } -func (v *View) Txs() []*VisibleSegment { return v.segments(coresnaptype.Transactions) } +func (v *View) Headers() []*VisibleSegment { return v.segmentsByType(coresnaptype.Headers) } +func (v *View) Bodies() []*VisibleSegment { return v.segmentsByType(coresnaptype.Bodies) } +func (v *View) Txs() []*VisibleSegment { return v.segmentsByType(coresnaptype.Transactions) } func (v *View) Segment(t snaptype.Type, blockNum uint64) (*VisibleSegment, bool) { - if s, ok := v.s.segments.Get(t.Enum()); ok { - if blockNum > s.maxVisibleBlock.Load() { - return nil, false - } - for _, seg := range s.VisibleSegments { - if !(blockNum >= seg.from && blockNum < seg.to) { - continue - } - return seg, true + for _, seg := range v.s.visible[t.Enum()] { + if !(blockNum >= seg.from && blockNum < seg.to) { + continue } + return seg, true } return nil, false } func (v *View) Ranges() (ranges []Range) { - for _, sn := range v.segments(v.baseSegType) { + for _, sn := range v.segmentsByType(v.baseSegType) { ranges = append(ranges, sn.Range) } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go index 075bac18ef9..c602457433e 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots_test.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots_test.go @@ -416,19 +416,15 @@ func TestOpenAllSnapshot(t *testing.T) { defer s.Close() err := s.OpenFolder() require.NoError(err) - 
require.NotNil(s.segments.Get(coresnaptype.Enums.Headers)) - getSegs := func(e snaptype.Enum) *segments { - res, _ := s.segments.Get(e) - return res - } - require.Equal(0, len(getSegs(coresnaptype.Enums.Headers).VisibleSegments)) + require.NotNil(s.visible[coresnaptype.Enums.Headers]) + require.Equal(0, len(s.visible[coresnaptype.Enums.Headers])) s.Close() createFile(step, step*2, coresnaptype.Bodies) s = NewRoSnapshots(cfg, dir, 0, logger) defer s.Close() - require.NotNil(getSegs(coresnaptype.Enums.Bodies)) - require.Equal(0, len(getSegs(coresnaptype.Enums.Bodies).VisibleSegments)) + require.NotNil(s.visible[coresnaptype.Enums.Bodies]) + require.Equal(0, len(s.visible[coresnaptype.Enums.Bodies])) s.Close() createFile(step, step*2, coresnaptype.Headers) @@ -436,9 +432,9 @@ func TestOpenAllSnapshot(t *testing.T) { s = NewRoSnapshots(cfg, dir, 0, logger) err = s.OpenFolder() require.NoError(err) - require.NotNil(getSegs(coresnaptype.Enums.Headers)) + require.NotNil(s.visible[coresnaptype.Enums.Headers]) s.OpenSegments(coresnaptype.BlockSnapshotTypes, false) - // require.Equal(1, len(getSegs(coresnaptype.Enums.Headers).visibleSegments)) + // require.Equal(1, len(getSegs(coresnaptype.Enums.Headers])) s.Close() createFile(0, step, coresnaptype.Bodies) @@ -449,8 +445,8 @@ func TestOpenAllSnapshot(t *testing.T) { err = s.OpenFolder() require.NoError(err) - require.NotNil(getSegs(coresnaptype.Enums.Headers)) - require.Equal(2, len(getSegs(coresnaptype.Enums.Headers).VisibleSegments)) + require.NotNil(s.visible[coresnaptype.Enums.Headers]) + require.Equal(2, len(s.visible[coresnaptype.Enums.Headers])) view := s.View() defer view.Close() @@ -473,8 +469,8 @@ func TestOpenAllSnapshot(t *testing.T) { err = s.OpenFolder() require.NoError(err) defer s.Close() - require.NotNil(getSegs(coresnaptype.Enums.Headers)) - require.Equal(2, len(getSegs(coresnaptype.Enums.Headers).VisibleSegments)) + require.NotNil(s.visible[coresnaptype.Enums.Headers]) + require.Equal(2, len(s.visible[coresnaptype.Enums.Headers])) createFile(step, step*2-step/5, coresnaptype.Headers) createFile(step, step*2-step/5, coresnaptype.Bodies) @@ -526,11 +522,6 @@ func TestParseCompressedFileName(t *testing.T) { require.Equal(2_000, int(f.To)) } -func getSeg(s *RoSnapshots, e snaptype.Enum) *segments { - res, _ := s.segments.Get(e) - return res -} - func TestCalculateVisibleSegments(t *testing.T) { logger := log.New() dir, require := t.TempDir(), require.New(t) @@ -556,13 +547,13 @@ func TestCalculateVisibleSegments(t *testing.T) { idx := s.idxAvailability() require.Equal(2_500_000-1, int(idx)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Headers).VisibleSegments)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Bodies).VisibleSegments)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Transactions).VisibleSegments)) + require.Equal(5, len(s.visible[coresnaptype.Enums.Headers])) + require.Equal(5, len(s.visible[coresnaptype.Enums.Bodies])) + require.Equal(5, len(s.visible[coresnaptype.Enums.Transactions])) - require.Equal(7, getSeg(s, coresnaptype.Enums.Headers).DirtySegments.Len()) - require.Equal(6, getSeg(s, coresnaptype.Enums.Bodies).DirtySegments.Len()) - require.Equal(5, getSeg(s, coresnaptype.Enums.Transactions).DirtySegments.Len()) + require.Equal(7, s.dirty[coresnaptype.Enums.Headers].Len()) + require.Equal(6, s.dirty[coresnaptype.Enums.Bodies].Len()) + require.Equal(5, s.dirty[coresnaptype.Enums.Transactions].Len()) } // gap in transactions: [5*500_000 - 6*500_000] @@ -573,13 +564,13 @@ func 
TestCalculateVisibleSegments(t *testing.T) { idx := s.idxAvailability() require.Equal(2_500_000-1, int(idx)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Headers).VisibleSegments)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Bodies).VisibleSegments)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Transactions).VisibleSegments)) + require.Equal(5, len(s.visible[coresnaptype.Enums.Headers])) + require.Equal(5, len(s.visible[coresnaptype.Enums.Bodies])) + require.Equal(5, len(s.visible[coresnaptype.Enums.Transactions])) - require.Equal(7, getSeg(s, coresnaptype.Enums.Headers).DirtySegments.Len()) - require.Equal(6, getSeg(s, coresnaptype.Enums.Bodies).DirtySegments.Len()) - require.Equal(5, getSeg(s, coresnaptype.Enums.Transactions).DirtySegments.Len()) + require.Equal(7, s.dirty[coresnaptype.Enums.Headers].Len()) + require.Equal(6, s.dirty[coresnaptype.Enums.Bodies].Len()) + require.Equal(5, s.dirty[coresnaptype.Enums.Transactions].Len()) } // overlap in transactions: [4*500_000 - 4.5*500_000] @@ -590,13 +581,13 @@ func TestCalculateVisibleSegments(t *testing.T) { idx := s.idxAvailability() require.Equal(2_500_000-1, int(idx)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Headers).VisibleSegments)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Bodies).VisibleSegments)) - require.Equal(5, len(getSeg(s, coresnaptype.Enums.Transactions).VisibleSegments)) + require.Equal(5, len(s.visible[coresnaptype.Enums.Headers])) + require.Equal(5, len(s.visible[coresnaptype.Enums.Bodies])) + require.Equal(5, len(s.visible[coresnaptype.Enums.Transactions])) - require.Equal(7, getSeg(s, coresnaptype.Enums.Headers).DirtySegments.Len()) - require.Equal(6, getSeg(s, coresnaptype.Enums.Bodies).DirtySegments.Len()) - require.Equal(5, getSeg(s, coresnaptype.Enums.Transactions).DirtySegments.Len()) + require.Equal(7, s.dirty[coresnaptype.Enums.Headers].Len()) + require.Equal(6, s.dirty[coresnaptype.Enums.Bodies].Len()) + require.Equal(5, s.dirty[coresnaptype.Enums.Transactions].Len()) } } @@ -625,6 +616,6 @@ func TestCalculateVisibleSegmentsWhenGapsInIdx(t *testing.T) { idx := s.idxAvailability() require.Equal(500_000-1, int(idx)) - require.Equal(1, len(getSeg(s, coresnaptype.Enums.Headers).VisibleSegments)) - require.Equal(3, getSeg(s, coresnaptype.Enums.Headers).DirtySegments.Len()) + require.Equal(1, len(s.visible[coresnaptype.Enums.Headers])) + require.Equal(3, s.dirty[coresnaptype.Enums.Headers].Len()) } diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index 7078b2f9ac7..3ca90961163 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -475,10 +475,14 @@ func (v *BorView) Close() { v.base.Close() } -func (v *BorView) Events() []*VisibleSegment { return v.base.segments(borsnaptype.BorEvents) } -func (v *BorView) Spans() []*VisibleSegment { return v.base.segments(borsnaptype.BorSpans) } -func (v *BorView) Checkpoints() []*VisibleSegment { return v.base.segments(borsnaptype.BorCheckpoints) } -func (v *BorView) Milestones() []*VisibleSegment { return v.base.segments(borsnaptype.BorMilestones) } +func (v *BorView) Events() []*VisibleSegment { return v.base.segmentsByType(borsnaptype.BorEvents) } +func (v *BorView) Spans() []*VisibleSegment { return v.base.segmentsByType(borsnaptype.BorSpans) } +func (v *BorView) Checkpoints() []*VisibleSegment { + return v.base.segmentsByType(borsnaptype.BorCheckpoints) +} +func (v *BorView) Milestones() []*VisibleSegment { 
+ return v.base.segmentsByType(borsnaptype.BorMilestones) +} func (v *BorView) EventsSegment(blockNum uint64) (*VisibleSegment, bool) { return v.base.Segment(borsnaptype.BorEvents, blockNum) diff --git a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go index dfebdf600a8..5b70a46eac5 100644 --- a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go @@ -32,8 +32,6 @@ import ( "github.com/klauspost/compress/zstd" "github.com/tidwall/btree" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon-lib/chain/snapcfg" libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/background" @@ -42,6 +40,7 @@ import ( "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/kv/dbutils" + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/recsplit" "github.com/erigontech/erigon-lib/seg" @@ -86,16 +85,13 @@ func BeaconSimpleIdx(ctx context.Context, sn snaptype.FileInfo, salt uint32, tmp // slot -> beacon_slot_segment_offset type CaplinSnapshots struct { - indicesReady atomic.Bool - segmentsReady atomic.Bool - Salt uint32 - dirtySegmentsLock sync.RWMutex - visibleSegmentsLock sync.RWMutex + dirtyLock sync.RWMutex // guards `dirty` field + dirty []*btree.BTreeG[*DirtySegment] // ordered map `type.Enum()` -> DirtySegments - BeaconBlocks *segments - BlobSidecars *segments + visibleLock sync.RWMutex // guards `visible` field + visible []VisibleSegments // ordered map `type.Enum()` -> VisbileSegments dir string tmpdir string @@ -115,13 +111,12 @@ type CaplinSnapshots struct { // - gaps are not allowed // - segment have [from:to) semantic func NewCaplinSnapshots(cfg ethconfig.BlocksFreezing, beaconCfg *clparams.BeaconChainConfig, dirs datadir.Dirs, logger log.Logger) *CaplinSnapshots { - BeaconBlocks := &segments{ - DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), - } - BlobSidecars := &segments{ - DirtySegments: btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}), + c := &CaplinSnapshots{dir: dirs.Snap, tmpdir: dirs.Tmp, cfg: cfg, logger: logger, beaconCfg: beaconCfg, + dirty: make([]*btree.BTreeG[*DirtySegment], snaptype.MaxEnum), + visible: make([]VisibleSegments, snaptype.MaxEnum), } - c := &CaplinSnapshots{dir: dirs.Snap, tmpdir: dirs.Tmp, cfg: cfg, BeaconBlocks: BeaconBlocks, BlobSidecars: BlobSidecars, logger: logger, beaconCfg: beaconCfg} + c.dirty[snaptype.BeaconBlocks.Enum()] = btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}) + c.dirty[snaptype.BlobSidecars.Enum()] = btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false}) c.recalcVisibleFiles() return c } @@ -142,12 +137,12 @@ func (s *CaplinSnapshots) LS() { defer view.Close() if view.BeaconBlockRotx != nil { - for _, seg := range view.BeaconBlockRotx.VisibleSegments { + for _, seg := range view.BeaconBlockRotx.segments { log.Info("[agg] ", "f", seg.src.Decompressor.FileName(), "words", seg.src.Decompressor.Count()) } } if view.BlobSidecarRotx != nil { - for _, seg := range view.BlobSidecarRotx.VisibleSegments { + for _, seg := range view.BlobSidecarRotx.segments { log.Info("[agg] ", "f", seg.src.Decompressor.FileName(), "words", seg.src.Decompressor.Count()) } } @@ -158,12 +153,12 @@ 
func (s *CaplinSnapshots) SegFileNames(from, to uint64) []string { defer view.Close() var res []string - for _, seg := range view.BeaconBlockRotx.VisibleSegments { + for _, seg := range view.BeaconBlockRotx.segments { if seg.from >= from && seg.to <= to { res = append(res, seg.src.FileName()) } } - for _, seg := range view.BlobSidecarRotx.VisibleSegments { + for _, seg := range view.BlobSidecarRotx.segments { if seg.from >= from && seg.to <= to { res = append(res, seg.src.FileName()) } @@ -179,8 +174,8 @@ func (s *CaplinSnapshots) Close() { if s == nil { return } - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() s.closeWhatNotInList(nil) } @@ -189,8 +184,8 @@ func (s *CaplinSnapshots) Close() { func (s *CaplinSnapshots) OpenList(fileNames []string, optimistic bool) error { defer s.recalcVisibleFiles() - s.dirtySegmentsLock.Lock() - defer s.dirtySegmentsLock.Unlock() + s.dirtyLock.Lock() + defer s.dirtyLock.Unlock() s.closeWhatNotInList(fileNames) var segmentsMax uint64 @@ -206,7 +201,7 @@ Loop: case snaptype.CaplinEnums.BeaconBlocks: var sn *DirtySegment var exists bool - s.BeaconBlocks.DirtySegments.Walk(func(segments []*DirtySegment) bool { + s.dirty[snaptype.BeaconBlocks.Enum()].Walk(func(segments []*DirtySegment) bool { for _, sn2 := range segments { if sn2.Decompressor == nil { // it's ok if some segment was not able to open continue @@ -246,7 +241,7 @@ Loop: if !exists { // it's possible to iterate over .seg file even if you don't have index // then make segment available even if index open may fail - s.BeaconBlocks.DirtySegments.Set(sn) + s.dirty[snaptype.BeaconBlocks.Enum()].Set(sn) } if err := sn.openIdxIfNeed(s.dir, optimistic); err != nil { return err @@ -263,7 +258,7 @@ Loop: case snaptype.CaplinEnums.BlobSidecars: var sn *DirtySegment var exists bool - s.BlobSidecars.DirtySegments.Walk(func(segments []*DirtySegment) bool { + s.dirty[snaptype.BlobSidecars.Enum()].Walk(func(segments []*DirtySegment) bool { for _, sn2 := range segments { if sn2.Decompressor == nil { // it's ok if some segment was not able to open continue @@ -303,7 +298,7 @@ Loop: if !exists { // it's possible to iterate over .seg file even if you don't have index // then make segment available even if index open may fail - s.BlobSidecars.DirtySegments.Set(sn) + s.dirty[snaptype.BlobSidecars.Enum()].Set(sn) } if err := sn.openIdxIfNeed(s.dir, optimistic); err != nil { return err @@ -314,18 +309,16 @@ Loop: if segmentsMaxSet { s.segmentsMax.Store(segmentsMax) } - s.segmentsReady.Store(true) return nil } func (s *CaplinSnapshots) recalcVisibleFiles() { defer func() { s.idxMax.Store(s.idxAvailability()) - s.indicesReady.Store(true) }() - s.visibleSegmentsLock.Lock() - defer s.visibleSegmentsLock.Unlock() + s.visibleLock.Lock() + defer s.visibleLock.Unlock() getNewVisibleSegments := func(dirtySegments *btree.BTreeG[*DirtySegment]) []*VisibleSegment { newVisibleSegments := make([]*VisibleSegment, 0, dirtySegments.Len()) @@ -351,18 +344,18 @@ func (s *CaplinSnapshots) recalcVisibleFiles() { }) return newVisibleSegments } - s.BeaconBlocks.VisibleSegments = getNewVisibleSegments(s.BeaconBlocks.DirtySegments) - s.BlobSidecars.VisibleSegments = getNewVisibleSegments(s.BlobSidecars.DirtySegments) - - var maxIdx uint64 - if len(s.BeaconBlocks.VisibleSegments) > 0 { - maxIdx = s.BeaconBlocks.VisibleSegments[len(s.BeaconBlocks.VisibleSegments)-1].to - 1 - } - s.BeaconBlocks.maxVisibleBlock.Store(maxIdx) + s.visible = make([]VisibleSegments, snaptype.MaxEnum) // 
create a new pointer - only new readers will see it; readers that are still alive will continue to use the previous pointer
+	s.visible[snaptype.BeaconBlocks.Enum()] = getNewVisibleSegments(s.dirty[snaptype.BeaconBlocks.Enum()])
+	s.visible[snaptype.BlobSidecars.Enum()] = getNewVisibleSegments(s.dirty[snaptype.BlobSidecars.Enum()])
 }

 func (s *CaplinSnapshots) idxAvailability() uint64 {
-	return s.BeaconBlocks.maxVisibleBlock.Load()
+	s.visibleLock.RLock()
+	defer s.visibleLock.RUnlock()
+	if len(s.visible[snaptype.BeaconBlocks.Enum()]) == 0 {
+		return 0
+	}
+	return s.visible[snaptype.BeaconBlocks.Enum()][len(s.visible[snaptype.BeaconBlocks.Enum()])-1].to
 }

 func (s *CaplinSnapshots) OpenFolder() error {
@@ -384,7 +377,7 @@ func (s *CaplinSnapshots) closeWhatNotInList(l []string) {
 		protectFiles[fName] = struct{}{}
 	}
 	toClose := make([]*DirtySegment, 0)
-	s.BeaconBlocks.DirtySegments.Walk(func(segments []*DirtySegment) bool {
+	s.dirty[snaptype.BeaconBlocks.Enum()].Walk(func(segments []*DirtySegment) bool {
 		for _, sn := range segments {
 			if sn.Decompressor == nil {
 				continue
@@ -399,11 +392,11 @@ func (s *CaplinSnapshots) closeWhatNotInList(l []string) {
 	})
 	for _, sn := range toClose {
 		sn.close()
-		s.BeaconBlocks.DirtySegments.Delete(sn)
+		s.dirty[snaptype.BeaconBlocks.Enum()].Delete(sn)
 	}

 	toClose = make([]*DirtySegment, 0)
-	s.BlobSidecars.DirtySegments.Walk(func(segments []*DirtySegment) bool {
+	s.dirty[snaptype.BlobSidecars.Enum()].Walk(func(segments []*DirtySegment) bool {
 		for _, sn := range segments {
 			if sn.Decompressor == nil {
 				continue
@@ -418,7 +411,7 @@ func (s *CaplinSnapshots) closeWhatNotInList(l []string) {
 	})
 	for _, sn := range toClose {
 		sn.close()
-		s.BlobSidecars.DirtySegments.Delete(sn)
+		s.dirty[snaptype.BlobSidecars.Enum()].Delete(sn)
 	}
 }

@@ -430,18 +423,15 @@ type CaplinView struct {
 }

 func (s *CaplinSnapshots) View() *CaplinView {
-	s.visibleSegmentsLock.RLock()
-	defer s.visibleSegmentsLock.RUnlock()
+	s.visibleLock.RLock()
+	defer s.visibleLock.RUnlock()
 	v := &CaplinView{s: s}
-	// BeginRo increments refcount - which is contended
-	s.dirtySegmentsLock.RLock()
-	defer s.dirtySegmentsLock.RUnlock()
-	if s.BeaconBlocks != nil {
-		v.BeaconBlockRotx = s.BeaconBlocks.BeginRotx()
+	if s.visible[snaptype.BeaconBlocks.Enum()] != nil {
+		v.BeaconBlockRotx = s.visible[snaptype.BeaconBlocks.Enum()].BeginRotx()
 	}
-	if s.BlobSidecars != nil {
-		v.BlobSidecarRotx = s.BlobSidecars.BeginRotx()
+	if s.visible[snaptype.BlobSidecars.Enum()] != nil {
+		v.BlobSidecarRotx = s.visible[snaptype.BlobSidecars.Enum()].BeginRotx()
 	}
 	return v
 }
@@ -457,9 +447,9 @@ func (v *CaplinView) Close() {
 }

 func (v *CaplinView) BeaconBlocks() []*VisibleSegment {
-	return v.BeaconBlockRotx.VisibleSegments
+	return v.BeaconBlockRotx.segments
 }
-func (v *CaplinView) BlobSidecars() []*VisibleSegment { return v.BlobSidecarRotx.VisibleSegments }
+func (v *CaplinView) BlobSidecars() []*VisibleSegment { return v.BlobSidecarRotx.segments }

 func (v *CaplinView) BeaconBlocksSegment(slot uint64) (*VisibleSegment, bool) {
 	for _, seg := range v.BeaconBlocks() {
@@ -806,7 +796,7 @@ func (s *CaplinSnapshots) FrozenBlobs() uint64 {
 		return 0
 	}
 	ret := uint64(0)
-	for _, seg := range s.BlobSidecars.VisibleSegments {
+	for _, seg := range s.visible[snaptype.BlobSidecars.Enum()] {
 		ret = max(ret, seg.to)
 	}