From 5e8091150150175fb2b025978c7bce509a8105ba Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Sat, 28 Mar 2026 18:09:34 +0100 Subject: [PATCH] feat(debrid): add HTTPS downloader for debrid direct URLs DebridDownloader receives directUrl from the server and downloads via plain HTTPS with progress reporting, resume (Range), and pause/cancel. - Add DirectURL, DirectFileName to agent Task and engine Task types - Implement DebridDownloader: HTTPS download with progress, resume, cancel - HTTP client with 30s ResponseHeaderTimeout - Safe shortID helper to prevent slice panic on short IDs - Validate 416 against Content-Range server size for resume integrity - Register debridDl in daemon and one-shot download command - Tests: available, download, resume, cancel, pause, fallback filename, expired URL (410), unauthorized (401), shutdown, task propagation --- internal/agent/types.go | 30 ++- internal/cmd/daemon.go | 233 +++++++++++++++--- internal/cmd/download.go | 4 +- internal/engine/debrid.go | 306 ++++++++++++++++++++++-- internal/engine/debrid_test.go | 419 +++++++++++++++++++++++++++++++++ internal/engine/task.go | 14 +- internal/engine/task_test.go | 28 +++ 7 files changed, 981 insertions(+), 53 deletions(-) create mode 100644 internal/engine/debrid_test.go diff --git a/internal/agent/types.go b/internal/agent/types.go index 3e5ac9c..ce132a0 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -64,11 +64,20 @@ type Task struct { IMDbID string `json:"imdbId,omitempty"` PreferredMethod string `json:"preferredMethod"` // auto | debrid | usenet | torrent Mode string `json:"mode,omitempty"` // download | stream + DirectURL string `json:"directUrl,omitempty"` // HTTPS download URL (debrid, etc.) + DirectFileName string `json:"directFileName,omitempty"` // Original filename from direct URL } // TasksResponse wraps the array of tasks returned by the server. type TasksResponse struct { - Tasks []Task `json:"tasks"` + Tasks []Task `json:"tasks"` + StreamRequests []StreamRequest `json:"streamRequests,omitempty"` +} + +// StreamRequest is a request to stream a completed download from disk. +type StreamRequest struct { + TaskID string `json:"taskId"` + FilePath string `json:"filePath"` } // StatusUpdate is sent by the CLI to report download progress. @@ -97,6 +106,25 @@ type StatusResponse struct { StreamRequested bool `json:"streamRequested,omitempty"` } +// HeartbeatResponse is returned by the server on heartbeat. +type HeartbeatResponse struct { + Success bool `json:"success"` + Upgrade *UpgradeSignal `json:"upgrade,omitempty"` +} + +// UpgradeSignal tells the agent to upgrade to a specific version. +type UpgradeSignal struct { + Version string `json:"version"` +} + +// UpgradeResult is sent by the agent after an upgrade attempt. +type UpgradeResult struct { + AgentID string `json:"agentId"` + Success bool `json:"success"` + Version string `json:"version,omitempty"` + Error string `json:"error,omitempty"` +} + // ErrorResponse is returned on API errors. type ErrorResponse struct { Error string `json:"error"` diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 43c5b12..f904cfa 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "path/filepath" "syscall" "time" @@ -14,6 +15,7 @@ import ( "github.com/torrentclaw/torrentclaw-cli/internal/agent" "github.com/torrentclaw/torrentclaw-cli/internal/config" "github.com/torrentclaw/torrentclaw-cli/internal/engine" + "github.com/torrentclaw/torrentclaw-cli/internal/upgrade" ) // newStartCmd creates the top-level `unarr start` command. @@ -55,35 +57,13 @@ func newDaemonCmd() *cobra.Command { } cmd.AddCommand( - newDaemonInstallCmd(), - newDaemonUninstallCmd(), + newDaemonInstallCmdReal(), + newDaemonUninstallCmdReal(), ) return cmd } -func newDaemonInstallCmd() *cobra.Command { - return &cobra.Command{ - Use: "install", - Short: "Install daemon as a system service (systemd/launchd)", - RunE: func(cmd *cobra.Command, args []string) error { - fmt.Println(" Service installation coming in a future release.") - fmt.Println(" For now, use: unarr start") - return nil - }, - } -} - -func newDaemonUninstallCmd() *cobra.Command { - return &cobra.Command{ - Use: "uninstall", - Short: "Remove daemon system service", - RunE: func(cmd *cobra.Command, args []string) error { - fmt.Println(" Service uninstall coming in a future release.") - return nil - }, - } -} func runDaemonStart() error { cfg := loadConfig() @@ -124,10 +104,11 @@ func runDaemonStart() error { heartbeatInterval = 30 * time.Second } - // Create agent client + // Create agent client (direct HTTP — always available as fallback) ac := agent.NewClient(cfg.Auth.APIURL, cfg.Auth.APIKey, "unarr/"+Version) + userAgent := "unarr/" + Version - // Create daemon + // Create daemon config daemonCfg := agent.DaemonConfig{ AgentID: cfg.Agent.ID, AgentName: cfg.Agent.Name, @@ -136,10 +117,37 @@ func runDaemonStart() error { PollInterval: pollInterval, HeartbeatInterval: heartbeatInterval, } - d := agent.NewDaemon(daemonCfg, ac) + // Create transport: Hybrid (WS + HTTP fallback) or HTTP-only + wsURL := cfg.Auth.WSURL + if wsURL == "" { + wsURL = deriveWSURL(cfg.Auth.APIURL, cfg.Agent.ID) + } + + var transport agent.Transport + if wsURL != "" { + httpT := agent.NewHTTPTransport(cfg.Auth.APIURL, cfg.Auth.APIKey, userAgent) + wsT := agent.NewWSTransport(wsURL, cfg.Auth.APIKey, cfg.Agent.ID, userAgent) + transport = agent.NewHybridTransport(wsT, httpT) + log.Printf("Transport: WebSocket (fallback: HTTP) → %s", wsURL) + } + + // Create daemon + var d *agent.Daemon + if transport != nil { + d = agent.NewDaemonWithTransport(daemonCfg, transport) + } else { + d = agent.NewDaemon(daemonCfg, ac) + } + + // Wire state tracking (connected after manager creation below) // Create progress reporter - reporter := engine.NewProgressReporter(ac, 3*time.Second) + var reporter *engine.ProgressReporter + if transport != nil { + reporter = engine.NewProgressReporterWithTransport(transport, 3*time.Second) + } else { + reporter = engine.NewProgressReporter(ac, 3*time.Second) + } // Parse speed limits maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed) @@ -169,6 +177,9 @@ func runDaemonStart() error { log.Printf("Speed limits: download=%s upload=%s", dlStr, ulStr) } + // Create debrid downloader (HTTPS-based, no provider interaction needed) + debridDl := engine.NewDebridDownloader() + // Create download manager manager := engine.NewManager(engine.ManagerConfig{ MaxConcurrent: cfg.Download.MaxConcurrent, @@ -179,7 +190,10 @@ func runDaemonStart() error { MoviesDir: cfg.Organize.MoviesDir, TVShowsDir: cfg.Organize.TVShowsDir, }, - }, reporter, torrentDl) + }, reporter, torrentDl, debridDl) + + // Wire state tracking + d.GetActiveCount = manager.ActiveCount // Wire: server-side signals -> manager actions + stream tasks reporter.SetCancelHandler(func(taskID string) { @@ -233,6 +247,142 @@ func runDaemonStart() error { } } + // Wire: stream requests for completed downloads → serve file from disk + d.OnStreamRequested = func(sr agent.StreamRequest) { + // Check if already streaming this task + streamRegistry.mu.Lock() + _, exists := streamRegistry.servers[sr.TaskID] + streamRegistry.mu.Unlock() + if exists { + return + } + + if _, err := os.Stat(sr.FilePath); err != nil { + log.Printf("[%s] stream request: file not found: %s", sr.TaskID[:8], sr.FilePath) + return + } + + srv := engine.NewStreamServerFromDisk(sr.FilePath, 0) + streamURL, err := srv.Start(context.Background()) + if err != nil { + log.Printf("[%s] stream failed: %v", sr.TaskID[:8], err) + return + } + + streamRegistry.mu.Lock() + streamRegistry.servers[sr.TaskID] = srv + streamRegistry.mu.Unlock() + + log.Printf("[%s] streaming from disk: %s → %s", sr.TaskID[:8], filepath.Base(sr.FilePath), streamURL) + + // Report stream URL back to the server + go func() { + if _, err := ac.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: sr.TaskID, + StreamURL: streamURL, + }); err != nil { + log.Printf("[%s] stream URL report failed: %v", sr.TaskID[:8], err) + } + }() + } + + // Wire: WS control actions (pause/cancel/stream pushed from server) + d.OnControlAction = func(action, taskID string) { + switch action { + case "cancel": + manager.CancelTask(taskID) + cancelStreamTask(taskID) + case "pause": + manager.PauseTask(taskID) + cancelStreamTask(taskID) + case "resume": + log.Printf("[%s] resume requested via WebSocket", taskID[:8]) + case "stream": + task := manager.GetTask(taskID) + if task == nil { + return + } + if task.GetStreamURL() != "" { + return + } + srv, err := torrentDl.StartStream(taskID) + if err != nil { + log.Printf("[%s] stream failed: %v", taskID[:8], err) + return + } + streamRegistry.mu.Lock() + streamRegistry.servers[taskID] = srv + streamRegistry.mu.Unlock() + task.SetStreamURL(srv.URL()) + } + } + + // Wire: server-requested upgrade + d.OnUpgradeRequested = func(targetVersion string) { + + // Wait for active downloads to finish + if active := manager.ActiveCount(); active > 0 { + log.Printf("Waiting for %d active download(s) to finish before upgrading...", active) + manager.Wait() + } + + upgrader := &upgrade.Upgrader{CurrentVersion: Version} + result := upgrader.Execute(ctx, targetVersion) + + // Report result to server + reportCtx, reportCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer reportCancel() + errMsg := "" + if result.Error != nil { + errMsg = result.Error.Error() + } + upgradeResult := agent.UpgradeResult{ + AgentID: cfg.Agent.ID, + Success: result.Success, + Version: result.NewVersion, + Error: errMsg, + } + if transport != nil { + _ = transport.ReportUpgradeResult(reportCtx, upgradeResult) + } else { + _ = ac.ReportUpgradeResult(reportCtx, upgradeResult) + } + + if !result.Success { + log.Printf("Upgrade failed: %v", result.Error) + d.ClearUpgradeInProgress() + return + } + + // Restart: replace current process with the new binary + log.Printf("Upgrade successful (%s → %s), restarting...", result.OldVersion, result.NewVersion) + + // Deregister first so the server knows we're restarting + deregCtx, deregCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer deregCancel() + _ = ac.Deregister(deregCtx, cfg.Agent.ID) + + // Flush progress reporter + cancel() + + // Re-exec with the same args — the new binary takes over + binPath, err := os.Executable() + if err != nil { + log.Printf("Could not determine executable path: %v", err) + os.Exit(75) // EX_TEMPFAIL + } + // syscall.Exec replaces the current process (Unix) + execErr := syscall.Exec(binPath, os.Args, os.Environ()) + // If we get here, exec failed (e.g. Windows) + log.Printf("Exec failed: %v — exiting for service manager restart", execErr) + os.Exit(75) + } + + // Config hot-reload (SIGUSR1 on Unix, no-op on Windows) + // Tickers are initialized inside d.Run(), so we pass the daemon + // and the reload goroutine reads them when the signal arrives. + startReloadWatcher(&ReloadableConfig{Daemon: d}) + // Signal handling sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) @@ -266,6 +416,31 @@ func runDaemonStart() error { } } +// deriveWSURL derives a WebSocket URL from the API URL. +// https://torrentclaw.com → wss://unarr.torrentclaw.com/ws/{agentId} +func deriveWSURL(apiURL, agentID string) string { + if apiURL == "" || agentID == "" { + return "" + } + // Parse domain from API URL + domain := apiURL + for _, prefix := range []string{"https://", "http://"} { + if len(domain) > len(prefix) && domain[:len(prefix)] == prefix { + domain = domain[len(prefix):] + break + } + } + // Strip trailing slash/path + for i := 0; i < len(domain); i++ { + if domain[i] == '/' { + domain = domain[:i] + break + } + } + + return "wss://unarr." + domain + "/ws/" + agentID +} + func formatSpeedLog(bps int64) string { switch { case bps >= 1024*1024*1024: diff --git a/internal/cmd/download.go b/internal/cmd/download.go index 8dd070b..e9d9024 100644 --- a/internal/cmd/download.go +++ b/internal/cmd/download.go @@ -88,6 +88,8 @@ func runDownload(input, method string) error { 5*time.Second, ) + debridDl := engine.NewDebridDownloader() + manager := engine.NewManager(engine.ManagerConfig{ MaxConcurrent: 1, OutputDir: outputDir, @@ -96,7 +98,7 @@ func runDownload(input, method string) error { MoviesDir: cfg.Organize.MoviesDir, TVShowsDir: cfg.Organize.TVShowsDir, }, - }, reporter, torrentDl) + }, reporter, torrentDl, debridDl) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/engine/debrid.go b/internal/engine/debrid.go index d6f9de1..d6bcb85 100644 --- a/internal/engine/debrid.go +++ b/internal/engine/debrid.go @@ -3,39 +3,303 @@ package engine import ( "context" "fmt" - - tc "github.com/torrentclaw/go-client" + "io" + "log" + "net/http" + "os" + "path/filepath" + "sync" + "time" ) -// DebridDownloader downloads via debrid services (Real-Debrid, AllDebrid, etc.). -// Currently a stub — Available() works, Download() returns not-implemented. -type DebridDownloader struct { - apiClient *tc.Client +// httpClient is used for debrid HTTPS downloads with a reasonable header timeout. +var httpClient = &http.Client{ + Transport: &http.Transport{ + ResponseHeaderTimeout: 30 * time.Second, + }, } -// NewDebridDownloader creates a debrid downloader stub. -func NewDebridDownloader(apiClient *tc.Client) *DebridDownloader { - return &DebridDownloader{apiClient: apiClient} +func shortID(id string) string { + if len(id) > 8 { + return id[:8] + } + return id +} + +// DebridDownloader downloads files via HTTPS direct URLs resolved by the server. +// The server handles all debrid provider interaction; this downloader only needs +// a plain HTTPS URL to fetch. +type DebridDownloader struct { + activeMu sync.Mutex + active map[string]context.CancelFunc +} + +// NewDebridDownloader creates a debrid downloader. +func NewDebridDownloader() *DebridDownloader { + return &DebridDownloader{ + active: make(map[string]context.CancelFunc), + } } func (d *DebridDownloader) Method() DownloadMethod { return MethodDebrid } -func (d *DebridDownloader) Available(ctx context.Context, task *Task) (bool, error) { - if d.apiClient == nil { - return false, nil +// Available returns true if the task has a direct HTTPS URL from the server. +func (d *DebridDownloader) Available(_ context.Context, task *Task) (bool, error) { + return task.DirectURL != "", nil +} + +// Download fetches the file from task.DirectURL via HTTPS with progress reporting. +// Supports resume via HTTP Range headers if the server supports it. +func (d *DebridDownloader) Download(ctx context.Context, task *Task, outputDir string, progressCh chan<- Progress) (*Result, error) { + if task.DirectURL == "" { + return nil, fmt.Errorf("no direct URL provided for debrid download") } - resp, err := d.apiClient.DebridCheckCache(ctx, "", "", []string{task.InfoHash}) + + // Determine filename + fileName := task.DirectFileName + if fileName == "" { + fileName = task.Title + if fileName == "" { + fileName = task.InfoHash + } + } + + destPath, err := safePath(outputDir, fileName) if err != nil { - return false, err + return nil, fmt.Errorf("invalid filename: %w", err) } - cached, ok := resp.Cached[task.InfoHash] - return ok && cached, nil + + // Check for existing partial file (resume support) + var existingSize int64 + if fi, statErr := os.Stat(destPath); statErr == nil { + existingSize = fi.Size() + } + + // Create cancellable context + dlCtx, cancel := context.WithCancel(ctx) + + d.activeMu.Lock() + d.active[task.ID] = cancel + d.activeMu.Unlock() + + defer func() { + d.activeMu.Lock() + delete(d.active, task.ID) + d.activeMu.Unlock() + cancel() + }() + + // Build request with optional Range header for resume + req, err := http.NewRequestWithContext(dlCtx, http.MethodGet, task.DirectURL, nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + if existingSize > 0 { + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", existingSize)) + } + + resp, err := httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("http request: %w", err) + } + defer resp.Body.Close() + + // Handle response codes + var totalBytes int64 + var startOffset int64 + + switch resp.StatusCode { + case http.StatusOK: + // Full download (server doesn't support Range, or fresh start) + if resp.ContentLength > 0 { + totalBytes = resp.ContentLength + } + existingSize = 0 // Start fresh + case http.StatusPartialContent: + // Resume accepted + startOffset = existingSize + if resp.ContentLength > 0 { + totalBytes = existingSize + resp.ContentLength + } + case http.StatusRequestedRangeNotSatisfiable: + // 416 means our Range start is beyond the file size. + // Verify local file matches the server's actual size via Content-Range header. + if existingSize > 0 { + if cr := resp.Header.Get("Content-Range"); cr != "" { + // Content-Range: bytes */12345 — parse total size + var serverSize int64 + if _, err := fmt.Sscanf(cr, "bytes */%d", &serverSize); err == nil && serverSize > 0 && existingSize != serverSize { + // Local file size doesn't match server — re-download from scratch + log.Printf("[%s] local size %s != server size %s, re-downloading", shortID(task.ID), formatBytes(existingSize), formatBytes(serverSize)) + existingSize = 0 + resp.Body.Close() + req2, err := http.NewRequestWithContext(dlCtx, http.MethodGet, task.DirectURL, nil) + if err != nil { + return nil, fmt.Errorf("create retry request: %w", err) + } + resp, err = httpClient.Do(req2) + if err != nil { + return nil, fmt.Errorf("retry http request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("retry unexpected HTTP status: %d %s", resp.StatusCode, resp.Status) + } + if resp.ContentLength > 0 { + totalBytes = resp.ContentLength + } + break // continue to download loop + } + } + log.Printf("[%s] file already complete: %s (%s)", shortID(task.ID), fileName, formatBytes(existingSize)) + return &Result{ + FilePath: destPath, + FileName: fileName, + Method: MethodDebrid, + Size: existingSize, + }, nil + } + return nil, fmt.Errorf("server returned 416 Range Not Satisfiable") + default: + return nil, fmt.Errorf("unexpected HTTP status: %d %s", resp.StatusCode, resp.Status) + } + + // Open file for writing (append if resuming, create if new) + var flags int + if startOffset > 0 { + flags = os.O_WRONLY | os.O_APPEND + log.Printf("[%s] resuming debrid download at %s: %s", shortID(task.ID), formatBytes(startOffset), fileName) + } else { + flags = os.O_WRONLY | os.O_CREATE | os.O_TRUNC + log.Printf("[%s] starting debrid download: %s", shortID(task.ID), fileName) + } + + if err := os.MkdirAll(filepath.Dir(destPath), 0o755); err != nil { + return nil, fmt.Errorf("create directory: %w", err) + } + + file, err := os.OpenFile(destPath, flags, 0o644) + if err != nil { + return nil, fmt.Errorf("open file: %w", err) + } + defer file.Close() + + // Download with progress reporting + downloaded := startOffset + lastReportAt := time.Now() + lastBytes := downloaded + buf := make([]byte, 256*1024) // 256KB buffer + + for { + select { + case <-dlCtx.Done(): + return nil, dlCtx.Err() + default: + } + + n, readErr := resp.Body.Read(buf) + if n > 0 { + if _, writeErr := file.Write(buf[:n]); writeErr != nil { + return nil, fmt.Errorf("write file: %w", writeErr) + } + downloaded += int64(n) + } + + // Report progress every second + now := time.Now() + if now.Sub(lastReportAt) >= time.Second || readErr == io.EOF { + elapsed := now.Sub(lastReportAt).Seconds() + var speed int64 + if elapsed > 0 { + speed = int64(float64(downloaded-lastBytes) / elapsed) + } + + var eta int + if speed > 0 && totalBytes > 0 { + eta = int((totalBytes - downloaded) / speed) + } + + pct := 0 + if totalBytes > 0 { + pct = int(float64(downloaded) / float64(totalBytes) * 100) + } + + fmt.Fprintf(os.Stderr, "\r[%s] %d%% — %s/%s @ %s/s (debrid)", + shortID(task.ID), pct, + formatBytes(downloaded), formatBytes(totalBytes), formatBytes(speed)) + + p := Progress{ + DownloadedBytes: downloaded, + TotalBytes: totalBytes, + SpeedBps: speed, + ETA: eta, + FileName: fileName, + } + task.UpdateProgress(p) + + select { + case progressCh <- p: + default: + } + + lastReportAt = now + lastBytes = downloaded + } + + if readErr == io.EOF { + break + } + if readErr != nil { + return nil, fmt.Errorf("read response: %w", readErr) + } + } + + fmt.Fprint(os.Stderr, "\r\033[2K") // clear progress line + log.Printf("[%s] debrid download complete: %s (%s)", shortID(task.ID), fileName, formatBytes(downloaded)) + + return &Result{ + FilePath: destPath, + FileName: fileName, + Method: MethodDebrid, + Size: downloaded, + }, nil } -func (d *DebridDownloader) Download(_ context.Context, _ *Task, _ string, _ chan<- Progress) (*Result, error) { - return nil, fmt.Errorf("debrid download not implemented yet (coming in a future release)") +// Pause cancels the in-progress HTTP download but keeps partial file for resume. +func (d *DebridDownloader) Pause(taskID string) error { + d.activeMu.Lock() + cancel, ok := d.active[taskID] + delete(d.active, taskID) + d.activeMu.Unlock() + + if ok { + cancel() + log.Printf("[%s] debrid download paused (file kept for resume)", shortID(taskID)) + } + return nil } -func (d *DebridDownloader) Pause(_ string) error { return nil } -func (d *DebridDownloader) Cancel(_ string) error { return nil } -func (d *DebridDownloader) Shutdown(_ context.Context) error { return nil } +// Cancel aborts the in-progress HTTP download. Partial file is kept on disk. +func (d *DebridDownloader) Cancel(taskID string) error { + d.activeMu.Lock() + cancel, ok := d.active[taskID] + delete(d.active, taskID) + d.activeMu.Unlock() + + if ok { + cancel() + log.Printf("[%s] debrid download cancelled", shortID(taskID)) + } + return nil +} + +func (d *DebridDownloader) Shutdown(_ context.Context) error { + d.activeMu.Lock() + defer d.activeMu.Unlock() + + for id, cancel := range d.active { + cancel() + delete(d.active, id) + } + return nil +} diff --git a/internal/engine/debrid_test.go b/internal/engine/debrid_test.go new file mode 100644 index 0000000..5a20222 --- /dev/null +++ b/internal/engine/debrid_test.go @@ -0,0 +1,419 @@ +package engine + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/torrentclaw/torrentclaw-cli/internal/agent" +) + +func TestDebridAvailable(t *testing.T) { + d := NewDebridDownloader() + + t.Run("available when DirectURL is set", func(t *testing.T) { + task := &Task{DirectURL: "https://cdn.example.com/file.mkv"} + ok, err := d.Available(context.Background(), task) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Error("should be available when DirectURL is set") + } + }) + + t.Run("not available when DirectURL is empty", func(t *testing.T) { + task := &Task{DirectURL: ""} + ok, err := d.Available(context.Background(), task) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ok { + t.Error("should not be available when DirectURL is empty") + } + }) +} + +func TestDebridDownloadSuccess(t *testing.T) { + fileContent := strings.Repeat("x", 1024*100) // 100KB file + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(fileContent))) + w.WriteHeader(http.StatusOK) + w.Write([]byte(fileContent)) + })) + defer srv.Close() + + d := NewDebridDownloader() + outputDir := t.TempDir() + + task := &Task{ + ID: "debrid-test-001", + InfoHash: "abc123def456abc123def456abc123def456abc1", + Title: "Test Movie", + DirectURL: srv.URL + "/file.mkv", + DirectFileName: "Test.Movie.2026.1080p.mkv", + Status: StatusDownloading, + } + + progressCh := make(chan Progress, 100) + result, err := d.Download(context.Background(), task, outputDir, progressCh) + close(progressCh) + + if err != nil { + t.Fatalf("Download failed: %v", err) + } + + if result.Method != MethodDebrid { + t.Errorf("Method = %q, want debrid", result.Method) + } + if result.FileName != "Test.Movie.2026.1080p.mkv" { + t.Errorf("FileName = %q, want Test.Movie.2026.1080p.mkv", result.FileName) + } + if result.Size != int64(len(fileContent)) { + t.Errorf("Size = %d, want %d", result.Size, len(fileContent)) + } + + // Verify file exists on disk + data, err := os.ReadFile(result.FilePath) + if err != nil { + t.Fatalf("read downloaded file: %v", err) + } + if len(data) != len(fileContent) { + t.Errorf("file size = %d, want %d", len(data), len(fileContent)) + } + + // Verify task progress was updated + if task.DownloadedBytes != int64(len(fileContent)) { + t.Errorf("task.DownloadedBytes = %d, want %d", task.DownloadedBytes, len(fileContent)) + } +} + +func TestDebridDownloadNoURL(t *testing.T) { + d := NewDebridDownloader() + task := &Task{ID: "no-url-001", DirectURL: ""} + progressCh := make(chan Progress, 10) + + _, err := d.Download(context.Background(), task, t.TempDir(), progressCh) + if err == nil { + t.Error("expected error for empty DirectURL") + } + if !strings.Contains(err.Error(), "no direct URL") { + t.Errorf("error = %q, should mention no direct URL", err.Error()) + } +} + +func TestDebridDownloadHTTPError(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusForbidden) + })) + defer srv.Close() + + d := NewDebridDownloader() + task := &Task{ + ID: "http-err-001", + DirectURL: srv.URL + "/expired", + DirectFileName: "expired.mkv", + } + progressCh := make(chan Progress, 10) + + _, err := d.Download(context.Background(), task, t.TempDir(), progressCh) + if err == nil { + t.Error("expected error for HTTP 403") + } + if !strings.Contains(err.Error(), "403") { + t.Errorf("error = %q, should contain 403", err.Error()) + } +} + +func TestDebridDownloadExpiredURL(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusGone) // 410 — URL expired + })) + defer srv.Close() + + d := NewDebridDownloader() + task := &Task{ + ID: "expired-001", + DirectURL: srv.URL + "/expired", + DirectFileName: "expired.mkv", + } + progressCh := make(chan Progress, 10) + + _, err := d.Download(context.Background(), task, t.TempDir(), progressCh) + if err == nil { + t.Error("expected error for HTTP 410 (expired URL)") + } + if !strings.Contains(err.Error(), "410") { + t.Errorf("error = %q, should contain 410", err.Error()) + } +} + +func TestDebridDownloadUnauthorized(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) + })) + defer srv.Close() + + d := NewDebridDownloader() + task := &Task{ + ID: "unauth-001", + DirectURL: srv.URL + "/unauth", + DirectFileName: "unauth.mkv", + } + progressCh := make(chan Progress, 10) + + _, err := d.Download(context.Background(), task, t.TempDir(), progressCh) + if err == nil { + t.Error("expected error for HTTP 401") + } + if !strings.Contains(err.Error(), "401") { + t.Errorf("error = %q, should contain 401", err.Error()) + } +} + +func TestDebridDownloadResume(t *testing.T) { + fullContent := "HEADER_ALREADY_DOWNLOADED_REST_OF_FILE" + alreadyDownloaded := "HEADER_ALREADY_DOWNLOADED_" + remaining := "REST_OF_FILE" + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + // Parse "bytes=26-" + var start int64 + fmt.Sscanf(rangeHeader, "bytes=%d-", &start) + if start == int64(len(alreadyDownloaded)) { + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(remaining))) + w.WriteHeader(http.StatusPartialContent) + w.Write([]byte(remaining)) + return + } + } + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(fullContent))) + w.WriteHeader(http.StatusOK) + w.Write([]byte(fullContent)) + })) + defer srv.Close() + + d := NewDebridDownloader() + outputDir := t.TempDir() + fileName := "resume-test.mkv" + + // Create partial file + partialPath := filepath.Join(outputDir, fileName) + if err := os.WriteFile(partialPath, []byte(alreadyDownloaded), 0o644); err != nil { + t.Fatalf("write partial file: %v", err) + } + + task := &Task{ + ID: "resume-001", + DirectURL: srv.URL + "/file.mkv", + DirectFileName: fileName, + Status: StatusDownloading, + } + + progressCh := make(chan Progress, 100) + result, err := d.Download(context.Background(), task, outputDir, progressCh) + if err != nil { + t.Fatalf("Download failed: %v", err) + } + + // Verify total size includes both parts + if result.Size != int64(len(fullContent)) { + t.Errorf("Size = %d, want %d", result.Size, len(fullContent)) + } + + // Verify file content + data, err := os.ReadFile(result.FilePath) + if err != nil { + t.Fatalf("read file: %v", err) + } + if string(data) != fullContent { + t.Errorf("file content = %q, want %q", string(data), fullContent) + } +} + +func TestDebridDownloadCancel(t *testing.T) { + // Server that sends a chunk then waits + started := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "1000000") + w.WriteHeader(http.StatusOK) + // Write some data so the download starts + w.Write([]byte(strings.Repeat("x", 4096))) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + close(started) + // Block until client disconnects + <-r.Context().Done() + })) + defer srv.Close() + + d := NewDebridDownloader() + task := &Task{ + ID: "cancel-001", + DirectURL: srv.URL + "/slow", + DirectFileName: "slow.mkv", + Status: StatusDownloading, + } + + progressCh := make(chan Progress, 100) + + errCh := make(chan error, 1) + go func() { + _, err := d.Download(context.Background(), task, t.TempDir(), progressCh) + errCh <- err + }() + + // Wait for server to confirm download started, then cancel + <-started + d.Cancel("cancel-001") + + err := <-errCh + if err == nil { + t.Error("expected error after cancel") + } +} + +func TestDebridDownloadPause(t *testing.T) { + // Server that sends a chunk then waits + started := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "1000000") + w.WriteHeader(http.StatusOK) + // Write enough data to create file + w.Write([]byte(strings.Repeat("x", 8192))) + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + close(started) + // Block until client disconnects + <-r.Context().Done() + })) + defer srv.Close() + + d := NewDebridDownloader() + outputDir := t.TempDir() + task := &Task{ + ID: "pause-001", + DirectURL: srv.URL + "/slow", + DirectFileName: "pauseable.mkv", + Status: StatusDownloading, + } + + progressCh := make(chan Progress, 100) + errCh := make(chan error, 1) + go func() { + _, err := d.Download(context.Background(), task, outputDir, progressCh) + errCh <- err + }() + + // Wait for server to confirm data was sent, then pause + <-started + time.Sleep(50 * time.Millisecond) // small delay for file write + d.Pause("pause-001") + + <-errCh + + // Verify partial file exists on disk (pause keeps files) + partialPath := filepath.Join(outputDir, "pauseable.mkv") + fi, err := os.Stat(partialPath) + if err != nil { + t.Fatalf("partial file should exist after pause: %v", err) + } + if fi.Size() == 0 { + t.Error("partial file should have some bytes") + } +} + +func TestDebridDownloadFallbackFilename(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "5") + w.WriteHeader(http.StatusOK) + io.WriteString(w, "hello") + })) + defer srv.Close() + + d := NewDebridDownloader() + + t.Run("uses Title when DirectFileName is empty", func(t *testing.T) { + task := &Task{ + ID: "fallback-001", + Title: "My Movie Title", + DirectURL: srv.URL + "/file", + Status: StatusDownloading, + } + progressCh := make(chan Progress, 10) + result, err := d.Download(context.Background(), task, t.TempDir(), progressCh) + if err != nil { + t.Fatalf("Download failed: %v", err) + } + if result.FileName != "My Movie Title" { + t.Errorf("FileName = %q, want 'My Movie Title'", result.FileName) + } + }) + + t.Run("uses InfoHash when both are empty", func(t *testing.T) { + task := &Task{ + ID: "fallback-002", + InfoHash: "abc123", + DirectURL: srv.URL + "/file", + Status: StatusDownloading, + } + progressCh := make(chan Progress, 10) + result, err := d.Download(context.Background(), task, t.TempDir(), progressCh) + if err != nil { + t.Fatalf("Download failed: %v", err) + } + if result.FileName != "abc123" { + t.Errorf("FileName = %q, want 'abc123'", result.FileName) + } + }) +} + +func TestDebridShutdown(t *testing.T) { + d := NewDebridDownloader() + err := d.Shutdown(context.Background()) + if err != nil { + t.Errorf("Shutdown should not error: %v", err) + } +} + +func TestNewTaskFromAgentWithDirectURL(t *testing.T) { + at := agent.Task{ + ID: "uuid-debrid", + InfoHash: "abc123def456abc123def456abc123def456abc1", + Title: "Debrid Movie", + PreferredMethod: "debrid", + DirectURL: "https://cdn.torbox.app/dl/abc123/movie.mkv", + DirectFileName: "Movie.2026.1080p.mkv", + } + + task := NewTaskFromAgent(at) + + if task.DirectURL != "https://cdn.torbox.app/dl/abc123/movie.mkv" { + t.Errorf("DirectURL = %q", task.DirectURL) + } + if task.DirectFileName != "Movie.2026.1080p.mkv" { + t.Errorf("DirectFileName = %q", task.DirectFileName) + } + if task.PreferredMethod != "debrid" { + t.Errorf("PreferredMethod = %q", task.PreferredMethod) + } +} + +func TestDebridMethod(t *testing.T) { + d := NewDebridDownloader() + if d.Method() != MethodDebrid { + t.Errorf("Method = %q, want debrid", d.Method()) + } +} diff --git a/internal/engine/task.go b/internal/engine/task.go index 36904c5..44f7dff 100644 --- a/internal/engine/task.go +++ b/internal/engine/task.go @@ -46,6 +46,8 @@ type Task struct { ContentID *int IMDbID string PreferredMethod string // auto | torrent | debrid | usenet + DirectURL string // HTTPS download URL (debrid, etc.) + DirectFileName string // Original filename from direct URL // Runtime state Status TaskStatus @@ -80,6 +82,8 @@ func NewTaskFromAgent(at agent.Task) *Task { ContentID: at.ContentID, IMDbID: at.IMDbID, PreferredMethod: at.PreferredMethod, + DirectURL: at.DirectURL, + DirectFileName: at.DirectFileName, Mode: mode, Status: StatusClaimed, ClaimedAt: time.Now(), @@ -165,7 +169,15 @@ func (t *Task) ToStatusUpdate() agent.StatusUpdate { apiStatus := "" switch t.Status { - case StatusResolving, StatusDownloading, StatusVerifying, StatusOrganizing, StatusSeeding: + case StatusResolving: + apiStatus = "resolving" + case StatusDownloading: + apiStatus = "downloading" + case StatusVerifying: + apiStatus = "verifying" + case StatusOrganizing: + apiStatus = "organizing" + case StatusSeeding: apiStatus = "downloading" case StatusCompleted: apiStatus = "completed" diff --git a/internal/engine/task_test.go b/internal/engine/task_test.go index 4e205a1..13848d4 100644 --- a/internal/engine/task_test.go +++ b/internal/engine/task_test.go @@ -171,6 +171,34 @@ func TestToStatusUpdate(t *testing.T) { } } +func TestToStatusUpdateGranularStates(t *testing.T) { + tests := []struct { + status TaskStatus + wantAPI string + }{ + {StatusResolving, "resolving"}, + {StatusDownloading, "downloading"}, + {StatusVerifying, "verifying"}, + {StatusOrganizing, "organizing"}, + {StatusCompleted, "completed"}, + {StatusFailed, "failed"}, + {StatusSeeding, "downloading"}, // seeding maps to downloading for backwards compat + } + + for _, tt := range tests { + t.Run(string(tt.status), func(t *testing.T) { + task := &Task{ + ID: "task-1", + Status: tt.status, + } + update := task.ToStatusUpdate() + if update.Status != tt.wantAPI { + t.Errorf("ToStatusUpdate().Status for %s = %q, want %q", tt.status, update.Status, tt.wantAPI) + } + }) + } +} + func TestMagnetURI(t *testing.T) { task := &Task{InfoHash: "abc123"} m := task.MagnetURI()