Merge branch 'main' into feat/agent-tls-direct
# Conflicts: # internal/cmd/daemon.go
This commit is contained in:
commit
b0637f266b
42 changed files with 2862 additions and 340 deletions
|
|
@ -24,7 +24,7 @@ const browserAuthTimeout = 60 * time.Second
|
|||
// 3. User logs in and clicks "Authorize" on the web page
|
||||
// 4. Web redirects to localhost:{port}/callback?token=tc_...&state={state}
|
||||
// 5. CLI validates state, extracts token, closes server
|
||||
func browserAuth(apiURL string) (string, error) {
|
||||
func browserAuth(apiURL, agentID string) (string, error) {
|
||||
// Validate apiURL is a well-formed HTTP(S) URL
|
||||
parsed, err := url.Parse(apiURL)
|
||||
if err != nil || (parsed.Scheme != "http" && parsed.Scheme != "https") || parsed.Host == "" {
|
||||
|
|
@ -96,8 +96,12 @@ func browserAuth(apiURL string) (string, error) {
|
|||
}
|
||||
}()
|
||||
|
||||
// Open browser
|
||||
// Open browser. Forward the agentId so the server mints a per-machine key
|
||||
// bound to it (omitted → server falls back to the legacy general key).
|
||||
authURL := fmt.Sprintf("%s/unarr/auth?state=%s&port=%d", apiURL, url.QueryEscape(state), port)
|
||||
if agentID != "" {
|
||||
authURL += "&agentId=" + url.QueryEscape(agentID)
|
||||
}
|
||||
openBrowser(authURL)
|
||||
|
||||
// Listen for Enter key to skip to manual fallback
|
||||
|
|
|
|||
|
|
@ -653,6 +653,14 @@ func runDaemonStart() error {
|
|||
}
|
||||
}
|
||||
|
||||
// Wire: sync receives on-demand subtitle-fetch jobs (write VTT sidecars).
|
||||
// Always available (additive, no deletion) as long as we have scan paths.
|
||||
if len(daemonCfg.ScanPaths) > 0 {
|
||||
sc.OnSubtitleFetch = func(reqs []agent.SubtitleFetchRequest) ([]int, []agent.SubtitleFetchError) {
|
||||
return library.FetchSubtitles(reqs, daemonCfg.ScanPaths)
|
||||
}
|
||||
}
|
||||
|
||||
// Wire: sync receives stream requests for completed downloads
|
||||
d.OnStreamRequested = func(sr agent.StreamRequest) {
|
||||
if streamSrv.CurrentTaskID() == sr.TaskID {
|
||||
|
|
@ -683,65 +691,19 @@ func runDaemonStart() error {
|
|||
}()
|
||||
}
|
||||
|
||||
allowedRoots := streamAllowedRoots(cfg)
|
||||
|
||||
filePath := filepath.Clean(sr.FilePath)
|
||||
// Self-heal a base-path mismatch: the web may hand us a path under an old
|
||||
// root (e.g. /mnt/nas/peliculas/… from before a binary→docker move) that
|
||||
// is now outside our allowed dirs but whose file still exists under a
|
||||
// current root (/downloads/…). Remap the path's tail onto an allowed root
|
||||
// so playback works immediately; the next re-scan persists the fix to the
|
||||
// DB. See docs/plans/unarr-path-resilience.md.
|
||||
if !isAllowedStreamPath(filePath, allowedRoots...) {
|
||||
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
|
||||
log.Printf("[%s] stream self-heal: remapped %s → %s", agent.ShortID(sr.TaskID), filePath, remapped)
|
||||
filePath = remapped
|
||||
} else {
|
||||
log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath)
|
||||
reportStreamError(fmt.Sprintf("path outside allowed dirs: %s", filePath))
|
||||
return
|
||||
}
|
||||
}
|
||||
// os.Stat over NFS can transiently fail (ESTALE/EAGAIN/timeout) right
|
||||
// after a remount or under load. Retry a few times before giving up so
|
||||
// a hiccup doesn't surface as a spurious "file not found" — this is the
|
||||
// root of the intermittent "works on the 3rd try" stream failures.
|
||||
var info os.FileInfo
|
||||
var statErr error
|
||||
for attempt := 0; attempt < 3; attempt++ {
|
||||
if info, statErr = os.Stat(filePath); statErr == nil {
|
||||
break
|
||||
}
|
||||
if attempt < 2 {
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
if statErr != nil {
|
||||
// Last resort before failing: the file may simply have moved within
|
||||
// an allowed root — try to relocate it by path tail.
|
||||
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
|
||||
log.Printf("[%s] stream self-heal: relocated missing %s → %s", agent.ShortID(sr.TaskID), filePath, remapped)
|
||||
filePath = remapped
|
||||
info, statErr = os.Stat(filePath)
|
||||
}
|
||||
}
|
||||
if statErr != nil {
|
||||
log.Printf("[%s] stream request: file not found after retries: %s (%v)", agent.ShortID(sr.TaskID), filePath, statErr)
|
||||
reportStreamError(fmt.Sprintf("file not found: %s", filePath))
|
||||
// current root (/downloads/…). resolvePlayableFile remaps, stat-retries
|
||||
// (NFS) and resolves directories; the next re-scan persists the fix to
|
||||
// the DB. See docs/plans/unarr-path-resilience.md.
|
||||
filePath, errCode, perr := resolvePlayableFile(sr.FilePath, streamAllowedRoots(cfg), agent.ShortID(sr.TaskID))
|
||||
if perr != nil {
|
||||
log.Printf("[%s] stream request rejected (%s): %v", agent.ShortID(sr.TaskID), errCode, perr)
|
||||
reportStreamError(perr.Error())
|
||||
return
|
||||
}
|
||||
|
||||
if info.IsDir() {
|
||||
found := engine.FindVideoFile(filePath)
|
||||
if found == "" {
|
||||
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
|
||||
reportStreamError(fmt.Sprintf("no video file in directory: %s", filePath))
|
||||
return
|
||||
}
|
||||
filePath = found
|
||||
log.Printf("[%s] resolved directory to video file: %s", agent.ShortID(sr.TaskID), filepath.Base(filePath))
|
||||
}
|
||||
|
||||
cancelStreamContexts()
|
||||
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
|
||||
log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL())
|
||||
|
|
@ -771,20 +733,104 @@ func runDaemonStart() error {
|
|||
return // already running
|
||||
}
|
||||
|
||||
// failSession logs AND reports a startup failure to the web — every
|
||||
// abort path in this handler must go through it. A silent `return`
|
||||
// here left the player probing a playlist that would never exist
|
||||
// until its 30s deadline (incident 2026-06-10: deleted file + stale
|
||||
// library row = eternal "Preparando sesión"). Best-effort: on old web
|
||||
// deployments the endpoint 404s and the player falls back to the
|
||||
// probe deadline, exactly as before.
|
||||
failSession := func(sessionID, code, message string) {
|
||||
log.Printf("[hls %s] failed (%s): %s", agent.ShortID(sessionID), code, message)
|
||||
go func() {
|
||||
// Fresh context on purpose: failures cluster exactly when the
|
||||
// daemon ctx is being cancelled (shutdown kills in-flight
|
||||
// session starts), and a report derived from it would die
|
||||
// before reaching the web. The 10s cap still bounds it.
|
||||
rctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if err := agentClient.ReportSessionError(rctx, sessionID, code, message); err != nil {
|
||||
log.Printf("[hls %s] session error report failed: %v", agent.ShortID(sessionID), err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// markReady reports "first bytes are servable" for the no-transcode
|
||||
// paths (direct-play, remux, debrid direct) — one place instead of a
|
||||
// copy per branch. HLS sessions report via watchSessionReady instead
|
||||
// (they wait for seg-0 + attach a health snapshot).
|
||||
markReady := func(sessionID string) {
|
||||
go func() {
|
||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
if err := agentClient.MarkSessionReady(rctx, sessionID, nil); err != nil {
|
||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// startHLSPlayback starts an HLS encode (local file or debrid URL) and
|
||||
// wires it into the StreamServer. Shared by the local-file HLS path and
|
||||
// the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe
|
||||
// off the sync loop, and report readiness identically.
|
||||
//
|
||||
// Prewarm sessions (background cache-fill: next-episode, hover) take a
|
||||
// deferential path: wait until no live encode is running (never steal
|
||||
// the encoder from the viewer), then register WITHOUT displacing other
|
||||
// sessions. Before this, a prewarm claimed mid-playback went through
|
||||
// Register() and KILLED the stream the user was watching (verified
|
||||
// 2026-06-10: prewarm started → live session "closed (cache
|
||||
// discarded)" → player 404).
|
||||
startHLSPlayback := func(hlsCfg engine.HLSSessionConfig, hlsCtx context.Context, hlsCancel context.CancelFunc) {
|
||||
playerSessionRegistry.add(hlsCfg.SessionID, hlsCancel)
|
||||
prewarm := sess.Prewarm
|
||||
go func() {
|
||||
if prewarm {
|
||||
// Defer until the encoder is free. Poll is cheap (10 s);
|
||||
// cap the wait at 30 min — a prewarm that can't start
|
||||
// within an episode's runtime has lost its purpose.
|
||||
deadline := time.Now().Add(30 * time.Minute)
|
||||
for streamSrv.HLS().HasLiveEncode() {
|
||||
if time.Now().After(deadline) || hlsCtx.Err() != nil {
|
||||
playerSessionRegistry.remove(hlsCfg.SessionID)
|
||||
hlsCancel()
|
||||
log.Printf("[hls %s] prewarm abandoned (encoder busy %s)",
|
||||
agent.ShortID(hlsCfg.SessionID), "30m")
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-hlsCtx.Done():
|
||||
playerSessionRegistry.remove(hlsCfg.SessionID)
|
||||
return
|
||||
case <-time.After(10 * time.Second):
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// REAL session: reap in-flight prewarm encodes BEFORE
|
||||
// StartHLSSession so the per-key cache writer-lock is
|
||||
// free and the viewer's encode lands in the persistent
|
||||
// cache (not an uncached tmpdir). A SEALED prewarm is
|
||||
// unaffected — this session simply cache-HITs it.
|
||||
if n := streamSrv.HLS().CloseWhere(func(s *engine.HLSSession) bool { return s.IsPrewarm() }); n > 0 {
|
||||
log.Printf("[hls %s] reaped %d in-flight prewarm(s) for the viewer session",
|
||||
agent.ShortID(hlsCfg.SessionID), n)
|
||||
}
|
||||
}
|
||||
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
|
||||
if err != nil {
|
||||
playerSessionRegistry.remove(hlsCfg.SessionID)
|
||||
hlsCancel()
|
||||
log.Printf("[hls %s] start failed: %v", agent.ShortID(hlsCfg.SessionID), err)
|
||||
failSession(hlsCfg.SessionID, sessErrStartFailed, err.Error())
|
||||
return
|
||||
}
|
||||
if prewarm {
|
||||
// Side-by-side: never evict the viewer's session. A later
|
||||
// REAL session still evicts this one via Register — by
|
||||
// then the encode is usually sealed in the segment cache.
|
||||
streamSrv.HLS().RegisterKeep(hsess)
|
||||
log.Printf("[hls %s] prewarm encoding: %s", agent.ShortID(hlsCfg.SessionID), hlsCfg.FileName)
|
||||
return // no viewer waiting → no ready-watcher
|
||||
}
|
||||
streamSrv.HLS().Register(hsess)
|
||||
go watchSessionReady(hlsCtx, agentClient, hsess, hlsCfg.SessionID)
|
||||
}()
|
||||
|
|
@ -814,17 +860,13 @@ func runDaemonStart() error {
|
|||
provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize, refresh)
|
||||
if perr != nil {
|
||||
playerSessionRegistry.remove(sess.SessionID)
|
||||
log.Printf("[stream %s] debrid provider failed: %v", agent.ShortID(sess.SessionID), perr)
|
||||
failSession(sess.SessionID, sessErrStartFailed, fmt.Sprintf("debrid provider: %v", perr))
|
||||
return
|
||||
}
|
||||
streamSrv.SetFile(provider, sess.TaskID)
|
||||
log.Printf("[stream %s] debrid direct-play: %s (%d bytes)",
|
||||
agent.ShortID(sess.SessionID), provider.FileName(), provider.FileSize())
|
||||
rctx, rcancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer rcancel()
|
||||
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
|
||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
||||
}
|
||||
markReady(sess.SessionID)
|
||||
}()
|
||||
return
|
||||
}
|
||||
|
|
@ -837,7 +879,7 @@ func runDaemonStart() error {
|
|||
if sess.DirectURL != "" { // playMethod == "hls" implied (2a returned above)
|
||||
tcRuntime := buildTranscodeRuntime(ctx, cfg)
|
||||
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
|
||||
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable (debrid HLS)", agent.ShortID(sess.SessionID))
|
||||
failSession(sess.SessionID, sessErrFfmpegMissing, "ffmpeg/ffprobe unavailable (debrid HLS)")
|
||||
return
|
||||
}
|
||||
hlsCtx, hlsCancel := context.WithCancel(ctx)
|
||||
|
|
@ -849,6 +891,8 @@ func runDaemonStart() error {
|
|||
Quality: sess.Quality,
|
||||
AudioIndex: sess.AudioIndex,
|
||||
BurnSubtitleIndex: sess.BurnSubtitleIndex,
|
||||
StartSec: sess.StartSec,
|
||||
Prewarm: sess.Prewarm,
|
||||
Transcode: tcRuntime,
|
||||
Cache: hlsCache,
|
||||
// 2c: refresh the debrid link if it expires mid-transcode; the
|
||||
|
|
@ -861,43 +905,22 @@ func runDaemonStart() error {
|
|||
return
|
||||
}
|
||||
|
||||
filePath := sess.FilePath
|
||||
if filePath == "" {
|
||||
log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID))
|
||||
if sess.FilePath == "" {
|
||||
failSession(sess.SessionID, sessErrStartFailed, "empty file path")
|
||||
return
|
||||
}
|
||||
filePath = filepath.Clean(filePath)
|
||||
// Apply the SAME base-path self-heal remap as the raw /stream handler
|
||||
// (OnStreamRequest above). Without it, a path under an old/host base
|
||||
// SAME base-path self-heal + stat-retry + dir resolution as the raw
|
||||
// /stream handler (resolvePlayableFile). A path under an old/host base
|
||||
// (e.g. /mnt/nas/peliculas/… handed by the web while this docker agent
|
||||
// mounts that media at /downloads) is rejected here even though the raw
|
||||
// path self-heals it — so the web silently falls back to the raw stream
|
||||
// and HLS/remux never runs (no transcode, slow funnel start). NOTE: this
|
||||
// replicates only the lexical-remap; the raw handler additionally retries
|
||||
// os.Stat for transient NFS errors. The HLS dir-check below proceeds (not
|
||||
// rejects) on a stat error, so it tolerates an NFS blip differently.
|
||||
// mounts that media at /downloads) remaps onto the current root; a path
|
||||
// whose file is genuinely gone fails fast as "file_missing" so the web
|
||||
// can prune the stale library row and the player can fall back, instead
|
||||
// of the player probing a playlist that will never exist.
|
||||
// See docs/plans/unarr-path-resilience.md.
|
||||
hlsAllowedRoots := streamAllowedRoots(cfg)
|
||||
if !isAllowedStreamPath(filePath, hlsAllowedRoots...) {
|
||||
if remapped := relocateUnreachable(filePath, hlsAllowedRoots); remapped != "" {
|
||||
log.Printf("[hls %s] self-heal: remapped %s → %s",
|
||||
agent.ShortID(sess.SessionID), filePath, remapped)
|
||||
filePath = remapped
|
||||
} else {
|
||||
log.Printf("[hls %s] rejected: path outside allowed dirs: %s",
|
||||
agent.ShortID(sess.SessionID), filePath)
|
||||
return
|
||||
}
|
||||
}
|
||||
// Resolve directory → first video file (matches StreamRequest behavior).
|
||||
if info, err := os.Stat(filePath); err == nil && info.IsDir() {
|
||||
found := engine.FindVideoFile(filePath)
|
||||
if found == "" {
|
||||
log.Printf("[hls %s] rejected: no video file in dir %s",
|
||||
agent.ShortID(sess.SessionID), filePath)
|
||||
return
|
||||
}
|
||||
filePath = found
|
||||
filePath, errCode, perr := resolvePlayableFile(sess.FilePath, streamAllowedRoots(cfg), "hls "+agent.ShortID(sess.SessionID))
|
||||
if perr != nil {
|
||||
failSession(sess.SessionID, errCode, perr.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// Direct-play (hueco #3 / 3a): the web decided this source is already
|
||||
|
|
@ -914,19 +937,13 @@ func runDaemonStart() error {
|
|||
log.Printf("[stream %s] direct-play: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath))
|
||||
// File is on disk → ready immediately. Tell the web so the player
|
||||
// attaches <video src> without burning its HEAD-probe retry budget.
|
||||
go func() {
|
||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
|
||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
||||
}
|
||||
}()
|
||||
markReady(sess.SessionID)
|
||||
return
|
||||
}
|
||||
|
||||
tcRuntime := buildTranscodeRuntime(ctx, cfg)
|
||||
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
|
||||
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID))
|
||||
failSession(sess.SessionID, sessErrFfmpegMissing, "ffmpeg/ffprobe unavailable")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -940,7 +957,7 @@ func runDaemonStart() error {
|
|||
probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath)
|
||||
cancelProbe()
|
||||
if perr != nil {
|
||||
log.Printf("[stream %s] remux probe failed: %v", agent.ShortID(sess.SessionID), perr)
|
||||
failSession(sess.SessionID, sessErrStartFailed, fmt.Sprintf("remux probe: %v", perr))
|
||||
return
|
||||
}
|
||||
tProbe := time.Now()
|
||||
|
|
@ -948,7 +965,7 @@ func runDaemonStart() error {
|
|||
src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName)
|
||||
if serr != nil {
|
||||
remuxCancel()
|
||||
log.Printf("[stream %s] remux start failed: %v", agent.ShortID(sess.SessionID), serr)
|
||||
failSession(sess.SessionID, sessErrStartFailed, fmt.Sprintf("remux start: %v", serr))
|
||||
return
|
||||
}
|
||||
streamSrv.SetGrowingFile(src, sess.TaskID)
|
||||
|
|
@ -962,13 +979,7 @@ func runDaemonStart() error {
|
|||
log.Printf("[stream %s] remux (copy) → fMP4: %s [probe=%v spawn=%v]",
|
||||
agent.ShortID(sess.SessionID), filepath.Base(filePath),
|
||||
tProbe.Sub(tStart).Round(time.Millisecond), time.Since(tProbe).Round(time.Millisecond))
|
||||
go func() {
|
||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
|
||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
||||
}
|
||||
}()
|
||||
markReady(sess.SessionID)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -984,6 +995,8 @@ func runDaemonStart() error {
|
|||
Quality: sess.Quality,
|
||||
AudioIndex: sess.AudioIndex,
|
||||
BurnSubtitleIndex: sess.BurnSubtitleIndex,
|
||||
StartSec: sess.StartSec,
|
||||
Prewarm: sess.Prewarm,
|
||||
Transcode: tcRuntime,
|
||||
Cache: hlsCache,
|
||||
}, hlsCtx, hlsCancel)
|
||||
|
|
@ -1037,6 +1050,26 @@ func runDaemonStart() error {
|
|||
// Start reporter only for stream task handling
|
||||
go reporter.Run(ctx)
|
||||
|
||||
// Credential revoked mid-run (agent deleted from the dashboard): wipe the
|
||||
// stored key + agentId so a supervisor restart can't loop on a rejected
|
||||
// identity, then stop the daemon. Reconnecting needs a fresh `unarr login`.
|
||||
d.SyncClient().OnRevoked = func(err error) {
|
||||
reportAgentRevoked(cfg, err)
|
||||
cancel()
|
||||
}
|
||||
|
||||
// Legacy bootstrap: if register hands back a per-machine key, persist it so
|
||||
// the next start authenticates with the bound agent key (one-time migration;
|
||||
// also stops the server re-minting on every restart).
|
||||
d.OnAgentKeyMinted = func(newKey string) {
|
||||
cfg.Auth.APIKey = newKey
|
||||
if serr := config.Save(cfg, resolvedConfigPath()); serr != nil {
|
||||
log.Printf("[agent] could not persist per-machine key: %v", serr)
|
||||
} else {
|
||||
log.Printf("[agent] migrated to a per-machine agent key")
|
||||
}
|
||||
}
|
||||
|
||||
// Start daemon (blocks — runs sync loop)
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
|
|
@ -1076,10 +1109,34 @@ func runDaemonStart() error {
|
|||
cancelAllPlayerSessions()
|
||||
streamSrv.Shutdown(context.Background())
|
||||
cancel()
|
||||
// Registration was rejected because this agent's credential is revoked
|
||||
// (deleted from the dashboard). Wipe it and exit cleanly so the service
|
||||
// supervisor doesn't restart-loop against a 410; user must re-login.
|
||||
if agent.IsRevoked(err) {
|
||||
reportAgentRevoked(cfg, err)
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// reportAgentRevoked tells the user their agent was removed and wipes the
|
||||
// stored credential (api key + agentId) so the next start requires a fresh
|
||||
// `unarr login` (which mints a new per-machine key bound to a new agentId)
|
||||
// instead of looping against a server that keeps rejecting the old identity.
|
||||
func reportAgentRevoked(cfg config.Config, err error) {
|
||||
log.Printf("[agent] credential revoked by server (%v) — this machine was removed from your account", err)
|
||||
cfg.Auth.APIKey = ""
|
||||
cfg.Agent.ID = ""
|
||||
if serr := config.Save(cfg, resolvedConfigPath()); serr != nil {
|
||||
log.Printf("[agent] could not clear stored credential: %v", serr)
|
||||
}
|
||||
fmt.Println()
|
||||
fmt.Println(" This agent was removed from your account.")
|
||||
fmt.Println(" Run `unarr login` on this machine to reconnect it.")
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
// isAllowedStreamPath checks that filePath is within one of the directories
|
||||
// the daemon is configured to manage. This defends against a compromised API
|
||||
// server sending a path traversal payload (e.g. /etc/passwd) in StreamRequest.
|
||||
|
|
@ -1156,6 +1213,82 @@ func relocateUnreachable(filePath string, allowedRoots []string) string {
|
|||
return ""
|
||||
}
|
||||
|
||||
// Stable machine codes for the web's session-error channel
|
||||
// (POST /api/internal/agent/session-error) — mirrored by
|
||||
// SESSION_ERROR_CODES in the web repo. Only "file_missing" triggers
|
||||
// destructive self-heal on the web (it prunes the stale library row + task
|
||||
// pointer), so the resolver must never return it while the file may exist.
|
||||
const (
|
||||
pathErrRejected = "path_rejected"
|
||||
pathErrMissing = "file_missing"
|
||||
pathErrNoVideo = "no_video_file"
|
||||
sessErrFfmpegMissing = "ffmpeg_unavailable"
|
||||
sessErrStartFailed = "start_failed"
|
||||
)
|
||||
|
||||
// resolvePlayableFile validates and self-heals a web-provided source path into
|
||||
// a playable on-disk video file. Shared by the raw /stream handler and every
|
||||
// session transport (HLS / remux / direct-play) so they all behave
|
||||
// identically — before this, the HLS path replicated only the lexical remap
|
||||
// and silently diverged on stat retries (docs/plans/unarr-path-resilience.md):
|
||||
//
|
||||
// 1. Containment: the cleaned path must live under an allowed root; if not,
|
||||
// relocate it by path tail (old base path → current mount).
|
||||
// 2. Existence: os.Stat with retries (NFS can transiently fail right after a
|
||||
// remount or under load — the root of the "works on the 3rd try" stream
|
||||
// failures), then one last relocate for files that moved within a root.
|
||||
// 3. Directories resolve to their first contained video file.
|
||||
//
|
||||
// On failure returns a stable errCode: "path_rejected" means the file EXISTS
|
||||
// at the original path but outside every allowed root (an agent config
|
||||
// problem — the web must NOT prune library rows over it); "file_missing"
|
||||
// means no readable file was found anywhere; "no_video_file" is a directory
|
||||
// with nothing playable inside.
|
||||
func resolvePlayableFile(rawPath string, allowedRoots []string, logLabel string) (string, string, error) {
|
||||
filePath := filepath.Clean(rawPath)
|
||||
if !isAllowedStreamPath(filePath, allowedRoots...) {
|
||||
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
|
||||
log.Printf("[%s] stream self-heal: remapped %s → %s", logLabel, filePath, remapped)
|
||||
filePath = remapped
|
||||
} else if _, err := os.Stat(filePath); err == nil {
|
||||
return "", pathErrRejected, fmt.Errorf("path outside allowed dirs: %s", filePath)
|
||||
} else {
|
||||
return "", pathErrMissing, fmt.Errorf("file not found under any allowed root: %s", filePath)
|
||||
}
|
||||
}
|
||||
var info os.FileInfo
|
||||
var statErr error
|
||||
for attempt := 0; attempt < 3; attempt++ {
|
||||
if info, statErr = os.Stat(filePath); statErr == nil {
|
||||
break
|
||||
}
|
||||
if attempt < 2 {
|
||||
time.Sleep(300 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
if statErr != nil {
|
||||
// Last resort before failing: the file may simply have moved within
|
||||
// an allowed root — try to relocate it by path tail.
|
||||
if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
|
||||
log.Printf("[%s] stream self-heal: relocated missing %s → %s", logLabel, filePath, remapped)
|
||||
filePath = remapped
|
||||
info, statErr = os.Stat(filePath)
|
||||
}
|
||||
}
|
||||
if statErr != nil {
|
||||
return "", pathErrMissing, fmt.Errorf("file not found after retries: %s (%v)", filePath, statErr)
|
||||
}
|
||||
if info.IsDir() {
|
||||
found := engine.FindVideoFile(filePath)
|
||||
if found == "" {
|
||||
return "", pathErrNoVideo, fmt.Errorf("no video file in directory: %s", filePath)
|
||||
}
|
||||
log.Printf("[%s] resolved directory to video file: %s", logLabel, filepath.Base(found))
|
||||
filePath = found
|
||||
}
|
||||
return filePath, "", nil
|
||||
}
|
||||
|
||||
func formatSpeedLog(bps int64) string {
|
||||
switch {
|
||||
case bps >= 1024*1024*1024:
|
||||
|
|
@ -1246,19 +1379,26 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
|
|||
}
|
||||
}
|
||||
|
||||
// Scan each path independently and sync per path so the server can
|
||||
// scope stale-item deletion to the correct directory prefix.
|
||||
const batchSize = 100
|
||||
totalSynced := 0
|
||||
// Scan every path, then sync ALL of them as ONE session (single
|
||||
// syncStartedAt + final isLastBatch via library.SyncBatches). Per-root
|
||||
// sessions let the server's per-agent stale cleanup reap rows of roots
|
||||
// a session never visited; one full-cycle session makes the cleanup
|
||||
// sound AND lets it reap old-base-path ghost rows (fullCycle=true —
|
||||
// only when every root scanned cleanly).
|
||||
var syncItems []agent.LibrarySyncItem
|
||||
var coveredRoots []string
|
||||
fullCycle := true
|
||||
var mergedItems []library.LibraryItem
|
||||
|
||||
for _, scanPath := range scanPaths {
|
||||
cache, err := library.Scan(ctx, scanPath, existing, scanOpts)
|
||||
if err != nil {
|
||||
log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err)
|
||||
fullCycle = false
|
||||
continue
|
||||
}
|
||||
mergedItems = append(mergedItems, cache.Items...)
|
||||
coveredRoots = append(coveredRoots, scanPath)
|
||||
|
||||
if prewarmFFmpeg != "" {
|
||||
library.PrewarmSidecars(ctx, cache, library.PrewarmOptions{
|
||||
|
|
@ -1278,28 +1418,28 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
|
|||
log.Printf("[auto-scan] no items under %s", scanPath)
|
||||
continue
|
||||
}
|
||||
syncItems = append(syncItems, items...)
|
||||
}
|
||||
|
||||
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
|
||||
for i := 0; i < len(items); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(items) {
|
||||
end = len(items)
|
||||
}
|
||||
isLast := end >= len(items)
|
||||
|
||||
_, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
||||
Items: items[i:end],
|
||||
ScanPath: scanPath,
|
||||
AgentID: cfg.Agent.ID,
|
||||
IsLastBatch: isLast,
|
||||
SyncStartedAt: syncStartedAt,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[auto-scan] sync failed for %s: %v", scanPath, err)
|
||||
break
|
||||
}
|
||||
totalSynced := 0
|
||||
if len(syncItems) > 0 {
|
||||
res, err := library.SyncBatches(ctx, ac, syncItems, library.SyncOptions{
|
||||
AgentID: cfg.Agent.ID,
|
||||
ScanPath: coveredRoots[0],
|
||||
ScanRoots: coveredRoots,
|
||||
FullCycle: fullCycle,
|
||||
})
|
||||
if err != nil {
|
||||
log.Printf("[auto-scan] sync failed: %v", err)
|
||||
} else if res.Removed > 0 {
|
||||
log.Printf("[auto-scan] server removed %d stale item(s)", res.Removed)
|
||||
}
|
||||
totalSynced += len(items)
|
||||
totalSynced = res.Synced
|
||||
} else {
|
||||
// An entirely-empty library can't open a sync session (the server
|
||||
// requires ≥1 item per batch), so stale rows survive until a file
|
||||
// reappears — same trade-off as before, now explicit.
|
||||
log.Printf("[auto-scan] no items under any scan path — skipping sync")
|
||||
}
|
||||
|
||||
// Save merged cache for incremental scanning next time.
|
||||
|
|
@ -1445,6 +1585,17 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
|
|||
deadline := time.Now().Add(60 * time.Second)
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
readyPosted := false
|
||||
postReady := func(health *agent.SessionHealth) {
|
||||
// Parent ctx so a session cancel mid-POST (user closed tab, daemon
|
||||
// shutdown) tears down the in-flight webhook instead of blocking the
|
||||
// goroutine for up to 10 s on a now-orphan call.
|
||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
if err := client.MarkSessionReady(rctx, sessionID, health); err != nil {
|
||||
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
for {
|
||||
// Session torn down through a path that didn't cancel ctx (registry
|
||||
// replace, idle sweep, internal kill). Bail before polling further —
|
||||
|
|
@ -1453,17 +1604,29 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
|
|||
if hsess.IsClosed() {
|
||||
return
|
||||
}
|
||||
// Cache HIT or seg-0 ready → notify + done.
|
||||
if hsess.FromCache() || hsess.ReadyCount() >= 1 {
|
||||
// Parent ctx so a session cancel mid-POST (user closed tab,
|
||||
// daemon shutdown) tears down the in-flight webhook instead of
|
||||
// blocking the goroutine for up to 10 s on a now-orphan call.
|
||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
if err := client.MarkSessionReady(rctx, sessionID); err != nil {
|
||||
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
|
||||
// Phase 1: cache HIT or first segment ready → flip the "Preparando…"
|
||||
// UI now. Compare against WriterStartIdx, not `>= 1`: a resume
|
||||
// session (StartSec) pre-seeds readyMax to the start index, so
|
||||
// ReadyCount() is ≥ 1 before ffmpeg has written a single byte —
|
||||
// `>= 1` would fire "ready" instantly and freeze the player waiting
|
||||
// on a segment that doesn't exist yet.
|
||||
if !readyPosted && (hsess.FromCache() || hsess.ReadyCount() > hsess.WriterStartIdx()) {
|
||||
postReady(nil)
|
||||
readyPosted = true
|
||||
// Cache replay has no live encode → no telemetry to report, done.
|
||||
if hsess.FromCache() {
|
||||
return
|
||||
}
|
||||
}
|
||||
// Phase 2 (F3): once enough -stats samples accumulated (encoder past
|
||||
// its cold ramp), report ONE live-health snapshot so the player can
|
||||
// name a too-slow transcode in ~4s instead of inferring it from stalls.
|
||||
// >=4 samples ≈ 2s of encoding past seg-0; the EWMA has settled by then.
|
||||
if readyPosted {
|
||||
if st := hsess.GetTranscodeStats(); st.Samples >= 4 {
|
||||
postReady(classifyAgentHealth(st))
|
||||
return
|
||||
}
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
@ -1471,7 +1634,15 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
|
|||
case <-ticker.C:
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
|
||||
if !readyPosted {
|
||||
log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
|
||||
return
|
||||
}
|
||||
// Ready but never got stable telemetry — report whatever we have so
|
||||
// the player isn't left without a verdict (better partial than none).
|
||||
if st := hsess.GetTranscodeStats(); st.Samples > 0 {
|
||||
postReady(classifyAgentHealth(st))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
@ -1514,3 +1685,36 @@ func fetchAgentCert(ctx context.Context, client *agent.Client, hash string) {
|
|||
}
|
||||
log.Printf("[acme] installed cert for *.%s.%s", hash, base)
|
||||
}
|
||||
|
||||
// Realtime-ratio cutoffs for classifyAgentHealth. This is a cross-repo contract
|
||||
// with the web bottleneck classifier (src/lib/stream/bottleneck-classifier.ts):
|
||||
// - ≥ realtimeFloor → "ok" (encoder keeps up)
|
||||
// - [strugglingFloor,..) → "marginal" (barely)
|
||||
// - < strugglingFloor → "struggling" (can't) — the web fast-path commits
|
||||
// the honest overlay + pauses on this WITHOUT waiting for a stall, so the
|
||||
// floor is intentionally conservative (the web uses a looser 0.85 only once
|
||||
// a stall has already corroborated the slowdown).
|
||||
const (
|
||||
agentRealtimeFloor = 0.95
|
||||
agentStrugglingFloor = 0.75
|
||||
)
|
||||
|
||||
// classifyAgentHealth turns a live ffmpeg telemetry snapshot into the health
|
||||
// report the web side consumes (F3). The ×realtime speed is the load-bearing
|
||||
// signal: < 1.0 means the encode can't keep up with playback. An input-bound
|
||||
// hint (source read error) reclassifies the cause as the link, not the encoder.
|
||||
func classifyAgentHealth(st engine.TranscodeStats) *agent.SessionHealth {
|
||||
ratio := st.SpeedX
|
||||
var health, reason string
|
||||
switch {
|
||||
case st.InputBound && ratio < agentRealtimeFloor:
|
||||
health, reason = "struggling", "input_bound"
|
||||
case ratio >= agentRealtimeFloor:
|
||||
health, reason = "ok", "realtime"
|
||||
case ratio >= agentStrugglingFloor:
|
||||
health, reason = "marginal", "transcode"
|
||||
default:
|
||||
health, reason = "struggling", "transcode"
|
||||
}
|
||||
return &agent.SessionHealth{Health: health, RealtimeRatio: ratio, Reason: reason}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,12 +75,19 @@ func runInit(apiURLOverride string) error {
|
|||
|
||||
apiKey := cfg.Auth.APIKey
|
||||
|
||||
// Resolve the agentId up front so browser-authorize can bind the minted
|
||||
// per-machine key to it.
|
||||
agentID := cfg.Agent.ID
|
||||
if agentID == "" {
|
||||
agentID = uuid.New().String()
|
||||
}
|
||||
|
||||
if apiKey == "" {
|
||||
// Try browser-based auth first (like Claude Code / GitHub CLI)
|
||||
fmt.Println(" Opening browser to connect your account...")
|
||||
fmt.Println()
|
||||
|
||||
browserKey, browserErr := browserAuth(apiURL)
|
||||
browserKey, browserErr := browserAuth(apiURL, agentID)
|
||||
if browserErr == nil && strings.HasPrefix(browserKey, "tc_") {
|
||||
apiKey = browserKey
|
||||
green.Println(" ✓ Connected via browser")
|
||||
|
|
@ -127,11 +134,6 @@ func runInit(apiURLOverride string) error {
|
|||
// Validate API key by registering with the server
|
||||
fmt.Print(" Verifying API key... ")
|
||||
|
||||
agentID := cfg.Agent.ID
|
||||
if agentID == "" {
|
||||
agentID = uuid.New().String()
|
||||
}
|
||||
|
||||
hostname, _ := os.Hostname()
|
||||
agentName := cfg.Agent.Name
|
||||
if agentName == "" {
|
||||
|
|
@ -150,9 +152,21 @@ func runInit(apiURLOverride string) error {
|
|||
if err != nil {
|
||||
color.Red("FAILED")
|
||||
fmt.Println()
|
||||
// Stored credential was revoked (machine deleted from the dashboard) —
|
||||
// drop it so a re-run mints a fresh identity.
|
||||
if agent.IsRevoked(err) {
|
||||
clearRevokedIdentity(cfg, "init")
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("API key validation failed: %w", err)
|
||||
}
|
||||
|
||||
// Manual-paste bootstrap: swap to the minted per-machine key, discard the
|
||||
// general key the user pasted.
|
||||
if resp.AgentKey != "" {
|
||||
apiKey = resp.AgentKey
|
||||
}
|
||||
|
||||
green.Println("OK")
|
||||
fmt.Printf(" Connected as %s (%s) [%s]\n", resp.User.Name, resp.User.Email, strings.ToUpper(resp.User.Plan))
|
||||
fmt.Println()
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
|
@ -16,6 +17,20 @@ import (
|
|||
"github.com/torrentclaw/unarr/internal/config"
|
||||
)
|
||||
|
||||
// clearRevokedIdentity wipes the stored credential (api key + agentId) after the
|
||||
// server reports this machine's registration was revoked, so a re-run of the
|
||||
// given command mints a fresh identity instead of looping against a dead key.
|
||||
func clearRevokedIdentity(cfg config.Config, retryCmd string) {
|
||||
cfg.Auth.APIKey = ""
|
||||
cfg.Agent.ID = ""
|
||||
if err := config.Save(cfg, resolvedConfigPath()); err != nil {
|
||||
log.Printf("could not clear revoked credential: %v", err)
|
||||
}
|
||||
fmt.Println(" This machine's previous registration was removed from your account.")
|
||||
fmt.Printf(" Run `unarr %s` again to reconnect it as a new agent.\n", retryCmd)
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
func newLoginCmd() *cobra.Command {
|
||||
var apiURL string
|
||||
|
||||
|
|
@ -70,11 +85,18 @@ func runLogin(apiURLOverride string) error {
|
|||
|
||||
var apiKey string
|
||||
|
||||
// Resolve the agentId up front so the browser-authorize flow can bind the
|
||||
// minted per-machine key to it.
|
||||
agentID := cfg.Agent.ID
|
||||
if agentID == "" {
|
||||
agentID = uuid.New().String()
|
||||
}
|
||||
|
||||
// Try browser-based auth first
|
||||
fmt.Println(" Opening browser to connect your account...")
|
||||
fmt.Println()
|
||||
|
||||
browserKey, browserErr := browserAuth(apiURL)
|
||||
browserKey, browserErr := browserAuth(apiURL, agentID)
|
||||
if browserErr == nil && strings.HasPrefix(browserKey, "tc_") {
|
||||
apiKey = browserKey
|
||||
green.Println(" ✓ Connected via browser")
|
||||
|
|
@ -120,11 +142,6 @@ func runLogin(apiURLOverride string) error {
|
|||
|
||||
fmt.Print(" Verifying API key... ")
|
||||
|
||||
agentID := cfg.Agent.ID
|
||||
if agentID == "" {
|
||||
agentID = uuid.New().String()
|
||||
}
|
||||
|
||||
hostname, _ := os.Hostname()
|
||||
agentName := cfg.Agent.Name
|
||||
if agentName == "" {
|
||||
|
|
@ -143,9 +160,21 @@ func runLogin(apiURLOverride string) error {
|
|||
if err != nil {
|
||||
color.Red("FAILED")
|
||||
fmt.Println()
|
||||
// The stored credential was revoked (this machine was deleted from the
|
||||
// dashboard). Drop it so the next run mints a fresh identity.
|
||||
if agent.IsRevoked(err) {
|
||||
clearRevokedIdentity(cfg, "login")
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("API key validation failed: %w", err)
|
||||
}
|
||||
|
||||
// Manual-paste bootstrap: the server minted a per-machine key bound to this
|
||||
// agentId. Swap to it and discard the general key the user pasted.
|
||||
if resp.AgentKey != "" {
|
||||
apiKey = resp.AgentKey
|
||||
}
|
||||
|
||||
green.Println("OK")
|
||||
fmt.Printf(" Connected as %s (%s) [%s]\n", resp.User.Name, resp.User.Email, strings.ToUpper(resp.User.Plan))
|
||||
fmt.Println()
|
||||
|
|
|
|||
98
internal/cmd/resolve_playable_test.go
Normal file
98
internal/cmd/resolve_playable_test.go
Normal file
|
|
@ -0,0 +1,98 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestResolvePlayableFile(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
mkfile(t, filepath.Join(root, "Acme Show", "Season 01", "ep.mkv"))
|
||||
roots := []string{root}
|
||||
|
||||
t.Run("allowed path resolves to itself", func(t *testing.T) {
|
||||
want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv")
|
||||
got, code, err := resolvePlayableFile(want, roots, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error (%s): %v", code, err)
|
||||
}
|
||||
if got != want {
|
||||
t.Errorf("got %q want %q", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("old base path relocates onto current root", func(t *testing.T) {
|
||||
got, code, err := resolvePlayableFile("/old/base/Acme Show/Season 01/ep.mkv", roots, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error (%s): %v", code, err)
|
||||
}
|
||||
want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv")
|
||||
if got != want {
|
||||
t.Errorf("got %q want %q", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("deleted file under old base is file_missing, never path_rejected", func(t *testing.T) {
|
||||
// The incident shape (2026-06-10): web hands a stale host path
|
||||
// (/mnt/nas/…) whose file was deleted — the docker agent can't see the
|
||||
// original path AND no tail relocates. file_missing tells the web to
|
||||
// prune the stale row; path_rejected would block that self-heal.
|
||||
_, code, err := resolvePlayableFile("/old/base/Acme Show/Season 01/gone.mkv", roots, "test")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for deleted file")
|
||||
}
|
||||
if code != pathErrMissing {
|
||||
t.Errorf("code = %q, want %q", code, pathErrMissing)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("existing file outside roots is path_rejected", func(t *testing.T) {
|
||||
outside := t.TempDir()
|
||||
// 1-segment-deep on purpose: a ≥3-segment tail could legitimately
|
||||
// relocate INTO the root if a same-named file existed there.
|
||||
mkfile(t, filepath.Join(outside, "leak.mkv"))
|
||||
_, code, err := resolvePlayableFile(filepath.Join(outside, "leak.mkv"), roots, "test")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for out-of-root file")
|
||||
}
|
||||
if code != pathErrRejected {
|
||||
t.Errorf("code = %q, want %q", code, pathErrRejected)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("missing file inside an allowed root is file_missing", func(t *testing.T) {
|
||||
_, code, err := resolvePlayableFile(filepath.Join(root, "Acme Show", "Season 01", "gone.mkv"), roots, "test")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for missing file")
|
||||
}
|
||||
if code != pathErrMissing {
|
||||
t.Errorf("code = %q, want %q", code, pathErrMissing)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("directory resolves to its video file", func(t *testing.T) {
|
||||
got, code, err := resolvePlayableFile(filepath.Join(root, "Acme Show", "Season 01"), roots, "test")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error (%s): %v", code, err)
|
||||
}
|
||||
want := filepath.Join(root, "Acme Show", "Season 01", "ep.mkv")
|
||||
if got != want {
|
||||
t.Errorf("got %q want %q", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("directory without video is no_video_file", func(t *testing.T) {
|
||||
empty := filepath.Join(root, "Empty Show")
|
||||
if err := os.MkdirAll(empty, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_, code, err := resolvePlayableFile(empty, roots, "test")
|
||||
if err == nil {
|
||||
t.Fatal("expected error for empty directory")
|
||||
}
|
||||
if code != pathErrNoVideo {
|
||||
t.Errorf("code = %q, want %q", code, pathErrNoVideo)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -192,6 +192,17 @@ func Execute() {
|
|||
}
|
||||
|
||||
// loadConfig loads config once (lazy initialization).
|
||||
// resolvedConfigPath returns the config file the CLI actually reads/writes,
|
||||
// honouring the global --config flag. Use this for every Save so a revocation
|
||||
// wipe or key migration lands in the right file (e.g. the dev-local agent's
|
||||
// ~/.config/unarr-dev/config.toml), not always the default path.
|
||||
func resolvedConfigPath() string {
|
||||
if cfgFile != "" {
|
||||
return cfgFile
|
||||
}
|
||||
return config.FilePath()
|
||||
}
|
||||
|
||||
func loadConfig() config.Config {
|
||||
if cfgLoaded {
|
||||
return appCfg
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import (
|
|||
"sort"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/fatih/color"
|
||||
"github.com/spf13/cobra"
|
||||
|
|
@ -40,20 +39,40 @@ to see available quality upgrades.`,
|
|||
if showStatus {
|
||||
return runScanStatus()
|
||||
}
|
||||
cfg := loadConfig()
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
// All scanned roots feed ONE sync session (single syncStartedAt +
|
||||
// final isLastBatch) so the server's stale-row cleanup sees the
|
||||
// whole cycle at once. fullCycle only without an explicit path —
|
||||
// a subtree scan must never let the server reap outside it.
|
||||
if len(args) == 0 {
|
||||
cfg := loadConfig()
|
||||
paths := library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath)
|
||||
if len(paths) == 0 {
|
||||
return fmt.Errorf("usage: unarr scan <path>\n\nNo scan paths configured. Provide a path or set up downloads.dir via 'unarr init'")
|
||||
}
|
||||
var items []agent.LibrarySyncItem
|
||||
for _, p := range paths {
|
||||
if err := runScan(p, workers, ffprobe, noSync); err != nil {
|
||||
cache, err := runScan(ctx, cfg, p, workers, ffprobe)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
items = append(items, library.BuildSyncItems(cache)...)
|
||||
}
|
||||
if noSync || jsonOut {
|
||||
return nil
|
||||
}
|
||||
return syncToServer(ctx, cfg, items, paths, true)
|
||||
}
|
||||
cache, err := runScan(ctx, cfg, args[0], workers, ffprobe)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if noSync || jsonOut {
|
||||
return nil
|
||||
}
|
||||
return runScan(args[0], workers, ffprobe, noSync)
|
||||
return syncToServer(ctx, cfg, library.BuildSyncItems(cache), []string{args[0]}, false)
|
||||
},
|
||||
}
|
||||
|
||||
|
|
@ -65,18 +84,20 @@ to see available quality upgrades.`,
|
|||
return cmd
|
||||
}
|
||||
|
||||
func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error {
|
||||
// runScan walks one root, saves the cache and prewarms sidecars. Syncing to
|
||||
// the server is the CALLER's job (RunE) — all roots of an invocation feed one
|
||||
// sync session via syncToServer, so per-root sessions can't trick the server
|
||||
// into reaping rows of roots the session never visited.
|
||||
func runScan(ctx context.Context, cfg config.Config, dirPath string, workers int, ffprobePath string) (*library.LibraryCache, error) {
|
||||
// Validate path
|
||||
info, err := os.Stat(dirPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("path not found: %s", dirPath)
|
||||
return nil, fmt.Errorf("path not found: %s", dirPath)
|
||||
}
|
||||
if !info.IsDir() {
|
||||
return fmt.Errorf("not a directory: %s", dirPath)
|
||||
return nil, fmt.Errorf("not a directory: %s", dirPath)
|
||||
}
|
||||
|
||||
cfg := loadConfig()
|
||||
|
||||
// Resolve workers: flag → config → default 8
|
||||
if workers == 0 {
|
||||
workers = cfg.Library.Workers
|
||||
|
|
@ -93,10 +114,6 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
|
|||
// Load existing cache for incremental scanning
|
||||
existing, _ := library.LoadCache()
|
||||
|
||||
// Context with signal handling
|
||||
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
defer stop()
|
||||
|
||||
bold := color.New(color.Bold)
|
||||
bold.Printf("\n Scanning %s...\n\n", dirPath)
|
||||
|
||||
|
|
@ -114,14 +131,14 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("scan failed: %w", err)
|
||||
return nil, fmt.Errorf("scan failed: %w", err)
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "\r\033[K") // clear progress line
|
||||
|
||||
// Save cache
|
||||
if err := library.SaveCache(cache); err != nil {
|
||||
return fmt.Errorf("save cache: %w", err)
|
||||
return nil, fmt.Errorf("save cache: %w", err)
|
||||
}
|
||||
|
||||
// Remember scan path in config
|
||||
|
|
@ -133,11 +150,12 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
|
|||
// Print summary
|
||||
printScanSummary(cache)
|
||||
|
||||
// JSON output mode
|
||||
// JSON output mode — emit the cache and skip the prewarm (the caller skips
|
||||
// the sync via the same flag).
|
||||
if jsonOut {
|
||||
enc := json.NewEncoder(os.Stdout)
|
||||
enc.SetIndent("", " ")
|
||||
return enc.Encode(cache)
|
||||
return cache, enc.Encode(cache)
|
||||
}
|
||||
|
||||
// Pre-extract sidecars (text subs → WebVTT, panel frames → JPEG) into a hidden
|
||||
|
|
@ -162,15 +180,14 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error
|
|||
}
|
||||
}
|
||||
|
||||
// Sync to server
|
||||
if !noSync {
|
||||
return syncToServer(ctx, cfg, cache)
|
||||
}
|
||||
|
||||
return nil
|
||||
return cache, nil
|
||||
}
|
||||
|
||||
func syncToServer(ctx context.Context, cfg config.Config, cache *library.LibraryCache) error {
|
||||
// syncToServer uploads the scanned items of THIS invocation as one sync
|
||||
// session. roots lists every root the invocation scanned; fullCycle marks a
|
||||
// no-args run that covered all configured roots (the server may then reap
|
||||
// stale rows regardless of prefix — see LibrarySyncRequest.FullCycle).
|
||||
func syncToServer(ctx context.Context, cfg config.Config, items []agent.LibrarySyncItem, roots []string, fullCycle bool) error {
|
||||
apiKey := apiKeyFlag
|
||||
if apiKey == "" {
|
||||
apiKey = cfg.Auth.APIKey
|
||||
|
|
@ -182,50 +199,28 @@ func syncToServer(ctx context.Context, cfg config.Config, cache *library.Library
|
|||
|
||||
ac := agent.NewClient(cfg.Auth.APIURL, apiKey, "unarr/"+Version)
|
||||
|
||||
items := library.BuildSyncItems(cache)
|
||||
|
||||
if len(items) == 0 {
|
||||
color.Yellow("\n No valid items to sync.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send in batches of 100
|
||||
const batchSize = 100
|
||||
totalSynced := 0
|
||||
totalMatched := 0
|
||||
totalRemoved := 0
|
||||
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
|
||||
|
||||
for i := 0; i < len(items); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(items) {
|
||||
end = len(items)
|
||||
}
|
||||
batch := items[i:end]
|
||||
isLast := end >= len(items)
|
||||
|
||||
fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", end, len(items))
|
||||
|
||||
resp, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
||||
Items: batch,
|
||||
ScanPath: cache.Path,
|
||||
AgentID: cfg.Agent.ID,
|
||||
IsLastBatch: isLast,
|
||||
SyncStartedAt: syncStartedAt,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("sync failed: %w", err)
|
||||
}
|
||||
|
||||
totalSynced += resp.Synced
|
||||
totalMatched += resp.Matched
|
||||
totalRemoved += resp.Removed
|
||||
res, err := library.SyncBatches(ctx, ac, items, library.SyncOptions{
|
||||
AgentID: cfg.Agent.ID,
|
||||
ScanPath: roots[0],
|
||||
ScanRoots: roots,
|
||||
FullCycle: fullCycle,
|
||||
OnProgress: func(sent, total int) {
|
||||
fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", sent, total)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("sync failed: %w", err)
|
||||
}
|
||||
|
||||
fmt.Fprintf(os.Stderr, "\r\033[K")
|
||||
|
||||
green := color.New(color.FgGreen)
|
||||
green.Printf("\n ✓ Synced %d items (%d matched, %d removed)\n", totalSynced, totalMatched, totalRemoved)
|
||||
green.Printf("\n ✓ Synced %d items (%d matched, %d removed)\n", res.Synced, res.Matched, res.Removed)
|
||||
|
||||
apiURL := strings.TrimSuffix(cfg.Auth.APIURL, "/")
|
||||
fmt.Printf(" → View upgrades at %s/library\n\n", apiURL)
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package cmd
|
||||
|
||||
// Version is the CLI version. Overridden by goreleaser ldflags at release time.
|
||||
var Version = "1.0.4-beta"
|
||||
var Version = "1.0.9-beta"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue