From 59da949a534fdfde235ac83106ab18f7e350738d Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Fri, 12 Jun 2026 09:46:23 +0200 Subject: [PATCH] feat(agent): el auto-update difiere hasta que no haya stream activo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Un auto-update reiniciaba el daemon al momento y cortaba la reproducción en curso (mata la sesión HLS viva → freeze → F5). Ahora el path AUTO (OnUpgrade) difiere indefinido mientras haya streams activos y aplica solo en idle. Ningún update en segundo plano vale cortar un visionado. - HLSSessionRegistry.Count() + playerSessionRegistry.count() → GetActiveStreamCount() = player (HLS/direct/remux) + transcode HLS. - deferAutoUpgradeUntilIdle: guard de un solo waiter, ticker 30s, aplica al llegar a 0 streams. - `unarr update` (manual) SIN cambios: aplica al momento = escape hatch para un fix urgente. - SyncRequest.agentStatus ("updating") reportado antes del restart para que la web pueda avisar en vez de dar error de sesión. --- internal/agent/daemon.go | 53 ++++++++++++++++++++++++- internal/agent/sync.go | 7 ++++ internal/agent/types.go | 4 ++ internal/cmd/daemon.go | 6 +++ internal/cmd/player_session_registry.go | 6 +++ internal/engine/hls.go | 9 +++++ 6 files changed, 84 insertions(+), 1 deletion(-) diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 93559a8..ae7395b 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -56,6 +56,11 @@ type Daemon struct { OnStreamSession func(sess StreamSession) OnControlAction func(action, taskID string, deleteFiles bool) GetActiveCount func() int // returns number of active downloads (wired from manager) + // GetActiveStreamCount returns the number of live stream sessions (player + + // HLS transcode). Wired from cmd. The graceful AUTO-upgrade path defers + // while this is > 0 so it never cuts a viewer mid-playback; a MANUAL + // `unarr update` ignores it and applies immediately. + GetActiveStreamCount func() int // OnAgentKeyMinted fires when a register reply carries a freshly-minted // per-machine key (the daemon registered with a general/legacy key). cmd // persists it so the next start authenticates with the bound agent key — @@ -68,6 +73,8 @@ type Daemon struct { Info AgentInfo State DaemonState lastNotifiedVersion string + // upgradeDeferring guards a single defer-until-idle waiter for auto-upgrade. + upgradeDeferring atomic.Bool // Managed-VPN split-tunnel state, set by cmd/daemon.go before Run and folded // into DaemonState on every write so external tools (`unarr vpn status`) see it. @@ -285,7 +292,7 @@ func (d *Daemon) Run(ctx context.Context) error { return } log.Printf("[upgrade] new version available: %s — applying auto-upgrade", version) - go d.applyAutoUpgrade(version) + go d.deferAutoUpgradeUntilIdle(version) } d.sync.OnScan = func() { log.Printf("Library scan requested by server") @@ -303,6 +310,9 @@ func (d *Daemon) Run(ctx context.Context) error { d.sync.GetFunnelURL = func() string { return d.funnelURL } + d.sync.GetAgentStatus = func() string { + return d.State.Status + } d.sync.OnSyncSuccess = func() { d.State.LastHeartbeat = time.Now() if d.GetActiveCount != nil { @@ -332,6 +342,40 @@ func (d *Daemon) Deregister() { RemoveState() } +// deferAutoUpgradeUntilIdle holds an AUTO-upgrade until the agent is idle (no +// active stream), then applies it. The user's call: no background update is +// worth cutting a viewer mid-playback. A MANUAL `unarr update` bypasses this +// entirely (see cmd/self_update.go) and is the escape hatch for an urgent fix. +// +// Runs in its own goroutine. A process-lifetime guard keeps exactly ONE waiter +// even though the server re-sends the upgrade signal on every sync. +func (d *Daemon) deferAutoUpgradeUntilIdle(version string) { + if !d.upgradeDeferring.CompareAndSwap(false, true) { + return + } + defer d.upgradeDeferring.Store(false) + + activeStreams := func() int { + if d.GetActiveStreamCount == nil { + return 0 + } + return d.GetActiveStreamCount() + } + + if n := activeStreams(); n > 0 { + log.Printf("[upgrade] v%s deferred — %d active stream(s); will apply when idle", version, n) + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for range ticker.C { + if n := activeStreams(); n == 0 { + break + } + } + log.Printf("[upgrade] no active streams — applying deferred upgrade to v%s", version) + } + d.applyAutoUpgrade(version) // exits the process on success +} + // applyAutoUpgrade downloads the target version and exits so the service // supervisor (systemd Restart=always on Linux) respawns on the new binary. // Triggered by the server's upgrade signal — opt-in flag set by the user from @@ -360,6 +404,13 @@ func (d *Daemon) applyAutoUpgrade(targetVersion string) { return } + // Tell the web we're updating so a NEW playback attempt during the brief + // restart sees "agent updating" instead of a hard session error. One + // heartbeat carries this before the (blocking) download + os.Exit below. + d.State.Status = "updating" + WriteState(&d.State) + d.TriggerSync() + upgrader := &upgrade.Upgrader{ CurrentVersion: currentClean, OnProgress: func(msg string) { diff --git a/internal/agent/sync.go b/internal/agent/sync.go index d725542..b3207ac 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -59,6 +59,10 @@ type SyncClient struct { // WireGuard tunnel is up, the mode, and the exit server) so the web can track // which agent holds the single WG slot. GetVPNState func() (active bool, mode, server string) + // GetAgentStatus returns the daemon lifecycle state ("running" | "updating" + // | "shutting_down") so the web can show "agent updating" during an upgrade + // restart instead of a hard error. Empty → treated as "running". + GetAgentStatus func() string // GetFunnelURL returns the CloudFlare Quick Tunnel public hostname if one // is active, else "". Sent on every sync so the web picks it up live. GetFunnelURL func() string @@ -217,6 +221,9 @@ func (sc *SyncClient) buildRequest() SyncRequest { if sc.GetVPNState != nil { req.VPNActive, req.VPNMode, req.VPNServer = sc.GetVPNState() } + if sc.GetAgentStatus != nil { + req.AgentStatus = sc.GetAgentStatus() + } if sc.GetFunnelURL != nil { req.FunnelURL = sc.GetFunnelURL() } diff --git a/internal/agent/types.go b/internal/agent/types.go index cb82c36..3d3e363 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -461,6 +461,10 @@ type SyncRequest struct { // IsDocker — see RegisterRequest.IsDocker. Sent every sync so the web keeps // the flag fresh even if the agent migrated binary↔docker between restarts. IsDocker bool `json:"isDocker"` + // AgentStatus — daemon lifecycle state ("running" | "updating" | + // "shutting_down"). Lets the web show "agent updating" during an upgrade + // restart instead of a hard session error. Empty (older agents) → "running". + AgentStatus string `json:"agentStatus,omitempty"` } // ControlAction represents a server-side control signal for a task. diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 6bcc67f..e1ed1c2 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -579,6 +579,12 @@ func runDaemonStart() error { sc.GetFreeSlots = manager.FreeSlots sc.GetTaskStates = manager.TaskStates d.GetActiveCount = manager.ActiveCount + // Live stream count for the graceful auto-upgrade gate: player sessions + // (in-browser HLS / direct-play / remux) + HLS transcode sessions. An auto + // upgrade defers while this is > 0 so it never cuts a viewer mid-playback. + d.GetActiveStreamCount = func() int { + return playerSessionRegistry.count() + streamSrv.HLS().Count() + } // Trigger immediate sync when a download slot frees up manager.OnTaskDone = func() { d.TriggerSync() } diff --git a/internal/cmd/player_session_registry.go b/internal/cmd/player_session_registry.go index e4a134f..80fa2d1 100644 --- a/internal/cmd/player_session_registry.go +++ b/internal/cmd/player_session_registry.go @@ -41,6 +41,12 @@ func (r *playerSessionRegistryT) remove(sessionID string) { delete(r.cancels, sessionID) } +func (r *playerSessionRegistryT) count() int { + r.mu.Lock() + defer r.mu.Unlock() + return len(r.cancels) +} + // cancelAllPlayerSessions cancels every running session. Called on daemon // shutdown so the ffmpeg children and SSE consumers exit cleanly. func cancelAllPlayerSessions() { diff --git a/internal/engine/hls.go b/internal/engine/hls.go index c6fb010..ea097bf 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -437,6 +437,15 @@ func (r *HLSSessionRegistry) HasLiveEncode() bool { return false } +// Count reports how many sessions are currently registered (live or recently +// finished but not yet swept). Used by the graceful auto-upgrade gate to defer +// applying an update while the agent is actively streaming. +func (r *HLSSessionRegistry) Count() int { + r.mu.RLock() + defer r.mu.RUnlock() + return len(r.sessions) +} + // Remove drops a session from the registry without closing it. func (r *HLSSessionRegistry) Remove(id string) { r.mu.Lock()