From 4ccd37aa5d0e45231c126042018bdf73e5042481 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 27 May 2026 14:40:53 +0200 Subject: [PATCH] feat(agent): session-ready webhook for SSE-driven player handshake (0.9.13) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes Fase 3.3b. Daemon now tells the server the moment a session's first HLS segment + init.mp4 land on disk; the web side flips streaming_session.ready_at = NOW(), which its SSE endpoint pushes to subscribed players so the loading UI flips from "Preparando…" to "Stream listo" without polling HEAD on the segment URL. Surface: - New Client.MarkSessionReady(ctx, sessionId) HTTP method → POST /api/internal/agent/session-ready. - New engine.HLSSession.ReadyCount() + FromCache() accessors so the watcher goroutine doesn't reach into private state. - New cmd.watchSessionReady(ctx, client, hsess, sessionId) goroutine polls ReadyCount every 200 ms with a 60 s deadline + short-circuits for cache-HIT sessions (ready the moment StartHLSSession returns). - Daemon callback spawns it right after streamSrv.HLS().Register so the watcher's lifecycle matches the session's. Best-effort: a transient network failure on the webhook is logged + the goroutine exits — the player's existing HEAD-probe retry path still discovers ready state independently. The webhook is an acceleration, not a hard dependency. --- CHANGELOG.md | 15 +++++++++++++++ internal/agent/client.go | 21 +++++++++++++++++++++ internal/cmd/daemon.go | 40 ++++++++++++++++++++++++++++++++++++++++ internal/cmd/version.go | 2 +- internal/engine/hls.go | 15 +++++++++++++++ 5 files changed, 92 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d75ac7..c8681bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.13] - 2026-05-27 + +### Added + +- **Session-ready webhook** (`/api/internal/agent/session-ready`). Daemon + watches every new HLSSession's segment counter and, the moment seg-0 + + init.mp4 land on disk, POSTs the sessionId to the server. The web side + flips `streaming_session.ready_at = NOW()`, which its new SSE endpoint + pushes to subscribed players so the "Preparando…" UI flips to + "Stream listo" without waiting for the player's HEAD-probe retry loop + to discover it. Cache-HIT sessions fire the webhook immediately on + StartHLSSession return. +- `engine.HLSSession.ReadyCount()` + `FromCache()` accessors so the + ready-watcher goroutine doesn't reach into private state. + ## [0.9.12] - 2026-05-27 ### Added diff --git a/internal/agent/client.go b/internal/agent/client.go index e60b0a4..e7f2c37 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -109,6 +109,27 @@ func (c *Client) ReportUpgradeResult(ctx context.Context, agentID string, succes return nil } +// MarkSessionReady signals the server that the first HLS segment + init.mp4 +// landed on disk for the given session. The web side flips +// streaming_session.ready_at = NOW(), which its SSE endpoint emits to +// subscribed players so the "Preparando…" UI ends without polling HEAD +// on /hls//master.m3u8. +// +// Best-effort: the server is the source of truth for session state and +// will reach the same conclusion via HEAD probes anyway if this call +// fails. We log the error in the caller but don't retry — by the time +// a retry would land the user is likely already playing. +func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error { + req := struct { + SessionID string `json:"sessionId"` + }{SessionID: sessionID} + var resp StatusResponse + if err := c.doPost(ctx, "/api/internal/agent/session-ready", req, &resp); err != nil { + return fmt.Errorf("mark session ready: %w", err) + } + return nil +} + // ReportStatus reports download progress. Returns server-side flags the CLI must act on. func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*StatusResponse, error) { var resp StatusResponse diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 668ecff..be66858 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -612,6 +612,11 @@ func runDaemonStart() error { return } streamSrv.HLS().Register(hsess) + // Tell the server seg-0 is on disk as soon as it lands so the + // player's SSE subscription flips its "Preparando…" UI without + // waiting for the browser HEAD-probe loop to discover it + // independently. Cache-HIT sessions are ready immediately. + go watchSessionReady(hlsCtx, agentClient, hsess, sess.SessionID) }() } @@ -940,3 +945,38 @@ func mirrorCORSOrigins(parent context.Context, cfg config.Config, userAgent stri } return out } + +// watchSessionReady polls HLSSession.ReadyCount until the first segment + +// init.mp4 are on disk, then POSTs /api/internal/agent/session-ready so +// the web side flips streaming_session.ready_at — which its SSE endpoint +// pushes to subscribed players. Cache-HIT sessions are ready the moment +// StartHLSSession returns and POST immediately. +// +// Bounded by a 60 s deadline so a permanently stuck encoder doesn't keep +// a goroutine alive forever; if seg-0 never lands the player falls back +// to its existing HEAD-probe retry path anyway. +func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.HLSSession, sessionID string) { + deadline := time.Now().Add(60 * time.Second) + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + for { + // Cache HIT or seg-0 ready → notify + done. + if hsess.FromCache() || hsess.ReadyCount() >= 1 { + rctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := client.MarkSessionReady(rctx, sessionID); err != nil { + log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err) + } + cancel() + return + } + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + if time.Now().After(deadline) { + log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID)) + return + } + } +} diff --git a/internal/cmd/version.go b/internal/cmd/version.go index f4f3f21..efb6b30 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.12" +var Version = "0.9.13" diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 634f193..4938c11 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -519,6 +519,21 @@ func (s *HLSSession) ProbeInfo() map[string]any { } } +// ReadyCount returns how many segments are currently fully on disk. +// Caller can `>= 1` it to check whether seg-0 has landed (and so the +// player can be told to attach). For cache-HIT sessions this is always +// `segmentCount` from the moment StartHLSSession returns. +func (s *HLSSession) ReadyCount() int { + s.readyMu.Lock() + defer s.readyMu.Unlock() + return s.readyMax +} + +// FromCache reports whether this session was served from the HLS cache +// (no ffmpeg subprocess spawned). Used by ready-watcher logic to short- +// circuit polling — a cache HIT is ready the moment we return. +func (s *HLSSession) FromCache() bool { return s.fromCache } + // MasterPlaylist returns the rendered master.m3u8 contents. func (s *HLSSession) MasterPlaylist() string { return s.manifestRoot }