fix(hls): los prewarms ya no desalojan la sesión del espectador + trickplay 12x
- StreamSession.Prewarm → HLSSessionConfig.Prewarm: el daemon difiere el
encode de un prewarm hasta que no haya encode vivo (poll 10s, tope
30min) y lo registra vía RegisterKeep (side-by-side, sin desalojar).
Antes todo pasaba por Register(), que cierra las demás sesiones — un
prewarm de next-episode reclamado en mitad de la reproducción mataba
el stream del usuario ("closed (cache discarded)" → master 404,
verificado 2026-06-10). Una sesión REAL nueva primero reapea los
prewarms en vuelo (CloseWhere(IsPrewarm)) para liberar el writer-lock
de la caché — un prewarm SELLADO sobrevive como cache HIT — y luego
desaloja normal vía Register.
- Trickplay: -skip_frame nokey + fps=...:eof_action=pass — solo
decodifica keyframes (12x menos CPU medido: 233s→19s en un episodio
de 24min 1080p; importa porque corre junto al streaming en vivo).
Los ticks siguen siendo uniformes (fps repite el último keyframe),
así que manifest y clientes cacheados no cambian. eof_action=pass
cubre clips con un único keyframe (el filtro fps no emite nada de un
stream de 1 frame con el eof por defecto).
This commit is contained in:
parent
9b97aedfe4
commit
f9ecd5ed82
5 changed files with 228 additions and 5 deletions
|
|
@ -485,6 +485,13 @@ type StreamSession struct {
|
||||||
// slow resume). 0/omitted = start at the beginning. Older daemons simply
|
// slow resume). 0/omitted = start at the beginning. Older daemons simply
|
||||||
// don't decode the field and keep the old start-at-0 behaviour.
|
// don't decode the field and keep the old start-at-0 behaviour.
|
||||||
StartSec float64 `json:"startSec,omitempty"`
|
StartSec float64 `json:"startSec,omitempty"`
|
||||||
|
// Prewarm marks a background cache-fill session (next-episode prewarm,
|
||||||
|
// hover prewarm): the daemon must encode it WITHOUT displacing the
|
||||||
|
// viewer's live session — it waits until the active encode finishes and
|
||||||
|
// registers alongside instead of evicting (Register kills every other
|
||||||
|
// session; a prewarm claimed mid-playback used to kill the stream the
|
||||||
|
// user was watching). False/omitted = a real viewer session.
|
||||||
|
Prewarm bool `json:"prewarm,omitempty"`
|
||||||
// PlayMethod is how the daemon should serve this session:
|
// PlayMethod is how the daemon should serve this session:
|
||||||
// "" — default (HLS transcode); also what legacy servers send.
|
// "" — default (HLS transcode); also what legacy servers send.
|
||||||
// "direct" — the source is already browser-native (the web decided this
|
// "direct" — the source is already browser-native (the web decided this
|
||||||
|
|
|
||||||
|
|
@ -716,9 +716,49 @@ func runDaemonStart() error {
|
||||||
// wires it into the StreamServer. Shared by the local-file HLS path and
|
// wires it into the StreamServer. Shared by the local-file HLS path and
|
||||||
// the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe
|
// the debrid HLS-from-URL path (hueco #2 / 2b) so both register, probe
|
||||||
// off the sync loop, and report readiness identically.
|
// off the sync loop, and report readiness identically.
|
||||||
|
//
|
||||||
|
// Prewarm sessions (background cache-fill: next-episode, hover) take a
|
||||||
|
// deferential path: wait until no live encode is running (never steal
|
||||||
|
// the encoder from the viewer), then register WITHOUT displacing other
|
||||||
|
// sessions. Before this, a prewarm claimed mid-playback went through
|
||||||
|
// Register() and KILLED the stream the user was watching (verified
|
||||||
|
// 2026-06-10: prewarm started → live session "closed (cache
|
||||||
|
// discarded)" → player 404).
|
||||||
startHLSPlayback := func(hlsCfg engine.HLSSessionConfig, hlsCtx context.Context, hlsCancel context.CancelFunc) {
|
startHLSPlayback := func(hlsCfg engine.HLSSessionConfig, hlsCtx context.Context, hlsCancel context.CancelFunc) {
|
||||||
playerSessionRegistry.add(hlsCfg.SessionID, hlsCancel)
|
playerSessionRegistry.add(hlsCfg.SessionID, hlsCancel)
|
||||||
|
prewarm := sess.Prewarm
|
||||||
go func() {
|
go func() {
|
||||||
|
if prewarm {
|
||||||
|
// Defer until the encoder is free. Poll is cheap (10 s);
|
||||||
|
// cap the wait at 30 min — a prewarm that can't start
|
||||||
|
// within an episode's runtime has lost its purpose.
|
||||||
|
deadline := time.Now().Add(30 * time.Minute)
|
||||||
|
for streamSrv.HLS().HasLiveEncode() {
|
||||||
|
if time.Now().After(deadline) || hlsCtx.Err() != nil {
|
||||||
|
playerSessionRegistry.remove(hlsCfg.SessionID)
|
||||||
|
hlsCancel()
|
||||||
|
log.Printf("[hls %s] prewarm abandoned (encoder busy %s)",
|
||||||
|
agent.ShortID(hlsCfg.SessionID), "30m")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-hlsCtx.Done():
|
||||||
|
playerSessionRegistry.remove(hlsCfg.SessionID)
|
||||||
|
return
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// REAL session: reap in-flight prewarm encodes BEFORE
|
||||||
|
// StartHLSSession so the per-key cache writer-lock is
|
||||||
|
// free and the viewer's encode lands in the persistent
|
||||||
|
// cache (not an uncached tmpdir). A SEALED prewarm is
|
||||||
|
// unaffected — this session simply cache-HITs it.
|
||||||
|
if n := streamSrv.HLS().CloseWhere(func(s *engine.HLSSession) bool { return s.IsPrewarm() }); n > 0 {
|
||||||
|
log.Printf("[hls %s] reaped %d in-flight prewarm(s) for the viewer session",
|
||||||
|
agent.ShortID(hlsCfg.SessionID), n)
|
||||||
|
}
|
||||||
|
}
|
||||||
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
|
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
playerSessionRegistry.remove(hlsCfg.SessionID)
|
playerSessionRegistry.remove(hlsCfg.SessionID)
|
||||||
|
|
@ -726,6 +766,14 @@ func runDaemonStart() error {
|
||||||
log.Printf("[hls %s] start failed: %v", agent.ShortID(hlsCfg.SessionID), err)
|
log.Printf("[hls %s] start failed: %v", agent.ShortID(hlsCfg.SessionID), err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if prewarm {
|
||||||
|
// Side-by-side: never evict the viewer's session. A later
|
||||||
|
// REAL session still evicts this one via Register — by
|
||||||
|
// then the encode is usually sealed in the segment cache.
|
||||||
|
streamSrv.HLS().RegisterKeep(hsess)
|
||||||
|
log.Printf("[hls %s] prewarm encoding: %s", agent.ShortID(hlsCfg.SessionID), hlsCfg.FileName)
|
||||||
|
return // no viewer waiting → no ready-watcher
|
||||||
|
}
|
||||||
streamSrv.HLS().Register(hsess)
|
streamSrv.HLS().Register(hsess)
|
||||||
go watchSessionReady(hlsCtx, agentClient, hsess, hlsCfg.SessionID)
|
go watchSessionReady(hlsCtx, agentClient, hsess, hlsCfg.SessionID)
|
||||||
}()
|
}()
|
||||||
|
|
@ -791,6 +839,7 @@ func runDaemonStart() error {
|
||||||
AudioIndex: sess.AudioIndex,
|
AudioIndex: sess.AudioIndex,
|
||||||
BurnSubtitleIndex: sess.BurnSubtitleIndex,
|
BurnSubtitleIndex: sess.BurnSubtitleIndex,
|
||||||
StartSec: sess.StartSec,
|
StartSec: sess.StartSec,
|
||||||
|
Prewarm: sess.Prewarm,
|
||||||
Transcode: tcRuntime,
|
Transcode: tcRuntime,
|
||||||
Cache: hlsCache,
|
Cache: hlsCache,
|
||||||
// 2c: refresh the debrid link if it expires mid-transcode; the
|
// 2c: refresh the debrid link if it expires mid-transcode; the
|
||||||
|
|
@ -927,6 +976,7 @@ func runDaemonStart() error {
|
||||||
AudioIndex: sess.AudioIndex,
|
AudioIndex: sess.AudioIndex,
|
||||||
BurnSubtitleIndex: sess.BurnSubtitleIndex,
|
BurnSubtitleIndex: sess.BurnSubtitleIndex,
|
||||||
StartSec: sess.StartSec,
|
StartSec: sess.StartSec,
|
||||||
|
Prewarm: sess.Prewarm,
|
||||||
Transcode: tcRuntime,
|
Transcode: tcRuntime,
|
||||||
Cache: hlsCache,
|
Cache: hlsCache,
|
||||||
}, hlsCtx, hlsCancel)
|
}, hlsCtx, hlsCancel)
|
||||||
|
|
|
||||||
|
|
@ -180,6 +180,11 @@ type HLSSessionConfig struct {
|
||||||
// segment (double spawn, slow resume). 0 = start at the beginning.
|
// segment (double spawn, slow resume). 0 = start at the beginning.
|
||||||
// Ignored on a cache HIT (every segment is already on disk).
|
// Ignored on a cache HIT (every segment is already on disk).
|
||||||
StartSec float64
|
StartSec float64
|
||||||
|
// Prewarm marks a background cache-fill session. The daemon defers its
|
||||||
|
// encode until no live encode runs and registers it via RegisterKeep
|
||||||
|
// (never evicting the viewer). It also lets a REAL session close stale
|
||||||
|
// prewarms up front so the cache writer-lock is free for the viewer.
|
||||||
|
Prewarm bool
|
||||||
Transcode TranscodeRuntime
|
Transcode TranscodeRuntime
|
||||||
// Cache is an optional persistent segment cache keyed by (source, quality,
|
// Cache is an optional persistent segment cache keyed by (source, quality,
|
||||||
// audio). When set, completed encodes are kept across sessions so re-plays
|
// audio). When set, completed encodes are kept across sessions so re-plays
|
||||||
|
|
@ -341,6 +346,63 @@ func (r *HLSSessionRegistry) Register(s *HLSSession) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CloseWhere closes + removes every registered session matching pred. Used
|
||||||
|
// by the REAL-session path to reap stale prewarm encodes BEFORE its own
|
||||||
|
// StartHLSSession runs — that frees the per-key cache writer-lock, so the
|
||||||
|
// viewer's encode lands in the persistent cache instead of falling back to
|
||||||
|
// an uncached per-session tmpdir (and a SEALED prewarm survives as a cache
|
||||||
|
// HIT: closing a from-cache reader never invalidates the entry).
|
||||||
|
func (r *HLSSessionRegistry) CloseWhere(pred func(*HLSSession) bool) int {
|
||||||
|
r.mu.Lock()
|
||||||
|
victims := make([]*HLSSession, 0, len(r.sessions))
|
||||||
|
for id, s := range r.sessions {
|
||||||
|
if pred(s) {
|
||||||
|
victims = append(victims, s)
|
||||||
|
delete(r.sessions, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.mu.Unlock()
|
||||||
|
for _, s := range victims {
|
||||||
|
_ = s.Close()
|
||||||
|
}
|
||||||
|
return len(victims)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsPrewarm reports whether this session was started as a background
|
||||||
|
// cache-fill (HLSSessionConfig.Prewarm). cfg is immutable after construction.
|
||||||
|
func (s *HLSSession) IsPrewarm() bool { return s.cfg.Prewarm }
|
||||||
|
|
||||||
|
// RegisterKeep adds a session WITHOUT displacing the others — the prewarm
|
||||||
|
// path: a background cache-fill encode must not evict the viewer's live
|
||||||
|
// session (Register's eviction killed the stream being watched when the
|
||||||
|
// next-episode prewarm got claimed mid-playback). It still replaces (and
|
||||||
|
// closes) a previous session with the SAME ID. A later Register() of a real
|
||||||
|
// viewer session evicts prewarms like any other session — a completed
|
||||||
|
// (sealed) prewarm survives in the segment cache either way.
|
||||||
|
func (r *HLSSessionRegistry) RegisterKeep(s *HLSSession) {
|
||||||
|
r.mu.Lock()
|
||||||
|
prev := r.sessions[s.cfg.SessionID]
|
||||||
|
r.sessions[s.cfg.SessionID] = s
|
||||||
|
r.mu.Unlock()
|
||||||
|
if prev != nil && prev != s {
|
||||||
|
_ = prev.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasLiveEncode reports whether any registered session still has a RUNNING
|
||||||
|
// ffmpeg (encode not finished). Used to defer prewarm encodes so they never
|
||||||
|
// compete with the viewer's live transcode for the encoder.
|
||||||
|
func (r *HLSSessionRegistry) HasLiveEncode() bool {
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
for _, s := range r.sessions {
|
||||||
|
if !s.EncodeExited() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Remove drops a session from the registry without closing it.
|
// Remove drops a session from the registry without closing it.
|
||||||
func (r *HLSSessionRegistry) Remove(id string) {
|
func (r *HLSSessionRegistry) Remove(id string) {
|
||||||
r.mu.Lock()
|
r.mu.Lock()
|
||||||
|
|
@ -666,6 +728,15 @@ func (s *HLSSession) ReadyCount() int {
|
||||||
return s.readyMax
|
return s.readyMax
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EncodeExited reports whether this session's ffmpeg has finished (clean or
|
||||||
|
// crashed) or never ran (cache HIT). False while an encode is producing
|
||||||
|
// segments. Used by HasLiveEncode to defer prewarm work.
|
||||||
|
func (s *HLSSession) EncodeExited() bool {
|
||||||
|
s.readyMu.Lock()
|
||||||
|
defer s.readyMu.Unlock()
|
||||||
|
return s.exited
|
||||||
|
}
|
||||||
|
|
||||||
// WriterStartIdx returns the segment index the CURRENT ffmpeg writer started
|
// WriterStartIdx returns the segment index the CURRENT ffmpeg writer started
|
||||||
// at: 0 for a from-the-beginning encode, the resume segment for a StartSec
|
// at: 0 for a from-the-beginning encode, the resume segment for a StartSec
|
||||||
// session, the seek target after a seek-restart. See ReadyCount for the
|
// session, the seek target after a seek-restart. See ReadyCount for the
|
||||||
|
|
|
||||||
80
internal/engine/hls_registry_prewarm_test.go
Normal file
80
internal/engine/hls_registry_prewarm_test.go
Normal file
|
|
@ -0,0 +1,80 @@
|
||||||
|
package engine
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
// bare session: no ffmpeg, no tmpdir — exercises pure registry semantics.
|
||||||
|
func bareSession(id string, prewarm bool, exited bool) *HLSSession {
|
||||||
|
s := &HLSSession{cfg: HLSSessionConfig{SessionID: id, Prewarm: prewarm}}
|
||||||
|
s.exited = exited
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// A prewarm registered via RegisterKeep must NOT evict the viewer's live
|
||||||
|
// session (the old Register-for-everything path killed the stream being
|
||||||
|
// watched when the next-episode prewarm got claimed mid-playback).
|
||||||
|
func TestRegisterKeepDoesNotEvict(t *testing.T) {
|
||||||
|
r := NewHLSSessionRegistry()
|
||||||
|
live := bareSession("live", false, false)
|
||||||
|
r.Register(live)
|
||||||
|
|
||||||
|
pre := bareSession("pre", true, false)
|
||||||
|
r.RegisterKeep(pre)
|
||||||
|
|
||||||
|
if r.Get("live") == nil {
|
||||||
|
t.Fatal("RegisterKeep evicted the live session")
|
||||||
|
}
|
||||||
|
if r.Get("pre") == nil {
|
||||||
|
t.Fatal("RegisterKeep did not register the prewarm")
|
||||||
|
}
|
||||||
|
if live.isClosed() {
|
||||||
|
t.Fatal("RegisterKeep closed the live session")
|
||||||
|
}
|
||||||
|
|
||||||
|
// A REAL session via Register still evicts everything (single viewer).
|
||||||
|
real2 := bareSession("real2", false, false)
|
||||||
|
r.Register(real2)
|
||||||
|
if r.Get("live") != nil || r.Get("pre") != nil {
|
||||||
|
t.Fatal("Register must evict every other session")
|
||||||
|
}
|
||||||
|
if !live.isClosed() || !pre.isClosed() {
|
||||||
|
t.Fatal("Register must close the evicted sessions")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCloseWherePrewarmsOnly(t *testing.T) {
|
||||||
|
r := NewHLSSessionRegistry()
|
||||||
|
live := bareSession("live", false, false)
|
||||||
|
pre1 := bareSession("pre1", true, false)
|
||||||
|
pre2 := bareSession("pre2", true, true)
|
||||||
|
r.Register(live)
|
||||||
|
r.RegisterKeep(pre1)
|
||||||
|
r.RegisterKeep(pre2)
|
||||||
|
|
||||||
|
n := r.CloseWhere(func(s *HLSSession) bool { return s.IsPrewarm() })
|
||||||
|
if n != 2 {
|
||||||
|
t.Fatalf("CloseWhere closed %d sessions, want 2", n)
|
||||||
|
}
|
||||||
|
if r.Get("live") == nil || live.isClosed() {
|
||||||
|
t.Fatal("CloseWhere must not touch the live session")
|
||||||
|
}
|
||||||
|
if r.Get("pre1") != nil || r.Get("pre2") != nil {
|
||||||
|
t.Fatal("CloseWhere must remove the prewarms from the registry")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHasLiveEncode(t *testing.T) {
|
||||||
|
r := NewHLSSessionRegistry()
|
||||||
|
if r.HasLiveEncode() {
|
||||||
|
t.Fatal("empty registry must report no live encode")
|
||||||
|
}
|
||||||
|
done := bareSession("done", false, true) // encode finished / cache HIT
|
||||||
|
r.Register(done)
|
||||||
|
if r.HasLiveEncode() {
|
||||||
|
t.Fatal("an exited encode must not count as live")
|
||||||
|
}
|
||||||
|
running := bareSession("running", true, false)
|
||||||
|
r.RegisterKeep(running)
|
||||||
|
if !r.HasLiveEncode() {
|
||||||
|
t.Fatal("a running encode must count as live")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -107,9 +107,16 @@ func ReadCachedTrickplay(mediaPath string, width int) (TrickplayManifest, bool)
|
||||||
// GenerateTrickplay builds the montage sprite + manifest for mediaPath and caches
|
// GenerateTrickplay builds the montage sprite + manifest for mediaPath and caches
|
||||||
// them in the sidecar dir. ONE ffmpeg pass samples a frame every intervalSec
|
// them in the sidecar dir. ONE ffmpeg pass samples a frame every intervalSec
|
||||||
// (fps=1/interval), scales each to width (even height), and tiles them into a
|
// (fps=1/interval), scales each to width (even height), and tiles them into a
|
||||||
// single JPEG. The whole file is decoded once — slow but a one-time, cached,
|
// single JPEG.
|
||||||
// scan-time cost (run with idle I/O priority by the prewarm), and it removes ALL
|
//
|
||||||
// live extraction during playback (no contention with the active stream).
|
// `-skip_frame nokey` makes the decoder touch ONLY keyframes — ~12× less CPU
|
||||||
|
// than the old full decode (measured 233 s → 19 s CPU on a 24-min 1080p
|
||||||
|
// episode), which matters because this runs alongside live streaming on the
|
||||||
|
// same box. The fps filter still emits one frame per UNIFORM tick (it
|
||||||
|
// repeats the latest keyframe for ticks between keyframes), so the manifest
|
||||||
|
// contract — tileIndex = floor(t / IntervalSec) — is unchanged and cached
|
||||||
|
// clients keep working; each tile just shows the nearest keyframe ≤ its
|
||||||
|
// tick (≤ one GOP off, invisible at 240-320 px scrub size).
|
||||||
//
|
//
|
||||||
// durationSec drives the grid size; pass the probed duration (0 → error, nothing
|
// durationSec drives the grid size; pass the probed duration (0 → error, nothing
|
||||||
// to sample). The caller owns the ctx deadline (generous at scan time).
|
// to sample). The caller owns the ctx deadline (generous at scan time).
|
||||||
|
|
@ -179,10 +186,18 @@ func GenerateTrickplay(ctx context.Context, ffmpegPath, mediaPath string, interv
|
||||||
tmpSprite := spritePath + ".tmp"
|
tmpSprite := spritePath + ".tmp"
|
||||||
|
|
||||||
// fps filter wants a rational; format 1/effInterval with enough precision.
|
// fps filter wants a rational; format 1/effInterval with enough precision.
|
||||||
|
// eof_action=pass: with -skip_frame nokey a short/all-inter clip can decode
|
||||||
|
// to a SINGLE keyframe, and fps's default eof handling emits zero frames
|
||||||
|
// from a one-frame stream (it never sees a later PTS to close the first
|
||||||
|
// tick) → "Nothing was written into output". pass flushes the last frame
|
||||||
|
// at EOF instead; on normal media it only matters at the very end, where
|
||||||
|
// -frames:v 1 + the tile grid already bound the output.
|
||||||
fps := fmt.Sprintf("1/%s", strconv.FormatFloat(effInterval, 'f', 3, 64))
|
fps := fmt.Sprintf("1/%s", strconv.FormatFloat(effInterval, 'f', 3, 64))
|
||||||
vf := fmt.Sprintf("fps=%s,scale=%d:-2,tile=%dx%d", fps, width, cols, rows)
|
vf := fmt.Sprintf("fps=%s:eof_action=pass,scale=%d:-2,tile=%dx%d", fps, width, cols, rows)
|
||||||
args := []string{
|
args := []string{
|
||||||
"-nostdin", "-loglevel", "error", "-y",
|
"-nostdin", "-loglevel", "error", "-y",
|
||||||
|
// Decoder-level keyframe-only mode — must precede -i (input option).
|
||||||
|
"-skip_frame", "nokey",
|
||||||
"-i", mediaPath,
|
"-i", mediaPath,
|
||||||
"-frames:v", "1",
|
"-frames:v", "1",
|
||||||
"-vf", vf,
|
"-vf", vf,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue