From eb2548f9a622eed33880399adee978c2375e74ec Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Thu, 7 May 2026 23:55:05 +0200 Subject: [PATCH] feat(streaming): seek-restart, single-session, idle sweeper, probe.json Follow-ups on the daemon HLS pipeline (0fc0e1c): - engine/hls.go HLSSession.Register now closes every other active session in the registry. Modeled as "one viewer == one transcode" so repeated quality switches or page reloads don't leave orphan ffmpegs saturating the CPU until the idle sweeper reaps them 30 min later. - engine/hls.go restartFromSegment kills + respawns ffmpeg with -ss / -output_ts_offset / -start_number when the browser asks for a segment far ahead of the writer head. Segments already on disk stay cached. Without this, a user dragging the scrubber to minute 30 of a fresh stream blocks until the encoder reaches minute 30 in real time. - engine/hls.go subtitle disambiguation: never set DEFAULT=YES on any rendition (anime forced "signs only" tracks were autoselected and rendered nothing during opening dialogue, looking broken). Names get numeric suffixes when language is duplicated; FORCED tracks get a "(forzados)" suffix. - engine/hls.go ProbeInfo() exposes codec / audio / subtitle metadata to the new GET /hls//probe.json endpoint for the player's info badge + bandwidth logic. - engine/hls.go scale chain fix: chains a trunc(iw/2)*2 scale after the height cap so libx264 stops rejecting odd widths (853x480 etc.). - engine/hls.go HW encoder tuning: NVENC -preset p4 -rc vbr -tune hq, QSV -preset medium. - engine/stream_server.go routes /hls//probe.json to the session. - cmd/daemon.go runs an idle sweeper goroutine every 5 min, reaping sessions whose last segment fetch was >30 min ago. --- internal/cmd/daemon.go | 19 ++ internal/engine/hls.go | 297 ++++++++++++++++++++++++++++--- internal/engine/stream_server.go | 4 + 3 files changed, 295 insertions(+), 25 deletions(-) diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 2710e20..b284cc4 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -513,6 +513,25 @@ func runDaemonStart() error { } }() + // Periodic HLS session sweeper (every 5 min). Closes sessions whose last + // segment fetch was over 30 min ago — kills the orphan ffmpeg + removes + // the per-session tmpdir, so a tab that died mid-stream doesn't leak + // disk space until daemon shutdown. + go func() { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if n := streamSrv.HLS().SweepIdle(); n > 0 { + log.Printf("[hls] swept %d idle session(s)", n) + } + case <-ctx.Done(): + return + } + } + }() + // Start auto-scan goroutine scanPaths := daemonCfg.ScanPaths if len(scanPaths) > 0 && cfg.Library.AutoScan { diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 9ba0235..bd0853c 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -68,6 +68,14 @@ type HLSSessionConfig struct { } // HLSSession owns a tmpdir + ffmpeg subprocess producing HLS fragments. +// +// Seek behaviour: ffmpeg writes segments sequentially from `ffmpegSegStart`. +// When a handler asks for a segment far ahead of the writer, the daemon +// kills the current ffmpeg and restarts it with `-ss +// -output_ts_offset -start_number ` so the next segments +// it emits land at the requested timeline position. Segments already on +// disk before the seek stay there; the new ffmpeg only writes from the +// target index forward. type HLSSession struct { cfg HLSSessionConfig probe *StreamProbe @@ -78,12 +86,13 @@ type HLSSession struct { manifestVideo string // pre-rendered video media playlist manifestRoot string // pre-rendered master playlist - mu sync.Mutex - cmd *exec.Cmd - cancel context.CancelFunc - closed bool - startedAt time.Time - lastTouch time.Time + mu sync.Mutex + cmd *exec.Cmd + cancel context.CancelFunc + closed bool + startedAt time.Time + lastTouch time.Time + ffmpegSegStart int // index of the first segment the current ffmpeg writes // readyCond + readyMax track which segments ffmpeg has finished writing. // Handlers waiting on a future segment block on readyCond until the @@ -95,6 +104,12 @@ type HLSSession struct { readyCh chan struct{} // closed + replaced each time readyMax advances } +// hlsSeekAhead is how many segments past the writer's current position the +// browser is allowed to request before we restart ffmpeg from the requested +// segment. 8 segments * 4 s = 32 s of "warm" buffer; further seeks trigger +// a restart instead of waiting through real-time encode. +const hlsSeekAhead = 8 + // HLSSessionRegistry tracks active sessions keyed by ID. type HLSSessionRegistry struct { mu sync.RWMutex @@ -115,13 +130,27 @@ func (r *HLSSessionRegistry) Get(id string) *HLSSession { // Register adds a session under its ID. Replaces any previous session with // the same ID (which is closed first to release ffmpeg + tmpdir). +// +// Also closes EVERY OTHER active session, since one daemon == one viewer == +// one stream at a time. Without this, repeatedly opening the player (or +// changing quality) leaves orphan ffmpegs running until the 30 min idle +// sweeper reaps them, and N concurrent transcodes saturate the CPU. func (r *HLSSessionRegistry) Register(s *HLSSession) { r.mu.Lock() - defer r.mu.Unlock() - if prev, ok := r.sessions[s.cfg.SessionID]; ok { - _ = prev.Close() + stale := make([]*HLSSession, 0, len(r.sessions)) + for id, prev := range r.sessions { + if id == s.cfg.SessionID { + stale = append(stale, prev) + continue + } + stale = append(stale, prev) + delete(r.sessions, id) } r.sessions[s.cfg.SessionID] = s + r.mu.Unlock() + for _, prev := range stale { + _ = prev.Close() + } } // Remove drops a session from the registry without closing it. @@ -249,6 +278,47 @@ func shortHLSID(id string) string { return id } +// ProbeInfo returns a JSON-friendly summary of the source media so the web +// player can render quality / codec / track info without re-probing. +func (s *HLSSession) ProbeInfo() map[string]any { + if s.probe == nil { + return map[string]any{} + } + audios := make([]map[string]any, 0, len(s.probe.AudioTracks)) + for _, a := range s.probe.AudioTracks { + audios = append(audios, map[string]any{ + "index": a.Index, + "lang": a.Lang, + "codec": a.Codec, + "channels": a.Channels, + "title": a.Title, + "default": a.Default, + }) + } + subs := make([]map[string]any, 0, len(s.probe.SubtitleTracks)) + for _, sb := range s.probe.SubtitleTracks { + subs = append(subs, map[string]any{ + "index": sb.Index, + "lang": sb.Lang, + "codec": sb.Codec, + "title": sb.Title, + "forced": sb.Forced, + "text": sb.IsTextSubtitle(), + }) + } + return map[string]any{ + "videoCodec": s.probe.VideoCodec, + "width": s.probe.Width, + "height": s.probe.Height, + "bitDepth": s.probe.BitDepth, + "hdr": s.probe.HDR, + "durationSec": s.probe.DurationSec, + "container": s.probe.Container, + "audio": audios, + "subtitles": subs, + } +} + // MasterPlaylist returns the rendered master.m3u8 contents. func (s *HLSSession) MasterPlaylist() string { return s.manifestRoot } @@ -458,22 +528,129 @@ func (s *HLSSession) ServeInit(w http.ResponseWriter, r *http.Request) { // ServeSegment writes the requested video segment, blocking until ffmpeg // produces it (capped by waitForSegment timeout). +// +// Seek-restart: if the requested segment is far ahead of where the current +// ffmpeg writer is producing AND it's not already on disk, we kill ffmpeg +// and restart it from the requested position. Without this, a user dragging +// the scrubber to minute 30 would block until the encoder reaches minute 30 +// in real time (~25 minutes wait at 1080p software encode). func (s *HLSSession) ServeSegment(w http.ResponseWriter, r *http.Request, idx int) { s.Touch() if idx < 0 || idx >= s.segmentCount { http.Error(w, "segment out of range", http.StatusNotFound) return } + + path := filepath.Join(s.tmpDir, "video", fmt.Sprintf("seg-%d.m4s", idx)) + // Fast path: file already on disk (either current writer reached it, or + // a previous session left it there before a seek-restart). + if fi, err := os.Stat(path); err == nil && fi.Size() > 0 { + w.Header().Set("Content-Type", "video/mp4") + w.Header().Set("Cache-Control", "max-age=3600") + http.ServeFile(w, r, path) + return + } + + // Decide if we should restart ffmpeg from the requested segment. Check + // segStart vs idx — if the gap is wider than hlsSeekAhead and the file + // isn't on disk, the writer would take too long to reach it. + s.mu.Lock() + segStart := s.ffmpegSegStart + s.mu.Unlock() + s.readyMu.Lock() + readyMax := s.readyMax + s.readyMu.Unlock() + + if idx >= readyMax+hlsSeekAhead || idx < segStart { + if err := s.restartFromSegment(idx); err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + } + if err := s.waitForSegment(r.Context(), idx); err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) return } - path := filepath.Join(s.tmpDir, "video", fmt.Sprintf("seg-%d.m4s", idx)) w.Header().Set("Content-Type", "video/mp4") w.Header().Set("Cache-Control", "max-age=3600") http.ServeFile(w, r, path) } +// restartFromSegment kills the current ffmpeg, then spawns a new one whose +// `-ss` offset corresponds to segment `targetIdx`. The caller must NOT hold +// s.mu when calling — the function takes both s.mu and s.readyMu. +func (s *HLSSession) restartFromSegment(targetIdx int) error { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return errors.New("hls: session closed") + } + if targetIdx == s.ffmpegSegStart && !s.exited { + // Already writing from this point — nothing to do. + s.mu.Unlock() + return nil + } + prevCancel := s.cancel + prevCmd := s.cmd + s.mu.Unlock() + + if prevCancel != nil { + prevCancel() + } + if prevCmd != nil && prevCmd.Process != nil { + _ = prevCmd.Process.Kill() + } + // Wait for old ffmpeg to exit so its file handles release. waitFFmpeg + // (the original goroutine) sets s.exited = true; poll until it does. + deadline := time.Now().Add(5 * time.Second) + for { + s.readyMu.Lock() + exited := s.exited + s.readyMu.Unlock() + if exited { + break + } + if time.Now().After(deadline) { + break // proceed anyway; new ffmpeg will overwrite + } + time.Sleep(50 * time.Millisecond) + } + + // Build args for the new ffmpeg with -ss offset. + startSec := float64(targetIdx * hlsSegmentDuration) + args := buildHLSFFmpegArgsAt(s.cfg, s.probe, s.tmpDir, targetIdx, startSec) + + ffCtx, cancel := context.WithCancel(context.Background()) + cmd := exec.CommandContext(ffCtx, s.cfg.Transcode.FFmpegPath, args...) + cmd.Stderr = &hlsStderrCapture{owner: s} + if err := cmd.Start(); err != nil { + cancel() + return fmt.Errorf("hls: restart ffmpeg: %w", err) + } + + // Reset session state so the poll + wait machinery picks up the new run. + s.mu.Lock() + s.cmd = cmd + s.cancel = cancel + s.ffmpegSegStart = targetIdx + s.mu.Unlock() + + s.readyMu.Lock() + s.readyMax = targetIdx // new writer's segments start at targetIdx + s.exited = false + s.exitErr = nil + s.readyCh = make(chan struct{}) + s.readyMu.Unlock() + + go s.waitFFmpeg() + go s.pollSegments(ffCtx) + + log.Printf("[hls %s] restarted ffmpeg at segment %d (%.1fs)", + shortHLSID(s.cfg.SessionID), targetIdx, startSec) + return nil +} + // ServeSubtitle writes the WebVTT subtitle for the requested track index, if // extraction has finished. func (s *HLSSession) ServeSubtitle(w http.ResponseWriter, r *http.Request, idx int) { @@ -501,11 +678,16 @@ func (s *HLSSession) ServeSubtitle(w http.ResponseWriter, r *http.Request, idx i // ---- ffmpeg argument builders ---- -// buildHLSFFmpegArgs returns the argv for the main HLS encode. It always -// re-encodes video + audio so segment boundaries align with -force_key_frames. -// Pure -c copy can be added later for h264+aac+mp4 sources where the GOP is -// already short enough; keeping it simple for the MVP. +// buildHLSFFmpegArgs returns the argv for the initial HLS encode (start at 0). func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) []string { + return buildHLSFFmpegArgsAt(cfg, probe, tmpDir, 0, 0) +} + +// buildHLSFFmpegArgsAt returns the argv for an HLS encode that starts at the +// given segment index (`-ss `) and writes segments numbered from +// startIdx so they slot into the existing manifest at the correct position. +// `-output_ts_offset` keeps the segment PTS aligned with manifest timeline. +func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string, startIdx int, startSec float64) []string { hwHint := cfg.Transcode.HWAccel args := []string{"-y", "-hide_banner", "-loglevel", "warning"} @@ -520,8 +702,19 @@ func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) // No demuxer-side hint. } + // Seek before -i for fast keyframe-aligned start. The new ffmpeg writes + // segments with PTS shifted via -output_ts_offset so the manifest's + // pre-computed segment numbering still matches the timeline. + if startSec > 0 { + args = append(args, "-ss", strconv.FormatFloat(startSec, 'f', 3, 64)) + } + args = append(args, "-i", cfg.SourcePath) + if startSec > 0 { + args = append(args, "-output_ts_offset", strconv.FormatFloat(startSec, 'f', 3, 64)) + } + // Map video + selected audio. Always use first video stream. args = append(args, "-map", "0:v:0") audioIdx := cfg.AudioIndex @@ -539,12 +732,21 @@ func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) // Video encode. codec := hwHint.FFmpegVideoCodec("h264") args = append(args, "-c:v", codec) - if codec == "libx264" { + // Encoder-specific tuning. Each HW encoder takes a different "preset" + // vocabulary; libx264 uses ultrafast→placebo, NVENC uses p1→p7, QSV uses + // veryfast→veryslow, VAAPI/VideoToolbox don't expose presets. + switch codec { + case "libx264": preset := cfg.Transcode.Preset if preset == "" { preset = "veryfast" } args = append(args, "-preset", preset) + case "h264_nvenc": + // p4 = balanced quality/speed; p1 fastest, p7 highest quality. + args = append(args, "-preset", "p4", "-rc", "vbr", "-tune", "hq") + case "h264_qsv": + args = append(args, "-preset", "medium", "-look_ahead", "0") } args = append(args, "-profile:v", "main", "-level:v", "4.0") @@ -562,6 +764,13 @@ func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) args = append(args, "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", hlsSegmentDuration)) // Filter chain: optional scale, force 8-bit yuv420p, normalise color metadata. + // + // Width-rounding pitfall: `scale=-2:H` alone derives width from `H * dar` and + // rounds to the nearest multiple of 2, which is correct. But adding + // `force_original_aspect_ratio=decrease` makes ffmpeg ignore the `-2` and + // emit the exact computed width — which can be odd (e.g. 853×480) and + // libx264 then refuses to open. We chain a second `scale=trunc(iw/2)*2:...` + // after the cap to guarantee even dimensions before format/setparams. maxH := qcap.MaxHeight if maxH == 0 { maxH = cfg.Transcode.MaxHeight @@ -569,11 +778,11 @@ func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) var filterChain string if maxH > 0 && probe.Height > maxH { filterChain = fmt.Sprintf( - "scale=-2:%d:force_original_aspect_ratio=decrease,format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv", + "scale=-2:%d:force_original_aspect_ratio=decrease,scale=trunc(iw/2)*2:trunc(ih/2)*2,format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv", maxH, ) } else { - filterChain = "format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv" + filterChain = "scale=trunc(iw/2)*2:trunc(ih/2)*2,format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv" } args = append(args, "-vf", filterChain) @@ -590,16 +799,22 @@ func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) ) // HLS muxer — fmp4 segments with pre-computed segment count. + // `-start_number` slots seg-N.m4s where N matches the segment index in + // the pre-rendered manifest. Each ffmpeg writes its own ffmpeg.m3u8 but + // we never serve it — the rendered VOD manifest already knows everything. videoDir := filepath.Join(tmpDir, "video") + manifestName := fmt.Sprintf("ffmpeg-%d.m3u8", startIdx) args = append(args, "-f", "hls", "-hls_time", strconv.Itoa(hlsSegmentDuration), "-hls_playlist_type", "vod", "-hls_segment_type", "fmp4", "-hls_list_size", "0", + "-hls_flags", "independent_segments", + "-start_number", strconv.Itoa(startIdx), "-hls_fmp4_init_filename", "init.mp4", "-hls_segment_filename", filepath.Join(videoDir, "seg-%d.m4s"), - filepath.Join(videoDir, "ffmpeg.m3u8"), + filepath.Join(videoDir, manifestName), ) return args } @@ -671,8 +886,22 @@ func renderMasterPlaylist(probe *StreamProbe, qualityLabel string) string { b.WriteString("#EXTM3U\n") b.WriteString("#EXT-X-VERSION:7\n") - // Subtitle renditions. + // Never set DEFAULT=YES on any subtitle rendition. Anime files routinely + // ship with a forced "signs only" English track that has cues only every + // few minutes (movie titles, location captions, etc.); auto-enabling + // that track makes users think the subtitles are broken because nothing + // renders during opening dialogue. Forcing the user to pick the track + // explicitly avoids the false-negative report and is in line with how + // Plex/Jellyfin ship by default. The HLS spec also requires at most + // one DEFAULT per GROUP-ID; "none" trivially satisfies it. + defaultIdx := -1 + + // Subtitle renditions. Names disambiguate when several tracks share the + // same language: a numeric suffix gets appended ("ES", "ES 2"...) so the + // captions menu in the player isn't a list of duplicate "Nirvana sub" + // entries. hasSubs := false + langCounts := make(map[string]int) for i, s := range probe.SubtitleTracks { if !s.IsTextSubtitle() { continue @@ -682,17 +911,35 @@ func renderMasterPlaylist(probe *StreamProbe, qualityLabel string) string { if lang == "" { lang = "und" } - name := s.Title - if name == "" { - name = strings.ToUpper(lang) + // Build a unique display name: title if present, else lang code, + // disambiguated with a numeric suffix when duplicated. + base := s.Title + if base == "" { + base = strings.ToUpper(lang) + } + key := strings.ToLower(base) + langCounts[key]++ + name := base + if langCounts[key] > 1 { + name = fmt.Sprintf("%s %d", base, langCounts[key]) + } + if s.Forced { + name = name + " (forzados)" } def := "NO" - if s.Forced || i == 0 { + if i == defaultIdx { def = "YES" } + // AUTOSELECT only on the default — otherwise some players activate + // every track tagged AUTOSELECT for the user's preferred language, + // stacking captions on screen. + auto := "NO" + if i == defaultIdx { + auto = "YES" + } b.WriteString(fmt.Sprintf( - `#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME=%q,LANGUAGE=%q,DEFAULT=%s,AUTOSELECT=YES,FORCED=%s,URI="subs/sub-%d.m3u8"`+"\n", - name, lang, def, ynBool(s.Forced), i, + `#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME=%q,LANGUAGE=%q,DEFAULT=%s,AUTOSELECT=%s,FORCED=%s,URI="subs/sub-%d.m3u8"`+"\n", + name, lang, def, auto, ynBool(s.Forced), i, )) } diff --git a/internal/engine/stream_server.go b/internal/engine/stream_server.go index 82b0a29..2e42868 100644 --- a/internal/engine/stream_server.go +++ b/internal/engine/stream_server.go @@ -317,6 +317,10 @@ func (ss *StreamServer) hlsHandler(w http.ResponseWriter, r *http.Request) { switch { case resource == "master.m3u8": session.ServeMaster(w, r) + case resource == "probe.json": + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", "no-cache") + _ = json.NewEncoder(w).Encode(session.ProbeInfo()) case resource == "video/index.m3u8": session.ServeVideoPlaylist(w, r) case resource == "video/init.mp4":