From ca5aeb205f40745ce9bce9779e7f5400052b2637 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 20 Nov 2025 15:34:32 +0100 Subject: [PATCH 1/2] Parallel cache de/serialization --- block/internal/cache/generic_cache.go | 140 +++++++++++++++----------- 1 file changed, 83 insertions(+), 57 deletions(-) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index ba4697a6d..c38bcd91c 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -1,11 +1,15 @@ package cache import ( + "bufio" "encoding/gob" + "errors" "fmt" "os" "path/filepath" "sync" + + "golang.org/x/sync/errgroup" ) // Cache is a generic cache that maintains items that are seen and hard confirmed @@ -116,15 +120,17 @@ const ( ) // saveMapGob saves a map to a file using gob encoding. -func saveMapGob[K comparable, V any](filePath string, data map[K]V) error { +func saveMapGob[K comparable, V any](filePath string, data map[K]V) (err error) { file, err := os.Create(filePath) if err != nil { return fmt.Errorf("failed to create file %s: %w", filePath, err) } - defer file.Close() + writer := bufio.NewWriter(file) - encoder := gob.NewEncoder(file) - if err := encoder.Encode(data); err != nil { + defer func() { + err = errors.Join(err, writer.Flush(), file.Sync(), file.Close()) + }() + if err := gob.NewEncoder(writer).Encode(data); err != nil { return fmt.Errorf("failed to encode to file %s: %w", filePath, err) } return nil @@ -157,48 +163,60 @@ func (c *Cache[T]) SaveToDisk(folderPath string) error { if err := os.MkdirAll(folderPath, 0o755); err != nil { return fmt.Errorf("failed to create directory %s: %w", folderPath, err) } - + var wg errgroup.Group // prepare items maps - itemsByHeightMap := make(map[uint64]*T) + wg.Go(func() error { + itemsByHeightMap := make(map[uint64]*T) - c.itemsByHeight.Range(func(k, v any) bool { - if hk, ok := k.(uint64); ok { - if it, ok := v.(*T); ok { - itemsByHeightMap[hk] = it + c.itemsByHeight.Range(func(k, v any) bool { + if hk, ok := k.(uint64); ok { + if it, ok := v.(*T); ok { + itemsByHeightMap[hk] = it + } } + return true + }) + + if err := saveMapGob(filepath.Join(folderPath, itemsByHeightFilename), itemsByHeightMap); err != nil { + return fmt.Errorf("save %s: %w", itemsByHeightFilename, err) } - return true + return nil }) - if err := saveMapGob(filepath.Join(folderPath, itemsByHeightFilename), itemsByHeightMap); err != nil { - return err - } - // prepare hashes map - hashesToSave := make(map[string]bool) - c.hashes.Range(func(k, v any) bool { - keyStr, okKey := k.(string) - valBool, okVal := v.(bool) - if okKey && okVal { - hashesToSave[keyStr] = valBool + wg.Go(func() error { + hashesToSave := make(map[string]bool) + c.hashes.Range(func(k, v any) bool { + keyStr, okKey := k.(string) + valBool, okVal := v.(bool) + if okKey && okVal { + hashesToSave[keyStr] = valBool + } + return true + }) + if err := saveMapGob(filepath.Join(folderPath, hashesFilename), hashesToSave); err != nil { + return fmt.Errorf("save %s: %w", hashesFilename, err) } - return true + return nil }) - if err := saveMapGob(filepath.Join(folderPath, hashesFilename), hashesToSave); err != nil { - return err - } // prepare daIncluded map - daIncludedToSave := make(map[string]uint64) - c.daIncluded.Range(func(k, v any) bool { - keyStr, okKey := k.(string) - valUint64, okVal := v.(uint64) - if okKey && okVal { - daIncludedToSave[keyStr] = valUint64 + wg.Go(func() error { + daIncludedToSave := make(map[string]uint64) + c.daIncluded.Range(func(k, v any) bool { + keyStr, okKey := k.(string) + valUint64, okVal := v.(uint64) + if okKey && okVal { + daIncludedToSave[keyStr] = valUint64 + } + return true + }) + if err := saveMapGob(filepath.Join(folderPath, daIncludedFilename), daIncludedToSave); err != nil { + return fmt.Errorf("save %s: %w", daIncludedFilename, err) } - return true + return nil }) - return saveMapGob(filepath.Join(folderPath, daIncludedFilename), daIncludedToSave) + return wg.Wait() } // LoadFromDisk loads the cache contents from disk from the specified folder. @@ -206,32 +224,40 @@ func (c *Cache[T]) SaveToDisk(folderPath string) error { // It's the caller's responsibility to ensure that type T (and any types it contains) // are registered with the gob package if necessary (e.g., using gob.Register). func (c *Cache[T]) LoadFromDisk(folderPath string) error { + var wg errgroup.Group // load items by height - itemsByHeightMap, err := loadMapGob[uint64, *T](filepath.Join(folderPath, itemsByHeightFilename)) - if err != nil { - return fmt.Errorf("failed to load items by height: %w", err) - } - for k, v := range itemsByHeightMap { - c.itemsByHeight.Store(k, v) - } - + wg.Go(func() error { + itemsByHeightMap, err := loadMapGob[uint64, *T](filepath.Join(folderPath, itemsByHeightFilename)) + if err != nil { + return fmt.Errorf("failed to load %s : %w", itemsByHeightFilename, err) + } + for k, v := range itemsByHeightMap { + c.itemsByHeight.Store(k, v) + } + return nil + }) // load hashes - hashesMap, err := loadMapGob[string, bool](filepath.Join(folderPath, hashesFilename)) - if err != nil { - return fmt.Errorf("failed to load hashes: %w", err) - } - for k, v := range hashesMap { - c.hashes.Store(k, v) - } - + wg.Go(func() error { + hashesMap, err := loadMapGob[string, bool](filepath.Join(folderPath, hashesFilename)) + if err != nil { + return fmt.Errorf("failed to load %s : %w", hashesFilename, err) + } + for k, v := range hashesMap { + c.hashes.Store(k, v) + } + return nil + }) // load daIncluded - daIncludedMap, err := loadMapGob[string, uint64](filepath.Join(folderPath, daIncludedFilename)) - if err != nil { - return fmt.Errorf("failed to load daIncluded: %w", err) - } - for k, v := range daIncludedMap { - c.daIncluded.Store(k, v) - } + wg.Go(func() error { + daIncludedMap, err := loadMapGob[string, uint64](filepath.Join(folderPath, daIncludedFilename)) + if err != nil { + return fmt.Errorf("failed to load %s : %w", daIncludedFilename, err) + } + for k, v := range daIncludedMap { + c.daIncluded.Store(k, v) + } - return nil + return nil + }) + return wg.Wait() } From 901c4884872ad5ac0ad3d3b523c6d52056832eff Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Fri, 21 Nov 2025 10:36:54 +0100 Subject: [PATCH 2/2] Set limit on update max height loop --- block/internal/cache/generic_cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index dee1a28b8..25cb22111 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -102,7 +102,7 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint6 c.hashByHeight.Store(blockHeight, hash) // Update max DA height if necessary - for { + for range 1_000 { current := c.maxDAHeight.Load() if daHeight <= current { return