From 40e7977cf5d178c8e9e031ad703eafb994c706ed Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Fri, 8 May 2026 08:51:19 +0200 Subject: [PATCH] fix(streaming): bounded ffmpeg auto-restart + tmpdir gc + probe/stderr safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reliability hardening pass for the HLS daemon. None of these change the public API; all reduce the chances of an end-user seeing a broken session in production. - engine/hls.go waitFFmpeg now supervises ffmpeg: on a non-graceful exit while the session is still in use, restart from the last good segment up to 3 times within a 60 s window. Beyond that we give up and log the file as broken — better than a perpetually black player with no error. - engine/hls.go CleanupHLSOrphanDirs() removes tmpdirs older than 1 h at startup; cmd/daemon.go calls it before streamSrv.Listen so a daemon crash + restart doesn't leak gigabytes of segment files. - engine/hls.go StartHLSSession wraps ffprobe in a 15 s timeout. A hung probe on a slow remote fs would otherwise block the goroutine forever and the player would stay on "Preparando sesión". - engine/hls.go hlsStderrCapture buffer is capped at 64 KiB; a misbehaving ffmpeg that emits megabytes without newlines used to grow daemon memory unbounded. --- internal/cmd/daemon.go | 6 +++ internal/engine/hls.go | 105 +++++++++++++++++++++++++++++++++++++++-- 2 files changed, 108 insertions(+), 3 deletions(-) diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index b284cc4..9bd0d9b 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -226,6 +226,12 @@ func runDaemonStart() error { // Create persistent stream server streamSrv := engine.NewStreamServer(cfg.Download.StreamPort) + // Reap HLS tmpdirs left over from a previous daemon run before we start + // accepting new sessions. The in-memory registry doesn't survive a + // restart, so without this disk usage grows unbounded across restarts. 
+ if err := engine.CleanupHLSOrphanDirs(); err != nil { + log.Printf("[hls] orphan tmpdir cleanup: %v", err) + } if err := streamSrv.Listen(ctx); err != nil { return fmt.Errorf("start stream server: %w", err) } diff --git a/internal/engine/hls.go b/internal/engine/hls.go index bd0853c..729f3d3 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -57,6 +57,43 @@ func hlsTmpDirRoot() string { return filepath.Join(os.TempDir(), "unarr-hls-sessions") } +// CleanupHLSOrphanDirs removes any per-session tmpdir under hlsTmpDirRoot +// that's older than 1 h. Daemon restart drops the in-memory session +// registry but leaves tmpdirs behind; on the next start we GC them so +// disk usage doesn't grow unbounded across restarts. Sessions started +// less than 1 h ago might still belong to the daemon we're booting (race +// during a quick restart) — leave those alone. +func CleanupHLSOrphanDirs() error { + root := hlsTmpDirRoot() + entries, err := os.ReadDir(root) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + cutoff := time.Now().Add(-1 * time.Hour) + removed := 0 + for _, e := range entries { + if !e.IsDir() { + continue + } + info, err := e.Info() + if err != nil { + continue + } + if info.ModTime().Before(cutoff) { + if err := os.RemoveAll(filepath.Join(root, e.Name())); err == nil { + removed++ + } + } + } + if removed > 0 { + log.Printf("[hls] cleaned %d orphan tmpdir(s) at startup", removed) + } + return nil +} + // HLSSessionConfig describes a single browser playback session driven by HLS. type HLSSessionConfig struct { SessionID string @@ -93,6 +130,8 @@ type HLSSession struct { startedAt time.Time lastTouch time.Time ffmpegSegStart int // index of the first segment the current ffmpeg writes + restartCount int // bounded auto-restart counter (reset by waitFFmpeg once the restart window has elapsed) + lastRestartAt time.Time // readyCond + readyMax track which segments ffmpeg has finished writing. 
// Handlers waiting on a future segment block on readyCond until the @@ -209,7 +248,13 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er return nil, errors.New("hls: ffmpeg/ffprobe not available") } - probe, err := ProbeFile(ctx, cfg.Transcode.FFprobePath, cfg.SourcePath) + // Probe gets a 15 s ceiling. ffprobe on a 50 GB MKV over a slow remote + // fs can hang indefinitely; without a deadline the daemon would block + // the goroutine that started the session forever and the user would + // see the player phase stuck on "Preparando sesión". + probeCtx, cancelProbe := context.WithTimeout(ctx, 15*time.Second) + probe, err := ProbeFile(probeCtx, cfg.Transcode.FFprobePath, cfg.SourcePath) + cancelProbe() if err != nil { return nil, fmt.Errorf("hls: probe: %w", err) } @@ -370,6 +415,11 @@ func (s *HLSSession) Close() error { } // waitFFmpeg reaps the ffmpeg process and records its exit error for handlers. +// +// Auto-restart supervisor: if ffmpeg crashes (non-graceful exit) and the +// session is still in use, we attempt to restart it from the last known +// good segment. Bounded to maxRestarts within restartWindow to avoid +// thrashing on a permanently broken source. func (s *HLSSession) waitFFmpeg() { err := s.cmd.Wait() s.readyMu.Lock() @@ -379,9 +429,47 @@ func (s *HLSSession) waitFFmpeg() { close(s.readyCh) s.readyCh = nil } + readyMax := s.readyMax s.readyMu.Unlock() - if err != nil && !s.isClosed() { - log.Printf("[hls %s] ffmpeg exited: %v", shortHLSID(s.cfg.SessionID), err) + + if err == nil || s.isClosed() { + return + } + log.Printf("[hls %s] ffmpeg exited: %v", shortHLSID(s.cfg.SessionID), err) + + // Decide whether to attempt an auto-restart. We don't restart when: + // - the session was closed externally (kill on quality change etc.) 
+ // - we've already retried 3 times within the last 60 s (broken file) + const maxRestarts = 3 + const restartWindow = 60 * time.Second + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return + } + if !s.lastRestartAt.IsZero() && time.Since(s.lastRestartAt) > restartWindow { + s.restartCount = 0 + } + if s.restartCount >= maxRestarts { + s.mu.Unlock() + log.Printf("[hls %s] giving up after %d auto-restarts", shortHLSID(s.cfg.SessionID), maxRestarts) + return + } + s.restartCount++ + s.lastRestartAt = time.Now() + s.mu.Unlock() + + // Restart from the last segment we know is safely on disk. If readyMax + // is negative (presumably: nothing produced yet), clamp to segment 0 — covers initial + // startup failures on transient errors. + target := readyMax + if target < 0 { + target = 0 + } + log.Printf("[hls %s] auto-restarting from segment %d (attempt %d/%d)", + shortHLSID(s.cfg.SessionID), target, s.restartCount, maxRestarts) + if rerr := s.restartFromSegment(target); rerr != nil { + log.Printf("[hls %s] auto-restart failed: %v", shortHLSID(s.cfg.SessionID), rerr) } } @@ -1013,12 +1101,23 @@ func scaledDimensions(srcW, srcH, capH int) (int, int) { // hlsStderrCapture forwards ffmpeg stderr lines to the daemon log prefixed by // the session ID, so failures are visible without spelunking tmpdirs. +// +// The internal buffer accumulates partial bytes between writes (a single line +// can span multiple Write calls). Capped at maxStderrBuf so a misbehaving +// ffmpeg that emits megabytes without newlines can't grow daemon memory +// unbounded; on overflow we discard the partial line and keep going. type hlsStderrCapture struct { owner *HLSSession buf strings.Builder } +const maxStderrBuf = 64 * 1024 + func (c *hlsStderrCapture) Write(p []byte) (int, error) { + if c.buf.Len()+len(p) > maxStderrBuf { + // Drop the unterminated partial line; we'll resync on the next \n. + c.buf.Reset() + } c.buf.Write(p) for { line, rest, ok := strings.Cut(c.buf.String(), "\n")