Skip to content

Commit

Permalink
add data-dir arg and fix commit inserts worker
Browse files Browse the repository at this point in the history
  • Loading branch information
mateeullahmalik committed Aug 16, 2024
1 parent d479a9b commit 3bd55b1
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 54 deletions.
9 changes: 7 additions & 2 deletions common/storage/queries/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ package queries
import (
"context"
"fmt"
"path/filepath"

"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3" //go-sqlite3
"github.com/pastelnetwork/gonode/common/configurer"
"github.com/pastelnetwork/gonode/common/log"
"path/filepath"
)

var (
DefaulthPath = configurer.DefaultPath()
)

const minVerifications = 3
Expand Down Expand Up @@ -292,7 +297,7 @@ func (s *SQLiteStore) CloseHistoryDB(ctx context.Context) {

// OpenHistoryDB opens history DB
func OpenHistoryDB() (LocalStoreInterface, error) {
dbFile := filepath.Join(configurer.DefaultPath(), historyDBName)
dbFile := filepath.Join(DefaulthPath, historyDBName)
db, err := sqlx.Connect("sqlite3", dbFile)
if err != nil {
return nil, fmt.Errorf("cannot open sqlite database: %w", err)
Expand Down
5 changes: 2 additions & 3 deletions common/storage/ticketstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"path/filepath"

"github.com/jmoiron/sqlx"
"github.com/pastelnetwork/gonode/common/configurer"
"github.com/pastelnetwork/gonode/common/log"
)

Expand Down Expand Up @@ -108,8 +107,8 @@ func (s *TicketStore) CloseTicketDB(ctx context.Context) {
}

// OpenTicketingDb opens ticket DB
func OpenTicketingDb() (TicketStorageInterface, error) {
dbFile := filepath.Join(configurer.DefaultPath(), ticketDBName)
func OpenTicketingDb(defaultPath string) (TicketStorageInterface, error) {
dbFile := filepath.Join(defaultPath, ticketDBName)

db, err := sqlx.Connect("sqlite3", dbFile)
if err != nil {
Expand Down
84 changes: 50 additions & 34 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ var (
migrationExecutionTicker = 12 * time.Hour
migrationMetaDB = "data001-migration-meta.sqlite3"
accessUpdateBufferSize = 100000
commitInsertsInterval = 60 * time.Second
metaSyncBatchSize = 5000
commitInsertsInterval = 10 * time.Second
metaSyncBatchSize = 10000
lowSpaceThresholdGB = 50 // in GB
minKeysToMigrate = 100

Expand Down Expand Up @@ -314,6 +314,27 @@ func (d *MigrationMetaStore) startLastAccessedUpdateWorker(ctx context.Context)
}

func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
keysToUpdate := make(map[string]time.Time)
d.updates.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
log.WithContext(ctx).Error("Error converting key to string (commitLastAccessedUpdates)")
return false
}
v, ok := value.(time.Time)
if !ok {
log.WithContext(ctx).Error("Error converting value to time.Time (commitLastAccessedUpdates)")
return false
}
keysToUpdate[k] = v

return true // continue
})

if len(keysToUpdate) == 0 {
return
}

tx, err := d.db.BeginTxx(ctx, nil)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitLastAccessedUpdates)")
Expand All @@ -334,25 +355,12 @@ func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
}
defer stmt.Close()

keysToUpdate := make(map[string]time.Time)
d.updates.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
return false
}
v, ok := value.(time.Time)
if !ok {
return false
}
for k, v := range keysToUpdate {
_, err := stmt.Exec(k, v)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("key", key).Error("Error executing statement (commitLastAccessedUpdates)")
return true // continue
log.WithContext(ctx).WithError(err).WithField("key", k).Error("Error executing statement (commitLastAccessedUpdates)")
}
keysToUpdate[k] = v

return true // continue
})
}

if err := tx.Commit(); err != nil {
tx.Rollback()
Expand Down Expand Up @@ -396,6 +404,28 @@ func (d *MigrationMetaStore) startInsertWorker(ctx context.Context) {
}

func (d *MigrationMetaStore) commitInserts(ctx context.Context) {
keysToUpdate := make(map[string]UpdateMessage)
d.inserts.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
log.WithContext(ctx).Error("Error converting key to string (commitInserts)")
return false
}
v, ok := value.(UpdateMessage)
if !ok {
log.WithContext(ctx).Error("Error converting value to UpdateMessage (commitInserts)")
return false
}

keysToUpdate[k] = v

return true // continue
})

if len(keysToUpdate) == 0 {
return
}

tx, err := d.db.BeginTxx(ctx, nil)
if err != nil {
log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitInserts)")
Expand All @@ -411,27 +441,13 @@ func (d *MigrationMetaStore) commitInserts(ctx context.Context) {
}
defer stmt.Close()

keysToUpdate := make(map[string]bool)
d.inserts.Range(func(key, value interface{}) bool {
k, ok := key.(string)
if !ok {
return false
}
v, ok := value.(UpdateMessage)
if !ok {
return false
}
// Default values for access_count and data_size can be configured here
for k, v := range keysToUpdate {
accessCount := 1
_, err := stmt.Exec(k, v.LastAccessTime, accessCount, v.Size)
if err != nil {
log.WithContext(ctx).WithError(err).WithField("key", k).Error("Error executing statement (commitInserts)")
return true // continue
}
keysToUpdate[k] = true

return true // continue
})
}

if err := tx.Commit(); err != nil {
tx.Rollback() // Rollback transaction if commit fails
Expand Down
54 changes: 44 additions & 10 deletions walletnode/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ const (
)

var (
defaultPath = configurer.DefaultPath()

defaultTempDir = filepath.Join(os.TempDir(), appName)
defaultConfigFile = filepath.Join(defaultPath, appName+".yml")
defaultPastelConfigFile = filepath.Join(defaultPath, "pastel.conf")
defaultRqFilesDir = filepath.Join(defaultPath, rqFilesDir)
defaultStaticFilesDir = filepath.Join(defaultPath, staticFilesDir)
defaultCascadeFilesDir = filepath.Join(defaultPath, cascadeFiles)
defaultPath = configurer.DefaultPath()
defaultTempDir = ""
defaultConfigFile = ""
defaultPastelConfigFile = ""
defaultRqFilesDir = ""
defaultStaticFilesDir = ""
defaultCascadeFilesDir = ""
)

// NewApp configures our app by parsing command line flags, config files, and setting up logging and temporary directories
Expand All @@ -70,13 +69,15 @@ func NewApp() *cli.App {

app.AddFlags(
// Main
cli.NewFlag("data-dir", &defaultPath).SetUsage("Set `path` to the data directory, usually ~/.pastel in linux").SetValue(defaultPath).SetAliases("dd"),
cli.NewFlag("config-file", &configFile).SetUsage("Set `path` to the config file.").SetValue(defaultConfigFile).SetAliases("c"),
cli.NewFlag("pastel-config-file", &pastelConfigFile).SetUsage("Set `path` to the pastel config file.").SetValue(defaultPastelConfigFile),
cli.NewFlag("temp-dir", &config.TempDir).SetUsage("Set `path` for storing temp data.").SetValue(defaultTempDir),
cli.NewFlag("rq-files-dir", &config.RqFilesDir).SetUsage("Set `path` for storing files for rqservice.").SetValue(defaultRqFilesDir),
cli.NewFlag("log-level", &config.LogConfig.Level).SetUsage("Set the log `level`.").SetValue(config.LogConfig.Level),
cli.NewFlag("log-file", &config.LogConfig.File).SetUsage("The log `file` to write to."),
cli.NewFlag("quiet", &config.Quiet).SetUsage("Disallows log output to stdout.").SetAliases("q"),

// API
cli.NewFlag("swagger", &config.API.Swagger).SetUsage("Enable Swagger UI."),
)
Expand All @@ -85,12 +86,43 @@ func NewApp() *cli.App {
//Sets up configs and logging, and also returns the "app" function to main.go to be called (Run) there.
app.SetActionFunc(func(ctx context.Context, args []string) error {
//Sets logging prefix to pastel-app
defaultTempDir = filepath.Join(os.TempDir(), appName)
defaultConfigFile = filepath.Join(defaultPath, appName+".yml")
defaultPastelConfigFile = filepath.Join(defaultPath, "pastel.conf")
defaultRqFilesDir = filepath.Join(defaultPath, rqFilesDir)
defaultStaticFilesDir = filepath.Join(defaultPath, staticFilesDir)
defaultCascadeFilesDir = filepath.Join(defaultPath, cascadeFiles)

if config.TempDir == "" {
config.TempDir = defaultTempDir
}

if config.RqFilesDir == "" {
config.RqFilesDir = defaultRqFilesDir
}

if config.StaticFilesDir == "" {
config.StaticFilesDir = defaultStaticFilesDir
}

if config.CascadeFilesDir == "" {
config.CascadeFilesDir = defaultCascadeFilesDir
}

if configFile == "" {
configFile = defaultConfigFile
}

if pastelConfigFile == "" {
pastelConfigFile = defaultPastelConfigFile
}

ctx = log.ContextWithPrefix(ctx, "pastel-app")

//Parse config files supplied in arguments
if configFile != "" {
if err := configurer.ParseFile(configFile, config); err != nil {
return fmt.Errorf("error parsing walletnode config file: %v", err)
return fmt.Errorf("error parsing walletnode config file: %v - file: %s", err, configFile)
}
}

Expand Down Expand Up @@ -162,6 +194,8 @@ func runApp(ctx context.Context, config *configs.Config) error {
log.WithContext(ctx).Info("Interrupt signal received. Gracefully shutting down...")
})

queries.DefaulthPath = defaultPath

if _, err := os.Stat(defaultStaticFilesDir); os.IsNotExist(err) {
// directory does not exist, create it
errDir := os.MkdirAll(defaultStaticFilesDir, 0755)
Expand Down Expand Up @@ -211,7 +245,7 @@ func runApp(ctx context.Context, config *configs.Config) error {
}
defer hDB.CloseHistoryDB(ctx)

tDB, err := ticketstore.OpenTicketingDb()
tDB, err := ticketstore.OpenTicketingDb(defaultPath)
if err != nil {
log.WithContext(ctx).WithError(err).Error("error connecting ticket db..")
}
Expand Down
6 changes: 3 additions & 3 deletions walletnode/services/cascaderegister/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ func (service *CascadeRegistrationService) ActivateActionTicketAndRegisterVolume
if err != nil {
return err
}
aar.BlockNum = regTicket.Height
aar.BlockNum = regTicket.ActionTicketData.CalledAt

actTxId, err := service.pastelHandler.PastelClient.ActivateActionTicket(ctx, aar)
if err != nil {
Expand Down Expand Up @@ -791,7 +791,7 @@ func (service *CascadeRegistrationService) RestoreFile(ctx context.Context, p *c

if v.RegTxid == "" {
volumesWithPendingRegistration++
logger.WithField("volume_name", v.FileID).Info("find a volume with no registration, trying again...")
logger.WithField("volume_name", v.FileID).Info("found a volume with no registration, trying again...")

var burnTxId string
if v.BurnTxnID != "" && service.IsBurnTxIDValidForRecovery(ctx, v.BurnTxnID, v.ReqAmount-10) {
Expand Down Expand Up @@ -830,7 +830,7 @@ func (service *CascadeRegistrationService) RestoreFile(ctx context.Context, p *c

volumesWithInProgressRegCount += 1
} else if v.ActivationTxid == "" {
logger.WithField("volume_name", v.FileID).Info("find a volume with no activation, trying again...")
logger.WithField("volume_name", v.FileID).Info("found a volume with no activation, trying again...")

// activation code
actAttemptId, err := service.InsertActivationAttempt(types.ActivationAttempt{
Expand Down
6 changes: 4 additions & 2 deletions walletnode/services/cascaderegister/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package cascaderegister

import (
"context"
"github.com/pastelnetwork/gonode/common/storage/ticketstore"
"image"
"image/png"
"os"
"path/filepath"
"testing"
"time"

"github.com/pastelnetwork/gonode/common/configurer"
"github.com/pastelnetwork/gonode/common/storage/ticketstore"

"github.com/pastelnetwork/gonode/walletnode/services/download"

rqnode "github.com/pastelnetwork/gonode/raptorq/node"
Expand Down Expand Up @@ -210,7 +212,7 @@ func TestTaskRun(t *testing.T) {
rqClientMock.ListenOnRaptorQ().ListenOnClose(nil)
rqClientMock.ListenOnConnect(testCase.args.connectErr)

ticketDB, err := ticketstore.OpenTicketingDb()
ticketDB, err := ticketstore.OpenTicketingDb(configurer.DefaultPath())
assert.NoError(t, err)

downloadService := download.NewNftDownloadService(download.NewConfig(), pastelClientMock, nodeClient, nil)
Expand Down

0 comments on commit 3bd55b1

Please sign in to comment.