- task.go: fix deadlock in ToStatusUpdate() — calling Percent() (which RLocks) while already holding RLock caused deadlock when a writer was waiting; compute percent inline instead - usenet.go: fix data race in Cancel() — tracker and taskDir were read without the mutex while Download() writes them under it; read all fields under the same lock - upnp.go: fix UPnP Remove() blocking shutdown — run cleanup in goroutine with 10s deadline (removeNATPMP worst case is 3s dial + 5s deadline) - daemon.go: add path traversal protection for stream requests — validate sr.FilePath is within configured directories before os.Stat; defends against compromised API server sending arbitrary paths - client.go: add wakeClient without timeout for long-poll wake endpoint where context controls cancellation - sync.go: trigger immediate sync when entering watching mode so stream requests are picked up without waiting for the next scheduled interval
481 lines
12 KiB
Go
481 lines
12 KiB
Go
package engine
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/torrentclaw/unarr/internal/agent"
|
|
"github.com/torrentclaw/unarr/internal/config"
|
|
"github.com/torrentclaw/unarr/internal/usenet/download"
|
|
"github.com/torrentclaw/unarr/internal/usenet/nntp"
|
|
"github.com/torrentclaw/unarr/internal/usenet/nzb"
|
|
"github.com/torrentclaw/unarr/internal/usenet/postprocess"
|
|
)
|
|
|
|
// activeDownload holds the state for a single in-progress usenet download.
|
|
type activeDownload struct {
|
|
cancel context.CancelFunc
|
|
taskDir string // populated after MkdirAll; empty before
|
|
tracker *download.ProgressTracker // populated after tracker creation; nil before
|
|
}
|
|
|
|
// UsenetDownloader downloads via Usenet/NZB protocol.
|
|
// It searches for NZBs, downloads articles via NNTP, and assembles the final files.
|
|
type UsenetDownloader struct {
|
|
apiClient *agent.Client
|
|
enabled bool // set during initialization based on features
|
|
|
|
mu sync.Mutex
|
|
nntpClient *nntp.Client
|
|
active map[string]*activeDownload
|
|
|
|
// Cached credentials
|
|
credentials *agent.UsenetCredentials
|
|
credExpiry time.Time
|
|
|
|
// Cached NZB search results (from Available → Download)
|
|
nzbCache map[string]*agent.NzbSearchResult // taskID → best result
|
|
nzbCacheMu sync.RWMutex
|
|
}
|
|
|
|
// NewUsenetDownloader creates a usenet downloader.
|
|
// apiClient is used to call the web API for NZB search, download, and credentials.
|
|
func NewUsenetDownloader(apiClient *agent.Client) *UsenetDownloader {
|
|
return &UsenetDownloader{
|
|
apiClient: apiClient,
|
|
enabled: true,
|
|
active: make(map[string]*activeDownload),
|
|
nzbCache: make(map[string]*agent.NzbSearchResult),
|
|
}
|
|
}
|
|
|
|
func (u *UsenetDownloader) Method() DownloadMethod { return MethodUsenet }
|
|
|
|
// SetEnabled controls whether usenet downloads are available.
|
|
func (u *UsenetDownloader) SetEnabled(enabled bool) {
|
|
u.mu.Lock()
|
|
u.enabled = enabled
|
|
u.mu.Unlock()
|
|
}
|
|
|
|
// Available checks if a usenet download is possible for this task.
|
|
// Searches NZB indexers by IMDb ID or title and caches the result.
|
|
func (u *UsenetDownloader) Available(ctx context.Context, task *Task) (bool, error) {
|
|
u.mu.Lock()
|
|
enabled := u.enabled
|
|
u.mu.Unlock()
|
|
|
|
if !enabled {
|
|
return false, nil
|
|
}
|
|
|
|
// Need at least an IMDb ID or title to search
|
|
if task.IMDbID == "" && task.Title == "" {
|
|
return false, nil
|
|
}
|
|
|
|
// If task has pre-resolved NZB ID, it's available
|
|
if task.NzbID != "" {
|
|
return true, nil
|
|
}
|
|
|
|
// Search NZB indexers
|
|
result, err := u.searchBestNzb(ctx, task)
|
|
if err != nil {
|
|
return false, nil // search failure = not available (don't error out)
|
|
}
|
|
if result == nil {
|
|
return false, nil
|
|
}
|
|
|
|
// Cache for Download()
|
|
u.nzbCacheMu.Lock()
|
|
u.nzbCache[task.ID] = result
|
|
u.nzbCacheMu.Unlock()
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// Download performs the full usenet download pipeline:
|
|
// search NZB → download NZB file → parse → NNTP download → assemble → post-process.
|
|
func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir string, progressCh chan<- Progress) (*Result, error) {
|
|
// Create cancellable context
|
|
dlCtx, cancel := context.WithCancel(ctx)
|
|
|
|
dl := &activeDownload{cancel: cancel}
|
|
u.mu.Lock()
|
|
u.active[task.ID] = dl
|
|
u.mu.Unlock()
|
|
|
|
defer func() {
|
|
u.mu.Lock()
|
|
delete(u.active, task.ID)
|
|
u.mu.Unlock()
|
|
cancel()
|
|
}()
|
|
|
|
shortID := task.ID
|
|
if len(shortID) > 8 {
|
|
shortID = shortID[:8]
|
|
}
|
|
|
|
// Step 1: Get NZB ID (from cache, task, or search)
|
|
nzbID, nzbTitle, err := u.resolveNzbID(dlCtx, task)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("resolve NZB: %w", err)
|
|
}
|
|
|
|
log.Printf("[%s] NZB: %s", shortID, nzbTitle)
|
|
|
|
// Step 2: Download NZB file (or use cached version for resume)
|
|
resumeDir := filepath.Join(config.DataDir(), "resume")
|
|
nzbCachePath := filepath.Join(resumeDir, task.ID+".nzb")
|
|
|
|
nzbData, err := os.ReadFile(nzbCachePath)
|
|
if err != nil {
|
|
// Not cached — download from server
|
|
nzbData, err = u.apiClient.DownloadNzb(dlCtx, nzbID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("download NZB: %w", err)
|
|
}
|
|
// Cache for future resume (best-effort — download still works without cache)
|
|
if mkErr := os.MkdirAll(resumeDir, 0o755); mkErr != nil {
|
|
log.Printf("[%s] resume dir create failed: %v", shortID, mkErr)
|
|
} else if wErr := os.WriteFile(nzbCachePath, nzbData, 0o644); wErr != nil {
|
|
log.Printf("[%s] NZB cache write failed: %v", shortID, wErr)
|
|
}
|
|
} else {
|
|
log.Printf("[%s] using cached NZB", shortID)
|
|
}
|
|
|
|
// Step 3: Parse NZB
|
|
nzbFile, err := nzb.ParseBytes(nzbData)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse NZB: %w", err)
|
|
}
|
|
|
|
totalBytes := nzbFile.TotalBytes()
|
|
totalSegs := nzbFile.TotalSegments()
|
|
log.Printf("[%s] NZB parsed: %d files, %d segments, %s",
|
|
shortID, len(nzbFile.Files), totalSegs, formatBytes(totalBytes))
|
|
|
|
// Step 3.5: Resume support — load or create progress tracker
|
|
tracker := download.NewProgressTracker(task.ID, nzbFile, resumeDir)
|
|
resumed, _ := tracker.Load()
|
|
if resumed {
|
|
log.Printf("[%s] resuming usenet download (%d/%d segments completed)",
|
|
shortID, tracker.TotalCompleted(), totalSegs)
|
|
}
|
|
|
|
// Always flush progress on exit — covers graceful shutdown, SIGTERM,
|
|
// error returns, and shutdown-timeout scenarios. The atomic write
|
|
// (tmp+rename) ensures the file is never corrupted even on hard kill.
|
|
defer tracker.Flush()
|
|
|
|
// Step 4: Get NNTP credentials and connect
|
|
creds, err := u.getCredentials(dlCtx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get credentials: %w", err)
|
|
}
|
|
|
|
nntpClient, err := u.getOrCreateNNTP(dlCtx, creds)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NNTP connect: %w", err)
|
|
}
|
|
|
|
log.Printf("[%s] NNTP: %s", shortID, nntpClient.Status())
|
|
|
|
// Step 5: Create download directory for this task
|
|
taskDir := filepath.Join(outputDir, sanitizeDir(task.Title))
|
|
if err := os.MkdirAll(taskDir, 0o755); err != nil {
|
|
return nil, fmt.Errorf("create dir: %w", err)
|
|
}
|
|
|
|
// Register tracker and taskDir for Cancel() cleanup
|
|
u.mu.Lock()
|
|
dl.taskDir = taskDir
|
|
dl.tracker = tracker
|
|
u.mu.Unlock()
|
|
|
|
// Step 6: Download all files via NNTP
|
|
segDl := download.NewDownloader(nntpClient)
|
|
|
|
// Bridge download.Progress to engine.Progress
|
|
dlProgressCh := make(chan download.Progress, 16)
|
|
go func() {
|
|
for dp := range dlProgressCh {
|
|
p := Progress{
|
|
DownloadedBytes: dp.BytesDownloaded,
|
|
TotalBytes: dp.BytesTotal,
|
|
SpeedBps: dp.SpeedBps,
|
|
FileName: dp.FileName,
|
|
}
|
|
if dp.BytesTotal > 0 {
|
|
p.ETA = int(float64(dp.BytesTotal-dp.BytesDownloaded) / float64(max(dp.SpeedBps, 1)))
|
|
}
|
|
task.UpdateProgress(p)
|
|
select {
|
|
case progressCh <- p:
|
|
default:
|
|
}
|
|
}
|
|
}()
|
|
|
|
downloadedFiles, err := segDl.DownloadNZB(dlCtx, nzbFile, taskDir, tracker, dlProgressCh)
|
|
close(dlProgressCh)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("NNTP download: %w", err)
|
|
}
|
|
|
|
// Step 7: Post-processing (par2, extract, cleanup)
|
|
log.Printf("[%s] post-processing...", shortID)
|
|
|
|
// Use password from NZB meta (embedded in file), or from task (user-provided)
|
|
password := nzbFile.Password
|
|
if task.NzbPassword != "" {
|
|
password = task.NzbPassword // user-provided overrides NZB meta
|
|
}
|
|
if password != "" {
|
|
log.Printf("[%s] NZB has password: %s", shortID, password)
|
|
}
|
|
ppResult, err := postprocess.Process(taskDir, downloadedFiles, postprocess.Options{
|
|
Password: password,
|
|
Cleanup: true,
|
|
})
|
|
if err != nil {
|
|
// Password error is special — report clearly
|
|
if _, ok := err.(*postprocess.PasswordError); ok {
|
|
return nil, fmt.Errorf("archive is password protected (set password in download options)")
|
|
}
|
|
return nil, fmt.Errorf("post-process: %w", err)
|
|
}
|
|
|
|
if ppResult.Repaired {
|
|
log.Printf("[%s] par2: repair was needed and successful", shortID)
|
|
}
|
|
if ppResult.Extracted {
|
|
log.Printf("[%s] extracted archive", shortID)
|
|
}
|
|
|
|
finalPath := ppResult.FinalPath
|
|
if finalPath == "" {
|
|
// Fallback: use the task directory
|
|
finalPath = taskDir
|
|
}
|
|
|
|
// Get final file size
|
|
var finalSize int64
|
|
if fi, err := os.Stat(finalPath); err == nil {
|
|
finalSize = fi.Size()
|
|
}
|
|
|
|
// Clean up resume state on successful completion
|
|
tracker.Remove()
|
|
|
|
return &Result{
|
|
FilePath: finalPath,
|
|
FileName: filepath.Base(finalPath),
|
|
Method: MethodUsenet,
|
|
Size: finalSize,
|
|
}, nil
|
|
}
|
|
|
|
// Pause cancels an in-progress download but keeps files.
|
|
func (u *UsenetDownloader) Pause(taskID string) error {
|
|
u.mu.Lock()
|
|
dl := u.active[taskID]
|
|
u.mu.Unlock()
|
|
if dl != nil {
|
|
dl.cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Cancel aborts an in-progress download and removes partial files + resume state.
|
|
func (u *UsenetDownloader) Cancel(taskID string) error {
|
|
// Read all fields under the lock — Download() writes tracker and taskDir under
|
|
// the same lock, so we must hold it while reading to avoid a data race.
|
|
u.mu.Lock()
|
|
dl := u.active[taskID]
|
|
var tracker *download.ProgressTracker
|
|
var taskDir string
|
|
if dl != nil {
|
|
tracker = dl.tracker
|
|
taskDir = dl.taskDir
|
|
}
|
|
u.mu.Unlock()
|
|
|
|
if dl == nil {
|
|
return nil
|
|
}
|
|
|
|
// Cancel context first — workers will stop and release file handles
|
|
dl.cancel()
|
|
|
|
// Remove resume state (best-effort)
|
|
if tracker != nil {
|
|
tracker.Remove()
|
|
}
|
|
|
|
// Remove partial download directory in background (can be slow for large dirs)
|
|
if taskDir != "" {
|
|
go os.RemoveAll(taskDir)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Shutdown closes the NNTP connection pool.
|
|
func (u *UsenetDownloader) Shutdown(_ context.Context) error {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
|
|
// Cancel all active downloads
|
|
for id, dl := range u.active {
|
|
dl.cancel()
|
|
delete(u.active, id)
|
|
}
|
|
|
|
// Close NNTP
|
|
if u.nntpClient != nil {
|
|
u.nntpClient.Close()
|
|
u.nntpClient = nil
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// --- Internal helpers ---
|
|
|
|
func (u *UsenetDownloader) searchBestNzb(ctx context.Context, task *Task) (*agent.NzbSearchResult, error) {
|
|
params := agent.NzbSearchParams{
|
|
Limit: 10,
|
|
}
|
|
|
|
if task.IMDbID != "" {
|
|
params.IMDbID = task.IMDbID
|
|
} else {
|
|
params.Query = task.Title
|
|
}
|
|
|
|
resp, err := u.apiClient.SearchNzbs(ctx, params)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if len(resp.Results) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Pick best match: prefer largest size (likely best quality), then most grabs
|
|
best := &resp.Results[0]
|
|
for i := 1; i < len(resp.Results); i++ {
|
|
r := &resp.Results[i]
|
|
if r.Size > best.Size {
|
|
best = r
|
|
} else if r.Size == best.Size && r.Grabs > best.Grabs {
|
|
best = r
|
|
}
|
|
}
|
|
|
|
return best, nil
|
|
}
|
|
|
|
func (u *UsenetDownloader) resolveNzbID(ctx context.Context, task *Task) (string, string, error) {
|
|
// Priority 1: Task has pre-resolved NZB ID
|
|
if task.NzbID != "" {
|
|
return task.NzbID, task.Title, nil
|
|
}
|
|
|
|
// Priority 2: Check cache from Available()
|
|
u.nzbCacheMu.RLock()
|
|
cached, ok := u.nzbCache[task.ID]
|
|
u.nzbCacheMu.RUnlock()
|
|
if ok {
|
|
// Clean cache entry
|
|
u.nzbCacheMu.Lock()
|
|
delete(u.nzbCache, task.ID)
|
|
u.nzbCacheMu.Unlock()
|
|
return cached.NzbID, cached.Title, nil
|
|
}
|
|
|
|
// Priority 3: Search now
|
|
result, err := u.searchBestNzb(ctx, task)
|
|
if err != nil {
|
|
return "", "", err
|
|
}
|
|
if result == nil {
|
|
return "", "", fmt.Errorf("no NZB found for %q (IMDb: %s)", task.Title, task.IMDbID)
|
|
}
|
|
return result.NzbID, result.Title, nil
|
|
}
|
|
|
|
func (u *UsenetDownloader) getCredentials(ctx context.Context) (*agent.UsenetCredentials, error) {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
|
|
// Use cached credentials if still valid
|
|
if u.credentials != nil && time.Now().Before(u.credExpiry) {
|
|
return u.credentials, nil
|
|
}
|
|
|
|
creds, err := u.apiClient.GetUsenetCredentials(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
u.credentials = creds
|
|
u.credExpiry = time.Now().Add(5 * time.Minute)
|
|
return creds, nil
|
|
}
|
|
|
|
func (u *UsenetDownloader) getOrCreateNNTP(ctx context.Context, creds *agent.UsenetCredentials) (*nntp.Client, error) {
|
|
u.mu.Lock()
|
|
defer u.mu.Unlock()
|
|
|
|
if u.nntpClient != nil {
|
|
return u.nntpClient, nil
|
|
}
|
|
|
|
maxConns := creds.MaxConnections
|
|
if maxConns <= 0 {
|
|
maxConns = 10
|
|
}
|
|
|
|
client := nntp.NewClient(nntp.Config{
|
|
Host: creds.Host,
|
|
Port: creds.Port,
|
|
SSL: creds.SSL,
|
|
TLSServerName: creds.TLSServerName,
|
|
Username: creds.Username,
|
|
Password: creds.Password,
|
|
MaxConnections: maxConns,
|
|
})
|
|
|
|
if err := client.Connect(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
u.nntpClient = client
|
|
return client, nil
|
|
}
|
|
|
|
func sanitizeDir(name string) string {
|
|
if name == "" {
|
|
return "usenet_download"
|
|
}
|
|
for _, c := range []string{"/", "\\", ":", "*", "?", "\"", "<", ">", "|"} {
|
|
name = strings.ReplaceAll(name, c, "_")
|
|
}
|
|
if len(name) > 200 {
|
|
name = name[:200]
|
|
}
|
|
return name
|
|
}
|