fix: harden usenet/debrid downloaders from critico review
- Consolidate 3 maps (active, taskDirs, taskTrackers) into single activeDownload struct — eliminates out-of-sync state on mid-function panic - Cancel() runs os.RemoveAll in background goroutine (non-blocking) - Flush(): clear dirty before unlock to prevent concurrent flush race on same tmp file; remove fragile re-mark-on-error pattern - Revert RWMutex → Mutex in ProgressTracker (negligible benefit under write-heavy workload, higher overhead) - Remove file.Sync() from debrid and usenet downloaders (Close flushes kernel buffers; fsync blocks for seconds on large files) - Pin golangci-lint to v2.1.6 in CI (was floating with `latest`) - Fix CI matrix: Go 1.25+1.26 (was 1.24+1.25, but go.mod requires 1.25)
This commit is contained in:
parent
c9bcb96dab
commit
6e07e82d51
5 changed files with 61 additions and 43 deletions
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
|
|
@ -80,7 +80,7 @@ jobs:
|
|||
- name: Run golangci-lint
|
||||
uses: golangci/golangci-lint-action@v9
|
||||
with:
|
||||
version: latest
|
||||
version: v2.1.6
|
||||
|
||||
coverage:
|
||||
name: Coverage
|
||||
|
|
|
|||
|
|
@ -182,12 +182,7 @@ func (d *DebridDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("open file: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := file.Sync(); err != nil {
|
||||
log.Printf("[%s] sync warning: %v", shortID(task.ID), err)
|
||||
}
|
||||
file.Close()
|
||||
}()
|
||||
defer file.Close()
|
||||
|
||||
// Download with progress reporting
|
||||
downloaded := startOffset
|
||||
|
|
|
|||
|
|
@ -18,6 +18,13 @@ import (
|
|||
"github.com/torrentclaw/torrentclaw-cli/internal/usenet/postprocess"
|
||||
)
|
||||
|
||||
// activeDownload holds the state for a single in-progress usenet download.
|
||||
type activeDownload struct {
|
||||
cancel context.CancelFunc
|
||||
taskDir string // populated after MkdirAll; empty before
|
||||
tracker *download.ProgressTracker // populated after tracker creation; nil before
|
||||
}
|
||||
|
||||
// UsenetDownloader downloads via Usenet/NZB protocol.
|
||||
// It searches for NZBs, downloads articles via NNTP, and assembles the final files.
|
||||
type UsenetDownloader struct {
|
||||
|
|
@ -26,7 +33,7 @@ type UsenetDownloader struct {
|
|||
|
||||
mu sync.Mutex
|
||||
nntpClient *nntp.Client
|
||||
active map[string]context.CancelFunc
|
||||
active map[string]*activeDownload
|
||||
|
||||
// Cached credentials
|
||||
credentials *agent.UsenetCredentials
|
||||
|
|
@ -43,7 +50,7 @@ func NewUsenetDownloader(apiClient *agent.Client) *UsenetDownloader {
|
|||
return &UsenetDownloader{
|
||||
apiClient: apiClient,
|
||||
enabled: true,
|
||||
active: make(map[string]context.CancelFunc),
|
||||
active: make(map[string]*activeDownload),
|
||||
nzbCache: make(map[string]*agent.NzbSearchResult),
|
||||
}
|
||||
}
|
||||
|
|
@ -101,8 +108,9 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
// Create cancellable context
|
||||
dlCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
dl := &activeDownload{cancel: cancel}
|
||||
u.mu.Lock()
|
||||
u.active[task.ID] = cancel
|
||||
u.active[task.ID] = dl
|
||||
u.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
|
|
@ -189,8 +197,14 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
return nil, fmt.Errorf("create dir: %w", err)
|
||||
}
|
||||
|
||||
// Register tracker and taskDir for Cancel() cleanup
|
||||
u.mu.Lock()
|
||||
dl.taskDir = taskDir
|
||||
dl.tracker = tracker
|
||||
u.mu.Unlock()
|
||||
|
||||
// Step 6: Download all files via NNTP
|
||||
dl := download.NewDownloader(nntpClient)
|
||||
segDl := download.NewDownloader(nntpClient)
|
||||
|
||||
// Bridge download.Progress to engine.Progress
|
||||
dlProgressCh := make(chan download.Progress, 16)
|
||||
|
|
@ -213,7 +227,7 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
}
|
||||
}()
|
||||
|
||||
downloadedFiles, err := dl.DownloadNZB(dlCtx, nzbFile, taskDir, tracker, dlProgressCh)
|
||||
downloadedFiles, err := segDl.DownloadNZB(dlCtx, nzbFile, taskDir, tracker, dlProgressCh)
|
||||
close(dlProgressCh)
|
||||
|
||||
if err != nil {
|
||||
|
|
@ -276,17 +290,38 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
// Pause cancels an in-progress download but keeps files.
|
||||
func (u *UsenetDownloader) Pause(taskID string) error {
|
||||
u.mu.Lock()
|
||||
cancel, ok := u.active[taskID]
|
||||
dl := u.active[taskID]
|
||||
u.mu.Unlock()
|
||||
if ok {
|
||||
cancel()
|
||||
if dl != nil {
|
||||
dl.cancel()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cancel aborts an in-progress download.
|
||||
// Cancel aborts an in-progress download and removes partial files + resume state.
|
||||
func (u *UsenetDownloader) Cancel(taskID string) error {
|
||||
return u.Pause(taskID)
|
||||
u.mu.Lock()
|
||||
dl := u.active[taskID]
|
||||
u.mu.Unlock()
|
||||
|
||||
if dl == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Cancel context first — workers will stop and release file handles
|
||||
dl.cancel()
|
||||
|
||||
// Remove resume state (best-effort)
|
||||
if dl.tracker != nil {
|
||||
dl.tracker.Remove()
|
||||
}
|
||||
|
||||
// Remove partial download directory in background (can be slow for large dirs)
|
||||
if dl.taskDir != "" {
|
||||
go os.RemoveAll(dl.taskDir)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown closes the NNTP connection pool.
|
||||
|
|
@ -295,8 +330,8 @@ func (u *UsenetDownloader) Shutdown(_ context.Context) error {
|
|||
defer u.mu.Unlock()
|
||||
|
||||
// Cancel all active downloads
|
||||
for id, cancel := range u.active {
|
||||
cancel()
|
||||
for id, dl := range u.active {
|
||||
dl.cancel()
|
||||
delete(u.active, id)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -109,12 +109,7 @@ func (d *Downloader) DownloadFile(ctx context.Context, file nzb.File, fileIndex
|
|||
outFile.Truncate(totalBytes)
|
||||
}
|
||||
}
|
||||
defer func() {
|
||||
if err := outFile.Sync(); err != nil {
|
||||
log.Printf("[usenet] sync warning: %v", err)
|
||||
}
|
||||
outFile.Close()
|
||||
}()
|
||||
defer outFile.Close()
|
||||
|
||||
// Download segments using worker pool
|
||||
var downloaded atomic.Int64
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ type ProgressTracker struct {
|
|||
dir string // directory where progress files are stored
|
||||
files []fileProgress
|
||||
|
||||
mu sync.RWMutex
|
||||
mu sync.Mutex
|
||||
dirty bool
|
||||
lastFlush time.Time
|
||||
markCount int // marks since last flush
|
||||
|
|
@ -211,9 +211,9 @@ func (p *ProgressTracker) IsDone(fileIndex, segIndex int) bool {
|
|||
if segIndex < 0 || segIndex >= fp.segCount {
|
||||
return false
|
||||
}
|
||||
p.mu.RLock()
|
||||
p.mu.Lock()
|
||||
done := fp.completed[segIndex/8]&(1<<uint(segIndex%8)) != 0
|
||||
p.mu.RUnlock()
|
||||
p.mu.Unlock()
|
||||
return done
|
||||
}
|
||||
|
||||
|
|
@ -258,6 +258,8 @@ func (p *ProgressTracker) TotalCompleted() int {
|
|||
}
|
||||
|
||||
// Flush writes the current progress state to disk atomically (tmp + rename).
|
||||
// dirty is cleared before I/O to prevent concurrent Flush calls from racing
|
||||
// on the same tmp file. If I/O fails, the next MarkDone will re-set dirty.
|
||||
func (p *ProgressTracker) Flush() error {
|
||||
p.mu.Lock()
|
||||
if !p.dirty {
|
||||
|
|
@ -265,7 +267,9 @@ func (p *ProgressTracker) Flush() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Calculate total size
|
||||
// Snapshot state and clear dirty while holding the lock.
|
||||
// This serializes flushes: a concurrent MarkDone will set dirty=true
|
||||
// again, but won't trigger a competing Flush on the same tmp file.
|
||||
size := headerSize
|
||||
for i := range p.files {
|
||||
size += 4 + (p.files[i].segCount+7)/8
|
||||
|
|
@ -296,40 +300,29 @@ func (p *ProgressTracker) Flush() error {
|
|||
p.lastFlush = time.Now()
|
||||
p.mu.Unlock()
|
||||
|
||||
// Atomic write: tmp file + rename
|
||||
// Atomic write: tmp file + rename (outside lock — I/O is slow)
|
||||
if err := os.MkdirAll(p.dir, 0o755); err != nil {
|
||||
p.mu.Lock()
|
||||
p.dirty = true // re-mark so next Flush retries
|
||||
p.mu.Unlock()
|
||||
return fmt.Errorf("create resume dir: %w", err)
|
||||
}
|
||||
|
||||
tmpPath := p.progressPath() + ".tmp"
|
||||
if err := os.WriteFile(tmpPath, buf, 0o644); err != nil {
|
||||
p.mu.Lock()
|
||||
p.dirty = true
|
||||
p.mu.Unlock()
|
||||
return fmt.Errorf("write progress tmp: %w", err)
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpPath, p.progressPath()); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
p.mu.Lock()
|
||||
p.dirty = true
|
||||
p.mu.Unlock()
|
||||
return fmt.Errorf("rename progress: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Remove deletes both the progress file and cached NZB file.
|
||||
func (p *ProgressTracker) Remove() error {
|
||||
// Remove deletes both the progress file and cached NZB file (best-effort).
|
||||
func (p *ProgressTracker) Remove() {
|
||||
os.Remove(p.progressPath())
|
||||
os.Remove(p.nzbPath())
|
||||
// Also remove tmp file if it exists
|
||||
os.Remove(p.progressPath() + ".tmp")
|
||||
return nil
|
||||
}
|
||||
|
||||
// CleanStaleFiles removes resume files older than maxAge from the given directory.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue