Skip to content

Commit

Permalink
Merge pull request #912 from pastelnetwork/PSL-1240
Browse files Browse the repository at this point in the history
[PSL-1240] pool & insert keys in metadata database, pool and update l…
  • Loading branch information
j-rafique authored Jul 31, 2024
2 parents 82ba0eb + eba2a52 commit 7369972
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 23 deletions.
3 changes: 0 additions & 3 deletions p2p/kademlia/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ type Store interface {
// Retrieve the queries key/value from store
Retrieve(ctx context.Context, key []byte) ([]byte, error)

// RetrieveWithType gets data with type
RetrieveWithType(_ context.Context, key []byte) ([]byte, int, error)

// Delete a key/value pair from the store
Delete(ctx context.Context, key []byte)

Expand Down
5 changes: 0 additions & 5 deletions p2p/kademlia/store/mem/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,6 @@ func (s *Store) StoreBatch(_ context.Context, values [][]byte, _ int, _ bool) er
return nil
}

// RetrieveWithType will return the queries key/value if it exists
func (s *Store) RetrieveWithType(_ context.Context, _ []byte) ([]byte, int, error) {
return []byte{}, 0, nil
}

// NewStore returns a new memory store
func NewStore() *Store {
return &Store{
Expand Down
238 changes: 238 additions & 0 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package sqlite

import (
"context"
"fmt"

"os"
"path"
"sync"
"time"

"github.com/jmoiron/sqlx"
_ "github.com/mattn/go-sqlite3"
"github.com/pastelnetwork/gonode/common/log"
)

// Tunables and shared channels for the metadata migration workers.
var (
	// commitLastAccessedInterval is how often buffered last-access updates
	// are flushed to the database by the update worker.
	commitLastAccessedInterval = 60 * time.Second
	// migrationMetaDB is the sqlite file name for the migration metadata DB.
	migrationMetaDB = "data001-migration-meta.sqlite3"
	// accessUpdateBufferSize caps both channels; producers drop (and log)
	// messages when a channel is full rather than blocking.
	accessUpdateBufferSize = 100000
	// commitInsertsInterval is how often buffered key inserts are flushed.
	commitInsertsInterval = 90 * time.Second
	// updateChannel carries last-access-time updates; insertChannel carries
	// newly stored keys. Both are drained by MigrationMetaStore workers.
	updateChannel chan UpdateMessage
	insertChannel chan UpdateMessage
)

// init allocates the buffered worker channels before any producer can post.
func init() {
	updateChannel = make(chan UpdateMessage, accessUpdateBufferSize)
	insertChannel = make(chan UpdateMessage, accessUpdateBufferSize)
}

// UpdateMessages is a batch of UpdateMessage values.
type UpdateMessages []UpdateMessage

// UpdateMessage holds a record key, the time it was last accessed, and the
// stored value's size in bytes. Size is only populated on the insert path
// (PostKeysInsert); access updates leave it zero.
type UpdateMessage struct {
	Key            string
	LastAccessTime time.Time
	Size           int
}

// MigrationMetaStore manages database operations for migration metadata: it
// buffers last-access updates and new-key inserts in memory and flushes them
// to the sqlite meta table on periodic ticks.
type MigrationMetaStore struct {
	db *sqlx.DB

	// updateTicker / insertTicker drive the periodic flushes performed by the
	// background workers started in NewMigrationMetaStore.
	updateTicker *time.Ticker
	insertTicker *time.Ticker

	// updates buffers key (string) -> last access time (time.Time).
	// inserts buffers key (string) -> UpdateMessage.
	updates sync.Map
	inserts sync.Map
}

// NewMigrationMetaStore initializes the MigrationMetaStore: it ensures the
// data directory exists, opens (or creates) the sqlite metadata database and
// starts the two background workers that flush buffered access updates and
// key inserts. The workers run until ctx is cancelled.
func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaStore, error) {
	// MkdirAll succeeds when the directory already exists, so no separate
	// Stat/IsNotExist dance is needed. (The previous Stat-error branch also
	// returned a misleading "cannot create data folder" message for what was
	// actually a stat failure.)
	if err := os.MkdirAll(dataDir, 0750); err != nil {
		return nil, fmt.Errorf("mkdir %q: %w", dataDir, err)
	}

	dbFile := path.Join(dataDir, migrationMetaDB)
	db, err := sqlx.Connect("sqlite3", dbFile)
	if err != nil {
		return nil, fmt.Errorf("cannot open sqlite database: %w", err)
	}
	db.SetMaxOpenConns(20)
	db.SetMaxIdleConns(10)

	handler := &MigrationMetaStore{
		db:           db,
		updateTicker: time.NewTicker(commitLastAccessedInterval),
		insertTicker: time.NewTicker(commitInsertsInterval),
	}
	go handler.startLastAccessedUpdateWorker(ctx)
	go handler.startInsertWorker(ctx)

	return handler, nil
}

// PostAccessUpdate sends access updates to be handled by the worker.
func PostAccessUpdate(updates []string) {
for _, update := range updates {
select {
case updateChannel <- UpdateMessage{
Key: update,
LastAccessTime: time.Now(),
}:
// Inserted
default:
log.WithContext(context.Background()).Error("updateChannel is full, dropping update")
}

}
}

// startLastAccessedUpdateWorker buffers incoming last-access updates and
// commits them to the database on every tick. On context cancellation it
// flushes any remaining buffered updates before returning.
func (d *MigrationMetaStore) startLastAccessedUpdateWorker(ctx context.Context) {
	// Stop the ticker when the worker exits; it was previously never stopped
	// and kept firing after shutdown.
	defer d.updateTicker.Stop()
	for {
		select {
		case update := <-updateChannel:
			d.updates.Store(update.Key, update.LastAccessTime)
		case <-d.updateTicker.C:
			d.commitLastAccessedUpdates(ctx)
		case <-ctx.Done():
			log.WithContext(ctx).Info("Shutting down last accessed update worker")
			d.commitLastAccessedUpdates(ctx) // Commit any remaining updates before shutdown
			return
		}
	}
}

// commitLastAccessedUpdates flushes all buffered last-access updates to the
// meta table in a single transaction. Keys leave the buffer only after a
// successful commit, so a failed flush is retried on the next tick.
func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) {
	tx, err := d.db.BeginTxx(ctx, nil)
	if err != nil {
		log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitLastAccessedUpdates)")
		return
	}

	// NOTE(review): this statement writes column last_access_time while
	// commitInserts writes last_accessed — one of the two is presumably wrong
	// against the actual meta schema; verify against the table DDL.
	stmt, err := tx.Prepare("UPDATE meta SET last_access_time = ? WHERE key = ?")
	if err != nil {
		tx.Rollback() // was missing: the open transaction leaked on prepare failure
		log.WithContext(ctx).WithError(err).Error("Error preparing statement (commitLastAccessedUpdates)")
		return
	}
	defer stmt.Close()

	keysToUpdate := make(map[string]time.Time)
	d.updates.Range(func(key, value interface{}) bool {
		k, kOK := key.(string)
		v, vOK := value.(time.Time)
		if !kOK || !vOK {
			// Skip a malformed entry instead of returning false, which
			// aborted the Range and silently stranded every remaining update.
			log.WithContext(ctx).WithField("key", key).Error("Unexpected entry type in updates map (commitLastAccessedUpdates)")
			return true
		}
		if _, err := stmt.Exec(v, k); err != nil {
			log.WithContext(ctx).WithError(err).WithField("key", k).Error("Error executing statement (commitLastAccessedUpdates)")
			return true // continue with the remaining keys
		}
		keysToUpdate[k] = v
		return true
	})

	if err := tx.Commit(); err != nil {
		// A failed Commit leaves nothing to roll back (the previous code
		// rolled back *after* Commit, which is a no-op). Buffered keys stay
		// in d.updates and are retried on the next tick.
		log.WithContext(ctx).WithError(err).Error("Error committing transaction (commitLastAccessedUpdates)")
		return
	}

	// Drop flushed keys from the buffer only after a successful commit.
	for k := range keysToUpdate {
		d.updates.Delete(k)
	}

	log.WithContext(ctx).WithField("count", len(keysToUpdate)).Info("Committed last accessed updates")
}

// PostKeysInsert enqueues freshly stored keys so the background worker can
// insert their metadata rows. When the buffered channel is full the message
// is dropped and an error is logged rather than blocking the caller.
func PostKeysInsert(updates []UpdateMessage) {
	for i := range updates {
		select {
		case insertChannel <- updates[i]:
			// queued successfully
		default:
			log.WithContext(context.Background()).Error("insertChannel is full, dropping update")
		}
	}
}

// startInsertWorker buffers incoming key-insert messages and commits them to
// the database on every tick. On context cancellation it flushes any
// remaining buffered inserts before returning.
func (d *MigrationMetaStore) startInsertWorker(ctx context.Context) {
	// Stop the ticker when the worker exits; it was previously never stopped
	// and kept firing after shutdown.
	defer d.insertTicker.Stop()
	for {
		select {
		case update := <-insertChannel:
			d.inserts.Store(update.Key, update)
		case <-d.insertTicker.C:
			d.commitInserts(ctx)
		case <-ctx.Done():
			log.WithContext(ctx).Info("Shutting down insert meta keys worker")
			d.commitInserts(ctx) // Commit any remaining inserts before shutdown
			return
		}
	}
}

// commitInserts flushes all buffered key inserts to the meta table in a
// single transaction using INSERT OR REPLACE, so a re-stored key refreshes
// its existing row. Keys leave the buffer only after a successful commit, so
// a failed flush is retried on the next tick.
func (d *MigrationMetaStore) commitInserts(ctx context.Context) {
	tx, err := d.db.BeginTxx(ctx, nil)
	if err != nil {
		log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitInserts)")
		return
	}

	// NOTE(review): this statement writes column last_accessed while
	// commitLastAccessedUpdates writes last_access_time — one of the two is
	// presumably wrong against the actual meta schema; verify against the
	// table DDL.
	stmt, err := tx.Preparex("INSERT OR REPLACE INTO meta (key, last_accessed, access_count, data_size) VALUES (?, ?, ?, ?)")
	if err != nil {
		tx.Rollback() // release the open transaction on prepare failure
		log.WithContext(ctx).WithError(err).Error("Error preparing statement (commitInserts)")
		return
	}
	defer stmt.Close()

	committed := make(map[string]bool)
	d.inserts.Range(func(key, value interface{}) bool {
		k, kOK := key.(string)
		v, vOK := value.(UpdateMessage)
		if !kOK || !vOK {
			// Skip a malformed entry instead of returning false, which
			// aborted the Range and silently stranded every remaining insert.
			log.WithContext(ctx).WithField("key", key).Error("Unexpected entry type in inserts map (commitInserts)")
			return true
		}
		// New (or replaced) rows start with an access count of 1.
		const accessCount = 1
		if _, err := stmt.Exec(k, v.LastAccessTime, accessCount, v.Size); err != nil {
			log.WithContext(ctx).WithError(err).WithField("key", k).Error("Error executing statement (commitInserts)")
			return true // continue with the remaining keys
		}
		committed[k] = true
		return true
	})

	if err := tx.Commit(); err != nil {
		// A failed Commit leaves nothing to roll back; buffered keys stay in
		// d.inserts and are retried on the next tick.
		log.WithContext(ctx).WithError(err).Error("Error committing transaction (commitInserts)")
		return
	}

	// Drop flushed keys from the buffer only after a successful commit.
	for k := range committed {
		d.inserts.Delete(k)
	}

	log.WithContext(ctx).WithField("count", len(committed)).Info("Committed inserts")
}
3 changes: 3 additions & 0 deletions p2p/kademlia/store/sqlite/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt

values := make([][]byte, len(keys))
keysFound := 0

for rows.Next() {
var key string
var value []byte
Expand All @@ -453,6 +454,8 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt
if idx, found := keyToIndex[key]; found {
values[idx] = value
keysFound++

PostAccessUpdate([]string{key})
}
}

Expand Down
24 changes: 9 additions & 15 deletions p2p/kademlia/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,9 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) {
return nil, fmt.Errorf("failed to get record by key %s: %w", hkey, err)
}

return r.Data, nil
}

// RetrieveWithType will return the queries key/value if it exists
func (s *Store) RetrieveWithType(_ context.Context, key []byte) ([]byte, int, error) {
hkey := hex.EncodeToString(key)
PostAccessUpdate([]string{hkey})

r := Record{}
err := s.db.Get(&r, `SELECT data,datatype FROM data WHERE key = ?`, hkey)
if err != nil {
return nil, 0, fmt.Errorf("failed to get record with type by key %s: %w", hkey, err)
}

return r.Data, r.Datatype, nil
return r.Data, nil
}

// Checkpoint method for the store
Expand Down Expand Up @@ -446,9 +435,8 @@ func (s *Store) performJob(j Job) error {
// storeRecord will store a key/value pair for the queries node
func (s *Store) storeRecord(key []byte, value []byte, typ int, isOriginal bool) error {

hkey := hex.EncodeToString(key)
operation := func() error {
hkey := hex.EncodeToString(key)

now := time.Now().UTC()
r := Record{Key: hkey, Data: value, UpdatedAt: now, Datatype: typ, Isoriginal: isOriginal, CreatedAt: now}
res, err := s.db.NamedExec(`INSERT INTO data(key, data, datatype, is_original, createdAt, updatedAt) values(:key, :data, :datatype, :isoriginal, :createdat, :updatedat) ON CONFLICT(key) DO UPDATE SET data=:data,updatedAt=:updatedat`, r)
Expand All @@ -472,12 +460,15 @@ func (s *Store) storeRecord(key []byte, value []byte, typ int, isOriginal bool)
if err != nil {
return fmt.Errorf("error storing data: %w", err)
}
PostKeysInsert([]UpdateMessage{{Key: hkey, LastAccessTime: time.Now(), Size: len(value)}})

return nil
}

// storeBatchRecord will store a batch of values with their SHA256 hash as the key
func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) error {
hkeys := make([]UpdateMessage, len(values))

operation := func() error {
tx, err := s.db.Beginx()
if err != nil {
Expand Down Expand Up @@ -505,6 +496,7 @@ func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) erro
}

hkey := hex.EncodeToString(hashed)
hkeys[i] = UpdateMessage{Key: hkey, LastAccessTime: now, Size: len(values[i])}
r := Record{Key: hkey, Data: values[i], CreatedAt: now, UpdatedAt: now, Datatype: typ, Isoriginal: isOriginal}

// Execute the insert statement
Expand Down Expand Up @@ -532,6 +524,8 @@ func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) erro
return fmt.Errorf("error storing data: %w", err)
}

PostKeysInsert(hkeys)

return nil
}

Expand Down

0 comments on commit 7369972

Please sign in to comment.