feat(stream): live transcode telemetry from ffmpeg speed=
Parse ffmpeg's -stats progress line (speed=Yx, fps=) from the HLS encoder's stderr into a per-session EWMA, and report a health snapshot to the web side a few seconds after seg-0. This lets the player name a too-slow transcode from a direct measurement (~5-7s) instead of inferring it from stall shape (~15-30s).

- hls.go: add -stats; rewrite hlsStderrCapture.Write to frame on \r and \n, parse speed=/fps= (telemetry only, never logged), and flag input-bound on source read errors. Add the EWMA fields on HLSSession plus GetTranscodeStats(); skip the first cold-start frames as warmup so a healthy encoder isn't reported as struggling.
- client.go: MarkSessionReady takes an optional *SessionHealth.
- daemon.go: the watcher reports one health snapshot once >=4 post-warmup samples have settled; classifyAgentHealth maps the speed ratio to ok/marginal/struggling.

Additive: old web replicas ignore the extra field; cache-hit/direct-play sessions and short encodes report nil (the web keeps its stall heuristic).
This commit is contained in:
parent
2b47cb0656
commit
f14aee0b93
4 changed files with 335 additions and 24 deletions
|
|
@ -119,10 +119,11 @@ func (c *Client) ReportUpgradeResult(ctx context.Context, agentID string, succes
|
||||||
// will reach the same conclusion via HEAD probes anyway if this call
|
// will reach the same conclusion via HEAD probes anyway if this call
|
||||||
// fails. We log the error in the caller but don't retry — by the time
|
// fails. We log the error in the caller but don't retry — by the time
|
||||||
// a retry would land the user is likely already playing.
|
// a retry would land the user is likely already playing.
|
||||||
func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error {
|
func (c *Client) MarkSessionReady(ctx context.Context, sessionID string, health *SessionHealth) error {
|
||||||
req := struct {
|
req := struct {
|
||||||
SessionID string `json:"sessionId"`
|
SessionID string `json:"sessionId"`
|
||||||
}{SessionID: sessionID}
|
Health *SessionHealth `json:"health,omitempty"`
|
||||||
|
}{SessionID: sessionID, Health: health}
|
||||||
var resp StatusResponse
|
var resp StatusResponse
|
||||||
if err := c.doPost(ctx, "/api/internal/agent/session-ready", req, &resp); err != nil {
|
if err := c.doPost(ctx, "/api/internal/agent/session-ready", req, &resp); err != nil {
|
||||||
return fmt.Errorf("mark session ready: %w", err)
|
return fmt.Errorf("mark session ready: %w", err)
|
||||||
|
|
@ -130,6 +131,20 @@ func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SessionHealth is the optional live-transcode health snapshot that may ride
// along with a session-ready report (F3). Callers pass a nil *SessionHealth
// when the agent has nothing to share — cache hit, direct-play, or progress
// that has not stabilised yet — and the web side then keeps its stall-shape
// heuristic. The field is additive on the wire: old web replicas ignore it and
// old agents simply never send it.
type SessionHealth struct {
	// Health is the verdict: "ok" (≥ realtime), "marginal" (barely keeps up)
	// or "struggling" (cannot keep up).
	Health string `json:"health"`

	// RealtimeRatio is the EWMA of ffmpeg's speed= value: 1.0 is exactly
	// realtime, anything below 1.0 is slower than playback.
	RealtimeRatio float64 `json:"realtimeRatio"`

	// Reason names the limiting factor: "realtime", "transcode" (the encoder
	// is the wall) or "input_bound" (the source read is the wall).
	Reason string `json:"reason"`
}
|
||||||
|
|
||||||
// RefreshStreamURL re-resolves a fresh debrid direct URL for a live streaming
|
// RefreshStreamURL re-resolves a fresh debrid direct URL for a live streaming
|
||||||
// session (hueco #2 / 2c). Called by the daemon when a debrid source expires
|
// session (hueco #2 / 2c). Called by the daemon when a debrid source expires
|
||||||
// mid-stream (the link is time-limited; the content is still cached). Returns
|
// mid-stream (the link is time-limited; the content is still cached). Returns
|
||||||
|
|
|
||||||
|
|
@ -763,7 +763,7 @@ func runDaemonStart() error {
|
||||||
agent.ShortID(sess.SessionID), provider.FileName(), provider.FileSize())
|
agent.ShortID(sess.SessionID), provider.FileName(), provider.FileSize())
|
||||||
rctx, rcancel := context.WithTimeout(ctx, 10*time.Second)
|
rctx, rcancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
defer rcancel()
|
defer rcancel()
|
||||||
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
|
if err := agentClient.MarkSessionReady(rctx, sess.SessionID, nil); err != nil {
|
||||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
@ -858,7 +858,7 @@ func runDaemonStart() error {
|
||||||
go func() {
|
go func() {
|
||||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
|
if err := agentClient.MarkSessionReady(rctx, sess.SessionID, nil); err != nil {
|
||||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
@ -906,7 +906,7 @@ func runDaemonStart() error {
|
||||||
go func() {
|
go func() {
|
||||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil {
|
if err := agentClient.MarkSessionReady(rctx, sess.SessionID, nil); err != nil {
|
||||||
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
@ -1386,6 +1386,17 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
|
||||||
deadline := time.Now().Add(60 * time.Second)
|
deadline := time.Now().Add(60 * time.Second)
|
||||||
ticker := time.NewTicker(200 * time.Millisecond)
|
ticker := time.NewTicker(200 * time.Millisecond)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
readyPosted := false
|
||||||
|
postReady := func(health *agent.SessionHealth) {
|
||||||
|
// Parent ctx so a session cancel mid-POST (user closed tab, daemon
|
||||||
|
// shutdown) tears down the in-flight webhook instead of blocking the
|
||||||
|
// goroutine for up to 10 s on a now-orphan call.
|
||||||
|
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
|
if err := client.MarkSessionReady(rctx, sessionID, health); err != nil {
|
||||||
|
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
// Session torn down through a path that didn't cancel ctx (registry
|
// Session torn down through a path that didn't cancel ctx (registry
|
||||||
// replace, idle sweep, internal kill). Bail before polling further —
|
// replace, idle sweep, internal kill). Bail before polling further —
|
||||||
|
|
@ -1394,26 +1405,74 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
|
||||||
if hsess.IsClosed() {
|
if hsess.IsClosed() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Cache HIT or seg-0 ready → notify + done.
|
// Phase 1: cache HIT or seg-0 ready → flip the "Preparando…" UI now.
|
||||||
if hsess.FromCache() || hsess.ReadyCount() >= 1 {
|
if !readyPosted && (hsess.FromCache() || hsess.ReadyCount() >= 1) {
|
||||||
// Parent ctx so a session cancel mid-POST (user closed tab,
|
postReady(nil)
|
||||||
// daemon shutdown) tears down the in-flight webhook instead of
|
readyPosted = true
|
||||||
// blocking the goroutine for up to 10 s on a now-orphan call.
|
// Cache replay has no live encode → no telemetry to report, done.
|
||||||
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
if hsess.FromCache() {
|
||||||
if err := client.MarkSessionReady(rctx, sessionID); err != nil {
|
|
||||||
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
// Phase 2 (F3): once enough -stats samples accumulated (encoder past
|
||||||
|
// its cold ramp), report ONE live-health snapshot so the player can
|
||||||
|
// name a too-slow transcode in ~4s instead of inferring it from stalls.
|
||||||
|
// >=4 samples ≈ 2s of encoding past seg-0; the EWMA has settled by then.
|
||||||
|
if readyPosted {
|
||||||
|
if st := hsess.GetTranscodeStats(); st.Samples >= 4 {
|
||||||
|
postReady(classifyAgentHealth(st))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
}
|
}
|
||||||
if time.Now().After(deadline) {
|
if time.Now().After(deadline) {
|
||||||
|
if !readyPosted {
|
||||||
log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
|
log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// Ready but never got stable telemetry — report whatever we have so
|
||||||
|
// the player isn't left without a verdict (better partial than none).
|
||||||
|
if st := hsess.GetTranscodeStats(); st.Samples > 0 {
|
||||||
|
postReady(classifyAgentHealth(st))
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Realtime-ratio cutoffs for classifyAgentHealth. This is a cross-repo contract
|
||||||
|
// with the web bottleneck classifier (src/lib/stream/bottleneck-classifier.ts):
|
||||||
|
// - ≥ realtimeFloor → "ok" (encoder keeps up)
|
||||||
|
// - [strugglingFloor,..) → "marginal" (barely)
|
||||||
|
// - < strugglingFloor → "struggling" (can't) — the web fast-path commits
|
||||||
|
// the honest overlay + pauses on this WITHOUT waiting for a stall, so the
|
||||||
|
// floor is intentionally conservative (the web uses a looser 0.85 only once
|
||||||
|
// a stall has already corroborated the slowdown).
|
||||||
|
const (
|
||||||
|
agentRealtimeFloor = 0.95
|
||||||
|
agentStrugglingFloor = 0.75
|
||||||
|
)
|
||||||
|
|
||||||
|
// classifyAgentHealth turns a live ffmpeg telemetry snapshot into the health
|
||||||
|
// report the web side consumes (F3). The ×realtime speed is the load-bearing
|
||||||
|
// signal: < 1.0 means the encode can't keep up with playback. An input-bound
|
||||||
|
// hint (source read error) reclassifies the cause as the link, not the encoder.
|
||||||
|
func classifyAgentHealth(st engine.TranscodeStats) *agent.SessionHealth {
|
||||||
|
ratio := st.SpeedX
|
||||||
|
var health, reason string
|
||||||
|
switch {
|
||||||
|
case st.InputBound && ratio < agentRealtimeFloor:
|
||||||
|
health, reason = "struggling", "input_bound"
|
||||||
|
case ratio >= agentRealtimeFloor:
|
||||||
|
health, reason = "ok", "realtime"
|
||||||
|
case ratio >= agentStrugglingFloor:
|
||||||
|
health, reason = "marginal", "transcode"
|
||||||
|
default:
|
||||||
|
health, reason = "struggling", "transcode"
|
||||||
|
}
|
||||||
|
return &agent.SessionHealth{Health: health, RealtimeRatio: ratio, Reason: reason}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -27,6 +27,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
@ -254,6 +255,21 @@ type HLSSession struct {
|
||||||
cacheKey string
|
cacheKey string
|
||||||
fromCache bool
|
fromCache bool
|
||||||
writerLockHeld bool
|
writerLockHeld bool
|
||||||
|
|
||||||
|
// Live transcode telemetry (F3). ffmpeg's -stats progress line is parsed
|
||||||
|
// in hlsStderrCapture.Write into an EWMA of speed= (×realtime) + fps=, plus
|
||||||
|
// an input-bound hint set when the SOURCE read errors (slow/broken pull vs a
|
||||||
|
// too-slow encode). GetTranscodeStats() snapshots this so the ready-watcher
|
||||||
|
// can report a real measurement to the web side — letting the player name a
|
||||||
|
// too-slow transcode honestly in ~4s instead of inferring it from stall
|
||||||
|
// shape over 15-30s. Guarded by statsMu (the stderr goroutine writes; the
|
||||||
|
// watcher goroutine reads).
|
||||||
|
statsMu sync.Mutex
|
||||||
|
speedEWMA float64
|
||||||
|
fpsEWMA float64
|
||||||
|
speedSamples int
|
||||||
|
warmupSeen int // cold-start frames discarded before the EWMA is trusted
|
||||||
|
inputBound bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// hlsSeekAhead is how many segments past the writer's current position the
|
// hlsSeekAhead is how many segments past the writer's current position the
|
||||||
|
|
@ -595,6 +611,68 @@ func (s *HLSSession) ReadyCount() int {
|
||||||
// circuit polling — a cache HIT is ready the moment we return.
|
// circuit polling — a cache HIT is ready the moment we return.
|
||||||
func (s *HLSSession) FromCache() bool { return s.fromCache }
|
func (s *HLSSession) FromCache() bool { return s.fromCache }
|
||||||
|
|
||||||
|
// TranscodeStats is a point-in-time snapshot of the live ffmpeg progress of a
// single HLS session (F3). A SpeedX below 1.0 means the encode runs slower than
// realtime, so the player cannot sustain playback without buffering. A Samples
// value of 0 means no -stats sample has been folded in yet; the ready-watcher
// keeps waiting in that case instead of reporting.
type TranscodeStats struct {
	// SpeedX is the EWMA of ffmpeg's speed= value (×realtime; 1.0 is exactly
	// realtime).
	SpeedX float64

	// Fps is the EWMA of ffmpeg's fps= value.
	Fps float64

	// Samples counts the progress samples folded into the EWMAs so far
	// (0 = no telemetry yet).
	Samples int

	// InputBound is set once the source read hit I/O errors, i.e. the pull is
	// slow or broken rather than the encode.
	InputBound bool

	// FromCache marks a session replayed from cache: there is no live encode,
	// so the other fields carry no meaning.
	FromCache bool
}
|
||||||
|
|
||||||
|
// GetTranscodeStats returns a snapshot of the parsed ffmpeg progress EWMAs.
|
||||||
|
func (s *HLSSession) GetTranscodeStats() TranscodeStats {
|
||||||
|
s.statsMu.Lock()
|
||||||
|
defer s.statsMu.Unlock()
|
||||||
|
return TranscodeStats{
|
||||||
|
SpeedX: s.speedEWMA,
|
||||||
|
Fps: s.fpsEWMA,
|
||||||
|
Samples: s.speedSamples,
|
||||||
|
InputBound: s.inputBound,
|
||||||
|
FromCache: s.fromCache,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// hlsStatsWarmupSkip is how many leading -stats frames to discard before
|
||||||
|
// trusting the EWMA. ffmpeg's first readings reflect the pipeline filling
|
||||||
|
// (often speed=0.0x) and would otherwise drag a healthy encoder into a false
|
||||||
|
// "struggling" verdict that pauses a stream which plays fine once warmed up.
|
||||||
|
const hlsStatsWarmupSkip = 2
|
||||||
|
|
||||||
|
// recordProgress folds one parsed ffmpeg -stats sample into the session EWMAs.
|
||||||
|
// alpha=0.3 smooths the noisy per-line numbers while still tracking a sustained
|
||||||
|
// slowdown within a few samples (~2s of encoding).
|
||||||
|
func (s *HLSSession) recordProgress(speedX, fps float64) {
|
||||||
|
s.statsMu.Lock()
|
||||||
|
defer s.statsMu.Unlock()
|
||||||
|
// Drop the cold-start frames so a steady-state slowdown — not the encoder
|
||||||
|
// spin-up — is what the watcher reports.
|
||||||
|
if s.warmupSeen < hlsStatsWarmupSkip {
|
||||||
|
s.warmupSeen++
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const alpha = 0.3
|
||||||
|
if s.speedSamples == 0 {
|
||||||
|
s.speedEWMA = speedX
|
||||||
|
s.fpsEWMA = fps
|
||||||
|
} else {
|
||||||
|
s.speedEWMA = alpha*speedX + (1-alpha)*s.speedEWMA
|
||||||
|
s.fpsEWMA = alpha*fps + (1-alpha)*s.fpsEWMA
|
||||||
|
}
|
||||||
|
s.speedSamples++
|
||||||
|
}
|
||||||
|
|
||||||
|
// markInputBound flags that ffmpeg reported a source-read error — the wall is
|
||||||
|
// the input pull (slow debrid link / dropped torrent peer), not the encoder.
|
||||||
|
func (s *HLSSession) markInputBound() {
|
||||||
|
s.statsMu.Lock()
|
||||||
|
s.inputBound = true
|
||||||
|
s.statsMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// IsClosed reports whether Close() has been invoked. Exposed (vs the
|
// IsClosed reports whether Close() has been invoked. Exposed (vs the
|
||||||
// internal isClosed) so external watchers — the ready-webhook
|
// internal isClosed) so external watchers — the ready-webhook
|
||||||
// goroutine in cmd/daemon.go — can short-circuit polling on a session
|
// goroutine in cmd/daemon.go — can short-circuit polling on a session
|
||||||
|
|
@ -1140,7 +1218,10 @@ func ResolveEncoderProfile(hw HWAccel, configuredPreset string) EncoderProfile {
|
||||||
// `-output_ts_offset` keeps the segment PTS aligned with manifest timeline.
|
// `-output_ts_offset` keeps the segment PTS aligned with manifest timeline.
|
||||||
func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string, startIdx int, startSec float64) []string {
|
func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string, startIdx int, startSec float64) []string {
|
||||||
profile := ResolveEncoderProfile(cfg.Transcode.HWAccel, cfg.Transcode.Preset)
|
profile := ResolveEncoderProfile(cfg.Transcode.HWAccel, cfg.Transcode.Preset)
|
||||||
args := []string{"-y", "-hide_banner", "-loglevel", "warning"}
|
// -stats forces ffmpeg to emit the frame=/fps=/speed= progress line to
|
||||||
|
// stderr even at -loglevel warning; hlsStderrCapture parses it for live
|
||||||
|
// transcode telemetry (F3) without logging it.
|
||||||
|
args := []string{"-y", "-hide_banner", "-loglevel", "warning", "-stats"}
|
||||||
|
|
||||||
// Demuxer-side HW-decode hint. Sourced from the profile so a future
|
// Demuxer-side HW-decode hint. Sourced from the profile so a future
|
||||||
// codec/hint mismatch is impossible — the encoder + decode hint are
|
// codec/hint mismatch is impossible — the encoder + decode hint are
|
||||||
|
|
@ -1581,6 +1662,46 @@ type hlsStderrCapture struct {
|
||||||
|
|
||||||
const maxStderrBuf = 64 * 1024
|
const maxStderrBuf = 64 * 1024
|
||||||
|
|
||||||
|
// ffmpeg -stats progress lines look like:
//
//	frame= 123 fps= 30 q=28.0 size= 456kB time=00:00:08.00 speed=1.05x
//
// emitted with a trailing \r (overwrite-in-place), once per ~0.5s. We parse
// speed=/fps= out of them for live transcode telemetry (F3) and DON'T log them
// (one per 0.5s would drown the daemon log) — only \n-terminated warning/error
// lines reach log.Printf.
//
// The speed pattern also accepts an optional exponent ("speed=1e+03x"): ffmpeg
// formats the value with a %g-style verb, so very fast runs can be printed in
// exponent notation. Without this such a line would not be recognised as
// progress at all.
var (
	reFFmpegSpeed = regexp.MustCompile(`speed=\s*([0-9.]+(?:[eE][+-]?[0-9]+)?)x`)
	reFFmpegFps   = regexp.MustCompile(`fps=\s*([0-9.]+)`)
)

// parseFFmpegProgress extracts the ×realtime speed and the fps from one ffmpeg
// -stats progress line.
//
// ok is false when the line has no parseable "speed=<number>x" token (warning
// lines, "speed=N/A", malformed numbers); speedX and fps are then both 0.
// A missing or unparseable fps= token is not an error: fps is reported as 0
// and the speed is still returned, because speed is the load-bearing signal.
func parseFFmpegProgress(line string) (speedX, fps float64, ok bool) {
	sm := reFFmpegSpeed.FindStringSubmatch(line)
	if sm == nil {
		return 0, 0, false
	}
	speed, err := strconv.ParseFloat(sm[1], 64)
	if err != nil {
		// The regex admits shapes like "1.2.3" that are not valid floats.
		return 0, 0, false
	}

	var rate float64
	if fm := reFFmpegFps.FindStringSubmatch(line); fm != nil {
		// fps is best-effort telemetry: keep 0 when it does not parse.
		if v, perr := strconv.ParseFloat(fm[1], 64); perr == nil {
			rate = v
		}
	}
	return speed, rate, true
}
|
||||||
|
|
||||||
|
// isInputBoundLine reports whether an ffmpeg stderr line indicates that the
// SOURCE read failed (slow debrid link, dropped torrent peer, network timeout)
// as opposed to the encoder being too slow, so the player can name the link —
// not the GPU — as the bottleneck. Matching is a case-insensitive substring
// search against a fixed list of markers.
func isInputBoundLine(line string) bool {
	lowered := strings.ToLower(line)
	for _, marker := range [...]string{
		"i/o error",
		"connection reset",
		"rw_timeout",
		"error in the pull function",
		"connection timed out",
	} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}
|
||||||
|
|
||||||
func (c *hlsStderrCapture) Write(p []byte) (int, error) {
|
func (c *hlsStderrCapture) Write(p []byte) (int, error) {
|
||||||
// If the incoming chunk alone exceeds the cap (very long unterminated
|
// If the incoming chunk alone exceeds the cap (very long unterminated
|
||||||
// line), drop the buffered prefix AND truncate p so a single multi-MB
|
// line), drop the buffered prefix AND truncate p so a single multi-MB
|
||||||
|
|
@ -1589,20 +1710,33 @@ func (c *hlsStderrCapture) Write(p []byte) (int, error) {
|
||||||
c.buf.Reset()
|
c.buf.Reset()
|
||||||
p = p[len(p)-maxStderrBuf:]
|
p = p[len(p)-maxStderrBuf:]
|
||||||
} else if c.buf.Len()+len(p) > maxStderrBuf {
|
} else if c.buf.Len()+len(p) > maxStderrBuf {
|
||||||
// Drop the unterminated partial line; we'll resync on the next \n.
|
// Drop the unterminated partial line; we'll resync on the next \r/\n.
|
||||||
c.buf.Reset()
|
c.buf.Reset()
|
||||||
}
|
}
|
||||||
c.buf.Write(p)
|
c.buf.Write(p)
|
||||||
|
// Frame on \r OR \n: ffmpeg's progress line is \r-terminated, warnings are
|
||||||
|
// \n-terminated. Parsing progress per-frame keeps the EWMA fresh; logging
|
||||||
|
// only the \n lines keeps the log readable.
|
||||||
for {
|
for {
|
||||||
line, rest, ok := strings.Cut(c.buf.String(), "\n")
|
s := c.buf.String()
|
||||||
if !ok {
|
idx := strings.IndexAny(s, "\r\n")
|
||||||
|
if idx < 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
line := strings.TrimSpace(s[:idx])
|
||||||
c.buf.Reset()
|
c.buf.Reset()
|
||||||
c.buf.WriteString(rest)
|
c.buf.WriteString(s[idx+1:])
|
||||||
if line = strings.TrimSpace(line); line != "" {
|
if line == "" {
|
||||||
log.Printf("[hls %s] ffmpeg: %s", shortHLSID(c.owner.cfg.SessionID), line)
|
continue
|
||||||
}
|
}
|
||||||
|
if speedX, fps, ok := parseFFmpegProgress(line); ok {
|
||||||
|
c.owner.recordProgress(speedX, fps)
|
||||||
|
continue // progress line — telemetry only, never logged
|
||||||
|
}
|
||||||
|
if isInputBoundLine(line) {
|
||||||
|
c.owner.markInputBound()
|
||||||
|
}
|
||||||
|
log.Printf("[hls %s] ffmpeg: %s", shortHLSID(c.owner.cfg.SessionID), line)
|
||||||
}
|
}
|
||||||
return len(p), nil
|
return len(p), nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
103
internal/engine/hls_progress_test.go
Normal file
103
internal/engine/hls_progress_test.go
Normal file
|
|
@ -0,0 +1,103 @@
|
||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParseFFmpegProgress(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
line string
|
||||||
|
wantSpeed float64
|
||||||
|
wantFps float64
|
||||||
|
wantOK bool
|
||||||
|
}{
|
||||||
|
{"realtime", "frame= 123 fps= 30 q=28.0 size= 456kB time=00:00:08.00 bitrate=467.0kbits/s speed=1.05x", 1.05, 30, true},
|
||||||
|
{"slow", "frame= 12 fps=2.4 q=-1.0 size= 40kB time=00:00:00.40 speed=0.18x", 0.18, 2.4, true},
|
||||||
|
{"tight_spacing", "speed=2x", 2, 0, true},
|
||||||
|
{"no_speed", "[libplacebo @ 0x55] Spent 2657ms on a slow shader", 0, 0, false},
|
||||||
|
{"warning_line", "[hevc @ 0x7f] Could not find ref with POC 12", 0, 0, false},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
sp, fps, ok := parseFFmpegProgress(c.line)
|
||||||
|
if ok != c.wantOK {
|
||||||
|
t.Fatalf("ok=%v want %v", ok, c.wantOK)
|
||||||
|
}
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if math.Abs(sp-c.wantSpeed) > 1e-9 {
|
||||||
|
t.Errorf("speed=%v want %v", sp, c.wantSpeed)
|
||||||
|
}
|
||||||
|
if math.Abs(fps-c.wantFps) > 1e-9 {
|
||||||
|
t.Errorf("fps=%v want %v", fps, c.wantFps)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIsInputBoundLine(t *testing.T) {
|
||||||
|
bound := []string{
|
||||||
|
"[http @ 0x55] HTTP error: Connection reset by peer",
|
||||||
|
"rw_timeout reached, aborting",
|
||||||
|
"Error in the pull function.",
|
||||||
|
"tcp://: I/O error",
|
||||||
|
}
|
||||||
|
for _, l := range bound {
|
||||||
|
if !isInputBoundLine(l) {
|
||||||
|
t.Errorf("expected input-bound: %q", l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
notBound := []string{
|
||||||
|
"frame= 1 fps=30 speed=1.0x",
|
||||||
|
"[libplacebo] slow shader",
|
||||||
|
}
|
||||||
|
for _, l := range notBound {
|
||||||
|
if isInputBoundLine(l) {
|
||||||
|
t.Errorf("expected NOT input-bound: %q", l)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// hlsStderrCapture must frame on \r (progress) as well as \n (warnings),
|
||||||
|
// fold progress into the EWMA, and surface a sustained slow encode as < 1.0x.
|
||||||
|
func TestHlsStderrCaptureProgressEWMA(t *testing.T) {
|
||||||
|
s := &HLSSession{}
|
||||||
|
s.cfg.SessionID = "test-session-00000000"
|
||||||
|
c := &hlsStderrCapture{owner: s}
|
||||||
|
|
||||||
|
// Cold-start frames ffmpeg emits while the pipeline fills — must be skipped
|
||||||
|
// (hlsStatsWarmupSkip) so they don't drag the EWMA into a false struggle.
|
||||||
|
warmup := "frame=0 fps=0 speed=0.01x\r" +
|
||||||
|
"frame=0 fps=0 speed=0.04x\r"
|
||||||
|
// A burst of \r-terminated steady-state progress lines, like real ffmpeg.
|
||||||
|
chunk := "frame=1 fps=2 speed=0.20x\r" +
|
||||||
|
"frame=2 fps=2 speed=0.21x\r" +
|
||||||
|
"frame=3 fps=2 speed=0.19x\r" +
|
||||||
|
"frame=4 fps=2 speed=0.20x\r" +
|
||||||
|
"frame=5 fps=2 speed=0.20x\r"
|
||||||
|
if _, err := c.Write([]byte(warmup + chunk)); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
st := s.GetTranscodeStats()
|
||||||
|
// 7 progress lines written, first hlsStatsWarmupSkip(2) discarded → 5 counted.
|
||||||
|
if st.Samples != 5 {
|
||||||
|
t.Fatalf("samples=%d want 5 (7 lines - 2 warmup)", st.Samples)
|
||||||
|
}
|
||||||
|
if st.SpeedX > 0.5 || st.SpeedX < 0.1 {
|
||||||
|
t.Errorf("speedX EWMA=%v, want ~0.2 (sustained slow encode)", st.SpeedX)
|
||||||
|
}
|
||||||
|
if st.InputBound {
|
||||||
|
t.Error("not input-bound for a pure slow encode")
|
||||||
|
}
|
||||||
|
|
||||||
|
// A \n-terminated I/O error line flips input-bound.
|
||||||
|
if _, err := c.Write([]byte("tcp://: I/O error\n")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if !s.GetTranscodeStats().InputBound {
|
||||||
|
t.Error("expected input-bound after I/O error line")
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue