Skip to content

Commit 822cd29

Browse files
committed
[PSL-1254] revert channel mechanism instead use batching for meta sync
1 parent 5dfbeec commit 822cd29

File tree

3 files changed

+61
-36
lines changed

3 files changed

+61
-36
lines changed

p2p/kademlia/store/sqlite/meta_worker.go

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ var (
2424
migrationMetaDB = "data001-migration-meta.sqlite3"
2525
accessUpdateBufferSize = 100000
2626
commitInsertsInterval = 60 * time.Second
27-
metaSyncBatchSize = 10000
27+
metaSyncBatchSize = 5000
2828
lowSpaceThresholdGB = 50 // in GB
2929
minKeysToMigrate = 100
3030

@@ -39,7 +39,7 @@ func init() {
3939

4040
type UpdateMessages []UpdateMessage
4141

42-
// AccessUpdate holds the key and the last accessed time.
42+
// UpdateMessage holds the key and the last accessed time.
4343
type UpdateMessage struct {
4444
Key string
4545
LastAccessTime time.Time
@@ -104,12 +104,14 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string, cloud cloud.Stor
104104
log.P2P().WithContext(ctx).Errorf("cannot create migration table in sqlite database: %s", err.Error())
105105
}
106106

107-
if handler.isMetaSyncRequired() {
108-
err := handler.syncMetaWithData(ctx)
109-
if err != nil {
110-
log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
107+
go func() {
108+
if handler.isMetaSyncRequired() {
109+
err := handler.syncMetaWithData(ctx)
110+
if err != nil {
111+
log.WithContext(ctx).WithError(err).Error("error syncing meta with p2p data")
112+
}
111113
}
112-
}
114+
}()
113115

114116
go handler.startLastAccessedUpdateWorker(ctx)
115117
go handler.startInsertWorker(ctx)
@@ -184,21 +186,36 @@ func (d *MigrationMetaStore) isMetaSyncRequired() bool {
184186
}
185187

186188
func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
187-
query := `SELECT key, data, updatedAt FROM data LIMIT ? OFFSET ?`
188189
var offset int
189190

191+
query := `SELECT key, data, updatedAt FROM data LIMIT ? OFFSET ?`
192+
insertQuery := `
193+
INSERT INTO meta (key, last_accessed, access_count, data_size)
194+
VALUES (?, ?, 1, ?)
195+
ON CONFLICT(key) DO
196+
UPDATE SET
197+
last_accessed = EXCLUDED.last_accessed,
198+
data_size = EXCLUDED.data_size,
199+
access_count = access_count + 1`
200+
190201
for {
191202
rows, err := d.p2pDataStore.Queryx(query, metaSyncBatchSize, offset)
192203
if err != nil {
193204
log.WithContext(ctx).WithError(err).Error("Error querying p2p data store")
194205
return err
195206
}
196207

197-
log.WithContext(ctx).WithField("offset", offset).Info("Syncing meta with p2p data store")
198-
var batchUpdates []UpdateMessage
199-
found := false
208+
var batchProcessed bool
209+
210+
tx, err := d.db.Beginx()
211+
if err != nil {
212+
rows.Close()
213+
log.WithContext(ctx).WithError(err).Error("Failed to start transaction")
214+
return err
215+
}
216+
200217
for rows.Next() {
201-
found = true
218+
batchProcessed = true
202219
var r Record
203220
var t *time.Time
204221

@@ -210,23 +227,35 @@ func (d *MigrationMetaStore) syncMetaWithData(ctx context.Context) error {
210227
r.UpdatedAt = *t
211228
}
212229

213-
dataSize := len(r.Data)
214-
batchUpdates = append(batchUpdates, UpdateMessage{
215-
Key: r.Key,
216-
LastAccessTime: r.UpdatedAt,
217-
Size: dataSize,
218-
})
230+
if _, err := tx.Exec(insertQuery, r.Key, r.UpdatedAt, len(r.Data)); err != nil {
231+
tx.Rollback()
232+
rows.Close()
233+
log.WithContext(ctx).WithError(err).Error("Failed to execute batch insert")
234+
return err
235+
}
236+
}
237+
238+
if err := rows.Err(); err != nil {
239+
tx.Rollback()
240+
rows.Close()
241+
log.WithContext(ctx).WithError(err).Error("Error iterating rows")
242+
return err
219243
}
220-
rows.Close()
221244

222-
if !found {
245+
if batchProcessed {
246+
if err := tx.Commit(); err != nil {
247+
rows.Close()
248+
log.WithContext(ctx).WithError(err).Error("Failed to commit transaction")
249+
return err
250+
}
251+
} else {
252+
tx.Rollback()
253+
rows.Close()
223254
break
224255
}
225256

226-
// Send batch for insertion using the existing channel-based mechanism.
227-
PostKeysInsert(batchUpdates)
228-
229-
offset += len(batchUpdates) // Move the offset forward by the number of items processed.
257+
rows.Close()
258+
offset += metaSyncBatchSize
230259
}
231260

232261
return nil

p2p/p2p.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -262,12 +262,7 @@ func (s *p2p) configure(ctx context.Context) error {
262262
}
263263

264264
// New returns a new p2p instance.
265-
func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage) (P2P, error) {
266-
mst, err := sqlite.NewMigrationMetaStore(ctx, config.DataDir, cloud)
267-
if err != nil {
268-
return nil, fmt.Errorf("cannot create meta store: %w", err)
269-
}
270-
265+
func New(ctx context.Context, config *Config, pastelClient pastel.Client, secInfo *alts.SecInfo, rqstore rqstore.Store, cloud cloud.Storage, mst *sqlite.MigrationMetaStore) (P2P, error) {
271266
store, err := sqlite.NewStore(ctx, config.DataDir, cloud, mst)
272267
if err != nil {
273268
return nil, errors.Errorf("new kademlia store: %w", err)

supernode/cmd/app.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -229,24 +229,25 @@ func runApp(ctx context.Context, config *configs.Config) error {
229229
config.P2P.ID = config.PastelID
230230

231231
var cloudStorage *cloud.RcloneStorage
232+
var metaMigratorStore *sqlite.MigrationMetaStore
232233
if config.RcloneStorageConfig != nil {
233234
cloudStorage = cloud.NewRcloneStorage(config.RcloneStorageConfig.BucketName, config.RcloneStorageConfig.SpecName)
234235
if config.RcloneStorageConfig.BucketName != "" && config.RcloneStorageConfig.SpecName != "" {
235236
if err := cloudStorage.CheckCloudConnection(); err != nil {
236237
log.WithContext(ctx).WithError(err).Fatal("error establishing connection with the cloud")
237238
return fmt.Errorf("rclone connection check failed: %w", err)
238239
}
240+
241+
metaMigratorStore, err = sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
242+
if err != nil {
243+
return errors.Errorf("could not create p2p service, %w", err)
244+
}
239245
}
240246
} else {
241247
log.WithContext(ctx).Info("cloud backup unavailable")
242248
}
243249

244-
p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage)
245-
if err != nil {
246-
return errors.Errorf("could not create p2p service, %w", err)
247-
}
248-
249-
metaMigratorStore, err := sqlite.NewMigrationMetaStore(ctx, config.P2P.DataDir, cloudStorage)
250+
p2p, err := p2p.New(ctx, config.P2P, pastelClient, secInfo, rqstore, cloudStorage, metaMigratorStore)
250251
if err != nil {
251252
return errors.Errorf("could not create p2p service, %w", err)
252253
}

0 commit comments

Comments
 (0)