From bf8ed0d928c5fee4baef2e82a8bcd422b21b7148 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 27 May 2026 11:15:44 +0200 Subject: [PATCH] refactor(hls): critico-driven hardening of fase 3.2 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses items raised by the multi-agent code review of the 0.9.9 HW accel + first-start work: - EncoderProfile now carries DecodeHwAccel so the demuxer `-hwaccel` flag and the encoder argv derive from a single resolved profile. Adding a new backend can no longer leave the two switches out of sync. - VAAPI no longer passes `-hwaccel_output_format vaapi`. That option pinned decoded frames to GPU memory, but the filter chain (scale, format, setparams) runs on CPU and would fail with "impossible to convert between formats". Frames now decode HW + flow on CPU; the encoder uploads back to GPU. Pre-existing bug, never reported because no one had VAAPI auto-detected in practice. - readyMax field comment + name: documented that it's a COUNT (segments ready), not an index. The semantics were correct but the comment read "highest index" which made `idx < readyMax` look like an off-by-one to reviewers. - probe_cache background janitor: 5-minute sweeper that drops expired entries even when no lookup retouches the key. Lookup-only eviction was fine for small libraries but unbounded for users who browse and abandon thousands of files within a TTL window. Lazy + sync.Once. - probe_cache TTL eviction now re-checks under the write lock so a concurrent re-insert isn't accidentally evicted. - probe_cache size-change test now Chtimes the file back to its original mtime so only `size` differs between store and lookup keys — properly exercises the size-check path. - New TestProbeCache_SweepDropsExpired covers the janitor sweep. - CHANGELOG: backfilled missing compare links 0.6.4 → 0.9.9. - Stale "line ~1119" reference in VideoToolbox comment dropped; the bitrate block moved a few lines and the comment was already wrong. --- CHANGELOG.md | 17 ++++++ internal/engine/hls.go | 81 +++++++++++++++++------------ internal/engine/hwaccel_test.go | 27 ++++++---- internal/engine/probe_cache.go | 51 ++++++++++++++++-- internal/engine/probe_cache_test.go | 52 +++++++++++++++++- 5 files changed, 181 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85a4552..f31f458 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -618,6 +618,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [0.6.6]: https://github.com/torrentclaw/unarr/compare/v0.6.5...v0.6.6 [0.6.5]: https://github.com/torrentclaw/unarr/compare/v0.6.4...v0.6.5 [0.6.4]: https://github.com/torrentclaw/unarr/compare/v0.6.3...v0.6.4 +[0.9.9]: https://github.com/torrentclaw/unarr/compare/v0.9.8...v0.9.9 +[0.9.8]: https://github.com/torrentclaw/unarr/compare/v0.9.7...v0.9.8 +[0.9.7]: https://github.com/torrentclaw/unarr/compare/v0.9.6...v0.9.7 +[0.9.6]: https://github.com/torrentclaw/unarr/compare/v0.9.5...v0.9.6 +[0.9.5]: https://github.com/torrentclaw/unarr/compare/v0.9.4...v0.9.5 +[0.9.4]: https://github.com/torrentclaw/unarr/compare/v0.9.2...v0.9.4 +[0.9.2]: https://github.com/torrentclaw/unarr/compare/v0.9.1...v0.9.2 +[0.9.1]: https://github.com/torrentclaw/unarr/compare/v0.9.0...v0.9.1 +[0.9.0]: https://github.com/torrentclaw/unarr/compare/v0.8.1...v0.9.0 +[0.8.1]: https://github.com/torrentclaw/unarr/compare/v0.8.0...v0.8.1 +[0.8.0]: https://github.com/torrentclaw/unarr/compare/v0.7.0...v0.8.0 +[0.7.0]: https://github.com/torrentclaw/unarr/compare/v0.6.8...v0.7.0 +[0.6.8]: https://github.com/torrentclaw/unarr/compare/v0.6.7...v0.6.8 +[0.6.7]: https://github.com/torrentclaw/unarr/compare/v0.6.6...v0.6.7 +[0.6.6]: https://github.com/torrentclaw/unarr/compare/v0.6.5...v0.6.6 +[0.6.5]: https://github.com/torrentclaw/unarr/compare/v0.6.4...v0.6.5 +[0.6.4]: https://github.com/torrentclaw/unarr/compare/v0.6.3...v0.6.4 [0.6.3]: https://github.com/torrentclaw/unarr/compare/v0.6.2...v0.6.3 [0.6.2]: https://github.com/torrentclaw/unarr/compare/v0.6.1...v0.6.2 [0.6.1]: https://github.com/torrentclaw/unarr/compare/v0.6.0...v0.6.1 diff --git a/internal/engine/hls.go b/internal/engine/hls.go index cbb4501..7d0cf21 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -136,11 +136,13 @@ type HLSSession struct { restartCount int // bounded auto-restart counter (resets on Close) lastRestartAt time.Time - // readyCond + readyMax track which segments ffmpeg has finished writing. - // Handlers waiting on a future segment block on readyCond until the - // poller advances readyMax past their index (or ffmpeg exits). + // readyCh + readyMax track how many segments ffmpeg has finished writing. + // readyMax is a COUNT (not an index): readyMax=N means seg-0 … seg-(N-1) + // are fully on disk. A handler waiting on `idx` blocks until + // `idx < readyMax` (segment idx is present). The pollSegments goroutine + // advances readyMax and re-creates readyCh on every step. readyMu sync.Mutex - readyMax int // highest segment index whose .m4s file is fully written + readyMax int exitErr error exited bool readyCh chan struct{} // closed + replaced each time readyMax advances @@ -975,13 +977,16 @@ func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) return buildHLSFFmpegArgsAt(cfg, probe, tmpDir, 0, 0) } -// EncoderProfile names the codec + preset combination the HLS pipeline picks -// for the given hardware backend + transcode config. Exposed so callers can -// log the chosen encoder before ffmpeg launches (otherwise the resolution -// lives only inside buildHLSFFmpegArgsAt). +// EncoderProfile names the codec + preset + decoder hint combination the HLS +// pipeline picks for the given hardware backend + transcode config. Exposed +// so callers can log the chosen encoder before ffmpeg launches and so both +// the demuxer-side `-hwaccel` flag and the encoder-side argv stay in sync +// (otherwise the two switches in buildHLSFFmpegArgsAt could silently drift +// when adding a new backend). type EncoderProfile struct { - Codec string // ffmpeg encoder name (e.g. "h264_nvenc", "libx264") - Preset string // preset string, or "" when the codec has no preset knob + Codec string // ffmpeg encoder name (e.g. "h264_nvenc", "libx264") + Preset string // preset string, or "" when the codec has no preset knob + DecodeHwAccel string // ffmpeg `-hwaccel` value (e.g. "cuda", "qsv", "vaapi"), or "" } // ResolveEncoderProfile mirrors the codec + preset selection inside @@ -993,6 +998,14 @@ type EncoderProfile struct { // the argv (NVENC uses p1-p7, QSV uses its own subset). So vendor encoders // always use their hardcoded vendor preset and ignore configuredPreset. // VideoToolbox has no preset knob at all. +// +// DecodeHwAccel mirrors the encoder family — `-hwaccel cuda` for NVENC, +// `-hwaccel qsv` for QSV, `-hwaccel vaapi` for VAAPI. We intentionally +// do NOT pass `-hwaccel_output_format vaapi`: that pins decoded frames +// to GPU memory, but our filter chain (scale/format/setparams) runs on +// CPU and can't consume VAAPI surfaces. Keeping output frames on CPU +// makes the filter chain work and the VAAPI encoder still benefits from +// HW-accelerated DECODE on the input side. func ResolveEncoderProfile(hw HWAccel, configuredPreset string) EncoderProfile { codec := hw.FFmpegVideoCodec("h264") switch codec { @@ -1001,17 +1014,20 @@ func ResolveEncoderProfile(hw HWAccel, configuredPreset string) EncoderProfile { if preset == "" { preset = "superfast" } - return EncoderProfile{Codec: codec, Preset: preset} + return EncoderProfile{Codec: codec, Preset: preset, DecodeHwAccel: ""} case "h264_nvenc": - return EncoderProfile{Codec: codec, Preset: "p3"} + return EncoderProfile{Codec: codec, Preset: "p3", DecodeHwAccel: "cuda"} case "h264_qsv": - return EncoderProfile{Codec: codec, Preset: "veryfast"} + return EncoderProfile{Codec: codec, Preset: "veryfast", DecodeHwAccel: "qsv"} + case "h264_vaapi": + return EncoderProfile{Codec: codec, Preset: "", DecodeHwAccel: "vaapi"} case "h264_videotoolbox": // No preset knob for VideoToolbox; the speed/quality dial is `-q:v`. - return EncoderProfile{Codec: codec, Preset: ""} + // VideoToolbox uses per-encoder flags rather than a demuxer hint. + return EncoderProfile{Codec: codec, Preset: "", DecodeHwAccel: ""} } - // VAAPI + future codecs: no preset, vendor-specific knobs handled in argv. - return EncoderProfile{Codec: codec, Preset: ""} + // Unknown / future codecs: software path. + return EncoderProfile{Codec: codec, Preset: "", DecodeHwAccel: ""} } // buildHLSFFmpegArgsAt returns the argv for an HLS encode that starts at the @@ -1019,18 +1035,19 @@ func ResolveEncoderProfile(hw HWAccel, configuredPreset string) EncoderProfile { // startIdx so they slot into the existing manifest at the correct position. // `-output_ts_offset` keeps the segment PTS aligned with manifest timeline. func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string, startIdx int, startSec float64) []string { - hwHint := cfg.Transcode.HWAccel + profile := ResolveEncoderProfile(cfg.Transcode.HWAccel, cfg.Transcode.Preset) args := []string{"-y", "-hide_banner", "-loglevel", "warning"} - switch hwHint { - case HWAccelNVENC: - args = append(args, "-hwaccel", "cuda") - case HWAccelQSV: - args = append(args, "-hwaccel", "qsv") - case HWAccelVAAPI: - args = append(args, "-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi") - case HWAccelNone, HWAccelVideoToolbox: - // No demuxer-side hint. + // Demuxer-side HW-decode hint. Sourced from the profile so a future + // codec/hint mismatch is impossible — the encoder + decode hint are + // computed once and stay coherent. Notably we do NOT add + // `-hwaccel_output_format vaapi` on the VAAPI path: that pins decoded + // frames to GPU memory but our CPU filter chain (scale, format, + // setparams) can't consume VAAPI surfaces. Letting frames flow on CPU + // keeps the filter chain working; the encoder still gets HW-accelerated + // decode on the input side. + if profile.DecodeHwAccel != "" { + args = append(args, "-hwaccel", profile.DecodeHwAccel) } // Seek before -i for fast keyframe-aligned start. The new ffmpeg writes @@ -1060,14 +1077,14 @@ func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir strin } args = append(args, "-map", fmt.Sprintf("0:a:%d?", audioIdx)) - // Video encode. Codec + preset are resolved by ResolveEncoderProfile so - // the same logic feeds both the argv builder and per-session log lines. + // Video encode. Codec + preset come from the EncoderProfile resolved at + // the top of this function so the demuxer hint, the encoder, and the + // per-session log line all stay consistent. // // Defaults are biased for FIRST-START LATENCY over quality — the player // blocks on seg-0 before the first frame paints, and a slow seg-0 is // what users notice ("preparando sesión" stuck). Users who want better // quality can override via `download.transcode.preset` in config.toml. - profile := ResolveEncoderProfile(hwHint, cfg.Transcode.Preset) codec := profile.Codec args = append(args, "-c:v", codec) switch codec { @@ -1090,9 +1107,9 @@ func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir strin args = append(args, "-preset", profile.Preset, "-look_ahead", "0") case "h264_videotoolbox": // VideoToolbox has no "preset" knob; `-realtime` flips into the - // low-latency path used by FaceTime. We let `-b:v / -maxrate / - // -bufsize` (set below at line ~1119) drive rate control — - // adding `-q:v` here would conflict because ffmpeg's + // low-latency path used by FaceTime. We let the `-b:v / -maxrate + // / -bufsize` block (added later in this function) drive rate + // control — adding `-q:v` here would conflict because ffmpeg's // videotoolbox encoder treats `-b:v` as authoritative and // silently ignores `-q:v`, so the constant-quality knob never // took effect anyway. diff --git a/internal/engine/hwaccel_test.go b/internal/engine/hwaccel_test.go index 26c2b6f..cf3bec2 100644 --- a/internal/engine/hwaccel_test.go +++ b/internal/engine/hwaccel_test.go @@ -42,22 +42,29 @@ func TestResolveEncoderProfileDefaults(t *testing.T) { configured string wantCodec string wantPreset string + wantHint string }{ // Empty configured preset → pick latency-biased default per backend. - {HWAccelNone, "", "libx264", "superfast"}, - {HWAccelNVENC, "", "h264_nvenc", "p3"}, - {HWAccelQSV, "", "h264_qsv", "veryfast"}, + // DecodeHwAccel matches the encoder family for HW encoders; libx264 + + // VideoToolbox have no demuxer hint. + {HWAccelNone, "", "libx264", "superfast", ""}, + {HWAccelNVENC, "", "h264_nvenc", "p3", "cuda"}, + {HWAccelQSV, "", "h264_qsv", "veryfast", "qsv"}, + // VAAPI: decoder hint set, no preset, no `-hwaccel_output_format vaapi` + // (so the CPU filter chain can consume the decoded frames). + {HWAccelVAAPI, "", "h264_vaapi", "", "vaapi"}, // VideoToolbox has no preset knob — Preset should be "" regardless of input. - {HWAccelVideoToolbox, "p4", "h264_videotoolbox", ""}, - {HWAccelVideoToolbox, "", "h264_videotoolbox", ""}, - // VAAPI codec name resolved correctly; no preset substitution (uses ""). - {HWAccelVAAPI, "", "h264_vaapi", ""}, + // VideoToolbox uses per-encoder flags, not a demuxer `-hwaccel` hint. + {HWAccelVideoToolbox, "p4", "h264_videotoolbox", "", ""}, + {HWAccelVideoToolbox, "", "h264_videotoolbox", "", ""}, } for _, tc := range cases { got := ResolveEncoderProfile(tc.hw, tc.configured) - if got.Codec != tc.wantCodec || got.Preset != tc.wantPreset { - t.Errorf("ResolveEncoderProfile(%s, %q) = {%s, %s}, want {%s, %s}", - tc.hw, tc.configured, got.Codec, got.Preset, tc.wantCodec, tc.wantPreset) + if got.Codec != tc.wantCodec || got.Preset != tc.wantPreset || got.DecodeHwAccel != tc.wantHint { + t.Errorf("ResolveEncoderProfile(%s, %q) = {codec=%s preset=%s hint=%s}, want {codec=%s preset=%s hint=%s}", + tc.hw, tc.configured, + got.Codec, got.Preset, got.DecodeHwAccel, + tc.wantCodec, tc.wantPreset, tc.wantHint) } } } diff --git a/internal/engine/probe_cache.go b/internal/engine/probe_cache.go index d57c5e6..fcc7dec 100644 --- a/internal/engine/probe_cache.go +++ b/internal/engine/probe_cache.go @@ -13,6 +13,13 @@ import ( // mtime delta. const probeCacheTTL = 30 * time.Minute +// probeCacheJanitorInterval is how often the background sweeper wakes to +// drop expired entries. Lookup-time eviction handles hot paths, but a +// user who browses 5k files and then stops would leak entries until each +// is individually re-touched. 5 min ≈ 6 sweeps per TTL window — enough +// to keep memory bounded without burning CPU. +const probeCacheJanitorInterval = 5 * time.Minute + type probeCacheEntry struct { probe *StreamProbe expires time.Time @@ -25,10 +32,42 @@ type probeCacheKey struct { } var ( - probeCacheMu sync.RWMutex - probeCache = make(map[probeCacheKey]probeCacheEntry) + probeCacheMu sync.RWMutex + probeCache = make(map[probeCacheKey]probeCacheEntry) + probeCacheJanitor sync.Once ) +// startProbeCacheJanitor launches the background sweeper exactly once per +// process. Lazy — fired on first storeProbeCache. Drops expired entries +// every probeCacheJanitorInterval. Idempotent (sync.Once). +func startProbeCacheJanitor() { + probeCacheJanitor.Do(func() { + go func() { + ticker := time.NewTicker(probeCacheJanitorInterval) + defer ticker.Stop() + for range ticker.C { + sweepProbeCache(time.Now()) + } + }() + }) +} + +// sweepProbeCache removes every entry whose expiry is at or before `now`. +// Exposed for tests; production code calls it indirectly via the janitor +// goroutine. +func sweepProbeCache(now time.Time) int { + probeCacheMu.Lock() + defer probeCacheMu.Unlock() + removed := 0 + for k, e := range probeCache { + if !now.Before(e.expires) { + delete(probeCache, k) + removed++ + } + } + return removed +} + // lookupProbeCache returns the cached StreamProbe for the given path if its // mtime + size still match the value recorded at insert time, AND the cache // entry hasn't expired. Any stat failure / mismatch returns (nil, false) so @@ -50,8 +89,12 @@ func lookupProbeCache(path string) (*StreamProbe, bool) { return nil, false } if time.Now().After(entry.expires) { + // Re-check under the write lock so a concurrent re-insert (same key, + // fresh expiry) isn't accidentally evicted. probeCacheMu.Lock() - delete(probeCache, key) + if cur, stillThere := probeCache[key]; stillThere && time.Now().After(cur.expires) { + delete(probeCache, key) + } probeCacheMu.Unlock() return nil, false } @@ -78,6 +121,8 @@ func storeProbeCache(path string, probe *StreamProbe) { expires: time.Now().Add(probeCacheTTL), } probeCacheMu.Unlock() + // Lazy janitor — fires once per process. No-op after first call. + startProbeCacheJanitor() } // ResetProbeCache clears the in-memory probe cache. Test-only. diff --git a/internal/engine/probe_cache_test.go b/internal/engine/probe_cache_test.go index b31b10d..76c79da 100644 --- a/internal/engine/probe_cache_test.go +++ b/internal/engine/probe_cache_test.go @@ -73,15 +73,24 @@ func TestProbeCache_SizeChangeInvalidates(t *testing.T) { if err := os.WriteFile(path, []byte("aaaaa"), 0o644); err != nil { t.Fatalf("write: %v", err) } + originalMtime := time.Now().Add(-1 * time.Hour) // stable, in the past + if err := os.Chtimes(path, originalMtime, originalMtime); err != nil { + t.Fatalf("chtimes original: %v", err) + } probe := &StreamProbe{VideoCodec: "h264", DurationSec: 100} storeProbeCache(path, probe) - // Truncate file to a different size + reset mtime to the original (to - // isolate the size-check path). Stat picks up new size immediately. + // Truncate to a different size, then reset mtime to the original so + // only `size` differs between store and lookup keys — isolates the + // size-check path. Without the Chtimes, WriteFile bumps mtime and the + // test would pass via mtime invalidation regardless of size logic. if err := os.WriteFile(path, []byte("a"), 0o644); err != nil { t.Fatalf("rewrite: %v", err) } + if err := os.Chtimes(path, originalMtime, originalMtime); err != nil { + t.Fatalf("chtimes restore: %v", err) + } if _, ok := lookupProbeCache(path); ok { t.Fatal("expected MISS after size change") @@ -152,3 +161,42 @@ func TestProbeCache_StoreNonexistentNoOp(t *testing.T) { t.Fatalf("expected 0 entries; got %d", ProbeCacheSize()) } } + +func TestProbeCache_SweepDropsExpired(t *testing.T) { + ResetProbeCache() + t.Cleanup(ResetProbeCache) + + dir := t.TempDir() + // Two entries: one expired, one fresh. + expiredPath := filepath.Join(dir, "old.mkv") + freshPath := filepath.Join(dir, "new.mkv") + if err := os.WriteFile(expiredPath, []byte("a"), 0o644); err != nil { + t.Fatalf("write expired: %v", err) + } + if err := os.WriteFile(freshPath, []byte("b"), 0o644); err != nil { + t.Fatalf("write fresh: %v", err) + } + + now := time.Now() + fiExp, _ := os.Stat(expiredPath) + fiFresh, _ := os.Stat(freshPath) + + probeCacheMu.Lock() + probeCache[probeCacheKey{path: expiredPath, mtime: fiExp.ModTime().UnixNano(), size: fiExp.Size()}] = probeCacheEntry{ + probe: &StreamProbe{VideoCodec: "h264"}, + expires: now.Add(-1 * time.Minute), // expired + } + probeCache[probeCacheKey{path: freshPath, mtime: fiFresh.ModTime().UnixNano(), size: fiFresh.Size()}] = probeCacheEntry{ + probe: &StreamProbe{VideoCodec: "h264"}, + expires: now.Add(10 * time.Minute), // fresh + } + probeCacheMu.Unlock() + + removed := sweepProbeCache(now) + if removed != 1 { + t.Fatalf("expected 1 expired entry removed; got %d", removed) + } + if ProbeCacheSize() != 1 { + t.Fatalf("expected 1 fresh entry kept; got %d", ProbeCacheSize()) + } +}