Compare commits
1 commit
main
...
feature/pl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6adf1e2c4c |
13 changed files with 1065 additions and 16 deletions
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"github.com/torrentclaw/unarr/internal/config"
|
"github.com/torrentclaw/unarr/internal/config"
|
||||||
"github.com/torrentclaw/unarr/internal/engine"
|
"github.com/torrentclaw/unarr/internal/engine"
|
||||||
"github.com/torrentclaw/unarr/internal/library"
|
"github.com/torrentclaw/unarr/internal/library"
|
||||||
|
"github.com/torrentclaw/unarr/internal/mediaserver"
|
||||||
"github.com/torrentclaw/unarr/internal/usenet/download"
|
"github.com/torrentclaw/unarr/internal/usenet/download"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -237,10 +238,28 @@ func runDaemonStart() error {
|
||||||
// Trigger immediate sync when a download slot frees up
|
// Trigger immediate sync when a download slot frees up
|
||||||
manager.OnTaskDone = func() { d.TriggerSync() }
|
manager.OnTaskDone = func() { d.TriggerSync() }
|
||||||
|
|
||||||
|
// Trigger Plex/Jellyfin/Emby library refresh after a task finalises so
|
||||||
|
// the new file appears in the user's library within seconds (instead
|
||||||
|
// of waiting for the next periodic scan). No-op if no servers
|
||||||
|
// configured. Errors are logged inside Refresh and never propagate.
|
||||||
|
if len(cfg.MediaServers) > 0 {
|
||||||
|
manager.OnFinalized = func(task *engine.Task) {
|
||||||
|
if task == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fp := task.SafeFilePath()
|
||||||
|
if fp == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
mediaserver.Refresh(cfg.MediaServers, fp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Wire: sync receives new tasks → submit to manager or handle stream
|
// Wire: sync receives new tasks → submit to manager or handle stream
|
||||||
d.OnTasksClaimed = func(tasks []agent.Task) {
|
d.OnTasksClaimed = func(tasks []agent.Task) {
|
||||||
for _, t := range tasks {
|
for _, t := range tasks {
|
||||||
if t.Mode == "stream" {
|
switch t.Mode {
|
||||||
|
case "stream":
|
||||||
if isStreamingTask(t.ID) {
|
if isStreamingTask(t.ID) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -251,7 +270,9 @@ func runDaemonStart() error {
|
||||||
streamRegistry.cancels[t.ID] = streamCancel
|
streamRegistry.cancels[t.ID] = streamCancel
|
||||||
streamRegistry.mu.Unlock()
|
streamRegistry.mu.Unlock()
|
||||||
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv)
|
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv)
|
||||||
} else {
|
case "strm-to-library":
|
||||||
|
go handleStrmToLibrary(ctx, t, cfg, agentClient)
|
||||||
|
default:
|
||||||
manager.Submit(ctx, t)
|
manager.Submit(ctx, t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -296,6 +296,34 @@ func runInit(apiURLOverride string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Plex / Jellyfin / Emby refresh hook ────────────────────────
|
||||||
|
// Offer to wire library refreshes if a media server was detected and
|
||||||
|
// none are configured yet. Skipping here is fine — the user can run
|
||||||
|
// `unarr mediaserver setup` later.
|
||||||
|
if len(detected.Servers) > 0 && len(cfg.MediaServers) == 0 {
|
||||||
|
fmt.Println()
|
||||||
|
var configureMS bool
|
||||||
|
err = huh.NewForm(
|
||||||
|
huh.NewGroup(
|
||||||
|
huh.NewConfirm().
|
||||||
|
Title(fmt.Sprintf("Auto-refresh %s on every download?", detected.Servers[0].Name)).
|
||||||
|
Description("New downloads appear on Roku/Apple TV/etc. within seconds, instead of waiting for the next periodic library scan").
|
||||||
|
Affirmative("Yes, configure").
|
||||||
|
Negative("Skip (do it later with 'unarr mediaserver setup')").
|
||||||
|
Value(&configureMS),
|
||||||
|
),
|
||||||
|
).Run()
|
||||||
|
if err == nil && configureMS {
|
||||||
|
fmt.Println()
|
||||||
|
if err := runMediaserverSetup(); err != nil {
|
||||||
|
color.New(color.FgYellow).Printf(" Media server setup failed: %s\n", err)
|
||||||
|
} else {
|
||||||
|
// runMediaserverSetup already saved + updated appCfg.
|
||||||
|
cfg = appCfg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ── Debrid auto-detection from *arr ─────────────────────────────
|
// ── Debrid auto-detection from *arr ─────────────────────────────
|
||||||
|
|
||||||
if resp.User.IsPro {
|
if resp.User.IsPro {
|
||||||
|
|
|
||||||
335
internal/cmd/mediaserver.go
Normal file
335
internal/cmd/mediaserver.go
Normal file
|
|
@ -0,0 +1,335 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/charmbracelet/huh"
|
||||||
|
"github.com/fatih/color"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/torrentclaw/unarr/internal/config"
|
||||||
|
"github.com/torrentclaw/unarr/internal/mediaserver"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newMediaserverCmd() *cobra.Command {
|
||||||
|
cmd := &cobra.Command{
|
||||||
|
Use: "mediaserver",
|
||||||
|
Aliases: []string{"plex", "jellyfin"},
|
||||||
|
Short: "Configure Plex / Jellyfin / Emby auto-refresh",
|
||||||
|
Long: `Manage the list of media servers that unarr should refresh after a
|
||||||
|
download finishes.
|
||||||
|
|
||||||
|
When configured, unarr triggers a partial library refresh on each server
|
||||||
|
right after a download is verified and organised, so the new file shows
|
||||||
|
up in your Plex / Jellyfin / Emby (and therefore on Roku, Apple TV, Fire
|
||||||
|
TV, etc.) within seconds instead of waiting for the next periodic scan.`,
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd.AddCommand(
|
||||||
|
newMediaserverSetupCmd(),
|
||||||
|
newMediaserverListCmd(),
|
||||||
|
newMediaserverRemoveCmd(),
|
||||||
|
newMediaserverTestCmd(),
|
||||||
|
)
|
||||||
|
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMediaserverSetupCmd() *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "setup",
|
||||||
|
Short: "Interactive wizard to add a Plex / Jellyfin / Emby server",
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
return runMediaserverSetup()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMediaserverListCmd() *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "list",
|
||||||
|
Short: "List configured media servers",
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
cfg := loadConfig()
|
||||||
|
if len(cfg.MediaServers) == 0 {
|
||||||
|
fmt.Println("No media servers configured. Run 'unarr mediaserver setup' to add one.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for i, s := range cfg.MediaServers {
|
||||||
|
fmt.Printf("%d. %s @ %s\n", i+1, strings.ToUpper(s.Kind), s.URL)
|
||||||
|
if len(s.Sections) > 0 {
|
||||||
|
fmt.Printf(" Sections: %v\n", s.Sections)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMediaserverRemoveCmd() *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "remove",
|
||||||
|
Short: "Remove a configured media server",
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
cfg := loadConfig()
|
||||||
|
if len(cfg.MediaServers) == 0 {
|
||||||
|
fmt.Println("No media servers configured.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var options []huh.Option[int]
|
||||||
|
for i, s := range cfg.MediaServers {
|
||||||
|
label := fmt.Sprintf("%s @ %s", strings.ToUpper(s.Kind), s.URL)
|
||||||
|
options = append(options, huh.NewOption(label, i))
|
||||||
|
}
|
||||||
|
|
||||||
|
var idx int
|
||||||
|
err := huh.NewForm(
|
||||||
|
huh.NewGroup(
|
||||||
|
huh.NewSelect[int]().
|
||||||
|
Title("Which server to remove?").
|
||||||
|
Options(options...).
|
||||||
|
Value(&idx),
|
||||||
|
),
|
||||||
|
).Run()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, huh.ErrUserAborted) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.MediaServers = append(cfg.MediaServers[:idx], cfg.MediaServers[idx+1:]...)
|
||||||
|
if err := config.Save(cfg, cfgFile); err != nil {
|
||||||
|
return fmt.Errorf("save config: %w", err)
|
||||||
|
}
|
||||||
|
appCfg = cfg
|
||||||
|
color.New(color.FgGreen).Println(" ✓ Removed.")
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMediaserverTestCmd() *cobra.Command {
|
||||||
|
return &cobra.Command{
|
||||||
|
Use: "test",
|
||||||
|
Short: "Trigger a refresh on each configured server (sanity check)",
|
||||||
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
cfg := loadConfig()
|
||||||
|
if len(cfg.MediaServers) == 0 {
|
||||||
|
fmt.Println("No media servers configured.")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
for _, s := range cfg.MediaServers {
|
||||||
|
fmt.Printf("Refreshing %s @ %s ... ", s.Kind, s.URL)
|
||||||
|
mediaserver.Refresh([]mediaserver.ServerConfig{s}, "")
|
||||||
|
}
|
||||||
|
// Refresh fans out goroutines; give them time to log results.
|
||||||
|
fmt.Println("dispatched (errors, if any, are logged).")
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// runMediaserverSetup walks the user through adding a single media server.
|
||||||
|
// Auto-detects local Plex/Jellyfin/Emby via port scan and prefills as much
|
||||||
|
// as possible.
|
||||||
|
func runMediaserverSetup() error {
|
||||||
|
if !isTerminal() {
|
||||||
|
return fmt.Errorf("interactive mode requires a terminal")
|
||||||
|
}
|
||||||
|
|
||||||
|
bold := color.New(color.Bold)
|
||||||
|
cyan := color.New(color.FgCyan)
|
||||||
|
dim := color.New(color.FgHiBlack)
|
||||||
|
green := color.New(color.FgGreen)
|
||||||
|
|
||||||
|
cfg := loadConfig()
|
||||||
|
|
||||||
|
fmt.Println()
|
||||||
|
bold.Println(" Add a media server")
|
||||||
|
fmt.Println()
|
||||||
|
dim.Println(" unarr will hit the server's refresh API after each download,")
|
||||||
|
dim.Println(" so new files appear in your library within seconds.")
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
detected := mediaserver.Detect()
|
||||||
|
|
||||||
|
// Pick kind
|
||||||
|
var kind string
|
||||||
|
kindOpts := []huh.Option[string]{
|
||||||
|
huh.NewOption("Plex", "plex"),
|
||||||
|
huh.NewOption("Jellyfin", "jellyfin"),
|
||||||
|
huh.NewOption("Emby", "emby"),
|
||||||
|
}
|
||||||
|
// Default selection: first detected server's kind, lower-cased.
|
||||||
|
if len(detected.Servers) > 0 {
|
||||||
|
kind = strings.ToLower(detected.Servers[0].Name)
|
||||||
|
} else {
|
||||||
|
kind = "plex"
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := huh.NewForm(huh.NewGroup(
|
||||||
|
huh.NewSelect[string]().
|
||||||
|
Title("Server type").
|
||||||
|
Options(kindOpts...).
|
||||||
|
Value(&kind),
|
||||||
|
)).Run(); err != nil {
|
||||||
|
if errors.Is(err, huh.ErrUserAborted) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prefill URL from detection if available
|
||||||
|
url := ""
|
||||||
|
for _, s := range detected.Servers {
|
||||||
|
if strings.EqualFold(s.Name, kind) {
|
||||||
|
url = s.URL
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if url == "" {
|
||||||
|
url = defaultURLFor(kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := huh.NewForm(huh.NewGroup(
|
||||||
|
huh.NewInput().
|
||||||
|
Title("Server URL").
|
||||||
|
Description("Reachable from this machine — e.g. http://localhost:32400").
|
||||||
|
Value(&url).
|
||||||
|
Validate(func(s string) error {
|
||||||
|
s = strings.TrimSpace(s)
|
||||||
|
if s == "" {
|
||||||
|
return fmt.Errorf("URL is required")
|
||||||
|
}
|
||||||
|
if !strings.HasPrefix(s, "http://") && !strings.HasPrefix(s, "https://") {
|
||||||
|
return fmt.Errorf("must start with http:// or https://")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}),
|
||||||
|
)).Run(); err != nil {
|
||||||
|
if errors.Is(err, huh.ErrUserAborted) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
url = strings.TrimRight(strings.TrimSpace(url), "/")
|
||||||
|
|
||||||
|
// Token entry
|
||||||
|
token := ""
|
||||||
|
if kind == "plex" {
|
||||||
|
// Try local Preferences.xml first (works when Plex runs on same host).
|
||||||
|
if t := mediaserver.LocalPlexToken(); t != "" {
|
||||||
|
cyan.Println(" ✓ Found Plex token in local Preferences.xml")
|
||||||
|
fmt.Println()
|
||||||
|
useLocal := true
|
||||||
|
_ = huh.NewForm(huh.NewGroup(
|
||||||
|
huh.NewConfirm().
|
||||||
|
Title("Use the auto-detected token?").
|
||||||
|
Affirmative("Yes").
|
||||||
|
Negative("No, paste a different one").
|
||||||
|
Value(&useLocal),
|
||||||
|
)).Run()
|
||||||
|
if useLocal {
|
||||||
|
token = t
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if token == "" {
|
||||||
|
title := "API key"
|
||||||
|
desc := ""
|
||||||
|
switch kind {
|
||||||
|
case "plex":
|
||||||
|
title = "Plex token"
|
||||||
|
desc = "Get it via Plex web UI → any item → ⋯ → Get Info → View XML → copy ?X-Plex-Token=... from URL"
|
||||||
|
case "jellyfin":
|
||||||
|
title = "Jellyfin API key"
|
||||||
|
desc = "Dashboard → Advanced → API Keys → New API Key"
|
||||||
|
case "emby":
|
||||||
|
title = "Emby API key"
|
||||||
|
desc = "Server Dashboard → API Keys → New Application"
|
||||||
|
}
|
||||||
|
if err := huh.NewForm(huh.NewGroup(
|
||||||
|
huh.NewInput().
|
||||||
|
Title(title).
|
||||||
|
Description(desc).
|
||||||
|
Value(&token).
|
||||||
|
Validate(func(s string) error {
|
||||||
|
if strings.TrimSpace(s) == "" {
|
||||||
|
return fmt.Errorf("token is required")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}),
|
||||||
|
)).Run(); err != nil {
|
||||||
|
if errors.Is(err, huh.ErrUserAborted) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
token = strings.TrimSpace(token)
|
||||||
|
|
||||||
|
// Save
|
||||||
|
newServer := mediaserver.ServerConfig{
|
||||||
|
Kind: kind,
|
||||||
|
URL: url,
|
||||||
|
Token: token,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replace if same kind+URL already present, else append
|
||||||
|
replaced := false
|
||||||
|
for i, existing := range cfg.MediaServers {
|
||||||
|
if strings.EqualFold(existing.Kind, kind) && existing.URL == url {
|
||||||
|
cfg.MediaServers[i] = newServer
|
||||||
|
replaced = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !replaced {
|
||||||
|
cfg.MediaServers = append(cfg.MediaServers, newServer)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := config.Save(cfg, cfgFile); err != nil {
|
||||||
|
return fmt.Errorf("save config: %w", err)
|
||||||
|
}
|
||||||
|
appCfg = cfg
|
||||||
|
|
||||||
|
fmt.Println()
|
||||||
|
if replaced {
|
||||||
|
green.Printf(" ✓ Updated %s @ %s\n", strings.ToUpper(kind), url)
|
||||||
|
} else {
|
||||||
|
green.Printf(" ✓ Added %s @ %s\n", strings.ToUpper(kind), url)
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
// Sanity test
|
||||||
|
doTest := true
|
||||||
|
_ = huh.NewForm(huh.NewGroup(
|
||||||
|
huh.NewConfirm().
|
||||||
|
Title("Trigger a test refresh now?").
|
||||||
|
Affirmative("Yes").
|
||||||
|
Negative("Skip").
|
||||||
|
Value(&doTest),
|
||||||
|
)).Run()
|
||||||
|
if doTest {
|
||||||
|
mediaserver.Refresh([]mediaserver.ServerConfig{newServer}, "")
|
||||||
|
fmt.Println(" Refresh dispatched. If it failed, the error is in the logs.")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultURLFor(kind string) string {
|
||||||
|
switch strings.ToLower(kind) {
|
||||||
|
case "plex":
|
||||||
|
return "http://localhost:32400"
|
||||||
|
case "jellyfin":
|
||||||
|
return "http://localhost:8096"
|
||||||
|
case "emby":
|
||||||
|
return "http://localhost:8920"
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
@ -117,6 +117,9 @@ Source: https://github.com/torrentclaw/unarr`,
|
||||||
scanCmd := newScanCmd()
|
scanCmd := newScanCmd()
|
||||||
scanCmd.GroupID = "search"
|
scanCmd.GroupID = "search"
|
||||||
|
|
||||||
|
mediaserverCmd := newMediaserverCmd()
|
||||||
|
mediaserverCmd.GroupID = "start"
|
||||||
|
|
||||||
rootCmd.AddCommand(
|
rootCmd.AddCommand(
|
||||||
// Getting Started
|
// Getting Started
|
||||||
initCmd,
|
initCmd,
|
||||||
|
|
@ -146,6 +149,7 @@ Source: https://github.com/torrentclaw/unarr`,
|
||||||
completionCmd,
|
completionCmd,
|
||||||
// Library
|
// Library
|
||||||
scanCmd,
|
scanCmd,
|
||||||
|
mediaserverCmd,
|
||||||
// Alias: upgrade → self-update
|
// Alias: upgrade → self-update
|
||||||
newUpgradeCmd(),
|
newUpgradeCmd(),
|
||||||
)
|
)
|
||||||
|
|
|
||||||
70
internal/cmd/strm_handler.go
Normal file
70
internal/cmd/strm_handler.go
Normal file
|
|
@ -0,0 +1,70 @@
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/torrentclaw/unarr/internal/agent"
|
||||||
|
"github.com/torrentclaw/unarr/internal/config"
|
||||||
|
"github.com/torrentclaw/unarr/internal/engine"
|
||||||
|
"github.com/torrentclaw/unarr/internal/mediaserver"
|
||||||
|
)
|
||||||
|
|
||||||
|
// handleStrmToLibrary processes a Mode="strm-to-library" task by writing a
|
||||||
|
// one-line .strm file to the user's media library and triggering a
|
||||||
|
// Plex/Jellyfin/Emby refresh. No actual download happens; the .strm points
|
||||||
|
// at the cloud-resolved debrid HTTPS URL, and the media server streams from
|
||||||
|
// there when the user presses play.
|
||||||
|
//
|
||||||
|
// Reports completion (or failure) back to the cloud via the agent client.
|
||||||
|
func handleStrmToLibrary(ctx context.Context, t agent.Task, cfg config.Config, agentClient *agent.Client) {
|
||||||
|
short := agent.ShortID(t.ID)
|
||||||
|
|
||||||
|
if t.DirectURL == "" {
|
||||||
|
log.Printf("[%s] strm-to-library: missing directUrl from server", short)
|
||||||
|
reportStrmFailure(ctx, agentClient, t.ID, "missing directUrl")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
organizeCfg := engine.OrganizeConfig{
|
||||||
|
Enabled: cfg.Organize.Enabled,
|
||||||
|
MoviesDir: cfg.Organize.MoviesDir,
|
||||||
|
TVShowsDir: cfg.Organize.TVShowsDir,
|
||||||
|
OutputDir: cfg.Download.Dir,
|
||||||
|
}
|
||||||
|
|
||||||
|
finalPath, err := engine.WriteStrm(t, organizeCfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("[%s] strm-to-library write failed: %v", short, err)
|
||||||
|
reportStrmFailure(ctx, agentClient, t.ID, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("[%s] strm-to-library wrote %s", short, finalPath)
|
||||||
|
|
||||||
|
// Trigger media-server refresh if any are configured. Errors are logged
|
||||||
|
// inside Refresh and never propagate — the .strm is on disk, so the
|
||||||
|
// next periodic scan would pick it up regardless.
|
||||||
|
if len(cfg.MediaServers) > 0 {
|
||||||
|
mediaserver.Refresh(cfg.MediaServers, finalPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, reportErr := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||||||
|
TaskID: t.ID,
|
||||||
|
Status: "completed",
|
||||||
|
Progress: 100,
|
||||||
|
FilePath: finalPath,
|
||||||
|
}); reportErr != nil {
|
||||||
|
log.Printf("[%s] strm-to-library: status report failed: %v", short, reportErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func reportStrmFailure(ctx context.Context, agentClient *agent.Client, taskID, msg string) {
|
||||||
|
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||||||
|
TaskID: taskID,
|
||||||
|
Status: "failed",
|
||||||
|
ErrorMessage: msg,
|
||||||
|
}); err != nil {
|
||||||
|
log.Printf("[%s] strm failure report failed: %v", agent.ShortID(taskID), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -9,18 +9,20 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/BurntSushi/toml"
|
"github.com/BurntSushi/toml"
|
||||||
|
"github.com/torrentclaw/unarr/internal/mediaserver"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Config holds all persistent CLI configuration.
|
// Config holds all persistent CLI configuration.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Auth AuthConfig `toml:"auth"`
|
Auth AuthConfig `toml:"auth"`
|
||||||
Agent AgentConfig `toml:"agent"`
|
Agent AgentConfig `toml:"agent"`
|
||||||
Download DownloadConfig `toml:"downloads"`
|
Download DownloadConfig `toml:"downloads"`
|
||||||
Organize OrganizeConfig `toml:"organize"`
|
Organize OrganizeConfig `toml:"organize"`
|
||||||
Daemon DaemonConfig `toml:"daemon"`
|
Daemon DaemonConfig `toml:"daemon"`
|
||||||
Notifications NotificationsConfig `toml:"notifications"`
|
Notifications NotificationsConfig `toml:"notifications"`
|
||||||
General GeneralConfig `toml:"general"`
|
General GeneralConfig `toml:"general"`
|
||||||
Library LibraryConfig `toml:"library"`
|
Library LibraryConfig `toml:"library"`
|
||||||
|
MediaServers []mediaserver.ServerConfig `toml:"mediaserver"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type AuthConfig struct {
|
type AuthConfig struct {
|
||||||
|
|
|
||||||
|
|
@ -33,6 +33,12 @@ type Manager struct {
|
||||||
// Used by the daemon to trigger an immediate sync.
|
// Used by the daemon to trigger an immediate sync.
|
||||||
OnTaskDone func()
|
OnTaskDone func()
|
||||||
|
|
||||||
|
// OnFinalized is called after a task successfully transitions to
|
||||||
|
// completed (after verify + organize + optional upgrade replace).
|
||||||
|
// Used by the daemon to trigger media-server library refreshes.
|
||||||
|
// Not invoked on failure.
|
||||||
|
OnFinalized func(task *Task)
|
||||||
|
|
||||||
// recentlyFinished holds tasks that completed/failed since the last sync read.
|
// recentlyFinished holds tasks that completed/failed since the last sync read.
|
||||||
// The sync goroutine reads and clears this to include final states in the next sync.
|
// The sync goroutine reads and clears this to include final states in the next sync.
|
||||||
recentMu sync.Mutex
|
recentMu sync.Mutex
|
||||||
|
|
@ -444,6 +450,9 @@ func (m *Manager) finalize(ctx context.Context, task *Task, result *Result) {
|
||||||
if m.cfg.Notifications {
|
if m.cfg.Notifications {
|
||||||
desktopNotify("Download complete", task.Title)
|
desktopNotify("Download complete", task.Title)
|
||||||
}
|
}
|
||||||
|
if m.OnFinalized != nil {
|
||||||
|
m.OnFinalized(task)
|
||||||
|
}
|
||||||
m.recordFinished(task.ToStatusUpdate())
|
m.recordFinished(task.ToStatusUpdate())
|
||||||
m.reporter.ReportFinal(ctx, task)
|
m.reporter.ReportFinal(ctx, task)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
130
internal/engine/strm.go
Normal file
130
internal/engine/strm.go
Normal file
|
|
@ -0,0 +1,130 @@
|
||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/torrentclaw/unarr/internal/agent"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StrmDest holds the resolved location for a .strm file to be written.
|
||||||
|
type StrmDest struct {
|
||||||
|
Dir string // directory the .strm file lives in (created if missing)
|
||||||
|
FileName string // filename including the .strm extension
|
||||||
|
FullPath string // Dir + FileName, joined
|
||||||
|
}
|
||||||
|
|
||||||
|
// StrmDestForTask computes where a .strm file should land for the given
|
||||||
|
// agent task, mirroring organize()'s naming so Plex/Jellyfin sees the same
|
||||||
|
// folder structure as a real download would have produced.
|
||||||
|
//
|
||||||
|
// Returns an error if cfg lacks the relevant library directory.
|
||||||
|
func StrmDestForTask(task agent.Task, cfg OrganizeConfig) (StrmDest, error) {
|
||||||
|
switch {
|
||||||
|
case task.ContentType == "show":
|
||||||
|
if cfg.TVShowsDir == "" {
|
||||||
|
return StrmDest{}, fmt.Errorf("strm: TVShowsDir not configured")
|
||||||
|
}
|
||||||
|
showName := task.ContentTitle
|
||||||
|
if showName == "" {
|
||||||
|
showName = cleanTitle(task.Title)
|
||||||
|
}
|
||||||
|
dir := filepath.Join(cfg.TVShowsDir, sanitizePath(showName))
|
||||||
|
var fileName string
|
||||||
|
if task.Season != nil {
|
||||||
|
dir = filepath.Join(dir, fmt.Sprintf("Season %02d", *task.Season))
|
||||||
|
if task.Episode != nil {
|
||||||
|
fileName = fmt.Sprintf("%s - S%02dE%02d.strm",
|
||||||
|
sanitizePath(showName), *task.Season, *task.Episode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if fileName == "" {
|
||||||
|
// Missing season/episode metadata — fall back to a sanitised title.
|
||||||
|
fileName = sanitizePath(showName) + ".strm"
|
||||||
|
}
|
||||||
|
return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil
|
||||||
|
|
||||||
|
case task.CollectionName != "" && cfg.MoviesDir != "":
|
||||||
|
movieName := task.ContentTitle
|
||||||
|
if movieName == "" {
|
||||||
|
movieName = cleanTitle(task.Title)
|
||||||
|
}
|
||||||
|
year := strYear(task)
|
||||||
|
base := sanitizePath(movieName)
|
||||||
|
if year != "" {
|
||||||
|
base = fmt.Sprintf("%s (%s)", base, year)
|
||||||
|
}
|
||||||
|
dir := filepath.Join(cfg.MoviesDir, sanitizePath(task.CollectionName), base)
|
||||||
|
fileName := base + ".strm"
|
||||||
|
return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil
|
||||||
|
|
||||||
|
case task.ContentType == "movie":
|
||||||
|
if cfg.MoviesDir == "" {
|
||||||
|
return StrmDest{}, fmt.Errorf("strm: MoviesDir not configured")
|
||||||
|
}
|
||||||
|
movieName := task.ContentTitle
|
||||||
|
if movieName == "" {
|
||||||
|
movieName = cleanTitle(task.Title)
|
||||||
|
}
|
||||||
|
year := strYear(task)
|
||||||
|
base := sanitizePath(movieName)
|
||||||
|
if year != "" {
|
||||||
|
base = fmt.Sprintf("%s (%s)", base, year)
|
||||||
|
}
|
||||||
|
dir := filepath.Join(cfg.MoviesDir, base)
|
||||||
|
fileName := base + ".strm"
|
||||||
|
return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
// No metadata at all — drop into MoviesDir under a sanitised title so
|
||||||
|
// at least the file lands somewhere a library scan might find it.
|
||||||
|
if cfg.MoviesDir == "" {
|
||||||
|
return StrmDest{}, fmt.Errorf("strm: no library dir configured for content without metadata")
|
||||||
|
}
|
||||||
|
base := sanitizePath(cleanTitle(task.Title))
|
||||||
|
if base == "" {
|
||||||
|
base = "Unknown"
|
||||||
|
}
|
||||||
|
dir := filepath.Join(cfg.MoviesDir, base)
|
||||||
|
fileName := base + ".strm"
|
||||||
|
return StrmDest{Dir: dir, FileName: fileName, FullPath: filepath.Join(dir, fileName)}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteStrm writes a .strm file containing the given URL at the destination
|
||||||
|
// computed from the task. Creates parent dirs as needed. Atomic write
|
||||||
|
// (temp + rename) so a partial file never gets indexed.
|
||||||
|
func WriteStrm(task agent.Task, cfg OrganizeConfig) (string, error) {
|
||||||
|
if task.DirectURL == "" {
|
||||||
|
return "", fmt.Errorf("strm: task has no directUrl")
|
||||||
|
}
|
||||||
|
|
||||||
|
dest, err := StrmDestForTask(task, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.MkdirAll(dest.Dir, 0o755); err != nil {
|
||||||
|
return "", fmt.Errorf("strm: create dir: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tmp := dest.FullPath + ".tmp"
|
||||||
|
if err := os.WriteFile(tmp, []byte(strings.TrimSpace(task.DirectURL)+"\n"), 0o644); err != nil {
|
||||||
|
return "", fmt.Errorf("strm: write temp: %w", err)
|
||||||
|
}
|
||||||
|
if err := os.Rename(tmp, dest.FullPath); err != nil {
|
||||||
|
_ = os.Remove(tmp)
|
||||||
|
return "", fmt.Errorf("strm: rename: %w", err)
|
||||||
|
}
|
||||||
|
return dest.FullPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// strYear is the agent.Task counterpart to organize.go's resolveYear.
|
||||||
|
func strYear(task agent.Task) string {
|
||||||
|
if task.ContentYear != nil && *task.ContentYear > 0 {
|
||||||
|
return fmt.Sprintf("%d", *task.ContentYear)
|
||||||
|
}
|
||||||
|
return yearRegex.FindString(task.Title)
|
||||||
|
}
|
||||||
|
|
@ -155,6 +155,13 @@ func (t *Task) GetStreamURL() string {
|
||||||
return t.StreamURL
|
return t.StreamURL
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SafeFilePath returns the task's final file path thread-safely.
|
||||||
|
func (t *Task) SafeFilePath() string {
|
||||||
|
t.mu.RLock()
|
||||||
|
defer t.mu.RUnlock()
|
||||||
|
return t.FilePath
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateProgress updates download metrics thread-safely.
|
// UpdateProgress updates download metrics thread-safely.
|
||||||
func (t *Task) UpdateProgress(p Progress) {
|
func (t *Task) UpdateProgress(p Progress) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
|
|
|
||||||
|
|
@ -87,6 +87,17 @@ func Detect() DetectedPaths {
|
||||||
|
|
||||||
// ── Plex ────────────────────────────────────────────────────────────
|
// ── Plex ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
// LocalPlexToken returns the Plex auth token from the local Plex config
|
||||||
|
// directory, if Plex Media Server is installed on this host. Returns ""
|
||||||
|
// when Plex isn't installed or the token can't be read.
|
||||||
|
func LocalPlexToken() string {
|
||||||
|
dir := plexConfigDir()
|
||||||
|
if dir == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return PlexTokenFromPrefs(filepath.Join(dir, "Preferences.xml"))
|
||||||
|
}
|
||||||
|
|
||||||
func plexLibraryPaths() []string {
|
func plexLibraryPaths() []string {
|
||||||
configDir := plexConfigDir()
|
configDir := plexConfigDir()
|
||||||
if configDir == "" {
|
if configDir == "" {
|
||||||
|
|
@ -95,7 +106,7 @@ func plexLibraryPaths() []string {
|
||||||
|
|
||||||
// Read token from Preferences.xml
|
// Read token from Preferences.xml
|
||||||
prefsPath := filepath.Join(configDir, "Preferences.xml")
|
prefsPath := filepath.Join(configDir, "Preferences.xml")
|
||||||
token := plexTokenFromPrefs(prefsPath)
|
token := PlexTokenFromPrefs(prefsPath)
|
||||||
if token == "" {
|
if token == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -154,7 +165,10 @@ type plexPrefs struct {
|
||||||
PlexOnlineToken string `xml:"PlexOnlineToken,attr"`
|
PlexOnlineToken string `xml:"PlexOnlineToken,attr"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func plexTokenFromPrefs(path string) string {
|
// PlexTokenFromPrefs reads the Plex auth token from a Preferences.xml file.
|
||||||
|
// Returns "" if the file can't be read or parsed. Used by the setup wizard
|
||||||
|
// when configuring a Plex server running on the same host as unarr.
|
||||||
|
func PlexTokenFromPrefs(path string) string {
|
||||||
data, err := os.ReadFile(path)
|
data, err := os.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ""
|
return ""
|
||||||
|
|
|
||||||
|
|
@ -79,7 +79,7 @@ func TestPlexTokenFromPrefs(t *testing.T) {
|
||||||
<Preferences PlexOnlineToken="my-secret-token" OldestPreviousVersion="1.0"/>`
|
<Preferences PlexOnlineToken="my-secret-token" OldestPreviousVersion="1.0"/>`
|
||||||
os.WriteFile(prefsPath, []byte(xml), 0o644)
|
os.WriteFile(prefsPath, []byte(xml), 0o644)
|
||||||
|
|
||||||
token := plexTokenFromPrefs(prefsPath)
|
token := PlexTokenFromPrefs(prefsPath)
|
||||||
if token != "my-secret-token" {
|
if token != "my-secret-token" {
|
||||||
t.Errorf("token = %q, want my-secret-token", token)
|
t.Errorf("token = %q, want my-secret-token", token)
|
||||||
}
|
}
|
||||||
|
|
@ -91,14 +91,14 @@ func TestPlexTokenFromPrefs(t *testing.T) {
|
||||||
xml := `<?xml version="1.0"?><Preferences/>`
|
xml := `<?xml version="1.0"?><Preferences/>`
|
||||||
os.WriteFile(prefsPath, []byte(xml), 0o644)
|
os.WriteFile(prefsPath, []byte(xml), 0o644)
|
||||||
|
|
||||||
token := plexTokenFromPrefs(prefsPath)
|
token := PlexTokenFromPrefs(prefsPath)
|
||||||
if token != "" {
|
if token != "" {
|
||||||
t.Errorf("token = %q, want empty", token)
|
t.Errorf("token = %q, want empty", token)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("file not found", func(t *testing.T) {
|
t.Run("file not found", func(t *testing.T) {
|
||||||
token := plexTokenFromPrefs("/nonexistent/Preferences.xml")
|
token := PlexTokenFromPrefs("/nonexistent/Preferences.xml")
|
||||||
if token != "" {
|
if token != "" {
|
||||||
t.Errorf("token = %q, want empty", token)
|
t.Errorf("token = %q, want empty", token)
|
||||||
}
|
}
|
||||||
|
|
@ -109,7 +109,7 @@ func TestPlexTokenFromPrefs(t *testing.T) {
|
||||||
prefsPath := filepath.Join(dir, "Preferences.xml")
|
prefsPath := filepath.Join(dir, "Preferences.xml")
|
||||||
os.WriteFile(prefsPath, []byte("not xml at all"), 0o644)
|
os.WriteFile(prefsPath, []byte("not xml at all"), 0o644)
|
||||||
|
|
||||||
token := plexTokenFromPrefs(prefsPath)
|
token := PlexTokenFromPrefs(prefsPath)
|
||||||
if token != "" {
|
if token != "" {
|
||||||
t.Errorf("token = %q, want empty", token)
|
t.Errorf("token = %q, want empty", token)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
280
internal/mediaserver/refresh.go
Normal file
280
internal/mediaserver/refresh.go
Normal file
|
|
@ -0,0 +1,280 @@
|
||||||
|
package mediaserver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ServerConfig describes a media server that should be refreshed after a
|
||||||
|
// download (or .strm write) finishes. Stored in unarr's config.toml under
|
||||||
|
// [[mediaserver]].
|
||||||
|
type ServerConfig struct {
|
||||||
|
Kind string `toml:"kind"` // "plex" | "jellyfin" | "emby"
|
||||||
|
URL string `toml:"url"` // e.g. http://localhost:32400
|
||||||
|
Token string `toml:"token"` // Plex token / Jellyfin or Emby API key
|
||||||
|
Sections []int `toml:"sections"` // optional: Plex section IDs to refresh (else auto)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Section describes a Plex library section.
|
||||||
|
type Section struct {
|
||||||
|
ID int
|
||||||
|
Title string
|
||||||
|
Locations []string
|
||||||
|
}
|
||||||
|
|
||||||
|
// httpClient is the shared HTTP client for refresh calls. Short timeouts
|
||||||
|
// because a refresh trigger is fire-and-forget.
|
||||||
|
var httpClient = &http.Client{Timeout: 8 * time.Second}
|
||||||
|
|
||||||
|
// plexSectionCache caches section lookups per server URL+token, so we don't
|
||||||
|
// re-fetch sections on every download.
|
||||||
|
var (
|
||||||
|
plexSectionMu sync.RWMutex
|
||||||
|
plexSectionCache = map[string][]Section{}
|
||||||
|
)
|
||||||
|
|
||||||
|
// Refresh fans out a refresh call to every configured media server. Errors
|
||||||
|
// are logged but never returned — a failed refresh is non-fatal because the
|
||||||
|
// download itself succeeded and the next periodic scan will pick it up.
|
||||||
|
//
|
||||||
|
// `filePath` is the path of the file that was just placed (or the .strm
|
||||||
|
// pointer); it's used to resolve the matching Plex section for partial
|
||||||
|
// refreshes. Pass "" to fall back to a full refresh.
|
||||||
|
//
|
||||||
|
// Each refresh runs in its own goroutine with a 15s timeout — the call
|
||||||
|
// returns immediately and never blocks the caller.
|
||||||
|
func Refresh(servers []ServerConfig, filePath string) {
|
||||||
|
for _, s := range servers {
|
||||||
|
go func() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := refreshOne(ctx, s, filePath); err != nil {
|
||||||
|
log.Printf("mediaserver: %s refresh failed (%s): %v", s.Kind, s.URL, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func refreshOne(ctx context.Context, s ServerConfig, filePath string) error {
|
||||||
|
switch strings.ToLower(s.Kind) {
|
||||||
|
case "plex":
|
||||||
|
return refreshPlex(ctx, s, filePath)
|
||||||
|
case "jellyfin", "emby":
|
||||||
|
return refreshJellyfin(ctx, s)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown kind %q", s.Kind)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Plex ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
func refreshPlex(ctx context.Context, s ServerConfig, filePath string) error {
|
||||||
|
if s.URL == "" || s.Token == "" {
|
||||||
|
return fmt.Errorf("plex: missing url or token")
|
||||||
|
}
|
||||||
|
|
||||||
|
sectionIDs := s.Sections
|
||||||
|
if len(sectionIDs) == 0 {
|
||||||
|
// Auto-resolve: fetch sections, pick whichever owns the file path.
|
||||||
|
sections, err := plexSections(ctx, s.URL, s.Token)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("fetch sections: %w", err)
|
||||||
|
}
|
||||||
|
if filePath != "" {
|
||||||
|
if id, ok := matchSectionByPath(sections, filePath); ok {
|
||||||
|
sectionIDs = []int{id}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(sectionIDs) == 0 {
|
||||||
|
// Fall back to refreshing every section.
|
||||||
|
for _, sec := range sections {
|
||||||
|
sectionIDs = append(sectionIDs, sec.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var firstErr error
|
||||||
|
for _, id := range sectionIDs {
|
||||||
|
if err := plexRefreshSection(ctx, s.URL, s.Token, id, filePath); err != nil {
|
||||||
|
if firstErr == nil {
|
||||||
|
firstErr = err
|
||||||
|
}
|
||||||
|
log.Printf("mediaserver: plex section %d refresh failed: %v", id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return firstErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func plexRefreshSection(ctx context.Context, baseURL, token string, sectionID int, filePath string) error {
|
||||||
|
q := url.Values{}
|
||||||
|
if filePath != "" {
|
||||||
|
q.Set("path", filePath)
|
||||||
|
}
|
||||||
|
q.Set("X-Plex-Token", token)
|
||||||
|
|
||||||
|
endpoint := fmt.Sprintf("%s/library/sections/%d/refresh?%s",
|
||||||
|
strings.TrimRight(baseURL, "/"), sectionID, q.Encode())
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
_, _ = io.Copy(io.Discard, resp.Body)
|
||||||
|
|
||||||
|
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
// plexSections returns the cached or freshly fetched library sections for a
|
||||||
|
// Plex server. The cache lives for the agent's lifetime — Plex sections
|
||||||
|
// rarely change, so refetching on every download is wasteful.
|
||||||
|
func plexSections(ctx context.Context, baseURL, token string) ([]Section, error) {
|
||||||
|
key := baseURL + "|" + token
|
||||||
|
plexSectionMu.RLock()
|
||||||
|
cached, ok := plexSectionCache[key]
|
||||||
|
plexSectionMu.RUnlock()
|
||||||
|
if ok {
|
||||||
|
return cached, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sections, err := fetchPlexSections(ctx, baseURL, token)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
plexSectionMu.Lock()
|
||||||
|
plexSectionCache[key] = sections
|
||||||
|
plexSectionMu.Unlock()
|
||||||
|
|
||||||
|
return sections, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchPlexSections(ctx context.Context, baseURL, token string) ([]Section, error) {
|
||||||
|
endpoint := strings.TrimRight(baseURL, "/") + "/library/sections"
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Set("X-Plex-Token", token)
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return parsePlexSectionsFull(body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// parsePlexSectionsFull parses the full sections response with IDs.
|
||||||
|
func parsePlexSectionsFull(body []byte) ([]Section, error) {
|
||||||
|
var container struct {
|
||||||
|
MediaContainer struct {
|
||||||
|
Directory []struct {
|
||||||
|
Key string `json:"key"`
|
||||||
|
Title string `json:"title"`
|
||||||
|
Location []struct {
|
||||||
|
Path string `json:"path"`
|
||||||
|
} `json:"Location"`
|
||||||
|
} `json:"Directory"`
|
||||||
|
} `json:"MediaContainer"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(body, &container); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var out []Section
|
||||||
|
for _, d := range container.MediaContainer.Directory {
|
||||||
|
var id int
|
||||||
|
_, _ = fmt.Sscanf(d.Key, "%d", &id)
|
||||||
|
if id == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sec := Section{ID: id, Title: d.Title}
|
||||||
|
for _, loc := range d.Location {
|
||||||
|
if loc.Path != "" {
|
||||||
|
sec.Locations = append(sec.Locations, loc.Path)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
out = append(out, sec)
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// matchSectionByPath returns the ID of the section whose Locations contain
|
||||||
|
// (as prefix) the given file path. Picks the most specific (longest) match.
|
||||||
|
func matchSectionByPath(sections []Section, filePath string) (int, bool) {
|
||||||
|
bestID := 0
|
||||||
|
bestLen := 0
|
||||||
|
for _, s := range sections {
|
||||||
|
for _, loc := range s.Locations {
|
||||||
|
loc = strings.TrimRight(loc, "/")
|
||||||
|
if loc == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if filePath == loc || strings.HasPrefix(filePath, loc+"/") {
|
||||||
|
if len(loc) > bestLen {
|
||||||
|
bestLen = len(loc)
|
||||||
|
bestID = s.ID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bestID, bestID != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Jellyfin / Emby ─────────────────────────────────────────────────
|
||||||
|
|
||||||
|
func refreshJellyfin(ctx context.Context, s ServerConfig) error {
|
||||||
|
if s.URL == "" || s.Token == "" {
|
||||||
|
return fmt.Errorf("%s: missing url or token", s.Kind)
|
||||||
|
}
|
||||||
|
|
||||||
|
endpoint := strings.TrimRight(s.URL, "/") + "/Library/Refresh"
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Both Jellyfin and Emby accept this header.
|
||||||
|
req.Header.Set("X-Emby-Token", s.Token)
|
||||||
|
req.Header.Set("Accept", "application/json")
|
||||||
|
|
||||||
|
resp, err := httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
_, _ = io.Copy(io.Discard, resp.Body)
|
||||||
|
|
||||||
|
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
149
internal/mediaserver/refresh_test.go
Normal file
149
internal/mediaserver/refresh_test.go
Normal file
|
|
@ -0,0 +1,149 @@
|
||||||
|
package mediaserver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestParsePlexSectionsFull(t *testing.T) {
|
||||||
|
body := []byte(`{
|
||||||
|
"MediaContainer": {
|
||||||
|
"Directory": [
|
||||||
|
{ "key": "1", "title": "Movies", "Location": [{"path": "/data/media/movies"}] },
|
||||||
|
{ "key": "2", "title": "TV Shows", "Location": [{"path": "/data/media/tv"}, {"path": "/mnt/tv2"}] },
|
||||||
|
{ "key": "0", "title": "Bogus", "Location": [{"path": "/skip"}] }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
|
sections, err := parsePlexSectionsFull(body)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("parsePlexSectionsFull error: %v", err)
|
||||||
|
}
|
||||||
|
if len(sections) != 2 {
|
||||||
|
t.Fatalf("got %d sections, want 2 (id=0 should be skipped)", len(sections))
|
||||||
|
}
|
||||||
|
if sections[0].ID != 1 || sections[0].Title != "Movies" {
|
||||||
|
t.Errorf("section[0] = %+v", sections[0])
|
||||||
|
}
|
||||||
|
if sections[1].ID != 2 || len(sections[1].Locations) != 2 {
|
||||||
|
t.Errorf("section[1] = %+v", sections[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMatchSectionByPath(t *testing.T) {
|
||||||
|
sections := []Section{
|
||||||
|
{ID: 1, Title: "Movies", Locations: []string{"/data/media/movies"}},
|
||||||
|
{ID: 2, Title: "TV", Locations: []string{"/data/media/tv"}},
|
||||||
|
{ID: 3, Title: "TV-HD", Locations: []string{"/data/media/tv/hd"}},
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
path string
|
||||||
|
wantID int
|
||||||
|
wantOK bool
|
||||||
|
}{
|
||||||
|
{"/data/media/movies/Inception (2010)/Inception.mkv", 1, true},
|
||||||
|
{"/data/media/tv/Show/Season 01/ep.mkv", 2, true},
|
||||||
|
{"/data/media/tv/hd/Show/Season 01/ep.mkv", 3, true}, // most specific wins
|
||||||
|
{"/data/media/movies", 1, true}, // exact
|
||||||
|
{"/elsewhere/foo.mkv", 0, false},
|
||||||
|
}
|
||||||
|
for _, tc := range tests {
|
||||||
|
gotID, gotOK := matchSectionByPath(sections, tc.path)
|
||||||
|
if gotID != tc.wantID || gotOK != tc.wantOK {
|
||||||
|
t.Errorf("matchSectionByPath(%q) = (%d,%v), want (%d,%v)",
|
||||||
|
tc.path, gotID, gotOK, tc.wantID, tc.wantOK)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRefreshPlex_PartialRefreshWithPath(t *testing.T) {
|
||||||
|
var refreshHits int32
|
||||||
|
var gotPath, gotToken string
|
||||||
|
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
switch {
|
||||||
|
case r.URL.Path == "/library/sections":
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
_, _ = w.Write([]byte(`{
|
||||||
|
"MediaContainer": {
|
||||||
|
"Directory": [
|
||||||
|
{ "key": "7", "title": "Movies", "Location": [{"path": "/m"}] }
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}`))
|
||||||
|
case strings.HasPrefix(r.URL.Path, "/library/sections/7/refresh"):
|
||||||
|
atomic.AddInt32(&refreshHits, 1)
|
||||||
|
gotPath = r.URL.Query().Get("path")
|
||||||
|
gotToken = r.URL.Query().Get("X-Plex-Token")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
default:
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
// Reset cache so the test server is hit fresh.
|
||||||
|
plexSectionMu.Lock()
|
||||||
|
plexSectionCache = map[string][]Section{}
|
||||||
|
plexSectionMu.Unlock()
|
||||||
|
|
||||||
|
cfg := ServerConfig{Kind: "plex", URL: srv.URL, Token: "tk-1"}
|
||||||
|
Refresh([]ServerConfig{cfg}, "/m/Inception/Inception.mkv")
|
||||||
|
|
||||||
|
// Refresh fans out goroutines — give it time.
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if atomic.LoadInt32(&refreshHits) > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
if atomic.LoadInt32(&refreshHits) != 1 {
|
||||||
|
t.Fatalf("refresh endpoint hit %d times, want 1", refreshHits)
|
||||||
|
}
|
||||||
|
if gotPath != "/m/Inception/Inception.mkv" {
|
||||||
|
t.Errorf("path query = %q", gotPath)
|
||||||
|
}
|
||||||
|
if gotToken != "tk-1" {
|
||||||
|
t.Errorf("token query = %q", gotToken)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRefreshJellyfin(t *testing.T) {
|
||||||
|
var hits int32
|
||||||
|
var gotToken string
|
||||||
|
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method == http.MethodPost && r.URL.Path == "/Library/Refresh" {
|
||||||
|
atomic.AddInt32(&hits, 1)
|
||||||
|
gotToken = r.Header.Get("X-Emby-Token")
|
||||||
|
w.WriteHeader(http.StatusNoContent)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusNotFound)
|
||||||
|
}))
|
||||||
|
defer srv.Close()
|
||||||
|
|
||||||
|
cfg := ServerConfig{Kind: "jellyfin", URL: srv.URL, Token: "jf-key"}
|
||||||
|
Refresh([]ServerConfig{cfg}, "")
|
||||||
|
|
||||||
|
deadline := time.Now().Add(2 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if atomic.LoadInt32(&hits) > 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if atomic.LoadInt32(&hits) != 1 {
|
||||||
|
t.Fatalf("Jellyfin hits = %d, want 1", hits)
|
||||||
|
}
|
||||||
|
if gotToken != "jf-key" {
|
||||||
|
t.Errorf("X-Emby-Token = %q", gotToken)
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue