Skip to content

Commit 2623b32

Browse files
committed
work around to terminate timeout transfer request
1 parent f2a28a8 commit 2623b32

File tree

1 file changed

+12
-7
lines changed
  • mooncake-p2p-store/src/p2pstore

1 file changed

+12
-7
lines changed

mooncake-p2p-store/src/p2pstore/core.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func (store *P2PStore) List(ctx context.Context, namePrefix string) ([]PayloadIn
246246
return result, nil
247247
}
248248

249-
func (store *P2PStore) doGetReplica(payload *Payload, addrList []uintptr, sizeList []uint64) error {
249+
func (store *P2PStore) doGetReplica(ctx context.Context, payload *Payload, addrList []uintptr, sizeList []uint64) error {
250250
var wg sync.WaitGroup
251251
errChan := make(chan error, 1)
252252

@@ -267,7 +267,7 @@ func (store *P2PStore) doGetReplica(payload *Payload, addrList []uintptr, sizeLi
267267
wg.Add(1)
268268
go func() {
269269
defer wg.Done()
270-
err = store.performTransfer(source, shard)
270+
err = store.performTransfer(ctx, source, shard)
271271
if err != nil {
272272
select {
273273
case errChan <- err:
@@ -336,7 +336,7 @@ func (store *P2PStore) GetReplica(ctx context.Context, name string, addrList []u
336336
}
337337
for {
338338
_ = store.transfer.syncSegmentCache()
339-
err = store.doGetReplica(payload, addrList, sizeList)
339+
err = store.doGetReplica(ctx, payload, addrList, sizeList)
340340
if err != nil {
341341
return err
342342
}
@@ -354,7 +354,7 @@ func (store *P2PStore) GetReplica(ctx context.Context, name string, addrList []u
354354
return store.updatePayloadMetadata(ctx, name, addrList, sizeList, payload, revision)
355355
}
356356

357-
func (store *P2PStore) performTransfer(source uintptr, shard Shard) error {
357+
func (store *P2PStore) performTransfer(ctx context.Context, source uintptr, shard Shard) error {
358358
retryCount := 0
359359
for retryCount < shard.Count() {
360360
batchID, err := store.transfer.allocateBatchID(1)
@@ -387,9 +387,14 @@ func (store *P2PStore) performTransfer(source uintptr, shard Shard) error {
387387

388388
var status int
389389
for status == STATUS_WAITING || status == STATUS_PENDING {
390-
status, _, err = store.transfer.getTransferStatus(batchID, 0)
391-
if err != nil {
392-
return err
390+
select {
391+
case <-ctx.Done():
392+
return ctx.Err()
393+
default:
394+
status, _, err = store.transfer.getTransferStatus(batchID, 0)
395+
if err != nil {
396+
return err
397+
}
393398
}
394399
}
395400

0 commit comments

Comments
 (0)