unarr/internal/engine/task.go
Deivid Soto ef4f38d324 fix: resolve deadlock, data races and path traversal vulnerabilities
- task.go: fix deadlock in ToStatusUpdate() — calling Percent() (which
  RLocks) while already holding RLock caused deadlock when a writer was
  waiting; compute percent inline instead
- usenet.go: fix data race in Cancel() — tracker and taskDir were read
  without the mutex while Download() writes them under it; read all
  fields under the same lock
- upnp.go: fix UPnP Remove() blocking shutdown — run cleanup in goroutine
  with 10s deadline (removeNATPMP worst case is 3s dial + 5s deadline)
- daemon.go: add path traversal protection for stream requests — validate
  sr.FilePath is within configured directories before os.Stat; defends
  against compromised API server sending arbitrary paths
- client.go: add wakeClient without timeout for long-poll wake endpoint
  where context controls cancellation
- sync.go: trigger immediate sync when entering watching mode so stream
  requests are picked up without waiting for the next scheduled interval
2026-04-08 23:36:18 +02:00

256 lines
6.7 KiB
Go

package engine
import (
"fmt"
"sync"
"time"
"github.com/torrentclaw/unarr/internal/agent"
)
// TaskStatus represents the current state of a download task.
type TaskStatus string
const (
StatusPending TaskStatus = "pending"
StatusClaimed TaskStatus = "claimed"
StatusResolving TaskStatus = "resolving"
StatusDownloading TaskStatus = "downloading"
StatusVerifying TaskStatus = "verifying"
StatusOrganizing TaskStatus = "organizing"
StatusSeeding TaskStatus = "seeding"
StatusCompleted TaskStatus = "completed"
StatusFailed TaskStatus = "failed"
StatusCancelled TaskStatus = "cancelled"
)
// validTransitions defines allowed state changes.
var validTransitions = map[TaskStatus][]TaskStatus{
StatusPending: {StatusClaimed},
StatusClaimed: {StatusResolving, StatusCancelled},
StatusResolving: {StatusDownloading, StatusFailed, StatusCancelled},
StatusDownloading: {StatusVerifying, StatusFailed, StatusResolving, StatusCancelled},
StatusVerifying: {StatusOrganizing, StatusFailed},
StatusOrganizing: {StatusSeeding, StatusCompleted},
StatusSeeding: {StatusCompleted},
}
// Task represents a download task with its full lifecycle state.
type Task struct {
mu sync.RWMutex
// From server
ID string
InfoHash string
Title string
ContentID *int
IMDbID string
PreferredMethod string // auto | torrent | debrid | usenet
DirectURL string // HTTPS download URL (debrid, etc.)
DirectFileName string // Original filename from direct URL
NzbID string // Pre-resolved NZB ID (usenet)
NzbPassword string // Password for encrypted NZB archives
ReplacePath string // File to replace after download (upgrade mode)
LibraryItemID int // Library item being upgraded
ContentType string // "movie" | "show" — from server metadata
ContentTitle string // Clean title from TMDB
Season *int // Season number
Episode *int // Episode number
ContentYear *int // Year from TMDB (avoids regex on torrent title)
CollectionName string // Collection name (e.g., "Harry Potter Collection")
// Runtime state
Status TaskStatus
Mode string // download | stream
ResolvedMethod DownloadMethod
TriedMethods []DownloadMethod
DownloadedBytes int64
TotalBytes int64
SpeedBps int64
ETA int
FileName string
FilePath string
StreamURL string
ErrorMessage string
// Timestamps
ClaimedAt time.Time
StartedAt time.Time
CompletedAt time.Time
}
// NewTaskFromAgent creates a Task from a server-claimed agent.Task.
func NewTaskFromAgent(at agent.Task) *Task {
mode := at.Mode
if mode == "" {
mode = "download"
}
return &Task{
ID: at.ID,
InfoHash: at.InfoHash,
Title: at.Title,
ContentID: at.ContentID,
IMDbID: at.IMDbID,
PreferredMethod: at.PreferredMethod,
DirectURL: at.DirectURL,
DirectFileName: at.DirectFileName,
NzbID: at.NzbID,
NzbPassword: at.NzbPassword,
ReplacePath: at.ReplacePath,
LibraryItemID: at.LibraryItemID,
ContentType: at.ContentType,
ContentTitle: at.ContentTitle,
ContentYear: at.ContentYear,
Season: at.Season,
Episode: at.Episode,
CollectionName: at.CollectionName,
Mode: mode,
Status: StatusClaimed,
ClaimedAt: time.Now(),
}
}
// Transition validates and performs a state transition.
func (t *Task) Transition(to TaskStatus) error {
t.mu.Lock()
defer t.mu.Unlock()
allowed, ok := validTransitions[t.Status]
if !ok {
return fmt.Errorf("no transitions from %s", t.Status)
}
for _, a := range allowed {
if a == to {
t.Status = to
if to == StatusDownloading {
t.StartedAt = time.Now()
}
if to == StatusCompleted || to == StatusFailed {
t.CompletedAt = time.Now()
}
return nil
}
}
return fmt.Errorf("invalid transition: %s -> %s", t.Status, to)
}
// GetStatus returns current status thread-safely.
func (t *Task) GetStatus() TaskStatus {
t.mu.RLock()
defer t.mu.RUnlock()
return t.Status
}
// SetStreamURL sets the stream URL thread-safely.
func (t *Task) SetStreamURL(url string) {
t.mu.Lock()
defer t.mu.Unlock()
t.StreamURL = url
}
// GetStreamURL returns the stream URL thread-safely.
func (t *Task) GetStreamURL() string {
t.mu.RLock()
defer t.mu.RUnlock()
return t.StreamURL
}
// UpdateProgress updates download metrics thread-safely.
func (t *Task) UpdateProgress(p Progress) {
t.mu.Lock()
defer t.mu.Unlock()
t.DownloadedBytes = p.DownloadedBytes
t.TotalBytes = p.TotalBytes
t.SpeedBps = p.SpeedBps
t.ETA = p.ETA
if p.FileName != "" {
t.FileName = p.FileName
}
}
// Percent returns download progress as 0-100.
func (t *Task) Percent() int {
t.mu.RLock()
defer t.mu.RUnlock()
if t.TotalBytes <= 0 {
return 0
}
p := int(float64(t.DownloadedBytes) / float64(t.TotalBytes) * 100)
if p > 100 {
return 100
}
return p
}
// ToStatusUpdate converts task state to an API status update.
func (t *Task) ToStatusUpdate() agent.StatusUpdate {
t.mu.RLock()
defer t.mu.RUnlock()
apiStatus := ""
switch t.Status {
case StatusResolving:
apiStatus = "resolving"
case StatusDownloading:
apiStatus = "downloading"
case StatusVerifying:
apiStatus = "verifying"
case StatusOrganizing:
apiStatus = "organizing"
case StatusSeeding:
apiStatus = "downloading"
case StatusCompleted:
apiStatus = "completed"
case StatusFailed:
apiStatus = "failed"
default:
// StatusPending, StatusClaimed, StatusCancelled — not reported
}
// Compute percent inline — do NOT call t.Percent() here since we already hold RLock.
// Calling Percent() (which also RLocks) while holding RLock deadlocks when a writer is waiting.
percent := 0
if t.TotalBytes > 0 {
percent = int(float64(t.DownloadedBytes) / float64(t.TotalBytes) * 100)
if percent > 100 {
percent = 100
}
}
return agent.StatusUpdate{
TaskID: t.ID,
Status: apiStatus,
Progress: percent,
DownloadedBytes: t.DownloadedBytes,
TotalBytes: t.TotalBytes,
SpeedBps: t.SpeedBps,
ETA: t.ETA,
ResolvedMethod: string(t.ResolvedMethod),
FileName: t.FileName,
FilePath: t.FilePath,
StreamURL: t.StreamURL,
ErrorMessage: t.ErrorMessage,
}
}
// MagnetURI builds a magnet link from the info hash.
func (t *Task) MagnetURI() string {
return "magnet:?xt=urn:btih:" + t.InfoHash
}
// HasUntried returns true if there are download methods not yet attempted.
func (t *Task) HasUntried(available []DownloadMethod) bool {
for _, m := range available {
tried := false
for _, tm := range t.TriedMethods {
if tm == m {
tried = true
break
}
}
if !tried {
return true
}
}
return false
}