fix(streaming): bounded ffmpeg auto-restart + tmpdir gc + probe/stderr safety

Reliability hardening pass for the HLS daemon. None of these change the
public API, all reduce the chances of an end-user seeing a broken
session in production.

- engine/hls.go waitFFmpeg now supervises ffmpeg: on a non-graceful
  exit while the session is still in use, restart from the last good
  segment up to 3 times within a 60 s window. Beyond that we give up
  and log the file as broken — better than a perpetually black player
  with no error.
- engine/hls.go CleanupHLSOrphanDirs() removes tmpdirs older than 1 h
  at startup; cmd/daemon.go calls it before streamSrv.Listen so a
  daemon crash + restart doesn't leak gigabytes of segment files.
- engine/hls.go StartHLSSession wraps ffprobe in a 15 s timeout. A
  hung probe on a slow remote fs would otherwise block the goroutine
  forever and the player would stay on "Preparando sesion".
- engine/hls.go hlsStderrCapture buffer is capped at 64 KiB; a
  misbehaving ffmpeg that emits megabytes without newlines used to
  grow daemon memory unbounded.
This commit is contained in:
Deivid Soto 2026-05-08 08:51:19 +02:00
parent eb2548f9a6
commit 40e7977cf5
2 changed files with 108 additions and 3 deletions

View file

@ -226,6 +226,12 @@ func runDaemonStart() error {
// Create persistent stream server
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
// Reap HLS tmpdirs left over from a previous daemon run before we start
// accepting new sessions. The in-memory registry doesn't survive a
// restart, so without this disk usage grows unbounded across restarts.
if err := engine.CleanupHLSOrphanDirs(); err != nil {
log.Printf("[hls] orphan tmpdir cleanup: %v", err)
}
if err := streamSrv.Listen(ctx); err != nil {
return fmt.Errorf("start stream server: %w", err)
}

View file

@ -57,6 +57,43 @@ func hlsTmpDirRoot() string {
return filepath.Join(os.TempDir(), "unarr-hls-sessions")
}
// CleanupHLSOrphanDirs removes any per-session tmpdir under hlsTmpDirRoot
// that's older than 1 h. Daemon restart drops the in-memory session
// registry but leaves tmpdirs behind; on the next start we GC them so
// disk usage doesn't grow unbounded across restarts. Sessions started
// less than 1 h ago might still belong to the daemon we're booting (race
// during a quick restart) — leave those alone.
func CleanupHLSOrphanDirs() error {
root := hlsTmpDirRoot()
entries, err := os.ReadDir(root)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
cutoff := time.Now().Add(-1 * time.Hour)
removed := 0
for _, e := range entries {
if !e.IsDir() {
continue
}
info, err := e.Info()
if err != nil {
continue
}
if info.ModTime().Before(cutoff) {
if err := os.RemoveAll(filepath.Join(root, e.Name())); err == nil {
removed++
}
}
}
if removed > 0 {
log.Printf("[hls] cleaned %d orphan tmpdir(s) at startup", removed)
}
return nil
}
// HLSSessionConfig describes a single browser playback session driven by HLS.
type HLSSessionConfig struct {
SessionID string
@ -93,6 +130,8 @@ type HLSSession struct {
startedAt time.Time
lastTouch time.Time
ffmpegSegStart int // index of the first segment the current ffmpeg writes
restartCount int // bounded auto-restart counter (resets on Close)
lastRestartAt time.Time
// readyCond + readyMax track which segments ffmpeg has finished writing.
// Handlers waiting on a future segment block on readyCond until the
@ -209,7 +248,13 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er
return nil, errors.New("hls: ffmpeg/ffprobe not available")
}
probe, err := ProbeFile(ctx, cfg.Transcode.FFprobePath, cfg.SourcePath)
// Probe gets a 15 s ceiling. ffprobe on a 50 GB MKV over a slow remote
// fs can hang indefinitely; without a deadline the daemon would block
// the goroutine that started the session forever and the user would
// see the player phase stuck on "Preparando sesión".
probeCtx, cancelProbe := context.WithTimeout(ctx, 15*time.Second)
probe, err := ProbeFile(probeCtx, cfg.Transcode.FFprobePath, cfg.SourcePath)
cancelProbe()
if err != nil {
return nil, fmt.Errorf("hls: probe: %w", err)
}
@ -370,6 +415,11 @@ func (s *HLSSession) Close() error {
}
// waitFFmpeg reaps the ffmpeg process and records its exit error for handlers.
//
// Auto-restart supervisor: if ffmpeg crashes (non-graceful exit) and the
// session is still in use, we attempt to restart it from the last known
// good segment. Bounded to maxRestarts within restartWindow to avoid
// thrashing on a permanently broken source.
func (s *HLSSession) waitFFmpeg() {
err := s.cmd.Wait()
s.readyMu.Lock()
@ -379,9 +429,47 @@ func (s *HLSSession) waitFFmpeg() {
close(s.readyCh)
s.readyCh = nil
}
readyMax := s.readyMax
s.readyMu.Unlock()
if err != nil && !s.isClosed() {
log.Printf("[hls %s] ffmpeg exited: %v", shortHLSID(s.cfg.SessionID), err)
if err == nil || s.isClosed() {
return
}
log.Printf("[hls %s] ffmpeg exited: %v", shortHLSID(s.cfg.SessionID), err)
// Decide whether to attempt an auto-restart. We don't restart when:
// - the session was closed externally (kill on quality change etc.)
// - we've already retried 3 times within the last 60 s (broken file)
const maxRestarts = 3
const restartWindow = 60 * time.Second
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return
}
if !s.lastRestartAt.IsZero() && time.Since(s.lastRestartAt) > restartWindow {
s.restartCount = 0
}
if s.restartCount >= maxRestarts {
s.mu.Unlock()
log.Printf("[hls %s] giving up after %d auto-restarts", shortHLSID(s.cfg.SessionID), maxRestarts)
return
}
s.restartCount++
s.lastRestartAt = time.Now()
s.mu.Unlock()
// Restart from the last segment we know is safely on disk. If readyMax
// is 0 (never produced anything), retry from segment 0 — covers initial
// startup failures on transient errors.
target := readyMax
if target < 0 {
target = 0
}
log.Printf("[hls %s] auto-restarting from segment %d (attempt %d/%d)",
shortHLSID(s.cfg.SessionID), target, s.restartCount, maxRestarts)
if rerr := s.restartFromSegment(target); rerr != nil {
log.Printf("[hls %s] auto-restart failed: %v", shortHLSID(s.cfg.SessionID), rerr)
}
}
@ -1013,12 +1101,23 @@ func scaledDimensions(srcW, srcH, capH int) (int, int) {
// hlsStderrCapture forwards ffmpeg stderr lines to the daemon log prefixed by
// the session ID, so failures are visible without spelunking tmpdirs.
//
// The internal buffer accumulates partial bytes between writes (a single line
// can span multiple Write calls). Capped at maxStderrBuf so a misbehaving
// ffmpeg that emits megabytes without newlines can't grow daemon memory
// unbounded; on overflow we discard the partial line and keep going.
type hlsStderrCapture struct {
owner *HLSSession
buf strings.Builder
}
const maxStderrBuf = 64 * 1024
func (c *hlsStderrCapture) Write(p []byte) (int, error) {
if c.buf.Len()+len(p) > maxStderrBuf {
// Drop the unterminated partial line; we'll resync on the next \n.
c.buf.Reset()
}
c.buf.Write(p)
for {
line, rest, ok := strings.Cut(c.buf.String(), "\n")