From 75b402bbf6550e496778e3f4c06362513e208b8f Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 31 Oct 2024 14:52:51 +0300 Subject: [PATCH 1/3] feat: self-bootstrapping tracker, jetstream updates * This removes redis as a hard dependency * add profiler utils (temp) --- .gitignore | 3 +- cmd/service/main.go | 25 ++++++- config.toml | 4 +- go.mod | 2 + go.sum | 2 + .../main.go => internal/cache/bootstrap.go | 68 ++++--------------- internal/cache/cache.go | 25 +++++-- internal/cache/xmap.go | 4 +- internal/pub/jetstream.go | 45 ++++++------ 9 files changed, 85 insertions(+), 93 deletions(-) rename cmd/bootstrap/main.go => internal/cache/bootstrap.go (78%) diff --git a/.gitignore b/.gitignore index 521176f..f5b4272 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ tracker_db .idx **/*.env eth-tracker -eth-tracker-cache-bootstrap \ No newline at end of file +eth-tracker-cache-bootstrap +*.pprof \ No newline at end of file diff --git a/cmd/service/main.go b/cmd/service/main.go index 1a7f2fc..df4248f 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -25,6 +25,7 @@ import ( "github.com/grassrootseconomics/eth-tracker/internal/syncer" "github.com/grassrootseconomics/eth-tracker/internal/util" "github.com/knadh/koanf/v2" + "github.com/knadh/profiler" ) const defaultGracefulShutdownPeriod = time.Second * 30 @@ -49,6 +50,14 @@ func init() { } func main() { + // PROFILE + p := profiler.New(profiler.Conf{ + MemProfileRate: 1, + NoShutdownHook: true, + }, profiler.Cpu, profiler.Mem) + p.Start() + // PROFILE + var wg sync.WaitGroup ctx, stop := notifyShutdown() @@ -71,9 +80,13 @@ func main() { } cache, err := cache.New(cache.CacheOpts{ - Logg: lo, - CacheType: ko.MustString("core.cache_type"), - RedisDSN: ko.MustString("redis.dsn"), + Chain: chain, + Registries: ko.Strings("bootstrap.ge_registries"), + Watchlist: ko.Strings("bootstrap.watchlist"), + Blacklist: ko.Strings("bootstrap.blacklist"), + CacheType: ko.MustString("core.cache_type"), + RedisDSN: ko.MustString("redis.dsn"), + Logg: lo, }) if err != nil { lo.Error("could not initialize cache", "error", err) @@ -185,6 +198,12 @@ func main() { lo.Info("graceful shutdown routine complete") }() + // PROFILE + runtime.GC() + p.Stop() + time.Sleep(time.Second * 10) + // PROFILE + go func() { wg.Wait() stop() diff --git a/config.toml b/config.toml index 9c7120d..c534e2a 100644 --- a/config.toml +++ b/config.toml @@ -4,7 +4,7 @@ address = ":5001" [core] # Use a specific cache implementation -cache_type = "redis" +cache_type = "internal" # Use a specific db implementation db_type = "bolt" # Tune max go routines that can process blocks @@ -28,7 +28,7 @@ start_block = 0 [bootstrap] # This will bootstrap the cache on which addresses to track ge_registries = ["0xE979a64D375F5D363d7cecF3c93B9aFD40Ba9f55"] -watchlist = ["0x14dc79964da2c08b23698b3d3cc7ca32193d9955"] +watchlist = [""] blacklist = [""] [jetstream] diff --git a/go.mod b/go.mod index 290cda4..0b8be73 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/knadh/koanf/providers/env v1.0.0 github.com/knadh/koanf/providers/file v1.1.2 github.com/knadh/koanf/v2 v2.1.1 + github.com/knadh/profiler v0.2.0 github.com/lmittmann/w3 v0.17.1 github.com/nats-io/nats.go v1.36.0 github.com/puzpuzpuz/xsync/v3 v3.4.0 @@ -59,6 +60,7 @@ require ( golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.7.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect diff --git a/go.sum b/go.sum index b002dd9..221cb41 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,8 @@ github.com/knadh/koanf/providers/file v1.1.2 h1:aCC36YGOgV5lTtAFz2qkgtWdeQsgfxUk github.com/knadh/koanf/providers/file v1.1.2/go.mod h1:/faSBcv2mxPVjFrXck95qeoyoZ5myJ6uxN8OOVNJJCI= github.com/knadh/koanf/v2 v2.1.1 h1:/R8eXqasSTsmDCsAyYj+81Wteg8AqrV9CP6gvsTsOmM= github.com/knadh/koanf/v2 v2.1.1/go.mod h1:4mnTRbZCK+ALuBXHZMjDfG9y714L7TykVnZkXbMU3Es= +github.com/knadh/profiler v0.2.0 h1:jaY0xlQs8iaWxKdvGHOftaZnX7d8l7yrCGQPSecwnng= +github.com/knadh/profiler v0.2.0/go.mod h1:LqNkAu++MfFkbEDA63AmRaIf6UkGrLXyZ5VQQdekZiI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= diff --git a/cmd/bootstrap/main.go b/internal/cache/bootstrap.go similarity index 78% rename from cmd/bootstrap/main.go rename to internal/cache/bootstrap.go index 974e5f4..b5f2879 100644 --- a/cmd/bootstrap/main.go +++ b/internal/cache/bootstrap.go @@ -1,80 +1,38 @@ -package main +package cache import ( "context" - "flag" "log/slog" "os" "time" "github.com/ethereum/go-ethereum/common" - "github.com/grassrootseconomics/eth-tracker/internal/cache" "github.com/grassrootseconomics/eth-tracker/internal/chain" - "github.com/grassrootseconomics/eth-tracker/internal/util" "github.com/grassrootseconomics/ethutils" - "github.com/knadh/koanf/v2" "github.com/lmittmann/w3" "github.com/lmittmann/w3/module/eth" ) -var ( - build = "dev" - - confFlag string - - lo *slog.Logger - ko *koanf.Koanf -) - -func init() { - flag.StringVar(&confFlag, "config", "config.toml", "Config file location") - flag.Parse() - - lo = util.InitLogger() - ko = util.InitConfig(lo, confFlag) - - lo.Info("starting GE redis cache bootstrapper", "build", build) -} - -func main() { - if err := bootstrapCache(); err != nil { - lo.Error("critical error bootstrapping cache", "error", err) - os.Exit(1) - } -} - -func bootstrapCache() error { +func bootstrapCache( + chain chain.Chain, + cache Cache, + registries []string, + watchlist []string, + blacklist []string, + lo *slog.Logger, +) error { var ( tokenRegistryGetter = w3.MustNewFunc("tokenRegistry()", "address") quoterGetter = w3.MustNewFunc("quoter()", "address") ) - chain, err := chain.NewRPCFetcher(chain.EthRPCOpts{ - RPCEndpoint: ko.MustString("chain.rpc_endpoint"), - ChainID: ko.MustInt64("chain.chainid"), - }) - if err != nil { - lo.Error("could not initialize chain client", "error", err) - os.Exit(1) - } - - cache, err := cache.New(cache.CacheOpts{ - Logg: lo, - CacheType: ko.MustString("core.cache_type"), - RedisDSN: ko.MustString("redis.dsn"), - }) - if err != nil { - lo.Error("could not initialize cache", "error", err) - os.Exit(1) - } - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) defer cancel() - for _, registry := range ko.MustStrings("bootstrap.ge_registries") { + for _, registry := range registries { registryMap, err := chain.Provider().RegistryMap(ctx, ethutils.HexToAddress(registry)) if err != nil { - lo.Error("could not fetch registry", "error", err) + lo.Error("could not fetch registry", "registry", registry, "error", err) os.Exit(1) } @@ -229,12 +187,12 @@ func bootstrapCache() error { } } - for _, address := range ko.MustStrings("bootstrap.watchlist") { + for _, address := range watchlist { if err := cache.Add(ctx, ethutils.HexToAddress(address).Hex()); err != nil { return err } } - for _, address := range ko.MustStrings("bootstrap.blacklist") { + for _, address := range blacklist { if err := cache.Remove(ctx, ethutils.HexToAddress(address).Hex()); err != nil { return err } diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 96a5077..bd5dbbe 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -3,6 +3,8 @@ package cache import ( "context" "log/slog" + + "github.com/grassrootseconomics/eth-tracker/internal/chain" ) type ( @@ -14,9 +16,13 @@ type ( } CacheOpts struct { - Logg *slog.Logger - RedisDSN string - CacheType string + RedisDSN string + CacheType string + Registries []string + Watchlist []string + Blacklist []string + Chain chain.Chain + Logg *slog.Logger } ) @@ -24,7 +30,7 @@ func New(o CacheOpts) (Cache, error) { var cache Cache switch o.CacheType { - case "map": + case "internal": cache = NewMapCache() case "redis": redisCache, err := NewRedisCache(redisOpts{ @@ -39,5 +45,16 @@ func New(o CacheOpts) (Cache, error) { o.Logg.Warn("invalid cache type, using default type (map)") } + if err := bootstrapCache( + o.Chain, + cache, + o.Registries, + o.Watchlist, + o.Blacklist, + o.Logg, + ); err != nil { + return cache, err + } + return cache, nil } diff --git a/internal/cache/xmap.go b/internal/cache/xmap.go index 3a10bf2..cc82584 100644 --- a/internal/cache/xmap.go +++ b/internal/cache/xmap.go @@ -7,12 +7,12 @@ import ( ) type mapCache struct { - xmap *xsync.Map + xmap *xsync.MapOf[string, bool] } func NewMapCache() Cache { return &mapCache{ - xmap: xsync.NewMap(), + xmap: xsync.NewMapOf[string, bool](), } } diff --git a/internal/pub/jetstream.go b/internal/pub/jetstream.go index 590d358..55924fa 100644 --- a/internal/pub/jetstream.go +++ b/internal/pub/jetstream.go @@ -2,25 +2,25 @@ package pub import ( "context" - "errors" "fmt" "log/slog" "time" "github.com/grassrootseconomics/eth-tracker/pkg/event" "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" ) type ( JetStreamOpts struct { - Logg *slog.Logger Endpoint string PersistDuration time.Duration + Logg *slog.Logger } jetStreamPub struct { + js jetstream.JetStream natsConn *nats.Conn - jsCtx nats.JetStreamContext } ) @@ -36,33 +36,25 @@ func NewJetStreamPub(o JetStreamOpts) (Pub, error) { return nil, err } - js, err := natsConn.JetStream() + js, err := jetstream.New(natsConn) if err != nil { return nil, err } - o.Logg.Info("successfully connected to NATS server") - stream, err := js.StreamInfo(streamName) - if err != nil && !errors.Is(err, nats.ErrStreamNotFound) { - return nil, err - } - if stream == nil { - _, err := js.AddStream(&nats.StreamConfig{ - Name: streamName, - MaxAge: o.PersistDuration, - Storage: nats.FileStorage, - Subjects: streamSubjects, - Duplicates: time.Minute, - }) - if err != nil { - return nil, err - } - o.Logg.Info("successfully created NATS JetStream stream", "stream_name", streamName) - } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + js.CreateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: streamSubjects, + MaxAge: o.PersistDuration, + Storage: jetstream.FileStorage, + Duplicates: time.Minute, + }) return &jetStreamPub{ natsConn: natsConn, - jsCtx: js, + js: js, }, nil } @@ -72,16 +64,17 @@ func (p *jetStreamPub) Close() { } } -func (p *jetStreamPub) Send(_ context.Context, payload event.Event) error { +func (p *jetStreamPub) Send(ctx context.Context, payload event.Event) error { data, err := payload.Serialize() if err != nil { return err } - _, err = p.jsCtx.Publish( + _, err = p.js.Publish( + ctx, fmt.Sprintf("%s.%s", streamName, payload.TxType), data, - nats.MsgId(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)), + jetstream.WithMsgID(fmt.Sprintf("%s:%d", payload.TxHash, payload.Index)), ) if err != nil { return err From 2283190147cea90a567ab852478da71a655d0e3d Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 31 Oct 2024 14:53:39 +0300 Subject: [PATCH 2/3] build(deps): bump github.com/redis/rueidis from 1.0.47 to 1.0.48 (#51) Bumps [github.com/redis/rueidis](https://github.com/redis/rueidis) from 1.0.47 to 1.0.48. - [Release notes](https://github.com/redis/rueidis/releases) - [Commits](https://github.com/redis/rueidis/compare/v1.0.47...v1.0.48) --- updated-dependencies: - dependency-name: github.com/redis/rueidis dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0b8be73..871881f 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/lmittmann/w3 v0.17.1 github.com/nats-io/nats.go v1.36.0 github.com/puzpuzpuz/xsync/v3 v3.4.0 - github.com/redis/rueidis v1.0.47 + github.com/redis/rueidis v1.0.48 github.com/stretchr/testify v1.9.0 github.com/uptrace/bunrouter v1.0.22 go.etcd.io/bbolt v1.3.11 diff --git a/go.sum b/go.sum index 221cb41..47c7614 100644 --- a/go.sum +++ b/go.sum @@ -173,8 +173,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4= github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA= -github.com/redis/rueidis v1.0.47 h1:41UdeXOo4eJuW+cfpUJuLtVGyO0QJY3A2rEYgJWlfHs= -github.com/redis/rueidis v1.0.47/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= +github.com/redis/rueidis v1.0.48 h1:ggZHjEtc/echUmPkGTfssRisnc3p/mIUEwrpbNsZ1mQ= +github.com/redis/rueidis v1.0.48/go.mod h1:by+34b0cFXndxtYmPAHpoTHO5NkosDlBvhexoTURIxM= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= From eee3757895f2f37d3c27437c44ca569927299a26 Mon Sep 17 00:00:00 2001 From: Mohammed Sohail Date: Thu, 31 Oct 2024 14:58:59 +0300 Subject: [PATCH 3/3] docs: update README --- README.md | 53 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 319e5ea..61e6293 100644 --- a/README.md +++ b/README.md @@ -2,35 +2,45 @@ ![GitHub Tag](https://img.shields.io/github/v/tag/grassrootseconomics/eth-tracker) -A fast and lightweight tracker designed to monitor EVM blockchains for live and historical transaction events, including reverted transactions. It filters these events and publishes them to NATS for further processing. +A fast and lightweight tracker designed to monitor EVM blockchains for live and +historical transaction events, including reverted transactions. It filters these +events and publishes them to NATS for further processing. -It applies deduplication at the NATS level, making it safe to run in a distributed fashion. +It applies deduplication at the NATS level, making it safe to run in a +distributed fashion. -Note: To run it against an L2/EVM chain, you will need to manually add a replace directive in the `go.mod` file pointing to the EVM chain's `*geth` compatible source code. This will allow the tracker to process transaction types other than Ethereum's `0x0, 0x1 and 0x2`. +Note: To run it against an L2/EVM chain, you will need to manually add a replace +directive in the `go.mod` file pointing to the EVM chain's `*geth` compatible +source code. This will allow the tracker to process transaction types other than +Ethereum's `0x0, 0x1 and 0x2`. ### CEL2 -We maintain a CEL2 compatible tracker (source and container image) on the `cel2` branch. +We maintain a CEL2 compatible tracker (source and container image) on the `cel2` +branch. ## Getting Started A `Makefile` is also provided to build the required binaries to run eth-tracker. -### Bootstrap Cache +### Cache Bootstrap -An optional binary, `eth-tracker-cache-bootstrap`, is included to build the Redis cache with all relevant Grassroots Economics smart contract and user addresses to allow filtering on very busy smart contracts e.g. cUSD. +During startup `eth-tracker` will always build the cache with all relevant +Grassroots Economics smart contract and user addresses to allow filtering on +very busy smart contracts e.g. cUSD. The cache will auto-update based on any additions/removals from all indexes. ### Prerequisites -* Git -* Docker -* NATS server -* Redis server -* Access to a Celo RPC node +- Git +- Docker +- NATS server +- Redis server (Optional) +- Access to a Celo RPC node -See [docker-compose.yaml](dev/docker-compose.yaml) for an example on how to run and deploy a single instance. +See [docker-compose.yaml](dev/docker-compose.yaml) for an example on how to run +and deploy a single instance. ### 1. Build the Docker image @@ -48,9 +58,12 @@ docker images ### 2. Run NATS and Redis For an example, see `dev/docker-compose.yaml`. + ### 3. Update config values -See `.env.example` on how to override default values defined in `config.toml` using env variables. Alternatively, mount your own config.toml either during build time or Docker runtime. +See `.env.example` on how to override default values defined in `config.toml` +using env variables. Alternatively, mount your own config.toml either during +build time or Docker runtime. ```bash # Override only specific config values @@ -58,8 +71,8 @@ nano .env.example mv .env.example .env ``` -Refer to [`config.toml`](config.toml) to understand different config value settings. - +Refer to [`config.toml`](config.toml) to understand different config value +settings. ### 4. Run the tracker @@ -86,7 +99,8 @@ docker compose up ### Monitoring with NATS CLI -Install NATS CLI from [here](https://github.com/nats-io/natscli?tab=readme-ov-file#installation). +Install NATS CLI from +[here](https://github.com/nats-io/natscli?tab=readme-ov-file#installation). ```bash nats subscribe "TRACKER.*" @@ -94,8 +108,11 @@ nats subscribe "TRACKER.*" ### DB File -A `tracker_db` file is created on the first run. This keeps track of all blocks missed by the processor to attempt a retry later on. This file should not be deleted if you want to maintain resume support for historical tracking across restarts. +A `tracker_db` file is created on the first run. This keeps track of all blocks +missed by the processor to attempt a retry later on. This file should not be +deleted if you want to maintain resume support for historical tracking across +restarts. ## License -[AGPL-3.0](LICENSE). \ No newline at end of file +[AGPL-3.0](LICENSE).