diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 53b4d18..9ef9c54 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -246,8 +246,12 @@ func (d *Daemon) Run(ctx context.Context) error { } } d.sync.OnStreamRequest = func(req StreamRequest) { + // Off the sync loop: the handler does blocking I/O (os.Stat retries on + // NFS, then ffprobe in SetFile) — running it inline would stall task + // dispatch + status reporting for other items. The single-stream model + // (atomic SetFile swap, last-wins) tolerates concurrent requests. if d.OnStreamRequested != nil { - d.OnStreamRequested(req) + go d.OnStreamRequested(req) } } d.sync.OnStreamSession = func(sess StreamSession) { diff --git a/internal/agent/types.go b/internal/agent/types.go index 8a18687..58ecd38 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -134,6 +134,11 @@ type StatusUpdate struct { StreamURL string `json:"streamUrl,omitempty"` StreamReady bool `json:"streamReady,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` + // StreamError reports a failed /stream attempt (path rejected, transient + // FS error, etc.) WITHOUT marking the download itself failed — the web + // clears streamRequested + surfaces this so the player fails fast with the + // real reason instead of a 20s "agent didn't respond" timeout. + StreamError string `json:"streamError,omitempty"` // mode=seed_file: agent computes the info_hash from the local file // and reports it back so the web player can target /stream/. InfoHash string `json:"infoHash,omitempty"` diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index f27fbc9..2ffc459 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -563,33 +563,45 @@ func runDaemonStart() error { return } + // reportStreamError tells the web a /stream attempt failed WITHOUT + // marking the download failed (StreamError, not Status). The web clears + // streamRequested and surfaces this so the player fails fast with the + // real reason instead of polling out the 20s "agent didn't respond". + reportStreamError := func(reason string) { + go func() { + if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: sr.TaskID, + StreamError: reason, + }); err != nil { + log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) + } + }() + } + filePath := filepath.Clean(sr.FilePath) if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) { log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath) - go func() { - if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - Status: "failed", - ErrorMessage: fmt.Sprintf("path outside allowed dirs: %s", filePath), - }); err != nil { - log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) - } - }() + reportStreamError(fmt.Sprintf("path outside allowed dirs: %s", filePath)) return } - info, err := os.Stat(filePath) - if err != nil { - log.Printf("[%s] stream request: file not found: %s", agent.ShortID(sr.TaskID), filePath) - go func() { - if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - Status: "failed", - ErrorMessage: fmt.Sprintf("file not found: %s", filePath), - }); err != nil { - log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) - } - }() + // os.Stat over NFS can transiently fail (ESTALE/EAGAIN/timeout) right + // after a remount or under load. Retry a few times before giving up so + // a hiccup doesn't surface as a spurious "file not found" — this is the + // root of the intermittent "works on the 3rd try" stream failures. + var info os.FileInfo + var statErr error + for attempt := 0; attempt < 3; attempt++ { + if info, statErr = os.Stat(filePath); statErr == nil { + break + } + if attempt < 2 { + time.Sleep(300 * time.Millisecond) + } + } + if statErr != nil { + log.Printf("[%s] stream request: file not found after retries: %s (%v)", agent.ShortID(sr.TaskID), filePath, statErr) + reportStreamError(fmt.Sprintf("file not found: %s", filePath)) return } @@ -597,15 +609,7 @@ func runDaemonStart() error { found := engine.FindVideoFile(filePath) if found == "" { log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath) - go func() { - if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - Status: "failed", - ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath), - }); err != nil { - log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err) - } - }() + reportStreamError(fmt.Sprintf("no video file in directory: %s", filePath)) return } filePath = found