fix(trickplay): stop scan-time sprite generation from saturating the host
Some checks failed
CI / Test (push) Failing after 6m21s
CI / Build (push) Successful in 1m34s
CI / Build-1 (push) Successful in 2m0s
CI / Build-2 (push) Successful in 1m33s
CI / Build-3 (push) Successful in 1m38s
CI / Build-4 (push) Successful in 1m35s
CI / Build-5 (push) Successful in 1m38s
CI / Lint (push) Failing after 2m34s
CI / Coverage (push) Failing after 2m44s
CI / Vet (push) Successful in 2m3s
Some checks failed
CI / Test (push) Failing after 6m21s
CI / Build (push) Successful in 1m34s
CI / Build-1 (push) Successful in 2m0s
CI / Build-2 (push) Successful in 1m33s
CI / Build-3 (push) Successful in 1m38s
CI / Build-4 (push) Successful in 1m35s
CI / Build-5 (push) Successful in 1m38s
CI / Lint (push) Failing after 2m34s
CI / Coverage (push) Failing after 2m44s
CI / Vet (push) Successful in 2m3s
Trickplay sprite generation (one full-decode ffmpeg pass per file) could pin a machine: multiple agents on the same library decoded the same 4K file at once, there was no CPU throttling, and crashed/restarted agents orphaned ffmpeg to init (where it ran the full 45-min decode to completion). Stacked orphans spiked a box to load ~140.
- Single-flight lock: O_CREATE|O_EXCL .lock file in the shared sidecar dir, so two agents watching the same library never decode the same file twice (stale locks are reclaimed after a TTL). Returns ErrTrickplayInProgress → prewarm skips the file instead of counting it as a failure.
- Load gate: defer the heavy decode until the 1-min load is ≤ max(ratio×NumCPU, 1.5), with the wait capped at 15 min so it throttles without ever becoming a permanent off-switch on busy / small hosts. New knob library.prewarm_max_load_ratio (default 0.7).
- Concurrency: trickSem caps trickplay to ONE decode at a time per agent.
- CPU priority: setLowCPUPriority (nice 19) alongside the existing idle ionice.
- No orphans: hardenCmd sets Setpgid + Pdeathsig=SIGKILL, with runtime.LockOSThread held around the child so the kernel kills ffmpeg exactly when the agent dies (and not spuriously — golang/go#27505).
Tests: single-flight/stale-reclaim, load-gate immediate/cancel, and an e2e Pdeathsig orphan-kill check.
This commit is contained in:
parent
aba20e2078
commit
c82826bf68
10 changed files with 399 additions and 8 deletions
65
internal/library/mediainfo/harden_linux_test.go
Normal file
65
internal/library/mediainfo/harden_linux_test.go
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
//go:build linux
|
||||
|
||||
package mediainfo
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestHardenCmd_KillsChildOnParentDeath is the e2e guarantee for the orphan fix:
|
||||
// a child spawned with hardenCmd must be SIGKILL'd by the kernel the instant its
|
||||
// parent process dies (Pdeathsig), so an agent crash/restart can never leave an
|
||||
// ffmpeg running to ppid 1. It re-execs this test binary as a short-lived helper
|
||||
// that starts `sleep`, prints the sleep PID, then exits — and asserts that PID is
|
||||
// gone afterwards.
|
||||
func TestHardenCmd_KillsChildOnParentDeath(t *testing.T) {
|
||||
if os.Getenv("UNARR_PDEATHSIG_CHILD") == "1" {
|
||||
// Helper role: start a hardened long sleep, announce its PID, then exit so
|
||||
// the kernel fires Pdeathsig on it.
|
||||
cmd := exec.Command("sleep", "120")
|
||||
hardenCmd(cmd)
|
||||
if err := cmd.Start(); err != nil {
|
||||
fmt.Println("ERR", err)
|
||||
os.Exit(2)
|
||||
}
|
||||
fmt.Println(cmd.Process.Pid)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
helper := exec.Command(os.Args[0], "-test.run=TestHardenCmd_KillsChildOnParentDeath", "-test.v")
|
||||
helper.Env = append(os.Environ(), "UNARR_PDEATHSIG_CHILD=1")
|
||||
out, err := helper.Output()
|
||||
if err != nil {
|
||||
t.Fatalf("helper run: %v (out=%q)", err, out)
|
||||
}
|
||||
|
||||
var sleepPID int
|
||||
for _, line := range strings.Split(string(out), "\n") {
|
||||
if n, perr := strconv.Atoi(strings.TrimSpace(line)); perr == nil && n > 0 {
|
||||
sleepPID = n
|
||||
break
|
||||
}
|
||||
}
|
||||
if sleepPID == 0 {
|
||||
t.Fatalf("could not parse child PID from helper output: %q", out)
|
||||
}
|
||||
|
||||
// Give the kernel a moment to deliver SIGKILL after the helper exited.
|
||||
deadline := time.Now().Add(3 * time.Second)
|
||||
for time.Now().Before(deadline) {
|
||||
if syscall.Kill(sleepPID, 0) != nil {
|
||||
return // process gone → Pdeathsig worked
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
// Cleanup if it somehow survived, then fail.
|
||||
_ = syscall.Kill(sleepPID, syscall.SIGKILL)
|
||||
t.Fatalf("child %d survived parent death — Pdeathsig not applied (orphan leak)", sleepPID)
|
||||
}
|
||||
|
|
@ -2,7 +2,13 @@
|
|||
|
||||
package mediainfo
|
||||
|
||||
import "syscall"
|
||||
import (
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// Linux I/O priority (ioprio) constants. The 16-bit ioprio value packs a class
|
||||
// in the top 3 bits (shift 13) and a class-data nibble below it; the IDLE class
|
||||
|
|
@ -23,3 +29,47 @@ func setIdleIOPriority(pid int) {
|
|||
ioprio := ioprioClassIdle << ioprioClassShift // IDLE class, data 0
|
||||
_, _, _ = syscall.Syscall(syscall.SYS_IOPRIO_SET, uintptr(ioprioWhoProcess), uintptr(pid), uintptr(ioprio))
|
||||
}
|
||||
|
||||
// setLowCPUPriority asks the kernel to run process pid at nice 19, the weakest
// CPU scheduling priority, so the trickplay full-decode pass gives way to
// foreground work. It complements setIdleIOPriority, which only covers disk
// I/O. The call is best-effort: any error is discarded because the lower
// priority is an optimization and not needed for correctness.
func setLowCPUPriority(pid int) {
	const lowestNice = 19
	// Failure (no such process, not permitted, ...) is deliberately ignored.
	_ = syscall.Setpriority(syscall.PRIO_PROCESS, pid, lowestNice)
}
|
||||
|
||||
// hardenCmd configures cmd so that the child it starts dies together with this
// agent. It must be called before cmd.Start. Two attributes are set on
// cmd.SysProcAttr (allocated here when nil; existing fields are preserved):
//
//   - Setpgid puts the child into its own process group.
//   - Pdeathsig=SIGKILL has the kernel kill the child when its parent dies.
//
// exec.CommandContext alone enforces its deadline from a goroutine inside the
// agent; if the agent crashes, restarts or is SIGKILLed that goroutine is gone,
// the child is reparented to init (ppid 1) and keeps decoding to the end.
func hardenCmd(cmd *exec.Cmd) {
	attr := cmd.SysProcAttr
	if attr == nil {
		attr = new(syscall.SysProcAttr)
		cmd.SysProcAttr = attr
	}
	attr.Setpgid, attr.Pdeathsig = true, syscall.SIGKILL
}
|
||||
|
||||
// LoadAverage1 reads /proc/loadavg and returns its first field, the 1-minute
// system load average. The boolean is false (and the value 0) when the file
// cannot be read, is empty, or its first field is not a number, so callers can
// treat "unknown" as "don't gate" and proceed instead of blocking forever.
func LoadAverage1() (float64, bool) {
	raw, readErr := os.ReadFile("/proc/loadavg")
	if readErr != nil {
		return 0, false
	}
	// Fields are whitespace-separated; only the first one is needed.
	tokens := strings.Fields(string(raw))
	if len(tokens) < 1 {
		return 0, false
	}
	if load, parseErr := strconv.ParseFloat(tokens[0], 64); parseErr == nil {
		return load, true
	}
	return 0, false
}
|
||||
|
|
|
|||
|
|
@ -2,5 +2,12 @@
|
|||
|
||||
package mediainfo
|
||||
|
||||
// setIdleIOPriority is a no-op on non-Linux platforms (ioprio is Linux-specific).
|
||||
import "os/exec"
|
||||
|
||||
// These are Linux-specific optimizations / safeguards; no-ops elsewhere.
|
||||
// setIdleIOPriority is a no-op in this build: the pid argument is ignored.
func setIdleIOPriority(_ int) {}
|
||||
// setLowCPUPriority is a no-op in this build: the pid argument is ignored.
func setLowCPUPriority(_ int) {}
|
||||
// hardenCmd is a no-op in this build: the command is left unmodified.
func hardenCmd(_ *exec.Cmd) {}
|
||||
|
||||
// LoadAverage1 has no implementation off Linux. It always reports (0, false);
// ok=false tells callers the load is unknown, so they don't gate on it.
func LoadAverage1() (float64, bool) {
	return 0, false
}
|
||||
|
|
|
|||
|
|
@ -3,15 +3,56 @@ package mediainfo
|
|||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// ErrTrickplayInProgress means another worker — possibly an agent on another
// host sharing the same library (e.g. the dev binary on /mnt/nas and the docker
// agent on /downloads, the SAME files) — already holds this sprite's lock and is
// generating it. The caller must SKIP the file, not count it as a failure.
var ErrTrickplayInProgress = errors.New("trickplay: generation already in progress")

// trickplayLockTTL is the age after which a lock file is considered abandoned.
// It is longer than the caller's 45-min generation deadline so a live job is
// never stolen, yet short enough that a crashed/killed worker's lock is
// reclaimed on a later scan.
const trickplayLockTTL = 90 * time.Minute

// acquireTrickplayLock takes an exclusive, cross-process lock for one sprite by
// creating lockPath with O_CREATE|O_EXCL in the shared sidecar dir, so two
// agents that watch the same library never decode the same file at once.
//
// On success it returns a release function that removes the lock file. The
// release only removes the file while it still carries this acquisition's own
// stamp: if the lock was meanwhile reclaimed as stale and re-created by another
// worker (or release is called a second time after someone else acquired it),
// the other worker's lock is left alone.
//
// A lock whose mtime is older than trickplayLockTTL is assumed abandoned (owner
// crashed) and is reclaimed once. ErrTrickplayInProgress is returned when a
// fresh lock is held by someone else; any other failure to create the file is
// returned wrapped with a "trickplay lock:" prefix.
//
// NOTE: reclaiming is remove-then-create and therefore not atomic — two workers
// that observe the same stale lock at the same moment can still race. The age
// check also compares the file's mtime with the local clock.
func acquireTrickplayLock(lockPath string) (func(), error) {
	for attempt := 0; attempt < 2; attempt++ {
		f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
		if err == nil {
			// The hostname is diagnostic only, so a lookup error is ignored.
			host, _ := os.Hostname()
			token := fmt.Sprintf("%s pid=%d t=%d\n", host, os.Getpid(), time.Now().Unix())
			_, writeErr := f.WriteString(token)
			closeErr := f.Close()
			// The empty file already acts as the lock, so a failed stamp is not
			// fatal; it only means ownership cannot be verified on release.
			stamped := writeErr == nil && closeErr == nil
			return func() {
				if stamped {
					// Skip removal only when the file is positively someone
					// else's. If it cannot be read, fall back to removing it so
					// a transient error never leaves the lock stuck for the TTL.
					if cur, readErr := os.ReadFile(lockPath); readErr == nil && string(cur) != token {
						return
					}
				}
				_ = os.Remove(lockPath)
			}, nil
		}
		if !os.IsExist(err) {
			return nil, fmt.Errorf("trickplay lock: %w", err)
		}
		fi, statErr := os.Stat(lockPath)
		if statErr != nil || time.Since(fi.ModTime()) <= trickplayLockTTL {
			return nil, ErrTrickplayInProgress
		}
		_ = os.Remove(lockPath) // stale → reclaim and retry
	}
	return nil, ErrTrickplayInProgress
}
|
||||
|
||||
// TrickplayManifest describes the montage sprite layout so a client can map a
|
||||
// playback time to one tile: tileIndex = floor(timeSec / IntervalSec), then
|
||||
// col = tileIndex % Cols, row = tileIndex / Cols, and the tile's pixel box is
|
||||
|
|
@ -126,6 +167,15 @@ func GenerateTrickplay(ctx context.Context, ffmpegPath, mediaPath string, interv
|
|||
if err := os.MkdirAll(filepath.Dir(spritePath), 0o755); err != nil {
|
||||
return TrickplayManifest{}, err
|
||||
}
|
||||
|
||||
// Single-flight across processes/agents: only one worker decodes this file at
|
||||
// a time. Returns ErrTrickplayInProgress (skip, not fail) if another holds it.
|
||||
release, err := acquireTrickplayLock(spritePath + ".lock")
|
||||
if err != nil {
|
||||
return TrickplayManifest{}, err
|
||||
}
|
||||
defer release()
|
||||
|
||||
tmpSprite := spritePath + ".tmp"
|
||||
|
||||
// fps filter wants a rational; format 1/effInterval with enough precision.
|
||||
|
|
@ -144,17 +194,31 @@ func GenerateTrickplay(ctx context.Context, ffmpegPath, mediaPath string, interv
|
|||
"-f", "mjpeg",
|
||||
tmpSprite,
|
||||
}
|
||||
// Pin this goroutine to its OS thread for the whole child lifetime. hardenCmd's
|
||||
// Pdeathsig is delivered when the THREAD that forked dies, not the process
|
||||
// (golang/go#27505); without the lock Go could recycle that thread mid-decode
|
||||
// and the kernel would SIGKILL a perfectly healthy ffmpeg. Locked here (before
|
||||
// the fork in Start) and released after Wait, the thread lives exactly as long
|
||||
// as ffmpeg: it dies only when the agent process itself dies → SIGKILL fires
|
||||
// only then, which is precisely the orphan we want to prevent.
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffmpegPath, args...)
|
||||
var stderr strings.Builder
|
||||
cmd.Stderr = &stderr
|
||||
// Start + idle I/O priority + Wait (matches the subtitle/thumbnail extractors):
|
||||
// this full-decode pass is the heaviest sidecar job and runs in the background
|
||||
// alongside live streaming on the same disk/NFS, so it must yield I/O.
|
||||
// Die-with-parent BEFORE Start so an agent crash can't orphan this decode.
|
||||
hardenCmd(cmd)
|
||||
// Start + idle I/O + lowest CPU niceness + Wait (matches the subtitle/thumbnail
|
||||
// extractors): this full-decode pass is the heaviest sidecar job and runs in the
|
||||
// background alongside live streaming on the same box/NFS, so it must yield both
|
||||
// disk AND CPU. The prewarm also gates it on system load before getting here.
|
||||
if err := cmd.Start(); err != nil {
|
||||
_ = os.Remove(tmpSprite)
|
||||
return TrickplayManifest{}, fmt.Errorf("ffmpeg tile start: %w", err)
|
||||
}
|
||||
setIdleIOPriority(cmd.Process.Pid)
|
||||
setLowCPUPriority(cmd.Process.Pid)
|
||||
if err := cmd.Wait(); err != nil {
|
||||
_ = os.Remove(tmpSprite)
|
||||
return TrickplayManifest{}, fmt.Errorf("ffmpeg tile: %w: %s", err, strings.TrimSpace(stderr.String()))
|
||||
|
|
|
|||
67
internal/library/mediainfo/trickplay_lock_test.go
Normal file
67
internal/library/mediainfo/trickplay_lock_test.go
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
package mediainfo
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestAcquireTrickplayLock_SingleFlight(t *testing.T) {
|
||||
lock := filepath.Join(t.TempDir(), "sprite.jpg.lock")
|
||||
|
||||
release, err := acquireTrickplayLock(lock)
|
||||
if err != nil {
|
||||
t.Fatalf("first acquire: %v", err)
|
||||
}
|
||||
if _, statErr := os.Stat(lock); statErr != nil {
|
||||
t.Fatalf("lock file not created: %v", statErr)
|
||||
}
|
||||
|
||||
// Second acquire while the first is held → skip sentinel, not a real error.
|
||||
if _, err := acquireTrickplayLock(lock); !errors.Is(err, ErrTrickplayInProgress) {
|
||||
t.Fatalf("expected ErrTrickplayInProgress, got %v", err)
|
||||
}
|
||||
|
||||
// After release the lock file is gone and it can be re-acquired.
|
||||
release()
|
||||
if _, statErr := os.Stat(lock); !os.IsNotExist(statErr) {
|
||||
t.Fatalf("lock file should be removed after release, stat err = %v", statErr)
|
||||
}
|
||||
release2, err := acquireTrickplayLock(lock)
|
||||
if err != nil {
|
||||
t.Fatalf("re-acquire after release: %v", err)
|
||||
}
|
||||
release2()
|
||||
}
|
||||
|
||||
func TestAcquireTrickplayLock_ReclaimsStale(t *testing.T) {
|
||||
lock := filepath.Join(t.TempDir(), "sprite.jpg.lock")
|
||||
|
||||
// Simulate a crashed worker: a lock file older than the TTL with no live owner.
|
||||
if err := os.WriteFile(lock, []byte("deadhost pid=999 t=0\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
old := time.Now().Add(-trickplayLockTTL - time.Minute)
|
||||
if err := os.Chtimes(lock, old, old); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
release, err := acquireTrickplayLock(lock)
|
||||
if err != nil {
|
||||
t.Fatalf("stale lock should be reclaimed, got %v", err)
|
||||
}
|
||||
release()
|
||||
}
|
||||
|
||||
func TestAcquireTrickplayLock_FreshNotReclaimed(t *testing.T) {
|
||||
lock := filepath.Join(t.TempDir(), "sprite.jpg.lock")
|
||||
if err := os.WriteFile(lock, []byte("livehost pid=123 t=now\n"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Fresh mtime (just written) → a live owner is assumed; must NOT be stolen.
|
||||
if _, err := acquireTrickplayLock(lock); !errors.Is(err, ErrTrickplayInProgress) {
|
||||
t.Fatalf("fresh lock must not be reclaimed, got %v", err)
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue