From f0c51c5d905815058ecca9568f36aceca9959e74 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Thu, 11 Jun 2026 20:53:18 +0200 Subject: [PATCH] =?UTF-8?q?feat(daemon):=20telemetr=C3=ADa=20de=20salud=20?= =?UTF-8?q?continua=20+=20heartbeat=20de=20sesiones=20copy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit El watcher F3 posteaba UN snapshot de speed= al arrancar y moría: un encoder sano en el minuto 1 que se ahoga en el minuto 20 (escena compleja, GPU robada por otro proceso) era invisible para el triage de stalls del player, que decidía con el dato de arranque. - monitorSessionHealth: ticker 5s el resto de la sesión; re-postea al cambiar el bucket ok/marginal/struggling (con histéresis de 2 ticks — una EWMA bailando sobre 0.95 no puede webhookear cada 5s) o al derivar el ratio ≥0.15. Un POST fallido NO avanza el baseline: el tick siguiente reintenta (perder el único webhook de la transición a struggling cegaba al player justo en el caso que esto existe para cubrir). - resetTranscodeStats() en restartFromSegment: el ffmpeg nuevo de un seek re-arma el warmup y resiembra la EWMA — sus frames fríos (speed=0.0x) hundían la media curada a <0.75 y el monitor habría posteado un "struggling" falso que pausaba el player en pleno seek. Verificado e2e: dos restarts (seek a 1200s) con health estable en ok. - inputBound ventanado (30s) en vez de pegajoso: un blip de lectura transitorio ya no reclasifica como input_bound/struggling cada dip <0.95 durante el resto de una sesión de horas. - Heartbeat copy (F2): las sesiones -c:v copy postean una vez {ok, 1.0, "copy"} tras el ready — la web ya distingue "sesión copy" de "agente viejo sin telemetría" (ambos eran null). Segundo POST deliberado: un 400 de una web vieja (enum sin "copy") jamás debe bloquear el ready. - Logs de fallo etiquetados por tipo de POST: un heartbeat fallido ya no se lee como "mark-ready failed" (el ready SÍ aterrizó). Requiere web con session-ready/SSE actualizados (desplegar web primero; contra web vieja todo degrada a best-effort con log). --- internal/agent/client.go | 5 +- internal/cmd/daemon.go | 98 ++++++++++++++++++++++++++++++++++++---- internal/engine/hls.go | 38 ++++++++++++++-- 3 files changed, 129 insertions(+), 12 deletions(-) diff --git a/internal/agent/client.go b/internal/agent/client.go index fc817d5..62f62ed 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -187,7 +187,10 @@ type SessionHealth struct { Health string `json:"health"` // ffmpeg speed= EWMA: 1.0 = exactly realtime, < 1.0 = slower than playback. RealtimeRatio float64 `json:"realtimeRatio"` - // "realtime" | "transcode" (encoder is the wall) | "input_bound" (source read). + // "realtime" | "transcode" (encoder is the wall) | "input_bound" (source + // read) | "copy" (HLS-copy session: no encode — always realtime; the + // heartbeat exists so the web can tell "copy session" from "old agent + // with no telemetry", which both used to read as a null health). Reason string `json:"reason"` } diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 543272f..6bcc67f 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log" + "math" "os" "os/signal" "path/filepath" @@ -1614,15 +1615,21 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine. ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() readyPosted := false - postReady := func(health *agent.SessionHealth) { + // `kind` labels the log line: a failed health re-post must not read as a + // failed READY (the ready already landed — whoever debugs an eternal + // "Preparando…" would chase the wrong webhook). Returns the error so the + // F1 monitor only advances its baseline on a post that actually landed. + postReady := func(health *agent.SessionHealth, kind string) error { // Parent ctx so a session cancel mid-POST (user closed tab, daemon // shutdown) tears down the in-flight webhook instead of blocking the // goroutine for up to 10 s on a now-orphan call. rctx, cancel := context.WithTimeout(ctx, 10*time.Second) - if err := client.MarkSessionReady(rctx, sessionID, health); err != nil { - log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err) + err := client.MarkSessionReady(rctx, sessionID, health) + if err != nil { + log.Printf("[hls %s] %s failed: %v", agent.ShortID(sessionID), kind, err) } cancel() + return err } for { // Session torn down through a path that didn't cancel ctx (registry @@ -1639,20 +1646,42 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine. // `>= 1` would fire "ready" instantly and freeze the player waiting // on a segment that doesn't exist yet. if !readyPosted && (hsess.FromCache() || hsess.ReadyCount() > hsess.WriterStartIdx()) { - postReady(nil) + _ = postReady(nil, "mark-ready") readyPosted = true // Cache replay has no live encode → no telemetry to report, done. if hsess.FromCache() { return } + // HLS-copy session (F2): ffmpeg copies I/O-bound and emits no + // usable -stats, so there is nothing to monitor — but a one-shot + // "copy" heartbeat lets the web tell "copy session, producer + // fine" from "old agent, no telemetry" (both read as null before). + // Deliberately a SECOND post (not merged into the ready one): an + // older web rejects the unknown "copy" reason with a 400, and that + // must never block the ready flip itself. + if hsess.IsVideoCopy() { + _ = postReady(&agent.SessionHealth{Health: "ok", RealtimeRatio: 1, Reason: "copy"}, "copy health heartbeat (ready already marked)") + return + } } // Phase 2 (F3): once enough -stats samples accumulated (encoder past - // its cold ramp), report ONE live-health snapshot so the player can - // name a too-slow transcode in ~4s instead of inferring it from stalls. + // its cold ramp), report the first live-health snapshot so the player + // can name a too-slow transcode in ~4s instead of inferring it from + // stalls — then keep MONITORING (F1): an encode that is fine in + // minute 1 can fall behind in minute 20 (complex scene, GPU stolen by + // another process) and the player's stall triage needs the fresh + // ratio, not the boot-time snapshot. // >=4 samples ≈ 2s of encoding past seg-0; the EWMA has settled by then. if readyPosted { if st := hsess.GetTranscodeStats(); st.Samples >= 4 { - postReady(classifyAgentHealth(st)) + first := classifyAgentHealth(st) + if postReady(first, "health snapshot (ready already marked)") != nil { + // Snapshot never landed — hand the monitor an empty + // baseline so its first tick re-posts instead of + // believing the web already knows this state. + first = &agent.SessionHealth{} + } + monitorSessionHealth(ctx, hsess, sessionID, first, postReady) return } } @@ -1669,7 +1698,7 @@ func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine. // Ready but never got stable telemetry — report whatever we have so // the player isn't left without a verdict (better partial than none). if st := hsess.GetTranscodeStats(); st.Samples > 0 { - postReady(classifyAgentHealth(st)) + _ = postReady(classifyAgentHealth(st), "late health snapshot (ready already marked)") } return } @@ -1714,6 +1743,59 @@ func fetchAgentCert(ctx context.Context, client *agent.Client, hash string) { log.Printf("[acme] installed cert for *.%s.%s", hash, base) } +// monitorSessionHealth keeps re-posting the live transcode health for the +// rest of the session (F1). Re-posts when the verdict BUCKET flips — only +// after the new bucket holds for 2 consecutive ticks (hysteresis: an EWMA +// dancing around a cutoff like 0.95 must not webhook every 5s) — or when the +// ratio drifts ≥0.15 within the same bucket. Worst case is therefore one +// post per 2 ticks during a genuinely volatile encode, none when steady. A +// failed post keeps the previous baseline so the next tick retries — losing +// the single ok→struggling transition webhook would blind the player for +// the rest of the session. Exits with the session (ctx cancel or registry +// close); no own deadline on purpose — the session lifetime IS the bound. +func monitorSessionHealth(ctx context.Context, hsess *engine.HLSSession, sessionID string, last *agent.SessionHealth, post func(*agent.SessionHealth, string) error) { + const ratioDrift = 0.15 + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + pendingBucket := "" + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + if hsess.IsClosed() { + return + } + st := hsess.GetTranscodeStats() + if st.Samples == 0 { + // Fresh ffmpeg (seek restart re-armed the warmup) — wait for the + // new process's own measurements. + continue + } + h := classifyAgentHealth(st) + if h.Health == last.Health { + pendingBucket = "" + if math.Abs(h.RealtimeRatio-last.RealtimeRatio) < ratioDrift { + continue + } + } else if h.Health != pendingBucket { + // First tick in a different bucket — remember it, post only if + // the next tick still agrees (filters threshold flapping AND any + // residual cold-ramp dip a restart reset didn't fully absorb). + pendingBucket = h.Health + continue + } else { + pendingBucket = "" + } + log.Printf("[hls %s] health %s→%s (speed %.2fx→%.2fx)", agent.ShortID(sessionID), last.Health, h.Health, last.RealtimeRatio, h.RealtimeRatio) + if post(h, "health update") != nil { + continue // baseline unchanged → next tick retries the transition + } + last = h + } +} + // Realtime-ratio cutoffs for classifyAgentHealth. This is a cross-repo contract // with the web bottleneck classifier (src/lib/stream/bottleneck-classifier.ts): // - ≥ realtimeFloor → "ok" (encoder keeps up) diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 74b44bd..c6fb010 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -317,7 +317,12 @@ type HLSSession struct { fpsEWMA float64 speedSamples int warmupSeen int // cold-start frames discarded before the EWMA is trusted - inputBound bool + // Walltime of the LAST source-read error ffmpeg reported. Windowed (see + // hlsInputBoundWindow) instead of a sticky bool: with the F1 continuous + // monitor a single transient read blip (peer drop, debrid hiccup ffmpeg + // reconnects through) must not reclassify every sub-realtime dip as + // "input_bound/struggling" for the rest of a multi-hour session. + inputErrAt time.Time } // hlsSeekAhead is how many segments past the writer's current position the @@ -395,6 +400,12 @@ func (r *HLSSessionRegistry) CloseWhere(pred func(*HLSSession) bool) int { // cache-fill (HLSSessionConfig.Prewarm). cfg is immutable after construction. func (s *HLSSession) IsPrewarm() bool { return s.cfg.Prewarm } +// IsVideoCopy reports whether this session serves -c:v copy (no video +// re-encode). Copy sessions emit no ffmpeg -stats telemetry, so the ready +// watcher posts a one-shot "copy" health heartbeat instead of waiting for +// speed= samples that never arrive. +func (s *HLSSession) IsVideoCopy() bool { return s.cfg.VideoCopy } + // RegisterKeep adds a session WITHOUT displacing the others — the prewarm // path: a background cache-fill encode must not evict the viewer's live // session (Register's eviction killed the stream being watched when the @@ -830,11 +841,16 @@ func (s *HLSSession) GetTranscodeStats() TranscodeStats { SpeedX: s.speedEWMA, Fps: s.fpsEWMA, Samples: s.speedSamples, - InputBound: s.inputBound, + InputBound: !s.inputErrAt.IsZero() && time.Since(s.inputErrAt) < hlsInputBoundWindow, FromCache: s.fromCache, } } +// hlsInputBoundWindow bounds how long a source-read error keeps classifying +// the session as input-bound. Past it, a sub-realtime encode is the encoder's +// own problem again (the transient link blip resolved or ffmpeg reconnected). +const hlsInputBoundWindow = 30 * time.Second + // hlsStatsWarmupSkip is how many leading -stats frames to discard before // trusting the EWMA. ffmpeg's first readings reflect the pipeline filling // (often speed=0.0x) and would otherwise drag a healthy encoder into a false @@ -868,7 +884,22 @@ func (s *HLSSession) recordProgress(speedX, fps float64) { // the input pull (slow debrid link / dropped torrent peer), not the encoder. func (s *HLSSession) markInputBound() { s.statsMu.Lock() - s.inputBound = true + s.inputErrAt = time.Now() + s.statsMu.Unlock() +} + +// resetTranscodeStats re-arms the cold-start warmup and drops the EWMAs + +// input-error mark. MUST be called whenever a NEW ffmpeg process starts +// inside the same session (seek restart, auto-restart supervisor): the new +// process's pipeline-fill frames read speed=0.0x, and folding them into the +// already-warmed EWMA drags a healthy 1.5x encode under the 0.75 struggling +// floor in two samples — which the F1 health monitor would then report as a +// false "struggling" (pausing the player) right at the seek the user made. +func (s *HLSSession) resetTranscodeStats() { + s.statsMu.Lock() + s.warmupSeen = 0 + s.speedSamples = 0 // recordProgress re-seeds the EWMA on the next sample + s.inputErrAt = time.Time{} s.statsMu.Unlock() } @@ -1415,6 +1446,7 @@ func (s *HLSSession) restartFromSegment(targetIdx int) error { } // Reset session state so the poll + wait machinery picks up the new run. + s.resetTranscodeStats() // new ffmpeg = new cold ramp; don't poison the EWMA s.mu.Lock() s.cmd = cmd s.cancel = cancel