Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add data-dir arg to walletnode to control default path, fix commit inserts and update last accessed time workers #925

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions common/storage/queries/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ package queries
import (
"context"
"fmt"
"path/filepath"

"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3" //go-sqlite3
"github.com/pastelnetwork/gonode/common/configurer"
"github.com/pastelnetwork/gonode/common/log"
"path/filepath"
)

var (
DefaulthPath = configurer.DefaultPath()
)

const minVerifications = 3
Expand Down Expand Up @@ -292,7 +297,7 @@ func (s *SQLiteStore) CloseHistoryDB(ctx context.Context) {

// OpenHistoryDB opens history DB
func OpenHistoryDB() (LocalStoreInterface, error) {
dbFile := filepath.Join(configurer.DefaultPath(), historyDBName)
dbFile := filepath.Join(DefaulthPath, historyDBName)
db, err := sqlx.Connect("sqlite3", dbFile)
if err != nil {
return nil, fmt.Errorf("cannot open sqlite database: %w", err)
Expand Down
5 changes: 2 additions & 3 deletions common/storage/ticketstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"path/filepath"

"github.com/jmoiron/sqlx"
"github.com/pastelnetwork/gonode/common/configurer"
"github.com/pastelnetwork/gonode/common/log"
)

Expand Down Expand Up @@ -108,8 +107,8 @@ func (s *TicketStore) CloseTicketDB(ctx context.Context) {
}

// OpenTicketingDb opens ticket DB
func OpenTicketingDb() (TicketStorageInterface, error) {
dbFile := filepath.Join(configurer.DefaultPath(), ticketDBName)
func OpenTicketingDb(defaultPath string) (TicketStorageInterface, error) {
dbFile := filepath.Join(defaultPath, ticketDBName)

db, err := sqlx.Connect("sqlite3", dbFile)
if err != nil {
Expand Down
84 changes: 50 additions & 34 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ var (
migrationExecutionTicker = 12 * time.Hour
migrationMetaDB = "data001-migration-meta.sqlite3"
accessUpdateBufferSize = 100000
commitInsertsInterval = 60 * time.Second
metaSyncBatchSize = 5000
commitInsertsInterval = 10 * time.Second
metaSyncBatchSize = 10000
lowSpaceThresholdGB = 50 // in GB
minKeysToMigrate = 100

Expand Down Expand Up @@ -314,6 +314,27 @@ func (d *MigrationMetaStore) startLastAccessedUpdateWorker(ctx context.Context)
}

func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
keysToUpdate := make(map[string]time.Time)
d.updates.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
log.WithContext(ctx).Error("Error converting key to string (commitLastAccessedUpdates)")
return false
}
v, ok := value.(time.Time)
if !ok {
log.WithContext(ctx).Error("Error converting value to time.Time (commitLastAccessedUpdates)")
return false
}
keysToUpdate[k] = v

return true // continue
})

if len(keysToUpdate) == 0 {
return
}

tx, err := d.db.BeginTxx(ctx, nil)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitLastAccessedUpdates)")
Expand All @@ -334,25 +355,12 @@ func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
}
defer stmt.Close()

keysToUpdate := make(map[string]time.Time)
d.updates.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
return false
}
v, ok := value.(time.Time)
if !ok {
return false
}
for k, v := range keysToUpdate {
_, err := stmt.Exec(k, v)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("key", key).Error("Error executing statement (commitLastAccessedUpdates)")
return true // continue
log.WithContext(ctx).WithError(err).WithField("key", k).Error("Error executing statement (commitLastAccessedUpdates)")
}
keysToUpdate[k] = v

return true // continue
})
}

if err := tx.Commit(); err != nil {
tx.Rollback()
Expand Down Expand Up @@ -396,6 +404,28 @@ func (d *MigrationMetaStore) startInsertWorker(ctx context.Context) {
}

func (d *MigrationMetaStore) commitInserts(ctx context.Context) {
keysToUpdate := make(map[string]UpdateMessage)
d.inserts.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
log.WithContext(ctx).Error("Error converting key to string (commitInserts)")
return false
}
v, ok := value.(UpdateMessage)
if !ok {
log.WithContext(ctx).Error("Error converting value to UpdateMessage (commitInserts)")
return false
}

keysToUpdate[k] = v

return true // continue
})

if len(keysToUpdate) == 0 {
return
}

tx, err := d.db.BeginTxx(ctx, nil)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitInserts)")
Expand All @@ -411,27 +441,13 @@ func (d *MigrationMetaStore) commitInserts(ctx context.Context) {
}
defer stmt.Close()

keysToUpdate := make(map[string]bool)
d.inserts.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
return false
}
v, ok := value.(UpdateMessage)
if !ok {
return false
}
// Default values for access_count and data_size can be configured here
for k, v := range keysToUpdate {
accessCount := 1
_, err := stmt.Exec(k, v.LastAccessTime, accessCount, v.Size)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("key", k).Error("Error executing statement (commitInserts)")
return true // continue
}
keysToUpdate[k] = true

return true // continue
})
}

if err := tx.Commit(); err != nil {
tx.Rollback() // Rollback transaction if commit fails
Expand Down
54 changes: 44 additions & 10 deletions walletnode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ const (
)

var (
defaultPath = configurer.DefaultPath()

defaultTempDir = filepath.Join(os.TempDir(), appName)
defaultConfigFile = filepath.Join(defaultPath, appName+".yml")
defaultPastelConfigFile = filepath.Join(defaultPath, "pastel.conf")
defaultRqFilesDir = filepath.Join(defaultPath, rqFilesDir)
defaultStaticFilesDir = filepath.Join(defaultPath, staticFilesDir)
defaultCascadeFilesDir = filepath.Join(defaultPath, cascadeFiles)
defaultPath = configurer.DefaultPath()
defaultTempDir = ""
defaultConfigFile = ""
defaultPastelConfigFile = ""
defaultRqFilesDir = ""
defaultStaticFilesDir = ""
defaultCascadeFilesDir = ""
)

// NewApp configures our app by parsing command line flags, config files, and setting up logging and temporary directories
Expand All @@ -70,13 +69,15 @@ func NewApp() *cli.App {

app.AddFlags(
// Main
cli.NewFlag("data-dir", &defaultPath).SetUsage("Set `path` to the data directory, usually ~/.pastel in linux").SetValue(defaultPath).SetAliases("dd"),
cli.NewFlag("config-file", &configFile).SetUsage("Set `path` to the config file.").SetValue(defaultConfigFile).SetAliases("c"),
cli.NewFlag("pastel-config-file", &pastelConfigFile).SetUsage("Set `path` to the pastel config file.").SetValue(defaultPastelConfigFile),
cli.NewFlag("temp-dir", &config.TempDir).SetUsage("Set `path` for storing temp data.").SetValue(defaultTempDir),
cli.NewFlag("rq-files-dir", &config.RqFilesDir).SetUsage("Set `path` for storing files for rqservice.").SetValue(defaultRqFilesDir),
cli.NewFlag("log-level", &config.LogConfig.Level).SetUsage("Set the log `level`.").SetValue(config.LogConfig.Level),
cli.NewFlag("log-file", &config.LogConfig.File).SetUsage("The log `file` to write to."),
cli.NewFlag("quiet", &config.Quiet).SetUsage("Disallows log output to stdout.").SetAliases("q"),

// API
cli.NewFlag("swagger", &config.API.Swagger).SetUsage("Enable Swagger UI."),
)
Expand All @@ -85,12 +86,43 @@ func NewApp() *cli.App {
//Sets up configs and logging, and also returns the "app" function to main.go to be called (Run) there.
app.SetActionFunc(func(ctx context.Context, args []string) error {
//Sets logging prefix to pastel-app
defaultTempDir = filepath.Join(os.TempDir(), appName)
defaultConfigFile = filepath.Join(defaultPath, appName+".yml")
defaultPastelConfigFile = filepath.Join(defaultPath, "pastel.conf")
defaultRqFilesDir = filepath.Join(defaultPath, rqFilesDir)
defaultStaticFilesDir = filepath.Join(defaultPath, staticFilesDir)
defaultCascadeFilesDir = filepath.Join(defaultPath, cascadeFiles)

if config.TempDir == "" {
config.TempDir = defaultTempDir
}

if config.RqFilesDir == "" {
config.RqFilesDir = defaultRqFilesDir
}

if config.StaticFilesDir == "" {
config.StaticFilesDir = defaultStaticFilesDir
}

if config.CascadeFilesDir == "" {
config.CascadeFilesDir = defaultCascadeFilesDir
}

if configFile == "" {
configFile = defaultConfigFile
}

if pastelConfigFile == "" {
pastelConfigFile = defaultPastelConfigFile
}

ctx = log.ContextWithPrefix(ctx, "pastel-app")

//Parse config files supplied in arguments
if configFile != "" {
if err := configurer.ParseFile(configFile, config); err != nil {
return fmt.Errorf("error parsing walletnode config file: %v", err)
return fmt.Errorf("error parsing walletnode config file: %v - file: %s", err, configFile)
}
}

Expand Down Expand Up @@ -162,6 +194,8 @@ func runApp(ctx context.Context, config *configs.Config) error {
log.WithContext(ctx).Info("Interrupt signal received. Gracefully shutting down...")
})

queries.DefaulthPath = defaultPath

if _, err := os.Stat(defaultStaticFilesDir); os.IsNotExist(err) {
// directory does not exist, create it
errDir := os.MkdirAll(defaultStaticFilesDir, 0755)
Expand Down Expand Up @@ -211,7 +245,7 @@ func runApp(ctx context.Context, config *configs.Config) error {
}
defer hDB.CloseHistoryDB(ctx)

tDB, err := ticketstore.OpenTicketingDb()
tDB, err := ticketstore.OpenTicketingDb(defaultPath)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error connecting ticket db..")
}
Expand Down
6 changes: 3 additions & 3 deletions walletnode/services/cascaderegister/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (service *CascadeRegistrationService) ActivateActionTicketAndRegisterVolume
if err != nil {
return err
}
aar.BlockNum = regTicket.Height
aar.BlockNum = regTicket.ActionTicketData.CalledAt

actTxId, err := service.pastelHandler.PastelClient.ActivateActionTicket(ctx, aar)
if err != nil {
Expand Down Expand Up @@ -791,7 +791,7 @@ func (service *CascadeRegistrationService) RestoreFile(ctx context.Context, p *c

if v.RegTxid == "" {
volumesWithPendingRegistration++
logger.WithField("volume_name", v.FileID).Info("find a volume with no registration, trying again...")
logger.WithField("volume_name", v.FileID).Info("found a volume with no registration, trying again...")

var burnTxId string
if v.BurnTxnID != "" && service.IsBurnTxIDValidForRecovery(ctx, v.BurnTxnID, v.ReqAmount-10) {
Expand Down Expand Up @@ -830,7 +830,7 @@ func (service *CascadeRegistrationService) RestoreFile(ctx context.Context, p *c

volumesWithInProgressRegCount += 1
} else if v.ActivationTxid == "" {
logger.WithField("volume_name", v.FileID).Info("find a volume with no activation, trying again...")
logger.WithField("volume_name", v.FileID).Info("found a volume with no activation, trying again...")

// activation code
actAttemptId, err := service.InsertActivationAttempt(types.ActivationAttempt{
Expand Down
6 changes: 4 additions & 2 deletions walletnode/services/cascaderegister/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package cascaderegister

import (
"context"
"github.com/pastelnetwork/gonode/common/storage/ticketstore"
"image"
"image/png"
"os"
"path/filepath"
"testing"
"time"

"github.com/pastelnetwork/gonode/common/configurer"
"github.com/pastelnetwork/gonode/common/storage/ticketstore"

"github.com/pastelnetwork/gonode/walletnode/services/download"

rqnode "github.com/pastelnetwork/gonode/raptorq/node"
Expand Down Expand Up @@ -210,7 +212,7 @@ func TestTaskRun(t *testing.T) {
rqClientMock.ListenOnRaptorQ().ListenOnClose(nil)
rqClientMock.ListenOnConnect(testCase.args.connectErr)

ticketDB, err := ticketstore.OpenTicketingDb()
ticketDB, err := ticketstore.OpenTicketingDb(configurer.DefaultPath())
assert.NoError(t, err)

downloadService := download.NewNftDownloadService(download.NewConfig(), pastelClientMock, nodeClient, nil)
Expand Down
Loading