feat(daemon): telemetría de salud continua + heartbeat de sesiones copy

El watcher F3 posteaba UN snapshot de speed= al arrancar y moría: un encoder
sano en el minuto 1 que se ahoga en el minuto 20 (escena compleja, GPU robada
por otro proceso) era invisible para el triage de stalls del player, que
decidía con el dato de arranque.

- monitorSessionHealth: ticker 5s el resto de la sesión; re-postea al cambiar
  el bucket ok/marginal/struggling (con histéresis de 2 ticks — una EWMA
  bailando sobre 0.95 no puede webhookear cada 5s) o al derivar el ratio
  ≥0.15. Un POST fallido NO avanza el baseline: el tick siguiente reintenta
  (perder el único webhook de la transición a struggling cegaba al player
  justo en el caso que esto existe para cubrir).
- resetTranscodeStats() en restartFromSegment: el ffmpeg nuevo de un seek
  re-arma el warmup y resiembra la EWMA — sus frames fríos (speed=0.0x)
  hundían la media curada a <0.75 y el monitor habría posteado un
  "struggling" falso que pausaba el player en pleno seek. Verificado e2e:
  dos restarts (seek a 1200s) con health estable en ok.
- inputBound ventanado (30s) en vez de pegajoso: un blip de lectura
  transitorio ya no reclasifica como input_bound/struggling cada dip <0.95
  durante el resto de una sesión de horas.
- Heartbeat copy (F2): las sesiones -c:v copy postean una vez
  {ok, 1.0, "copy"} tras el ready — la web ya distingue "sesión copy" de
  "agente viejo sin telemetría" (ambos eran null). Segundo POST deliberado:
  un 400 de una web vieja (enum sin "copy") jamás debe bloquear el ready.
- Logs de fallo etiquetados por tipo de POST: un heartbeat fallido ya no se
  lee como "mark-ready failed" (el ready SÍ aterrizó).

Requiere web con session-ready/SSE actualizados (desplegar web primero;
contra web vieja todo degrada a best-effort con log).
This commit is contained in:
Deivid Soto 2026-06-11 20:53:18 +02:00
parent 2b9d576aee
commit f0c51c5d90
3 changed files with 129 additions and 12 deletions

View file

@ -187,7 +187,10 @@ type SessionHealth struct {
Health string `json:"health"`
// ffmpeg speed= EWMA: 1.0 = exactly realtime, < 1.0 = slower than playback.
RealtimeRatio float64 `json:"realtimeRatio"`
// "realtime" | "transcode" (encoder is the wall) | "input_bound" (source read).
// "realtime" | "transcode" (encoder is the wall) | "input_bound" (source
// read) | "copy" (HLS-copy session: no encode — always realtime; the
// heartbeat exists so the web can tell "copy session" from "old agent
// with no telemetry", which both used to read as a null health).
Reason string `json:"reason"`
}

View file

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log"
"math"
"os"
"os/signal"
"path/filepath"
@ -1614,15 +1615,21 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
readyPosted := false
postReady := func(health *agent.SessionHealth) {
// `kind` labels the log line: a failed health re-post must not read as a
// failed READY (the ready already landed — whoever debugs an eternal
// "Preparando…" would chase the wrong webhook). Returns the error so the
// F1 monitor only advances its baseline on a post that actually landed.
postReady := func(health *agent.SessionHealth, kind string) error {
// Parent ctx so a session cancel mid-POST (user closed tab, daemon
// shutdown) tears down the in-flight webhook instead of blocking the
// goroutine for up to 10 s on a now-orphan call.
rctx, cancel := context.WithTimeout(ctx, 10*time.Second)
if err := client.MarkSessionReady(rctx, sessionID, health); err != nil {
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
err := client.MarkSessionReady(rctx, sessionID, health)
if err != nil {
log.Printf("[hls %s] %s failed: %v", agent.ShortID(sessionID), kind, err)
}
cancel()
return err
}
for {
// Session torn down through a path that didn't cancel ctx (registry
@ -1639,20 +1646,42 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
// `>= 1` would fire "ready" instantly and freeze the player waiting
// on a segment that doesn't exist yet.
if !readyPosted && (hsess.FromCache() || hsess.ReadyCount() > hsess.WriterStartIdx()) {
postReady(nil)
_ = postReady(nil, "mark-ready")
readyPosted = true
// Cache replay has no live encode → no telemetry to report, done.
if hsess.FromCache() {
return
}
// HLS-copy session (F2): ffmpeg copies I/O-bound and emits no
// usable -stats, so there is nothing to monitor — but a one-shot
// "copy" heartbeat lets the web tell "copy session, producer
// fine" from "old agent, no telemetry" (both read as null before).
// Deliberately a SECOND post (not merged into the ready one): an
// older web rejects the unknown "copy" reason with a 400, and that
// must never block the ready flip itself.
if hsess.IsVideoCopy() {
_ = postReady(&agent.SessionHealth{Health: "ok", RealtimeRatio: 1, Reason: "copy"}, "copy health heartbeat (ready already marked)")
return
}
}
// Phase 2 (F3): once enough -stats samples accumulated (encoder past
// its cold ramp), report ONE live-health snapshot so the player can
// name a too-slow transcode in ~4s instead of inferring it from stalls.
// its cold ramp), report the first live-health snapshot so the player
// can name a too-slow transcode in ~4s instead of inferring it from
// stalls — then keep MONITORING (F1): an encode that is fine in
// minute 1 can fall behind in minute 20 (complex scene, GPU stolen by
// another process) and the player's stall triage needs the fresh
// ratio, not the boot-time snapshot.
// >=4 samples ≈ 2s of encoding past seg-0; the EWMA has settled by then.
if readyPosted {
if st := hsess.GetTranscodeStats(); st.Samples >= 4 {
postReady(classifyAgentHealth(st))
first := classifyAgentHealth(st)
if postReady(first, "health snapshot (ready already marked)") != nil {
// Snapshot never landed — hand the monitor an empty
// baseline so its first tick re-posts instead of
// believing the web already knows this state.
first = &agent.SessionHealth{}
}
monitorSessionHealth(ctx, hsess, sessionID, first, postReady)
return
}
}
@ -1669,7 +1698,7 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.
// Ready but never got stable telemetry — report whatever we have so
// the player isn't left without a verdict (better partial than none).
if st := hsess.GetTranscodeStats(); st.Samples > 0 {
postReady(classifyAgentHealth(st))
_ = postReady(classifyAgentHealth(st), "late health snapshot (ready already marked)")
}
return
}
@ -1714,6 +1743,59 @@ func fetchAgentCert(ctx context.Context, client *agent.Client, hash string) {
log.Printf("[acme] installed cert for *.%s.%s", hash, base)
}
// monitorSessionHealth keeps re-posting the live transcode health for the
// rest of the session (F1). Re-posts when the verdict BUCKET flips — only
// after the new bucket holds for 2 consecutive ticks (hysteresis: an EWMA
// dancing around a cutoff like 0.95 must not webhook every 5s) — or when the
// ratio drifts ≥0.15 within the same bucket. Worst case is therefore one
// post per 2 ticks during a genuinely volatile encode, none when steady. A
// failed post keeps the previous baseline so the next tick retries — losing
// the single ok→struggling transition webhook would blind the player for
// the rest of the session. Exits with the session (ctx cancel or registry
// close); no own deadline on purpose — the session lifetime IS the bound.
func monitorSessionHealth(ctx context.Context, hsess *engine.HLSSession, sessionID string, last *agent.SessionHealth, post func(*agent.SessionHealth, string) error) {
const ratioDrift = 0.15
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
pendingBucket := ""
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if hsess.IsClosed() {
return
}
st := hsess.GetTranscodeStats()
if st.Samples == 0 {
// Fresh ffmpeg (seek restart re-armed the warmup) — wait for the
// new process's own measurements.
continue
}
h := classifyAgentHealth(st)
if h.Health == last.Health {
pendingBucket = ""
if math.Abs(h.RealtimeRatio-last.RealtimeRatio) < ratioDrift {
continue
}
} else if h.Health != pendingBucket {
// First tick in a different bucket — remember it, post only if
// the next tick still agrees (filters threshold flapping AND any
// residual cold-ramp dip a restart reset didn't fully absorb).
pendingBucket = h.Health
continue
} else {
pendingBucket = ""
}
log.Printf("[hls %s] health %s→%s (speed %.2fx→%.2fx)", agent.ShortID(sessionID), last.Health, h.Health, last.RealtimeRatio, h.RealtimeRatio)
if post(h, "health update") != nil {
continue // baseline unchanged → next tick retries the transition
}
last = h
}
}
// Realtime-ratio cutoffs for classifyAgentHealth. This is a cross-repo contract
// with the web bottleneck classifier (src/lib/stream/bottleneck-classifier.ts):
// - ≥ realtimeFloor → "ok" (encoder keeps up)

View file

@ -317,7 +317,12 @@ type HLSSession struct {
fpsEWMA float64
speedSamples int
warmupSeen int // cold-start frames discarded before the EWMA is trusted
inputBound bool
// Walltime of the LAST source-read error ffmpeg reported. Windowed (see
// hlsInputBoundWindow) instead of a sticky bool: with the F1 continuous
// monitor a single transient read blip (peer drop, debrid hiccup ffmpeg
// reconnects through) must not reclassify every sub-realtime dip as
// "input_bound/struggling" for the rest of a multi-hour session.
inputErrAt time.Time
}
// hlsSeekAhead is how many segments past the writer's current position the
@ -395,6 +400,12 @@ func (r *HLSSessionRegistry) CloseWhere(pred func(*HLSSession) bool) int {
// cache-fill (HLSSessionConfig.Prewarm). cfg is immutable after construction.
func (s *HLSSession) IsPrewarm() bool { return s.cfg.Prewarm }
// IsVideoCopy reports whether this session serves -c:v copy (no video
// re-encode). Copy sessions emit no ffmpeg -stats telemetry, so the ready
// watcher posts a one-shot "copy" health heartbeat instead of waiting for
// speed= samples that never arrive.
func (s *HLSSession) IsVideoCopy() bool { return s.cfg.VideoCopy }
// RegisterKeep adds a session WITHOUT displacing the others — the prewarm
// path: a background cache-fill encode must not evict the viewer's live
// session (Register's eviction killed the stream being watched when the
@ -830,11 +841,16 @@ func (s *HLSSession) GetTranscodeStats() TranscodeStats {
SpeedX: s.speedEWMA,
Fps: s.fpsEWMA,
Samples: s.speedSamples,
InputBound: s.inputBound,
InputBound: !s.inputErrAt.IsZero() && time.Since(s.inputErrAt) < hlsInputBoundWindow,
FromCache: s.fromCache,
}
}
// hlsInputBoundWindow bounds how long a source-read error keeps classifying
// the session as input-bound. Past it, a sub-realtime encode is the encoder's
// own problem again (the transient link blip resolved or ffmpeg reconnected).
const hlsInputBoundWindow = 30 * time.Second
// hlsStatsWarmupSkip is how many leading -stats frames to discard before
// trusting the EWMA. ffmpeg's first readings reflect the pipeline filling
// (often speed=0.0x) and would otherwise drag a healthy encoder into a false
@ -868,7 +884,22 @@ func (s *HLSSession) recordProgress(speedX, fps float64) {
// the input pull (slow debrid link / dropped torrent peer), not the encoder.
func (s *HLSSession) markInputBound() {
s.statsMu.Lock()
s.inputBound = true
s.inputErrAt = time.Now()
s.statsMu.Unlock()
}
// resetTranscodeStats re-arms the cold-start warmup and drops the EWMAs +
// input-error mark. MUST be called whenever a NEW ffmpeg process starts
// inside the same session (seek restart, auto-restart supervisor): the new
// process's pipeline-fill frames read speed=0.0x, and folding them into the
// already-warmed EWMA drags a healthy 1.5x encode under the 0.75 struggling
// floor in two samples — which the F1 health monitor would then report as a
// false "struggling" (pausing the player) right at the seek the user made.
func (s *HLSSession) resetTranscodeStats() {
s.statsMu.Lock()
s.warmupSeen = 0
s.speedSamples = 0 // recordProgress re-seeds the EWMA on the next sample
s.inputErrAt = time.Time{}
s.statsMu.Unlock()
}
@ -1415,6 +1446,7 @@ func (s *HLSSession) restartFromSegment(targetIdx int) error {
}
// Reset session state so the poll + wait machinery picks up the new run.
s.resetTranscodeStats() // new ffmpeg = new cold ramp; don't poison the EWMA
s.mu.Lock()
s.cmd = cmd
s.cancel = cancel