Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial change for adding changeset proto and command line tool #39

Open
wants to merge 72 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
5cd79a1
Initial change for adding changeset proto and command line tool
yzang2019 Sep 25, 2023
275fdcf
Fix bug
yzang2019 Sep 25, 2023
97eb985
Fix bug
yzang2019 Sep 25, 2023
f2a4c9b
Fix bug
yzang2019 Sep 25, 2023
c0f5567
Add logs
yzang2019 Sep 25, 2023
962ae0d
Add logs
yzang2019 Sep 25, 2023
ad553d1
Add logs
yzang2019 Sep 25, 2023
e305f9f
Add logs
yzang2019 Sep 25, 2023
40f09ef
Add logs
yzang2019 Sep 25, 2023
d13275a
Add logs
yzang2019 Sep 25, 2023
c42bbb7
Add logs
yzang2019 Sep 25, 2023
71c938a
Add export command
yzang2019 Oct 4, 2023
89d5711
Add segment size
yzang2019 Oct 4, 2023
c2b2336
Add segment size
yzang2019 Oct 4, 2023
be08b97
Add more logs
yzang2019 Oct 4, 2023
cbc6f4c
Fix segment size
yzang2019 Oct 4, 2023
f54347b
Fix segment size
yzang2019 Oct 4, 2023
89d85c8
Fix concurrency
yzang2019 Oct 4, 2023
b25e7a8
Add logs
yzang2019 Oct 4, 2023
0327e1e
Add logs
yzang2019 Oct 4, 2023
4e9cd09
Add logs
yzang2019 Oct 4, 2023
4f54e2c
Add logs
yzang2019 Oct 4, 2023
a15ab32
Add logs
yzang2019 Oct 4, 2023
7737221
Add logs
yzang2019 Oct 4, 2023
09ce638
Add logs
yzang2019 Oct 4, 2023
55d9388
Add logs
yzang2019 Oct 4, 2023
9dba74c
Add logs
yzang2019 Oct 4, 2023
09eb599
Add logs
yzang2019 Oct 4, 2023
ad8d8bb
Add logs
yzang2019 Oct 4, 2023
eeba76a
Add logs
yzang2019 Oct 4, 2023
4d4d882
Add logs
yzang2019 Oct 4, 2023
aff12e1
Add logs
yzang2019 Oct 4, 2023
aaf2bb6
Add logs
yzang2019 Oct 4, 2023
3b16722
Add logs
yzang2019 Oct 4, 2023
505449f
Add logs
yzang2019 Oct 4, 2023
13b74a6
Remove logs
yzang2019 Oct 5, 2023
d75b459
Remove logs
yzang2019 Oct 5, 2023
56378bd
Remove logs
yzang2019 Oct 5, 2023
3cdf9ff
Add importer and reader for reading changeset
yzang2019 Oct 6, 2023
d45d7e5
Fix errors join
yzang2019 Oct 6, 2023
6564020
Add print changeset command
yzang2019 Oct 6, 2023
38f7667
Add logs
yzang2019 Oct 6, 2023
0a9c640
Add log
yzang2019 Oct 6, 2023
febc41e
Add log
yzang2019 Oct 6, 2023
a3ede19
Add log
yzang2019 Oct 6, 2023
b459153
Add log
yzang2019 Oct 6, 2023
677f664
Add log
yzang2019 Oct 7, 2023
86228a9
Add log
yzang2019 Oct 7, 2023
b85117d
Add log
yzang2019 Oct 7, 2023
95c9f75
Add log
yzang2019 Oct 7, 2023
5d10184
Add log
yzang2019 Oct 7, 2023
11ab8fe
Add log
yzang2019 Oct 7, 2023
e600427
Add log
yzang2019 Oct 7, 2023
156e7eb
Add log
yzang2019 Oct 7, 2023
a6b9788
Add chunk file support
yzang2019 Oct 7, 2023
ab545cd
Add chunk file support
yzang2019 Oct 7, 2023
a7fe73c
Add chunk file support
yzang2019 Oct 7, 2023
352cdf1
Fix logging
yzang2019 Oct 7, 2023
e032e12
Fix wrap reader
yzang2019 Oct 10, 2023
18e1d87
Fix go mod
yzang2019 Oct 10, 2023
ba73621
Add log
yzang2019 Oct 10, 2023
af43379
Fix print command
yzang2019 Oct 10, 2023
3116486
Fix go mod
yzang2019 Oct 10, 2023
3fc52ca
Add logs
yzang2019 Oct 10, 2023
47c352f
Fix flush
yzang2019 Oct 10, 2023
966af35
Add logs
yzang2019 Oct 10, 2023
1d713c6
Add logs
yzang2019 Oct 10, 2023
9ea4bcf
Add logs
yzang2019 Oct 10, 2023
be0d571
Add logs
yzang2019 Oct 10, 2023
cac4c5e
Fix ending header
yzang2019 Oct 10, 2023
316ab35
Fix logs
yzang2019 Oct 10, 2023
9365ed9
Add prints
yzang2019 Dec 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ all: lint test install
install:
ifeq ($(COLORS_ON),)
go install ./cmd/iaviewer
go install ./cmd/changeset
else
go install $(CMDFLAGS) ./cmd/iaviewer
go install $(CMDFLAGS) ./cmd/changeset
endif
.PHONY: install


test-short:
@echo "--> Running go test"
@go test ./... $(LDFLAGS) -v --race --short
Expand Down Expand Up @@ -125,5 +128,5 @@ proto-check-breaking:

proto-gen:
@echo "Generating Protobuf files"
$(DOCKER) run --rm -v $(CURDIR):/workspace --workdir /workspace tendermintdev/sdk-proto-gen:master sh scripts/protocgen.sh
$(DOCKER) run --platform linux/amd64 --rm -v $(CURDIR):/workspace --workdir /workspace tendermintdev/sdk-proto-gen:master sh scripts/protocgen.sh
.PHONY: proto-gen-d
297 changes: 297 additions & 0 deletions changeset/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
package changeset

import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"math"
"math/bits"
"os"
"path/filepath"
"sync"
"time"

"github.com/alitto/pond"
"github.com/cosmos/iavl"
"github.com/klauspost/compress/zstd"
dbm "github.com/tendermint/tm-db"
)

const (
DefaultCacheSize int = 10000000
)

type Exporter struct {
db *dbm.PrefixDB
start int64
end int64
concurrency int
segmentSize int64
outputDir string
}

func NewExporter(
db *dbm.PrefixDB,
start int64,
end int64,
concurrency int,
segmentSize int64,
outputDir string,
) *Exporter {
return &Exporter{
db: db,
start: start,
end: end,
concurrency: concurrency,
segmentSize: segmentSize,
outputDir: outputDir,
}
}

func (exporter *Exporter) Start() error {
// use a worker pool with enough buffer to parallelize the export
pool := pond.New(exporter.concurrency, 10240)
defer pool.StopAndWait()

// share the iavl tree between tasks to reuse the node cache
iavlTreePool := sync.Pool{
New: func() any {
// use separate prefixdb and iavl tree in each task to maximize concurrency performance
return iavl.NewImmutableTree(exporter.db, DefaultCacheSize, true)
},
}

// split into segments
var segmentSize = exporter.segmentSize
for i := exporter.start; i < exporter.end; i += segmentSize {
end := i + segmentSize
if end > exporter.end {
end = exporter.end
}
var chunkFiles []string
group, _ := pool.GroupContext(context.Background())
fmt.Printf("Start exporting segment %d-%d at %s\n", i, end, time.Now().Format(time.RFC3339))
for _, workRange := range splitWorkLoad(exporter.concurrency, Range{i, end}) {
workRange := workRange
chunkFile := filepath.Join(exporter.outputDir, fmt.Sprintf("tmp-chunk-%d-%d.zst", workRange.Start, workRange.End))
group.Submit(func() error {
tree := iavlTreePool.Get().(*iavl.ImmutableTree)
defer iavlTreePool.Put(tree)
fmt.Printf("Start exporting chunk %s at %s\n", chunkFile, time.Now().Format(time.RFC3339))
err := writeChangesetChunks(chunkFile, tree, workRange.Start, workRange.End)
fmt.Printf("Finished exporting chunk %s at %s\n", chunkFile, time.Now().Format(time.RFC3339))
return err
})
chunkFiles = append(chunkFiles, chunkFile)
}

if err := group.Wait(); err != nil {
return err
}
segmentFile := filepath.Join(exporter.outputDir, fmt.Sprintf("changeset-%d-%d.zst", i, end))
err := collectChunksToSegment(segmentFile, chunkFiles)
if err != nil {
return err
}
fmt.Printf("Finished exporting segment %d-%d at %s\n", i, end, time.Now().Format(time.RFC3339))
}

return nil
}

func writeChangesetChunks(chunkFilePath string, tree *iavl.ImmutableTree, start int64, end int64) (returnErr error) {
fmt.Printf("Exporting changeset chunk %d-%d\n", start, end)
chunkFile, err := createFile(chunkFilePath)
zstdWriter, err := zstd.NewWriter(chunkFile)

if err != nil {
fmt.Printf("Error: %v\n", err)
return err
}

defer func() {
err := zstdWriter.Close()
if err != nil {
returnErr = err
}
if err := chunkFile.Close(); returnErr == nil {
returnErr = err
}
}()

if err != nil {
fmt.Printf("Error: %v\n", err)
return err
}
if err := tree.TraverseStateChanges(start, end, func(version int64, changeSet *iavl.ChangeSet) error {
return WriteChangeSet(zstdWriter, version, *changeSet)
}); err != nil {
fmt.Printf("Error: %v\n", err)
return err
}

return zstdWriter.Flush()
}

func collectChunksToSegment(outputFile string, chunkFiles []string) error {
fp, err := createFile(outputFile)
if err != nil {
return err
}

bufWriter := bufio.NewWriter(fp)
writer, _ := zstd.NewWriter(bufWriter)

defer fp.Close()

for _, chunkFile := range chunkFiles {
if err := copyTmpFile(writer, chunkFile); err != nil {
return err
}
if err := os.Remove(chunkFile); err != nil {
return err
}
}

// Write ending header
var endingHeader [16]byte
binary.LittleEndian.PutUint64(endingHeader[:], uint64(math.MaxUint64))
binary.LittleEndian.PutUint64(endingHeader[8:16], uint64(0))
if _, err := writer.Write(endingHeader[:]); err != nil {
return err
}

err = writer.Close()
if err != nil {
return err
}
return bufWriter.Flush()
}

// copyTmpFile append the compressed temporary file to writer
func copyTmpFile(writer io.Writer, tmpFile string) error {
fp, err := os.Open(tmpFile)
if err != nil {
return err
}
defer fp.Close()

reader, _ := zstd.NewReader(fp)
_, err = io.Copy(writer, reader)
return err
}

func createFile(name string) (*os.File, error) {
return os.OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
}

func WriteChangeSet(writer io.Writer, version int64, cs iavl.ChangeSet) error {
if len(cs.Pairs) <= 0 {
return nil
}

var size int
items := make([][]byte, 0, len(cs.Pairs))
for _, pair := range cs.Pairs {
buf, err := encodeKVPair(pair)
if err != nil {
return err
}
size += len(buf)
items = append(items, buf)
}

// Write header
fmt.Printf("Writing version %d with %d items at %s\n", version, len(items), time.Now().Format(time.RFC3339))
var versionHeader [16]byte
binary.LittleEndian.PutUint64(versionHeader[:], uint64(version))
binary.LittleEndian.PutUint64(versionHeader[8:16], uint64(size))

if _, err := writer.Write(versionHeader[:]); err != nil {
return err
}
for _, item := range items {
if _, err := writer.Write(item); err != nil {
return err
}
}
return nil
}

type Range struct {
Start, End int64
}

func splitWorkLoad(workers int, full Range) []Range {
var ranges []Range
chunkSize := (full.End - full.Start + int64(workers) - 1) / int64(workers)
for i := full.Start; i < full.End; i += chunkSize {
end := i + chunkSize
if end > full.End {
end = full.End
}
ranges = append(ranges, Range{Start: i, End: end})
}
return ranges
}

// encodeKVPair encode a key-value pair in change set.
// see godoc of `encodedSizeOfKVPair` for layout description,
// returns error if key/value length overflows.
func encodeKVPair(pair *iavl.KVPair) ([]byte, error) {
buf := make([]byte, encodedSizeOfKVPair(pair))

offset := 1
keyLen := len(pair.Key)

written := binary.PutUvarint(buf[offset:], uint64(keyLen))
offset += written

copy(buf[offset:], pair.Key)
if pair.Delete {
buf[0] = 1
return buf, nil
} else {
buf[0] = 0
}

offset += keyLen

valueLen := len(pair.Value)
written = binary.PutUvarint(buf[offset:], uint64(valueLen))
offset += written

copy(buf[offset:], pair.Value)

return buf, nil
}

// encodedSizeOfKVPair returns the encoded length of a key-value pair
//
// layout: deletion(1) + keyLen(varint) + key + [ valueLen(varint) + value ]
func encodedSizeOfKVPair(pair *iavl.KVPair) int {
keyLen := len(pair.Key)
size := 1 + uvarintSize(uint64(keyLen)) + keyLen
if pair.Delete {
return size
}

valueLen := len(pair.Value)
return size + uvarintSize(uint64(valueLen)) + valueLen
}

// uvarintSize function calculates the size (in bytes) needed to encode an unsigned integer in a variable-length format
// based on the number of bits required to represent the integer's value.
// This is a common operation when serializing data structures into binary formats where compactness and
// variable-length encoding are desired.
func uvarintSize(num uint64) int {
bits := bits.Len64(num)
q, r := bits/7, bits%7
size := q
if r > 0 || size == 0 {
size++
}
return size
}
Loading
Loading