From 01d62ffa1329a5fe1c28639b823e28e143cb517c Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 31 Mar 2026 16:55:50 +0200 Subject: [PATCH] fix(progress): always report status transitions and poll for control signals --- internal/agent/daemon.go | 6 ++- internal/agent/types.go | 4 +- internal/cmd/daemon.go | 59 ++++++++++++++++++++++++------ internal/config/config.go | 6 +++ internal/engine/progress.go | 73 +++++++++++++++++++++++++++++++------ 5 files changed, 122 insertions(+), 26 deletions(-) diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 35d3fda..7b07cec 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -6,6 +6,7 @@ import ( "log" "os" "runtime" + "sync/atomic" "time" ) @@ -43,7 +44,8 @@ type Daemon struct { // Watching tracks whether a user is viewing download progress in the web UI. // When false, the progress reporter skips detailed updates (only sends final states). - Watching bool + // Accessed from heartbeat goroutine, flush goroutine, and WatchingFunc closure — must be atomic. + Watching atomic.Bool // Exposed tickers for hot-reload PollTicker *time.Ticker @@ -195,7 +197,7 @@ func (d *Daemon) heartbeat(ctx context.Context) { } // Update watching flag and state file - d.Watching = resp.Watching + d.Watching.Store(resp.Watching) d.State.LastHeartbeat = time.Now() if d.GetActiveCount != nil { d.State.ActiveTasks = d.GetActiveCount() diff --git a/internal/agent/types.go b/internal/agent/types.go index a5d2a81..616f23f 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -109,6 +109,7 @@ type StatusResponse struct { Paused bool `json:"paused,omitempty"` DeleteFiles bool `json:"deleteFiles,omitempty"` StreamRequested bool `json:"streamRequested,omitempty"` + Watching bool `json:"watching,omitempty"` } // BatchStatusRequest wraps multiple status updates in a single request. @@ -118,7 +119,8 @@ type BatchStatusRequest struct { // BatchStatusResponse wraps per-task results from the batch endpoint. type BatchStatusResponse struct { - Results []StatusResponse `json:"results"` + Results []StatusResponse `json:"results"` + Watching bool `json:"watching,omitempty"` } // HeartbeatResponse is returned by the server on heartbeat. diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index d83e5c0..4024311 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -136,6 +136,10 @@ func runDaemonStart() error { if heartbeatInterval == 0 { heartbeatInterval = 30 * time.Second } + statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval) + if statusInterval == 0 { + statusInterval = 3 * time.Second + } userAgent := "unarr/" + Version @@ -171,8 +175,9 @@ func runDaemonStart() error { d := agent.NewDaemon(daemonCfg, transport) // Create progress reporter using transport - reporter := engine.NewProgressReporterWithTransport(transport, 3*time.Second) - reporter.SetWatchingFunc(func() bool { return d.Watching }) + reporter := engine.NewProgressReporterWithTransport(transport, statusInterval) + reporter.SetWatchingFunc(func() bool { return d.Watching.Load() }) + reporter.SetWatchingChangedHandler(func(watching bool) { d.Watching.Store(watching) }) // Parse speed limits maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed) @@ -270,6 +275,8 @@ func runDaemonStart() error { d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { if t.Mode == "stream" { + // Only 1 stream at a time: cancel all existing streams + cancelAllStreams() go handleStreamTask(ctx, t, reporter, cfg) } else if t.ForceStart || manager.HasCapacity() { manager.Submit(ctx, t) @@ -281,20 +288,28 @@ func runDaemonStart() error { // Wire: stream requests for completed downloads → serve file from disk d.OnStreamRequested = func(sr agent.StreamRequest) { - // Check if already streaming this task - streamRegistry.mu.Lock() - _, exists := streamRegistry.servers[sr.TaskID] - streamRegistry.mu.Unlock() - if exists { + // Only 1 stream at a time: cancel all existing streams + cancelAllStreams() + + filePath := sr.FilePath + info, err := os.Stat(filePath) + if err != nil { + log.Printf("[%s] stream request: file not found: %s", sr.TaskID[:8], filePath) return } - if _, err := os.Stat(sr.FilePath); err != nil { - log.Printf("[%s] stream request: file not found: %s", sr.TaskID[:8], sr.FilePath) - return + // If filePath is a directory, find the largest video file inside + if info.IsDir() { + found := engine.FindVideoFile(filePath) + if found == "" { + log.Printf("[%s] stream request: no video file in directory: %s", sr.TaskID[:8], filePath) + return + } + filePath = found + log.Printf("[%s] resolved directory to video file: %s", sr.TaskID[:8], filepath.Base(filePath)) } - srv := engine.NewStreamServerFromDisk(sr.FilePath, 0) + srv := engine.NewStreamServerFromDisk(filePath, cfg.Download.StreamPort) streamURL, err := srv.Start(context.Background()) if err != nil { log.Printf("[%s] stream failed: %v", sr.TaskID[:8], err) @@ -316,6 +331,24 @@ func runDaemonStart() error { log.Printf("[%s] stream URL report failed: %v", sr.TaskID[:8], err) } }() + + // Auto-shutdown after 30 min of idle (no HTTP requests) + go func() { + ticker := time.NewTicker(60 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if srv.IdleSince() > 30*time.Minute { + log.Printf("[%s] disk stream idle timeout (30m), shutting down", sr.TaskID[:8]) + cancelStreamTask(sr.TaskID) + return + } + } + } + }() } // Wire: WS control actions (pause/cancel/stream pushed from server) @@ -331,6 +364,8 @@ func runDaemonStart() error { log.Printf("[%s] resume requested via WebSocket, triggering poll", taskID[:8]) d.TriggerPoll() case "stream": + // Only 1 stream at a time: cancel all existing streams + cancelAllStreams() // Use registry mutex to prevent TOCTOU race with HTTP-polled stream requests streamRegistry.mu.Lock() if _, exists := streamRegistry.servers[taskID]; exists { @@ -352,6 +387,8 @@ func runDaemonStart() error { streamRegistry.servers[taskID] = srv streamRegistry.mu.Unlock() task.SetStreamURL(srv.URL()) + case "stop-stream": + cancelStreamTask(taskID) } } diff --git a/internal/config/config.go b/internal/config/config.go index 04195b7..693f30d 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -44,6 +44,7 @@ type DownloadConfig struct { MetadataTimeout string `toml:"metadata_timeout"` // e.g. "1h", "30m", "0" = unlimited (default: "0") StallTimeout string `toml:"stall_timeout"` // e.g. "30m", "1h", "0" = unlimited (default: "30m") ListenPort int `toml:"listen_port"` // fixed port for incoming peer connections (default: 42069, 0 = random) + StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818) } type OrganizeConfig struct { @@ -55,6 +56,7 @@ type OrganizeConfig struct { type DaemonConfig struct { PollInterval string `toml:"poll_interval"` HeartbeatInterval string `toml:"heartbeat_interval"` + StatusInterval string `toml:"status_interval"` } type NotificationsConfig struct { @@ -85,6 +87,7 @@ func Default() Config { Download: DownloadConfig{ PreferredMethod: "auto", MaxConcurrent: 3, + StreamPort: 11818, }, Organize: OrganizeConfig{ Enabled: true, @@ -143,6 +146,9 @@ func Load(path string) (Config, error) { if cfg.General.Country == "" { cfg.General.Country = "US" } + if cfg.Download.StreamPort == 0 { + cfg.Download.StreamPort = 11818 + } return cfg, nil } diff --git a/internal/engine/progress.go b/internal/engine/progress.go index e2284fc..264de2f 100644 --- a/internal/engine/progress.go +++ b/internal/engine/progress.go @@ -39,27 +39,32 @@ type ProgressReporter struct { onPause ActionFunc onDeleteFiles ActionFunc onStreamRequested ActionFunc + onWatchingChanged func(watching bool) - mu sync.Mutex - latest map[string]*Task // taskID -> task with latest progress + mu sync.Mutex + latest map[string]*Task // taskID -> task with latest progress + lastReported map[string]TaskStatus // taskID -> last status sent to API + lastCheckAt time.Time // last time we reported for control-signal polling } // NewProgressReporter creates a reporter that flushes every interval. // Accepts *agent.Client directly (backwards compatible). func NewProgressReporter(ac *agent.Client, interval time.Duration) *ProgressReporter { return &ProgressReporter{ - reporter: ac, - interval: interval, - latest: make(map[string]*Task), + reporter: ac, + interval: interval, + latest: make(map[string]*Task), + lastReported: make(map[string]TaskStatus), } } // NewProgressReporterWithTransport creates a reporter using a Transport. func NewProgressReporterWithTransport(t agent.Transport, interval time.Duration) *ProgressReporter { return &ProgressReporter{ - reporter: &transportStatusAdapter{t: t}, - interval: interval, - latest: make(map[string]*Task), + reporter: &transportStatusAdapter{t: t}, + interval: interval, + latest: make(map[string]*Task), + lastReported: make(map[string]TaskStatus), } } @@ -87,6 +92,12 @@ func (r *ProgressReporter) SetStreamRequestedHandler(fn ActionFunc) { r.onStream // SetWatchingFunc sets the function that checks if someone is viewing downloads. func (r *ProgressReporter) SetWatchingFunc(fn WatchingFunc) { r.isWatching = fn } +// SetWatchingChangedHandler sets a callback invoked when the server's watching flag changes. +// This allows the daemon to update its Watching state from status responses (not just heartbeats). +func (r *ProgressReporter) SetWatchingChangedHandler(fn func(watching bool)) { + r.onWatchingChanged = fn +} + // Track registers a task for progress tracking. func (r *ProgressReporter) Track(task *Task) { r.mu.Lock() @@ -99,6 +110,7 @@ func (r *ProgressReporter) Untrack(taskID string) { r.mu.Lock() defer r.mu.Unlock() delete(r.latest, taskID) + delete(r.lastReported, taskID) } // Run starts the periodic flush loop. Blocks until ctx is cancelled. @@ -123,23 +135,38 @@ func (r *ProgressReporter) flush(ctx context.Context) { for _, t := range r.latest { tasks = append(tasks, t) } + // Snapshot lastReported under the same lock + lastReported := make(map[string]TaskStatus, len(r.lastReported)) + for k, v := range r.lastReported { + lastReported[k] = v + } r.mu.Unlock() - // When nobody is watching, only report final states (completed/failed). - // This saves ~99% of API requests when the user isn't on the downloads page. + // When nobody is watching, only report final states, status transitions, + // and periodic check-ins (every 30s) so we still receive control signals + // (cancel/pause) from the server. watching := r.isWatching == nil || r.isWatching() + controlCheckDue := time.Since(r.lastCheckAt) >= 30*time.Second var reportable []*Task for _, task := range tasks { status := task.GetStatus() isFinal := status == StatusCompleted || status == StatusFailed isActive := status == StatusDownloading || status == StatusVerifying || - status == StatusOrganizing || status == StatusSeeding - if isFinal || (watching && isActive) { + status == StatusOrganizing || status == StatusSeeding || + status == StatusResolving + // Always report status transitions so the DB reflects the current state. + prev := lastReported[task.ID] + isTransition := prev == "" || prev != status + if isFinal || isTransition || (watching && isActive) || (controlCheckDue && isActive) { reportable = append(reportable, task) } } + if controlCheckDue { + r.lastCheckAt = time.Now() + } + if len(reportable) == 0 { return } @@ -152,20 +179,27 @@ func (r *ProgressReporter) flush(ctx context.Context) { // Fallback: individual requests for _, task := range reportable { + statusAtReport := task.GetStatus() // capture before HTTP round-trip update := task.ToStatusUpdate() resp, err := r.reporter.ReportStatus(ctx, update) if err != nil { log.Printf("[%s] progress report failed: %v", task.ID[:8], err) continue } + r.mu.Lock() + r.lastReported[task.ID] = statusAtReport + r.mu.Unlock() r.handleResponse(task, resp) } } func (r *ProgressReporter) flushBatch(ctx context.Context, batcher BatchStatusReporter, tasks []*Task) { updates := make([]agent.StatusUpdate, len(tasks)) + // Capture status before HTTP round-trip to avoid missed transitions + statusAtReport := make([]TaskStatus, len(tasks)) for i, task := range tasks { updates[i] = task.ToStatusUpdate() + statusAtReport[i] = task.GetStatus() } resp, err := batcher.BatchReportStatus(ctx, updates) @@ -174,10 +208,20 @@ func (r *ProgressReporter) flushBatch(ctx context.Context, batcher BatchStatusRe return } + // Propagate watching flag from batch response + if resp.Watching && r.onWatchingChanged != nil { + r.onWatchingChanged(true) + } + // Match results back to tasks by index (server returns in same order) if len(resp.Results) != len(tasks) { log.Printf("batch response mismatch: sent %d updates, got %d results", len(tasks), len(resp.Results)) } + r.mu.Lock() + for i, task := range tasks { + r.lastReported[task.ID] = statusAtReport[i] + } + r.mu.Unlock() for i, result := range resp.Results { if i < len(tasks) { r.handleResponse(tasks[i], &result) @@ -186,6 +230,11 @@ func (r *ProgressReporter) flushBatch(ctx context.Context, batcher BatchStatusRe } func (r *ProgressReporter) handleResponse(task *Task, resp *agent.StatusResponse) { + // Propagate watching flag from status response to daemon + if resp.Watching && r.onWatchingChanged != nil { + r.onWatchingChanged(true) + } + if resp.Cancelled { log.Printf("[%s] cancelled by user (via web)", task.ID[:8]) r.Untrack(task.ID)