feat(agent): session-ready webhook for SSE-driven player handshake (0.9.13)
Some checks failed
Release / release (push) Failing after 3s
Release / docker (push) Has been skipped
Release / virustotal (push) Failing after 0s

Closes Fase 3.3b. Daemon now tells the server the moment a session's
first HLS segment + init.mp4 land on disk; the web side flips
streaming_session.ready_at = NOW(), which its SSE endpoint pushes to
subscribed players so the loading UI flips from "Preparando…" to
"Stream listo" without polling HEAD on the segment URL.

Surface:
  - New Client.MarkSessionReady(ctx, sessionId) HTTP method →
    POST /api/internal/agent/session-ready.
  - New engine.HLSSession.ReadyCount() + FromCache() accessors so the
    watcher goroutine doesn't reach into private state.
  - New cmd.watchSessionReady(ctx, client, hsess, sessionId) goroutine
    polls ReadyCount every 200 ms with a 60 s deadline + short-circuits
    for cache-HIT sessions (ready the moment StartHLSSession returns).
  - Daemon callback spawns it right after streamSrv.HLS().Register so
    the watcher's lifecycle matches the session's.

Best-effort: a transient network failure on the webhook is logged + the
goroutine exits — the player's existing HEAD-probe retry path still
discovers ready state independently. The webhook is an acceleration,
not a hard dependency.
This commit is contained in:
Deivid Soto 2026-05-27 14:40:53 +02:00
parent 4f304fb13a
commit 4ccd37aa5d
5 changed files with 92 additions and 1 deletions

View file

@ -109,6 +109,27 @@ func (c *Client) ReportUpgradeResult(ctx context.Context, agentID string, succes
return nil
}
// MarkSessionReady signals the server that the first HLS segment + init.mp4
// landed on disk for the given session. The web side flips
// streaming_session.ready_at = NOW(), which its SSE endpoint emits to
// subscribed players so the "Preparando…" UI ends without polling HEAD
// on /hls/<id>/master.m3u8.
//
// Best-effort: the server is the source of truth for session state and
// will reach the same conclusion via HEAD probes anyway if this call
// fails. We log the error in the caller but don't retry — by the time
// a retry would land the user is likely already playing.
func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error {
req := struct {
SessionID string `json:"sessionId"`
}{SessionID: sessionID}
var resp StatusResponse
if err := c.doPost(ctx, "/api/internal/agent/session-ready", req, &resp); err != nil {
return fmt.Errorf("mark session ready: %w", err)
}
return nil
}
// ReportStatus reports download progress. Returns server-side flags the CLI must act on.
func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*StatusResponse, error) {
var resp StatusResponse

View file

@ -612,6 +612,11 @@ func runDaemonStart() error {
return
}
streamSrv.HLS().Register(hsess)
// Tell the server seg-0 is on disk as soon as it lands so the
// player's SSE subscription flips its "Preparando…" UI without
// waiting for the browser HEAD-probe loop to discover it
// independently. Cache-HIT sessions are ready immediately.
go watchSessionReady(hlsCtx, agentClient, hsess, sess.SessionID)
}()
}
@ -940,3 +945,38 @@ func mirrorCORSOrigins(parent context.Context, cfg config.Config, userAgent stri
}
return out
}
// watchSessionReady polls HLSSession.ReadyCount until the first segment +
// init.mp4 are on disk, then POSTs /api/internal/agent/session-ready so
// the web side flips streaming_session.ready_at — which its SSE endpoint
// pushes to subscribed players. Cache-HIT sessions are ready the moment
// StartHLSSession returns and POST immediately.
//
// Bounded by a 60 s deadline so a permanently stuck encoder doesn't keep
// a goroutine alive forever; if seg-0 never lands the player falls back
// to its existing HEAD-probe retry path anyway.
func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.HLSSession, sessionID string) {
deadline := time.Now().Add(60 * time.Second)
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
// Cache HIT or seg-0 ready → notify + done.
if hsess.FromCache() || hsess.ReadyCount() >= 1 {
rctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := client.MarkSessionReady(rctx, sessionID); err != nil {
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
}
cancel()
return
}
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if time.Now().After(deadline) {
log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
return
}
}
}

View file

@ -1,4 +1,4 @@
package cmd
// Version is the CLI version. Overridden by goreleaser ldflags at release time.
var Version = "0.9.12"
var Version = "0.9.13"

View file

@ -519,6 +519,21 @@ func (s *HLSSession) ProbeInfo() map[string]any {
}
}
// ReadyCount returns how many segments are currently fully on disk.
// Caller can `>= 1` it to check whether seg-0 has landed (and so the
// player can be told to attach). For cache-HIT sessions this is always
// `segmentCount` from the moment StartHLSSession returns.
func (s *HLSSession) ReadyCount() int {
s.readyMu.Lock()
defer s.readyMu.Unlock()
return s.readyMax
}
// FromCache reports whether this session was served from the HLS cache
// (no ffmpeg subprocess spawned). Used by ready-watcher logic to short-
// circuit polling — a cache HIT is ready the moment we return.
func (s *HLSSession) FromCache() bool { return s.fromCache }
// MasterPlaylist returns the rendered master.m3u8 contents.
func (s *HLSSession) MasterPlaylist() string { return s.manifestRoot }