diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 97ce849..a1b0a8a 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -1168,6 +1168,7 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, Trickplay: cfg.Library.Trickplay.Enabled, TrickplayIntervalSec: cfg.Library.Trickplay.IntervalSeconds(), TrickplayWidth: cfg.Library.Trickplay.Width, + MaxLoadRatio: cfg.Library.PrewarmMaxLoadRatio, }) } diff --git a/internal/cmd/scan.go b/internal/cmd/scan.go index 23f320d..754ebc1 100644 --- a/internal/cmd/scan.go +++ b/internal/cmd/scan.go @@ -155,6 +155,7 @@ func runScan(dirPath string, workers int, ffprobePath string, noSync bool) error Trickplay: cfg.Library.Trickplay.Enabled, TrickplayIntervalSec: cfg.Library.Trickplay.IntervalSeconds(), TrickplayWidth: cfg.Library.Trickplay.Width, + MaxLoadRatio: cfg.Library.PrewarmMaxLoadRatio, }) } else { fmt.Fprintf(os.Stderr, " Skipping sidecar prewarm: ffmpeg unavailable: %v\n", err) diff --git a/internal/config/config.go b/internal/config/config.go index 325ebda..2d6e664 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -205,6 +205,12 @@ type LibraryConfig struct { // no contention with the active stream (the cause of broken seekbar previews) // — and the file panel picks a few positions from the same grid. Trickplay TrickplayConfig `toml:"trickplay"` + + // PrewarmMaxLoadRatio gates the heavy trickplay decode on system load: a sprite + // job only starts while the 1-min load average is ≤ this × NumCPU, so scan-time + // generation never saturates the machine or the NAS. Default 0.7; 0 falls back + // to the default. Linux-only (no load reading elsewhere → unthrottled). + PrewarmMaxLoadRatio float64 `toml:"prewarm_max_load_ratio"` } // TrickplayConfig controls scan-time trickplay sprite generation. 
@@ -297,6 +303,7 @@ func Default() Config { Interval: "10s", Width: 240, }, + PrewarmMaxLoadRatio: 0.7, }, } } @@ -381,6 +388,11 @@ func applyDefaults(cfg *Config, meta toml.MetaData) { if !meta.IsDefined("library", "trickplay", "width") { cfg.Library.Trickplay.Width = 240 } + // Load-gate defaults ON for configs predating the key, so an old install can't + // saturate the box with scan-time sprite generation. + if !meta.IsDefined("library", "prewarm_max_load_ratio") { + cfg.Library.PrewarmMaxLoadRatio = 0.7 + } if !meta.IsDefined("downloads", "transcode", "enabled") { cfg.Download.Transcode.Enabled = true diff --git a/internal/library/loadgate_test.go b/internal/library/loadgate_test.go new file mode 100644 index 0000000..8b7c505 --- /dev/null +++ b/internal/library/loadgate_test.go @@ -0,0 +1,46 @@ +package library + +import ( + "context" + "testing" + "time" + + "github.com/torrentclaw/unarr/internal/library/mediainfo" +) + +// A huge ratio means the threshold is always above the real load, so the gate +// must return immediately (no blocking) regardless of how busy the box is. +func TestWaitForLowLoad_HighRatioReturnsImmediately(t *testing.T) { + done := make(chan struct{}) + go func() { + waitForLowLoad(context.Background(), 1e9) + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForLowLoad blocked despite an impossibly-high threshold") + } +} + +// With a tiny ratio the gate would block (load almost always exceeds it), but a +// cancelled context must unblock it promptly — the prewarm has to stop cleanly on +// Ctrl-C / daemon shutdown even while waiting for the machine to go idle. 
+func TestWaitForLowLoad_RespectsContextCancel(t *testing.T) { + if _, ok := mediainfo.LoadAverage1(); !ok { + t.Skip("no load reading on this platform — gate is a no-op") + } + ctx, cancel := context.WithCancel(context.Background()) + cancel() // already cancelled + + done := make(chan struct{}) + go func() { + waitForLowLoad(ctx, 0.0001) // threshold ~0 → would otherwise block + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("waitForLowLoad ignored a cancelled context") + } +} diff --git a/internal/library/mediainfo/harden_linux_test.go b/internal/library/mediainfo/harden_linux_test.go new file mode 100644 index 0000000..910add5 --- /dev/null +++ b/internal/library/mediainfo/harden_linux_test.go @@ -0,0 +1,65 @@ +//go:build linux + +package mediainfo + +import ( + "fmt" + "os" + "os/exec" + "strconv" + "strings" + "syscall" + "testing" + "time" +) + +// TestHardenCmd_KillsChildOnParentDeath is the e2e guarantee for the orphan fix: +// a child spawned with hardenCmd must be SIGKILL'd by the kernel the instant its +// parent process dies (Pdeathsig), so an agent crash/restart can never leave an +// ffmpeg running to ppid 1. It re-execs this test binary as a short-lived helper +// that starts `sleep`, prints the sleep PID, then exits — and asserts that PID is +// gone afterwards. +func TestHardenCmd_KillsChildOnParentDeath(t *testing.T) { + if os.Getenv("UNARR_PDEATHSIG_CHILD") == "1" { + // Helper role: start a hardened long sleep, announce its PID, then exit so + // the kernel fires Pdeathsig on it. 
+ cmd := exec.Command("sleep", "120") + hardenCmd(cmd) + if err := cmd.Start(); err != nil { + fmt.Println("ERR", err) + os.Exit(2) + } + fmt.Println(cmd.Process.Pid) + os.Exit(0) + } + + helper := exec.Command(os.Args[0], "-test.run=TestHardenCmd_KillsChildOnParentDeath", "-test.v") + helper.Env = append(os.Environ(), "UNARR_PDEATHSIG_CHILD=1") + out, err := helper.Output() + if err != nil { + t.Fatalf("helper run: %v (out=%q)", err, out) + } + + var sleepPID int + for _, line := range strings.Split(string(out), "\n") { + if n, perr := strconv.Atoi(strings.TrimSpace(line)); perr == nil && n > 0 { + sleepPID = n + break + } + } + if sleepPID == 0 { + t.Fatalf("could not parse child PID from helper output: %q", out) + } + + // Give the kernel a moment to deliver SIGKILL after the helper exited. + deadline := time.Now().Add(3 * time.Second) + for time.Now().Before(deadline) { + if syscall.Kill(sleepPID, 0) != nil { + return // process gone → Pdeathsig worked + } + time.Sleep(50 * time.Millisecond) + } + // Cleanup if it somehow survived, then fail. + _ = syscall.Kill(sleepPID, syscall.SIGKILL) + t.Fatalf("child %d survived parent death — Pdeathsig not applied (orphan leak)", sleepPID) +} diff --git a/internal/library/mediainfo/ioprio_linux.go b/internal/library/mediainfo/ioprio_linux.go index 9b1d508..210e922 100644 --- a/internal/library/mediainfo/ioprio_linux.go +++ b/internal/library/mediainfo/ioprio_linux.go @@ -2,7 +2,13 @@ package mediainfo -import "syscall" +import ( + "os" + "os/exec" + "strconv" + "strings" + "syscall" +) // Linux I/O priority (ioprio) constants. 
// setLowCPUPriority asks the kernel to run the given process at niceness 19,
// the lowest CPU priority, so a heavy trickplay decode yields the CPU to
// foreground work. It complements setIdleIOPriority, which only covers disk
// I/O. The call is best-effort: a failure is deliberately ignored because the
// niceness is an optimization, not a correctness requirement.
//
// NOTE(review): Linux applies PRIO_PROCESS per thread; threads the child has
// already spawned before this call may keep their old niceness — confirm
// whether that matters for ffmpeg.
func setLowCPUPriority(pid int) {
	const lowestNice = 19
	_ = syscall.Setpriority(syscall.PRIO_PROCESS, pid, lowestNice)
}

// hardenCmd configures cmd, before it is started, so the child cannot outlive
// this agent. It puts the child in its own process group (Setpgid) and asks
// the kernel to SIGKILL it when its parent dies (Pdeathsig). An existing
// SysProcAttr is kept and only these two fields are overwritten.
//
// exec.CommandContext alone enforces its deadline from a goroutine inside this
// process, so it offers no protection once the agent itself crashes or is
// killed; the kernel-side death signal closes that gap.
func hardenCmd(cmd *exec.Cmd) {
	attr := cmd.SysProcAttr
	if attr == nil {
		attr = new(syscall.SysProcAttr)
		cmd.SysProcAttr = attr
	}
	attr.Setpgid, attr.Pdeathsig = true, syscall.SIGKILL
}
// LoadAverage1 reports the 1-minute system load average, taken from the first
// whitespace-separated field of /proc/loadavg. The boolean is false, and the
// value 0, whenever the file cannot be read, is empty, or its first field is
// not a valid float; callers treat that as "load unknown" and proceed rather
// than wait on a reading that may never come.
func LoadAverage1() (float64, bool) {
	raw, readErr := os.ReadFile("/proc/loadavg")
	if readErr != nil {
		return 0, false
	}

	parts := strings.Fields(string(raw))
	if len(parts) < 1 {
		return 0, false
	}

	load, parseErr := strconv.ParseFloat(parts[0], 64)
	if parseErr != nil {
		// ParseFloat can return ±Inf alongside a range error; never leak it.
		return 0, false
	}
	return load, true
}
+const trickplayLockTTL = 90 * time.Minute + +// acquireTrickplayLock takes an exclusive, cross-process lock for one sprite by +// O_CREATE|O_EXCL on a ".lock" file in the shared sidecar dir, so two agents that +// watch the same library never decode the same 4K file at once (the cause of the +// 5×-per-file ffmpeg pile-up). A lock older than trickplayLockTTL is assumed +// abandoned (owner crashed) and reclaimed. Returns ErrTrickplayInProgress when a +// fresh lock is held by someone else. +func acquireTrickplayLock(lockPath string) (func(), error) { + for attempt := 0; attempt < 2; attempt++ { + f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644) + if err == nil { + host, _ := os.Hostname() + fmt.Fprintf(f, "%s pid=%d t=%d\n", host, os.Getpid(), time.Now().Unix()) + _ = f.Close() + return func() { _ = os.Remove(lockPath) }, nil + } + if !os.IsExist(err) { + return nil, fmt.Errorf("trickplay lock: %w", err) + } + if fi, statErr := os.Stat(lockPath); statErr == nil && time.Since(fi.ModTime()) > trickplayLockTTL { + _ = os.Remove(lockPath) // stale → reclaim and retry + continue + } + return nil, ErrTrickplayInProgress + } + return nil, ErrTrickplayInProgress +} + // TrickplayManifest describes the montage sprite layout so a client can map a // playback time to one tile: tileIndex = floor(timeSec / IntervalSec), then // col = tileIndex % Cols, row = tileIndex / Cols, and the tile's pixel box is @@ -126,6 +167,15 @@ func GenerateTrickplay(ctx context.Context, ffmpegPath, mediaPath string, interv if err := os.MkdirAll(filepath.Dir(spritePath), 0o755); err != nil { return TrickplayManifest{}, err } + + // Single-flight across processes/agents: only one worker decodes this file at + // a time. Returns ErrTrickplayInProgress (skip, not fail) if another holds it. 
+ release, err := acquireTrickplayLock(spritePath + ".lock") + if err != nil { + return TrickplayManifest{}, err + } + defer release() + tmpSprite := spritePath + ".tmp" // fps filter wants a rational; format 1/effInterval with enough precision. @@ -144,17 +194,31 @@ func GenerateTrickplay(ctx context.Context, ffmpegPath, mediaPath string, interv "-f", "mjpeg", tmpSprite, } + // Pin this goroutine to its OS thread for the whole child lifetime. hardenCmd's + // Pdeathsig is delivered when the THREAD that forked dies, not the process + // (golang/go#27505); without the lock Go could recycle that thread mid-decode + // and the kernel would SIGKILL a perfectly healthy ffmpeg. Locked here (before + // the fork in Start) and released after Wait, the thread lives exactly as long + // as ffmpeg: it dies only when the agent process itself dies → SIGKILL fires + // only then, which is precisely the orphan we want to prevent. + runtime.LockOSThread() + defer runtime.UnlockOSThread() + cmd := exec.CommandContext(ctx, ffmpegPath, args...) var stderr strings.Builder cmd.Stderr = &stderr - // Start + idle I/O priority + Wait (matches the subtitle/thumbnail extractors): - // this full-decode pass is the heaviest sidecar job and runs in the background - // alongside live streaming on the same disk/NFS, so it must yield I/O. + // Die-with-parent BEFORE Start so an agent crash can't orphan this decode. + hardenCmd(cmd) + // Start + idle I/O + lowest CPU niceness + Wait (matches the subtitle/thumbnail + // extractors): this full-decode pass is the heaviest sidecar job and runs in the + // background alongside live streaming on the same box/NFS, so it must yield both + // disk AND CPU. The prewarm also gates it on system load before getting here. 
if err := cmd.Start(); err != nil { _ = os.Remove(tmpSprite) return TrickplayManifest{}, fmt.Errorf("ffmpeg tile start: %w", err) } setIdleIOPriority(cmd.Process.Pid) + setLowCPUPriority(cmd.Process.Pid) if err := cmd.Wait(); err != nil { _ = os.Remove(tmpSprite) return TrickplayManifest{}, fmt.Errorf("ffmpeg tile: %w: %s", err, strings.TrimSpace(stderr.String())) diff --git a/internal/library/mediainfo/trickplay_lock_test.go b/internal/library/mediainfo/trickplay_lock_test.go new file mode 100644 index 0000000..41c94b0 --- /dev/null +++ b/internal/library/mediainfo/trickplay_lock_test.go @@ -0,0 +1,67 @@ +package mediainfo + +import ( + "errors" + "os" + "path/filepath" + "testing" + "time" +) + +func TestAcquireTrickplayLock_SingleFlight(t *testing.T) { + lock := filepath.Join(t.TempDir(), "sprite.jpg.lock") + + release, err := acquireTrickplayLock(lock) + if err != nil { + t.Fatalf("first acquire: %v", err) + } + if _, statErr := os.Stat(lock); statErr != nil { + t.Fatalf("lock file not created: %v", statErr) + } + + // Second acquire while the first is held → skip sentinel, not a real error. + if _, err := acquireTrickplayLock(lock); !errors.Is(err, ErrTrickplayInProgress) { + t.Fatalf("expected ErrTrickplayInProgress, got %v", err) + } + + // After release the lock file is gone and it can be re-acquired. + release() + if _, statErr := os.Stat(lock); !os.IsNotExist(statErr) { + t.Fatalf("lock file should be removed after release, stat err = %v", statErr) + } + release2, err := acquireTrickplayLock(lock) + if err != nil { + t.Fatalf("re-acquire after release: %v", err) + } + release2() +} + +func TestAcquireTrickplayLock_ReclaimsStale(t *testing.T) { + lock := filepath.Join(t.TempDir(), "sprite.jpg.lock") + + // Simulate a crashed worker: a lock file older than the TTL with no live owner. 
+ if err := os.WriteFile(lock, []byte("deadhost pid=999 t=0\n"), 0o644); err != nil { + t.Fatal(err) + } + old := time.Now().Add(-trickplayLockTTL - time.Minute) + if err := os.Chtimes(lock, old, old); err != nil { + t.Fatal(err) + } + + release, err := acquireTrickplayLock(lock) + if err != nil { + t.Fatalf("stale lock should be reclaimed, got %v", err) + } + release() +} + +func TestAcquireTrickplayLock_FreshNotReclaimed(t *testing.T) { + lock := filepath.Join(t.TempDir(), "sprite.jpg.lock") + if err := os.WriteFile(lock, []byte("livehost pid=123 t=now\n"), 0o644); err != nil { + t.Fatal(err) + } + // Fresh mtime (just written) → a live owner is assumed; must NOT be stolen. + if _, err := acquireTrickplayLock(lock); !errors.Is(err, ErrTrickplayInProgress) { + t.Fatalf("fresh lock must not be reclaimed, got %v", err) + } +} diff --git a/internal/library/prewarm.go b/internal/library/prewarm.go index 5d5162a..50b444e 100644 --- a/internal/library/prewarm.go +++ b/internal/library/prewarm.go @@ -2,8 +2,10 @@ package library import ( "context" + "errors" "log" "math" + "runtime" "sync" "time" @@ -33,6 +35,12 @@ type PrewarmOptions struct { Trickplay bool TrickplayIntervalSec float64 TrickplayWidth int + + // MaxLoadRatio gates the heavy trickplay decode on system load: a job only + // starts while the 1-min load average is ≤ MaxLoadRatio×NumCPU, so sprite + // generation never saturates the machine or the NAS. ≤0 → default 0.7. Has no + // effect on platforms without a load reading (proceeds unthrottled). + MaxLoadRatio float64 } // prewarmJob is one extraction unit: all text subtitles of a file in one ffmpeg @@ -65,6 +73,15 @@ func PrewarmSidecars(ctx context.Context, cache *LibraryCache, opts PrewarmOptio if workers < 1 { workers = 2 } + maxLoadRatio := opts.MaxLoadRatio + if maxLoadRatio <= 0 { + maxLoadRatio = 0.7 + } + // Trickplay is the heaviest job (full 4K decode). 
Cap it to ONE concurrent + // decode across this agent's workers — the thumbnail/subtitle jobs (light / + // I/O-bound) keep their `workers` parallelism. Cross-agent dup work is stopped + // by the per-file lock inside GenerateTrickplay. + trickSem := make(chan struct{}, 1) jobs := make(chan prewarmJob) var wg sync.WaitGroup @@ -115,19 +132,39 @@ func PrewarmSidecars(ctx context.Context, cache *LibraryCache, opts PrewarmOptio if _, ok := mediainfo.ReadCachedTrickplay(j.path, j.width); ok { continue } + // Serialize the heavy decode (1 at a time) and wait for the box to + // be idle enough before starting — sprite generation must never + // saturate the CPU or the NAS. + select { + case trickSem <- struct{}{}: + case <-ctx.Done(): + return + } + waitForLowLoad(ctx, maxLoadRatio) + if ctx.Err() != nil { + <-trickSem + return + } // Full-decode pass (samples 1 frame per interval over the whole // file) — generous deadline like subtitles; idempotent + cached. + // INVARIANT: this deadline MUST stay below mediainfo.trickplayLockTTL, + // or another agent could reclaim a still-running job's lock and double + // the decode. If you raise this, raise trickplayLockTTL too. jctx, cancel := context.WithTimeout(ctx, 45*time.Minute) _, err := mediainfo.GenerateTrickplay(jctx, opts.FFmpegPath, j.path, opts.TrickplayIntervalSec, j.width, j.duration) cancel() + <-trickSem mu.Lock() - if err != nil { + switch { + case err == nil: + trickCached++ + case errors.Is(err, mediainfo.ErrTrickplayInProgress): + // another worker/agent owns this file — skip, not a failure. + default: failed++ if sampleErr == "" { sampleErr = err.Error() } - } else { - trickCached++ } mu.Unlock() continue @@ -239,6 +276,47 @@ func PrewarmSidecars(ctx context.Context, cache *LibraryCache, opts PrewarmOptio } } +// prewarmLoadWaitCap bounds how long the load gate DEFERS a trickplay job. 
It's a +// throttle, not an off-switch: on a host whose baseline load is permanently above +// the threshold (a shared prod box, or any 1–2 core machine), an unbounded wait +// would mean sprites NEVER generate. After the cap we proceed anyway — the other +// safeguards (single-flight lock, trickSem=1, nice 19 + idle I/O, Pdeathsig) keep +// one throttled decode from saturating the box. +const prewarmLoadWaitCap = 15 * time.Minute + +// waitForLowLoad defers until the 1-minute system load is at or below +// max(maxRatio×NumCPU, 1.5), or ctx is cancelled, or prewarmLoadWaitCap elapses — +// so the heavy trickplay decode prefers an idle machine but never stalls forever. +// The 1.5 floor reserves ~one core so the gate can still open on 1–2 core hosts +// (without it, threshold 0.7–1.4 is below almost any active machine's load and the +// feature would be permanently off). No load reading (non-Linux) → returns at once. +func waitForLowLoad(ctx context.Context, maxRatio float64) { + threshold := maxRatio * float64(runtime.NumCPU()) + if threshold < 1.5 { + threshold = 1.5 + } + deadline := time.After(prewarmLoadWaitCap) + logged := false + for { + load, ok := mediainfo.LoadAverage1() + if !ok || load <= threshold { + return + } + if !logged { + log.Printf("[prewarm] system load %.1f > %.1f — deferring trickplay (≤ %s)", load, threshold, prewarmLoadWaitCap) + logged = true + } + select { + case <-ctx.Done(): + return + case <-deadline: + log.Printf("[prewarm] load still high after %s — proceeding with throttled trickplay (nice + idle I/O + single-flight still apply)", prewarmLoadWaitCap) + return + case <-time.After(15 * time.Second): + } + } +} + // thumbPositions returns the sample frame offsets (whole seconds) for an item, // matching the web panel: fractions of a known runtime, else fixed fallbacks. func thumbPositions(item LibraryItem) []float64 {