Skip to content

Commit

Permalink
Do metadata deletes concurrently
Browse files Browse the repository at this point in the history
If there is a non-context error, retry the specific dhstore that had an error. This helps avoid a retry for all dhstores.
  • Loading branch information
gammazero committed Dec 12, 2023
1 parent d31fcd0 commit efc5946
Showing 1 changed file with 55 additions and 17 deletions.
72 changes: 55 additions & 17 deletions store/dhstore/dhstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,31 +226,61 @@ func (s *dhStore) RemoveProviderContext(providerID peer.ID, contextID []byte) er
hvk := dhash.SHA256(vk, nil)
b58hvk := "/" + base58.Encode(hvk)

errs := make(chan error, len(s.metaDeleteURLs))
for _, u := range s.metaDeleteURLs {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u+b58hvk, nil)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
go func(dhURL string) {
err := s.sendDelContextRequest(ctx, dhURL)
if err != nil && ctx.Err() == nil {
log.Errorw("Failed metadata delete, retrying", "err", err, "provider", providerID)
// If ctx is still ok, then try once more with the specific
// dhstore that returned the error. This avoids having to retry
// sending everything to all dhstores again.
time.Sleep(50 * time.Millisecond)
err = s.sendDelContextRequest(ctx, dhURL)
if err != nil {
log.Errorw("Failed metadata delete retry", "err", err, "provider", providerID)
}
}
errs <- err
}(u + b58hvk)
}

start := time.Now()
rsp, err := s.httpClient.Do(req)
for i := 0; i < len(s.metaDeleteURLs); i++ {
err := <-errs
if err != nil {
// Return first error. Goroutines will complete because errs
// channel is buffered to allow them all to write.
return err
}
io.Copy(io.Discard, rsp.Body)
rsp.Body.Close()
// No need to check context, because goroutines will exit if canceled.
}
log.Infow("Sent metadata delete to all dhstores", "dhstores", len(s.metaDeleteURLs), "provider", providerID)
return nil
}

if rsp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to delete metadata at %s: %s", u, http.StatusText(rsp.StatusCode))
}
func (s *dhStore) sendDelContextRequest(ctx context.Context, dhURL string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, dhURL, nil)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")

stats.RecordWithOptions(context.Background(),
stats.WithTags(tag.Insert(metrics.Method, "delete")),
stats.WithMeasurements(metrics.DHMetadataLatency.M(metrics.MsecSince(start))))
start := time.Now()
rsp, err := s.httpClient.Do(req)
if err != nil {
return err
}
io.Copy(io.Discard, rsp.Body)
rsp.Body.Close()

log.Infow("Sent metadata delete to dhstore", "url", u, "provider", providerID)
if rsp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to delete metadata at %s: %s", dhURL, http.StatusText(rsp.StatusCode))
}

stats.RecordWithOptions(context.Background(),
stats.WithTags(tag.Insert(metrics.Method, "delete")),
stats.WithMeasurements(metrics.DHMetadataLatency.M(metrics.MsecSince(start))))

return nil
}

Expand Down Expand Up @@ -378,7 +408,15 @@ func (s *dhStore) sendDHDeleteIndexRequest(ctx context.Context, merges []client.
errs := make(chan error, len(s.indexDeleteURLs))
for _, u := range s.indexDeleteURLs {
go func(dhURL string) {
errs <- s.sendDelRequest(ctx, dhURL, data)
err := s.sendDelRequest(ctx, dhURL, data)
if err != nil && ctx.Err() == nil {
// If ctx is still ok, then try once more with the specific
// dhstore that returned the error. This avoids having to retry
// sending everything to all dhstores again.
time.Sleep(50 * time.Millisecond)
err = s.sendDelRequest(ctx, dhURL, data)
}
errs <- err
}(u)
}

Expand Down

0 comments on commit efc5946

Please sign in to comment.