From 88316e701761bfad0ce4af6eb28b862ea3e74c4c Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 26 May 2026 20:39:57 +0200 Subject: [PATCH 01/32] feat(funnel): cloudflare quick tunnel embedded subprocess (0.9.5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gives the daemon a public HTTPS hostname (`https://.trycloudflare.com`) so the in-browser player on torrentclaw.com plays cross-network without Tailscale or port forwarding — the mixed-content block that was breaking HTTPS-page → HTTP-daemon fetches is gone. Bytes proxy through CloudFlare, never through TorrentClaw infra (preserves the aggregator legal posture). New surface: • `internal/funnel/` package: subprocess wrapper + auto-download for cloudflared. Linux amd64/arm64/armhf/386 fetched from GitHub releases on first run, validated by ELF magic + size sanity, O_EXCL partial write so concurrent daemons don't clobber each other. • `unarr funnel on/off/status` cobra command (sibling of `unarr vpn`). • Daemon supervisor goroutine keeps cloudflared up across crashes + CF's ~6h Quick Tunnel rotation. Exponential backoff (2 s → 5 min). On exit the reported URL is cleared so the web stops handing out a dead host. • Wire: agent registers/syncs a FunnelURL field; web prefers it over Tailscale/LAN for in-browser playback (HlsStreamPlayer + Stremio addon). Default ON for fresh installs (NAS/Docker get it without terminal-in); existing configs that pre-date the feature stay off until the operator opts in with `unarr funnel on`. Docker image now bundles cloudflared (built per TARGETARCH via buildx). Also fixed: libx264 'frame MB size > level limit' on anamorphic >16:9 sources. The level we hint to libx264 was derived from height alone, which busted on 720p cinemascope (1728×720 = 4860 MBs > level 3.1's 3600). Bumped each tier: 720p → 4.0, 1080p → 4.1. Version: 0.9.4 → 0.9.5. 
--- CHANGELOG.md | 38 +++++++ Dockerfile | 19 +++- README.md | 58 +++++++++++ internal/agent/daemon.go | 18 ++++ internal/agent/state.go | 5 + internal/agent/sync.go | 6 ++ internal/agent/types.go | 5 + internal/cmd/daemon.go | 61 ++++++++++++ internal/cmd/funnel.go | 165 ++++++++++++++++++++++++++++++ internal/cmd/root.go | 3 + internal/cmd/version.go | 2 +- internal/config/config.go | 23 +++++ internal/engine/hwaccel.go | 22 ++-- internal/funnel/funnel.go | 199 +++++++++++++++++++++++++++++++++++++ internal/funnel/install.go | 167 +++++++++++++++++++++++++++++++ 15 files changed, 778 insertions(+), 13 deletions(-) create mode 100644 internal/cmd/funnel.go create mode 100644 internal/funnel/funnel.go create mode 100644 internal/funnel/install.go diff --git a/CHANGELOG.md b/CHANGELOG.md index dfc0f79..38118cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,44 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.5] - 2026-05-26 + +### Added + +- **funnel**: optional CloudFlare Quick Tunnel subprocess. `unarr funnel on` + spawns `cloudflared` as a child process and registers an anonymous + `https://.trycloudflare.com` hostname tunnelled to the daemon's + HLS server. The hostname is reported back to the web on every sync so the + in-browser player picks it up automatically — cross-network playback now + works on torrentclaw.com without Tailscale or port forwarding. Bytes + proxy through CloudFlare; TorrentClaw still doesn't relay content. +- **funnel**: on by default for fresh installs (NAS/Docker get cross-network + HTTPS automatically); existing configs that pre-date the feature stay + off until the operator runs `unarr funnel on`. +- **funnel**: auto-downloads cloudflared to the unarr data dir when not on + PATH (Linux amd64/arm64/armhf/386). 
ELF magic + size sanity check on the + download; `O_EXCL` partial-write so concurrent daemons don't clobber + each other. +- **funnel**: subprocess supervisor keeps the tunnel up across cloudflared + crashes + CF's ~6h Quick Tunnel rotation. Exponential backoff (2 s → 5 min) + on persistent failures. The web's reported URL is cleared the moment + cloudflared exits so an outdated hostname doesn't keep handing out 502s. +- **funnel**: `unarr funnel status` shows the live URL once registered. + See README §`[downloads.funnel]` for the throughput / latency caveats of + CF's free Quick Tunnels. +- **docker**: the official `torrentclaw/unarr` image now bundles + `cloudflared` so the funnel works the moment the container starts — no + first-run download. + +### Fixed + +- **hls/libx264**: bump the H.264 level we hint to libx264 by one tier so + anamorphic (>16:9) sources stop emitting unplayable streams. 720p at + level 3.1 silently rejected 1728×720 cinemascope frames with + `frame MB size > level limit`; 720p now ships at level 4.0, 1080p at 4.1. + Decoder compatibility is unaffected — every device that handles 1080p + already handles ≥ 4.1. + ## [0.9.4] - 2026-05-26 ### Removed diff --git a/Dockerfile b/Dockerfile index 1773622..64ea4e2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,10 +21,23 @@ FROM alpine:3.22 # Use Alpine's native musl ffmpeg + ffprobe instead of the johnvansickle / # BtbN static glibc builds — those need a glibc shim on Alpine and the # vector-math symbols the GPL builds reference are not satisfiable by -# gcompat. Alpine ships ffmpeg ~7.x which is fine for the WebRTC -# transcoding pipeline (libx264 + libfdk-aac alternatives included). +# gcompat. Alpine ships ffmpeg ~7.x which is fine for the HLS transcoding +# pipeline (libx264 + libfdk-aac alternatives included). 
RUN apk upgrade --no-cache && \ - apk add --no-cache ca-certificates tzdata ffmpeg + apk add --no-cache ca-certificates tzdata ffmpeg wget + +# Bundle cloudflared so `unarr funnel on` (default: on, see config defaults) +# Just Works on a headless container with no first-run network round-trip. +# TARGETARCH is set automatically by Docker buildx during cross-builds. +ARG TARGETARCH=amd64 +RUN case "$TARGETARCH" in \ + amd64) CF_ARCH=amd64 ;; \ + arm64) CF_ARCH=arm64 ;; \ + arm) CF_ARCH=armhf ;; \ + *) echo "unsupported TARGETARCH=$TARGETARCH" >&2; exit 1 ;; \ + esac && \ + wget -qO /usr/local/bin/cloudflared "https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-$CF_ARCH" && \ + chmod +x /usr/local/bin/cloudflared # Non-root user (UID 1000 matches typical host user for volume permissions) RUN addgroup -g 1000 unarr && adduser -u 1000 -G unarr -D -h /home/unarr unarr diff --git a/README.md b/README.md index e1cc6e3..4129ffb 100644 --- a/README.md +++ b/README.md @@ -476,6 +476,64 @@ with a clear error — install ffmpeg or set `enabled = false`. See the [VPN](#vpn) section above for how it works (split-tunnel, no root) and how to protect your other devices. +#### `[downloads.funnel]` — public HTTPS hostname for the daemon (CloudFlare Quick Tunnel) + +```toml +[downloads.funnel] +enabled = true # on by default for fresh installs +``` + +| Key | Type | Default | Notes | +|-----|------|---------|-------| +| `enabled` | bool | `true` (fresh installs; configs that pre-date the feature stay `false`) | Spawns `cloudflared tunnel --url http://localhost:<port>` as a child process at daemon startup. Toggle with `unarr funnel on` / `off`. Uses `cloudflared` from PATH; on Linux it is downloaded automatically when missing. | + +**What it does.** Without a tunnel, the daemon is reachable on `localhost`, +your LAN, and (if installed) Tailscale. That covers the same-machine and +Tailscale-connected cases, but the **browser-based player on torrentclaw.com +fails on any other network** because HTTPS pages can't fetch HTTP resources +("mixed content").
Enabling the funnel gives the daemon a public +`https://.trycloudflare.com` hostname so the web player picks it up +and playback works from anywhere — phone on cellular, friend's laptop on a +foreign Wi-Fi, anywhere. The Stremio addon already works cross-network +(native mpv/VLC players ignore CORS), so this is strictly a web-player fix. + +**Privacy posture.** Bytes pass through CloudFlare's edge — TorrentClaw never +relays content (we don't see your traffic), CloudFlare does. Quick Tunnels +are **anonymous** (no CF account required); the registration is unauthenticated +and the hostname is a random label, but CF logs request metadata like any CDN +would. If you want zero third-party byte access, use Tailscale instead. + +**Limitations (free Quick Tunnels).** +| Aspect | Limit | +|--------|-------| +| Session lifetime | ~6 hours, then the hostname rotates. cloudflared re-registers automatically; the web picks up the new URL on the next sync. In-flight HLS sessions break across the rotation (browser retries). | +| Bandwidth | No documented hard cap, but CF reserves the right to throttle. 1080p HLS (~6 Mbps) is fine; 4K HEVC at 25 Mbps may hit throttling. | +| Latency | +20–80 ms vs direct LAN/Tailscale (extra hop browser → CF edge → tunnel). HLS player buffer absorbs it. | +| Concurrency | One tunnel serves N viewers. CF rate-limits ~200 req/s, plenty for HLS segments. | +| TOS | CloudFlare flags Quick Tunnels as "not for production traffic". They can decommission an abusive tunnel without notice. | + +For heavy / high-throughput / persistent-URL use cases, switch to a CloudFlare +Named Tunnel (free, needs a CF account) or run your own reverse proxy — both +out of scope for the bundled command. + +**Disable.** `unarr funnel off` flips `enabled` to `false` in the TOML and +prompts you to restart the daemon. 
You can also edit `config.toml` directly: + +```toml +[downloads.funnel] +enabled = false +``` + +**Install cloudflared.** +- Linux: `apt install cloudflared` (after adding CF's apt repo) — see + <https://pkg.cloudflare.com/>. Or pull the static binary from + <https://github.com/cloudflare/cloudflared/releases>. +- macOS: `brew install cloudflared`. +- Windows: `winget install --id Cloudflare.cloudflared`. + +If `cloudflared` is not on PATH, Linux daemons download it into the unarr data dir on first run; on +other platforms the daemon logs a `[funnel]` warning and stays on LAN/Tailscale-only reachability. + ### Environment variables Environment variables override config file values: diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index d0b1458..109dfbb 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -55,6 +55,10 @@ type Daemon struct { vpnMode string vpnServer string + // CloudFlare Quick Tunnel public URL; folded into DaemonState + heartbeat + // so the web can prefer it over Tailscale/LAN for in-browser playback. + funnelURL string + // Watching tracks whether a user is viewing download progress in the web UI. Watching atomic.Bool @@ -85,6 +89,15 @@ func (d *Daemon) SetVPNState(active bool, mode, server string) { d.vpnServer = server } +// SetFunnelURL records the CloudFlare Quick Tunnel hostname so it's reflected +// in the daemon state file (read by `unarr funnel status`) and in heartbeat +// requests (so the web prefers it over Tailscale/LAN). Pass "" to clear. +func (d *Daemon) SetFunnelURL(url string) { + d.funnelURL = url + d.State.FunnelURL = url + WriteState(&d.State) +} + // UpdateStreamPort updates the stream port reported in sync requests.
func (d *Daemon) UpdateStreamPort(port int) { d.cfg.StreamPort = port @@ -109,6 +122,7 @@ func (d *Daemon) Register(ctx context.Context) error { VPNActive: d.vpnActive, VPNMode: d.vpnMode, VPNServer: d.vpnServer, + FunnelURL: d.funnelURL, } if free, total, err := DiskInfo(d.cfg.DownloadDir); err == nil { req.DiskFreeBytes = free @@ -162,6 +176,7 @@ func (d *Daemon) Register(ctx context.Context) error { VPNActive: d.vpnActive, VPNMode: d.vpnMode, VPNServer: d.vpnServer, + FunnelURL: d.funnelURL, } WriteState(&d.State) @@ -234,6 +249,9 @@ func (d *Daemon) Run(ctx context.Context) error { d.sync.GetVPNState = func() (bool, string, string) { return d.vpnActive, d.vpnMode, d.vpnServer } + d.sync.GetFunnelURL = func() string { + return d.funnelURL + } d.sync.OnSyncSuccess = func() { d.State.LastHeartbeat = time.Now() if d.GetActiveCount != nil { diff --git a/internal/agent/state.go b/internal/agent/state.go index 1de71bf..1f00033 100644 --- a/internal/agent/state.go +++ b/internal/agent/state.go @@ -29,6 +29,11 @@ type DaemonState struct { VPNActive bool `json:"vpnActive,omitempty"` VPNMode string `json:"vpnMode,omitempty"` // managed | self-hosted VPNServer string `json:"vpnServer,omitempty"` // WireGuard endpoint (ip:port) + + // CloudFlare Quick Tunnel state, so `unarr funnel status` can report the + // HTTPS hostname the daemon is reachable at from anywhere on the internet. + // Empty when the funnel is off or hasn't registered yet. + FunnelURL string `json:"funnelUrl,omitempty"` } // stateFilePathFn is overridable for testing. diff --git a/internal/agent/sync.go b/internal/agent/sync.go index c28c65f..ac856a5 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -40,6 +40,9 @@ type SyncClient struct { // WireGuard tunnel is up, the mode, and the exit server) so the web can track // which agent holds the single WG slot. 
GetVPNState func() (active bool, mode, server string) + // GetFunnelURL returns the CloudFlare Quick Tunnel public hostname if one + // is active, else "". Sent on every sync so the web picks it up live. + GetFunnelURL func() string // OnDeleteFiles is called when the server requests file deletion from disk. // It should delete the files and return the IDs of successfully deleted items. OnDeleteFiles func(items []LibraryDeleteRequest) []int @@ -162,6 +165,9 @@ func (sc *SyncClient) buildRequest() SyncRequest { if sc.GetVPNState != nil { req.VPNActive, req.VPNMode, req.VPNServer = sc.GetVPNState() } + if sc.GetFunnelURL != nil { + req.FunnelURL = sc.GetFunnelURL() + } // Flush confirmed deletions from previous cycle. // Once flushed, remove IDs from deleteInFlight — the server will stop sending // them after this sync, so deduplication protection is no longer needed. diff --git a/internal/agent/types.go b/internal/agent/types.go index 72e8af5..00802bc 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -34,6 +34,9 @@ type RegisterRequest struct { VPNActive bool `json:"vpnActive"` VPNMode string `json:"vpnMode,omitempty"` // managed | self-hosted VPNServer string `json:"vpnServer,omitempty"` + // CloudFlare Quick Tunnel hostname when enabled; the web prefers it over + // Tailscale/LAN for in-browser playback because it works on any network. + FunnelURL string `json:"funnelUrl,omitempty"` } // RegisterResponse is returned by the server after registration. @@ -359,6 +362,8 @@ type SyncRequest struct { VPNActive bool `json:"vpnActive"` VPNMode string `json:"vpnMode,omitempty"` VPNServer string `json:"vpnServer,omitempty"` + // CloudFlare Quick Tunnel hostname when enabled, else empty. + FunnelURL string `json:"funnelUrl,omitempty"` } // ControlAction represents a server-side control signal for a task. 
diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 84c458c..6c00e95 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -17,6 +17,7 @@ import ( "github.com/torrentclaw/unarr/internal/agent" "github.com/torrentclaw/unarr/internal/config" "github.com/torrentclaw/unarr/internal/engine" + "github.com/torrentclaw/unarr/internal/funnel" "github.com/torrentclaw/unarr/internal/library" "github.com/torrentclaw/unarr/internal/library/mediainfo" "github.com/torrentclaw/unarr/internal/usenet/download" @@ -303,6 +304,15 @@ func runDaemonStart() error { } d.UpdateStreamPort(streamSrv.Port()) + // CloudFlare Quick Tunnel — needs the ACTUAL listening port (the + // configured port may have been busy and bumped). Spawning here ensures + // cloudflared --url points at the right socket. Failures degrade to + // Tailscale/LAN only; the supervisor keeps the tunnel up across CF's + // periodic rotation + transient cloudflared crashes. + if cfg.Download.Funnel.Enabled { + go superviseFunnel(ctx, d, streamSrv.Port()) + } + // Warn at startup if transcode is enabled but ffmpeg/ffprobe are missing. // HLS sessions get rejected at runtime (see daemon.go ~line 455), but // surfacing it here gives the operator a chance to install ffmpeg before @@ -773,3 +783,54 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, } } } + +// superviseFunnel keeps a CloudFlare Quick Tunnel up across cloudflared +// crashes and CF's ~6h tunnel rotation. On a clean exit (cancellation) it +// returns; on a crash it clears the reported URL and respawns with an +// exponential backoff so we don't hammer cloudflared into a tight loop when +// it can't reach the CF edge. 
+func superviseFunnel(ctx context.Context, d *agent.Daemon, port int) { + backoff := 2 * time.Second + const maxBackoff = 5 * time.Minute + for ctx.Err() == nil { + t, err := funnel.Start(ctx, funnel.Config{Port: port}) + if err != nil { + log.Printf("[funnel] could not start CloudFlare tunnel (%v) — retrying in %s", err, backoff) + select { + case <-time.After(backoff): + case <-ctx.Done(): + return + } + backoff = min(backoff*2, maxBackoff) + continue + } + log.Printf("[funnel] cloudflared started, waiting for public URL...") + go func() { + url, werr := t.WaitURL(45 * time.Second) + if werr != nil { + log.Printf("[funnel] cloudflared did not emit a URL (%v)", werr) + return + } + log.Printf("[funnel] public URL: %s", url) + d.SetFunnelURL(url) + }() + // Block until cloudflared exits (CF rotation, crash, or shutdown). + exitErr := <-t.Done() + _ = t.Close() + d.SetFunnelURL("") + if ctx.Err() != nil { + return + } + if exitErr != nil { + log.Printf("[funnel] cloudflared exited: %v — restarting in %s", exitErr, backoff) + } else { + log.Printf("[funnel] cloudflared exited cleanly — restarting in %s", backoff) + } + select { + case <-time.After(backoff): + case <-ctx.Done(): + return + } + backoff = min(backoff*2, maxBackoff) + } +} diff --git a/internal/cmd/funnel.go b/internal/cmd/funnel.go new file mode 100644 index 0000000..5ce793d --- /dev/null +++ b/internal/cmd/funnel.go @@ -0,0 +1,165 @@ +package cmd + +import ( + "fmt" + + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/torrentclaw/unarr/internal/agent" + "github.com/torrentclaw/unarr/internal/config" +) + +func newFunnelCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "funnel", + Short: "Expose the daemon over a public HTTPS hostname via CloudFlare Quick Tunnel", + Long: `Turn the CloudFlare Quick Tunnel on/off and check its status. 
+ +When on, the daemon spawns cloudflared as a child process and registers a +` + "`https://.trycloudflare.com`" + ` hostname tunnelled to its local +HLS server. The torrentclaw.com / torrentclaw.to web player picks the tunnel +URL first so cross-network playback works from any browser without Tailscale +or port forwarding. + +Trade-offs: + • Bytes proxy through CloudFlare. We don't relay; CF does. Preserves the + TorrentClaw legal posture but means CF sees your traffic shape. + • Quick Tunnels are anonymous — no CF account required. + • Hostname is random per session and rotates roughly every 6 h. + +Requires the cloudflared binary on PATH. Install: + Linux : https://pkg.cloudflare.com (apt) or download from + https://github.com/cloudflare/cloudflared/releases + macOS : brew install cloudflared + Windows: winget install --id Cloudflare.cloudflared`, + Example: ` unarr funnel status # is the tunnel up? what's the URL? + unarr funnel on # turn it on + unarr funnel off # turn it off`, + RunE: func(cmd *cobra.Command, args []string) error { + return cmd.Help() + }, + } + cmd.AddCommand(newFunnelStatusCmd(), newFunnelOnCmd(), newFunnelOffCmd()) + return cmd +} + +func newFunnelStatusCmd() *cobra.Command { + return &cobra.Command{ + Use: "status", + Short: "Show CloudFlare tunnel configuration + live URL", + Example: " unarr funnel status", + RunE: func(cmd *cobra.Command, args []string) error { + return runFunnelStatus() + }, + } +} + +func runFunnelStatus() error { + bold := color.New(color.Bold) + dim := color.New(color.FgHiBlack) + green := color.New(color.FgGreen) + yellow := color.New(color.FgYellow) + cyan := color.New(color.FgCyan) + + cfg := loadConfig() + + fmt.Println() + bold.Println(" CloudFlare Quick Tunnel") + fmt.Println() + + if !cfg.Download.Funnel.Enabled { + dim.Println(" Mode: off") + fmt.Println() + dim.Println(" Enable with `unarr funnel on` to give the daemon a public HTTPS URL") + dim.Println(" so cross-network browser playback works without 
Tailscale.") + fmt.Println() + return nil + } + cyan.Println(" Mode: on") + + state := agent.ReadState() + alive := state != nil && isDaemonAlive(state) + fmt.Println() + switch { + case alive && state.FunnelURL != "": + green.Println(" ✓ Tunnel ACTIVE") + fmt.Printf(" URL: %s\n", state.FunnelURL) + fmt.Println() + dim.Println(" This URL rotates roughly every 6 h. The web player picks it up") + dim.Println(" automatically — no action needed on your side.") + case alive: + yellow.Println(" ⚠ Daemon is running but the tunnel hasn't registered yet.") + dim.Println(" Check `unarr daemon logs` for a [funnel] line. Common cause:") + dim.Println(" cloudflared isn't installed on PATH.") + default: + dim.Println(" Daemon not running — start it (`unarr start`) to bring the tunnel up.") + } + fmt.Println() + return nil +} + +func newFunnelOnCmd() *cobra.Command { + return &cobra.Command{ + Use: "on", + Short: "Turn the CloudFlare tunnel on", + Example: " unarr funnel on", + RunE: func(cmd *cobra.Command, args []string) error { + return setFunnelEnabled(true) + }, + } +} + +func newFunnelOffCmd() *cobra.Command { + return &cobra.Command{ + Use: "off", + Short: "Turn the CloudFlare tunnel off", + Example: " unarr funnel off", + RunE: func(cmd *cobra.Command, args []string) error { + return setFunnelEnabled(false) + }, + } +} + +func setFunnelEnabled(enabled bool) error { + green := color.New(color.FgGreen) + dim := color.New(color.FgHiBlack) + + cfg := loadConfig() + if cfg.Download.Funnel.Enabled == enabled { + fmt.Println() + dim.Printf(" Tunnel is already %s — nothing to do.\n", onOffWord(enabled)) + fmt.Println() + return nil + } + + cfg.Download.Funnel.Enabled = enabled + + configPath := config.FilePath() + if cfgFile != "" { + configPath = cfgFile + } + if err := config.Save(cfg, configPath); err != nil { + return fmt.Errorf("save config: %w", err) + } + appCfg = cfg + + fmt.Println() + green.Printf(" ✓ CloudFlare tunnel %s.\n", onOffWord(enabled)) + + // Subprocess is 
launched/torn down by the daemon at startup; a plain config + // reload does not bring it up. Prompt for a restart when the daemon is alive. + if state := agent.ReadState(); state != nil && isDaemonAlive(state) { + fmt.Println() + dim.Println(" The daemon is running. Restart it for this to take effect:") + dim.Println(" unarr daemon restart") + } + fmt.Println() + return nil +} + +func onOffWord(enabled bool) string { + if enabled { + return "on" + } + return "off" +} diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 55786fb..b28ec92 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -105,6 +105,8 @@ Source: https://github.com/torrentclaw/unarr`, daemonCmd.GroupID = "daemon" vpnCmd := newVPNCmd() vpnCmd.GroupID = "daemon" + funnelCmd := newFunnelCmd() + funnelCmd.GroupID = "daemon" // System & Diagnostics statsCmd := newStatsCmd() @@ -149,6 +151,7 @@ Source: https://github.com/torrentclaw/unarr`, statusCmd, daemonCmd, vpnCmd, + funnelCmd, // System & Diagnostics statsCmd, doctorCmd, diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 4cd7a03..3009c5a 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.4" +var Version = "0.9.5" diff --git a/internal/config/config.go b/internal/config/config.go index 6e65df8..12573b4 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -53,6 +53,16 @@ type DownloadConfig struct { CORSExtraOrigins []string `toml:"cors_extra_origins"` // extra browser origins added on top of the baked-in allowlist (torrentclaw.com, app.torrentclaw.com, localhost:3030) Transcode TranscodeConfig `toml:"transcode"` VPN VPNConfig `toml:"vpn"` + Funnel FunnelConfig `toml:"funnel"` +} + +// FunnelConfig gates the optional CloudFlare Quick Tunnel that exposes the +// daemon's HLS server over a public HTTPS hostname (https://.try +// cloudflare.com). 
Enabling it lets the web player on torrentclaw.com play +// from this daemon across any network without Tailscale or a public IP — +// the cost is that bytes proxy through CloudFlare's network. On by default for fresh installs (see Default). +type FunnelConfig struct { + Enabled bool `toml:"enabled"` } // VPNConfig gates the managed-VPN add-on split-tunnel. When enabled, the daemon @@ -139,6 +149,13 @@ func Default() Config { AudioBitrate: "192k", MaxConcurrent: 2, }, + Funnel: FunnelConfig{ + // On by default so headless installs (NAS / Docker) get cross-network + // HTTPS playback without anyone having to terminal in. Users who + // don't want bytes proxied through CloudFlare can opt out with + // `unarr funnel off` (sets enabled=false in the TOML). + Enabled: true, + }, }, Organize: OrganizeConfig{ Enabled: true, @@ -227,6 +244,12 @@ func applyDefaults(cfg *Config, meta toml.MetaData) { if !meta.IsDefined("downloads", "transcode", "max_concurrent") { cfg.Download.Transcode.MaxConcurrent = 2 } + // NOTE: Funnel default-ON only applies to fresh installs (no config file → + // Default() returns Funnel.Enabled=true straight off). When an existing + // config file lacks `[downloads.funnel]` entirely we intentionally do NOT + // flip it on here — that would silently route an upgraded operator's + // traffic through CloudFlare without their consent. They opt in with + // `unarr funnel on` whenever they're ready. } // Save writes config to the default or specified path using atomic write. diff --git a/internal/engine/hwaccel.go b/internal/engine/hwaccel.go index 886a295..7108379 100644 --- a/internal/engine/hwaccel.go +++ b/internal/engine/hwaccel.go @@ -129,12 +129,13 @@ func (h HWAccel) FFmpegVideoCodec(target string) string { } } -// H264LevelForHeight returns the lowest H.264 profile level capable of encoding -// a stream at the given output pixel height (assumes ~16:9, ≤30 fps).
The -// previous code used a fixed "4.0" which silently rejects anything above 1080p -// — libx264 logs "frame MB size > level limit" and emits a corrupt stream. -// Returning a tighter level on smaller outputs keeps player compatibility on -// older devices where the encoder can't auto-pick. +// H264LevelForHeight returns the lowest H.264 profile level capable of +// encoding a stream at the given output pixel height. Each tier carries +// enough macroblock headroom to handle ANAMORPHIC content (up to ~2.4:1 +// cinemascope) at 30 fps — a fixed 16:9 assumption used to silently bust +// the level on a 720p movie shot in 2.4:1 (1728×720 = 4860 MBs > 3.1's +// 3600 limit; libx264 logs "frame MB size > level limit" and emits a +// corrupt stream). func H264LevelForHeight(height int) string { switch { case height <= 0: @@ -142,11 +143,14 @@ func H264LevelForHeight(height int) string { // re-introduce the silent-failure mode that motivated this helper. return "5.1" case height <= 480: - return "3.0" - case height <= 720: return "3.1" - case height <= 1080: + case height <= 720: + // 4.0 instead of 3.1: covers 720p anamorphic (e.g. 1728×720) + + // MB rate up to 245k/s (3.1 caps at 108k/s — broken at 24 fps). return "4.0" + case height <= 1080: + // 4.1 instead of 4.0: covers 1080p anamorphic + 30 fps (~245k MBs/s). + return "4.1" case height <= 1440: return "5.0" case height <= 2160: diff --git a/internal/funnel/funnel.go b/internal/funnel/funnel.go new file mode 100644 index 0000000..6a8640a --- /dev/null +++ b/internal/funnel/funnel.go @@ -0,0 +1,199 @@ +// Package funnel manages the optional CloudFlare Quick Tunnel subprocess +// that gives the daemon a public HTTPS hostname for cross-network playback +// from browser-based clients (web player on torrentclaw.com / torrentclaw.to). +// +// Why: HTTPS pages can't fetch HTTP resources (mixed content). 
Without a +// tunnel the daemon is only reachable from the same machine (localhost is +// exempt) or via Tailscale (which users can install themselves but most +// won't). CF Quick Tunnels are anonymous — no CF account, no DNS, no port +// forwarding — and assign a one-shot `https://<random>.trycloudflare.com` +// URL. Bytes flow through CF, never through our infra (legal posture: we +// don't relay; CF does). +// +// Lifecycle: +// +// t, err := funnel.Start(ctx, funnel.Config{Port: 11819}) +// defer t.Close() +// url, err := t.WaitURL(30 * time.Second) // blocks until cloudflared emits the URL +// +// The tunnel runs until the context is cancelled or t.Close() is called. +package funnel + +import ( + "bufio" + "context" + "errors" + "fmt" + "io" + "os/exec" + "regexp" + "sync" + "time" +) + +// urlPattern matches the `https://<random>.trycloudflare.com` URL cloudflared +// prints when a Quick Tunnel is registered. The hostname has a random +// hyphen-separated label followed by .trycloudflare.com. +var urlPattern = regexp.MustCompile(`https://[a-z0-9-]+\.trycloudflare\.com`) + +// Config controls how the tunnel is launched. +type Config struct { + // Port is the local upstream port cloudflared will tunnel to. Required. + Port int + // Binary is the cloudflared executable path. When empty, ResolveBinary finds + // one: $PATH, then the cached copy, then (Linux only) a fresh download. + Binary string +} + +// Tunnel is a handle on a running cloudflared Quick Tunnel. +type Tunnel struct { + cmd *exec.Cmd + cancel context.CancelFunc + urlCh chan string + exitCh chan error + mu sync.Mutex + url string + stopped bool +} + +// Start launches cloudflared as a subprocess. The returned *Tunnel exposes the +// public URL via WaitURL once cloudflared registers it (usually 2–5 s). +// +// The subprocess inherits the cancellation of the supplied context. Closing +// the *Tunnel cancels that context, which kills the subprocess (no SIGTERM).
+func Start(ctx context.Context, cfg Config) (*Tunnel, error) { + if cfg.Port <= 0 { + return nil, fmt.Errorf("funnel: invalid Port %d", cfg.Port) + } + binary := cfg.Binary + if binary == "" { + resolved, err := ResolveBinary() + if err != nil { + return nil, err + } + binary = resolved + } + + subCtx, cancel := context.WithCancel(ctx) + // `--no-autoupdate` disables cloudflared's daily self-update check (the + // daemon manages binary rotation). `--metrics 127.0.0.1:0` suppresses the + // default `:9090` listener that would collide on a shared box. + cmd := exec.CommandContext(subCtx, binary, + "tunnel", + "--no-autoupdate", + "--metrics", "127.0.0.1:0", + "--url", fmt.Sprintf("http://localhost:%d", cfg.Port), + ) + + // cloudflared writes the connect log + assigned URL to stderr. + stderr, err := cmd.StderrPipe() + if err != nil { + cancel() + return nil, fmt.Errorf("funnel: pipe stderr: %w", err) + } + cmd.Stdout = io.Discard // quick tunnels print nothing useful on stdout + + if err := cmd.Start(); err != nil { + cancel() + return nil, fmt.Errorf("funnel: start cloudflared: %w", err) + } + + t := &Tunnel{ + cmd: cmd, + cancel: cancel, + urlCh: make(chan string, 1), + exitCh: make(chan error, 1), + } + + // Reader goroutine: scan cloudflared's stderr for the URL, surface the + // rest as a single string we don't try to interpret. + go t.scanStderr(stderr) + + // Waiter goroutine: signal exit so callers can react (e.g. restart). + go func() { + t.exitCh <- cmd.Wait() + }() + + return t, nil +} + +// WaitURL blocks until cloudflared has registered the tunnel and emitted the +// public URL, or `timeout` elapses, or the subprocess exits. The returned URL +// has the form `https://.trycloudflare.com`. 
+func (t *Tunnel) WaitURL(timeout time.Duration) (string, error) { + t.mu.Lock() + if t.url != "" { + u := t.url + t.mu.Unlock() + return u, nil + } + t.mu.Unlock() + + select { + case u := <-t.urlCh: + return u, nil + case err := <-t.exitCh: + if err == nil { + return "", errors.New("funnel: cloudflared exited before URL") + } + return "", fmt.Errorf("funnel: cloudflared exited: %w", err) + case <-time.After(timeout): + return "", fmt.Errorf("funnel: timed out waiting for URL after %s", timeout) + } +} + +// URL returns the assigned tunnel URL, or "" if not yet emitted. +func (t *Tunnel) URL() string { + t.mu.Lock() + defer t.mu.Unlock() + return t.url +} + +// Done returns the channel that receives cmd.Wait's result once the subprocess exits. It is +// never closed and holds a single value: if WaitURL or Close takes it first, a receive here blocks. +func (t *Tunnel) Done() <-chan error { + return t.exitCh +} + +// Close cancels the subprocess context and waits up to 5 s for its exit value. Safe to call +// multiple times. +func (t *Tunnel) Close() error { + t.mu.Lock() + if t.stopped { + t.mu.Unlock() + return nil + } + t.stopped = true + t.mu.Unlock() + t.cancel() + // Wait for the exit value; the 5 s cap applies when another receiver already consumed it. + select { + case <-t.exitCh: + case <-time.After(5 * time.Second): + } + return nil +} + +func (t *Tunnel) scanStderr(r io.Reader) { + scanner := bufio.NewScanner(r) + // Some cloudflared lines exceed the default 64KiB scanner buffer (when it + // prints connection diagnostics). Bump to 1MiB. + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + line := scanner.Text() + if t.URL() == "" { + if m := urlPattern.FindString(line); m != "" { + t.mu.Lock() + t.url = m + t.mu.Unlock() + // Non-blocking send: if no one is listening, just drop — + // the URL field carries the value for any later WaitURL call.
+ select { + case t.urlCh <- m: + default: + } + } + } + } +} + diff --git a/internal/funnel/install.go b/internal/funnel/install.go new file mode 100644 index 0000000..1e827a4 --- /dev/null +++ b/internal/funnel/install.go @@ -0,0 +1,167 @@ +package funnel + +import ( + "bytes" + "errors" + "fmt" + "io" + "net/http" + "os" + "os/exec" + "path/filepath" + "runtime" + "time" + + "github.com/torrentclaw/unarr/internal/config" +) + +// ResolveBinary returns the path to a usable cloudflared executable, downloading +// one into the unarr data dir if neither $PATH nor the cached location has it. +// This makes the funnel feature usable on headless installs (NAS / Docker) +// where the user can't easily install cloudflared via the OS package manager. +// +// Resolution order: +// +// 1. cloudflared on $PATH (operator already installed it) +// 2. /bin/cloudflared (we cached it on a previous run) +// 3. download from GitHub releases (Linux-only fallback; macOS / Windows +// return a clear error pointing at brew / winget) +func ResolveBinary() (string, error) { + if p, err := exec.LookPath("cloudflared"); err == nil { + return p, nil + } + cached := cachedBinaryPath() + if _, err := os.Stat(cached); err == nil { + return cached, nil + } + return downloadCloudflared(cached) +} + +func cachedBinaryPath() string { + name := "cloudflared" + if runtime.GOOS == "windows" { + name += ".exe" + } + return filepath.Join(config.DataDir(), "bin", name) +} + +// downloadCloudflared fetches the latest cloudflared release asset matching +// the current GOOS/GOARCH into `dest`. Linux only — macOS/Windows return a +// pointer at the OS package manager. +// +// Supply-chain caveat: we trust GitHub-over-TLS + cloudflare/cloudflared +// repo integrity. The fetch is over HTTPS to api.github.com's release-asset +// redirector, so a network MITM is bounded by Let's Encrypt + GitHub's cert +// chain. 
We additionally verify the file is an ELF binary (Linux magic
+// bytes) so a generic 404 HTML page or a wrong-arch tarball is rejected at
+// rest. We do NOT verify a signature because Cloudflare doesn't sign release
+// assets at the moment — if you need stricter integrity, install cloudflared
+// from your distro's package manager (apt/brew/winget) and unarr will use
+// the PATH copy.
+func downloadCloudflared(dest string) (string, error) {
+	if runtime.GOOS != "linux" {
+		return "", fmt.Errorf("funnel: auto-download not supported on %s — install cloudflared manually or drop a binary at %s", runtime.GOOS, dest)
+	}
+
+	var asset string
+	switch runtime.GOARCH {
+	case "amd64":
+		asset = "cloudflared-linux-amd64"
+	case "arm64":
+		asset = "cloudflared-linux-arm64"
+	case "arm":
+		asset = "cloudflared-linux-armhf"
+	case "386":
+		asset = "cloudflared-linux-386"
+	default:
+		return "", fmt.Errorf("funnel: unsupported linux arch %q — install cloudflared manually", runtime.GOARCH)
+	}
+
+	// Upper bound for the whole HTTP transfer. It doubles as the age beyond
+	// which a leftover .partial can no longer belong to a live download.
+	const downloadTimeout = 5 * time.Minute
+
+	url := "https://github.com/cloudflare/cloudflared/releases/latest/download/" + asset
+	if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
+		return "", fmt.Errorf("funnel: create bin dir: %w", err)
+	}
+
+	// O_EXCL so concurrent unarr-dev / prod daemons don't clobber each
+	// other's partial download. The loser gets EEXIST → falls back to
+	// polling for the winner to finish.
+	tmp := dest + ".partial"
+	out, err := os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o755)
+	if errors.Is(err, os.ErrExist) {
+		// A downloader that crashed or was killed leaves its .partial behind,
+		// which would otherwise block every later attempt. A live writer
+		// touches the file as it goes and gives up after downloadTimeout, so
+		// an older mtime means nobody owns it any more: reclaim it (best
+		// effort) and retry the exclusive create once.
+		if st, statErr := os.Stat(tmp); statErr == nil && time.Since(st.ModTime()) > downloadTimeout {
+			_ = os.Remove(tmp)
+			out, err = os.OpenFile(tmp, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o755)
+		}
+	}
+	if err != nil {
+		if errors.Is(err, os.ErrExist) {
+			// Another process is downloading. Wait briefly for them to finish.
+			for range 60 {
+				time.Sleep(time.Second)
+				if _, statErr := os.Stat(dest); statErr == nil {
+					return dest, nil
+				}
+			}
+			return "", fmt.Errorf("funnel: another download in progress at %s (timed out)", tmp)
+		}
+		return "", fmt.Errorf("funnel: open dest: %w", err)
+	}
+
+	// From here on we own tmp. Unless it gets promoted to dest, every exit
+	// path must close and remove it so the next attempt starts clean.
+	promoted := false
+	defer func() {
+		if !promoted {
+			_ = out.Close() // may already be closed; that error is irrelevant
+			_ = os.Remove(tmp)
+		}
+	}()
+
+	client := &http.Client{Timeout: downloadTimeout}
+	resp, err := client.Get(url)
+	if err != nil {
+		return "", fmt.Errorf("funnel: download cloudflared: %w", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("funnel: download cloudflared: HTTP %d from %s", resp.StatusCode, url)
+	}
+
+	if _, err := io.Copy(out, resp.Body); err != nil {
+		return "", fmt.Errorf("funnel: write dest: %w", err)
+	}
+	if err := out.Close(); err != nil {
+		return "", fmt.Errorf("funnel: close dest: %w", err)
+	}
+
+	// Sanity check before promoting to dest: the file must pass
+	// verifyLinuxElf (rejects HTML error pages and truncated downloads; real
+	// cloudflared is ~50 MB, so anything under 1 MB is corrupt).
+	if err := verifyLinuxElf(tmp); err != nil {
+		return "", fmt.Errorf("funnel: downloaded file failed sanity check: %w", err)
+	}
+
+	if err := os.Rename(tmp, dest); err != nil {
+		return "", fmt.Errorf("funnel: rename dest: %w", err)
+	}
+	promoted = true
+	return dest, nil
+}
+
+// verifyLinuxElf returns nil when the file at `path` looks like a usable
+// cloudflared download: at least 1 MB and starting with the ELF magic bytes.
+// Used as a low-cost guard against promoting an HTML error page or a
+// truncated download.
+func verifyLinuxElf(path string) error {
+	st, err := os.Stat(path)
+	if err != nil {
+		return err
+	}
+	if st.Size() < 1024*1024 {
+		return errors.New("file is suspiciously small (<1 MB)")
+	}
+	f, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	// First 20 bytes of the ELF header: e_ident (16 bytes, magic at 0-3 and
+	// the EI_DATA byte-order flag at 5), e_type (2 bytes), e_machine (2 bytes).
+	head := make([]byte, 20)
+	if _, err := io.ReadFull(f, head); err != nil {
+		return fmt.Errorf("read magic bytes: %w", err)
+	}
+	if !bytes.Equal(head[:4], []byte{0x7f, 'E', 'L', 'F'}) {
+		return errors.New("not an ELF binary")
+	}
+
+	// The magic alone also matches an ELF built for another CPU, which would
+	// be cached and then fail with "exec format error" on every start. Compare
+	// e_machine with the architecture we are running on. An unlisted GOARCH
+	// skips the check rather than rejecting a possibly valid binary.
+	var want uint16
+	switch runtime.GOARCH {
+	case "386":
+		want = 3 // EM_386
+	case "arm":
+		want = 40 // EM_ARM
+	case "amd64":
+		want = 62 // EM_X86_64
+	case "arm64":
+		want = 183 // EM_AARCH64
+	default:
+		return nil
+	}
+	// All four targets above are little-endian (EI_DATA == ELFDATA2LSB == 1),
+	// which is also what lets e_machine be decoded as little-endian below.
+	if head[5] != 1 {
+		return fmt.Errorf("ELF byte order %d does not match %s", head[5], runtime.GOARCH)
+	}
+	if got := uint16(head[18]) | uint16(head[19])<<8; got != want {
+		return fmt.Errorf("ELF machine %d does not match %s (want %d)", got, runtime.GOARCH, want)
+	}
+	return nil
+}
From 834c58c25aaf699766a15656f9044a9166070b46 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 26 May 2026 21:47:04 +0200 Subject: [PATCH 02/32] feat(daemon): auto-apply upgrades when server signals (0.9.6) OnUpgrade now downloads + replaces the binary and exits in a background goroutine; the service supervisor (systemd Restart=always) respawns on the new version. Removes the "run unarr update" manual step after pressing the web's Force update button. --- CHANGELOG.md | 17 +++++++++++++++++ internal/agent/daemon.go | 35 ++++++++++++++++++++++++++++++++--- internal/cmd/version.go | 2 +- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38118cb..ca49641 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.6] - 2026-05-26 + +### Added + +- **auto-upgrade**: when the web flags the agent for upgrade + (`POST /api/internal/agent/upgrade` or the "Force update now" button), + the daemon now downloads and replaces the binary in-place, then exits so + the service supervisor (`systemd Restart=always` on Linux, the equivalent + on macOS/Windows) respawns on the new version. No `unarr update` step + required from the user.
Still opt-in — only fires when the server sends + the upgrade signal. + +### Changed + +- The `OnUpgrade` daemon callback no longer just logs `run unarr self-update`; + it now triggers the actual upgrade in a background goroutine. + ## [0.9.5] - 2026-05-26 ### Added diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 109dfbb..e5e4c60 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -11,6 +11,8 @@ import ( "strings" "sync/atomic" "time" + + "github.com/torrentclaw/unarr/internal/upgrade" ) // DaemonConfig holds daemon runtime settings. @@ -231,10 +233,12 @@ func (d *Daemon) Run(ctx context.Context) error { } } d.sync.OnUpgrade = func(version string) { - if version != d.lastNotifiedVersion { - d.lastNotifiedVersion = version - log.Printf("New version available: %s (run `unarr self-update` to upgrade)", version) + if version == d.lastNotifiedVersion { + return } + d.lastNotifiedVersion = version + log.Printf("[upgrade] new version available: %s — applying auto-upgrade", version) + go d.applyAutoUpgrade(version) } d.sync.OnScan = func() { log.Printf("Library scan requested by server") @@ -281,6 +285,31 @@ func (d *Daemon) Deregister() { RemoveState() } +// applyAutoUpgrade downloads the target version and exits so the service +// supervisor (systemd Restart=always on Linux) respawns on the new binary. +// Triggered by the server's upgrade signal — opt-in flag set by the user from +// the web UI; the daemon never auto-upgrades on a passive version bump. 
+func (d *Daemon) applyAutoUpgrade(targetVersion string) { + currentClean := strings.TrimPrefix(d.cfg.Version, "v") + upgrader := &upgrade.Upgrader{ + CurrentVersion: currentClean, + OnProgress: func(msg string) { + log.Printf("[upgrade] %s", msg) + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + result := upgrader.Execute(ctx, targetVersion) + if !result.Success { + log.Printf("[upgrade] auto-upgrade failed: %v", result.Error) + return + } + log.Printf("[upgrade] upgraded v%s → v%s; exiting so service supervisor restarts on new binary", + result.OldVersion, result.NewVersion) + time.Sleep(500 * time.Millisecond) + os.Exit(0) +} + // isTransientError returns true for errors worth retrying (429, 5xx, network). func isTransientError(err error) bool { if err == nil { diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 3009c5a..32f1ce3 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.5" +var Version = "0.9.6" From 7e969762574d24497554857d0729c6cedaa119c2 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 26 May 2026 23:39:02 +0200 Subject: [PATCH 03/32] feat(hls): persistent fMP4 segment cache + integrity + stats (0.9.7) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cache keyed by sha256(absPath|quality|audioIdx)[:8] with .complete marker; LRU + size-budget eviction; per-key writer-lock; pinned during play; startup orphan reap; integrity verify on HIT; subtitle-completeness gate; hit/miss counters + daily log line. New [downloads.hls_cache] block in config.toml (enabled/size_gb/dir, default 5GB). Smoke test: 2nd play of same source+quality is 23-31× faster (HIT path skips ffmpeg entirely). 
--- CHANGELOG.md | 33 ++ README.md | 87 +++++ internal/agent/daemon.go | 5 + internal/cmd/daemon.go | 28 ++ internal/cmd/version.go | 2 +- internal/config/config.go | 46 ++- internal/engine/hls.go | 198 +++++++++++- internal/engine/hls_cache.go | 410 ++++++++++++++++++++++++ internal/engine/hls_cache_smoke_test.go | 134 ++++++++ internal/engine/hls_cache_test.go | 361 +++++++++++++++++++++ 10 files changed, 1295 insertions(+), 9 deletions(-) create mode 100644 internal/engine/hls_cache.go create mode 100644 internal/engine/hls_cache_smoke_test.go create mode 100644 internal/engine/hls_cache_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ca49641..50cfa98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,39 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.7] - 2026-05-26 + +### Added + +- **hls cache**: persistent fMP4 segment cache keyed by + `(source, quality, audio_index)`. After a successful encode the segments + + `init.mp4` are kept under `~/.cache/unarr/hls-cache/{key}/` with a + `.complete` marker. A second play of the same file at the same quality + skips ffmpeg entirely (smoke-tested 23–31× faster than re-encode). LRU + + size-budget eviction; pinned during active play; per-key writer-lock + prevents two concurrent encodes from corrupting each other. Startup + reaps orphan dirs without `.complete` older than 10 min so a daemon + crash doesn't leak disk indefinitely. New `[downloads.hls_cache]` block + in `config.toml`: `enabled` (default true), `size_gb` (default 5, + min 1), `dir` (default `~/.cache/unarr/hls-cache`). +- **hls cache integrity check**: on HIT, the daemon stats `init.mp4` + + last segment before reporting cache reuse — if a file was externally + deleted, the entry is invalidated and re-encoded transparently. 
+- **hls cache stats**: hit/miss counters surface via `cache.Stats()` + (`Hits`, `Misses`, `EntryCount`, `TotalBytes`) and the sweeper logs a + daily summary line `[hls_cache] day-stats: hits=N misses=M ratio=X% + entries=Y size=ZMB`. +- **subtitle integrity for cached replay**: `Close` waits up to 15 s for + the subtitle extractor goroutine before sealing `.complete` so a HIT + never serves half-written `.vtt` files. Timeout invalidates instead of + sealing. + +### Changed + +- `[daemon] auto_upgrade` now appears in fresh `config.toml` files as + `true` (it was always the implicit default; this just makes it visible + in default-generated configs). + ## [0.9.6] - 2026-05-26 ### Added diff --git a/README.md b/README.md index 4129ffb..8a5d26d 100644 --- a/README.md +++ b/README.md @@ -343,6 +343,58 @@ unarr self-update --force # reinstall even if up to date `unarr doctor` checks: config file, API key, server connectivity (with latency), agent registration, download directory, disk space, and version. +### Updating unarr + +unarr supports three update paths. Pick whichever fits your workflow. + +**1. Manual self-update (always available).** + +```bash +unarr self-update # interactive update to latest +unarr self-update --force # reinstall same version +unarr self-update --allow-unsigned # accept releases without checksum signature +``` + +The CLI downloads the new release archive over HTTPS (from +`torrentclaw.com/releases/download/v/`), verifies SHA-256, swaps the +binary in place (`.backup` kept next to it), and restarts the systemd +user unit if the daemon is running. + +**2. Auto-apply on server signal (default, since 0.9.6).** + +When you press **"Force update now"** on the web (Settings → Agent → Force +update), the server sets a flag your daemon polls every sync (~3 s). On +the next sync the daemon downloads the new binary, replaces itself, and +exits — `systemd Restart=always` respawns on the new version. No SSH, no +terminal access required. 
Works headless on NAS / Docker. + +The button shows an amber warning if your agent is below 0.9.6 (older +daemons see the signal but only log "run unarr update" — the operator +must run the command manually that one time). + +**Opt out of auto-apply.** Some users prefer reviewing CHANGELOG before +applying. Disable in `config.toml`: + +```toml +[daemon] +auto_upgrade = false +``` + +With `auto_upgrade = false`, pressing the web button still flags your +agent (so the daemon logs the new version on next sync), but the daemon +will not download / replace anything — you run `unarr self-update` when +you're ready. + +**3. Docker auto-restart with a new tag.** + +```bash +docker pull torrentclaw/unarr:latest +docker compose up -d +``` + +Tags published: `latest`, `0.9`, `0.9.7`, ... — pin to a minor (`0.9`) +for opt-in patch updates without surprises. + ## Clean Remove temporary files, logs, resume data, and other artifacts generated by unarr. Shows what will be removed and asks for confirmation before deleting. @@ -424,6 +476,7 @@ tv_shows_dir = "~/Media/TV Shows" [daemon] poll_interval = "30s" heartbeat_interval = "30s" +auto_upgrade = true # apply server-flagged upgrades in-place (since 0.9.6) [notifications] enabled = true @@ -466,6 +519,40 @@ If `transcode.enabled = true` but `ffmpeg` / `ffprobe` aren't on PATH, the daemon logs a warning at startup and HLS sessions are rejected at runtime with a clear error — install ffmpeg or set `enabled = false`. +#### `[downloads.hls_cache]` — persistent HLS segment cache + +```toml +[downloads.hls_cache] +enabled = true # on by default +size_gb = 5 # disk budget; LRU eviction once exceeded +dir = "" # custom path; empty = ~/.cache/unarr/hls-cache +``` + +| Key | Type | Default | Notes | +|-----|------|---------|-------| +| `enabled` | bool | `true` | Persists finished HLS encodes per `(source, quality, audio_index)`. 
A second play of the same file at the same quality reuses the segments — no ffmpeg, near-zero CPU, instant playback. Set to `false` to delete segments on session close (original behavior). | +| `size_gb` | int | `5` | Cache budget in gigabytes. When exceeded the LRU sweeper evicts the least-recently-used cached encodes hourly. Minimum 1 GB (smaller values are clamped up). | +| `dir` | string | `""` | Custom storage path. Empty defaults to `~/.cache/unarr/hls-cache` (Linux/macOS) or the user cache dir (Windows). | + +**What it does.** First play encodes normally (ffmpeg writes segments). +On session close, if every segment is on disk and ffmpeg exited cleanly, +the directory is sealed with a `.complete` marker and kept. Next time the +same source + quality combo is requested, the daemon serves segments +straight from disk — no transcode, no warm-up, no CPU cost. + +**Why per (source, quality, audio).** Renaming the file or switching +quality invalidates the entry: the segments are tied to the exact source +bytes and the exact ffmpeg parameters. Re-encoding generates a new key. + +**Eviction.** A background goroutine wakes every hour. If total cache size +exceeds `size_gb`, it deletes the oldest entries (by mtime) until under +budget. Active sessions are pinned — they never get evicted mid-play. + +**Disable.** Either edit the TOML to set `enabled = false`, or remove the +cache directory manually (it'll be recreated as needed). Disabling does +not delete existing cached segments — drop `dir` (or `~/.cache/unarr/hls-cache`) +to reclaim the space. 
+ #### `[downloads.vpn]` | Key | Type | Default | Notes | diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index e5e4c60..e79fc0a 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -28,6 +28,7 @@ type DaemonConfig struct { ScanPaths []string // configured scan paths for file deletion validation HWAccel string // detected encoder backend ("nvenc"/"qsv"/"vaapi"/"videotoolbox"/"none") MaxTranscodeHeight int // resolution cap the agent can transcode comfortably (px) + AutoUpgrade bool // honor server-flagged upgrades by downloading + restarting (default: true) } // Daemon manages agent registration and the sync loop. @@ -237,6 +238,10 @@ func (d *Daemon) Run(ctx context.Context) error { return } d.lastNotifiedVersion = version + if !d.cfg.AutoUpgrade { + log.Printf("[upgrade] new version available: %s — auto_upgrade=false, run `unarr update` to apply", version) + return + } log.Printf("[upgrade] new version available: %s — applying auto-upgrade", version) go d.applyAutoUpgrade(version) } diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 6c00e95..19c4b7c 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -162,6 +162,7 @@ func runDaemonStart() error { ScanPaths: library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath), HWAccel: string(hwAccelPick), MaxTranscodeHeight: maxTranscodeHeight, + AutoUpgrade: cfg.Daemon.AutoUpgradeEnabled(), } // Create HTTP client with mirror failover so a `.com` block-out rolls @@ -299,6 +300,32 @@ func runDaemonStart() error { if err := engine.CleanupHLSOrphanDirs(); err != nil { log.Printf("[hls] orphan tmpdir cleanup: %v", err) } + + // Persistent HLS segment cache — survives across sessions so re-plays + // of the same file at the same quality skip ffmpeg entirely. Off when + // hls_cache.enabled = false; size cap from hls_cache.size_gb; path from + // hls_cache.dir (defaults to ~/.cache/unarr/hls-cache). 
+ var hlsCache *engine.HLSCache + if cfg.Download.HLSCache.Enabled { + cacheDir := cfg.Download.HLSCache.Dir + if cacheDir == "" { + if base, err := os.UserCacheDir(); err == nil { + cacheDir = filepath.Join(base, "unarr", "hls-cache") + } else { + cacheDir = filepath.Join(os.TempDir(), "unarr-hls-cache") + } + } + c, err := engine.NewHLSCache(cacheDir, cfg.Download.HLSCache.SizeGB) + if err != nil { + log.Printf("[hls_cache] init failed (%v) — falling back to per-session tmpdirs", err) + } else { + hlsCache = c + hlsCache.StartSweeper(ctx, time.Hour) + log.Printf("[hls_cache] enabled: dir=%s budget=%dGB", cacheDir, cfg.Download.HLSCache.SizeGB) + } + } else { + log.Printf("[hls_cache] disabled by config — every play re-encodes from scratch") + } if err := streamSrv.Listen(ctx); err != nil { return fmt.Errorf("start stream server: %w", err) } @@ -543,6 +570,7 @@ func runDaemonStart() error { Quality: sess.Quality, AudioIndex: sess.AudioIndex, Transcode: tcRuntime, + Cache: hlsCache, } hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) if err != nil { diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 32f1ce3..1fd2df9 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. 
-var Version = "0.9.6" +var Version = "0.9.7" diff --git a/internal/config/config.go b/internal/config/config.go index 12573b4..dfa5e8a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -52,10 +52,22 @@ type DownloadConfig struct { EnableUPnP bool `toml:"enable_upnp"` // map StreamPort to the WAN via UPnP/NAT-PMP (default: false; opt-in because it exposes the unauthenticated /stream + /hls endpoints to the public internet) CORSExtraOrigins []string `toml:"cors_extra_origins"` // extra browser origins added on top of the baked-in allowlist (torrentclaw.com, app.torrentclaw.com, localhost:3030) Transcode TranscodeConfig `toml:"transcode"` + HLSCache HLSCacheConfig `toml:"hls_cache"` VPN VPNConfig `toml:"vpn"` Funnel FunnelConfig `toml:"funnel"` } +// HLSCacheConfig controls the persistent HLS segment cache. A completed encode +// is kept on disk so a second play of the same file at the same quality skips +// ffmpeg entirely. Old entries are evicted (LRU) once the cache exceeds the +// size budget. Enabled by default — disable to save disk space at the cost of +// re-encoding every play. +type HLSCacheConfig struct { + Enabled bool `toml:"enabled"` // default: true + SizeGB int `toml:"size_gb"` // size budget in gigabytes; default: 5; minimum: 1 + Dir string `toml:"dir"` // override storage path; default: ~/.cache/unarr/hls-cache +} + // FunnelConfig gates the optional CloudFlare Quick Tunnel that exposes the // daemon's HLS server over a public HTTPS hostname (https://.try // cloudflare.com). Enabling it lets the web player on torrentclaw.com play @@ -101,8 +113,27 @@ type OrganizeConfig struct { type DaemonConfig struct { StatusInterval string `toml:"status_interval"` + // AutoUpgrade gates the daemon's response to a server-flagged upgrade + // (set via the "Force update" button on the web). When true the daemon + // downloads + replaces the binary in-place and exits so the service + // supervisor respawns on the new version. 
When false the daemon only + // logs "new version available" and the operator must run `unarr update` + // manually. Default: true. Available since unarr 0.9.6. + AutoUpgrade *bool `toml:"auto_upgrade"` } +// AutoUpgradeEnabled returns the resolved AutoUpgrade flag — defaults to true +// when the user has not set it explicitly. Pointer-vs-bool because Go's +// zero-value bool would collapse "unset" and "false" together. +func (d DaemonConfig) AutoUpgradeEnabled() bool { + if d.AutoUpgrade == nil { + return true + } + return *d.AutoUpgrade +} + +func boolPtr(v bool) *bool { return &v } + type NotificationsConfig struct { Enabled bool `toml:"enabled"` } @@ -156,11 +187,24 @@ func Default() Config { // `unarr funnel off` (sets enabled=false in the TOML). Enabled: true, }, + HLSCache: HLSCacheConfig{ + // On by default — second play of a recently watched file at the + // same quality skips ffmpeg (instant start, near-zero CPU). + // Users can opt out (hls_cache.enabled=false) or shrink the + // budget (hls_cache.size_gb) when disk is tight. + Enabled: true, + SizeGB: 5, + }, + }, + Daemon: DaemonConfig{ + // Pointer-to-true so Default() round-trips through TOML marshal + // as `auto_upgrade = true` instead of an omitted key — keeps the + // freshly-written config aligned with what README documents. + AutoUpgrade: boolPtr(true), }, Organize: OrganizeConfig{ Enabled: true, }, - Daemon: DaemonConfig{}, Notifications: NotificationsConfig{ Enabled: true, }, diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 9524627..795e893 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -100,6 +100,11 @@ type HLSSessionConfig struct { Quality string // "2160p"|"1080p"|"720p"|"480p"|"original"|"" AudioIndex int // 0-based ffmpeg audio stream selection (-map 0:a:N). -1 = default. Transcode TranscodeRuntime + // Cache is an optional persistent segment cache keyed by (source, quality, + // audio). 
When set, completed encodes are kept across sessions so re-plays + // of the same file at the same quality skip ffmpeg entirely. nil disables + // caching (per-session tmpdir, deleted on Close — original behavior). + Cache *HLSCache } // HLSSession owns a tmpdir + ffmpeg subprocess producing HLS fragments. @@ -139,6 +144,19 @@ type HLSSession struct { exitErr error exited bool readyCh chan struct{} // closed + replaced each time readyMax advances + + // Persistent cache state. cache==nil means caching disabled for this session. + // fromCache=true means the session is replaying a completed encode and no + // ffmpeg subprocess was spawned. writerLockHeld=true means this session + // owns the per-key TryAcquireWriter claim — Close must ReleaseWriter. + // subsDone closes when the subtitle extractor goroutine returns (or is + // nil when the source had no subtitle tracks); MarkComplete waits on it + // so a HIT replay never serves partial .vtt files. + cache *HLSCache + cacheKey string + fromCache bool + writerLockHeld bool + subsDone chan struct{} } // hlsSeekAhead is how many segments past the writer's current position the @@ -263,11 +281,77 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er return nil, errors.New("hls: source has no duration") } - tmpDir := filepath.Join(hlsTmpDirRoot(), cfg.SessionID) + // Resolve tmpDir + cache placement. Three states: + // 1. cache disabled → per-session tmpdir, deleted on Close. + // 2. cache HIT (.complete found) → read from cache dir, no ffmpeg, Pin. + // 3. cache MISS, writer-lock OK → ffmpeg writes to cache dir, Pin + writer-lock. + // 4. cache MISS, writer-lock NO → another session already writing this + // key; fall back to private per-session tmpdir + // (no caching for this session — second-writer + // would corrupt the first one's segments). 
+ var ( + tmpDir string + cacheKey string + fromCache bool + writerLockHeld bool + ) + if cfg.Cache != nil { + cacheKey = cfg.Cache.KeyFor(cfg.SourcePath, cfg.Quality, cfg.AudioIndex) + // Integrity gate: HasComplete just stats the marker. If init.mp4 or + // the last segment vanished (external rm, partial-disk failure), we + // can't actually serve a HIT — drop the dir and re-encode. + segCountForVerify := int((probe.DurationSec + float64(hlsSegmentDuration) - 1) / float64(hlsSegmentDuration)) + if segCountForVerify < 1 { + segCountForVerify = 1 + } + if cfg.Cache.HasComplete(cacheKey) && !cfg.Cache.VerifyComplete(cacheKey, segCountForVerify) { + log.Printf("[hls %s] cache %s sealed but failed integrity check — re-encoding", + shortHLSID(cfg.SessionID), cacheKey) + _ = cfg.Cache.Invalidate(cacheKey) + } + if cfg.Cache.HasComplete(cacheKey) { + // HIT: read-only replay — many concurrent HITs are fine. + tmpDir = cfg.Cache.DirFor(cacheKey) + cfg.Cache.Pin(cacheKey) + fromCache = true + cfg.Cache.RecordHit() + _ = cfg.Cache.Touch(cacheKey) + } else if cfg.Cache.TryAcquireWriter(cacheKey) { + tmpDir = cfg.Cache.DirFor(cacheKey) + cfg.Cache.Pin(cacheKey) + writerLockHeld = true + cfg.Cache.RecordMiss() + } else { + // Another session is writing this key — fall back to private + // dir so we don't trample its segments. 
+ log.Printf("[hls %s] cache key %s busy, falling back to per-session tmpdir", + shortHLSID(cfg.SessionID), cacheKey) + tmpDir = filepath.Join(hlsTmpDirRoot(), cfg.SessionID) + cacheKey = "" // disable caching for this session + cfg.Cache.RecordMiss() + } + } else { + tmpDir = filepath.Join(hlsTmpDirRoot(), cfg.SessionID) + } + + cleanupOnError := func() { + if cfg.Cache != nil && cacheKey != "" { + cfg.Cache.Unpin(cacheKey) + if writerLockHeld { + cfg.Cache.ReleaseWriter(cacheKey) + _ = cfg.Cache.Invalidate(cacheKey) + } + } else { + _ = os.RemoveAll(tmpDir) + } + } + if err := os.MkdirAll(filepath.Join(tmpDir, "video"), 0o755); err != nil { + cleanupOnError() return nil, fmt.Errorf("hls: mkdir video: %w", err) } if err := os.MkdirAll(filepath.Join(tmpDir, "subs"), 0o755); err != nil { + cleanupOnError() return nil, fmt.Errorf("hls: mkdir subs: %w", err) } @@ -285,10 +369,30 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er startedAt: time.Now(), lastTouch: time.Now(), readyCh: make(chan struct{}), + cache: cfg.Cache, + cacheKey: cacheKey, + fromCache: fromCache, + writerLockHeld: writerLockHeld, } s.manifestVideo = renderVideoPlaylist(probe.DurationSec, segCount) s.manifestRoot = renderMasterPlaylist(probe, cfg.Quality) + // Cache HIT: every segment + init.mp4 is already on disk. Skip ffmpeg + // entirely and mark readyMax so handlers don't wait. Background subtitle + // extraction is also unnecessary — subs were extracted on the original run. + if fromCache { + s.readyMu.Lock() + s.readyMax = segCount - 1 + s.exited = true + close(s.readyCh) + s.readyCh = nil + s.readyMu.Unlock() + log.Printf("[hls %s] cache HIT %s: %s, %.1fs, %d segs (quality=%s)", + shortHLSID(cfg.SessionID), cacheKey, filepath.Base(cfg.SourcePath), + probe.DurationSec, segCount, coalesce(cfg.Quality, "auto")) + return s, nil + } + // Spawn ffmpeg under a dedicated context so Close() can kill it without // touching the parent ctx. 
ffCtx, cancel := context.WithCancel(context.Background()) @@ -298,7 +402,7 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er cmd.Stderr = &hlsStderrCapture{owner: s} if err := cmd.Start(); err != nil { cancel() - _ = os.RemoveAll(tmpDir) + cleanupOnError() return nil, fmt.Errorf("hls: start ffmpeg: %w", err) } s.cmd = cmd @@ -307,12 +411,20 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er go s.pollSegments(ffCtx) if len(probe.SubtitleTracks) > 0 { - go s.extractSubtitles(ffCtx) + s.subsDone = make(chan struct{}) + go func() { + defer close(s.subsDone) + s.extractSubtitles(ffCtx) + }() } - log.Printf("[hls %s] started: %s, %.1fs, %d segs (quality=%s)", + cachedNote := "" + if cfg.Cache != nil { + cachedNote = fmt.Sprintf(" (cache-miss %s)", cacheKey) + } + log.Printf("[hls %s] started: %s, %.1fs, %d segs (quality=%s)%s", shortHLSID(cfg.SessionID), filepath.Base(cfg.SourcePath), - probe.DurationSec, segCount, coalesce(cfg.Quality, "auto")) + probe.DurationSec, segCount, coalesce(cfg.Quality, "auto"), cachedNote) return s, nil } @@ -385,8 +497,15 @@ func (s *HLSSession) Touch() { s.mu.Unlock() } -// Close stops ffmpeg, deletes the tmpdir, and prevents further requests from -// blocking on segment readiness. Idempotent. +// Close stops ffmpeg and prevents further requests from blocking on segment +// readiness. Idempotent. +// +// Disk lifecycle: +// - cache disabled → delete tmpDir (original behavior). +// - cache enabled + this session was a HIT → keep dir, just unpin. +// - cache enabled + this was a write session → if ffmpeg exited cleanly and +// every segment is on disk, persist with .complete and keep dir. Otherwise +// drop the dir so a half-written cache doesn't survive into the next play. 
func (s *HLSSession) Close() error { s.mu.Lock() if s.closed { @@ -407,7 +526,47 @@ func (s *HLSSession) Close() error { s.readyCh = nil } s.exited = true + exitErr := s.exitErr s.readyMu.Unlock() + + if s.cache != nil && s.cacheKey != "" { + defer s.cache.Unpin(s.cacheKey) + if s.writerLockHeld { + defer s.cache.ReleaseWriter(s.cacheKey) + } + if s.fromCache { + log.Printf("[hls %s] closed (cache reuse)", shortHLSID(s.cfg.SessionID)) + return nil + } + // Wait briefly for the subtitle extractor to finish so a cached + // replay never serves half-written .vtt files. Bounded so a stuck + // extractor can't block Close indefinitely; on timeout we treat + // the cache as incomplete and drop it. + subsOK := true + if s.subsDone != nil { + select { + case <-s.subsDone: + case <-time.After(15 * time.Second): + log.Printf("[hls %s] subtitle extractor timeout — not caching", shortHLSID(s.cfg.SessionID)) + subsOK = false + } + } + if subsOK && exitErr == nil && s.allSegmentsPresent() { + if err := s.cache.MarkComplete(s.cacheKey); err == nil { + log.Printf("[hls %s] cache persisted %s", shortHLSID(s.cfg.SessionID), s.cacheKey) + return nil + } else { + log.Printf("[hls %s] cache persist failed: %v", shortHLSID(s.cfg.SessionID), err) + } + } + // Partial / failed → drop so we re-encode next time. + if err := s.cache.Invalidate(s.cacheKey); err != nil { + log.Printf("[hls %s] cache invalidate failed: %v", shortHLSID(s.cfg.SessionID), err) + } + log.Printf("[hls %s] closed (cache discarded)", shortHLSID(s.cfg.SessionID)) + return nil + } + if tmpDir != "" { _ = os.RemoveAll(tmpDir) } @@ -415,6 +574,31 @@ func (s *HLSSession) Close() error { return nil } +// allSegmentsPresent reports whether every expected segment (and init.mp4) is +// on disk AND validated by the segment poller. Used to decide whether a +// finished session is cacheable. 
We trust readyMax (advanced by pollSegments +// only after the next segment exists, proving the predecessor is fully closed) +// over a naive Size>0 stat that could accept truncated mid-write files. +func (s *HLSSession) allSegmentsPresent() bool { + if fi, err := os.Stat(filepath.Join(s.tmpDir, "video", "init.mp4")); err != nil || fi.Size() == 0 { + return false + } + s.readyMu.Lock() + readyMax := s.readyMax + s.readyMu.Unlock() + if readyMax < s.segmentCount-1 { + return false + } + for i := 0; i < s.segmentCount; i++ { + path := filepath.Join(s.tmpDir, "video", fmt.Sprintf("seg-%d.m4s", i)) + fi, err := os.Stat(path) + if err != nil || fi.Size() == 0 { + return false + } + } + return true +} + // waitFFmpeg reaps the ffmpeg process and records its exit error for handlers. // // Auto-restart supervisor: if ffmpeg crashes (non-graceful exit) and the diff --git a/internal/engine/hls_cache.go b/internal/engine/hls_cache.go new file mode 100644 index 0000000..f1bf918 --- /dev/null +++ b/internal/engine/hls_cache.go @@ -0,0 +1,410 @@ +package engine + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "log" + "os" + "path/filepath" + "sort" + "sync" + "sync/atomic" + "time" +) + +// HLSCache persists transcoded HLS segments per (source, quality, audio) so a +// second play of the same file at the same quality skips ffmpeg entirely. +// +// Layout on disk: +// +// {root}/{key}/video/init.mp4 +// {root}/{key}/video/seg-0.m4s +// {root}/{key}/video/seg-N.m4s +// {root}/{key}/.complete +// +// Atomicity: the .complete marker is written only when ffmpeg exits 0 AND all +// segments are on disk. A dir without .complete is treated as a partial run — +// Close invalidates it and stale ones are reaped at startup, so it is re-encoded. +// +// Concurrency: Pin/Unpin increments a ref counter per key so the LRU sweeper +// never evicts a directory that an active session is reading from.
+type HLSCache struct { + root string + maxBytes int64 + + mu sync.Mutex + refs map[string]int + writers map[string]bool // exclusive ffmpeg writer per key; nil entries are absent + + // Counters surfaced via Stats() — useful for /api/internal/agent/cache-stats + // and for the sweeper's daily log line. atomic so RecordHit/RecordMiss are + // safe to call from any goroutine without taking the cache mutex. + hits atomic.Uint64 + misses atomic.Uint64 +} + +const ( + hlsCacheCompleteMarker = ".complete" + // hlsCacheMinBudgetGB clamps absurd / zero / negative SizeGB values to + // a sane floor. NOT a guarantee that any single encode fits — a long + // 4K HEVC re-encode can exceed it. Operators should set size_gb based + // on their actual workload. + hlsCacheMinBudgetGB = 1 + // hlsCacheStartupOrphanAge: directories without .complete older than + // this are removed on cache startup. Long enough that a daemon crash + // during an in-progress encode (which legitimately leaves a partial + // dir) doesn't get nuked too aggressively if the daemon restarts fast. + hlsCacheStartupOrphanAge = 10 * time.Minute +) + +// NewHLSCache creates the cache rooted at the given dir with a size budget in +// gigabytes. A budget < hlsCacheMinBudgetGB is clamped up so a single play +// doesn't get instantly evicted mid-stream. +func NewHLSCache(root string, sizeGB int) (*HLSCache, error) { + if root == "" { + return nil, errors.New("hls_cache: empty root") + } + if sizeGB < hlsCacheMinBudgetGB { + sizeGB = hlsCacheMinBudgetGB + } + if err := os.MkdirAll(root, 0o755); err != nil { + return nil, fmt.Errorf("hls_cache: mkdir root: %w", err) + } + c := &HLSCache{ + root: root, + maxBytes: int64(sizeGB) * 1024 * 1024 * 1024, + refs: make(map[string]int), + writers: make(map[string]bool), + } + // Reap dirs left over from a crashed encode. 
A dir without .complete that + // hasn't been touched recently was almost certainly orphaned by an + // ungraceful daemon exit — keeping it just feeds the unbounded growth + // pattern the hourly LRU is too slow to contain. + if removed, err := c.cleanStartupOrphans(); err != nil { + log.Printf("[hls_cache] startup orphan cleanup: %v", err) + } else if removed > 0 { + log.Printf("[hls_cache] startup: removed %d orphan dir(s) without .complete", removed) + } + return c, nil +} + +// cleanStartupOrphans removes cache subdirectories that lack a .complete +// marker AND haven't been modified within hlsCacheStartupOrphanAge. Called +// once at construction. Safe at startup because no sessions are active yet, +// so Pin can't race with us. +func (c *HLSCache) cleanStartupOrphans() (int, error) { + entries, err := os.ReadDir(c.root) + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + return 0, err + } + cutoff := time.Now().Add(-hlsCacheStartupOrphanAge) + removed := 0 + for _, e := range entries { + if !e.IsDir() { + continue + } + dir := filepath.Join(c.root, e.Name()) + if _, err := os.Stat(filepath.Join(dir, hlsCacheCompleteMarker)); err == nil { + continue // sealed, keep + } + info, err := e.Info() + if err != nil { + continue + } + if info.ModTime().After(cutoff) { + continue // too recent — might be a daemon that just restarted mid-encode + } + if err := os.RemoveAll(dir); err == nil { + removed++ + } + } + return removed, nil +} + +// TryAcquireWriter attempts to claim exclusive ffmpeg-write access to a key. +// Returns true on success — the caller is then responsible for ReleaseWriter +// when ffmpeg exits / fails. Returns false if another session is already +// writing this key, in which case the caller must fall back to a private +// per-session tmpdir (no caching for that session). 
+func (c *HLSCache) TryAcquireWriter(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + if c.writers[key] { + return false + } + c.writers[key] = true + return true +} + +// ReleaseWriter releases the writer claim acquired via TryAcquireWriter. +// Idempotent on unknown keys. +func (c *HLSCache) ReleaseWriter(key string) { + c.mu.Lock() + delete(c.writers, key) + c.mu.Unlock() +} + +// KeyFor derives a stable cache key for (source, quality, audioIndex). Using +// the absolute source path means renaming a file invalidates the cache, which +// is correct — segment content is tied to the encoded source. +func (c *HLSCache) KeyFor(sourcePath, quality string, audioIndex int) string { + abs, err := filepath.Abs(sourcePath) + if err != nil { + abs = sourcePath + } + h := sha256.Sum256([]byte(fmt.Sprintf("%s|%s|%d", abs, quality, audioIndex))) + return hex.EncodeToString(h[:8]) // 16 hex chars — collision-safe enough for per-host cache +} + +// DirFor returns the on-disk directory for a cache key. Caller is responsible +// for creating it. +func (c *HLSCache) DirFor(key string) string { + return filepath.Join(c.root, key) +} + +// HasComplete returns true when the .complete marker is present, meaning the +// directory holds a full set of segments from a successful encode. +func (c *HLSCache) HasComplete(key string) bool { + if _, err := os.Stat(filepath.Join(c.DirFor(key), hlsCacheCompleteMarker)); err == nil { + return true + } + return false +} + +// MarkComplete writes the .complete marker. Call only after verifying ffmpeg +// exited cleanly AND every expected segment is on disk. The dir must already +// exist — StartHLSSession created it on the writer path. +func (c *HLSCache) MarkComplete(key string) error { + return os.WriteFile(filepath.Join(c.DirFor(key), hlsCacheCompleteMarker), nil, 0o644) +} + +// RecordHit increments the hit counter; called by StartHLSSession on a +// cache-HIT path. 
+func (c *HLSCache) RecordHit() { c.hits.Add(1) } + +// RecordMiss increments the miss counter; called when a session has to +// encode from scratch (or fails an integrity check on a stale HIT). +func (c *HLSCache) RecordMiss() { c.misses.Add(1) } + +// CacheStats is a snapshot of the cache's runtime counters + on-disk size. +// The size fields are best-effort (computed via dirSize) so callers paying +// for them should cache the result, not poll in a hot loop. +type CacheStats struct { + Hits uint64 + Misses uint64 + EntryCount int + TotalBytes int64 +} + +// Stats returns a snapshot of the cache counters and size. Walks the root +// to total disk usage — O(N segments). Call at most every few minutes. +func (c *HLSCache) Stats() CacheStats { + s := CacheStats{ + Hits: c.hits.Load(), + Misses: c.misses.Load(), + } + entries, err := os.ReadDir(c.root) + if err != nil { + return s + } + for _, e := range entries { + if !e.IsDir() { + continue + } + size, err := dirSize(filepath.Join(c.root, e.Name())) + if err != nil { + continue + } + s.EntryCount++ + s.TotalBytes += size + } + return s +} + +// hitRatePercent returns the current hit/(hit+miss) percentage rounded to +// the nearest int; 0 when no calls have been recorded. +func (c *HLSCache) hitRatePercent() int { + h := c.hits.Load() + m := c.misses.Load() + total := h + m + if total == 0 { + return 0 + } + return int((h*100 + total/2) / total) +} + +// VerifyComplete checks that the .complete marker is present AND the +// essential files (init.mp4 + last segment) exist with non-zero size. A +// dir that passes HasComplete but fails VerifyComplete is treated as +// corrupted — typically external `rm` or a partial-disk-failure scenario. +// When it returns false, callers should Invalidate and re-encode. 
+func (c *HLSCache) VerifyComplete(key string, segmentCount int) bool { + if !c.HasComplete(key) { + return false + } + dir := c.DirFor(key) + if fi, err := os.Stat(filepath.Join(dir, "video", "init.mp4")); err != nil || fi.Size() == 0 { + return false + } + if segmentCount > 0 { + lastSeg := filepath.Join(dir, "video", fmt.Sprintf("seg-%d.m4s", segmentCount-1)) + if fi, err := os.Stat(lastSeg); err != nil || fi.Size() == 0 { + return false + } + } + return true +} + +// Pin increments the ref counter for a key. The sweeper checks this before +// evicting, so a pinned dir is safe even if its mtime is old. +func (c *HLSCache) Pin(key string) { + c.mu.Lock() + c.refs[key]++ + c.mu.Unlock() +} + +// Unpin decrements; safe to call on unknown keys (no-op). +func (c *HLSCache) Unpin(key string) { + c.mu.Lock() + if c.refs[key] > 0 { + c.refs[key]-- + if c.refs[key] == 0 { + delete(c.refs, key) + } + } + c.mu.Unlock() +} + +func (c *HLSCache) isPinned(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.refs[key] > 0 +} + +// Touch updates the directory mtime so LRU picks fresher entries as recently +// used. Called when a session starts reading from a cached dir. +func (c *HLSCache) Touch(key string) error { + dir := c.DirFor(key) + now := time.Now() + return os.Chtimes(dir, now, now) +} + +// Sweep enforces the size budget by deleting the least-recently-used cache +// dirs (ignoring pinned ones) until the total size is at or below maxBytes. +// Returns the number of bytes freed. 
+func (c *HLSCache) Sweep() (int64, error) { + entries, err := os.ReadDir(c.root) + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + return 0, fmt.Errorf("hls_cache: read root: %w", err) + } + + type item struct { + key string + path string + size int64 + mtime time.Time + } + items := make([]item, 0, len(entries)) + var total, pinned int64 + for _, e := range entries { + if !e.IsDir() { + continue + } + info, err := e.Info() + if err != nil { + continue + } + key := e.Name() + path := filepath.Join(c.root, key) + size, err := dirSize(path) + if err != nil { + continue + } + items = append(items, item{key: key, path: path, size: size, mtime: info.ModTime()}) + total += size + if c.isPinned(key) { + pinned += size + } + } + + if total <= c.maxBytes { + return 0, nil + } + if pinned >= c.maxBytes { + // Every pinned byte already exceeds the budget — even evicting + // every unpinned dir won't bring us under. Warn loudly so the + // operator knows to bump size_gb (or kill the long-running session). + log.Printf("[hls_cache] warn: pinned bytes (%.1f MB) exceed budget (%.1f MB) — cannot enforce limit until sessions release", + float64(pinned)/(1024*1024), float64(c.maxBytes)/(1024*1024)) + return 0, nil + } + + // Oldest first. + sort.Slice(items, func(i, j int) bool { + return items[i].mtime.Before(items[j].mtime) + }) + + var freed int64 + for _, it := range items { + if total-freed <= c.maxBytes { + break + } + if c.isPinned(it.key) { + continue + } + if err := os.RemoveAll(it.path); err != nil { + log.Printf("[hls_cache] evict %s failed: %v", it.key, err) + continue + } + log.Printf("[hls_cache] evicted %s (%.1f MB, age %s)", + it.key, float64(it.size)/(1024*1024), time.Since(it.mtime).Round(time.Second)) + freed += it.size + } + return freed, nil +} + +// StartSweeper kicks off the LRU sweeper goroutine. Cancels on ctx done. 
+// In addition to enforcing the size budget, logs a daily summary of hit-rate +// + disk usage so operators can see the cache's value at a glance. +func (c *HLSCache) StartSweeper(ctx context.Context, interval time.Duration) { + if interval <= 0 { + interval = time.Hour + } + go func() { + t := time.NewTicker(interval) + defer t.Stop() + statsTick := time.NewTicker(24 * time.Hour) + defer statsTick.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + if _, err := c.Sweep(); err != nil { + log.Printf("[hls_cache] sweep error: %v", err) + } + case <-statsTick.C: + s := c.Stats() + log.Printf("[hls_cache] day-stats: hits=%d misses=%d ratio=%d%% entries=%d size=%.1fMB", + s.Hits, s.Misses, c.hitRatePercent(), s.EntryCount, + float64(s.TotalBytes)/(1024*1024)) + } + } + }() +} + +// Invalidate removes a cache entry — used when ffmpeg fails to encode the +// source so we don't reuse a half-written dir next time. +func (c *HLSCache) Invalidate(key string) error { + return os.RemoveAll(c.DirFor(key)) +} + diff --git a/internal/engine/hls_cache_smoke_test.go b/internal/engine/hls_cache_smoke_test.go new file mode 100644 index 0000000..a086cb2 --- /dev/null +++ b/internal/engine/hls_cache_smoke_test.go @@ -0,0 +1,134 @@ +//go:build smoke + +package engine + +import ( + "context" + "os/exec" + "path/filepath" + "testing" + "time" +) + +// TestHLSCacheSmoke exercises the end-to-end cache flow against real ffmpeg: +// - First session encodes a 5s test pattern; expect MISS, ffmpeg runs, +// .complete written, MarkComplete logs. +// - Second session for identical (source, quality, audio); expect HIT, +// no ffmpeg, instant Start. +// +// Build tag `smoke` keeps it out of the default `go test ./...` run because +// it depends on a working ffmpeg/ffprobe and takes ~5–10 s. 
+// +// go test -tags=smoke -run TestHLSCacheSmoke -v ./internal/engine/ +func TestHLSCacheSmoke(t *testing.T) { + ffmpeg, err := exec.LookPath("ffmpeg") + if err != nil { + t.Skipf("ffmpeg not on PATH: %v", err) + } + ffprobe, err := exec.LookPath("ffprobe") + if err != nil { + t.Skipf("ffprobe not on PATH: %v", err) + } + + tmp := t.TempDir() + source := filepath.Join(tmp, "source.mp4") + t.Logf("generating 5 s test pattern → %s", source) + if out, err := exec.Command(ffmpeg, + "-y", "-loglevel", "error", + "-f", "lavfi", "-i", "testsrc=duration=5:size=640x480:rate=30", + "-f", "lavfi", "-i", "sine=frequency=1000:duration=5", + "-c:v", "libx264", "-preset", "ultrafast", "-pix_fmt", "yuv420p", + "-c:a", "aac", + source, + ).CombinedOutput(); err != nil { + t.Fatalf("ffmpeg generate: %v\n%s", err, out) + } + + cacheRoot := filepath.Join(tmp, "cache") + cache, err := NewHLSCache(cacheRoot, 1) + if err != nil { + t.Fatalf("NewHLSCache: %v", err) + } + + cfg := HLSSessionConfig{ + SessionID: "smoke1", + SourcePath: source, + FileName: "source.mp4", + Quality: "720p", + AudioIndex: 0, + Transcode: TranscodeRuntime{ + FFmpegPath: ffmpeg, + FFprobePath: ffprobe, + Preset: "ultrafast", + }, + Cache: cache, + } + + // First run — expect MISS, ffmpeg runs. + t.Log("session 1: expect MISS") + t0 := time.Now() + s1, err := StartHLSSession(context.Background(), cfg) + if err != nil { + t.Fatalf("StartHLSSession #1: %v", err) + } + if s1.fromCache { + t.Fatal("session 1 reported cache HIT on a fresh cache") + } + + // Wait for all segments to land. 5 s source @ 4 s segments → 2 segments. 
+ deadline := time.Now().Add(60 * time.Second) + for { + s1.readyMu.Lock() + ready := s1.readyMax + exited := s1.exited + s1.readyMu.Unlock() + if ready >= s1.segmentCount-1 && exited { + break + } + if time.Now().After(deadline) { + _ = s1.Close() + t.Fatalf("session 1 didn't finish in 60 s (readyMax=%d/%d, exited=%v)", + ready, s1.segmentCount-1, exited) + } + time.Sleep(100 * time.Millisecond) + } + if err := s1.Close(); err != nil { + t.Fatalf("Close #1: %v", err) + } + encodeDur := time.Since(t0) + t.Logf("session 1: MISS completed in %s", encodeDur.Round(time.Millisecond)) + + key := cache.KeyFor(source, "720p", 0) + if !cache.HasComplete(key) { + t.Fatalf("cache.HasComplete(%s) is false after successful encode", key) + } + + // Second run — expect HIT, no ffmpeg. + t.Log("session 2: expect HIT") + cfg.SessionID = "smoke2" + t1 := time.Now() + s2, err := StartHLSSession(context.Background(), cfg) + if err != nil { + t.Fatalf("StartHLSSession #2: %v", err) + } + if !s2.fromCache { + t.Fatal("session 2 should have reported cache HIT") + } + if s2.cmd != nil { + t.Fatal("session 2 should not have spawned ffmpeg (s.cmd != nil)") + } + hitDur := time.Since(t1) + t.Logf("session 2: HIT in %s (%.1f× faster than MISS)", + hitDur.Round(time.Millisecond), float64(encodeDur)/float64(hitDur)) + if hitDur > 500*time.Millisecond { + t.Errorf("HIT path too slow: %s — expected <500 ms", hitDur) + } + if err := s2.Close(); err != nil { + t.Fatalf("Close #2: %v", err) + } + + // After the HIT session closes, the cache dir + .complete must still exist. 
+ if !cache.HasComplete(key) { + t.Fatal(".complete disappeared after HIT session closed") + } +} diff --git a/internal/engine/hls_cache_test.go b/internal/engine/hls_cache_test.go new file mode 100644 index 0000000..cb70ec1 --- /dev/null +++ b/internal/engine/hls_cache_test.go @@ -0,0 +1,361 @@ +package engine + +import ( + "context" + "os" + "path/filepath" + "sync" + "testing" + "time" +) + +func newTestCache(t *testing.T, sizeGB int) *HLSCache { + t.Helper() + root := t.TempDir() + c, err := NewHLSCache(root, sizeGB) + if err != nil { + t.Fatalf("NewHLSCache: %v", err) + } + return c +} + +func TestKeyForStable(t *testing.T) { + c := newTestCache(t, 1) + k1 := c.KeyFor("/a/b/movie.mkv", "1080p", 0) + k2 := c.KeyFor("/a/b/movie.mkv", "1080p", 0) + if k1 != k2 { + t.Fatalf("expected stable keys, got %q vs %q", k1, k2) + } + if c.KeyFor("/a/b/movie.mkv", "720p", 0) == k1 { + t.Fatal("quality should change key") + } + if c.KeyFor("/a/b/movie.mkv", "1080p", 1) == k1 { + t.Fatal("audio index should change key") + } + if c.KeyFor("/x/y/other.mkv", "1080p", 0) == k1 { + t.Fatal("path should change key") + } +} + +func TestMarkCompleteAndHas(t *testing.T) { + c := newTestCache(t, 1) + key := "abc123" + if c.HasComplete(key) { + t.Fatal("fresh cache should not report complete") + } + // Production callers create the dir during StartHLSSession; MarkComplete + // trusts that invariant and fails if the dir was wiped meanwhile. 
+ if err := os.MkdirAll(c.DirFor(key), 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + if err := c.MarkComplete(key); err != nil { + t.Fatalf("MarkComplete: %v", err) + } + if !c.HasComplete(key) { + t.Fatal("after MarkComplete, HasComplete must be true") + } +} + +func TestMarkCompleteFailsWithoutDir(t *testing.T) { + c := newTestCache(t, 1) + if err := c.MarkComplete("never-created"); err == nil { + t.Fatal("MarkComplete should error when dir doesn't exist") + } +} + +func TestPinPreventsEviction(t *testing.T) { + c := newTestCache(t, 1) // 1 GB budget, but min clamp keeps it usable + c.maxBytes = 1024 // squeeze budget for the test + + // Write two entries past the budget. + for i, key := range []string{"old", "new"} { + dir := c.DirFor(key) + if err := os.MkdirAll(dir, 0o755); err != nil { + t.Fatalf("mkdir %s: %v", dir, err) + } + path := filepath.Join(dir, "seg.bin") + if err := os.WriteFile(path, make([]byte, 800), 0o644); err != nil { + t.Fatalf("write %s: %v", path, err) + } + now := time.Now().Add(time.Duration(i) * time.Hour) // "old" mtime < "new" + _ = os.Chtimes(dir, now, now) + } + + c.Pin("old") // protect the older one + freed, err := c.Sweep() + if err != nil { + t.Fatalf("Sweep: %v", err) + } + if freed == 0 { + t.Fatal("expected some eviction") + } + if _, err := os.Stat(c.DirFor("old")); err != nil { + t.Fatal("pinned 'old' was evicted") + } + if _, err := os.Stat(c.DirFor("new")); err == nil { + t.Fatal("'new' should have been evicted to make room") + } +} + +func TestSweepNoOpUnderBudget(t *testing.T) { + c := newTestCache(t, 1) + dir := c.DirFor("small") + _ = os.MkdirAll(dir, 0o755) + _ = os.WriteFile(filepath.Join(dir, "x"), []byte("tiny"), 0o644) + freed, err := c.Sweep() + if err != nil { + t.Fatalf("Sweep: %v", err) + } + if freed != 0 { + t.Fatalf("expected 0 freed under budget, got %d", freed) + } + if _, err := os.Stat(dir); err != nil { + t.Fatal("under-budget entry was wrongly evicted") + } +} + +func TestSweepEmptyRoot(t 
*testing.T) { + c := newTestCache(t, 1) + freed, err := c.Sweep() + if err != nil { + t.Fatalf("Sweep empty: %v", err) + } + if freed != 0 { + t.Fatalf("freed=%d, want 0", freed) + } +} + +func TestInvalidateRemovesDir(t *testing.T) { + c := newTestCache(t, 1) + key := "drop" + dir := c.DirFor(key) + _ = os.MkdirAll(dir, 0o755) + _ = os.WriteFile(filepath.Join(dir, "x"), []byte("y"), 0o644) + if err := c.Invalidate(key); err != nil { + t.Fatalf("Invalidate: %v", err) + } + if _, err := os.Stat(dir); err == nil { + t.Fatal("dir still present after Invalidate") + } +} + +func TestTouchUpdatesMtime(t *testing.T) { + c := newTestCache(t, 1) + key := "touch" + dir := c.DirFor(key) + _ = os.MkdirAll(dir, 0o755) + old := time.Now().Add(-2 * time.Hour) + _ = os.Chtimes(dir, old, old) + + if err := c.Touch(key); err != nil { + t.Fatalf("Touch: %v", err) + } + info, err := os.Stat(dir) + if err != nil { + t.Fatalf("stat: %v", err) + } + if !info.ModTime().After(old.Add(time.Minute)) { + t.Fatalf("mtime not refreshed: %v", info.ModTime()) + } +} + +func TestPinUnpinSymmetry(t *testing.T) { + c := newTestCache(t, 1) + c.Pin("k") + c.Pin("k") + if !c.isPinned("k") { + t.Fatal("Pin twice should leave pinned") + } + c.Unpin("k") + if !c.isPinned("k") { + t.Fatal("Unpin once should keep pinned (refs=1)") + } + c.Unpin("k") + if c.isPinned("k") { + t.Fatal("Unpin twice should drop pin") + } + c.Unpin("k") // safe no-op +} + +func TestConcurrentPinUnpin(t *testing.T) { + c := newTestCache(t, 1) + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + c.Pin("race") + time.Sleep(time.Microsecond) + c.Unpin("race") + }() + } + wg.Wait() + if c.isPinned("race") { + t.Fatal("refs leaked") + } +} + +func TestSweeperLoopExits(t *testing.T) { + c := newTestCache(t, 1) + ctx, cancel := context.WithCancel(context.Background()) + c.StartSweeper(ctx, 10*time.Millisecond) + time.Sleep(30 * time.Millisecond) + cancel() + // If StartSweeper doesn't exit on 
cancel the test would leak a goroutine; + // the leak detector in the test runner will surface it. + time.Sleep(20 * time.Millisecond) +} + +func TestMinBudgetClamp(t *testing.T) { + root := t.TempDir() + c, err := NewHLSCache(root, 0) // below floor + if err != nil { + t.Fatalf("NewHLSCache: %v", err) + } + if c.maxBytes != int64(hlsCacheMinBudgetGB)*1024*1024*1024 { + t.Fatalf("budget not clamped to min: got %d", c.maxBytes) + } +} + +func TestTryAcquireWriterExclusive(t *testing.T) { + c := newTestCache(t, 1) + if !c.TryAcquireWriter("k") { + t.Fatal("first acquire should succeed") + } + if c.TryAcquireWriter("k") { + t.Fatal("second acquire for same key must fail") + } + if !c.TryAcquireWriter("other") { + t.Fatal("different key should not conflict") + } + c.ReleaseWriter("k") + if !c.TryAcquireWriter("k") { + t.Fatal("acquire after release should succeed") + } + c.ReleaseWriter("k") + c.ReleaseWriter("k") // idempotent +} + +func TestStartupOrphanCleanup(t *testing.T) { + root := t.TempDir() + + // Pre-seed: one sealed dir + one orphan old enough + one orphan fresh. 
+ sealed := filepath.Join(root, "sealed") + _ = os.MkdirAll(sealed, 0o755) + _ = os.WriteFile(filepath.Join(sealed, hlsCacheCompleteMarker), nil, 0o644) + + staleOrphan := filepath.Join(root, "stale_orphan") + _ = os.MkdirAll(staleOrphan, 0o755) + old := time.Now().Add(-2 * hlsCacheStartupOrphanAge) + _ = os.Chtimes(staleOrphan, old, old) + + freshOrphan := filepath.Join(root, "fresh_orphan") + _ = os.MkdirAll(freshOrphan, 0o755) + + if _, err := NewHLSCache(root, 1); err != nil { + t.Fatalf("NewHLSCache: %v", err) + } + + if _, err := os.Stat(sealed); err != nil { + t.Fatal("sealed dir was wrongly removed") + } + if _, err := os.Stat(staleOrphan); err == nil { + t.Fatal("stale orphan should have been removed at startup") + } + if _, err := os.Stat(freshOrphan); err != nil { + t.Fatal("fresh orphan should be kept (might be a mid-restart encode)") + } +} + +func TestHitMissCounters(t *testing.T) { + c := newTestCache(t, 1) + if s := c.Stats(); s.Hits != 0 || s.Misses != 0 { + t.Fatalf("fresh cache stats not zero: %+v", s) + } + c.RecordHit() + c.RecordHit() + c.RecordMiss() + s := c.Stats() + if s.Hits != 2 || s.Misses != 1 { + t.Fatalf("counters wrong: %+v", s) + } + // 2/3 = 67% + if got := c.hitRatePercent(); got != 67 { + t.Fatalf("hitRatePercent=%d, want 67", got) + } +} + +func TestStatsEntryCount(t *testing.T) { + c := newTestCache(t, 1) + for _, k := range []string{"a", "b", "c"} { + dir := c.DirFor(k) + _ = os.MkdirAll(dir, 0o755) + _ = os.WriteFile(filepath.Join(dir, "x"), []byte("hello"), 0o644) + } + s := c.Stats() + if s.EntryCount != 3 { + t.Fatalf("EntryCount=%d, want 3", s.EntryCount) + } + if s.TotalBytes != 15 { + t.Fatalf("TotalBytes=%d, want 15", s.TotalBytes) + } +} + +func TestVerifyCompleteRejectsMissingFiles(t *testing.T) { + c := newTestCache(t, 1) + key := "v" + dir := c.DirFor(key) + _ = os.MkdirAll(filepath.Join(dir, "video"), 0o755) + + // No .complete yet → reject. 
+ if c.VerifyComplete(key, 2) { + t.Fatal("VerifyComplete should reject without .complete") + } + + // Mark complete but no files → reject. + if err := c.MarkComplete(key); err != nil { + t.Fatalf("MarkComplete: %v", err) + } + if c.VerifyComplete(key, 2) { + t.Fatal("VerifyComplete should reject when init.mp4 missing") + } + + // Write init.mp4, last seg missing → reject. + _ = os.WriteFile(filepath.Join(dir, "video", "init.mp4"), []byte("..."), 0o644) + if c.VerifyComplete(key, 2) { + t.Fatal("VerifyComplete should reject when last segment missing") + } + + // Write last seg → pass. + _ = os.WriteFile(filepath.Join(dir, "video", "seg-1.m4s"), []byte("..."), 0o644) + if !c.VerifyComplete(key, 2) { + t.Fatal("VerifyComplete should pass with all files present") + } + + // Zero-size last seg → reject. + _ = os.WriteFile(filepath.Join(dir, "video", "seg-1.m4s"), nil, 0o644) + if c.VerifyComplete(key, 2) { + t.Fatal("VerifyComplete should reject zero-size last segment") + } +} + +func TestSweepRespectsPinnedExceedsBudget(t *testing.T) { + c := newTestCache(t, 1) + c.maxBytes = 256 // squeeze + + pinned := c.DirFor("pinned") + _ = os.MkdirAll(pinned, 0o755) + _ = os.WriteFile(filepath.Join(pinned, "x"), make([]byte, 1024), 0o644) + c.Pin("pinned") + + freed, err := c.Sweep() + if err != nil { + t.Fatalf("Sweep: %v", err) + } + if freed != 0 { + t.Fatalf("nothing should have been freed: got %d", freed) + } + if _, err := os.Stat(pinned); err != nil { + t.Fatal("pinned dir wrongly removed despite over-budget pin") + } +} From 2e7cd7e8ed0dbc3b730a4b82867263ba7e1369f7 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 27 May 2026 08:18:33 +0200 Subject: [PATCH 04/32] fix(upgrade): break auto-apply restart loop (0.9.8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs in 0.9.6/0.9.7 caused an infinite restart loop after a Force update signal: the CLI never reported the upgrade outcome, so `upgrade_requested` stayed `true`; 
AND `applyAutoUpgrade` called `os.Exit(0)` even when the target version equalled the current one, so systemd respawned and saw the flag again. - new Client.ReportUpgradeResult → POST /api/internal/agent/upgrade-result - applyAutoUpgrade calls it on success / failure / no-op - no-op case detected up front (same version) — skips Execute + Exit, clears server flag instead --- CHANGELOG.md | 21 +++++++++++++++++++++ internal/agent/client.go | 18 ++++++++++++++++++ internal/agent/daemon.go | 38 +++++++++++++++++++++++++++++++++++++- internal/cmd/version.go | 2 +- 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 50cfa98..7a2366f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,27 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.8] - 2026-05-27 + +### Fixed + +- **auto-upgrade restart loop**: when the server signal arrived for a version + the daemon was already running (e.g. flag still set after a previous + upgrade), `applyAutoUpgrade` would call `upgrade.Execute` (which no-ops), + then `os.Exit(0)` anyway — systemd respawned, the flag was still set, the + cycle repeated. Now: no-op case is detected up front, the daemon clears + the server flag via `/api/internal/agent/upgrade-result` and stays alive. +- **upgrade flag stuck after success**: the CLI never reported the upgrade + outcome, so `upgrade_requested` stayed `true` in the DB forever. The + daemon now calls `/api/internal/agent/upgrade-result` on every applyAutoUpgrade + branch (success, failure, no-op) — server clears the flag, restart loops + end. + +### Added + +- New `Client.ReportUpgradeResult(agentID, success, version, error)` HTTP + method wrapping `POST /api/internal/agent/upgrade-result`. 
+ ## [0.9.7] - 2026-05-26 ### Added diff --git a/internal/agent/client.go b/internal/agent/client.go index 9aa3c2a..e60b0a4 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -91,6 +91,24 @@ func (c *Client) Deregister(ctx context.Context, agentID string) error { return nil } +// ReportUpgradeResult tells the server the outcome of a previously requested +// upgrade so the server can clear `upgrade_requested`. Without this call the +// flag stays sticky and the daemon would re-trigger applyAutoUpgrade on every +// sync after upgrade — even for "already on target version" no-ops. +func (c *Client) ReportUpgradeResult(ctx context.Context, agentID string, success bool, version, errMsg string) error { + req := struct { + AgentID string `json:"agentId"` + Success bool `json:"success"` + Version string `json:"version,omitempty"` + Error string `json:"error,omitempty"` + }{AgentID: agentID, Success: success, Version: version, Error: errMsg} + var resp StatusResponse + if err := c.doPost(ctx, "/api/internal/agent/upgrade-result", req, &resp); err != nil { + return fmt.Errorf("report upgrade result: %w", err) + } + return nil +} + // ReportStatus reports download progress. Returns server-side flags the CLI must act on. func (c *Client) ReportStatus(ctx context.Context, update StatusUpdate) (*StatusResponse, error) { var resp StatusResponse diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index e79fc0a..68a187f 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -294,8 +294,30 @@ func (d *Daemon) Deregister() { // supervisor (systemd Restart=always on Linux) respawns on the new binary. // Triggered by the server's upgrade signal — opt-in flag set by the user from // the web UI; the daemon never auto-upgrades on a passive version bump. +// +// Reports the outcome to /api/internal/agent/upgrade-result so the server +// clears `upgrade_requested`. 
Without this report the flag stays sticky and +// the daemon would loop on every sync — including the no-op case where it's +// already on the target version. func (d *Daemon) applyAutoUpgrade(targetVersion string) { currentClean := strings.TrimPrefix(d.cfg.Version, "v") + targetClean := strings.TrimPrefix(targetVersion, "v") + + // No-op: server signal arrived but we're already running the target. This + // happens when the daemon restarts after a previous auto-upgrade before + // ReportUpgradeResult cleared the flag, or when the operator manually + // installed the same version out-of-band. Skip Execute (which would also + // no-op) AND skip os.Exit, but DO clear the flag — otherwise we loop. + if currentClean == targetClean { + log.Printf("[upgrade] already on v%s — clearing server flag", currentClean) + ctxR, cancelR := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelR() + if err := d.client.ReportUpgradeResult(ctxR, d.cfg.AgentID, true, currentClean, ""); err != nil { + log.Printf("[upgrade] report-result failed (will retry on next signal): %v", err) + } + return + } + upgrader := &upgrade.Upgrader{ CurrentVersion: currentClean, OnProgress: func(msg string) { @@ -307,10 +329,24 @@ func (d *Daemon) applyAutoUpgrade(targetVersion string) { result := upgrader.Execute(ctx, targetVersion) if !result.Success { log.Printf("[upgrade] auto-upgrade failed: %v", result.Error) + errMsg := "" + if result.Error != nil { + errMsg = result.Error.Error() + } + ctxR, cancelR := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelR() + if err := d.client.ReportUpgradeResult(ctxR, d.cfg.AgentID, false, targetClean, errMsg); err != nil { + log.Printf("[upgrade] report-result failed: %v", err) + } return } - log.Printf("[upgrade] upgraded v%s → v%s; exiting so service supervisor restarts on new binary", + log.Printf("[upgrade] upgraded v%s → v%s; reporting result + exiting so service supervisor restarts on new binary", result.OldVersion,
result.NewVersion) + ctxR, cancelR := context.WithTimeout(context.Background(), 10*time.Second) + if err := d.client.ReportUpgradeResult(ctxR, d.cfg.AgentID, true, result.NewVersion, ""); err != nil { + log.Printf("[upgrade] report-result failed: %v", err) + } + cancelR() time.Sleep(500 * time.Millisecond) os.Exit(0) } diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 1fd2df9..379f923 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.7" +var Version = "0.9.8" From 7b78d0b7781effb94f25ea3adfc5ab56038cc62e Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 27 May 2026 10:06:54 +0200 Subject: [PATCH 05/32] fix(cors): allow play from .to / staging / onion mirrors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Daemon CORS allowlist was hardcoded to torrentclaw.com + localhost. Browsers playing from any other official mirror (.to, onion, www., staging.) received 200 + body from the daemon's HLS server but no Access-Control-Allow-Origin header, so the response was dropped client-side. Probe loop treated every candidate as a failure and surfaced "No se puede conectar con tu agente — 404 todos los canales" even though the tunnel + ffmpeg were healthy. Static baseline now includes the full known mirror set (.com / www / app / staging / .to / www.to / built-in onion). At startup the daemon also fetches /api/mirrors with IPFS fallback and merges the live origins, so a future mirror addition does not require a CLI rebuild. 
--- internal/cmd/daemon.go | 55 ++++++++++++++++++++++++++++++++++++- internal/engine/validate.go | 15 ++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 19c4b7c..7cd1023 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -293,7 +293,15 @@ func runDaemonStart() error { // Create persistent stream server streamSrv := engine.NewStreamServer(cfg.Download.StreamPort) streamSrv.SetUPnPEnabled(cfg.Download.EnableUPnP) - streamSrv.SetCORSAllowedOrigins(cfg.Download.CORSExtraOrigins) + // CORS extras = operator config + dynamic mirror list from /api/mirrors. + // Without the mirror merge, a user playing from `torrentclaw.to` (or any + // future mirror) hits the daemon, gets 200 + body, but no + // `Access-Control-Allow-Origin` → browser drops the response → player + // reports "404 todos los canales". Fetching /api/mirrors at startup + // future-proofs against mirror additions without a CLI rebuild. + corsExtras := append([]string(nil), cfg.Download.CORSExtraOrigins...) + corsExtras = append(corsExtras, mirrorCORSOrigins(ctx, cfg, userAgent)...) + streamSrv.SetCORSAllowedOrigins(corsExtras) // Reap HLS tmpdirs left over from a previous daemon run before we start // accepting new sessions. The in-memory registry doesn't survive a // restart, so without this disk usage grows unbounded across restarts. @@ -862,3 +870,48 @@ func superviseFunnel(ctx context.Context, d *agent.Daemon, port int) { backoff = min(backoff*2, maxBackoff) } } + +// mirrorCORSOrigins fetches /api/mirrors from the configured primary (+ extra +// mirror candidates + static IPFS fallback) and returns the discovered URLs as +// Origin strings. Best-effort: any failure logs a warning and returns an empty +// slice; the static defaultCORSAllowedOrigins in validate.go covers the known +// mirrors (.com / .to / built-in onion) so the daemon still accepts the +// official surfaces when this call fails. 
+// +// Bounded to a 10 s timeout so a slow /api/mirrors response can't stall +// daemon startup indefinitely — every second here is a second the user can't play. +func mirrorCORSOrigins(parent context.Context, cfg config.Config, userAgent string) []string { + ctx, cancel := context.WithTimeout(parent, 10*time.Second) + defer cancel() + + candidates := append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...) + resp, err := agent.FetchMirrorsWithFallback(ctx, candidates, userAgent) + if err != nil { + log.Printf("[cors] mirror discovery failed (%v) — using static allowlist only", err) + return nil + } + + seen := make(map[string]struct{}) + out := make([]string, 0, len(resp.Mirrors)) + add := func(rawURL string) { + if rawURL == "" { + return + } + origin := strings.TrimRight(rawURL, "/") + if _, dup := seen[origin]; dup { + return + } + seen[origin] = struct{}{} + out = append(out, origin) + } + for _, m := range resp.Mirrors { + add(m.URL) + } + if resp.Tor != nil { + add(resp.Tor.URL) + } + if len(out) > 0 { + log.Printf("[cors] merged %d mirror origins from /api/mirrors", len(out)) + } + return out +} diff --git a/internal/engine/validate.go b/internal/engine/validate.go index dd07516..0efd4de 100644 --- a/internal/engine/validate.go +++ b/internal/engine/validate.go @@ -21,12 +21,27 @@ var validSessionID = regexp.MustCompile(`^[a-zA-Z0-9_-]{1,128}$`) // 127.0.0.1 is listed in addition to localhost because some browsers treat // them as distinct origins for CORS. // +// Mirrors (`.to`, `staging.torrentclaw.com`, `www.`) are listed so a user +// playing from any official mirror passes the HEAD probe; without these +// the browser drops the response for a missing Access-Control-Allow-Origin (ACAO) header and the player reports +// "404 todos los canales" even though the daemon returned 200. +// // Note: media tags (