feat(agent): add mirror failover, agent client refactor, status 401 detection
- Mirror pool with health tracking and exponential backoff for failed hosts - Agent client routes requests through mirror pool with retry semantics - New `unarr mirrors` command to inspect mirror state and force failover - `unarr status` now detects 401 from /agent/register and suggests `unarr login` instead of the generic "Could not fetch account info" message - Config supports multiple ScanPaths for upcoming multi-path library scan - Draft plan for bidirectional library sync (CLI ↔ Web) under Docs/plans/
This commit is contained in:
parent
bf18812a3d
commit
a73e1a7756
12 changed files with 972 additions and 76 deletions
|
|
@ -12,8 +12,13 @@ import (
|
|||
)
|
||||
|
||||
// Client communicates with the /api/internal/agent/* endpoints.
|
||||
//
|
||||
// The client owns a MirrorPool: when a request fails with a transient
|
||||
// network error (DNS, refused, timeout, 5xx) it rotates to the next mirror
|
||||
// and retries up to `len(mirrors)-1` times so a single agent run survives
|
||||
// a primary-domain takedown without user intervention.
|
||||
type Client struct {
|
||||
baseURL string
|
||||
pool *MirrorPool
|
||||
apiKey string
|
||||
httpClient *http.Client
|
||||
// wakeClient has no built-in timeout — used exclusively for the long-poll
|
||||
|
|
@ -25,11 +30,20 @@ type Client struct {
|
|||
userAgent string
|
||||
}
|
||||
|
||||
// NewClient creates an agent API client.
|
||||
// NewClient creates an agent API client targeting a single base URL.
|
||||
// Equivalent to NewClientWithMirrors(baseURL, nil, ...) — kept for callers
|
||||
// that don't yet care about mirror failover.
|
||||
func NewClient(baseURL, apiKey, userAgent string) *Client {
|
||||
return NewClientWithMirrors(baseURL, nil, apiKey, userAgent)
|
||||
}
|
||||
|
||||
// NewClientWithMirrors creates an agent API client that can fail over from
|
||||
// the primary base URL to any of the extras when the primary is unreachable.
|
||||
// The order of `extras` matters: they're tried left-to-right after a failure.
|
||||
func NewClientWithMirrors(baseURL string, extras []string, apiKey, userAgent string) *Client {
|
||||
return &Client{
|
||||
baseURL: baseURL,
|
||||
apiKey: apiKey,
|
||||
pool: NewMirrorPool(baseURL, extras),
|
||||
apiKey: apiKey,
|
||||
httpClient: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
|
|
@ -44,6 +58,18 @@ func NewClient(baseURL, apiKey, userAgent string) *Client {
|
|||
}
|
||||
}
|
||||
|
||||
// MirrorPool exposes the underlying pool so callers (e.g. the `unarr mirrors`
|
||||
// subcommand) can swap the list at runtime after fetching /api/v1/mirrors.
|
||||
func (c *Client) MirrorPool() *MirrorPool {
|
||||
return c.pool
|
||||
}
|
||||
|
||||
// baseURL returns the currently-active mirror. Routed through this helper so
|
||||
// future changes (e.g. per-endpoint mirror affinity) only need one edit.
|
||||
func (c *Client) baseURL() string {
|
||||
return c.pool.Current()
|
||||
}
|
||||
|
||||
// Register registers the CLI agent with the server and returns user info + features.
|
||||
func (c *Client) Register(ctx context.Context, req RegisterRequest) (*RegisterResponse, error) {
|
||||
var resp RegisterResponse
|
||||
|
|
@ -109,30 +135,35 @@ func (c *Client) SearchNzbs(ctx context.Context, params NzbSearchParams) (*NzbSe
|
|||
// DownloadNzb downloads the NZB file for the given nzbId.
|
||||
// Returns the raw NZB XML bytes.
|
||||
func (c *Client) DownloadNzb(ctx context.Context, nzbID string) ([]byte, error) {
|
||||
url := fmt.Sprintf("/api/internal/agent/nzb-download?nzbId=%s", nzbID)
|
||||
path := fmt.Sprintf("/api/internal/agent/nzb-download?nzbId=%s", nzbID)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
c.setHeaders(req)
|
||||
var out []byte
|
||||
err := c.withMirrorFailover(func(base string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+path, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
c.setHeaders(req)
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16))
|
||||
return nil, fmt.Errorf("nzb download error %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<16))
|
||||
return &HTTPError{StatusCode: resp.StatusCode, Message: string(body)}
|
||||
}
|
||||
|
||||
data, err := io.ReadAll(io.LimitReader(resp.Body, 100<<20)) // 100MB limit
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read nzb: %w", err)
|
||||
}
|
||||
return data, nil
|
||||
data, err := io.ReadAll(io.LimitReader(resp.Body, 100<<20)) // 100MB limit
|
||||
if err != nil {
|
||||
return fmt.Errorf("read nzb: %w", err)
|
||||
}
|
||||
out = data
|
||||
return nil
|
||||
})
|
||||
return out, err
|
||||
}
|
||||
|
||||
// GetUsenetCredentials fetches NNTP connection credentials.
|
||||
|
|
@ -193,31 +224,41 @@ func (c *Client) ReportWatchProgress(ctx context.Context, update WatchProgressUp
|
|||
// WaitForWake blocks until the server sends a wake signal, the long-poll
|
||||
// timeout elapses, or ctx is cancelled. Returns true when a wake signal
|
||||
// was received (caller should sync immediately), false on timeout/cancel.
|
||||
//
|
||||
// Wake is a long-poll on a single mirror — failover here would just drop
|
||||
// the connection and try again immediately, which the server already
|
||||
// handles with a fresh wait loop. We only retry against the next mirror
|
||||
// when the current one is definitively unreachable (DNS / refused / TLS).
|
||||
func (c *Client) WaitForWake(ctx context.Context) (bool, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/api/internal/agent/wake", nil)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("create wake request: %w", err)
|
||||
}
|
||||
c.setHeaders(req)
|
||||
var wake bool
|
||||
err := c.withMirrorFailover(func(base string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+"/api/internal/agent/wake", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create wake request: %w", err)
|
||||
}
|
||||
c.setHeaders(req)
|
||||
|
||||
resp, err := c.wakeClient.Do(req)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("wake request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
resp, err := c.wakeClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("wake request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
|
||||
return false, &HTTPError{StatusCode: resp.StatusCode, Message: string(body)}
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
|
||||
return &HTTPError{StatusCode: resp.StatusCode, Message: string(body)}
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Wake bool `json:"wake"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return false, fmt.Errorf("decode wake response: %w", err)
|
||||
}
|
||||
return result.Wake, nil
|
||||
var result struct {
|
||||
Wake bool `json:"wake"`
|
||||
}
|
||||
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
||||
return fmt.Errorf("decode wake response: %w", err)
|
||||
}
|
||||
wake = result.Wake
|
||||
return nil
|
||||
})
|
||||
return wake, err
|
||||
}
|
||||
|
||||
// doPost sends a JSON POST request using the default httpClient and decodes the response.
|
||||
|
|
@ -227,45 +268,89 @@ func (c *Client) doPost(ctx context.Context, path string, body any, dst any) err
|
|||
|
||||
// doPostWith sends a JSON POST request using the provided HTTP client and decodes the response.
|
||||
// Use this to override the default timeout for specific operations (e.g. librarySyncClient).
|
||||
// Wrapped in withMirrorFailover so a transient connection failure on the
|
||||
// active mirror retries against the next one.
|
||||
func (c *Client) doPostWith(ctx context.Context, hc *http.Client, path string, body any, dst any) error {
|
||||
jsonBody, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal body: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+path, bytes.NewReader(jsonBody))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
return c.withMirrorFailover(func(base string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, base+path, bytes.NewReader(jsonBody))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
|
||||
c.setHeaders(req)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
c.setHeaders(req)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return c.handleResponse(resp, dst)
|
||||
return c.handleResponse(resp, dst)
|
||||
})
|
||||
}
|
||||
|
||||
// doGet sends a GET request and decodes the response.
|
||||
func (c *Client) doGet(ctx context.Context, path string, dst any) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
return c.withMirrorFailover(func(base string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, base+path, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
|
||||
c.setHeaders(req)
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return c.handleResponse(resp, dst)
|
||||
})
|
||||
}
|
||||
|
||||
// withMirrorFailover runs `fn` against the current mirror; on a transient
|
||||
// error it rotates the pool and retries up to `len(mirrors)-1` times.
|
||||
//
|
||||
// The active mirror is updated on rotation so subsequent unrelated calls
|
||||
// stick to the working host until that host fails too — this avoids
|
||||
// hammering a known-bad primary on every request, while still trying it
|
||||
// again next time the agent reloads (no permanent demotion).
|
||||
func (c *Client) withMirrorFailover(fn func(base string) error) error {
|
||||
attempts := c.pool.Len()
|
||||
if attempts < 1 {
|
||||
attempts = 1
|
||||
}
|
||||
|
||||
c.setHeaders(req)
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("request failed: %w", err)
|
||||
var lastErr error
|
||||
for i := 0; i < attempts; i++ {
|
||||
base := c.baseURL()
|
||||
err := fn(base)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
lastErr = err
|
||||
if !IsTransient(err) {
|
||||
return err
|
||||
}
|
||||
// Last attempt: don't bother rotating, just surface the error.
|
||||
if i == attempts-1 {
|
||||
break
|
||||
}
|
||||
next, rotated := c.pool.Rotate()
|
||||
if !rotated {
|
||||
break
|
||||
}
|
||||
_ = next // mirror rotation logging is left to higher layers (cmd/) so the
|
||||
// pool stays log-free for tests.
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return c.handleResponse(resp, dst)
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func (c *Client) setHeaders(req *http.Request) {
|
||||
|
|
|
|||
|
|
@ -498,8 +498,8 @@ func TestClient_SlowServer_Timeout(t *testing.T) {
|
|||
|
||||
// Crear cliente con timeout muy corto
|
||||
c := &Client{
|
||||
baseURL: srv.URL,
|
||||
apiKey: "test-key",
|
||||
pool: NewMirrorPool(srv.URL, nil),
|
||||
apiKey: "test-key",
|
||||
httpClient: &http.Client{
|
||||
Timeout: 50 * time.Millisecond,
|
||||
},
|
||||
|
|
|
|||
213
internal/agent/mirror_client.go
Normal file
213
internal/agent/mirror_client.go
Normal file
|
|
@ -0,0 +1,213 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// MirrorEntry mirrors the shape of /api/v1/mirrors items on the server.
|
||||
type MirrorEntry struct {
|
||||
URL string `json:"url"`
|
||||
Label string `json:"label"`
|
||||
Kind string `json:"kind"` // "clearnet" | "tor"
|
||||
Primary bool `json:"primary"`
|
||||
}
|
||||
|
||||
// MirrorChannel is an out-of-band status channel (Telegram, status page, etc.)
|
||||
type MirrorChannel struct {
|
||||
URL string `json:"url"`
|
||||
Label string `json:"label"`
|
||||
}
|
||||
|
||||
// MirrorsResponse is the JSON document served by /api/v1/mirrors and
|
||||
// /api/mirrors.
|
||||
type MirrorsResponse struct {
|
||||
Revision int `json:"revision"`
|
||||
Mirrors []MirrorEntry `json:"mirrors"`
|
||||
Tor *MirrorEntry `json:"tor"`
|
||||
Channels []MirrorChannel `json:"channels"`
|
||||
UpdatedAt string `json:"updatedAt"`
|
||||
}
|
||||
|
||||
// DefaultStaticFallbackURLs lists off-domain JSON copies of the mirror list.
|
||||
// Hard-coded here (not loaded from config) because the whole point is to
|
||||
// have something to consult when config-driven URLs all fail.
|
||||
//
|
||||
// Keep in sync with src/lib/mirrors-config.ts → STATIC_FALLBACKS on the web.
|
||||
var DefaultStaticFallbackURLs = []string{
|
||||
"https://torrentclaw.github.io/mirrors/mirrors.json",
|
||||
}
|
||||
|
||||
// FetchMirrorsWithFallback pulls the mirror list using FetchMirrors against
|
||||
// `candidates` first; if every candidate fails, it falls back to the static
|
||||
// JSON copies on off-domain hosts (GitHub Pages, Cloudflare Pages, …).
|
||||
//
|
||||
// This is the function `unarr mirrors update` should call when it wants the
|
||||
// strongest "give me a working mirror list no matter what" guarantee.
|
||||
func FetchMirrorsWithFallback(ctx context.Context, candidates []string, userAgent string) (*MirrorsResponse, error) {
|
||||
resp, err := FetchMirrors(ctx, candidates, userAgent)
|
||||
if err == nil {
|
||||
return resp, nil
|
||||
}
|
||||
if len(DefaultStaticFallbackURLs) == 0 {
|
||||
return nil, err
|
||||
}
|
||||
// Try the static JSON files directly. They follow the same wire shape so
|
||||
// we can reuse the same parser — but the URLs already include the JSON
|
||||
// suffix so we hit them with `fetchMirrorsJSON` instead of FetchMirrors
|
||||
// (which appends /api/v1/mirrors).
|
||||
staticResp, staticErr := fetchMirrorsJSON(ctx, DefaultStaticFallbackURLs, userAgent)
|
||||
if staticErr == nil {
|
||||
return staticResp, nil
|
||||
}
|
||||
return nil, fmt.Errorf("primary failed (%v) and static fallback failed (%v)", err, staticErr)
|
||||
}
|
||||
|
||||
// fetchMirrorsJSON pulls a MirrorsResponse from already-fully-qualified URLs
|
||||
// (e.g. https://torrentclaw.github.io/mirrors/mirrors.json). Each candidate
|
||||
// is tried in order; the first success wins.
|
||||
func fetchMirrorsJSON(ctx context.Context, urls []string, userAgent string) (*MirrorsResponse, error) {
|
||||
if len(urls) == 0 {
|
||||
return nil, fmt.Errorf("no static fallback URLs configured")
|
||||
}
|
||||
hc := &http.Client{Timeout: 15 * time.Second}
|
||||
var lastErr error
|
||||
for _, url := range urls {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
if userAgent != "" {
|
||||
req.Header.Set("User-Agent", userAgent)
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
body, readErr := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
||||
resp.Body.Close()
|
||||
if readErr != nil {
|
||||
lastErr = readErr
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
lastErr = fmt.Errorf("%s returned HTTP %d", url, resp.StatusCode)
|
||||
continue
|
||||
}
|
||||
var out MirrorsResponse
|
||||
if err := json.Unmarshal(body, &out); err != nil {
|
||||
lastErr = fmt.Errorf("%s: invalid JSON: %w", url, err)
|
||||
continue
|
||||
}
|
||||
if len(out.Mirrors) == 0 {
|
||||
lastErr = fmt.Errorf("%s returned empty mirror list", url)
|
||||
continue
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
if lastErr == nil {
|
||||
lastErr = fmt.Errorf("no reachable static fallback")
|
||||
}
|
||||
return nil, lastErr
|
||||
}
|
||||
|
||||
// FetchMirrors pulls the latest mirror list from the server.
|
||||
//
|
||||
// The endpoint is intentionally public and unauthenticated: the whole point
|
||||
// of mirror discovery is that it must work even when the user's API key
|
||||
// is invalid, expired, or the auth path is unreachable. The function tries
|
||||
// each candidate base URL in order so a takedown of the primary doesn't
|
||||
// also kill mirror discovery.
|
||||
func FetchMirrors(ctx context.Context, candidates []string, userAgent string) (*MirrorsResponse, error) {
|
||||
if len(candidates) == 0 {
|
||||
return nil, fmt.Errorf("no mirror discovery URLs configured")
|
||||
}
|
||||
|
||||
hc := &http.Client{Timeout: 15 * time.Second}
|
||||
|
||||
var lastErr error
|
||||
for _, base := range candidates {
|
||||
if base == "" {
|
||||
continue
|
||||
}
|
||||
url := base + "/api/v1/mirrors"
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
if userAgent != "" {
|
||||
req.Header.Set("User-Agent", userAgent)
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
resp, err := hc.Do(req)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
continue
|
||||
}
|
||||
body, readErr := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
||||
resp.Body.Close()
|
||||
if readErr != nil {
|
||||
lastErr = readErr
|
||||
continue
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
lastErr = fmt.Errorf("%s returned HTTP %d", base, resp.StatusCode)
|
||||
continue
|
||||
}
|
||||
var out MirrorsResponse
|
||||
if err := json.Unmarshal(body, &out); err != nil {
|
||||
lastErr = fmt.Errorf("%s: invalid JSON: %w", base, err)
|
||||
continue
|
||||
}
|
||||
if len(out.Mirrors) == 0 {
|
||||
lastErr = fmt.Errorf("%s returned empty mirror list", base)
|
||||
continue
|
||||
}
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
if lastErr == nil {
|
||||
lastErr = fmt.Errorf("no reachable mirror discovery endpoint")
|
||||
}
|
||||
return nil, fmt.Errorf("fetch mirrors: %w", lastErr)
|
||||
}
|
||||
|
||||
// ToConfig splits a MirrorsResponse into (primary, extras) suitable for
|
||||
// rebuilding a MirrorPool or persisting back into config.toml.
|
||||
//
|
||||
// The "primary" returned here is whichever entry has primary=true. If none
|
||||
// are flagged, the first one wins.
|
||||
func (m *MirrorsResponse) ToConfig() (primary string, extras []string) {
|
||||
if m == nil {
|
||||
return "", nil
|
||||
}
|
||||
var picked *MirrorEntry
|
||||
for i := range m.Mirrors {
|
||||
if m.Mirrors[i].Primary {
|
||||
picked = &m.Mirrors[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if picked == nil && len(m.Mirrors) > 0 {
|
||||
picked = &m.Mirrors[0]
|
||||
}
|
||||
if picked != nil {
|
||||
primary = picked.URL
|
||||
}
|
||||
for _, e := range m.Mirrors {
|
||||
if e.URL == primary {
|
||||
continue
|
||||
}
|
||||
extras = append(extras, e.URL)
|
||||
}
|
||||
return primary, extras
|
||||
}
|
||||
172
internal/agent/mirror_pool.go
Normal file
172
internal/agent/mirror_pool.go
Normal file
|
|
@ -0,0 +1,172 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// MirrorPool holds the ordered list of API base URLs the client is willing to
|
||||
// fall back to when the current mirror is unreachable. The first entry is
|
||||
// always the "preferred" mirror configured by the user. Subsequent entries
|
||||
// are alternate domains we can rotate to without changing any user-visible
|
||||
// configuration — they exist so a long-lived agent survives a takedown of
|
||||
// the primary host without needing a new release.
|
||||
//
|
||||
// The pool is concurrency-safe; rotation is a fast O(1) index bump under a
|
||||
// mutex. The previously-active mirror is NEVER removed — it might just be
|
||||
// temporarily unreachable from one network path.
|
||||
type MirrorPool struct {
|
||||
mu sync.RWMutex
|
||||
mirrors []string
|
||||
current int
|
||||
}
|
||||
|
||||
// NewMirrorPool builds a pool from the provided base URLs. The primary URL
|
||||
// is always first; "extras" are appended in order and de-duplicated. Empty
|
||||
// strings are skipped. Trailing slashes are normalised so callers can concat
|
||||
// `pool.Current() + "/api/..."` reliably.
|
||||
func NewMirrorPool(primary string, extras []string) *MirrorPool {
|
||||
seen := make(map[string]struct{})
|
||||
var out []string
|
||||
|
||||
add := func(raw string) {
|
||||
raw = strings.TrimRight(strings.TrimSpace(raw), "/")
|
||||
if raw == "" {
|
||||
return
|
||||
}
|
||||
if _, dup := seen[raw]; dup {
|
||||
return
|
||||
}
|
||||
seen[raw] = struct{}{}
|
||||
out = append(out, raw)
|
||||
}
|
||||
|
||||
add(primary)
|
||||
for _, e := range extras {
|
||||
add(e)
|
||||
}
|
||||
|
||||
if len(out) == 0 {
|
||||
// Defensive: always return a pool with at least one entry so callers
|
||||
// can call Current() without nil checks. The empty string would
|
||||
// produce obvious errors immediately, which is preferable to a panic
|
||||
// somewhere deep in net/http.
|
||||
out = []string{""}
|
||||
}
|
||||
|
||||
return &MirrorPool{mirrors: out}
|
||||
}
|
||||
|
||||
// Current returns the active base URL.
|
||||
func (p *MirrorPool) Current() string {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.mirrors[p.current]
|
||||
}
|
||||
|
||||
// Mirrors returns a copy of the configured base URLs in priority order.
|
||||
func (p *MirrorPool) Mirrors() []string {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
out := make([]string, len(p.mirrors))
|
||||
copy(out, p.mirrors)
|
||||
return out
|
||||
}
|
||||
|
||||
// Len reports how many mirrors are configured.
|
||||
func (p *MirrorPool) Len() int {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return len(p.mirrors)
|
||||
}
|
||||
|
||||
// Rotate moves the cursor to the next mirror in the pool, wrapping around.
|
||||
// Returns the new current mirror and whether a rotation actually happened
|
||||
// (a single-mirror pool returns false).
|
||||
func (p *MirrorPool) Rotate() (string, bool) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if len(p.mirrors) <= 1 {
|
||||
return p.mirrors[p.current], false
|
||||
}
|
||||
p.current = (p.current + 1) % len(p.mirrors)
|
||||
return p.mirrors[p.current], true
|
||||
}
|
||||
|
||||
// Replace swaps the entire mirror set, e.g. after `unarr mirrors update`
|
||||
// downloaded a fresh list from /api/v1/mirrors. Resets the cursor to 0 so
|
||||
// the newly-discovered primary is tried first.
|
||||
func (p *MirrorPool) Replace(primary string, extras []string) {
|
||||
fresh := NewMirrorPool(primary, extras)
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.mirrors = fresh.mirrors
|
||||
p.current = 0
|
||||
}
|
||||
|
||||
// IsTransient reports whether an error is the kind we should retry against
|
||||
// another mirror. The intent is conservative: rotate on connection-level
|
||||
// failures (DNS, refused, TLS, timeouts, 5xx) but NOT on auth or validation
|
||||
// errors that would just fail again somewhere else.
|
||||
func IsTransient(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var httpErr *HTTPError
|
||||
if errors.As(err, &httpErr) {
|
||||
switch httpErr.StatusCode {
|
||||
case http.StatusBadGateway,
|
||||
http.StatusServiceUnavailable,
|
||||
http.StatusGatewayTimeout,
|
||||
http.StatusRequestTimeout:
|
||||
return true
|
||||
}
|
||||
// 4xx (auth, rate limit, validation) won't get healthier on another mirror.
|
||||
return false
|
||||
}
|
||||
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
var netErr net.Error
|
||||
if errors.As(err, &netErr) && netErr.Timeout() {
|
||||
return true
|
||||
}
|
||||
|
||||
var dnsErr *net.DNSError
|
||||
if errors.As(err, &dnsErr) {
|
||||
return true
|
||||
}
|
||||
|
||||
var urlErr *url.Error
|
||||
if errors.As(err, &urlErr) {
|
||||
// `connection refused`, `EOF`, `tls: ...` end up as wrapped url.Errors.
|
||||
msg := urlErr.Error()
|
||||
if strings.Contains(msg, "connection refused") ||
|
||||
strings.Contains(msg, "no such host") ||
|
||||
strings.Contains(msg, "EOF") ||
|
||||
strings.Contains(msg, "tls:") ||
|
||||
strings.Contains(msg, "i/o timeout") ||
|
||||
strings.Contains(msg, "network is unreachable") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Bare strings as last resort — net.OpError messages are unstable across Go versions.
|
||||
msg := err.Error()
|
||||
if strings.Contains(msg, "connection refused") ||
|
||||
strings.Contains(msg, "no such host") ||
|
||||
strings.Contains(msg, "i/o timeout") ||
|
||||
strings.Contains(msg, "network is unreachable") {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
@ -103,7 +103,7 @@ func (s *SignalEventStream) Close() error {
|
|||
func (c *Client) OpenSignalStream(ctx context.Context, sessionID string) (*SignalEventStream, error) {
|
||||
streamCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
url := fmt.Sprintf("%s/api/internal/stream/signal/%s/events", c.baseURL, sessionID)
|
||||
url := fmt.Sprintf("%s/api/internal/stream/signal/%s/events", c.baseURL(), sessionID)
|
||||
req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
cancel()
|
||||
|
|
|
|||
23
internal/cmd/agent_client.go
Normal file
23
internal/cmd/agent_client.go
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/torrentclaw/unarr/internal/agent"
|
||||
"github.com/torrentclaw/unarr/internal/config"
|
||||
)
|
||||
|
||||
// newAgentClientFromConfig builds an agent.Client wired with the mirror pool
|
||||
// from the user's TOML config. Use this instead of agent.NewClient in any
|
||||
// long-running command (daemon, status loop, etc.) so a `.com` outage rolls
|
||||
// over to `.to` / .onion without restarting the agent.
|
||||
//
|
||||
// The function lives in cmd/ rather than agent/ because it has to know
|
||||
// about the config struct, and cmd/ is the only place that owns the
|
||||
// "wire defaults + user overrides" rule.
|
||||
func newAgentClientFromConfig(cfg config.Config, userAgent string) *agent.Client {
|
||||
return agent.NewClientWithMirrors(
|
||||
cfg.Auth.APIURL,
|
||||
cfg.Auth.Mirrors,
|
||||
cfg.Auth.APIKey,
|
||||
userAgent,
|
||||
)
|
||||
}
|
||||
|
|
@ -161,9 +161,10 @@ func runDaemonStart() error {
|
|||
MaxTranscodeHeight: maxTranscodeHeight,
|
||||
}
|
||||
|
||||
// Create HTTP client — single communication channel
|
||||
agentClient := agent.NewClient(cfg.Auth.APIURL, cfg.Auth.APIKey, userAgent)
|
||||
log.Printf("Transport: HTTP sync → %s", cfg.Auth.APIURL)
|
||||
// Create HTTP client with mirror failover so a `.com` block-out rolls
|
||||
// over to `.to` / .onion without restarting the daemon.
|
||||
agentClient := newAgentClientFromConfig(cfg, userAgent)
|
||||
log.Printf("Transport: HTTP sync → %s (mirrors: %d)", cfg.Auth.APIURL, len(cfg.Auth.Mirrors))
|
||||
|
||||
// Create daemon
|
||||
d := agent.NewDaemon(daemonCfg, agentClient)
|
||||
|
|
|
|||
204
internal/cmd/mirrors.go
Normal file
204
internal/cmd/mirrors.go
Normal file
|
|
@ -0,0 +1,204 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fatih/color"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/torrentclaw/unarr/internal/agent"
|
||||
"github.com/torrentclaw/unarr/internal/config"
|
||||
)
|
||||
|
||||
// newMirrorsCmd wires `unarr mirrors` and its subcommands.
|
||||
//
|
||||
// Mirrors are alternate base URLs the agent can fall back to when the
|
||||
// primary api_url is unreachable. The pool is consulted on every transient
|
||||
// network failure (DNS, refused, timeout, 5xx) — see internal/agent/
|
||||
// mirror_pool.go for the rotation rules.
|
||||
func newMirrorsCmd() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "mirrors",
|
||||
Short: "Manage TorrentClaw mirror failover list",
|
||||
Long: `Mirrors are alternate base URLs the agent falls back to when the primary
|
||||
domain is unreachable. The pool survives DNS blocks, ISP filters, and
|
||||
short-lived takedowns without restarting the agent.
|
||||
|
||||
Examples:
|
||||
unarr mirrors list Print currently configured mirrors
|
||||
unarr mirrors update Refresh from the server's canonical list
|
||||
unarr mirrors test Probe every configured mirror`,
|
||||
}
|
||||
|
||||
cmd.AddCommand(newMirrorsListCmd())
|
||||
cmd.AddCommand(newMirrorsUpdateCmd())
|
||||
cmd.AddCommand(newMirrorsTestCmd())
|
||||
return cmd
|
||||
}
|
||||
|
||||
func newMirrorsListCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "list",
|
||||
Short: "Print currently configured mirrors",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfg := loadConfig()
|
||||
pool := agent.NewMirrorPool(cfg.Auth.APIURL, cfg.Auth.Mirrors)
|
||||
|
||||
if jsonOut {
|
||||
out := map[string]any{
|
||||
"primary": cfg.Auth.APIURL,
|
||||
"mirrors": pool.Mirrors(),
|
||||
}
|
||||
return json.NewEncoder(os.Stdout).Encode(out)
|
||||
}
|
||||
|
||||
fmt.Printf("Primary: %s\n", color.GreenString(cfg.Auth.APIURL))
|
||||
if len(cfg.Auth.Mirrors) == 0 {
|
||||
fmt.Println("Fallbacks: (none configured — run `unarr mirrors update`)")
|
||||
return nil
|
||||
}
|
||||
fmt.Println("Fallbacks:")
|
||||
for i, m := range cfg.Auth.Mirrors {
|
||||
fmt.Printf(" %d. %s\n", i+1, m)
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newMirrorsUpdateCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "update",
|
||||
Short: "Refresh the mirror list from the server",
|
||||
Long: `Fetch /api/v1/mirrors from the configured primary (with fallback to any
|
||||
currently-known mirrors) and write the resulting list back to config.toml.
|
||||
|
||||
This is how long-running agents survive a takedown of the primary domain:
|
||||
the user runs ` + "`unarr mirrors update`" + ` once a week (or via cron), and
|
||||
the agent transparently picks up new mirrors without a CLI release.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfg := loadConfig()
|
||||
|
||||
// Candidate set: primary + any currently-known mirrors. Order matters —
|
||||
// we try primary first so the most-trusted endpoint wins.
|
||||
candidates := append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||
defer cancel()
|
||||
|
||||
fmt.Println("Refreshing mirror list...")
|
||||
resp, err := agent.FetchMirrorsWithFallback(ctx, candidates, "unarr/"+Version)
|
||||
if err != nil {
|
||||
return fmt.Errorf("fetch mirrors: %w", err)
|
||||
}
|
||||
|
||||
primary, extras := resp.ToConfig()
|
||||
if primary == "" {
|
||||
return fmt.Errorf("server returned no mirrors")
|
||||
}
|
||||
|
||||
// Track what changed so we can give the user a clear diff.
|
||||
added, removed := diffMirrors(append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...), append([]string{primary}, extras...))
|
||||
|
||||
cfg.Auth.APIURL = primary
|
||||
cfg.Auth.Mirrors = extras
|
||||
if err := config.Save(cfg, cfgFile); err != nil {
|
||||
return fmt.Errorf("save config: %w", err)
|
||||
}
|
||||
|
||||
fmt.Printf("%s revision %d (%d mirror%s)\n",
|
||||
color.GreenString("✓"), resp.Revision, len(resp.Mirrors), pluralS(len(resp.Mirrors)))
|
||||
fmt.Printf(" Primary: %s\n", primary)
|
||||
if len(extras) > 0 {
|
||||
fmt.Printf(" Fallbacks: %s\n", strings.Join(extras, ", "))
|
||||
}
|
||||
if resp.Tor != nil {
|
||||
fmt.Printf(" Tor: %s\n", resp.Tor.URL)
|
||||
}
|
||||
for _, c := range resp.Channels {
|
||||
fmt.Printf(" Channel: %s — %s\n", c.Label, c.URL)
|
||||
}
|
||||
if len(added) > 0 {
|
||||
fmt.Printf(" %s %s\n", color.GreenString("added:"), strings.Join(added, ", "))
|
||||
}
|
||||
if len(removed) > 0 {
|
||||
fmt.Printf(" %s %s\n", color.YellowString("removed:"), strings.Join(removed, ", "))
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newMirrorsTestCmd() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "test",
|
||||
Short: "Probe every configured mirror",
|
||||
Long: `Performs a small unauthenticated HEAD/GET against /api/health on every
|
||||
configured mirror and reports latency + reachability.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
cfg := loadConfig()
|
||||
all := append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...)
|
||||
if len(all) == 0 {
|
||||
return fmt.Errorf("no mirrors configured")
|
||||
}
|
||||
|
||||
for _, base := range all {
|
||||
if base == "" {
|
||||
continue
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
|
||||
start := time.Now()
|
||||
_, err := agent.FetchMirrors(ctx, []string{base}, "unarr/"+Version)
|
||||
cancel()
|
||||
elapsed := time.Since(start)
|
||||
if err != nil {
|
||||
fmt.Printf(" %s %s — %s (%s)\n", color.RedString("✗"), base, err, elapsed.Round(time.Millisecond))
|
||||
continue
|
||||
}
|
||||
fmt.Printf(" %s %s (%s)\n", color.GreenString("✓"), base, elapsed.Round(time.Millisecond))
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// diffMirrors returns the URLs added and removed between two ordered lists.
|
||||
// Used to print a friendly diff after `unarr mirrors update`.
|
||||
func diffMirrors(old, fresh []string) (added, removed []string) {
|
||||
oldSet := make(map[string]struct{}, len(old))
|
||||
for _, m := range old {
|
||||
if m != "" {
|
||||
oldSet[m] = struct{}{}
|
||||
}
|
||||
}
|
||||
freshSet := make(map[string]struct{}, len(fresh))
|
||||
for _, m := range fresh {
|
||||
if m == "" {
|
||||
continue
|
||||
}
|
||||
freshSet[m] = struct{}{}
|
||||
if _, ok := oldSet[m]; !ok {
|
||||
added = append(added, m)
|
||||
}
|
||||
}
|
||||
for _, m := range old {
|
||||
if m == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := freshSet[m]; !ok {
|
||||
removed = append(removed, m)
|
||||
}
|
||||
}
|
||||
return added, removed
|
||||
}
|
||||
|
||||
func pluralS(n int) string {
|
||||
if n == 1 {
|
||||
return ""
|
||||
}
|
||||
return "s"
|
||||
}
|
||||
|
|
@ -108,6 +108,8 @@ Source: https://github.com/torrentclaw/unarr`,
|
|||
probeHWAccelCmd.GroupID = "system"
|
||||
cleanCmd := newCleanCmd()
|
||||
cleanCmd.GroupID = "system"
|
||||
mirrorsCmd := newMirrorsCmd()
|
||||
mirrorsCmd.GroupID = "system"
|
||||
selfUpdateCmd := newSelfUpdateCmd()
|
||||
selfUpdateCmd.GroupID = "system"
|
||||
versionCmd := newVersionCmd()
|
||||
|
|
@ -144,6 +146,7 @@ Source: https://github.com/torrentclaw/unarr`,
|
|||
doctorCmd,
|
||||
probeHWAccelCmd,
|
||||
cleanCmd,
|
||||
mirrorsCmd,
|
||||
selfUpdateCmd,
|
||||
versionCmd,
|
||||
completionCmd,
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package cmd
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
|
|
@ -58,7 +59,7 @@ func runStatus() error {
|
|||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
ac := agent.NewClient(cfg.Auth.APIURL, cfg.Auth.APIKey, "unarr/"+Version)
|
||||
ac := newAgentClientFromConfig(cfg, "unarr/"+Version)
|
||||
resp, err := ac.Register(ctx, agent.RegisterRequest{
|
||||
AgentID: cfg.Agent.ID,
|
||||
Name: cfg.Agent.Name,
|
||||
|
|
@ -74,7 +75,17 @@ func runStatus() error {
|
|||
cyan.Println(" Account")
|
||||
ar := <-accountCh
|
||||
if ar.err != nil {
|
||||
dim.Println(" Could not fetch account info")
|
||||
var httpErr *agent.HTTPError
|
||||
switch {
|
||||
case errors.As(ar.err, &httpErr) && httpErr.StatusCode == 401:
|
||||
yellow.Println(" API key invalid or revoked")
|
||||
fmt.Printf(" Run %s to re-authenticate\n", cyan.Sprint("unarr login"))
|
||||
case errors.As(ar.err, &httpErr) && httpErr.StatusCode == 403:
|
||||
yellow.Println(" API key lacks permission for this server")
|
||||
fmt.Printf(" Check plan or run %s\n", cyan.Sprint("unarr login"))
|
||||
default:
|
||||
dim.Printf(" Could not fetch account info (%v)\n", ar.err)
|
||||
}
|
||||
} else {
|
||||
fmt.Printf(" User: %s\n", ar.user.Name)
|
||||
fmt.Printf(" Email: %s\n", ar.user.Email)
|
||||
|
|
|
|||
|
|
@ -26,6 +26,11 @@ type Config struct {
|
|||
type AuthConfig struct {
|
||||
APIKey string `toml:"api_key"`
|
||||
APIURL string `toml:"api_url"`
|
||||
// Mirrors lists alternate base URLs the agent will fall back to when the
|
||||
// primary api_url is unreachable. Ordered by preference. Refreshed at
|
||||
// runtime by `unarr mirrors update` against /api/v1/mirrors so a long-
|
||||
// running agent survives a primary takedown without a new release.
|
||||
Mirrors []string `toml:"mirrors"`
|
||||
}
|
||||
|
||||
type AgentConfig struct {
|
||||
|
|
@ -113,6 +118,12 @@ func Default() Config {
|
|||
return Config{
|
||||
Auth: AuthConfig{
|
||||
APIURL: "https://torrentclaw.com",
|
||||
// Default mirror list. Kept in sync with src/lib/mirrors-config.ts
|
||||
// on the server. Users can override with `unarr mirrors update`,
|
||||
// which pulls the live list from /api/v1/mirrors.
|
||||
Mirrors: []string{
|
||||
"https://torrentclaw.to",
|
||||
},
|
||||
},
|
||||
Download: DownloadConfig{
|
||||
PreferredMethod: "auto",
|
||||
|
|
@ -187,6 +198,9 @@ func applyDefaults(cfg *Config, meta toml.MetaData) {
|
|||
if !meta.IsDefined("auth", "api_url") {
|
||||
cfg.Auth.APIURL = "https://torrentclaw.com"
|
||||
}
|
||||
if !meta.IsDefined("auth", "mirrors") {
|
||||
cfg.Auth.Mirrors = []string{"https://torrentclaw.to"}
|
||||
}
|
||||
if !meta.IsDefined("downloads", "preferred_method") {
|
||||
cfg.Download.PreferredMethod = "auto"
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue