From efa4562acd50d77f76039388db93c113ce80649e Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Mon, 30 Mar 2026 23:24:16 +0200 Subject: [PATCH] refactor: migrate lint config to v2, remove daemon auto-upgrade, add trust badges --- .github/workflows/ci.yml | 2 +- .golangci.yml | 43 ++++++----- README.md | 4 + go.mod | 1 + go.sum | 2 + internal/agent/client.go | 20 +++-- internal/agent/client_test.go | 59 -------------- internal/agent/daemon.go | 51 ++++++------- internal/agent/transport.go | 3 - internal/agent/transport_e2e_test.go | 16 +--- internal/agent/transport_http.go | 8 +- internal/agent/transport_hybrid.go | 12 --- internal/agent/transport_ws.go | 16 ---- internal/agent/types.go | 23 +++--- internal/cmd/daemon.go | 62 +-------------- internal/cmd/version.go | 2 +- internal/engine/progress.go | 110 ++++++++++++++++++++------- internal/engine/torrent.go | 22 +++++- 18 files changed, 188 insertions(+), 268 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 04465e5..262833c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -81,7 +81,7 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v5 with: - file: ./coverage.out + files: ./coverage.out fail_ci_if_error: false env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} diff --git a/.golangci.yml b/.golangci.yml index 52a7fa7..d5c6099 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,10 +1,11 @@ +version: "2" + run: timeout: 5m linters: enable: - errcheck - - gosimple - govet - ineffassign - staticcheck @@ -16,29 +17,31 @@ linters: - errname - errorlint - exhaustive - - gofmt - - goimports - misspell - nilerr - prealloc - unconvert - unparam - wastedassign + settings: + gosec: + excludes: + - G104 # Allow unhandled errors in fire-and-forget (notifications) + errcheck: + exclude-functions: + - (*os/exec.Cmd).Start # Fire-and-forget for notifications + exhaustive: + default-signifies-exhaustive: true + misspell: + locale: US + exclusions: + paths: + - dist -linters-settings: - gosec: - excludes: - - G104 # Allow unhandled errors in fire-and-forget (notifications) - errcheck: - exclude-functions: - - (*os/exec.Cmd).Start # Fire-and-forget for notifications - exhaustive: - default-signifies-exhaustive: true - misspell: - locale: US - -issues: - exclude-dirs: - - dist - max-issues-per-linter: 50 - max-same-issues: 5 +formatters: + enable: + - gofmt + - goimports + exclusions: + paths: + - dist diff --git a/README.md b/README.md index a89473a..340147c 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,11 @@ > **⚠️ Beta** — unarr is under active development. Features may change, and bugs are expected. [Report issues here](https://github.com/torrentclaw/unarr/issues). [![CI](https://github.com/torrentclaw/unarr/actions/workflows/ci.yml/badge.svg)](https://github.com/torrentclaw/unarr/actions/workflows/ci.yml) +[![Latest Release](https://img.shields.io/github/v/release/torrentclaw/unarr)](https://github.com/torrentclaw/unarr/releases) [![Go Report Card](https://goreportcard.com/badge/github.com/torrentclaw/unarr)](https://goreportcard.com/report/github.com/torrentclaw/unarr) +[![Coverage](https://img.shields.io/codecov/c/github/torrentclaw/unarr)](https://codecov.io/gh/torrentclaw/unarr) +[![VirusTotal](https://img.shields.io/badge/VirusTotal-scanned-brightgreen?logo=virustotal)](https://github.com/torrentclaw/unarr/releases) +[![Docker Pulls](https://img.shields.io/docker/pulls/torrentclaw/unarr)](https://hub.docker.com/r/torrentclaw/unarr) [![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE) [![Go Version](https://img.shields.io/github/go-mod/go-version/torrentclaw/unarr)](go.mod) diff --git a/go.mod b/go.mod index ac2f72b..8cefa35 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/olekukonko/tablewriter v1.1.4 github.com/spf13/cobra v1.10.2 github.com/torrentclaw/go-client v0.2.0 + golang.org/x/term v0.41.0 golang.org/x/time v0.15.0 ) diff --git a/go.sum b/go.sum index 4401697..653faf6 100644 --- a/go.sum +++ b/go.sum @@ -533,6 +533,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.41.0 h1:QCgPso/Q3RTJx2Th4bDLqML4W6iJiaXFq2/ftQF13YU= +golang.org/x/term v0.41.0/go.mod h1:3pfBgksrReYfZ5lvYM0kSO0LIkAl4Yl2bXOkKP7Ec2A= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/internal/agent/client.go b/internal/agent/client.go index d0dd7ad..9fd6ec8 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -73,17 +73,6 @@ func (c *Client) Deregister(ctx context.Context, agentID string) error { return nil } -// ReportUpgradeResult reports the outcome of a self-upgrade attempt. -func (c *Client) ReportUpgradeResult(ctx context.Context, result UpgradeResult) error { - var resp struct { - Success bool `json:"success"` - } - if err := c.doPost(ctx, "/api/internal/agent/upgrade-result", result, &resp); err != nil { - return fmt.Errorf("report upgrade: %w", err) - } - return nil -} - // ReportStatus reports download progress. Returns server-side flags the CLI must act on. func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*StatusResponse, error) { var resp StatusResponse @@ -93,6 +82,15 @@ func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*Status return &resp, nil } +// BatchReportStatus sends multiple status updates in a single request. +func (c *Client) BatchReportStatus(ctx context.Context, updates []StatusUpdate) (*BatchStatusResponse, error) { + var resp BatchStatusResponse + if err := c.doPost(ctx, "/api/internal/agent/status", BatchStatusRequest{Updates: updates}, &resp); err != nil { + return nil, fmt.Errorf("batch report status: %w", err) + } + return &resp, nil +} + // --------------------------------------------------------------------------- // Usenet endpoints // --------------------------------------------------------------------------- diff --git a/internal/agent/client_test.go b/internal/agent/client_test.go index 03dcb57..9266b74 100644 --- a/internal/agent/client_test.go +++ b/internal/agent/client_test.go @@ -324,62 +324,3 @@ func TestHeartbeatWithoutUpgradeSignal(t *testing.T) { t.Errorf("expected no upgrade signal, got %+v", resp.Upgrade) } } - -func TestReportUpgradeResult(t *testing.T) { - var received UpgradeResult - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/api/internal/agent/upgrade-result" { - t.Errorf("path = %s, want /api/internal/agent/upgrade-result", r.URL.Path) - } - if r.Method != http.MethodPost { - t.Errorf("method = %s, want POST", r.Method) - } - json.NewDecoder(r.Body).Decode(&received) - json.NewEncoder(w).Encode(struct{ Success bool }{Success: true}) - })) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "unarr-test") - err := c.ReportUpgradeResult(context.Background(), UpgradeResult{ - AgentID: "agent-1", - Success: true, - Version: "2.0.0", - }) - if err != nil { - t.Fatalf("ReportUpgradeResult failed: %v", err) - } - if received.AgentID != "agent-1" { - t.Errorf("agentId = %q, want agent-1", received.AgentID) - } - if !received.Success { - t.Error("expected success=true") - } - if received.Version != "2.0.0" { - t.Errorf("version = %q, want 2.0.0", received.Version) - } -} - -func TestReportUpgradeResultFailure(t *testing.T) { - var received UpgradeResult - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - json.NewDecoder(r.Body).Decode(&received) - json.NewEncoder(w).Encode(struct{ Success bool }{Success: true}) - })) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "unarr-test") - err := c.ReportUpgradeResult(context.Background(), UpgradeResult{ - AgentID: "agent-1", - Success: false, - Error: "checksum mismatch", - }) - if err != nil { - t.Fatalf("ReportUpgradeResult failed: %v", err) - } - if received.Success { - t.Error("expected success=false") - } - if received.Error != "checksum mismatch" { - t.Errorf("error = %q, want 'checksum mismatch'", received.Error) - } -} diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index b8a9017..417e8b3 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -25,23 +25,26 @@ type Daemon struct { transport Transport // Callbacks - OnTasksClaimed func(tasks []Task) - OnStreamRequested func(req StreamRequest) - OnUpgradeRequested func(version string) - OnControlAction func(action, taskID string) + OnTasksClaimed func(tasks []Task) + OnStreamRequested func(req StreamRequest) + OnControlAction func(action, taskID string) // State - User UserInfo - Features FeatureFlags - Info AgentInfo - State DaemonState - upgradeInProgress bool - heartbeatFailures int + User UserInfo + Features FeatureFlags + Info AgentInfo + State DaemonState + heartbeatFailures int + lastNotifiedVersion string // Callbacks for state tracking (set by cmd/daemon.go) GetActiveCount func() int GetCleanableBytes func() int64 + // Watching tracks whether a user is viewing download progress in the web UI. + // When false, the progress reporter skips detailed updates (only sends final states). + Watching bool + // Exposed tickers for hot-reload PollTicker *time.Ticker HeartbeatTicker *time.Ticker @@ -191,20 +194,18 @@ func (d *Daemon) heartbeat(ctx context.Context) { d.heartbeatFailures = 0 } - // Update state file + // Update watching flag and state file + d.Watching = resp.Watching d.State.LastHeartbeat = time.Now() if d.GetActiveCount != nil { d.State.ActiveTasks = d.GetActiveCount() } WriteState(&d.State) - // Check for upgrade signal from server - if resp.Upgrade != nil && resp.Upgrade.Version != "" && !d.upgradeInProgress { - d.upgradeInProgress = true - log.Printf("Upgrade requested by server: %s → %s", d.cfg.Version, resp.Upgrade.Version) - if d.OnUpgradeRequested != nil { - go d.OnUpgradeRequested(resp.Upgrade.Version) - } + // Log once per version when server suggests an upgrade + if resp.Upgrade != nil && resp.Upgrade.Version != "" && resp.Upgrade.Version != d.lastNotifiedVersion { + d.lastNotifiedVersion = resp.Upgrade.Version + log.Printf("New version available: %s (run `unarr self-update` to upgrade)", resp.Upgrade.Version) } } @@ -225,12 +226,9 @@ func (d *Daemon) handleEvent(event ServerEvent) { } case "upgrade": - if event.Upgrade != nil && event.Upgrade.Version != "" && !d.upgradeInProgress { - d.upgradeInProgress = true - log.Printf("Upgrade requested via WebSocket: %s → %s", d.cfg.Version, event.Upgrade.Version) - if d.OnUpgradeRequested != nil { - go d.OnUpgradeRequested(event.Upgrade.Version) - } + if event.Upgrade != nil && event.Upgrade.Version != "" && event.Upgrade.Version != d.lastNotifiedVersion { + d.lastNotifiedVersion = event.Upgrade.Version + log.Printf("New version available: %s (run `unarr self-update` to upgrade)", event.Upgrade.Version) } case "control": @@ -253,11 +251,6 @@ func (d *Daemon) TriggerPoll() { } } -// ClearUpgradeInProgress resets the upgrade flag so a retry can be attempted. -func (d *Daemon) ClearUpgradeInProgress() { - d.upgradeInProgress = false -} - func (d *Daemon) deregister() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/internal/agent/transport.go b/internal/agent/transport.go index 9aeee53..4bae6d7 100644 --- a/internal/agent/transport.go +++ b/internal/agent/transport.go @@ -29,9 +29,6 @@ type Transport interface { // Deregister notifies the server of graceful shutdown. Deregister(ctx context.Context, agentID string) error - // ReportUpgradeResult reports upgrade outcome. - ReportUpgradeResult(ctx context.Context, result UpgradeResult) error - // Events returns a channel that emits server-initiated events. // In HTTP mode this channel is never written to (polling handles it). // In WS mode, tasks/upgrade/control arrive here. diff --git a/internal/agent/transport_e2e_test.go b/internal/agent/transport_e2e_test.go index 0dd3668..01de3cb 100644 --- a/internal/agent/transport_e2e_test.go +++ b/internal/agent/transport_e2e_test.go @@ -134,23 +134,13 @@ func TestE2EFullLifecycle(t *testing.T) { t.Fatal("timeout waiting for cancel control") } - // 6. Send upgrade result - err = tr.ReportUpgradeResult(ctx, UpgradeResult{ - AgentID: "e2e-agent", - Success: true, - Version: "2.0.0", - }) - if err != nil { - t.Fatalf("ReportUpgradeResult: %v", err) - } - // Verify server received all messages time.Sleep(100 * time.Millisecond) mu.Lock() defer mu.Unlock() - if len(receivedMessages) < 4 { - t.Fatalf("expected at least 4 messages, got %d", len(receivedMessages)) + if len(receivedMessages) < 3 { + t.Fatalf("expected at least 3 messages, got %d", len(receivedMessages)) } types := make([]string, len(receivedMessages)) @@ -158,7 +148,7 @@ func TestE2EFullLifecycle(t *testing.T) { types[i], _ = m["type"].(string) } - expected := []string{"auth", "heartbeat", "progress", "upgrade-result"} + expected := []string{"auth", "heartbeat", "progress"} for _, exp := range expected { found := false for _, got := range types { diff --git a/internal/agent/transport_http.go b/internal/agent/transport_http.go index d5f52a4..a506ba1 100644 --- a/internal/agent/transport_http.go +++ b/internal/agent/transport_http.go @@ -34,6 +34,10 @@ func (t *HTTPTransport) SendProgress(ctx context.Context, update StatusUpdate) ( return t.client.ReportStatus(ctx, update) } +func (t *HTTPTransport) BatchReportStatus(ctx context.Context, updates []StatusUpdate) (*BatchStatusResponse, error) { + return t.client.BatchReportStatus(ctx, updates) +} + func (t *HTTPTransport) ClaimTasks(ctx context.Context, agentID string) (*TasksResponse, error) { return t.client.ClaimTasks(ctx, agentID) } @@ -42,9 +46,5 @@ func (t *HTTPTransport) Deregister(ctx context.Context, agentID string) error { return t.client.Deregister(ctx, agentID) } -func (t *HTTPTransport) ReportUpgradeResult(ctx context.Context, result UpgradeResult) error { - return t.client.ReportUpgradeResult(ctx, result) -} - // Client returns the underlying HTTP client for direct use if needed. func (t *HTTPTransport) Client() *Client { return t.client } diff --git a/internal/agent/transport_hybrid.go b/internal/agent/transport_hybrid.go index c2bd831..345ac27 100644 --- a/internal/agent/transport_hybrid.go +++ b/internal/agent/transport_hybrid.go @@ -120,18 +120,6 @@ func (h *HybridTransport) Deregister(ctx context.Context, agentID string) error return h.http.Deregister(ctx, agentID) } -// ReportUpgradeResult delegates to the active transport. -func (h *HybridTransport) ReportUpgradeResult(ctx context.Context, result UpgradeResult) error { - if h.mode.Load() == "ws" { - if err := h.ws.ReportUpgradeResult(ctx, result); err != nil { - h.switchToHTTP() - return h.http.ReportUpgradeResult(ctx, result) - } - return nil - } - return h.http.ReportUpgradeResult(ctx, result) -} - // ── Internal ───────────────────────────────────────────────────────────────── func (h *HybridTransport) switchToHTTP() { diff --git a/internal/agent/transport_ws.go b/internal/agent/transport_ws.go index a1e152d..bdb7343 100644 --- a/internal/agent/transport_ws.go +++ b/internal/agent/transport_ws.go @@ -209,22 +209,6 @@ func (t *WSTransport) Deregister(_ context.Context, _ string) error { return t.Close() } -// ReportUpgradeResult sends upgrade result to the DO. -func (t *WSTransport) ReportUpgradeResult(_ context.Context, result UpgradeResult) error { - msg := struct { - Type string `json:"type"` - Success bool `json:"success"` - Version string `json:"version,omitempty"` - Error string `json:"error,omitempty"` - }{ - Type: "upgrade-result", - Success: result.Success, - Version: result.Version, - Error: result.Error, - } - return t.send(msg) -} - // ── Internal ───────────────────────────────────────────────────────────────── func (t *WSTransport) send(msg any) error { diff --git a/internal/agent/types.go b/internal/agent/types.go index f95cdd1..4375d9d 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -111,10 +111,21 @@ type StatusResponse struct { StreamRequested bool `json:"streamRequested,omitempty"` } +// BatchStatusRequest wraps multiple status updates in a single request. +type BatchStatusRequest struct { + Updates []StatusUpdate `json:"updates"` +} + +// BatchStatusResponse wraps per-task results from the batch endpoint. +type BatchStatusResponse struct { + Results []StatusResponse `json:"results"` +} + // HeartbeatResponse is returned by the server on heartbeat. type HeartbeatResponse struct { - Success bool `json:"success"` - Upgrade *UpgradeSignal `json:"upgrade,omitempty"` + Success bool `json:"success"` + Upgrade *UpgradeSignal `json:"upgrade,omitempty"` + Watching bool `json:"watching,omitempty"` // true when a user is viewing download progress in the web UI } // UpgradeSignal tells the agent to upgrade to a specific version. @@ -122,14 +133,6 @@ type UpgradeSignal struct { Version string `json:"version"` } -// UpgradeResult is sent by the agent after an upgrade attempt. -type UpgradeResult struct { - AgentID string `json:"agentId"` - Success bool `json:"success"` - Version string `json:"version,omitempty"` - Error string `json:"error,omitempty"` -} - // ErrorResponse is returned on API errors. type ErrorResponse struct { Error string `json:"error"` diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 5b5ddb6..d83e5c0 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -18,7 +18,6 @@ import ( "github.com/torrentclaw/unarr/internal/engine" "github.com/torrentclaw/unarr/internal/library" "github.com/torrentclaw/unarr/internal/usenet/download" - "github.com/torrentclaw/unarr/internal/upgrade" ) // newStartCmd creates the top-level `unarr start` command. @@ -93,7 +92,6 @@ func newDaemonCmd() *cobra.Command { return cmd } - func runDaemonStart() error { cfg := loadConfig() bold := color.New(color.Bold) @@ -174,6 +172,7 @@ func runDaemonStart() error { // Create progress reporter using transport reporter := engine.NewProgressReporterWithTransport(transport, 3*time.Second) + reporter.SetWatchingFunc(func() bool { return d.Watching }) // Parse speed limits maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed) @@ -187,7 +186,7 @@ func runDaemonStart() error { torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{ DataDir: cfg.Download.Dir, MetadataTimeout: metaTimeout, // 0 = unlimited (default) - StallTimeout: stallTimeout, // 0 = unlimited (default) + StallTimeout: stallTimeout, // 0 = unlimited (default) MaxTimeout: 0, // unlimited MaxDownloadRate: maxDl, MaxUploadRate: maxUl, @@ -356,63 +355,6 @@ func runDaemonStart() error { } } - // Wire: server-requested upgrade - d.OnUpgradeRequested = func(targetVersion string) { - - // Wait for active downloads to finish - if active := manager.ActiveCount(); active > 0 { - log.Printf("Waiting for %d active download(s) to finish before upgrading...", active) - manager.Wait() - } - - upgrader := &upgrade.Upgrader{CurrentVersion: Version} - result := upgrader.Execute(ctx, targetVersion) - - // Report result to server - reportCtx, reportCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer reportCancel() - errMsg := "" - if result.Error != nil { - errMsg = result.Error.Error() - } - upgradeResult := agent.UpgradeResult{ - AgentID: cfg.Agent.ID, - Success: result.Success, - Version: result.NewVersion, - Error: errMsg, - } - _ = transport.ReportUpgradeResult(reportCtx, upgradeResult) - - if !result.Success { - log.Printf("Upgrade failed: %v", result.Error) - d.ClearUpgradeInProgress() - return - } - - // Restart: replace current process with the new binary - log.Printf("Upgrade successful (%s → %s), restarting...", result.OldVersion, result.NewVersion) - - // Deregister first so the server knows we're restarting - deregCtx, deregCancel := context.WithTimeout(context.Background(), 5*time.Second) - defer deregCancel() - _ = transport.Deregister(deregCtx, cfg.Agent.ID) - - // Flush progress reporter - cancel() - - // Re-exec with the same args — the new binary takes over - binPath, err := os.Executable() - if err != nil { - log.Printf("Could not determine executable path: %v", err) - os.Exit(75) // EX_TEMPFAIL - } - // syscall.Exec replaces the current process (Unix) - execErr := syscall.Exec(binPath, os.Args, os.Environ()) - // If we get here, exec failed (e.g. Windows) - log.Printf("Exec failed: %v — exiting for service manager restart", execErr) - os.Exit(75) - } - // Config hot-reload (SIGUSR1 on Unix, no-op on Windows) // Tickers are initialized inside d.Run(), so we pass the daemon // and the reload goroutine reads them when the signal arrives. diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 2b0ca15..0f63091 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.3.0-dev" +var Version = "0.3.4-dev" diff --git a/internal/engine/progress.go b/internal/engine/progress.go index f91af8a..e2284fc 100644 --- a/internal/engine/progress.go +++ b/internal/engine/progress.go @@ -18,11 +18,22 @@ type StatusReporter interface { ReportStatus(ctx context.Context, update agent.StatusUpdate) (*agent.StatusResponse, error) } +// BatchStatusReporter extends StatusReporter with batch support. +// Transports that implement this send all updates in a single request. +type BatchStatusReporter interface { + StatusReporter + BatchReportStatus(ctx context.Context, updates []agent.StatusUpdate) (*agent.BatchStatusResponse, error) +} + +// WatchingFunc returns whether a user is actively viewing download progress. +type WatchingFunc func() bool + // ProgressReporter aggregates progress from downloads and reports to the API. // It batches updates to avoid flooding the server. type ProgressReporter struct { - reporter StatusReporter - interval time.Duration + reporter StatusReporter + interval time.Duration + isWatching WatchingFunc // nil = always report (backwards compatible) onCancel ActionFunc onPause ActionFunc @@ -73,6 +84,9 @@ func (r *ProgressReporter) SetDeleteFilesHandler(fn ActionFunc) { r.onDeleteFile // SetStreamRequestedHandler sets the callback for stream activation. func (r *ProgressReporter) SetStreamRequestedHandler(fn ActionFunc) { r.onStreamRequested = fn } +// SetWatchingFunc sets the function that checks if someone is viewing downloads. +func (r *ProgressReporter) SetWatchingFunc(fn WatchingFunc) { r.isWatching = fn } + // Track registers a task for progress tracking. func (r *ProgressReporter) Track(task *Task) { r.mu.Lock() @@ -111,43 +125,87 @@ func (r *ProgressReporter) flush(ctx context.Context) { } r.mu.Unlock() + // When nobody is watching, only report final states (completed/failed). + // This saves ~99% of API requests when the user isn't on the downloads page. + watching := r.isWatching == nil || r.isWatching() + + var reportable []*Task for _, task := range tasks { status := task.GetStatus() - if status != StatusDownloading && status != StatusVerifying && - status != StatusOrganizing && status != StatusSeeding && - status != StatusCompleted && status != StatusFailed { - continue + isFinal := status == StatusCompleted || status == StatusFailed + isActive := status == StatusDownloading || status == StatusVerifying || + status == StatusOrganizing || status == StatusSeeding + if isFinal || (watching && isActive) { + reportable = append(reportable, task) } + } + if len(reportable) == 0 { + return + } + + // Use batch when transport supports it + if batcher, ok := r.reporter.(BatchStatusReporter); ok { + r.flushBatch(ctx, batcher, reportable) + return + } + + // Fallback: individual requests + for _, task := range reportable { update := task.ToStatusUpdate() resp, err := r.reporter.ReportStatus(ctx, update) if err != nil { log.Printf("[%s] progress report failed: %v", task.ID[:8], err) continue } + r.handleResponse(task, resp) + } +} - // Handle server-side signals - if resp.Cancelled { - log.Printf("[%s] cancelled by user (via web)", task.ID[:8]) - r.Untrack(task.ID) - if resp.DeleteFiles && r.onDeleteFiles != nil { - r.onDeleteFiles(task.ID) - } else if r.onCancel != nil { - r.onCancel(task.ID) - } - } else if resp.Paused { - log.Printf("[%s] paused by user (via web)", task.ID[:8]) - r.Untrack(task.ID) - if r.onPause != nil { - r.onPause(task.ID) - } +func (r *ProgressReporter) flushBatch(ctx context.Context, batcher BatchStatusReporter, tasks []*Task) { + updates := make([]agent.StatusUpdate, len(tasks)) + for i, task := range tasks { + updates[i] = task.ToStatusUpdate() + } + + resp, err := batcher.BatchReportStatus(ctx, updates) + if err != nil { + log.Printf("batch progress report failed: %v", err) + return + } + + // Match results back to tasks by index (server returns in same order) + if len(resp.Results) != len(tasks) { + log.Printf("batch response mismatch: sent %d updates, got %d results", len(tasks), len(resp.Results)) + } + for i, result := range resp.Results { + if i < len(tasks) { + r.handleResponse(tasks[i], &result) } + } +} - if resp.StreamRequested && task.GetStreamURL() == "" { - log.Printf("[%s] stream requested by user (via web)", task.ID[:8]) - if r.onStreamRequested != nil { - r.onStreamRequested(task.ID) - } +func (r *ProgressReporter) handleResponse(task *Task, resp *agent.StatusResponse) { + if resp.Cancelled { + log.Printf("[%s] cancelled by user (via web)", task.ID[:8]) + r.Untrack(task.ID) + if resp.DeleteFiles && r.onDeleteFiles != nil { + r.onDeleteFiles(task.ID) + } else if r.onCancel != nil { + r.onCancel(task.ID) + } + } else if resp.Paused { + log.Printf("[%s] paused by user (via web)", task.ID[:8]) + r.Untrack(task.ID) + if r.onPause != nil { + r.onPause(task.ID) + } + } + + if resp.StreamRequested && task.GetStreamURL() == "" { + log.Printf("[%s] stream requested by user (via web)", task.ID[:8]) + if r.onStreamRequested != nil { + r.onStreamRequested(task.ID) } } } diff --git a/internal/engine/torrent.go b/internal/engine/torrent.go index 3318255..ecb2b68 100644 --- a/internal/engine/torrent.go +++ b/internal/engine/torrent.go @@ -17,6 +17,7 @@ import ( "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/storage" "github.com/torrentclaw/unarr/internal/config" + "golang.org/x/term" "golang.org/x/time/rate" ) @@ -96,7 +97,7 @@ func NewTorrentDownloader(cfg TorrentConfig) (*TorrentDownloader, error) { tcfg.DataDir = cfg.DataDir tcfg.Seed = cfg.SeedEnabled tcfg.NoUpload = !cfg.SeedEnabled - tcfg.Logger = alog.Default.FilterLevel(alog.Warning) + tcfg.Logger = alog.Default.FilterLevel(alog.Critical) // --- Performance optimizations --- @@ -342,13 +343,20 @@ func (d *TorrentDownloader) pollDownload(ctx context.Context, t *torrent.Torrent } lastBytesAt := time.Now() lastBytes := int64(0) + isTTY := term.IsTerminal(int(os.Stderr.Fd())) for { select { case <-ctx.Done(): + if isTTY { + fmt.Fprintln(os.Stderr) + } return nil, fmt.Errorf("cancelled") case <-deadline: + if isTTY { + fmt.Fprintln(os.Stderr) + } return nil, fmt.Errorf("max timeout %s exceeded", d.cfg.MaxTimeout) case <-ticker.C: @@ -381,12 +389,17 @@ func (d *TorrentDownloader) pollDownload(ctx context.Context, t *torrent.Torrent // Peer stats stats := t.Stats() - // Terminal progress (log.Printf for daemon-friendly output, no \r) + // Terminal progress pct := int(float64(downloaded) / float64(totalBytes) * 100) - log.Printf("[%s] %d%% — %s/%s @ %s/s peers:%d seeds:%d", + line := fmt.Sprintf("[%s] %d%% — %s/%s @ %s/s peers:%d seeds:%d", task.ID[:8], pct, formatBytes(downloaded), formatBytes(totalBytes), formatBytes(speed), stats.ActivePeers, stats.ConnectedSeeders) + if isTTY { + fmt.Fprintf(os.Stderr, "\r\033[K%s", line) + } else { + log.Print(line) + } // Report progress p := Progress{ @@ -407,6 +420,9 @@ func (d *TorrentDownloader) pollDownload(ctx context.Context, t *torrent.Torrent // Check completion if downloaded >= totalBytes { + if isTTY { + fmt.Fprintln(os.Stderr) // newline after \r progress + } log.Printf("[%s] download complete: %s", task.ID[:8], fileName) return &Result{}, nil }