diff --git a/internal/agent/client.go b/internal/agent/client.go index aceeb66..97e4943 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -131,6 +131,32 @@ func (c *Client) MarkSessionReady(ctx context.Context, sessionID string, health return nil } +// ReportSessionError is the failure-path counterpart of MarkSessionReady: it +// tells the web a streaming session can NOT start (file gone, path rejected, +// ffmpeg missing, spawn failure…). The web marks the session failed, pushes an +// SSE "failed" event so the player stops probing a playlist that will never +// exist, and self-heals stale library state on code "file_missing". +// +// code is one of the stable machine codes the web understands: +// "file_missing" | "path_rejected" | "no_video_file" | "ffmpeg_unavailable" | +// "start_failed". message is free-form detail for diagnostics. +// +// Best-effort like MarkSessionReady: on older web deployments without the +// endpoint this 404s — the caller logs and the player falls back to its +// probe-deadline behaviour, exactly as before this channel existed. +func (c *Client) ReportSessionError(ctx context.Context, sessionID, code, message string) error { + req := struct { + SessionID string `json:"sessionId"` + Code string `json:"code"` + Message string `json:"message,omitempty"` + }{SessionID: sessionID, Code: code, Message: message} + var resp StatusResponse + if err := c.doPost(ctx, "/api/internal/agent/session-error", req, &resp); err != nil { + return fmt.Errorf("report session error: %w", err) + } + return nil +} + // SessionHealth is an OPTIONAL live-transcode health snapshot attached to a // session-ready report (F3). A nil *SessionHealth means the agent has no // telemetry to share (cache hit, direct-play, or progress not yet stable) and diff --git a/internal/agent/types.go b/internal/agent/types.go index f32929e..d1f5400 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -361,6 +361,17 @@ type LibrarySyncRequest struct { AgentID string `json:"agentId,omitempty"` // lets the server scope stale-cleanup per agent IsLastBatch bool `json:"isLastBatch"` SyncStartedAt string `json:"syncStartedAt,omitempty"` // ISO-8601; same for all batches in a session + // ScanRoots lists EVERY root this sync session covered (a session spans all + // roots since 1.0.9 — one syncStartedAt, one isLastBatch). The server scopes + // stale-row cleanup of a partial session to these prefixes. Older servers + // ignore the field and fall back to ScanPath. + ScanRoots []string `json:"scanRoots,omitempty"` + // FullCycle marks a session that covered every root the agent scans + // (daemon auto-scan, `unarr scan` without args). The server may then reap + // unseen rows REGARDLESS of path prefix — old-base-path ghost rows + // included. Must stay false for a manual subtree scan or when any root's + // scan failed, or the cleanup would reap rows the session never visited. + FullCycle bool `json:"fullCycle,omitempty"` } // LibrarySyncItem is a single scanned media file with ffprobe metadata. diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 48bc5d9..ffb147e 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -632,65 +632,19 @@ func runDaemonStart() error { }() } - allowedRoots := streamAllowedRoots(cfg) - - filePath := filepath.Clean(sr.FilePath) // Self-heal a base-path mismatch: the web may hand us a path under an old // root (e.g. /mnt/nas/peliculas/… from before a binary→docker move) that // is now outside our allowed dirs but whose file still exists under a - // current root (/downloads/…). Remap the path's tail onto an allowed root - // so playback works immediately; the next re-scan persists the fix to the - // DB. See docs/plans/unarr-path-resilience.md. - if !isAllowedStreamPath(filePath, allowedRoots...) { - if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" { - log.Printf("[%s] stream self-heal: remapped %s → %s", agent.ShortID(sr.TaskID), filePath, remapped) - filePath = remapped - } else { - log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath) - reportStreamError(fmt.Sprintf("path outside allowed dirs: %s", filePath)) - return - } - } - // os.Stat over NFS can transiently fail (ESTALE/EAGAIN/timeout) right - // after a remount or under load. Retry a few times before giving up so - // a hiccup doesn't surface as a spurious "file not found" — this is the - // root of the intermittent "works on the 3rd try" stream failures. - var info os.FileInfo - var statErr error - for attempt := 0; attempt < 3; attempt++ { - if info, statErr = os.Stat(filePath); statErr == nil { - break - } - if attempt < 2 { - time.Sleep(300 * time.Millisecond) - } - } - if statErr != nil { - // Last resort before failing: the file may simply have moved within - // an allowed root — try to relocate it by path tail. - if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" { - log.Printf("[%s] stream self-heal: relocated missing %s → %s", agent.ShortID(sr.TaskID), filePath, remapped) - filePath = remapped - info, statErr = os.Stat(filePath) - } - } - if statErr != nil { - log.Printf("[%s] stream request: file not found after retries: %s (%v)", agent.ShortID(sr.TaskID), filePath, statErr) - reportStreamError(fmt.Sprintf("file not found: %s", filePath)) + // current root (/downloads/…). resolvePlayableFile remaps, stat-retries + // (NFS) and resolves directories; the next re-scan persists the fix to + // the DB. See docs/plans/unarr-path-resilience.md. + filePath, errCode, perr := resolvePlayableFile(sr.FilePath, streamAllowedRoots(cfg), agent.ShortID(sr.TaskID)) + if perr != nil { + log.Printf("[%s] stream request rejected (%s): %v", agent.ShortID(sr.TaskID), errCode, perr) + reportStreamError(perr.Error()) return } - if info.IsDir() { - found := engine.FindVideoFile(filePath) - if found == "" { - log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath) - reportStreamError(fmt.Sprintf("no video file in directory: %s", filePath)) - return - } - filePath = found - log.Printf("[%s] resolved directory to video file: %s", agent.ShortID(sr.TaskID), filepath.Base(filePath)) - } - cancelStreamContexts() streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID) log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL()) @@ -720,6 +674,24 @@ func runDaemonStart() error { return // already running } + // failSession logs AND reports a startup failure to the web — every + // abort path in this handler must go through it. A silent `return` + // here left the player probing a playlist that would never exist + // until its 30s deadline (incident 2026-06-10: deleted file + stale + // library row = eternal "Preparando sesión"). Best-effort: on old web + // deployments the endpoint 404s and the player falls back to the + // probe deadline, exactly as before. + failSession := func(sessionID, code, message string) { + log.Printf("[hls %s] failed (%s): %s", agent.ShortID(sessionID), code, message) + go func() { + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + if err := agentClient.ReportSessionError(rctx, sessionID, code, message); err != nil { + log.Printf("[hls %s] session error report failed: %v", agent.ShortID(sessionID), err) + } + }() + } + // startHLSPlayback starts an HLS encode (local file or debrid URL) and // wires it into the StreamServer. Shared by the local-file HLS path and // the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe @@ -771,7 +743,7 @@ func runDaemonStart() error { if err != nil { playerSessionRegistry.remove(hlsCfg.SessionID) hlsCancel() - log.Printf("[hls %s] start failed: %v", agent.ShortID(hlsCfg.SessionID), err) + failSession(hlsCfg.SessionID, "start_failed", err.Error()) return } if prewarm { @@ -811,7 +783,7 @@ func runDaemonStart() error { provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize, refresh) if perr != nil { playerSessionRegistry.remove(sess.SessionID) - log.Printf("[stream %s] debrid provider failed: %v", agent.ShortID(sess.SessionID), perr) + failSession(sess.SessionID, "start_failed", fmt.Sprintf("debrid provider: %v", perr)) return } streamSrv.SetFile(provider, sess.TaskID) @@ -834,7 +806,7 @@ func runDaemonStart() error { if sess.DirectURL != "" { // playMethod == "hls" implied (2a returned above) tcRuntime := buildTranscodeRuntime(ctx, cfg) if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { - log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable (debrid HLS)", agent.ShortID(sess.SessionID)) + failSession(sess.SessionID, "ffmpeg_unavailable", "ffmpeg/ffprobe unavailable (debrid HLS)") return } hlsCtx, hlsCancel := context.WithCancel(ctx) @@ -860,43 +832,22 @@ func runDaemonStart() error { return } - filePath := sess.FilePath - if filePath == "" { - log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID)) + if sess.FilePath == "" { + failSession(sess.SessionID, "start_failed", "empty file path") return } - filePath = filepath.Clean(filePath) - // Apply the SAME base-path self-heal remap as the raw /stream handler - // (OnStreamRequest above). Without it, a path under an old/host base + // SAME base-path self-heal + stat-retry + dir resolution as the raw + // /stream handler (resolvePlayableFile). A path under an old/host base // (e.g. /mnt/nas/peliculas/… handed by the web while this docker agent - // mounts that media at /downloads) is rejected here even though the raw - // path self-heals it — so the web silently falls back to the raw stream - // and HLS/remux never runs (no transcode, slow funnel start). NOTE: this - // replicates only the lexical-remap; the raw handler additionally retries - // os.Stat for transient NFS errors. The HLS dir-check below proceeds (not - // rejects) on a stat error, so it tolerates an NFS blip differently. + // mounts that media at /downloads) remaps onto the current root; a path + // whose file is genuinely gone fails fast as "file_missing" so the web + // can prune the stale library row and the player can fall back, instead + // of the player probing a playlist that will never exist. // See docs/plans/unarr-path-resilience.md. - hlsAllowedRoots := streamAllowedRoots(cfg) - if !isAllowedStreamPath(filePath, hlsAllowedRoots...) { - if remapped := relocateUnreachable(filePath, hlsAllowedRoots); remapped != "" { - log.Printf("[hls %s] self-heal: remapped %s → %s", - agent.ShortID(sess.SessionID), filePath, remapped) - filePath = remapped - } else { - log.Printf("[hls %s] rejected: path outside allowed dirs: %s", - agent.ShortID(sess.SessionID), filePath) - return - } - } - // Resolve directory → first video file (matches StreamRequest behavior). - if info, err := os.Stat(filePath); err == nil && info.IsDir() { - found := engine.FindVideoFile(filePath) - if found == "" { - log.Printf("[hls %s] rejected: no video file in dir %s", - agent.ShortID(sess.SessionID), filePath) - return - } - filePath = found + filePath, errCode, perr := resolvePlayableFile(sess.FilePath, streamAllowedRoots(cfg), "hls "+agent.ShortID(sess.SessionID)) + if perr != nil { + failSession(sess.SessionID, errCode, perr.Error()) + return } // Direct-play (hueco #3 / 3a): the web decided this source is already @@ -925,7 +876,7 @@ func runDaemonStart() error { tcRuntime := buildTranscodeRuntime(ctx, cfg) if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { - log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) + failSession(sess.SessionID, "ffmpeg_unavailable", "ffmpeg/ffprobe unavailable") return } @@ -939,7 +890,7 @@ func runDaemonStart() error { probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath) cancelProbe() if perr != nil { - log.Printf("[stream %s] remux probe failed: %v", agent.ShortID(sess.SessionID), perr) + failSession(sess.SessionID, "start_failed", fmt.Sprintf("remux probe: %v", perr)) return } tProbe := time.Now() @@ -947,7 +898,7 @@ func runDaemonStart() error { src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName) if serr != nil { remuxCancel() - log.Printf("[stream %s] remux start failed: %v", agent.ShortID(sess.SessionID), serr) + failSession(sess.SessionID, "start_failed", fmt.Sprintf("remux start: %v", serr)) return } streamSrv.SetGrowingFile(src, sess.TaskID) @@ -1201,6 +1152,79 @@ func relocateUnreachable(filePath string, allowedRoots []string) string { return "" } +// Stable machine codes for the web's session-error channel +// (POST /api/internal/agent/session-error). Only "file_missing" triggers +// destructive self-heal on the web (it prunes the stale library row + task +// pointer), so the resolver must never return it while the file may exist. +const ( + pathErrRejected = "path_rejected" + pathErrMissing = "file_missing" + pathErrNoVideo = "no_video_file" +) + +// resolvePlayableFile validates and self-heals a web-provided source path into +// a playable on-disk video file. Shared by the raw /stream handler and every +// session transport (HLS / remux / direct-play) so they all behave +// identically — before this, the HLS path replicated only the lexical remap +// and silently diverged on stat retries (docs/plans/unarr-path-resilience.md): +// +// 1. Containment: the cleaned path must live under an allowed root; if not, +// relocate it by path tail (old base path → current mount). +// 2. Existence: os.Stat with retries (NFS can transiently fail right after a +// remount or under load — the root of the "works on the 3rd try" stream +// failures), then one last relocate for files that moved within a root. +// 3. Directories resolve to their first contained video file. +// +// On failure returns a stable errCode: "path_rejected" means the file EXISTS +// at the original path but outside every allowed root (an agent config +// problem — the web must NOT prune library rows over it); "file_missing" +// means no readable file was found anywhere; "no_video_file" is a directory +// with nothing playable inside. +func resolvePlayableFile(rawPath string, allowedRoots []string, logLabel string) (string, string, error) { + filePath := filepath.Clean(rawPath) + if !isAllowedStreamPath(filePath, allowedRoots...) { + if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" { + log.Printf("[%s] stream self-heal: remapped %s → %s", logLabel, filePath, remapped) + filePath = remapped + } else if _, err := os.Stat(filePath); err == nil { + return "", pathErrRejected, fmt.Errorf("path outside allowed dirs: %s", filePath) + } else { + return "", pathErrMissing, fmt.Errorf("file not found under any allowed root: %s", filePath) + } + } + var info os.FileInfo + var statErr error + for attempt := 0; attempt < 3; attempt++ { + if info, statErr = os.Stat(filePath); statErr == nil { + break + } + if attempt < 2 { + time.Sleep(300 * time.Millisecond) + } + } + if statErr != nil { + // Last resort before failing: the file may simply have moved within + // an allowed root — try to relocate it by path tail. + if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" { + log.Printf("[%s] stream self-heal: relocated missing %s → %s", logLabel, filePath, remapped) + filePath = remapped + info, statErr = os.Stat(filePath) + } + } + if statErr != nil { + return "", pathErrMissing, fmt.Errorf("file not found after retries: %s (%v)", filePath, statErr) + } + if info.IsDir() { + found := engine.FindVideoFile(filePath) + if found == "" { + return "", pathErrNoVideo, fmt.Errorf("no video file in directory: %s", filePath) + } + log.Printf("[%s] resolved directory to video file: %s", logLabel, filepath.Base(found)) + filePath = found + } + return filePath, "", nil +} + func formatSpeedLog(bps int64) string { switch { case bps >= 1024*1024*1024: @@ -1291,19 +1315,26 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, } } - // Scan each path independently and sync per path so the server can - // scope stale-item deletion to the correct directory prefix. - const batchSize = 100 - totalSynced := 0 + // Scan every path, then sync ALL of them as ONE session (single + // syncStartedAt + final isLastBatch via library.SyncBatches). Per-root + // sessions let the server's per-agent stale cleanup reap rows of roots + // a session never visited; one full-cycle session makes the cleanup + // sound AND lets it reap old-base-path ghost rows (fullCycle=true — + // only when every root scanned cleanly). + var syncItems []agent.LibrarySyncItem + var coveredRoots []string + fullCycle := true var mergedItems []library.LibraryItem for _, scanPath := range scanPaths { cache, err := library.Scan(ctx, scanPath, existing, scanOpts) if err != nil { log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err) + fullCycle = false continue } mergedItems = append(mergedItems, cache.Items...) + coveredRoots = append(coveredRoots, scanPath) if prewarmFFmpeg != "" { library.PrewarmSidecars(ctx, cache, library.PrewarmOptions{ @@ -1323,28 +1354,28 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, log.Printf("[auto-scan] no items under %s", scanPath) continue } + syncItems = append(syncItems, items...) + } - syncStartedAt := time.Now().UTC().Format(time.RFC3339) - for i := 0; i < len(items); i += batchSize { - end := i + batchSize - if end > len(items) { - end = len(items) - } - isLast := end >= len(items) - - _, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{ - Items: items[i:end], - ScanPath: scanPath, - AgentID: cfg.Agent.ID, - IsLastBatch: isLast, - SyncStartedAt: syncStartedAt, - }) - if err != nil { - log.Printf("[auto-scan] sync failed for %s: %v", scanPath, err) - break - } + totalSynced := 0 + if len(syncItems) > 0 { + res, err := library.SyncBatches(ctx, ac, syncItems, library.SyncOptions{ + AgentID: cfg.Agent.ID, + ScanPath: coveredRoots[0], + ScanRoots: coveredRoots, + FullCycle: fullCycle, + }) + if err != nil { + log.Printf("[auto-scan] sync failed: %v", err) + } else if res.Removed > 0 { + log.Printf("[auto-scan] server removed %d stale item(s)", res.Removed) } - totalSynced += len(items) + totalSynced = res.Synced + } else { + // An entirely-empty library can't open a sync session (the server + // requires ≥1 item per batch), so stale rows survive until a file + // reappears — same trade-off as before, now explicit. + log.Printf("[auto-scan] no items under any scan path — skipping sync") } // Save merged cache for incremental scanning next time. diff --git a/internal/cmd/resolve_playable_test.go b/internal/cmd/resolve_playable_test.go new file mode 100644 index 0000000..c027126 --- /dev/null +++ b/internal/cmd/resolve_playable_test.go @@ -0,0 +1,98 @@ +package cmd + +import ( + "os" + "path/filepath" + "testing" +) + +func TestResolvePlayableFile(t *testing.T) { + root := t.TempDir() + mkfile(t, filepath.Join(root, "Acme Show", "Season 01", "ep.mkv")) + roots := []string{root} + + t.Run("allowed path resolves to itself", func(t *testing.T) { + want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv") + got, code, err := resolvePlayableFile(want, roots, "test") + if err != nil { + t.Fatalf("unexpected error (%s): %v", code, err) + } + if got != want { + t.Errorf("got %q want %q", got, want) + } + }) + + t.Run("old base path relocates onto current root", func(t *testing.T) { + got, code, err := resolvePlayableFile("/old/base/Acme Show/Season 01/ep.mkv", roots, "test") + if err != nil { + t.Fatalf("unexpected error (%s): %v", code, err) + } + want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv") + if got != want { + t.Errorf("got %q want %q", got, want) + } + }) + + t.Run("deleted file under old base is file_missing, never path_rejected", func(t *testing.T) { + // The incident shape (2026-06-10): web hands a stale host path + // (/mnt/nas/…) whose file was deleted — the docker agent can't see the + // original path AND no tail relocates. file_missing tells the web to + // prune the stale row; path_rejected would block that self-heal. + _, code, err := resolvePlayableFile("/old/base/Acme Show/Season 01/gone.mkv", roots, "test") + if err == nil { + t.Fatal("expected error for deleted file") + } + if code != pathErrMissing { + t.Errorf("code = %q, want %q", code, pathErrMissing) + } + }) + + t.Run("existing file outside roots is path_rejected", func(t *testing.T) { + outside := t.TempDir() + // 1-segment-deep on purpose: a ≥3-segment tail could legitimately + // relocate INTO the root if a same-named file existed there. + mkfile(t, filepath.Join(outside, "leak.mkv")) + _, code, err := resolvePlayableFile(filepath.Join(outside, "leak.mkv"), roots, "test") + if err == nil { + t.Fatal("expected error for out-of-root file") + } + if code != pathErrRejected { + t.Errorf("code = %q, want %q", code, pathErrRejected) + } + }) + + t.Run("missing file inside an allowed root is file_missing", func(t *testing.T) { + _, code, err := resolvePlayableFile(filepath.Join(root, "Acme Show", "Season 01", "gone.mkv"), roots, "test") + if err == nil { + t.Fatal("expected error for missing file") + } + if code != pathErrMissing { + t.Errorf("code = %q, want %q", code, pathErrMissing) + } + }) + + t.Run("directory resolves to its video file", func(t *testing.T) { + got, code, err := resolvePlayableFile(filepath.Join(root, "Acme Show", "Season 01"), roots, "test") + if err != nil { + t.Fatalf("unexpected error (%s): %v", code, err) + } + want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv") + if got != want { + t.Errorf("got %q want %q", got, want) + } + }) + + t.Run("directory without video is no_video_file", func(t *testing.T) { + empty := filepath.Join(root, "Empty Show") + if err := os.MkdirAll(empty, 0o755); err != nil { + t.Fatal(err) + } + _, code, err := resolvePlayableFile(empty, roots, "test") + if err == nil { + t.Fatal("expected error for empty directory") + } + if code != pathErrNoVideo { + t.Errorf("code = %q, want %q", code, pathErrNoVideo) + } + }) +} diff --git a/internal/cmd/scan.go b/internal/cmd/scan.go index 754ebc1..baf15d2 100644 --- a/internal/cmd/scan.go +++ b/internal/cmd/scan.go @@ -9,7 +9,6 @@ import ( "sort" "strings" "syscall" - "time" "github.com/fatih/color" "github.com/spf13/cobra" @@ -40,20 +39,40 @@ to see available quality upgrades.`, if showStatus { return runScanStatus() } + cfg := loadConfig() + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + + // All scanned roots feed ONE sync session (single syncStartedAt + + // final isLastBatch) so the server's stale-row cleanup sees the + // whole cycle at once. fullCycle only without an explicit path — + // a subtree scan must never let the server reap outside it. if len(args) == 0 { - cfg := loadConfig() paths := library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath) if len(paths) == 0 { return fmt.Errorf("usage: unarr scan \n\nNo scan paths configured. Provide a path or set up downloads.dir via 'unarr init'") } + var items []agent.LibrarySyncItem for _, p := range paths { - if err := runScan(p, workers, ffprobe, noSync); err != nil { + cache, err := runScan(ctx, cfg, p, workers, ffprobe) + if err != nil { return err } + items = append(items, library.BuildSyncItems(cache)...) } + if noSync || jsonOut { + return nil + } + return syncToServer(ctx, cfg, items, paths, true) + } + cache, err := runScan(ctx, cfg, args[0], workers, ffprobe) + if err != nil { + return err + } + if noSync || jsonOut { return nil } - return runScan(args[0], workers, ffprobe, noSync) + return syncToServer(ctx, cfg, library.BuildSyncItems(cache), []string{args[0]}, false) }, } @@ -65,18 +84,20 @@ to see available quality upgrades.`, return cmd } -func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error { +// runScan walks one root, saves the cache and prewarms sidecars. Syncing to +// the server is the CALLER's job (RunE) — all roots of an invocation feed one +// sync session via syncToServer, so per-root sessions can't trick the server +// into reaping rows of roots the session never visited. +func runScan(ctx context.Context, cfg config.Config, dirPath string, workers int, ffprobePath string) (*library.LibraryCache, error) { // Validate path info, err := os.Stat(dirPath) if err != nil { - return fmt.Errorf("path not found: %s", dirPath) + return nil, fmt.Errorf("path not found: %s", dirPath) } if !info.IsDir() { - return fmt.Errorf("not a directory: %s", dirPath) + return nil, fmt.Errorf("not a directory: %s", dirPath) } - cfg := loadConfig() - // Resolve workers: flag → config → default 8 if workers == 0 { workers = cfg.Library.Workers @@ -93,10 +114,6 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error // Load existing cache for incremental scanning existing, _ := library.LoadCache() - // Context with signal handling - ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) - defer stop() - bold := color.New(color.Bold) bold.Printf("\n Scanning %s...\n\n", dirPath) @@ -114,14 +131,14 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error }, }) if err != nil { - return fmt.Errorf("scan failed: %w", err) + return nil, fmt.Errorf("scan failed: %w", err) } fmt.Fprintf(os.Stderr, "\r\033[K") // clear progress line // Save cache if err := library.SaveCache(cache); err != nil { - return fmt.Errorf("save cache: %w", err) + return nil, fmt.Errorf("save cache: %w", err) } // Remember scan path in config @@ -133,11 +150,12 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error // Print summary printScanSummary(cache) - // JSON output mode + // JSON output mode — emit the cache and skip the prewarm (the caller skips + // the sync via the same flag). if jsonOut { enc := json.NewEncoder(os.Stdout) enc.SetIndent("", " ") - return enc.Encode(cache) + return cache, enc.Encode(cache) } // Pre-extract sidecars (text subs → WebVTT, panel frames → JPEG) into a hidden @@ -162,15 +180,14 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error } } - // Sync to server - if !noSync { - return syncToServer(ctx, cfg, cache) - } - - return nil + return cache, nil } -func syncToServer(ctx context.Context, cfg config.Config, cache *library.LibraryCache) error { +// syncToServer uploads the scanned items of THIS invocation as one sync +// session. roots lists every root the invocation scanned; fullCycle marks a +// no-args run that covered all configured roots (the server may then reap +// stale rows regardless of prefix — see LibrarySyncRequest.FullCycle). +func syncToServer(ctx context.Context, cfg config.Config, items []agent.LibrarySyncItem, roots []string, fullCycle bool) error { apiKey := apiKeyFlag if apiKey == "" { apiKey = cfg.Auth.APIKey @@ -182,50 +199,28 @@ func syncToServer(ctx context.Context, cfg config.Config, cache *library.Library ac := agent.NewClient(cfg.Auth.APIURL, apiKey, "unarr/"+Version) - items := library.BuildSyncItems(cache) - if len(items) == 0 { color.Yellow("\n No valid items to sync.") return nil } - // Send in batches of 100 - const batchSize = 100 - totalSynced := 0 - totalMatched := 0 - totalRemoved := 0 - syncStartedAt := time.Now().UTC().Format(time.RFC3339) - - for i := 0; i < len(items); i += batchSize { - end := i + batchSize - if end > len(items) { - end = len(items) - } - batch := items[i:end] - isLast := end >= len(items) - - fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", end, len(items)) - - resp, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{ - Items: batch, - ScanPath: cache.Path, - AgentID: cfg.Agent.ID, - IsLastBatch: isLast, - SyncStartedAt: syncStartedAt, - }) - if err != nil { - return fmt.Errorf("sync failed: %w", err) - } - - totalSynced += resp.Synced - totalMatched += resp.Matched - totalRemoved += resp.Removed + res, err := library.SyncBatches(ctx, ac, items, library.SyncOptions{ + AgentID: cfg.Agent.ID, + ScanPath: roots[0], + ScanRoots: roots, + FullCycle: fullCycle, + OnProgress: func(sent, total int) { + fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", sent, total) + }, + }) + if err != nil { + return fmt.Errorf("sync failed: %w", err) } fmt.Fprintf(os.Stderr, "\r\033[K") green := color.New(color.FgGreen) - green.Printf("\n ✓ Synced %d items (%d matched, %d removed)\n", totalSynced, totalMatched, totalRemoved) + green.Printf("\n ✓ Synced %d items (%d matched, %d removed)\n", res.Synced, res.Matched, res.Removed) apiURL := strings.TrimSuffix(cfg.Auth.APIURL, "/") fmt.Printf(" → View upgrades at %s/library\n\n", apiURL) diff --git a/internal/library/sync.go b/internal/library/sync.go index 461c189..c2bef14 100644 --- a/internal/library/sync.go +++ b/internal/library/sync.go @@ -1,12 +1,74 @@ package library import ( + "context" "path/filepath" "strings" + "time" "github.com/torrentclaw/unarr/internal/agent" ) +// SyncOptions describes ONE library sync session — a set of batches sharing a +// single syncStartedAt so the server can reap rows not seen by the session. +type SyncOptions struct { + AgentID string + // ScanPath is the primary root, kept for pre-scanRoots servers. + ScanPath string + // ScanRoots lists every root this session covers (see LibrarySyncRequest). + ScanRoots []string + // FullCycle: the session spans every configured root — the server may reap + // unseen rows regardless of path prefix. NEVER set it for a subtree scan. + FullCycle bool + // OnProgress, when non-nil, is called after each batch with (sent, total). + OnProgress func(sent, total int) +} + +// SyncResult aggregates the per-batch server responses of a session. +type SyncResult struct { + Synced int + Matched int + Removed int +} + +// SyncBatches uploads items to the server in batches of 100 as ONE sync +// session: every batch shares the same syncStartedAt and only the final one +// carries isLastBatch, so the server's stale-row cleanup sees the whole cycle +// at once. The single source of the batching protocol — shared by `unarr scan` +// (cmd/scan.go) and the daemon auto-scan (cmd/daemon.go); before this each +// root synced as its own session and the per-agent cleanup could reap rows of +// roots the session never visited. +func SyncBatches(ctx context.Context, ac *agent.Client, items []agent.LibrarySyncItem, opts SyncOptions) (SyncResult, error) { + const batchSize = 100 + var res SyncResult + syncStartedAt := time.Now().UTC().Format(time.RFC3339) + for i := 0; i < len(items); i += batchSize { + end := i + batchSize + if end > len(items) { + end = len(items) + } + resp, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{ + Items: items[i:end], + ScanPath: opts.ScanPath, + AgentID: opts.AgentID, + IsLastBatch: end >= len(items), + SyncStartedAt: syncStartedAt, + ScanRoots: opts.ScanRoots, + FullCycle: opts.FullCycle, + }) + if err != nil { + return res, err + } + res.Synced += resp.Synced + res.Matched += resp.Matched + res.Removed += resp.Removed + if opts.OnProgress != nil { + opts.OnProgress(end, len(items)) + } + } + return res, nil +} + // relToRoot returns the file's path relative to the scan root (forward-slashed), // or "" when it doesn't live under root. The server stores this so streaming can // later reconstruct the absolute path from the agent's *current* root.