Skip to content

Commit

Permalink
fixup! feat(shed): actor state diff stats
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jan 6, 2025
1 parent 2ec2aa6 commit 1372d63
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,26 @@ type actorStats struct {
Actor *types.Actor
Fields []fieldItem
Stats api.ObjStat
Blocks []cid.Cid `json:",omitempty"`
}

type fieldItem struct {
Name string
Cid cid.Cid
Stats api.ObjStat
Name string
Cid cid.Cid
Stats api.ObjStat
Blocks []cid.Cid `json:",omitempty"`
}

type job struct {
c cid.Cid
key string // prefix path for the region being recorded i.e. "/state/mineractor"
}

type cidCall struct {
c cid.Cid
resp chan bool
}

type result struct {
key string
stats api.ObjStat
Expand Down Expand Up @@ -122,14 +126,16 @@ func (cng *cacheNodeGetter) GetMany(ctx context.Context, list []cid.Cid) <-chan
}

type dagStatCollector struct {
ds format.NodeGetter
walk func(format.Node) ([]*format.Link, error)
ds format.NodeGetter
walk func(format.Node) ([]*format.Link, error)
collectCids bool

statsLk sync.Mutex
stats api.ObjStat
cids []cid.Cid
}

func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error {
func (dsc *dagStatCollector) record(c cid.Cid, nd format.Node) error {
size, err := nd.Size()
if err != nil {
return err
Expand All @@ -140,6 +146,9 @@ func (dsc *dagStatCollector) record(ctx context.Context, nd format.Node) error {

dsc.stats.Size = dsc.stats.Size + size
dsc.stats.Links = dsc.stats.Links + 1
if dsc.collectCids {
dsc.cids = append(dsc.cids, c)
}

return nil
}
Expand All @@ -150,7 +159,7 @@ func (dsc *dagStatCollector) walkLinks(ctx context.Context, c cid.Cid) ([]*forma
return nil, err
}

if err := dsc.record(ctx, nd); err != nil {
if err := dsc.record(c, nd); err != nil {
return nil, err
}

Expand Down Expand Up @@ -632,6 +641,10 @@ the total state of the actor in either tipset.
Name: "diff-tipset",
Usage: "specify tipset to diff against, stat output will include only the mutated state between the two tipsets (pass comma separated array of cids or @<height> to specify tipset by height)",
},
&cli.BoolFlag{
Name: "list-blocks",
Usage: "list the CIDs of blocks in the stat set being processed, in the case of a diff-tipset this will be the blocks that are different between the two tipsets",
},
&cli.IntFlag{
Name: "workers",
Usage: "number of workers to use when processing",
Expand Down Expand Up @@ -695,6 +708,7 @@ the total state of the actor in either tipset.

numWorkers := cctx.Int("workers")
dagCacheSize := cctx.Int("dag-cache-size")
listBlocks := cctx.Bool("list-blocks")

jobs := make(chan address.Address, numWorkers)
results := make(chan actorStats, numWorkers)
Expand Down Expand Up @@ -728,7 +742,7 @@ the total state of the actor in either tipset.
}
}

actStats, err := sc.collectStats(ctx, addr, actor, dag)
actStats, err := sc.collectStats(ctx, addr, actor, dag, listBlocks)
if err != nil {
return err
}
Expand Down Expand Up @@ -927,7 +941,7 @@ type statCollector struct {
fieldCidSets map[string]*cid.Set
}

func (sc *statCollector) collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter) (actorStats, error) {
func (sc *statCollector) collectStats(ctx context.Context, addr address.Address, actor *types.Actor, dag format.NodeGetter, collectCids bool) (actorStats, error) {
log.Infow("actor", "addr", addr, "code", actor.Code, "name", builtin.ActorNameByCode(actor.Code))

nd, err := dag.Get(ctx, actor.Head)
Expand Down Expand Up @@ -978,27 +992,38 @@ func (sc *statCollector) collectStats(ctx context.Context, addr address.Address,
}

dsc := &dagStatCollector{
ds: dag,
walk: carWalkFunc,
ds: dag,
walk: carWalkFunc,
collectCids: collectCids,
}

if err := merkledag.Walk(ctx, dsc.walkLinks, actor.Head, sc.rootCidSet.Visit, merkledag.Concurrent()); err != nil {
return actorStats{}, err
}

actStats.Stats = dsc.stats
if collectCids {
actStats.Blocks = dsc.cids
}

for _, field := range fields {
dsc := &dagStatCollector{
ds: dag,
walk: carWalkFunc,
ds: dag,
walk: carWalkFunc,
collectCids: collectCids,
}

if err := merkledag.Walk(ctx, dsc.walkLinks, field.Cid, sc.fieldCidSets[field.Name].Visit, merkledag.Concurrent()); err != nil {
if err := merkledag.Walk(ctx, func(ctx context.Context, c cid.Cid) ([]*format.Link, error) {
links, err := dsc.walkLinks(ctx, c)
return links, err
}, field.Cid, sc.fieldCidSets[field.Name].Visit, merkledag.Concurrent()); err != nil {
return actorStats{}, err
}

field.Stats = dsc.stats
if collectCids {
field.Blocks = dsc.cids
}

actStats.Fields = append(actStats.Fields, field)
}
Expand Down

0 comments on commit 1372d63

Please sign in to comment.