Skip to content
This repository was archived by the owner on May 15, 2024. It is now read-only.

Commit ecc1ff6

Browse files
authored
Fix race condition resulting from concurrent WaitGroup.Add. (#149)
Generally, WaitGroup.Add should be called before the goroutine it accounts for is started. Otherwise, Wait can be called before the newly started goroutine has called Add. This is what happened here, causing a race condition. This PR also makes the receiver variable name consistent.
2 parents f870783 + 75ab079 commit ecc1ff6

File tree

1 file changed

+78
-75
lines changed

1 file changed

+78
-75
lines changed

integration/singularity/store.go

Lines changed: 78 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,25 @@ func NewStore(o ...Option) (*SingularityStore, error) {
5656
}, nil
5757
}
5858

59-
func (l *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPreparation, error) {
60-
createSourceStorageRes, err := l.singularityClient.Storage.CreateLocalStorage(&storage.CreateLocalStorageParams{
59+
func (s *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPreparation, error) {
60+
createSourceStorageRes, err := s.singularityClient.Storage.CreateLocalStorage(&storage.CreateLocalStorageParams{
6161
Context: ctx,
6262
Request: &models.StorageCreateLocalStorageRequest{
63-
Name: l.sourceName,
64-
Path: l.local.Dir(),
63+
Name: s.sourceName,
64+
Path: s.local.Dir(),
6565
},
6666
})
6767
if err != nil {
6868
return nil, fmt.Errorf("failed to create source storage: %w", err)
6969
}
7070
logger.Infow("Created source storage", "id", createSourceStorageRes.Payload.ID)
7171

72-
createPreparationRes, err := l.singularityClient.Preparation.CreatePreparation(&preparation.CreatePreparationParams{
72+
createPreparationRes, err := s.singularityClient.Preparation.CreatePreparation(&preparation.CreatePreparationParams{
7373
Context: ctx,
7474
Request: &models.DataprepCreateRequest{
75-
MaxSize: &l.maxCarSize,
76-
Name: l.preparationName,
77-
SourceStorages: []string{l.sourceName},
75+
MaxSize: &s.maxCarSize,
76+
Name: s.preparationName,
77+
SourceStorages: []string{s.sourceName},
7878
},
7979
})
8080
if err != nil {
@@ -85,12 +85,12 @@ func (l *SingularityStore) initPreparation(ctx context.Context) (*models.ModelPr
8585
return createPreparationRes.Payload, nil
8686
}
8787

88-
func (l *SingularityStore) Start(ctx context.Context) error {
89-
logger := logger.With("preparation", l.preparationName)
88+
func (s *SingularityStore) Start(ctx context.Context) error {
89+
logger := logger.With("preparation", s.preparationName)
9090

9191
// List out preparations and see if one with the configured name exists
9292

93-
listPreparationsRes, err := l.singularityClient.Preparation.ListPreparations(&preparation.ListPreparationsParams{
93+
listPreparationsRes, err := s.singularityClient.Preparation.ListPreparations(&preparation.ListPreparationsParams{
9494
Context: ctx,
9595
})
9696
if err != nil {
@@ -100,22 +100,22 @@ func (l *SingularityStore) Start(ctx context.Context) error {
100100

101101
var preparation *models.ModelPreparation
102102
for _, preparationCmp := range listPreparationsRes.Payload {
103-
if preparationCmp.Name == l.preparationName {
103+
if preparationCmp.Name == s.preparationName {
104104
preparation = preparationCmp
105105
break
106106
}
107107
}
108108
if preparation == nil {
109109
// If no preparation was found, initialize it
110-
_, err = l.initPreparation(ctx)
110+
_, err = s.initPreparation(ctx)
111111
if err != nil {
112112
logger.Errorw("First-time preparation initialization failed", "err", err)
113113
return fmt.Errorf("first-time preparation initialization failed: %w", err)
114114
}
115115
}
116116

117117
// Ensure default wallet is imported to singularity
118-
listWalletsRes, err := l.singularityClient.Wallet.ListWallets(&wallet.ListWalletsParams{
118+
listWalletsRes, err := s.singularityClient.Wallet.ListWallets(&wallet.ListWalletsParams{
119119
Context: ctx,
120120
})
121121
if err != nil {
@@ -124,18 +124,18 @@ func (l *SingularityStore) Start(ctx context.Context) error {
124124
}
125125
var wlt *models.ModelWallet
126126
for _, existing := range listWalletsRes.Payload {
127-
if existing.PrivateKey == l.walletKey {
127+
if existing.PrivateKey == s.walletKey {
128128
wlt = existing
129129
logger.Infow("Wallet found on singularity", "id", existing.ID)
130130
break
131131
}
132132
}
133133
if wlt == nil {
134134
logger.Info("Wallet is not found on singularity. Importing...")
135-
importWalletRes, err := l.singularityClient.Wallet.ImportWallet(&wallet.ImportWalletParams{
135+
importWalletRes, err := s.singularityClient.Wallet.ImportWallet(&wallet.ImportWalletParams{
136136
Context: ctx,
137137
Request: &models.WalletImportRequest{
138-
PrivateKey: l.walletKey,
138+
PrivateKey: s.walletKey,
139139
},
140140
})
141141
if err != nil {
@@ -147,9 +147,9 @@ func (l *SingularityStore) Start(ctx context.Context) error {
147147
}
148148

149149
// Ensure wallet is assigned to preparation
150-
listAttachedWalletsRes, err := l.singularityClient.WalletAssociation.ListAttachedWallets(&wallet_association.ListAttachedWalletsParams{
150+
listAttachedWalletsRes, err := s.singularityClient.WalletAssociation.ListAttachedWallets(&wallet_association.ListAttachedWalletsParams{
151151
Context: ctx,
152-
ID: l.preparationName,
152+
ID: s.preparationName,
153153
})
154154
if err != nil {
155155
return err
@@ -164,9 +164,9 @@ func (l *SingularityStore) Start(ctx context.Context) error {
164164
}
165165
if !walletFound {
166166
logger.Info("Wallet was not found. Creating...")
167-
if attachWalletRes, err := l.singularityClient.WalletAssociation.AttachWallet(&wallet_association.AttachWalletParams{
167+
if attachWalletRes, err := s.singularityClient.WalletAssociation.AttachWallet(&wallet_association.AttachWalletParams{
168168
Context: ctx,
169-
ID: l.preparationName,
169+
ID: s.preparationName,
170170
Wallet: wlt.Address,
171171
}); err != nil {
172172
logger.Errorw("Failed to add wallet to preparation", "err", err)
@@ -177,9 +177,9 @@ func (l *SingularityStore) Start(ctx context.Context) error {
177177
}
178178
// Ensure schedules are created
179179
// TODO: handle config changes for replication -- singularity currently has no modify schedule endpoint
180-
listPreparationSchedulesRes, err := l.singularityClient.DealSchedule.ListPreparationSchedules(&deal_schedule.ListPreparationSchedulesParams{
180+
listPreparationSchedulesRes, err := s.singularityClient.DealSchedule.ListPreparationSchedules(&deal_schedule.ListPreparationSchedulesParams{
181181
Context: ctx,
182-
ID: l.preparationName,
182+
ID: s.preparationName,
183183
})
184184

185185
switch {
@@ -192,12 +192,12 @@ func (l *SingularityStore) Start(ctx context.Context) error {
192192
return err
193193
}
194194

195-
pricePerGBEpoch, _ := (new(big.Rat).SetFrac(l.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64()
196-
pricePerGB, _ := (new(big.Rat).SetFrac(l.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64()
197-
pricePerDeal, _ := (new(big.Rat).SetFrac(l.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64()
195+
pricePerGBEpoch, _ := (new(big.Rat).SetFrac(s.pricePerGiBEpoch.Int, big.NewInt(int64(1e18)))).Float64()
196+
pricePerGB, _ := (new(big.Rat).SetFrac(s.pricePerGiB.Int, big.NewInt(int64(1e18)))).Float64()
197+
pricePerDeal, _ := (new(big.Rat).SetFrac(s.pricePerDeal.Int, big.NewInt(int64(1e18)))).Float64()
198198

199-
logger.Infof("Checking %v storage providers", len(l.storageProviders))
200-
for _, sp := range l.storageProviders {
199+
logger.Infof("Checking %v storage providers", len(s.storageProviders))
200+
for _, sp := range s.storageProviders {
201201
logger.Infof("Checking storage provider %s", sp)
202202
var foundSchedule *models.ModelSchedule
203203
logger := logger.With("provider", sp)
@@ -211,27 +211,27 @@ func (l *SingularityStore) Start(ctx context.Context) error {
211211
if foundSchedule != nil {
212212
// If schedule was found, update it
213213
logger.Infow("Schedule found for provider. Updating with latest settings...", "id", foundSchedule.ID)
214-
_, err := l.singularityClient.DealSchedule.UpdateSchedule(&deal_schedule.UpdateScheduleParams{
214+
_, err := s.singularityClient.DealSchedule.UpdateSchedule(&deal_schedule.UpdateScheduleParams{
215215
Context: ctx,
216216
ID: foundSchedule.ID,
217217
Body: &models.ScheduleUpdateRequest{
218218
PricePerGbEpoch: pricePerGBEpoch,
219219
PricePerGb: pricePerGB,
220220
PricePerDeal: pricePerDeal,
221-
Verified: &l.verifiedDeal,
222-
Ipni: &l.ipniAnnounce,
223-
KeepUnsealed: &l.keepUnsealed,
224-
StartDelay: ptr.String(strconv.Itoa(int(l.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
225-
Duration: ptr.String(strconv.Itoa(int(l.dealDuration)*builtin.EpochDurationSeconds) + "s"),
226-
ScheduleCron: l.scheduleCron,
227-
ScheduleCronPerpetual: l.scheduleCronPerpetual,
228-
ScheduleDealNumber: int64(l.scheduleDealNumber),
229-
TotalDealNumber: int64(l.totalDealNumber),
230-
ScheduleDealSize: l.scheduleDealSize,
231-
TotalDealSize: l.totalDealSize,
232-
MaxPendingDealSize: l.maxPendingDealSize,
233-
MaxPendingDealNumber: int64(l.maxPendingDealNumber),
234-
URLTemplate: l.scheduleUrlTemplate,
221+
Verified: &s.verifiedDeal,
222+
Ipni: &s.ipniAnnounce,
223+
KeepUnsealed: &s.keepUnsealed,
224+
StartDelay: ptr.String(strconv.Itoa(int(s.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
225+
Duration: ptr.String(strconv.Itoa(int(s.dealDuration)*builtin.EpochDurationSeconds) + "s"),
226+
ScheduleCron: s.scheduleCron,
227+
ScheduleCronPerpetual: s.scheduleCronPerpetual,
228+
ScheduleDealNumber: int64(s.scheduleDealNumber),
229+
TotalDealNumber: int64(s.totalDealNumber),
230+
ScheduleDealSize: s.scheduleDealSize,
231+
TotalDealSize: s.totalDealSize,
232+
MaxPendingDealSize: s.maxPendingDealSize,
233+
MaxPendingDealNumber: int64(s.maxPendingDealNumber),
234+
URLTemplate: s.scheduleUrlTemplate,
235235
},
236236
})
237237
if err != nil {
@@ -241,28 +241,28 @@ func (l *SingularityStore) Start(ctx context.Context) error {
241241
} else {
242242
// Otherwise, create it
243243
logger.Info("Schedule not found for provider. Creating...")
244-
if createScheduleRes, err := l.singularityClient.DealSchedule.CreateSchedule(&deal_schedule.CreateScheduleParams{
244+
if createScheduleRes, err := s.singularityClient.DealSchedule.CreateSchedule(&deal_schedule.CreateScheduleParams{
245245
Context: ctx,
246246
Schedule: &models.ScheduleCreateRequest{
247-
Preparation: l.preparationName,
247+
Preparation: s.preparationName,
248248
Provider: sp.String(),
249249
PricePerGbEpoch: pricePerGBEpoch,
250250
PricePerGb: pricePerGB,
251251
PricePerDeal: pricePerDeal,
252-
Verified: &l.verifiedDeal,
253-
Ipni: &l.ipniAnnounce,
254-
KeepUnsealed: &l.keepUnsealed,
255-
StartDelay: ptr.String(strconv.Itoa(int(l.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
256-
Duration: ptr.String(strconv.Itoa(int(l.dealDuration)*builtin.EpochDurationSeconds) + "s"),
257-
ScheduleCron: l.scheduleCron,
258-
ScheduleCronPerpetual: l.scheduleCronPerpetual,
259-
ScheduleDealNumber: int64(l.scheduleDealNumber),
260-
TotalDealNumber: int64(l.totalDealNumber),
261-
ScheduleDealSize: l.scheduleDealSize,
262-
TotalDealSize: l.totalDealSize,
263-
MaxPendingDealSize: l.maxPendingDealSize,
264-
MaxPendingDealNumber: int64(l.maxPendingDealNumber),
265-
URLTemplate: l.scheduleUrlTemplate,
252+
Verified: &s.verifiedDeal,
253+
Ipni: &s.ipniAnnounce,
254+
KeepUnsealed: &s.keepUnsealed,
255+
StartDelay: ptr.String(strconv.Itoa(int(s.dealStartDelay)*builtin.EpochDurationSeconds) + "s"),
256+
Duration: ptr.String(strconv.Itoa(int(s.dealDuration)*builtin.EpochDurationSeconds) + "s"),
257+
ScheduleCron: s.scheduleCron,
258+
ScheduleCronPerpetual: s.scheduleCronPerpetual,
259+
ScheduleDealNumber: int64(s.scheduleDealNumber),
260+
TotalDealNumber: int64(s.totalDealNumber),
261+
ScheduleDealSize: s.scheduleDealSize,
262+
TotalDealSize: s.totalDealSize,
263+
MaxPendingDealSize: s.maxPendingDealSize,
264+
MaxPendingDealNumber: int64(s.maxPendingDealNumber),
265+
URLTemplate: s.scheduleUrlTemplate,
266266
},
267267
}); err != nil {
268268
logger.Errorw("Failed to create schedule for provider", "err", err)
@@ -272,37 +272,41 @@ func (l *SingularityStore) Start(ctx context.Context) error {
272272
}
273273
}
274274
}
275-
go l.runPreparationJobs()
276-
go l.runCleanupWorker()
275+
276+
s.closed.Add(1)
277+
go s.runPreparationJobs()
278+
279+
s.closed.Add(1)
280+
go s.runCleanupWorker()
281+
277282
return nil
278283
}
279284

280-
func (l *SingularityStore) runPreparationJobs() {
281-
l.closed.Add(1)
282-
defer l.closed.Done()
285+
func (s *SingularityStore) runPreparationJobs() {
286+
defer s.closed.Done()
283287

284288
// Create a context that gets canceled when this function exits.
285289
ctx, cancel := context.WithCancel(context.Background())
286290
defer cancel()
287291

288292
for {
289293
select {
290-
case <-l.closing:
294+
case <-s.closing:
291295
return
292-
case fileID := <-l.toPack:
293-
prepareToPackSourceRes, err := l.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{
296+
case fileID := <-s.toPack:
297+
prepareToPackSourceRes, err := s.singularityClient.File.PrepareToPackFile(&file.PrepareToPackFileParams{
294298
Context: ctx,
295299
ID: int64(fileID),
296300
})
297301
if err != nil {
298302
logger.Errorw("preparing to pack file", "fileID", fileID, "error", err)
299303
}
300-
if prepareToPackSourceRes.Payload > l.packThreshold {
304+
if prepareToPackSourceRes.Payload > s.packThreshold {
301305
// mark outstanding pack jobs as ready to go so we can make CAR files
302-
_, err := l.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{
306+
_, err := s.singularityClient.Job.PrepareToPackSource(&job.PrepareToPackSourceParams{
303307
Context: ctx,
304-
ID: l.preparationName,
305-
Name: l.sourceName,
308+
ID: s.preparationName,
309+
Name: s.sourceName,
306310
})
307311
if err != nil {
308312
logger.Errorw("preparing to pack source", "error", err)
@@ -312,12 +316,12 @@ func (l *SingularityStore) runPreparationJobs() {
312316
}
313317
}
314318

315-
func (l *SingularityStore) Shutdown(ctx context.Context) error {
316-
close(l.closing)
319+
func (s *SingularityStore) Shutdown(ctx context.Context) error {
320+
close(s.closing)
317321

318322
done := make(chan struct{})
319323
go func() {
320-
l.closed.Wait()
324+
s.closed.Wait()
321325
close(done)
322326
}()
323327

@@ -472,7 +476,6 @@ func (s *SingularityStore) Describe(ctx context.Context, id blob.ID) (*blob.Desc
472476
}
473477

474478
func (s *SingularityStore) runCleanupWorker() {
475-
s.closed.Add(1)
476479
defer s.closed.Done()
477480

478481
// Run immediately once before starting ticker

0 commit comments

Comments
 (0)