fix(engine): cross-backend integrity guard with retry-then-damaged

A truncated debrid download (in-memory byte counter hit 100% while the
NFS write-back silently dropped most of the bytes) was marked completed.
The 1.1.6 fsync fix closed the debrid-specific hole; this generalizes the
guarantee so "completed" never means a corrupt file on ANY backend.

- IntegrityError + bounded retry: on a corrupt/short result the manager
  re-downloads the same source up to 3x (clean start), then surfaces the
  task as damaged ("corrupt download:" prefix) instead of completing it.
- verify (size mismatch / empty), debrid (incomplete / post-write / flush),
  torrent (BytesMissing), usenet (par2 unrepairable / repair-failed) all
  classify integrity failures so they route through the retry/damaged path.
- scanner: a file ffprobe can't read is emitted as a damaged library_item
  (reason "unreadable") instead of being silently dropped from the sync.
- tests: manager retry-then-success + retry-exhausted-then-damaged,
  verifying->resolving transition, damaged sync item.
This commit is contained in:
Deivid Soto 2026-06-17 12:51:47 +02:00
parent 271413e0f9
commit a5f3f0914a
13 changed files with 400 additions and 91 deletions

View file

@ -2,7 +2,9 @@ package engine
import (
"context"
"fmt"
"log"
"os"
"sync"
"sync/atomic"
@ -379,110 +381,144 @@ func (m *Manager) processTask(ctx context.Context, task *Task) {
m.activeMu.Unlock()
}()
// 1. Resolve method
if err := task.Transition(StatusResolving); err != nil {
m.fail(ctx, task, "transition error: "+err.Error())
return
}
// On a corrupt/truncated result (a downloader's own integrity guard, or the
// shared on-disk verify below), re-download the SAME source a bounded number
// of times — a fresh clean-start attempt usually lands intact (the 2026-06-15
// debrid NFS write-back truncation was transient). Only after exhausting the
// retries is the task surfaced as damaged, so "completed" NEVER means a corrupt
// file. (User-chosen "both" policy: auto-retry, then visible-damaged.)
const maxIntegrityAttempts = 3
method, err := resolveMethod(ctx, task, m.downloaders, m.cfg.PreferredMethods)
if err != nil {
m.fail(ctx, task, "no method available: "+err.Error())
return
}
task.ResolvedMethod = method
log.Printf("[%s] resolved method: %s", agent.ShortID(task.ID), method)
// 2. Download
if err := task.Transition(StatusDownloading); err != nil {
m.fail(ctx, task, "transition error: "+err.Error())
return
}
progressCh := make(chan Progress, 16)
// Drain progress channel (just for logging; reporter reads directly from task)
go func() {
for range progressCh {
// Progress already applied via task.UpdateProgress in the downloader
}
}()
dl := m.downloaders[method]
result, err := dl.Download(ctx, task, m.cfg.OutputDir, progressCh)
close(progressCh)
if err != nil {
// A full disk is terminal — another source would fill the same disk, so
// skip the fallback and surface the clear message immediately.
if IsInsufficientDisk(err) {
for attempt := 1; ; attempt++ {
result, err := m.attemptDownload(ctx, task)
if err != nil {
if IsInsufficientDisk(err) {
// Terminal — another source would fill the same disk.
m.fail(ctx, task, err.Error())
return
}
if IsIntegrity(err) {
if attempt < maxIntegrityAttempts {
log.Printf("[%s] integrity check failed (attempt %d/%d), re-downloading clean: %v",
agent.ShortID(task.ID), attempt, maxIntegrityAttempts, err)
continue
}
m.failDamaged(ctx, task, err)
return
}
m.fail(ctx, task, err.Error())
return
}
// Try fallback
if tryFallback(task, m.downloaders, m.cfg.PreferredMethods) {
log.Printf("[%s] %s failed, trying fallback: %v", agent.ShortID(task.ID), method, err)
if err := task.Transition(StatusResolving); err == nil {
m.processTaskRetry(ctx, task)
// Shared on-disk safety net across every backend — the last line of defense
// against a truncated/short file slipping past a downloader's own checks.
if err := task.Transition(StatusVerifying); err != nil {
m.fail(ctx, task, "transition error: "+err.Error())
return
}
if verr := verify(result); verr != nil {
if IsIntegrity(verr) {
removeBrokenResult(task.ID, result) // clean start so a resume doesn't append to a short file
if attempt < maxIntegrityAttempts {
log.Printf("[%s] verify failed (attempt %d/%d), re-downloading clean: %v",
agent.ShortID(task.ID), attempt, maxIntegrityAttempts, verr)
continue
}
m.failDamaged(ctx, task, verr)
return
}
m.fail(ctx, task, "verification failed: "+verr.Error())
return
}
m.fail(ctx, task, err.Error())
m.finalizeVerified(ctx, task, result)
return
}
m.finalize(ctx, task, result)
}
// processTaskRetry handles fallback after a method failure.
func (m *Manager) processTaskRetry(ctx context.Context, task *Task) {
// attemptDownload resolves a method and downloads once, falling back to the next
// configured method on a plain transport failure (NOT on disk-full or integrity
// failures — those are the caller's to handle). Returns the download Result.
func (m *Manager) attemptDownload(ctx context.Context, task *Task) (*Result, error) {
if err := task.Transition(StatusResolving); err != nil {
return nil, fmt.Errorf("transition error: %w", err)
}
method, err := resolveMethod(ctx, task, m.downloaders, m.cfg.PreferredMethods)
if err != nil {
m.fail(ctx, task, "fallback failed: "+err.Error())
return
return nil, fmt.Errorf("no method available: %w", err)
}
task.ResolvedMethod = method
log.Printf("[%s] fallback to: %s", agent.ShortID(task.ID), method)
log.Printf("[%s] resolved method: %s", agent.ShortID(task.ID), method)
if err := task.Transition(StatusDownloading); err != nil {
m.fail(ctx, task, "transition error: "+err.Error())
return
return nil, fmt.Errorf("transition error: %w", err)
}
result, err := m.runDownload(ctx, task, method)
if err != nil {
// Disk-full is terminal; an integrity failure is retried in-place by the
// caller (same source, clean start) — don't burn the method fallback on
// either. Only a plain transport failure tries the next method.
if IsInsufficientDisk(err) || IsIntegrity(err) {
return nil, err
}
if tryFallback(task, m.downloaders, m.cfg.PreferredMethods) {
log.Printf("[%s] %s failed, trying fallback: %v", agent.ShortID(task.ID), method, err)
if terr := task.Transition(StatusResolving); terr != nil {
return nil, err
}
return m.attemptFallback(ctx, task)
}
return nil, err
}
return result, nil
}
// attemptFallback runs the next available method after a transport failure.
func (m *Manager) attemptFallback(ctx context.Context, task *Task) (*Result, error) {
method, err := resolveMethod(ctx, task, m.downloaders, m.cfg.PreferredMethods)
if err != nil {
return nil, fmt.Errorf("fallback failed: %w", err)
}
task.ResolvedMethod = method
log.Printf("[%s] fallback to: %s", agent.ShortID(task.ID), method)
if err := task.Transition(StatusDownloading); err != nil {
return nil, fmt.Errorf("transition error: %w", err)
}
return m.runDownload(ctx, task, method)
}
// runDownload invokes a single downloader, draining its progress channel.
func (m *Manager) runDownload(ctx context.Context, task *Task, method DownloadMethod) (*Result, error) {
progressCh := make(chan Progress, 16)
// Drain progress channel (reporter reads progress directly from the task).
go func() {
for range progressCh {
}
}()
dl := m.downloaders[method]
result, err := dl.Download(ctx, task, m.cfg.OutputDir, progressCh)
close(progressCh)
if err != nil {
// No further fallback here — same disk, same outcome — so an
// InsufficientDiskError on the fallback surfaces its message directly.
m.fail(ctx, task, err.Error())
return
}
m.finalize(ctx, task, result)
return result, err
}
// finalize runs verify → organize → upgrade replacement → complete for a downloaded task.
func (m *Manager) finalize(ctx context.Context, task *Task, result *Result) {
// Verify
if err := task.Transition(StatusVerifying); err != nil {
m.fail(ctx, task, "transition error: "+err.Error())
// removeBrokenResult deletes a single-file result that failed the on-disk verify
// so the retry's downloader starts clean (debrid resumes from a partial via HTTP
// Range — appending to a truncated stub would compound the corruption). Multi-file
// (directory) results are left for the downloader/anacrolix to re-verify in place.
func removeBrokenResult(taskID string, result *Result) {
if result == nil || result.FilePath == "" {
return
}
if err := verify(result); err != nil {
m.fail(ctx, task, "verification failed: "+err.Error())
return
if fi, err := os.Stat(result.FilePath); err == nil && !fi.IsDir() {
if rmErr := os.Remove(result.FilePath); rmErr != nil {
log.Printf("[%s] failed to remove broken file %s: %v", agent.ShortID(taskID), result.FilePath, rmErr)
}
}
}
// finalizeVerified runs organize → upgrade replacement → complete for a download
// that already passed verify.
func (m *Manager) finalizeVerified(ctx context.Context, task *Task, result *Result) {
// Organize
if err := task.Transition(StatusOrganizing); err != nil {
m.fail(ctx, task, "transition error: "+err.Error())
@ -538,3 +574,16 @@ func (m *Manager) fail(ctx context.Context, task *Task, msg string) {
m.recordFinished(task.ToStatusUpdate())
m.reporter.ReportFinal(ctx, task)
}
// damagedErrorPrefix is a STABLE marker the web matches on (download_task.error_message)
// to render a "corrupt — re-download" affordance instead of a generic failure. Keep
// in sync with the web's detection (src/lib/services/agent.ts / downloads UI).
const damagedErrorPrefix = "corrupt download: "
// failDamaged marks a task failed after its bytes repeatedly failed the integrity
// check (truncated/short file, checksum/par2 failure). Same terminal path as fail,
// but with the damagedErrorPrefix so the web can surface a re-download CTA — the
// download_task table has no integrity column, so the message IS the signal.
func (m *Manager) failDamaged(ctx context.Context, task *Task, err error) {
m.fail(ctx, task, damagedErrorPrefix+err.Error())
}