From 2b5a45674a8a6cccd579eda6c4d3a2c90dcc3f59 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 2 Jun 2026 20:31:49 +0200 Subject: [PATCH] fix(stream): report stream failures via StreamError + retry transient stat MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stream-request failures previously reported status:"failed", which the web treated as a download failure — it left the task unstreamable and surfaced a misleading 20s timeout. Report them through a dedicated StreamError field instead, so the web clears the stream flag and shows the real reason without touching the download status. - StatusUpdate gains StreamError (json: streamError) - OnStreamRequested reports failures via a reportStreamError helper (path rejected, file not found, no video in dir) instead of status:"failed" - os.Stat is retried 3× (300ms) before giving up — NFS can transiently fail (ESTALE/EAGAIN/timeout), the root of the intermittent "works on the 3rd try" - dispatch OnStreamRequested off the sync loop (goroutine): it does blocking I/O (stat retries, ffprobe in SetFile) that would otherwise stall task dispatch + status reporting for other items --- internal/agent/daemon.go | 6 +++- internal/agent/types.go | 5 ++++ internal/cmd/daemon.go | 64 +++++++++++++++++++++------------------- 3 files changed, 44 insertions(+), 31 deletions(-) diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 53b4d18..9ef9c54 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -246,8 +246,12 @@ func (d *Daemon) Run(ctx context.Context) error { } } d.sync.OnStreamRequest = func(req StreamRequest) { + // Off the sync loop: the handler does blocking I/O (os.Stat retries on + // NFS, then ffprobe in SetFile) — running it inline would stall task + // dispatch + status reporting for other items. The single-stream model + // (atomic SetFile swap, last-wins) tolerates concurrent requests. if d.OnStreamRequested != nil { - d.OnStreamRequested(req) + go d.OnStreamRequested(req) } } d.sync.OnStreamSession = func(sess StreamSession) { diff --git a/internal/agent/types.go b/internal/agent/types.go index 8a18687..58ecd38 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -134,6 +134,11 @@ type StatusUpdate struct { StreamURL string `json:"streamUrl,omitempty"` StreamReady bool `json:"streamReady,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` + // StreamError reports a failed /stream attempt (path rejected, transient + // FS error, etc.) WITHOUT marking the download itself failed — the web + // clears streamRequested + surfaces this so the player fails fast with the + // real reason instead of a 20s "agent didn't respond" timeout. + StreamError string `json:"streamError,omitempty"` // mode=seed_file: agent computes the info_hash from the local file // and reports it back so the web player can target /stream/. InfoHash string `json:"infoHash,omitempty"` diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index f27fbc9..2ffc459 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -563,33 +563,45 @@ func runDaemonStart() error { return } + // reportStreamError tells the web a /stream attempt failed WITHOUT + // marking the download failed (StreamError, not Status). The web clears + // streamRequested and surfaces this so the player fails fast with the + // real reason instead of polling out the 20s "agent didn't respond". + reportStreamError := func(reason string) { + go func() { + if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: sr.TaskID, + StreamError: reason, + }); err != nil { + log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) + } + }() + } + filePath := filepath.Clean(sr.FilePath) if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) { log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath) - go func() { - if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - Status: "failed", - ErrorMessage: fmt.Sprintf("path outside allowed dirs: %s", filePath), - }); err != nil { - log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) - } - }() + reportStreamError(fmt.Sprintf("path outside allowed dirs: %s", filePath)) return } - info, err := os.Stat(filePath) - if err != nil { - log.Printf("[%s] stream request: file not found: %s", agent.ShortID(sr.TaskID), filePath) - go func() { - if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - Status: "failed", - ErrorMessage: fmt.Sprintf("file not found: %s", filePath), - }); err != nil { - log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) - } - }() + // os.Stat over NFS can transiently fail (ESTALE/EAGAIN/timeout) right + // after a remount or under load. Retry a few times before giving up so + // a hiccup doesn't surface as a spurious "file not found" — this is the + // root of the intermittent "works on the 3rd try" stream failures. + var info os.FileInfo + var statErr error + for attempt := 0; attempt < 3; attempt++ { + if info, statErr = os.Stat(filePath); statErr == nil { + break + } + if attempt < 2 { + time.Sleep(300 * time.Millisecond) + } + } + if statErr != nil { + log.Printf("[%s] stream request: file not found after retries: %s (%v)", agent.ShortID(sr.TaskID), filePath, statErr) + reportStreamError(fmt.Sprintf("file not found: %s", filePath)) return } @@ -597,15 +609,7 @@ func runDaemonStart() error { found := engine.FindVideoFile(filePath) if found == "" { log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath) - go func() { - if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - Status: "failed", - ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath), - }); err != nil { - log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) - } - }() + reportStreamError(fmt.Sprintf("no video file in directory: %s", filePath)) return } filePath = found