diff --git a/internal/agent/types.go b/internal/agent/types.go index 16ba92a..eb88385 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -72,6 +72,12 @@ type Task struct { Episode *int `json:"episode,omitempty"` // Episode number ContentYear *int `json:"contentYear,omitempty"` // Year from TMDB (avoids regex on torrent title) CollectionName string `json:"collectionName,omitempty"` // Collection name (e.g., "Harry Potter Collection") + + // FilePath is the on-disk path of the file the agent is being asked + // to operate on. Currently used by mode=seed_file to know which + // arbitrary file to wrap as a single-file torrent for browser + // streaming; populated by the server from libraryItem.filePath. + FilePath string `json:"filePath,omitempty"` } // StreamRequest is a request to stream a completed download from disk. @@ -95,6 +101,9 @@ type StatusUpdate struct { StreamURL string `json:"streamUrl,omitempty"` StreamReady bool `json:"streamReady,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` + // mode=seed_file: agent computes the info_hash from the local file + // and reports it back so the web player can target /stream/. + InfoHash string `json:"infoHash,omitempty"` } // StatusResponse is returned by the status endpoint. diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 46059fd..9bdb714 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -243,7 +243,13 @@ func runDaemonStart() error { // Wire: sync receives new tasks → submit to manager or handle stream d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { - if t.Mode == "stream" { + if t.Mode == "seed_file" { + // Browser asked us to wrap an arbitrary on-disk file as + // a single-file torrent + seed it via WebRTC. Runs in + // its own goroutine so a slow / failing seed can't + // stall the rest of the claim batch. + go handleSeedFileTask(t, torrentDl, agentClient) + } else if t.Mode == "stream" { if isStreamingTask(t.ID) { continue } diff --git a/internal/cmd/seed_file_handler.go b/internal/cmd/seed_file_handler.go new file mode 100644 index 0000000..fe2438a --- /dev/null +++ b/internal/cmd/seed_file_handler.go @@ -0,0 +1,65 @@ +package cmd + +import ( + "context" + "log" + "time" + + "github.com/torrentclaw/unarr/internal/agent" + "github.com/torrentclaw/unarr/internal/engine" +) + +// handleSeedFileTask wraps an arbitrary on-disk file as a single-file +// torrent and adds it to the existing torrent client so the WebRTC +// peer can serve pieces to a browser. Reports the generated info_hash +// back to the server so the web player can target /stream/. +// +// Runs in its own goroutine; never blocks the claim batch. +func handleSeedFileTask(t agent.Task, dl *engine.TorrentDownloader, client *agent.Client) { + short := agent.ShortID(t.ID) + + if t.FilePath == "" { + log.Printf("[%s] seed_file: missing filePath, marking failed", short) + reportSeedFileFailed(client, t.ID, "Missing filePath") + return + } + + log.Printf("[%s] seed_file: building torrent from %s", short, t.FilePath) + hash, err := engine.SeedFileOnDownloader(dl, t.FilePath) + if err != nil { + log.Printf("[%s] seed_file: %v", short, err) + reportSeedFileFailed(client, t.ID, err.Error()) + return + } + + infoHash := hash.HexString() + log.Printf("[%s] seed_file: seeding ih=%s", short, infoHash) + + // Push the info_hash + downloading status (file is on disk; from the + // client's perspective it's already complete). The web side polls + // /api/internal/stream/seed-file/ waiting for this update. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, reportErr := client.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: t.ID, + Status: "downloading", // semantic: actively serving + InfoHash: infoHash, + FilePath: t.FilePath, + }) + if reportErr != nil { + log.Printf("[%s] seed_file: failed to push info_hash: %v", short, reportErr) + } +} + +func reportSeedFileFailed(client *agent.Client, taskID, msg string) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + _, err := client.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: taskID, + Status: "failed", + ErrorMessage: msg, + }) + if err != nil { + log.Printf("[%s] seed_file: report-failed itself failed: %v", agent.ShortID(taskID), err) + } +} diff --git a/internal/engine/seed_file.go b/internal/engine/seed_file.go new file mode 100644 index 0000000..7d9a046 --- /dev/null +++ b/internal/engine/seed_file.go @@ -0,0 +1,138 @@ +package engine + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/anacrolix/torrent" + "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/metainfo" +) + +// SeedFile builds a single-file torrent from an arbitrary on-disk file +// and adds it to an existing torrent client so the WebRTC peer wire +// (already configured on the client) can serve the file to a browser +// that knows the resulting info-hash. +// +// Returns the generated info-hash. The torrent is left attached to the +// client — caller is responsible for keeping it alive while a browser +// is watching. Drop it via Client.RemoveTorrent / Torrent.Drop when +// idle to free resources. +// +// Behaviour notes: +// - The file must already exist; no download is attempted. +// - Piece length follows the libtorrent ladder (16 KiB → 4 MiB). +// - The torrent is "complete" from the agent's POV — it has every +// piece — so the upload-only flow kicks in immediately. +// - WebRTC peer behaviour comes from the client config the caller +// constructed; SeedFile does not toggle DisableWebtorrent itself. +// If the operator's [downloads.webrtc].enabled = false, the file +// is still added but no browser will discover it via WSS tracker. +func SeedFile(client *torrent.Client, filePath string, trackerURLs []string) (metainfo.Hash, error) { + if client == nil { + return metainfo.Hash{}, errors.New("seed_file: torrent client is nil") + } + if filePath == "" { + return metainfo.Hash{}, errors.New("seed_file: filePath is empty") + } + + abs, err := filepath.Abs(filePath) + if err != nil { + return metainfo.Hash{}, fmt.Errorf("seed_file: resolve path: %w", err) + } + st, err := os.Stat(abs) + if err != nil { + return metainfo.Hash{}, fmt.Errorf("seed_file: stat: %w", err) + } + if st.IsDir() { + return metainfo.Hash{}, fmt.Errorf("seed_file: only single files are supported, %s is a directory", abs) + } + + info := metainfo.Info{ + PieceLength: chooseSeedPieceLength(st.Size()), + Name: filepath.Base(abs), + } + if err := info.BuildFromFilePath(abs); err != nil { + return metainfo.Hash{}, fmt.Errorf("seed_file: build info: %w", err) + } + infoBytes, err := bencode.Marshal(info) + if err != nil { + return metainfo.Hash{}, fmt.Errorf("seed_file: marshal info: %w", err) + } + + mi := &metainfo.MetaInfo{ + InfoBytes: infoBytes, + AnnounceList: makeAnnounceList(trackerURLs), + CreatedBy: "unarr-seed-file", + CreationDate: time.Now().Unix(), + } + ih := mi.HashInfoBytes() + + t, err := client.AddTorrent(mi) + if err != nil { + return metainfo.Hash{}, fmt.Errorf("seed_file: add torrent: %w", err) + } + // Mark every piece as needed so the client treats us as a complete + // seeder right away — anacrolix's verifier will hash the file + // asynchronously and flip pieces to "have" as it goes. + t.DownloadAll() + + return ih, nil +} + +// makeAnnounceList shapes the tracker URL slice into the bencoded +// AnnounceList format anacrolix expects. +func makeAnnounceList(urls []string) metainfo.AnnounceList { + if len(urls) == 0 { + return nil + } + tier := make([]string, 0, len(urls)) + for _, u := range urls { + if u == "" { + continue + } + tier = append(tier, u) + } + if len(tier) == 0 { + return nil + } + return metainfo.AnnounceList{tier} +} + +// chooseSeedPieceLength picks the piece size for a single-file torrent +// based on the libtorrent / qBittorrent ladder. Mirrored from the +// wstracker-probe seeder so generated torrents are interoperable. +func chooseSeedPieceLength(size int64) int64 { + switch { + case size < 4*1024*1024: + return 16 * 1024 + case size < 64*1024*1024: + return 64 * 1024 + case size < 512*1024*1024: + return 256 * 1024 + case size < 4*1024*1024*1024: + return 1024 * 1024 + default: + return 4 * 1024 * 1024 + } +} + +// SeedFileOnDownloader is a convenience wrapper that pulls the +// underlying anacrolix client out of a TorrentDownloader and forwards +// to SeedFile. trackerURLs default to the downloader's WebRTC +// trackers when nil/empty. +func SeedFileOnDownloader(d *TorrentDownloader, filePath string) (metainfo.Hash, error) { + if d == nil { + return metainfo.Hash{}, errors.New("seed_file: downloader is nil") + } + trackers := d.cfg.WebRTCTrackers + if !d.cfg.WebRTCEnabled { + // We could still build the torrent, but no browser would find + // it via the WSS tracker — bail loud so the operator notices. + return metainfo.Hash{}, errors.New("seed_file: WebRTC peer disabled in config; set [downloads.webrtc].enabled = true to use this feature") + } + return SeedFile(d.client, filePath, trackers) +} diff --git a/internal/engine/seed_file_test.go b/internal/engine/seed_file_test.go new file mode 100644 index 0000000..1c0f616 --- /dev/null +++ b/internal/engine/seed_file_test.go @@ -0,0 +1,164 @@ +package engine + +import ( + "context" + "os" + "path/filepath" + "testing" +) + +// TestSeedFile_RejectsMissingFile — explicit error rather than crashing +// inside anacrolix when the path doesn't exist. +func TestSeedFile_RejectsMissingFile(t *testing.T) { + dir := t.TempDir() + dl, err := NewTorrentDownloader(TorrentConfig{ + DataDir: dir, + ListenPort: 0, + WebRTCEnabled: true, + WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, + }) + if err != nil { + t.Fatalf("NewTorrentDownloader: %v", err) + } + defer dl.Shutdown(context.Background()) + + if _, err := SeedFile(dl.client, "/nonexistent/path", nil); err == nil { + t.Fatal("expected error for missing file") + } +} + +// TestSeedFile_RejectsDirectory — single-file torrents only for now. +func TestSeedFile_RejectsDirectory(t *testing.T) { + dir := t.TempDir() + dl, err := NewTorrentDownloader(TorrentConfig{ + DataDir: dir, + ListenPort: 0, + WebRTCEnabled: true, + WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, + }) + if err != nil { + t.Fatalf("NewTorrentDownloader: %v", err) + } + defer dl.Shutdown(context.Background()) + + subDir := filepath.Join(dir, "sub") + if err := os.Mkdir(subDir, 0o755); err != nil { + t.Fatalf("mkdir: %v", err) + } + + if _, err := SeedFile(dl.client, subDir, nil); err == nil { + t.Fatal("expected error for directory path") + } +} + +// TestSeedFile_BuildsDeterministicInfoHash — the same file should yield +// the same info_hash on every call so the web client can poll for it. +func TestSeedFile_BuildsDeterministicInfoHash(t *testing.T) { + dir := t.TempDir() + file := filepath.Join(dir, "data.bin") + payload := []byte("hello world — torrentclaw seed_file test") + if err := os.WriteFile(file, payload, 0o644); err != nil { + t.Fatalf("write file: %v", err) + } + + mkClient := func() *TorrentDownloader { + dl, err := NewTorrentDownloader(TorrentConfig{ + DataDir: t.TempDir(), + ListenPort: 0, + WebRTCEnabled: true, + WebRTCTrackers: []string{"wss://tracker.torrentclaw.com"}, + }) + if err != nil { + t.Fatalf("NewTorrentDownloader: %v", err) + } + return dl + } + + dl1 := mkClient() + defer dl1.Shutdown(context.Background()) + hash1, err := SeedFile(dl1.client, file, []string{"wss://tracker.torrentclaw.com"}) + if err != nil { + t.Fatalf("first SeedFile: %v", err) + } + + dl2 := mkClient() + defer dl2.Shutdown(context.Background()) + hash2, err := SeedFile(dl2.client, file, []string{"wss://tracker.torrentclaw.com"}) + if err != nil { + t.Fatalf("second SeedFile: %v", err) + } + + if hash1 != hash2 { + t.Fatalf("info_hash not deterministic: %s vs %s", hash1.HexString(), hash2.HexString()) + } + if hash1.HexString() == "" || len(hash1.HexString()) != 40 { + t.Fatalf("info_hash is not 40 hex chars: %q", hash1.HexString()) + } +} + +// TestSeedFileOnDownloader_RequiresWebRTC — silent failure mode is the +// worst UX; bail loud when the operator hasn't opted into WebRTC. +func TestSeedFileOnDownloader_RequiresWebRTC(t *testing.T) { + dir := t.TempDir() + dl, err := NewTorrentDownloader(TorrentConfig{ + DataDir: dir, + ListenPort: 0, + WebRTCEnabled: false, + }) + if err != nil { + t.Fatalf("NewTorrentDownloader: %v", err) + } + defer dl.Shutdown(context.Background()) + + file := filepath.Join(dir, "data.bin") + if err := os.WriteFile(file, []byte("x"), 0o644); err != nil { + t.Fatalf("write file: %v", err) + } + + if _, err := SeedFileOnDownloader(dl, file); err == nil { + t.Fatal("expected error when WebRTC disabled") + } +} + +// TestChooseSeedPieceLength_LadderShape — sanity-check the breakpoints +// stay aligned with the libtorrent reference (16 KiB → 4 MiB). +func TestChooseSeedPieceLength_LadderShape(t *testing.T) { + cases := []struct { + size int64 + expect int64 + }{ + {1, 16 * 1024}, + {4 * 1024 * 1024, 64 * 1024}, + {64 * 1024 * 1024, 256 * 1024}, + {512 * 1024 * 1024, 1024 * 1024}, + {4 * 1024 * 1024 * 1024, 4 * 1024 * 1024}, + } + for _, c := range cases { + if got := chooseSeedPieceLength(c.size); got != c.expect { + t.Errorf("chooseSeedPieceLength(%d) = %d want %d", c.size, got, c.expect) + } + } +} + +// TestMakeAnnounceList_HandlesEmpty — nil/empty in → nil out, so +// AddTorrent doesn't see a dangling tier with no URLs. +func TestMakeAnnounceList_HandlesEmpty(t *testing.T) { + if got := makeAnnounceList(nil); got != nil { + t.Errorf("nil input should yield nil announce list, got %+v", got) + } + if got := makeAnnounceList([]string{}); got != nil { + t.Errorf("empty input should yield nil announce list, got %+v", got) + } + if got := makeAnnounceList([]string{"", " ", ""}); got != nil { + // Empty strings should be filtered; if everything is empty, + // nil is the right answer. + // (We do NOT trim whitespace today — only literal "".) + if len(got) != 1 || len(got[0]) != 1 { + t.Errorf("expected 1 single-element tier, got %+v", got) + } + } + got := makeAnnounceList([]string{"wss://a", "", "wss://b"}) + if len(got) != 1 || len(got[0]) != 2 { + t.Fatalf("expected 1 tier of 2 URLs, got %+v", got) + } +}