From 5d44ee704c133b6878cb4bed9095164cf4169f54 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Fri, 22 May 2026 08:33:02 +0200 Subject: [PATCH 01/37] feat(vpn): unarr vpn command + report/arbitrate the WireGuard slot Add `unarr vpn` (status/enable/disable, with `status --check`) to manage the managed WireGuard split-tunnel from the CLI. The daemon now reports its split-tunnel state (active, mode, exit server) to the web on register and on every sync, and sends its agent id when fetching the VPN config so the web can arbitrate the single WireGuard slot (1 VPNResellers account = 1 WG keypair = 1 concurrent connection): the first agent claims it; the rest are told to run OpenVPN on their own host (1 WireGuard + up to 9 OpenVPN = 10). `status --check` passes probe=1 so it validates provisioning without claiming the slot. VPNActive drops omitempty so a downed tunnel reaches the server and frees the slot. Bumps to 0.9.2 with CHANGELOG + README VPN section. --- CHANGELOG.md | 12 +++ README.md | 60 +++++++++++ internal/agent/daemon.go | 23 +++++ internal/agent/state.go | 7 ++ internal/agent/sync.go | 7 ++ internal/agent/types.go | 15 +++ internal/cmd/daemon.go | 13 ++- internal/cmd/root.go | 3 + internal/cmd/version.go | 2 +- internal/cmd/vpn.go | 213 +++++++++++++++++++++++++++++++++++++++ internal/vpn/vpn.go | 24 ++++- 11 files changed, 373 insertions(+), 6 deletions(-) create mode 100644 internal/cmd/vpn.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 42c34bb..961db09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.2] - 2026-05-21 + +### Added + +- **vpn**: `unarr vpn` command (`status`, `enable`, `disable`) to manage the managed + WireGuard split-tunnel, with `vpn status --check` to verify provisioning. +- **vpn**: report split-tunnel state (active, exit server) to the web on register + + every sync, so the dashboard shows which agent holds the single WireGuard slot. +- **vpn**: send the agent id when fetching the VPN config so the web can arbitrate + the single WireGuard slot — the first agent claims it; the rest are told to run + OpenVPN on their own host (1 agent on WireGuard + up to 9 on OpenVPN). + ## [0.9.1] - 2026-05-21 diff --git a/README.md b/README.md index 102d151..6984bd0 100644 --- a/README.md +++ b/README.md @@ -171,6 +171,9 @@ unarr start | `unarr status` | Show daemon status and active downloads | | `unarr daemon install` | Install as system service (systemd/launchd) | | `unarr daemon uninstall` | Remove the system service | +| `unarr vpn status` | Show managed-VPN config and live tunnel state | +| `unarr vpn enable` | Turn the managed VPN on | +| `unarr vpn disable` | Turn the managed VPN off | ### System & Diagnostics @@ -280,6 +283,53 @@ The daemon connects via WebSocket for instant task delivery, with automatic HTTP - Linux: `~/.config/systemd/user/unarr.service` (systemd) - macOS: `~/Library/LaunchAgents/com.torrentclaw.unarr.plist` (launchd) +## VPN + +unarr can route your **downloads** through a managed WireGuard VPN, so peers and +trackers see the VPN server's IP instead of yours. It runs entirely in userspace +(wireguard-go + a gVisor netstack) — **no root, no `wg-quick`, no changes to your +OS routing table**. + +Requires a **PRO+ plan with the VPN add-on**. Set it up at +[torrentclaw.com/vpn](https://torrentclaw.com/vpn). + +```bash +# Turn it on (writes [downloads.vpn] enabled = true to your config) +unarr vpn enable + +# Restart the daemon so it brings the tunnel up at startup +unarr daemon restart # or: unarr start (if not installed as a service) + +# Check it's working — shows the exit server when the tunnel is up +unarr vpn status + +# Verify your account is provisioned (queries the API) +unarr vpn status --check + +# Turn it off again +unarr vpn disable +``` + +**Split-tunnel — read this:** only the torrent client's traffic goes through the +VPN. Your browser, `curl`, and every other app keep using your **real IP** — that +is by design. To check the VPN is working, look at `unarr vpn status` (or the +peer/announce IP), **not** your browser's "what's my IP". To protect your other +devices (phone, laptop), use the **OpenVPN credentials** from your profile — those +support ~10 concurrent devices and do **not** share the agent's WireGuard slot. + +**When does it fetch the config?** Once, at daemon startup. There's no periodic +refresh — after changing your exit server in the web panel or re-provisioning, +restart the daemon to pick it up. If the fetch fails the daemon logs a `[vpn]` +line and downloads in the clear (never refuses to run). + +**Self-hosted / personal VPN:** instead of the managed config, point unarr at a +local WireGuard `.conf`: + +```toml +[downloads.vpn] +config_file = "/path/to/wg.conf" # takes precedence over `enabled` +``` + ## Diagnostics ```bash @@ -438,6 +488,16 @@ If `transcode.enabled = true` but `ffmpeg` / `ffprobe` aren't on PATH, the daemon logs a warning at startup and HLS sessions are rejected at runtime with a clear error — install ffmpeg or set `enabled = false`. +#### `[downloads.vpn]` + +| Key | Type | Default | Notes | +|-----|------|---------|-------| +| `enabled` | bool | `false` | Managed VPN: at startup the daemon fetches a WireGuard config from your account and split-tunnels torrent traffic through it. Needs a PRO+ plan with the VPN add-on. Toggle with `unarr vpn enable` / `disable`. | +| `config_file` | string | `""` | Self-hosted / personal VPN: path to a local WireGuard `.conf`. **Takes precedence over `enabled`** — when set, the daemon uses this file and never calls the API. | + +See the [VPN](#vpn) section above for how it works (split-tunnel, no root) and +how to protect your other devices. + ### Environment variables Environment variables override config file values: diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 1c324d5..a8edc9b 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -48,6 +48,12 @@ type Daemon struct { State DaemonState lastNotifiedVersion string + // Managed-VPN split-tunnel state, set by cmd/daemon.go before Run and folded + // into DaemonState on every write so external tools (`unarr vpn status`) see it. + vpnActive bool + vpnMode string + vpnServer string + // Watching tracks whether a user is viewing download progress in the web UI. Watching atomic.Bool @@ -70,6 +76,14 @@ func NewDaemon(cfg DaemonConfig, client *Client) *Daemon { // SyncClient returns the sync client for external wiring. func (d *Daemon) SyncClient() *SyncClient { return d.sync } +// SetVPNState records the managed-VPN split-tunnel state so it's reflected in the +// daemon state file (read by `unarr vpn status`). Call before Run. +func (d *Daemon) SetVPNState(active bool, mode, server string) { + d.vpnActive = active + d.vpnMode = mode + d.vpnServer = server +} + // UpdateStreamPort updates the stream port reported in sync requests. func (d *Daemon) UpdateStreamPort(port int) { d.cfg.StreamPort = port @@ -91,6 +105,9 @@ func (d *Daemon) Register(ctx context.Context) error { TailscaleIP: d.cfg.TailscaleIP, HWAccel: d.cfg.HWAccel, MaxTranscodeHeight: d.cfg.MaxTranscodeHeight, + VPNActive: d.vpnActive, + VPNMode: d.vpnMode, + VPNServer: d.vpnServer, } if free, total, err := DiskInfo(d.cfg.DownloadDir); err == nil { req.DiskFreeBytes = free @@ -141,6 +158,9 @@ func (d *Daemon) Register(ctx context.Context) error { PID: os.Getpid(), StartedAt: now, MethodStats: make(map[string]int), + VPNActive: d.vpnActive, + VPNMode: d.vpnMode, + VPNServer: d.vpnServer, } WriteState(&d.State) @@ -195,6 +215,9 @@ func (d *Daemon) Run(ctx context.Context) error { d.sync.OnWatchingChange = func(watching bool) { d.Watching.Store(watching) } + d.sync.GetVPNState = func() (bool, string, string) { + return d.vpnActive, d.vpnMode, d.vpnServer + } d.sync.OnSyncSuccess = func() { d.State.LastHeartbeat = time.Now() if d.GetActiveCount != nil { diff --git a/internal/agent/state.go b/internal/agent/state.go index 0bbd246..1de71bf 100644 --- a/internal/agent/state.go +++ b/internal/agent/state.go @@ -22,6 +22,13 @@ type DaemonState struct { FailedCount int `json:"failedCount"` TotalDownloaded int64 `json:"totalDownloaded"` MethodStats map[string]int `json:"methodStats,omitempty"` + + // Managed-VPN split-tunnel state, so `unarr vpn status` can report whether + // torrent traffic is actually being routed through the tunnel (vs. the daemon + // running but the tunnel having failed to come up → downloading in the clear). + VPNActive bool `json:"vpnActive,omitempty"` + VPNMode string `json:"vpnMode,omitempty"` // managed | self-hosted + VPNServer string `json:"vpnServer,omitempty"` // WireGuard endpoint (ip:port) } // stateFilePathFn is overridable for testing. diff --git a/internal/agent/sync.go b/internal/agent/sync.go index 864de8a..9847aba 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -36,6 +36,10 @@ type SyncClient struct { OnSyncSuccess func() // called after each successful sync (e.g. to update state file) GetFreeSlots func() int GetTaskStates func() []TaskState // returns current state of all active + recently finished tasks + // GetVPNState returns the live managed-VPN split-tunnel state (whether the + // WireGuard tunnel is up, the mode, and the exit server) so the web can track + // which agent holds the single WG slot. + GetVPNState func() (active bool, mode, server string) // OnDeleteFiles is called when the server requests file deletion from disk. // It should delete the files and return the IDs of successfully deleted items. OnDeleteFiles func(items []LibraryDeleteRequest) []int @@ -155,6 +159,9 @@ func (sc *SyncClient) buildRequest() SyncRequest { if sc.GetFreeSlots != nil { req.FreeSlots = sc.GetFreeSlots() } + if sc.GetVPNState != nil { + req.VPNActive, req.VPNMode, req.VPNServer = sc.GetVPNState() + } // Flush confirmed deletions from previous cycle. // Once flushed, remove IDs from deleteInFlight — the server will stop sending // them after this sync, so deduplication protection is no longer needed. diff --git a/internal/agent/types.go b/internal/agent/types.go index 487e681..8e0094a 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -26,6 +26,14 @@ type RegisterRequest struct { // up to 2160p. HWAccel string `json:"hwAccel,omitempty"` MaxTranscodeHeight int `json:"maxTranscodeHeight,omitempty"` + // Managed-VPN split-tunnel state. The web tracks which agent holds the single + // WireGuard slot (1 VPNResellers account = 1 WG keypair = 1 concurrent + // connection); other agents are told to use OpenVPN on their host instead. + // VPNActive has no omitempty: false is a meaningful state (tunnel down), not + // "unset" — the server must see it to release the slot. + VPNActive bool `json:"vpnActive"` + VPNMode string `json:"vpnMode,omitempty"` // managed | self-hosted + VPNServer string `json:"vpnServer,omitempty"` } // RegisterResponse is returned by the server after registration. @@ -344,6 +352,13 @@ type SyncRequest struct { Tasks []TaskState `json:"tasks"` CanDelete bool `json:"canDelete"` // library.allow_delete is enabled DeleteConfirmed []int `json:"deleteConfirmed,omitempty"` // library item IDs successfully deleted from disk + // Live managed-VPN split-tunnel state, sent every sync so the web sees the + // WireGuard slot owner update in near-realtime (vs. register, once at startup). + // VPNActive has no omitempty: false (tunnel down) must reach the server so it + // releases the slot, not be elided as "unset". + VPNActive bool `json:"vpnActive"` + VPNMode string `json:"vpnMode,omitempty"` + VPNServer string `json:"vpnServer,omitempty"` } // ControlAction represents a server-side control signal for a task. diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 771e9b4..54759b2 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -217,12 +217,12 @@ func runDaemonStart() error { apiURL = "https://torrentclaw.com" } fetchCtx, cancel := context.WithTimeout(context.Background(), 25*time.Second) - conf, ferr := vpn.FetchConfig(fetchCtx, apiURL, cfg.Auth.APIKey, "unarr/"+Version) + conf, ferr := vpn.FetchConfig(fetchCtx, apiURL, cfg.Auth.APIKey, "unarr/"+Version, cfg.Agent.ID, false) cancel() var fe *vpn.FetchError switch { case ferr != nil && errors.As(ferr, &fe) && fe.Code == vpn.ErrSlotOnDevice: - log.Printf("[vpn] slot is active on one of your devices — downloads will NOT use the VPN. Switch the slot to unarr in your profile to protect downloads.") + log.Printf("[vpn] the single WireGuard slot is already held by another unarr agent — this one downloads in the clear. To protect this machine too, set up OpenVPN on it (1 agent uses WireGuard, the rest use OpenVPN — up to 10). See https://torrentclaw.com/vpn") case ferr != nil: log.Printf("[vpn] could not enable VPN (%v) — downloading in the clear", ferr) default: @@ -236,6 +236,15 @@ func runDaemonStart() error { } } + // Record VPN split-tunnel state for `unarr vpn status`. + if vpnTunnel != nil { + mode := "managed" + if cfg.Download.VPN.ConfigFile != "" { + mode = "self-hosted" + } + d.SetVPNState(true, mode, vpnTunnel.Endpoint) + } + // Create torrent downloader torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{ DataDir: cfg.Download.Dir, diff --git a/internal/cmd/root.go b/internal/cmd/root.go index 2217340..55786fb 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -103,6 +103,8 @@ Source: https://github.com/torrentclaw/unarr`, statusCmd.GroupID = "daemon" daemonCmd := newDaemonCmd() daemonCmd.GroupID = "daemon" + vpnCmd := newVPNCmd() + vpnCmd.GroupID = "daemon" // System & Diagnostics statsCmd := newStatsCmd() @@ -146,6 +148,7 @@ Source: https://github.com/torrentclaw/unarr`, stopCmd, statusCmd, daemonCmd, + vpnCmd, // System & Diagnostics statsCmd, doctorCmd, diff --git a/internal/cmd/version.go b/internal/cmd/version.go index e639749..18dac17 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.1" +var Version = "0.9.2" diff --git a/internal/cmd/vpn.go b/internal/cmd/vpn.go new file mode 100644 index 0000000..fb11532 --- /dev/null +++ b/internal/cmd/vpn.go @@ -0,0 +1,213 @@ +package cmd + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/torrentclaw/unarr/internal/agent" + "github.com/torrentclaw/unarr/internal/config" + "github.com/torrentclaw/unarr/internal/vpn" +) + +func newVPNCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "vpn", + Short: "Manage the managed-VPN split-tunnel for downloads", + Long: `Enable, disable, and inspect the managed VPN. + +When enabled, the daemon fetches a WireGuard config from your TorrentClaw account +at startup and routes ONLY the torrent client's traffic (peers + trackers) through +an in-process WireGuard tunnel — no root, no OS routing changes. + +This is split-tunnel: your browser and other apps keep using your real IP. Only +your downloads are hidden behind the VPN server. + +The VPN requires a PRO+ plan with the VPN add-on. Set it up at +https://torrentclaw.com/vpn and configure your other devices (phone, laptop) with +the OpenVPN credentials from your profile — those don't share the agent's tunnel.`, + Example: ` unarr vpn status # is the tunnel up? which server? + unarr vpn enable # turn the managed VPN on + unarr vpn disable # turn it off`, + RunE: func(cmd *cobra.Command, args []string) error { + return cmd.Help() + }, + } + cmd.AddCommand(newVPNStatusCmd(), newVPNEnableCmd(), newVPNDisableCmd()) + return cmd +} + +func newVPNStatusCmd() *cobra.Command { + var check bool + cmd := &cobra.Command{ + Use: "status", + Short: "Show VPN configuration and live tunnel state", + Example: " unarr vpn status\n unarr vpn status --check # also verify your account is provisioned", + RunE: func(cmd *cobra.Command, args []string) error { + return runVPNStatus(check) + }, + } + cmd.Flags().BoolVar(&check, "check", false, "query the API to verify the VPN is provisioned on your account") + return cmd +} + +func runVPNStatus(check bool) error { + bold := color.New(color.Bold) + dim := color.New(color.FgHiBlack) + green := color.New(color.FgGreen) + yellow := color.New(color.FgYellow) + cyan := color.New(color.FgCyan) + + cfg := loadConfig() + + fmt.Println() + bold.Println(" Managed VPN") + fmt.Println() + + // ── Configured mode ── + switch { + case cfg.Download.VPN.ConfigFile != "": + cyan.Println(" Mode: self-hosted (local config_file)") + fmt.Printf(" Config: %s\n", cfg.Download.VPN.ConfigFile) + case cfg.Download.VPN.Enabled: + cyan.Println(" Mode: managed (config fetched from your account)") + default: + dim.Println(" Mode: off") + fmt.Println() + dim.Println(" Enable with `unarr vpn enable` (needs a PRO+ plan with the VPN add-on).") + fmt.Println() + return nil + } + + // ── Live tunnel state (from the daemon state file) ── + state := agent.ReadState() + alive := state != nil && isDaemonAlive(state) + fmt.Println() + switch { + case alive && state.VPNActive: + server := state.VPNServer + if host, _, err := net.SplitHostPort(server); err == nil && host != "" { + server = host + } + green.Println(" ✓ Tunnel ACTIVE — torrent traffic is routed through the VPN") + if server != "" { + fmt.Printf(" Exit server: %s\n", server) + } + case alive: + yellow.Println(" ⚠ Daemon is running but the tunnel is NOT up — downloads go in the clear.") + dim.Println(" Check `unarr daemon logs` for a [vpn] line. Common cause: no active") + dim.Println(" VPN on your account (set it up at https://torrentclaw.com/vpn).") + default: + dim.Println(" Daemon not running — start it (`unarr start`) to bring the tunnel up.") + } + + // ── Optional live provisioning check ── + if check { + fmt.Println() + if cfg.Auth.APIKey == "" { + yellow.Println(" ⚠ No API key — run `unarr init` first.") + } else { + apiURL := cfg.Auth.APIURL + if apiURL == "" { + apiURL = "https://torrentclaw.com" + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + _, err := vpn.FetchConfig(ctx, apiURL, cfg.Auth.APIKey, "unarr/"+Version, cfg.Agent.ID, true) + cancel() + switch { + case err == nil: + green.Println(" ✓ Account provisioned — a VPN config is available.") + default: + yellow.Printf(" ⚠ %s\n", err) + } + } + } + + // ── Split-tunnel reminder ── + fmt.Println() + dim.Println(" Split-tunnel: only your downloads use the VPN. Your browser and other") + dim.Println(" apps keep your real IP — that's by design. Use the OpenVPN credentials in") + dim.Println(" your profile to protect your other devices.") + fmt.Println() + return nil +} + +func newVPNEnableCmd() *cobra.Command { + return &cobra.Command{ + Use: "enable", + Short: "Turn the managed VPN on", + Example: " unarr vpn enable", + RunE: func(cmd *cobra.Command, args []string) error { + return setVPNEnabled(true) + }, + } +} + +func newVPNDisableCmd() *cobra.Command { + return &cobra.Command{ + Use: "disable", + Short: "Turn the managed VPN off", + Example: " unarr vpn disable", + RunE: func(cmd *cobra.Command, args []string) error { + return setVPNEnabled(false) + }, + } +} + +func setVPNEnabled(enabled bool) error { + green := color.New(color.FgGreen) + yellow := color.New(color.FgYellow) + dim := color.New(color.FgHiBlack) + + cfg := loadConfig() + + if enabled && cfg.Auth.APIKey == "" { + return fmt.Errorf("no API key configured — run `unarr init` first (the managed VPN fetches its config from your account)") + } + + if cfg.Download.VPN.Enabled == enabled { + fmt.Println() + dim.Printf(" VPN is already %s — nothing to do.\n", enabledWord(enabled)) + fmt.Println() + return nil + } + + cfg.Download.VPN.Enabled = enabled + + configPath := config.FilePath() + if cfgFile != "" { + configPath = cfgFile + } + if err := config.Save(cfg, configPath); err != nil { + return fmt.Errorf("save config: %w", err) + } + appCfg = cfg + + fmt.Println() + green.Printf(" ✓ Managed VPN %s.\n", enabledWord(enabled)) + + if enabled && cfg.Download.VPN.ConfigFile != "" { + yellow.Println(" ⚠ A config_file is set, so self-hosted mode takes precedence and the") + yellow.Println(" managed config from your account is ignored. Clear config_file to use it.") + } + + // The tunnel is brought up once at daemon startup; a plain config reload does + // NOT (re)create it. Tell the user to restart the daemon if it's running. + if state := agent.ReadState(); state != nil && isDaemonAlive(state) { + fmt.Println() + dim.Println(" The daemon is running. Restart it for this to take effect:") + dim.Println(" unarr daemon restart") + } + fmt.Println() + return nil +} + +func enabledWord(enabled bool) string { + if enabled { + return "enabled" + } + return "disabled" +} diff --git a/internal/vpn/vpn.go b/internal/vpn/vpn.go index b8bab50..7f50ea1 100644 --- a/internal/vpn/vpn.go +++ b/internal/vpn/vpn.go @@ -18,6 +18,7 @@ import ( "net" "net/http" "net/netip" + neturl "net/url" "strconv" "strings" "time" @@ -56,9 +57,22 @@ type fetchResponse struct { } // FetchConfig retrieves the agent's WireGuard .conf from the web API. Auth is -// `Authorization: Bearer ` (the agent-auth scheme). -func FetchConfig(ctx context.Context, apiURL, apiKey, userAgent string) (string, error) { +// `Authorization: Bearer ` (the agent-auth scheme). agentId lets the web +// arbitrate the single WireGuard slot (first agent to ask claims it; others get +// 409 → ErrSlotOnDevice and should use OpenVPN on their host instead). +func FetchConfig(ctx context.Context, apiURL, apiKey, userAgent, agentID string, probe bool) (string, error) { + q := neturl.Values{} + if agentID != "" { + q.Set("agentId", agentID) + } + if probe { + // Validate provisioning without claiming the WireGuard slot (status --check). + q.Set("probe", "1") + } url := strings.TrimSuffix(apiURL, "/") + "/api/internal/agent/vpn-config" + if len(q) > 0 { + url += "?" + q.Encode() + } req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return "", &FetchError{ErrUpstream, err.Error()} @@ -103,6 +117,10 @@ func FetchConfig(ctx context.Context, apiURL, apiKey, userAgent string) (string, type Tunnel struct { dev *device.Device Net *netstack.Net + // Endpoint is the resolved ip:port of the WireGuard server this tunnel + // exits through — surfaced in `unarr vpn status` so the user can see which + // VPN server their torrent traffic is routed out of. + Endpoint string } // Up parses a WireGuard .conf and brings up the tunnel in userspace. @@ -132,7 +150,7 @@ func Up(confText string) (*Tunnel, error) { return nil, fmt.Errorf("wireguard up: %w", err) } - return &Tunnel{dev: dev, Net: tnet}, nil + return &Tunnel{dev: dev, Net: tnet, Endpoint: wc.endpoint}, nil } // Close tears the tunnel down. From 0e8d9e87f6b01e8b23520c04652310e8d6839f77 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Sat, 23 May 2026 15:34:58 +0200 Subject: [PATCH 02/37] fix(engine): truncate errorMessage before reporting status A failed usenet extract sets task.ErrorMessage to the full unrar/par2 dump (multi-KB). Sent raw, the web /agent/status route rejected it and the terminal report failed, leaving the task stuck non-terminal. Cap the reported errorMessage at 2000 bytes (rune-safe) in the status snapshot, matching the server's stored length. --- internal/engine/task.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/internal/engine/task.go b/internal/engine/task.go index ceba6c9..09621e8 100644 --- a/internal/engine/task.go +++ b/internal/engine/task.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" "time" + "unicode/utf8" "github.com/torrentclaw/unarr/internal/agent" ) @@ -229,10 +230,25 @@ func (t *Task) ToStatusUpdate() agent.StatusUpdate { FileName: t.FileName, FilePath: t.FilePath, StreamURL: t.StreamURL, - ErrorMessage: t.ErrorMessage, + // Cap to the server's stored length. A failed extract can carry a + // multi-KB unrar/par2 dump; sending it raw made /agent/status 400 + // the whole report, leaving the task stuck non-terminal. + ErrorMessage: truncateMsg(t.ErrorMessage, 2000), } } +// truncateMsg caps s to at most max bytes without splitting a UTF-8 rune. +func truncateMsg(s string, max int) string { + if len(s) <= max { + return s + } + cut := max + for cut > 0 && !utf8.RuneStart(s[cut]) { + cut-- + } + return s[:cut] +} + // MagnetURI builds a magnet link from the info hash. func (t *Task) MagnetURI() string { return "magnet:?xt=urn:btih:" + t.InfoHash From a5a92b111bbea6466e4362cedba793f27892250b Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Sat, 23 May 2026 15:36:37 +0200 Subject: [PATCH 03/37] feat(usenet): warn at startup when par2 or extractor is missing A usenet-enabled agent silently produces corrupt files when par2 is not installed: bad NNTP segments go unrepaired and unrar reports checksum errors. Likewise, no unrar/7z means RAR-packed downloads can't be unpacked at all. When the registered agent has the usenet feature, check par2 and the extractor (unrar/7z) in PATH and log a loud WARNING for each missing one, mirroring the existing ffmpeg-for-HLS warning. --- internal/agent/daemon.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index a8edc9b..385454a 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "os/exec" "runtime" "strings" "sync/atomic" @@ -178,6 +179,21 @@ func (d *Daemon) Run(ctx context.Context) error { log.Printf("Agent registered: %s (%s) [%s]", d.User.Name, d.User.Email, d.User.Plan) log.Printf("Features: torrent=%v debrid=%v usenet=%v", d.Features.Torrent, d.Features.Debrid, d.Features.Usenet) + // Usenet needs par2 (segment repair) + an extractor (RAR/7z) on the host. + // Without par2, a single bad segment corrupts the file silently; without + // an extractor, RAR-packed downloads can't be unpacked. Warn loudly at + // startup so the operator installs them before the first download fails. + if d.Features.Usenet { + if _, err := exec.LookPath("par2"); err != nil { + log.Printf("[usenet] WARNING: par2 not found in PATH — corrupted segments cannot be repaired and extraction may fail. Install par2 (apt install par2 / brew install par2).") + } + _, unrarErr := exec.LookPath("unrar") + _, sevenZErr := exec.LookPath("7z") + if unrarErr != nil && sevenZErr != nil { + log.Printf("[usenet] WARNING: no archive extractor (unrar or 7z) found — RAR-packed downloads cannot be unpacked. Install unrar or 7z.") + } + } + // Wire sync callbacks d.sync.OnNewTasks = func(tasks []Task) { if d.OnTasksClaimed != nil { From 9176e877eb82d86cf26e4a95aaded4e595a58bb5 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 26 May 2026 16:00:18 +0200 Subject: [PATCH 04/37] fix(hls): clamp ffmpeg bitrate to the level we derive from outputHeight MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Asking for 2160p quality on a 720p source kept the daemon's qcap.VideoBitrate at 25 Mbps even after outputHeight was clamped to the source. The level H264LevelForHeight picks for the 720p output is 3.1 / 4.0, which rejects any VBV >20 Mbps — libx264 then exited with "VBV bitrate (25000) > level limit" on every restart, ffmpeg auto-restarted 3 times, master.m3u8 never appeared, and the player got stuck at "Preparando sesión". Re-derive the (height, bitrate) cap from the EFFECTIVE outputHeight via the new capForHeight helper. Result: 720p source asked for 2160p → outputs 720p with the 3500 kbps bitrate the level actually accepts. ffmpeg runs cleanly, master.m3u8 appears, playback starts. The web also clamps effectiveQuality to source resolution before the session row is written, so the daemon mostly receives sane labels. This change keeps the daemon defensive against (a) older web clients that still ask for upscaled qualities, and (b) future quality="original" requests where qcap is empty and Transcode.VideoBitrate could overshoot the level too. --- internal/cmd/version.go | 2 +- internal/engine/hls.go | 12 +++++++++++- internal/engine/webrtc_stream.go | 23 +++++++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/internal/cmd/version.go b/internal/cmd/version.go index 18dac17..e03063b 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.2" +var Version = "0.9.3" diff --git a/internal/engine/hls.go b/internal/engine/hls.go index 03a9948..cc0b442 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -863,7 +863,17 @@ func buildHLSFFmpegArgsAt(cfg HLSSessionConfig, probe *StreamProbe, tmpDir strin } args = append(args, "-profile:v", "main", "-level:v", H264LevelForHeight(outputHeight)) - bitrate := qcap.VideoBitrate + // Bitrate must match the level libx264 actually picks for outputHeight, + // not the qcap target for the user's requested label. If a user asks for + // "2160p" on a 1080p source, qcap.VideoBitrate is 25 Mbps but the level + // (derived from outputHeight=1080) is 4.0, which rejects bitrates >20 Mbps + // with "VBV bitrate (25000) > level limit (20000)". Re-derive the cap + // from the effective height so the (level, bitrate) pair stays coherent. + effectiveCap := capForHeight(outputHeight) + bitrate := effectiveCap.VideoBitrate + if bitrate == "" { + bitrate = qcap.VideoBitrate + } if bitrate == "" { bitrate = cfg.Transcode.VideoBitrate } diff --git a/internal/engine/webrtc_stream.go b/internal/engine/webrtc_stream.go index fa4016c..1b4905a 100644 --- a/internal/engine/webrtc_stream.go +++ b/internal/engine/webrtc_stream.go @@ -130,6 +130,29 @@ func resolveQualityCap(label string) qualityCap { } } +// capForHeight returns the bitrate-cap pair appropriate for an effective +// output height. Used after clamping outputHeight to the source's resolution: +// asking ffmpeg for "2160p" bitrate (25 Mbps) on a 1080p source overshoots +// the H.264 level we derived from the EFFECTIVE height (4.0, max 20 Mbps) and +// makes libx264 refuse with "VBV bitrate > level limit". This helper picks +// the bitrate that matches the level libx264 will actually accept. +func capForHeight(height int) qualityCap { + switch { + case height <= 0: + return qualityCap{} + case height <= 480: + return qualityCap{MaxHeight: 480, VideoBitrate: "1500k"} + case height <= 720: + return qualityCap{MaxHeight: 720, VideoBitrate: "3500k"} + case height <= 1080: + return qualityCap{MaxHeight: 1080, VideoBitrate: "6000k"} + case height <= 1440: + return qualityCap{MaxHeight: 1440, VideoBitrate: "12000k"} + default: + return qualityCap{MaxHeight: 2160, VideoBitrate: "25000k"} + } +} + // buildStreamSource picks between passthrough and transcoded source. ffprobe // failure or missing ffmpeg falls back to passthrough — the browser surfaces // a clearer codec error than us refusing to start. From ca7de23a56b14aac9ae3c0c7647789ce9886f24b Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 26 May 2026 18:04:35 +0200 Subject: [PATCH 05/37] feat(stream)!: retire WebRTC, HLS-only, bump 0.9.4 Drops the custom WebRTC DataChannel pipeline + pion deps + WSS signaling client + wire framing. Every in-browser playback now uses HLS over HTTP from the daemon (Tailscale/LAN/UPnP). Browser P2P never re-enabled. Wire renames (incompatible with web < 2026-05-26): agent.WebRTCSession => agent.StreamSession, SyncResponse.WebRTCSessions (JSON: webrtcSessions) => StreamSessions (JSON: streamSessions). MIN_AGENT_VERSION is bumped to 0.9.4 on the web side so older agents see an upgrade card. Also fixes the libx264 'VBV bitrate > level limit' abort by clamping the encoder bitrate to the effective output height instead of the requested label (carried over from the prior 0.9.3 unreleased work). The seed_file vertical (mode=seed_file handler + engine.SeedFile) was retired with the in-browser P2P player. [downloads.webrtc] config block deleted; existing TOML files with the section still parse fine. --- .github/workflows/ci.yml | 7 +- CHANGELOG.md | 31 + README.md | 30 +- SECURITY.md | 4 +- cmd/wstracker-probe/main.go | 268 ------ go.mod | 2 +- internal/agent/daemon.go | 8 +- internal/agent/signal_client.go | 258 ------ internal/agent/signal_client_test.go | 196 ----- internal/agent/sync.go | 10 +- internal/agent/types.go | 33 +- internal/cmd/daemon.go | 110 +-- internal/cmd/download.go | 3 - ...registry.go => player_session_registry.go} | 50 +- internal/cmd/probe_hwaccel.go | 2 +- internal/cmd/seed_file_handler.go | 65 -- internal/cmd/version.go | 2 +- internal/config/config.go | 34 +- internal/config/config_test.go | 23 +- internal/engine/hls.go | 4 +- internal/engine/probe.go | 2 +- internal/engine/seed_file.go | 138 --- internal/engine/seed_file_test.go | 164 ---- internal/engine/stream_source.go | 2 +- internal/engine/torrent.go | 51 +- internal/engine/transcode_quality.go | 64 ++ internal/engine/transcoder.go | 31 +- internal/engine/webrtc.go | 36 - internal/engine/webrtc_stream.go | 807 ------------------ internal/engine/webrtc_test.go | 177 ---- internal/engine/wire/proto.go | 254 ------ internal/engine/wire/proto_test.go | 193 ----- internal/library/mediainfo/ffmpeg.go | 2 +- 33 files changed, 207 insertions(+), 2854 deletions(-) delete mode 100644 cmd/wstracker-probe/main.go delete mode 100644 internal/agent/signal_client.go delete mode 100644 internal/agent/signal_client_test.go rename internal/cmd/{webrtc_session_registry.go => player_session_registry.go} (51%) delete mode 100644 internal/cmd/seed_file_handler.go delete mode 100644 internal/engine/seed_file.go delete mode 100644 internal/engine/seed_file_test.go create mode 100644 internal/engine/transcode_quality.go delete mode 100644 internal/engine/webrtc.go delete mode 100644 internal/engine/webrtc_stream.go delete mode 100644 internal/engine/webrtc_test.go delete mode 100644 internal/engine/wire/proto.go delete mode 100644 internal/engine/wire/proto_test.go diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd5fc7d..7dabcc4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,14 +86,11 @@ jobs: run: | # Threshold applies only to engine and agent — cmd contains interactive UI # commands (config menus, daemon, auth browser) that are not unit-testable. - # WebRTC files are excluded: deprecated, slated for removal in 0.9.0. go test -race -coverprofile=coverage-core.out -covermode=atomic \ ./internal/engine/... \ ./internal/agent/... - # Strip webrtc lines from the profile before computing the threshold. - grep -v '/internal/engine/webrtc' coverage-core.out > coverage-core-filtered.out - COVERAGE=$(go tool cover -func=coverage-core-filtered.out | grep ^total | awk '{print $3}' | tr -d '%') - echo "Coverage on engine+agent (excluding webrtc): ${COVERAGE}%" + COVERAGE=$(go tool cover -func=coverage-core.out | grep ^total | awk '{print $3}' | tr -d '%') + echo "Coverage on engine+agent: ${COVERAGE}%" python3 -c " coverage = float('${COVERAGE}') threshold = 50.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 961db09..dfc0f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,37 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.4] - 2026-05-26 + +### Removed + +- **streaming**: retire the custom WebRTC DataChannel pipeline. The daemon no + longer ships pion/webrtc, the WSS signaling client, or the wire framing + package — every in-browser session now uses HLS over HTTP from the daemon + (Tailscale / LAN / UPnP). Browser P2P (WebTorrent) bytes never re-enabled. +- **config**: `[downloads.webrtc]` block removed from the TOML schema; existing + config files with the section parse cleanly because go-toml ignores unknown + sections. +- **seed_file**: `mode=seed_file` task handler + `engine.SeedFile` helper + dropped — the last in-browser caller was retired with the WebRTC player. +- **wstracker-probe**: standalone probe binary removed. + +### Changed + +- **agent wire**: `SyncResponse.WebRTCSessions` (JSON: `webrtcSessions`) renamed + to `StreamSessions` (JSON: `streamSessions`). The Go type `agent.WebRTCSession` + is now `agent.StreamSession`. Wire-incompatible with web < 2026-05-26. +- **torrent**: `buildMagnet` no longer accepts an `extraTrackers` variadic — + the default tracker list is the only set used. + +### Fixed + +- **hls**: clamp the ffmpeg `-b:v` to the bitrate cap derived from the EFFECTIVE + output height instead of the requested quality. Previously asking for "2160p" + on a 1080p source overshot the H.264 level we resolved from the effective + height (4.0, max 20 Mbps) and made libx264 abort with + `VBV bitrate > level limit`. + ## [0.9.2] - 2026-05-21 ### Added diff --git a/README.md b/README.md index 6984bd0..e1cc6e3 100644 --- a/README.md +++ b/README.md @@ -434,24 +434,12 @@ country = "US" ### Streaming reference -The in-browser player on torrentclaw.com streams from the daemon over WebRTC -(low-latency P2P) or HLS (HTTP fragments + ffmpeg transcode for codecs the -browser can't decode natively). Both are enabled by default — a fresh install -"just works" without editing the TOML. Disable surgically only if you have a -reason. +The in-browser player on torrentclaw.com streams from the daemon over HLS +(HTTP fragments + ffmpeg transcode for codecs the browser can't decode +natively). Enabled by default — a fresh install "just works" without editing +the TOML. ```toml -[downloads.webrtc] -enabled = true # master switch -trackers = ["wss://tracker.torrentclaw.com"] # signaling trackers -stun_servers = [ # NAT traversal - "stun:stun.l.google.com:19302", - "stun:stun1.l.google.com:19302", -] -turn_servers = [] # optional TURN relays -turn_user = "" -turn_pass = "" - [downloads.transcode] enabled = true # master switch hw_accel = "auto" # auto | none | nvenc | qsv | vaapi | videotoolbox @@ -462,16 +450,6 @@ max_height = 0 # 0 = no cap; e.g. 720 forces 720p max max_concurrent = 2 # max simultaneous ffmpeg processes ``` -#### `[downloads.webrtc]` - -| Key | Type | Default | Notes | -|-----|------|---------|-------| -| `enabled` | bool | `true` | Browser↔daemon WebRTC peer for the in-browser P2P player. Disable to skip WebRTC tracker signalling (saves ~5MB RAM, blocks WebRTC streaming — HLS still works). | -| `trackers` | `[]string` | `["wss://tracker.torrentclaw.com"]` | Signaling trackers for peer discovery. | -| `stun_servers` | `[]string` | Google public STUN ×2 | ICE candidate gathering. | -| `turn_servers` | `[]string` | `[]` | Optional TURN relays for symmetric-NAT users. | -| `turn_user` / `turn_pass` | string | `""` | Credentials for authed TURN servers. Applied to all `turn_servers`. | - #### `[downloads.transcode]` | Key | Type | Default | Notes | diff --git a/SECURITY.md b/SECURITY.md index 9b64c4c..b88b335 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -72,7 +72,7 @@ Docker Hub vulnerability count: package pulls ~40 codec/parser libraries (`x264`, `x265`, `libvpx`, `aom`, `dav1d`, `libtheora`, `libvorbis`, `libwebp`, `libbluray`, `libopenmpt`, …). Each carries a long NVD history that Alpine does not backport. ffmpeg is a - **functional dependency** — the WebRTC/HLS transcode pipeline shells out to + **functional dependency** — the HLS transcode pipeline shells out to `ffmpeg`/`ffprobe` to decode untrusted media and re-encode to H.264 + AAC. ### Accepted risk and policy @@ -100,7 +100,7 @@ Recommended additions for exposed deployments: - no-new-privileges:true ``` -If you do not need WebRTC/HLS transcoding, you can run with transcoding disabled to +If you do not need HLS transcoding, you can run with transcoding disabled to avoid feeding untrusted media to ffmpeg at all. ## Disclosure Policy diff --git a/cmd/wstracker-probe/main.go b/cmd/wstracker-probe/main.go deleted file mode 100644 index 7eecaa5..0000000 --- a/cmd/wstracker-probe/main.go +++ /dev/null @@ -1,268 +0,0 @@ -// wstracker-probe — connects to a WebSocket BitTorrent tracker and either -// (a) advertises a fake info_hash to verify announce signalling, or -// (b) seeds a real file via the WebTorrent protocol so a browser -// webtorrent.js client can fetch it for end-to-end verification. -// -// Modes: -// -// wstracker-probe -tracker wss://tracker.torrentclaw.com -// Announces a random info_hash; exits 0 on TrackerAnnounceSuccessful. -// -// wstracker-probe -tracker wss://… -seed /path/to/file.mp4 -// Builds a single-file torrent in memory, seeds forever, prints the -// magnet (with the WSS tracker injected). Ctrl-C to stop. -// -// Useful for browser ↔ unarr e2e — point a webtorrent.js page at the -// printed magnet and the player should pull pieces via WebRTC data channel. -package main - -import ( - "context" - "crypto/rand" - "flag" - "fmt" - "log" - "net/url" - "os" - "os/signal" - "path/filepath" - "syscall" - "time" - - alog "github.com/anacrolix/log" - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/storage" - "github.com/pion/webrtc/v4" -) - -func main() { - tracker := flag.String("tracker", "wss://tracker.torrentclaw.com", "WSS tracker URL to probe") - timeout := flag.Duration("timeout", 30*time.Second, "max wait for successful announce (ignored in -seed mode)") - seedPath := flag.String("seed", "", "path to a file to seed (single-file torrent). When set, runs forever instead of exiting on first announce.") - flag.Parse() - - if *seedPath != "" { - runSeeder(*seedPath, *tracker) - return - } - - runProbe(*tracker, *timeout) -} - -// runProbe — single random-hash announce, exits on success/error/timeout. -func runProbe(trackerURL string, timeout time.Duration) { - tmp, err := os.MkdirTemp("", "wstracker-probe-*") - if err != nil { - log.Fatalf("temp dir: %v", err) - } - defer os.RemoveAll(tmp) - - cfg := baseClientConfig(tmp) - - annSuccess := make(chan struct{}, 1) - annError := make(chan error, 1) - cfg.Callbacks.StatusUpdated = append( - cfg.Callbacks.StatusUpdated, - func(e torrent.StatusUpdatedEvent) { - switch e.Event { //nolint:exhaustive // peer events are noise for tracker probe - case torrent.TrackerConnected: - if e.Error != nil { - fmt.Printf("[probe] tracker connect FAILED: %v\n", e.Error) - } else { - fmt.Printf("[probe] tracker connected: %s\n", e.Url) - } - case torrent.TrackerAnnounceSuccessful: - fmt.Printf("[probe] tracker announce OK: %s ih=%s\n", e.Url, e.InfoHash) - select { - case annSuccess <- struct{}{}: - default: - } - case torrent.TrackerAnnounceError: - fmt.Printf("[probe] tracker announce ERROR: %s ih=%s err=%v\n", e.Url, e.InfoHash, e.Error) - select { - case annError <- e.Error: - default: - } - case torrent.TrackerDisconnected: - fmt.Printf("[probe] tracker disconnected: %s err=%v\n", e.Url, e.Error) - } - }, - ) - - client, err := torrent.NewClient(cfg) - if err != nil { - log.Fatalf("create torrent client: %v", err) - } - defer client.Close() - - var ih [20]byte - if _, err := rand.Read(ih[:]); err != nil { - log.Fatalf("random info_hash: %v", err) - } - magnet := fmt.Sprintf("magnet:?xt=urn:btih:%x&tr=%s", ih, trackerURL) - fmt.Printf("[probe] tracker=%s info_hash=%x timeout=%s\n", trackerURL, ih, timeout) - - t, err := client.AddMagnet(magnet) - if err != nil { - log.Fatalf("add magnet: %v", err) - } - defer t.Drop() - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - select { - case <-annSuccess: - fmt.Println("[probe] OK — tracker announce succeeded") - os.Exit(0) - case err := <-annError: - fmt.Printf("[probe] FAIL — tracker announce error: %v\n", err) - os.Exit(1) - case <-ctx.Done(): - fmt.Printf("[probe] FAIL — timeout after %s\n", timeout) - os.Exit(2) - } -} - -// runSeeder — builds a single-file torrent for the given path, adds it to -// a WebTorrent-enabled client, and seeds until SIGINT/SIGTERM. -func runSeeder(filePath, trackerURL string) { - abs, err := filepath.Abs(filePath) - if err != nil { - log.Fatalf("resolve seed path: %v", err) - } - st, err := os.Stat(abs) - if err != nil { - log.Fatalf("stat seed file: %v", err) - } - if st.IsDir() { - log.Fatalf("-seed currently supports a single file, not a directory: %s", abs) - } - - dataDir := filepath.Dir(abs) - - // Build single-file torrent metadata. - info := metainfo.Info{ - PieceLength: chooseSeedPieceLength(st.Size()), - Name: filepath.Base(abs), - } - if err := info.BuildFromFilePath(abs); err != nil { - log.Fatalf("build info from file: %v", err) - } - infoBytes, err := bencode.Marshal(info) - if err != nil { - log.Fatalf("marshal info: %v", err) - } - - mi := &metainfo.MetaInfo{ - InfoBytes: infoBytes, - AnnounceList: metainfo.AnnounceList{{trackerURL}}, - CreatedBy: "wstracker-probe", - } - ih := mi.HashInfoBytes() - - cfg := baseClientConfig(dataDir) - cfg.Seed = true - - cfg.Callbacks.StatusUpdated = append( - cfg.Callbacks.StatusUpdated, - func(e torrent.StatusUpdatedEvent) { - switch e.Event { //nolint:exhaustive - case torrent.TrackerConnected: - if e.Error != nil { - fmt.Printf("[seed] tracker connect FAILED: %v\n", e.Error) - } else { - fmt.Printf("[seed] tracker connected: %s\n", e.Url) - } - case torrent.TrackerAnnounceSuccessful: - fmt.Printf("[seed] tracker announce OK: %s ih=%s\n", e.Url, e.InfoHash) - case torrent.TrackerAnnounceError: - fmt.Printf("[seed] tracker announce ERROR: %s err=%v\n", e.Url, e.Error) - case torrent.TrackerDisconnected: - fmt.Printf("[seed] tracker disconnected: %s err=%v\n", e.Url, e.Error) - } - }, - ) - - client, err := torrent.NewClient(cfg) - if err != nil { - log.Fatalf("create torrent client: %v", err) - } - defer client.Close() - - t, err := client.AddTorrent(mi) - if err != nil { - log.Fatalf("add torrent: %v", err) - } - t.DownloadAll() - - dn := url.QueryEscape(info.Name) - enc := url.QueryEscape(trackerURL) - magnet := fmt.Sprintf("magnet:?xt=urn:btih:%s&dn=%s&tr=%s", ih.HexString(), dn, enc) - - fmt.Printf("[seed] file=%s size=%d bytes piece_length=%d\n", abs, st.Size(), info.PieceLength) - fmt.Printf("[seed] info_hash=%s\n", ih.HexString()) - fmt.Printf("[seed] magnet=%s\n", magnet) - fmt.Println("[seed] seeding via WebRTC. Ctrl-C to stop.") - - stop := make(chan os.Signal, 1) - signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) - statTicker := time.NewTicker(5 * time.Second) - defer statTicker.Stop() - - for { - select { - case <-statTicker.C: - s := t.Stats() - fmt.Printf("[seed] peers=%d uploaded=%d bytes seeders=%d leechers=%d\n", - s.ActivePeers, s.BytesWrittenData.Int64(), - s.ConnectedSeeders, s.ActivePeers-s.ConnectedSeeders) - case <-stop: - fmt.Println("[seed] stopping") - return - } - } -} - -// baseClientConfig — shared anacrolix client config for both modes. -// WebTorrent is the only transport enabled; TCP/uTP/DHT/IPv6 are disabled -// to keep the moving parts to the minimum required for a WSS-only test. -func baseClientConfig(dataDir string) *torrent.ClientConfig { - cfg := torrent.NewDefaultClientConfig() - cfg.DataDir = dataDir - cfg.DefaultStorage = storage.NewMMap(dataDir) - cfg.NoUpload = false - cfg.DisableTCP = true - cfg.DisableUTP = true - cfg.DisableIPv6 = true - cfg.NoDHT = true - cfg.NoDefaultPortForwarding = true - cfg.ListenPort = 0 - cfg.Logger = alog.Default.FilterLevel(alog.Critical) - cfg.DisableWebtorrent = false - cfg.ICEServerList = []webrtc.ICEServer{ - {URLs: []string{"stun:stun.l.google.com:19302"}}, - {URLs: []string{"stun:stun1.l.google.com:19302"}}, - } - return cfg -} - -// chooseSeedPieceLength picks a sane piece size for a given file size. -// Mirrors the libtorrent / qBittorrent ladder so the resulting torrent -// is interoperable with mainstream clients. -func chooseSeedPieceLength(size int64) int64 { - switch { - case size < 4*1024*1024: // < 4 MiB - return 16 * 1024 // 16 KiB - case size < 64*1024*1024: // < 64 MiB - return 64 * 1024 // 64 KiB - case size < 512*1024*1024: // < 512 MiB - return 256 * 1024 // 256 KiB - case size < 4*1024*1024*1024: // < 4 GiB - return 1024 * 1024 // 1 MiB - default: - return 4 * 1024 * 1024 // 4 MiB - } -} diff --git a/go.mod b/go.mod index f3aea87..a47f6e3 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/google/uuid v1.6.0 github.com/huin/goupnp v1.3.0 github.com/olekukonko/tablewriter v1.1.4 - github.com/pion/webrtc/v4 v4.2.11 github.com/spf13/cobra v1.10.2 github.com/torrentclaw/go-client v0.2.0 golang.org/x/term v0.43.0 @@ -107,6 +106,7 @@ require ( github.com/pion/stun/v3 v3.1.1 // indirect github.com/pion/transport/v4 v4.0.1 // indirect github.com/pion/turn/v4 v4.1.4 // indirect + github.com/pion/webrtc/v4 v4.2.11 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/protolambda/ctxlock v0.1.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 385454a..d0b1458 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -38,7 +38,7 @@ type Daemon struct { // Callbacks — set by cmd/daemon.go before calling Run. OnTasksClaimed func(tasks []Task) OnStreamRequested func(req StreamRequest) - OnWebRTCSession func(sess WebRTCSession) + OnStreamSession func(sess StreamSession) OnControlAction func(action, taskID string, deleteFiles bool) GetActiveCount func() int // returns number of active downloads (wired from manager) @@ -210,9 +210,9 @@ func (d *Daemon) Run(ctx context.Context) error { d.OnStreamRequested(req) } } - d.sync.OnWebRTCSession = func(sess WebRTCSession) { - if d.OnWebRTCSession != nil { - d.OnWebRTCSession(sess) + d.sync.OnStreamSession = func(sess StreamSession) { + if d.OnStreamSession != nil { + d.OnStreamSession(sess) } } d.sync.OnUpgrade = func(version string) { diff --git a/internal/agent/signal_client.go b/internal/agent/signal_client.go deleted file mode 100644 index 624dc6c..0000000 --- a/internal/agent/signal_client.go +++ /dev/null @@ -1,258 +0,0 @@ -package agent - -import ( - "bufio" - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" - "time" -) - -// SignalRole identifies who produced a signalling message. The opposite role -// receives it. -type SignalRole string - -const ( - SignalRoleBrowser SignalRole = "browser" - SignalRoleAgent SignalRole = "agent" -) - -// SignalMessageType matches the server-side z.enum on -// /api/internal/stream/signal/[sessionId] route. -type SignalMessageType string - -const ( - SignalMsgOffer SignalMessageType = "offer" - SignalMsgAnswer SignalMessageType = "answer" - SignalMsgCandidate SignalMessageType = "candidate" - SignalMsgCandidateEnd SignalMessageType = "candidate-end" - SignalMsgBye SignalMessageType = "bye" -) - -// SignalMessage mirrors the bus envelope on the web side. -type SignalMessage struct { - From SignalRole `json:"from"` - Type SignalMessageType `json:"type"` - Payload string `json:"payload"` - TS int64 `json:"ts"` -} - -// PostSignal enqueues a signalling message produced by this agent. The -// browser receives it on its next SSE event push. -func (c *Client) PostSignal(ctx context.Context, sessionID string, msg SignalMessage) error { - body := map[string]any{ - "from": string(SignalRoleAgent), - "type": string(msg.Type), - "payload": msg.Payload, - } - path := fmt.Sprintf("/api/internal/stream/signal/%s", sessionID) - return c.doPost(ctx, path, body, &struct { - OK bool `json:"ok"` - }{}) -} - -// SignalEventStream wraps an open SSE connection. Read messages from Events() -// until the channel closes (server timeout or context cancel). Always defer -// Close() to release the underlying response body. -type SignalEventStream struct { - resp *http.Response - cancel context.CancelFunc - events chan SignalMessage - errs chan error - done chan struct{} -} - -// Events streams browser-produced messages addressed to the agent. -// The channel closes when the SSE connection ends; the caller should then -// call Close() and reopen if it wants to keep listening. -func (s *SignalEventStream) Events() <-chan SignalMessage { return s.events } - -// Err returns the terminating error (if any) once Events() has closed. -func (s *SignalEventStream) Err() error { - select { - case err := <-s.errs: - return err - default: - return nil - } -} - -// Close cancels the underlying HTTP request and waits for the reader goroutine -// to drain. Safe to call more than once. -func (s *SignalEventStream) Close() error { - if s.cancel != nil { - s.cancel() - } - if s.resp != nil { - s.resp.Body.Close() - } - <-s.done - return nil -} - -// OpenSignalStream opens a long-lived SSE connection to the signal events -// endpoint. Caller MUST cancel ctx (or call Close()) to free resources. -// -// The server caps each response at ~25 s; OpenSignalStream surfaces the -// disconnect by closing the events channel. Caller should reopen until the -// session ends. -func (c *Client) OpenSignalStream(ctx context.Context, sessionID string) (*SignalEventStream, error) { - streamCtx, cancel := context.WithCancel(ctx) - - url := fmt.Sprintf("%s/api/internal/stream/signal/%s/events", c.baseURL(), sessionID) - req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, url, nil) - if err != nil { - cancel() - return nil, fmt.Errorf("open signal stream: %w", err) - } - req.Header.Set("Accept", "text/event-stream") - req.Header.Set("Authorization", "Bearer "+c.apiKey) - req.Header.Set("User-Agent", c.userAgent) - req.Header.Set("Cache-Control", "no-cache") - - // Use a per-call client with no timeout (SSE connections are long). - sseClient := &http.Client{} - resp, err := sseClient.Do(req) - if err != nil { - cancel() - return nil, fmt.Errorf("open signal stream: %w", err) - } - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) - resp.Body.Close() - cancel() - return nil, fmt.Errorf("open signal stream: HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) - } - - stream := &SignalEventStream{ - resp: resp, - cancel: cancel, - events: make(chan SignalMessage, 8), - errs: make(chan error, 1), - done: make(chan struct{}), - } - - go stream.read() - return stream, nil -} - -// sseMaxLineBytes caps the size of a single SSE line. Real signalling lines -// are JSON payloads of a few hundred bytes; 256 KiB is generous enough to -// survive a future schema bump but small enough that a hostile or buggy -// server cannot grow daemon memory by streaming a single line forever. -const sseMaxLineBytes = 256 * 1024 - -// sseMaxEventBytes caps the total bytes buffered across the lines of one -// SSE event. Without a cap, a peer could send unbounded `data:` continuation -// lines and OOM the daemon between blank-line dispatches. -const sseMaxEventBytes = 1024 * 1024 - -func (s *SignalEventStream) read() { - defer close(s.done) - defer close(s.events) - - scanner := bufio.NewScanner(s.resp.Body) - scanner.Buffer(make([]byte, 16*1024), sseMaxLineBytes) - - var dataBuf bytes.Buffer - var eventName string - - for scanner.Scan() { - line := strings.TrimRight(scanner.Text(), "\r") - if line == "" { - // End of an event — dispatch if we have data. - if dataBuf.Len() == 0 { - eventName = "" - continue - } - if eventName == "" || eventName == "signal" { - var msg SignalMessage - if err := json.Unmarshal(dataBuf.Bytes(), &msg); err == nil { - select { - case s.events <- msg: - case <-s.resp.Request.Context().Done(): - return - } - } - } - dataBuf.Reset() - eventName = "" - continue - } - if strings.HasPrefix(line, ":") { - // SSE comment (heartbeat); ignore. - continue - } - if strings.HasPrefix(line, "event:") { - eventName = strings.TrimSpace(line[len("event:"):]) - continue - } - if strings.HasPrefix(line, "data:") { - payload := strings.TrimSpace(line[len("data:"):]) - // Refuse to grow the event buffer past the cap. Reset so a - // well-formed event after the offender can still be parsed, - // and surface an error so SignalLoop reconnects. - if dataBuf.Len()+len(payload)+1 > sseMaxEventBytes { - dataBuf.Reset() - eventName = "" - select { - case s.errs <- fmt.Errorf("sse: event exceeded %d bytes", sseMaxEventBytes): - default: - } - return - } - if dataBuf.Len() > 0 { - dataBuf.WriteByte('\n') - } - dataBuf.WriteString(payload) - continue - } - // id:, retry:, anything else — ignore for now. - } - if err := scanner.Err(); err != nil { - select { - case s.errs <- err: - default: - } - } -} - -// SignalLoop runs an SSE consumer that reconnects automatically on disconnect. -// onMessage is called for every browser-produced message. Returns when ctx is -// cancelled. Reconnect backoff is fixed at 1 s — the server already paces -// reconnects with `retry: 1500` headers so churn is bounded. -func (c *Client) SignalLoop(ctx context.Context, sessionID string, onMessage func(SignalMessage)) error { - for ctx.Err() == nil { - stream, err := c.OpenSignalStream(ctx, sessionID) - if err != nil { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return ctx.Err() - } - continue - } - for msg := range stream.Events() { - onMessage(msg) - } - streamErr := stream.Err() - stream.Close() - if ctx.Err() != nil { - return ctx.Err() - } - // Server closes the SSE every ~25 s; reconnect immediately. - // Hard error → small backoff so we don't hammer. - if streamErr != nil { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return ctx.Err() - } - } - } - return ctx.Err() -} diff --git a/internal/agent/signal_client_test.go b/internal/agent/signal_client_test.go deleted file mode 100644 index 796b545..0000000 --- a/internal/agent/signal_client_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package agent - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "strings" - "sync" - "testing" - "time" -) - -// fakeSSEServer streams a fixed set of SSE events then closes the connection. -func fakeSSEServer(t *testing.T, msgs []SignalMessage, holdOpenAfter bool) *httptest.Server { - t.Helper() - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Authorization") != "Bearer test-key" { - http.Error(w, "auth", http.StatusUnauthorized) - return - } - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - flusher, ok := w.(http.Flusher) - if !ok { - t.Fatal("server: ResponseWriter is not a Flusher") - } - fmt.Fprint(w, "retry: 1500\n\n") - flusher.Flush() - for _, m := range msgs { - data, _ := json.Marshal(m) - fmt.Fprintf(w, "id: %d\nevent: signal\ndata: %s\n\n", m.TS, data) - flusher.Flush() - } - // Send a heartbeat comment to verify it's ignored. - fmt.Fprint(w, ": heartbeat\n\n") - flusher.Flush() - if holdOpenAfter { - // Hold the connection until the client disconnects so the test can - // exercise stream.Close(). - <-r.Context().Done() - } - })) -} - -func TestSignalStreamReadsMessages(t *testing.T) { - want := []SignalMessage{ - {From: SignalRoleBrowser, Type: SignalMsgOffer, Payload: "{sdp:1}", TS: 1}, - {From: SignalRoleBrowser, Type: SignalMsgCandidate, Payload: "{cand:1}", TS: 2}, - } - srv := fakeSSEServer(t, want, false) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - stream, err := c.OpenSignalStream(ctx, "session-1") - if err != nil { - t.Fatalf("open: %v", err) - } - defer stream.Close() - - var got []SignalMessage - for m := range stream.Events() { - got = append(got, m) - if len(got) == len(want) { - break - } - } - if len(got) != len(want) { - t.Fatalf("got %d messages, want %d", len(got), len(want)) - } - for i, m := range got { - if m.From != want[i].From || m.Type != want[i].Type || m.Payload != want[i].Payload { - t.Errorf("[%d] mismatch: %+v want %+v", i, m, want[i]) - } - } -} - -func TestSignalStreamPropagatesAuthError(t *testing.T) { - srv := fakeSSEServer(t, nil, false) - defer srv.Close() - - c := NewClient(srv.URL, "wrong-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - _, err := c.OpenSignalStream(ctx, "session-1") - if err == nil { - t.Fatal("expected auth error, got nil") - } -} - -func TestSignalStreamCloseCancelsRead(t *testing.T) { - srv := fakeSSEServer(t, nil, true) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - stream, err := c.OpenSignalStream(ctx, "session-1") - if err != nil { - t.Fatalf("open: %v", err) - } - - // Close on a separate goroutine then make sure the events channel drains. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(50 * time.Millisecond) - stream.Close() - }() - - for range stream.Events() { - // drain - } - wg.Wait() -} - -// TestSignalStreamRejectsOversizedEvent verifies that a hostile or buggy -// server sending an unbounded `data:` event surfaces an error and stops -// the reader instead of growing daemon memory forever. -func TestSignalStreamRejectsOversizedEvent(t *testing.T) { - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Authorization") != "Bearer test-key" { - http.Error(w, "auth", http.StatusUnauthorized) - return - } - w.Header().Set("Content-Type", "text/event-stream") - flusher := w.(http.Flusher) - // Send many data: continuation lines until we blow past the - // per-event cap. Each chunk is a short legitimate-looking line. - chunk := "data: " + strings.Repeat("x", 4096) + "\n" - fmt.Fprint(w, "event: signal\n") - for i := 0; i < (sseMaxEventBytes/4096)+8; i++ { - fmt.Fprint(w, chunk) - } - flusher.Flush() - <-r.Context().Done() - })) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - stream, err := c.OpenSignalStream(ctx, "session-overflow") - if err != nil { - t.Fatalf("open: %v", err) - } - defer stream.Close() - - for range stream.Events() { - // Should never receive a parsed event — the over-sized buffer must - // be rejected before dispatch. - } - if err := stream.Err(); err == nil { - t.Fatal("expected error from oversized event, got nil") - } -} - -func TestPostSignalSendsCorrectBody(t *testing.T) { - var bodySeen map[string]any - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Authorization") != "Bearer test-key" { - http.Error(w, "auth", http.StatusUnauthorized) - return - } - _ = json.NewDecoder(r.Body).Decode(&bodySeen) - w.Header().Set("Content-Type", "application/json") - fmt.Fprint(w, `{"ok":true}`) - })) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - err := c.PostSignal(context.Background(), "sess-x", SignalMessage{ - Type: SignalMsgAnswer, - Payload: "{sdp:answer}", - }) - if err != nil { - t.Fatalf("post: %v", err) - } - if bodySeen["from"] != string(SignalRoleAgent) { - t.Errorf("expected from=agent, got %v", bodySeen["from"]) - } - if bodySeen["type"] != string(SignalMsgAnswer) { - t.Errorf("expected type=answer, got %v", bodySeen["type"]) - } - if bodySeen["payload"] != "{sdp:answer}" { - t.Errorf("expected payload mismatch, got %v", bodySeen["payload"]) - } -} diff --git a/internal/agent/sync.go b/internal/agent/sync.go index 9847aba..c28c65f 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -29,7 +29,7 @@ type SyncClient struct { OnNewTasks func(tasks []Task) OnControl func(action, taskID string, deleteFiles bool) OnStreamRequest func(req StreamRequest) - OnWebRTCSession func(sess WebRTCSession) + OnStreamSession func(sess StreamSession) OnUpgrade func(version string) OnScan func() OnWatchingChange func(watching bool) @@ -199,10 +199,10 @@ func (sc *SyncClient) processResponse(resp *SyncResponse) { } } - // WebRTC streaming sessions - for _, ws := range resp.WebRTCSessions { - if sc.OnWebRTCSession != nil { - sc.OnWebRTCSession(ws) + // HLS streaming sessions. + for _, ws := range resp.StreamSessions { + if sc.OnStreamSession != nil { + sc.OnStreamSession(ws) } } diff --git a/internal/agent/types.go b/internal/agent/types.go index 8e0094a..72e8af5 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -374,29 +374,22 @@ type LibraryDeleteRequest struct { FilePath string `json:"filePath"` } -// WebRTCSession is a request to open a streaming session for a browser -// player. Transport selects the on-the-wire protocol: empty/"webrtc" runs the -// legacy custom WebRTC DataChannel pipeline; "hls" spawns an HLS session -// (ffmpeg producing fragmented MP4 served over HTTP). The CLI must POST an -// SDP answer to /api/internal/stream/signal/ for WebRTC sessions -// and register the HLS session in the StreamServer's HLS registry for HLS -// sessions; either way the source bytes come from FilePath (or, when only -// InfoHash is set, from a download_task on disk). -type WebRTCSession struct { - SessionID string `json:"sessionId"` - // Transport selects the streaming protocol. "" or "webrtc" → legacy - // WebRTC + MSE pipeline (Phase 1). "hls" → HLS over HTTP (Phase 2). - Transport string `json:"transport,omitempty"` - FilePath string `json:"filePath,omitempty"` - InfoHash string `json:"infoHash,omitempty"` - TaskID string `json:"taskId,omitempty"` - FileName string `json:"fileName,omitempty"` - FileSize int64 `json:"fileSize,omitempty"` +// StreamSession is a request to open an HLS streaming session for an +// in-browser player. The CLI registers the HLS session in the StreamServer's +// HLS registry; source bytes come from FilePath (or, when only InfoHash is +// set, from a download_task on disk). +type StreamSession struct { + SessionID string `json:"sessionId"` + FilePath string `json:"filePath,omitempty"` + InfoHash string `json:"infoHash,omitempty"` + TaskID string `json:"taskId,omitempty"` + FileName string `json:"fileName,omitempty"` + FileSize int64 `json:"fileSize,omitempty"` // Quality target the daemon should aim for when transcoding. One of // "2160p" | "1080p" | "720p" | "480p" | "original" | "" (defer to config). Quality string `json:"quality,omitempty"` // AudioIndex selects the source audio track (-map 0:a:N). -1 means - // "use the default/first track" (HLS) or ignored (WebRTC). + // "use the default/first track". AudioIndex int `json:"audioIndex,omitempty"` } @@ -405,7 +398,7 @@ type SyncResponse struct { NewTasks []Task `json:"newTasks,omitempty"` Controls []ControlAction `json:"controls,omitempty"` StreamRequests []StreamRequest `json:"streamRequests,omitempty"` - WebRTCSessions []WebRTCSession `json:"webrtcSessions,omitempty"` + StreamSessions []StreamSession `json:"streamSessions,omitempty"` Watching bool `json:"watching"` Upgrade *UpgradeSignal `json:"upgrade,omitempty"` Scan bool `json:"scan,omitempty"` diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 54759b2..84c458c 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -255,9 +255,6 @@ func runDaemonStart() error { MaxUploadRate: maxUl, ListenPort: cfg.Download.ListenPort, SeedEnabled: false, - WebRTCEnabled: cfg.Download.WebRTC.Enabled, - WebRTCTrackers: cfg.Download.WebRTC.Trackers, - ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), VPNTunnel: vpnTunnel, }) if err != nil { @@ -330,13 +327,7 @@ func runDaemonStart() error { // Wire: sync receives new tasks → submit to manager or handle stream d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { - if t.Mode == "seed_file" { - // Browser asked us to wrap an arbitrary on-disk file as - // a single-file torrent + seed it via WebRTC. Runs in - // its own goroutine so a slow / failing seed can't - // stall the rest of the claim batch. - go handleSeedFileTask(t, torrentDl, agentClient) - } else if t.Mode == "stream" { + if t.Mode == "stream" { if isStreamingTask(t.ID) { continue } @@ -497,23 +488,23 @@ func runDaemonStart() error { }() } - // Wire: sync receives custom WebRTC streaming session requests. - // Each session is a one-shot browser↔daemon DataChannel. Validate the - // FilePath against allowed dirs to prevent path traversal abuse from a - // compromised server, then spawn the pion peer in its own goroutine. - d.OnWebRTCSession = func(sess agent.WebRTCSession) { - if webrtcRegistry.has(sess.SessionID) { + // Wire: sync receives HLS streaming session requests. Each session spawns + // one ffmpeg process and registers its HLS playlist with the StreamServer. + // Validate FilePath against allowed dirs to prevent path traversal abuse + // from a compromised server. + d.OnStreamSession = func(sess agent.StreamSession) { + if playerSessionRegistry.has(sess.SessionID) { return // already running } filePath := sess.FilePath if filePath == "" { - log.Printf("webrtc session %s rejected: empty file path", agent.ShortID(sess.SessionID)) + log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID)) return } filePath = filepath.Clean(filePath) if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) { - log.Printf("webrtc session %s rejected: path outside allowed dirs: %s", + log.Printf("[hls %s] rejected: path outside allowed dirs: %s", agent.ShortID(sess.SessionID), filePath) return } @@ -521,75 +512,36 @@ func runDaemonStart() error { if info, err := os.Stat(filePath); err == nil && info.IsDir() { found := engine.FindVideoFile(filePath) if found == "" { - log.Printf("webrtc session %s rejected: no video file in dir %s", + log.Printf("[hls %s] rejected: no video file in dir %s", agent.ShortID(sess.SessionID), filePath) return } filePath = found } - // Branch on transport: HLS sessions only need ffmpeg + StreamServer, - // not a WebRTC peer, so they must bypass the WebRTC.Enabled gate. - // Default ("" or "webrtc") runs the DataChannel pipeline and requires it. - if strings.EqualFold(sess.Transport, "hls") { - tcRuntime := buildTranscodeRuntime(ctx, cfg) - if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { - log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) - return - } - hlsCtx, hlsCancel := context.WithCancel(ctx) - webrtcRegistry.add(sess.SessionID, hlsCancel) - hlsCfg := engine.HLSSessionConfig{ - SessionID: sess.SessionID, - SourcePath: filePath, - FileName: sess.FileName, - Quality: sess.Quality, - AudioIndex: sess.AudioIndex, - Transcode: tcRuntime, - } - hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) - if err != nil { - webrtcRegistry.remove(sess.SessionID) - hlsCancel() - log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err) - return - } - streamSrv.HLS().Register(hsess) + tcRuntime := buildTranscodeRuntime(ctx, cfg) + if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { + log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) return } - - // Non-HLS transport requires WebRTC peer support. - if !cfg.Download.WebRTC.Enabled { - log.Printf("webrtc session %s rejected: webrtc disabled in config", agent.ShortID(sess.SessionID)) + hlsCtx, hlsCancel := context.WithCancel(ctx) + playerSessionRegistry.add(sess.SessionID, hlsCancel) + hlsCfg := engine.HLSSessionConfig{ + SessionID: sess.SessionID, + SourcePath: filePath, + FileName: sess.FileName, + Quality: sess.Quality, + AudioIndex: sess.AudioIndex, + Transcode: tcRuntime, + } + hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) + if err != nil { + playerSessionRegistry.remove(sess.SessionID) + hlsCancel() + log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err) return } - - sessCtx, sessCancel := context.WithCancel(ctx) //nolint:gosec // G118 cancel stored in registry - webrtcRegistry.add(sess.SessionID, sessCancel) - go func() { - defer func() { - webrtcRegistry.remove(sess.SessionID) - sessCancel() - }() - tcRuntime := buildTranscodeRuntime(ctx, cfg) - runCfg := engine.WebRTCStreamConfig{ - SessionID: sess.SessionID, - FilePath: filePath, - FileName: sess.FileName, - FileSize: sess.FileSize, - Quality: sess.Quality, - ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), - Signal: agentClient, - Logger: stdLogger{}, - Transcode: tcRuntime, - } - log.Printf("[wrtc %s] starting session: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath)) - if err := engine.RunWebRTCStream(sessCtx, runCfg); err != nil { - if sessCtx.Err() == nil { - log.Printf("[wrtc %s] ended: %v", agent.ShortID(sess.SessionID), err) - } - } - }() + streamSrv.HLS().Register(hsess) } // Periodic DHT node persistence (every 5 min) @@ -658,7 +610,7 @@ func runDaemonStart() error { case sig := <-sigCh: fmt.Printf("\n Received %s, shutting down...\n", sig) cancelStreamContexts() - cancelAllWebRTCSessions() + cancelAllPlayerSessions() streamSrv.Shutdown(context.Background()) cancel() @@ -673,7 +625,7 @@ func runDaemonStart() error { case err := <-errCh: cancelStreamContexts() - cancelAllWebRTCSessions() + cancelAllPlayerSessions() streamSrv.Shutdown(context.Background()) cancel() return err diff --git a/internal/cmd/download.go b/internal/cmd/download.go index 5189166..bd5ceab 100644 --- a/internal/cmd/download.go +++ b/internal/cmd/download.go @@ -114,9 +114,6 @@ func runDownloadWithDeps(input, method string, deps downloadDeps) error { StallTimeout: 10 * time.Minute, MaxTimeout: 0, // unlimited SeedEnabled: false, - WebRTCEnabled: cfg.Download.WebRTC.Enabled, - WebRTCTrackers: cfg.Download.WebRTC.Trackers, - ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), }) if err != nil { return fmt.Errorf("create downloader: %w", err) diff --git a/internal/cmd/webrtc_session_registry.go b/internal/cmd/player_session_registry.go similarity index 51% rename from internal/cmd/webrtc_session_registry.go rename to internal/cmd/player_session_registry.go index a1bf37a..bb3743b 100644 --- a/internal/cmd/webrtc_session_registry.go +++ b/internal/cmd/player_session_registry.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "log" "sync" "github.com/torrentclaw/unarr/internal/config" @@ -10,66 +9,57 @@ import ( "github.com/torrentclaw/unarr/internal/library/mediainfo" ) -// webrtcRegistry tracks per-session cancel funcs for active custom WebRTC -// streams (engine.RunWebRTCStream goroutines). Each session lives only as -// long as its DataChannel; the registry exists so duplicate sync responses -// don't double-spawn the same session and so daemon shutdown can drain. -var webrtcRegistry = &webrtcSessionRegistry{ +// playerSessionRegistry tracks per-session cancel funcs for active in-browser +// HLS streaming sessions. Each session lives only as long as its ffmpeg +// process; the registry exists so duplicate sync responses don't double-spawn +// the same session and so daemon shutdown can drain. +var playerSessionRegistry = &playerSessionRegistryT{ cancels: make(map[string]context.CancelFunc), } -type webrtcSessionRegistry struct { +type playerSessionRegistryT struct { mu sync.Mutex cancels map[string]context.CancelFunc } -func (r *webrtcSessionRegistry) has(sessionID string) bool { +func (r *playerSessionRegistryT) has(sessionID string) bool { r.mu.Lock() defer r.mu.Unlock() _, ok := r.cancels[sessionID] return ok } -func (r *webrtcSessionRegistry) add(sessionID string, cancel context.CancelFunc) { +func (r *playerSessionRegistryT) add(sessionID string, cancel context.CancelFunc) { r.mu.Lock() defer r.mu.Unlock() r.cancels[sessionID] = cancel } -func (r *webrtcSessionRegistry) remove(sessionID string) { +func (r *playerSessionRegistryT) remove(sessionID string) { r.mu.Lock() defer r.mu.Unlock() delete(r.cancels, sessionID) } -// cancelAllWebRTCSessions cancels every running session. Called on daemon -// shutdown so pion peers and SSE consumers exit cleanly. -func cancelAllWebRTCSessions() { - webrtcRegistry.mu.Lock() - cancels := make([]context.CancelFunc, 0, len(webrtcRegistry.cancels)) - for _, c := range webrtcRegistry.cancels { +// cancelAllPlayerSessions cancels every running session. Called on daemon +// shutdown so the ffmpeg children and SSE consumers exit cleanly. +func cancelAllPlayerSessions() { + playerSessionRegistry.mu.Lock() + cancels := make([]context.CancelFunc, 0, len(playerSessionRegistry.cancels)) + for _, c := range playerSessionRegistry.cancels { cancels = append(cancels, c) } - webrtcRegistry.cancels = make(map[string]context.CancelFunc) - webrtcRegistry.mu.Unlock() + playerSessionRegistry.cancels = make(map[string]context.CancelFunc) + playerSessionRegistry.mu.Unlock() for _, c := range cancels { c() } } -// stdLogger is a tiny adapter so engine.RunWebRTCStream can log through the -// standard library logger without pulling in a logging dependency. -type stdLogger struct{} - -func (stdLogger) Infof(format string, args ...any) { log.Printf(format, args...) } -func (stdLogger) Warnf(format string, args ...any) { log.Printf("WARN: "+format, args...) } -func (stdLogger) Errorf(format string, args ...any) { log.Printf("ERROR: "+format, args...) } - // buildTranscodeRuntime resolves the ffmpeg/ffprobe binaries + config knobs -// for the WebRTC streaming pipeline. Failure to resolve a binary returns a -// runtime with empty paths so engine.RunWebRTCStream falls back to -// passthrough — the user gets a clearer codec error from the browser than a -// daemon-side abort. +// for the HLS streaming pipeline. Failure to resolve a binary returns a +// runtime with empty paths so the caller can short-circuit instead of +// launching a transcoder that will immediately fail. func buildTranscodeRuntime(ctx context.Context, cfg config.Config) engine.TranscodeRuntime { if !cfg.Download.Transcode.Enabled { return engine.TranscodeRuntime{Disabled: true} diff --git a/internal/cmd/probe_hwaccel.go b/internal/cmd/probe_hwaccel.go index f7ed1c1..609a443 100644 --- a/internal/cmd/probe_hwaccel.go +++ b/internal/cmd/probe_hwaccel.go @@ -15,7 +15,7 @@ import ( ) // newProbeHWAccelCmd reports the hardware-acceleration capabilities the daemon -// would actually use for HLS/WebRTC transcoding. The motivation: a beefy host +// would actually use for HLS transcoding. The motivation: a beefy host // (e.g. RTX 3090) can still fall back to software encoding when the installed // ffmpeg binary was built without nvenc/qsv/vaapi support — Homebrew ffmpeg // is a common offender. Without this command, users see slow / failing 4K diff --git a/internal/cmd/seed_file_handler.go b/internal/cmd/seed_file_handler.go deleted file mode 100644 index fe2438a..0000000 --- a/internal/cmd/seed_file_handler.go +++ /dev/null @@ -1,65 +0,0 @@ -package cmd - -import ( - "context" - "log" - "time" - - "github.com/torrentclaw/unarr/internal/agent" - "github.com/torrentclaw/unarr/internal/engine" -) - -// handleSeedFileTask wraps an arbitrary on-disk file as a single-file -// torrent and adds it to the existing torrent client so the WebRTC -// peer can serve pieces to a browser. Reports the generated info_hash -// back to the server so the web player can target /stream/. -// -// Runs in its own goroutine; never blocks the claim batch. -func handleSeedFileTask(t agent.Task, dl *engine.TorrentDownloader, client *agent.Client) { - short := agent.ShortID(t.ID) - - if t.FilePath == "" { - log.Printf("[%s] seed_file: missing filePath, marking failed", short) - reportSeedFileFailed(client, t.ID, "Missing filePath") - return - } - - log.Printf("[%s] seed_file: building torrent from %s", short, t.FilePath) - hash, err := engine.SeedFileOnDownloader(dl, t.FilePath) - if err != nil { - log.Printf("[%s] seed_file: %v", short, err) - reportSeedFileFailed(client, t.ID, err.Error()) - return - } - - infoHash := hash.HexString() - log.Printf("[%s] seed_file: seeding ih=%s", short, infoHash) - - // Push the info_hash + downloading status (file is on disk; from the - // client's perspective it's already complete). The web side polls - // /api/internal/stream/seed-file/ waiting for this update. - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - _, reportErr := client.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: t.ID, - Status: "downloading", // semantic: actively serving - InfoHash: infoHash, - FilePath: t.FilePath, - }) - if reportErr != nil { - log.Printf("[%s] seed_file: failed to push info_hash: %v", short, reportErr) - } -} - -func reportSeedFileFailed(client *agent.Client, taskID, msg string) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _, err := client.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: taskID, - Status: "failed", - ErrorMessage: msg, - }) - if err != nil { - log.Printf("[%s] seed_file: report-failed itself failed: %v", agent.ShortID(taskID), err) - } -} diff --git a/internal/cmd/version.go b/internal/cmd/version.go index e03063b..4cd7a03 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.3" +var Version = "0.9.4" diff --git a/internal/config/config.go b/internal/config/config.go index 9f46b53..6e65df8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,7 +51,6 @@ type DownloadConfig struct { StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818) EnableUPnP bool `toml:"enable_upnp"` // map StreamPort to the WAN via UPnP/NAT-PMP (default: false; opt-in because it exposes the unauthenticated /stream + /hls endpoints to the public internet) CORSExtraOrigins []string `toml:"cors_extra_origins"` // extra browser origins added on top of the baked-in allowlist (torrentclaw.com, app.torrentclaw.com, localhost:3030) - WebRTC WebRTCConfig `toml:"webrtc"` Transcode TranscodeConfig `toml:"transcode"` VPN VPNConfig `toml:"vpn"` } @@ -84,19 +83,6 @@ type TranscodeConfig struct { MaxConcurrent int `toml:"max_concurrent"` // safety cap on simultaneous transcoder processes } -// WebRTCConfig opts the daemon into acting as a WebTorrent peer so browsers -// can fetch pieces via WebRTC data channels — required by the in-browser -// player on torrentclaw.com. Disabled by default; enabling implies upload -// is allowed for active torrents (browsers can't download otherwise). -type WebRTCConfig struct { - Enabled bool `toml:"enabled"` // master switch - Trackers []string `toml:"trackers"` // wss:// signaling trackers - STUNServers []string `toml:"stun_servers"` // stun:host:port - TURNServers []string `toml:"turn_servers"` // turn:host:port (no auth) — see TURNCredentials for authed - TURNUser string `toml:"turn_user"` // optional, applied to all TURNServers - TURNPass string `toml:"turn_pass"` // optional -} - type OrganizeConfig struct { Enabled bool `toml:"enabled"` MoviesDir string `toml:"movies_dir"` @@ -121,7 +107,7 @@ type LibraryConfig struct { ScanPath string `toml:"scan_path"` // remembered from last scan Workers int `toml:"workers"` // concurrent ffprobe (default 8) FFprobePath string `toml:"ffprobe_path"` // optional explicit path - FFmpegPath string `toml:"ffmpeg_path"` // optional explicit path (used by WebRTC streaming transcoder) + FFmpegPath string `toml:"ffmpeg_path"` // optional explicit path (used by the HLS streaming transcoder) BackupDir string `toml:"backup_dir"` // for replaced files AutoScan bool `toml:"auto_scan"` // enable daily auto-scan in daemon (default true) ScanInterval string `toml:"scan_interval"` // e.g. "24h", "12h", "6h" (default "24h") @@ -146,11 +132,6 @@ func Default() Config { PreferredMethod: "auto", MaxConcurrent: 3, StreamPort: 11818, - WebRTC: WebRTCConfig{ - Enabled: true, - Trackers: []string{"wss://tracker.torrentclaw.com"}, - STUNServers: []string{"stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"}, - }, Transcode: TranscodeConfig{ Enabled: true, HWAccel: "auto", @@ -231,19 +212,6 @@ func applyDefaults(cfg *Config, meta toml.MetaData) { cfg.General.Country = "US" } - if !meta.IsDefined("downloads", "webrtc", "enabled") { - cfg.Download.WebRTC.Enabled = true - } - if !meta.IsDefined("downloads", "webrtc", "trackers") { - cfg.Download.WebRTC.Trackers = []string{"wss://tracker.torrentclaw.com"} - } - if !meta.IsDefined("downloads", "webrtc", "stun_servers") { - cfg.Download.WebRTC.STUNServers = []string{ - "stun:stun.l.google.com:19302", - "stun:stun1.l.google.com:19302", - } - } - if !meta.IsDefined("downloads", "transcode", "enabled") { cfg.Download.Transcode.Enabled = true } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 02fcdc4..8097395 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -208,17 +208,6 @@ name = "Test" t.Fatalf("Load failed: %v", err) } - // WebRTC should be on by default for fresh installs. - if !cfg.Download.WebRTC.Enabled { - t.Error("WebRTC.Enabled should default to true when [downloads.webrtc] is absent") - } - if len(cfg.Download.WebRTC.Trackers) == 0 { - t.Error("WebRTC.Trackers should default to torrentclaw tracker when absent") - } - if len(cfg.Download.WebRTC.STUNServers) == 0 { - t.Error("WebRTC.STUNServers should default to public STUN list when absent") - } - // Transcode should be on by default. if !cfg.Download.Transcode.Enabled { t.Error("Transcode.Enabled should default to true when [downloads.transcode] is absent") @@ -238,12 +227,9 @@ func TestLoadRespectsExplicitlyDisabledStreaming(t *testing.T) { tmp := t.TempDir() path := filepath.Join(tmp, "config.toml") - // User explicitly opted out of webrtc + transcode. Defaults must NOT - // override them — that would silently re-enable features the user disabled. - os.WriteFile(path, []byte(`[downloads.webrtc] -enabled = false - -[downloads.transcode] + // User explicitly opted out of transcode. Defaults must NOT override + // it — that would silently re-enable a feature the user disabled. + os.WriteFile(path, []byte(`[downloads.transcode] enabled = false `), 0o644) @@ -252,9 +238,6 @@ enabled = false t.Fatalf("Load failed: %v", err) } - if cfg.Download.WebRTC.Enabled { - t.Error("WebRTC.Enabled = true, want false (user explicitly disabled)") - } if cfg.Download.Transcode.Enabled { t.Error("Transcode.Enabled = true, want false (user explicitly disabled)") } diff --git a/internal/engine/hls.go b/internal/engine/hls.go index cc0b442..9524627 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -3,9 +3,7 @@ // Browser ↔ daemon over plain HTTP (LAN / Tailscale / UPnP). The daemon runs // ffmpeg in `-f hls` mode, writing fragmented MP4 segments to a per-session // tmpdir. Master + media playlists are pre-rendered from the probed source -// duration so the player knows the full timeline before any segment exists, -// which fixes the seek/duration/pause/multi-track problems we hit with the -// raw fMP4-over-WebRTC pipeline. +// duration so the player knows the full timeline before any segment exists. // // One HLSSession == one browser playback. Sessions are registered in a // process-wide map keyed by session ID; the StreamServer routes diff --git a/internal/engine/probe.go b/internal/engine/probe.go index 39ff374..930b669 100644 --- a/internal/engine/probe.go +++ b/internal/engine/probe.go @@ -9,7 +9,7 @@ import ( ) // StreamProbe summarises the codec / container shape of a file as it relates -// to the WebRTC streaming pipeline. It tells the transcoder whether bytes can +// to the HLS streaming pipeline. It tells the transcoder whether bytes can // be streamed as-is, just remuxed to fragmented MP4, or fully transcoded. type StreamProbe struct { // VideoCodec lowercased — e.g. "h264", "hevc", "av1", "vp9", "mpeg4". diff --git a/internal/engine/seed_file.go b/internal/engine/seed_file.go deleted file mode 100644 index 7d9a046..0000000 --- a/internal/engine/seed_file.go +++ /dev/null @@ -1,138 +0,0 @@ -package engine - -import ( - "errors" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/metainfo" -) - -// SeedFile builds a single-file torrent from an arbitrary on-disk file -// and adds it to an existing torrent client so the WebRTC peer wire -// (already configured on the client) can serve the file to a browser -// that knows the resulting info-hash. -// -// Returns the generated info-hash. The torrent is left attached to the -// client — caller is responsible for keeping it alive while a browser -// is watching. Drop it via Client.RemoveTorrent / Torrent.Drop when -// idle to free resources. -// -// Behaviour notes: -// - The file must already exist; no download is attempted. -// - Piece length follows the libtorrent ladder (16 KiB → 4 MiB). -// - The torrent is "complete" from the agent's POV — it has every -// piece — so the upload-only flow kicks in immediately. -// - WebRTC peer behaviour comes from the client config the caller -// constructed; SeedFile does not toggle DisableWebtorrent itself. -// If the operator's [downloads.webrtc].enabled = false, the file -// is still added but no browser will discover it via WSS tracker. -func SeedFile(client *torrent.Client, filePath string, trackerURLs []string) (metainfo.Hash, error) { - if client == nil { - return metainfo.Hash{}, errors.New("seed_file: torrent client is nil") - } - if filePath == "" { - return metainfo.Hash{}, errors.New("seed_file: filePath is empty") - } - - abs, err := filepath.Abs(filePath) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: resolve path: %w", err) - } - st, err := os.Stat(abs) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: stat: %w", err) - } - if st.IsDir() { - return metainfo.Hash{}, fmt.Errorf("seed_file: only single files are supported, %s is a directory", abs) - } - - info := metainfo.Info{ - PieceLength: chooseSeedPieceLength(st.Size()), - Name: filepath.Base(abs), - } - if err := info.BuildFromFilePath(abs); err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: build info: %w", err) - } - infoBytes, err := bencode.Marshal(info) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: marshal info: %w", err) - } - - mi := &metainfo.MetaInfo{ - InfoBytes: infoBytes, - AnnounceList: makeAnnounceList(trackerURLs), - CreatedBy: "unarr-seed-file", - CreationDate: time.Now().Unix(), - } - ih := mi.HashInfoBytes() - - t, err := client.AddTorrent(mi) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: add torrent: %w", err) - } - // Mark every piece as needed so the client treats us as a complete - // seeder right away — anacrolix's verifier will hash the file - // asynchronously and flip pieces to "have" as it goes. - t.DownloadAll() - - return ih, nil -} - -// makeAnnounceList shapes the tracker URL slice into the bencoded -// AnnounceList format anacrolix expects. -func makeAnnounceList(urls []string) metainfo.AnnounceList { - if len(urls) == 0 { - return nil - } - tier := make([]string, 0, len(urls)) - for _, u := range urls { - if u == "" { - continue - } - tier = append(tier, u) - } - if len(tier) == 0 { - return nil - } - return metainfo.AnnounceList{tier} -} - -// chooseSeedPieceLength picks the piece size for a single-file torrent -// based on the libtorrent / qBittorrent ladder. Mirrored from the -// wstracker-probe seeder so generated torrents are interoperable. -func chooseSeedPieceLength(size int64) int64 { - switch { - case size < 4*1024*1024: - return 16 * 1024 - case size < 64*1024*1024: - return 64 * 1024 - case size < 512*1024*1024: - return 256 * 1024 - case size < 4*1024*1024*1024: - return 1024 * 1024 - default: - return 4 * 1024 * 1024 - } -} - -// SeedFileOnDownloader is a convenience wrapper that pulls the -// underlying anacrolix client out of a TorrentDownloader and forwards -// to SeedFile. trackerURLs default to the downloader's WebRTC -// trackers when nil/empty. -func SeedFileOnDownloader(d *TorrentDownloader, filePath string) (metainfo.Hash, error) { - if d == nil { - return metainfo.Hash{}, errors.New("seed_file: downloader is nil") - } - trackers := d.cfg.WebRTCTrackers - if !d.cfg.WebRTCEnabled { - // We could still build the torrent, but no browser would find - // it via the WSS tracker — bail loud so the operator notices. - return metainfo.Hash{}, errors.New("seed_file: WebRTC peer disabled in config; set [downloads.webrtc].enabled = true to use this feature") - } - return SeedFile(d.client, filePath, trackers) -} diff --git a/internal/engine/seed_file_test.go b/internal/engine/seed_file_test.go deleted file mode 100644 index 1c0f616..0000000 --- a/internal/engine/seed_file_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package engine - -import ( - "context" - "os" - "path/filepath" - "testing" -) - -// TestSeedFile_RejectsMissingFile — explicit error rather than crashing -// inside anacrolix when the path doesn't exist. -func TestSeedFile_RejectsMissingFile(t *testing.T) { - dir := t.TempDir() - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: dir, - ListenPort: 0, - WebRTCEnabled: true, - WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - defer dl.Shutdown(context.Background()) - - if _, err := SeedFile(dl.client, "/nonexistent/path", nil); err == nil { - t.Fatal("expected error for missing file") - } -} - -// TestSeedFile_RejectsDirectory — single-file torrents only for now. -func TestSeedFile_RejectsDirectory(t *testing.T) { - dir := t.TempDir() - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: dir, - ListenPort: 0, - WebRTCEnabled: true, - WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - defer dl.Shutdown(context.Background()) - - subDir := filepath.Join(dir, "sub") - if err := os.Mkdir(subDir, 0o755); err != nil { - t.Fatalf("mkdir: %v", err) - } - - if _, err := SeedFile(dl.client, subDir, nil); err == nil { - t.Fatal("expected error for directory path") - } -} - -// TestSeedFile_BuildsDeterministicInfoHash — the same file should yield -// the same info_hash on every call so the web client can poll for it. -func TestSeedFile_BuildsDeterministicInfoHash(t *testing.T) { - dir := t.TempDir() - file := filepath.Join(dir, "data.bin") - payload := []byte("hello world — torrentclaw seed_file test") - if err := os.WriteFile(file, payload, 0o644); err != nil { - t.Fatalf("write file: %v", err) - } - - mkClient := func() *TorrentDownloader { - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: t.TempDir(), - ListenPort: 0, - WebRTCEnabled: true, - WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - return dl - } - - dl1 := mkClient() - defer dl1.Shutdown(context.Background()) - hash1, err := SeedFile(dl1.client, file, []string{"wss://tracker.torrentclaw.com"}) - if err != nil { - t.Fatalf("first SeedFile: %v", err) - } - - dl2 := mkClient() - defer dl2.Shutdown(context.Background()) - hash2, err := SeedFile(dl2.client, file, []string{"wss://tracker.torrentclaw.com"}) - if err != nil { - t.Fatalf("second SeedFile: %v", err) - } - - if hash1 != hash2 { - t.Fatalf("info_hash not deterministic: %s vs %s", hash1.HexString(), hash2.HexString()) - } - if hash1.HexString() == "" || len(hash1.HexString()) != 40 { - t.Fatalf("info_hash is not 40 hex chars: %q", hash1.HexString()) - } -} - -// TestSeedFileOnDownloader_RequiresWebRTC — silent failure mode is the -// worst UX; bail loud when the operator hasn't opted into WebRTC. -func TestSeedFileOnDownloader_RequiresWebRTC(t *testing.T) { - dir := t.TempDir() - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: dir, - ListenPort: 0, - WebRTCEnabled: false, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - defer dl.Shutdown(context.Background()) - - file := filepath.Join(dir, "data.bin") - if err := os.WriteFile(file, []byte("x"), 0o644); err != nil { - t.Fatalf("write file: %v", err) - } - - if _, err := SeedFileOnDownloader(dl, file); err == nil { - t.Fatal("expected error when WebRTC disabled") - } -} - -// TestChooseSeedPieceLength_LadderShape — sanity-check the breakpoints -// stay aligned with the libtorrent reference (16 KiB → 4 MiB). -func TestChooseSeedPieceLength_LadderShape(t *testing.T) { - cases := []struct { - size int64 - expect int64 - }{ - {1, 16 * 1024}, - {4 * 1024 * 1024, 64 * 1024}, - {64 * 1024 * 1024, 256 * 1024}, - {512 * 1024 * 1024, 1024 * 1024}, - {4 * 1024 * 1024 * 1024, 4 * 1024 * 1024}, - } - for _, c := range cases { - if got := chooseSeedPieceLength(c.size); got != c.expect { - t.Errorf("chooseSeedPieceLength(%d) = %d want %d", c.size, got, c.expect) - } - } -} - -// TestMakeAnnounceList_HandlesEmpty — nil/empty in → nil out, so -// AddTorrent doesn't see a dangling tier with no URLs. -func TestMakeAnnounceList_HandlesEmpty(t *testing.T) { - if got := makeAnnounceList(nil); got != nil { - t.Errorf("nil input should yield nil announce list, got %+v", got) - } - if got := makeAnnounceList([]string{}); got != nil { - t.Errorf("empty input should yield nil announce list, got %+v", got) - } - if got := makeAnnounceList([]string{"", " ", ""}); got != nil { - // Empty strings should be filtered; if everything is empty, - // nil is the right answer. - // (We do NOT trim whitespace today — only literal "".) - if len(got) != 1 || len(got[0]) != 1 { - t.Errorf("expected 1 single-element tier, got %+v", got) - } - } - got := makeAnnounceList([]string{"wss://a", "", "wss://b"}) - if len(got) != 1 || len(got[0]) != 2 { - t.Fatalf("expected 1 tier of 2 URLs, got %+v", got) - } -} diff --git a/internal/engine/stream_source.go b/internal/engine/stream_source.go index 2dc1d3c..b418e61 100644 --- a/internal/engine/stream_source.go +++ b/internal/engine/stream_source.go @@ -12,7 +12,7 @@ import ( "time" ) -// streamSource abstracts the byte source served over the WebRTC DataChannel. +// streamSource abstracts the byte source consumed by the HLS transcoder. // Two implementations: // - diskFileSource — direct passthrough of the on-disk file. // - transcodeSource — ffmpeg writes a fragmented MP4 to a temp file in diff --git a/internal/engine/torrent.go b/internal/engine/torrent.go index 445f317..f4b1b6d 100644 --- a/internal/engine/torrent.go +++ b/internal/engine/torrent.go @@ -16,7 +16,6 @@ import ( alog "github.com/anacrolix/log" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/storage" - "github.com/pion/webrtc/v4" "github.com/torrentclaw/unarr/internal/config" "github.com/torrentclaw/unarr/internal/vpn" "golang.org/x/term" @@ -73,14 +72,6 @@ type TorrentConfig struct { SeedRatio float64 // target seed ratio (default 0, meaning seed until SeedTime) SeedTime time.Duration // min seed time after completion (default 0) - // WebRTC peer (WebTorrent protocol) for browser ↔ unarr P2P streaming. - // When enabled, anacrolix/torrent's built-in webtorrent package handles - // the WSS signaling + WebRTC data channels. Implies upload allowed for - // every torrent in the client (browsers can't pull pieces otherwise). - WebRTCEnabled bool - WebRTCTrackers []string // wss://… signaling trackers added to every magnet - ICEServers []webrtc.ICEServer // STUN + TURN servers for NAT traversal - // VPNTunnel, when set, split-tunnels the torrent client's peer + tracker // traffic through an in-process userspace WireGuard tunnel (managed-VPN // add-on). nil = downloads in the clear. Brought up by the daemon. @@ -111,26 +102,11 @@ func NewTorrentDownloader(cfg TorrentConfig) (*TorrentDownloader, error) { tcfg := torrent.NewDefaultClientConfig() tcfg.DataDir = cfg.DataDir tcfg.Seed = cfg.SeedEnabled - // WebRTC peers (browsers) can only pull pieces from us if upload is - // enabled. We honour SeedEnabled for the long-tail seed-after-complete - // behaviour but unconditionally allow upload while WebRTC is on so an - // active download can still serve to a watching browser. - tcfg.NoUpload = !cfg.SeedEnabled && !cfg.WebRTCEnabled - tcfg.Logger = alog.Default.FilterLevel(alog.Warning) // bumped from Critical for WebRTC peer + tracker announce visibility + tcfg.NoUpload = !cfg.SeedEnabled + tcfg.Logger = alog.Default.FilterLevel(alog.Warning) - // WebRTC / WebTorrent peer: anacrolix auto-routes ws://+wss:// trackers - // to the bundled webtorrent.TrackerClient. We only need to populate the - // ICE server list so the SDP offers we send carry usable candidates. - if cfg.WebRTCEnabled { - tcfg.DisableWebtorrent = false - if len(cfg.ICEServers) > 0 { - tcfg.ICEServerList = cfg.ICEServers - } - log.Printf("[torrent] WebRTC peer enabled (trackers=%d ice_servers=%d)", - len(cfg.WebRTCTrackers), len(cfg.ICEServers)) - } else { - tcfg.DisableWebtorrent = true - } + // No browser-facing WebTorrent peer; daemon never seeds via WSS. + tcfg.DisableWebtorrent = true // --- Performance optimizations --- @@ -657,30 +633,17 @@ func (d *TorrentDownloader) selectFiles(t *torrent.Torrent, taskID string) (tota return totalBytes, fileName } -// buildMagnet composes a magnet URI for the info hash. extraTrackers (e.g. -// wss://… for WebRTC peer signaling) are prepended so anacrolix's -// webtorrent.TrackerClient picks them up first; the static UDP list -// follows. Empty / whitespace entries in extraTrackers are skipped. -func buildMagnet(infoHash string, extraTrackers ...string) string { +// buildMagnet composes a magnet URI for the info hash with the static +// tracker list. +func buildMagnet(infoHash string) string { params := []string{"xt=urn:btih:" + infoHash} - for _, t := range extraTrackers { - t = strings.TrimSpace(t) - if t == "" { - continue - } - params = append(params, "tr="+url.QueryEscape(t)) - } for _, tracker := range defaultTrackers { params = append(params, "tr="+url.QueryEscape(tracker)) } return "magnet:?" + strings.Join(params, "&") } -// buildMagnet on the downloader injects its WebRTC trackers when enabled. func (d *TorrentDownloader) buildMagnet(infoHash string) string { - if d != nil && d.cfg.WebRTCEnabled { - return buildMagnet(infoHash, d.cfg.WebRTCTrackers...) - } return buildMagnet(infoHash) } diff --git a/internal/engine/transcode_quality.go b/internal/engine/transcode_quality.go new file mode 100644 index 0000000..4efda59 --- /dev/null +++ b/internal/engine/transcode_quality.go @@ -0,0 +1,64 @@ +package engine + +// TranscodeRuntime carries the resolved ffmpeg/ffprobe paths + tunables so +// each session can decide whether to passthrough or pipe through ffmpeg. +type TranscodeRuntime struct { + FFmpegPath string + FFprobePath string + HWAccel HWAccel + Preset string + VideoBitrate string + AudioBitrate string + MaxHeight int + // Disabled forces passthrough for every file even when codecs are not + // browser-friendly. Useful when the user explicitly turns transcoding + // off in config. + Disabled bool +} + +// qualityCap maps a session's Quality label to a (MaxHeight, VideoBitrate) +// pair. An empty label or "original" returns zero-values, signalling "no +// override" to the caller. +type qualityCap struct { + MaxHeight int + VideoBitrate string // ffmpeg -b:v string, e.g. "3500k" +} + +func resolveQualityCap(label string) qualityCap { + switch label { + case "2160p": + return qualityCap{MaxHeight: 2160, VideoBitrate: "25000k"} + case "1080p": + return qualityCap{MaxHeight: 1080, VideoBitrate: "6000k"} + case "720p": + return qualityCap{MaxHeight: 720, VideoBitrate: "3500k"} + case "480p": + return qualityCap{MaxHeight: 480, VideoBitrate: "1500k"} + default: + // "original", "auto", "" → defer to config. + return qualityCap{} + } +} + +// capForHeight returns the bitrate-cap pair appropriate for an effective +// output height. Used after clamping outputHeight to the source's resolution: +// asking ffmpeg for "2160p" bitrate (25 Mbps) on a 1080p source overshoots +// the H.264 level we derived from the EFFECTIVE height (4.0, max 20 Mbps) and +// makes libx264 refuse with "VBV bitrate > level limit". This helper picks +// the bitrate that matches the level libx264 will actually accept. +func capForHeight(height int) qualityCap { + switch { + case height <= 0: + return qualityCap{} + case height <= 480: + return qualityCap{MaxHeight: 480, VideoBitrate: "1500k"} + case height <= 720: + return qualityCap{MaxHeight: 720, VideoBitrate: "3500k"} + case height <= 1080: + return qualityCap{MaxHeight: 1080, VideoBitrate: "6000k"} + case height <= 1440: + return qualityCap{MaxHeight: 1440, VideoBitrate: "12000k"} + default: + return qualityCap{MaxHeight: 2160, VideoBitrate: "25000k"} + } +} diff --git a/internal/engine/transcoder.go b/internal/engine/transcoder.go index 9ea37cc..030c28c 100644 --- a/internal/engine/transcoder.go +++ b/internal/engine/transcoder.go @@ -11,10 +11,9 @@ import ( "time" ) -// TranscodeOpts steers how Transcoder builds its ffmpeg command line. Defaults -// match the project's plan/clever-weaving-dove.md (Fase 2.5): +// TranscodeOpts steers how Transcoder builds its ffmpeg command line. // -// - Output: fragmented MP4 readable by browser