Skip to content

Commit ed22c27

Browse files
committed
Start uploading buffered files immediately
1 parent 932c805 commit ed22c27

File tree

5 files changed

+75
-4
lines changed

5 files changed

+75
-4
lines changed

modules/manager/contractor/contractor.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Contractor struct {
4949
interruptMaintenance chan struct{}
5050
maintenanceLock siasync.TryMutex
5151
uploadingBufferedFiles bool
52+
runningUploads map[string]func()
5253

5354
blockHeight uint64
5455
synced chan struct{}
@@ -336,6 +337,7 @@ func contractorBlockingStartup(db *sql.DB, cs modules.ConsensusSet, m modules.Ma
336337

337338
interruptMaintenance: make(chan struct{}),
338339
synced: make(chan struct{}),
340+
runningUploads: make(map[string]func()),
339341

340342
renters: make(map[types.PublicKey]modules.Renter),
341343

@@ -690,3 +692,29 @@ func (c *Contractor) DownloadObject(w io.Writer, rpk types.PublicKey, bucket, pa
690692

691693
return err
692694
}
695+
696+
// StartUploading initiates uploading buffered files.
697+
func (c *Contractor) StartUploading() {
698+
if err := c.tg.Add(); err != nil {
699+
return
700+
}
701+
defer c.tg.Done()
702+
c.managedUploadBufferedFiles()
703+
}
704+
705+
// CancelUpload cancels a running upload.
706+
func (c *Contractor) CancelUpload(rpk types.PublicKey, bucket, path []byte) {
707+
pk := make([]byte, 32)
708+
copy(pk, rpk[:])
709+
mapKey := string(pk) + string(bucket) + ":" + string(path)
710+
c.mu.Lock()
711+
cancelFunc, ok := c.runningUploads[mapKey]
712+
if ok {
713+
delete(c.runningUploads, mapKey)
714+
cancelFunc()
715+
if len(c.runningUploads) == 0 {
716+
c.uploadingBufferedFiles = false
717+
}
718+
}
719+
c.mu.Unlock()
720+
}

modules/manager/contractor/database.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1522,6 +1522,7 @@ func (c *Contractor) managedUploadBufferedFiles() {
15221522
c.mu.Lock()
15231523
if c.uploadingBufferedFiles {
15241524
c.mu.Unlock()
1525+
c.log.Println("INFO: skipping file uploads since another thread is running already")
15251526
return
15261527
}
15271528
c.uploadingBufferedFiles = true
@@ -1548,6 +1549,11 @@ func (c *Contractor) managedUploadBufferedFiles() {
15481549
defer rows.Close()
15491550

15501551
for rows.Next() {
1552+
select {
1553+
case <-c.tg.StopChan():
1554+
return
1555+
default:
1556+
}
15511557
var n, encrypted string
15521558
pk := make([]byte, 32)
15531559
var bucket, path, mimeType []byte

modules/manager/contractor/upload.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,18 @@ func (mgr *uploadManager) migrate(ctx context.Context, rpk types.PublicKey, shar
230230
func (c *Contractor) managedUploadObject(r io.Reader, rpk types.PublicKey, bucket, path, mimeType []byte, encrypted string) (fm modules.FileMetadata, err error) {
231231
// Create the context and setup its cancelling.
232232
ctx, cancel := context.WithCancel(context.Background())
233-
defer cancel()
233+
pk := make([]byte, 32)
234+
copy(pk, rpk[:])
235+
mapKey := string(pk) + string(bucket) + ":" + string(path)
236+
c.mu.Lock()
237+
c.runningUploads[mapKey] = cancel
238+
c.mu.Unlock()
239+
defer func() {
240+
c.mu.Lock()
241+
delete(c.runningUploads, mapKey)
242+
cancel()
243+
c.mu.Unlock()
244+
}()
234245

235246
// Fetch necessary params.
236247
c.mu.RLock()

modules/manager/database.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,14 @@ func (m *Manager) RegisterUpload(pk types.PublicKey, bucket, path, mimeType []by
642642
complete,
643643
encText,
644644
)
645-
return err
645+
if err != nil {
646+
return err
647+
}
648+
649+
// Initiate the upload.
650+
go m.hostContractor.StartUploading()
651+
652+
return nil
646653
}
647654

648655
// GetBufferSize returns the total size of the temporary files.
@@ -677,20 +684,27 @@ func (m *Manager) GetBufferSize() (total uint64, err error) {
677684
// DeleteBufferedFiles deletes the files waiting to be uploaded.
678685
func (m *Manager) DeleteBufferedFiles(pk types.PublicKey) error {
679686
// Make a list of file names.
680-
rows, err := m.db.Query("SELECT filename FROM ctr_uploads WHERE renter_pk = ?", pk[:])
687+
rows, err := m.db.Query(`
688+
SELECT filename, bucket, filepath
689+
FROM ctr_uploads
690+
WHERE renter_pk = ?
691+
`, pk[:])
681692
if err != nil {
682693
m.log.Println("ERROR: unable to query files:", err)
683694
return modules.AddContext(err, "unable to query files")
684695
}
685696

697+
// Make a list of filenames and also cancel any running uploads.
686698
var names []string
687699
for rows.Next() {
688700
var name string
689-
if err := rows.Scan(&name); err != nil {
701+
var bucket, path []byte
702+
if err := rows.Scan(&name, &bucket, &path); err != nil {
690703
rows.Close()
691704
m.log.Println("ERROR: unable to retrieve filename:", err)
692705
return modules.AddContext(err, "unable to retrieve filename")
693706
}
707+
m.hostContractor.CancelUpload(pk, bucket, path)
694708
names = append(names, name)
695709
}
696710
rows.Close()
@@ -723,6 +737,9 @@ func (m *Manager) DeleteBufferedFiles(pk types.PublicKey) error {
723737
// DeleteBufferedFile deletes the specified file and the associated
724738
// database record.
725739
func (m *Manager) DeleteBufferedFile(pk types.PublicKey, bucket, path []byte) error {
740+
// Cancel the upload.
741+
m.hostContractor.CancelUpload(pk, bucket, path)
742+
726743
var name string
727744
err := m.db.QueryRow(`
728745
SELECT filename
@@ -1203,5 +1220,8 @@ func (m *Manager) AssembleParts(pk types.PublicKey, id types.Hash256) (err error
12031220
return modules.AddContext(err, "couldn't register new upload")
12041221
}
12051222

1223+
// Initiate the upload.
1224+
go m.hostContractor.StartUploading()
1225+
12061226
return
12071227
}

modules/manager/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ type hostContractor interface {
5656
// CancelContract cancels the renter's contract.
5757
CancelContract(types.FileContractID) error
5858

59+
// CancelUpload cancels a running upload.
60+
CancelUpload(types.PublicKey, []byte, []byte)
61+
5962
// Contract returns the contract with the given ID.
6063
Contract(types.FileContractID) (modules.RenterContract, bool)
6164

@@ -136,6 +139,9 @@ type hostContractor interface {
136139
// RetrieveMetadata retrieves the file metadata from the database.
137140
RetrieveMetadata(types.PublicKey, []modules.BucketFiles) ([]modules.FileMetadata, error)
138141

142+
// StartUploading initiates uploading buffered files.
143+
StartUploading()
144+
139145
// Synced returns a channel that is closed when the contractor is fully
140146
// synced with the peer-to-peer network.
141147
Synced() <-chan struct{}

0 commit comments

Comments
 (0)