From a5f3f0914a46564ec4a9ac7c3670c49455d338d0 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 17 Jun 2026 12:51:47 +0200 Subject: [PATCH] fix(engine): cross-backend integrity guard with retry-then-damaged A truncated debrid download (in-memory byte counter hit 100% while the NFS write-back silently dropped most of the bytes) was marked completed. The 1.1.6 fsync fix closed the debrid-specific hole; this generalizes the guarantee so "completed" never means a corrupt file on ANY backend. - IntegrityError + bounded retry: on a corrupt/short result the manager re-downloads the same source up to 3x (clean start), then surfaces the task as damaged ("corrupt download:" prefix) instead of completing it. - verify (size mismatch / empty), debrid (incomplete / post-write / flush), torrent (BytesMissing), usenet (par2 unrepairable / repair-failed) all classify integrity failures so they route through the retry/damaged path. - scanner: a file ffprobe can't read is emitted as a damaged library_item (reason "unreadable") instead of being silently dropped from the sync. - tests: manager retry-then-success + retry-exhausted-then-damaged, verifying->resolving transition, damaged sync item. --- internal/engine/debrid.go | 15 +- internal/engine/integrity.go | 41 +++++ internal/engine/manager.go | 191 ++++++++++++++-------- internal/engine/manager_integrity_test.go | 128 +++++++++++++++ internal/engine/task.go | 8 +- internal/engine/task_test.go | 2 +- internal/engine/torrent.go | 9 +- internal/engine/usenet.go | 10 +- internal/engine/verify.go | 11 +- internal/library/sync.go | 23 +++ internal/library/sync_test.go | 18 +- internal/usenet/postprocess/par2.go | 9 +- internal/usenet/postprocess/pipeline.go | 26 ++- 13 files changed, 400 insertions(+), 91 deletions(-) create mode 100644 internal/engine/integrity.go create mode 100644 internal/engine/manager_integrity_test.go diff --git a/internal/engine/debrid.go b/internal/engine/debrid.go index 9867255..644ab13 100644 --- a/internal/engine/debrid.go +++ b/internal/engine/debrid.go @@ -276,7 +276,11 @@ func (d *DebridDownloader) Download(ctx context.Context, task *Task, outputDir s // and we read fewer bytes, the transfer was truncated (e.g. a debrid CDN edge // closing the connection). Don't hand a short file to verify as if complete. if totalBytes > 0 && downloaded < totalBytes { - return nil, fmt.Errorf("incomplete download: got %s of %s", formatBytes(downloaded), formatBytes(totalBytes)) + // Integrity, not transport — the manager re-downloads. Keep the partial + // (NOT removed): the bytes written so far are sequentially correct, so the + // retry resumes via HTTP Range from where the stream was cut instead of + // re-fetching the whole file. + return nil, integrityErr("truncated", "incomplete download: got %s of %s", formatBytes(downloaded), formatBytes(totalBytes)) } // Force the OS to flush the file to durable storage BEFORE we report success. @@ -286,10 +290,13 @@ func (d *DebridDownloader) Download(ctx context.Context, task *Task, outputDir s // and rejects it ("size mismatch"). fsync surfaces a write-back error here, // where it's actionable, instead of silently truncating the file. if err := file.Sync(); err != nil { - return nil, fmt.Errorf("flush to disk failed (write-back/network-mount error): %w", err) + _ = closeFile() + _ = os.Remove(destPath) // uncertain on-disk state — drop it so the retry starts clean + return nil, integrityErr("flush_failed", "flush to disk failed (write-back/network-mount error): %v", err) } if err := closeFile(); err != nil { - return nil, fmt.Errorf("close file failed (write-back/network-mount error): %w", err) + _ = os.Remove(destPath) + return nil, integrityErr("flush_failed", "close file failed (write-back/network-mount error): %v", err) } // Safety net: after a durable flush, the on-disk size must match what we wrote. @@ -302,7 +309,7 @@ func (d *DebridDownloader) Download(ctx context.Context, task *Task, outputDir s if rmErr := os.Remove(destPath); rmErr != nil { log.Printf("[%s] failed to remove corrupt partial %s: %v", agent.ShortID(task.ID), destPath, rmErr) } - return nil, fmt.Errorf("post-write size mismatch: wrote %s but file is %s on disk — likely a stalled or failing storage mount (%s)", + return nil, integrityErr("truncated", "post-write size mismatch: wrote %s but file is %s on disk — likely a stalled or failing storage mount (%s)", formatBytes(downloaded), formatBytes(fi.Size()), outputDir) } diff --git a/internal/engine/integrity.go b/internal/engine/integrity.go new file mode 100644 index 0000000..a0456b2 --- /dev/null +++ b/internal/engine/integrity.go @@ -0,0 +1,41 @@ +package engine + +import ( + "errors" + "fmt" +) + +// IntegrityError marks a finished download whose bytes don't match what was +// expected: a truncated / short file, a checksum/par2 failure, or an on-disk +// size below the advertised length. It is DISTINCT from a transport failure +// (network drop, dead tracker) — on an IntegrityError the manager re-downloads +// the SAME source a bounded number of times (a fresh, clean-start attempt), and +// only after exhausting the retries surfaces the task as damaged. This is the +// cross-backend safety net that guarantees "completed" never means a corrupt +// file (incident: 2026-06-15 debrid NFS write-back truncation — a 20 MB stub of +// a 394 MB file was marked completed because nothing re-checked the on-disk size +// after the page-cache write-back silently dropped ~374 MB). +type IntegrityError struct { + // Reason is a stable short code surfaced to the web / logs: + // "truncated", "size_mismatch", "empty", "par2_failed", "flush_failed". + Reason string + Message string // human-readable detail +} + +func (e *IntegrityError) Error() string { + if e.Message != "" { + return e.Message + } + return "integrity check failed: " + e.Reason +} + +// IsIntegrity reports whether err is (or wraps) an IntegrityError. +func IsIntegrity(err error) bool { + var ie *IntegrityError + return errors.As(err, &ie) +} + +// integrityErr builds an IntegrityError with a printf-style message. +func integrityErr(reason, format string, args ...any) *IntegrityError { + return &IntegrityError{Reason: reason, Message: fmt.Sprintf(format, args...)} +} diff --git a/internal/engine/manager.go b/internal/engine/manager.go index 2b6d47f..15245dd 100644 --- a/internal/engine/manager.go +++ b/internal/engine/manager.go @@ -2,7 +2,9 @@ package engine import ( "context" + "fmt" "log" + "os" "sync" "sync/atomic" @@ -379,110 +381,144 @@ func (m *Manager) processTask(ctx context.Context, task *Task) { m.activeMu.Unlock() }() - // 1. Resolve method - if err := task.Transition(StatusResolving); err != nil { - m.fail(ctx, task, "transition error: "+err.Error()) - return - } + // On a corrupt/truncated result (a downloader's own integrity guard, or the + // shared on-disk verify below), re-download the SAME source a bounded number + // of times — a fresh clean-start attempt usually lands intact (the 2026-06-15 + // debrid NFS write-back truncation was transient). Only after exhausting the + // retries is the task surfaced as damaged, so "completed" NEVER means a corrupt + // file. (User-chosen "both" policy: auto-retry, then visible-damaged.) + const maxIntegrityAttempts = 3 - method, err := resolveMethod(ctx, task, m.downloaders, m.cfg.PreferredMethods) - if err != nil { - m.fail(ctx, task, "no method available: "+err.Error()) - return - } - - task.ResolvedMethod = method - log.Printf("[%s] resolved method: %s", agent.ShortID(task.ID), method) - - // 2. Download - if err := task.Transition(StatusDownloading); err != nil { - m.fail(ctx, task, "transition error: "+err.Error()) - return - } - - progressCh := make(chan Progress, 16) - - // Drain progress channel (just for logging; reporter reads directly from task) - go func() { - for range progressCh { - // Progress already applied via task.UpdateProgress in the downloader - } - }() - - dl := m.downloaders[method] - result, err := dl.Download(ctx, task, m.cfg.OutputDir, progressCh) - close(progressCh) - - if err != nil { - // A full disk is terminal — another source would fill the same disk, so - // skip the fallback and surface the clear message immediately. - if IsInsufficientDisk(err) { + for attempt := 1; ; attempt++ { + result, err := m.attemptDownload(ctx, task) + if err != nil { + if IsInsufficientDisk(err) { + // Terminal — another source would fill the same disk. + m.fail(ctx, task, err.Error()) + return + } + if IsIntegrity(err) { + if attempt < maxIntegrityAttempts { + log.Printf("[%s] integrity check failed (attempt %d/%d), re-downloading clean: %v", + agent.ShortID(task.ID), attempt, maxIntegrityAttempts, err) + continue + } + m.failDamaged(ctx, task, err) + return + } m.fail(ctx, task, err.Error()) return } - // Try fallback - if tryFallback(task, m.downloaders, m.cfg.PreferredMethods) { - log.Printf("[%s] %s failed, trying fallback: %v", agent.ShortID(task.ID), method, err) - if err := task.Transition(StatusResolving); err == nil { - m.processTaskRetry(ctx, task) + + // Shared on-disk safety net across every backend — the last line of defense + // against a truncated/short file slipping past a downloader's own checks. + if err := task.Transition(StatusVerifying); err != nil { + m.fail(ctx, task, "transition error: "+err.Error()) + return + } + if verr := verify(result); verr != nil { + if IsIntegrity(verr) { + removeBrokenResult(task.ID, result) // clean start so a resume doesn't append to a short file + if attempt < maxIntegrityAttempts { + log.Printf("[%s] verify failed (attempt %d/%d), re-downloading clean: %v", + agent.ShortID(task.ID), attempt, maxIntegrityAttempts, verr) + continue + } + m.failDamaged(ctx, task, verr) return } + m.fail(ctx, task, "verification failed: "+verr.Error()) + return } - m.fail(ctx, task, err.Error()) + + m.finalizeVerified(ctx, task, result) return } - - m.finalize(ctx, task, result) } -// processTaskRetry handles fallback after a method failure. -func (m *Manager) processTaskRetry(ctx context.Context, task *Task) { +// attemptDownload resolves a method and downloads once, falling back to the next +// configured method on a plain transport failure (NOT on disk-full or integrity +// failures — those are the caller's to handle). Returns the download Result. +func (m *Manager) attemptDownload(ctx context.Context, task *Task) (*Result, error) { + if err := task.Transition(StatusResolving); err != nil { + return nil, fmt.Errorf("transition error: %w", err) + } method, err := resolveMethod(ctx, task, m.downloaders, m.cfg.PreferredMethods) if err != nil { - m.fail(ctx, task, "fallback failed: "+err.Error()) - return + return nil, fmt.Errorf("no method available: %w", err) } - task.ResolvedMethod = method - log.Printf("[%s] fallback to: %s", agent.ShortID(task.ID), method) + log.Printf("[%s] resolved method: %s", agent.ShortID(task.ID), method) if err := task.Transition(StatusDownloading); err != nil { - m.fail(ctx, task, "transition error: "+err.Error()) - return + return nil, fmt.Errorf("transition error: %w", err) } + result, err := m.runDownload(ctx, task, method) + if err != nil { + // Disk-full is terminal; an integrity failure is retried in-place by the + // caller (same source, clean start) — don't burn the method fallback on + // either. Only a plain transport failure tries the next method. + if IsInsufficientDisk(err) || IsIntegrity(err) { + return nil, err + } + if tryFallback(task, m.downloaders, m.cfg.PreferredMethods) { + log.Printf("[%s] %s failed, trying fallback: %v", agent.ShortID(task.ID), method, err) + if terr := task.Transition(StatusResolving); terr != nil { + return nil, err + } + return m.attemptFallback(ctx, task) + } + return nil, err + } + return result, nil +} +// attemptFallback runs the next available method after a transport failure. +func (m *Manager) attemptFallback(ctx context.Context, task *Task) (*Result, error) { + method, err := resolveMethod(ctx, task, m.downloaders, m.cfg.PreferredMethods) + if err != nil { + return nil, fmt.Errorf("fallback failed: %w", err) + } + task.ResolvedMethod = method + log.Printf("[%s] fallback to: %s", agent.ShortID(task.ID), method) + if err := task.Transition(StatusDownloading); err != nil { + return nil, fmt.Errorf("transition error: %w", err) + } + return m.runDownload(ctx, task, method) +} + +// runDownload invokes a single downloader, draining its progress channel. +func (m *Manager) runDownload(ctx context.Context, task *Task, method DownloadMethod) (*Result, error) { progressCh := make(chan Progress, 16) + // Drain progress channel (reporter reads progress directly from the task). go func() { for range progressCh { } }() - dl := m.downloaders[method] result, err := dl.Download(ctx, task, m.cfg.OutputDir, progressCh) close(progressCh) - - if err != nil { - // No further fallback here — same disk, same outcome — so an - // InsufficientDiskError on the fallback surfaces its message directly. - m.fail(ctx, task, err.Error()) - return - } - - m.finalize(ctx, task, result) + return result, err } -// finalize runs verify → organize → upgrade replacement → complete for a downloaded task. -func (m *Manager) finalize(ctx context.Context, task *Task, result *Result) { - // Verify - if err := task.Transition(StatusVerifying); err != nil { - m.fail(ctx, task, "transition error: "+err.Error()) +// removeBrokenResult deletes a single-file result that failed the on-disk verify +// so the retry's downloader starts clean (debrid resumes from a partial via HTTP +// Range — appending to a truncated stub would compound the corruption). Multi-file +// (directory) results are left for the downloader/anacrolix to re-verify in place. +func removeBrokenResult(taskID string, result *Result) { + if result == nil || result.FilePath == "" { return } - if err := verify(result); err != nil { - m.fail(ctx, task, "verification failed: "+err.Error()) - return + if fi, err := os.Stat(result.FilePath); err == nil && !fi.IsDir() { + if rmErr := os.Remove(result.FilePath); rmErr != nil { + log.Printf("[%s] failed to remove broken file %s: %v", agent.ShortID(taskID), result.FilePath, rmErr) + } } +} +// finalizeVerified runs organize → upgrade replacement → complete for a download +// that already passed verify. +func (m *Manager) finalizeVerified(ctx context.Context, task *Task, result *Result) { // Organize if err := task.Transition(StatusOrganizing); err != nil { m.fail(ctx, task, "transition error: "+err.Error()) @@ -538,3 +574,16 @@ func (m *Manager) fail(ctx context.Context, task *Task, msg string) { m.recordFinished(task.ToStatusUpdate()) m.reporter.ReportFinal(ctx, task) } + +// damagedErrorPrefix is a STABLE marker the web matches on (download_task.error_message) +// to render a "corrupt — re-download" affordance instead of a generic failure. Keep +// in sync with the web's detection (src/lib/services/agent.ts / downloads UI). +const damagedErrorPrefix = "corrupt download: " + +// failDamaged marks a task failed after its bytes repeatedly failed the integrity +// check (truncated/short file, checksum/par2 failure). Same terminal path as fail, +// but with the damagedErrorPrefix so the web can surface a re-download CTA — the +// download_task table has no integrity column, so the message IS the signal. +func (m *Manager) failDamaged(ctx context.Context, task *Task, err error) { + m.fail(ctx, task, damagedErrorPrefix+err.Error()) +} diff --git a/internal/engine/manager_integrity_test.go b/internal/engine/manager_integrity_test.go new file mode 100644 index 0000000..e9c8a0c --- /dev/null +++ b/internal/engine/manager_integrity_test.go @@ -0,0 +1,128 @@ +package engine + +import ( + "context" + "os" + "path/filepath" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/torrentclaw/unarr/internal/agent" +) + +// truncatingMockDownloader writes a SHORT file (failing the on-disk verify) until +// goodOnAttempt, then writes a full file. reportedSize is what each Result claims, +// so verify() compares the advertised size against the (initially truncated) bytes +// on disk — the exact shape of the 2026-06-15 debrid NFS truncation. +type truncatingMockDownloader struct { + dir string + reportedSize int64 + goodOnAttempt int // 1-based attempt that finally writes a full file; 0 = never + callCount atomic.Int32 +} + +func (m *truncatingMockDownloader) Method() DownloadMethod { return MethodTorrent } +func (m *truncatingMockDownloader) Available(_ context.Context, _ *Task) (bool, error) { + return true, nil +} +func (m *truncatingMockDownloader) Download(_ context.Context, _ *Task, _ string, _ chan<- Progress) (*Result, error) { + n := int(m.callCount.Add(1)) + path := filepath.Join(m.dir, "movie.mkv") + size := int64(10) // truncated stub + if m.goodOnAttempt > 0 && n >= m.goodOnAttempt { + size = m.reportedSize + } + if err := os.WriteFile(path, make([]byte, size), 0o644); err != nil { + return nil, err + } + return &Result{FilePath: path, FileName: "movie.mkv", Method: MethodTorrent, Size: m.reportedSize}, nil +} +func (m *truncatingMockDownloader) Pause(_ string) error { return nil } +func (m *truncatingMockDownloader) Cancel(_ string) error { return nil } +func (m *truncatingMockDownloader) Shutdown(_ context.Context) error { return nil } + +// captureReporter builds a ProgressReporter over a mockStatusReporter we keep a +// handle to, so the test can read the final reported StatusUpdate. +func captureReporter() (*ProgressReporter, *mockStatusReporter) { + reporter := &mockStatusReporter{} + return &ProgressReporter{ + reporter: reporter, + interval: 50 * time.Millisecond, + latest: make(map[string]*Task), + lastReported: make(map[string]TaskStatus), + }, reporter +} + +func terminalUpdate(t *testing.T, r *mockStatusReporter, taskID string) agent.StatusUpdate { + t.Helper() + r.mu.Lock() + defer r.mu.Unlock() + for i := len(r.calls) - 1; i >= 0; i-- { + c := r.calls[i] + if c.TaskID == taskID && (c.Status == "completed" || c.Status == "failed") { + return c + } + } + t.Fatalf("no terminal (completed/failed) status update for %s", taskID) + return agent.StatusUpdate{} +} + +// A truncated download is re-tried clean and, once it lands intact, completes — +// "completed" is never reported for the corrupt attempt. +func TestManagerPipeline_IntegrityRetry_ThenSucceeds(t *testing.T) { + dir := t.TempDir() + pr, reporter := captureReporter() + dl := &truncatingMockDownloader{dir: dir, reportedSize: 10000, goodOnAttempt: 2} + + mgr := NewManager(ManagerConfig{MaxConcurrent: 1, OutputDir: dir}, pr, dl) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go pr.Run(ctx) + + const taskID = "integrity-retry-ok-123456" + mgr.Submit(ctx, agent.Task{ + ID: taskID, InfoHash: "abc123def456abc123def456abc123def456abc1", + Title: "Retry Test", PreferredMethod: "torrent", + }) + mgr.Wait() + + if got := dl.callCount.Load(); got != 2 { + t.Errorf("download attempts = %d, want 2 (1 truncated + 1 clean)", got) + } + if u := terminalUpdate(t, reporter, taskID); u.Status != "completed" { + t.Errorf("final status = %q (%s), want completed", u.Status, u.ErrorMessage) + } +} + +// A persistently-truncated download exhausts the bounded retries and is surfaced +// as damaged (failed + the stable corrupt-download marker), never completed. +func TestManagerPipeline_IntegrityRetry_ExhaustsThenDamaged(t *testing.T) { + dir := t.TempDir() + pr, reporter := captureReporter() + dl := &truncatingMockDownloader{dir: dir, reportedSize: 10000, goodOnAttempt: 0} + + mgr := NewManager(ManagerConfig{MaxConcurrent: 1, OutputDir: dir}, pr, dl) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go pr.Run(ctx) + + const taskID = "integrity-retry-bad-123456" + mgr.Submit(ctx, agent.Task{ + ID: taskID, InfoHash: "abc123def456abc123def456abc123def456abc1", + Title: "Damaged Test", PreferredMethod: "torrent", + }) + mgr.Wait() + + if got := dl.callCount.Load(); got != 3 { + t.Errorf("download attempts = %d, want 3 (bounded retries)", got) + } + u := terminalUpdate(t, reporter, taskID) + if u.Status != "failed" { + t.Fatalf("final status = %q, want failed", u.Status) + } + if !strings.HasPrefix(u.ErrorMessage, damagedErrorPrefix) { + t.Errorf("error message = %q, want prefix %q", u.ErrorMessage, damagedErrorPrefix) + } +} diff --git a/internal/engine/task.go b/internal/engine/task.go index 2685c6c..e9073fc 100644 --- a/internal/engine/task.go +++ b/internal/engine/task.go @@ -31,9 +31,11 @@ var validTransitions = map[TaskStatus][]TaskStatus{ StatusClaimed: {StatusResolving, StatusCancelled}, StatusResolving: {StatusDownloading, StatusFailed, StatusCancelled}, StatusDownloading: {StatusVerifying, StatusFailed, StatusResolving, StatusCancelled}, - StatusVerifying: {StatusOrganizing, StatusFailed}, - StatusOrganizing: {StatusSeeding, StatusCompleted}, - StatusSeeding: {StatusCompleted}, + // Verifying → Resolving: the on-disk verify found a truncated/corrupt file and + // the manager is re-downloading the same source (bounded integrity retry). + StatusVerifying: {StatusOrganizing, StatusFailed, StatusResolving}, + StatusOrganizing: {StatusSeeding, StatusCompleted}, + StatusSeeding: {StatusCompleted}, } // Task represents a download task with its full lifecycle state. diff --git a/internal/engine/task_test.go b/internal/engine/task_test.go index e99e339..564ab12 100644 --- a/internal/engine/task_test.go +++ b/internal/engine/task_test.go @@ -35,6 +35,7 @@ func TestTransitionValid(t *testing.T) { {StatusResolving, StatusDownloading}, {StatusDownloading, StatusVerifying}, {StatusVerifying, StatusOrganizing}, + {StatusVerifying, StatusResolving}, // integrity retry: re-download after a failed on-disk verify {StatusOrganizing, StatusCompleted}, } @@ -60,7 +61,6 @@ func TestTransitionInvalid(t *testing.T) { {StatusClaimed, StatusCompleted}, {StatusCompleted, StatusDownloading}, {StatusFailed, StatusCompleted}, - {StatusVerifying, StatusResolving}, } for _, tt := range invalid { diff --git a/internal/engine/torrent.go b/internal/engine/torrent.go index acd1838..f6ca140 100644 --- a/internal/engine/torrent.go +++ b/internal/engine/torrent.go @@ -534,11 +534,18 @@ func (d *TorrentDownloader) pollDownload(ctx context.Context, t *torrent.Torrent default: // don't block if channel full } - // Check completion + // Check completion. BytesCompleted counts only SHA1-VERIFIED pieces, so + // torrent content can't be silently truncated — but assert nothing is + // still missing (selective-file accounting, a piece that failed its last + // hash) before declaring done. A non-zero remainder is an integrity + // failure → the manager re-downloads (anacrolix re-checks pieces). if downloaded >= totalBytes { if isTTY { fmt.Fprintln(os.Stderr) // newline after \r progress } + if missing := t.BytesMissing(); missing > 0 { + return nil, integrityErr("truncated", "torrent reported complete but %s of verified pieces are still missing", formatBytes(missing)) + } log.Printf("[%s] download complete: %s", task.ID[:8], fileName) return &Result{}, nil } diff --git a/internal/engine/usenet.go b/internal/engine/usenet.go index e452801..af3b181 100644 --- a/internal/engine/usenet.go +++ b/internal/engine/usenet.go @@ -277,10 +277,16 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s log.Printf("[%s] extracted archive", shortID) } if ppResult.VerifyNote != "" { - // Degraded verification (par2 missing / repair failed): surface it loudly - // so the delivered file isn't silently assumed good. + // Degraded verification (par2 missing / transient probe error): surface it + // loudly so the delivered file isn't silently assumed good. log.Printf("[%s] WARNING: %s", shortID, ppResult.VerifyNote) } + if ppResult.Corrupt { + // par2 DEFINITIVELY confirmed unrepairable damage — fail as an integrity + // error so the manager re-downloads clean instead of completing a corrupt + // release (symmetric with the debrid/torrent guards). + return nil, integrityErr("par2_failed", "usenet delivery is corrupt: %s", ppResult.VerifyNote) + } finalPath := ppResult.FinalPath if finalPath == "" { diff --git a/internal/engine/verify.go b/internal/engine/verify.go index 598f4cb..0dbc8e9 100644 --- a/internal/engine/verify.go +++ b/internal/engine/verify.go @@ -29,14 +29,19 @@ func verify(result *Result) error { } if actualSize == 0 { - return fmt.Errorf("download is empty: %s", result.FilePath) + // Integrity, not transport: a zero-byte result is corrupt — let the manager + // re-download clean rather than surface an empty file as completed. + return integrityErr("empty", "download is empty: %s", result.FilePath) } - // If we know the expected size, check within 2% tolerance + // If we know the expected size, check within 2% tolerance (container/muxing + // overhead). A shortfall beyond that is a truncated/corrupt file — classify it + // as an IntegrityError so the manager re-downloads clean instead of completing + // a half file (the last line of defense across every backend). if result.Size > 0 { tolerance := int64(float64(result.Size) * 0.02) if actualSize < result.Size-tolerance { - return fmt.Errorf("size mismatch: expected %d, got %d", result.Size, actualSize) + return integrityErr("size_mismatch", "size mismatch: expected %d, got %d", result.Size, actualSize) } } diff --git a/internal/library/sync.go b/internal/library/sync.go index c2bef14..78dc112 100644 --- a/internal/library/sync.go +++ b/internal/library/sync.go @@ -89,6 +89,29 @@ func BuildSyncItems(cache *LibraryCache) []agent.LibrarySyncItem { items := make([]agent.LibrarySyncItem, 0, len(cache.Items)) for _, item := range cache.Items { if item.ScanError != "" { + // A file ffprobe can't read is almost always a truncated/corrupt + // download (2026-06-15 NFS write-back truncation). Previously these were + // silently dropped — the file vanished from the library with no trace. + // Emit a minimal DAMAGED row instead so the web flags it (badge + + // blocked playback + re-download) rather than hiding it. All fields below + // are populated before ffprobe runs, so they're valid even on scan error. + // The scanner re-probes damaged items every scan, so a clean re-download + // to the same path self-heals the verdict. + items = append(items, agent.LibrarySyncItem{ + FilePath: item.FilePath, + FileName: item.FileName, + FileSize: item.FileSize, + Title: item.Title, + Year: item.Year, + ContentType: DeriveContentType(item), + Season: item.Season, + Episode: item.Episode, + Fingerprint: item.Fingerprint, + RelPath: relToRoot(cache.Path, item.FilePath), + LibraryRootKey: "library", + Integrity: "damaged", + IntegrityReason: "unreadable", + }) continue } si := agent.LibrarySyncItem{ diff --git a/internal/library/sync_test.go b/internal/library/sync_test.go index fe7a113..b7c79a0 100644 --- a/internal/library/sync_test.go +++ b/internal/library/sync_test.go @@ -52,8 +52,10 @@ func TestBuildSyncItems(t *testing.T) { items := BuildSyncItems(cache) - if len(items) != 2 { - t.Fatalf("expected 2 items (1 skipped), got %d", len(items)) + // 3 items: the movie, the show, and the scan-error file surfaced as DAMAGED + // (no longer silently dropped — the web flags it for re-download). + if len(items) != 3 { + t.Fatalf("expected 3 items (1 damaged), got %d", len(items)) } // First item: movie with full media info @@ -97,6 +99,18 @@ func TestBuildSyncItems(t *testing.T) { if show.Resolution != "" { t.Errorf("resolution should be empty, got %q", show.Resolution) } + + // Third item: scan-error file surfaced as damaged (unreadable), not skipped. + damaged := items[2] + if damaged.FilePath != "/media/bad.mkv" { + t.Errorf("damaged filePath = %q, want /media/bad.mkv", damaged.FilePath) + } + if damaged.Integrity != "damaged" { + t.Errorf("integrity = %q, want damaged", damaged.Integrity) + } + if damaged.IntegrityReason != "unreadable" { + t.Errorf("integrityReason = %q, want unreadable", damaged.IntegrityReason) + } } func TestBuildSyncItemsEmpty(t *testing.T) { diff --git a/internal/usenet/postprocess/par2.go b/internal/usenet/postprocess/par2.go index e97b860..253f475 100644 --- a/internal/usenet/postprocess/par2.go +++ b/internal/usenet/postprocess/par2.go @@ -14,6 +14,13 @@ import ( // be checked is delivered UNVERIFIED, not verified. var ErrPar2NotInstalled = errors.New("par2 not installed") +// ErrPar2Unrepairable is returned by Par2Verify when parity confirms the data is +// damaged AND par2 reports repair is not possible — the file is definitively +// corrupt (distinct from a transient par2 probe error). The pipeline marks the +// delivery Corrupt so the engine treats it as an integrity failure and +// re-downloads, rather than shipping a broken file with a soft warning. +var ErrPar2Unrepairable = errors.New("par2: verification failed and repair not possible") + // par2Lookup probes whether the par2 binary is on PATH. It's a package var so // tests can simulate a missing binary without touching the real PATH. var par2Lookup = func() bool { @@ -42,7 +49,7 @@ func Par2Verify(par2File string) error { return &Par2RepairableError{Par2File: par2File} } if strings.Contains(outStr, "Repair is not possible") { - return fmt.Errorf("par2: verification failed and repair not possible:\n%s", outStr) + return fmt.Errorf("%w:\n%s", ErrPar2Unrepairable, outStr) } return fmt.Errorf("par2 verify: %w\n%s", err, outStr) } diff --git a/internal/usenet/postprocess/pipeline.go b/internal/usenet/postprocess/pipeline.go index 599d7b8..6beb70d 100644 --- a/internal/usenet/postprocess/pipeline.go +++ b/internal/usenet/postprocess/pipeline.go @@ -23,6 +23,12 @@ type Result struct { // the file is unverified rather than silently assuming it's good. Empty means // either "verified OK" or "no parity shipped" — both are non-degraded. VerifyNote string + // Corrupt is true when par2 DEFINITIVELY confirmed the data is damaged and it + // could not be repaired (repair failed, or corruption detected with no par2 + // binary to fix it). The engine treats this as an integrity failure and + // re-downloads — distinct from VerifyNote's softer "unverified but delivered" + // (e.g. no parity shipped, or a transient probe error). + Corrupt bool } // Options configures post-processing behavior. @@ -58,14 +64,28 @@ func Process(dir string, downloadedFiles map[string]string, opts Options) (*Resu result.Repaired = true log.Printf("[usenet] par2: repair successful") case errors.Is(repairErr, ErrPar2NotInstalled): - result.VerifyNote = "par2 corruption detected but `par2` is not installed — cannot repair, delivered POSSIBLY CORRUPT" + // Damage confirmed by parity, but no binary to repair it — the + // delivered file IS corrupt. Mark it so the engine re-downloads. + result.Corrupt = true + result.VerifyNote = "par2 corruption detected but `par2` is not installed — cannot repair, file is CORRUPT" log.Printf("[usenet] WARNING: %s", result.VerifyNote) default: - result.VerifyNote = fmt.Sprintf("par2 repair failed — file may be corrupt: %v", repairErr) + // Repair attempted and failed — the data is damaged beyond recovery. + result.Corrupt = true + result.VerifyNote = fmt.Sprintf("par2 repair failed — file is corrupt: %v", repairErr) log.Printf("[usenet] WARNING: %s", result.VerifyNote) } + case errors.Is(err, ErrPar2Unrepairable): + // Parity confirmed the data is damaged and unrepairable — definitively + // corrupt (NOT a transient probe error). Engine re-downloads. + result.Corrupt = true + result.VerifyNote = fmt.Sprintf("par2: file is corrupt and cannot be repaired: %v", err) + log.Printf("[usenet] WARNING: %s", result.VerifyNote) default: - result.VerifyNote = fmt.Sprintf("par2 verification error — file may be corrupt: %v", err) + // A transient par2 probe/exec error (binary crash, I/O hiccup) — can't + // confirm corruption, so deliver UNVERIFIED with a loud note rather than + // nuking a possibly-good file. + result.VerifyNote = fmt.Sprintf("par2 verification error — file unverified: %v", err) log.Printf("[usenet] WARNING: %s", result.VerifyNote) } }