feat(agent): session-ready webhook for SSE-driven player handshake (0.9.13)
Closes Fase 3.3b. Daemon now tells the server the moment a session's
first HLS segment + init.mp4 land on disk; the web side flips
streaming_session.ready_at = NOW(), which its SSE endpoint pushes to
subscribed players so the loading UI flips from "Preparando…" to
"Stream listo" without polling HEAD on the segment URL.
Surface:
- New Client.MarkSessionReady(ctx, sessionId) HTTP method →
POST /api/internal/agent/session-ready.
- New engine.HLSSession.ReadyCount() + FromCache() accessors so the
watcher goroutine doesn't reach into private state.
- New cmd.watchSessionReady(ctx, client, hsess, sessionId) goroutine
polls ReadyCount every 200 ms with a 60 s deadline + short-circuits
for cache-HIT sessions (ready the moment StartHLSSession returns).
- Daemon callback spawns it right after streamSrv.HLS().Register so
the watcher's lifecycle matches the session's.
Best-effort: a transient network failure on the webhook is logged + the
goroutine exits — the player's existing HEAD-probe retry path still
discovers ready state independently. The webhook is an acceleration,
not a hard dependency.
This commit is contained in:
parent
d913e66527
commit
03fe5ca54a
5 changed files with 92 additions and 1 deletions
15
CHANGELOG.md
15
CHANGELOG.md
|
|
@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
|
|||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.9.13] - 2026-05-27
|
||||
|
||||
### Added
|
||||
|
||||
- **Session-ready webhook** (`/api/internal/agent/session-ready`). Daemon
|
||||
watches every new HLSSession's segment counter and, the moment seg-0 +
|
||||
init.mp4 land on disk, POSTs the sessionId to the server. The web side
|
||||
flips `streaming_session.ready_at = NOW()`, which its new SSE endpoint
|
||||
pushes to subscribed players so the "Preparando…" UI flips to
|
||||
"Stream listo" without waiting for the player's HEAD-probe retry loop
|
||||
to discover it. Cache-HIT sessions fire the webhook immediately on
|
||||
StartHLSSession return.
|
||||
- `engine.HLSSession.ReadyCount()` + `FromCache()` accessors so the
|
||||
ready-watcher goroutine doesn't reach into private state.
|
||||
|
||||
## [0.9.12] - 2026-05-27
|
||||
|
||||
### Added
|
||||
|
|
|
|||
|
|
@ -109,6 +109,27 @@ func (c *Client) ReportUpgradeResult(ctx context.Context, agentID string, succes
|
|||
return nil
|
||||
}
|
||||
|
||||
// MarkSessionReady signals the server that the first HLS segment + init.mp4
|
||||
// landed on disk for the given session. The web side flips
|
||||
// streaming_session.ready_at = NOW(), which its SSE endpoint emits to
|
||||
// subscribed players so the "Preparando…" UI ends without polling HEAD
|
||||
// on /hls/<id>/master.m3u8.
|
||||
//
|
||||
// Best-effort: the server is the source of truth for session state and
|
||||
// will reach the same conclusion via HEAD probes anyway if this call
|
||||
// fails. We log the error in the caller but don't retry — by the time
|
||||
// a retry would land the user is likely already playing.
|
||||
func (c *Client) MarkSessionReady(ctx context.Context, sessionID string) error {
|
||||
req := struct {
|
||||
SessionID string `json:"sessionId"`
|
||||
}{SessionID: sessionID}
|
||||
var resp StatusResponse
|
||||
if err := c.doPost(ctx, "/api/internal/agent/session-ready", req, &resp); err != nil {
|
||||
return fmt.Errorf("mark session ready: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReportStatus reports download progress. Returns server-side flags the CLI must act on.
|
||||
func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*StatusResponse, error) {
|
||||
var resp StatusResponse
|
||||
|
|
|
|||
|
|
@ -612,6 +612,11 @@ func runDaemonStart() error {
|
|||
return
|
||||
}
|
||||
streamSrv.HLS().Register(hsess)
|
||||
// Tell the server seg-0 is on disk as soon as it lands so the
|
||||
// player's SSE subscription flips its "Preparando…" UI without
|
||||
// waiting for the browser HEAD-probe loop to discover it
|
||||
// independently. Cache-HIT sessions are ready immediately.
|
||||
go watchSessionReady(hlsCtx, agentClient, hsess, sess.SessionID)
|
||||
}()
|
||||
}
|
||||
|
||||
|
|
@ -940,3 +945,38 @@ func mirrorCORSOrigins(parent context.Context, cfg config.Config, userAgent stri
|
|||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// watchSessionReady polls HLSSession.ReadyCount until the first segment +
|
||||
// init.mp4 are on disk, then POSTs /api/internal/agent/session-ready so
|
||||
// the web side flips streaming_session.ready_at — which its SSE endpoint
|
||||
// pushes to subscribed players. Cache-HIT sessions are ready the moment
|
||||
// StartHLSSession returns and POST immediately.
|
||||
//
|
||||
// Bounded by a 60 s deadline so a permanently stuck encoder doesn't keep
|
||||
// a goroutine alive forever; if seg-0 never lands the player falls back
|
||||
// to its existing HEAD-probe retry path anyway.
|
||||
func watchSessionReady(ctx context.Context, client *agent.Client, hsess *engine.HLSSession, sessionID string) {
|
||||
deadline := time.Now().Add(60 * time.Second)
|
||||
ticker := time.NewTicker(200 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
// Cache HIT or seg-0 ready → notify + done.
|
||||
if hsess.FromCache() || hsess.ReadyCount() >= 1 {
|
||||
rctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
if err := client.MarkSessionReady(rctx, sessionID); err != nil {
|
||||
log.Printf("[hls %s] mark-ready failed: %v", agent.ShortID(sessionID), err)
|
||||
}
|
||||
cancel()
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
log.Printf("[hls %s] mark-ready: timeout waiting for seg-0", agent.ShortID(sessionID))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package cmd
|
||||
|
||||
// Version is the CLI version. Overridden by goreleaser ldflags at release time.
|
||||
var Version = "0.9.12"
|
||||
var Version = "0.9.13"
|
||||
|
|
|
|||
|
|
@ -519,6 +519,21 @@ func (s *HLSSession) ProbeInfo() map[string]any {
|
|||
}
|
||||
}
|
||||
|
||||
// ReadyCount returns how many segments are currently fully on disk.
|
||||
// Caller can `>= 1` it to check whether seg-0 has landed (and so the
|
||||
// player can be told to attach). For cache-HIT sessions this is always
|
||||
// `segmentCount` from the moment StartHLSSession returns.
|
||||
func (s *HLSSession) ReadyCount() int {
|
||||
s.readyMu.Lock()
|
||||
defer s.readyMu.Unlock()
|
||||
return s.readyMax
|
||||
}
|
||||
|
||||
// FromCache reports whether this session was served from the HLS cache
|
||||
// (no ffmpeg subprocess spawned). Used by ready-watcher logic to short-
|
||||
// circuit polling — a cache HIT is ready the moment we return.
|
||||
func (s *HLSSession) FromCache() bool { return s.fromCache }
|
||||
|
||||
// MasterPlaylist returns the rendered master.m3u8 contents.
|
||||
func (s *HLSSession) MasterPlaylist() string { return s.manifestRoot }
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue