diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd5fc7d..7dabcc4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,14 +86,11 @@ jobs: run: | # Threshold applies only to engine and agent — cmd contains interactive UI # commands (config menus, daemon, auth browser) that are not unit-testable. - # WebRTC files are excluded: deprecated, slated for removal in 0.9.0. go test -race -coverprofile=coverage-core.out -covermode=atomic \ ./internal/engine/... \ ./internal/agent/... - # Strip webrtc lines from the profile before computing the threshold. - grep -v '/internal/engine/webrtc' coverage-core.out > coverage-core-filtered.out - COVERAGE=$(go tool cover -func=coverage-core-filtered.out | grep ^total | awk '{print $3}' | tr -d '%') - echo "Coverage on engine+agent (excluding webrtc): ${COVERAGE}%" + COVERAGE=$(go tool cover -func=coverage-core.out | grep ^total | awk '{print $3}' | tr -d '%') + echo "Coverage on engine+agent: ${COVERAGE}%" python3 -c " coverage = float('${COVERAGE}') threshold = 50.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 961db09..dfc0f79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,37 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.9.4] - 2026-05-26 + +### Removed + +- **streaming**: retire the custom WebRTC DataChannel pipeline. The daemon no + longer ships pion/webrtc, the WSS signaling client, or the wire framing + package — every in-browser session now uses HLS over HTTP from the daemon + (Tailscale / LAN / UPnP). Browser P2P (WebTorrent) bytes never re-enabled. +- **config**: `[downloads.webrtc]` block removed from the TOML schema; existing + config files with the section parse cleanly because go-toml ignores unknown + sections. +- **seed_file**: `mode=seed_file` task handler + `engine.SeedFile` helper + dropped — the last in-browser caller was retired with the WebRTC player. +- **wstracker-probe**: standalone probe binary removed. + +### Changed + +- **agent wire**: `SyncResponse.WebRTCSessions` (JSON: `webrtcSessions`) renamed + to `StreamSessions` (JSON: `streamSessions`). The Go type `agent.WebRTCSession` + is now `agent.StreamSession`. Wire-incompatible with web < 2026-05-26. +- **torrent**: `buildMagnet` no longer accepts an `extraTrackers` variadic — + the default tracker list is the only set used. + +### Fixed + +- **hls**: clamp the ffmpeg `-b:v` to the bitrate cap derived from the EFFECTIVE + output height instead of the requested quality. Previously asking for "2160p" + on a 1080p source overshot the H.264 level we resolved from the effective + height (4.0, max 20 Mbps) and made libx264 abort with + `VBV bitrate > level limit`. + ## [0.9.2] - 2026-05-21 ### Added diff --git a/README.md b/README.md index 6984bd0..e1cc6e3 100644 --- a/README.md +++ b/README.md @@ -434,24 +434,12 @@ country = "US" ### Streaming reference -The in-browser player on torrentclaw.com streams from the daemon over WebRTC -(low-latency P2P) or HLS (HTTP fragments + ffmpeg transcode for codecs the -browser can't decode natively). Both are enabled by default — a fresh install -"just works" without editing the TOML. Disable surgically only if you have a -reason. +The in-browser player on torrentclaw.com streams from the daemon over HLS +(HTTP fragments + ffmpeg transcode for codecs the browser can't decode +natively). Enabled by default — a fresh install "just works" without editing +the TOML. ```toml -[downloads.webrtc] -enabled = true # master switch -trackers = ["wss://tracker.torrentclaw.com"] # signaling trackers -stun_servers = [ # NAT traversal - "stun:stun.l.google.com:19302", - "stun:stun1.l.google.com:19302", -] -turn_servers = [] # optional TURN relays -turn_user = "" -turn_pass = "" - [downloads.transcode] enabled = true # master switch hw_accel = "auto" # auto | none | nvenc | qsv | vaapi | videotoolbox @@ -462,16 +450,6 @@ max_height = 0 # 0 = no cap; e.g. 720 forces 720p max max_concurrent = 2 # max simultaneous ffmpeg processes ``` -#### `[downloads.webrtc]` - -| Key | Type | Default | Notes | -|-----|------|---------|-------| -| `enabled` | bool | `true` | Browser↔daemon WebRTC peer for the in-browser P2P player. Disable to skip WebRTC tracker signalling (saves ~5MB RAM, blocks WebRTC streaming — HLS still works). | -| `trackers` | `[]string` | `["wss://tracker.torrentclaw.com"]` | Signaling trackers for peer discovery. | -| `stun_servers` | `[]string` | Google public STUN ×2 | ICE candidate gathering. | -| `turn_servers` | `[]string` | `[]` | Optional TURN relays for symmetric-NAT users. | -| `turn_user` / `turn_pass` | string | `""` | Credentials for authed TURN servers. Applied to all `turn_servers`. | - #### `[downloads.transcode]` | Key | Type | Default | Notes | diff --git a/SECURITY.md b/SECURITY.md index 9b64c4c..b88b335 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -72,7 +72,7 @@ Docker Hub vulnerability count: package pulls ~40 codec/parser libraries (`x264`, `x265`, `libvpx`, `aom`, `dav1d`, `libtheora`, `libvorbis`, `libwebp`, `libbluray`, `libopenmpt`, …). Each carries a long NVD history that Alpine does not backport. ffmpeg is a - **functional dependency** — the WebRTC/HLS transcode pipeline shells out to + **functional dependency** — the HLS transcode pipeline shells out to `ffmpeg`/`ffprobe` to decode untrusted media and re-encode to H.264 + AAC. ### Accepted risk and policy @@ -100,7 +100,7 @@ Recommended additions for exposed deployments: - no-new-privileges:true ``` -If you do not need WebRTC/HLS transcoding, you can run with transcoding disabled to +If you do not need HLS transcoding, you can run with transcoding disabled to avoid feeding untrusted media to ffmpeg at all. ## Disclosure Policy diff --git a/cmd/wstracker-probe/main.go b/cmd/wstracker-probe/main.go deleted file mode 100644 index 7eecaa5..0000000 --- a/cmd/wstracker-probe/main.go +++ /dev/null @@ -1,268 +0,0 @@ -// wstracker-probe — connects to a WebSocket BitTorrent tracker and either -// (a) advertises a fake info_hash to verify announce signalling, or -// (b) seeds a real file via the WebTorrent protocol so a browser -// webtorrent.js client can fetch it for end-to-end verification. -// -// Modes: -// -// wstracker-probe -tracker wss://tracker.torrentclaw.com -// Announces a random info_hash; exits 0 on TrackerAnnounceSuccessful. -// -// wstracker-probe -tracker wss://… -seed /path/to/file.mp4 -// Builds a single-file torrent in memory, seeds forever, prints the -// magnet (with the WSS tracker injected). Ctrl-C to stop. -// -// Useful for browser ↔ unarr e2e — point a webtorrent.js page at the -// printed magnet and the player should pull pieces via WebRTC data channel. -package main - -import ( - "context" - "crypto/rand" - "flag" - "fmt" - "log" - "net/url" - "os" - "os/signal" - "path/filepath" - "syscall" - "time" - - alog "github.com/anacrolix/log" - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/metainfo" - "github.com/anacrolix/torrent/storage" - "github.com/pion/webrtc/v4" -) - -func main() { - tracker := flag.String("tracker", "wss://tracker.torrentclaw.com", "WSS tracker URL to probe") - timeout := flag.Duration("timeout", 30*time.Second, "max wait for successful announce (ignored in -seed mode)") - seedPath := flag.String("seed", "", "path to a file to seed (single-file torrent). When set, runs forever instead of exiting on first announce.") - flag.Parse() - - if *seedPath != "" { - runSeeder(*seedPath, *tracker) - return - } - - runProbe(*tracker, *timeout) -} - -// runProbe — single random-hash announce, exits on success/error/timeout. -func runProbe(trackerURL string, timeout time.Duration) { - tmp, err := os.MkdirTemp("", "wstracker-probe-*") - if err != nil { - log.Fatalf("temp dir: %v", err) - } - defer os.RemoveAll(tmp) - - cfg := baseClientConfig(tmp) - - annSuccess := make(chan struct{}, 1) - annError := make(chan error, 1) - cfg.Callbacks.StatusUpdated = append( - cfg.Callbacks.StatusUpdated, - func(e torrent.StatusUpdatedEvent) { - switch e.Event { //nolint:exhaustive // peer events are noise for tracker probe - case torrent.TrackerConnected: - if e.Error != nil { - fmt.Printf("[probe] tracker connect FAILED: %v\n", e.Error) - } else { - fmt.Printf("[probe] tracker connected: %s\n", e.Url) - } - case torrent.TrackerAnnounceSuccessful: - fmt.Printf("[probe] tracker announce OK: %s ih=%s\n", e.Url, e.InfoHash) - select { - case annSuccess <- struct{}{}: - default: - } - case torrent.TrackerAnnounceError: - fmt.Printf("[probe] tracker announce ERROR: %s ih=%s err=%v\n", e.Url, e.InfoHash, e.Error) - select { - case annError <- e.Error: - default: - } - case torrent.TrackerDisconnected: - fmt.Printf("[probe] tracker disconnected: %s err=%v\n", e.Url, e.Error) - } - }, - ) - - client, err := torrent.NewClient(cfg) - if err != nil { - log.Fatalf("create torrent client: %v", err) - } - defer client.Close() - - var ih [20]byte - if _, err := rand.Read(ih[:]); err != nil { - log.Fatalf("random info_hash: %v", err) - } - magnet := fmt.Sprintf("magnet:?xt=urn:btih:%x&tr=%s", ih, trackerURL) - fmt.Printf("[probe] tracker=%s info_hash=%x timeout=%s\n", trackerURL, ih, timeout) - - t, err := client.AddMagnet(magnet) - if err != nil { - log.Fatalf("add magnet: %v", err) - } - defer t.Drop() - - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - select { - case <-annSuccess: - fmt.Println("[probe] OK — tracker announce succeeded") - os.Exit(0) - case err := <-annError: - fmt.Printf("[probe] FAIL — tracker announce error: %v\n", err) - os.Exit(1) - case <-ctx.Done(): - fmt.Printf("[probe] FAIL — timeout after %s\n", timeout) - os.Exit(2) - } -} - -// runSeeder — builds a single-file torrent for the given path, adds it to -// a WebTorrent-enabled client, and seeds until SIGINT/SIGTERM. -func runSeeder(filePath, trackerURL string) { - abs, err := filepath.Abs(filePath) - if err != nil { - log.Fatalf("resolve seed path: %v", err) - } - st, err := os.Stat(abs) - if err != nil { - log.Fatalf("stat seed file: %v", err) - } - if st.IsDir() { - log.Fatalf("-seed currently supports a single file, not a directory: %s", abs) - } - - dataDir := filepath.Dir(abs) - - // Build single-file torrent metadata. - info := metainfo.Info{ - PieceLength: chooseSeedPieceLength(st.Size()), - Name: filepath.Base(abs), - } - if err := info.BuildFromFilePath(abs); err != nil { - log.Fatalf("build info from file: %v", err) - } - infoBytes, err := bencode.Marshal(info) - if err != nil { - log.Fatalf("marshal info: %v", err) - } - - mi := &metainfo.MetaInfo{ - InfoBytes: infoBytes, - AnnounceList: metainfo.AnnounceList{{trackerURL}}, - CreatedBy: "wstracker-probe", - } - ih := mi.HashInfoBytes() - - cfg := baseClientConfig(dataDir) - cfg.Seed = true - - cfg.Callbacks.StatusUpdated = append( - cfg.Callbacks.StatusUpdated, - func(e torrent.StatusUpdatedEvent) { - switch e.Event { //nolint:exhaustive - case torrent.TrackerConnected: - if e.Error != nil { - fmt.Printf("[seed] tracker connect FAILED: %v\n", e.Error) - } else { - fmt.Printf("[seed] tracker connected: %s\n", e.Url) - } - case torrent.TrackerAnnounceSuccessful: - fmt.Printf("[seed] tracker announce OK: %s ih=%s\n", e.Url, e.InfoHash) - case torrent.TrackerAnnounceError: - fmt.Printf("[seed] tracker announce ERROR: %s err=%v\n", e.Url, e.Error) - case torrent.TrackerDisconnected: - fmt.Printf("[seed] tracker disconnected: %s err=%v\n", e.Url, e.Error) - } - }, - ) - - client, err := torrent.NewClient(cfg) - if err != nil { - log.Fatalf("create torrent client: %v", err) - } - defer client.Close() - - t, err := client.AddTorrent(mi) - if err != nil { - log.Fatalf("add torrent: %v", err) - } - t.DownloadAll() - - dn := url.QueryEscape(info.Name) - enc := url.QueryEscape(trackerURL) - magnet := fmt.Sprintf("magnet:?xt=urn:btih:%s&dn=%s&tr=%s", ih.HexString(), dn, enc) - - fmt.Printf("[seed] file=%s size=%d bytes piece_length=%d\n", abs, st.Size(), info.PieceLength) - fmt.Printf("[seed] info_hash=%s\n", ih.HexString()) - fmt.Printf("[seed] magnet=%s\n", magnet) - fmt.Println("[seed] seeding via WebRTC. Ctrl-C to stop.") - - stop := make(chan os.Signal, 1) - signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) - statTicker := time.NewTicker(5 * time.Second) - defer statTicker.Stop() - - for { - select { - case <-statTicker.C: - s := t.Stats() - fmt.Printf("[seed] peers=%d uploaded=%d bytes seeders=%d leechers=%d\n", - s.ActivePeers, s.BytesWrittenData.Int64(), - s.ConnectedSeeders, s.ActivePeers-s.ConnectedSeeders) - case <-stop: - fmt.Println("[seed] stopping") - return - } - } -} - -// baseClientConfig — shared anacrolix client config for both modes. -// WebTorrent is the only transport enabled; TCP/uTP/DHT/IPv6 are disabled -// to keep the moving parts to the minimum required for a WSS-only test. -func baseClientConfig(dataDir string) *torrent.ClientConfig { - cfg := torrent.NewDefaultClientConfig() - cfg.DataDir = dataDir - cfg.DefaultStorage = storage.NewMMap(dataDir) - cfg.NoUpload = false - cfg.DisableTCP = true - cfg.DisableUTP = true - cfg.DisableIPv6 = true - cfg.NoDHT = true - cfg.NoDefaultPortForwarding = true - cfg.ListenPort = 0 - cfg.Logger = alog.Default.FilterLevel(alog.Critical) - cfg.DisableWebtorrent = false - cfg.ICEServerList = []webrtc.ICEServer{ - {URLs: []string{"stun:stun.l.google.com:19302"}}, - {URLs: []string{"stun:stun1.l.google.com:19302"}}, - } - return cfg -} - -// chooseSeedPieceLength picks a sane piece size for a given file size. -// Mirrors the libtorrent / qBittorrent ladder so the resulting torrent -// is interoperable with mainstream clients. -func chooseSeedPieceLength(size int64) int64 { - switch { - case size < 4*1024*1024: // < 4 MiB - return 16 * 1024 // 16 KiB - case size < 64*1024*1024: // < 64 MiB - return 64 * 1024 // 64 KiB - case size < 512*1024*1024: // < 512 MiB - return 256 * 1024 // 256 KiB - case size < 4*1024*1024*1024: // < 4 GiB - return 1024 * 1024 // 1 MiB - default: - return 4 * 1024 * 1024 // 4 MiB - } -} diff --git a/go.mod b/go.mod index f3aea87..a47f6e3 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,6 @@ require ( github.com/google/uuid v1.6.0 github.com/huin/goupnp v1.3.0 github.com/olekukonko/tablewriter v1.1.4 - github.com/pion/webrtc/v4 v4.2.11 github.com/spf13/cobra v1.10.2 github.com/torrentclaw/go-client v0.2.0 golang.org/x/term v0.43.0 @@ -107,6 +106,7 @@ require ( github.com/pion/stun/v3 v3.1.1 // indirect github.com/pion/transport/v4 v4.0.1 // indirect github.com/pion/turn/v4 v4.1.4 // indirect + github.com/pion/webrtc/v4 v4.2.11 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/protolambda/ctxlock v0.1.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index 385454a..d0b1458 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -38,7 +38,7 @@ type Daemon struct { // Callbacks — set by cmd/daemon.go before calling Run. OnTasksClaimed func(tasks []Task) OnStreamRequested func(req StreamRequest) - OnWebRTCSession func(sess WebRTCSession) + OnStreamSession func(sess StreamSession) OnControlAction func(action, taskID string, deleteFiles bool) GetActiveCount func() int // returns number of active downloads (wired from manager) @@ -210,9 +210,9 @@ func (d *Daemon) Run(ctx context.Context) error { d.OnStreamRequested(req) } } - d.sync.OnWebRTCSession = func(sess WebRTCSession) { - if d.OnWebRTCSession != nil { - d.OnWebRTCSession(sess) + d.sync.OnStreamSession = func(sess StreamSession) { + if d.OnStreamSession != nil { + d.OnStreamSession(sess) } } d.sync.OnUpgrade = func(version string) { diff --git a/internal/agent/signal_client.go b/internal/agent/signal_client.go deleted file mode 100644 index 624dc6c..0000000 --- a/internal/agent/signal_client.go +++ /dev/null @@ -1,258 +0,0 @@ -package agent - -import ( - "bufio" - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "strings" - "time" -) - -// SignalRole identifies who produced a signalling message. The opposite role -// receives it. -type SignalRole string - -const ( - SignalRoleBrowser SignalRole = "browser" - SignalRoleAgent SignalRole = "agent" -) - -// SignalMessageType matches the server-side z.enum on -// /api/internal/stream/signal/[sessionId] route. -type SignalMessageType string - -const ( - SignalMsgOffer SignalMessageType = "offer" - SignalMsgAnswer SignalMessageType = "answer" - SignalMsgCandidate SignalMessageType = "candidate" - SignalMsgCandidateEnd SignalMessageType = "candidate-end" - SignalMsgBye SignalMessageType = "bye" -) - -// SignalMessage mirrors the bus envelope on the web side. -type SignalMessage struct { - From SignalRole `json:"from"` - Type SignalMessageType `json:"type"` - Payload string `json:"payload"` - TS int64 `json:"ts"` -} - -// PostSignal enqueues a signalling message produced by this agent. The -// browser receives it on its next SSE event push. -func (c *Client) PostSignal(ctx context.Context, sessionID string, msg SignalMessage) error { - body := map[string]any{ - "from": string(SignalRoleAgent), - "type": string(msg.Type), - "payload": msg.Payload, - } - path := fmt.Sprintf("/api/internal/stream/signal/%s", sessionID) - return c.doPost(ctx, path, body, &struct { - OK bool `json:"ok"` - }{}) -} - -// SignalEventStream wraps an open SSE connection. Read messages from Events() -// until the channel closes (server timeout or context cancel). Always defer -// Close() to release the underlying response body. -type SignalEventStream struct { - resp *http.Response - cancel context.CancelFunc - events chan SignalMessage - errs chan error - done chan struct{} -} - -// Events streams browser-produced messages addressed to the agent. -// The channel closes when the SSE connection ends; the caller should then -// call Close() and reopen if it wants to keep listening. -func (s *SignalEventStream) Events() <-chan SignalMessage { return s.events } - -// Err returns the terminating error (if any) once Events() has closed. -func (s *SignalEventStream) Err() error { - select { - case err := <-s.errs: - return err - default: - return nil - } -} - -// Close cancels the underlying HTTP request and waits for the reader goroutine -// to drain. Safe to call more than once. -func (s *SignalEventStream) Close() error { - if s.cancel != nil { - s.cancel() - } - if s.resp != nil { - s.resp.Body.Close() - } - <-s.done - return nil -} - -// OpenSignalStream opens a long-lived SSE connection to the signal events -// endpoint. Caller MUST cancel ctx (or call Close()) to free resources. -// -// The server caps each response at ~25 s; OpenSignalStream surfaces the -// disconnect by closing the events channel. Caller should reopen until the -// session ends. -func (c *Client) OpenSignalStream(ctx context.Context, sessionID string) (*SignalEventStream, error) { - streamCtx, cancel := context.WithCancel(ctx) - - url := fmt.Sprintf("%s/api/internal/stream/signal/%s/events", c.baseURL(), sessionID) - req, err := http.NewRequestWithContext(streamCtx, http.MethodGet, url, nil) - if err != nil { - cancel() - return nil, fmt.Errorf("open signal stream: %w", err) - } - req.Header.Set("Accept", "text/event-stream") - req.Header.Set("Authorization", "Bearer "+c.apiKey) - req.Header.Set("User-Agent", c.userAgent) - req.Header.Set("Cache-Control", "no-cache") - - // Use a per-call client with no timeout (SSE connections are long). - sseClient := &http.Client{} - resp, err := sseClient.Do(req) - if err != nil { - cancel() - return nil, fmt.Errorf("open signal stream: %w", err) - } - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024)) - resp.Body.Close() - cancel() - return nil, fmt.Errorf("open signal stream: HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(body))) - } - - stream := &SignalEventStream{ - resp: resp, - cancel: cancel, - events: make(chan SignalMessage, 8), - errs: make(chan error, 1), - done: make(chan struct{}), - } - - go stream.read() - return stream, nil -} - -// sseMaxLineBytes caps the size of a single SSE line. Real signalling lines -// are JSON payloads of a few hundred bytes; 256 KiB is generous enough to -// survive a future schema bump but small enough that a hostile or buggy -// server cannot grow daemon memory by streaming a single line forever. -const sseMaxLineBytes = 256 * 1024 - -// sseMaxEventBytes caps the total bytes buffered across the lines of one -// SSE event. Without a cap, a peer could send unbounded `data:` continuation -// lines and OOM the daemon between blank-line dispatches. -const sseMaxEventBytes = 1024 * 1024 - -func (s *SignalEventStream) read() { - defer close(s.done) - defer close(s.events) - - scanner := bufio.NewScanner(s.resp.Body) - scanner.Buffer(make([]byte, 16*1024), sseMaxLineBytes) - - var dataBuf bytes.Buffer - var eventName string - - for scanner.Scan() { - line := strings.TrimRight(scanner.Text(), "\r") - if line == "" { - // End of an event — dispatch if we have data. - if dataBuf.Len() == 0 { - eventName = "" - continue - } - if eventName == "" || eventName == "signal" { - var msg SignalMessage - if err := json.Unmarshal(dataBuf.Bytes(), &msg); err == nil { - select { - case s.events <- msg: - case <-s.resp.Request.Context().Done(): - return - } - } - } - dataBuf.Reset() - eventName = "" - continue - } - if strings.HasPrefix(line, ":") { - // SSE comment (heartbeat); ignore. - continue - } - if strings.HasPrefix(line, "event:") { - eventName = strings.TrimSpace(line[len("event:"):]) - continue - } - if strings.HasPrefix(line, "data:") { - payload := strings.TrimSpace(line[len("data:"):]) - // Refuse to grow the event buffer past the cap. Reset so a - // well-formed event after the offender can still be parsed, - // and surface an error so SignalLoop reconnects. - if dataBuf.Len()+len(payload)+1 > sseMaxEventBytes { - dataBuf.Reset() - eventName = "" - select { - case s.errs <- fmt.Errorf("sse: event exceeded %d bytes", sseMaxEventBytes): - default: - } - return - } - if dataBuf.Len() > 0 { - dataBuf.WriteByte('\n') - } - dataBuf.WriteString(payload) - continue - } - // id:, retry:, anything else — ignore for now. - } - if err := scanner.Err(); err != nil { - select { - case s.errs <- err: - default: - } - } -} - -// SignalLoop runs an SSE consumer that reconnects automatically on disconnect. -// onMessage is called for every browser-produced message. Returns when ctx is -// cancelled. Reconnect backoff is fixed at 1 s — the server already paces -// reconnects with `retry: 1500` headers so churn is bounded. -func (c *Client) SignalLoop(ctx context.Context, sessionID string, onMessage func(SignalMessage)) error { - for ctx.Err() == nil { - stream, err := c.OpenSignalStream(ctx, sessionID) - if err != nil { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return ctx.Err() - } - continue - } - for msg := range stream.Events() { - onMessage(msg) - } - streamErr := stream.Err() - stream.Close() - if ctx.Err() != nil { - return ctx.Err() - } - // Server closes the SSE every ~25 s; reconnect immediately. - // Hard error → small backoff so we don't hammer. - if streamErr != nil { - select { - case <-time.After(time.Second): - case <-ctx.Done(): - return ctx.Err() - } - } - } - return ctx.Err() -} diff --git a/internal/agent/signal_client_test.go b/internal/agent/signal_client_test.go deleted file mode 100644 index 796b545..0000000 --- a/internal/agent/signal_client_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package agent - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "strings" - "sync" - "testing" - "time" -) - -// fakeSSEServer streams a fixed set of SSE events then closes the connection. -func fakeSSEServer(t *testing.T, msgs []SignalMessage, holdOpenAfter bool) *httptest.Server { - t.Helper() - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Authorization") != "Bearer test-key" { - http.Error(w, "auth", http.StatusUnauthorized) - return - } - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - flusher, ok := w.(http.Flusher) - if !ok { - t.Fatal("server: ResponseWriter is not a Flusher") - } - fmt.Fprint(w, "retry: 1500\n\n") - flusher.Flush() - for _, m := range msgs { - data, _ := json.Marshal(m) - fmt.Fprintf(w, "id: %d\nevent: signal\ndata: %s\n\n", m.TS, data) - flusher.Flush() - } - // Send a heartbeat comment to verify it's ignored. - fmt.Fprint(w, ": heartbeat\n\n") - flusher.Flush() - if holdOpenAfter { - // Hold the connection until the client disconnects so the test can - // exercise stream.Close(). - <-r.Context().Done() - } - })) -} - -func TestSignalStreamReadsMessages(t *testing.T) { - want := []SignalMessage{ - {From: SignalRoleBrowser, Type: SignalMsgOffer, Payload: "{sdp:1}", TS: 1}, - {From: SignalRoleBrowser, Type: SignalMsgCandidate, Payload: "{cand:1}", TS: 2}, - } - srv := fakeSSEServer(t, want, false) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - stream, err := c.OpenSignalStream(ctx, "session-1") - if err != nil { - t.Fatalf("open: %v", err) - } - defer stream.Close() - - var got []SignalMessage - for m := range stream.Events() { - got = append(got, m) - if len(got) == len(want) { - break - } - } - if len(got) != len(want) { - t.Fatalf("got %d messages, want %d", len(got), len(want)) - } - for i, m := range got { - if m.From != want[i].From || m.Type != want[i].Type || m.Payload != want[i].Payload { - t.Errorf("[%d] mismatch: %+v want %+v", i, m, want[i]) - } - } -} - -func TestSignalStreamPropagatesAuthError(t *testing.T) { - srv := fakeSSEServer(t, nil, false) - defer srv.Close() - - c := NewClient(srv.URL, "wrong-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - - _, err := c.OpenSignalStream(ctx, "session-1") - if err == nil { - t.Fatal("expected auth error, got nil") - } -} - -func TestSignalStreamCloseCancelsRead(t *testing.T) { - srv := fakeSSEServer(t, nil, true) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - stream, err := c.OpenSignalStream(ctx, "session-1") - if err != nil { - t.Fatalf("open: %v", err) - } - - // Close on a separate goroutine then make sure the events channel drains. - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - time.Sleep(50 * time.Millisecond) - stream.Close() - }() - - for range stream.Events() { - // drain - } - wg.Wait() -} - -// TestSignalStreamRejectsOversizedEvent verifies that a hostile or buggy -// server sending an unbounded `data:` event surfaces an error and stops -// the reader instead of growing daemon memory forever. -func TestSignalStreamRejectsOversizedEvent(t *testing.T) { - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Authorization") != "Bearer test-key" { - http.Error(w, "auth", http.StatusUnauthorized) - return - } - w.Header().Set("Content-Type", "text/event-stream") - flusher := w.(http.Flusher) - // Send many data: continuation lines until we blow past the - // per-event cap. Each chunk is a short legitimate-looking line. - chunk := "data: " + strings.Repeat("x", 4096) + "\n" - fmt.Fprint(w, "event: signal\n") - for i := 0; i < (sseMaxEventBytes/4096)+8; i++ { - fmt.Fprint(w, chunk) - } - flusher.Flush() - <-r.Context().Done() - })) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - - stream, err := c.OpenSignalStream(ctx, "session-overflow") - if err != nil { - t.Fatalf("open: %v", err) - } - defer stream.Close() - - for range stream.Events() { - // Should never receive a parsed event — the over-sized buffer must - // be rejected before dispatch. - } - if err := stream.Err(); err == nil { - t.Fatal("expected error from oversized event, got nil") - } -} - -func TestPostSignalSendsCorrectBody(t *testing.T) { - var bodySeen map[string]any - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("Authorization") != "Bearer test-key" { - http.Error(w, "auth", http.StatusUnauthorized) - return - } - _ = json.NewDecoder(r.Body).Decode(&bodySeen) - w.Header().Set("Content-Type", "application/json") - fmt.Fprint(w, `{"ok":true}`) - })) - defer srv.Close() - - c := NewClient(srv.URL, "test-key", "test-ua") - err := c.PostSignal(context.Background(), "sess-x", SignalMessage{ - Type: SignalMsgAnswer, - Payload: "{sdp:answer}", - }) - if err != nil { - t.Fatalf("post: %v", err) - } - if bodySeen["from"] != string(SignalRoleAgent) { - t.Errorf("expected from=agent, got %v", bodySeen["from"]) - } - if bodySeen["type"] != string(SignalMsgAnswer) { - t.Errorf("expected type=answer, got %v", bodySeen["type"]) - } - if bodySeen["payload"] != "{sdp:answer}" { - t.Errorf("expected payload mismatch, got %v", bodySeen["payload"]) - } -} diff --git a/internal/agent/sync.go b/internal/agent/sync.go index 9847aba..c28c65f 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -29,7 +29,7 @@ type SyncClient struct { OnNewTasks func(tasks []Task) OnControl func(action, taskID string, deleteFiles bool) OnStreamRequest func(req StreamRequest) - OnWebRTCSession func(sess WebRTCSession) + OnStreamSession func(sess StreamSession) OnUpgrade func(version string) OnScan func() OnWatchingChange func(watching bool) @@ -199,10 +199,10 @@ func (sc *SyncClient) processResponse(resp *SyncResponse) { } } - // WebRTC streaming sessions - for _, ws := range resp.WebRTCSessions { - if sc.OnWebRTCSession != nil { - sc.OnWebRTCSession(ws) + // HLS streaming sessions. + for _, ws := range resp.StreamSessions { + if sc.OnStreamSession != nil { + sc.OnStreamSession(ws) } } diff --git a/internal/agent/types.go b/internal/agent/types.go index 8e0094a..72e8af5 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -374,29 +374,22 @@ type LibraryDeleteRequest struct { FilePath string `json:"filePath"` } -// WebRTCSession is a request to open a streaming session for a browser -// player. Transport selects the on-the-wire protocol: empty/"webrtc" runs the -// legacy custom WebRTC DataChannel pipeline; "hls" spawns an HLS session -// (ffmpeg producing fragmented MP4 served over HTTP). The CLI must POST an -// SDP answer to /api/internal/stream/signal/ for WebRTC sessions -// and register the HLS session in the StreamServer's HLS registry for HLS -// sessions; either way the source bytes come from FilePath (or, when only -// InfoHash is set, from a download_task on disk). -type WebRTCSession struct { - SessionID string `json:"sessionId"` - // Transport selects the streaming protocol. "" or "webrtc" → legacy - // WebRTC + MSE pipeline (Phase 1). "hls" → HLS over HTTP (Phase 2). - Transport string `json:"transport,omitempty"` - FilePath string `json:"filePath,omitempty"` - InfoHash string `json:"infoHash,omitempty"` - TaskID string `json:"taskId,omitempty"` - FileName string `json:"fileName,omitempty"` - FileSize int64 `json:"fileSize,omitempty"` +// StreamSession is a request to open an HLS streaming session for an +// in-browser player. The CLI registers the HLS session in the StreamServer's +// HLS registry; source bytes come from FilePath (or, when only InfoHash is +// set, from a download_task on disk). +type StreamSession struct { + SessionID string `json:"sessionId"` + FilePath string `json:"filePath,omitempty"` + InfoHash string `json:"infoHash,omitempty"` + TaskID string `json:"taskId,omitempty"` + FileName string `json:"fileName,omitempty"` + FileSize int64 `json:"fileSize,omitempty"` // Quality target the daemon should aim for when transcoding. One of // "2160p" | "1080p" | "720p" | "480p" | "original" | "" (defer to config). Quality string `json:"quality,omitempty"` // AudioIndex selects the source audio track (-map 0:a:N). -1 means - // "use the default/first track" (HLS) or ignored (WebRTC). + // "use the default/first track". AudioIndex int `json:"audioIndex,omitempty"` } @@ -405,7 +398,7 @@ type SyncResponse struct { NewTasks []Task `json:"newTasks,omitempty"` Controls []ControlAction `json:"controls,omitempty"` StreamRequests []StreamRequest `json:"streamRequests,omitempty"` - WebRTCSessions []WebRTCSession `json:"webrtcSessions,omitempty"` + StreamSessions []StreamSession `json:"streamSessions,omitempty"` Watching bool `json:"watching"` Upgrade *UpgradeSignal `json:"upgrade,omitempty"` Scan bool `json:"scan,omitempty"` diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 54759b2..84c458c 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -255,9 +255,6 @@ func runDaemonStart() error { MaxUploadRate: maxUl, ListenPort: cfg.Download.ListenPort, SeedEnabled: false, - WebRTCEnabled: cfg.Download.WebRTC.Enabled, - WebRTCTrackers: cfg.Download.WebRTC.Trackers, - ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), VPNTunnel: vpnTunnel, }) if err != nil { @@ -330,13 +327,7 @@ func runDaemonStart() error { // Wire: sync receives new tasks → submit to manager or handle stream d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { - if t.Mode == "seed_file" { - // Browser asked us to wrap an arbitrary on-disk file as - // a single-file torrent + seed it via WebRTC. Runs in - // its own goroutine so a slow / failing seed can't - // stall the rest of the claim batch. - go handleSeedFileTask(t, torrentDl, agentClient) - } else if t.Mode == "stream" { + if t.Mode == "stream" { if isStreamingTask(t.ID) { continue } @@ -497,23 +488,23 @@ func runDaemonStart() error { }() } - // Wire: sync receives custom WebRTC streaming session requests. - // Each session is a one-shot browser↔daemon DataChannel. Validate the - // FilePath against allowed dirs to prevent path traversal abuse from a - // compromised server, then spawn the pion peer in its own goroutine. - d.OnWebRTCSession = func(sess agent.WebRTCSession) { - if webrtcRegistry.has(sess.SessionID) { + // Wire: sync receives HLS streaming session requests. Each session spawns + // one ffmpeg process and registers its HLS playlist with the StreamServer. + // Validate FilePath against allowed dirs to prevent path traversal abuse + // from a compromised server. + d.OnStreamSession = func(sess agent.StreamSession) { + if playerSessionRegistry.has(sess.SessionID) { return // already running } filePath := sess.FilePath if filePath == "" { - log.Printf("webrtc session %s rejected: empty file path", agent.ShortID(sess.SessionID)) + log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID)) return } filePath = filepath.Clean(filePath) if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) { - log.Printf("webrtc session %s rejected: path outside allowed dirs: %s", + log.Printf("[hls %s] rejected: path outside allowed dirs: %s", agent.ShortID(sess.SessionID), filePath) return } @@ -521,75 +512,36 @@ func runDaemonStart() error { if info, err := os.Stat(filePath); err == nil && info.IsDir() { found := engine.FindVideoFile(filePath) if found == "" { - log.Printf("webrtc session %s rejected: no video file in dir %s", + log.Printf("[hls %s] rejected: no video file in dir %s", agent.ShortID(sess.SessionID), filePath) return } filePath = found } - // Branch on transport: HLS sessions only need ffmpeg + StreamServer, - // not a WebRTC peer, so they must bypass the WebRTC.Enabled gate. - // Default ("" or "webrtc") runs the DataChannel pipeline and requires it. - if strings.EqualFold(sess.Transport, "hls") { - tcRuntime := buildTranscodeRuntime(ctx, cfg) - if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { - log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) - return - } - hlsCtx, hlsCancel := context.WithCancel(ctx) - webrtcRegistry.add(sess.SessionID, hlsCancel) - hlsCfg := engine.HLSSessionConfig{ - SessionID: sess.SessionID, - SourcePath: filePath, - FileName: sess.FileName, - Quality: sess.Quality, - AudioIndex: sess.AudioIndex, - Transcode: tcRuntime, - } - hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) - if err != nil { - webrtcRegistry.remove(sess.SessionID) - hlsCancel() - log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err) - return - } - streamSrv.HLS().Register(hsess) + tcRuntime := buildTranscodeRuntime(ctx, cfg) + if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { + log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) return } - - // Non-HLS transport requires WebRTC peer support. - if !cfg.Download.WebRTC.Enabled { - log.Printf("webrtc session %s rejected: webrtc disabled in config", agent.ShortID(sess.SessionID)) + hlsCtx, hlsCancel := context.WithCancel(ctx) + playerSessionRegistry.add(sess.SessionID, hlsCancel) + hlsCfg := engine.HLSSessionConfig{ + SessionID: sess.SessionID, + SourcePath: filePath, + FileName: sess.FileName, + Quality: sess.Quality, + AudioIndex: sess.AudioIndex, + Transcode: tcRuntime, + } + hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) + if err != nil { + playerSessionRegistry.remove(sess.SessionID) + hlsCancel() + log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err) return } - - sessCtx, sessCancel := context.WithCancel(ctx) //nolint:gosec // G118 cancel stored in registry - webrtcRegistry.add(sess.SessionID, sessCancel) - go func() { - defer func() { - webrtcRegistry.remove(sess.SessionID) - sessCancel() - }() - tcRuntime := buildTranscodeRuntime(ctx, cfg) - runCfg := engine.WebRTCStreamConfig{ - SessionID: sess.SessionID, - FilePath: filePath, - FileName: sess.FileName, - FileSize: sess.FileSize, - Quality: sess.Quality, - ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), - Signal: agentClient, - Logger: stdLogger{}, - Transcode: tcRuntime, - } - log.Printf("[wrtc %s] starting session: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath)) - if err := engine.RunWebRTCStream(sessCtx, runCfg); err != nil { - if sessCtx.Err() == nil { - log.Printf("[wrtc %s] ended: %v", agent.ShortID(sess.SessionID), err) - } - } - }() + streamSrv.HLS().Register(hsess) } // Periodic DHT node persistence (every 5 min) @@ -658,7 +610,7 @@ func runDaemonStart() error { case sig := <-sigCh: fmt.Printf("\n Received %s, shutting down...\n", sig) cancelStreamContexts() - cancelAllWebRTCSessions() + cancelAllPlayerSessions() streamSrv.Shutdown(context.Background()) cancel() @@ -673,7 +625,7 @@ func runDaemonStart() error { case err := <-errCh: cancelStreamContexts() - cancelAllWebRTCSessions() + cancelAllPlayerSessions() streamSrv.Shutdown(context.Background()) cancel() return err diff --git a/internal/cmd/download.go b/internal/cmd/download.go index 5189166..bd5ceab 100644 --- a/internal/cmd/download.go +++ b/internal/cmd/download.go @@ -114,9 +114,6 @@ func runDownloadWithDeps(input, method string, deps downloadDeps) error { StallTimeout: 10 * time.Minute, MaxTimeout: 0, // unlimited SeedEnabled: false, - WebRTCEnabled: cfg.Download.WebRTC.Enabled, - WebRTCTrackers: cfg.Download.WebRTC.Trackers, - ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), }) if err != nil { return fmt.Errorf("create downloader: %w", err) diff --git a/internal/cmd/webrtc_session_registry.go b/internal/cmd/player_session_registry.go similarity index 51% rename from internal/cmd/webrtc_session_registry.go rename to internal/cmd/player_session_registry.go index a1bf37a..bb3743b 100644 --- a/internal/cmd/webrtc_session_registry.go +++ b/internal/cmd/player_session_registry.go @@ -2,7 +2,6 @@ package cmd import ( "context" - "log" "sync" "github.com/torrentclaw/unarr/internal/config" @@ -10,66 +9,57 @@ import ( "github.com/torrentclaw/unarr/internal/library/mediainfo" ) -// webrtcRegistry tracks per-session cancel funcs for active custom WebRTC -// streams (engine.RunWebRTCStream goroutines). Each session lives only as -// long as its DataChannel; the registry exists so duplicate sync responses -// don't double-spawn the same session and so daemon shutdown can drain. -var webrtcRegistry = &webrtcSessionRegistry{ +// playerSessionRegistry tracks per-session cancel funcs for active in-browser +// HLS streaming sessions. Each session lives only as long as its ffmpeg +// process; the registry exists so duplicate sync responses don't double-spawn +// the same session and so daemon shutdown can drain. +var playerSessionRegistry = &playerSessionRegistryT{ cancels: make(map[string]context.CancelFunc), } -type webrtcSessionRegistry struct { +type playerSessionRegistryT struct { mu sync.Mutex cancels map[string]context.CancelFunc } -func (r *webrtcSessionRegistry) has(sessionID string) bool { +func (r *playerSessionRegistryT) has(sessionID string) bool { r.mu.Lock() defer r.mu.Unlock() _, ok := r.cancels[sessionID] return ok } -func (r *webrtcSessionRegistry) add(sessionID string, cancel context.CancelFunc) { +func (r *playerSessionRegistryT) add(sessionID string, cancel context.CancelFunc) { r.mu.Lock() defer r.mu.Unlock() r.cancels[sessionID] = cancel } -func (r *webrtcSessionRegistry) remove(sessionID string) { +func (r *playerSessionRegistryT) remove(sessionID string) { r.mu.Lock() defer r.mu.Unlock() delete(r.cancels, sessionID) } -// cancelAllWebRTCSessions cancels every running session. Called on daemon -// shutdown so pion peers and SSE consumers exit cleanly. -func cancelAllWebRTCSessions() { - webrtcRegistry.mu.Lock() - cancels := make([]context.CancelFunc, 0, len(webrtcRegistry.cancels)) - for _, c := range webrtcRegistry.cancels { +// cancelAllPlayerSessions cancels every running session. Called on daemon +// shutdown so the ffmpeg children and SSE consumers exit cleanly. +func cancelAllPlayerSessions() { + playerSessionRegistry.mu.Lock() + cancels := make([]context.CancelFunc, 0, len(playerSessionRegistry.cancels)) + for _, c := range playerSessionRegistry.cancels { cancels = append(cancels, c) } - webrtcRegistry.cancels = make(map[string]context.CancelFunc) - webrtcRegistry.mu.Unlock() + playerSessionRegistry.cancels = make(map[string]context.CancelFunc) + playerSessionRegistry.mu.Unlock() for _, c := range cancels { c() } } -// stdLogger is a tiny adapter so engine.RunWebRTCStream can log through the -// standard library logger without pulling in a logging dependency. -type stdLogger struct{} - -func (stdLogger) Infof(format string, args ...any) { log.Printf(format, args...) } -func (stdLogger) Warnf(format string, args ...any) { log.Printf("WARN: "+format, args...) } -func (stdLogger) Errorf(format string, args ...any) { log.Printf("ERROR: "+format, args...) } - // buildTranscodeRuntime resolves the ffmpeg/ffprobe binaries + config knobs -// for the WebRTC streaming pipeline. Failure to resolve a binary returns a -// runtime with empty paths so engine.RunWebRTCStream falls back to -// passthrough — the user gets a clearer codec error from the browser than a -// daemon-side abort. +// for the HLS streaming pipeline. Failure to resolve a binary returns a +// runtime with empty paths so the caller can short-circuit instead of +// launching a transcoder that will immediately fail. func buildTranscodeRuntime(ctx context.Context, cfg config.Config) engine.TranscodeRuntime { if !cfg.Download.Transcode.Enabled { return engine.TranscodeRuntime{Disabled: true} diff --git a/internal/cmd/probe_hwaccel.go b/internal/cmd/probe_hwaccel.go index f7ed1c1..609a443 100644 --- a/internal/cmd/probe_hwaccel.go +++ b/internal/cmd/probe_hwaccel.go @@ -15,7 +15,7 @@ import ( ) // newProbeHWAccelCmd reports the hardware-acceleration capabilities the daemon -// would actually use for HLS/WebRTC transcoding. The motivation: a beefy host +// would actually use for HLS transcoding. The motivation: a beefy host // (e.g. RTX 3090) can still fall back to software encoding when the installed // ffmpeg binary was built without nvenc/qsv/vaapi support — Homebrew ffmpeg // is a common offender. Without this command, users see slow / failing 4K diff --git a/internal/cmd/seed_file_handler.go b/internal/cmd/seed_file_handler.go deleted file mode 100644 index fe2438a..0000000 --- a/internal/cmd/seed_file_handler.go +++ /dev/null @@ -1,65 +0,0 @@ -package cmd - -import ( - "context" - "log" - "time" - - "github.com/torrentclaw/unarr/internal/agent" - "github.com/torrentclaw/unarr/internal/engine" -) - -// handleSeedFileTask wraps an arbitrary on-disk file as a single-file -// torrent and adds it to the existing torrent client so the WebRTC -// peer can serve pieces to a browser. Reports the generated info_hash -// back to the server so the web player can target /stream/. -// -// Runs in its own goroutine; never blocks the claim batch. -func handleSeedFileTask(t agent.Task, dl *engine.TorrentDownloader, client *agent.Client) { - short := agent.ShortID(t.ID) - - if t.FilePath == "" { - log.Printf("[%s] seed_file: missing filePath, marking failed", short) - reportSeedFileFailed(client, t.ID, "Missing filePath") - return - } - - log.Printf("[%s] seed_file: building torrent from %s", short, t.FilePath) - hash, err := engine.SeedFileOnDownloader(dl, t.FilePath) - if err != nil { - log.Printf("[%s] seed_file: %v", short, err) - reportSeedFileFailed(client, t.ID, err.Error()) - return - } - - infoHash := hash.HexString() - log.Printf("[%s] seed_file: seeding ih=%s", short, infoHash) - - // Push the info_hash + downloading status (file is on disk; from the - // client's perspective it's already complete). The web side polls - // /api/internal/stream/seed-file/ waiting for this update. - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - _, reportErr := client.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: t.ID, - Status: "downloading", // semantic: actively serving - InfoHash: infoHash, - FilePath: t.FilePath, - }) - if reportErr != nil { - log.Printf("[%s] seed_file: failed to push info_hash: %v", short, reportErr) - } -} - -func reportSeedFileFailed(client *agent.Client, taskID, msg string) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _, err := client.ReportStatus(ctx, agent.StatusUpdate{ - TaskID: taskID, - Status: "failed", - ErrorMessage: msg, - }) - if err != nil { - log.Printf("[%s] seed_file: report-failed itself failed: %v", agent.ShortID(taskID), err) - } -} diff --git a/internal/cmd/version.go b/internal/cmd/version.go index e03063b..4cd7a03 100644 --- a/internal/cmd/version.go +++ b/internal/cmd/version.go @@ -1,4 +1,4 @@ package cmd // Version is the CLI version. Overridden by goreleaser ldflags at release time. -var Version = "0.9.3" +var Version = "0.9.4" diff --git a/internal/config/config.go b/internal/config/config.go index 9f46b53..6e65df8 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,7 +51,6 @@ type DownloadConfig struct { StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818) EnableUPnP bool `toml:"enable_upnp"` // map StreamPort to the WAN via UPnP/NAT-PMP (default: false; opt-in because it exposes the unauthenticated /stream + /hls endpoints to the public internet) CORSExtraOrigins []string `toml:"cors_extra_origins"` // extra browser origins added on top of the baked-in allowlist (torrentclaw.com, app.torrentclaw.com, localhost:3030) - WebRTC WebRTCConfig `toml:"webrtc"` Transcode TranscodeConfig `toml:"transcode"` VPN VPNConfig `toml:"vpn"` } @@ -84,19 +83,6 @@ type TranscodeConfig struct { MaxConcurrent int `toml:"max_concurrent"` // safety cap on simultaneous transcoder processes } -// WebRTCConfig opts the daemon into acting as a WebTorrent peer so browsers -// can fetch pieces via WebRTC data channels — required by the in-browser -// player on torrentclaw.com. Disabled by default; enabling implies upload -// is allowed for active torrents (browsers can't download otherwise). -type WebRTCConfig struct { - Enabled bool `toml:"enabled"` // master switch - Trackers []string `toml:"trackers"` // wss:// signaling trackers - STUNServers []string `toml:"stun_servers"` // stun:host:port - TURNServers []string `toml:"turn_servers"` // turn:host:port (no auth) — see TURNCredentials for authed - TURNUser string `toml:"turn_user"` // optional, applied to all TURNServers - TURNPass string `toml:"turn_pass"` // optional -} - type OrganizeConfig struct { Enabled bool `toml:"enabled"` MoviesDir string `toml:"movies_dir"` @@ -121,7 +107,7 @@ type LibraryConfig struct { ScanPath string `toml:"scan_path"` // remembered from last scan Workers int `toml:"workers"` // concurrent ffprobe (default 8) FFprobePath string `toml:"ffprobe_path"` // optional explicit path - FFmpegPath string `toml:"ffmpeg_path"` // optional explicit path (used by WebRTC streaming transcoder) + FFmpegPath string `toml:"ffmpeg_path"` // optional explicit path (used by the HLS streaming transcoder) BackupDir string `toml:"backup_dir"` // for replaced files AutoScan bool `toml:"auto_scan"` // enable daily auto-scan in daemon (default true) ScanInterval string `toml:"scan_interval"` // e.g. "24h", "12h", "6h" (default "24h") @@ -146,11 +132,6 @@ func Default() Config { PreferredMethod: "auto", MaxConcurrent: 3, StreamPort: 11818, - WebRTC: WebRTCConfig{ - Enabled: true, - Trackers: []string{"wss://tracker.torrentclaw.com"}, - STUNServers: []string{"stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"}, - }, Transcode: TranscodeConfig{ Enabled: true, HWAccel: "auto", @@ -231,19 +212,6 @@ func applyDefaults(cfg *Config, meta toml.MetaData) { cfg.General.Country = "US" } - if !meta.IsDefined("downloads", "webrtc", "enabled") { - cfg.Download.WebRTC.Enabled = true - } - if !meta.IsDefined("downloads", "webrtc", "trackers") { - cfg.Download.WebRTC.Trackers = []string{"wss://tracker.torrentclaw.com"} - } - if !meta.IsDefined("downloads", "webrtc", "stun_servers") { - cfg.Download.WebRTC.STUNServers = []string{ - "stun:stun.l.google.com:19302", - "stun:stun1.l.google.com:19302", - } - } - if !meta.IsDefined("downloads", "transcode", "enabled") { cfg.Download.Transcode.Enabled = true } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 02fcdc4..8097395 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -208,17 +208,6 @@ name = "Test" t.Fatalf("Load failed: %v", err) } - // WebRTC should be on by default for fresh installs. - if !cfg.Download.WebRTC.Enabled { - t.Error("WebRTC.Enabled should default to true when [downloads.webrtc] is absent") - } - if len(cfg.Download.WebRTC.Trackers) == 0 { - t.Error("WebRTC.Trackers should default to torrentclaw tracker when absent") - } - if len(cfg.Download.WebRTC.STUNServers) == 0 { - t.Error("WebRTC.STUNServers should default to public STUN list when absent") - } - // Transcode should be on by default. if !cfg.Download.Transcode.Enabled { t.Error("Transcode.Enabled should default to true when [downloads.transcode] is absent") @@ -238,12 +227,9 @@ func TestLoadRespectsExplicitlyDisabledStreaming(t *testing.T) { tmp := t.TempDir() path := filepath.Join(tmp, "config.toml") - // User explicitly opted out of webrtc + transcode. Defaults must NOT - // override them — that would silently re-enable features the user disabled. - os.WriteFile(path, []byte(`[downloads.webrtc] -enabled = false - -[downloads.transcode] + // User explicitly opted out of transcode. Defaults must NOT override + // it — that would silently re-enable a feature the user disabled. + os.WriteFile(path, []byte(`[downloads.transcode] enabled = false `), 0o644) @@ -252,9 +238,6 @@ enabled = false t.Fatalf("Load failed: %v", err) } - if cfg.Download.WebRTC.Enabled { - t.Error("WebRTC.Enabled = true, want false (user explicitly disabled)") - } if cfg.Download.Transcode.Enabled { t.Error("Transcode.Enabled = true, want false (user explicitly disabled)") } diff --git a/internal/engine/hls.go b/internal/engine/hls.go index cc0b442..9524627 100644 --- a/internal/engine/hls.go +++ b/internal/engine/hls.go @@ -3,9 +3,7 @@ // Browser ↔ daemon over plain HTTP (LAN / Tailscale / UPnP). The daemon runs // ffmpeg in `-f hls` mode, writing fragmented MP4 segments to a per-session // tmpdir. Master + media playlists are pre-rendered from the probed source -// duration so the player knows the full timeline before any segment exists, -// which fixes the seek/duration/pause/multi-track problems we hit with the -// raw fMP4-over-WebRTC pipeline. +// duration so the player knows the full timeline before any segment exists. // // One HLSSession == one browser playback. Sessions are registered in a // process-wide map keyed by session ID; the StreamServer routes diff --git a/internal/engine/probe.go b/internal/engine/probe.go index 39ff374..930b669 100644 --- a/internal/engine/probe.go +++ b/internal/engine/probe.go @@ -9,7 +9,7 @@ import ( ) // StreamProbe summarises the codec / container shape of a file as it relates -// to the WebRTC streaming pipeline. It tells the transcoder whether bytes can +// to the HLS streaming pipeline. It tells the transcoder whether bytes can // be streamed as-is, just remuxed to fragmented MP4, or fully transcoded. type StreamProbe struct { // VideoCodec lowercased — e.g. "h264", "hevc", "av1", "vp9", "mpeg4". diff --git a/internal/engine/seed_file.go b/internal/engine/seed_file.go deleted file mode 100644 index 7d9a046..0000000 --- a/internal/engine/seed_file.go +++ /dev/null @@ -1,138 +0,0 @@ -package engine - -import ( - "errors" - "fmt" - "os" - "path/filepath" - "time" - - "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/bencode" - "github.com/anacrolix/torrent/metainfo" -) - -// SeedFile builds a single-file torrent from an arbitrary on-disk file -// and adds it to an existing torrent client so the WebRTC peer wire -// (already configured on the client) can serve the file to a browser -// that knows the resulting info-hash. -// -// Returns the generated info-hash. The torrent is left attached to the -// client — caller is responsible for keeping it alive while a browser -// is watching. Drop it via Client.RemoveTorrent / Torrent.Drop when -// idle to free resources. -// -// Behaviour notes: -// - The file must already exist; no download is attempted. -// - Piece length follows the libtorrent ladder (16 KiB → 4 MiB). -// - The torrent is "complete" from the agent's POV — it has every -// piece — so the upload-only flow kicks in immediately. -// - WebRTC peer behaviour comes from the client config the caller -// constructed; SeedFile does not toggle DisableWebtorrent itself. -// If the operator's [downloads.webrtc].enabled = false, the file -// is still added but no browser will discover it via WSS tracker. -func SeedFile(client *torrent.Client, filePath string, trackerURLs []string) (metainfo.Hash, error) { - if client == nil { - return metainfo.Hash{}, errors.New("seed_file: torrent client is nil") - } - if filePath == "" { - return metainfo.Hash{}, errors.New("seed_file: filePath is empty") - } - - abs, err := filepath.Abs(filePath) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: resolve path: %w", err) - } - st, err := os.Stat(abs) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: stat: %w", err) - } - if st.IsDir() { - return metainfo.Hash{}, fmt.Errorf("seed_file: only single files are supported, %s is a directory", abs) - } - - info := metainfo.Info{ - PieceLength: chooseSeedPieceLength(st.Size()), - Name: filepath.Base(abs), - } - if err := info.BuildFromFilePath(abs); err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: build info: %w", err) - } - infoBytes, err := bencode.Marshal(info) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: marshal info: %w", err) - } - - mi := &metainfo.MetaInfo{ - InfoBytes: infoBytes, - AnnounceList: makeAnnounceList(trackerURLs), - CreatedBy: "unarr-seed-file", - CreationDate: time.Now().Unix(), - } - ih := mi.HashInfoBytes() - - t, err := client.AddTorrent(mi) - if err != nil { - return metainfo.Hash{}, fmt.Errorf("seed_file: add torrent: %w", err) - } - // Mark every piece as needed so the client treats us as a complete - // seeder right away — anacrolix's verifier will hash the file - // asynchronously and flip pieces to "have" as it goes. - t.DownloadAll() - - return ih, nil -} - -// makeAnnounceList shapes the tracker URL slice into the bencoded -// AnnounceList format anacrolix expects. -func makeAnnounceList(urls []string) metainfo.AnnounceList { - if len(urls) == 0 { - return nil - } - tier := make([]string, 0, len(urls)) - for _, u := range urls { - if u == "" { - continue - } - tier = append(tier, u) - } - if len(tier) == 0 { - return nil - } - return metainfo.AnnounceList{tier} -} - -// chooseSeedPieceLength picks the piece size for a single-file torrent -// based on the libtorrent / qBittorrent ladder. Mirrored from the -// wstracker-probe seeder so generated torrents are interoperable. -func chooseSeedPieceLength(size int64) int64 { - switch { - case size < 4*1024*1024: - return 16 * 1024 - case size < 64*1024*1024: - return 64 * 1024 - case size < 512*1024*1024: - return 256 * 1024 - case size < 4*1024*1024*1024: - return 1024 * 1024 - default: - return 4 * 1024 * 1024 - } -} - -// SeedFileOnDownloader is a convenience wrapper that pulls the -// underlying anacrolix client out of a TorrentDownloader and forwards -// to SeedFile. trackerURLs default to the downloader's WebRTC -// trackers when nil/empty. -func SeedFileOnDownloader(d *TorrentDownloader, filePath string) (metainfo.Hash, error) { - if d == nil { - return metainfo.Hash{}, errors.New("seed_file: downloader is nil") - } - trackers := d.cfg.WebRTCTrackers - if !d.cfg.WebRTCEnabled { - // We could still build the torrent, but no browser would find - // it via the WSS tracker — bail loud so the operator notices. - return metainfo.Hash{}, errors.New("seed_file: WebRTC peer disabled in config; set [downloads.webrtc].enabled = true to use this feature") - } - return SeedFile(d.client, filePath, trackers) -} diff --git a/internal/engine/seed_file_test.go b/internal/engine/seed_file_test.go deleted file mode 100644 index 1c0f616..0000000 --- a/internal/engine/seed_file_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package engine - -import ( - "context" - "os" - "path/filepath" - "testing" -) - -// TestSeedFile_RejectsMissingFile — explicit error rather than crashing -// inside anacrolix when the path doesn't exist. -func TestSeedFile_RejectsMissingFile(t *testing.T) { - dir := t.TempDir() - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: dir, - ListenPort: 0, - WebRTCEnabled: true, - WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - defer dl.Shutdown(context.Background()) - - if _, err := SeedFile(dl.client, "/nonexistent/path", nil); err == nil { - t.Fatal("expected error for missing file") - } -} - -// TestSeedFile_RejectsDirectory — single-file torrents only for now. -func TestSeedFile_RejectsDirectory(t *testing.T) { - dir := t.TempDir() - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: dir, - ListenPort: 0, - WebRTCEnabled: true, - WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - defer dl.Shutdown(context.Background()) - - subDir := filepath.Join(dir, "sub") - if err := os.Mkdir(subDir, 0o755); err != nil { - t.Fatalf("mkdir: %v", err) - } - - if _, err := SeedFile(dl.client, subDir, nil); err == nil { - t.Fatal("expected error for directory path") - } -} - -// TestSeedFile_BuildsDeterministicInfoHash — the same file should yield -// the same info_hash on every call so the web client can poll for it. -func TestSeedFile_BuildsDeterministicInfoHash(t *testing.T) { - dir := t.TempDir() - file := filepath.Join(dir, "data.bin") - payload := []byte("hello world — torrentclaw seed_file test") - if err := os.WriteFile(file, payload, 0o644); err != nil { - t.Fatalf("write file: %v", err) - } - - mkClient := func() *TorrentDownloader { - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: t.TempDir(), - ListenPort: 0, - WebRTCEnabled: true, - WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - return dl - } - - dl1 := mkClient() - defer dl1.Shutdown(context.Background()) - hash1, err := SeedFile(dl1.client, file, []string{"wss://tracker.torrentclaw.com"}) - if err != nil { - t.Fatalf("first SeedFile: %v", err) - } - - dl2 := mkClient() - defer dl2.Shutdown(context.Background()) - hash2, err := SeedFile(dl2.client, file, []string{"wss://tracker.torrentclaw.com"}) - if err != nil { - t.Fatalf("second SeedFile: %v", err) - } - - if hash1 != hash2 { - t.Fatalf("info_hash not deterministic: %s vs %s", hash1.HexString(), hash2.HexString()) - } - if hash1.HexString() == "" || len(hash1.HexString()) != 40 { - t.Fatalf("info_hash is not 40 hex chars: %q", hash1.HexString()) - } -} - -// TestSeedFileOnDownloader_RequiresWebRTC — silent failure mode is the -// worst UX; bail loud when the operator hasn't opted into WebRTC. -func TestSeedFileOnDownloader_RequiresWebRTC(t *testing.T) { - dir := t.TempDir() - dl, err := NewTorrentDownloader(TorrentConfig{ - DataDir: dir, - ListenPort: 0, - WebRTCEnabled: false, - }) - if err != nil { - t.Fatalf("NewTorrentDownloader: %v", err) - } - defer dl.Shutdown(context.Background()) - - file := filepath.Join(dir, "data.bin") - if err := os.WriteFile(file, []byte("x"), 0o644); err != nil { - t.Fatalf("write file: %v", err) - } - - if _, err := SeedFileOnDownloader(dl, file); err == nil { - t.Fatal("expected error when WebRTC disabled") - } -} - -// TestChooseSeedPieceLength_LadderShape — sanity-check the breakpoints -// stay aligned with the libtorrent reference (16 KiB → 4 MiB). -func TestChooseSeedPieceLength_LadderShape(t *testing.T) { - cases := []struct { - size int64 - expect int64 - }{ - {1, 16 * 1024}, - {4 * 1024 * 1024, 64 * 1024}, - {64 * 1024 * 1024, 256 * 1024}, - {512 * 1024 * 1024, 1024 * 1024}, - {4 * 1024 * 1024 * 1024, 4 * 1024 * 1024}, - } - for _, c := range cases { - if got := chooseSeedPieceLength(c.size); got != c.expect { - t.Errorf("chooseSeedPieceLength(%d) = %d want %d", c.size, got, c.expect) - } - } -} - -// TestMakeAnnounceList_HandlesEmpty — nil/empty in → nil out, so -// AddTorrent doesn't see a dangling tier with no URLs. -func TestMakeAnnounceList_HandlesEmpty(t *testing.T) { - if got := makeAnnounceList(nil); got != nil { - t.Errorf("nil input should yield nil announce list, got %+v", got) - } - if got := makeAnnounceList([]string{}); got != nil { - t.Errorf("empty input should yield nil announce list, got %+v", got) - } - if got := makeAnnounceList([]string{"", " ", ""}); got != nil { - // Empty strings should be filtered; if everything is empty, - // nil is the right answer. - // (We do NOT trim whitespace today — only literal "".) - if len(got) != 1 || len(got[0]) != 1 { - t.Errorf("expected 1 single-element tier, got %+v", got) - } - } - got := makeAnnounceList([]string{"wss://a", "", "wss://b"}) - if len(got) != 1 || len(got[0]) != 2 { - t.Fatalf("expected 1 tier of 2 URLs, got %+v", got) - } -} diff --git a/internal/engine/stream_source.go b/internal/engine/stream_source.go index 2dc1d3c..b418e61 100644 --- a/internal/engine/stream_source.go +++ b/internal/engine/stream_source.go @@ -12,7 +12,7 @@ import ( "time" ) -// streamSource abstracts the byte source served over the WebRTC DataChannel. +// streamSource abstracts the byte source consumed by the HLS transcoder. // Two implementations: // - diskFileSource — direct passthrough of the on-disk file. // - transcodeSource — ffmpeg writes a fragmented MP4 to a temp file in diff --git a/internal/engine/torrent.go b/internal/engine/torrent.go index 445f317..f4b1b6d 100644 --- a/internal/engine/torrent.go +++ b/internal/engine/torrent.go @@ -16,7 +16,6 @@ import ( alog "github.com/anacrolix/log" "github.com/anacrolix/torrent" "github.com/anacrolix/torrent/storage" - "github.com/pion/webrtc/v4" "github.com/torrentclaw/unarr/internal/config" "github.com/torrentclaw/unarr/internal/vpn" "golang.org/x/term" @@ -73,14 +72,6 @@ type TorrentConfig struct { SeedRatio float64 // target seed ratio (default 0, meaning seed until SeedTime) SeedTime time.Duration // min seed time after completion (default 0) - // WebRTC peer (WebTorrent protocol) for browser ↔ unarr P2P streaming. - // When enabled, anacrolix/torrent's built-in webtorrent package handles - // the WSS signaling + WebRTC data channels. Implies upload allowed for - // every torrent in the client (browsers can't pull pieces otherwise). - WebRTCEnabled bool - WebRTCTrackers []string // wss://… signaling trackers added to every magnet - ICEServers []webrtc.ICEServer // STUN + TURN servers for NAT traversal - // VPNTunnel, when set, split-tunnels the torrent client's peer + tracker // traffic through an in-process userspace WireGuard tunnel (managed-VPN // add-on). nil = downloads in the clear. Brought up by the daemon. @@ -111,26 +102,11 @@ func NewTorrentDownloader(cfg TorrentConfig) (*TorrentDownloader, error) { tcfg := torrent.NewDefaultClientConfig() tcfg.DataDir = cfg.DataDir tcfg.Seed = cfg.SeedEnabled - // WebRTC peers (browsers) can only pull pieces from us if upload is - // enabled. We honour SeedEnabled for the long-tail seed-after-complete - // behaviour but unconditionally allow upload while WebRTC is on so an - // active download can still serve to a watching browser. - tcfg.NoUpload = !cfg.SeedEnabled && !cfg.WebRTCEnabled - tcfg.Logger = alog.Default.FilterLevel(alog.Warning) // bumped from Critical for WebRTC peer + tracker announce visibility + tcfg.NoUpload = !cfg.SeedEnabled + tcfg.Logger = alog.Default.FilterLevel(alog.Warning) - // WebRTC / WebTorrent peer: anacrolix auto-routes ws://+wss:// trackers - // to the bundled webtorrent.TrackerClient. We only need to populate the - // ICE server list so the SDP offers we send carry usable candidates. - if cfg.WebRTCEnabled { - tcfg.DisableWebtorrent = false - if len(cfg.ICEServers) > 0 { - tcfg.ICEServerList = cfg.ICEServers - } - log.Printf("[torrent] WebRTC peer enabled (trackers=%d ice_servers=%d)", - len(cfg.WebRTCTrackers), len(cfg.ICEServers)) - } else { - tcfg.DisableWebtorrent = true - } + // No browser-facing WebTorrent peer; daemon never seeds via WSS. + tcfg.DisableWebtorrent = true // --- Performance optimizations --- @@ -657,30 +633,17 @@ func (d *TorrentDownloader) selectFiles(t *torrent.Torrent, taskID string) (tota return totalBytes, fileName } -// buildMagnet composes a magnet URI for the info hash. extraTrackers (e.g. -// wss://… for WebRTC peer signaling) are prepended so anacrolix's -// webtorrent.TrackerClient picks them up first; the static UDP list -// follows. Empty / whitespace entries in extraTrackers are skipped. -func buildMagnet(infoHash string, extraTrackers ...string) string { +// buildMagnet composes a magnet URI for the info hash with the static +// tracker list. +func buildMagnet(infoHash string) string { params := []string{"xt=urn:btih:" + infoHash} - for _, t := range extraTrackers { - t = strings.TrimSpace(t) - if t == "" { - continue - } - params = append(params, "tr="+url.QueryEscape(t)) - } for _, tracker := range defaultTrackers { params = append(params, "tr="+url.QueryEscape(tracker)) } return "magnet:?" + strings.Join(params, "&") } -// buildMagnet on the downloader injects its WebRTC trackers when enabled. func (d *TorrentDownloader) buildMagnet(infoHash string) string { - if d != nil && d.cfg.WebRTCEnabled { - return buildMagnet(infoHash, d.cfg.WebRTCTrackers...) - } return buildMagnet(infoHash) } diff --git a/internal/engine/transcode_quality.go b/internal/engine/transcode_quality.go new file mode 100644 index 0000000..4efda59 --- /dev/null +++ b/internal/engine/transcode_quality.go @@ -0,0 +1,64 @@ +package engine + +// TranscodeRuntime carries the resolved ffmpeg/ffprobe paths + tunables so +// each session can decide whether to passthrough or pipe through ffmpeg. +type TranscodeRuntime struct { + FFmpegPath string + FFprobePath string + HWAccel HWAccel + Preset string + VideoBitrate string + AudioBitrate string + MaxHeight int + // Disabled forces passthrough for every file even when codecs are not + // browser-friendly. Useful when the user explicitly turns transcoding + // off in config. + Disabled bool +} + +// qualityCap maps a session's Quality label to a (MaxHeight, VideoBitrate) +// pair. An empty label or "original" returns zero-values, signalling "no +// override" to the caller. +type qualityCap struct { + MaxHeight int + VideoBitrate string // ffmpeg -b:v string, e.g. "3500k" +} + +func resolveQualityCap(label string) qualityCap { + switch label { + case "2160p": + return qualityCap{MaxHeight: 2160, VideoBitrate: "25000k"} + case "1080p": + return qualityCap{MaxHeight: 1080, VideoBitrate: "6000k"} + case "720p": + return qualityCap{MaxHeight: 720, VideoBitrate: "3500k"} + case "480p": + return qualityCap{MaxHeight: 480, VideoBitrate: "1500k"} + default: + // "original", "auto", "" → defer to config. + return qualityCap{} + } +} + +// capForHeight returns the bitrate-cap pair appropriate for an effective +// output height. Used after clamping outputHeight to the source's resolution: +// asking ffmpeg for "2160p" bitrate (25 Mbps) on a 1080p source overshoots +// the H.264 level we derived from the EFFECTIVE height (4.0, max 20 Mbps) and +// makes libx264 refuse with "VBV bitrate > level limit". This helper picks +// the bitrate that matches the level libx264 will actually accept. +func capForHeight(height int) qualityCap { + switch { + case height <= 0: + return qualityCap{} + case height <= 480: + return qualityCap{MaxHeight: 480, VideoBitrate: "1500k"} + case height <= 720: + return qualityCap{MaxHeight: 720, VideoBitrate: "3500k"} + case height <= 1080: + return qualityCap{MaxHeight: 1080, VideoBitrate: "6000k"} + case height <= 1440: + return qualityCap{MaxHeight: 1440, VideoBitrate: "12000k"} + default: + return qualityCap{MaxHeight: 2160, VideoBitrate: "25000k"} + } +} diff --git a/internal/engine/transcoder.go b/internal/engine/transcoder.go index 9ea37cc..030c28c 100644 --- a/internal/engine/transcoder.go +++ b/internal/engine/transcoder.go @@ -11,10 +11,9 @@ import ( "time" ) -// TranscodeOpts steers how Transcoder builds its ffmpeg command line. Defaults -// match the project's plan/clever-weaving-dove.md (Fase 2.5): +// TranscodeOpts steers how Transcoder builds its ffmpeg command line. // -// - Output: fragmented MP4 readable by browser