diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go
index 9f56e24446a2..82e5a019f7d2 100644
--- a/beacon-chain/db/iface/interface.go
+++ b/beacon-chain/db/iface/interface.go
@@ -161,6 +161,7 @@ type SlasherDatabase interface {
     ) ([]*ethpb.HighestAttestation, error)
     DatabasePath() string
     ClearDB() error
+    Migrate(ctx context.Context, headEpoch, maxPruningEpoch primitives.Epoch, batchSize int) error
 }
 
 // Database interface with full access.
diff --git a/beacon-chain/db/slasherkv/BUILD.bazel b/beacon-chain/db/slasherkv/BUILD.bazel
index 03d3a5105588..9b6edf28047b 100644
--- a/beacon-chain/db/slasherkv/BUILD.bazel
+++ b/beacon-chain/db/slasherkv/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
         "kv.go",
         "log.go",
         "metrics.go",
+        "migrate.go",
         "pruning.go",
         "schema.go",
         "slasher.go",
@@ -21,6 +22,7 @@ go_library(
         "//io/file:go_default_library",
         "//proto/prysm/v1alpha1:go_default_library",
         "//time/slots:go_default_library",
+        "@com_github_go_errors_errors//:go_default_library",
         "@com_github_golang_snappy//:go_default_library",
         "@com_github_pkg_errors//:go_default_library",
         "@com_github_prometheus_client_golang//prometheus:go_default_library",
diff --git a/beacon-chain/db/slasherkv/migrate.go b/beacon-chain/db/slasherkv/migrate.go
new file mode 100644
index 000000000000..495dfd13334f
--- /dev/null
+++ b/beacon-chain/db/slasherkv/migrate.go
@@ -0,0 +1,228 @@
+package slasherkv
+
+import (
+    "context"
+    "encoding/binary"
+    "time"
+
+    "github.com/pkg/errors"
+    "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
+    "github.com/prysmaticlabs/prysm/v5/time/slots"
+    "github.com/sirupsen/logrus"
+    bolt "go.etcd.io/bbolt"
+)
+
+// Migrate, its corresponding usage, and its tests can be removed entirely once Electra is on mainnet.
+// Previously, the first 8 bytes of the keys in the `attestation-data-roots` and `proposal-records` buckets
+// were respectively the epoch and the slot, encoded as little-endian. It was the source of
+// https://github.com/prysmaticlabs/prysm/issues/14142 and potentially
+// https://github.com/prysmaticlabs/prysm/issues/13658.
+// To solve this (or these) issue(s), we decided to store the first 8 bytes of keys as big-endian.
+// See https://github.com/prysmaticlabs/prysm/pull/14151.
+// However, in order not to break backward compatibility, we need to migrate the existing data.
+// The strategy is simple: if, for one of these bucket keys, we detect an epoch (resp. slot) higher
+// than the current epoch (resp. slot), then we consider that the key is stored as little-endian.
+// In that case, we create a new entry with the same value, but with the epoch (resp. slot) part
+// of the key stored as big-endian.
+// We start from the highest key and iterate downwards until we reach the current epoch (resp. slot).
+func (s *Store) Migrate(ctx context.Context, headEpoch, maxPruningEpoch primitives.Epoch, batchSize int) error {
+    // Migrate attestations.
+    log.Info("Starting migration of attestations. This may take a while.")
+    start := time.Now()
+
+    if err := s.migrateAttestations(ctx, headEpoch, maxPruningEpoch, batchSize); err != nil {
+        return errors.Wrap(err, "migrate attestations")
+    }
+
+    log.WithField("duration", time.Since(start)).Info("Migration of attestations completed successfully")
+
+    // Migrate proposals.
+    log.Info("Starting migration of proposals. This may take a while.")
+    start = time.Now()
+
+    if err := s.migrateProposals(ctx, headEpoch, maxPruningEpoch, batchSize); err != nil {
+        return errors.Wrap(err, "migrate proposals")
+    }
+
+    log.WithField("duration", time.Since(start)).Info("Migration of proposals completed successfully")
+
+    return nil
+}
+
+func (s *Store) migrateAttestations(ctx context.Context, headEpoch, maxPruningEpoch primitives.Epoch, batchSize int) error {
+    done := false
+
+    for !done {
+        count := 0
+
+        if err := s.db.Update(func(tx *bolt.Tx) error {
+            signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
+            attRecordsBkt := tx.Bucket(attestationRecordsBucket)
+
+            // We begin the migration by iterating from the last item in the bucket.
+            c := signingRootsBkt.Cursor()
+            for k, v := c.Last(); k != nil; k, v = c.Prev() {
+                if count >= batchSize {
+                    return nil
+                }
+
+                // Check if the context is done.
+                if ctx.Err() != nil {
+                    return ctx.Err()
+                }
+
+                // Extract the epoch encoded in the first 8 bytes of the key.
+                encodedEpoch := k[:8]
+
+                // Convert it to a uint64, considering it is stored as big-endian.
+                epochBigEndian := binary.BigEndian.Uint64(encodedEpoch)
+
+                // If the epoch is smaller than or equal to the current epoch, we are done.
+                if epochBigEndian <= uint64(headEpoch) {
+                    break
+                }
+
+                // Increment the count of migrated items.
+                count++
+
+                // Otherwise, we consider that the epoch is stored as little-endian.
+                epochLittleEndian := binary.LittleEndian.Uint64(encodedEpoch)
+
+                // If the epoch is still higher than the current epoch, then it is an issue.
+                // This should never happen.
+                if epochLittleEndian > uint64(headEpoch) {
+                    log.WithFields(logrus.Fields{
+                        "epochLittleEndian": epochLittleEndian,
+                        "epochBigEndian":    epochBigEndian,
+                        "headEpoch":         headEpoch,
+                    }).Error("Epoch is higher than the current epoch both if stored as little-endian or as big-endian")
+
+                    continue
+                }
+
+                epoch := primitives.Epoch(epochLittleEndian)
+                if err := signingRootsBkt.Delete(k); err != nil {
+                    return err
+                }
+
+                // We don't bother migrating data that is going to be pruned by the pruning routine.
+                if epoch <= maxPruningEpoch {
+                    if err := attRecordsBkt.Delete(v); err != nil {
+                        return err
+                    }
+
+                    continue
+                }
+
+                // Create a new key with the epoch stored as big-endian.
+                encodedEpochBigEndian := make([]byte, 8)
+                binary.BigEndian.PutUint64(encodedEpochBigEndian, uint64(epoch))
+                newK := append(encodedEpochBigEndian, k[8:]...)
+
+                // Store the same value with the new key.
+                if err := signingRootsBkt.Put(newK, v); err != nil {
+                    return err
+                }
+            }
+
+            done = true
+
+            return nil
+        }); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+func (s *Store) migrateProposals(ctx context.Context, headEpoch, maxPruningEpoch primitives.Epoch, batchSize int) error {
+    done := false
+
+    for !done {
+        count := 0
+
+        // Compute the max pruning slot.
+        maxPruningSlot, err := slots.EpochEnd(maxPruningEpoch)
+        if err != nil {
+            return errors.Wrap(err, "compute max pruning slot")
+        }
+
+        // Compute the head slot.
+        headSlot, err := slots.EpochEnd(headEpoch)
+        if err != nil {
+            return errors.Wrap(err, "compute head slot")
+        }
+
+        if err := s.db.Update(func(tx *bolt.Tx) error {
+            proposalBkt := tx.Bucket(proposalRecordsBucket)
+
+            // We begin the migration by iterating from the last item in the bucket.
+            c := proposalBkt.Cursor()
+            for k, v := c.Last(); k != nil; k, v = c.Prev() {
+                if count >= batchSize {
+                    return nil
+                }
+
+                // Check if the context is done.
+                if ctx.Err() != nil {
+                    return ctx.Err()
+                }
+
+                // Extract the slot encoded in the first 8 bytes of the key.
+                encodedSlot := k[:8]
+
+                // Convert it to a uint64, considering it is stored as big-endian.
+                slotBigEndian := binary.BigEndian.Uint64(encodedSlot)
+
+                // If the slot is smaller than or equal to the head slot, we are done.
+                if slotBigEndian <= uint64(headSlot) {
+                    break
+                }
+
+                // Increment the count of migrated items.
+                count++
+
+                // Otherwise, we consider that the slot is stored as little-endian.
+                slotLittleEndian := binary.LittleEndian.Uint64(encodedSlot)
+
+                // If the slot is still higher than the current slot, then it is an issue.
+                // This should never happen.
+                if slotLittleEndian > uint64(headSlot) {
+                    log.WithFields(logrus.Fields{
+                        "slotLittleEndian": slotLittleEndian,
+                        "slotBigEndian":    slotBigEndian,
+                        "headSlot":         headSlot,
+                    }).Error("Slot is higher than the current slot both if stored as little-endian or as big-endian")
+
+                    continue
+                }
+
+                slot := primitives.Slot(slotLittleEndian)
+                if err := proposalBkt.Delete(k); err != nil {
+                    return err
+                }
+
+                // We don't bother migrating data that is going to be pruned by the pruning routine.
+                if slot <= maxPruningSlot {
+                    continue
+                }
+
+                // Create a new key with the slot stored as big-endian.
+                encodedSlotBigEndian := make([]byte, 8)
+                binary.BigEndian.PutUint64(encodedSlotBigEndian, uint64(slot))
+                newK := append(encodedSlotBigEndian, k[8:]...)
+
+                // Store the same value with the new key.
+                if err := proposalBkt.Put(newK, v); err != nil {
+                    return err
+                }
+            }
+
+            done = true
+
+            return nil
+        }); err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
diff --git a/beacon-chain/db/slasherkv/migrate_test.go b/beacon-chain/db/slasherkv/migrate_test.go
new file mode 100644
index 000000000000..7a108731eb02
--- /dev/null
+++ b/beacon-chain/db/slasherkv/migrate_test.go
@@ -0,0 +1,235 @@
+package slasherkv
+
+import (
+    "context"
+    "encoding/binary"
+    "testing"
+
+    "github.com/prysmaticlabs/prysm/v5/config/params"
+    "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
+    "github.com/prysmaticlabs/prysm/v5/testing/require"
+    bolt "go.etcd.io/bbolt"
+)
+
+type endianness int
+
+const (
+    bigEndian endianness = iota
+    littleEndian
+)
+
+func encodeEpochLittleEndian(epoch primitives.Epoch) []byte {
+    buf := make([]byte, 8)
+    binary.LittleEndian.PutUint64(buf, uint64(epoch))
+    return buf
+}
+
+func createAttestation(signingRootBucket, attRecordsBucket *bolt.Bucket, epoch primitives.Epoch, encoding endianness) error {
+    // Encode the target epoch.
+    var key []byte
+    if encoding == bigEndian {
+        key = encodeTargetEpoch(epoch)
+    } else {
+        key = encodeEpochLittleEndian(epoch)
+    }
+
+    // Encode the validator index.
+    encodedValidatorIndex := encodeValidatorIndex(primitives.ValidatorIndex(epoch))
+
+    // Write the attestation to the database.
+    key = append(key, encodedValidatorIndex...)
+    if err := signingRootBucket.Put(key, encodedValidatorIndex); err != nil {
+        return err
+    }
+
+    if err := attRecordsBucket.Put(encodedValidatorIndex, []byte("dummy")); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func createProposal(proposalBucket *bolt.Bucket, epoch primitives.Epoch, encoding endianness) error {
+    // Get the slot for the epoch.
+    slot := primitives.Slot(epoch) * params.BeaconConfig().SlotsPerEpoch
+
+    // Encode the slot.
+    key := make([]byte, 8)
+    if encoding == bigEndian {
+        binary.BigEndian.PutUint64(key, uint64(slot))
+    } else {
+        binary.LittleEndian.PutUint64(key, uint64(slot))
+    }
+
+    // Encode the validator index.
+    encodedValidatorIndex := encodeValidatorIndex(primitives.ValidatorIndex(slot))
+
+    // Write the proposal to the database.
+    key = append(key, encodedValidatorIndex...)
+    if err := proposalBucket.Put(key, []byte("dummy")); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func TestMigrate(t *testing.T) {
+    const (
+        headEpoch       = primitives.Epoch(65000)
+        maxPruningEpoch = primitives.Epoch(60000)
+        batchSize       = 3
+    )
+
+    /*
+        State of the DB before migration:
+        =================================
+
+        Attestations:
+        -------------
+        59000 (LE), 59100 (LE), 59200 (BE), 59300 (LE), 59400 (LE), 59500 (LE), 59600 (LE), 59700 (LE), 59800 (LE), 59900 (LE),
+        60000 (LE), 60100 (LE), 60200 (LE), 60300 (LE), 60400 (BE), 60500 (LE), 60600 (LE), 60700 (LE), 60800 (LE), 60900 (LE)
+
+        Proposals:
+        ----------
+        59000*32 (LE), 59100*32 (LE), 59200*32 (BE), 59300*32 (LE), 59400*32 (LE), 59500*32 (LE), 59600*32 (LE), 59700*32 (LE), 59800*32 (LE), 59900*32 (LE),
+        60000*32 (LE), 60100*32 (LE), 60200*32 (LE), 60300*32 (LE), 60400*32 (BE), 60500*32 (LE), 60600*32 (LE), 60700*32 (LE), 60800*32 (LE), 60900*32 (LE)
+
+        State of the DB after migration:
+        ================================
+
+        Attestations:
+        -------------
+        59200 (BE), 60100 (BE), 60200 (BE), 60300 (BE), 60400 (BE), 60500 (BE), 60600 (BE), 60700 (BE), 60800 (BE), 60900 (BE)
+
+        Proposals:
+        ----------
+        59200*32 (BE), 60100*32 (BE), 60200*32 (BE), 60300*32 (BE), 60400*32 (BE), 60500*32 (BE), 60600*32 (BE), 60700*32 (BE), 60800*32 (BE), 60900*32 (BE)
+    */
+
+    beforeLittleEndianEpochs := []primitives.Epoch{
+        59000, 59100, 59300, 59400, 59500, 59600, 59700, 59800, 59900,
+        60000, 60100, 60200, 60300, 60500, 60600, 60700, 60800, 60900,
+    }
+
+    beforeBigEndianEpochs := []primitives.Epoch{59200, 60400}
+
+    afterBigEndianEpochs := []primitives.Epoch{
+        59200, 60100, 60200, 60300, 60400, 60500, 60600, 60700, 60800, 60900,
+    }
+
+    // Create a new context.
+    ctx := context.Background()
+
+    // Set up a test database.
+    beaconDB := setupDB(t)
+
+    // Write attestations and proposals to the database.
+    err := beaconDB.db.Update(func(tx *bolt.Tx) error {
+        signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
+        attRecordsBkt := tx.Bucket(attestationRecordsBucket)
+        proposalBkt := tx.Bucket(proposalRecordsBucket)
+
+        // Create attestations with little-endian encoding.
+        for _, epoch := range beforeLittleEndianEpochs {
+            if err := createAttestation(signingRootsBkt, attRecordsBkt, epoch, littleEndian); err != nil {
+                return err
+            }
+        }
+
+        // Create attestations with big-endian encoding.
+        for _, epoch := range beforeBigEndianEpochs {
+            if err := createAttestation(signingRootsBkt, attRecordsBkt, epoch, bigEndian); err != nil {
+                return err
+            }
+        }
+
+        // Create proposals with little-endian encoding.
+        for _, epoch := range beforeLittleEndianEpochs {
+            if err := createProposal(proposalBkt, epoch, littleEndian); err != nil {
+                return err
+            }
+        }
+
+        // Create proposals with big-endian encoding.
+        for _, epoch := range beforeBigEndianEpochs {
+            if err := createProposal(proposalBkt, epoch, bigEndian); err != nil {
+                return err
+            }
+        }
+
+        return nil
+    })
+
+    require.NoError(t, err)
+
+    // Migrate the database.
+    err = beaconDB.Migrate(ctx, headEpoch, maxPruningEpoch, batchSize)
+    require.NoError(t, err)
+
+    // Check the state of the database after migration.
+    err = beaconDB.db.View(func(tx *bolt.Tx) error {
+        signingRootsBkt := tx.Bucket(attestationDataRootsBucket)
+        attRecordsBkt := tx.Bucket(attestationRecordsBucket)
+        proposalBkt := tx.Bucket(proposalRecordsBucket)
+
+        // Check that all the expected attestations are in the store.
+        for _, epoch := range afterBigEndianEpochs {
+            // Check if the attestation exists.
+            key := encodeTargetEpoch(epoch)
+            encodedValidatorIndex := encodeValidatorIndex(primitives.ValidatorIndex(epoch))
+            key = append(key, encodedValidatorIndex...)
+
+            // Check the signing root bucket.
+            indexedAtt := signingRootsBkt.Get(key)
+            require.DeepSSZEqual(t, encodedValidatorIndex, indexedAtt)
+
+            // Check the attestation records bucket.
+            attestationRecord := attRecordsBkt.Get(encodedValidatorIndex)
+            require.DeepSSZEqual(t, []byte("dummy"), attestationRecord)
+        }
+
+        // Check that only the expected attestations are in the store.
+        c := signingRootsBkt.Cursor()
+        count := 0
+        for k, _ := c.First(); k != nil; k, _ = c.Next() {
+            count++
+        }
+
+        require.Equal(t, len(afterBigEndianEpochs), count)
+
+        c = attRecordsBkt.Cursor()
+        count = 0
+        for k, _ := c.First(); k != nil; k, _ = c.Next() {
+            count++
+        }
+
+        require.Equal(t, len(afterBigEndianEpochs), count)
+
+        // Check that all the expected proposals are in the store.
+        slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
+
+        for _, epoch := range afterBigEndianEpochs {
+            // Check if the proposal exists.
+            slot := primitives.Slot(epoch) * slotsPerEpoch
+            key := make([]byte, 8)
+            binary.BigEndian.PutUint64(key, uint64(slot))
+            encodedValidatorIndex := encodeValidatorIndex(primitives.ValidatorIndex(slot))
+            key = append(key, encodedValidatorIndex...)
+
+            // Check the proposal bucket.
+            proposal := proposalBkt.Get(key)
+            require.DeepEqual(t, []byte("dummy"), proposal)
+        }
+
+        // Check that only the expected proposals are in the store.
+        c = proposalBkt.Cursor()
+        count = 0
+        for k, _ := c.First(); k != nil; k, _ = c.Next() {
+            count++
+        }
+
+        require.Equal(t, len(afterBigEndianEpochs), count)
+
+        return nil
+    })
+
+    require.NoError(t, err)
+}
diff --git a/beacon-chain/slasher/service.go b/beacon-chain/slasher/service.go
index d4c78994f9a0..73495c0724d1 100644
--- a/beacon-chain/slasher/service.go
+++ b/beacon-chain/slasher/service.go
@@ -121,6 +121,25 @@ func (s *Service) run() {
     indexedAttsChan := make(chan *types.WrappedIndexedAtt, 1)
     beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
 
+    // This section can be removed entirely once Electra is on mainnet.
+    headSlot := s.serviceCfg.HeadStateFetcher.HeadSlot()
+    headEpoch := slots.ToEpoch(headSlot)
+
+    maxPruningEpoch := primitives.Epoch(0)
+    if headEpoch >= s.params.historyLength {
+        maxPruningEpoch = headEpoch - s.params.historyLength
+    }
+
+    // For database performance reasons, database read/write operations
+    // are chunked into batches of at most `migrationBatchSize` elements.
+    const migrationBatchSize = 10_000
+
+    err = s.serviceCfg.Database.Migrate(s.ctx, headEpoch, maxPruningEpoch, migrationBatchSize)
+    if err != nil {
+        log.WithError(err).Error("Failed to migrate slasher database")
+        return
+    }
+
     s.wg.Add(1)
     go s.receiveAttestations(s.ctx, indexedAttsChan)
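A note on the key encoding, separate from the patch above: the migration relies on the fact that bbolt cursors walk keys in bytewise lexicographic order, so only big-endian encoded integers iterate in numeric order, and a legacy little-endian key re-read as big-endian decodes to an implausibly large value, which is how Migrate detects entries that still need to be re-keyed. Below is a minimal, standard-library-only sketch of both properties; the encode helper is illustrative only and is not part of the Prysm codebase.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// encode returns the 8-byte encoding of v in the given byte order.
func encode(order binary.ByteOrder, v uint64) []byte {
    buf := make([]byte, 8)
    order.PutUint64(buf, v)
    return buf
}

func main() {
    // bbolt iterates keys in bytewise lexicographic order (what bytes.Compare implements).
    // Big-endian encoding preserves numeric order: 255 sorts before 256.
    fmt.Println(bytes.Compare(encode(binary.BigEndian, 255), encode(binary.BigEndian, 256))) // -1

    // Little-endian encoding does not: 255 ({0xff, 0x00, ...}) sorts after 256 ({0x00, 0x01, ...}).
    fmt.Println(bytes.Compare(encode(binary.LittleEndian, 255), encode(binary.LittleEndian, 256))) // 1

    // Detection trick: a little-endian epoch such as 60000, re-read as big-endian,
    // decodes to roughly 7e18, far larger than any plausible epoch or slot.
    fmt.Println(binary.BigEndian.Uint64(encode(binary.LittleEndian, 60000)))
}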