diff --git a/internal/agent/mirror_client.go b/internal/agent/mirror_client.go index 683b92b..5364be0 100644 --- a/internal/agent/mirror_client.go +++ b/internal/agent/mirror_client.go @@ -13,7 +13,7 @@ import ( type MirrorEntry struct { URL string `json:"url"` Label string `json:"label"` - Kind string `json:"kind"` // "clearnet" | "tor" + Kind string `json:"kind"` // "clearnet" | "tor" Primary bool `json:"primary"` } diff --git a/internal/agent/sync.go b/internal/agent/sync.go index 6a13cf9..0ea5bd3 100644 --- a/internal/agent/sync.go +++ b/internal/agent/sync.go @@ -66,6 +66,12 @@ type SyncClient struct { // It should delete the files and return the IDs of successfully deleted items. OnDeleteFiles func(items []LibraryDeleteRequest) []int + // OnSubtitleFetch is called when the server requests on-demand subtitle + // downloads. It should download each (from req.URL, already VTT), write a + // sidecar next to req.FilePath, and return the IDs successfully fetched plus + // the ones that failed (so the web can mark them errored). + OnSubtitleFetch func(reqs []SubtitleFetchRequest) ([]int, []SubtitleFetchError) + // OnRevoked is called when a sync is rejected because this agent's credential // was revoked (the user deleted the agent from the dashboard). The daemon // wires this to wipe the stored key + stop — it must NOT keep retrying or the @@ -89,6 +95,11 @@ type SyncClient struct { // deleteInFlight tracks item IDs currently being processed or awaiting confirmation. // Prevents the same file from being passed to OnDeleteFiles multiple times. deleteInFlight map[int]struct{} + + // Subtitle-fetch jobs awaiting confirmation + dedup (guarded by pendingDeleteMu). + pendingSubtitlesFetched []int + pendingSubtitlesFailed []SubtitleFetchError + subtitleInFlight map[int]struct{} } // NewSyncClient creates a sync client. @@ -218,6 +229,20 @@ func (sc *SyncClient) buildRequest() SyncRequest { } sc.pendingDeleteConfirmed = nil } + if len(sc.pendingSubtitlesFetched) > 0 { + req.SubtitlesFetched = sc.pendingSubtitlesFetched + for _, id := range sc.pendingSubtitlesFetched { + delete(sc.subtitleInFlight, id) + } + sc.pendingSubtitlesFetched = nil + } + if len(sc.pendingSubtitlesFailed) > 0 { + req.SubtitlesFailed = sc.pendingSubtitlesFailed + for _, f := range sc.pendingSubtitlesFailed { + delete(sc.subtitleInFlight, f.ID) + } + sc.pendingSubtitlesFailed = nil + } sc.pendingDeleteMu.Unlock() return req } @@ -289,6 +314,37 @@ func (sc *SyncClient) processResponse(resp *SyncResponse) { }(newItems) } } + + // On-demand subtitle fetches — dedup against in-flight, run off the sync + // goroutine (network + disk I/O), confirm on the next cycle. + if len(resp.SubtitleFetches) > 0 && sc.OnSubtitleFetch != nil { + sc.pendingDeleteMu.Lock() + if sc.subtitleInFlight == nil { + sc.subtitleInFlight = make(map[int]struct{}) + } + var newReqs []SubtitleFetchRequest + for _, r := range resp.SubtitleFetches { + if _, inFlight := sc.subtitleInFlight[r.ID]; !inFlight { + newReqs = append(newReqs, r) + sc.subtitleInFlight[r.ID] = struct{}{} + } + } + sc.pendingDeleteMu.Unlock() + + if len(newReqs) > 0 { + go func(reqs []SubtitleFetchRequest) { + done, failed := sc.OnSubtitleFetch(reqs) + // Both done and failed are reported on the next uplink; buildRequest + // clears them from subtitleInFlight when it flushes them. A failure + // becomes status='error' on the web (no silent infinite retry — the + // user re-requests, which creates a fresh row). + sc.pendingDeleteMu.Lock() + sc.pendingSubtitlesFetched = append(sc.pendingSubtitlesFetched, done...) + sc.pendingSubtitlesFailed = append(sc.pendingSubtitlesFailed, failed...) + sc.pendingDeleteMu.Unlock() + }(newReqs) + } + } } // runWakeListener holds a long-poll connection to /api/internal/agent/wake. diff --git a/internal/agent/types.go b/internal/agent/types.go index d6ba061..f32929e 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -426,6 +426,11 @@ type SyncRequest struct { Tasks []TaskState `json:"tasks"` CanDelete bool `json:"canDelete"` // library.allow_delete is enabled DeleteConfirmed []int `json:"deleteConfirmed,omitempty"` // library item IDs successfully deleted from disk + // Subtitle-fetch job IDs the agent completed (sidecar written to disk). + SubtitlesFetched []int `json:"subtitlesFetched,omitempty"` + // Subtitle-fetch jobs that permanently failed (download/write error) — the web + // marks them errored so the UI fails fast instead of waiting for a timeout. + SubtitlesFailed []SubtitleFetchError `json:"subtitlesFailed,omitempty"` // Live managed-VPN split-tunnel state, sent every sync so the web sees the // WireGuard slot owner update in near-realtime (vs. register, once at startup). // VPNActive has no omitempty: false (tunnel down) must reach the server so it @@ -512,14 +517,31 @@ type StreamSession struct { // SyncResponse is returned by the server with all pending actions for the CLI. type SyncResponse struct { - NewTasks []Task `json:"newTasks,omitempty"` - Controls []ControlAction `json:"controls,omitempty"` - StreamRequests []StreamRequest `json:"streamRequests,omitempty"` - StreamSessions []StreamSession `json:"streamSessions,omitempty"` - Watching bool `json:"watching"` - Upgrade *UpgradeSignal `json:"upgrade,omitempty"` - Scan bool `json:"scan,omitempty"` - FilesToDelete []LibraryDeleteRequest `json:"filesToDelete,omitempty"` + NewTasks []Task `json:"newTasks,omitempty"` + Controls []ControlAction `json:"controls,omitempty"` + StreamRequests []StreamRequest `json:"streamRequests,omitempty"` + StreamSessions []StreamSession `json:"streamSessions,omitempty"` + Watching bool `json:"watching"` + Upgrade *UpgradeSignal `json:"upgrade,omitempty"` + Scan bool `json:"scan,omitempty"` + FilesToDelete []LibraryDeleteRequest `json:"filesToDelete,omitempty"` + SubtitleFetches []SubtitleFetchRequest `json:"subtitleFetches,omitempty"` +} + +// SubtitleFetchRequest is a server-side request to download a subtitle (from our +// proxy URL, already charset-fixed + VTT) and save it as a sidecar next to a +// media file. URL is the absolute /api/internal/subtitles/proxy URL. +type SubtitleFetchRequest struct { + ID int `json:"id"` + FilePath string `json:"filePath"` + Lang string `json:"lang"` + URL string `json:"url"` +} + +// SubtitleFetchError reports a permanently-failed subtitle fetch back to the web. +type SubtitleFetchError struct { + ID int `json:"id"` + Error string `json:"error"` } // --------------------------------------------------------------------------- diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 96efae6..48bc5d9 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -594,6 +594,14 @@ func runDaemonStart() error { } } + // Wire: sync receives on-demand subtitle-fetch jobs (write VTT sidecars). + // Always available (additive, no deletion) as long as we have scan paths. + if len(daemonCfg.ScanPaths) > 0 { + sc.OnSubtitleFetch = func(reqs []agent.SubtitleFetchRequest) ([]int, []agent.SubtitleFetchError) { + return library.FetchSubtitles(reqs, daemonCfg.ScanPaths) + } + } + // Wire: sync receives stream requests for completed downloads d.OnStreamRequested = func(sr agent.StreamRequest) { if streamSrv.CurrentTaskID() == sr.TaskID { diff --git a/internal/config/config.go b/internal/config/config.go index 2d6e664..261cde2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -211,6 +211,21 @@ type LibraryConfig struct { // generation never saturates the machine or the NAS. Default 0.7; 0 falls back // to the default. Linux-only (no load reading elsewhere → unthrottled). PrewarmMaxLoadRatio float64 `toml:"prewarm_max_load_ratio"` + + // On-demand / automatic subtitle fetching from the web (Wyzie aggregator, + // PRO). The web can always push a hot request (library/player button); this + // section only controls SCAN-TIME auto-fetch, which is OFF by default. + Subtitles SubtitlesConfig `toml:"subtitles"` +} + +// SubtitlesConfig controls scan-time subtitle auto-fetch. +type SubtitlesConfig struct { + // AutoFetch: during a library scan, fetch missing subtitles for the preferred + // languages and write them as sidecars. Default false (opt-in). + AutoFetch bool `toml:"auto_fetch"` + // Languages: preferred subtitle languages (ISO 639-1) to ensure exist, in + // priority order, e.g. ["es", "en"]. Empty → auto-fetch does nothing. + Languages []string `toml:"languages"` } // TrickplayConfig controls scan-time trickplay sprite generation. diff --git a/internal/engine/manager_resume_test.go b/internal/engine/manager_resume_test.go index 0a9cd0c..c84cbfd 100644 --- a/internal/engine/manager_resume_test.go +++ b/internal/engine/manager_resume_test.go @@ -16,7 +16,7 @@ type fakePersister struct { tasks map[string]bool } -func newFakePersister() *fakePersister { return &fakePersister{tasks: map[string]bool{}} } +func newFakePersister() *fakePersister { return &fakePersister{tasks: map[string]bool{}} } func (f *fakePersister) Add(t agent.Task) { f.mu.Lock(); f.tasks[t.ID] = true; f.mu.Unlock() } func (f *fakePersister) Remove(id string) { f.mu.Lock(); delete(f.tasks, id); f.mu.Unlock() } func (f *fakePersister) has(id string) bool { f.mu.Lock(); defer f.mu.Unlock(); return f.tasks[id] } diff --git a/internal/engine/readahead_test.go b/internal/engine/readahead_test.go index 2097464..d469ca8 100644 --- a/internal/engine/readahead_test.go +++ b/internal/engine/readahead_test.go @@ -10,10 +10,10 @@ func TestDynamicReadahead(t *testing.T) { }{ {"unknown bitrate → default", 0, defaultReadahead}, {"negative → default", -1, defaultReadahead}, - {"low bitrate clamps to min", 1_000_000, minReadahead}, // 1 Mbps → ~3.75 MiB < 8 MiB - {"mid bitrate scales", 5_000_000, 5_000_000 / 8 * readaheadSeconds}, // 5 Mbps → ~18.75 MiB + {"low bitrate clamps to min", 1_000_000, minReadahead}, // 1 Mbps → ~3.75 MiB < 8 MiB + {"mid bitrate scales", 5_000_000, 5_000_000 / 8 * readaheadSeconds}, // 5 Mbps → ~18.75 MiB {"high bitrate within range", 25_000_000, 25_000_000 / 8 * readaheadSeconds}, // 4K ~25 Mbps → ~93.75 MiB - {"very high clamps to max", 80_000_000, maxReadahead}, // 80 Mbps → 300 MiB > cap + {"very high clamps to max", 80_000_000, maxReadahead}, // 80 Mbps → 300 MiB > cap } for _, c := range cases { t.Run(c.name, func(t *testing.T) { diff --git a/internal/funnel/funnel.go b/internal/funnel/funnel.go index 7f1b76a..bb379c3 100644 --- a/internal/funnel/funnel.go +++ b/internal/funnel/funnel.go @@ -12,9 +12,9 @@ // // Lifecycle: // -// t, err := funnel.Start(ctx, funnel.Config{Port: 11819}) -// defer t.Close() -// url, err := t.WaitURL(30 * time.Second) // blocks until cloudflared emits the URL +// t, err := funnel.Start(ctx, funnel.Config{Port: 11819}) +// defer t.Close() +// url, err := t.WaitURL(30 * time.Second) // blocks until cloudflared emits the URL // // The tunnel runs until the context is cancelled or t.Close() is called. package funnel @@ -200,4 +200,3 @@ func (t *Tunnel) scanStderr(r io.Reader) { } } } - diff --git a/internal/library/subtitles.go b/internal/library/subtitles.go new file mode 100644 index 0000000..0b18b1b --- /dev/null +++ b/internal/library/subtitles.go @@ -0,0 +1,142 @@ +package library + +import ( + "fmt" + "io" + "log" + "net/http" + "os" + "path/filepath" + "regexp" + "strings" + "time" + + "github.com/torrentclaw/unarr/internal/agent" +) + +// maxSubtitleBytes caps a downloaded subtitle (sane: even a long film SRT is +// a few hundred KB; this guards against a misbehaving upstream). +const maxSubtitleBytes = 10 << 20 // 10 MiB + +var subtitleLangRe = regexp.MustCompile(`^[a-z]{2,3}$`) + +var subtitleHTTPClient = &http.Client{Timeout: 30 * time.Second} + +// FetchSubtitles downloads each requested subtitle (from our proxy URL, already +// charset-fixed WebVTT) and writes it as a sidecar next to the media file: +// `..vtt`. Returns the IDs successfully written (or already +// present) and the ones that failed (with a short reason) so the web can mark +// them errored. Safety mirrors DeleteFiles: the media file must resolve within a +// configured scan path before we write beside it. +func FetchSubtitles(reqs []agent.SubtitleFetchRequest, scanPaths []string) (done []int, failed []agent.SubtitleFetchError) { + // Resolve scan paths through symlinks too, so a symlinked root (e.g. the + // docker bind-mount /downloads → /mnt/nas/peliculas) still matches a media + // path that EvalSymlinks resolved to the real target. Mirrors the containment + // check used for the resolved media path below. + safe := make([]string, 0, len(scanPaths)) + for _, sp := range scanPaths { + if !filepath.IsAbs(sp) { + log.Printf("library: ignoring non-absolute scan path: %q", sp) + continue + } + if real, err := filepath.EvalSymlinks(sp); err == nil { + safe = append(safe, real) + } else { + safe = append(safe, filepath.Clean(sp)) + } + } + if len(safe) == 0 { + log.Printf("library: no valid scan paths — refusing to write subtitle sidecars") + for _, r := range reqs { + failed = append(failed, agent.SubtitleFetchError{ID: r.ID, Error: "no valid scan paths"}) + } + return nil, failed + } + + for _, r := range reqs { + if err := fetchSubtitleOne(r, safe); err != nil { + log.Printf("library: subtitle fetch %d (%q): %v", r.ID, r.FilePath, err) + msg := err.Error() + if len(msg) > 480 { + msg = msg[:480] + } + failed = append(failed, agent.SubtitleFetchError{ID: r.ID, Error: msg}) + continue + } + log.Printf("library: wrote subtitle sidecar for item %d (%s)", r.ID, r.Lang) + done = append(done, r.ID) + } + return done, failed +} + +func fetchSubtitleOne(r agent.SubtitleFetchRequest, scanPaths []string) error { + if !filepath.IsAbs(r.FilePath) { + return fmt.Errorf("path is not absolute: %q", r.FilePath) + } + lang := strings.ToLower(strings.TrimSpace(r.Lang)) + if !subtitleLangRe.MatchString(lang) { + return fmt.Errorf("invalid language %q", r.Lang) + } + + // Resolve the media file (symlinks too) and confine it to a scan path. + real, err := filepath.EvalSymlinks(filepath.Clean(r.FilePath)) + if err != nil { + return fmt.Errorf("media file unreachable: %w", err) + } + if !isWithinScanPaths(real, scanPaths) { + return fmt.Errorf("path %q is outside all scan paths", real) + } + + ext := filepath.Ext(real) + sidecar := strings.TrimSuffix(real, ext) + "." + lang + ".vtt" + if _, statErr := os.Stat(sidecar); statErr == nil { + return nil // already present — idempotent success + } + + data, err := downloadSubtitle(r.URL) + if err != nil { + return err + } + + // Write atomically: temp in the same dir, then rename. Clean up any stale + // .tmp from a prior crash first, and on every failure path, so a partial + // write (disk full, killed) never lingers. + tmp := sidecar + ".tmp" + _ = os.Remove(tmp) + if err := os.WriteFile(tmp, data, 0o644); err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("write temp sidecar: %w", err) + } + if err := os.Rename(tmp, sidecar); err != nil { + _ = os.Remove(tmp) + return fmt.Errorf("rename sidecar: %w", err) + } + return nil +} + +func downloadSubtitle(url string) ([]byte, error) { + // Our proxy URL is always HTTPS. Restrict to https (allow http only for a + // local dev server) so a tampered sync response can't point the agent at an + // internal/metadata host. + if !strings.HasPrefix(url, "https://") && + !strings.HasPrefix(url, "http://localhost") && + !strings.HasPrefix(url, "http://127.0.0.1") { + return nil, fmt.Errorf("subtitle url must be https") + } + resp, err := subtitleHTTPClient.Get(url) + if err != nil { + return nil, fmt.Errorf("download: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("download status %d", resp.StatusCode) + } + data, err := io.ReadAll(io.LimitReader(resp.Body, maxSubtitleBytes)) + if err != nil { + return nil, fmt.Errorf("read body: %w", err) + } + if len(data) == 0 { + return nil, fmt.Errorf("empty subtitle") + } + return data, nil +} diff --git a/internal/vpn/vpn.go b/internal/vpn/vpn.go index 7f50ea1..c2c1e76 100644 --- a/internal/vpn/vpn.go +++ b/internal/vpn/vpn.go @@ -174,11 +174,11 @@ type wgConf struct { dns []netip.Addr mtu int - peerPublicKey string // hex - presharedKey string // hex (optional) - endpoint string // resolved ip:port - allowedIPs []string - keepalive int + peerPublicKey string // hex + presharedKey string // hex (optional) + endpoint string // resolved ip:port + allowedIPs []string + keepalive int } func (w *wgConf) uapi() string {