diff --git a/internal/agent/client.go b/internal/agent/client.go
index 3861096..f6189ee 100644
--- a/internal/agent/client.go
+++ b/internal/agent/client.go
@@ -40,26 +40,49 @@ func (c *Client) Register(ctx context.Context, req RegisterRequest) (*RegisterRe
 	return &resp, nil
 }
 
-// Heartbeat sends a periodic keep-alive signal.
-func (c *Client) Heartbeat(ctx context.Context, req HeartbeatRequest) error {
-	var resp StatusResponse
+// Heartbeat sends a periodic keep-alive signal and returns server directives.
+func (c *Client) Heartbeat(ctx context.Context, req HeartbeatRequest) (*HeartbeatResponse, error) {
+	var resp HeartbeatResponse
 	if err := c.doPost(ctx, "/api/internal/agent/heartbeat", req, &resp); err != nil {
-		return fmt.Errorf("heartbeat: %w", err)
+		return nil, fmt.Errorf("heartbeat: %w", err)
 	}
-	return nil
+	return &resp, nil
 }
 
 // ClaimTasks polls for pending download tasks and claims them atomically.
-func (c *Client) ClaimTasks(ctx context.Context, agentID string) ([]Task, error) {
+// Also returns any stream requests for completed downloads.
+func (c *Client) ClaimTasks(ctx context.Context, agentID string) (*TasksResponse, error) {
 	url := fmt.Sprintf("/api/internal/agent/tasks?agentId=%s", agentID)
 	var resp TasksResponse
 	if err := c.doGet(ctx, url, &resp); err != nil {
 		return nil, fmt.Errorf("claim tasks: %w", err)
 	}
-	return resp.Tasks, nil
+	return &resp, nil
 }
 
-// ReportStatus reports download progress or completion for a task.
+// Deregister notifies the server that the agent is shutting down.
+func (c *Client) Deregister(ctx context.Context, agentID string) error {
+	req := struct {
+		AgentID string `json:"agentId"`
+	}{AgentID: agentID}
+	var resp StatusResponse
+	if err := c.doPost(ctx, "/api/internal/agent/deregister", req, &resp); err != nil {
+		return fmt.Errorf("deregister: %w", err)
+	}
+	return nil
+}
+
+// ReportUpgradeResult reports the outcome of a self-upgrade attempt.
+func (c *Client) ReportUpgradeResult(ctx context.Context, result UpgradeResult) error {
+	var resp struct { // NOTE(review): Success is decoded but never inspected — confirm that is intended
+		Success bool `json:"success"`
+	}
+	if err := c.doPost(ctx, "/api/internal/agent/upgrade-result", result, &resp); err != nil {
+		return fmt.Errorf("report upgrade: %w", err)
+	}
+	return nil
+}
+
 // ReportStatus reports download progress. Returns server-side flags the CLI must act on.
 func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*StatusResponse, error) {
 	var resp StatusResponse
@@ -69,6 +92,72 @@ func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*Status
 	return &resp, nil
 }
 
+// ---------------------------------------------------------------------------
+// Usenet endpoints
+// ---------------------------------------------------------------------------
+
+// SearchNzbs searches NZB indexers for matching content.
+func (c *Client) SearchNzbs(ctx context.Context, params NzbSearchParams) (*NzbSearchResponse, error) {
+	var resp NzbSearchResponse
+	if err := c.doPost(ctx, "/api/internal/agent/nzb-search", params, &resp); err != nil {
+		return nil, fmt.Errorf("nzb search: %w", err)
+	}
+	return &resp, nil
+}
+
+// DownloadNzb downloads the NZB file for the given nzbId.
+// Returns the raw NZB XML bytes; bodies larger than 100MB are rejected with an error.
+func (c *Client) DownloadNzb(ctx context.Context, nzbID string) ([]byte, error) {
+	url := fmt.Sprintf("/api/internal/agent/nzb-download?nzbId=%s", nzbID) // NOTE(review): nzbID is interpolated unescaped; query-escape it once net/url is imported in this file
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("create request: %w", err)
+	}
+	c.setHeaders(req)
+
+	resp, err := c.httpClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("request failed: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode >= 400 {
+		body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16))
+		return nil, fmt.Errorf("nzb download error %d: %s", resp.StatusCode, string(body))
+	}
+
+	// Read one byte past the cap so an oversized body is reported instead of
+	// being silently truncated into an unparseable NZB.
+	const maxNzbBytes = 100 << 20 // 100MB limit
+	data, err := io.ReadAll(io.LimitReader(resp.Body, maxNzbBytes+1))
+	if err != nil {
+		return nil, fmt.Errorf("read nzb: %w", err)
+	}
+	if len(data) > maxNzbBytes {
+		return nil, fmt.Errorf("read nzb: response exceeds %d byte limit", maxNzbBytes)
+	}
+	return data, nil
+}
+
+// GetUsenetCredentials fetches NNTP connection credentials.
+func (c *Client) GetUsenetCredentials(ctx context.Context) (*UsenetCredentials, error) {
+	var resp UsenetCredentials
+	if err := c.doGet(ctx, "/api/internal/agent/usenet-credentials", &resp); err != nil {
+		return nil, fmt.Errorf("usenet credentials: %w", err)
+	}
+	return &resp, nil
+}
+
+// GetUsenetUsage fetches current month's usenet quota usage.
+func (c *Client) GetUsenetUsage(ctx context.Context) (*UsenetUsageResponse, error) {
+	var resp UsenetUsageResponse
+	if err := c.doGet(ctx, "/api/internal/agent/usenet-usage", &resp); err != nil {
+		return nil, fmt.Errorf("usenet usage: %w", err)
+	}
+	return &resp, nil
+}
+
 // doPost sends a JSON POST request and decodes the response.
 func (c *Client) doPost(ctx context.Context, path string, body any, dst any) error {
 	jsonBody, err := json.Marshal(body)
diff --git a/internal/agent/types.go b/internal/agent/types.go
index ce132a0..1ab20fd 100644
--- a/internal/agent/types.go
+++ b/internal/agent/types.go
@@ -66,6 +66,8 @@ type Task struct {
 	Mode           string `json:"mode,omitempty"`           // download | stream
 	DirectURL      string `json:"directUrl,omitempty"`      // HTTPS download URL (debrid, etc.)
 	DirectFileName string `json:"directFileName,omitempty"` // Original filename from direct URL
+	NzbID          string `json:"nzbId,omitempty"`          // Pre-resolved NZB ID from server; when set the usenet downloader skips the indexer search
+	NzbPassword    string `json:"nzbPassword,omitempty"`    // Password for encrypted NZB archives; takes precedence over a password embedded in the NZB
 }
 
 // TasksResponse wraps the array of tasks returned by the server.
@@ -141,3 +143,57 @@ type AgentInfo struct {
 	LastPollAt  time.Time
 	ActiveTasks int
 }
+
+// ---------------------------------------------------------------------------
+// Usenet types
+// ---------------------------------------------------------------------------
+
+// UsenetCredentials holds NNTP connection details for the CLI.
+// The usenet downloader copies these fields into nntp.Config when it connects.
+type UsenetCredentials struct {
+	Host           string `json:"host"`
+	Port           int    `json:"port"`
+	SSL            bool   `json:"ssl"`
+	TLSServerName  string `json:"tlsServerName,omitempty"` // override for cert validation (e.g., "xsnews.nl")
+	Username       string `json:"username"`
+	Password       string `json:"password"`
+	MaxConnections int    `json:"maxConnections"` // values <= 0 are replaced by 10 in the usenet downloader
+}
+
+// NzbSearchParams defines search criteria for NZB indexers.
+type NzbSearchParams struct {
+	Query   string `json:"query,omitempty"`   // free-text query; the usenet downloader sends the task title here when no IMDb ID is known
+	IMDbID  string `json:"imdbId,omitempty"`  // the usenet downloader sends this instead of Query when the task has one
+	TVDbID  string `json:"tvdbId,omitempty"`
+	Season  *int   `json:"season,omitempty"`  // pointer: nil is omitted from the JSON body
+	Episode *int   `json:"episode,omitempty"` // pointer: nil is omitted from the JSON body
+	Limit   int    `json:"limit,omitempty"`   // the usenet downloader requests 10
+}
+
+// NzbSearchResult represents a single NZB found by the indexer.
+type NzbSearchResult struct {
+	Title       string            `json:"title"`
+	NzbID       string            `json:"nzbId"`       // passed to Client.DownloadNzb to fetch the NZB file
+	Category    string            `json:"category"`
+	Size        int64             `json:"size"`        // primary ranking key when the downloader picks a result (largest wins)
+	PublishedAt string            `json:"publishedAt"` // NOTE(review): timestamp format is not visible here — confirm
+	Grabs       int               `json:"grabs"`       // tie-breaker between results of equal Size (most grabs wins)
+	Group       string            `json:"group"`
+	Poster      string            `json:"poster"`
+	Attributes  map[string]string `json:"attributes"`
+}
+
+// NzbSearchResponse wraps search results.
+type NzbSearchResponse struct {
+	Results []NzbSearchResult `json:"results"`
+	Total   int               `json:"total"`
+	Offset  int               `json:"offset"`
+}
+
+// UsenetUsageResponse holds quota information.
+type UsenetUsageResponse struct {
+	UsedBytes      int64   `json:"usedBytes"`
+	QuotaBytes     int64   `json:"quotaBytes"`
+	PercentUsed    float64 `json:"percentUsed"`
+	RemainingBytes int64   `json:"remainingBytes"`
+	QuotaResetDate string  `json:"quotaResetDate"` // NOTE(review): date format is not visible here — confirm
+}
diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go
index e2cc59a..7eea9e3 100644
--- a/internal/cmd/daemon.go
+++ b/internal/cmd/daemon.go
@@ -26,8 +26,17 @@ func newStartCmd() *cobra.Command {
 		Short: "Start the download daemon (foreground)",
 		Long: `Start the unarr daemon in the foreground.
 
-Registers with the server, polls for download tasks, and executes them
-using the configured download method. Press Ctrl+C to stop gracefully.`,
+Registers with the server, receives download tasks via WebSocket (with
+HTTP fallback), and executes them using the configured download method.
+Supports torrent, debrid, and usenet downloads concurrently.
+
+The daemon sends periodic heartbeats and reports download progress back
+to the web dashboard. Press Ctrl+C to stop gracefully — active downloads
+get up to 30 seconds to finish.
+
+Requires: API key, agent ID, and download directory (run 'unarr setup' first).
+
+To run as a background service, use 'unarr daemon install' instead.`,
 		Example: ` unarr start
 unarr start --config /path/to/config.toml`,
 		RunE: func(cmd *cobra.Command, args []string) error {
@@ -41,9 +50,21 @@ func newStopCmd() *cobra.Command {
 	return &cobra.Command{
 		Use:   "stop",
 		Short: "Stop the running daemon",
+		Long: `Stop the unarr daemon.
+
+If running in the foreground, press Ctrl+C in the terminal where it was started.
+If installed as a system service, use your OS service manager:
+
+ Linux (systemd): systemctl --user stop unarr
+ macOS (launchd): launchctl unload ~/Library/LaunchAgents/com.torrentclaw.unarr.plist`,
+		Example: ` unarr stop`,
 		RunE: func(cmd *cobra.Command, args []string) error {
 			fmt.Println(" Use Ctrl+C in the terminal where the daemon is running.")
-			fmt.Println(" (Signal-based stop coming in a future release)")
+			fmt.Println() // blank line before the service-manager hints
+			fmt.Println(" If installed as a service:")
+			fmt.Println(" Linux: systemctl --user stop unarr")
+			fmt.Println(" macOS: launchctl unload ~/Library/LaunchAgents/com.torrentclaw.unarr.plist")
+			fmt.Println() // trailing blank line; the command only prints guidance and stops nothing itself
 			return nil
 		},
 	}
@@ -52,9 +73,14 @@ func newStopCmd() *cobra.Command {
 // newDaemonCmd creates `unarr daemon` for administrative subcommands.
 func newDaemonCmd() *cobra.Command {
 	cmd := &cobra.Command{
-		Use:   "daemon",
-		Short: "Daemon administration (install, uninstall, logs)",
-		Long:  "Administrative commands for managing the daemon as a system service.",
+		Use:   "daemon ", // NOTE(review): trailing space — confirm whether an argument placeholder was intended here
+		Short: "Manage the daemon as a system service",
+		Long: `Install or remove unarr as a system service that starts automatically on boot.
+
+ Linux: Creates a systemd user service (~/.config/systemd/user/unarr.service)
+ macOS: Creates a launchd agent (~/Library/LaunchAgents/com.torrentclaw.unarr.plist)`,
+		Example: ` unarr daemon install
+ unarr daemon uninstall`,
 	}
 
 	cmd.AddCommand(
diff --git a/internal/engine/task.go b/internal/engine/task.go
index 44f7dff..bb6ace7 100644
--- a/internal/engine/task.go
+++ b/internal/engine/task.go
@@ -48,6 +48,8 @@ type Task struct {
 	PreferredMethod string // auto | torrent | debrid | usenet
 	DirectURL       string // HTTPS download URL (debrid, etc.)
 	DirectFileName  string // Original filename from direct URL
+	NzbID           string // Pre-resolved NZB ID (usenet); copied from agent.Task by NewTaskFromAgent
+	NzbPassword     string // Password for encrypted NZB archives; copied from agent.Task by NewTaskFromAgent
 
 	// Runtime state
 	Status TaskStatus
@@ -84,6 +86,8 @@ func NewTaskFromAgent(at agent.Task) *Task {
 		PreferredMethod: at.PreferredMethod,
 		DirectURL:       at.DirectURL,
 		DirectFileName:  at.DirectFileName,
+		NzbID:           at.NzbID,       // carried over verbatim from the server task
+		NzbPassword:     at.NzbPassword, // carried over verbatim from the server task
 		Mode:            mode,
 		Status:          StatusClaimed,
 		ClaimedAt:       time.Now(),
diff --git a/internal/engine/usenet.go b/internal/engine/usenet.go
index bde6696..f81f81a 100644
--- a/internal/engine/usenet.go
+++ b/internal/engine/usenet.go
@@ -3,24 +3,406 @@ package engine
 import (
 	"context"
 	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/torrentclaw/torrentclaw-cli/internal/agent"
+	"github.com/torrentclaw/torrentclaw-cli/internal/ui"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/download"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/nntp"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/nzb"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/postprocess"
 )
 
 // UsenetDownloader downloads via Usenet/NZB protocol.
-// Currently a stub — not implemented.
-type UsenetDownloader struct{}
+// It searches for NZBs, downloads articles via NNTP, and assembles the final files.
+type UsenetDownloader struct {
+	apiClient *agent.Client
+	enabled   bool // guarded by mu; NewUsenetDownloader starts it as true, SetEnabled changes it
 
-func NewUsenetDownloader() *UsenetDownloader { return &UsenetDownloader{} }
+	mu         sync.Mutex                    // guards enabled, nntpClient, active, credentials and credExpiry
+	nntpClient *nntp.Client                  // created on first use by getOrCreateNNTP, closed by Shutdown
+	active     map[string]context.CancelFunc // task ID → cancel func of the in-flight download
+
+	// Cached credentials
+	credentials *agent.UsenetCredentials
+	credExpiry  time.Time // set to five minutes after each successful fetch
+
+	// Cached NZB search results (from Available → Download)
+	nzbCache   map[string]*agent.NzbSearchResult // taskID → best result
+	nzbCacheMu sync.RWMutex                      // guards nzbCache only
+}
+
+// NewUsenetDownloader creates a usenet downloader.
+// apiClient is used to call the web API for NZB search, download, and credentials.
+// The returned downloader starts enabled; no NNTP connection is opened here.
+func NewUsenetDownloader(apiClient *agent.Client) *UsenetDownloader {
+	return &UsenetDownloader{
+		apiClient: apiClient,
+		enabled:   true,
+		active:    make(map[string]context.CancelFunc),
+		nzbCache:  make(map[string]*agent.NzbSearchResult),
+	}
+}
 
 func (u *UsenetDownloader) Method() DownloadMethod { return MethodUsenet }
 
-func (u *UsenetDownloader) Available(_ context.Context, _ *Task) (bool, error) {
-	return false, nil // always unavailable until implemented
+// SetEnabled controls whether usenet downloads are available.
+// When disabled, Available reports false without searching.
+func (u *UsenetDownloader) SetEnabled(enabled bool) {
+	u.mu.Lock()
+	u.enabled = enabled
+	u.mu.Unlock()
 }
 
-func (u *UsenetDownloader) Download(_ context.Context, _ *Task, _ string, _ chan<- Progress) (*Result, error) {
-	return nil, fmt.Errorf("usenet download not implemented yet (coming in a future release)")
+// Available checks if a usenet download is possible for this task.
+// Searches NZB indexers by IMDb ID or title and caches the result.
+func (u *UsenetDownloader) Available(ctx context.Context, task *Task) (bool, error) {
+	u.mu.Lock()
+	enabled := u.enabled
+	u.mu.Unlock()
+
+	if !enabled {
+		return false, nil
+	}
+
+	// A pre-resolved NZB ID needs no search, so it is accepted before the search-input guard
+	if task.NzbID != "" {
+		return true, nil
+	}
+
+	// Need at least an IMDb ID or title to search
+	if task.IMDbID == "" && task.Title == "" {
+		return false, nil
+	}
+
+	// Search NZB indexers
+	result, err := u.searchBestNzb(ctx, task)
+	if err != nil {
+		return false, nil // search failure = not available (don't error out)
+	}
+	if result == nil {
+		return false, nil
+	}
+
+	// Cache for Download()
+	u.nzbCacheMu.Lock()
+	u.nzbCache[task.ID] = result
+	u.nzbCacheMu.Unlock()
+
+	return true, nil
+}
+
+// Download performs the full usenet download pipeline:
+// search NZB → download NZB file → parse → NNTP download → assemble → post-process.
+func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir string, progressCh chan<- Progress) (*Result, error) {
+	// Create cancellable context
+	dlCtx, cancel := context.WithCancel(ctx)
+
+	// Register the cancel func so Pause/Cancel/Shutdown can stop this task
+	u.mu.Lock()
+	u.active[task.ID] = cancel
+	u.mu.Unlock()
+
+	defer func() {
+		u.mu.Lock()
+		delete(u.active, task.ID)
+		u.mu.Unlock()
+		cancel()
+	}()
+
+	// Short task ID prefix used in log lines
+	shortID := task.ID
+	if len(shortID) > 8 {
+		shortID = shortID[:8]
+	}
+
+	// Step 1: Get NZB ID (from cache, task, or search)
+	nzbID, nzbTitle, err := u.resolveNzbID(dlCtx, task)
+	if err != nil {
+		return nil, fmt.Errorf("resolve NZB: %w", err)
+	}
+
+	log.Printf("[%s] NZB: %s", shortID, nzbTitle)
+
+	// Step 2: Download NZB file
+	nzbData, err := u.apiClient.DownloadNzb(dlCtx, nzbID)
+	if err != nil {
+		return nil, fmt.Errorf("download NZB: %w", err)
+	}
+
+	// Step 3: Parse NZB
+	nzbFile, err := nzb.ParseBytes(nzbData)
+	if err != nil {
+		return nil, fmt.Errorf("parse NZB: %w", err)
+	}
+
+	totalBytes := nzbFile.TotalBytes()
+	totalSegs := nzbFile.TotalSegments()
+	log.Printf("[%s] NZB parsed: %d files, %d segments, %s",
+		shortID, len(nzbFile.Files), totalSegs, ui.FormatBytes(totalBytes))
+
+	// Step 4: Get NNTP credentials and connect
+	creds, err := u.getCredentials(dlCtx)
+	if err != nil {
+		return nil, fmt.Errorf("get credentials: %w", err)
+	}
+
+	nntpClient, err := u.getOrCreateNNTP(dlCtx, creds)
+	if err != nil {
+		return nil, fmt.Errorf("NNTP connect: %w", err)
+	}
+
+	log.Printf("[%s] NNTP: %s", shortID, nntpClient.Status())
+
+	// Step 5: Create download directory for this task
+	taskDir := filepath.Join(outputDir, sanitizeDir(task.Title))
+	if err := os.MkdirAll(taskDir, 0o755); err != nil {
+		return nil, fmt.Errorf("create dir: %w", err)
+	}
+
+	// Step 6: Download all files via NNTP
+	dl := download.NewDownloader(nntpClient)
+
+	// Bridge download.Progress to engine.Progress
+	dlProgressCh := make(chan download.Progress, 16)
+	bridgeDone := make(chan struct{}) // closed once the bridge goroutine has drained dlProgressCh
+	go func() {
+		defer close(bridgeDone)
+		for dp := range dlProgressCh {
+			p := Progress{
+				DownloadedBytes: dp.BytesDownloaded,
+				TotalBytes:      dp.BytesTotal,
+				SpeedBps:        dp.SpeedBps,
+				FileName:        dp.FileName,
+			}
+			if dp.BytesTotal > 0 {
+				p.ETA = int(float64(dp.BytesTotal-dp.BytesDownloaded) / float64(max(dp.SpeedBps, 1)))
+			}
+			task.UpdateProgress(p)
+			select {
+			case progressCh <- p: // non-blocking: a slow consumer drops updates instead of stalling the download
+			default:
+			}
+		}
+	}()
+
+	downloadedFiles, err := dl.DownloadNZB(dlCtx, nzbFile, taskDir, dlProgressCh)
+	close(dlProgressCh)
+	<-bridgeDone // the bridge must not touch task or progressCh after Download returns
+	if err != nil {
+		return nil, fmt.Errorf("NNTP download: %w", err)
+	}
+
+	// Step 7: Post-processing (par2, extract, cleanup)
+	log.Printf("[%s] post-processing...", shortID)
+
+	// Use password from NZB meta (embedded in file), or from task (user-provided)
+	password := nzbFile.Password
+	if task.NzbPassword != "" {
+		password = task.NzbPassword // user-provided overrides NZB meta
+	}
+	log.Printf("[%s] archive password provided: %t", shortID, password != "") // never log the password itself
+	ppResult, err := postprocess.Process(taskDir, downloadedFiles, postprocess.Options{
+		Password: password,
+		Cleanup:  true,
+	})
+	if err != nil {
+		// Password error is special — report clearly
+		if _, ok := err.(*postprocess.PasswordError); ok { // NOTE(review): a wrapped PasswordError will not match; errors.As needs the "errors" import
+			return nil, fmt.Errorf("archive is password protected (set password in download options)")
+		}
+		return nil, fmt.Errorf("post-process: %w", err)
+	}
+
+	if ppResult.Repaired {
+		log.Printf("[%s] par2: repair was needed and successful", shortID)
+	}
+	if ppResult.Extracted {
+		log.Printf("[%s] extracted archive", shortID)
+	}
+
+	finalPath := ppResult.FinalPath
+	if finalPath == "" {
+		// Fallback: use the task directory
+		finalPath = taskDir
+	}
+
+	// Get final file size
+	var finalSize int64
+	if fi, err := os.Stat(finalPath); err == nil {
+		finalSize = fi.Size()
+	}
+
+	return &Result{
+		FilePath: finalPath,
+		FileName: filepath.Base(finalPath),
+		Method:   MethodUsenet,
+		Size:     finalSize,
+	}, nil
+}
+
+// Pause cancels an in-progress download but keeps files.
+func (u *UsenetDownloader) Pause(taskID string) error {
+	u.mu.Lock()
+	cancel, ok := u.active[taskID]
+	u.mu.Unlock()
+	if ok {
+		cancel()
+	}
+	return nil
+}
+
+// Cancel aborts an in-progress download.
+func (u *UsenetDownloader) Cancel(taskID string) error {
+	return u.Pause(taskID)
+}
+
+// Shutdown closes the NNTP connection pool.
+func (u *UsenetDownloader) Shutdown(_ context.Context) error {
+	u.mu.Lock()
+	defer u.mu.Unlock()
+
+	// Cancel all active downloads
+	for id, cancel := range u.active {
+		cancel()
+		delete(u.active, id)
+	}
+
+	// Close NNTP
+	if u.nntpClient != nil {
+		u.nntpClient.Close()
+		u.nntpClient = nil
+	}
+
+	return nil
+}
+
+// --- Internal helpers ---
+
+// searchBestNzb queries the indexers (IMDb ID, else title) and returns the top-ranked hit, or nil when none.
+func (u *UsenetDownloader) searchBestNzb(ctx context.Context, task *Task) (*agent.NzbSearchResult, error) {
+	params := agent.NzbSearchParams{Limit: 10}
+
+	if task.IMDbID != "" {
+		params.IMDbID = task.IMDbID
+	} else {
+		params.Query = task.Title
+	}
+
+	resp, err := u.apiClient.SearchNzbs(ctx, params)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(resp.Results) == 0 {
+		return nil, nil
+	}
+
+	// Pick best match: prefer largest size (likely best quality), then most grabs
+	best := &resp.Results[0]
+	for i := 1; i < len(resp.Results); i++ {
+		r := &resp.Results[i]
+		if r.Size > best.Size {
+			best = r
+		} else if r.Size == best.Size && r.Grabs > best.Grabs {
+			best = r
+		}
+	}
+
+	return best, nil
+}
+
+// resolveNzbID returns the NZB ID and title from the task, the Available() cache, or a fresh search.
+func (u *UsenetDownloader) resolveNzbID(ctx context.Context, task *Task) (string, string, error) {
+	// Priority 1: Task has pre-resolved NZB ID
+	if task.NzbID != "" {
+		return task.NzbID, task.Title, nil
+	}
+
+	// Priority 2: Check cache from Available()
+	u.nzbCacheMu.Lock()
+	cached, ok := u.nzbCache[task.ID]
+	delete(u.nzbCache, task.ID) // consume the entry in the same critical section; no-op when absent
+	u.nzbCacheMu.Unlock()
+	if ok {
+		return cached.NzbID, cached.Title, nil
+	}
+
+	// Priority 3: Search now
+	result, err := u.searchBestNzb(ctx, task)
+	if err != nil {
+		return "", "", err
+	}
+	if result == nil {
+		return "", "", fmt.Errorf("no NZB found for %q (IMDb: %s)", task.Title, task.IMDbID)
+	}
+	return result.NzbID, result.Title, nil
+}
+
+// getCredentials returns the NNTP credentials, re-fetching them from the API once the 5-minute cache expires.
+func (u *UsenetDownloader) getCredentials(ctx context.Context) (*agent.UsenetCredentials, error) {
+	u.mu.Lock()
+	defer u.mu.Unlock() // NOTE(review): held across the API call below, so Pause/Cancel/Shutdown wait on it meanwhile
+
+	// Use cached credentials if still valid
+	if u.credentials != nil && time.Now().Before(u.credExpiry) {
+		return u.credentials, nil
+	}
+
+	creds, err := u.apiClient.GetUsenetCredentials(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	u.credentials = creds
+	u.credExpiry = time.Now().Add(5 * time.Minute)
+	return creds, nil
+}
+
+// getOrCreateNNTP returns the shared NNTP client, connecting it with creds on first use.
+func (u *UsenetDownloader) getOrCreateNNTP(ctx context.Context, creds *agent.UsenetCredentials) (*nntp.Client, error) {
+	u.mu.Lock()
+	defer u.mu.Unlock() // NOTE(review): held across Connect below
+
+	if u.nntpClient != nil {
+		return u.nntpClient, nil // an existing client is reused even if creds differ from the ones it was built with
+	}
+
+	maxConns := creds.MaxConnections
+	if maxConns <= 0 {
+		maxConns = 10
+	}
+
+	client := nntp.NewClient(nntp.Config{
+		Host:           creds.Host,
+		Port:           creds.Port,
+		SSL:            creds.SSL,
+		TLSServerName:  creds.TLSServerName,
+		Username:       creds.Username,
+		Password:       creds.Password,
+		MaxConnections: maxConns,
+	})
+
+	if err := client.Connect(ctx); err != nil {
+		return nil, err
+	}
+
+	u.nntpClient = client
+	return client, nil
+}
+
+// sanitizeDir turns a task title into a single directory name that stays inside the output directory.
+func sanitizeDir(name string) string {
+	if name == "" || name == "." || name == ".." { // "." / ".." would resolve to the output directory itself or its parent
+		return "usenet_download"
+	}
+	for _, c := range []string{"/", "\\", ":", "*", "?", "\"", "<", ">", "|"} {
+		name = strings.ReplaceAll(name, c, "_")
+	}
+	if len(name) > 200 {
+		name = strings.ToValidUTF8(name[:200], "") // the cut is in bytes; drop a rune split by it
+	}
+	return name
 }
-
-func (u *UsenetDownloader) Pause(_ string) error { return nil }
-func (u *UsenetDownloader) Cancel(_ string) error { return nil }
-func (u *UsenetDownloader) Shutdown(_ context.Context) error { return nil }
diff --git a/internal/usenet/download/downloader.go b/internal/usenet/download/downloader.go
new file mode 100644
index 0000000..caabc44
--- /dev/null
+++ b/internal/usenet/download/downloader.go
@@ -0,0 +1,321 @@
+package download
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"log"
+	"os"
+	"path/filepath"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/torrentclaw/torrentclaw-cli/internal/ui"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/nntp"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/nzb"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/yenc"
+)
+
+// Progress is emitted during download.
+type Progress struct {
+	FileName        string
+	SegmentsDone    int
+	SegmentsTotal   int
+	BytesDownloaded int64
+	BytesTotal      int64
+	SpeedBps        int64
+}
+
+// Downloader orchestrates downloading all segments of NZB files via NNTP.
+type Downloader struct {
+	nntp *nntp.Client
+}
+
+// NewDownloader creates a usenet segment downloader.
+func NewDownloader(nntpClient *nntp.Client) *Downloader {
+	return &Downloader{nntp: nntpClient}
+}
+
+// DownloadFile downloads all segments of a single NZB file and assembles them.
+// Returns the path to the assembled file.
+//
+// The NZB-supplied name is reduced to its base name so it cannot point
+// outside outputDir. The first segment failure cancels the remaining workers,
+// and the partial file is removed on error or cancellation.
+func (d *Downloader) DownloadFile(ctx context.Context, file nzb.File, outputDir string, progressCh chan<- Progress) (string, error) {
+	fileName := filepath.Base(file.Filename()) // untrusted name from the NZB: keep only the final path element
+	if fileName == "." || fileName == ".." || fileName == string(filepath.Separator) {
+		fileName = fmt.Sprintf("usenet_%d", time.Now().UnixNano())
+	}
+
+	destPath := filepath.Join(outputDir, fileName)
+
+	// Ensure output directory exists
+	if err := os.MkdirAll(outputDir, 0o755); err != nil {
+		return "", fmt.Errorf("mkdir: %w", err)
+	}
+
+	totalBytes := file.TotalBytes()
+	totalSegs := len(file.Segments)
+
+	// Sort segments by number
+	segments := make([]nzb.Segment, len(file.Segments))
+	copy(segments, file.Segments)
+	sort.Slice(segments, func(i, j int) bool {
+		return segments[i].Number < segments[j].Number
+	})
+
+	// Create/open output file
+	outFile, err := os.Create(destPath)
+	if err != nil {
+		return "", fmt.Errorf("create file: %w", err)
+	}
+	defer outFile.Close()
+
+	// Pre-allocate file if we know the size
+	if totalBytes > 0 {
+		outFile.Truncate(totalBytes)
+	}
+
+	// Download segments using worker pool
+	var downloaded atomic.Int64
+	var segsDone atomic.Int32
+	startTime := time.Now()
+
+	// Create work channel
+	type segWork struct {
+		seg   nzb.Segment
+		index int
+	}
+	workCh := make(chan segWork, len(segments))
+	for i, seg := range segments {
+		workCh <- segWork{seg: seg, index: i}
+	}
+	close(workCh)
+
+	// Track file offsets for each segment (sequential assembly)
+	offsets := make([]int64, len(segments))
+	var offset int64
+	for i, seg := range segments {
+		offsets[i] = offset
+		offset += seg.Bytes // NOTE(review): assumes seg.Bytes equals the decoded length; if it is the encoded article size, later segments land at wrong offsets — confirm
+	}
+
+	// Derived context: lets the first failing worker stop the others.
+	workCtx, cancelWork := context.WithCancel(ctx)
+	defer cancelWork()
+
+	// Progress reporter goroutine
+	stopProgress := make(chan struct{})
+	progressDone := make(chan struct{}) // closed when the reporter goroutine has exited
+	go func() {
+		defer close(progressDone)
+		ticker := time.NewTicker(500 * time.Millisecond)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				dl := downloaded.Load()
+				elapsed := time.Since(startTime).Seconds()
+				var speed int64
+				if elapsed > 0 {
+					speed = int64(float64(dl) / elapsed)
+				}
+				if progressCh != nil {
+					select {
+					case progressCh <- Progress{
+						FileName:        fileName,
+						SegmentsDone:    int(segsDone.Load()),
+						SegmentsTotal:   totalSegs,
+						BytesDownloaded: dl,
+						BytesTotal:      totalBytes,
+						SpeedBps:        speed,
+					}:
+					default:
+					}
+				}
+			case <-stopProgress:
+				return
+			case <-workCtx.Done():
+				return
+			}
+		}
+	}()
+
+	// Workers — one per NNTP connection
+	numWorkers := d.nntp.ActiveConnections()
+	if numWorkers <= 0 {
+		numWorkers = 1
+	}
+
+	errCh := make(chan error, 1) // holds only the first error; later ones are dropped
+	var wg sync.WaitGroup
+
+	for w := 0; w < numWorkers; w++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for work := range workCh {
+				select {
+				case <-workCtx.Done():
+					return
+				default:
+				}
+
+				data, err := d.downloadSegment(workCtx, work.seg)
+				if err != nil {
+					select {
+					case errCh <- fmt.Errorf("segment %d (%s): %w", work.seg.Number, work.seg.MessageID, err):
+					default:
+					}
+					cancelWork() // record the error first, then stop the other workers
+					return
+				}
+
+				// Write decoded data at the correct offset
+				// WriteAt is safe for concurrent non-overlapping writes
+				_, writeErr := outFile.WriteAt(data, offsets[work.index])
+
+				if writeErr != nil {
+					select {
+					case errCh <- fmt.Errorf("write segment %d: %w", work.seg.Number, writeErr):
+					default:
+					}
+					cancelWork() // record the error first, then stop the other workers
+					return
+				}
+
+				downloaded.Add(int64(len(data)))
+				segsDone.Add(1)
+			}
+		}()
+	}
+
+	// Wait for all workers
+	wg.Wait()
+
+	// Stop progress reporter before sending final progress
+	close(stopProgress)
+	<-progressDone // no reporter send may happen after we return: the caller may close progressCh
+
+	// Check for errors
+	select {
+	case err := <-errCh:
+		os.Remove(destPath)
+		return "", err
+	default:
+	}
+
+	// Check context cancellation
+	if ctx.Err() != nil {
+		os.Remove(destPath)
+		return "", ctx.Err()
+	}
+
+	// Final progress report
+	dl := downloaded.Load()
+	elapsed := time.Since(startTime).Seconds()
+	var speed int64
+	if elapsed > 0 {
+		speed = int64(float64(dl) / elapsed)
+	}
+	if progressCh != nil {
+		select {
+		case progressCh <- Progress{
+			FileName:        fileName,
+			SegmentsDone:    totalSegs,
+			SegmentsTotal:   totalSegs,
+			BytesDownloaded: dl,
+			BytesTotal:      totalBytes,
+			SpeedBps:        speed,
+		}:
+		default:
+		}
+	}
+
+	// Truncate to actual size (in case pre-allocation was larger)
+	actualSize := downloaded.Load()
+	if actualSize > 0 {
+		outFile.Truncate(actualSize)
+	}
+
+	log.Printf("[usenet] downloaded %s (%d segments, %s)", fileName, totalSegs, ui.FormatBytes(actualSize))
+	return destPath, nil
+}
+
+// DownloadNZB downloads content files from an NZB (rars or direct content).
+// Par2 files are NOT downloaded initially — they're only fetched on demand
+// if extraction fails (via DownloadPar2).
+// Returns a map of filename → filepath for all downloaded files.
+func (d *Downloader) DownloadNZB(ctx context.Context, n *nzb.NZB, outputDir string, progressCh chan<- Progress) (map[string]string, error) {
+	// Determine which files to download (NO par2 initially)
+	var filesToDownload []nzb.File
+
+	if n.HasRars() {
+		filesToDownload = n.RarFiles() // rar volumes only; other content files are not fetched in this case
+	} else {
+		filesToDownload = n.ContentFiles()
+	}
+
+	if len(filesToDownload) == 0 {
+		return nil, fmt.Errorf("no downloadable files found in NZB")
+	}
+
+	results := make(map[string]string)
+
+	// Files are fetched one after another; each DownloadFile call runs its own worker pool
+	for _, file := range filesToDownload {
+		select {
+		case <-ctx.Done():
+			return results, ctx.Err() // files completed so far are returned alongside the error
+		default:
+		}
+
+		path, err := d.DownloadFile(ctx, file, outputDir, progressCh)
+		if err != nil {
+			return results, fmt.Errorf("download %s: %w", file.Filename(), err) // first failure aborts the remaining files
+		}
+		results[file.Filename()] = path
+	}
+
+	return results, nil
+}
+
+// DownloadPar2 downloads par2 parity files from the NZB.
+// Called on-demand when extraction/verification fails.
+// A failed par2 file is logged and skipped, so the returned map may be empty with a nil error.
+func (d *Downloader) DownloadPar2(ctx context.Context, n *nzb.NZB, outputDir string, progressCh chan<- Progress) (map[string]string, error) {
+	par2Files := n.Par2Files()
+	if len(par2Files) == 0 {
+		return nil, fmt.Errorf("no par2 files in NZB")
+	}
+
+	results := make(map[string]string)
+	for _, file := range par2Files {
+		path, err := d.DownloadFile(ctx, file, outputDir, progressCh)
+		if err != nil {
+			log.Printf("[usenet] par2 download failed (non-fatal): %v", err)
+			continue // best effort: keep trying the remaining parity files
+		}
+		results[file.Filename()] = path
+	}
+	return results, nil
+}
+
+// downloadSegment downloads and decodes a single segment.
+func (d *Downloader) downloadSegment(ctx context.Context, seg nzb.Segment) ([]byte, error) {
+	// Download article body via NNTP
+	body, err := d.nntp.Body(ctx, seg.MessageID)
+	if err != nil {
+		return nil, fmt.Errorf("nntp body: %w", err)
+	}
+
+	// Decode yEnc
+	part, err := yenc.Decode(bytes.NewReader(body))
+	if err != nil {
+		return nil, fmt.Errorf("yenc decode: %w", err)
+	}
+
+	return part.Data, nil // decoded payload only; the caller decides where it goes in the file
+}
+
diff --git a/internal/usenet/download/e2e_test.go b/internal/usenet/download/e2e_test.go
new file mode 100644
index 0000000..b6ecffb
--- /dev/null
+++ b/internal/usenet/download/e2e_test.go
@@ -0,0 +1,177 @@
+package download_test // NOTE(review): no build constraint is visible above this clause, so "-tags e2e" in the run command below looks optional — confirm
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/download"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/nntp"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/nzb"
+	"github.com/torrentclaw/torrentclaw-cli/internal/usenet/postprocess"
+)
+
+// TestE2EDownload is a real end-to-end test that downloads from Usenet.
+// It requires:
+//   - NZB file at the path in NZB_FILE (defaults to /tmp/oppenheimer-test.nzb)
+//   - NNTP credentials in env: NNTP_USER, NNTP_PASS
+//   - Network access to reader.torrentclaw.com:563
+// The test is skipped when either credential variable is unset.
+// Run with: go test -v -run TestE2EDownload -tags e2e -timeout 30m ./internal/usenet/download/
+func TestE2EDownload(t *testing.T) {
+	if os.Getenv("NNTP_USER") == "" || os.Getenv("NNTP_PASS") == "" {
+		t.Skip("NNTP_USER and NNTP_PASS not set — skipping e2e test")
+	}
+
+	nzbPath := os.Getenv("NZB_FILE")
+	if nzbPath == "" {
+		nzbPath = "/tmp/oppenheimer-test.nzb"
+	}
+
+	// 1. Parse NZB
+	f, err := os.Open(nzbPath)
+	if err != nil {
+		t.Fatalf("open NZB: %v", err)
+	}
+	defer f.Close()
+
+	nzbFile, err := nzb.Parse(f)
+	if err != nil {
+		t.Fatalf("parse NZB: %v", err)
+	}
+
+	t.Logf("NZB: %d files, %d total segments, %.2f GB",
+		len(nzbFile.Files), nzbFile.TotalSegments(),
+		float64(nzbFile.TotalBytes())/1024/1024/1024)
+
+	if nzbFile.Password != "" {
+		t.Logf("NZB password: %s", nzbFile.Password) // NOTE(review): prints the archive password into the test log
+	}
+
+	t.Logf("Has rars: %v, Has par2: %v", nzbFile.HasRars(), nzbFile.HasPar2())
+
+	for _, file := range nzbFile.Files {
+		t.Logf(" %s — %d segments, %.1f MB",
+			file.Filename(), len(file.Segments),
+			float64(file.TotalBytes())/1024/1024)
+	}
+
+	// 2. Connect NNTP
+	client := nntp.NewClient(nntp.Config{
+		Host:           "reader.torrentclaw.com",
+		Port:           563,
+		SSL:            true,
+		TLSServerName:  "xsnews.nl",
+		Username:       os.Getenv("NNTP_USER"),
+		Password:       os.Getenv("NNTP_PASS"),
+		MaxConnections: 10,
+	})
+
+	// One deadline for connect and download; post-processing below takes no context
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
+	defer cancel()
+
+	t.Log("Connecting NNTP (10 connections)...")
+	if err := client.Connect(ctx); err != nil {
+		t.Fatalf("NNTP connect: %v", err)
+	}
+	defer client.Close()
+	t.Logf("NNTP connected: %s", client.Status())
+
+	// 3. Download all files
+	outputDir, err := os.MkdirTemp("", "usenet-e2e-*")
+	if err != nil {
+		t.Fatalf("tmpdir: %v", err)
+	}
+	t.Logf("Output dir: %s", outputDir)
+	// Don't cleanup automatically — let user inspect
+	// defer os.RemoveAll(outputDir)
+
+	dl := download.NewDownloader(client)
+
+	// Progress printer: rewrites a single stderr line until the channel is closed
+	progressCh := make(chan download.Progress, 64)
+	go func() {
+		for p := range progressCh {
+			pct := 0
+			if p.BytesTotal > 0 {
+				pct = int(float64(p.BytesDownloaded) / float64(p.BytesTotal) * 100)
+			}
+			fmt.Fprintf(os.Stderr, "\r [%s] %d%% — %s/%s @ %s/s (%d/%d segs) ",
+				p.FileName,
+				pct,
+				formatSize(p.BytesDownloaded),
+				formatSize(p.BytesTotal),
+				formatSize(p.SpeedBps),
+				p.SegmentsDone, p.SegmentsTotal)
+		}
+		fmt.Fprintln(os.Stderr)
+	}()
+
+	downloadedFiles, err := dl.DownloadNZB(ctx, nzbFile, outputDir, progressCh)
+	close(progressCh) // ends the printer goroutine above; the test does not wait for it
+	if err != nil {
+		t.Fatalf("download: %v", err)
+	}
+
+	t.Logf("Downloaded %d files:", len(downloadedFiles))
+	for name, path := range downloadedFiles {
+		fi, _ := os.Stat(path) // a stat failure is reported as size 0 rather than failing the test
+		size := int64(0)
+		if fi != nil {
+			size = fi.Size()
+		}
+		t.Logf(" %s → %s (%.1f MB)", name, path, float64(size)/1024/1024)
+	}
+
+	// 4. Post-process
+	t.Log("Post-processing...")
+	result, err := postprocess.Process(outputDir, downloadedFiles, postprocess.Options{
+		Password: nzbFile.Password,
+		Cleanup:  true,
+	})
+	if err != nil {
+		t.Fatalf("post-process: %v", err)
+	}
+
+	t.Logf("Post-process result:")
+	t.Logf(" Final path: %s", result.FinalPath)
+	t.Logf(" Repaired: %v", result.Repaired)
+	t.Logf(" Extracted: %v", result.Extracted)
+	t.Logf(" Files: %v", result.Files)
+
+	// Verify final file exists and has size
+	if result.FinalPath != "" {
+		fi, err := os.Stat(result.FinalPath)
+		if err != nil {
+			t.Errorf("final file stat: %v", err)
+		} else {
+			t.Logf(" Final size: %.2f GB", float64(fi.Size())/1024/1024/1024)
+		}
+	}
+
+	// List all files in output dir
+	t.Log("Final directory contents:")
+	filepath.Walk(outputDir, func(path string, info os.FileInfo, err error) error {
+		if err != nil || info.IsDir() {
+			return nil // unreadable entries and directories are skipped silently
+		}
+		rel, _ := filepath.Rel(outputDir, path)
+		t.Logf(" %s (%.1f MB)", rel, float64(info.Size())/1024/1024)
+		return nil
+	})
+}
+
+// formatSize renders a byte count with binary (1024-based) units, e.g. "1.5 MB".
+func formatSize(b int64) string {
+	const unit = 1024
+	if b < unit {
+		return fmt.Sprintf("%d B", b)
+	}
+	div, exp := int64(unit), 0
+	for n := b / unit; n >= unit; n /= unit {
+		div *= unit
+		exp++
+	}
+	return fmt.Sprintf("%.1f %cB", float64(b)/float64(div), "KMGTPE"[exp]) // exp indexes the unit prefix: 0 → K, 1 → M, ...
+}
diff --git a/internal/usenet/nntp/client.go b/internal/usenet/nntp/client.go
new file mode 100644
index 0000000..61793e9
--- /dev/null
+++ b/internal/usenet/nntp/client.go
@@ -0,0 +1,386 @@
+package nntp
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"crypto/tls"
+	"fmt"
+	"io"
+	"net"
+	"net/textproto"
+	"sync"
+	"time"
+)
+
+// Config holds NNTP server connection parameters.
+type Config struct {
+	Host           string
+	Port           int
+	SSL            bool   // NOTE(review): presumably selects a TLS connection — confirm against Connect
+	TLSServerName  string // override for TLS cert validation (e.g., "xsnews.nl" when Host is a CNAME)
+	Username       string
+	Password       string
+	MaxConnections int // default 10
+}
+
+// Client manages a pool of authenticated NNTP connections.
+type Client struct { + cfg Config + pool chan *conn + mu sync.Mutex + open int + done chan struct{} // closed on Close() +} + +// conn is a single NNTP connection. +type conn struct { + tp *textproto.Conn + raw net.Conn + closed bool +} + +// NewClient creates a new NNTP client (does not connect yet). +func NewClient(cfg Config) *Client { + if cfg.MaxConnections <= 0 { + cfg.MaxConnections = 10 + } + return &Client{ + cfg: cfg, + pool: make(chan *conn, cfg.MaxConnections), + done: make(chan struct{}), + } +} + +// Connect opens and authenticates all connections in the pool. +// Safe to call again after a previous Connect failure. +func (c *Client) Connect(ctx context.Context) error { + // Reset done channel if previously closed (allows retry after failure) + select { + case <-c.done: + c.done = make(chan struct{}) + default: + } + + for i := 0; i < c.cfg.MaxConnections; i++ { + conn, err := c.dial(ctx) + if err != nil { + // Close any connections we already opened, but keep client reusable + c.drainPool() + return fmt.Errorf("nntp: connect %d/%d: %w", i+1, c.cfg.MaxConnections, err) + } + c.pool <- conn + c.mu.Lock() + c.open++ + c.mu.Unlock() + } + return nil +} + +// drainPool closes all connections in the pool without closing the done channel. +func (c *Client) drainPool() { + for { + select { + case cn := <-c.pool: + c.closeConn(cn) + default: + c.mu.Lock() + c.open = 0 + c.mu.Unlock() + return + } + } +} + +// Body downloads the body of an NNTP article by message-ID. +// Returns the raw body reader (typically yEnc encoded). +// The caller MUST call release() when done reading. 
+func (c *Client) Body(ctx context.Context, messageID string) ([]byte, error) { + cn, err := c.acquire(ctx) + if err != nil { + return nil, err + } + + data, err := c.bodyOnConn(ctx, cn, messageID) + if err != nil { + // Connection might be broken, try to reconnect + cn2, reconErr := c.reconnect(ctx, cn) + if reconErr != nil { + c.discard(cn) + return nil, fmt.Errorf("nntp: body failed and reconnect failed: %w (original: %v)", reconErr, err) + } + // Retry once on the fresh connection + data, err = c.bodyOnConn(ctx, cn2, messageID) + if err != nil { + c.release(cn2) + return nil, err + } + c.release(cn2) + return data, nil + } + + c.release(cn) + return data, nil +} + +// ActiveConnections returns the number of open connections. +func (c *Client) ActiveConnections() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.open +} + +// Close shuts down all connections in the pool. +func (c *Client) Close() error { + select { + case <-c.done: + return nil // already closed + default: + close(c.done) + } + + // Drain pool and close connections + for { + select { + case cn := <-c.pool: + c.closeConn(cn) + default: + return nil + } + } +} + +// --- Internal --- + +func (c *Client) dial(ctx context.Context) (*conn, error) { + addr := fmt.Sprintf("%s:%d", c.cfg.Host, c.cfg.Port) + + dialer := &net.Dialer{Timeout: 30 * time.Second} + + var rawConn net.Conn + var err error + + if c.cfg.SSL { + // Use TLSServerName if set (e.g., cert is for xsnews.nl but host is reader.torrentclaw.com) + serverName := c.cfg.TLSServerName + if serverName == "" { + serverName = c.cfg.Host + } + tlsCfg := &tls.Config{ + ServerName: serverName, + MinVersion: tls.VersionTLS12, + } + rawConn, err = tls.DialWithDialer(dialer, "tcp", addr, tlsCfg) + } else { + rawConn, err = dialer.DialContext(ctx, "tcp", addr) + } + if err != nil { + return nil, fmt.Errorf("dial %s: %w", addr, err) + } + + tp := textproto.NewConn(rawConn) + cn := &conn{tp: tp, raw: rawConn} + + // Read welcome banner (200 or 201) + 
code, msg, err := tp.ReadCodeLine(200) + if err != nil { + // Also accept 201 (posting not allowed) + if code != 201 { + rawConn.Close() + return nil, fmt.Errorf("welcome: %d %s: %w", code, msg, err) + } + } + + // Authenticate + if c.cfg.Username != "" { + if err := c.auth(tp); err != nil { + rawConn.Close() + return nil, fmt.Errorf("auth: %w", err) + } + } + + return cn, nil +} + +func (c *Client) auth(tp *textproto.Conn) error { + id, err := tp.Cmd("AUTHINFO USER %s", c.cfg.Username) + if err != nil { + return err + } + tp.StartResponse(id) + code, msg, err := tp.ReadCodeLine(381) + tp.EndResponse(id) + if err != nil { + // 281 means no password required (unlikely but valid) + if code == 281 { + return nil + } + return fmt.Errorf("AUTHINFO USER: %d %s: %w", code, msg, err) + } + + id, err = tp.Cmd("AUTHINFO PASS %s", c.cfg.Password) + if err != nil { + return err + } + tp.StartResponse(id) + code, msg, err = tp.ReadCodeLine(281) + tp.EndResponse(id) + if err != nil { + return fmt.Errorf("AUTHINFO PASS: %d %s: %w", code, msg, err) + } + + return nil +} + +func (c *Client) bodyOnConn(ctx context.Context, cn *conn, messageID string) ([]byte, error) { + // Set deadline from context + deadline, hasDeadline := ctx.Deadline() + if !hasDeadline { + deadline = time.Now().Add(60 * time.Second) + } + cn.raw.SetDeadline(deadline) + defer cn.raw.SetDeadline(time.Time{}) + + // Send BODY command + id, err := cn.tp.Cmd("BODY <%s>", messageID) + if err != nil { + return nil, fmt.Errorf("BODY cmd: %w", err) + } + + cn.tp.StartResponse(id) + defer cn.tp.EndResponse(id) + + // Read response code + code, msg, err := cn.tp.ReadCodeLine(222) + if err != nil { + if code == 430 { + return nil, &ArticleNotFoundError{MessageID: messageID} + } + return nil, fmt.Errorf("BODY response: %d %s: %w", code, msg, err) + } + + // Read dot-terminated body + body, err := readDotBody(cn.tp.R) + if err != nil { + // Partial read leaves textproto in broken state — mark connection as dead + cn.closed = 
true + return nil, fmt.Errorf("read body: %w", err) + } + + return body, nil +} + +// readDotBody reads a dot-terminated text block from the NNTP server. +// Lines beginning with a dot have the dot removed (dot-stuffing). +// The final ".\r\n" line signals the end. +func readDotBody(r *bufio.Reader) ([]byte, error) { + var buf bytes.Buffer + + for { + line, err := r.ReadBytes('\n') + if err != nil { + if err == io.EOF { + break + } + return nil, err + } + + // Trim trailing \r\n + line = bytes.TrimRight(line, "\r\n") + + // Check for terminator: single dot + if len(line) == 1 && line[0] == '.' { + break + } + + // Dot-unstuffing: remove leading dot if present + if len(line) > 0 && line[0] == '.' { + line = line[1:] + } + + buf.Write(line) + buf.WriteByte('\n') + } + + return buf.Bytes(), nil +} + +func (c *Client) acquire(ctx context.Context) (*conn, error) { + for { + select { + case cn := <-c.pool: + if cn.closed { + c.mu.Lock() + c.open-- + c.mu.Unlock() + continue // discard and try next + } + return cn, nil + case <-ctx.Done(): + return nil, ctx.Err() + case <-c.done: + return nil, fmt.Errorf("nntp: client closed") + } + } +} + +func (c *Client) release(cn *conn) { + if cn == nil || cn.closed { + return + } + select { + case c.pool <- cn: + default: + // Pool full, close the connection + c.closeConn(cn) + } +} + +func (c *Client) discard(cn *conn) { + c.closeConn(cn) + c.mu.Lock() + c.open-- + c.mu.Unlock() +} + +func (c *Client) reconnect(ctx context.Context, old *conn) (*conn, error) { + c.closeConn(old) + newConn, err := c.dial(ctx) + if err != nil { + c.mu.Lock() + c.open-- + c.mu.Unlock() + return nil, err + } + return newConn, nil +} + +func (c *Client) closeConn(cn *conn) { + if cn == nil || cn.closed { + return + } + cn.closed = true + // Best-effort QUIT + cn.tp.Cmd("QUIT") + cn.raw.Close() +} + +// ArticleNotFoundError is returned when the server responds with 430. 
+type ArticleNotFoundError struct { + MessageID string +} + +func (e *ArticleNotFoundError) Error() string { + return fmt.Sprintf("nntp: article not found: %s", e.MessageID) +} + +// Status returns a human-readable status string. +func (c *Client) Status() string { + c.mu.Lock() + open := c.open + c.mu.Unlock() + + pooled := len(c.pool) + return fmt.Sprintf("%d connections (%d pooled) to %s:%d", open, pooled, c.cfg.Host, c.cfg.Port) +} + diff --git a/internal/usenet/nzb/parser.go b/internal/usenet/nzb/parser.go new file mode 100644 index 0000000..b82a5f9 --- /dev/null +++ b/internal/usenet/nzb/parser.go @@ -0,0 +1,365 @@ +package nzb + +import ( + "encoding/xml" + "fmt" + "io" + "path/filepath" + "regexp" + "strconv" + "strings" +) + +// NZB represents a parsed NZB file containing one or more files to download. +type NZB struct { + Files []File + Password string // from in + Meta map[string]string // all entries from +} + +// File represents a single file within an NZB, composed of multiple segments. +type File struct { + Poster string + Date int64 + Subject string + Groups []string + Segments []Segment +} + +// Segment represents a single NNTP article segment of a file. +type Segment struct { + Bytes int64 + Number int + MessageID string // message-id without angle brackets +} + +// xmlNZB is the raw XML structure for parsing. 
+type xmlNZB struct { + XMLName xml.Name `xml:"nzb"` + Head xmlHead `xml:"head"` + Files []xmlFile `xml:"file"` +} + +type xmlHead struct { + Meta []xmlMeta `xml:"meta"` +} + +type xmlMeta struct { + Type string `xml:"type,attr"` + Value string `xml:",chardata"` +} + +type xmlFile struct { + Poster string `xml:"poster,attr"` + Date string `xml:"date,attr"` + Subject string `xml:"subject,attr"` + Groups xmlGroups `xml:"groups"` + Segments xmlSegments `xml:"segments"` +} + +type xmlGroups struct { + Groups []string `xml:"group"` +} + +type xmlSegments struct { + Segments []xmlSegment `xml:"segment"` +} + +type xmlSegment struct { + Bytes string `xml:"bytes,attr"` + Number string `xml:"number,attr"` + MessageID string `xml:",chardata"` +} + +// Parse reads and parses an NZB XML document from the given reader. +func Parse(r io.Reader) (*NZB, error) { + var raw xmlNZB + dec := xml.NewDecoder(r) + if err := dec.Decode(&raw); err != nil { + return nil, fmt.Errorf("nzb: xml decode: %w", err) + } + + if len(raw.Files) == 0 { + return nil, fmt.Errorf("nzb: no files found") + } + + nzb := &NZB{ + Files: make([]File, 0, len(raw.Files)), + Meta: make(map[string]string), + } + + // Parse meta entries + for _, m := range raw.Head.Meta { + if m.Type != "" { + nzb.Meta[m.Type] = strings.TrimSpace(m.Value) + } + } + nzb.Password = nzb.Meta["password"] + + for _, rf := range raw.Files { + date, _ := strconv.ParseInt(rf.Date, 10, 64) + + segs := make([]Segment, 0, len(rf.Segments.Segments)) + for _, rs := range rf.Segments.Segments { + bytes, _ := strconv.ParseInt(rs.Bytes, 10, 64) + num, _ := strconv.Atoi(rs.Number) + msgID := strings.TrimSpace(rs.MessageID) + // Strip angle brackets if present + msgID = strings.TrimPrefix(msgID, "<") + msgID = strings.TrimSuffix(msgID, ">") + + if msgID == "" { + continue + } + + segs = append(segs, Segment{ + Bytes: bytes, + Number: num, + MessageID: msgID, + }) + } + + if len(segs) == 0 { + continue + } + + nzb.Files = append(nzb.Files, File{ + 
Poster: rf.Poster, + Date: date, + Subject: rf.Subject, + Groups: rf.Groups.Groups, + Segments: segs, + }) + } + + if len(nzb.Files) == 0 { + return nil, fmt.Errorf("nzb: no valid files with segments found") + } + + return nzb, nil +} + +// ParseBytes parses an NZB from a byte slice. +func ParseBytes(data []byte) (*NZB, error) { + return Parse(strings.NewReader(string(data))) +} + +// TotalBytes returns the total size of all segments across all files. +func (n *NZB) TotalBytes() int64 { + var total int64 + for _, f := range n.Files { + total += f.TotalBytes() + } + return total +} + +// TotalSegments returns the total number of segments across all files. +func (n *NZB) TotalSegments() int { + var total int + for _, f := range n.Files { + total += len(f.Segments) + } + return total +} + +// ContentFiles returns files that are likely content (video, audio, images), +// excluding par2, nfo, sfv, nzb, and sample files. +func (n *NZB) ContentFiles() []File { + var result []File + for _, f := range n.Files { + name := f.Filename() + if isMetadataFile(name) || isSampleFile(name) { + continue + } + result = append(result, f) + } + return result +} + +// Par2Files returns only par2 parity files. +func (n *NZB) Par2Files() []File { + var result []File + for _, f := range n.Files { + ext := strings.ToLower(filepath.Ext(f.Filename())) + if ext == ".par2" { + result = append(result, f) + } + } + return result +} + +// RarFiles returns rar archive files (.rar, .rNN, .NNN). +func (n *NZB) RarFiles() []File { + var result []File + for _, f := range n.Files { + if isRarFile(f.Filename()) { + result = append(result, f) + } + } + return result +} + +// LargestFile returns the file with the most total bytes. +// Returns nil if NZB has no files. 
+func (n *NZB) LargestFile() *File { + if len(n.Files) == 0 { + return nil + } + largest := &n.Files[0] + for i := 1; i < len(n.Files); i++ { + if n.Files[i].TotalBytes() > largest.TotalBytes() { + largest = &n.Files[i] + } + } + return largest +} + +// IsObfuscated returns true if the NZB filenames appear to be obfuscated +// (random strings instead of meaningful names). +func (n *NZB) IsObfuscated() bool { + for _, f := range n.Files { + name := f.Filename() + if name == "" { + continue + } + base := strings.TrimSuffix(name, filepath.Ext(name)) + // Check if base name is mostly hex/random chars (obfuscated) + if len(base) > 10 && isHexLike(base) { + return true + } + } + return false +} + +// HasRars returns true if the NZB contains rar archive files. +func (n *NZB) HasRars() bool { + for _, f := range n.Files { + if isRarFile(f.Filename()) { + return true + } + } + return false +} + +// HasPar2 returns true if the NZB contains par2 parity files. +func (n *NZB) HasPar2() bool { + for _, f := range n.Files { + ext := strings.ToLower(filepath.Ext(f.Filename())) + if ext == ".par2" { + return true + } + } + return false +} + +// TotalBytes returns the sum of all segment sizes in this file. +func (f *File) TotalBytes() int64 { + var total int64 + for _, s := range f.Segments { + total += s.Bytes + } + return total +} + +// subjectFilenameRe matches the filename in a typical Usenet subject line. +// Examples: +// "Movie.2024.1080p.mkv" yEnc (1/50) +// [PRiVATE]-[#a]- "file.rar" yEnc (01/99) +var subjectFilenameRe = regexp.MustCompile(`"([^"]+)"`) + +// Filename extracts the filename from the subject line. +// Falls back to the raw subject if no quoted filename is found. +func (f *File) Filename() string { + m := subjectFilenameRe.FindStringSubmatch(f.Subject) + if len(m) >= 2 { + return m[1] + } + // Fallback: try to extract something useful + return sanitizeFilename(f.Subject) +} + +// Extension returns the lowercase file extension (e.g., ".mkv", ".rar"). 
+func (f *File) Extension() string { + return strings.ToLower(filepath.Ext(f.Filename())) +} + +// isMetadataFile returns true for non-content files. +func isMetadataFile(name string) bool { + ext := strings.ToLower(filepath.Ext(name)) + switch ext { + case ".par2", ".nfo", ".sfv", ".nzb", ".txt", ".jpg", ".png", ".url": + return true + } + return false +} + +// isSampleFile returns true for sample/preview files. +// Matches filenames containing "sample" as a word boundary (e.g., "movie.sample.mkv", "Sample/video.mkv"). +func isSampleFile(name string) bool { + lower := strings.ToLower(name) + // Match "sample" preceded and followed by non-alphanumeric (word boundary) + idx := strings.Index(lower, "sample") + if idx < 0 { + return false + } + // Check it's not part of a larger word (e.g., "resampled") + if idx > 0 && isAlphaNum(lower[idx-1]) { + return false + } + end := idx + 6 + if end < len(lower) && isAlphaNum(lower[end]) { + return false + } + return true +} + +func isAlphaNum(b byte) bool { + return (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9') +} + +// isRarFile returns true for rar archive files. +func isRarFile(name string) bool { + lower := strings.ToLower(name) + ext := filepath.Ext(lower) + if ext == ".rar" { + return true + } + // Match .r00, .r01, ..., .r99 and .s00, .s01 + if len(ext) == 4 && (ext[1] == 'r' || ext[1] == 's') { + _, err := strconv.Atoi(ext[2:]) + return err == nil + } + // Match .001, .002, etc (split rar) + if len(ext) == 4 { + _, err := strconv.Atoi(ext[1:]) + return err == nil + } + return false +} + +// isHexLike returns true if the string looks like random hex/obfuscated. +func isHexLike(s string) bool { + hexChars := 0 + for _, c := range s { + if (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F') { + hexChars++ + } + } + return float64(hexChars)/float64(len(s)) > 0.8 +} + +var yencPartRe = regexp.MustCompile(`\s*\(\d+/\d+\)\s*`) + +// sanitizeFilename removes characters that are invalid in filenames. 
+func sanitizeFilename(s string) string { + // Remove yEnc part indicators like (01/50) + s = yencPartRe.ReplaceAllString(s, "") + // Remove yEnc keyword + s = strings.ReplaceAll(s, "yEnc", "") + s = strings.TrimSpace(s) + // Remove invalid path chars + for _, c := range []string{"/", "\\", ":", "*", "?", "\"", "<", ">", "|"} { + s = strings.ReplaceAll(s, c, "_") + } + return s +} diff --git a/internal/usenet/nzb/parser_test.go b/internal/usenet/nzb/parser_test.go new file mode 100644 index 0000000..8d0d686 --- /dev/null +++ b/internal/usenet/nzb/parser_test.go @@ -0,0 +1,269 @@ +package nzb + +import ( + "strings" + "testing" +) + +const testNZB = ` + + + + + alt.binaries.movies + alt.binaries.multimedia + + + abc123@news.example.com + def456@news.example.com + ghi789@news.example.com + + + + + alt.binaries.movies + + + nfo001@news.example.com + + + + + alt.binaries.movies + + + par001@news.example.com + + +` + +const testNZBWithRars = ` + + + alt.binaries.movies + + rar001@example + rar002@example + + + + alt.binaries.movies + + r00001@example + + + + alt.binaries.movies + + r01001@example + + + + alt.binaries.movies + + par001@example + + +` + +func TestParse(t *testing.T) { + nzb, err := Parse(strings.NewReader(testNZB)) + if err != nil { + t.Fatalf("Parse failed: %v", err) + } + + if len(nzb.Files) != 3 { + t.Fatalf("expected 3 files, got %d", len(nzb.Files)) + } + + // First file — the MKV + f := nzb.Files[0] + if f.Poster != "user@example.com" { + t.Errorf("poster: got %q", f.Poster) + } + if f.Date != 1700000000 { + t.Errorf("date: got %d", f.Date) + } + if len(f.Groups) != 2 { + t.Errorf("groups: got %d", len(f.Groups)) + } + if f.Groups[0] != "alt.binaries.movies" { + t.Errorf("group[0]: got %q", f.Groups[0]) + } + if len(f.Segments) != 3 { + t.Errorf("segments: got %d", len(f.Segments)) + } + + seg := f.Segments[0] + if seg.Bytes != 768000 { + t.Errorf("seg bytes: got %d", seg.Bytes) + } + if seg.Number != 1 { + t.Errorf("seg number: got %d", seg.Number) 
+ } + if seg.MessageID != "abc123@news.example.com" { + t.Errorf("seg msgid: got %q", seg.MessageID) + } +} + +func TestParseBytes(t *testing.T) { + nzb, err := ParseBytes([]byte(testNZB)) + if err != nil { + t.Fatalf("ParseBytes failed: %v", err) + } + if len(nzb.Files) != 3 { + t.Fatalf("expected 3 files, got %d", len(nzb.Files)) + } +} + +func TestTotalBytes(t *testing.T) { + nzb, _ := ParseBytes([]byte(testNZB)) + // 768000 + 768000 + 512000 + 4096 + 32768 + expected := int64(768000 + 768000 + 512000 + 4096 + 32768) + if got := nzb.TotalBytes(); got != expected { + t.Errorf("TotalBytes: got %d, want %d", got, expected) + } +} + +func TestTotalSegments(t *testing.T) { + nzb, _ := ParseBytes([]byte(testNZB)) + if got := nzb.TotalSegments(); got != 5 { + t.Errorf("TotalSegments: got %d, want 5", got) + } +} + +func TestContentFiles(t *testing.T) { + nzb, _ := ParseBytes([]byte(testNZB)) + content := nzb.ContentFiles() + if len(content) != 1 { + t.Fatalf("ContentFiles: got %d, want 1", len(content)) + } + if content[0].Filename() != "Movie.2024.1080p.BluRay.x264-GROUP.mkv" { + t.Errorf("content filename: got %q", content[0].Filename()) + } +} + +func TestPar2Files(t *testing.T) { + nzb, _ := ParseBytes([]byte(testNZB)) + par2 := nzb.Par2Files() + if len(par2) != 1 { + t.Fatalf("Par2Files: got %d, want 1", len(par2)) + } +} + +func TestLargestFile(t *testing.T) { + nzb, _ := ParseBytes([]byte(testNZB)) + largest := nzb.LargestFile() + if largest == nil { + t.Fatal("LargestFile returned nil") + } + if largest.Filename() != "Movie.2024.1080p.BluRay.x264-GROUP.mkv" { + t.Errorf("largest file: got %q", largest.Filename()) + } +} + +func TestFilename(t *testing.T) { + tests := []struct { + subject string + expected string + }{ + { + `Movie.2024.1080p [01/50] - "Movie.2024.1080p.mkv" yEnc (1/3200)`, + "Movie.2024.1080p.mkv", + }, + { + `[PRiVATE]-[#a]- "file.rar" yEnc (01/99)`, + "file.rar", + }, + { + `Some subject without quotes (1/1)`, + "Some subject without quotes", 
+ }, + } + + for _, tt := range tests { + f := File{Subject: tt.subject} + if got := f.Filename(); got != tt.expected { + t.Errorf("Filename(%q) = %q, want %q", tt.subject, got, tt.expected) + } + } +} + +func TestExtension(t *testing.T) { + f := File{Subject: `"Movie.2024.1080p.BluRay.x264-GROUP.mkv" yEnc (1/3200)`} + if got := f.Extension(); got != ".mkv" { + t.Errorf("Extension: got %q, want .mkv", got) + } +} + +func TestHasRars(t *testing.T) { + nzb, _ := ParseBytes([]byte(testNZBWithRars)) + if !nzb.HasRars() { + t.Error("HasRars: expected true") + } + if !nzb.HasPar2() { + t.Error("HasPar2: expected true") + } +} + +func TestRarFiles(t *testing.T) { + nzb, _ := ParseBytes([]byte(testNZBWithRars)) + rars := nzb.RarFiles() + if len(rars) != 3 { + t.Fatalf("RarFiles: got %d, want 3", len(rars)) + } +} + +func TestIsRarFile(t *testing.T) { + tests := []struct { + name string + want bool + }{ + {"file.rar", true}, + {"file.r00", true}, + {"file.r99", true}, + {"file.s00", true}, + {"file.001", true}, + {"file.mkv", false}, + {"file.par2", false}, + {"file.nfo", false}, + } + for _, tt := range tests { + if got := isRarFile(tt.name); got != tt.want { + t.Errorf("isRarFile(%q) = %v, want %v", tt.name, got, tt.want) + } + } +} + +func TestParseEmpty(t *testing.T) { + _, err := Parse(strings.NewReader(``)) + if err == nil { + t.Error("expected error for empty NZB") + } +} + +func TestParseInvalidXML(t *testing.T) { + _, err := Parse(strings.NewReader("not xml")) + if err == nil { + t.Error("expected error for invalid XML") + } +} + +func TestStripAngleBrackets(t *testing.T) { + nzbXML := ` + + + alt.test + + <angle@brackets.com> + + +` + nzb, err := ParseBytes([]byte(nzbXML)) + if err != nil { + t.Fatalf("Parse failed: %v", err) + } + if nzb.Files[0].Segments[0].MessageID != "angle@brackets.com" { + t.Errorf("MessageID not stripped: got %q", nzb.Files[0].Segments[0].MessageID) + } +} diff --git a/internal/usenet/postprocess/extract.go 
b/internal/usenet/postprocess/extract.go new file mode 100644 index 0000000..6388f19 --- /dev/null +++ b/internal/usenet/postprocess/extract.go @@ -0,0 +1,224 @@ +package postprocess + +import ( + "fmt" + "log" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" +) + +// ExtractorType identifies which extraction tool is available. +type ExtractorType string + +const ( + ExtractorNone ExtractorType = "" + ExtractorUnrar ExtractorType = "unrar" + Extractor7z ExtractorType = "7z" +) + +// FindExtractor checks which archive extractor is available in PATH. +func FindExtractor() (ExtractorType, string) { + if path, err := exec.LookPath("unrar"); err == nil { + return ExtractorUnrar, path + } + if path, err := exec.LookPath("7z"); err == nil { + return Extractor7z, path + } + return ExtractorNone, "" +} + +// Extract extracts an archive using the best available tool. +// password is optional — pass "" if not needed. +// Returns the list of extracted file paths. +func Extract(archivePath string, outputDir string, password string) ([]string, error) { + extType, extPath := FindExtractor() + if extType == ExtractorNone { + return nil, fmt.Errorf("no archive extractor found (install unrar or 7z)") + } + + switch extType { + case ExtractorUnrar: + return extractUnrar(extPath, archivePath, outputDir, password) + case Extractor7z: + return extract7z(extPath, archivePath, outputDir, password) + default: + return nil, fmt.Errorf("unknown extractor: %s", extType) + } +} + +// extractUnrar extracts using unrar. +func extractUnrar(unrarPath, archivePath, outputDir, password string) ([]string, error) { + args := []string{"x", "-o+", "-y"} + if password != "" { + args = append(args, "-p"+password) + } else { + args = append(args, "-p-") // no password, skip asking + } + args = append(args, archivePath, outputDir+"/") + + cmd := exec.Command(unrarPath, args...) 
+ cmd.Dir = outputDir + output, err := cmd.CombinedOutput() + if err != nil { + // Check for password error + outStr := string(output) + if strings.Contains(outStr, "wrong password") || strings.Contains(outStr, "Incorrect password") { + return nil, &PasswordError{Archive: archivePath} + } + return nil, fmt.Errorf("unrar: %w\n%s", err, output) + } + + return listExtractedFiles(outputDir, archivePath) +} + +// extract7z extracts using 7z. +func extract7z(szPath, archivePath, outputDir, password string) ([]string, error) { + args := []string{"x", "-y", "-o" + outputDir} + if password != "" { + args = append(args, "-p"+password) + } else { + args = append(args, "-p") // empty password + } + args = append(args, archivePath) + + cmd := exec.Command(szPath, args...) + cmd.Dir = outputDir + output, err := cmd.CombinedOutput() + if err != nil { + outStr := string(output) + if strings.Contains(outStr, "Wrong password") || strings.Contains(outStr, "incorrect password") { + return nil, &PasswordError{Archive: archivePath} + } + return nil, fmt.Errorf("7z: %w\n%s", err, output) + } + + return listExtractedFiles(outputDir, archivePath) +} + +// IsPasswordProtected checks if a rar archive requires a password. +func IsPasswordProtected(archivePath string) bool { + extType, extPath := FindExtractor() + if extType == ExtractorNone { + return false + } + + switch extType { + case ExtractorUnrar: + cmd := exec.Command(extPath, "t", "-p-", archivePath) + output, err := cmd.CombinedOutput() + if err != nil { + outStr := string(output) + return strings.Contains(outStr, "password") || strings.Contains(outStr, "encrypted") + } + case Extractor7z: + cmd := exec.Command(extPath, "t", "-p", archivePath) + output, err := cmd.CombinedOutput() + if err != nil { + outStr := string(output) + return strings.Contains(outStr, "Wrong password") || strings.Contains(outStr, "encrypted") + } + } + return false +} + +// listExtractedFiles returns new files in outputDir that aren't the archive itself. 
+func listExtractedFiles(dir, archivePath string) ([]string, error) { + archiveBase := filepath.Base(archivePath) + archiveDir := filepath.Dir(archivePath) + var files []string + + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return nil // skip errors + } + if info.IsDir() { + return nil + } + base := filepath.Base(path) + // Skip archive files themselves + if isArchiveFile(base) && filepath.Dir(path) == archiveDir { + return nil + } + if base == archiveBase { + return nil + } + files = append(files, path) + return nil + }) + return files, err +} + +// Cleanup removes archive and parity files from a directory. +func Cleanup(dir string) error { + entries, err := os.ReadDir(dir) + if err != nil { + return err + } + + for _, entry := range entries { + if entry.IsDir() { + continue + } + name := entry.Name() + if isCleanupTarget(name) { + path := filepath.Join(dir, name) + log.Printf("[usenet] cleanup: removing %s", name) + os.Remove(path) + } + } + return nil +} + +// isArchiveFile returns true for rar/split archive files. +func isArchiveFile(name string) bool { + lower := strings.ToLower(name) + ext := filepath.Ext(lower) + + if ext == ".rar" { + return true + } + // .r00, .r01, ... .r99, .s00, etc. + if len(ext) == 4 && (ext[1] == 'r' || ext[1] == 's') { + return isNumeric(ext[2:]) + } + // .001, .002, etc. + if len(ext) == 4 && isNumeric(ext[1:]) { + return true + } + return false +} + +// isCleanupTarget returns true for files that should be removed after extraction. 
// Patterns for files that are by-products of a usenet download rather than
// content. Both are matched case-insensitively against the end of a name.
var (
	// cleanupExts matches parity, metadata, artwork and text side files.
	cleanupExts = regexp.MustCompile(`(?i)\.(par2|nfo|sfv|nzb|srr|srs|jpg|png|txt|url)$`)
	// cleanupRarParts matches rar volumes (.rar, .rNN, .sNN) and numbered
	// split parts (.NNN).
	cleanupRarParts = regexp.MustCompile(`(?i)\.(rar|r\d{2}|s\d{2}|\d{3})$`)
)

// isCleanupTarget reports whether name ends in an extension matched by
// cleanupExts or cleanupRarParts.
func isCleanupTarget(name string) bool {
	return cleanupExts.MatchString(name) || cleanupRarParts.MatchString(name)
}

// isNumeric reports whether s is non-empty and made up solely of the ASCII
// digits 0-9.
func isNumeric(s string) bool {
	if s == "" {
		return false
	}
	for i := 0; i < len(s); i++ {
		if d := s[i]; d < '0' || d > '9' {
			return false
		}
	}
	return true
}

// PasswordError indicates the archive requires a password.
type PasswordError struct {
	Archive string // path of the protected archive
}

// Error implements the error interface.
func (e *PasswordError) Error() string {
	return fmt.Sprintf("archive is password protected: %s", e.Archive)
}

// ---------------------------------------------------------------------------
// internal/usenet/postprocess/par2.go (new file, package postprocess)
// Imports: fmt, log, os/exec, strings
// ---------------------------------------------------------------------------

// Par2Available checks if par2cmdline is installed, i.e. whether a "par2"
// executable can be found on PATH.
func Par2Available() bool {
	if _, lookErr := exec.LookPath("par2"); lookErr != nil {
		return false
	}
	return true
}

// Par2Verify verifies files using a par2 file.
//
// It returns nil when verification passes, and also when par2 is not
// installed (the check is skipped with a log line). When the tool reports
// that repair is possible the error is a *Par2RepairableError; any other
// failure is returned as a plain error carrying the tool's output.
func Par2Verify(par2File string) error {
	if !Par2Available() {
		log.Printf("[usenet] par2 not installed, skipping verification")
		return nil
	}

	raw, runErr := exec.Command("par2", "verify", par2File).CombinedOutput()
	if runErr == nil {
		log.Printf("[usenet] par2: verification OK")
		return nil
	}

	// The command failed: classify the failure from what the tool printed.
	report := string(raw)
	switch {
	case strings.Contains(report, "Repair is possible"):
		return &Par2RepairableError{Par2File: par2File}
	case strings.Contains(report, "Repair is not possible"):
		return fmt.Errorf("par2: verification failed and repair not possible:\n%s", report)
	default:
		return fmt.Errorf("par2 verify: %w\n%s", runErr, report)
	}
}

// Par2Repair attempts to repair files using par2 parity data.
// Unlike Par2Verify it fails when par2 is not installed.
func Par2Repair(par2File string) error {
	if !Par2Available() {
		return fmt.Errorf("par2 not installed")
	}

	raw, runErr := exec.Command("par2", "repair", par2File).CombinedOutput()
	if runErr != nil {
		return fmt.Errorf("par2 repair: %w\n%s", runErr, raw)
	}

	log.Printf("[usenet] par2: repair successful")
	return nil
}

// Par2RepairableError indicates verification failed but repair is possible.
type Par2RepairableError struct {
	Par2File string // the par2 file that was verified
}

// Error implements the error interface.
func (e *Par2RepairableError) Error() string {
	return fmt.Sprintf("par2: verification failed, repair possible: %s", e.Par2File)
}

// ---------------------------------------------------------------------------
// internal/usenet/postprocess/pipeline.go (new file, package postprocess)
// Imports: fmt, log, os, path/filepath, regexp, sort, strings
// ---------------------------------------------------------------------------

// Result holds the outcome of post-processing.
type Result struct {
	FinalPath string   // path to the main content file (e.g., the video)
	Files     []string // all final files
	Repaired  bool     // whether par2 repair was needed
	Extracted bool     // whether archive extraction was performed
}

// Options configures post-processing behavior.
type Options struct {
	Password string // password for encrypted archives (empty = none)
	Cleanup  bool   // remove intermediate files after extraction
}

// Process runs the full post-processing pipeline on downloaded usenet files.
// Steps: par2 verify → par2 repair → extract archives → cleanup → find main file.
+func Process(dir string, downloadedFiles map[string]string, opts Options) (*Result, error) { + result := &Result{} + + // Step 1: Par2 verification and repair + par2File := findPar2File(downloadedFiles) + if par2File != "" { + err := Par2Verify(par2File) + if err != nil { + if _, ok := err.(*Par2RepairableError); ok { + log.Printf("[usenet] attempting par2 repair...") + if repairErr := Par2Repair(par2File); repairErr != nil { + log.Printf("[usenet] par2 repair failed: %v", repairErr) + } else { + result.Repaired = true + } + } else { + log.Printf("[usenet] par2 verification error: %v", err) + } + } + } + + // Step 2: Find and extract archives + rarFile := findFirstRar(downloadedFiles) + if rarFile != "" { + log.Printf("[usenet] extracting archive: %s", filepath.Base(rarFile)) + + // Check if password-protected + if opts.Password == "" && IsPasswordProtected(rarFile) { + return nil, &PasswordError{Archive: rarFile} + } + + extracted, err := Extract(rarFile, dir, opts.Password) + if err != nil { + if _, ok := err.(*PasswordError); ok { + return nil, err + } + return nil, fmt.Errorf("extraction failed: %w", err) + } + + result.Extracted = true + result.Files = extracted + + // Step 3: Cleanup archive + par2 files + if opts.Cleanup { + Cleanup(dir) + } + } else { + // No archives — content files are the final files + for _, path := range downloadedFiles { + if !isCleanupTarget(filepath.Base(path)) { + result.Files = append(result.Files, path) + } + } + + // Cleanup metadata files + if opts.Cleanup { + for name, path := range downloadedFiles { + lower := strings.ToLower(name) + ext := filepath.Ext(lower) + if ext == ".par2" || ext == ".nfo" || ext == ".sfv" || ext == ".nzb" { + log.Printf("[usenet] cleanup: removing %s", name) + os.Remove(path) + } + } + } + } + + // Step 4: Find main content file (largest video file) + result.FinalPath = findMainFile(dir, result.Files) + + if result.FinalPath == "" && len(result.Files) > 0 { + result.FinalPath = result.Files[0] + } + 
+ return result, nil +} + +// findPar2File returns the path of the main .par2 file (not volume sets). +func findPar2File(files map[string]string) string { + var mainPar2 string + var smallestSize int64 = -1 + + for name, path := range files { + ext := strings.ToLower(filepath.Ext(name)) + if ext != ".par2" { + continue + } + // The main par2 file is typically the smallest one (index file) + // Volume par2 files are larger (contain recovery data) + fi, err := os.Stat(path) + if err != nil { + continue + } + if smallestSize < 0 || fi.Size() < smallestSize { + smallestSize = fi.Size() + mainPar2 = path + } + } + return mainPar2 +} + +// firstRarRe matches the first volume of a multi-part rar set. +// Patterns: .part01.rar, .part1.rar, or just .rar (single/first volume) +var firstRarRe = regexp.MustCompile(`(?i)\.part0*1\.rar$`) + +// findFirstRar returns the path to the first rar volume. +// For multi-part rars (part01.rar, part02.rar...), returns part01 specifically. +func findFirstRar(files map[string]string) string { + // Priority 1: Find explicitly named first part (part01.rar, part1.rar) + for _, path := range files { + if firstRarRe.MatchString(path) { + return path + } + } + + // Priority 2: Find the shortest-named .rar file (usually the first volume) + var rarFiles []struct { + name string + path string + } + for name, path := range files { + if strings.HasSuffix(strings.ToLower(name), ".rar") { + rarFiles = append(rarFiles, struct { + name string + path string + }{name, path}) + } + } + if len(rarFiles) > 0 { + sort.Slice(rarFiles, func(i, j int) bool { + return len(rarFiles[i].name) < len(rarFiles[j].name) + }) + return rarFiles[0].path + } + + // Priority 3: .001 split format + for name, path := range files { + if strings.HasSuffix(strings.ToLower(name), ".001") { + return path + } + } + return "" +} + +// findMainFile finds the largest video file in the directory or file list. 
func findMainFile(dir string, files []string) string {
	// files is tried first; dir is scanned (non-recursively) only when the
	// list yields no video file. Returns "" when neither does. A candidate
	// must be strictly larger than the current best, so zero-byte files are
	// never selected.
	videoExts := map[string]bool{
		".mkv": true, ".mp4": true, ".avi": true, ".mov": true,
		".wmv": true, ".flv": true, ".m4v": true, ".ts": true,
		".webm": true,
	}
	isVideo := func(name string) bool {
		return videoExts[strings.ToLower(filepath.Ext(name))]
	}

	var bestPath string
	var bestSize int64

	// First try from the explicit file list.
	for _, path := range files {
		if !isVideo(path) {
			continue
		}
		fi, err := os.Stat(path)
		if err != nil {
			continue
		}
		if fi.Size() > bestSize {
			bestSize = fi.Size()
			bestPath = path
		}
	}
	if bestPath != "" {
		return bestPath
	}

	// Fallback: scan the directory itself.
	entries, err := os.ReadDir(dir)
	if err != nil {
		return ""
	}
	for _, entry := range entries {
		if entry.IsDir() || !isVideo(entry.Name()) {
			continue
		}
		fi, err := entry.Info()
		if err != nil {
			continue
		}
		if fi.Size() > bestSize {
			bestSize = fi.Size()
			bestPath = filepath.Join(dir, entry.Name())
		}
	}

	return bestPath
}

// ---------------------------------------------------------------------------
// internal/usenet/yenc/decode.go (new file, package yenc)
// Imports: bufio, bytes, fmt, hash/crc32, io, strconv, strings
// ---------------------------------------------------------------------------

// Part represents a decoded yEnc part (one NNTP article body).
type Part struct {
	Name   string // filename from =ybegin
	Number int    // part number (1-based)
	Total  int    // total parts (from =ybegin total=N)
	Begin  int64  // byte offset start (from =ypart begin=N, 1-based)
	End    int64  // byte offset end (from =ypart end=N, inclusive)
	Size   int64  // total file size (from =ybegin size=N)
	CRC32  uint32 // CRC32 of this part's data (from =yend pcrc32)
	Data   []byte // decoded binary data
}

// Decode reads a yEnc encoded article body and returns the decoded part.
// The reader should contain the raw article body (after NNTP BODY response).
//
// Lines before the "=ybegin " header are skipped. An optional "=ypart " line
// is recognised only directly after the header. Every following line up to
// "=yend" is decoded as data. When the trailer carries a non-zero checksum
// (see parseYEnd) it is verified against the decoded bytes.
//
// Errors are returned when the header is missing, when reading fails (for
// example a line longer than the 10MB scanner limit), when the "=yend"
// trailer is missing (truncated article), and on CRC32 mismatch.
func Decode(r io.Reader) (*Part, error) {
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024) // up to 10MB per article

	part := &Part{}

	// Phase 1: find and parse the =ybegin header.
	for scanner.Scan() {
		if line := scanner.Text(); strings.HasPrefix(line, "=ybegin ") {
			parseYBegin(part, line)
			break
		}
	}
	// Report a genuine read failure as such instead of "no header".
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("yenc: read error: %w", err)
	}
	if part.Name == "" && part.Size == 0 {
		return nil, fmt.Errorf("yenc: no =ybegin header found")
	}

	// Phase 2 + 3: optional =ypart header, then data lines until =yend.
	// The line right after =ybegin goes through the same =yend check as every
	// other line, so an empty payload is not mistaken for data.
	afterHeader := true
	sawEnd := false
	for scanner.Scan() {
		line := scanner.Text()

		if afterHeader {
			afterHeader = false
			if strings.HasPrefix(line, "=ypart ") {
				parseYPart(part, line)
				continue
			}
		}

		if strings.HasPrefix(line, "=yend") {
			parseYEnd(part, line)
			sawEnd = true
			break
		}

		part.Data = append(part.Data, decodeLine(line)...)
	}
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("yenc: read error: %w", err)
	}
	// Without the trailer there is no checksum to verify and no evidence the
	// article is complete, so treat it as truncated rather than as success.
	if !sawEnd {
		return nil, fmt.Errorf("yenc: missing =yend trailer (truncated article?)")
	}

	// Verify CRC32 if provided (0 means "no checksum given").
	if part.CRC32 != 0 {
		if computed := crc32.ChecksumIEEE(part.Data); computed != part.CRC32 {
			return nil, fmt.Errorf("yenc: CRC32 mismatch: expected %08x, got %08x", part.CRC32, computed)
		}
	}

	return part, nil
}

// DecodeBytes decodes a yEnc encoded byte slice. See Decode.
func DecodeBytes(data []byte) (*Part, error) {
	return Decode(bytes.NewReader(data))
}

// decodeLine decodes a single line of yEnc data (without its line ending).
// yEnc encoding: each byte = (original + 42) % 256
// Escape character '=' followed by next byte: (escapedByte - 64 - 42) % 256
// A lone '=' at the very end of the line has nothing to escape and is dropped.
func decodeLine(line string) []byte {
	out := make([]byte, 0, len(line))
	escaped := false

	for i := 0; i < len(line); i++ {
		b := line[i]

		switch {
		case escaped:
			// Escaped byte: subtract 106 (42 + 64); byte arithmetic wraps mod 256.
			out = append(out, b-106)
			escaped = false
		case b == '=':
			escaped = true
		default:
			// Normal byte: subtract 42.
			out = append(out, b-42)
		}
	}

	return out
}

// parseYBegin parses "=ybegin part=1 total=50 line=128 size=768000 name=file.mkv"
// into p. Missing numeric parameters are left at 0.
func parseYBegin(p *Part, line string) {
	p.Number = getIntParam(line, "part")
	p.Total = getIntParam(line, "total")
	p.Size = int64(getIntParam(line, "size"))

	// Name is special: it's everything after "name=" to end of line, so it
	// may contain spaces and '=' characters.
	if idx := strings.Index(line, "name="); idx >= 0 {
		p.Name = strings.TrimSpace(line[idx+5:])
	}
}

// parseYPart parses "=ypart begin=1 end=768000" into p.
func parseYPart(p *Part, line string) {
	p.Begin = int64(getIntParam(line, "begin"))
	p.End = int64(getIntParam(line, "end"))
}

// parseYEnd parses "=yend size=768000 part=1 pcrc32=ABCD1234 crc32=ABCD1234".
// pcrc32 is the CRC of this part; crc32 is the CRC of the whole file, so it
// is only used when the article is a single-part file (Total <= 1).
func parseYEnd(p *Part, line string) {
	if sum := getHexParam(line, "pcrc32"); sum != 0 {
		p.CRC32 = sum
		return
	}
	if sum := getHexParam(line, "crc32"); sum != 0 && p.Total <= 1 {
		// For single-part files, crc32 is the only CRC.
		p.CRC32 = sum
	}
}

// findParam returns the raw text following "key=" in a yEnc header line.
//
// A key matches only at the start of a whitespace-delimited token, so "crc32"
// never matches inside "pcrc32". Because the name= parameter runs to the end
// of the line and may contain arbitrary text, nothing from " name=" onwards
// is searched. The second result is false when the key is absent.
func findParam(line, key string) (string, bool) {
	if idx := strings.Index(line, " name="); idx >= 0 {
		line = line[:idx]
	}
	prefix := key + "="
	for _, field := range strings.Fields(line) {
		if strings.HasPrefix(field, prefix) {
			return field[len(prefix):], true
		}
	}
	return "", false
}

// getIntParam extracts an integer parameter from a yEnc header line.
// It returns 0 when the key is absent, has no leading decimal digits, or
// does not fit in an int.
func getIntParam(line, key string) int {
	value, ok := findParam(line, key)
	if !ok {
		return 0
	}
	end := 0
	for end < len(value) && value[end] >= '0' && value[end] <= '9' {
		end++
	}
	if end == 0 {
		return 0
	}
	v, err := strconv.Atoi(value[:end])
	if err != nil {
		return 0
	}
	return v
}

// getHexParam extracts a hex parameter (like CRC32) from a yEnc header line.
// Uses whole-token matching to avoid "crc32" matching inside "pcrc32".
// It returns 0 when the key is absent, has no leading hex digits, or does
// not fit in 32 bits.
func getHexParam(line, key string) uint32 {
	value, ok := findParam(line, key)
	if !ok {
		return 0
	}
	end := 0
	for end < len(value) {
		c := value[end]
		isHex := (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')
		if !isHex {
			break
		}
		end++
	}
	if end == 0 {
		return 0
	}
	v, err := strconv.ParseUint(value[:end], 16, 32)
	if err != nil {
		return 0
	}
	return uint32(v)
}

// ---------------------------------------------------------------------------
// internal/usenet/yenc/decode_test.go (new file, package yenc)
// Imports: bytes, encoding/hex, fmt, hash/crc32, strings, testing
// ---------------------------------------------------------------------------

func TestDecodeLine(t *testing.T) {
	// yEnc: each byte = (original + 42) % 256
	// So to encode byte 0x00, we store 0x2A (42)
	// To encode byte 0x01, we store 0x2B (43)

	// Encode "Hello" manually:
	// H=72  → 72+42=114='r'
	// e=101 → 101+42=143='\x8f'
	// l=108 → 108+42=150='\x96'
	// l=108 → same
	// o=111 → 111+42=153='\x99'
	input := string([]byte{114, 143, 150, 150, 153})
	decoded := decodeLine(input)
	if string(decoded) != "Hello" {
		t.Errorf("decodeLine: got %q, want %q", string(decoded), "Hello")
	}
}

func TestDecodeLineWithEscape(t *testing.T) {
	// Critical bytes (NUL, LF, CR, '=') are written as '=' followed by the
	// byte plus 64, and decoded as (byte - 64 - 42) = (byte - 106).

	// To encode byte 0x00 (NUL): escape it → '=' + (0 + 42 + 64) = '=' + 106 = '=' + 'j'
	input := "=j" // should decode to 0x00
	decoded := decodeLine(input)
	if len(decoded) != 1 || decoded[0] != 0x00 {
		t.Errorf("escape decode: got %v, want [0x00]", decoded)
	}
}

func TestDecodeSimpleArticle(t *testing.T) {
	// Create a simple yEnc encoded article
	original := []byte("Hello, World! This is a test of yEnc encoding.")
	encoded := encodeForTest(original)

	crc := crc32.ChecksumIEEE(original)

	article := fmt.Sprintf("=ybegin line=128 size=%d name=test.txt\r\n%s\r\n=yend size=%d crc32=%08x\r\n",
		len(original), encoded, len(original), crc)

	part, err := Decode(strings.NewReader(article))
	if err != nil {
		t.Fatalf("Decode failed: %v", err)
	}

	if part.Name != "test.txt" {
		t.Errorf("Name: got %q, want %q", part.Name, "test.txt")
	}
	if part.Size != int64(len(original)) {
		t.Errorf("Size: got %d, want %d", part.Size, len(original))
	}
	if !bytes.Equal(part.Data, original) {
		t.Errorf("Data mismatch:\n  got:  %s\n  want: %s", hex.EncodeToString(part.Data), hex.EncodeToString(original))
	}
}

func TestDecodeMultipart(t *testing.T) {
	original := []byte("Part one data here")
	encoded := encodeForTest(original)

	crc := crc32.ChecksumIEEE(original)

	article := fmt.Sprintf("=ybegin part=1 total=3 line=128 size=1000 name=movie.mkv\r\n"+
		"=ypart begin=1 end=%d\r\n"+
		"%s\r\n"+
		"=yend size=%d part=1 pcrc32=%08x\r\n",
		len(original), encoded, len(original), crc)

	part, err := Decode(strings.NewReader(article))
	if err != nil {
		t.Fatalf("Decode failed: %v", err)
	}

	if part.Number != 1 {
		t.Errorf("Number: got %d, want 1", part.Number)
	}
	if part.Total != 3 {
		t.Errorf("Total: got %d, want 3", part.Total)
	}
	if part.Begin != 1 {
		t.Errorf("Begin: got %d, want 1", part.Begin)
	}
	if part.End != int64(len(original)) {
		t.Errorf("End: got %d, want %d", part.End, len(original))
	}
	if part.Name != "movie.mkv" {
		t.Errorf("Name: got %q", part.Name)
	}
	if !bytes.Equal(part.Data, original) {
		t.Error("Data mismatch")
	}
}

func TestDecodeCRC32Mismatch(t *testing.T) {
	original := []byte("test data")
	encoded := encodeForTest(original)

	article := fmt.Sprintf("=ybegin line=128 size=%d name=test.bin\r\n%s\r\n=yend size=%d crc32=deadbeef\r\n",
		len(original), encoded, len(original))

	_, err := Decode(strings.NewReader(article))
	if err == nil {
		// Fatal, not Error: err.Error() below would panic on a nil error.
		t.Fatal("expected CRC32 mismatch error")
	}
	if !strings.Contains(err.Error(), "CRC32 mismatch") {
		t.Errorf("unexpected error: %v", err)
	}
}

func TestDecodeNoHeader(t *testing.T) {
	_, err := Decode(strings.NewReader("just some random data\r\n"))
	if err == nil {
		t.Error("expected error for missing header")
	}
}

func TestDecodeEmptyPayload(t *testing.T) {
	// =yend directly after =ybegin: there is no data to decode.
	part, err := Decode(strings.NewReader("=ybegin line=128 size=0 name=empty.bin\r\n=yend size=0\r\n"))
	if err != nil {
		t.Fatalf("Decode failed: %v", err)
	}
	if len(part.Data) != 0 {
		t.Errorf("Data: got %d bytes, want 0", len(part.Data))
	}
}

func TestDecodeMissingTrailer(t *testing.T) {
	encoded := encodeForTest([]byte("cut short"))
	article := fmt.Sprintf("=ybegin line=128 size=9 name=cut.bin\r\n%s\r\n", encoded)
	if _, err := Decode(strings.NewReader(article)); err == nil {
		t.Error("expected error for missing =yend trailer")
	}
}

func TestDecodeBytes(t *testing.T) {
	original := []byte("quick test")
	encoded := encodeForTest(original)
	crc := crc32.ChecksumIEEE(original)

	article := fmt.Sprintf("=ybegin line=128 size=%d name=q.bin\r\n%s\r\n=yend size=%d crc32=%08x\r\n",
		len(original), encoded, len(original), crc)

	part, err := DecodeBytes([]byte(article))
	if err != nil {
		t.Fatalf("DecodeBytes failed: %v", err)
	}
	if !bytes.Equal(part.Data, original) {
		t.Error("Data mismatch")
	}
}

func TestDecodeBinaryData(t *testing.T) {
	// Test with all byte values 0-255
	original := make([]byte, 256)
	for i := range original {
		original[i] = byte(i)
	}

	encoded := encodeForTest(original)
	crc := crc32.ChecksumIEEE(original)

	article := fmt.Sprintf("=ybegin line=128 size=%d name=binary.bin\r\n%s\r\n=yend size=%d crc32=%08x\r\n",
		len(original), encoded, len(original), crc)

	part, err := Decode(strings.NewReader(article))
	if err != nil {
		t.Fatalf("Decode failed: %v", err)
	}
	if !bytes.Equal(part.Data, original) {
		t.Errorf("Binary data mismatch: got %d bytes, want %d", len(part.Data), len(original))
	}
}

func TestGetIntParam(t *testing.T) {
	line := "=ybegin part=5 total=100 line=128 size=768000 name=file.mkv"
	if v := getIntParam(line, "part"); v != 5 {
		t.Errorf("part: got %d", v)
	}
	if v := getIntParam(line, "total"); v != 100 {
		t.Errorf("total: got %d", v)
	}
	if v := getIntParam(line, "size"); v != 768000 {
		t.Errorf("size: got %d", v)
	}
	if v := getIntParam(line, "missing"); v != 0 {
		t.Errorf("missing: got %d", v)
	}
	// Text inside the filename must not be read as a parameter.
	if v := getIntParam("=ybegin line=128 size=10 name=my part=7 file.bin", "part"); v != 0 {
		t.Errorf("part inside name: got %d, want 0", v)
	}
}

func TestGetHexParam(t *testing.T) {
	line := "=yend size=1000 pcrc32=ABCD1234 crc32=deadbeef"
	if v := getHexParam(line, "pcrc32"); v != 0xABCD1234 {
		t.Errorf("pcrc32: got %08x, want ABCD1234", v)
	}
	if v := getHexParam(line, "crc32"); v != 0xdeadbeef {
		t.Errorf("crc32: got %08x, want deadbeef", v)
	}
}

// encodeForTest encodes data using yEnc for testing purposes.
// The output is a single line: no line breaks are inserted.
func encodeForTest(data []byte) string {
	var buf bytes.Buffer
	for _, b := range data {
		encoded := byte((int(b) + 42) % 256)
		// Escape special bytes: NUL, LF, CR, '=', '.'
		switch encoded {
		case 0x00, 0x0A, 0x0D, 0x3D, 0x2E:
			buf.WriteByte('=')
			buf.WriteByte(byte((int(encoded) + 64) % 256))
		default:
			buf.WriteByte(encoded)
		}
	}
	return buf.String()
}