Skip to content

Commit

Permalink
TVL last sync time;
Browse files Browse the repository at this point in the history
  • Loading branch information
mismirnov committed Dec 23, 2024
1 parent e713682 commit f948594
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 22 deletions.
8 changes: 8 additions & 0 deletions internal/storage/postgres/tvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"github.com/celenium-io/celestia-indexer/internal/storage"
"github.com/dipdup-net/go-lib/database"
"time"
)

// Tvl -
Expand Down Expand Up @@ -36,3 +37,10 @@ func (t *Tvl) SaveBulk(ctx context.Context, tvls ...*storage.Tvl) error {
_, err := t.db.DB().NewInsert().Model(&tvls).Exec(ctx)
return err
}

func (t *Tvl) LastSyncTime(ctx context.Context) (response time.Time, err error) {
err = t.db.DB().NewSelect().Model((*storage.Tvl)(nil)).
ColumnExpr("MAX(time) AS time").
Scan(ctx, &response)
return
}
2 changes: 2 additions & 0 deletions internal/storage/tvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ const (

//go:generate mockgen -source=$GOFILE -destination=mock/$GOFILE -package=mock -typed
type ITvl interface {
LastSyncTime(ctx context.Context) (time.Time, error)

Save(ctx context.Context, rollupTvl *Tvl) error
SaveBulk(ctx context.Context, tvls ...*Tvl) error
}
Expand Down
77 changes: 55 additions & 22 deletions pkg/tvl/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const (
rollupLimit = 100
)

var (
syncTimestamp = time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
)

type Module struct {
modules.BaseModule
l2beatApi l2beat.IApi
Expand Down Expand Up @@ -56,6 +60,7 @@ func (m *Module) getTvl(ctx context.Context, timeframe storage.TvlTimeframe) {
rollups, err := m.rollup.List(ctx, rollupLimit, 0, strg.SortOrderAsc)
if err != nil {
m.Log.Err(err).Msg("receiving rollups")
return
}

for i := range rollups {
Expand All @@ -64,71 +69,93 @@ func (m *Module) getTvl(ctx context.Context, timeframe storage.TvlTimeframe) {
lastIndex := strings.LastIndex(url, "/")

if lastIndex == -1 {
return
continue
}

rollupProject := url[lastIndex+1:]
tvl, err := m.rollupTvlFromL2Beat(ctx, rollupProject, timeframe)
if err != nil {
m.Log.Err(err).Msg("receiving TVL from L2Beat")
continue
}

tvlResponse := tvl[0].Result.Data.Json
tvlModels := make([]*storage.Tvl, 0, len(tvlResponse))
tvlModels := make([]*storage.Tvl, 0)
for _, t := range tvlResponse {
rollupTvl := t[1].(float64) + t[2].(float64) + t[3].(float64)
tsTvl := int64(t[0].(float64))
tvlModels = append(tvlModels, &storage.Tvl{
Value: rollupTvl,
Time: time.Unix(tsTvl, 0),
Rollup: rollups[i],
RollupId: rollups[i].Id,
})
tvlTs := time.Unix(int64(t[0].(float64)), 0)
if tvlTs.After(syncTimestamp) {
tvlModels = append(tvlModels, &storage.Tvl{
Value: rollupTvl,
Time: tvlTs,
Rollup: rollups[i],
RollupId: rollups[i].Id,
})
}
}

if len(tvlModels) == 0 {
continue
}

if err := m.tvl.SaveBulk(ctx, tvlModels...); err != nil {
m.Log.Err(err).Msg("saving tvls")
}

continue
}

if len(rollups[i].DeFiLama) > 0 {
tvl, err := m.rollupTvlFromLama(ctx, rollups[i].DeFiLama)
if err != nil {
m.Log.Err(err).Msg("receiving TVL from DeFi Lama")
continue
}

tvlModels := make([]*storage.Tvl, 0, len(tvl))
tvlModels := make([]*storage.Tvl, 0)
for _, t := range tvl {
tvlModels = append(tvlModels, &storage.Tvl{
Value: t.TVL,
Time: time.Unix(t.Date, 0),
Rollup: rollups[i],
RollupId: rollups[i].Id,
})
tvlTs := time.Unix(t.Date, 0)
if tvlTs.After(syncTimestamp) {
tvlModels = append(tvlModels, &storage.Tvl{
Value: t.TVL,
Time: tvlTs,
Rollup: rollups[i],
RollupId: rollups[i].Id,
})
}
}

if len(tvlModels) == 0 {
continue
}

if err := m.tvl.SaveBulk(ctx, tvlModels...); err != nil {
m.Log.Err(err).Msg("saving tvls")
}

continue
}
}
syncTimestamp, err = m.lastSyncTimeTvl(ctx)
if err != nil {
m.Log.Err(err).Msg("receiving last sync time for TVL")
}
}

func (m *Module) receive(ctx context.Context) {
syncTime, err := m.lastSyncTimeTvl(ctx)
if err != nil {
m.Log.Err(err).Msg("receiving last sync time for TVL")
return
}

syncTimestamp = syncTime
m.getTvl(ctx, storage.TvlTimeframeMax)
ticker := time.NewTicker(time.Second * 24)
ticker := time.NewTicker(time.Hour * 24)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.getTvl(ctx, storage.TvlTimeframeMonth)
m.getTvl(ctx, storage.TvlTimeframe6Month)
}
}
}
Expand All @@ -144,3 +171,9 @@ func (m *Module) rollupTvlFromL2Beat(ctx context.Context, rollupName string, tim
defer cancel()
return m.l2beatApi.TVL(requestTimeout, rollupName, timeframe)
}

func (m *Module) lastSyncTimeTvl(ctx context.Context) (time.Time, error) {
requestTimeout, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
return m.tvl.LastSyncTime(requestTimeout)
}

0 comments on commit f948594

Please sign in to comment.