unarr/internal/cmd/daemon.go
Deivid Soto cda2e1322c feat(hls): full-GPU scale_cuda for NVENC SDR downscales
Keep an NVENC downscale of an SDR source entirely on the GPU
(decode -> scale_cuda -> h264_nvenc) instead of copying every frame to the
CPU for `scale=` and back. That GPU->CPU->GPU round-trip is the bottleneck on
modest GPUs; even a strong box gains ~37% (scale_cuda 14.9x vs CPU 10.9x
on a 4K SDR HEVC -> 1080p encode).

Strictly gated so every case that needs CPU frames is unchanged:
- HDR (libplacebo Vulkan / zscale CPU tonemap can't consume a CUDA surface),
- burn-in (the scale2ref+overlay composite runs on CPU frames),
- non-NVENC encoders; and it is a no-op when the output is not actually downscaled.

- hwscale.go: FFmpegSupportsScaleCuda — a functional 1-frame probe mirroring
  the libplacebo probe (presence in -filters lies; needs a real CUDA device).
  Probes the worst-case real input (10-bit p010 -> 8-bit yuv420p) so a host
  whose scale_cuda can't do the 10->8-bit conversion fails closed to CPU.
- hls.go: useCudaScale gate + `-hwaccel_output_format cuda` + a
  `scale_cuda=-2:H:format=yuv420p` filter branch. Output is 8-bit
  (format=yuv420p + `-profile:v main`), browser-safe.
- transcode_quality.go / player_session_registry.go / daemon.go: HasScaleCuda
  flag, populated + warmed at startup like the other ffmpeg capability probes.

Fail-closed: probe absent/fails -> keep the CPU scale path, no regression.
Verified live (real 4K SDR HEVC Main10 session emitted scale_cuda, 5.54x
realtime, nvenc at 100%) + 8 arg-builder unit tests for the gate.
2026-06-10 21:44:58 +02:00

1721 lines
67 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package cmd
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/fatih/color"
"github.com/spf13/cobra"
"github.com/torrentclaw/unarr/internal/acme"
"github.com/torrentclaw/unarr/internal/agent"
"github.com/torrentclaw/unarr/internal/config"
"github.com/torrentclaw/unarr/internal/engine"
"github.com/torrentclaw/unarr/internal/funnel"
"github.com/torrentclaw/unarr/internal/library"
"github.com/torrentclaw/unarr/internal/library/mediainfo"
"github.com/torrentclaw/unarr/internal/usenet/download"
"github.com/torrentclaw/unarr/internal/vpn"
)
// newStartCmd creates the top-level `unarr start` command, which runs the
// download daemon in the foreground by delegating to runDaemonStart.
//
// The command takes no positional arguments: everything it needs comes from
// flags (e.g. --config, as shown in Example) and the config file. Without an
// Args validator cobra silently ignores stray positional arguments on a leaf
// command, so a typo such as `unarr start now` would still launch the daemon;
// cobra.NoArgs makes that fail loudly instead.
func newStartCmd() *cobra.Command {
	return &cobra.Command{
		Use:   "start",
		Short: "Start the download daemon (foreground)",
		Long: `Start the unarr daemon in the foreground.
Registers with the server, receives download tasks via periodic sync,
and executes them using the configured download method.
Supports torrent, debrid, and usenet downloads concurrently.
The daemon syncs state with the server every 3s when someone is viewing
the web dashboard, or every 60s when idle. Press Ctrl+C to stop
gracefully — active downloads get up to 30 seconds to finish.
Requires: API key, agent ID, and download directory (run 'unarr init' first).
To run as a background service, use 'unarr daemon install' instead.`,
		Example: ` unarr start
unarr start --config /path/to/config.toml`,
		// Reject unexpected positional arguments rather than ignoring them.
		Args: cobra.NoArgs,
		RunE: func(_ *cobra.Command, _ []string) error {
			return runDaemonStart()
		},
	}
}
// newStopCmd creates the top-level `unarr stop` command. It delegates to
// stopDaemonByPID which, per the command's help text, reads the daemon PID
// from the state file and sends it a graceful stop signal.
//
// The command takes no positional arguments. cobra.NoArgs rejects stray ones
// (e.g. `unarr stop now`) instead of silently ignoring them and stopping the
// daemon anyway.
func newStopCmd() *cobra.Command {
	return &cobra.Command{
		Use:   "stop",
		Short: "Stop the running daemon",
		Long: `Stop the unarr daemon gracefully.
Reads the daemon PID from the state file and sends a graceful stop signal.
Works regardless of whether the daemon was started in the foreground or as a service.
To stop a service-managed daemon and prevent auto-restart, use 'unarr daemon stop' instead.`,
		Example: ` unarr stop`,
		// Reject unexpected positional arguments rather than ignoring them.
		Args: cobra.NoArgs,
		RunE: func(_ *cobra.Command, _ []string) error {
			return stopDaemonByPID()
		},
	}
}
// newDaemonCmd builds the `unarr daemon` parent command. It defines no RunE
// of its own; it only groups the service-management subcommands (install,
// uninstall, start, stop, restart, status, logs, reload) under one noun.
func newDaemonCmd() *cobra.Command {
	daemon := &cobra.Command{
		Use:   "daemon <command>",
		Short: "Manage the daemon as a system service",
	}

	// Per-OS service backends, shown in `unarr daemon --help`.
	daemon.Long = `Install, control and inspect the unarr daemon as a system service.
Linux: systemd user service (~/.config/systemd/user/unarr.service)
macOS: launchd agent (~/Library/LaunchAgents/com.torrentclaw.unarr.plist)
Windows: Task Scheduler task (runs at logon)`

	daemon.Example = ` unarr daemon install
unarr daemon start
unarr daemon status
unarr daemon logs -f
unarr daemon reload
unarr daemon restart
unarr daemon stop
unarr daemon uninstall`

	// Registration order is preserved: it determines listing order in help.
	subcommands := []*cobra.Command{
		newDaemonInstallCmdReal(),
		newDaemonUninstallCmdReal(),
		newDaemonStartCmd(),
		newDaemonStopCmd(),
		newDaemonRestartCmd(),
		newDaemonSvcStatusCmd(),
		newDaemonLogsCmd(),
		newDaemonReloadCmd(),
	}
	daemon.AddCommand(subcommands...)

	return daemon
}
func runDaemonStart() error {
cfg := loadConfig()
bold := color.New(color.Bold)
// Validate config
if cfg.Auth.APIKey == "" {
return fmt.Errorf("no API key configured — run 'unarr init' first")
}
if cfg.Agent.ID == "" {
return fmt.Errorf("no agent ID — run 'unarr init' first")
}
if cfg.Download.Dir == "" {
return fmt.Errorf("no download directory — run 'unarr init' first")
}
// Validate configured paths are safe
if err := cfg.ValidatePaths(); err != nil {
return fmt.Errorf("unsafe configuration: %w", err)
}
// Ensure download dir exists
if err := os.MkdirAll(cfg.Download.Dir, 0o755); err != nil {
return fmt.Errorf("create download dir: %w", err)
}
// Per-agent direct-TLS: ensure a stable high-entropy hash exists, generated
// + persisted once. Distinct from the (enumerable) agent UUID; the cert
// broker issues *.<hash>.agent.unarr.app for it.
if cfg.Download.HTTPSStreamPort > 0 && cfg.Agent.Hash == "" {
if h, err := acme.GenerateHash(); err != nil {
log.Printf("[acme] could not generate agent hash (%v) — direct-TLS disabled", err)
} else {
cfg.Agent.Hash = h
if err := config.Save(cfg, config.FilePath()); err != nil {
log.Printf("[acme] could not persist agent hash (%v) — direct-TLS disabled until persisted", err)
cfg.Agent.Hash = ""
} else {
log.Printf("[acme] generated agent hash %s", h)
}
}
}
// Clean up stale resume files (>7 days old)
resumeDir := filepath.Join(config.DataDir(), "resume")
if removed := download.CleanStaleFiles(resumeDir, 7*24*time.Hour); removed > 0 {
log.Printf("Cleaned %d stale resume file(s)", removed)
}
fmt.Println()
bold.Println(" unarr Daemon")
fmt.Println()
userAgent := "unarr/" + Version
// Probe HW accel + derive a sensible transcode resolution cap. The cap
// is what the web side uses to decide whether the user should pre-empt
// transcoding by downloading a smaller version (4K source on a software
// libx264-only host is the canonical case where pre-download wins).
//
// Use the full diagnostic (encoders + devices + ffmpeg version) instead
// of just the picked backend — the extra fields ride along in the
// register payload so the web "Diagnose transcoder" modal can show *why*
// libx264 was selected on a host with a GPU (e.g. brew's ffmpeg without
// --enable-nvenc). 10 s ceiling so a hung ffmpeg binary can't stall
// startup forever.
ffmpegResolved, _ := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath)
probeCtx, probeCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer probeCancel() // guard against a panic inside DetectHWAccelDiagnostic
hwDiag := engine.DetectHWAccelDiagnostic(probeCtx, ffmpegResolved)
log.Println(hwDiag.LogLine())
hwAccelPick := hwDiag.Pick
// Measure the real transcode ceiling instead of guessing from the backend.
// HW encoders return 2160 instantly; a software-only host runs a bounded
// encode benchmark so a weak NAS/CPU reports the rung it can actually
// sustain (720/480) and the web side routes oversized sources to an
// external player instead of a stuttering transcode. This blocks
// registration on a software host, so it's bounded tight (3 rungs × 6 s =
// 18 s worst case; <1 s on a capable box that passes the first rung). Own
// timeout — the 10 s probeCtx above is sized for the quick diagnostic.
benchCtx, benchCancel := context.WithTimeout(context.Background(), 20*time.Second)
maxTranscodeHeight := engine.BenchmarkMaxTranscodeHeight(benchCtx, ffmpegResolved, hwAccelPick)
benchCancel()
// Warm the tonemap capability caches off the hot path. The libplacebo probe
// actually RUNS the filter (Vulkan device init ~1.7 s), so doing it lazily
// in buildTranscodeRuntime would tax the FIRST stream session and risk its
// setup timeout. A real session arrives seconds-to-minutes after startup, so
// a background warm has finished by then; if one races in first, the cache's
// own mutex makes the concurrent cold call safe (both compute the same bool).
if cfg.Download.Transcode.Enabled && ffmpegResolved != "" {
go func() {
engine.FFmpegSupportsLibplacebo(ffmpegResolved)
engine.FFmpegSupportsZscale(ffmpegResolved)
engine.FFmpegSupportsScaleCuda(ffmpegResolved)
}()
}
// Create daemon config
daemonCfg := agent.DaemonConfig{
AgentID: cfg.Agent.ID,
AgentName: cfg.Agent.Name,
Version: Version,
DownloadDir: cfg.Download.Dir,
StreamPort: cfg.Download.StreamPort,
HTTPSStreamPort: cfg.Download.HTTPSStreamPort,
AgentHash: cfg.Agent.Hash,
LanIP: engine.LanIP(),
TailscaleIP: engine.TailscaleIP(),
CanDelete: cfg.Library.AllowDelete,
ScanPaths: library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath),
HWAccel: string(hwAccelPick),
MaxTranscodeHeight: maxTranscodeHeight,
FFmpegVersion: hwDiag.FFmpegVersion,
FFmpegPath: hwDiag.FFmpegPath,
HWEncoders: hwDiag.Encoders,
HWDevices: hwDiag.Devices,
AutoUpgrade: cfg.Daemon.AutoUpgradeEnabled(),
Downlink: cfg.Daemon.Downlink,
}
// Create HTTP client with mirror failover so a `.com` block-out rolls
// over to `.to` / .onion without restarting the daemon.
agentClient := newAgentClientFromConfig(cfg, userAgent)
log.Printf("Transport: HTTP sync → %s (mirrors: %d)", cfg.Auth.APIURL, len(cfg.Auth.Mirrors))
// Create daemon
d := agent.NewDaemon(daemonCfg, agentClient)
// Start SIGUSR1 reload watcher (unix only, no-op on Windows)
startReloadWatcher(&ReloadableConfig{Daemon: d})
// Daemon-scoped context — cancelled on shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Parse speed limits
maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed)
maxUl, _ := config.ParseSpeed(cfg.Download.MaxUploadSpeed)
// Parse torrent timeouts
metaTimeout, _ := time.ParseDuration(cfg.Download.MetadataTimeout)
stallTimeout, _ := time.ParseDuration(cfg.Download.StallTimeout)
// Parse the seeding time target (0/"" = no time target — ratio-only or forever)
seedTime, _ := time.ParseDuration(cfg.Download.SeedTime)
// Create progress reporter — only used for stream tasks (handleStreamTask)
// The sync goroutine handles all regular progress reporting.
statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval)
if statusInterval == 0 {
statusInterval = 3 * time.Second
}
reporter := engine.NewProgressReporter(agentClient, statusInterval)
reporter.SetWatchingFunc(func() bool { return d.Watching.Load() })
// Managed-VPN add-on: bring up the in-process WireGuard split-tunnel before
// the torrent client so peer + tracker traffic routes through it. Failure is
// non-fatal — log and download in the clear (better than refusing to run).
var vpnTunnel *vpn.Tunnel
if cfg.Download.VPN.ConfigFile != "" {
// Self-hosted / personal-VPN mode: read a local .conf directly.
raw, rerr := os.ReadFile(cfg.Download.VPN.ConfigFile)
if rerr != nil {
log.Printf("[vpn] could not read config_file %q (%v) — downloading in the clear", cfg.Download.VPN.ConfigFile, rerr)
} else if t, uerr := vpn.Up(string(raw)); uerr != nil {
log.Printf("[vpn] tunnel failed to start from config_file (%v) — downloading in the clear", uerr)
} else {
vpnTunnel = t
defer vpnTunnel.Close()
log.Printf("[vpn] managed VPN active (local config_file) — torrent traffic split-tunnelled through WireGuard")
}
} else if cfg.Download.VPN.Enabled {
apiURL := cfg.Auth.APIURL
if apiURL == "" {
apiURL = "https://torrentclaw.com"
}
fetchCtx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
conf, ferr := vpn.FetchConfig(fetchCtx, apiURL, cfg.Auth.APIKey, "unarr/"+Version, cfg.Agent.ID, false)
cancel()
var fe *vpn.FetchError
switch {
case ferr != nil && errors.As(ferr, &fe) && fe.Code == vpn.ErrSlotOnDevice:
log.Printf("[vpn] the single WireGuard slot is already held by another unarr agent — this one downloads in the clear. To protect this machine too, set up OpenVPN on it (1 agent uses WireGuard, the rest use OpenVPN — up to 10). See https://torrentclaw.com/vpn")
case ferr != nil:
log.Printf("[vpn] could not enable VPN (%v) — downloading in the clear", ferr)
default:
if t, uerr := vpn.Up(conf); uerr != nil {
log.Printf("[vpn] tunnel failed to start (%v) — downloading in the clear", uerr)
} else {
vpnTunnel = t
defer vpnTunnel.Close()
log.Printf("[vpn] managed VPN active — torrent traffic split-tunnelled through WireGuard")
}
}
}
// Record VPN split-tunnel state for `unarr vpn status`.
if vpnTunnel != nil {
mode := "managed"
if cfg.Download.VPN.ConfigFile != "" {
mode = "self-hosted"
}
d.SetVPNState(true, mode, vpnTunnel.Endpoint)
}
// Create torrent downloader
torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{
DataDir: cfg.Download.Dir,
PieceCompletionDir: config.DataDir(), // keep piece-completion DB off NFS/SMB mounts
MetadataTimeout: metaTimeout,
StallTimeout: stallTimeout,
MaxTimeout: 0,
MaxDownloadRate: maxDl,
MaxUploadRate: maxUl,
ListenPort: cfg.Download.ListenPort,
SeedEnabled: cfg.Download.SeedEnabled,
SeedRatio: cfg.Download.SeedRatio,
SeedTime: seedTime,
VPNTunnel: vpnTunnel,
})
if err != nil {
return fmt.Errorf("create torrent downloader: %w", err)
}
if maxDl > 0 || maxUl > 0 {
dlStr, ulStr := "unlimited", "unlimited"
if maxDl > 0 {
dlStr = formatSpeedLog(maxDl)
}
if maxUl > 0 {
ulStr = formatSpeedLog(maxUl)
}
log.Printf("Speed limits: download=%s upload=%s", dlStr, ulStr)
}
if cfg.Download.SeedEnabled {
switch {
case cfg.Download.SeedRatio > 0 && seedTime > 0:
log.Printf("[torrent] seeding enabled (stop at ratio %.2f or %s, whichever first)", cfg.Download.SeedRatio, seedTime)
case cfg.Download.SeedRatio > 0:
log.Printf("[torrent] seeding enabled (stop at ratio %.2f)", cfg.Download.SeedRatio)
case seedTime > 0:
log.Printf("[torrent] seeding enabled (stop after %s)", seedTime)
default:
log.Printf("[torrent] seeding enabled (no ratio/time target — seeds until shutdown)")
}
}
// Create debrid downloader
debridDl := engine.NewDebridDownloader()
usenetDl := engine.NewUsenetDownloader(agentClient)
// Pre-flight disk reserve: refuse a download that would leave less than this
// many bytes free, so a download never fills the filesystem to 0 mid-write.
minFreeBytes := int64(cfg.Download.MinFreeDiskMB) << 20
torrentDl.SetMinFreeBytes(minFreeBytes)
debridDl.SetMinFreeBytes(minFreeBytes)
usenetDl.SetMinFreeBytes(minFreeBytes)
log.Printf("[disk] download free-space reserve: %d MiB", cfg.Download.MinFreeDiskMB)
// Create download manager
manager := engine.NewManager(engine.ManagerConfig{
MaxConcurrent: cfg.Download.MaxConcurrent,
OutputDir: cfg.Download.Dir,
Notifications: cfg.Notifications.Enabled,
Organize: engine.OrganizeConfig{
Enabled: cfg.Organize.Enabled,
MoviesDir: cfg.Organize.MoviesDir,
TVShowsDir: cfg.Organize.TVShowsDir,
OutputDir: cfg.Download.Dir,
},
}, reporter, torrentDl, debridDl, usenetDl)
// Resume store: persist in-flight downloads so a daemon restart can re-submit
// them (the downloaders resume the partial data). Wire it before any Submit.
taskStore := agent.NewActiveTaskStore()
manager.SetTaskStore(taskStore)
// Create persistent stream server
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
streamSrv.SetUPnPEnabled(cfg.Download.EnableUPnP)
// Wire ffmpeg so /thumbnail can extract single frames for the web's "file
// characteristics" panel (frames on demand). Empty = thumbnails 503.
streamSrv.SetFFmpegPath(ffmpegResolved)
// Write-through cache extracted WebVTT into the hidden ".unarr" sidecar dir so
// /sub serves instantly (and giant remuxes that exceed the on-demand timeout
// work once the scan prewarm has filled the cache). Default true.
streamSrv.SetCacheSubtitles(cfg.Library.CacheSubtitles)
streamSrv.SetCacheThumbnails(cfg.Library.CacheThumbnails)
// Tell /trickplay which tile width the scan prewarm built the sprite at (the
// agent owns the width; the web requests by path only). 0 = disabled → 404.
trickW := 0
if cfg.Library.Trickplay.Enabled {
if trickW = cfg.Library.Trickplay.Width; trickW <= 0 {
trickW = 240
}
}
streamSrv.SetTrickplayWidth(trickW)
// Self-heal a host→container base-path skew for the path-scoped handlers
// (/thumbnail, /trickplay, /sub), mirroring the /stream + /hls remap. Without
// it, a docker agent whose web DB holds host paths (/mnt/nas/peliculas/…) but
// mounts that media at /downloads returns 404 for every scrubber frame /
// trickplay sprite / external subtitle. Same allowed roots + relocate logic.
// NOTE: relocateUnreachable needs a ≥3-segment path tail, so a FLAT media
// layout (file directly under the root) is not self-healed here — those
// sidecars 404 on a docker agent with a host→container skew until a re-scan
// rewrites the DB path. Same limitation as the /stream self-heal.
streamSrv.SetPathResolver(func(p string) string {
p = filepath.Clean(p)
roots := streamAllowedRoots(cfg)
if isAllowedStreamPath(p, roots...) {
return p
}
return relocateUnreachable(p, roots) // "" when not locatable → caller 404s
})
streamSrv.SetRequireStreamToken(cfg.Download.RequireStreamToken)
// Report the stream-token signing key ONLY when enforcing, so the web's
// "secret present → mint HLS token" signal accurately means "this agent
// verifies tokens". Reporting it with enforcement off would make the web
// mint HLS path tokens the agent never peels → 404. Set before Register().
if cfg.Download.RequireStreamToken {
d.UpdateStreamSecret(streamSrv.StreamSecretHex())
}
// CORS extras = operator config + dynamic mirror list from /api/mirrors.
// Without the mirror merge, a user playing from `torrentclaw.to` (or any
// future mirror) hits the daemon, gets 200 + body, but no
// `Access-Control-Allow-Origin` → browser drops the response → player
// reports "404 todos los canales". Fetching /api/mirrors at startup
// future-proofs against mirror additions without a CLI rebuild.
corsExtras := append([]string(nil), cfg.Download.CORSExtraOrigins...)
corsExtras = append(corsExtras, mirrorCORSOrigins(ctx, cfg, userAgent)...)
streamSrv.SetCORSAllowedOrigins(corsExtras)
// HTTPS stream listener (per-agent direct-TLS): obtain/renew the cert from the
// broker FIRST (broker runs ACME DNS-01 with our CSR; the private key never
// leaves us), then arm the listener if a usable cert is on disk. Without a
// valid cert there is nothing to serve over TLS, and the HTTP listener +
// funnel keep working regardless.
if cfg.Download.HTTPSStreamPort > 0 {
if cfg.Agent.Hash != "" {
// The broker's ownership check requires the agent to be registered
// first (the agent_hash must live on THIS user's agent_registration
// row). Register now — best-effort — so a fresh agent can get its cert
// on the first boot; d.Run() registers again later (idempotent upsert).
if err := d.Register(ctx); err != nil {
log.Printf("[acme] pre-cert registration failed (%v) — cert will arrive on a later renewal tick", err)
} else {
fetchAgentCert(ctx, agentClient, cfg.Agent.Hash)
}
}
keyPath, certPath := acme.Paths(config.DataDir())
if err := streamSrv.LoadTLSCertificateFromFiles(certPath, keyPath); err != nil {
log.Printf("[stream] HTTPS disabled — no usable certificate at %s (%v)", certPath, err)
} else {
streamSrv.EnableTLS(cfg.Download.HTTPSStreamPort)
log.Printf("[stream] HTTPS armed on port %d with certificate %s", cfg.Download.HTTPSStreamPort, certPath)
}
}
// Reap HLS tmpdirs left over from a previous daemon run before we start
// accepting new sessions. The in-memory registry doesn't survive a
// restart, so without this disk usage grows unbounded across restarts.
if err := engine.CleanupHLSOrphanDirs(); err != nil {
log.Printf("[hls] orphan tmpdir cleanup: %v", err)
}
// Persistent HLS segment cache — survives across sessions so re-plays
// of the same file at the same quality skip ffmpeg entirely. Off when
// hls_cache.enabled = false; size cap from hls_cache.size_gb; path from
// hls_cache.dir (defaults to ~/.cache/unarr/hls-cache).
var hlsCache *engine.HLSCache
if cfg.Download.HLSCache.Enabled {
cacheDir := cfg.Download.HLSCache.Dir
if cacheDir == "" {
if base, err := os.UserCacheDir(); err == nil {
cacheDir = filepath.Join(base, "unarr", "hls-cache")
} else {
cacheDir = filepath.Join(os.TempDir(), "unarr-hls-cache")
}
}
c, err := engine.NewHLSCache(cacheDir, cfg.Download.HLSCache.SizeGB)
if err != nil {
log.Printf("[hls_cache] init failed (%v) — falling back to per-session tmpdirs", err)
} else {
hlsCache = c
hlsCache.StartSweeper(ctx, time.Hour)
log.Printf("[hls_cache] enabled: dir=%s budget=%dGB", cacheDir, cfg.Download.HLSCache.SizeGB)
}
} else {
log.Printf("[hls_cache] disabled by config — every play re-encodes from scratch")
}
if err := streamSrv.Listen(ctx); err != nil {
return fmt.Errorf("start stream server: %w", err)
}
d.UpdateStreamPort(streamSrv.Port())
// Per-agent direct-TLS renewal: re-fetch the cert ahead of expiry and
// hot-swap it into the live listener (no restart). Only meaningful once the
// listener was armed at startup (a first-issuance that failed then needs a
// daemon restart to arm). Cheap 6 h poll; NeedsIssue gates the actual fetch.
if cfg.Download.HTTPSStreamPort > 0 && cfg.Agent.Hash != "" {
go func() {
t := time.NewTicker(6 * time.Hour)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
if !acme.NeedsIssue(config.DataDir()) {
continue
}
fetchAgentCert(ctx, agentClient, cfg.Agent.Hash)
keyPath, certPath := acme.Paths(config.DataDir())
if err := streamSrv.LoadTLSCertificateFromFiles(certPath, keyPath); err != nil {
log.Printf("[acme] hot-swap after renewal failed: %v", err)
} else {
log.Printf("[acme] renewed cert hot-swapped into live listener")
}
}
}
}()
}
// CloudFlare Quick Tunnel — needs the ACTUAL listening port (the
// configured port may have been busy and bumped). Spawning here ensures
// cloudflared --url points at the right socket. Failures degrade to
// Tailscale/LAN only; the supervisor keeps the tunnel up across CF's
// periodic rotation + transient cloudflared crashes.
if cfg.Download.Funnel.Enabled {
go superviseFunnel(ctx, d, streamSrv.Port())
}
// Warn at startup if transcode is enabled but ffmpeg/ffprobe are missing.
// HLS sessions get rejected at runtime (see daemon.go ~line 455), but
// surfacing it here gives the operator a chance to install ffmpeg before
// a user hits a confusing "rejected" line in the logs.
if cfg.Download.Transcode.Enabled {
if _, err := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath); err != nil {
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
} else if _, err := mediainfo.ResolveFFprobe(cfg.Library.FFprobePath); err != nil {
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
}
}
// Wire sync client callbacks
sc := d.SyncClient()
sc.GetFreeSlots = manager.FreeSlots
sc.GetTaskStates = manager.TaskStates
d.GetActiveCount = manager.ActiveCount
// Trigger immediate sync when a download slot frees up
manager.OnTaskDone = func() { d.TriggerSync() }
// Event-driven uplink: every status transition (resolving/downloading/
// verifying/organizing/…) pushes to the server right away instead of waiting
// for the next adaptive tick. Coalesced by TriggerSync's buffered-1 channel.
manager.OnStateChange = func() { d.TriggerSync() }
// Wire: sync receives new tasks → submit to manager or handle stream
d.OnTasksClaimed = func(tasks []agent.Task) {
for _, t := range tasks {
if t.Mode == "stream" {
if isStreamingTask(t.ID) {
continue
}
cancelStreamContexts()
streamSrv.ClearFile()
streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel stored in registry
streamRegistry.mu.Lock()
streamRegistry.cancels[t.ID] = streamCancel
streamRegistry.mu.Unlock()
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv, func() { d.TriggerSync() })
} else {
manager.Submit(ctx, t)
}
}
}
// Resume downloads interrupted by the previous shutdown/crash. Re-submit
// each persisted task; its downloader picks up the partial data (torrent via
// the piece-completion DB, debrid via Range, usenet via its tracker). Done
// before the sync loop starts; a later web re-dispatch of the same id is
// deduped by the manager.
if resume := taskStore.Load(); len(resume) > 0 {
log.Printf("[resume] re-submitting %d interrupted download(s)", len(resume))
for _, t := range resume {
t.ForceStart = false // respect MaxConcurrent on bulk auto-resume
log.Printf("[resume] %s — %s", agent.ShortID(t.ID), t.Title)
manager.Submit(ctx, t)
}
}
// Wire: sync receives control signals → act on manager
d.OnControlAction = func(action, taskID string, deleteFiles bool) {
switch action {
case "cancel":
if deleteFiles {
manager.CancelAndDeleteFiles(taskID)
} else {
manager.CancelTask(taskID)
}
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
case "pause":
manager.PauseTask(taskID)
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
case "resume":
log.Printf("[%s] resume requested, triggering sync", agent.ShortID(taskID))
d.TriggerSync()
case "stream":
if streamSrv.CurrentTaskID() == taskID {
return
}
task := manager.GetTask(taskID)
if task == nil || task.GetStreamURL() != "" {
return
}
provider, err := torrentDl.GetStreamProvider(taskID)
if err != nil {
log.Printf("[%s] stream failed: %v", agent.ShortID(taskID), err)
return
}
cancelStreamContexts()
streamSrv.SetFile(provider, taskID)
task.SetStreamURL(streamSrv.URLsJSON())
log.Printf("[%s] streaming: %s", agent.ShortID(taskID), provider.FileName())
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+taskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(watchCtx)
case "stop-stream":
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
}
}
// Wire: sync receives file deletion requests from the server
if cfg.Library.AllowDelete && len(daemonCfg.ScanPaths) > 0 {
sc.OnDeleteFiles = func(items []agent.LibraryDeleteRequest) []int {
return library.DeleteFiles(items, daemonCfg.ScanPaths)
}
}
// Wire: sync receives on-demand subtitle-fetch jobs (write VTT sidecars).
// Always available (additive, no deletion) as long as we have scan paths.
if len(daemonCfg.ScanPaths) > 0 {
sc.OnSubtitleFetch = func(reqs []agent.SubtitleFetchRequest) ([]int, []agent.SubtitleFetchError) {
return library.FetchSubtitles(reqs, daemonCfg.ScanPaths)
}
}
// Wire: sync receives stream requests for completed downloads
d.OnStreamRequested = func(sr agent.StreamRequest) {
if streamSrv.CurrentTaskID() == sr.TaskID {
// Already serving — notify server it's ready
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready re-notify failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
// reportStreamError tells the web a /stream attempt failed WITHOUT
// marking the download failed (StreamError, not Status). The web clears
// streamRequested and surfaces this so the player fails fast with the
// real reason instead of polling out the 20s "agent didn't respond".
reportStreamError := func(reason string) {
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamError: reason,
}); err != nil {
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
}
// Self-heal a base-path mismatch: the web may hand us a path under an old
// root (e.g. /mnt/nas/peliculas/… from before a binary→docker move) that
// is now outside our allowed dirs but whose file still exists under a
// current root (/downloads/…). resolvePlayableFile remaps, stat-retries
// (NFS) and resolves directories; the next re-scan persists the fix to
// the DB. See docs/plans/unarr-path-resilience.md.
filePath, errCode, perr := resolvePlayableFile(sr.FilePath, streamAllowedRoots(cfg), agent.ShortID(sr.TaskID))
if perr != nil {
log.Printf("[%s] stream request rejected (%s): %v", agent.ShortID(sr.TaskID), errCode, perr)
reportStreamError(perr.Error())
return
}
cancelStreamContexts()
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL())
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+sr.TaskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(watchCtx)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
}
// Wire: sync receives HLS streaming session requests. Each session spawns
// one ffmpeg process and registers its HLS playlist with the StreamServer.
// Validate FilePath against allowed dirs to prevent path traversal abuse
// from a compromised server.
d.OnStreamSession = func(sess agent.StreamSession) {
if playerSessionRegistry.has(sess.SessionID) {
return // already running
}
// failSession logs AND reports a startup failure to the web — every
// abort path in this handler must go through it. A silent `return`
// here left the player probing a playlist that would never exist
// until its 30s deadline (incident 2026-06-10: deleted file + stale
// library row = eternal "Preparando sesión"). Best-effort: on old web
// deployments the endpoint 404s and the player falls back to the
// probe deadline, exactly as before.
failSession := func(sessionID, code, message string) {
log.Printf("[hls %s] failed (%s): %s", agent.ShortID(sessionID), code, message)
go func() {
// Fresh context on purpose: failures cluster exactly when the
// daemon ctx is being cancelled (shutdown kills in-flight
// session starts), and a report derived from it would die
// before reaching the web. The 10s cap still bounds it.
rctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := agentClient.ReportSessionError(rctx, sessionID, code, message); err != nil {
log.Printf("[hls %s] session error report failed: %v", agent.ShortID(sessionID), err)
}
}()
}
// markReady reports "first bytes are servable" for the no-transcode
// paths (direct-play, remux, debrid direct) — one place instead of a
// copy per branch. HLS sessions report via watchSessionReady instead
// (they wait for seg-0 + attach a health snapshot).
markReady := func(sessionID string) {
go func() {
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := agentClient.MarkSessionReady(rctx, sessionID, nil); err != nil {
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
}
}()
}
// startHLSPlayback starts an HLS encode (local file or debrid URL) and
// wires it into the StreamServer. Shared by the local-file HLS path and
// the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe
// off the sync loop, and report readiness identically.
//
// Prewarm sessions (background cache-fill: next-episode, hover) take a
// deferential path: wait until no live encode is running (never steal
// the encoder from the viewer), then register WITHOUT displacing other
// sessions. Before this, a prewarm claimed mid-playback went through
// Register() and KILLED the stream the user was watching (verified
// 2026-06-10: prewarm started → live session "closed (cache
// discarded)" → player 404).
startHLSPlayback := func(hlsCfg engine.HLSSessionConfig, hlsCtx context.Context, hlsCancel context.CancelFunc) {
playerSessionRegistry.add(hlsCfg.SessionID, hlsCancel)
prewarm := sess.Prewarm
go func() {
if prewarm {
// Defer until the encoder is free. Poll is cheap (10 s);
// cap the wait at 30 min — a prewarm that can't start
// within an episode's runtime has lost its purpose.
deadline := time.Now().Add(30 * time.Minute)
for streamSrv.HLS().HasLiveEncode() {
if time.Now().After(deadline) || hlsCtx.Err() != nil {
playerSessionRegistry.remove(hlsCfg.SessionID)
hlsCancel()
log.Printf("[hls %s] prewarm abandoned (encoder busy %s)",
agent.ShortID(hlsCfg.SessionID), "30m")
return
}
select {
case <-hlsCtx.Done():
playerSessionRegistry.remove(hlsCfg.SessionID)
return
case <-time.After(10 * time.Second):
}
}
} else {
// REAL session: reap in-flight prewarm encodes BEFORE
// StartHLSSession so the per-key cache writer-lock is
// free and the viewer's encode lands in the persistent
// cache (not an uncached tmpdir). A SEALED prewarm is
// unaffected — this session simply cache-HITs it.
if n := streamSrv.HLS().CloseWhere(func(s *engine.HLSSession) bool { return s.IsPrewarm() }); n > 0 {
log.Printf("[hls %s] reaped %d in-flight prewarm(s) for the viewer session",
agent.ShortID(hlsCfg.SessionID), n)
}
}
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
if err != nil {
playerSessionRegistry.remove(hlsCfg.SessionID)
hlsCancel()
failSession(hlsCfg.SessionID, sessErrStartFailed, err.Error())
return
}
if prewarm {
// Side-by-side: never evict the viewer's session. A later
// REAL session still evicts this one via Register — by
// then the encode is usually sealed in the segment cache.
streamSrv.HLS().RegisterKeep(hsess)
log.Printf("[hls %s] prewarm encoding: %s", agent.ShortID(hlsCfg.SessionID), hlsCfg.FileName)
return // no viewer waiting → no ready-watcher
}
streamSrv.HLS().Register(hsess)
go watchSessionReady(hlsCtx, agentClient, hsess, hlsCfg.SessionID)
}()
}
// Debrid direct-play (hueco #2 / 2a): the source has no local file — the
// web resolved an HTTPS debrid link (cache-confirmed, browser-native
// container) and the daemon streams /stream from it via ranged GETs.
// Runs BEFORE the filePath checks (there is no local path) and needs no
// ffmpeg. PlayMethod != "hls" distinguishes this from the debrid
// HLS-from-URL branch below (a non-native container the web wants
// transcoded). Provider setup does a HEAD, so hand it off to a goroutine
// to keep the sync loop from blocking other pending actions; register the
// session up front so a duplicate sync within the setup window is a
// no-op (matches the HLS branch's handoff rationale).
if sess.DirectURL != "" && sess.PlayMethod != "hls" {
playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() })
// refresh re-resolves a fresh debrid link when this one expires
// mid-stream (hueco #2 / 2c). Bound to the daemon ctx so a shutdown
// cancels an in-flight refresh.
refresh := func(rctx context.Context) (string, error) {
return agentClient.RefreshStreamURL(rctx, sess.SessionID)
}
go func() {
bctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
provider, perr := engine.NewDebridFileProvider(bctx, sess.DirectURL, sess.FileName, sess.FileSize, refresh)
if perr != nil {
playerSessionRegistry.remove(sess.SessionID)
failSession(sess.SessionID, sessErrStartFailed, fmt.Sprintf("debrid provider: %v", perr))
return
}
streamSrv.SetFile(provider, sess.TaskID)
log.Printf("[stream %s] debrid direct-play: %s (%d bytes)",
agent.ShortID(sess.SessionID), provider.FileName(), provider.FileSize())
markReady(sess.SessionID)
}()
return
}
// Debrid HLS-from-URL (hueco #2 / 2b): the source is debrid-cached but
// NOT browser-native (mkv/HEVC/…), so the web set playMethod="hls"
// alongside the DirectURL. ffmpeg transcodes straight from the HTTP URL —
// no local file, no torrent. Cache is keyed by info_hash (not the
// per-resolution URL) so a re-play hits the segment cache.
if sess.DirectURL != "" { // playMethod == "hls" implied (2a returned above)
tcRuntime := buildTranscodeRuntime(ctx, cfg)
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
failSession(sess.SessionID, sessErrFfmpegMissing, "ffmpeg/ffprobe unavailable (debrid HLS)")
return
}
hlsCtx, hlsCancel := context.WithCancel(ctx)
startHLSPlayback(engine.HLSSessionConfig{
SessionID: sess.SessionID,
SourceURL: sess.DirectURL,
CacheID: sess.InfoHash,
FileName: sess.FileName,
Quality: sess.Quality,
AudioIndex: sess.AudioIndex,
BurnSubtitleIndex: sess.BurnSubtitleIndex,
StartSec: sess.StartSec,
Prewarm: sess.Prewarm,
Transcode: tcRuntime,
Cache: hlsCache,
// 2c: refresh the debrid link if it expires mid-transcode; the
// auto-restart supervisor calls this before relaunching ffmpeg.
RefreshURL: func(rctx context.Context) (string, error) {
return agentClient.RefreshStreamURL(rctx, sess.SessionID)
},
}, hlsCtx, hlsCancel)
log.Printf("[hls %s] debrid HLS-from-URL: %s", agent.ShortID(sess.SessionID), sess.FileName)
return
}
if sess.FilePath == "" {
failSession(sess.SessionID, sessErrStartFailed, "empty file path")
return
}
// SAME base-path self-heal + stat-retry + dir resolution as the raw
// /stream handler (resolvePlayableFile). A path under an old/host base
// (e.g. /mnt/nas/peliculas/… handed by the web while this docker agent
// mounts that media at /downloads) remaps onto the current root; a path
// whose file is genuinely gone fails fast as "file_missing" so the web
// can prune the stale library row and the player can fall back, instead
// of the player probing a playlist that will never exist.
// See docs/plans/unarr-path-resilience.md.
filePath, errCode, perr := resolvePlayableFile(sess.FilePath, streamAllowedRoots(cfg), "hls "+agent.ShortID(sess.SessionID))
if perr != nil {
failSession(sess.SessionID, errCode, perr.Error())
return
}
// Direct-play (hueco #3 / 3a): the web decided this source is already
// browser-native (mp4 h264/aac 8-bit SDR) from library scan metadata,
// gated on agent version. Serve the raw file over /stream (HTTP Range,
// no ffmpeg) instead of transcoding to HLS — zero CPU, instant seek.
// Runs BEFORE the ffmpeg-availability check on purpose: direct-play
// needs no ffmpeg, so it must work even when transcode is disabled.
if sess.PlayMethod == "direct" {
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sess.TaskID)
// cancel just clears the served file so daemon shutdown / drain
// stops exposing it on /stream. There's no ffmpeg child to kill.
playerSessionRegistry.add(sess.SessionID, func() { streamSrv.ClearFile() })
log.Printf("[stream %s] direct-play: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath))
// File is on disk → ready immediately. Tell the web so the player
// attaches <video src> without burning its HEAD-probe retry budget.
markReady(sess.SessionID)
return
}
tcRuntime := buildTranscodeRuntime(ctx, cfg)
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
failSession(sess.SessionID, sessErrFfmpegMissing, "ffmpeg/ffprobe unavailable")
return
}
// Remux path (hueco #3 / 3b): codecs are browser-native (h264/aac) but
// the container isn't (mkv). ffmpeg `-c copy` → growing fMP4 served raw
// over /stream — no video re-encode, no HLS. The web decided this from
// scan metadata + version gate; we still need ffmpeg (copy uses it).
if sess.PlayMethod == "remux" {
tStart := time.Now()
probeCtx, cancelProbe := context.WithTimeout(ctx, 15*time.Second)
probe, perr := engine.ProbeFile(probeCtx, tcRuntime.FFprobePath, filePath)
cancelProbe()
if perr != nil {
failSession(sess.SessionID, sessErrStartFailed, fmt.Sprintf("remux probe: %v", perr))
return
}
tProbe := time.Now()
remuxCtx, remuxCancel := context.WithCancel(ctx)
src, serr := engine.NewRemuxSource(remuxCtx, filePath, probe, tcRuntime.FFmpegPath, sess.FileName)
if serr != nil {
remuxCancel()
failSession(sess.SessionID, sessErrStartFailed, fmt.Sprintf("remux start: %v", serr))
return
}
streamSrv.SetGrowingFile(src, sess.TaskID)
// cancel stops the ffmpeg copy; SetGrowingFile/ClearFile also Close()
// the source, so the temp file is always cleaned up.
playerSessionRegistry.add(sess.SessionID, func() { remuxCancel(); streamSrv.ClearFile() })
// Startup timing (TTFF diagnosis): probe = ffprobe on the source;
// spawn = ffmpeg launch + tmp setup. First-fMP4-byte is logged by the
// source itself; serveGrowing logs any client read that blocks waiting
// for ffmpeg to catch up.
log.Printf("[stream %s] remux (copy) → fMP4: %s [probe=%v spawn=%v]",
agent.ShortID(sess.SessionID), filepath.Base(filePath),
tProbe.Sub(tStart).Round(time.Millisecond), time.Since(tProbe).Round(time.Millisecond))
markReady(sess.SessionID)
return
}
// Local-file HLS (the original path). StartHLSSession runs ffprobe
// (15 s cap) inside startHLSPlayback's goroutine so the sync loop
// returns immediately — browser HEAD probes have a 30 s retry budget
// that absorbs the gap until the playlist registers.
hlsCtx, hlsCancel := context.WithCancel(ctx)
startHLSPlayback(engine.HLSSessionConfig{
SessionID: sess.SessionID,
SourcePath: filePath,
FileName: sess.FileName,
Quality: sess.Quality,
AudioIndex: sess.AudioIndex,
BurnSubtitleIndex: sess.BurnSubtitleIndex,
StartSec: sess.StartSec,
Prewarm: sess.Prewarm,
Transcode: tcRuntime,
Cache: hlsCache,
}, hlsCtx, hlsCancel)
}
// Periodic DHT node persistence (every 5 min)
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
torrentDl.SaveDhtNodes()
case <-ctx.Done():
return
}
}
}()
// Periodic HLS session sweeper (every 5 min). Closes sessions whose last
// segment fetch was over 30 min ago — kills the orphan ffmpeg + removes
// the per-session tmpdir, so a tab that died mid-stream doesn't leak
// disk space until daemon shutdown.
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if n := streamSrv.HLS().SweepIdle(); n > 0 {
log.Printf("[hls] swept %d idle session(s)", n)
}
case <-ctx.Done():
return
}
}
}()
// Start auto-scan goroutine
scanPaths := daemonCfg.ScanPaths
if len(scanPaths) > 0 && cfg.Library.AutoScan {
scanInterval := 24 * time.Hour
if cfg.Library.ScanInterval != "" {
if parsed, err := time.ParseDuration(cfg.Library.ScanInterval); err == nil && parsed > 0 {
scanInterval = parsed
}
}
go runAutoScan(ctx, cfg, scanInterval, agentClient, d.ScanNow, scanPaths)
}
// Start reporter only for stream task handling
go reporter.Run(ctx)
// Credential revoked mid-run (agent deleted from the dashboard): wipe the
// stored key + agentId so a supervisor restart can't loop on a rejected
// identity, then stop the daemon. Reconnecting needs a fresh `unarr login`.
d.SyncClient().OnRevoked = func(err error) {
reportAgentRevoked(cfg, err)
cancel()
}
// Legacy bootstrap: if register hands back a per-machine key, persist it so
// the next start authenticates with the bound agent key (one-time migration;
// also stops the server re-minting on every restart).
d.OnAgentKeyMinted = func(newKey string) {
cfg.Auth.APIKey = newKey
if serr := config.Save(cfg, resolvedConfigPath()); serr != nil {
log.Printf("[agent] could not persist per-machine key: %v", serr)
} else {
log.Printf("[agent] migrated to a per-machine agent key")
}
}
// Start daemon (blocks — runs sync loop)
errCh := make(chan error, 1)
go func() {
errCh <- d.Run(ctx)
}()
// Start idle guard for the persistent stream server
go startIdleGuard(ctx, streamSrv)
// Signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Wait for signal or error
select {
case sig := <-sigCh:
fmt.Printf("\n Received %s, shutting down...\n", sig)
cancelStreamContexts()
cancelAllPlayerSessions()
streamSrv.Shutdown(context.Background())
// Drain active downloads BEFORE cancelling the daemon context. Shutdown
// sets shuttingDown + cancels each task context itself, so interrupted
// downloads keep their resume-store entry. Cancelling the shared ctx first
// would make them look like genuine failures and wipe the entry → no resume.
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
defer shutdownCancel()
manager.Shutdown(shutdownCtx)
cancel()
d.Deregister()
fmt.Println(" Daemon stopped.")
return nil
case err := <-errCh:
cancelStreamContexts()
cancelAllPlayerSessions()
streamSrv.Shutdown(context.Background())
cancel()
// Registration was rejected because this agent's credential is revoked
// (deleted from the dashboard). Wipe it and exit cleanly so the service
// supervisor doesn't restart-loop against a 410; user must re-login.
if agent.IsRevoked(err) {
reportAgentRevoked(cfg, err)
return nil
}
return err
}
}
// reportAgentRevoked handles a server-side revocation of this agent. It logs
// the cause, blanks the stored API key and agent ID and saves the config, then
// prints instructions to re-link with `unarr login` (which mints a new
// per-machine key bound to a new agentId). Wiping the credential keeps the
// next start from looping against a server that keeps rejecting the old
// identity. A failed save is logged, not returned.
func reportAgentRevoked(cfg config.Config, err error) {
	log.Printf("[agent] credential revoked by server (%v) — this machine was removed from your account", err)

	cfg.Auth.APIKey, cfg.Agent.ID = "", ""
	if saveErr := config.Save(cfg, resolvedConfigPath()); saveErr != nil {
		log.Printf("[agent] could not clear stored credential: %v", saveErr)
	}

	notice := []string{
		"",
		" This agent was removed from your account.",
		" Run `unarr login` on this machine to reconnect it.",
		"",
	}
	for _, line := range notice {
		fmt.Println(line)
	}
}
// streamAllowedRoots returns the directory roots a stream / sidecar path is
// permitted under. Single source of truth so the raw /stream, HLS, and
// path-scoped (/thumbnail, /trickplay, /sub) handlers never disagree about what
// is reachable — a root added to one place but not the others would otherwise
// produce confusing partial failures (stream plays, scrubber frames 404).
// Unset (empty) entries are harmless: isAllowedStreamPath and
// relocateUnreachable both skip "" roots.
func streamAllowedRoots(cfg config.Config) []string {
	return []string{
		cfg.Download.Dir,
		cfg.Library.ScanPath,
		cfg.Organize.MoviesDir,
		cfg.Organize.TVShowsDir,
	}
}

// isAllowedStreamPath reports whether filePath is contained within one of the
// allowedDirs; empty entries in allowedDirs are ignored. filePath must already
// be cleaned (filepath.Clean) by the caller. This defends against a
// compromised API server sending a path traversal payload (e.g. /etc/passwd)
// in StreamRequest. The check is purely lexical — it does not resolve
// symlinks — so callers that follow links must re-validate against the
// resolved paths (as relocateUnreachable does).
func isAllowedStreamPath(filePath string, allowedDirs ...string) bool {
	for _, dir := range allowedDirs {
		if dir == "" {
			// Unset root: never a match (otherwise "" would allow everything
			// relative to the working directory).
			continue
		}
		rel, err := filepath.Rel(filepath.Clean(dir), filePath)
		if err != nil {
			// No lexical relation (e.g. relative vs absolute, other volume).
			continue
		}
		// Only a leading ".." path ELEMENT escapes the root. A plain prefix
		// test on ".." would also reject legitimate names that merely start
		// with dots, e.g. "...And Justice for All (1979)/movie.mkv".
		if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
			continue
		}
		return true
	}
	return false
}
// relocateUnreachable tries to find a file the web asked us to stream under a
// path we can't serve (e.g. an old base path) by joining the longest suffix of
// that path onto each current allowed root and checking it exists. Returns the
// found path (root + tail, not symlink-resolved) or "".
//
// Conservative by design — it must never serve the WRONG file:
//   - Requires a tail of at least three non-empty path segments
//     (collection/season/file), so a generic "Season 01/Episode.mkv" can't
//     match a different show by accident. Empty segments (the leading "/" of an
//     absolute path) do NOT count toward that minimum, so "/Season 01/Episode.mkv"
//     is never tried. Flat single-file-at-root layouts simply aren't self-healed
//     here; the next re-scan re-maps them instead.
//   - Re-checks containment AFTER resolving symlinks, so a symlink inside a root
//     pointing outside it can't be used to escape the allowed dirs
//     (isAllowedStreamPath alone is a lexical check that os.Stat would happily
//     follow out).
func relocateUnreachable(filePath string, allowedRoots []string) string {
	// FieldsFunc drops empty segments. A plain Split would keep the leading ""
	// of an absolute path, letting a 2-segment tail pass the 3-segment guard
	// and making the first two iterations stat the very same tail.
	segs := strings.FieldsFunc(filepath.ToSlash(filePath), func(r rune) bool { return r == '/' })
	// Longest tail first (most specific match wins). Stop before tails
	// shorter than 3 segments so a short, ambiguous suffix can't match the
	// wrong file.
	for start := 0; start+3 <= len(segs); start++ {
		tail := filepath.Join(segs[start:]...)
		for _, root := range allowedRoots {
			if root == "" {
				continue
			}
			cand := filepath.Join(root, tail)
			if !isAllowedStreamPath(cand, root) {
				continue
			}
			fi, err := os.Stat(cand)
			if err != nil || fi.IsDir() {
				continue
			}
			// Re-validate containment against the symlink-resolved real paths so
			// a symlink under the root can't point the stream outside it.
			realCand, e1 := filepath.EvalSymlinks(cand)
			realRoot, e2 := filepath.EvalSymlinks(root)
			if e1 != nil || e2 != nil || !isAllowedStreamPath(realCand, realRoot) {
				continue
			}
			return cand
		}
	}
	return ""
}
// Machine-readable error codes sent over the web's session-error channel
// (POST /api/internal/agent/session-error). They must stay in sync with
// SESSION_ERROR_CODES in the web repo. The web treats "file_missing" as
// destructive — it prunes the stale library row and task pointer — so the
// resolver must never return it while the file might still exist.
//
// Path-resolution failures, returned by resolvePlayableFile.
const (
	pathErrRejected = "path_rejected"
	pathErrMissing  = "file_missing"
	pathErrNoVideo  = "no_video_file"
)

// Session-start failures.
const (
	sessErrFfmpegMissing = "ffmpeg_unavailable"
	sessErrStartFailed   = "start_failed"
)
// resolvePlayableFile validates and self-heals a web-provided source path into
// a playable on-disk video file. Shared by the raw /stream handler and every
// session transport (HLS / remux / direct-play) so they all behave
// identically — before this, the HLS path replicated only the lexical remap
// and silently diverged on stat retries (docs/plans/unarr-path-resilience.md):
//
//  1. Containment: the cleaned path must live under an allowed root; if not,
//     relocate it by path tail (old base path → current mount).
//  2. Existence: os.Stat with retries (NFS can transiently fail right after a
//     remount or under load — the root of the "works on the 3rd try" stream
//     failures), then one last relocate for files that moved within a root.
//  3. Directories resolve to a video file found by engine.FindVideoFile.
//
// Returns (resolvedPath, "", nil) on success. On failure it returns a stable
// errCode chosen so that only a PROVEN absence is reported destructively:
//   - "path_rejected": outside every allowed root, not relocatable, and not
//     proven absent at the original path (it exists, or stat failed for a
//     reason other than "does not exist") — an agent config problem; the web
//     must NOT prune library rows over it.
//   - "file_missing": stat reported "does not exist" and no relocation
//     candidate was found.
//   - "no_video_file": a directory with nothing playable inside.
//   - "start_failed": stat kept failing for another reason (permission denied,
//     I/O or stale-handle error on a flaky mount). The file may well exist, so
//     this is reported non-destructively.
func resolvePlayableFile(rawPath string, allowedRoots []string, logLabel string) (string, string, error) {
	filePath := filepath.Clean(rawPath)
	if !isAllowedStreamPath(filePath, allowedRoots...) {
		remapped := relocateUnreachable(filePath, allowedRoots)
		if remapped == "" {
			// Only a definite "does not exist" may become file_missing. A file
			// that exists — or merely can't be stat'ed — outside the roots is
			// a config problem, never grounds for pruning.
			if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) {
				return "", pathErrMissing, fmt.Errorf("file not found under any allowed root: %s", filePath)
			}
			return "", pathErrRejected, fmt.Errorf("path outside allowed dirs: %s", filePath)
		}
		log.Printf("[%s] stream self-heal: remapped %s → %s", logLabel, filePath, remapped)
		filePath = remapped
	}

	var info os.FileInfo
	var statErr error
	for attempt := 0; attempt < 3; attempt++ {
		if info, statErr = os.Stat(filePath); statErr == nil {
			break
		}
		// No sleep after the final attempt: nothing left to wait for.
		if attempt < 2 {
			time.Sleep(300 * time.Millisecond)
		}
	}
	if statErr != nil {
		// Last resort before failing: the file may simply have moved within
		// an allowed root — try to relocate it by path tail.
		if remapped := relocateUnreachable(filePath, allowedRoots); remapped != "" {
			log.Printf("[%s] stream self-heal: relocated missing %s → %s", logLabel, filePath, remapped)
			filePath = remapped
			info, statErr = os.Stat(filePath)
		}
	}
	if statErr != nil {
		if errors.Is(statErr, os.ErrNotExist) {
			return "", pathErrMissing, fmt.Errorf("file not found after retries: %s (%v)", filePath, statErr)
		}
		// EACCES / EIO / ESTALE …: the file may still be there, so report a
		// non-destructive code instead of letting the web prune the row.
		return "", sessErrStartFailed, fmt.Errorf("cannot stat %s after retries: %w", filePath, statErr)
	}
	if info.IsDir() {
		found := engine.FindVideoFile(filePath)
		if found == "" {
			return "", pathErrNoVideo, fmt.Errorf("no video file in directory: %s", filePath)
		}
		log.Printf("[%s] resolved directory to video file: %s", logLabel, filepath.Base(found))
		filePath = found
	}
	return filePath, "", nil
}
// formatSpeedLog renders a byte-per-second rate for log lines using binary
// (1024-based) units: GB/s and MB/s with one decimal, KB/s rounded to a whole
// number, and raw B/s below 1 KiB (including zero and negative values).
func formatSpeedLog(bps int64) string {
	const (
		kib = 1 << 10
		mib = 1 << 20
		gib = 1 << 30
	)
	if bps >= gib {
		return fmt.Sprintf("%.1f GB/s", float64(bps)/gib)
	}
	if bps >= mib {
		return fmt.Sprintf("%.1f MB/s", float64(bps)/mib)
	}
	if bps >= kib {
		return fmt.Sprintf("%.0f KB/s", float64(bps)/kib)
	}
	return fmt.Sprintf("%d B/s", bps)
}
// basePathChanged reports whether the library's scan root moved since the last
// saved cache — i.e. the previously-scanned root is no longer one of the current
// scan paths. Used to force a full (non-incremental) re-scan so the server can
// re-map paths by fingerprint and reap the old prefix. A nil or empty cache, or
// one without a recorded path, never counts as a change. Paths are compared
// after filepath.Clean, so trailing-slash differences don't trigger it.
func basePathChanged(existing *library.LibraryCache, scanPaths []string) bool {
	if existing == nil || len(existing.Items) == 0 || existing.Path == "" {
		return false
	}
	prev := filepath.Clean(existing.Path)
	for _, p := range scanPaths {
		if filepath.Clean(p) == prev {
			return false
		}
	}
	return true
}

// runAutoScan runs a library scan + sync on a timer or on-demand via the scanNow
// channel. It scans every provided path, then syncs all of them together as ONE
// session (a single library.SyncBatches call covering every cleanly-scanned
// root), so the server's per-agent stale cleanup is sound across roots.
func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, ac *agent.Client, scanNow <-chan struct{}, scanPaths []string) {
	log.Printf("[auto-scan] enabled: every %s, paths: %v", interval, scanPaths)
	// Give the daemon 30 s to settle before the first scan; an on-demand
	// request (scanNow) skips the wait, and shutdown aborts it.
	select {
	case <-time.After(30 * time.Second):
	case <-scanNow:
	case <-ctx.Done():
		return
	}
	doScan := func() {
		// Keep the auto-scan loop alive if a scan, prewarm or sync step panics.
		defer func() {
			if r := recover(); r != nil {
				log.Printf("[auto-scan] panic recovered: %v", r)
			}
		}()
		log.Printf("[auto-scan] starting scan of %v", scanPaths)
		// Load error deliberately ignored: the cache is best-effort. If it
		// comes back nil the scan below is non-incremental.
		existing, _ := library.LoadCache()
		workers := cfg.Library.Workers
		if workers == 0 {
			workers = 8
		}
		// If the library base path changed (e.g. the agent moved from the host
		// binary to docker, remapping /mnt/nas/peliculas → /downloads, or the
		// user moved their media folder), force a FULL re-scan instead of an
		// incremental one. The fingerprint merge on the server then relocates
		// existing rows in place rather than duplicating, and per-agent cleanup
		// reaps the old prefix. See docs/plans/unarr-path-resilience.md.
		forceFull := basePathChanged(existing, scanPaths)
		if forceFull {
			log.Printf("[auto-scan] WARNING: library base path changed (was %q, now %v) — "+
				"running a FULL re-scan. This can take a while on large libraries; "+
				"playback and matches are preserved.", existing.Path, scanPaths)
		}
		scanOpts := library.ScanOptions{
			Workers:     workers,
			FFprobePath: cfg.Library.FFprobePath,
			Incremental: existing != nil && !forceFull,
		}
		// Resolve ffmpeg once for the sidecar prewarm (extracts text subs → WebVTT
		// and panel thumbnail frames → JPEG into the hidden ".unarr" cache so /sub
		// and /thumbnail are instant + huge remuxes work). On error the prewarm
		// is skipped with a log line (on-demand extraction still runs).
		prewarmFFmpeg := ""
		if cfg.Library.CacheSubtitles || cfg.Library.CacheThumbnails || cfg.Library.Trickplay.Enabled {
			if ff, err := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath); err == nil {
				prewarmFFmpeg = ff
			} else {
				log.Printf("[auto-scan] sidecar prewarm disabled: ffmpeg unavailable: %v", err)
			}
		}
		// Scan every path, then sync ALL of them as ONE session (single
		// syncStartedAt + final isLastBatch via library.SyncBatches). Per-root
		// sessions let the server's per-agent stale cleanup reap rows of roots
		// a session never visited; one full-cycle session makes the cleanup
		// sound AND lets it reap old-base-path ghost rows (fullCycle=true —
		// only when every root scanned cleanly).
		var syncItems []agent.LibrarySyncItem
		var coveredRoots []string
		fullCycle := true
		var mergedItems []library.LibraryItem
		for _, scanPath := range scanPaths {
			cache, err := library.Scan(ctx, scanPath, existing, scanOpts)
			if err != nil {
				log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err)
				fullCycle = false
				continue
			}
			mergedItems = append(mergedItems, cache.Items...)
			coveredRoots = append(coveredRoots, scanPath)
			if prewarmFFmpeg != "" {
				library.PrewarmSidecars(ctx, cache, library.PrewarmOptions{
					FFmpegPath:           prewarmFFmpeg,
					CacheSubtitles:       cfg.Library.CacheSubtitles,
					CacheThumbnails:      cfg.Library.CacheThumbnails,
					Workers:              2,
					Trickplay:            cfg.Library.Trickplay.Enabled,
					TrickplayIntervalSec: cfg.Library.Trickplay.IntervalSeconds(),
					TrickplayWidth:       cfg.Library.Trickplay.Width,
					MaxLoadRatio:         cfg.Library.PrewarmMaxLoadRatio,
				})
			}
			items := library.BuildSyncItems(cache)
			if len(items) == 0 {
				log.Printf("[auto-scan] no items under %s", scanPath)
				continue
			}
			syncItems = append(syncItems, items...)
		}
		totalSynced := 0
		if len(syncItems) > 0 {
			// coveredRoots is non-empty here: items only come from roots that
			// scanned successfully.
			res, err := library.SyncBatches(ctx, ac, syncItems, library.SyncOptions{
				AgentID:   cfg.Agent.ID,
				ScanPath:  coveredRoots[0],
				ScanRoots: coveredRoots,
				FullCycle: fullCycle,
			})
			if err != nil {
				log.Printf("[auto-scan] sync failed: %v", err)
			} else {
				// Read res only on success: on error it carries no guaranteed
				// meaning (and, if it is a pointer, may be nil — dereferencing it
				// would panic and skip the cache save below).
				totalSynced = res.Synced
				if res.Removed > 0 {
					log.Printf("[auto-scan] server removed %d stale item(s)", res.Removed)
				}
			}
		} else {
			// An entirely-empty library can't open a sync session (the server
			// requires ≥1 item per batch), so stale rows survive until a file
			// reappears — same trade-off as before, now explicit.
			log.Printf("[auto-scan] no items under any scan path — skipping sync")
		}
		// Save merged cache for incremental scanning next time.
		if len(mergedItems) > 0 {
			mergedCache := &library.LibraryCache{
				ScannedAt: time.Now().UTC().Format(time.RFC3339),
				Path:      scanPaths[0],
				Items:     mergedItems,
			}
			if err := library.SaveCache(mergedCache); err != nil {
				log.Printf("[auto-scan] save cache failed: %v", err)
			}
		}
		log.Printf("[auto-scan] synced %d items across %d path(s)", totalSynced, len(scanPaths))
	}
	doScan()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			doScan()
		case <-scanNow:
			log.Printf("[auto-scan] on-demand scan triggered")
			// Restart the period so a manual scan isn't followed by a
			// redundant timed one shortly after.
			ticker.Reset(interval)
			doScan()
		case <-ctx.Done():
			return
		}
	}
}
// superviseFunnel keeps a CloudFlare Quick Tunnel up across cloudflared
// crashes and CF's ~6h tunnel rotation. On cancellation it returns; whenever
// cloudflared exits otherwise it clears the reported URL and respawns after a
// backoff. The backoff doubles (capped at 5 min) so a cloudflared that can't
// reach the CF edge isn't hammered in a tight loop. It resets to its initial
// value after a tunnel that stayed up for a healthy period, so routine
// rotations don't inherit the delay accumulated by earlier crashes.
func superviseFunnel(ctx context.Context, d *agent.Daemon, port int) {
	const (
		initialBackoff = 2 * time.Second
		maxBackoff     = 5 * time.Minute
		// A tunnel that ran at least this long is considered healthy.
		healthyRun = time.Minute
	)
	backoff := initialBackoff
	for ctx.Err() == nil {
		t, err := funnel.Start(ctx, funnel.Config{Port: port})
		if err != nil {
			log.Printf("[funnel] could not start CloudFlare tunnel (%v) — retrying in %s", err, backoff)
			select {
			case <-time.After(backoff):
			case <-ctx.Done():
				return
			}
			backoff = min(backoff*2, maxBackoff)
			continue
		}
		startedAt := time.Now()
		log.Printf("[funnel] cloudflared started, waiting for public URL...")
		go func() {
			url, werr := t.WaitURL(45 * time.Second)
			if werr != nil {
				log.Printf("[funnel] cloudflared did not emit a URL (%v)", werr)
				return
			}
			log.Printf("[funnel] public URL: %s", url)
			d.SetFunnelURL(url)
		}()
		// Block until cloudflared exits (CF rotation, crash, or shutdown).
		exitErr := <-t.Done()
		// Best-effort cleanup: the process has already exited, and there is
		// nothing useful to do with a Close error here.
		_ = t.Close()
		d.SetFunnelURL("")
		if ctx.Err() != nil {
			return
		}
		// Without this reset, successive ~6h rotations keep doubling the
		// backoff until every rotation costs a full maxBackoff of downtime.
		if time.Since(startedAt) >= healthyRun {
			backoff = initialBackoff
		}
		if exitErr != nil {
			log.Printf("[funnel] cloudflared exited: %v — restarting in %s", exitErr, backoff)
		} else {
			log.Printf("[funnel] cloudflared exited cleanly — restarting in %s", backoff)
		}
		select {
		case <-time.After(backoff):
		case <-ctx.Done():
			return
		}
		backoff = min(backoff*2, maxBackoff)
	}
}
// mirrorCORSOrigins asks /api/mirrors (via the configured primary, then the
// extra mirror candidates, with agent.FetchMirrorsWithFallback handling the
// fallback) for the current mirror list. It returns each mirror URL, plus the
// Tor URL when present, as a CORS Origin string. Trailing slashes are trimmed,
// empty entries are skipped and duplicates are dropped, keeping first-seen
// order.
//
// Best-effort: on failure it logs a warning and returns nil, leaving the static
// defaultCORSAllowedOrigins (validate.go) to cover the known official mirrors.
// The lookup is capped at 10 s so a slow /api/mirrors can't hold up daemon
// startup — every second here is a second the user can't play.
func mirrorCORSOrigins(parent context.Context, cfg config.Config, userAgent string) []string {
	fetchCtx, stop := context.WithTimeout(parent, 10*time.Second)
	defer stop()

	sources := make([]string, 0, 1+len(cfg.Auth.Mirrors))
	sources = append(sources, cfg.Auth.APIURL)
	sources = append(sources, cfg.Auth.Mirrors...)

	resp, err := agent.FetchMirrorsWithFallback(fetchCtx, sources, userAgent)
	if err != nil {
		log.Printf("[cors] mirror discovery failed (%v) — using static allowlist only", err)
		return nil
	}

	// Gather every advertised URL first, then normalize + dedupe in one pass.
	advertised := make([]string, 0, len(resp.Mirrors)+1)
	for _, m := range resp.Mirrors {
		advertised = append(advertised, m.URL)
	}
	if resp.Tor != nil {
		advertised = append(advertised, resp.Tor.URL)
	}

	known := make(map[string]struct{}, len(advertised))
	origins := make([]string, 0, len(resp.Mirrors))
	for _, u := range advertised {
		if u == "" {
			continue
		}
		origin := strings.TrimRight(u, "/")
		if _, dup := known[origin]; dup {
			continue
		}
		known[origin] = struct{}{}
		origins = append(origins, origin)
	}

	if len(origins) > 0 {
		log.Printf("[cors] merged %d mirror origins from /api/mirrors", len(origins))
	}
	return origins
}
// watchSessionReady polls an HLS session every 200 ms until it is servable,
// then POSTs /api/internal/agent/session-ready so the web side flips
// streaming_session.ready_at (which its SSE endpoint pushes to subscribed
// players).
//
// "Servable" means either a cache replay (ready the moment StartHLSSession
// returns) or ReadyCount() has moved past WriterStartIdx(). For a live
// encode it then sends ONE follow-up report carrying a health classification,
// once at least 4 -stats samples have accumulated.
//
// The watcher stops when the session is closed, ctx is cancelled, or 60 s
// have elapsed. On that deadline it logs a timeout if never ready, or sends
// whatever telemetry it has (if any) so the player isn't left without a
// verdict. If seg-0 never lands, the player falls back to its existing
// HEAD-probe retry path anyway.
func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.HLSSession, sessionID string) {
	giveUpAt := time.Now().Add(60 * time.Second)
	poll := time.NewTicker(200 * time.Millisecond)
	defer poll.Stop()

	// report derives from ctx so a session cancel mid-POST (user closed tab,
	// daemon shutdown) tears down the in-flight webhook instead of blocking
	// for up to 10 s on a now-orphan call.
	report := func(health *agent.SessionHealth) {
		postCtx, cancelPost := context.WithTimeout(ctx, 10*time.Second)
		defer cancelPost()
		if err := client.MarkSessionReady(postCtx, sessionID, health); err != nil {
			log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
		}
	}

	announced := false
	// IsClosed catches teardown paths that don't cancel ctx (registry replace,
	// idle sweep, internal kill), so we don't poll a dead session for 60 s.
	for !hsess.IsClosed() {
		// Phase 1: flip the "Preparando…" UI. Compare against WriterStartIdx
		// rather than ">= 1": a resume session (StartSec) pre-seeds readyMax to
		// its start index, so ReadyCount() is already ≥ 1 before ffmpeg has
		// written a byte.
		if !announced && (hsess.FromCache() || hsess.ReadyCount() > hsess.WriterStartIdx()) {
			report(nil)
			announced = true
			if hsess.FromCache() {
				// Cache replay: no live encode, so no telemetry to follow.
				return
			}
		}

		// Phase 2 (F3): after ≥4 -stats samples (≈2 s past seg-0, EWMA
		// settled) send one health snapshot so the player can name a too-slow
		// transcode quickly instead of inferring it from stalls.
		if announced {
			if stats := hsess.GetTranscodeStats(); stats.Samples >= 4 {
				report(classifyAgentHealth(stats))
				return
			}
		}

		select {
		case <-ctx.Done():
			return
		case <-poll.C:
		}

		if !time.Now().After(giveUpAt) {
			continue
		}
		if !announced {
			log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
			return
		}
		// Ready but telemetry never stabilized: partial beats nothing.
		if stats := hsess.GetTranscodeStats(); stats.Samples > 0 {
			report(classifyAgentHealth(stats))
		}
		return
	}
}
// agentTLSBaseDomain returns the DNS zone under which the cert broker issues
// per-agent wildcard certificates. A non-empty UNARR_AGENT_TLS_BASE overrides
// the production zone (e.g. for staging).
func agentTLSBaseDomain() string {
	const defaultBase = "agent.unarr.app"
	override := os.Getenv("UNARR_AGENT_TLS_BASE")
	if override == "" {
		return defaultBase
	}
	return override
}
// fetchAgentCert makes sure this agent holds a current TLS certificate from the
// web cert broker and stores it in the agent's data directory. Only a CSR is
// sent to the broker; the private key never leaves the machine. It returns
// early when acme.NeedsIssue reports that no (re)issue is needed. Every failure
// is logged and swallowed: without a cert HTTPS just stays off, while the plain
// HTTP listener and the CloudFlare funnel keep serving.
func fetchAgentCert(ctx context.Context, client *agent.Client, hash string) {
	stateDir := config.DataDir()
	if needed := acme.NeedsIssue(stateDir); !needed {
		return
	}

	zone := agentTLSBaseDomain()
	certReq, buildErr := acme.BuildCSR(stateDir, hash, zone)
	if buildErr != nil {
		log.Printf("[acme] build CSR failed: %v", buildErr)
		return
	}

	// Cap the broker round-trip at 90s so a hung request cannot block the
	// caller forever; cancelling the parent ctx still aborts it sooner.
	issueCtx, stop := context.WithTimeout(ctx, 90*time.Second)
	defer stop()

	issued, issueErr := client.IssueCert(issueCtx, certReq)
	if issueErr != nil {
		log.Printf("[acme] cert issuance failed (HTTPS stays off, funnel still works): %v", issueErr)
		return
	}

	if writeErr := acme.WriteCert(stateDir, issued); writeErr != nil {
		log.Printf("[acme] write cert failed: %v", writeErr)
		return
	}
	log.Printf("[acme] installed cert for *.%s.%s", hash, zone)
}
// Realtime-ratio thresholds used by classifyAgentHealth. They form a
// cross-repo contract with the web bottleneck classifier
// (src/lib/stream/bottleneck-classifier.ts), so change both sides together:
//
//	ratio >= agentRealtimeFloor                          → "ok"         (encoder keeps up)
//	agentStrugglingFloor <= ratio < agentRealtimeFloor   → "marginal"   (only just)
//	ratio < agentStrugglingFloor                         → "struggling" (falls behind)
//
// On "struggling" the web fast-path shows the honest overlay and pauses
// WITHOUT waiting for a stall. The floor is therefore deliberately
// conservative. The web side applies its looser 0.85 cutoff only after a stall
// has already confirmed the slowdown.
const (
	// agentRealtimeFloor is the minimum ×realtime speed reported as "ok".
	agentRealtimeFloor = 0.95
	// agentStrugglingFloor is the minimum ×realtime speed reported as
	// "marginal"; anything slower is "struggling".
	agentStrugglingFloor = 0.75
)
// classifyAgentHealth turns one live ffmpeg telemetry snapshot into the health
// report the web side consumes (F3). The verdict is driven by the ×realtime
// encode speed (st.SpeedX): below 1.0 the transcode cannot keep up with
// playback. When the speed is under agentRealtimeFloor, the input-bound hint
// (a source read error) takes precedence and blames the link rather than the
// encoder. The returned report always carries the raw speed as RealtimeRatio.
func classifyAgentHealth(st engine.TranscodeStats) *agent.SessionHealth {
	speed := st.SpeedX
	verdict := func(health, reason string) *agent.SessionHealth {
		return &agent.SessionHealth{Health: health, RealtimeRatio: speed, Reason: reason}
	}

	// Checked first, and with an explicit "< floor" test so that a NaN speed
	// falls through to the final "transcode" verdict exactly as before.
	if st.InputBound && speed < agentRealtimeFloor {
		return verdict("struggling", "input_bound")
	}
	if speed >= agentRealtimeFloor {
		return verdict("ok", "realtime")
	}
	if speed >= agentStrugglingFloor {
		return verdict("marginal", "transcode")
	}
	return verdict("struggling", "transcode")
}