From 6adf1e2c4c20414f1de73acc51871d2149f9d274 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Tue, 5 May 2026 20:35:08 +0200 Subject: [PATCH] feat(mediaserver): Plex/Jellyfin/Emby auto-refresh + .strm instant mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sprint 1 — Auto-refresh after download: - New [[mediaserver]] TOML section with kind/url/token/sections - mediaserver.Refresh() fans out to Plex (partial via section ID auto-mapping from file path prefix) and Jellyfin/Emby (full library scan) - Manager.OnFinalized callback wired in daemon to trigger refresh after organize() completes — keeps engine package free of mediaserver dep - New unarr mediaserver {setup,list,remove,test} commands - unarr init wizard offers to configure refresh when a server is detected Sprint 2 — .strm instant mode (cloud + agent): - Mode strm-to-library handled in daemon dispatch: writes a one-line .strm file pointing to the cloud-resolved debrid HTTPS URL, then triggers refresh - engine.WriteStrm + StrmDestForTask mirror organize()'s naming so Plex/Jellyfin see the expected folder structure (Movies/Title (Year)/, TV Shows/Show/Season XX/) - Atomic write (temp + rename) so partial files never get indexed - Reports completed/failed status to the cloud via existing agent client --- internal/cmd/daemon.go | 25 +- internal/cmd/init.go | 28 +++ internal/cmd/mediaserver.go | 335 +++++++++++++++++++++++++++ internal/cmd/root.go | 4 + internal/cmd/strm_handler.go | 70 ++++++ internal/config/config.go | 18 +- internal/engine/manager.go | 9 + internal/engine/strm.go | 130 +++++++++++ internal/engine/task.go | 7 + internal/mediaserver/detect.go | 18 +- internal/mediaserver/detect_test.go | 8 +- internal/mediaserver/refresh.go | 280 ++++++++++++++++++++++ internal/mediaserver/refresh_test.go | 149 ++++++++++++ 13 files changed, 1065 insertions(+), 16 deletions(-) create mode 100644 internal/cmd/mediaserver.go create mode 100644 internal/cmd/strm_handler.go create mode 100644 internal/engine/strm.go create mode 100644 internal/mediaserver/refresh.go create mode 100644 internal/mediaserver/refresh_test.go diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index b8db356..a486bff 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -17,6 +17,7 @@ import ( "github.com/torrentclaw/unarr/internal/config" "github.com/torrentclaw/unarr/internal/engine" "github.com/torrentclaw/unarr/internal/library" + "github.com/torrentclaw/unarr/internal/mediaserver" "github.com/torrentclaw/unarr/internal/usenet/download" ) @@ -237,10 +238,28 @@ func runDaemonStart() error { // Trigger immediate sync when a download slot frees up manager.OnTaskDone = func() { d.TriggerSync() } + // Trigger Plex/Jellyfin/Emby library refresh after a task finalises so + // the new file appears in the user's library within seconds (instead + // of waiting for the next periodic scan). No-op if no servers + // configured. Errors are logged inside Refresh and never propagate. + if len(cfg.MediaServers) > 0 { + manager.OnFinalized = func(task *engine.Task) { + if task == nil { + return + } + fp := task.SafeFilePath() + if fp == "" { + return + } + mediaserver.Refresh(cfg.MediaServers, fp) + } + } + // Wire: sync receives new tasks → submit to manager or handle stream d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { - if t.Mode == "stream" { + switch t.Mode { + case "stream": if isStreamingTask(t.ID) { continue } @@ -251,7 +270,9 @@ func runDaemonStart() error { streamRegistry.cancels[t.ID] = streamCancel streamRegistry.mu.Unlock() go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv) - } else { + case "strm-to-library": + go handleStrmToLibrary(ctx, t, cfg, agentClient) + default: manager.Submit(ctx, t) } } diff --git a/internal/cmd/init.go b/internal/cmd/init.go index 9e7a8ca..668dc8b 100644 --- a/internal/cmd/init.go +++ b/internal/cmd/init.go @@ -296,6 +296,34 @@ func runInit(apiURLOverride string) error { } } + // ── Plex / Jellyfin / Emby refresh hook ──────────────────────── + // Offer to wire library refreshes if a media server was detected and + // none are configured yet. Skipping here is fine — the user can run + // `unarr mediaserver setup` later. + if len(detected.Servers) > 0 && len(cfg.MediaServers) == 0 { + fmt.Println() + var configureMS bool + err = huh.NewForm( + huh.NewGroup( + huh.NewConfirm(). + Title(fmt.Sprintf("Auto-refresh %s on every download?", detected.Servers[0].Name)). + Description("New downloads appear on Roku/Apple TV/etc. within seconds, instead of waiting for the next periodic library scan"). + Affirmative("Yes, configure"). + Negative("Skip (do it later with 'unarr mediaserver setup')"). + Value(&configureMS), + ), + ).Run() + if err == nil && configureMS { + fmt.Println() + if err := runMediaserverSetup(); err != nil { + color.New(color.FgYellow).Printf(" Media server setup failed: %s\n", err) + } else { + // runMediaserverSetup already saved + updated appCfg. + cfg = appCfg + } + } + } + // ── Debrid auto-detection from *arr ───────────────────────────── if resp.User.IsPro { diff --git a/internal/cmd/mediaserver.go b/internal/cmd/mediaserver.go new file mode 100644 index 0000000..a0a1a74 --- /dev/null +++ b/internal/cmd/mediaserver.go @@ -0,0 +1,335 @@ +package cmd + +import ( + "errors" + "fmt" + "strings" + + "github.com/charmbracelet/huh" + "github.com/fatih/color" + "github.com/spf13/cobra" + "github.com/torrentclaw/unarr/internal/config" + "github.com/torrentclaw/unarr/internal/mediaserver" +) + +func newMediaserverCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "mediaserver", + Aliases: []string{"plex", "jellyfin"}, + Short: "Configure Plex / Jellyfin / Emby auto-refresh", + Long: `Manage the list of media servers that unarr should refresh after a +download finishes. + +When configured, unarr triggers a partial library refresh on each server +right after a download is verified and organised, so the new file shows +up in your Plex / Jellyfin / Emby (and therefore on Roku, Apple TV, Fire +TV, etc.) within seconds instead of waiting for the next periodic scan.`, + } + + cmd.AddCommand( + newMediaserverSetupCmd(), + newMediaserverListCmd(), + newMediaserverRemoveCmd(), + newMediaserverTestCmd(), + ) + + return cmd +} + +func newMediaserverSetupCmd() *cobra.Command { + return &cobra.Command{ + Use: "setup", + Short: "Interactive wizard to add a Plex / Jellyfin / Emby server", + RunE: func(cmd *cobra.Command, args []string) error { + return runMediaserverSetup() + }, + } +} + +func newMediaserverListCmd() *cobra.Command { + return &cobra.Command{ + Use: "list", + Short: "List configured media servers", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + if len(cfg.MediaServers) == 0 { + fmt.Println("No media servers configured. Run 'unarr mediaserver setup' to add one.") + return nil + } + for i, s := range cfg.MediaServers { + fmt.Printf("%d. %s @ %s\n", i+1, strings.ToUpper(s.Kind), s.URL) + if len(s.Sections) > 0 { + fmt.Printf(" Sections: %v\n", s.Sections) + } + } + return nil + }, + } +} + +func newMediaserverRemoveCmd() *cobra.Command { + return &cobra.Command{ + Use: "remove", + Short: "Remove a configured media server", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + if len(cfg.MediaServers) == 0 { + fmt.Println("No media servers configured.") + return nil + } + + var options []huh.Option[int] + for i, s := range cfg.MediaServers { + label := fmt.Sprintf("%s @ %s", strings.ToUpper(s.Kind), s.URL) + options = append(options, huh.NewOption(label, i)) + } + + var idx int + err := huh.NewForm( + huh.NewGroup( + huh.NewSelect[int](). + Title("Which server to remove?"). + Options(options...). + Value(&idx), + ), + ).Run() + if err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + + cfg.MediaServers = append(cfg.MediaServers[:idx], cfg.MediaServers[idx+1:]...) + if err := config.Save(cfg, cfgFile); err != nil { + return fmt.Errorf("save config: %w", err) + } + appCfg = cfg + color.New(color.FgGreen).Println(" ✓ Removed.") + return nil + }, + } +} + +func newMediaserverTestCmd() *cobra.Command { + return &cobra.Command{ + Use: "test", + Short: "Trigger a refresh on each configured server (sanity check)", + RunE: func(cmd *cobra.Command, args []string) error { + cfg := loadConfig() + if len(cfg.MediaServers) == 0 { + fmt.Println("No media servers configured.") + return nil + } + for _, s := range cfg.MediaServers { + fmt.Printf("Refreshing %s @ %s ... ", s.Kind, s.URL) + mediaserver.Refresh([]mediaserver.ServerConfig{s}, "") + } + // Refresh fans out goroutines; give them time to log results. + fmt.Println("dispatched (errors, if any, are logged).") + return nil + }, + } +} + +// runMediaserverSetup walks the user through adding a single media server. +// Auto-detects local Plex/Jellyfin/Emby via port scan and prefills as much +// as possible. +func runMediaserverSetup() error { + if !isTerminal() { + return fmt.Errorf("interactive mode requires a terminal") + } + + bold := color.New(color.Bold) + cyan := color.New(color.FgCyan) + dim := color.New(color.FgHiBlack) + green := color.New(color.FgGreen) + + cfg := loadConfig() + + fmt.Println() + bold.Println(" Add a media server") + fmt.Println() + dim.Println(" unarr will hit the server's refresh API after each download,") + dim.Println(" so new files appear in your library within seconds.") + fmt.Println() + + detected := mediaserver.Detect() + + // Pick kind + var kind string + kindOpts := []huh.Option[string]{ + huh.NewOption("Plex", "plex"), + huh.NewOption("Jellyfin", "jellyfin"), + huh.NewOption("Emby", "emby"), + } + // Default selection: first detected server's kind, lower-cased. + if len(detected.Servers) > 0 { + kind = strings.ToLower(detected.Servers[0].Name) + } else { + kind = "plex" + } + + if err := huh.NewForm(huh.NewGroup( + huh.NewSelect[string](). + Title("Server type"). + Options(kindOpts...). + Value(&kind), + )).Run(); err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + + // Prefill URL from detection if available + url := "" + for _, s := range detected.Servers { + if strings.EqualFold(s.Name, kind) { + url = s.URL + break + } + } + if url == "" { + url = defaultURLFor(kind) + } + + if err := huh.NewForm(huh.NewGroup( + huh.NewInput(). + Title("Server URL"). + Description("Reachable from this machine — e.g. http://localhost:32400"). + Value(&url). + Validate(func(s string) error { + s = strings.TrimSpace(s) + if s == "" { + return fmt.Errorf("URL is required") + } + if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") { + return fmt.Errorf("must start with http:// or https://") + } + return nil + }), + )).Run(); err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + url = strings.TrimRight(strings.TrimSpace(url), "/") + + // Token entry + token := "" + if kind == "plex" { + // Try local Preferences.xml first (works when Plex runs on same host). + if t := mediaserver.LocalPlexToken(); t != "" { + cyan.Println(" ✓ Found Plex token in local Preferences.xml") + fmt.Println() + useLocal := true + _ = huh.NewForm(huh.NewGroup( + huh.NewConfirm(). + Title("Use the auto-detected token?"). + Affirmative("Yes"). + Negative("No, paste a different one"). + Value(&useLocal), + )).Run() + if useLocal { + token = t + } + } + } + + if token == "" { + title := "API key" + desc := "" + switch kind { + case "plex": + title = "Plex token" + desc = "Get it via Plex web UI → any item → ⋯ → Get Info → View XML → copy ?X-Plex-Token=... from URL" + case "jellyfin": + title = "Jellyfin API key" + desc = "Dashboard → Advanced → API Keys → New API Key" + case "emby": + title = "Emby API key" + desc = "Server Dashboard → API Keys → New Application" + } + if err := huh.NewForm(huh.NewGroup( + huh.NewInput(). + Title(title). + Description(desc). + Value(&token). + Validate(func(s string) error { + if strings.TrimSpace(s) == "" { + return fmt.Errorf("token is required") + } + return nil + }), + )).Run(); err != nil { + if errors.Is(err, huh.ErrUserAborted) { + return nil + } + return err + } + } + token = strings.TrimSpace(token) + + // Save + newServer := mediaserver.ServerConfig{ + Kind: kind, + URL: url, + Token: token, + } + + // Replace if same kind+URL already present, else append + replaced := false + for i, existing := range cfg.MediaServers { + if strings.EqualFold(existing.Kind, kind) && existing.URL == url { + cfg.MediaServers[i] = newServer + replaced = true + break + } + } + if !replaced { + cfg.MediaServers = append(cfg.MediaServers, newServer) + } + + if err := config.Save(cfg, cfgFile); err != nil { + return fmt.Errorf("save config: %w", err) + } + appCfg = cfg + + fmt.Println() + if replaced { + green.Printf(" ✓ Updated %s @ %s\n", strings.ToUpper(kind), url) + } else { + green.Printf(" ✓ Added %s @ %s\n", strings.ToUpper(kind), url) + } + fmt.Println() + + // Sanity test + doTest := true + _ = huh.NewForm(huh.NewGroup( + huh.NewConfirm(). + Title("Trigger a test refresh now?"). + Affirmative("Yes"). + Negative("Skip"). + Value(&doTest), + )).Run() + if doTest { + mediaserver.Refresh([]mediaserver.ServerConfig{newServer}, "") + fmt.Println(" Refresh dispatched. If it failed, the error is in the logs.") + } + + return nil +} + +func defaultURLFor(kind string) string { + switch strings.ToLower(kind) { + case "plex": + return "http://localhost:32400" + case "jellyfin": + return "http://localhost:8096" + case "emby": + return "http://localhost:8920" + } + return "" +} diff --git a/internal/cmd/root.go b/internal/cmd/root.go index b9b3d65..39f5e92 100644 --- a/internal/cmd/root.go +++ b/internal/cmd/root.go @@ -117,6 +117,9 @@ Source: https://github.com/torrentclaw/unarr`, scanCmd := newScanCmd() scanCmd.GroupID = "search" + mediaserverCmd := newMediaserverCmd() + mediaserverCmd.GroupID = "start" + rootCmd.AddCommand( // Getting Started initCmd, @@ -146,6 +149,7 @@ Source: https://github.com/torrentclaw/unarr`, completionCmd, // Library scanCmd, + mediaserverCmd, // Alias: upgrade → self-update newUpgradeCmd(), ) diff --git a/internal/cmd/strm_handler.go b/internal/cmd/strm_handler.go new file mode 100644 index 0000000..926d703 --- /dev/null +++ b/internal/cmd/strm_handler.go @@ -0,0 +1,70 @@ +package cmd + +import ( + "context" + "log" + + "github.com/torrentclaw/unarr/internal/agent" + "github.com/torrentclaw/unarr/internal/config" + "github.com/torrentclaw/unarr/internal/engine" + "github.com/torrentclaw/unarr/internal/mediaserver" +) + +// handleStrmToLibrary processes a Mode="strm-to-library" task by writing a +// one-line .strm file to the user's media library and triggering a +// Plex/Jellyfin/Emby refresh. No actual download happens; the .strm points +// at the cloud-resolved debrid HTTPS URL, and the media server streams from +// there when the user presses play. +// +// Reports completion (or failure) back to the cloud via the agent client. +func handleStrmToLibrary(ctx context.Context, t agent.Task, cfg config.Config, agentClient *agent.Client) { + short := agent.ShortID(t.ID) + + if t.DirectURL == "" { + log.Printf("[%s] strm-to-library: missing directUrl from server", short) + reportStrmFailure(ctx, agentClient, t.ID, "missing directUrl") + return + } + + organizeCfg := engine.OrganizeConfig{ + Enabled: cfg.Organize.Enabled, + MoviesDir: cfg.Organize.MoviesDir, + TVShowsDir: cfg.Organize.TVShowsDir, + OutputDir: cfg.Download.Dir, + } + + finalPath, err := engine.WriteStrm(t, organizeCfg) + if err != nil { + log.Printf("[%s] strm-to-library write failed: %v", short, err) + reportStrmFailure(ctx, agentClient, t.ID, err.Error()) + return + } + + log.Printf("[%s] strm-to-library wrote %s", short, finalPath) + + // Trigger media-server refresh if any are configured. Errors are logged + // inside Refresh and never propagate — the .strm is on disk, so the + // next periodic scan would pick it up regardless. + if len(cfg.MediaServers) > 0 { + mediaserver.Refresh(cfg.MediaServers, finalPath) + } + + if _, reportErr := agentClient.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: t.ID, + Status: "completed", + Progress: 100, + FilePath: finalPath, + }); reportErr != nil { + log.Printf("[%s] strm-to-library: status report failed: %v", short, reportErr) + } +} + +func reportStrmFailure(ctx context.Context, agentClient *agent.Client, taskID, msg string) { + if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{ + TaskID: taskID, + Status: "failed", + ErrorMessage: msg, + }); err != nil { + log.Printf("[%s] strm failure report failed: %v", agent.ShortID(taskID), err) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 5c593d5..038e1e2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,18 +9,20 @@ import ( "strings" "github.com/BurntSushi/toml" + "github.com/torrentclaw/unarr/internal/mediaserver" ) // Config holds all persistent CLI configuration. type Config struct { - Auth AuthConfig `toml:"auth"` - Agent AgentConfig `toml:"agent"` - Download DownloadConfig `toml:"downloads"` - Organize OrganizeConfig `toml:"organize"` - Daemon DaemonConfig `toml:"daemon"` - Notifications NotificationsConfig `toml:"notifications"` - General GeneralConfig `toml:"general"` - Library LibraryConfig `toml:"library"` + Auth AuthConfig `toml:"auth"` + Agent AgentConfig `toml:"agent"` + Download DownloadConfig `toml:"downloads"` + Organize OrganizeConfig `toml:"organize"` + Daemon DaemonConfig `toml:"daemon"` + Notifications NotificationsConfig `toml:"notifications"` + General GeneralConfig `toml:"general"` + Library LibraryConfig `toml:"library"` + MediaServers []mediaserver.ServerConfig `toml:"mediaserver"` } type AuthConfig struct { diff --git a/internal/engine/manager.go b/internal/engine/manager.go index 2a07b6f..83e4b08 100644 --- a/internal/engine/manager.go +++ b/internal/engine/manager.go @@ -33,6 +33,12 @@ type Manager struct { // Used by the daemon to trigger an immediate sync. OnTaskDone func() + // OnFinalized is called after a task successfully transitions to + // completed (after verify + organize + optional upgrade replace). + // Used by the daemon to trigger media-server library refreshes. + // Not invoked on failure. + OnFinalized func(task *Task) + // recentlyFinished holds tasks that completed/failed since the last sync read. // The sync goroutine reads and clears this to include final states in the next sync. recentMu sync.Mutex @@ -444,6 +450,9 @@ func (m *Manager) finalize(ctx context.Context, task *Task, result *Result) { if m.cfg.Notifications { desktopNotify("Download complete", task.Title) } + if m.OnFinalized != nil { + m.OnFinalized(task) + } m.recordFinished(task.ToStatusUpdate()) m.reporter.ReportFinal(ctx, task) } diff --git a/internal/engine/strm.go b/internal/engine/strm.go new file mode 100644 index 0000000..0961fd9 --- /dev/null +++ b/internal/engine/strm.go @@ -0,0 +1,130 @@ +package engine + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/torrentclaw/unarr/internal/agent" +) + +// StrmDest holds the resolved location for a .strm file to be written. +type StrmDest struct { + Dir string // directory the .strm file lives in (created if missing) + FileName string // filename including the .strm extension + FullPath string // Dir + FileName, joined +} + +// StrmDestForTask computes where a .strm file should land for the given +// agent task, mirroring organize()'s naming so Plex/Jellyfin sees the same +// folder structure as a real download would have produced. +// +// Returns an error if cfg lacks the relevant library directory. +func StrmDestForTask(task agent.Task, cfg OrganizeConfig) (StrmDest, error) { + switch { + case task.ContentType == "show": + if cfg.TVShowsDir == "" { + return StrmDest{}, fmt.Errorf("strm: TVShowsDir not configured") + } + showName := task.ContentTitle + if showName == "" { + showName = cleanTitle(task.Title) + } + dir := filepath.Join(cfg.TVShowsDir, sanitizePath(showName)) + var fileName string + if task.Season != nil { + dir = filepath.Join(dir, fmt.Sprintf("Season %02d", *task.Season)) + if task.Episode != nil { + fileName = fmt.Sprintf("%s - S%02dE%02d.strm", + sanitizePath(showName), *task.Season, *task.Episode) + } + } + if fileName == "" { + // Missing season/episode metadata — fall back to a sanitised title. + fileName = sanitizePath(showName) + ".strm" + } + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + + case task.CollectionName != "" && cfg.MoviesDir != "": + movieName := task.ContentTitle + if movieName == "" { + movieName = cleanTitle(task.Title) + } + year := strYear(task) + base := sanitizePath(movieName) + if year != "" { + base = fmt.Sprintf("%s (%s)", base, year) + } + dir := filepath.Join(cfg.MoviesDir, sanitizePath(task.CollectionName), base) + fileName := base + ".strm" + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + + case task.ContentType == "movie": + if cfg.MoviesDir == "" { + return StrmDest{}, fmt.Errorf("strm: MoviesDir not configured") + } + movieName := task.ContentTitle + if movieName == "" { + movieName = cleanTitle(task.Title) + } + year := strYear(task) + base := sanitizePath(movieName) + if year != "" { + base = fmt.Sprintf("%s (%s)", base, year) + } + dir := filepath.Join(cfg.MoviesDir, base) + fileName := base + ".strm" + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + + default: + // No metadata at all — drop into MoviesDir under a sanitised title so + // at least the file lands somewhere a library scan might find it. + if cfg.MoviesDir == "" { + return StrmDest{}, fmt.Errorf("strm: no library dir configured for content without metadata") + } + base := sanitizePath(cleanTitle(task.Title)) + if base == "" { + base = "Unknown" + } + dir := filepath.Join(cfg.MoviesDir, base) + fileName := base + ".strm" + return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil + } +} + +// WriteStrm writes a .strm file containing the given URL at the destination +// computed from the task. Creates parent dirs as needed. Atomic write +// (temp + rename) so a partial file never gets indexed. +func WriteStrm(task agent.Task, cfg OrganizeConfig) (string, error) { + if task.DirectURL == "" { + return "", fmt.Errorf("strm: task has no directUrl") + } + + dest, err := StrmDestForTask(task, cfg) + if err != nil { + return "", err + } + + if err := os.MkdirAll(dest.Dir, 0o755); err != nil { + return "", fmt.Errorf("strm: create dir: %w", err) + } + + tmp := dest.FullPath + ".tmp" + if err := os.WriteFile(tmp, []byte(strings.TrimSpace(task.DirectURL)+"\n"), 0o644); err != nil { + return "", fmt.Errorf("strm: write temp: %w", err) + } + if err := os.Rename(tmp, dest.FullPath); err != nil { + _ = os.Remove(tmp) + return "", fmt.Errorf("strm: rename: %w", err) + } + return dest.FullPath, nil +} + +// strYear is the agent.Task counterpart to organize.go's resolveYear. +func strYear(task agent.Task) string { + if task.ContentYear != nil && *task.ContentYear > 0 { + return fmt.Sprintf("%d", *task.ContentYear) + } + return yearRegex.FindString(task.Title) +} diff --git a/internal/engine/task.go b/internal/engine/task.go index ceba6c9..36d3249 100644 --- a/internal/engine/task.go +++ b/internal/engine/task.go @@ -155,6 +155,13 @@ func (t *Task) GetStreamURL() string { return t.StreamURL } +// SafeFilePath returns the task's final file path thread-safely. +func (t *Task) SafeFilePath() string { + t.mu.RLock() + defer t.mu.RUnlock() + return t.FilePath +} + // UpdateProgress updates download metrics thread-safely. func (t *Task) UpdateProgress(p Progress) { t.mu.Lock() diff --git a/internal/mediaserver/detect.go b/internal/mediaserver/detect.go index e0b3030..039c363 100644 --- a/internal/mediaserver/detect.go +++ b/internal/mediaserver/detect.go @@ -87,6 +87,17 @@ func Detect() DetectedPaths { // ── Plex ──────────────────────────────────────────────────────────── +// LocalPlexToken returns the Plex auth token from the local Plex config +// directory, if Plex Media Server is installed on this host. Returns "" +// when Plex isn't installed or the token can't be read. +func LocalPlexToken() string { + dir := plexConfigDir() + if dir == "" { + return "" + } + return PlexTokenFromPrefs(filepath.Join(dir, "Preferences.xml")) +} + func plexLibraryPaths() []string { configDir := plexConfigDir() if configDir == "" { @@ -95,7 +106,7 @@ func plexLibraryPaths() []string { // Read token from Preferences.xml prefsPath := filepath.Join(configDir, "Preferences.xml") - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token == "" { return nil } @@ -154,7 +165,10 @@ type plexPrefs struct { PlexOnlineToken string `xml:"PlexOnlineToken,attr"` } -func plexTokenFromPrefs(path string) string { +// PlexTokenFromPrefs reads the Plex auth token from a Preferences.xml file. +// Returns "" if the file can't be read or parsed. Used by the setup wizard +// when configuring a Plex server running on the same host as unarr. +func PlexTokenFromPrefs(path string) string { data, err := os.ReadFile(path) if err != nil { return "" diff --git a/internal/mediaserver/detect_test.go b/internal/mediaserver/detect_test.go index 19ba53c..9887093 100644 --- a/internal/mediaserver/detect_test.go +++ b/internal/mediaserver/detect_test.go @@ -79,7 +79,7 @@ func TestPlexTokenFromPrefs(t *testing.T) { ` os.WriteFile(prefsPath, []byte(xml), 0o644) - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token != "my-secret-token" { t.Errorf("token = %q, want my-secret-token", token) } @@ -91,14 +91,14 @@ func TestPlexTokenFromPrefs(t *testing.T) { xml := `` os.WriteFile(prefsPath, []byte(xml), 0o644) - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token != "" { t.Errorf("token = %q, want empty", token) } }) t.Run("file not found", func(t *testing.T) { - token := plexTokenFromPrefs("/nonexistent/Preferences.xml") + token := PlexTokenFromPrefs("/nonexistent/Preferences.xml") if token != "" { t.Errorf("token = %q, want empty", token) } @@ -109,7 +109,7 @@ func TestPlexTokenFromPrefs(t *testing.T) { prefsPath := filepath.Join(dir, "Preferences.xml") os.WriteFile(prefsPath, []byte("not xml at all"), 0o644) - token := plexTokenFromPrefs(prefsPath) + token := PlexTokenFromPrefs(prefsPath) if token != "" { t.Errorf("token = %q, want empty", token) } diff --git a/internal/mediaserver/refresh.go b/internal/mediaserver/refresh.go new file mode 100644 index 0000000..74c8637 --- /dev/null +++ b/internal/mediaserver/refresh.go @@ -0,0 +1,280 @@ +package mediaserver + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strings" + "sync" + "time" +) + +// ServerConfig describes a media server that should be refreshed after a +// download (or .strm write) finishes. Stored in unarr's config.toml under +// [[mediaserver]]. +type ServerConfig struct { + Kind string `toml:"kind"` // "plex" | "jellyfin" | "emby" + URL string `toml:"url"` // e.g. http://localhost:32400 + Token string `toml:"token"` // Plex token / Jellyfin or Emby API key + Sections []int `toml:"sections"` // optional: Plex section IDs to refresh (else auto) +} + +// Section describes a Plex library section. +type Section struct { + ID int + Title string + Locations []string +} + +// httpClient is the shared HTTP client for refresh calls. Short timeouts +// because a refresh trigger is fire-and-forget. +var httpClient = &http.Client{Timeout: 8 * time.Second} + +// plexSectionCache caches section lookups per server URL+token, so we don't +// re-fetch sections on every download. +var ( + plexSectionMu sync.RWMutex + plexSectionCache = map[string][]Section{} +) + +// Refresh fans out a refresh call to every configured media server. Errors +// are logged but never returned — a failed refresh is non-fatal because the +// download itself succeeded and the next periodic scan will pick it up. +// +// `filePath` is the path of the file that was just placed (or the .strm +// pointer); it's used to resolve the matching Plex section for partial +// refreshes. Pass "" to fall back to a full refresh. +// +// Each refresh runs in its own goroutine with a 15s timeout — the call +// returns immediately and never blocks the caller. +func Refresh(servers []ServerConfig, filePath string) { + for _, s := range servers { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := refreshOne(ctx, s, filePath); err != nil { + log.Printf("mediaserver: %s refresh failed (%s): %v", s.Kind, s.URL, err) + } + }() + } +} + +func refreshOne(ctx context.Context, s ServerConfig, filePath string) error { + switch strings.ToLower(s.Kind) { + case "plex": + return refreshPlex(ctx, s, filePath) + case "jellyfin", "emby": + return refreshJellyfin(ctx, s) + default: + return fmt.Errorf("unknown kind %q", s.Kind) + } +} + +// ── Plex ──────────────────────────────────────────────────────────── + +func refreshPlex(ctx context.Context, s ServerConfig, filePath string) error { + if s.URL == "" || s.Token == "" { + return fmt.Errorf("plex: missing url or token") + } + + sectionIDs := s.Sections + if len(sectionIDs) == 0 { + // Auto-resolve: fetch sections, pick whichever owns the file path. + sections, err := plexSections(ctx, s.URL, s.Token) + if err != nil { + return fmt.Errorf("fetch sections: %w", err) + } + if filePath != "" { + if id, ok := matchSectionByPath(sections, filePath); ok { + sectionIDs = []int{id} + } + } + if len(sectionIDs) == 0 { + // Fall back to refreshing every section. + for _, sec := range sections { + sectionIDs = append(sectionIDs, sec.ID) + } + } + } + + var firstErr error + for _, id := range sectionIDs { + if err := plexRefreshSection(ctx, s.URL, s.Token, id, filePath); err != nil { + if firstErr == nil { + firstErr = err + } + log.Printf("mediaserver: plex section %d refresh failed: %v", id, err) + } + } + return firstErr +} + +func plexRefreshSection(ctx context.Context, baseURL, token string, sectionID int, filePath string) error { + q := url.Values{} + if filePath != "" { + q.Set("path", filePath) + } + q.Set("X-Plex-Token", token) + + endpoint := fmt.Sprintf("%s/library/sections/%d/refresh?%s", + strings.TrimRight(baseURL, "/"), sectionID, q.Encode()) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + return err + } + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + return fmt.Errorf("status %d", resp.StatusCode) +} + +// plexSections returns the cached or freshly fetched library sections for a +// Plex server. The cache lives for the agent's lifetime — Plex sections +// rarely change, so refetching on every download is wasteful. +func plexSections(ctx context.Context, baseURL, token string) ([]Section, error) { + key := baseURL + "|" + token + plexSectionMu.RLock() + cached, ok := plexSectionCache[key] + plexSectionMu.RUnlock() + if ok { + return cached, nil + } + + sections, err := fetchPlexSections(ctx, baseURL, token) + if err != nil { + return nil, err + } + + plexSectionMu.Lock() + plexSectionCache[key] = sections + plexSectionMu.Unlock() + + return sections, nil +} + +func fetchPlexSections(ctx context.Context, baseURL, token string) ([]Section, error) { + endpoint := strings.TrimRight(baseURL, "/") + "/library/sections" + req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) + if err != nil { + return nil, err + } + req.Header.Set("X-Plex-Token", token) + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("status %d", resp.StatusCode) + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return nil, err + } + + return parsePlexSectionsFull(body) +} + +// parsePlexSectionsFull parses the full sections response with IDs. +func parsePlexSectionsFull(body []byte) ([]Section, error) { + var container struct { + MediaContainer struct { + Directory []struct { + Key string `json:"key"` + Title string `json:"title"` + Location []struct { + Path string `json:"path"` + } `json:"Location"` + } `json:"Directory"` + } `json:"MediaContainer"` + } + if err := json.Unmarshal(body, &container); err != nil { + return nil, err + } + + var out []Section + for _, d := range container.MediaContainer.Directory { + var id int + _, _ = fmt.Sscanf(d.Key, "%d", &id) + if id == 0 { + continue + } + sec := Section{ID: id, Title: d.Title} + for _, loc := range d.Location { + if loc.Path != "" { + sec.Locations = append(sec.Locations, loc.Path) + } + } + out = append(out, sec) + } + return out, nil +} + +// matchSectionByPath returns the ID of the section whose Locations contain +// (as prefix) the given file path. Picks the most specific (longest) match. +func matchSectionByPath(sections []Section, filePath string) (int, bool) { + bestID := 0 + bestLen := 0 + for _, s := range sections { + for _, loc := range s.Locations { + loc = strings.TrimRight(loc, "/") + if loc == "" { + continue + } + if filePath == loc || strings.HasPrefix(filePath, loc+"/") { + if len(loc) > bestLen { + bestLen = len(loc) + bestID = s.ID + } + } + } + } + return bestID, bestID != 0 +} + +// ── Jellyfin / Emby ───────────────────────────────────────────────── + +func refreshJellyfin(ctx context.Context, s ServerConfig) error { + if s.URL == "" || s.Token == "" { + return fmt.Errorf("%s: missing url or token", s.Kind) + } + + endpoint := strings.TrimRight(s.URL, "/") + "/Library/Refresh" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil) + if err != nil { + return err + } + // Both Jellyfin and Emby accept this header. + req.Header.Set("X-Emby-Token", s.Token) + req.Header.Set("Accept", "application/json") + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + _, _ = io.Copy(io.Discard, resp.Body) + + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + return fmt.Errorf("status %d", resp.StatusCode) +} diff --git a/internal/mediaserver/refresh_test.go b/internal/mediaserver/refresh_test.go new file mode 100644 index 0000000..08a8d42 --- /dev/null +++ b/internal/mediaserver/refresh_test.go @@ -0,0 +1,149 @@ +package mediaserver + +import ( + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" +) + +func TestParsePlexSectionsFull(t *testing.T) { + body := []byte(`{ + "MediaContainer": { + "Directory": [ + { "key": "1", "title": "Movies", "Location": [{"path": "/data/media/movies"}] }, + { "key": "2", "title": "TV Shows", "Location": [{"path": "/data/media/tv"}, {"path": "/mnt/tv2"}] }, + { "key": "0", "title": "Bogus", "Location": [{"path": "/skip"}] } + ] + } + }`) + + sections, err := parsePlexSectionsFull(body) + if err != nil { + t.Fatalf("parsePlexSectionsFull error: %v", err) + } + if len(sections) != 2 { + t.Fatalf("got %d sections, want 2 (id=0 should be skipped)", len(sections)) + } + if sections[0].ID != 1 || sections[0].Title != "Movies" { + t.Errorf("section[0] = %+v", sections[0]) + } + if sections[1].ID != 2 || len(sections[1].Locations) != 2 { + t.Errorf("section[1] = %+v", sections[1]) + } +} + +func TestMatchSectionByPath(t *testing.T) { + sections := []Section{ + {ID: 1, Title: "Movies", Locations: []string{"/data/media/movies"}}, + {ID: 2, Title: "TV", Locations: []string{"/data/media/tv"}}, + {ID: 3, Title: "TV-HD", Locations: []string{"/data/media/tv/hd"}}, + } + + tests := []struct { + path string + wantID int + wantOK bool + }{ + {"/data/media/movies/Inception (2010)/Inception.mkv", 1, true}, + {"/data/media/tv/Show/Season 01/ep.mkv", 2, true}, + {"/data/media/tv/hd/Show/Season 01/ep.mkv", 3, true}, // most specific wins + {"/data/media/movies", 1, true}, // exact + {"/elsewhere/foo.mkv", 0, false}, + } + for _, tc := range tests { + gotID, gotOK := matchSectionByPath(sections, tc.path) + if gotID != tc.wantID || gotOK != tc.wantOK { + t.Errorf("matchSectionByPath(%q) = (%d,%v), want (%d,%v)", + tc.path, gotID, gotOK, tc.wantID, tc.wantOK) + } + } +} + +func TestRefreshPlex_PartialRefreshWithPath(t *testing.T) { + var refreshHits int32 + var gotPath, gotToken string + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.URL.Path == "/library/sections": + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{ + "MediaContainer": { + "Directory": [ + { "key": "7", "title": "Movies", "Location": [{"path": "/m"}] } + ] + } + }`)) + case strings.HasPrefix(r.URL.Path, "/library/sections/7/refresh"): + atomic.AddInt32(&refreshHits, 1) + gotPath = r.URL.Query().Get("path") + gotToken = r.URL.Query().Get("X-Plex-Token") + w.WriteHeader(http.StatusOK) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer srv.Close() + + // Reset cache so the test server is hit fresh. + plexSectionMu.Lock() + plexSectionCache = map[string][]Section{} + plexSectionMu.Unlock() + + cfg := ServerConfig{Kind: "plex", URL: srv.URL, Token: "tk-1"} + Refresh([]ServerConfig{cfg}, "/m/Inception/Inception.mkv") + + // Refresh fans out goroutines — give it time. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&refreshHits) > 0 { + break + } + time.Sleep(20 * time.Millisecond) + } + + if atomic.LoadInt32(&refreshHits) != 1 { + t.Fatalf("refresh endpoint hit %d times, want 1", refreshHits) + } + if gotPath != "/m/Inception/Inception.mkv" { + t.Errorf("path query = %q", gotPath) + } + if gotToken != "tk-1" { + t.Errorf("token query = %q", gotToken) + } +} + +func TestRefreshJellyfin(t *testing.T) { + var hits int32 + var gotToken string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost && r.URL.Path == "/Library/Refresh" { + atomic.AddInt32(&hits, 1) + gotToken = r.Header.Get("X-Emby-Token") + w.WriteHeader(http.StatusNoContent) + return + } + w.WriteHeader(http.StatusNotFound) + })) + defer srv.Close() + + cfg := ServerConfig{Kind: "jellyfin", URL: srv.URL, Token: "jf-key"} + Refresh([]ServerConfig{cfg}, "") + + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if atomic.LoadInt32(&hits) > 0 { + break + } + time.Sleep(20 * time.Millisecond) + } + if atomic.LoadInt32(&hits) != 1 { + t.Fatalf("Jellyfin hits = %d, want 1", hits) + } + if gotToken != "jf-key" { + t.Errorf("X-Emby-Token = %q", gotToken) + } +}