fix(stream): report stream failures via StreamError + retry transient stat
Stream-request failures previously reported status:"failed", which the web treated as a download failure — it left the task unstreamable and surfaced a misleading 20s timeout. Report them through a dedicated StreamError field instead, so the web clears the stream flag and shows the real reason without touching the download status. - StatusUpdate gains StreamError (json: streamError) - OnStreamRequested reports failures via a reportStreamError helper (path rejected, file not found, no video in dir) instead of status:"failed" - os.Stat is retried 3× (300ms) before giving up — NFS can transiently fail (ESTALE/EAGAIN/timeout), the root of the intermittent "works on the 3rd try" - dispatch OnStreamRequested off the sync loop (goroutine): it does blocking I/O (stat retries, ffprobe in SetFile) that would otherwise stall task dispatch + status reporting for other items
This commit is contained in:
parent
f7ea06c70a
commit
2b5a45674a
3 changed files with 44 additions and 31 deletions
|
|
@ -246,8 +246,12 @@ func (d *Daemon) Run(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.sync.OnStreamRequest = func(req StreamRequest) {
|
d.sync.OnStreamRequest = func(req StreamRequest) {
|
||||||
|
// Off the sync loop: the handler does blocking I/O (os.Stat retries on
|
||||||
|
// NFS, then ffprobe in SetFile) — running it inline would stall task
|
||||||
|
// dispatch + status reporting for other items. The single-stream model
|
||||||
|
// (atomic SetFile swap, last-wins) tolerates concurrent requests.
|
||||||
if d.OnStreamRequested != nil {
|
if d.OnStreamRequested != nil {
|
||||||
d.OnStreamRequested(req)
|
go d.OnStreamRequested(req)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.sync.OnStreamSession = func(sess StreamSession) {
|
d.sync.OnStreamSession = func(sess StreamSession) {
|
||||||
|
|
|
||||||
|
|
@ -134,6 +134,11 @@ type StatusUpdate struct {
|
||||||
StreamURL string `json:"streamUrl,omitempty"`
|
StreamURL string `json:"streamUrl,omitempty"`
|
||||||
StreamReady bool `json:"streamReady,omitempty"`
|
StreamReady bool `json:"streamReady,omitempty"`
|
||||||
ErrorMessage string `json:"errorMessage,omitempty"`
|
ErrorMessage string `json:"errorMessage,omitempty"`
|
||||||
|
// StreamError reports a failed /stream attempt (path rejected, transient
|
||||||
|
// FS error, etc.) WITHOUT marking the download itself failed — the web
|
||||||
|
// clears streamRequested + surfaces this so the player fails fast with the
|
||||||
|
// real reason instead of a 20s "agent didn't respond" timeout.
|
||||||
|
StreamError string `json:"streamError,omitempty"`
|
||||||
// mode=seed_file: agent computes the info_hash from the local file
|
// mode=seed_file: agent computes the info_hash from the local file
|
||||||
// and reports it back so the web player can target /stream/<hash>.
|
// and reports it back so the web player can target /stream/<hash>.
|
||||||
InfoHash string `json:"infoHash,omitempty"`
|
InfoHash string `json:"infoHash,omitempty"`
|
||||||
|
|
|
||||||
|
|
@ -563,33 +563,45 @@ func runDaemonStart() error {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reportStreamError tells the web a /stream attempt failed WITHOUT
|
||||||
|
// marking the download failed (StreamError, not Status). The web clears
|
||||||
|
// streamRequested and surfaces this so the player fails fast with the
|
||||||
|
// real reason instead of polling out the 20s "agent didn't respond".
|
||||||
|
reportStreamError := func(reason string) {
|
||||||
|
go func() {
|
||||||
|
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||||||
|
TaskID: sr.TaskID,
|
||||||
|
StreamError: reason,
|
||||||
|
}); err != nil {
|
||||||
|
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
filePath := filepath.Clean(sr.FilePath)
|
filePath := filepath.Clean(sr.FilePath)
|
||||||
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
|
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
|
||||||
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
|
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
|
||||||
log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath)
|
log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath)
|
||||||
go func() {
|
reportStreamError(fmt.Sprintf("path outside allowed dirs: %s", filePath))
|
||||||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
|
||||||
TaskID: sr.TaskID,
|
|
||||||
Status: "failed",
|
|
||||||
ErrorMessage: fmt.Sprintf("path outside allowed dirs: %s", filePath),
|
|
||||||
}); err != nil {
|
|
||||||
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
info, err := os.Stat(filePath)
|
// os.Stat over NFS can transiently fail (ESTALE/EAGAIN/timeout) right
|
||||||
if err != nil {
|
// after a remount or under load. Retry a few times before giving up so
|
||||||
log.Printf("[%s] stream request: file not found: %s", agent.ShortID(sr.TaskID), filePath)
|
// a hiccup doesn't surface as a spurious "file not found" — this is the
|
||||||
go func() {
|
// root of the intermittent "works on the 3rd try" stream failures.
|
||||||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
var info os.FileInfo
|
||||||
TaskID: sr.TaskID,
|
var statErr error
|
||||||
Status: "failed",
|
for attempt := 0; attempt < 3; attempt++ {
|
||||||
ErrorMessage: fmt.Sprintf("file not found: %s", filePath),
|
if info, statErr = os.Stat(filePath); statErr == nil {
|
||||||
}); err != nil {
|
break
|
||||||
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
|
||||||
}
|
}
|
||||||
}()
|
if attempt < 2 {
|
||||||
|
time.Sleep(300 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if statErr != nil {
|
||||||
|
log.Printf("[%s] stream request: file not found after retries: %s (%v)", agent.ShortID(sr.TaskID), filePath, statErr)
|
||||||
|
reportStreamError(fmt.Sprintf("file not found: %s", filePath))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -597,15 +609,7 @@ func runDaemonStart() error {
|
||||||
found := engine.FindVideoFile(filePath)
|
found := engine.FindVideoFile(filePath)
|
||||||
if found == "" {
|
if found == "" {
|
||||||
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
|
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
|
||||||
go func() {
|
reportStreamError(fmt.Sprintf("no video file in directory: %s", filePath))
|
||||||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
|
||||||
TaskID: sr.TaskID,
|
|
||||||
Status: "failed",
|
|
||||||
ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath),
|
|
||||||
}); err != nil {
|
|
||||||
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
filePath = found
|
filePath = found
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue