diff --git a/internal/agent/client.go b/internal/agent/client.go index f1014c5..aceeb66 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -119,10 +119,11 @@ func (c *Client) ReportUpgradeResult(ctx context.Context, agentID string, succes // will reach the same conclusion via HEAD probes anyway if this call // fails. We log the error in the caller but don't retry — by the time // a retry would land the user is likely already playing. -func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error { +func (c *Client) MarkSessionReady(ctx context.Context, sessionID string, health *SessionHealth) error { req := struct { - SessionID string `json:"sessionId"` - }{SessionID: sessionID} + SessionID string `json:"sessionId"` + Health *SessionHealth `json:"health,omitempty"` + }{SessionID: sessionID, Health: health} var resp StatusResponse if err := c.doPost(ctx, "/api/internal/agent/session-ready", req, &resp); err != nil { return fmt.Errorf("mark session ready: %w", err) @@ -130,6 +131,20 @@ func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error { return nil } +// SessionHealth is an OPTIONAL live-transcode health snapshot attached to a +// session-ready report (F3). A nil *SessionHealth means the agent has no +// telemetry to share (cache hit, direct-play, or progress not yet stable) and +// the web side keeps its stall-shape heuristic. Old web replicas ignore the +// extra field; old agents simply never send it. +type SessionHealth struct { + // "ok" (≥ realtime) | "marginal" (keeps up barely) | "struggling" (can't). + Health string `json:"health"` + // ffmpeg speed= EWMA: 1.0 = exactly realtime, < 1.0 = slower than playback. + RealtimeRatio float64 `json:"realtimeRatio"` + // "realtime" | "transcode" (encoder is the wall) | "input_bound" (source read). + Reason string `json:"reason"` +} + // RefreshStreamURL re-resolves a fresh debrid direct URL for a live streaming // session (hueco #2 / 2c). Called by the daemon when a debrid source expires // mid-stream (the link is time-limited; the content is still cached). Returns diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 5818640..3ad69e3 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -763,7 +763,7 @@ func runDaemonStart() error { agent.ShortID(sess.SessionID), provider.FileName(), provider.FileSize()) rctx, rcancel := context.WithTimeout(ctx, 10*time.Second) defer rcancel() - if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil { + if err := agentClient.MarkSessionReady(rctx, sess.SessionID, nil); err != nil { log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err) } }() @@ -858,7 +858,7 @@ func runDaemonStart() error { go func() { rctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil { + if err := agentClient.MarkSessionReady(rctx, sess.SessionID, nil); err != nil { log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err) } }() @@ -906,7 +906,7 @@ func runDaemonStart() error { go func() { rctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() - if err := agentClient.MarkSessionReady(rctx, sess.SessionID); err != nil { + if err := agentClient.MarkSessionReady(rctx, sess.SessionID, nil); err != nil { log.Printf("[stream %s] mark-ready failed: %v", agent.ShortID(sess.SessionID), err) } }() @@ -1386,6 +1386,17 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine. deadline := time.Now().Add(60 * time.Second) ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() + readyPosted := false + postReady := func(health *agent.SessionHealth) { + // Parent ctx so a session cancel mid-POST (user closed tab, daemon + // shutdown) tears down the in-flight webhook instead of blocking the + // goroutine for up to 10 s on a now-orphan call. + rctx, cancel := context.WithTimeout(ctx, 10*time.Second) + if err := client.MarkSessionReady(rctx, sessionID, health); err != nil { + log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err) + } + cancel() + } for { // Session torn down through a path that didn't cancel ctx (registry // replace, idle sweep, internal kill). Bail before polling further — @@ -1394,17 +1405,24 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine. if hsess.IsClosed() { return } - // Cache HIT or seg-0 ready → notify + done. - if hsess.FromCache() || hsess.ReadyCount() >= 1 { - // Parent ctx so a session cancel mid-POST (user closed tab, - // daemon shutdown) tears down the in-flight webhook instead of - // blocking the goroutine for up to 10 s on a now-orphan call. - rctx, cancel := context.WithTimeout(ctx, 10*time.Second) - if err := client.MarkSessionReady(rctx, sessionID); err != nil { - log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err) + // Phase 1: cache HIT or seg-0 ready → flip the "Preparando…" UI now. + if !readyPosted && (hsess.FromCache() || hsess.ReadyCount() >= 1) { + postReady(nil) + readyPosted = true + // Cache replay has no live encode → no telemetry to report, done. + if hsess.FromCache() { + return + } + } + // Phase 2 (F3): once enough -stats samples accumulated (encoder past + // its cold ramp), report ONE live-health snapshot so the player can + // name a too-slow transcode in ~4s instead of inferring it from stalls. + // >=4 samples ≈ 2s of encoding past seg-0; the EWMA has settled by then. + if readyPosted { + if st := hsess.GetTranscodeStats(); st.Samples >= 4 { + postReady(classifyAgentHealth(st)) + return } - cancel() - return } select { case <-ctx.Done(): @@ -1412,8 +1430,49 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine. case <-ticker.C: } if time.Now().After(deadline) { - log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID)) + if !readyPosted { + log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID)) + return + } + // Ready but never got stable telemetry — report whatever we have so + // the player isn't left without a verdict (better partial than none). + if st := hsess.GetTranscodeStats(); st.Samples > 0 { + postReady(classifyAgentHealth(st)) + } return } } } + +// Realtime-ratio cutoffs for classifyAgentHealth. This is a cross-repo contract +// with the web bottleneck classifier (src/lib/stream/bottleneck-classifier.ts): +// - ≥ realtimeFloor → "ok" (encoder keeps up) +// - [strugglingFloor,..) → "marginal" (barely) +// - < strugglingFloor → "struggling" (can't) — the web fast-path commits +// the honest overlay + pauses on this WITHOUT waiting for a stall, so the +// floor is intentionally conservative (the web uses a looser 0.85 only once +// a stall has already corroborated the slowdown). +const ( + agentRealtimeFloor = 0.95 + agentStrugglingFloor = 0.75 +) + +// classifyAgentHealth turns a live ffmpeg telemetry snapshot into the health +// report the web side consumes (F3). The ×realtime speed is the load-bearing +// signal: < 1.0 means the encode can't keep up with playback. An input-bound +// hint (source read error) reclassifies the cause as the link, not the encoder. +func classifyAgentHealth(st engine.TranscodeStats) *agent.SessionHealth { + ratio := st.SpeedX + var health, reason string + switch { + case st.InputBound && ratio < agentRealtimeFloor: + health, reason = "struggling", "input_bound" + case ratio >= agentRealtimeFloor: + health, reason = "ok", "realtime" + case ratio >= agentStrugglingFloor: + health, reason = "marginal", "transcode" + default: + health, reason = "struggling", "transcode" + } + return &agent.SessionHealth{Health: health, RealtimeRatio: ratio, Reason: reason} +} diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 181973b..30261a6 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -27,6 +27,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strconv" "strings" "sync" @@ -254,6 +255,21 @@ type HLSSession struct { cacheKey string fromCache bool writerLockHeld bool + + // Live transcode telemetry (F3). ffmpeg's -stats progress line is parsed + // in hlsStderrCapture.Write into an EWMA of speed= (×realtime) + fps=, plus + // an input-bound hint set when the SOURCE read errors (slow/broken pull vs a + // too-slow encode). GetTranscodeStats() snapshots this so the ready-watcher + // can report a real measurement to the web side — letting the player name a + // too-slow transcode honestly in ~4s instead of inferring it from stall + // shape over 15-30s. Guarded by statsMu (the stderr goroutine writes; the + // watcher goroutine reads). + statsMu sync.Mutex + speedEWMA float64 + fpsEWMA float64 + speedSamples int + warmupSeen int // cold-start frames discarded before the EWMA is trusted + inputBound bool } // hlsSeekAhead is how many segments past the writer's current position the @@ -595,6 +611,68 @@ func (s *HLSSession) ReadyCount() int { // circuit polling — a cache HIT is ready the moment we return. func (s *HLSSession) FromCache() bool { return s.fromCache } +// TranscodeStats is a point-in-time snapshot of live ffmpeg progress for one +// HLS session (F3). SpeedX < 1.0 means the encode runs slower than realtime — +// the player can't sustain playback without buffering. Samples==0 means no +// -stats line has been parsed yet (the watcher keeps waiting before reporting). +type TranscodeStats struct { + SpeedX float64 // EWMA of ffmpeg speed= (×realtime; 1.0 = exactly realtime) + Fps float64 // EWMA of ffmpeg fps= + Samples int // progress lines parsed so far (0 = no telemetry yet) + InputBound bool // source read hit I/O errors (slow/broken pull, not encode) + FromCache bool // replayed from cache → no live encode, stats meaningless +} + +// GetTranscodeStats returns a snapshot of the parsed ffmpeg progress EWMAs. +func (s *HLSSession) GetTranscodeStats() TranscodeStats { + s.statsMu.Lock() + defer s.statsMu.Unlock() + return TranscodeStats{ + SpeedX: s.speedEWMA, + Fps: s.fpsEWMA, + Samples: s.speedSamples, + InputBound: s.inputBound, + FromCache: s.fromCache, + } +} + +// hlsStatsWarmupSkip is how many leading -stats frames to discard before +// trusting the EWMA. ffmpeg's first readings reflect the pipeline filling +// (often speed=0.0x) and would otherwise drag a healthy encoder into a false +// "struggling" verdict that pauses a stream which plays fine once warmed up. +const hlsStatsWarmupSkip = 2 + +// recordProgress folds one parsed ffmpeg -stats sample into the session EWMAs. +// alpha=0.3 smooths the noisy per-line numbers while still tracking a sustained +// slowdown within a few samples (~2s of encoding). +func (s *HLSSession) recordProgress(speedX, fps float64) { + s.statsMu.Lock() + defer s.statsMu.Unlock() + // Drop the cold-start frames so a steady-state slowdown — not the encoder + // spin-up — is what the watcher reports. + if s.warmupSeen < hlsStatsWarmupSkip { + s.warmupSeen++ + return + } + const alpha = 0.3 + if s.speedSamples == 0 { + s.speedEWMA = speedX + s.fpsEWMA = fps + } else { + s.speedEWMA = alpha*speedX + (1-alpha)*s.speedEWMA + s.fpsEWMA = alpha*fps + (1-alpha)*s.fpsEWMA + } + s.speedSamples++ +} + +// markInputBound flags that ffmpeg reported a source-read error — the wall is +// the input pull (slow debrid link / dropped torrent peer), not the encoder. +func (s *HLSSession) markInputBound() { + s.statsMu.Lock() + s.inputBound = true + s.statsMu.Unlock() +} + // IsClosed reports whether Close() has been invoked. Exposed (vs the // internal isClosed) so external watchers — the ready-webhook // goroutine in cmd/daemon.go — can short-circuit polling on a session @@ -1140,7 +1218,10 @@ func ResolveEncoderProfile(hw HWAccel, configuredPreset string) EncoderProfile { // `-output_ts_offset` keeps the segment PTS aligned with manifest timeline. func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string, startIdx int, startSec float64) []string { profile := ResolveEncoderProfile(cfg.Transcode.HWAccel, cfg.Transcode.Preset) - args := []string{"-y", "-hide_banner", "-loglevel", "warning"} + // -stats forces ffmpeg to emit the frame=/fps=/speed= progress line to + // stderr even at -loglevel warning; hlsStderrCapture parses it for live + // transcode telemetry (F3) without logging it. + args := []string{"-y", "-hide_banner", "-loglevel", "warning", "-stats"} // Demuxer-side HW-decode hint. Sourced from the profile so a future // codec/hint mismatch is impossible — the encoder + decode hint are @@ -1581,6 +1662,46 @@ type hlsStderrCapture struct { const maxStderrBuf = 64 * 1024 +// ffmpeg -stats progress lines look like: +// +// frame= 123 fps= 30 q=28.0 size= 456kB time=00:00:08.00 speed=1.05x +// +// emitted with a trailing \r (overwrite-in-place), once per ~0.5s. We parse +// speed=/fps= out of them for live transcode telemetry (F3) and DON'T log them +// (one per 0.5s would drown the daemon log) — only \n-terminated warning/error +// lines reach log.Printf below. +var ( + reFFmpegSpeed = regexp.MustCompile(`speed=\s*([0-9.]+)x`) + reFFmpegFps = regexp.MustCompile(`fps=\s*([0-9.]+)`) +) + +func parseFFmpegProgress(line string) (speedX, fps float64, ok bool) { + m := reFFmpegSpeed.FindStringSubmatch(line) + if m == nil { + return 0, 0, false + } + v, err := strconv.ParseFloat(m[1], 64) + if err != nil { + return 0, 0, false + } + if fm := reFFmpegFps.FindStringSubmatch(line); fm != nil { + fps, _ = strconv.ParseFloat(fm[1], 64) + } + return v, fps, true +} + +// isInputBoundLine spots ffmpeg stderr that means the SOURCE read failed (slow +// debrid link, dropped torrent peer, network timeout) rather than the encoder +// being too slow — so the player names the bottleneck as the link, not the GPU. +func isInputBoundLine(line string) bool { + l := strings.ToLower(line) + return strings.Contains(l, "i/o error") || + strings.Contains(l, "connection reset") || + strings.Contains(l, "rw_timeout") || + strings.Contains(l, "error in the pull function") || + strings.Contains(l, "connection timed out") +} + func (c *hlsStderrCapture) Write(p []byte) (int, error) { // If the incoming chunk alone exceeds the cap (very long unterminated // line), drop the buffered prefix AND truncate p so a single multi-MB @@ -1589,20 +1710,33 @@ func (c *hlsStderrCapture) Write(p []byte) (int, error) { c.buf.Reset() p = p[len(p)-maxStderrBuf:] } else if c.buf.Len()+len(p) > maxStderrBuf { - // Drop the unterminated partial line; we'll resync on the next \n. + // Drop the unterminated partial line; we'll resync on the next \r/\n. c.buf.Reset() } c.buf.Write(p) + // Frame on \r OR \n: ffmpeg's progress line is \r-terminated, warnings are + // \n-terminated. Parsing progress per-frame keeps the EWMA fresh; logging + // only the \n lines keeps the log readable. for { - line, rest, ok := strings.Cut(c.buf.String(), "\n") - if !ok { + s := c.buf.String() + idx := strings.IndexAny(s, "\r\n") + if idx < 0 { break } + line := strings.TrimSpace(s[:idx]) c.buf.Reset() - c.buf.WriteString(rest) - if line = strings.TrimSpace(line); line != "" { - log.Printf("[hls %s] ffmpeg: %s", shortHLSID(c.owner.cfg.SessionID), line) + c.buf.WriteString(s[idx+1:]) + if line == "" { + continue } + if speedX, fps, ok := parseFFmpegProgress(line); ok { + c.owner.recordProgress(speedX, fps) + continue // progress line — telemetry only, never logged + } + if isInputBoundLine(line) { + c.owner.markInputBound() + } + log.Printf("[hls %s] ffmpeg: %s", shortHLSID(c.owner.cfg.SessionID), line) } return len(p), nil } diff --git a/internal/engine/hls_progress_test.go b/internal/engine/hls_progress_test.go new file mode 100644 index 0000000..f2639f6 --- /dev/null +++ b/internal/engine/hls_progress_test.go @@ -0,0 +1,103 @@ +package engine + +import ( + "math" + "testing" +) + +func TestParseFFmpegProgress(t *testing.T) { + cases := []struct { + name string + line string + wantSpeed float64 + wantFps float64 + wantOK bool + }{ + {"realtime", "frame= 123 fps= 30 q=28.0 size= 456kB time=00:00:08.00 bitrate=467.0kbits/s speed=1.05x", 1.05, 30, true}, + {"slow", "frame= 12 fps=2.4 q=-1.0 size= 40kB time=00:00:00.40 speed=0.18x", 0.18, 2.4, true}, + {"tight_spacing", "speed=2x", 2, 0, true}, + {"no_speed", "[libplacebo @ 0x55] Spent 2657ms on a slow shader", 0, 0, false}, + {"warning_line", "[hevc @ 0x7f] Could not find ref with POC 12", 0, 0, false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + sp, fps, ok := parseFFmpegProgress(c.line) + if ok != c.wantOK { + t.Fatalf("ok=%v want %v", ok, c.wantOK) + } + if !ok { + return + } + if math.Abs(sp-c.wantSpeed) > 1e-9 { + t.Errorf("speed=%v want %v", sp, c.wantSpeed) + } + if math.Abs(fps-c.wantFps) > 1e-9 { + t.Errorf("fps=%v want %v", fps, c.wantFps) + } + }) + } +} + +func TestIsInputBoundLine(t *testing.T) { + bound := []string{ + "[http @ 0x55] HTTP error: Connection reset by peer", + "rw_timeout reached, aborting", + "Error in the pull function.", + "tcp://: I/O error", + } + for _, l := range bound { + if !isInputBoundLine(l) { + t.Errorf("expected input-bound: %q", l) + } + } + notBound := []string{ + "frame= 1 fps=30 speed=1.0x", + "[libplacebo] slow shader", + } + for _, l := range notBound { + if isInputBoundLine(l) { + t.Errorf("expected NOT input-bound: %q", l) + } + } +} + +// hlsStderrCapture must frame on \r (progress) as well as \n (warnings), +// fold progress into the EWMA, and surface a sustained slow encode as < 1.0x. +func TestHlsStderrCaptureProgressEWMA(t *testing.T) { + s := &HLSSession{} + s.cfg.SessionID = "test-session-00000000" + c := &hlsStderrCapture{owner: s} + + // Cold-start frames ffmpeg emits while the pipeline fills — must be skipped + // (hlsStatsWarmupSkip) so they don't drag the EWMA into a false struggle. + warmup := "frame=0 fps=0 speed=0.01x\r" + + "frame=0 fps=0 speed=0.04x\r" + // A burst of \r-terminated steady-state progress lines, like real ffmpeg. + chunk := "frame=1 fps=2 speed=0.20x\r" + + "frame=2 fps=2 speed=0.21x\r" + + "frame=3 fps=2 speed=0.19x\r" + + "frame=4 fps=2 speed=0.20x\r" + + "frame=5 fps=2 speed=0.20x\r" + if _, err := c.Write([]byte(warmup + chunk)); err != nil { + t.Fatal(err) + } + st := s.GetTranscodeStats() + // 7 progress lines written, first hlsStatsWarmupSkip(2) discarded → 5 counted. + if st.Samples != 5 { + t.Fatalf("samples=%d want 5 (7 lines - 2 warmup)", st.Samples) + } + if st.SpeedX > 0.5 || st.SpeedX < 0.1 { + t.Errorf("speedX EWMA=%v, want ~0.2 (sustained slow encode)", st.SpeedX) + } + if st.InputBound { + t.Error("not input-bound for a pure slow encode") + } + + // A \n-terminated I/O error line flips input-bound. + if _, err := c.Write([]byte("tcp://: I/O error\n")); err != nil { + t.Fatal(err) + } + if !s.GetTranscodeStats().InputBound { + t.Error("expected input-bound after I/O error line") + } +}