diff --git a/internal/agent/types.go b/internal/agent/types.go index 4086186..d6ba061 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -485,6 +485,13 @@ type StreamSession struct { // slow resume). 0/omitted = start at the beginning. Older daemons simply // don't decode the field and keep the old start-at-0 behaviour. StartSec float64 `json:"startSec,omitempty"` + // Prewarm marks a background cache-fill session (next-episode prewarm, + // hover prewarm): the daemon must encode it WITHOUT displacing the + // viewer's live session — it waits until the active encode finishes and + // registers alongside instead of evicting (Register kills every other + // session; a prewarm claimed mid-playback used to kill the stream the + // user was watching). False/omitted = a real viewer session. + Prewarm bool `json:"prewarm,omitempty"` // PlayMethod is how the daemon should serve this session: // "" — default (HLS transcode); also what legacy servers send. // "direct" — the source is already browser-native (the web decided this diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index ee91dc4..96efae6 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -716,9 +716,49 @@ func runDaemonStart() error { // wires it into the StreamServer. Shared by the local-file HLS path and // the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe // off the sync loop, and report readiness identically. + // + // Prewarm sessions (background cache-fill: next-episode, hover) take a + // deferential path: wait until no live encode is running (never steal + // the encoder from the viewer), then register WITHOUT displacing other + // sessions. Before this, a prewarm claimed mid-playback went through + // Register() and KILLED the stream the user was watching (verified + // 2026-06-10: prewarm started → live session "closed (cache + // discarded)" → player 404). startHLSPlayback := func(hlsCfg engine.HLSSessionConfig, hlsCtx context.Context, hlsCancel context.CancelFunc) { playerSessionRegistry.add(hlsCfg.SessionID, hlsCancel) + prewarm := sess.Prewarm go func() { + if prewarm { + // Defer until the encoder is free. Poll is cheap (10 s); + // cap the wait at 30 min — a prewarm that can't start + // within an episode's runtime has lost its purpose. + deadline := time.Now().Add(30 * time.Minute) + for streamSrv.HLS().HasLiveEncode() { + if time.Now().After(deadline) || hlsCtx.Err() != nil { + playerSessionRegistry.remove(hlsCfg.SessionID) + hlsCancel() + log.Printf("[hls %s] prewarm abandoned (encoder busy %s)", + agent.ShortID(hlsCfg.SessionID), "30m") + return + } + select { + case <-hlsCtx.Done(): + playerSessionRegistry.remove(hlsCfg.SessionID) + return + case <-time.After(10 * time.Second): + } + } + } else { + // REAL session: reap in-flight prewarm encodes BEFORE + // StartHLSSession so the per-key cache writer-lock is + // free and the viewer's encode lands in the persistent + // cache (not an uncached tmpdir). A SEALED prewarm is + // unaffected — this session simply cache-HITs it. + if n := streamSrv.HLS().CloseWhere(func(s *engine.HLSSession) bool { return s.IsPrewarm() }); n > 0 { + log.Printf("[hls %s] reaped %d in-flight prewarm(s) for the viewer session", + agent.ShortID(hlsCfg.SessionID), n) + } + } hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) if err != nil { playerSessionRegistry.remove(hlsCfg.SessionID) @@ -726,6 +766,14 @@ func runDaemonStart() error { log.Printf("[hls %s] start failed: %v", agent.ShortID(hlsCfg.SessionID), err) return } + if prewarm { + // Side-by-side: never evict the viewer's session. A later + // REAL session still evicts this one via Register — by + // then the encode is usually sealed in the segment cache. + streamSrv.HLS().RegisterKeep(hsess) + log.Printf("[hls %s] prewarm encoding: %s", agent.ShortID(hlsCfg.SessionID), hlsCfg.FileName) + return // no viewer waiting → no ready-watcher + } streamSrv.HLS().Register(hsess) go watchSessionReady(hlsCtx, agentClient, hsess, hlsCfg.SessionID) }() @@ -791,6 +839,7 @@ func runDaemonStart() error { AudioIndex: sess.AudioIndex, BurnSubtitleIndex: sess.BurnSubtitleIndex, StartSec: sess.StartSec, + Prewarm: sess.Prewarm, Transcode: tcRuntime, Cache: hlsCache, // 2c: refresh the debrid link if it expires mid-transcode; the @@ -927,6 +976,7 @@ func runDaemonStart() error { AudioIndex: sess.AudioIndex, BurnSubtitleIndex: sess.BurnSubtitleIndex, StartSec: sess.StartSec, + Prewarm: sess.Prewarm, Transcode: tcRuntime, Cache: hlsCache, }, hlsCtx, hlsCancel) diff --git a/internal/engine/hls.go b/internal/engine/hls.go index f5e1c30..f617b75 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -179,7 +179,12 @@ type HLSSessionConfig struct { // killed by an immediate seek-restart when the player asks for the resume // segment (double spawn, slow resume). 0 = start at the beginning. // Ignored on a cache HIT (every segment is already on disk). - StartSec float64 + StartSec float64 + // Prewarm marks a background cache-fill session. The daemon defers its + // encode until no live encode runs and registers it via RegisterKeep + // (never evicting the viewer). It also lets a REAL session close stale + // prewarms up front so the cache writer-lock is free for the viewer. + Prewarm bool Transcode TranscodeRuntime // Cache is an optional persistent segment cache keyed by (source, quality, // audio). When set, completed encodes are kept across sessions so re-plays @@ -341,6 +346,63 @@ func (r *HLSSessionRegistry) Register(s *HLSSession) { } } +// CloseWhere closes + removes every registered session matching pred. Used +// by the REAL-session path to reap stale prewarm encodes BEFORE its own +// StartHLSSession runs — that frees the per-key cache writer-lock, so the +// viewer's encode lands in the persistent cache instead of falling back to +// an uncached per-session tmpdir (and a SEALED prewarm survives as a cache +// HIT: closing a from-cache reader never invalidates the entry). +func (r *HLSSessionRegistry) CloseWhere(pred func(*HLSSession) bool) int { + r.mu.Lock() + victims := make([]*HLSSession, 0, len(r.sessions)) + for id, s := range r.sessions { + if pred(s) { + victims = append(victims, s) + delete(r.sessions, id) + } + } + r.mu.Unlock() + for _, s := range victims { + _ = s.Close() + } + return len(victims) +} + +// IsPrewarm reports whether this session was started as a background +// cache-fill (HLSSessionConfig.Prewarm). cfg is immutable after construction. +func (s *HLSSession) IsPrewarm() bool { return s.cfg.Prewarm } + +// RegisterKeep adds a session WITHOUT displacing the others — the prewarm +// path: a background cache-fill encode must not evict the viewer's live +// session (Register's eviction killed the stream being watched when the +// next-episode prewarm got claimed mid-playback). It still replaces (and +// closes) a previous session with the SAME ID. A later Register() of a real +// viewer session evicts prewarms like any other session — a completed +// (sealed) prewarm survives in the segment cache either way. +func (r *HLSSessionRegistry) RegisterKeep(s *HLSSession) { + r.mu.Lock() + prev := r.sessions[s.cfg.SessionID] + r.sessions[s.cfg.SessionID] = s + r.mu.Unlock() + if prev != nil && prev != s { + _ = prev.Close() + } +} + +// HasLiveEncode reports whether any registered session still has a RUNNING +// ffmpeg (encode not finished). Used to defer prewarm encodes so they never +// compete with the viewer's live transcode for the encoder. +func (r *HLSSessionRegistry) HasLiveEncode() bool { + r.mu.RLock() + defer r.mu.RUnlock() + for _, s := range r.sessions { + if !s.EncodeExited() { + return true + } + } + return false +} + // Remove drops a session from the registry without closing it. func (r *HLSSessionRegistry) Remove(id string) { r.mu.Lock() @@ -666,6 +728,15 @@ func (s *HLSSession) ReadyCount() int { return s.readyMax } +// EncodeExited reports whether this session's ffmpeg has finished (clean or +// crashed) or never ran (cache HIT). False while an encode is producing +// segments. Used by HasLiveEncode to defer prewarm work. +func (s *HLSSession) EncodeExited() bool { + s.readyMu.Lock() + defer s.readyMu.Unlock() + return s.exited +} + // WriterStartIdx returns the segment index the CURRENT ffmpeg writer started // at: 0 for a from-the-beginning encode, the resume segment for a StartSec // session, the seek target after a seek-restart. See ReadyCount for the diff --git a/internal/engine/hls_registry_prewarm_test.go b/internal/engine/hls_registry_prewarm_test.go new file mode 100644 index 0000000..6111285 --- /dev/null +++ b/internal/engine/hls_registry_prewarm_test.go @@ -0,0 +1,80 @@ +package engine + +import "testing" + +// bare session: no ffmpeg, no tmpdir — exercises pure registry semantics. +func bareSession(id string, prewarm bool, exited bool) *HLSSession { + s := &HLSSession{cfg: HLSSessionConfig{SessionID: id, Prewarm: prewarm}} + s.exited = exited + return s +} + +// A prewarm registered via RegisterKeep must NOT evict the viewer's live +// session (the old Register-for-everything path killed the stream being +// watched when the next-episode prewarm got claimed mid-playback). +func TestRegisterKeepDoesNotEvict(t *testing.T) { + r := NewHLSSessionRegistry() + live := bareSession("live", false, false) + r.Register(live) + + pre := bareSession("pre", true, false) + r.RegisterKeep(pre) + + if r.Get("live") == nil { + t.Fatal("RegisterKeep evicted the live session") + } + if r.Get("pre") == nil { + t.Fatal("RegisterKeep did not register the prewarm") + } + if live.isClosed() { + t.Fatal("RegisterKeep closed the live session") + } + + // A REAL session via Register still evicts everything (single viewer). + real2 := bareSession("real2", false, false) + r.Register(real2) + if r.Get("live") != nil || r.Get("pre") != nil { + t.Fatal("Register must evict every other session") + } + if !live.isClosed() || !pre.isClosed() { + t.Fatal("Register must close the evicted sessions") + } +} + +func TestCloseWherePrewarmsOnly(t *testing.T) { + r := NewHLSSessionRegistry() + live := bareSession("live", false, false) + pre1 := bareSession("pre1", true, false) + pre2 := bareSession("pre2", true, true) + r.Register(live) + r.RegisterKeep(pre1) + r.RegisterKeep(pre2) + + n := r.CloseWhere(func(s *HLSSession) bool { return s.IsPrewarm() }) + if n != 2 { + t.Fatalf("CloseWhere closed %d sessions, want 2", n) + } + if r.Get("live") == nil || live.isClosed() { + t.Fatal("CloseWhere must not touch the live session") + } + if r.Get("pre1") != nil || r.Get("pre2") != nil { + t.Fatal("CloseWhere must remove the prewarms from the registry") + } +} + +func TestHasLiveEncode(t *testing.T) { + r := NewHLSSessionRegistry() + if r.HasLiveEncode() { + t.Fatal("empty registry must report no live encode") + } + done := bareSession("done", false, true) // encode finished / cache HIT + r.Register(done) + if r.HasLiveEncode() { + t.Fatal("an exited encode must not count as live") + } + running := bareSession("running", true, false) + r.RegisterKeep(running) + if !r.HasLiveEncode() { + t.Fatal("a running encode must count as live") + } +} diff --git a/internal/library/mediainfo/trickplay.go b/internal/library/mediainfo/trickplay.go index 5a33453..f134be6 100644 --- a/internal/library/mediainfo/trickplay.go +++ b/internal/library/mediainfo/trickplay.go @@ -107,9 +107,16 @@ func ReadCachedTrickplay(mediaPath string, width int) (TrickplayManifest, bool) // GenerateTrickplay builds the montage sprite + manifest for mediaPath and caches // them in the sidecar dir. ONE ffmpeg pass samples a frame every intervalSec // (fps=1/interval), scales each to width (even height), and tiles them into a -// single JPEG. The whole file is decoded once — slow but a one-time, cached, -// scan-time cost (run with idle I/O priority by the prewarm), and it removes ALL -// live extraction during playback (no contention with the active stream). +// single JPEG. +// +// `-skip_frame nokey` makes the decoder touch ONLY keyframes — ~12× less CPU +// than the old full decode (measured 233 s → 19 s CPU on a 24-min 1080p +// episode), which matters because this runs alongside live streaming on the +// same box. The fps filter still emits one frame per UNIFORM tick (it +// repeats the latest keyframe for ticks between keyframes), so the manifest +// contract — tileIndex = floor(t / IntervalSec) — is unchanged and cached +// clients keep working; each tile just shows the nearest keyframe ≤ its +// tick (≤ one GOP off, invisible at 240-320 px scrub size). // // durationSec drives the grid size; pass the probed duration (0 → error, nothing // to sample). The caller owns the ctx deadline (generous at scan time). @@ -179,10 +186,18 @@ func GenerateTrickplay(ctx context.Context, ffmpegPath, mediaPath string, interv tmpSprite := spritePath + ".tmp" // fps filter wants a rational; format 1/effInterval with enough precision. + // eof_action=pass: with -skip_frame nokey a short/all-inter clip can decode + // to a SINGLE keyframe, and fps's default eof handling emits zero frames + // from a one-frame stream (it never sees a later PTS to close the first + // tick) → "Nothing was written into output". pass flushes the last frame + // at EOF instead; on normal media it only matters at the very end, where + // -frames:v 1 + the tile grid already bound the output. fps := fmt.Sprintf("1/%s", strconv.FormatFloat(effInterval, 'f', 3, 64)) - vf := fmt.Sprintf("fps=%s,scale=%d:-2,tile=%dx%d", fps, width, cols, rows) + vf := fmt.Sprintf("fps=%s:eof_action=pass,scale=%d:-2,tile=%dx%d", fps, width, cols, rows) args := []string{ "-nostdin", "-loglevel", "error", "-y", + // Decoder-level keyframe-only mode — must precede -i (input option). + "-skip_frame", "nokey", "-i", mediaPath, "-frames:v", "1", "-vf", vf,