From 6e07e82d51bf13dbe71f785f9f734887b3fbfe75 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Sat, 28 Mar 2026 22:08:15 +0100 Subject: [PATCH] fix: harden usenet/debrid downloaders from critico review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Consolidate 3 maps (active, taskDirs, taskTrackers) into single activeDownload struct — eliminates out-of-sync state on mid-function panic - Cancel() runs os.RemoveAll in background goroutine (non-blocking) - Flush(): clear dirty before unlock to prevent concurrent flush race on same tmp file; remove fragile re-mark-on-error pattern - Revert RWMutex → Mutex in ProgressTracker (negligible benefit under write-heavy workload, higher overhead) - Remove file.Sync() from debrid and usenet downloaders (Close flushes kernel buffers; fsync blocks for seconds on large files) - Pin golangci-lint to v2.1.6 in CI (was floating with `latest`) - Fix CI matrix: Go 1.25+1.26 (was 1.24+1.25, but go.mod requires 1.25) --- .github/workflows/ci.yml | 2 +- internal/engine/debrid.go | 7 +-- internal/engine/usenet.go | 59 ++++++++++++++++++++------ internal/usenet/download/downloader.go | 7 +-- internal/usenet/download/progress.go | 29 +++++-------- 5 files changed, 61 insertions(+), 43 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 49125aa..7e43ed0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -80,7 +80,7 @@ jobs: - name: Run golangci-lint uses: golangci/golangci-lint-action@v9 with: - version: latest + version: v2.1.6 coverage: name: Coverage diff --git a/internal/engine/debrid.go b/internal/engine/debrid.go index a8a2eb3..d6bcb85 100644 --- a/internal/engine/debrid.go +++ b/internal/engine/debrid.go @@ -182,12 +182,7 @@ func (d *DebridDownloader) Download(ctx context.Context, task *Task, outputDir s if err != nil { return nil, fmt.Errorf("open file: %w", err) } - defer func() { - if err := file.Sync(); err != nil { - log.Printf("[%s] sync warning: %v", shortID(task.ID), err) - } - file.Close() - }() + defer file.Close() // Download with progress reporting downloaded := startOffset diff --git a/internal/engine/usenet.go b/internal/engine/usenet.go index 00826a8..a7698db 100644 --- a/internal/engine/usenet.go +++ b/internal/engine/usenet.go @@ -18,6 +18,13 @@ import ( "github.com/torrentclaw/torrentclaw-cli/internal/usenet/postprocess" ) +// activeDownload holds the state for a single in-progress usenet download. +type activeDownload struct { + cancel context.CancelFunc + taskDir string // populated after MkdirAll; empty before + tracker *download.ProgressTracker // populated after tracker creation; nil before +} + // UsenetDownloader downloads via Usenet/NZB protocol. // It searches for NZBs, downloads articles via NNTP, and assembles the final files. type UsenetDownloader struct { @@ -26,7 +33,7 @@ type UsenetDownloader struct { mu sync.Mutex nntpClient *nntp.Client - active map[string]context.CancelFunc + active map[string]*activeDownload // Cached credentials credentials *agent.UsenetCredentials @@ -43,7 +50,7 @@ func NewUsenetDownloader(apiClient *agent.Client) *UsenetDownloader { return &UsenetDownloader{ apiClient: apiClient, enabled: true, - active: make(map[string]context.CancelFunc), + active: make(map[string]*activeDownload), nzbCache: make(map[string]*agent.NzbSearchResult), } } @@ -101,8 +108,9 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s // Create cancellable context dlCtx, cancel := context.WithCancel(ctx) + dl := &activeDownload{cancel: cancel} u.mu.Lock() - u.active[task.ID] = cancel + u.active[task.ID] = dl u.mu.Unlock() defer func() { @@ -189,8 +197,14 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s return nil, fmt.Errorf("create dir: %w", err) } + // Register tracker and taskDir for Cancel() cleanup + u.mu.Lock() + dl.taskDir = taskDir + dl.tracker = tracker + u.mu.Unlock() + // Step 6: Download all files via NNTP - dl := download.NewDownloader(nntpClient) + segDl := download.NewDownloader(nntpClient) // Bridge download.Progress to engine.Progress dlProgressCh := make(chan download.Progress, 16) @@ -213,7 +227,7 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s } }() - downloadedFiles, err := dl.DownloadNZB(dlCtx, nzbFile, taskDir, tracker, dlProgressCh) + downloadedFiles, err := segDl.DownloadNZB(dlCtx, nzbFile, taskDir, tracker, dlProgressCh) close(dlProgressCh) if err != nil { @@ -276,17 +290,38 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s // Pause cancels an in-progress download but keeps files. func (u *UsenetDownloader) Pause(taskID string) error { u.mu.Lock() - cancel, ok := u.active[taskID] + dl := u.active[taskID] u.mu.Unlock() - if ok { - cancel() + if dl != nil { + dl.cancel() } return nil } -// Cancel aborts an in-progress download. +// Cancel aborts an in-progress download and removes partial files + resume state. func (u *UsenetDownloader) Cancel(taskID string) error { - return u.Pause(taskID) + u.mu.Lock() + dl := u.active[taskID] + u.mu.Unlock() + + if dl == nil { + return nil + } + + // Cancel context first — workers will stop and release file handles + dl.cancel() + + // Remove resume state (best-effort) + if dl.tracker != nil { + dl.tracker.Remove() + } + + // Remove partial download directory in background (can be slow for large dirs) + if dl.taskDir != "" { + go os.RemoveAll(dl.taskDir) + } + + return nil } // Shutdown closes the NNTP connection pool. @@ -295,8 +330,8 @@ func (u *UsenetDownloader) Shutdown(_ context.Context) error { defer u.mu.Unlock() // Cancel all active downloads - for id, cancel := range u.active { - cancel() + for id, dl := range u.active { + dl.cancel() delete(u.active, id) } diff --git a/internal/usenet/download/downloader.go b/internal/usenet/download/downloader.go index 8421770..e49321f 100644 --- a/internal/usenet/download/downloader.go +++ b/internal/usenet/download/downloader.go @@ -109,12 +109,7 @@ func (d *Downloader) DownloadFile(ctx context.Context, file nzb.File, fileIndex outFile.Truncate(totalBytes) } } - defer func() { - if err := outFile.Sync(); err != nil { - log.Printf("[usenet] sync warning: %v", err) - } - outFile.Close() - }() + defer outFile.Close() // Download segments using worker pool var downloaded atomic.Int64 diff --git a/internal/usenet/download/progress.go b/internal/usenet/download/progress.go index 4550d8a..8e7a547 100644 --- a/internal/usenet/download/progress.go +++ b/internal/usenet/download/progress.go @@ -43,7 +43,7 @@ type ProgressTracker struct { dir string // directory where progress files are stored files []fileProgress - mu sync.RWMutex + mu sync.Mutex dirty bool lastFlush time.Time markCount int // marks since last flush @@ -211,9 +211,9 @@ func (p *ProgressTracker) IsDone(fileIndex, segIndex int) bool { if segIndex < 0 || segIndex >= fp.segCount { return false } - p.mu.RLock() + p.mu.Lock() done := fp.completed[segIndex/8]&(1<