fix(daemon): reportar fallos de arranque de sesión a la web + scan en sesión única

- nuevo agentClient.ReportSessionError → POST /agent/session-error;
  failSession() en todos los abortos del handler de sesiones (path muerto,
  ffmpeg ausente, remux, provider debrid, StartHLSSession). Antes eran
  returns mudos y el player quedaba en "Preparando sesión" hasta agotar el
  deadline de probes
- resolvePlayableFile() unifica la resolución de paths del /stream raw y de
  las sesiones HLS/remux/direct (remap de base path + stat con retries NFS +
  directorio→vídeo, antes duplicada y divergente) y distingue file_missing
  (la web self-heala filas stale) de path_rejected (el fichero existe fuera
  de los roots = config; la web no debe podar nada)
- library.SyncBatches: el batching del sync de biblioteca vive en un solo
  sitio; el scan manual y el auto-scan sincronizan todos los roots en UNA
  sesión con scanRoots/fullCycle, en vez de una sesión por root que dejaba
  al server podar filas de roots que la sesión nunca visitó
This commit is contained in:
Deivid Soto 2026-06-10 17:39:09 +02:00
parent 4bdd161e02
commit 0dca296fec
6 changed files with 397 additions and 174 deletions

View file

@ -131,6 +131,32 @@ func (c *Client) MarkSessionReady(ctx context.Context, sessionID string, health
return nil return nil
} }
// ReportSessionError is the failure-path counterpart of MarkSessionReady: it
// tells the web a streaming session can NOT start (file gone, path rejected,
// ffmpeg missing, spawn failure…). The web marks the session failed, pushes an
// SSE "failed" event so the player stops probing a playlist that will never
// exist, and self-heals stale library state on code "file_missing".
//
// code is one of the stable machine codes the web understands:
// "file_missing" | "path_rejected" | "no_video_file" | "ffmpeg_unavailable" |
// "start_failed". message is free-form detail for diagnostics.
//
// Best-effort like MarkSessionReady: on older web deployments without the
// endpoint this 404s — the caller logs and the player falls back to its
// probe-deadline behaviour, exactly as before this channel existed.
func (c *Client) ReportSessionError(ctx context.Context, sessionID, code, message string) error {
req := struct {
SessionID string `json:"sessionId"`
Code string `json:"code"`
Message string `json:"message,omitempty"`
}{SessionID: sessionID, Code: code, Message: message}
var resp StatusResponse
if err := c.doPost(ctx, "/api/internal/agent/session-error", req, &resp); err != nil {
return fmt.Errorf("report session error: %w", err)
}
return nil
}
// SessionHealth is an OPTIONAL live-transcode health snapshot attached to a // SessionHealth is an OPTIONAL live-transcode health snapshot attached to a
// session-ready report (F3). A nil *SessionHealth means the agent has no // session-ready report (F3). A nil *SessionHealth means the agent has no
// telemetry to share (cache hit, direct-play, or progress not yet stable) and // telemetry to share (cache hit, direct-play, or progress not yet stable) and

View file

@ -361,6 +361,17 @@ type LibrarySyncRequest struct {
AgentID string `json:"agentId,omitempty"` // lets the server scope stale-cleanup per agent AgentID string `json:"agentId,omitempty"` // lets the server scope stale-cleanup per agent
IsLastBatch bool `json:"isLastBatch"` IsLastBatch bool `json:"isLastBatch"`
SyncStartedAt string `json:"syncStartedAt,omitempty"` // ISO-8601; same for all batches in a session SyncStartedAt string `json:"syncStartedAt,omitempty"` // ISO-8601; same for all batches in a session
// ScanRoots lists EVERY root this sync session covered (a session spans all
// roots since 1.0.9 — one syncStartedAt, one isLastBatch). The server scopes
// stale-row cleanup of a partial session to these prefixes. Older servers
// ignore the field and fall back to ScanPath.
ScanRoots []string `json:"scanRoots,omitempty"`
// FullCycle marks a session that covered every root the agent scans
// (daemon auto-scan, `unarr scan` without args). The server may then reap
// unseen rows REGARDLESS of path prefix — old-base-path ghost rows
// included. Must stay false for a manual subtree scan or when any root's
// scan failed, or the cleanup would reap rows the session never visited.
FullCycle bool `json:"fullCycle,omitempty"`
} }
// LibrarySyncItem is a single scanned media file with ffprobe metadata. // LibrarySyncItem is a single scanned media file with ffprobe metadata.

View file

@ -632,64 +632,18 @@ func runDaemonStart() error {
}() }()
} }
allowedRoots := streamAllowedRoots(cfg)
filePath := filepath.Clean(sr.FilePath)
// Self-heal a base-path mismatch: the web may hand us a path under an old // Self-heal a base-path mismatch: the web may hand us a path under an old
// root (e.g. /mnt/nas/peliculas/… from before a binary→docker move) that // root (e.g. /mnt/nas/peliculas/… from before a binary→docker move) that
// is now outside our allowed dirs but whose file still exists under a // is now outside our allowed dirs but whose file still exists under a
// current root (/downloads/…). Remap the path's tail onto an allowed root // current root (/downloads/…). resolvePlayableFile remaps, stat-retries
// so playback works immediately; the next re-scan persists the fix to the // (NFS) and resolves directories; the next re-scan persists the fix to
// DB. See docs/plans/unarr-path-resilience.md. // the DB. See docs/plans/unarr-path-resilience.md.
if !isAllowedStreamPath(filePath, allowedRoots...) { filePath, errCode, perr := resolvePlayableFile(sr.FilePath, streamAllowedRoots(cfg), agent.ShortID(sr.TaskID))
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" { if perr != nil {
log.Printf("[%s] stream self-heal: remapped %s → %s", agent.ShortID(sr.TaskID), filePath, remapped) log.Printf("[%s] stream request rejected (%s): %v", agent.ShortID(sr.TaskID), errCode, perr)
filePath = remapped reportStreamError(perr.Error())
} else {
log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath)
reportStreamError(fmt.Sprintf("path outside allowed dirs: %s", filePath))
return return
} }
}
// os.Stat over NFS can transiently fail (ESTALE/EAGAIN/timeout) right
// after a remount or under load. Retry a few times before giving up so
// a hiccup doesn't surface as a spurious "file not found" — this is the
// root of the intermittent "works on the 3rd try" stream failures.
var info os.FileInfo
var statErr error
for attempt := 0; attempt < 3; attempt++ {
if info, statErr = os.Stat(filePath); statErr == nil {
break
}
if attempt < 2 {
time.Sleep(300 * time.Millisecond)
}
}
if statErr != nil {
// Last resort before failing: the file may simply have moved within
// an allowed root — try to relocate it by path tail.
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
log.Printf("[%s] stream self-heal: relocated missing %s → %s", agent.ShortID(sr.TaskID), filePath, remapped)
filePath = remapped
info, statErr = os.Stat(filePath)
}
}
if statErr != nil {
log.Printf("[%s] stream request: file not found after retries: %s (%v)", agent.ShortID(sr.TaskID), filePath, statErr)
reportStreamError(fmt.Sprintf("file not found: %s", filePath))
return
}
if info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
reportStreamError(fmt.Sprintf("no video file in directory: %s", filePath))
return
}
filePath = found
log.Printf("[%s] resolved directory to video file: %s", agent.ShortID(sr.TaskID), filepath.Base(filePath))
}
cancelStreamContexts() cancelStreamContexts()
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID) streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
@ -720,6 +674,24 @@ func runDaemonStart() error {
return // already running return // already running
} }
// failSession logs AND reports a startup failure to the web — every
// abort path in this handler must go through it. A silent `return`
// here left the player probing a playlist that would never exist
// until its 30s deadline (incident 2026-06-10: deleted file + stale
// library row = eternal "Preparando sesión"). Best-effort: on old web
// deployments the endpoint 404s and the player falls back to the
// probe deadline, exactly as before.
failSession := func(sessionID, code, message string) {
log.Printf("[hls %s] failed (%s): %s", agent.ShortID(sessionID), code, message)
go func() {
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := agentClient.ReportSessionError(rctx, sessionID, code, message); err != nil {
log.Printf("[hls %s] session error report failed: %v", agent.ShortID(sessionID), err)
}
}()
}
// startHLSPlayback starts an HLS encode (local file or debrid URL) and // startHLSPlayback starts an HLS encode (local file or debrid URL) and
// wires it into the StreamServer. Shared by the local-file HLS path and // wires it into the StreamServer. Shared by the local-file HLS path and
// the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe // the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe
@ -771,7 +743,7 @@ func runDaemonStart() error {
if err != nil { if err != nil {
playerSessionRegistry.remove(hlsCfg.SessionID) playerSessionRegistry.remove(hlsCfg.SessionID)
hlsCancel() hlsCancel()
log.Printf("[hls %s] start failed: %v", agent.ShortID(hlsCfg.SessionID), err) failSession(hlsCfg.SessionID, "start_failed", err.Error())
return return
} }
if prewarm { if prewarm {
@ -811,7 +783,7 @@ func runDaemonStart() error {
provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize, refresh) provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize, refresh)
if perr != nil { if perr != nil {
playerSessionRegistry.remove(sess.SessionID) playerSessionRegistry.remove(sess.SessionID)
log.Printf("[stream %s] debrid provider failed: %v", agent.ShortID(sess.SessionID), perr) failSession(sess.SessionID, "start_failed", fmt.Sprintf("debrid provider: %v", perr))
return return
} }
streamSrv.SetFile(provider, sess.TaskID) streamSrv.SetFile(provider, sess.TaskID)
@ -834,7 +806,7 @@ func runDaemonStart() error {
if sess.DirectURL != "" { // playMethod == "hls" implied (2a returned above) if sess.DirectURL != "" { // playMethod == "hls" implied (2a returned above)
tcRuntime := buildTranscodeRuntime(ctx, cfg) tcRuntime := buildTranscodeRuntime(ctx, cfg)
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable (debrid HLS)", agent.ShortID(sess.SessionID)) failSession(sess.SessionID, "ffmpeg_unavailable", "ffmpeg/ffprobe unavailable (debrid HLS)")
return return
} }
hlsCtx, hlsCancel := context.WithCancel(ctx) hlsCtx, hlsCancel := context.WithCancel(ctx)
@ -860,44 +832,23 @@ func runDaemonStart() error {
return return
} }
filePath := sess.FilePath if sess.FilePath == "" {
if filePath == "" { failSession(sess.SessionID, "start_failed", "empty file path")
log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID))
return return
} }
filePath = filepath.Clean(filePath) // SAME base-path self-heal + stat-retry + dir resolution as the raw
// Apply the SAME base-path self-heal remap as the raw /stream handler // /stream handler (resolvePlayableFile). A path under an old/host base
// (OnStreamRequest above). Without it, a path under an old/host base
// (e.g. /mnt/nas/peliculas/… handed by the web while this docker agent // (e.g. /mnt/nas/peliculas/… handed by the web while this docker agent
// mounts that media at /downloads) is rejected here even though the raw // mounts that media at /downloads) remaps onto the current root; a path
// path self-heals it — so the web silently falls back to the raw stream // whose file is genuinely gone fails fast as "file_missing" so the web
// and HLS/remux never runs (no transcode, slow funnel start). NOTE: this // can prune the stale library row and the player can fall back, instead
// replicates only the lexical-remap; the raw handler additionally retries // of the player probing a playlist that will never exist.
// os.Stat for transient NFS errors. The HLS dir-check below proceeds (not
// rejects) on a stat error, so it tolerates an NFS blip differently.
// See docs/plans/unarr-path-resilience.md. // See docs/plans/unarr-path-resilience.md.
hlsAllowedRoots := streamAllowedRoots(cfg) filePath, errCode, perr := resolvePlayableFile(sess.FilePath, streamAllowedRoots(cfg), "hls "+agent.ShortID(sess.SessionID))
if !isAllowedStreamPath(filePath, hlsAllowedRoots...) { if perr != nil {
if remapped := relocateUnreachable(filePath, hlsAllowedRoots); remapped != "" { failSession(sess.SessionID, errCode, perr.Error())
log.Printf("[hls %s] self-heal: remapped %s → %s",
agent.ShortID(sess.SessionID), filePath, remapped)
filePath = remapped
} else {
log.Printf("[hls %s] rejected: path outside allowed dirs: %s",
agent.ShortID(sess.SessionID), filePath)
return return
} }
}
// Resolve directory → first video file (matches StreamRequest behavior).
if info, err := os.Stat(filePath); err == nil && info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
log.Printf("[hls %s] rejected: no video file in dir %s",
agent.ShortID(sess.SessionID), filePath)
return
}
filePath = found
}
// Direct-play (hueco #3 / 3a): the web decided this source is already // Direct-play (hueco #3 / 3a): the web decided this source is already
// browser-native (mp4 h264/aac 8-bit SDR) from library scan metadata, // browser-native (mp4 h264/aac 8-bit SDR) from library scan metadata,
@ -925,7 +876,7 @@ func runDaemonStart() error {
tcRuntime := buildTranscodeRuntime(ctx, cfg) tcRuntime := buildTranscodeRuntime(ctx, cfg)
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) failSession(sess.SessionID, "ffmpeg_unavailable", "ffmpeg/ffprobe unavailable")
return return
} }
@ -939,7 +890,7 @@ func runDaemonStart() error {
probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath) probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath)
cancelProbe() cancelProbe()
if perr != nil { if perr != nil {
log.Printf("[stream %s] remux probe failed: %v", agent.ShortID(sess.SessionID), perr) failSession(sess.SessionID, "start_failed", fmt.Sprintf("remux probe: %v", perr))
return return
} }
tProbe := time.Now() tProbe := time.Now()
@ -947,7 +898,7 @@ func runDaemonStart() error {
src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName) src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName)
if serr != nil { if serr != nil {
remuxCancel() remuxCancel()
log.Printf("[stream %s] remux start failed: %v", agent.ShortID(sess.SessionID), serr) failSession(sess.SessionID, "start_failed", fmt.Sprintf("remux start: %v", serr))
return return
} }
streamSrv.SetGrowingFile(src, sess.TaskID) streamSrv.SetGrowingFile(src, sess.TaskID)
@ -1201,6 +1152,79 @@ func relocateUnreachable(filePath string, allowedRoots []string) string {
return "" return ""
} }
// Stable machine codes for the web's session-error channel
// (POST /api/internal/agent/session-error). Only "file_missing" triggers
// destructive self-heal on the web (it prunes the stale library row + task
// pointer), so the resolver must never return it while the file may exist.
const (
pathErrRejected = "path_rejected"
pathErrMissing = "file_missing"
pathErrNoVideo = "no_video_file"
)
// resolvePlayableFile validates and self-heals a web-provided source path into
// a playable on-disk video file. Shared by the raw /stream handler and every
// session transport (HLS / remux / direct-play) so they all behave
// identically — before this, the HLS path replicated only the lexical remap
// and silently diverged on stat retries (docs/plans/unarr-path-resilience.md):
//
// 1. Containment: the cleaned path must live under an allowed root; if not,
// relocate it by path tail (old base path → current mount).
// 2. Existence: os.Stat with retries (NFS can transiently fail right after a
// remount or under load — the root of the "works on the 3rd try" stream
// failures), then one last relocate for files that moved within a root.
// 3. Directories resolve to their first contained video file.
//
// On failure returns a stable errCode: "path_rejected" means the file EXISTS
// at the original path but outside every allowed root (an agent config
// problem — the web must NOT prune library rows over it); "file_missing"
// means no readable file was found anywhere; "no_video_file" is a directory
// with nothing playable inside.
func resolvePlayableFile(rawPath string, allowedRoots []string, logLabel string) (string, string, error) {
filePath := filepath.Clean(rawPath)
if !isAllowedStreamPath(filePath, allowedRoots...) {
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
log.Printf("[%s] stream self-heal: remapped %s → %s", logLabel, filePath, remapped)
filePath = remapped
} else if _, err := os.Stat(filePath); err == nil {
return "", pathErrRejected, fmt.Errorf("path outside allowed dirs: %s", filePath)
} else {
return "", pathErrMissing, fmt.Errorf("file not found under any allowed root: %s", filePath)
}
}
var info os.FileInfo
var statErr error
for attempt := 0; attempt < 3; attempt++ {
if info, statErr = os.Stat(filePath); statErr == nil {
break
}
if attempt < 2 {
time.Sleep(300 * time.Millisecond)
}
}
if statErr != nil {
// Last resort before failing: the file may simply have moved within
// an allowed root — try to relocate it by path tail.
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
log.Printf("[%s] stream self-heal: relocated missing %s → %s", logLabel, filePath, remapped)
filePath = remapped
info, statErr = os.Stat(filePath)
}
}
if statErr != nil {
return "", pathErrMissing, fmt.Errorf("file not found after retries: %s (%v)", filePath, statErr)
}
if info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
return "", pathErrNoVideo, fmt.Errorf("no video file in directory: %s", filePath)
}
log.Printf("[%s] resolved directory to video file: %s", logLabel, filepath.Base(found))
filePath = found
}
return filePath, "", nil
}
func formatSpeedLog(bps int64) string { func formatSpeedLog(bps int64) string {
switch { switch {
case bps >= 1024*1024*1024: case bps >= 1024*1024*1024:
@ -1291,19 +1315,26 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
} }
} }
// Scan each path independently and sync per path so the server can // Scan every path, then sync ALL of them as ONE session (single
// scope stale-item deletion to the correct directory prefix. // syncStartedAt + final isLastBatch via library.SyncBatches). Per-root
const batchSize = 100 // sessions let the server's per-agent stale cleanup reap rows of roots
totalSynced := 0 // a session never visited; one full-cycle session makes the cleanup
// sound AND lets it reap old-base-path ghost rows (fullCycle=true —
// only when every root scanned cleanly).
var syncItems []agent.LibrarySyncItem
var coveredRoots []string
fullCycle := true
var mergedItems []library.LibraryItem var mergedItems []library.LibraryItem
for _, scanPath := range scanPaths { for _, scanPath := range scanPaths {
cache, err := library.Scan(ctx, scanPath, existing, scanOpts) cache, err := library.Scan(ctx, scanPath, existing, scanOpts)
if err != nil { if err != nil {
log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err) log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err)
fullCycle = false
continue continue
} }
mergedItems = append(mergedItems, cache.Items...) mergedItems = append(mergedItems, cache.Items...)
coveredRoots = append(coveredRoots, scanPath)
if prewarmFFmpeg != "" { if prewarmFFmpeg != "" {
library.PrewarmSidecars(ctx, cache, library.PrewarmOptions{ library.PrewarmSidecars(ctx, cache, library.PrewarmOptions{
@ -1323,28 +1354,28 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
log.Printf("[auto-scan] no items under %s", scanPath) log.Printf("[auto-scan] no items under %s", scanPath)
continue continue
} }
syncItems = append(syncItems, items...)
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
} }
isLast := end >= len(items)
_, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{ totalSynced := 0
Items: items[i:end], if len(syncItems) > 0 {
ScanPath: scanPath, res, err := library.SyncBatches(ctx, ac, syncItems, library.SyncOptions{
AgentID: cfg.Agent.ID, AgentID: cfg.Agent.ID,
IsLastBatch: isLast, ScanPath: coveredRoots[0],
SyncStartedAt: syncStartedAt, ScanRoots: coveredRoots,
FullCycle: fullCycle,
}) })
if err != nil { if err != nil {
log.Printf("[auto-scan] sync failed for %s: %v", scanPath, err) log.Printf("[auto-scan] sync failed: %v", err)
break } else if res.Removed > 0 {
log.Printf("[auto-scan] server removed %d stale item(s)", res.Removed)
} }
} totalSynced = res.Synced
totalSynced += len(items) } else {
// An entirely-empty library can't open a sync session (the server
// requires ≥1 item per batch), so stale rows survive until a file
// reappears — same trade-off as before, now explicit.
log.Printf("[auto-scan] no items under any scan path — skipping sync")
} }
// Save merged cache for incremental scanning next time. // Save merged cache for incremental scanning next time.

View file

@ -0,0 +1,98 @@
package cmd
import (
"os"
"path/filepath"
"testing"
)
func TestResolvePlayableFile(t *testing.T) {
root := t.TempDir()
mkfile(t, filepath.Join(root, "Acme Show", "Season 01", "ep.mkv"))
roots := []string{root}
t.Run("allowed path resolves to itself", func(t *testing.T) {
want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv")
got, code, err := resolvePlayableFile(want, roots, "test")
if err != nil {
t.Fatalf("unexpected error (%s): %v", code, err)
}
if got != want {
t.Errorf("got %q want %q", got, want)
}
})
t.Run("old base path relocates onto current root", func(t *testing.T) {
got, code, err := resolvePlayableFile("/old/base/Acme Show/Season 01/ep.mkv", roots, "test")
if err != nil {
t.Fatalf("unexpected error (%s): %v", code, err)
}
want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv")
if got != want {
t.Errorf("got %q want %q", got, want)
}
})
t.Run("deleted file under old base is file_missing, never path_rejected", func(t *testing.T) {
// The incident shape (2026-06-10): web hands a stale host path
// (/mnt/nas/…) whose file was deleted — the docker agent can't see the
// original path AND no tail relocates. file_missing tells the web to
// prune the stale row; path_rejected would block that self-heal.
_, code, err := resolvePlayableFile("/old/base/Acme Show/Season 01/gone.mkv", roots, "test")
if err == nil {
t.Fatal("expected error for deleted file")
}
if code != pathErrMissing {
t.Errorf("code = %q, want %q", code, pathErrMissing)
}
})
t.Run("existing file outside roots is path_rejected", func(t *testing.T) {
outside := t.TempDir()
// 1-segment-deep on purpose: a ≥3-segment tail could legitimately
// relocate INTO the root if a same-named file existed there.
mkfile(t, filepath.Join(outside, "leak.mkv"))
_, code, err := resolvePlayableFile(filepath.Join(outside, "leak.mkv"), roots, "test")
if err == nil {
t.Fatal("expected error for out-of-root file")
}
if code != pathErrRejected {
t.Errorf("code = %q, want %q", code, pathErrRejected)
}
})
t.Run("missing file inside an allowed root is file_missing", func(t *testing.T) {
_, code, err := resolvePlayableFile(filepath.Join(root, "Acme Show", "Season 01", "gone.mkv"), roots, "test")
if err == nil {
t.Fatal("expected error for missing file")
}
if code != pathErrMissing {
t.Errorf("code = %q, want %q", code, pathErrMissing)
}
})
t.Run("directory resolves to its video file", func(t *testing.T) {
got, code, err := resolvePlayableFile(filepath.Join(root, "Acme Show", "Season 01"), roots, "test")
if err != nil {
t.Fatalf("unexpected error (%s): %v", code, err)
}
want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv")
if got != want {
t.Errorf("got %q want %q", got, want)
}
})
t.Run("directory without video is no_video_file", func(t *testing.T) {
empty := filepath.Join(root, "Empty Show")
if err := os.MkdirAll(empty, 0o755); err != nil {
t.Fatal(err)
}
_, code, err := resolvePlayableFile(empty, roots, "test")
if err == nil {
t.Fatal("expected error for empty directory")
}
if code != pathErrNoVideo {
t.Errorf("code = %q, want %q", code, pathErrNoVideo)
}
})
}

View file

@ -9,7 +9,6 @@ import (
"sort" "sort"
"strings" "strings"
"syscall" "syscall"
"time"
"github.com/fatih/color" "github.com/fatih/color"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -40,20 +39,40 @@ to see available quality upgrades.`,
if showStatus { if showStatus {
return runScanStatus() return runScanStatus()
} }
if len(args) == 0 {
cfg := loadConfig() cfg := loadConfig()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
// All scanned roots feed ONE sync session (single syncStartedAt +
// final isLastBatch) so the server's stale-row cleanup sees the
// whole cycle at once. fullCycle only without an explicit path —
// a subtree scan must never let the server reap outside it.
if len(args) == 0 {
paths := library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath) paths := library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath)
if len(paths) == 0 { if len(paths) == 0 {
return fmt.Errorf("usage: unarr scan <path>\n\nNo scan paths configured. Provide a path or set up downloads.dir via 'unarr init'") return fmt.Errorf("usage: unarr scan <path>\n\nNo scan paths configured. Provide a path or set up downloads.dir via 'unarr init'")
} }
var items []agent.LibrarySyncItem
for _, p := range paths { for _, p := range paths {
if err := runScan(p, workers, ffprobe, noSync); err != nil { cache, err := runScan(ctx, cfg, p, workers, ffprobe)
if err != nil {
return err return err
} }
items = append(items, library.BuildSyncItems(cache)...)
} }
if noSync || jsonOut {
return nil return nil
} }
return runScan(args[0], workers, ffprobe, noSync) return syncToServer(ctx, cfg, items, paths, true)
}
cache, err := runScan(ctx, cfg, args[0], workers, ffprobe)
if err != nil {
return err
}
if noSync || jsonOut {
return nil
}
return syncToServer(ctx, cfg, library.BuildSyncItems(cache), []string{args[0]}, false)
}, },
} }
@ -65,18 +84,20 @@ to see available quality upgrades.`,
return cmd return cmd
} }
func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error { // runScan walks one root, saves the cache and prewarms sidecars. Syncing to
// the server is the CALLER's job (RunE) — all roots of an invocation feed one
// sync session via syncToServer, so per-root sessions can't trick the server
// into reaping rows of roots the session never visited.
func runScan(ctx context.Context, cfg config.Config, dirPath string, workers int, ffprobePath string) (*library.LibraryCache, error) {
// Validate path // Validate path
info, err := os.Stat(dirPath) info, err := os.Stat(dirPath)
if err != nil { if err != nil {
return fmt.Errorf("path not found: %s", dirPath) return nil, fmt.Errorf("path not found: %s", dirPath)
} }
if !info.IsDir() { if !info.IsDir() {
return fmt.Errorf("not a directory: %s", dirPath) return nil, fmt.Errorf("not a directory: %s", dirPath)
} }
cfg := loadConfig()
// Resolve workers: flag → config → default 8 // Resolve workers: flag → config → default 8
if workers == 0 { if workers == 0 {
workers = cfg.Library.Workers workers = cfg.Library.Workers
@ -93,10 +114,6 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
// Load existing cache for incremental scanning // Load existing cache for incremental scanning
existing, _ := library.LoadCache() existing, _ := library.LoadCache()
// Context with signal handling
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()
bold := color.New(color.Bold) bold := color.New(color.Bold)
bold.Printf("\n Scanning %s...\n\n", dirPath) bold.Printf("\n Scanning %s...\n\n", dirPath)
@ -114,14 +131,14 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
}, },
}) })
if err != nil { if err != nil {
return fmt.Errorf("scan failed: %w", err) return nil, fmt.Errorf("scan failed: %w", err)
} }
fmt.Fprintf(os.Stderr, "\r\033[K") // clear progress line fmt.Fprintf(os.Stderr, "\r\033[K") // clear progress line
// Save cache // Save cache
if err := library.SaveCache(cache); err != nil { if err := library.SaveCache(cache); err != nil {
return fmt.Errorf("save cache: %w", err) return nil, fmt.Errorf("save cache: %w", err)
} }
// Remember scan path in config // Remember scan path in config
@ -133,11 +150,12 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
// Print summary // Print summary
printScanSummary(cache) printScanSummary(cache)
// JSON output mode // JSON output mode — emit the cache and skip the prewarm (the caller skips
// the sync via the same flag).
if jsonOut { if jsonOut {
enc := json.NewEncoder(os.Stdout) enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ") enc.SetIndent("", " ")
return enc.Encode(cache) return cache, enc.Encode(cache)
} }
// Pre-extract sidecars (text subs → WebVTT, panel frames → JPEG) into a hidden // Pre-extract sidecars (text subs → WebVTT, panel frames → JPEG) into a hidden
@ -162,15 +180,14 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
} }
} }
// Sync to server return cache, nil
if !noSync {
return syncToServer(ctx, cfg, cache)
} }
return nil // syncToServer uploads the scanned items of THIS invocation as one sync
} // session. roots lists every root the invocation scanned; fullCycle marks a
// no-args run that covered all configured roots (the server may then reap
func syncToServer(ctx context.Context, cfg config.Config, cache *library.LibraryCache) error { // stale rows regardless of prefix — see LibrarySyncRequest.FullCycle).
func syncToServer(ctx context.Context, cfg config.Config, items []agent.LibrarySyncItem, roots []string, fullCycle bool) error {
apiKey := apiKeyFlag apiKey := apiKeyFlag
if apiKey == "" { if apiKey == "" {
apiKey = cfg.Auth.APIKey apiKey = cfg.Auth.APIKey
@ -182,50 +199,28 @@ func syncToServer(ctx context.Context, cfg config.Config, cache *library.Library
ac := agent.NewClient(cfg.Auth.APIURL, apiKey, "unarr/"+Version) ac := agent.NewClient(cfg.Auth.APIURL, apiKey, "unarr/"+Version)
items := library.BuildSyncItems(cache)
if len(items) == 0 { if len(items) == 0 {
color.Yellow("\n No valid items to sync.") color.Yellow("\n No valid items to sync.")
return nil return nil
} }
// Send in batches of 100 res, err := library.SyncBatches(ctx, ac, items, library.SyncOptions{
const batchSize = 100
totalSynced := 0
totalMatched := 0
totalRemoved := 0
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
isLast := end >= len(items)
fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", end, len(items))
resp, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
Items: batch,
ScanPath: cache.Path,
AgentID: cfg.Agent.ID, AgentID: cfg.Agent.ID,
IsLastBatch: isLast, ScanPath: roots[0],
SyncStartedAt: syncStartedAt, ScanRoots: roots,
FullCycle: fullCycle,
OnProgress: func(sent, total int) {
fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", sent, total)
},
}) })
if err != nil { if err != nil {
return fmt.Errorf("sync failed: %w", err) return fmt.Errorf("sync failed: %w", err)
} }
totalSynced += resp.Synced
totalMatched += resp.Matched
totalRemoved += resp.Removed
}
fmt.Fprintf(os.Stderr, "\r\033[K") fmt.Fprintf(os.Stderr, "\r\033[K")
green := color.New(color.FgGreen) green := color.New(color.FgGreen)
green.Printf("\n ✓ Synced %d items (%d matched, %d removed)\n", totalSynced, totalMatched, totalRemoved) green.Printf("\n ✓ Synced %d items (%d matched, %d removed)\n", res.Synced, res.Matched, res.Removed)
apiURL := strings.TrimSuffix(cfg.Auth.APIURL, "/") apiURL := strings.TrimSuffix(cfg.Auth.APIURL, "/")
fmt.Printf(" → View upgrades at %s/library\n\n", apiURL) fmt.Printf(" → View upgrades at %s/library\n\n", apiURL)

View file

@ -1,12 +1,74 @@
package library package library
import ( import (
"context"
"path/filepath" "path/filepath"
"strings" "strings"
"time"
"github.com/torrentclaw/unarr/internal/agent" "github.com/torrentclaw/unarr/internal/agent"
) )
// SyncOptions describes ONE library sync session — a set of batches sharing a
// single syncStartedAt so the server can reap rows not seen by the session.
type SyncOptions struct {
AgentID string
// ScanPath is the primary root, kept for pre-scanRoots servers.
ScanPath string
// ScanRoots lists every root this session covers (see LibrarySyncRequest).
ScanRoots []string
// FullCycle: the session spans every configured root — the server may reap
// unseen rows regardless of path prefix. NEVER set it for a subtree scan.
FullCycle bool
// OnProgress, when non-nil, is called after each batch with (sent, total).
OnProgress func(sent, total int)
}
// SyncResult aggregates the per-batch server responses of a session.
type SyncResult struct {
Synced int
Matched int
Removed int
}
// SyncBatches uploads items to the server in batches of 100 as ONE sync
// session: every batch shares the same syncStartedAt and only the final one
// carries isLastBatch, so the server's stale-row cleanup sees the whole cycle
// at once. The single source of the batching protocol — shared by `unarr scan`
// (cmd/scan.go) and the daemon auto-scan (cmd/daemon.go); before this each
// root synced as its own session and the per-agent cleanup could reap rows of
// roots the session never visited.
func SyncBatches(ctx context.Context, ac *agent.Client, items []agent.LibrarySyncItem, opts SyncOptions) (SyncResult, error) {
const batchSize = 100
var res SyncResult
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
resp, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
Items: items[i:end],
ScanPath: opts.ScanPath,
AgentID: opts.AgentID,
IsLastBatch: end >= len(items),
SyncStartedAt: syncStartedAt,
ScanRoots: opts.ScanRoots,
FullCycle: opts.FullCycle,
})
if err != nil {
return res, err
}
res.Synced += resp.Synced
res.Matched += resp.Matched
res.Removed += resp.Removed
if opts.OnProgress != nil {
opts.OnProgress(end, len(items))
}
}
return res, nil
}
// relToRoot returns the file's path relative to the scan root (forward-slashed), // relToRoot returns the file's path relative to the scan root (forward-slashed),
// or "" when it doesn't live under root. The server stores this so streaming can // or "" when it doesn't live under root. The server stores this so streaming can
// later reconstruct the absolute path from the agent's *current* root. // later reconstruct the absolute path from the agent's *current* root.