fix(progress): always report status transitions and poll for control signals

This commit is contained in:
Deivid Soto 2026-03-31 16:55:50 +02:00
parent 763e267bf8
commit 01d62ffa13
5 changed files with 122 additions and 26 deletions

View file

@ -136,6 +136,10 @@ func runDaemonStart() error {
if heartbeatInterval == 0 {
heartbeatInterval = 30 * time.Second
}
statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval)
if statusInterval == 0 {
statusInterval = 3 * time.Second
}
userAgent := "unarr/" + Version
@ -171,8 +175,9 @@ func runDaemonStart() error {
d := agent.NewDaemon(daemonCfg, transport)
// Create progress reporter using transport
reporter := engine.NewProgressReporterWithTransport(transport, 3*time.Second)
reporter.SetWatchingFunc(func() bool { return d.Watching })
reporter := engine.NewProgressReporterWithTransport(transport, statusInterval)
reporter.SetWatchingFunc(func() bool { return d.Watching.Load() })
reporter.SetWatchingChangedHandler(func(watching bool) { d.Watching.Store(watching) })
// Parse speed limits
maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed)
@ -270,6 +275,8 @@ func runDaemonStart() error {
d.OnTasksClaimed = func(tasks []agent.Task) {
for _, t := range tasks {
if t.Mode == "stream" {
// Only 1 stream at a time: cancel all existing streams
cancelAllStreams()
go handleStreamTask(ctx, t, reporter, cfg)
} else if t.ForceStart || manager.HasCapacity() {
manager.Submit(ctx, t)
@ -281,20 +288,28 @@ func runDaemonStart() error {
// Wire: stream requests for completed downloads → serve file from disk
d.OnStreamRequested = func(sr agent.StreamRequest) {
// Check if already streaming this task
streamRegistry.mu.Lock()
_, exists := streamRegistry.servers[sr.TaskID]
streamRegistry.mu.Unlock()
if exists {
// Only 1 stream at a time: cancel all existing streams
cancelAllStreams()
filePath := sr.FilePath
info, err := os.Stat(filePath)
if err != nil {
log.Printf("[%s] stream request: file not found: %s", sr.TaskID[:8], filePath)
return
}
if _, err := os.Stat(sr.FilePath); err != nil {
log.Printf("[%s] stream request: file not found: %s", sr.TaskID[:8], sr.FilePath)
return
// If filePath is a directory, find the largest video file inside
if info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
log.Printf("[%s] stream request: no video file in directory: %s", sr.TaskID[:8], filePath)
return
}
filePath = found
log.Printf("[%s] resolved directory to video file: %s", sr.TaskID[:8], filepath.Base(filePath))
}
srv := engine.NewStreamServerFromDisk(sr.FilePath, 0)
srv := engine.NewStreamServerFromDisk(filePath, cfg.Download.StreamPort)
streamURL, err := srv.Start(context.Background())
if err != nil {
log.Printf("[%s] stream failed: %v", sr.TaskID[:8], err)
@ -316,6 +331,24 @@ func runDaemonStart() error {
log.Printf("[%s] stream URL report failed: %v", sr.TaskID[:8], err)
}
}()
// Auto-shutdown after 30 min of idle (no HTTP requests)
go func() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if srv.IdleSince() > 30*time.Minute {
log.Printf("[%s] disk stream idle timeout (30m), shutting down", sr.TaskID[:8])
cancelStreamTask(sr.TaskID)
return
}
}
}
}()
}
// Wire: WS control actions (pause/cancel/stream pushed from server)
@ -331,6 +364,8 @@ func runDaemonStart() error {
log.Printf("[%s] resume requested via WebSocket, triggering poll", taskID[:8])
d.TriggerPoll()
case "stream":
// Only 1 stream at a time: cancel all existing streams
cancelAllStreams()
// Use registry mutex to prevent TOCTOU race with HTTP-polled stream requests
streamRegistry.mu.Lock()
if _, exists := streamRegistry.servers[taskID]; exists {
@ -352,6 +387,8 @@ func runDaemonStart() error {
streamRegistry.servers[taskID] = srv
streamRegistry.mu.Unlock()
task.SetStreamURL(srv.URL())
case "stop-stream":
cancelStreamTask(taskID)
}
}