In-process userspace WireGuard tunnel (wireguard-go + gVisor netstack) for
the managed-VPN add-on. No root, no OS routing changes: only the embedded
anacrolix/torrent client's peer + tracker traffic is routed through the
tunnel, so the swarm and trackers see the VPN IP, not the user's home IP.
unarr's control plane (API, heartbeats) keeps using the normal net.
- internal/vpn: FetchConfig (GET /api/internal/agent/vpn-config, Bearer auth,
typed errors for disabled/not_provisioned/slot_on_device) + Up (parse .conf
→ uapi, CreateNetTUN, device Up) + DialContext/ListenPacket adapters.
- engine/torrent.go: when a tunnel is set, wire TrackerDialContext +
HTTPDialContext + TrackerListenPacket to netstack, DisableUTP, and
AddDialer(NetworkDialer{tcp, netstack}) for peer conns.
- config: downloads.vpn.enabled flag.
- daemon: bring up the tunnel before the torrent client; non-fatal on
failure (logs + downloads in the clear); slot_on_device warns the user.
- version bump 0.8.1 → 0.9.0.
Pairs with the web VPN add-on (dormant behind NEXT_PUBLIC_VPN_ENABLED).
Runtime-verified once a VPNResellers trial provides a live endpoint.
802 lines
26 KiB
Go
802 lines
26 KiB
Go
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/fatih/color"
|
|
"github.com/spf13/cobra"
|
|
"github.com/torrentclaw/unarr/internal/agent"
|
|
"github.com/torrentclaw/unarr/internal/config"
|
|
"github.com/torrentclaw/unarr/internal/engine"
|
|
"github.com/torrentclaw/unarr/internal/library"
|
|
"github.com/torrentclaw/unarr/internal/library/mediainfo"
|
|
"github.com/torrentclaw/unarr/internal/usenet/download"
|
|
"github.com/torrentclaw/unarr/internal/vpn"
|
|
)
|
|
|
|
// newStartCmd creates the top-level `unarr start` command.
|
|
func newStartCmd() *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "start",
|
|
Short: "Start the download daemon (foreground)",
|
|
Long: `Start the unarr daemon in the foreground.
|
|
|
|
Registers with the server, receives download tasks via periodic sync,
|
|
and executes them using the configured download method.
|
|
Supports torrent, debrid, and usenet downloads concurrently.
|
|
|
|
The daemon syncs state with the server every 3s when someone is viewing
|
|
the web dashboard, or every 60s when idle. Press Ctrl+C to stop
|
|
gracefully — active downloads get up to 30 seconds to finish.
|
|
|
|
Requires: API key, agent ID, and download directory (run 'unarr init' first).
|
|
|
|
To run as a background service, use 'unarr daemon install' instead.`,
|
|
Example: ` unarr start
|
|
unarr start --config /path/to/config.toml`,
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
return runDaemonStart()
|
|
},
|
|
}
|
|
}
|
|
|
|
// newStopCmd creates the top-level `unarr stop` command.
|
|
func newStopCmd() *cobra.Command {
|
|
return &cobra.Command{
|
|
Use: "stop",
|
|
Short: "Stop the running daemon",
|
|
Long: `Stop the unarr daemon gracefully.
|
|
|
|
Reads the daemon PID from the state file and sends a graceful stop signal.
|
|
Works regardless of whether the daemon was started in the foreground or as a service.
|
|
|
|
To stop a service-managed daemon and prevent auto-restart, use 'unarr daemon stop' instead.`,
|
|
Example: ` unarr stop`,
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
return stopDaemonByPID()
|
|
},
|
|
}
|
|
}
|
|
|
|
// newDaemonCmd creates `unarr daemon` for administrative subcommands.
|
|
func newDaemonCmd() *cobra.Command {
|
|
cmd := &cobra.Command{
|
|
Use: "daemon <command>",
|
|
Short: "Manage the daemon as a system service",
|
|
Long: `Install, control and inspect the unarr daemon as a system service.
|
|
|
|
Linux: systemd user service (~/.config/systemd/user/unarr.service)
|
|
macOS: launchd agent (~/Library/LaunchAgents/com.torrentclaw.unarr.plist)
|
|
Windows: Task Scheduler task (runs at logon)`,
|
|
Example: ` unarr daemon install
|
|
unarr daemon start
|
|
unarr daemon status
|
|
unarr daemon logs -f
|
|
unarr daemon reload
|
|
unarr daemon restart
|
|
unarr daemon stop
|
|
unarr daemon uninstall`,
|
|
}
|
|
|
|
cmd.AddCommand(
|
|
newDaemonInstallCmdReal(),
|
|
newDaemonUninstallCmdReal(),
|
|
newDaemonStartCmd(),
|
|
newDaemonStopCmd(),
|
|
newDaemonRestartCmd(),
|
|
newDaemonSvcStatusCmd(),
|
|
newDaemonLogsCmd(),
|
|
newDaemonReloadCmd(),
|
|
)
|
|
|
|
return cmd
|
|
}
|
|
|
|
func runDaemonStart() error {
|
|
cfg := loadConfig()
|
|
bold := color.New(color.Bold)
|
|
|
|
// Validate config
|
|
if cfg.Auth.APIKey == "" {
|
|
return fmt.Errorf("no API key configured — run 'unarr init' first")
|
|
}
|
|
if cfg.Agent.ID == "" {
|
|
return fmt.Errorf("no agent ID — run 'unarr init' first")
|
|
}
|
|
if cfg.Download.Dir == "" {
|
|
return fmt.Errorf("no download directory — run 'unarr init' first")
|
|
}
|
|
|
|
// Validate configured paths are safe
|
|
if err := cfg.ValidatePaths(); err != nil {
|
|
return fmt.Errorf("unsafe configuration: %w", err)
|
|
}
|
|
|
|
// Ensure download dir exists
|
|
if err := os.MkdirAll(cfg.Download.Dir, 0o755); err != nil {
|
|
return fmt.Errorf("create download dir: %w", err)
|
|
}
|
|
|
|
// Clean up stale resume files (>7 days old)
|
|
resumeDir := filepath.Join(config.DataDir(), "resume")
|
|
if removed := download.CleanStaleFiles(resumeDir, 7*24*time.Hour); removed > 0 {
|
|
log.Printf("Cleaned %d stale resume file(s)", removed)
|
|
}
|
|
|
|
fmt.Println()
|
|
bold.Println(" unarr Daemon")
|
|
fmt.Println()
|
|
|
|
userAgent := "unarr/" + Version
|
|
|
|
// Probe HW accel + derive a sensible transcode resolution cap. The cap
|
|
// is what the web side uses to decide whether the user should pre-empt
|
|
// transcoding by downloading a smaller version (4K source on a software
|
|
// libx264-only host is the canonical case where pre-download wins).
|
|
hwAccelPick := engine.DetectHWAccel(context.Background(), cfg.Library.FFmpegPath)
|
|
maxTranscodeHeight := 1080
|
|
if hwAccelPick != engine.HWAccelNone {
|
|
maxTranscodeHeight = 2160
|
|
}
|
|
|
|
// Create daemon config
|
|
daemonCfg := agent.DaemonConfig{
|
|
AgentID: cfg.Agent.ID,
|
|
AgentName: cfg.Agent.Name,
|
|
Version: Version,
|
|
DownloadDir: cfg.Download.Dir,
|
|
StreamPort: cfg.Download.StreamPort,
|
|
LanIP: engine.LanIP(),
|
|
TailscaleIP: engine.TailscaleIP(),
|
|
CanDelete: cfg.Library.AllowDelete,
|
|
ScanPaths: library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath),
|
|
HWAccel: string(hwAccelPick),
|
|
MaxTranscodeHeight: maxTranscodeHeight,
|
|
}
|
|
|
|
// Create HTTP client with mirror failover so a `.com` block-out rolls
|
|
// over to `.to` / .onion without restarting the daemon.
|
|
agentClient := newAgentClientFromConfig(cfg, userAgent)
|
|
log.Printf("Transport: HTTP sync → %s (mirrors: %d)", cfg.Auth.APIURL, len(cfg.Auth.Mirrors))
|
|
|
|
// Create daemon
|
|
d := agent.NewDaemon(daemonCfg, agentClient)
|
|
|
|
// Start SIGUSR1 reload watcher (unix only, no-op on Windows)
|
|
startReloadWatcher(&ReloadableConfig{Daemon: d})
|
|
|
|
// Daemon-scoped context — cancelled on shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Parse speed limits
|
|
maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed)
|
|
maxUl, _ := config.ParseSpeed(cfg.Download.MaxUploadSpeed)
|
|
|
|
// Parse torrent timeouts
|
|
metaTimeout, _ := time.ParseDuration(cfg.Download.MetadataTimeout)
|
|
stallTimeout, _ := time.ParseDuration(cfg.Download.StallTimeout)
|
|
|
|
// Create progress reporter — only used for stream tasks (handleStreamTask)
|
|
// The sync goroutine handles all regular progress reporting.
|
|
statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval)
|
|
if statusInterval == 0 {
|
|
statusInterval = 3 * time.Second
|
|
}
|
|
reporter := engine.NewProgressReporter(agentClient, statusInterval)
|
|
reporter.SetWatchingFunc(func() bool { return d.Watching.Load() })
|
|
|
|
// Managed-VPN add-on: bring up the in-process WireGuard split-tunnel before
|
|
// the torrent client so peer + tracker traffic routes through it. Failure is
|
|
// non-fatal — log and download in the clear (better than refusing to run).
|
|
var vpnTunnel *vpn.Tunnel
|
|
if cfg.Download.VPN.Enabled {
|
|
apiURL := cfg.Auth.APIURL
|
|
if apiURL == "" {
|
|
apiURL = "https://torrentclaw.com"
|
|
}
|
|
fetchCtx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
|
|
conf, ferr := vpn.FetchConfig(fetchCtx, apiURL, cfg.Auth.APIKey, "unarr/"+Version)
|
|
cancel()
|
|
var fe *vpn.FetchError
|
|
switch {
|
|
case ferr != nil && errors.As(ferr, &fe) && fe.Code == vpn.ErrSlotOnDevice:
|
|
log.Printf("[vpn] slot is active on one of your devices — downloads will NOT use the VPN. Switch the slot to unarr in your profile to protect downloads.")
|
|
case ferr != nil:
|
|
log.Printf("[vpn] could not enable VPN (%v) — downloading in the clear", ferr)
|
|
default:
|
|
if t, uerr := vpn.Up(conf); uerr != nil {
|
|
log.Printf("[vpn] tunnel failed to start (%v) — downloading in the clear", uerr)
|
|
} else {
|
|
vpnTunnel = t
|
|
defer vpnTunnel.Close()
|
|
log.Printf("[vpn] managed VPN active — torrent traffic split-tunnelled through WireGuard")
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create torrent downloader
|
|
torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{
|
|
DataDir: cfg.Download.Dir,
|
|
MetadataTimeout: metaTimeout,
|
|
StallTimeout: stallTimeout,
|
|
MaxTimeout: 0,
|
|
MaxDownloadRate: maxDl,
|
|
MaxUploadRate: maxUl,
|
|
ListenPort: cfg.Download.ListenPort,
|
|
SeedEnabled: false,
|
|
WebRTCEnabled: cfg.Download.WebRTC.Enabled,
|
|
WebRTCTrackers: cfg.Download.WebRTC.Trackers,
|
|
ICEServers: engine.BuildICEServers(cfg.Download.WebRTC),
|
|
VPNTunnel: vpnTunnel,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("create torrent downloader: %w", err)
|
|
}
|
|
|
|
if maxDl > 0 || maxUl > 0 {
|
|
dlStr, ulStr := "unlimited", "unlimited"
|
|
if maxDl > 0 {
|
|
dlStr = formatSpeedLog(maxDl)
|
|
}
|
|
if maxUl > 0 {
|
|
ulStr = formatSpeedLog(maxUl)
|
|
}
|
|
log.Printf("Speed limits: download=%s upload=%s", dlStr, ulStr)
|
|
}
|
|
|
|
// Create debrid downloader
|
|
debridDl := engine.NewDebridDownloader()
|
|
|
|
// Create download manager
|
|
manager := engine.NewManager(engine.ManagerConfig{
|
|
MaxConcurrent: cfg.Download.MaxConcurrent,
|
|
OutputDir: cfg.Download.Dir,
|
|
Notifications: cfg.Notifications.Enabled,
|
|
Organize: engine.OrganizeConfig{
|
|
Enabled: cfg.Organize.Enabled,
|
|
MoviesDir: cfg.Organize.MoviesDir,
|
|
TVShowsDir: cfg.Organize.TVShowsDir,
|
|
OutputDir: cfg.Download.Dir,
|
|
},
|
|
}, reporter, torrentDl, debridDl, engine.NewUsenetDownloader(agentClient))
|
|
|
|
// Create persistent stream server
|
|
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
|
|
streamSrv.SetUPnPEnabled(cfg.Download.EnableUPnP)
|
|
streamSrv.SetCORSAllowedOrigins(cfg.Download.CORSExtraOrigins)
|
|
// Reap HLS tmpdirs left over from a previous daemon run before we start
|
|
// accepting new sessions. The in-memory registry doesn't survive a
|
|
// restart, so without this disk usage grows unbounded across restarts.
|
|
if err := engine.CleanupHLSOrphanDirs(); err != nil {
|
|
log.Printf("[hls] orphan tmpdir cleanup: %v", err)
|
|
}
|
|
if err := streamSrv.Listen(ctx); err != nil {
|
|
return fmt.Errorf("start stream server: %w", err)
|
|
}
|
|
d.UpdateStreamPort(streamSrv.Port())
|
|
|
|
// Warn at startup if transcode is enabled but ffmpeg/ffprobe are missing.
|
|
// HLS sessions get rejected at runtime (see daemon.go ~line 455), but
|
|
// surfacing it here gives the operator a chance to install ffmpeg before
|
|
// a user hits a confusing "rejected" line in the logs.
|
|
if cfg.Download.Transcode.Enabled {
|
|
if _, err := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath); err != nil {
|
|
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
|
|
} else if _, err := mediainfo.ResolveFFprobe(cfg.Library.FFprobePath); err != nil {
|
|
log.Printf("[hls] transcode enabled but ffmpeg/ffprobe not found — install ffmpeg to use HLS")
|
|
}
|
|
}
|
|
|
|
// Wire sync client callbacks
|
|
sc := d.SyncClient()
|
|
sc.GetFreeSlots = manager.FreeSlots
|
|
sc.GetTaskStates = manager.TaskStates
|
|
d.GetActiveCount = manager.ActiveCount
|
|
|
|
// Trigger immediate sync when a download slot frees up
|
|
manager.OnTaskDone = func() { d.TriggerSync() }
|
|
|
|
// Wire: sync receives new tasks → submit to manager or handle stream
|
|
d.OnTasksClaimed = func(tasks []agent.Task) {
|
|
for _, t := range tasks {
|
|
if t.Mode == "seed_file" {
|
|
// Browser asked us to wrap an arbitrary on-disk file as
|
|
// a single-file torrent + seed it via WebRTC. Runs in
|
|
// its own goroutine so a slow / failing seed can't
|
|
// stall the rest of the claim batch.
|
|
go handleSeedFileTask(t, torrentDl, agentClient)
|
|
} else if t.Mode == "stream" {
|
|
if isStreamingTask(t.ID) {
|
|
continue
|
|
}
|
|
cancelStreamContexts()
|
|
streamSrv.ClearFile()
|
|
streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel stored in registry
|
|
streamRegistry.mu.Lock()
|
|
streamRegistry.cancels[t.ID] = streamCancel
|
|
streamRegistry.mu.Unlock()
|
|
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv)
|
|
} else {
|
|
manager.Submit(ctx, t)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Wire: sync receives control signals → act on manager
|
|
d.OnControlAction = func(action, taskID string, deleteFiles bool) {
|
|
switch action {
|
|
case "cancel":
|
|
if deleteFiles {
|
|
manager.CancelAndDeleteFiles(taskID)
|
|
} else {
|
|
manager.CancelTask(taskID)
|
|
}
|
|
cancelStreamTask(taskID)
|
|
if streamSrv.CurrentTaskID() == taskID {
|
|
streamSrv.ClearFile()
|
|
}
|
|
case "pause":
|
|
manager.PauseTask(taskID)
|
|
cancelStreamTask(taskID)
|
|
if streamSrv.CurrentTaskID() == taskID {
|
|
streamSrv.ClearFile()
|
|
}
|
|
case "resume":
|
|
log.Printf("[%s] resume requested, triggering sync", agent.ShortID(taskID))
|
|
d.TriggerSync()
|
|
case "stream":
|
|
if streamSrv.CurrentTaskID() == taskID {
|
|
return
|
|
}
|
|
task := manager.GetTask(taskID)
|
|
if task == nil || task.GetStreamURL() != "" {
|
|
return
|
|
}
|
|
provider, err := torrentDl.GetStreamProvider(taskID)
|
|
if err != nil {
|
|
log.Printf("[%s] stream failed: %v", agent.ShortID(taskID), err)
|
|
return
|
|
}
|
|
cancelStreamContexts()
|
|
streamSrv.SetFile(provider, taskID)
|
|
task.SetStreamURL(streamSrv.URLsJSON())
|
|
log.Printf("[%s] streaming: %s", agent.ShortID(taskID), provider.FileName())
|
|
|
|
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
|
|
streamRegistry.mu.Lock()
|
|
streamRegistry.cancels["watch:"+taskID] = watchCancel
|
|
streamRegistry.mu.Unlock()
|
|
go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(watchCtx)
|
|
case "stop-stream":
|
|
cancelStreamTask(taskID)
|
|
if streamSrv.CurrentTaskID() == taskID {
|
|
streamSrv.ClearFile()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Wire: sync receives file deletion requests from the server
|
|
if cfg.Library.AllowDelete && len(daemonCfg.ScanPaths) > 0 {
|
|
sc.OnDeleteFiles = func(items []agent.LibraryDeleteRequest) []int {
|
|
return library.DeleteFiles(items, daemonCfg.ScanPaths)
|
|
}
|
|
}
|
|
|
|
// Wire: sync receives stream requests for completed downloads
|
|
d.OnStreamRequested = func(sr agent.StreamRequest) {
|
|
if streamSrv.CurrentTaskID() == sr.TaskID {
|
|
// Already serving — notify server it's ready
|
|
go func() {
|
|
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
|
TaskID: sr.TaskID,
|
|
StreamReady: true,
|
|
}); err != nil {
|
|
log.Printf("[%s] stream ready re-notify failed: %v", agent.ShortID(sr.TaskID), err)
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
filePath := filepath.Clean(sr.FilePath)
|
|
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
|
|
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
|
|
log.Printf("[%s] stream request rejected: path outside allowed dirs: %s", agent.ShortID(sr.TaskID), filePath)
|
|
go func() {
|
|
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
|
TaskID: sr.TaskID,
|
|
Status: "failed",
|
|
ErrorMessage: fmt.Sprintf("path outside allowed dirs: %s", filePath),
|
|
}); err != nil {
|
|
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
info, err := os.Stat(filePath)
|
|
if err != nil {
|
|
log.Printf("[%s] stream request: file not found: %s", agent.ShortID(sr.TaskID), filePath)
|
|
go func() {
|
|
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
|
TaskID: sr.TaskID,
|
|
Status: "failed",
|
|
ErrorMessage: fmt.Sprintf("file not found: %s", filePath),
|
|
}); err != nil {
|
|
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
if info.IsDir() {
|
|
found := engine.FindVideoFile(filePath)
|
|
if found == "" {
|
|
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
|
|
go func() {
|
|
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
|
TaskID: sr.TaskID,
|
|
Status: "failed",
|
|
ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath),
|
|
}); err != nil {
|
|
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
filePath = found
|
|
log.Printf("[%s] resolved directory to video file: %s", agent.ShortID(sr.TaskID), filepath.Base(filePath))
|
|
}
|
|
|
|
cancelStreamContexts()
|
|
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
|
|
log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL())
|
|
|
|
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
|
|
streamRegistry.mu.Lock()
|
|
streamRegistry.cancels["watch:"+sr.TaskID] = watchCancel
|
|
streamRegistry.mu.Unlock()
|
|
go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(watchCtx)
|
|
|
|
go func() {
|
|
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
|
|
TaskID: sr.TaskID,
|
|
StreamReady: true,
|
|
}); err != nil {
|
|
log.Printf("[%s] stream ready report failed: %v", agent.ShortID(sr.TaskID), err)
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Wire: sync receives custom WebRTC streaming session requests.
|
|
// Each session is a one-shot browser↔daemon DataChannel. Validate the
|
|
// FilePath against allowed dirs to prevent path traversal abuse from a
|
|
// compromised server, then spawn the pion peer in its own goroutine.
|
|
d.OnWebRTCSession = func(sess agent.WebRTCSession) {
|
|
if webrtcRegistry.has(sess.SessionID) {
|
|
return // already running
|
|
}
|
|
filePath := sess.FilePath
|
|
if filePath == "" {
|
|
log.Printf("webrtc session %s rejected: empty file path", agent.ShortID(sess.SessionID))
|
|
return
|
|
}
|
|
filePath = filepath.Clean(filePath)
|
|
if !isAllowedStreamPath(filePath, cfg.Download.Dir, cfg.Library.ScanPath,
|
|
cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir) {
|
|
log.Printf("webrtc session %s rejected: path outside allowed dirs: %s",
|
|
agent.ShortID(sess.SessionID), filePath)
|
|
return
|
|
}
|
|
// Resolve directory → first video file (matches StreamRequest behavior).
|
|
if info, err := os.Stat(filePath); err == nil && info.IsDir() {
|
|
found := engine.FindVideoFile(filePath)
|
|
if found == "" {
|
|
log.Printf("webrtc session %s rejected: no video file in dir %s",
|
|
agent.ShortID(sess.SessionID), filePath)
|
|
return
|
|
}
|
|
filePath = found
|
|
}
|
|
|
|
// Branch on transport: HLS sessions only need ffmpeg + StreamServer,
|
|
// not a WebRTC peer, so they must bypass the WebRTC.Enabled gate.
|
|
// Default ("" or "webrtc") runs the DataChannel pipeline and requires it.
|
|
if strings.EqualFold(sess.Transport, "hls") {
|
|
tcRuntime := buildTranscodeRuntime(ctx, cfg)
|
|
if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" {
|
|
log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID))
|
|
return
|
|
}
|
|
hlsCtx, hlsCancel := context.WithCancel(ctx)
|
|
webrtcRegistry.add(sess.SessionID, hlsCancel)
|
|
hlsCfg := engine.HLSSessionConfig{
|
|
SessionID: sess.SessionID,
|
|
SourcePath: filePath,
|
|
FileName: sess.FileName,
|
|
Quality: sess.Quality,
|
|
AudioIndex: sess.AudioIndex,
|
|
Transcode: tcRuntime,
|
|
}
|
|
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
|
|
if err != nil {
|
|
webrtcRegistry.remove(sess.SessionID)
|
|
hlsCancel()
|
|
log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err)
|
|
return
|
|
}
|
|
streamSrv.HLS().Register(hsess)
|
|
return
|
|
}
|
|
|
|
// Non-HLS transport requires WebRTC peer support.
|
|
if !cfg.Download.WebRTC.Enabled {
|
|
log.Printf("webrtc session %s rejected: webrtc disabled in config", agent.ShortID(sess.SessionID))
|
|
return
|
|
}
|
|
|
|
sessCtx, sessCancel := context.WithCancel(ctx) //nolint:gosec // G118 cancel stored in registry
|
|
webrtcRegistry.add(sess.SessionID, sessCancel)
|
|
go func() {
|
|
defer func() {
|
|
webrtcRegistry.remove(sess.SessionID)
|
|
sessCancel()
|
|
}()
|
|
tcRuntime := buildTranscodeRuntime(ctx, cfg)
|
|
runCfg := engine.WebRTCStreamConfig{
|
|
SessionID: sess.SessionID,
|
|
FilePath: filePath,
|
|
FileName: sess.FileName,
|
|
FileSize: sess.FileSize,
|
|
Quality: sess.Quality,
|
|
ICEServers: engine.BuildICEServers(cfg.Download.WebRTC),
|
|
Signal: agentClient,
|
|
Logger: stdLogger{},
|
|
Transcode: tcRuntime,
|
|
}
|
|
log.Printf("[wrtc %s] starting session: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath))
|
|
if err := engine.RunWebRTCStream(sessCtx, runCfg); err != nil {
|
|
if sessCtx.Err() == nil {
|
|
log.Printf("[wrtc %s] ended: %v", agent.ShortID(sess.SessionID), err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// Periodic DHT node persistence (every 5 min)
|
|
go func() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
torrentDl.SaveDhtNodes()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Periodic HLS session sweeper (every 5 min). Closes sessions whose last
|
|
// segment fetch was over 30 min ago — kills the orphan ffmpeg + removes
|
|
// the per-session tmpdir, so a tab that died mid-stream doesn't leak
|
|
// disk space until daemon shutdown.
|
|
go func() {
|
|
ticker := time.NewTicker(5 * time.Minute)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if n := streamSrv.HLS().SweepIdle(); n > 0 {
|
|
log.Printf("[hls] swept %d idle session(s)", n)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Start auto-scan goroutine
|
|
scanPaths := daemonCfg.ScanPaths
|
|
if len(scanPaths) > 0 && cfg.Library.AutoScan {
|
|
scanInterval := 24 * time.Hour
|
|
if cfg.Library.ScanInterval != "" {
|
|
if parsed, err := time.ParseDuration(cfg.Library.ScanInterval); err == nil && parsed > 0 {
|
|
scanInterval = parsed
|
|
}
|
|
}
|
|
go runAutoScan(ctx, cfg, scanInterval, agentClient, d.ScanNow, scanPaths)
|
|
}
|
|
|
|
// Start reporter only for stream task handling
|
|
go reporter.Run(ctx)
|
|
|
|
// Start daemon (blocks — runs sync loop)
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
errCh <- d.Run(ctx)
|
|
}()
|
|
|
|
// Start idle guard for the persistent stream server
|
|
go startIdleGuard(ctx, streamSrv)
|
|
|
|
// Signal handling
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
// Wait for signal or error
|
|
select {
|
|
case sig := <-sigCh:
|
|
fmt.Printf("\n Received %s, shutting down...\n", sig)
|
|
cancelStreamContexts()
|
|
cancelAllWebRTCSessions()
|
|
streamSrv.Shutdown(context.Background())
|
|
cancel()
|
|
|
|
// Give active downloads 30s to finish
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer shutdownCancel()
|
|
manager.Shutdown(shutdownCtx)
|
|
|
|
d.Deregister()
|
|
fmt.Println(" Daemon stopped.")
|
|
return nil
|
|
|
|
case err := <-errCh:
|
|
cancelStreamContexts()
|
|
cancelAllWebRTCSessions()
|
|
streamSrv.Shutdown(context.Background())
|
|
cancel()
|
|
return err
|
|
}
|
|
}
|
|
|
|
// isAllowedStreamPath checks that filePath is within one of the directories
|
|
// the daemon is configured to manage. This defends against a compromised API
|
|
// server sending a path traversal payload (e.g. /etc/passwd) in StreamRequest.
|
|
// isAllowedStreamPath reports whether filePath is contained within one of the
|
|
// allowedDirs. filePath must already be cleaned (filepath.Clean) by the caller.
|
|
// This defends against a compromised API server sending a path traversal payload.
|
|
func isAllowedStreamPath(filePath string, allowedDirs ...string) bool {
|
|
for _, dir := range allowedDirs {
|
|
if dir == "" {
|
|
continue
|
|
}
|
|
rel, err := filepath.Rel(filepath.Clean(dir), filePath)
|
|
if err == nil && !strings.HasPrefix(rel, "..") {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func formatSpeedLog(bps int64) string {
|
|
switch {
|
|
case bps >= 1024*1024*1024:
|
|
return fmt.Sprintf("%.1f GB/s", float64(bps)/(1024*1024*1024))
|
|
case bps >= 1024*1024:
|
|
return fmt.Sprintf("%.1f MB/s", float64(bps)/(1024*1024))
|
|
case bps >= 1024:
|
|
return fmt.Sprintf("%.0f KB/s", float64(bps)/1024)
|
|
default:
|
|
return fmt.Sprintf("%d B/s", bps)
|
|
}
|
|
}
|
|
|
|
// runAutoScan runs a library scan + sync on a timer or on-demand via scanNow channel.
|
|
// It scans all provided paths and syncs each independently so stale-item cleanup
|
|
// is scoped to the correct directory prefix on the server.
|
|
func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, ac *agent.Client, scanNow <-chan struct{}, scanPaths []string) {
|
|
log.Printf("[auto-scan] enabled: every %s, paths: %v", interval, scanPaths)
|
|
|
|
select {
|
|
case <-time.After(30 * time.Second):
|
|
case <-scanNow:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
|
|
doScan := func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
log.Printf("[auto-scan] panic recovered: %v", r)
|
|
}
|
|
}()
|
|
log.Printf("[auto-scan] starting scan of %v", scanPaths)
|
|
|
|
existing, _ := library.LoadCache()
|
|
|
|
workers := cfg.Library.Workers
|
|
if workers == 0 {
|
|
workers = 8
|
|
}
|
|
|
|
scanOpts := library.ScanOptions{
|
|
Workers: workers,
|
|
FFprobePath: cfg.Library.FFprobePath,
|
|
Incremental: existing != nil,
|
|
}
|
|
|
|
// Scan each path independently and sync per path so the server can
|
|
// scope stale-item deletion to the correct directory prefix.
|
|
const batchSize = 100
|
|
totalSynced := 0
|
|
var mergedItems []library.LibraryItem
|
|
|
|
for _, scanPath := range scanPaths {
|
|
cache, err := library.Scan(ctx, scanPath, existing, scanOpts)
|
|
if err != nil {
|
|
log.Printf("[auto-scan] scan failed for %s: %v", scanPath, err)
|
|
continue
|
|
}
|
|
mergedItems = append(mergedItems, cache.Items...)
|
|
|
|
items := library.BuildSyncItems(cache)
|
|
if len(items) == 0 {
|
|
log.Printf("[auto-scan] no items under %s", scanPath)
|
|
continue
|
|
}
|
|
|
|
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
|
|
for i := 0; i < len(items); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(items) {
|
|
end = len(items)
|
|
}
|
|
isLast := end >= len(items)
|
|
|
|
_, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
|
Items: items[i:end],
|
|
ScanPath: scanPath,
|
|
IsLastBatch: isLast,
|
|
SyncStartedAt: syncStartedAt,
|
|
})
|
|
if err != nil {
|
|
log.Printf("[auto-scan] sync failed for %s: %v", scanPath, err)
|
|
break
|
|
}
|
|
}
|
|
totalSynced += len(items)
|
|
}
|
|
|
|
// Save merged cache for incremental scanning next time.
|
|
if len(mergedItems) > 0 {
|
|
mergedCache := &library.LibraryCache{
|
|
ScannedAt: time.Now().UTC().Format(time.RFC3339),
|
|
Path: scanPaths[0],
|
|
Items: mergedItems,
|
|
}
|
|
if err := library.SaveCache(mergedCache); err != nil {
|
|
log.Printf("[auto-scan] save cache failed: %v", err)
|
|
}
|
|
}
|
|
|
|
log.Printf("[auto-scan] synced %d items across %d path(s)", totalSynced, len(scanPaths))
|
|
}
|
|
|
|
doScan()
|
|
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
doScan()
|
|
case <-scanNow:
|
|
log.Printf("[auto-scan] on-demand scan triggered")
|
|
ticker.Reset(interval)
|
|
doScan()
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|