Skip to content

Commit

Permalink
versiondb streaming service checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tasiov committed Aug 15, 2023
1 parent 4c3715f commit b33b394
Show file tree
Hide file tree
Showing 12 changed files with 1,316 additions and 9 deletions.
18 changes: 16 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ ifeq (,$(findstring nostrip,$(STARGAZE_BUILD_OPTIONS)))
BUILD_FLAGS += -trimpath
endif

export CGO_CFLAGS = -I/opt/homebrew/Cellar/rocksdb/8.3.2/include \
-I/opt/homebrew/Cellar/zstd/1.5.5/include \
-I/opt/homebrew/Cellar/lz4/1.9.4/include \
-I/opt/homebrew/Cellar/snappy/1.1.10/include

export CGO_LDFLAGS = -L/opt/homebrew/Cellar/rocksdb/8.3.2/lib -lrocksdb \
-L/opt/homebrew/Cellar/zstd/1.5.5/lib -lzstd \
-L/opt/homebrew/Cellar/lz4/1.9.4/lib -llz4 \
-L/opt/homebrew/Cellar/snappy/1.1.10/lib -lsnappy


check_go_version:
@echo "Go version: $(GO_MAJOR_VERSION).$(GO_MINOR_VERSION)"
ifneq ($(GO_MINOR_VERSION),20)
Expand Down Expand Up @@ -122,11 +133,14 @@ build-docker:
docker-test: build-linux
docker build -f docker/Dockerfile.test -t rocketprotocol/stargaze-relayer-test:latest .


test:
go test -v -race github.com/public-awesome/stargaze/v11/x/...

.PHONY: test build-linux docker-test lint build install format
test-pkg:
@echo "Running tests for package $(CGO_CFLAGS)"
go test $(PKG) -v -race $(BUILD_FLAGS)

.PHONY: test test-pkg build-linux docker-test lint build install format

format:
gofumpt -l -w .
Expand Down
164 changes: 164 additions & 0 deletions app/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package app

import (
"fmt"
"strings"
"sync"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/public-awesome/stargaze/v11/versiondb"
"github.com/spf13/cast"
)

// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, codec.BinaryCodec) (baseapp.StreamingService, error)

// ServiceType enum for specifying the type of StreamingService
type ServiceType int

const (
Unknown ServiceType = iota
File
VersionDb
)

// Streaming option keys
const (
OptStoreStreamers = "store.streamers"
)

// ServiceTypeFromString returns the streaming.ServiceType corresponding to the
// provided name.
func ServiceTypeFromString(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
case "versiondb", "v":
return VersionDb

default:
return Unknown
}
}

// String returns the string name of a streaming.ServiceType
func (sst ServiceType) String() string {
switch sst {
case File:
return "file"
case VersionDb:
return "versiondb"

default:
return "unknown"
}
}

// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to
// streaming.ServiceConstructors types.
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: streaming.NewFileStreamingService,
VersionDb: versiondb.NewVersionDbStreamingService,
}

// NewServiceConstructor returns the streaming.ServiceConstructor corresponding
// to the provided name.
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := ServiceTypeFromString(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}

if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil {
return constructor, nil
}

return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

// LoadStreamingServices is a function for loading StreamingServices onto the
// BaseApp using the provided AppOptions, codec, and keys. It returns the
// WaitGroup and quit channel used to synchronize with the streaming services
// and any error that occurs during the setup.
func LoadStreamingServices(
bApp *baseapp.BaseApp,
appOpts serverTypes.AppOptions,
appCodec codec.BinaryCodec,
keys map[string]*types.KVStoreKey,
) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)

// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))

for _, streamerName := range streamers {
var exposeStoreKeys []types.StoreKey

// get the store keys allowed to be exposed for this streaming service
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))

// if list contains '*', expose all store keys
if sdk.SliceContains(exposeKeyStrs, "*") {
exposeStoreKeys = make([]types.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
} else {
exposeStoreKeys = make([]types.StoreKey, 0, len(exposeKeyStrs))
for _, keyStr := range exposeKeyStrs {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
}
}

if len(exposeStoreKeys) == 0 {
continue
}

constructor, err := NewServiceConstructor(streamerName)
if err != nil {
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}

return nil, nil, err
}

// Generate the streaming service using the constructor, appOptions, and the
// StoreKeys we want to expose.
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}

return nil, nil, err
}

// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)

// kick off the background streaming service loop
streamingService.Stream(wg)

// add to the list of active streamers
activeStreamers = append(activeStreamers, streamingService)
}

// If there are no active streamers, activeStreamers is empty (len == 0) and
// the waitGroup is not waiting on anything.
return activeStreamers, wg, nil
}
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ require (
github.com/cosmos/cosmos-sdk v0.45.16
github.com/cosmos/go-bip39 v1.0.0
github.com/cosmos/gogoproto v1.4.6
github.com/cosmos/iavl v0.19.5
github.com/cosmos/ibc-go/v4 v4.4.2
github.com/gogo/protobuf v1.3.3
github.com/golang/protobuf v1.5.3
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/linxGnu/grocksdb v1.8.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/regen-network/cosmos-proto v0.3.1
github.com/spf13/cast v1.5.1
github.com/spf13/cobra v1.7.0
github.com/stretchr/testify v1.8.4
Expand All @@ -28,8 +32,6 @@ require (
gopkg.in/yaml.v2 v2.4.0
)

require github.com/prometheus/client_golang v1.14.0

require (
cosmossdk.io/api v0.2.6 // indirect
cosmossdk.io/core v0.5.1 // indirect
Expand All @@ -56,7 +58,6 @@ require (
github.com/cosmos/btcutil v1.0.4 // indirect
github.com/cosmos/cosmos-db v0.0.0-20221226095112-f3c38ecb5e32 // indirect
github.com/cosmos/gorocksdb v1.2.0 // indirect
github.com/cosmos/iavl v0.19.5 // indirect
github.com/cosmos/ledger-cosmos-go v0.12.2 // indirect
github.com/creachadair/taskgroup v0.3.2 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
Expand Down Expand Up @@ -101,7 +102,6 @@ require (
github.com/kr/text v0.2.0 // indirect
github.com/lib/pq v1.10.7 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/linxGnu/grocksdb v1.7.10 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
Expand All @@ -120,7 +120,6 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rakyll/statik v0.1.7 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/regen-network/cosmos-proto v0.3.1 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/rs/zerolog v1.27.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -676,8 +676,8 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/linxGnu/grocksdb v1.7.10 h1:dz7RY7GnFUA+GJO6jodyxgkUeGMEkPp3ikt9hAcNGEw=
github.com/linxGnu/grocksdb v1.7.10/go.mod h1:0hTf+iA+GOr0jDX4CgIYyJZxqOH9XlBh6KVj8+zmF34=
github.com/linxGnu/grocksdb v1.8.0 h1:H4L/LhP7GOMf1j17oQAElHgVlbEje2h14A8Tz9cM2BE=
github.com/linxGnu/grocksdb v1.8.0/go.mod h1:09CeBborffXhXdNpEcOeZrLKEnRtrZFEpFdPNI9Zjjg=
github.com/lucasjones/reggen v0.0.0-20180717132126-cdb49ff09d77/go.mod h1:5ELEyG+X8f+meRWHuqUOewBOhvHkl7M76pdGEansxW4=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
Expand Down
116 changes: 116 additions & 0 deletions versiondb/streaming_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package versiondb

import (
"context"
"os"
"path/filepath"
"sort"
"strings"
"sync"

"github.com/public-awesome/stargaze/v11/versiondb/tsrocksdb"
abci "github.com/tendermint/tendermint/abci/types"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/store/types"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
)

var _ baseapp.StreamingService = &StreamingService{}

// StreamingService is a concrete implementation of StreamingService that accumulate the state changes in current block,
// writes the ordered changeset out to version storage.
type StreamingService struct {
listeners []*types.MemoryListener // the listeners that will be initialized with BaseApp
VersionStore VersionStore
CurrentBlockNumber int64 // the current block number
}

// NewFileStreamingService is the streaming.ServiceConstructor function for
// creating a FileStreamingService.
func NewVersionDbStreamingService(
homePath string,
keys []storetypes.StoreKey,
marshaller codec.BinaryCodec,
) (*StreamingService, error) {
dataDir := filepath.Join(homePath, "data", "versiondb")
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
return nil, err
}
versionDB, err := tsrocksdb.NewStore(dataDir)
if err != nil {
return nil, err
}

// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}

service := NewStreamingService(versionDB, exposeStoreKeys)

return service, nil
}

// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys
func NewStreamingService(versionStore VersionStore, storeKeys []types.StoreKey) *StreamingService {
// sort by the storeKeys first
sort.SliceStable(storeKeys, func(i, j int) bool {
return strings.Compare(storeKeys[i].Name(), storeKeys[j].Name()) < 0
})

listeners := make([]*types.MemoryListener, len(storeKeys))
for i, key := range storeKeys {
listeners[i] = types.NewMemoryListener(key)
}
return &StreamingService{listeners, versionStore, 0}
}

// Listeners satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener {
listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.listeners))
for _, listener := range fss.listeners {
listeners[listener.StoreKey()] = []types.WriteListener{listener}
}
return listeners
}

// ListenBeginBlock satisfies the baseapp.ABCIListener interface
// It sets the currentBlockNumber.
func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error {
fss.CurrentBlockNumber = req.GetHeader().Height
return nil
}

// ListenDeliverTx satisfies the baseapp.ABCIListener interface
func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error {
return nil
}

// ListenEndBlock satisfies the baseapp.ABCIListener interface
// It merge the state caches of all the listeners together, and write out to the versionStore.
func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error {
return nil
}

func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error {
// concat the state caches
var changeSet []types.StoreKVPair
for _, listener := range fss.listeners {
changeSet = append(changeSet, listener.PopStateCache()...)
}

return fss.VersionStore.PutAtVersion(fss.CurrentBlockNumber, changeSet)
}

// Stream satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Stream(wg *sync.WaitGroup) error {
return nil
}

// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface
func (fss *StreamingService) Close() error {
return nil
}
Loading

0 comments on commit b33b394

Please sign in to comment.