Daemon now runs engine.DetectHWAccelDiagnostic at startup (instead of the lighter DetectHWAccel) and ships the full picture — ffmpeg version, resolved binary path, HW encoders compiled in, device files / drivers detected — up to the server in the RegisterRequest payload. Why: the most common cause of slow first-play is a software-only ffmpeg build. Surfacing the diagnostic in the web AgentsTab "Diagnose transcoder" modal lets a user see *why* their backend landed on libx264 (e.g. brew's default formula ships without --enable-nvenc, or the container is missing /dev/nvidia0) without SSHing in to run `unarr probe-hwaccel` manually. Also emits a single `[transcode]` startup log line summarising the same data — convenient for `journalctl --user -u unarr | grep transcode`. Bounded by a 10 s context so a hung ffmpeg binary can't stall daemon startup forever.
942 lines
31 KiB
Go
942 lines
31 KiB
Go
package cmd
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"log"
|
||
"os"
|
||
"os/signal"
|
||
"path/filepath"
|
||
"strings"
|
||
"syscall"
|
||
"time"
|
||
|
||
"github.com/fatih/color"
|
||
"github.com/spf13/cobra"
|
||
"github.com/torrentclaw/unarr/internal/agent"
|
||
"github.com/torrentclaw/unarr/internal/config"
|
||
"github.com/torrentclaw/unarr/internal/engine"
|
||
"github.com/torrentclaw/unarr/internal/funnel"
|
||
"github.com/torrentclaw/unarr/internal/library"
|
||
"github.com/torrentclaw/unarr/internal/library/mediainfo"
|
||
"github.com/torrentclaw/unarr/internal/usenet/download"
|
||
"github.com/torrentclaw/unarr/internal/vpn"
|
||
)
|
||
|
||
// newStartCmd creates the top-level `unarr start` command.
|
||
func newStartCmd() *cobra.Command {
|
||
return &cobra.Command{
|
||
Use: "start",
|
||
Short: "Start the download daemon (foreground)",
|
||
Long: `Start the unarr daemon in the foreground.
|
||
|
||
Registers with the server, receives download tasks via periodic sync,
|
||
and executes them using the configured download method.
|
||
Supports torrent, debrid, and usenet downloads concurrently.
|
||
|
||
The daemon syncs state with the server every 3s when someone is viewing
|
||
the web dashboard, or every 60s when idle. Press Ctrl+C to stop
|
||
gracefully — active downloads get up to 30 seconds to finish.
|
||
|
||
Requires: API key, agent ID, and download directory (run 'unarr init' first).
|
||
|
||
To run as a background service, use 'unarr daemon install' instead.`,
|
||
Example: ` unarr start
|
||
unarr start --config /path/to/config.toml`,
|
||
RunE: func(cmd *cobra.Command, args []string) error {
|
||
return runDaemonStart()
|
||
},
|
||
}
|
||
}
|
||
|
||
// newStopCmd creates the top-level `unarr stop` command.
|
||
func newStopCmd() *cobra.Command {
|
||
return &cobra.Command{
|
||
Use: "stop",
|
||
Short: "Stop the running daemon",
|
||
Long: `Stop the unarr daemon gracefully.
|
||
|
||
Reads the daemon PID from the state file and sends a graceful stop signal.
|
||
Works regardless of whether the daemon was started in the foreground or as a service.
|
||
|
||
To stop a service-managed daemon and prevent auto-restart, use 'unarr daemon stop' instead.`,
|
||
Example: ` unarr stop`,
|
||
RunE: func(cmd *cobra.Command, args []string) error {
|
||
return stopDaemonByPID()
|
||
},
|
||
}
|
||
}
|
||
|
||
// newDaemonCmd creates `unarr daemon` for administrative subcommands.
|
||
func newDaemonCmd() *cobra.Command {
|
||
cmd := &cobra.Command{
|
||
Use: "daemon <command>",
|
||
Short: "Manage the daemon as a system service",
|
||
Long: `Install, control and inspect the unarr daemon as a system service.
|
||
|
||
Linux: systemd user service (~/.config/systemd/user/unarr.service)
|
||
macOS: launchd agent (~/Library/LaunchAgents/com.torrentclaw.unarr.plist)
|
||
Windows: Task Scheduler task (runs at logon)`,
|
||
Example: ` unarr daemon install
|
||
unarr daemon start
|
||
unarr daemon status
|
||
unarr daemon logs -f
|
||
unarr daemon reload
|
||
unarr daemon restart
|
||
unarr daemon stop
|
||
unarr daemon uninstall`,
|
||
}
|
||
|
||
cmd.AddCommand(
|
||
newDaemonInstallCmdReal(),
|
||
newDaemonUninstallCmdReal(),
|
||
newDaemonStartCmd(),
|
||
newDaemonStopCmd(),
|
||
newDaemonRestartCmd(),
|
||
newDaemonSvcStatusCmd(),
|
||
newDaemonLogsCmd(),
|
||
newDaemonReloadCmd(),
|
||
)
|
||
|
||
return cmd
|
||
}
|
||
|
||
func runDaemonStart() error {
|
||
cfg := loadConfig()
|
||
bold := color.New(color.Bold)
|
||
|
||
// Validate config
|
||
if cfg.Auth.APIKey == "" {
|
||
return fmt.Errorf("no API key configured — run 'unarr init' first")
|
||
}
|
||
if cfg.Agent.ID == "" {
|
||
return fmt.Errorf("no agent ID — run 'unarr init' first")
|
||
}
|
||
if cfg.Download.Dir == "" {
|
||
return fmt.Errorf("no download directory — run 'unarr init' first")
|
||
}
|
||
|
||
// Validate configured paths are safe
|
||
if err := cfg.ValidatePaths(); err != nil {
|
||
return fmt.Errorf("unsafe configuration: %w", err)
|
||
}
|
||
|
||
// Ensure download dir exists
|
||
if err := os.MkdirAll(cfg.Download.Dir, 0o755); err != nil {
|
||
return fmt.Errorf("create download dir: %w", err)
|
||
}
|
||
|
||
// Clean up stale resume files (>7 days old)
|
||
resumeDir := filepath.Join(config.DataDir(), "resume")
|
||
if removed := download.CleanStaleFiles(resumeDir, 7*24*time.Hour); removed > 0 {
|
||
log.Printf("Cleaned %d stale resume file(s)", removed)
|
||
}
|
||
|
||
fmt.Println()
|
||
bold.Println(" unarr Daemon")
|
||
fmt.Println()
|
||
|
||
userAgent := "unarr/" + Version
|
||
|
||
// Probe HW accel + derive a sensible transcode resolution cap. The cap
|
||
// is what the web side uses to decide whether the user should pre-empt
|
||
// transcoding by downloading a smaller version (4K source on a software
|
||
// libx264-only host is the canonical case where pre-download wins).
|
||
//
|
||
// Use the full diagnostic (encoders + devices + ffmpeg version) instead
|
||
// of just the picked backend — the extra fields ride along in the
|
||
// register payload so the web "Diagnose transcoder" modal can show *why*
|
||
// libx264 was selected on a host with a GPU (e.g. brew's ffmpeg without
|
||
// --enable-nvenc). 10 s ceiling so a hung ffmpeg binary can't stall
|
||
// startup forever.
|
||
ffmpegResolved, _ := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath)
|
||
probeCtx, probeCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
hwDiag := engine.DetectHWAccelDiagnostic(probeCtx, ffmpegResolved)
|
||
probeCancel()
|
||
log.Println(hwDiag.LogLine())
|
||
hwAccelPick := hwDiag.Pick
|
||
maxTranscodeHeight := 1080
|
||
if hwAccelPick != engine.HWAccelNone {
|
||
maxTranscodeHeight = 2160
|
||
}
|
||
|
||
// Create daemon config
|
||
daemonCfg := agent.DaemonConfig{
|
||
AgentID: cfg.Agent.ID,
|
||
AgentName: cfg.Agent.Name,
|
||
Version: Version,
|
||
DownloadDir: cfg.Download.Dir,
|
||
StreamPort: cfg.Download.StreamPort,
|
||
LanIP: engine.LanIP(),
|
||
TailscaleIP: engine.TailscaleIP(),
|
||
CanDelete: cfg.Library.AllowDelete,
|
||
ScanPaths: library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath),
|
||
HWAccel: string(hwAccelPick),
|
||
MaxTranscodeHeight: maxTranscodeHeight,
|
||
FFmpegVersion: hwDiag.FFmpegVersion,
|
||
FFmpegPath: hwDiag.FFmpegPath,
|
||
HWEncoders: hwDiag.Encoders,
|
||
HWDevices: hwDiag.Devices,
|
||
AutoUpgrade: cfg.Daemon.AutoUpgradeEnabled(),
|
||
}
|
||
|
||
// Create HTTP client with mirror failover so a `.com` block-out rolls
|
||
// over to `.to` / .onion without restarting the daemon.
|
||
agentClient := newAgentClientFromConfig(cfg, userAgent)
|
||
log.Printf("Transport: HTTP sync → %s (mirrors: %d)", cfg.Auth.APIURL, len(cfg.Auth.Mirrors))
|
||
|
||
// Create daemon
|
||
d := agent.NewDaemon(daemonCfg, agentClient)
|
||
|
||
// Start SIGUSR1 reload watcher (unix only, no-op on Windows)
|
||
startReloadWatcher(&ReloadableConfig{Daemon: d})
|
||
|
||
// Daemon-scoped context — cancelled on shutdown
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
|
||
// Parse speed limits
|
||
maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed)
|
||
maxUl, _ := config.ParseSpeed(cfg.Download.MaxUploadSpeed)
|
||
|
||
// Parse torrent timeouts
|
||
metaTimeout, _ := time.ParseDuration(cfg.Download.MetadataTimeout)
|
||
stallTimeout, _ := time.ParseDuration(cfg.Download.StallTimeout)
|
||
|
||
// Create progress reporter — only used for stream tasks (handleStreamTask)
|
||
// The sync goroutine handles all regular progress reporting.
|
||
statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval)
|
||
if statusInterval == 0 {
|
||
statusInterval = 3 * time.Second
|
||
}
|
||
reporter := engine.NewProgressReporter(agentClient, statusInterval)
|
||
reporter.SetWatchingFunc(func() bool { return d.Watching.Load() })
|
||
|
||
// Managed-VPN add-on: bring up the in-process WireGuard split-tunnel before
|
||
// the torrent client so peer + tracker traffic routes through it. Failure is
|
||
// non-fatal — log and download in the clear (better than refusing to run).
|
||
var vpnTunnel *vpn.Tunnel
|
||
if cfg.Download.VPN.ConfigFile != "" {
|
||
// Self-hosted / personal-VPN mode: read a local .conf directly.
|
||
raw, rerr := os.ReadFile(cfg.Download.VPN.ConfigFile)
|
||
if rerr != nil {
|
||
log.Printf("[vpn] could not read config_file %q (%v) — downloading in the clear", cfg.Download.VPN.ConfigFile, rerr)
|
||
} else if t, uerr := vpn.Up(string(raw)); uerr != nil {
|
||
log.Printf("[vpn] tunnel failed to start from config_file (%v) — downloading in the clear", uerr)
|
||
} else {
|
||
vpnTunnel = t
|
||
defer vpnTunnel.Close()
|
||
log.Printf("[vpn] managed VPN active (local config_file) — torrent traffic split-tunnelled through WireGuard")
|
||
}
|
||
} else if cfg.Download.VPN.Enabled {
|
||
apiURL := cfg.Auth.APIURL
|
||
if apiURL == "" {
|
||
apiURL = "https://torrentclaw.com"
|
||
}
|
||
fetchCtx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
|
||
conf, ferr := vpn.FetchConfig(fetchCtx, apiURL, cfg.Auth.APIKey, "unarr/"+Version, cfg.Agent.ID, false)
|
||
cancel()
|
||
var fe *vpn.FetchError
|
||
switch {
|
||
case ferr != nil && errors.As(ferr, &fe) && fe.Code == vpn.ErrSlotOnDevice:
|
||
log.Printf("[vpn] the single WireGuard slot is already held by another unarr agent — this one downloads in the clear. To protect this machine too, set up OpenVPN on it (1 agent uses WireGuard, the rest use OpenVPN — up to 10). See https://torrentclaw.com/vpn")
|
||
case ferr != nil:
|
||
log.Printf("[vpn] could not enable VPN (%v) — downloading in the clear", ferr)
|
||
default:
|
||
if t, uerr := vpn.Up(conf); uerr != nil {
|
||
log.Printf("[vpn] tunnel failed to start (%v) — downloading in the clear", uerr)
|
||
} else {
|
||
vpnTunnel = t
|
||
defer vpnTunnel.Close()
|
||
log.Printf("[vpn] managed VPN active — torrent traffic split-tunnelled through WireGuard")
|
||
}
|
||
}
|
||
}
|
||
|
||
// Record VPN split-tunnel state for `unarr vpn status`.
|
||
if vpnTunnel != nil {
|
||
mode := "managed"
|
||
if cfg.Download.VPN.ConfigFile != "" {
|
||
mode = "self-hosted"
|
||
}
|
||
d.SetVPNState(true, mode, vpnTunnel.Endpoint)
|
||
}
|
||
|
||
// Create torrent downloader
|
||
torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{
|
||
DataDir: cfg.Download.Dir,
|
||
MetadataTimeout: metaTimeout,
|
||
StallTimeout: stallTimeout,
|
||
MaxTimeout: 0,
|
||
MaxDownloadRate: maxDl,
|
||
MaxUploadRate: maxUl,
|
||
ListenPort: cfg.Download.ListenPort,
|
||
SeedEnabled: false,
|
||
VPNTunnel: vpnTunnel,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("create torrent downloader: %w", err)
|
||
}
|
||
|
||
if maxDl > 0 || maxUl > 0 {
|
||
dlStr, ulStr := "unlimited", "unlimited"
|
||
if maxDl > 0 {
|
||
dlStr = formatSpeedLog(maxDl)
|
||
}
|
||
if maxUl > 0 {
|
||
ulStr = formatSpeedLog(maxUl)
|
||
}
|
||
log.Printf("Speed limits: download=%s upload=%s", dlStr, ulStr)
|
||
}
|
||
|
||
// Create debrid downloader
|
||
debridDl := engine.NewDebridDownloader()
|
||
|
||
// Create download manager
|
||
manager := engine.NewManager(engine.ManagerConfig{
|
||
MaxConcurrent: cfg.Download.MaxConcurrent,
|
||
OutputDir: cfg.Download.Dir,
|
||
Notifications: cfg.Notifications.Enabled,
|
||
Organize: engine.OrganizeConfig{
|
||
Enabled: cfg.Organize.Enabled,
|
||
MoviesDir: cfg.Organize.MoviesDir,
|
||
TVShowsDir: cfg.Organize.TVShowsDir,
|
||
OutputDir: cfg.Download.Dir,
|
||
},
|
||
}, reporter, torrentDl, debridDl, engine.NewUsenetDownloader(agentClient))
|
||
|
||
// Create persistent stream server
|
||
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
|
||
streamSrv.SetUPnPEnabled(cfg.Download.EnableUPnP)
|
||
// CORS extras = operator config + dynamic mirror list from /api/mirrors.
|
||
// Without the mirror merge, a user playing from `torrentclaw.to` (or any
|
||
// future mirror) hits the daemon, gets 200 + body, but no
|
||
// `Access-Control-Allow-Origin` → browser drops the response → player
|
||
// reports "404 todos los canales". Fetching /api/mirrors at startup
|
||
// future-proofs against mirror additions without a CLI rebuild.
|
||
corsExtras := append([]string(nil), cfg.Download.CORSExtraOrigins...)
|
||
corsExtras = append(corsExtras, mirrorCORSOrigins(ctx, cfg, userAgent)...)
|
||
streamSrv.SetCORSAllowedOrigins(corsExtras)
|
||
// Reap HLS tmpdirs left over from a previous daemon run before we start
|
||
// accepting new sessions. The in-memory registry doesn't survive a
|
||
// restart, so without this disk usage grows unbounded across restarts.
|
||
if err := engine.CleanupHLSOrphanDirs(); err != nil {
|
||
log.Printf("[hls] orphan tmpdir cleanup: %v", err)
|
||
}
|
||
|
||
// Persistent HLS segment cache — survives across sessions so re-plays
|
||
// of the same file at the same quality skip ffmpeg entirely. Off when
|
||
// hls_cache.enabled = false; size cap from hls_cache.size_gb; path from
|
||
// hls_cache.dir (defaults to ~/.cache/unarr/hls-cache).
|
||
var hlsCache *engine.HLSCache
|
||
if cfg.Download.HLSCache.Enabled {
|
||
cacheDir := cfg.Download.HLSCache.Dir
|
||
if cacheDir == "" {
|
||
if base, err := os.UserCacheDir(); err == nil {
|
||
cacheDir = filepath.Join(base, "unarr", "hls-cache")
|
||
} else {
|
||
cacheDir = filepath.Join(os.TempDir(), "unarr-hls-cache")
|
||
}
|
||
}
|
||
c, err := engine.NewHLSCache(cacheDir, cfg.Download.HLSCache.SizeGB)
|
||
if err != nil {
|
||
log.Printf("[hls_cache] init failed (%v) — falling back to per-session tmpdirs", err)
|
||
} else {
|
||
hlsCache = c
|
||
hlsCache.StartSweeper(ctx, time.Hour)
|
||
log.Printf("[hls_cache] enabled: dir=%s budget=%dGB", cacheDir, cfg.Download.HLSCache.SizeGB)
|
||
}
|
||
} else {
|
||
log.Printf("[hls_cache] disabled by config — every play re-encodes from scratch")
|
||
}
|
||
if err := streamSrv.Listen(ctx); err != nil {
|
||
return fmt.Errorf("start stream server: %w", err)
|
||
}
|
||
d.UpdateStreamPort(streamSrv.Port())
|
||
|
||
// CloudFlare Quick Tunnel — needs the ACTUAL listening port (the
|
||
// configured port may have been busy and bumped). Spawning here ensures
|
||
// cloudflared --url points at the right socket. Failures degrade to
|
||
// Tailscale/LAN only; the supervisor keeps the tunnel up across CF's
|
||
// periodic rotation + transient cloudflared crashes.
|
||
if cfg.Download.Funnel.Enabled {
|
||
go superviseFunnel(ctx, d, streamSrv.Port())
|
||
}
|
||
|
||
// Warn at startup if transcode is enabled but ffmpeg/ffprobe are missing.
|
||
// HLS sessions get rejected at runtime (see daemon.go ~line 455), but
|
||
// surfacing it here gives the operator a chance to install ffmpeg before
|
||
// a user hits a confusing "rejected" line in the logs.
|
||
if cfg.Download.Transcode.Enabled {
|
||
if _, err := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath); err != nil {
|
||
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
|
||
} else if _, err := mediainfo.ResolveFFprobe(cfg.Library.FFprobePath); err != nil {
|
||
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
|
||
}
|
||
}
|
||
|
||
// Wire sync client callbacks
|
||
sc := d.SyncClient()
|
||
sc.GetFreeSlots = manager.FreeSlots
|
||
sc.GetTaskStates = manager.TaskStates
|
||
d.GetActiveCount = manager.ActiveCount
|
||
|
||
// Trigger immediate sync when a download slot frees up
|
||
manager.OnTaskDone = func() { d.TriggerSync() }
|
||
|
||
// Wire: sync receives new tasks → submit to manager or handle stream
|
||
d.OnTasksClaimed = func(tasks []agent.Task) {
|
||
for _, t := range tasks {
|
||
if t.Mode == "stream" {
|
||
if isStreamingTask(t.ID) {
|
||
continue
|
||
}
|
||
cancelStreamContexts()
|
||
streamSrv.ClearFile()
|
||
streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel stored in registry
|
||
streamRegistry.mu.Lock()
|
||
streamRegistry.cancels[t.ID] = streamCancel
|
||
streamRegistry.mu.Unlock()
|
||
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv)
|
||
} else {
|
||
manager.Submit(ctx, t)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Wire: sync receives control signals → act on manager
|
||
d.OnControlAction = func(action, taskID string, deleteFiles bool) {
|
||
switch action {
|
||
case "cancel":
|
||
if deleteFiles {
|
||
manager.CancelAndDeleteFiles(taskID)
|
||
} else {
|
||
manager.CancelTask(taskID)
|
||
}
|
||
cancelStreamTask(taskID)
|
||
if streamSrv.CurrentTaskID() == taskID {
|
||
streamSrv.ClearFile()
|
||
}
|
||
case "pause":
|
||
manager.PauseTask(taskID)
|
||
cancelStreamTask(taskID)
|
||
if streamSrv.CurrentTaskID() == taskID {
|
||
streamSrv.ClearFile()
|
||
}
|
||
case "resume":
|
||
log.Printf("[%s] resume requested, triggering sync", agent.ShortID(taskID))
|
||
d.TriggerSync()
|
||
case "stream":
|
||
if streamSrv.CurrentTaskID() == taskID {
|
||
return
|
||
}
|
||
task := manager.GetTask(taskID)
|
||
if task == nil || task.GetStreamURL() != "" {
|
||
return
|
||
}
|
||
provider, err := torrentDl.GetStreamProvider(taskID)
|
||
if err != nil {
|
||
log.Printf("[%s] stream failed: %v", agent.ShortID(taskID), err)
|
||
return
|
||
}
|
||
cancelStreamContexts()
|
||
streamSrv.SetFile(provider, taskID)
|
||
task.SetStreamURL(streamSrv.URLsJSON())
|
||
log.Printf("[%s] streaming: %s", agent.ShortID(taskID), provider.FileName())
|
||
|
||
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
|
||
streamRegistry.mu.Lock()
|
||
streamRegistry.cancels["watch:"+taskID] = watchCancel
|
||
streamRegistry.mu.Unlock()
|
||
go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(watchCtx)
|
||
case "stop-stream":
|
||
cancelStreamTask(taskID)
|
||
if streamSrv.CurrentTaskID() == taskID {
|
||
streamSrv.ClearFile()
|
||
}
|
||
}
|
||
}
|
||
|
||
// Wire: sync receives file deletion requests from the server
|
||
if cfg.Library.AllowDelete && len(daemonCfg.ScanPaths) > 0 {
|
||
sc.OnDeleteFiles = func(items []agent.LibraryDeleteRequest) []int {
|
||
return library.DeleteFiles(items, daemonCfg.ScanPaths)
|
||
}
|
||
}
|
||
|
||
// Wire: sync receives stream requests for completed downloads
|
||
d.OnStreamRequested = func(sr agent.StreamRequest) {
|
||
if streamSrv.CurrentTaskID() == sr.TaskID {
|
||
// Already serving — notify server it's ready
|
||
go func() {
|
||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||
TaskID: sr.TaskID,
|
||
StreamReady: true,
|
||
}); err != nil {
|
||
log.Printf("[%s] stream ready re-notify failed: %v", agent.ShortID(sr.TaskID), err)
|
||
}
|
||
}()
|
||
return
|
||
}
|
||
|
||
filePath := filepath.Clean(sr.FilePath)
|
||
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
|
||
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
|
||
log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath)
|
||
go func() {
|
||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||
TaskID: sr.TaskID,
|
||
Status: "failed",
|
||
ErrorMessage: fmt.Sprintf("path outside allowed dirs: %s", filePath),
|
||
}); err != nil {
|
||
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
||
}
|
||
}()
|
||
return
|
||
}
|
||
info, err := os.Stat(filePath)
|
||
if err != nil {
|
||
log.Printf("[%s] stream request: file not found: %s", agent.ShortID(sr.TaskID), filePath)
|
||
go func() {
|
||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||
TaskID: sr.TaskID,
|
||
Status: "failed",
|
||
ErrorMessage: fmt.Sprintf("file not found: %s", filePath),
|
||
}); err != nil {
|
||
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
||
}
|
||
}()
|
||
return
|
||
}
|
||
|
||
if info.IsDir() {
|
||
found := engine.FindVideoFile(filePath)
|
||
if found == "" {
|
||
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
|
||
go func() {
|
||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||
TaskID: sr.TaskID,
|
||
Status: "failed",
|
||
ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath),
|
||
}); err != nil {
|
||
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
||
}
|
||
}()
|
||
return
|
||
}
|
||
filePath = found
|
||
log.Printf("[%s] resolved directory to video file: %s", agent.ShortID(sr.TaskID), filepath.Base(filePath))
|
||
}
|
||
|
||
cancelStreamContexts()
|
||
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
|
||
log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL())
|
||
|
||
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
|
||
streamRegistry.mu.Lock()
|
||
streamRegistry.cancels["watch:"+sr.TaskID] = watchCancel
|
||
streamRegistry.mu.Unlock()
|
||
go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(watchCtx)
|
||
|
||
go func() {
|
||
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
||
TaskID: sr.TaskID,
|
||
StreamReady: true,
|
||
}); err != nil {
|
||
log.Printf("[%s] stream ready report failed: %v", agent.ShortID(sr.TaskID), err)
|
||
}
|
||
}()
|
||
}
|
||
|
||
// Wire: sync receives HLS streaming session requests. Each session spawns
|
||
// one ffmpeg process and registers its HLS playlist with the StreamServer.
|
||
// Validate FilePath against allowed dirs to prevent path traversal abuse
|
||
// from a compromised server.
|
||
d.OnStreamSession = func(sess agent.StreamSession) {
|
||
if playerSessionRegistry.has(sess.SessionID) {
|
||
return // already running
|
||
}
|
||
filePath := sess.FilePath
|
||
if filePath == "" {
|
||
log.Printf("[hls %s] rejected: empty file path", agent.ShortID(sess.SessionID))
|
||
return
|
||
}
|
||
filePath = filepath.Clean(filePath)
|
||
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
|
||
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
|
||
log.Printf("[hls %s] rejected: path outside allowed dirs: %s",
|
||
agent.ShortID(sess.SessionID), filePath)
|
||
return
|
||
}
|
||
// Resolve directory → first video file (matches StreamRequest behavior).
|
||
if info, err := os.Stat(filePath); err == nil && info.IsDir() {
|
||
found := engine.FindVideoFile(filePath)
|
||
if found == "" {
|
||
log.Printf("[hls %s] rejected: no video file in dir %s",
|
||
agent.ShortID(sess.SessionID), filePath)
|
||
return
|
||
}
|
||
filePath = found
|
||
}
|
||
|
||
tcRuntime := buildTranscodeRuntime(ctx, cfg)
|
||
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
|
||
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID))
|
||
return
|
||
}
|
||
hlsCtx, hlsCancel := context.WithCancel(ctx)
|
||
playerSessionRegistry.add(sess.SessionID, hlsCancel)
|
||
hlsCfg := engine.HLSSessionConfig{
|
||
SessionID: sess.SessionID,
|
||
SourcePath: filePath,
|
||
FileName: sess.FileName,
|
||
Quality: sess.Quality,
|
||
AudioIndex: sess.AudioIndex,
|
||
Transcode: tcRuntime,
|
||
Cache: hlsCache,
|
||
}
|
||
// StartHLSSession runs ffprobe (15 s cap, typical 0.3–1 s) before
|
||
// returning. Doing this synchronously inside the sync handler holds
|
||
// the next sync HTTP cycle until ffprobe is done, so any other
|
||
// pending actions (new tasks, deletes) wait too. Hand it off so
|
||
// the sync loop returns immediately — browser HEAD probes already
|
||
// have a 30 s retry budget that absorbs the gap until
|
||
// `streamSrv.HLS().Register` lands.
|
||
go func() {
|
||
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
|
||
if err != nil {
|
||
playerSessionRegistry.remove(sess.SessionID)
|
||
hlsCancel()
|
||
log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err)
|
||
return
|
||
}
|
||
streamSrv.HLS().Register(hsess)
|
||
}()
|
||
}
|
||
|
||
// Periodic DHT node persistence (every 5 min)
|
||
go func() {
|
||
ticker := time.NewTicker(5 * time.Minute)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
torrentDl.SaveDhtNodes()
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
// Periodic HLS session sweeper (every 5 min). Closes sessions whose last
|
||
// segment fetch was over 30 min ago — kills the orphan ffmpeg + removes
|
||
// the per-session tmpdir, so a tab that died mid-stream doesn't leak
|
||
// disk space until daemon shutdown.
|
||
go func() {
|
||
ticker := time.NewTicker(5 * time.Minute)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
if n := streamSrv.HLS().SweepIdle(); n > 0 {
|
||
log.Printf("[hls] swept %d idle session(s)", n)
|
||
}
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
// Start auto-scan goroutine
|
||
scanPaths := daemonCfg.ScanPaths
|
||
if len(scanPaths) > 0 && cfg.Library.AutoScan {
|
||
scanInterval := 24 * time.Hour
|
||
if cfg.Library.ScanInterval != "" {
|
||
if parsed, err := time.ParseDuration(cfg.Library.ScanInterval); err == nil && parsed > 0 {
|
||
scanInterval = parsed
|
||
}
|
||
}
|
||
go runAutoScan(ctx, cfg, scanInterval, agentClient, d.ScanNow, scanPaths)
|
||
}
|
||
|
||
// Start reporter only for stream task handling
|
||
go reporter.Run(ctx)
|
||
|
||
// Start daemon (blocks — runs sync loop)
|
||
errCh := make(chan error, 1)
|
||
go func() {
|
||
errCh <- d.Run(ctx)
|
||
}()
|
||
|
||
// Start idle guard for the persistent stream server
|
||
go startIdleGuard(ctx, streamSrv)
|
||
|
||
// Signal handling
|
||
sigCh := make(chan os.Signal, 1)
|
||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||
|
||
// Wait for signal or error
|
||
select {
|
||
case sig := <-sigCh:
|
||
fmt.Printf("\n Received %s, shutting down...\n", sig)
|
||
cancelStreamContexts()
|
||
cancelAllPlayerSessions()
|
||
streamSrv.Shutdown(context.Background())
|
||
cancel()
|
||
|
||
// Give active downloads 30s to finish
|
||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||
defer shutdownCancel()
|
||
manager.Shutdown(shutdownCtx)
|
||
|
||
d.Deregister()
|
||
fmt.Println(" Daemon stopped.")
|
||
return nil
|
||
|
||
case err := <-errCh:
|
||
cancelStreamContexts()
|
||
cancelAllPlayerSessions()
|
||
streamSrv.Shutdown(context.Background())
|
||
cancel()
|
||
return err
|
||
}
|
||
}
|
||
|
||
// isAllowedStreamPath checks that filePath is within one of the directories
|
||
// the daemon is configured to manage. This defends against a compromised API
|
||
// server sending a path traversal payload (e.g. /etc/passwd) in StreamRequest.
|
||
// isAllowedStreamPath reports whether filePath is contained within one of the
|
||
// allowedDirs. filePath must already be cleaned (filepath.Clean) by the caller.
|
||
// This defends against a compromised API server sending a path traversal payload.
|
||
func isAllowedStreamPath(filePath string, allowedDirs ...string) bool {
|
||
for _, dir := range allowedDirs {
|
||
if dir == "" {
|
||
continue
|
||
}
|
||
rel, err := filepath.Rel(filepath.Clean(dir), filePath)
|
||
if err == nil && !strings.HasPrefix(rel, "..") {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// formatSpeedLog renders a byte-per-second rate for log output using binary
// (1024-based) steps: GB/s and MB/s with one decimal, KB/s with none, and
// plain B/s for anything below 1024.
func formatSpeedLog(bps int64) string {
	const (
		kib = 1024
		mib = 1024 * kib
		gib = 1024 * mib
	)

	rate := float64(bps)
	if bps >= gib {
		return fmt.Sprintf("%.1f GB/s", rate/gib)
	}
	if bps >= mib {
		return fmt.Sprintf("%.1f MB/s", rate/mib)
	}
	if bps >= kib {
		return fmt.Sprintf("%.0f KB/s", rate/kib)
	}
	return fmt.Sprintf("%d B/s", bps)
}
|
||
|
||
// runAutoScan runs a library scan + sync on a timer or on-demand via scanNow channel.
|
||
// It scans all provided paths and syncs each independently so stale-item cleanup
|
||
// is scoped to the correct directory prefix on the server.
|
||
func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, ac *agent.Client, scanNow <-chan struct{}, scanPaths []string) {
|
||
log.Printf("[auto-scan] enabled: every %s, paths: %v", interval, scanPaths)
|
||
|
||
select {
|
||
case <-time.After(30 * time.Second):
|
||
case <-scanNow:
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
|
||
doScan := func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Printf("[auto-scan] panic recovered: %v", r)
|
||
}
|
||
}()
|
||
log.Printf("[auto-scan] starting scan of %v", scanPaths)
|
||
|
||
existing, _ := library.LoadCache()
|
||
|
||
workers := cfg.Library.Workers
|
||
if workers == 0 {
|
||
workers = 8
|
||
}
|
||
|
||
scanOpts := library.ScanOptions{
|
||
Workers: workers,
|
||
FFprobePath: cfg.Library.FFprobePath,
|
||
Incremental: existing != nil,
|
||
}
|
||
|
||
// Scan each path independently and sync per path so the server can
|
||
// scope stale-item deletion to the correct directory prefix.
|
||
const batchSize = 100
|
||
totalSynced := 0
|
||
var mergedItems []library.LibraryItem
|
||
|
||
for _, scanPath := range scanPaths {
|
||
cache, err := library.Scan(ctx, scanPath, existing, scanOpts)
|
||
if err != nil {
|
||
log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err)
|
||
continue
|
||
}
|
||
mergedItems = append(mergedItems, cache.Items...)
|
||
|
||
items := library.BuildSyncItems(cache)
|
||
if len(items) == 0 {
|
||
log.Printf("[auto-scan] no items under %s", scanPath)
|
||
continue
|
||
}
|
||
|
||
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
|
||
for i := 0; i < len(items); i += batchSize {
|
||
end := i + batchSize
|
||
if end > len(items) {
|
||
end = len(items)
|
||
}
|
||
isLast := end >= len(items)
|
||
|
||
_, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
||
Items: items[i:end],
|
||
ScanPath: scanPath,
|
||
IsLastBatch: isLast,
|
||
SyncStartedAt: syncStartedAt,
|
||
})
|
||
if err != nil {
|
||
log.Printf("[auto-scan] sync failed for %s: %v", scanPath, err)
|
||
break
|
||
}
|
||
}
|
||
totalSynced += len(items)
|
||
}
|
||
|
||
// Save merged cache for incremental scanning next time.
|
||
if len(mergedItems) > 0 {
|
||
mergedCache := &library.LibraryCache{
|
||
ScannedAt: time.Now().UTC().Format(time.RFC3339),
|
||
Path: scanPaths[0],
|
||
Items: mergedItems,
|
||
}
|
||
if err := library.SaveCache(mergedCache); err != nil {
|
||
log.Printf("[auto-scan] save cache failed: %v", err)
|
||
}
|
||
}
|
||
|
||
log.Printf("[auto-scan] synced %d items across %d path(s)", totalSynced, len(scanPaths))
|
||
}
|
||
|
||
doScan()
|
||
|
||
ticker := time.NewTicker(interval)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
doScan()
|
||
case <-scanNow:
|
||
log.Printf("[auto-scan] on-demand scan triggered")
|
||
ticker.Reset(interval)
|
||
doScan()
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// superviseFunnel keeps a CloudFlare Quick Tunnel up across cloudflared
|
||
// crashes and CF's ~6h tunnel rotation. On a clean exit (cancellation) it
|
||
// returns; on a crash it clears the reported URL and respawns with an
|
||
// exponential backoff so we don't hammer cloudflared into a tight loop when
|
||
// it can't reach the CF edge.
|
||
func superviseFunnel(ctx context.Context, d *agent.Daemon, port int) {
|
||
backoff := 2 * time.Second
|
||
const maxBackoff = 5 * time.Minute
|
||
for ctx.Err() == nil {
|
||
t, err := funnel.Start(ctx, funnel.Config{Port: port})
|
||
if err != nil {
|
||
log.Printf("[funnel] could not start CloudFlare tunnel (%v) — retrying in %s", err, backoff)
|
||
select {
|
||
case <-time.After(backoff):
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
backoff = min(backoff*2, maxBackoff)
|
||
continue
|
||
}
|
||
log.Printf("[funnel] cloudflared started, waiting for public URL...")
|
||
go func() {
|
||
url, werr := t.WaitURL(45 * time.Second)
|
||
if werr != nil {
|
||
log.Printf("[funnel] cloudflared did not emit a URL (%v)", werr)
|
||
return
|
||
}
|
||
log.Printf("[funnel] public URL: %s", url)
|
||
d.SetFunnelURL(url)
|
||
}()
|
||
// Block until cloudflared exits (CF rotation, crash, or shutdown).
|
||
exitErr := <-t.Done()
|
||
_ = t.Close()
|
||
d.SetFunnelURL("")
|
||
if ctx.Err() != nil {
|
||
return
|
||
}
|
||
if exitErr != nil {
|
||
log.Printf("[funnel] cloudflared exited: %v — restarting in %s", exitErr, backoff)
|
||
} else {
|
||
log.Printf("[funnel] cloudflared exited cleanly — restarting in %s", backoff)
|
||
}
|
||
select {
|
||
case <-time.After(backoff):
|
||
case <-ctx.Done():
|
||
return
|
||
}
|
||
backoff = min(backoff*2, maxBackoff)
|
||
}
|
||
}
|
||
|
||
// mirrorCORSOrigins fetches /api/mirrors from the configured primary (+ extra
|
||
// mirror candidates + static IPFS fallback) and returns the discovered URLs as
|
||
// Origin strings. Best-effort: any failure logs a warning and returns an empty
|
||
// slice; the static defaultCORSAllowedOrigins in validate.go covers the known
|
||
// mirrors (.com / .to / built-in onion) so the daemon still accepts the
|
||
// official surfaces when this call fails.
|
||
//
|
||
// Bounded to a short timeout so a slow /api/mirrors response can't delay
|
||
// daemon startup — every second here is a second the user can't play.
|
||
func mirrorCORSOrigins(parent context.Context, cfg config.Config, userAgent string) []string {
|
||
ctx, cancel := context.WithTimeout(parent, 10*time.Second)
|
||
defer cancel()
|
||
|
||
candidates := append([]string{cfg.Auth.APIURL}, cfg.Auth.Mirrors...)
|
||
resp, err := agent.FetchMirrorsWithFallback(ctx, candidates, userAgent)
|
||
if err != nil {
|
||
log.Printf("[cors] mirror discovery failed (%v) — using static allowlist only", err)
|
||
return nil
|
||
}
|
||
|
||
seen := make(map[string]struct{})
|
||
out := make([]string, 0, len(resp.Mirrors))
|
||
add := func(rawURL string) {
|
||
if rawURL == "" {
|
||
return
|
||
}
|
||
origin := strings.TrimRight(rawURL, "/")
|
||
if _, dup := seen[origin]; dup {
|
||
return
|
||
}
|
||
seen[origin] = struct{}{}
|
||
out = append(out, origin)
|
||
}
|
||
for _, m := range resp.Mirrors {
|
||
add(m.URL)
|
||
}
|
||
if resp.Tor != nil {
|
||
add(resp.Tor.URL)
|
||
}
|
||
if len(out) > 0 {
|
||
log.Printf("[cors] merged %d mirror origins from /api/mirrors", len(out))
|
||
}
|
||
return out
|
||
}
|