package cmd

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"strings"
	"syscall"
	"time"

	"github.com/fatih/color"
	"github.com/spf13/cobra"
	"github.com/torrentclaw/unarr/internal/agent"
	"github.com/torrentclaw/unarr/internal/config"
	"github.com/torrentclaw/unarr/internal/engine"
	"github.com/torrentclaw/unarr/internal/funnel"
	"github.com/torrentclaw/unarr/internal/library"
	"github.com/torrentclaw/unarr/internal/library/mediainfo"
	"github.com/torrentclaw/unarr/internal/usenet/download"
	"github.com/torrentclaw/unarr/internal/vpn"
)

// newStartCmd creates the top-level `unarr start` command.
//
// The command runs the daemon in the foreground by delegating to
// runDaemonStart. It takes no positional arguments: RunE never reads args, so
// cobra.NoArgs is set to reject stray ones (e.g. `unarr start config.toml`)
// with a usage error instead of silently ignoring them.
func newStartCmd() *cobra.Command {
	return &cobra.Command{
		Use:     "start",
		Short:   "Start the download daemon (foreground)",
		Long:    `Start the unarr daemon in the foreground. Registers with the server, receives download tasks via periodic sync, and executes them using the configured download method. Supports torrent, debrid, and usenet downloads concurrently. The daemon syncs state with the server every 3s when someone is viewing the web dashboard, or every 60s when idle. Press Ctrl+C to stop gracefully — active downloads get up to 30 seconds to finish. Requires: API key, agent ID, and download directory (run 'unarr init' first). To run as a background service, use 'unarr daemon install' instead.`,
		Example: ` unarr start unarr start --config /path/to/config.toml`,
		// Flags are still accepted; only bare positional arguments are refused.
		Args: cobra.NoArgs,
		RunE: func(cmd *cobra.Command, args []string) error {
			return runDaemonStart()
		},
	}
}

// newStopCmd creates the top-level `unarr stop` command.
//
// The command delegates to stopDaemonByPID. Like `unarr start` it takes no
// positional arguments, so cobra.NoArgs turns an accidental extra token into
// a usage error rather than stopping the daemon while ignoring the input.
func newStopCmd() *cobra.Command {
	return &cobra.Command{
		Use:   "stop",
		Short: "Stop the running daemon",
		Long: `Stop the unarr daemon gracefully. Reads the daemon PID from the state file and sends a graceful stop signal. Works regardless of whether the daemon was started in the foreground or as a service. 
To stop a service-managed daemon and prevent auto-restart, use 'unarr daemon stop' instead.`,
		Example: ` unarr stop`,
		Args:    cobra.NoArgs,
		RunE: func(cmd *cobra.Command, args []string) error {
			return stopDaemonByPID()
		},
	}
}

// newDaemonCmd creates `unarr daemon` for administrative subcommands.
//
// The returned command has no Run function of its own; it only groups the
// service-management subcommands registered below (install, uninstall, start,
// stop, restart, status, logs, reload).
func newDaemonCmd() *cobra.Command {
	cmd := &cobra.Command{
		// NOTE(review): Use ends with a trailing space — it looks like an
		// argument placeholder may have been lost here; confirm the intended
		// usage string.
		Use:     "daemon ",
		Short:   "Manage the daemon as a system service",
		Long:    `Install, control and inspect the unarr daemon as a system service. Linux: systemd user service (~/.config/systemd/user/unarr.service) macOS: launchd agent (~/Library/LaunchAgents/com.torrentclaw.unarr.plist) Windows: Task Scheduler task (runs at logon)`,
		Example: ` unarr daemon install unarr daemon start unarr daemon status unarr daemon logs -f unarr daemon reload unarr daemon restart unarr daemon stop unarr daemon uninstall`,
	}

	cmd.AddCommand(
		newDaemonInstallCmdReal(),
		newDaemonUninstallCmdReal(),
		newDaemonStartCmd(),
		newDaemonStopCmd(),
		newDaemonRestartCmd(),
		newDaemonSvcStatusCmd(),
		newDaemonLogsCmd(),
		newDaemonReloadCmd(),
	)

	return cmd
}

func runDaemonStart() error {
	cfg := loadConfig()
	bold := color.New(color.Bold)

	// Validate config
	if cfg.Auth.APIKey == "" {
		return fmt.Errorf("no API key configured — run 'unarr init' first")
	}
	if cfg.Agent.ID == "" {
		return fmt.Errorf("no agent ID — run 'unarr init' first")
	}
	if cfg.Download.Dir == "" {
		return fmt.Errorf("no download directory — run 'unarr init' first")
	}

	// Validate configured paths are safe
	if err := cfg.ValidatePaths(); err != nil {
		return fmt.Errorf("unsafe configuration: %w", err)
	}

	// Ensure download dir exists
	if err := os.MkdirAll(cfg.Download.Dir, 0o755); err != nil {
		return fmt.Errorf("create download dir: %w", err)
	}

	// Clean up stale resume files (>7 days old)
	resumeDir := filepath.Join(config.DataDir(), "resume")
	if removed := download.CleanStaleFiles(resumeDir, 7*24*time.Hour); removed > 0 {
		log.Printf("Cleaned %d stale resume file(s)", removed)
	}

	fmt.Println()
	bold.Println(" unarr Daemon")
fmt.Println() userAgent := "unarr/" + Version // Probe HW accel + derive a sensible transcode resolution cap. The cap // is what the web side uses to decide whether the user should pre-empt // transcoding by downloading a smaller version (4K source on a software // libx264-only host is the canonical case where pre-download wins). // // Use the full diagnostic (encoders + devices + ffmpeg version) instead // of just the picked backend — the extra fields ride along in the // register payload so the web "Diagnose transcoder" modal can show *why* // libx264 was selected on a host with a GPU (e.g. brew's ffmpeg without // --enable-nvenc). 10 s ceiling so a hung ffmpeg binary can't stall // startup forever. ffmpegResolved, _ := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath) probeCtx, probeCancel := context.WithTimeout(context.Background(), 10*time.Second) defer probeCancel() // guard against a panic inside DetectHWAccelDiagnostic hwDiag := engine.DetectHWAccelDiagnostic(probeCtx, ffmpegResolved) log.Println(hwDiag.LogLine()) hwAccelPick := hwDiag.Pick // Measure the real transcode ceiling instead of guessing from the backend. // HW encoders return 2160 instantly; a software-only host runs a bounded // encode benchmark so a weak NAS/CPU reports the rung it can actually // sustain (720/480) and the web side routes oversized sources to an // external player instead of a stuttering transcode. This blocks // registration on a software host, so it's bounded tight (3 rungs × 6 s = // 18 s worst case; <1 s on a capable box that passes the first rung). Own // timeout — the 10 s probeCtx above is sized for the quick diagnostic. benchCtx, benchCancel := context.WithTimeout(context.Background(), 20*time.Second) maxTranscodeHeight := engine.BenchmarkMaxTranscodeHeight(benchCtx, ffmpegResolved, hwAccelPick) benchCancel() // Warm the tonemap capability caches off the hot path. 
The libplacebo probe // actually RUNS the filter (Vulkan device init ~1.7 s), so doing it lazily // in buildTranscodeRuntime would tax the FIRST stream session and risk its // setup timeout. A real session arrives seconds-to-minutes after startup, so // a background warm has finished by then; if one races in first, the cache's // own mutex makes the concurrent cold call safe (both compute the same bool). if cfg.Download.Transcode.Enabled && ffmpegResolved != "" { go func() { engine.FFmpegSupportsLibplacebo(ffmpegResolved) engine.FFmpegSupportsZscale(ffmpegResolved) }() } // Create daemon config daemonCfg := agent.DaemonConfig{ AgentID: cfg.Agent.ID, AgentName: cfg.Agent.Name, Version: Version, DownloadDir: cfg.Download.Dir, StreamPort: cfg.Download.StreamPort, LanIP: engine.LanIP(), TailscaleIP: engine.TailscaleIP(), CanDelete: cfg.Library.AllowDelete, ScanPaths: library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath), HWAccel: string(hwAccelPick), MaxTranscodeHeight: maxTranscodeHeight, FFmpegVersion: hwDiag.FFmpegVersion, FFmpegPath: hwDiag.FFmpegPath, HWEncoders: hwDiag.Encoders, HWDevices: hwDiag.Devices, AutoUpgrade: cfg.Daemon.AutoUpgradeEnabled(), Downlink: cfg.Daemon.Downlink, } // Create HTTP client with mirror failover so a `.com` block-out rolls // over to `.to` / .onion without restarting the daemon. 
agentClient := newAgentClientFromConfig(cfg, userAgent) log.Printf("Transport: HTTP sync → %s (mirrors: %d)", cfg.Auth.APIURL, len(cfg.Auth.Mirrors)) // Create daemon d := agent.NewDaemon(daemonCfg, agentClient) // Start SIGUSR1 reload watcher (unix only, no-op on Windows) startReloadWatcher(&ReloadableConfig{Daemon: d}) // Daemon-scoped context — cancelled on shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Parse speed limits maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed) maxUl, _ := config.ParseSpeed(cfg.Download.MaxUploadSpeed) // Parse torrent timeouts metaTimeout, _ := time.ParseDuration(cfg.Download.MetadataTimeout) stallTimeout, _ := time.ParseDuration(cfg.Download.StallTimeout) // Parse the seeding time target (0/"" = no time target — ratio-only or forever) seedTime, _ := time.ParseDuration(cfg.Download.SeedTime) // Create progress reporter — only used for stream tasks (handleStreamTask) // The sync goroutine handles all regular progress reporting. statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval) if statusInterval == 0 { statusInterval = 3 * time.Second } reporter := engine.NewProgressReporter(agentClient, statusInterval) reporter.SetWatchingFunc(func() bool { return d.Watching.Load() }) // Managed-VPN add-on: bring up the in-process WireGuard split-tunnel before // the torrent client so peer + tracker traffic routes through it. Failure is // non-fatal — log and download in the clear (better than refusing to run). var vpnTunnel *vpn.Tunnel if cfg.Download.VPN.ConfigFile != "" { // Self-hosted / personal-VPN mode: read a local .conf directly. 
raw, rerr := os.ReadFile(cfg.Download.VPN.ConfigFile) if rerr != nil { log.Printf("[vpn] could not read config_file %q (%v) — downloading in the clear", cfg.Download.VPN.ConfigFile, rerr) } else if t, uerr := vpn.Up(string(raw)); uerr != nil { log.Printf("[vpn] tunnel failed to start from config_file (%v) — downloading in the clear", uerr) } else { vpnTunnel = t defer vpnTunnel.Close() log.Printf("[vpn] managed VPN active (local config_file) — torrent traffic split-tunnelled through WireGuard") } } else if cfg.Download.VPN.Enabled { apiURL := cfg.Auth.APIURL if apiURL == "" { apiURL = "https://torrentclaw.com" } fetchCtx, cancel := context.WithTimeout(context.Background(), 25*time.Second) conf, ferr := vpn.FetchConfig(fetchCtx, apiURL, cfg.Auth.APIKey, "unarr/"+Version, cfg.Agent.ID, false) cancel() var fe *vpn.FetchError switch { case ferr != nil && errors.As(ferr, &fe) && fe.Code == vpn.ErrSlotOnDevice: log.Printf("[vpn] the single WireGuard slot is already held by another unarr agent — this one downloads in the clear. To protect this machine too, set up OpenVPN on it (1 agent uses WireGuard, the rest use OpenVPN — up to 10). See https://torrentclaw.com/vpn") case ferr != nil: log.Printf("[vpn] could not enable VPN (%v) — downloading in the clear", ferr) default: if t, uerr := vpn.Up(conf); uerr != nil { log.Printf("[vpn] tunnel failed to start (%v) — downloading in the clear", uerr) } else { vpnTunnel = t defer vpnTunnel.Close() log.Printf("[vpn] managed VPN active — torrent traffic split-tunnelled through WireGuard") } } } // Record VPN split-tunnel state for `unarr vpn status`. 
if vpnTunnel != nil { mode := "managed" if cfg.Download.VPN.ConfigFile != "" { mode = "self-hosted" } d.SetVPNState(true, mode, vpnTunnel.Endpoint) } // Create torrent downloader torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{ DataDir: cfg.Download.Dir, PieceCompletionDir: config.DataDir(), // keep piece-completion DB off NFS/SMB mounts MetadataTimeout: metaTimeout, StallTimeout: stallTimeout, MaxTimeout: 0, MaxDownloadRate: maxDl, MaxUploadRate: maxUl, ListenPort: cfg.Download.ListenPort, SeedEnabled: cfg.Download.SeedEnabled, SeedRatio: cfg.Download.SeedRatio, SeedTime: seedTime, VPNTunnel: vpnTunnel, }) if err != nil { return fmt.Errorf("create torrent downloader: %w", err) } if maxDl > 0 || maxUl > 0 { dlStr, ulStr := "unlimited", "unlimited" if maxDl > 0 { dlStr = formatSpeedLog(maxDl) } if maxUl > 0 { ulStr = formatSpeedLog(maxUl) } log.Printf("Speed limits: download=%s upload=%s", dlStr, ulStr) } if cfg.Download.SeedEnabled { switch { case cfg.Download.SeedRatio > 0 && seedTime > 0: log.Printf("[torrent] seeding enabled (stop at ratio %.2f or %s, whichever first)", cfg.Download.SeedRatio, seedTime) case cfg.Download.SeedRatio > 0: log.Printf("[torrent] seeding enabled (stop at ratio %.2f)", cfg.Download.SeedRatio) case seedTime > 0: log.Printf("[torrent] seeding enabled (stop after %s)", seedTime) default: log.Printf("[torrent] seeding enabled (no ratio/time target — seeds until shutdown)") } } // Create debrid downloader debridDl := engine.NewDebridDownloader() usenetDl := engine.NewUsenetDownloader(agentClient) // Pre-flight disk reserve: refuse a download that would leave less than this // many bytes free, so a download never fills the filesystem to 0 mid-write. 
minFreeBytes := int64(cfg.Download.MinFreeDiskMB) << 20 torrentDl.SetMinFreeBytes(minFreeBytes) debridDl.SetMinFreeBytes(minFreeBytes) usenetDl.SetMinFreeBytes(minFreeBytes) log.Printf("[disk] download free-space reserve: %d MiB", cfg.Download.MinFreeDiskMB) // Create download manager manager := engine.NewManager(engine.ManagerConfig{ MaxConcurrent: cfg.Download.MaxConcurrent, OutputDir: cfg.Download.Dir, Notifications: cfg.Notifications.Enabled, Organize: engine.OrganizeConfig{ Enabled: cfg.Organize.Enabled, MoviesDir: cfg.Organize.MoviesDir, TVShowsDir: cfg.Organize.TVShowsDir, OutputDir: cfg.Download.Dir, }, }, reporter, torrentDl, debridDl, usenetDl) // Resume store: persist in-flight downloads so a daemon restart can re-submit // them (the downloaders resume the partial data). Wire it before any Submit. taskStore := agent.NewActiveTaskStore() manager.SetTaskStore(taskStore) // Create persistent stream server streamSrv := engine.NewStreamServer(cfg.Download.StreamPort) streamSrv.SetUPnPEnabled(cfg.Download.EnableUPnP) // Wire ffmpeg so /thumbnail can extract single frames for the web's "file // characteristics" panel (frames on demand). Empty = thumbnails 503. streamSrv.SetFFmpegPath(ffmpegResolved) // Write-through cache extracted WebVTT into the hidden ".unarr" sidecar dir so // /sub serves instantly (and giant remuxes that exceed the on-demand timeout // work once the scan prewarm has filled the cache). Default true. streamSrv.SetCacheSubtitles(cfg.Library.CacheSubtitles) streamSrv.SetCacheThumbnails(cfg.Library.CacheThumbnails) // Tell /trickplay which tile width the scan prewarm built the sprite at (the // agent owns the width; the web requests by path only). 0 = disabled → 404. 
trickW := 0 if cfg.Library.Trickplay.Enabled { if trickW = cfg.Library.Trickplay.Width; trickW <= 0 { trickW = 240 } } streamSrv.SetTrickplayWidth(trickW) // Self-heal a host→container base-path skew for the path-scoped handlers // (/thumbnail, /trickplay, /sub), mirroring the /stream + /hls remap. Without // it, a docker agent whose web DB holds host paths (/mnt/nas/peliculas/…) but // mounts that media at /downloads returns 404 for every scrubber frame / // trickplay sprite / external subtitle. Same allowed roots + relocate logic. // NOTE: relocateUnreachable needs a ≥3-segment path tail, so a FLAT media // layout (file directly under the root) is not self-healed here — those // sidecars 404 on a docker agent with a host→container skew until a re-scan // rewrites the DB path. Same limitation as the /stream self-heal. streamSrv.SetPathResolver(func(p string) string { p = filepath.Clean(p) roots := streamAllowedRoots(cfg) if isAllowedStreamPath(p, roots...) { return p } return relocateUnreachable(p, roots) // "" when not locatable → caller 404s }) streamSrv.SetRequireStreamToken(cfg.Download.RequireStreamToken) // Report the stream-token signing key ONLY when enforcing, so the web's // "secret present → mint HLS token" signal accurately means "this agent // verifies tokens". Reporting it with enforcement off would make the web // mint HLS path tokens the agent never peels → 404. Set before Register(). if cfg.Download.RequireStreamToken { d.UpdateStreamSecret(streamSrv.StreamSecretHex()) } // CORS extras = operator config + dynamic mirror list from /api/mirrors. // Without the mirror merge, a user playing from `torrentclaw.to` (or any // future mirror) hits the daemon, gets 200 + body, but no // `Access-Control-Allow-Origin` → browser drops the response → player // reports "404 todos los canales". Fetching /api/mirrors at startup // future-proofs against mirror additions without a CLI rebuild. corsExtras := append([]string(nil), cfg.Download.CORSExtraOrigins...) 
corsExtras = append(corsExtras, mirrorCORSOrigins(ctx, cfg, userAgent)...) streamSrv.SetCORSAllowedOrigins(corsExtras) // HTTPS stream listener (agent-TLS feature): only armed when a certificate is // present on disk — without a valid cert there is nothing to serve over TLS, // and the HTTP listener + funnel keep working. The future ACME broker writes // the cert pair to certs/agent.{crt,key} under the agent state dir. if cfg.Download.HTTPSStreamPort > 0 { certPath := filepath.Join(config.DataDir(), "certs", "agent.crt") keyPath := filepath.Join(config.DataDir(), "certs", "agent.key") if err := streamSrv.LoadTLSCertificateFromFiles(certPath, keyPath); err != nil { log.Printf("[stream] HTTPS disabled — no usable certificate at %s (%v)", certPath, err) } else { streamSrv.EnableTLS(cfg.Download.HTTPSStreamPort) log.Printf("[stream] HTTPS armed on port %d with certificate %s", cfg.Download.HTTPSStreamPort, certPath) } } // Reap HLS tmpdirs left over from a previous daemon run before we start // accepting new sessions. The in-memory registry doesn't survive a // restart, so without this disk usage grows unbounded across restarts. if err := engine.CleanupHLSOrphanDirs(); err != nil { log.Printf("[hls] orphan tmpdir cleanup: %v", err) } // Persistent HLS segment cache — survives across sessions so re-plays // of the same file at the same quality skip ffmpeg entirely. Off when // hls_cache.enabled = false; size cap from hls_cache.size_gb; path from // hls_cache.dir (defaults to ~/.cache/unarr/hls-cache). 
var hlsCache *engine.HLSCache if cfg.Download.HLSCache.Enabled { cacheDir := cfg.Download.HLSCache.Dir if cacheDir == "" { if base, err := os.UserCacheDir(); err == nil { cacheDir = filepath.Join(base, "unarr", "hls-cache") } else { cacheDir = filepath.Join(os.TempDir(), "unarr-hls-cache") } } c, err := engine.NewHLSCache(cacheDir, cfg.Download.HLSCache.SizeGB) if err != nil { log.Printf("[hls_cache] init failed (%v) — falling back to per-session tmpdirs", err) } else { hlsCache = c hlsCache.StartSweeper(ctx, time.Hour) log.Printf("[hls_cache] enabled: dir=%s budget=%dGB", cacheDir, cfg.Download.HLSCache.SizeGB) } } else { log.Printf("[hls_cache] disabled by config — every play re-encodes from scratch") } if err := streamSrv.Listen(ctx); err != nil { return fmt.Errorf("start stream server: %w", err) } d.UpdateStreamPort(streamSrv.Port()) // CloudFlare Quick Tunnel — needs the ACTUAL listening port (the // configured port may have been busy and bumped). Spawning here ensures // cloudflared --url points at the right socket. Failures degrade to // Tailscale/LAN only; the supervisor keeps the tunnel up across CF's // periodic rotation + transient cloudflared crashes. if cfg.Download.Funnel.Enabled { go superviseFunnel(ctx, d, streamSrv.Port()) } // Warn at startup if transcode is enabled but ffmpeg/ffprobe are missing. // HLS sessions get rejected at runtime (see daemon.go ~line 455), but // surfacing it here gives the operator a chance to install ffmpeg before // a user hits a confusing "rejected" line in the logs. 
if cfg.Download.Transcode.Enabled { if _, err := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath); err != nil { log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS") } else if _, err := mediainfo.ResolveFFprobe(cfg.Library.FFprobePath); err != nil { log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS") } } // Wire sync client callbacks sc := d.SyncClient() sc.GetFreeSlots = manager.FreeSlots sc.GetTaskStates = manager.TaskStates d.GetActiveCount = manager.ActiveCount // Trigger immediate sync when a download slot frees up manager.OnTaskDone = func() { d.TriggerSync() } // Event-driven uplink: every status transition (resolving/downloading/ // verifying/organizing/…) pushes to the server right away instead of waiting // for the next adaptive tick. Coalesced by TriggerSync's buffered-1 channel. manager.OnStateChange = func() { d.TriggerSync() } // Wire: sync receives new tasks → submit to manager or handle stream d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { if t.Mode == "stream" { if isStreamingTask(t.ID) { continue } cancelStreamContexts() streamSrv.ClearFile() streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel stored in registry streamRegistry.mu.Lock() streamRegistry.cancels[t.ID] = streamCancel streamRegistry.mu.Unlock() go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv, func() { d.TriggerSync() }) } else { manager.Submit(ctx, t) } } } // Resume downloads interrupted by the previous shutdown/crash. Re-submit // each persisted task; its downloader picks up the partial data (torrent via // the piece-completion DB, debrid via Range, usenet via its tracker). Done // before the sync loop starts; a later web re-dispatch of the same id is // deduped by the manager. 
if resume := taskStore.Load(); len(resume) > 0 { log.Printf("[resume] re-submitting %d interrupted download(s)", len(resume)) for _, t := range resume { t.ForceStart = false // respect MaxConcurrent on bulk auto-resume log.Printf("[resume] %s — %s", agent.ShortID(t.ID), t.Title) manager.Submit(ctx, t) } } // Wire: sync receives control signals → act on manager d.OnControlAction = func(action, taskID string, deleteFiles bool) { switch action { case "cancel": if deleteFiles { manager.CancelAndDeleteFiles(taskID) } else { manager.CancelTask(taskID) } cancelStreamTask(taskID) if streamSrv.CurrentTaskID() == taskID { streamSrv.ClearFile() } case "pause": manager.PauseTask(taskID) cancelStreamTask(taskID) if streamSrv.CurrentTaskID() == taskID { streamSrv.ClearFile() } case "resume": log.Printf("[%s] resume requested, triggering sync", agent.ShortID(taskID)) d.TriggerSync() case "stream": if streamSrv.CurrentTaskID() == taskID { return } task := manager.GetTask(taskID) if task == nil || task.GetStreamURL() != "" { return } provider, err := torrentDl.GetStreamProvider(taskID) if err != nil { log.Printf("[%s] stream failed: %v", agent.ShortID(taskID), err) return } cancelStreamContexts() streamSrv.SetFile(provider, taskID) task.SetStreamURL(streamSrv.URLsJSON()) log.Printf("[%s] streaming: %s", agent.ShortID(taskID), provider.FileName()) watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118 streamRegistry.mu.Lock() streamRegistry.cancels["watch:"+taskID] = watchCancel streamRegistry.mu.Unlock() go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(watchCtx) case "stop-stream": cancelStreamTask(taskID) if streamSrv.CurrentTaskID() == taskID { streamSrv.ClearFile() } } } // Wire: sync receives file deletion requests from the server if cfg.Library.AllowDelete && len(daemonCfg.ScanPaths) > 0 { sc.OnDeleteFiles = func(items []agent.LibraryDeleteRequest) []int { return library.DeleteFiles(items, daemonCfg.ScanPaths) } } // Wire: sync receives on-demand 
subtitle-fetch jobs (write VTT sidecars). // Always available (additive, no deletion) as long as we have scan paths. if len(daemonCfg.ScanPaths) > 0 { sc.OnSubtitleFetch = func(reqs []agent.SubtitleFetchRequest) ([]int, []agent.SubtitleFetchError) { return library.FetchSubtitles(reqs, daemonCfg.ScanPaths) } } // Wire: sync receives stream requests for completed downloads d.OnStreamRequested = func(sr agent.StreamRequest) { if streamSrv.CurrentTaskID() == sr.TaskID { // Already serving — notify server it's ready go func() { if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ TaskID: sr.TaskID, StreamReady: true, }); err != nil { log.Printf("[%s] stream ready re-notify failed: %v", agent.ShortID(sr.TaskID), err) } }() return } // reportStreamError tells the web a /stream attempt failed WITHOUT // marking the download failed (StreamError, not Status). The web clears // streamRequested and surfaces this so the player fails fast with the // real reason instead of polling out the 20s "agent didn't respond". reportStreamError := func(reason string) { go func() { if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ TaskID: sr.TaskID, StreamError: reason, }); err != nil { log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) } }() } // Self-heal a base-path mismatch: the web may hand us a path under an old // root (e.g. /mnt/nas/peliculas/… from before a binary→docker move) that // is now outside our allowed dirs but whose file still exists under a // current root (/downloads/…). resolvePlayableFile remaps, stat-retries // (NFS) and resolves directories; the next re-scan persists the fix to // the DB. See docs/plans/unarr-path-resilience.md. 
filePath, errCode, perr := resolvePlayableFile(sr.FilePath, streamAllowedRoots(cfg), agent.ShortID(sr.TaskID)) if perr != nil { log.Printf("[%s] stream request rejected (%s): %v", agent.ShortID(sr.TaskID), errCode, perr) reportStreamError(perr.Error()) return } cancelStreamContexts() streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID) log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL()) watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118 streamRegistry.mu.Lock() streamRegistry.cancels["watch:"+sr.TaskID] = watchCancel streamRegistry.mu.Unlock() go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(watchCtx) go func() { if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ TaskID: sr.TaskID, StreamReady: true, }); err != nil { log.Printf("[%s] stream ready report failed: %v", agent.ShortID(sr.TaskID), err) } }() } // Wire: sync receives HLS streaming session requests. Each session spawns // one ffmpeg process and registers its HLS playlist with the StreamServer. // Validate FilePath against allowed dirs to prevent path traversal abuse // from a compromised server. d.OnStreamSession = func(sess agent.StreamSession) { if playerSessionRegistry.has(sess.SessionID) { return // already running } // failSession logs AND reports a startup failure to the web — every // abort path in this handler must go through it. A silent `return` // here left the player probing a playlist that would never exist // until its 30s deadline (incident 2026-06-10: deleted file + stale // library row = eternal "Preparando sesión"). Best-effort: on old web // deployments the endpoint 404s and the player falls back to the // probe deadline, exactly as before. 
failSession := func(sessionID, code, message string) { log.Printf("[hls %s] failed (%s): %s", agent.ShortID(sessionID), code, message) go func() { // Fresh context on purpose: failures cluster exactly when the // daemon ctx is being cancelled (shutdown kills in-flight // session starts), and a report derived from it would die // before reaching the web. The 10s cap still bounds it. rctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if err := agentClient.ReportSessionError(rctx, sessionID, code, message); err != nil { log.Printf("[hls %s] session error report failed: %v", agent.ShortID(sessionID), err) } }() } // markReady reports "first bytes are servable" for the no-transcode // paths (direct-play, remux, debrid direct) — one place instead of a // copy per branch. HLS sessions report via watchSessionReady instead // (they wait for seg-0 + attach a health snapshot). markReady := func(sessionID string) { go func() { rctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := agentClient.MarkSessionReady(rctx, sessionID, nil); err != nil { log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sessionID), err) } }() } // startHLSPlayback starts an HLS encode (local file or debrid URL) and // wires it into the StreamServer. Shared by the local-file HLS path and // the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe // off the sync loop, and report readiness identically. // // Prewarm sessions (background cache-fill: next-episode, hover) take a // deferential path: wait until no live encode is running (never steal // the encoder from the viewer), then register WITHOUT displacing other // sessions. Before this, a prewarm claimed mid-playback went through // Register() and KILLED the stream the user was watching (verified // 2026-06-10: prewarm started → live session "closed (cache // discarded)" → player 404). 
startHLSPlayback := func(hlsCfg engine.HLSSessionConfig, hlsCtx context.Context, hlsCancel context.CancelFunc) { playerSessionRegistry.add(hlsCfg.SessionID, hlsCancel) prewarm := sess.Prewarm go func() { if prewarm { // Defer until the encoder is free. Poll is cheap (10 s); // cap the wait at 30 min — a prewarm that can't start // within an episode's runtime has lost its purpose. deadline := time.Now().Add(30 * time.Minute) for streamSrv.HLS().HasLiveEncode() { if time.Now().After(deadline) || hlsCtx.Err() != nil { playerSessionRegistry.remove(hlsCfg.SessionID) hlsCancel() log.Printf("[hls %s] prewarm abandoned (encoder busy %s)", agent.ShortID(hlsCfg.SessionID), "30m") return } select { case <-hlsCtx.Done(): playerSessionRegistry.remove(hlsCfg.SessionID) return case <-time.After(10 * time.Second): } } } else { // REAL session: reap in-flight prewarm encodes BEFORE // StartHLSSession so the per-key cache writer-lock is // free and the viewer's encode lands in the persistent // cache (not an uncached tmpdir). A SEALED prewarm is // unaffected — this session simply cache-HITs it. if n := streamSrv.HLS().CloseWhere(func(s *engine.HLSSession) bool { return s.IsPrewarm() }); n > 0 { log.Printf("[hls %s] reaped %d in-flight prewarm(s) for the viewer session", agent.ShortID(hlsCfg.SessionID), n) } } hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) if err != nil { playerSessionRegistry.remove(hlsCfg.SessionID) hlsCancel() failSession(hlsCfg.SessionID, sessErrStartFailed, err.Error()) return } if prewarm { // Side-by-side: never evict the viewer's session. A later // REAL session still evicts this one via Register — by // then the encode is usually sealed in the segment cache. 
streamSrv.HLS().RegisterKeep(hsess) log.Printf("[hls %s] prewarm encoding: %s", agent.ShortID(hlsCfg.SessionID), hlsCfg.FileName) return // no viewer waiting → no ready-watcher } streamSrv.HLS().Register(hsess) go watchSessionReady(hlsCtx, agentClient, hsess, hlsCfg.SessionID) }() } // Debrid direct-play (hueco #2 / 2a): the source has no local file — the // web resolved an HTTPS debrid link (cache-confirmed, browser-native // container) and the daemon streams /stream from it via ranged GETs. // Runs BEFORE the filePath checks (there is no local path) and needs no // ffmpeg. PlayMethod != "hls" distinguishes this from the debrid // HLS-from-URL branch below (a non-native container the web wants // transcoded). Provider setup does a HEAD, so hand it off to a goroutine // to keep the sync loop from blocking other pending actions; register the // session up front so a duplicate sync within the setup window is a // no-op (matches the HLS branch's handoff rationale). if sess.DirectURL != "" && sess.PlayMethod != "hls" { playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() }) // refresh re-resolves a fresh debrid link when this one expires // mid-stream (hueco #2 / 2c). Bound to the daemon ctx so a shutdown // cancels an in-flight refresh. 
refresh := func(rctx context.Context) (string, error) { return agentClient.RefreshStreamURL(rctx, sess.SessionID) } go func() { bctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize, refresh) if perr != nil { playerSessionRegistry.remove(sess.SessionID) failSession(sess.SessionID, sessErrStartFailed, fmt.Sprintf("debrid provider: %v", perr)) return } streamSrv.SetFile(provider, sess.TaskID) log.Printf("[stream %s] debrid direct-play: %s (%d bytes)", agent.ShortID(sess.SessionID), provider.FileName(), provider.FileSize()) markReady(sess.SessionID) }() return } // Debrid HLS-from-URL (hueco #2 / 2b): the source is debrid-cached but // NOT browser-native (mkv/HEVC/…), so the web set playMethod="hls" // alongside the DirectURL. ffmpeg transcodes straight from the HTTP URL — // no local file, no torrent. Cache is keyed by info_hash (not the // per-resolution URL) so a re-play hits the segment cache. if sess.DirectURL != "" { // playMethod == "hls" implied (2a returned above) tcRuntime := buildTranscodeRuntime(ctx, cfg) if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { failSession(sess.SessionID, sessErrFfmpegMissing, "ffmpeg/ffprobe unavailable (debrid HLS)") return } hlsCtx, hlsCancel := context.WithCancel(ctx) startHLSPlayback(engine.HLSSessionConfig{ SessionID: sess.SessionID, SourceURL: sess.DirectURL, CacheID: sess.InfoHash, FileName: sess.FileName, Quality: sess.Quality, AudioIndex: sess.AudioIndex, BurnSubtitleIndex: sess.BurnSubtitleIndex, StartSec: sess.StartSec, Prewarm: sess.Prewarm, Transcode: tcRuntime, Cache: hlsCache, // 2c: refresh the debrid link if it expires mid-transcode; the // auto-restart supervisor calls this before relaunching ffmpeg. 
RefreshURL: func(rctx context.Context) (string, error) { return agentClient.RefreshStreamURL(rctx, sess.SessionID) }, }, hlsCtx, hlsCancel) log.Printf("[hls %s] debrid HLS-from-URL: %s", agent.ShortID(sess.SessionID), sess.FileName) return } if sess.FilePath == "" { failSession(sess.SessionID, sessErrStartFailed, "empty file path") return } // SAME base-path self-heal + stat-retry + dir resolution as the raw // /stream handler (resolvePlayableFile). A path under an old/host base // (e.g. /mnt/nas/peliculas/… handed by the web while this docker agent // mounts that media at /downloads) remaps onto the current root; a path // whose file is genuinely gone fails fast as "file_missing" so the web // can prune the stale library row and the player can fall back, instead // of the player probing a playlist that will never exist. // See docs/plans/unarr-path-resilience.md. filePath, errCode, perr := resolvePlayableFile(sess.FilePath, streamAllowedRoots(cfg), "hls "+agent.ShortID(sess.SessionID)) if perr != nil { failSession(sess.SessionID, errCode, perr.Error()) return } // Direct-play (hueco #3 / 3a): the web decided this source is already // browser-native (mp4 h264/aac 8-bit SDR) from library scan metadata, // gated on agent version. Serve the raw file over /stream (HTTP Range, // no ffmpeg) instead of transcoding to HLS — zero CPU, instant seek. // Runs BEFORE the ffmpeg-availability check on purpose: direct-play // needs no ffmpeg, so it must work even when transcode is disabled. if sess.PlayMethod == "direct" { streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sess.TaskID) // cancel just clears the served file so daemon shutdown / drain // stops exposing it on /stream. There's no ffmpeg child to kill. playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() }) log.Printf("[stream %s] direct-play: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath)) // File is on disk → ready immediately. Tell the web so the player // attaches