From a73e1a775677f8a86dd09e4efa2cc5b53b36f3c8 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Fri, 15 May 2026 16:26:43 +0200 Subject: [PATCH] feat(agent): add mirror failover, agent client refactor, status 401 detection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Mirror pool with health tracking and exponential backoff for failed hosts - Agent client routes requests through mirror pool with retry semantics - New `unarr mirrors` command to inspect mirror state and force failover - `unarr status` now detects 401 from /agent/register and suggests `unarr login` instead of the generic "Could not fetch account info" message - Config supports multiple ScanPaths for upcoming multi-path library scan - Draft plan for bidirectional library sync (CLI ↔ Web) under Docs/plans/ --- Docs/plans/library-sync.md | 170 ++++++++++++++++++++++++ internal/agent/client.go | 221 ++++++++++++++++++++++---------- internal/agent/client_test.go | 4 +- internal/agent/mirror_client.go | 213 ++++++++++++++++++++++++++++++ internal/agent/mirror_pool.go | 172 +++++++++++++++++++++++++ internal/agent/signal_client.go | 2 +- internal/cmd/agent_client.go | 23 ++++ internal/cmd/daemon.go | 7 +- internal/cmd/mirrors.go | 204 +++++++++++++++++++++++++++++ internal/cmd/root.go | 3 + internal/cmd/status.go | 15 ++- internal/config/config.go | 14 ++ 12 files changed, 972 insertions(+), 76 deletions(-) create mode 100644 Docs/plans/library-sync.md create mode 100644 internal/agent/mirror_client.go create mode 100644 internal/agent/mirror_pool.go create mode 100644 internal/cmd/agent_client.go create mode 100644 internal/cmd/mirrors.go diff --git a/Docs/plans/library-sync.md b/Docs/plans/library-sync.md new file mode 100644 index 0000000..509e87a --- /dev/null +++ b/Docs/plans/library-sync.md @@ -0,0 +1,170 @@ +# Plan: Sincronización bidireccional de biblioteca (CLI ↔ Web) + +## Context +La biblioteca web solo muestra descargas completadas (download_task + debrid). El `unarr scan` escanea ficheros con ffprobe y los sube al servidor, pero solo soporta un path, no detecta borrados del disco, y no permite borrar ficheros desde la web. El usuario quiere una biblioteca unificada que refleje el estado real de su colección y se sincronice en ambas direcciones. + +## Protocolo de sincronización + +### Forward Sync (Disco → Web) +1. CLI escanea todos los `ScanPaths` configurados +2. Para cada path: descubre ficheros, compara con cache (skip ffprobe si no cambió), sube a `/library-sync` +3. En `isLastBatch=true`: el servidor elimina items con ese `scanPath` que no estén en el batch (ficheros borrados del disco desaparecen de la web) + +### Reverse Sync (Web → Disco) +1. CLI llama a `GET /agent/library-deletions` — items que el usuario soft-deleted desde la web +2. Si `AutoDelete=true` o `--yes`: borra ficheros del disco +3. Si no: muestra lista y pide confirmación interactiva +4. Llama a `POST /agent/library-deletions/confirm` con los IDs confirmados → hard-delete en DB + +### Resolución de conflictos +- Fichero en disco pero no en web → forward sync lo añade +- Fichero en web pero no en disco → forward sync lo elimina (isLastBatch) +- Soft-deleted en web, aún en disco → reverse sync lo borra del disco y confirma +- Soft-deleted en web, ya borrado del disco → reverse sync confirma directamente +- Race condition (user borra en web mientras CLI escanea) → forward sync skippea rows con `deleted_at IS NOT NULL` + +--- + +## Fase 1: Multi-path + Forward Sync mejorado + +### 1.1 CLI — Config multi-path +**Archivo:** `torrentclaw-cli/internal/config/config.go` +- Añadir `ScanPaths []string` a `LibraryConfig` +- Migrar `ScanPath` → `ScanPaths[0]` en `Load()` si `ScanPaths` está vacío +- Añadir `AutoDelete bool` (default false) + +### 1.2 CLI — Cache v2 +**Archivo:** `torrentclaw-cli/internal/library/types.go` +- Cambiar `LibraryCache` a version 2: `Paths map[string][]LibraryItem` +- Migración v1→v2: `Path`+items → `Paths[Path]` + +**Archivo:** `torrentclaw-cli/internal/library/cache.go` +- `LoadCache` detecta versión y migra +- `SaveCache` siempre guarda v2 + +### 1.3 CLI — Scan multi-path +**Archivo:** `torrentclaw-cli/internal/cmd/scan.go` +- `unarr scan` sin args → escanea todos los `ScanPaths` +- `unarr scan /path/a /path/b` → escanea paths específicos y los recuerda en config +- Loop: para cada path, scan + sync con su `scanPath` + +### 1.4 CLI — Nuevo comando `unarr sync` +**Archivo nuevo:** `torrentclaw-cli/internal/cmd/sync.go` +- Forward sync: scan ligero (sin ffprobe para ficheros sin cambios) + upload +- Sin reverse sync todavía (Fase 3) +- Flags: `--dry-run`, `--paths` + +### 1.5 Web — Columna `scan_path` en `library_item` +**Archivo:** `torrentclaw-web/src/lib/db/schema.ts` +- Añadir `scanPath: varchar(2048)` a tabla `libraryItem` +- Generar migración con `pnpm db:generate` + +**Archivo:** `torrentclaw-web/src/lib/services/library-upgrade.ts` +- `syncLibraryItems()`: persistir `scanPath` en cada row al hacer upsert + +### 1.6 CLI — Daemon multi-path +**Archivo:** `torrentclaw-cli/internal/cmd/daemon.go` +- `runAutoScan()` itera sobre todos los `ScanPaths` + +--- + +## Fase 2: Reverse Sync (Web → Disco) + +### 2.1 Web — Soft-delete +**Archivo:** `torrentclaw-web/src/lib/db/schema.ts` +- Añadir `deletedAt: timestamp` a tabla `libraryItem` +- Generar migración + +### 2.2 Web — Endpoints de borrado +**Archivo nuevo:** `torrentclaw-web/src/app/api/internal/library/items/route.ts` +- `DELETE` — session auth, recibe `{itemIds: number[]}`, hace soft-delete (`deletedAt = NOW()`) + +**Archivo nuevo:** `torrentclaw-web/src/app/api/internal/agent/library-deletions/route.ts` +- `GET` — agent auth, devuelve items con `deletedAt IS NOT NULL` para ese usuario +- `POST` — agent auth, recibe `{confirmedIds: number[]}`, hard-delete los rows + +### 2.3 Web — Heartbeat con pendingDeletions +**Archivo:** endpoint de heartbeat del agente +- Añadir `pendingDeletions: number` al response (count de items con `deletedAt IS NOT NULL`) + +### 2.4 Web — Forward sync respeta soft-deletes +**Archivo:** `torrentclaw-web/src/lib/services/library-upgrade.ts` +- `syncLibraryItems()` en `isLastBatch`: la query de DELETE excluye rows con `deletedAt IS NOT NULL` + +### 2.5 CLI — Agent client nuevos métodos +**Archivo:** `torrentclaw-cli/internal/agent/client.go` +- `GetLibraryDeletions(ctx) → []DeletionItem` +- `ConfirmLibraryDeletions(ctx, ids []int) → error` + +**Archivo:** `torrentclaw-cli/internal/agent/types.go` +- `DeletionItem {ID int, FilePath string, DeletedAt string}` + +### 2.6 CLI — Sync reverse +**Archivo:** `torrentclaw-cli/internal/cmd/sync.go` +- Después del forward sync: llama a `GetLibraryDeletions()` +- Valida que cada fichero está dentro de un `ScanPaths` conocido (seguridad) +- Si `AutoDelete` o `--yes`: borra y confirma +- Si no: muestra lista interactiva, pide confirmación +- Flag `--no-delete` para skip reverse sync +- Si `BackupDir` configurado: mover a backup en vez de borrar + +### 2.7 CLI — Daemon auto-delete +**Archivo:** `torrentclaw-cli/internal/cmd/daemon.go` +- Al final de `runAutoSync()`: si `AutoDelete=true`, procesa deletions automáticamente +- Si no: log warning "N files pending deletion, run `unarr sync`" + +--- + +## Fase 3: Web UI (brief) + +- Botón "Eliminar" en items de biblioteca → llama `DELETE /library/items` +- Badge "Pendiente de borrar" en items soft-deleted +- Posibilidad de cancelar el borrado (clear `deletedAt`) +- Vista unificada: scanned items + downloaded items en la misma vista + +--- + +## Archivos clave + +### CLI (Go) +| Archivo | Cambio | +|---------|--------| +| `internal/config/config.go` | ScanPaths, AutoDelete, migración | +| `internal/library/types.go` | Cache v2 con Paths map | +| `internal/library/cache.go` | Load/Save v2, migración v1 | +| `internal/library/sync.go` | BuildSyncItems (sin cambios) | +| `internal/cmd/scan.go` | Multi-path loop | +| `internal/cmd/sync.go` | **Nuevo** — comando sync bidireccional | +| `internal/cmd/daemon.go` | runAutoSync multi-path + reverse | +| `internal/agent/client.go` | GetLibraryDeletions, ConfirmLibraryDeletions | +| `internal/agent/types.go` | DeletionItem type | + +### Web (TypeScript) +| Archivo | Cambio | +|---------|--------| +| `src/lib/db/schema.ts` | scanPath + deletedAt en library_item | +| `src/lib/services/library-upgrade.ts` | persistir scanPath, respetar soft-deletes | +| `src/app/api/internal/agent/library-deletions/route.ts` | **Nuevo** — GET + POST | +| `src/app/api/internal/library/items/route.ts` | **Nuevo** — DELETE soft-delete | +| Endpoint heartbeat del agente | pendingDeletions en response | + +--- + +## Verificación + +### Fase 1 +1. `go build ./cmd/unarr/ && go test ./...` +2. Configurar 2 scan paths en config.toml, ejecutar `unarr scan` → ambos se escanean +3. Borrar un fichero del disco, ejecutar `unarr scan` → desaparece de la web +4. `pnpm build` en torrentclaw-web para verificar tipos + +### Fase 2 +1. Desde la web: borrar un item de la biblioteca +2. Ejecutar `unarr sync` → muestra el fichero pendiente de borrar, pedir confirmación +3. Confirmar → fichero se borra del disco y desaparece de la web +4. `unarr sync --dry-run` → muestra lo que haría sin hacer nada +5. Con `auto_delete = true` en config: el daemon borra automáticamente + +### Fase 3 +1. Verificar visualmente en Chrome DevTools la UI de borrado +2. Verificar que el badge "pendiente" aparece y desaparece correctamente diff --git a/internal/agent/client.go b/internal/agent/client.go index 5ff987d..9aa3c2a 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -12,8 +12,13 @@ import ( ) // Client communicates with the /api/internal/agent/* endpoints. +// +// The client owns a MirrorPool: when a request fails with a transient +// network error (DNS, refused, timeout, 5xx) it rotates to the next mirror +// and retries up to `len(mirrors)-1` times so a single agent run survives +// a primary-domain takedown without user intervention. type Client struct { - baseURL string + pool *MirrorPool apiKey string httpClient *http.Client // wakeClient has no built-in timeout — used exclusively for the long-poll @@ -25,11 +30,20 @@ type Client struct { userAgent string } -// NewClient creates an agent API client. +// NewClient creates an agent API client targeting a single base URL. +// Equivalent to NewClientWithMirrors(baseURL, nil, ...) — kept for callers +// that don't yet care about mirror failover. func NewClient(baseURL, apiKey, userAgent string) *Client { + return NewClientWithMirrors(baseURL, nil, apiKey, userAgent) +} + +// NewClientWithMirrors creates an agent API client that can fail over from +// the primary base URL to any of the extras when the primary is unreachable. +// The order of `extras` matters: they're tried left-to-right after a failure. +func NewClientWithMirrors(baseURL string, extras []string, apiKey, userAgent string) *Client { return &Client{ - baseURL: baseURL, - apiKey: apiKey, + pool: NewMirrorPool(baseURL, extras), + apiKey: apiKey, httpClient: &http.Client{ Timeout: 30 * time.Second, }, @@ -44,6 +58,18 @@ func NewClient(baseURL, apiKey, userAgent string) *Client { } } +// MirrorPool exposes the underlying pool so callers (e.g. the `unarr mirrors` +// subcommand) can swap the list at runtime after fetching /api/v1/mirrors. +func (c *Client) MirrorPool() *MirrorPool { + return c.pool +} + +// baseURL returns the currently-active mirror. Routed through this helper so +// future changes (e.g. per-endpoint mirror affinity) only need one edit. +func (c *Client) baseURL() string { + return c.pool.Current() +} + // Register registers the CLI agent with the server and returns user info + features. func (c *Client) Register(ctx context.Context, req RegisterRequest) (*RegisterResponse, error) { var resp RegisterResponse @@ -109,30 +135,35 @@ func (c *Client) SearchNzbs(ctx context.Context, params NzbSearchParams) (*NzbSe // DownloadNzb downloads the NZB file for the given nzbId. // Returns the raw NZB XML bytes. func (c *Client) DownloadNzb(ctx context.Context, nzbID string) ([]byte, error) { - url := fmt.Sprintf("/api/internal/agent/nzb-download?nzbId=%s", nzbID) + path := fmt.Sprintf("/api/internal/agent/nzb-download?nzbId=%s", nzbID) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+url, nil) - if err != nil { - return nil, fmt.Errorf("create request: %w", err) - } - c.setHeaders(req) + var out []byte + err := c.withMirrorFailover(func(base string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+path, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + c.setHeaders(req) - resp, err := c.httpClient.Do(req) - if err != nil { - return nil, fmt.Errorf("request failed: %w", err) - } - defer resp.Body.Close() + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() - if resp.StatusCode >= 400 { - body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16)) - return nil, fmt.Errorf("nzb download error %d: %s", resp.StatusCode, string(body)) - } + if resp.StatusCode >= 400 { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16)) + return &HTTPError{StatusCode: resp.StatusCode, Message: string(body)} + } - data, err := io.ReadAll(io.LimitReader(resp.Body, 100<<20)) // 100MB limit - if err != nil { - return nil, fmt.Errorf("read nzb: %w", err) - } - return data, nil + data, err := io.ReadAll(io.LimitReader(resp.Body, 100<<20)) // 100MB limit + if err != nil { + return fmt.Errorf("read nzb: %w", err) + } + out = data + return nil + }) + return out, err } // GetUsenetCredentials fetches NNTP connection credentials. @@ -193,31 +224,41 @@ func (c *Client) ReportWatchProgress(ctx context.Context, update WatchProgressUp // WaitForWake blocks until the server sends a wake signal, the long-poll // timeout elapses, or ctx is cancelled. Returns true when a wake signal // was received (caller should sync immediately), false on timeout/cancel. +// +// Wake is a long-poll on a single mirror — failover here would just drop +// the connection and try again immediately, which the server already +// handles with a fresh wait loop. We only retry against the next mirror +// when the current one is definitively unreachable (DNS / refused / TLS). func (c *Client) WaitForWake(ctx context.Context) (bool, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/internal/agent/wake", nil) - if err != nil { - return false, fmt.Errorf("create wake request: %w", err) - } - c.setHeaders(req) + var wake bool + err := c.withMirrorFailover(func(base string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/api/internal/agent/wake", nil) + if err != nil { + return fmt.Errorf("create wake request: %w", err) + } + c.setHeaders(req) - resp, err := c.wakeClient.Do(req) - if err != nil { - return false, fmt.Errorf("wake request failed: %w", err) - } - defer resp.Body.Close() + resp, err := c.wakeClient.Do(req) + if err != nil { + return fmt.Errorf("wake request failed: %w", err) + } + defer resp.Body.Close() - if resp.StatusCode >= 400 { - body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10)) - return false, &HTTPError{StatusCode: resp.StatusCode, Message: string(body)} - } + if resp.StatusCode >= 400 { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10)) + return &HTTPError{StatusCode: resp.StatusCode, Message: string(body)} + } - var result struct { - Wake bool `json:"wake"` - } - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { - return false, fmt.Errorf("decode wake response: %w", err) - } - return result.Wake, nil + var result struct { + Wake bool `json:"wake"` + } + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return fmt.Errorf("decode wake response: %w", err) + } + wake = result.Wake + return nil + }) + return wake, err } // doPost sends a JSON POST request using the default httpClient and decodes the response. @@ -227,45 +268,89 @@ func (c *Client) doPost(ctx context.Context, path string, body any, dst any) err // doPostWith sends a JSON POST request using the provided HTTP client and decodes the response. // Use this to override the default timeout for specific operations (e.g. librarySyncClient). +// Wrapped in withMirrorFailover so a transient connection failure on the +// active mirror retries against the next one. func (c *Client) doPostWith(ctx context.Context, hc *http.Client, path string, body any, dst any) error { jsonBody, err := json.Marshal(body) if err != nil { return fmt.Errorf("marshal body: %w", err) } - req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(jsonBody)) - if err != nil { - return fmt.Errorf("create request: %w", err) - } + return c.withMirrorFailover(func(base string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, base+path, bytes.NewReader(jsonBody)) + if err != nil { + return fmt.Errorf("create request: %w", err) + } - c.setHeaders(req) - req.Header.Set("Content-Type", "application/json") + c.setHeaders(req) + req.Header.Set("Content-Type", "application/json") - resp, err := hc.Do(req) - if err != nil { - return fmt.Errorf("request failed: %w", err) - } - defer resp.Body.Close() + resp, err := hc.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() - return c.handleResponse(resp, dst) + return c.handleResponse(resp, dst) + }) } // doGet sends a GET request and decodes the response. func (c *Client) doGet(ctx context.Context, path string, dst any) error { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil) - if err != nil { - return fmt.Errorf("create request: %w", err) + return c.withMirrorFailover(func(base string) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+path, nil) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + c.setHeaders(req) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + + return c.handleResponse(resp, dst) + }) +} + +// withMirrorFailover runs `fn` against the current mirror; on a transient +// error it rotates the pool and retries up to `len(mirrors)-1` times. +// +// The active mirror is updated on rotation so subsequent unrelated calls +// stick to the working host until that host fails too — this avoids +// hammering a known-bad primary on every request, while still trying it +// again next time the agent reloads (no permanent demotion). +func (c *Client) withMirrorFailover(fn func(base string) error) error { + attempts := c.pool.Len() + if attempts < 1 { + attempts = 1 } - c.setHeaders(req) - - resp, err := c.httpClient.Do(req) - if err != nil { - return fmt.Errorf("request failed: %w", err) + var lastErr error + for i := 0; i < attempts; i++ { + base := c.baseURL() + err := fn(base) + if err == nil { + return nil + } + lastErr = err + if !IsTransient(err) { + return err + } + // Last attempt: don't bother rotating, just surface the error. + if i == attempts-1 { + break + } + next, rotated := c.pool.Rotate() + if !rotated { + break + } + _ = next // mirror rotation logging is left to higher layers (cmd/) so the + // pool stays log-free for tests. } - defer resp.Body.Close() - - return c.handleResponse(resp, dst) + return lastErr } func (c *Client) setHeaders(req *http.Request) { diff --git a/internal/agent/client_test.go b/internal/agent/client_test.go index 8b279a5..d905de4 100644 --- a/internal/agent/client_test.go +++ b/internal/agent/client_test.go @@ -498,8 +498,8 @@ func TestClient_SlowServer_Timeout(t *testing.T) { // Crear cliente con timeout muy corto c := &Client{ - baseURL: srv.URL, - apiKey: "test-key", + pool: NewMirrorPool(srv.URL, nil), + apiKey: "test-key", httpClient: &http.Client{ Timeout: 50 * time.Millisecond, }, diff --git a/internal/agent/mirror_client.go b/internal/agent/mirror_client.go new file mode 100644 index 0000000..1aa6fc2 --- /dev/null +++ b/internal/agent/mirror_client.go @@ -0,0 +1,213 @@ +package agent + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// MirrorEntry mirrors the shape of /api/v1/mirrors items on the server. +type MirrorEntry struct { + URL string `json:"url"` + Label string `json:"label"` + Kind string `json:"kind"` // "clearnet" | "tor" + Primary bool `json:"primary"` +} + +// MirrorChannel is an out-of-band status channel (Telegram, status page, etc.) +type MirrorChannel struct { + URL string `json:"url"` + Label string `json:"label"` +} + +// MirrorsResponse is the JSON document served by /api/v1/mirrors and +// /api/mirrors. +type MirrorsResponse struct { + Revision int `json:"revision"` + Mirrors []MirrorEntry `json:"mirrors"` + Tor *MirrorEntry `json:"tor"` + Channels []MirrorChannel `json:"channels"` + UpdatedAt string `json:"updatedAt"` +} + +// DefaultStaticFallbackURLs lists off-domain JSON copies of the mirror list. +// Hard-coded here (not loaded from config) because the whole point is to +// have something to consult when config-driven URLs all fail. +// +// Keep in sync with src/lib/mirrors-config.ts → STATIC_FALLBACKS on the web. +var DefaultStaticFallbackURLs = []string{ + "https://torrentclaw.github.io/mirrors/mirrors.json", +} + +// FetchMirrorsWithFallback pulls the mirror list using FetchMirrors against +// `candidates` first; if every candidate fails, it falls back to the static +// JSON copies on off-domain hosts (GitHub Pages, Cloudflare Pages, …). +// +// This is the function `unarr mirrors update` should call when it wants the +// strongest "give me a working mirror list no matter what" guarantee. +func FetchMirrorsWithFallback(ctx context.Context, candidates []string, userAgent string) (*MirrorsResponse, error) { + resp, err := FetchMirrors(ctx, candidates, userAgent) + if err == nil { + return resp, nil + } + if len(DefaultStaticFallbackURLs) == 0 { + return nil, err + } + // Try the static JSON files directly. They follow the same wire shape so + // we can reuse the same parser — but the URLs already include the JSON + // suffix so we hit them with `fetchMirrorsJSON` instead of FetchMirrors + // (which appends /api/v1/mirrors). + staticResp, staticErr := fetchMirrorsJSON(ctx, DefaultStaticFallbackURLs, userAgent) + if staticErr == nil { + return staticResp, nil + } + return nil, fmt.Errorf("primary failed (%v) and static fallback failed (%v)", err, staticErr) +} + +// fetchMirrorsJSON pulls a MirrorsResponse from already-fully-qualified URLs +// (e.g. https://torrentclaw.github.io/mirrors/mirrors.json). Each candidate +// is tried in order; the first success wins. +func fetchMirrorsJSON(ctx context.Context, urls []string, userAgent string) (*MirrorsResponse, error) { + if len(urls) == 0 { + return nil, fmt.Errorf("no static fallback URLs configured") + } + hc := &http.Client{Timeout: 15 * time.Second} + var lastErr error + for _, url := range urls { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + lastErr = err + continue + } + if userAgent != "" { + req.Header.Set("User-Agent", userAgent) + } + req.Header.Set("Accept", "application/json") + resp, err := hc.Do(req) + if err != nil { + lastErr = err + continue + } + body, readErr := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + resp.Body.Close() + if readErr != nil { + lastErr = readErr + continue + } + if resp.StatusCode >= 400 { + lastErr = fmt.Errorf("%s returned HTTP %d", url, resp.StatusCode) + continue + } + var out MirrorsResponse + if err := json.Unmarshal(body, &out); err != nil { + lastErr = fmt.Errorf("%s: invalid JSON: %w", url, err) + continue + } + if len(out.Mirrors) == 0 { + lastErr = fmt.Errorf("%s returned empty mirror list", url) + continue + } + return &out, nil + } + if lastErr == nil { + lastErr = fmt.Errorf("no reachable static fallback") + } + return nil, lastErr +} + +// FetchMirrors pulls the latest mirror list from the server. +// +// The endpoint is intentionally public and unauthenticated: the whole point +// of mirror discovery is that it must work even when the user's API key +// is invalid, expired, or the auth path is unreachable. The function tries +// each candidate base URL in order so a takedown of the primary doesn't +// also kill mirror discovery. +func FetchMirrors(ctx context.Context, candidates []string, userAgent string) (*MirrorsResponse, error) { + if len(candidates) == 0 { + return nil, fmt.Errorf("no mirror discovery URLs configured") + } + + hc := &http.Client{Timeout: 15 * time.Second} + + var lastErr error + for _, base := range candidates { + if base == "" { + continue + } + url := base + "/api/v1/mirrors" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + lastErr = err + continue + } + if userAgent != "" { + req.Header.Set("User-Agent", userAgent) + } + req.Header.Set("Accept", "application/json") + + resp, err := hc.Do(req) + if err != nil { + lastErr = err + continue + } + body, readErr := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + resp.Body.Close() + if readErr != nil { + lastErr = readErr + continue + } + if resp.StatusCode >= 400 { + lastErr = fmt.Errorf("%s returned HTTP %d", base, resp.StatusCode) + continue + } + var out MirrorsResponse + if err := json.Unmarshal(body, &out); err != nil { + lastErr = fmt.Errorf("%s: invalid JSON: %w", base, err) + continue + } + if len(out.Mirrors) == 0 { + lastErr = fmt.Errorf("%s returned empty mirror list", base) + continue + } + return &out, nil + } + + if lastErr == nil { + lastErr = fmt.Errorf("no reachable mirror discovery endpoint") + } + return nil, fmt.Errorf("fetch mirrors: %w", lastErr) +} + +// ToConfig splits a MirrorsResponse into (primary, extras) suitable for +// rebuilding a MirrorPool or persisting back into config.toml. +// +// The "primary" returned here is whichever entry has primary=true. If none +// are flagged, the first one wins. +func (m *MirrorsResponse) ToConfig() (primary string, extras []string) { + if m == nil { + return "", nil + } + var picked *MirrorEntry + for i := range m.Mirrors { + if m.Mirrors[i].Primary { + picked = &m.Mirrors[i] + break + } + } + if picked == nil && len(m.Mirrors) > 0 { + picked = &m.Mirrors[0] + } + if picked != nil { + primary = picked.URL + } + for _, e := range m.Mirrors { + if e.URL == primary { + continue + } + extras = append(extras, e.URL) + } + return primary, extras +} diff --git a/internal/agent/mirror_pool.go b/internal/agent/mirror_pool.go new file mode 100644 index 0000000..e8f737b --- /dev/null +++ b/internal/agent/mirror_pool.go @@ -0,0 +1,172 @@ +package agent + +import ( + "context" + "errors" + "net" + "net/http" + "net/url" + "strings" + "sync" +) + +// MirrorPool holds the ordered list of API base URLs the client is willing to +// fall back to when the current mirror is unreachable. The first entry is +// always the "preferred" mirror configured by the user. Subsequent entries +// are alternate domains we can rotate to without changing any user-visible +// configuration — they exist so a long-lived agent survives a takedown of +// the primary host without needing a new release. +// +// The pool is concurrency-safe; rotation is a fast O(1) index bump under a +// mutex. The previously-active mirror is NEVER removed — it might just be +// temporarily unreachable from one network path. +type MirrorPool struct { + mu sync.RWMutex + mirrors []string + current int +} + +// NewMirrorPool builds a pool from the provided base URLs. The primary URL +// is always first; "extras" are appended in order and de-duplicated. Empty +// strings are skipped. Trailing slashes are normalised so callers can concat +// `pool.Current() + "/api/..."` reliably. +func NewMirrorPool(primary string, extras []string) *MirrorPool { + seen := make(map[string]struct{}) + var out []string + + add := func(raw string) { + raw = strings.TrimRight(strings.TrimSpace(raw), "/") + if raw == "" { + return + } + if _, dup := seen[raw]; dup { + return + } + seen[raw] = struct{}{} + out = append(out, raw) + } + + add(primary) + for _, e := range extras { + add(e) + } + + if len(out) == 0 { + // Defensive: always return a pool with at least one entry so callers + // can call Current() without nil checks. The empty string would + // produce obvious errors immediately, which is preferable to a panic + // somewhere deep in net/http. + out = []string{""} + } + + return &MirrorPool{mirrors: out} +} + +// Current returns the active base URL. +func (p *MirrorPool) Current() string { + p.mu.RLock() + defer p.mu.RUnlock() + return p.mirrors[p.current] +} + +// Mirrors returns a copy of the configured base URLs in priority order. +func (p *MirrorPool) Mirrors() []string { + p.mu.RLock() + defer p.mu.RUnlock() + out := make([]string, len(p.mirrors)) + copy(out, p.mirrors) + return out +} + +// Len reports how many mirrors are configured. +func (p *MirrorPool) Len() int { + p.mu.RLock() + defer p.mu.RUnlock() + return len(p.mirrors) +} + +// Rotate moves the cursor to the next mirror in the pool, wrapping around. +// Returns the new current mirror and whether a rotation actually happened +// (a single-mirror pool returns false). +func (p *MirrorPool) Rotate() (string, bool) { + p.mu.Lock() + defer p.mu.Unlock() + if len(p.mirrors) <= 1 { + return p.mirrors[p.current], false + } + p.current = (p.current + 1) % len(p.mirrors) + return p.mirrors[p.current], true +} + +// Replace swaps the entire mirror set, e.g. after `unarr mirrors update` +// downloaded a fresh list from /api/v1/mirrors. Resets the cursor to 0 so +// the newly-discovered primary is tried first. +func (p *MirrorPool) Replace(primary string, extras []string) { + fresh := NewMirrorPool(primary, extras) + p.mu.Lock() + defer p.mu.Unlock() + p.mirrors = fresh.mirrors + p.current = 0 +} + +// IsTransient reports whether an error is the kind we should retry against +// another mirror. The intent is conservative: rotate on connection-level +// failures (DNS, refused, TLS, timeouts, 5xx) but NOT on auth or validation +// errors that would just fail again somewhere else. +func IsTransient(err error) bool { + if err == nil { + return false + } + + var httpErr *HTTPError + if errors.As(err, &httpErr) { + switch httpErr.StatusCode { + case http.StatusBadGateway, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout, + http.StatusRequestTimeout: + return true + } + // 4xx (auth, rate limit, validation) won't get healthier on another mirror. + return false + } + + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + + var dnsErr *net.DNSError + if errors.As(err, &dnsErr) { + return true + } + + var urlErr *url.Error + if errors.As(err, &urlErr) { + // `connection refused`, `EOF`, `tls: ...` end up as wrapped url.Errors. + msg := urlErr.Error() + if strings.Contains(msg, "connection refused") || + strings.Contains(msg, "no such host") || + strings.Contains(msg, "EOF") || + strings.Contains(msg, "tls:") || + strings.Contains(msg, "i/o timeout") || + strings.Contains(msg, "network is unreachable") { + return true + } + } + + // Bare strings as last resort — net.OpError messages are unstable across Go versions. + msg := err.Error() + if strings.Contains(msg, "connection refused") || + strings.Contains(msg, "no such host") || + strings.Contains(msg, "i/o timeout") || + strings.Contains(msg, "network is unreachable") { + return true + } + + return false +} diff --git a/internal/agent/signal_client.go b/internal/agent/signal_client.go index 27fe2e1..e41a9ea 100644 --- a/internal/agent/signal_client.go +++ b/internal/agent/signal_client.go @@ -103,7 +103,7 @@ func (s *SignalEventStream) Close() error { func (c *Client) OpenSignalStream(ctx context.Context, sessionID string) (*SignalEventStream, error) { streamCtx, cancel := context.WithCancel(ctx) - url := fmt.Sprintf("%s/api/internal/stream/signal/%s/events", c.baseURL, sessionID) + url := fmt.Sprintf("%s/api/internal/stream/signal/%s/events", c.baseURL(), sessionID) req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, url, nil) if err != nil { cancel() diff --git a/internal/cmd/agent_client.go b/internal/cmd/agent_client.go new file mode 100644 index 0000000..a903096 --- /dev/null +++ b/internal/cmd/agent_client.go @@ -0,0 +1,23 @@ +package cmd + +import ( + "github.com/torrentclaw/unarr/internal/agent" + "github.com/torrentclaw/unarr/internal/config" +) + +// newAgentClientFromConfig builds an agent.Client wired with the mirror pool +// from the user's TOML config. Use this instead of agent.NewClient in any +// long-running command (daemon, status loop, etc.) so a `.com` outage rolls +// over to `.to` / .onion without restarting the agent. +// +// The function lives in cmd/ rather than agent/ because it has to know +// about the config struct, and cmd/ is the only place that owns the +// "wire defaults + user overrides" rule. +func newAgentClientFromConfig(cfg config.Config, userAgent string) *agent.Client { + return agent.NewClientWithMirrors( + cfg.Auth.APIURL, + cfg.Auth.Mirrors, + cfg.Auth.APIKey, + userAgent, + ) +} diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 717dfbb..84a1245 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -161,9 +161,10 @@ func runDaemonStart() error { MaxTranscodeHeight: maxTranscodeHeight, } - // Create HTTP client — single communication channel - agentClient := agent.NewClient(cfg.Auth.APIURL, cfg.Auth.APIKey, userAgent) - log.Printf("Transport: HTTP sync → %s", cfg.Auth.APIURL) + // Create HTTP client with mirror failover so a `.com` block-out rolls + // over to `.to` / .onion without restarting the daemon. + agentClient := newAgentClientFromConfig(cfg, userAgent) + log.Printf("Transport: HTTP sync → %s (mirrors: %d)", cfg.Auth.APIURL, len(cfg.Auth.Mirrors)) // Create daemon d := agent.NewDaemon(daemonCfg, agentClient) diff --git a/internal/cmd/mirrors.go b/internal/cmd/mirrors.go new file mode 100644 index 0000000..76870a7 --- /dev/null +++ b/internal/cmd/mirrors.go @@ -0,0 +1,204 @@ +package cmd + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "time" + + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/torrentclaw/unarr/internal/agent" + "github.com/torrentclaw/unarr/internal/config" +) + +// newMirrorsCmd wires `unarr mirrors` and its subcommands. +// +// Mirrors are alternate base URLs the agent can fall back to when the +// primary api_url is unreachable. The pool is consulted on every transient +// network failure (DNS, refused, timeout, 5xx) — see internal/agent/ +// mirror_pool.go for the rotation rules. +func newMirrorsCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "mirrors", + Short: "Manage TorrentClaw mirror failover list", + Long: `Mirrors are alternate base URLs the agent falls back to when the primary +domain is unreachable. The pool survives DNS blocks, ISP filters, and +short-lived takedowns without restarting the agent. + +Examples: + unarr mirrors list Print currently configured mirrors + unarr mirrors update Refresh from the server's canonical list + unarr mirrors test Probe every configured mirror`, + } + + cmd.AddCommand(newMirrorsListCmd()) + cmd.AddCommand(newMirrorsUpdateCmd()) + cmd.AddCommand(newMirrorsTestCmd()) + return cmd +} + +func newMirrorsListCmd() *cobra.Command { + return &cobra.Command{ + Use: "list", + Short: "Print currently configured mirrors", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + pool := agent.NewMirrorPool(cfg.Auth.APIURL, cfg.Auth.Mirrors) + + if jsonOut { + out := map[string]any{ + "primary": cfg.Auth.APIURL, + "mirrors": pool.Mirrors(), + } + return json.NewEncoder(os.Stdout).Encode(out) + } + + fmt.Printf("Primary: %s\n", color.GreenString(cfg.Auth.APIURL)) + if len(cfg.Auth.Mirrors) == 0 { + fmt.Println("Fallbacks: (none configured — run `unarr mirrors update`)") + return nil + } + fmt.Println("Fallbacks:") + for i, m := range cfg.Auth.Mirrors { + fmt.Printf(" %d. %s\n", i+1, m) + } + return nil + }, + } +} + +func newMirrorsUpdateCmd() *cobra.Command { + return &cobra.Command{ + Use: "update", + Short: "Refresh the mirror list from the server", + Long: `Fetch /api/v1/mirrors from the configured primary (with fallback to any +currently-known mirrors) and write the resulting list back to config.toml. + +This is how long-running agents survive a takedown of the primary domain: +the user runs ` + "`unarr mirrors update`" + ` once a week (or via cron), and +the agent transparently picks up new mirrors without a CLI release.`, + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + + // Candidate set: primary + any currently-known mirrors. Order matters — + // we try primary first so the most-trusted endpoint wins. + candidates := append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...) + + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + fmt.Println("Refreshing mirror list...") + resp, err := agent.FetchMirrorsWithFallback(ctx, candidates, "unarr/"+Version) + if err != nil { + return fmt.Errorf("fetch mirrors: %w", err) + } + + primary, extras := resp.ToConfig() + if primary == "" { + return fmt.Errorf("server returned no mirrors") + } + + // Track what changed so we can give the user a clear diff. + added, removed := diffMirrors(append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...), append([]string{primary}, extras...)) + + cfg.Auth.APIURL = primary + cfg.Auth.Mirrors = extras + if err := config.Save(cfg, cfgFile); err != nil { + return fmt.Errorf("save config: %w", err) + } + + fmt.Printf("%s revision %d (%d mirror%s)\n", + color.GreenString("✓"), resp.Revision, len(resp.Mirrors), pluralS(len(resp.Mirrors))) + fmt.Printf(" Primary: %s\n", primary) + if len(extras) > 0 { + fmt.Printf(" Fallbacks: %s\n", strings.Join(extras, ", ")) + } + if resp.Tor != nil { + fmt.Printf(" Tor: %s\n", resp.Tor.URL) + } + for _, c := range resp.Channels { + fmt.Printf(" Channel: %s — %s\n", c.Label, c.URL) + } + if len(added) > 0 { + fmt.Printf(" %s %s\n", color.GreenString("added:"), strings.Join(added, ", ")) + } + if len(removed) > 0 { + fmt.Printf(" %s %s\n", color.YellowString("removed:"), strings.Join(removed, ", ")) + } + return nil + }, + } +} + +func newMirrorsTestCmd() *cobra.Command { + return &cobra.Command{ + Use: "test", + Short: "Probe every configured mirror", + Long: `Performs a small unauthenticated HEAD/GET against /api/health on every +configured mirror and reports latency + reachability.`, + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + all := append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...) + if len(all) == 0 { + return fmt.Errorf("no mirrors configured") + } + + for _, base := range all { + if base == "" { + continue + } + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + start := time.Now() + _, err := agent.FetchMirrors(ctx, []string{base}, "unarr/"+Version) + cancel() + elapsed := time.Since(start) + if err != nil { + fmt.Printf(" %s %s — %s (%s)\n", color.RedString("✗"), base, err, elapsed.Round(time.Millisecond)) + continue + } + fmt.Printf(" %s %s (%s)\n", color.GreenString("✓"), base, elapsed.Round(time.Millisecond)) + } + return nil + }, + } +} + +// diffMirrors returns the URLs added and removed between two ordered lists. +// Used to print a friendly diff after `unarr mirrors update`. +func diffMirrors(old, fresh []string) (added, removed []string) { + oldSet := make(map[string]struct{}, len(old)) + for _, m := range old { + if m != "" { + oldSet[m] = struct{}{} + } + } + freshSet := make(map[string]struct{}, len(fresh)) + for _, m := range fresh { + if m == "" { + continue + } + freshSet[m] = struct{}{} + if _, ok := oldSet[m]; !ok { + added = append(added, m) + } + } + for _, m := range old { + if m == "" { + continue + } + if _, ok := freshSet[m]; !ok { + removed = append(removed, m) + } + } + return added, removed +} + +func pluralS(n int) string { + if n == 1 { + return "" + } + return "s" +} diff --git a/internal/cmd/root.go b/internal/cmd/root.go index ab3021c..8df3cc3 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -108,6 +108,8 @@ Source: https://github.com/torrentclaw/unarr`, probeHWAccelCmd.GroupID = "system" cleanCmd := newCleanCmd() cleanCmd.GroupID = "system" + mirrorsCmd := newMirrorsCmd() + mirrorsCmd.GroupID = "system" selfUpdateCmd := newSelfUpdateCmd() selfUpdateCmd.GroupID = "system" versionCmd := newVersionCmd() @@ -144,6 +146,7 @@ Source: https://github.com/torrentclaw/unarr`, doctorCmd, probeHWAccelCmd, cleanCmd, + mirrorsCmd, selfUpdateCmd, versionCmd, completionCmd, diff --git a/internal/cmd/status.go b/internal/cmd/status.go index 5b451a5..f43d6ca 100644 --- a/internal/cmd/status.go +++ b/internal/cmd/status.go @@ -2,6 +2,7 @@ package cmd import ( "context" + "errors" "fmt" "runtime" "strings" @@ -58,7 +59,7 @@ func runStatus() error { go func() { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - ac := agent.NewClient(cfg.Auth.APIURL, cfg.Auth.APIKey, "unarr/"+Version) + ac := newAgentClientFromConfig(cfg, "unarr/"+Version) resp, err := ac.Register(ctx, agent.RegisterRequest{ AgentID: cfg.Agent.ID, Name: cfg.Agent.Name, @@ -74,7 +75,17 @@ func runStatus() error { cyan.Println(" Account") ar := <-accountCh if ar.err != nil { - dim.Println(" Could not fetch account info") + var httpErr *agent.HTTPError + switch { + case errors.As(ar.err, &httpErr) && httpErr.StatusCode == 401: + yellow.Println(" API key invalid or revoked") + fmt.Printf(" Run %s to re-authenticate\n", cyan.Sprint("unarr login")) + case errors.As(ar.err, &httpErr) && httpErr.StatusCode == 403: + yellow.Println(" API key lacks permission for this server") + fmt.Printf(" Check plan or run %s\n", cyan.Sprint("unarr login")) + default: + dim.Printf(" Could not fetch account info (%v)\n", ar.err) + } } else { fmt.Printf(" User: %s\n", ar.user.Name) fmt.Printf(" Email: %s\n", ar.user.Email) diff --git a/internal/config/config.go b/internal/config/config.go index d5b0f91..d3c18f9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -26,6 +26,11 @@ type Config struct { type AuthConfig struct { APIKey string `toml:"api_key"` APIURL string `toml:"api_url"` + // Mirrors lists alternate base URLs the agent will fall back to when the + // primary api_url is unreachable. Ordered by preference. Refreshed at + // runtime by `unarr mirrors update` against /api/v1/mirrors so a long- + // running agent survives a primary takedown without a new release. + Mirrors []string `toml:"mirrors"` } type AgentConfig struct { @@ -113,6 +118,12 @@ func Default() Config { return Config{ Auth: AuthConfig{ APIURL: "https://torrentclaw.com", + // Default mirror list. Kept in sync with src/lib/mirrors-config.ts + // on the server. Users can override with `unarr mirrors update`, + // which pulls the live list from /api/v1/mirrors. + Mirrors: []string{ + "https://torrentclaw.to", + }, }, Download: DownloadConfig{ PreferredMethod: "auto", @@ -187,6 +198,9 @@ func applyDefaults(cfg *Config, meta toml.MetaData) { if !meta.IsDefined("auth", "api_url") { cfg.Auth.APIURL = "https://torrentclaw.com" } + if !meta.IsDefined("auth", "mirrors") { + cfg.Auth.Mirrors = []string{"https://torrentclaw.to"} + } if !meta.IsDefined("downloads", "preferred_method") { cfg.Download.PreferredMethod = "auto" }