Daemon now runs engine.DetectHWAccelDiagnostic at startup (instead of the lighter DetectHWAccel) and ships the full picture — ffmpeg version, resolved binary path, HW encoders compiled in, device files / drivers detected — up to the server in the RegisterRequest payload. Why: the most common cause of slow first-play is a software-only ffmpeg build. Surfacing the diagnostic in the web AgentsTab "Diagnose transcoder" modal lets a user see *why* their backend landed on libx264 (e.g. brew's default formula ships without --enable-nvenc, or the container is missing /dev/nvidia0) without SSHing in to run `unarr probe-hwaccel` manually. Also emits a single `[transcode]` startup log line summarising the same data — convenient for `journalctl --user -u unarr | grep transcode`. Bounded by a 10 s context so a hung ffmpeg binary can't stall daemon startup forever.
382 lines
12 KiB
Go
382 lines
12 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/torrentclaw/unarr/internal/upgrade"
|
|
)
|
|
|
|
// DaemonConfig holds daemon runtime settings.
|
|
type DaemonConfig struct {
|
|
AgentID string
|
|
AgentName string
|
|
Version string
|
|
DownloadDir string
|
|
StreamPort int // port for the HTTP stream server
|
|
LanIP string // LAN IP (reported in sync for stream URL resolution)
|
|
TailscaleIP string // Tailscale IP (reported in sync for stream URL resolution)
|
|
CanDelete bool // library.allow_delete is enabled
|
|
ScanPaths []string // configured scan paths for file deletion validation
|
|
HWAccel string // detected encoder backend ("nvenc"/"qsv"/"vaapi"/"videotoolbox"/"none")
|
|
MaxTranscodeHeight int // resolution cap the agent can transcode comfortably (px)
|
|
// Diagnostic data populated by engine.DetectHWAccelDiagnostic at daemon
|
|
// start. Surfaced in the web "Diagnose transcoder" modal — lets a user
|
|
// see which encoders the ffmpeg binary supports and which devices the
|
|
// host exposes without running `unarr probe-hwaccel`.
|
|
FFmpegVersion string // first line of `ffmpeg -version`
|
|
FFmpegPath string // resolved binary path
|
|
HWEncoders []string // HW-class encoder names found in `ffmpeg -encoders`
|
|
HWDevices []string // device files + driver bins detected at probe time
|
|
AutoUpgrade bool // honor server-flagged upgrades by downloading + restarting (default: true)
|
|
}
|
|
|
|
// Daemon manages agent registration and the sync loop.
|
|
type Daemon struct {
|
|
cfg DaemonConfig
|
|
client *Client
|
|
sync *SyncClient
|
|
state *LocalState
|
|
|
|
// Callbacks — set by cmd/daemon.go before calling Run.
|
|
OnTasksClaimed func(tasks []Task)
|
|
OnStreamRequested func(req StreamRequest)
|
|
OnStreamSession func(sess StreamSession)
|
|
OnControlAction func(action, taskID string, deleteFiles bool)
|
|
GetActiveCount func() int // returns number of active downloads (wired from manager)
|
|
|
|
// State
|
|
User UserInfo
|
|
Features FeatureFlags
|
|
Info AgentInfo
|
|
State DaemonState
|
|
lastNotifiedVersion string
|
|
|
|
// Managed-VPN split-tunnel state, set by cmd/daemon.go before Run and folded
|
|
// into DaemonState on every write so external tools (`unarr vpn status`) see it.
|
|
vpnActive bool
|
|
vpnMode string
|
|
vpnServer string
|
|
|
|
// CloudFlare Quick Tunnel public URL; folded into DaemonState + heartbeat
|
|
// so the web can prefer it over Tailscale/LAN for in-browser playback.
|
|
funnelURL string
|
|
|
|
// Watching tracks whether a user is viewing download progress in the web UI.
|
|
Watching atomic.Bool
|
|
|
|
// ScanNow triggers an immediate library scan.
|
|
ScanNow chan struct{}
|
|
}
|
|
|
|
// NewDaemon creates a daemon with an HTTP client for sync-based communication.
|
|
func NewDaemon(cfg DaemonConfig, client *Client) *Daemon {
|
|
state := NewLocalState()
|
|
return &Daemon{
|
|
cfg: cfg,
|
|
client: client,
|
|
state: state,
|
|
sync: NewSyncClient(client, cfg, state),
|
|
ScanNow: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
// SyncClient returns the sync client for external wiring.
|
|
func (d *Daemon) SyncClient() *SyncClient { return d.sync }
|
|
|
|
// SetVPNState records the managed-VPN split-tunnel state so it's reflected in the
|
|
// daemon state file (read by `unarr vpn status`). Call before Run.
|
|
func (d *Daemon) SetVPNState(active bool, mode, server string) {
|
|
d.vpnActive = active
|
|
d.vpnMode = mode
|
|
d.vpnServer = server
|
|
}
|
|
|
|
// SetFunnelURL records the CloudFlare Quick Tunnel hostname so it's reflected
|
|
// in the daemon state file (read by `unarr funnel status`) and in heartbeat
|
|
// requests (so the web prefers it over Tailscale/LAN). Pass "" to clear.
|
|
func (d *Daemon) SetFunnelURL(url string) {
|
|
d.funnelURL = url
|
|
d.State.FunnelURL = url
|
|
WriteState(&d.State)
|
|
}
|
|
|
|
// UpdateStreamPort updates the stream port reported in sync requests.
|
|
func (d *Daemon) UpdateStreamPort(port int) {
|
|
d.cfg.StreamPort = port
|
|
d.sync.cfg.StreamPort = port
|
|
}
|
|
|
|
// Register registers the agent and fetches user info + features.
|
|
// Retries with exponential backoff on transient errors (429, 5xx, network).
|
|
func (d *Daemon) Register(ctx context.Context) error {
|
|
req := RegisterRequest{
|
|
AgentID: d.cfg.AgentID,
|
|
Name: d.cfg.AgentName,
|
|
OS: runtime.GOOS,
|
|
Arch: runtime.GOARCH,
|
|
Version: d.cfg.Version,
|
|
DownloadDir: d.cfg.DownloadDir,
|
|
StreamPort: d.cfg.StreamPort,
|
|
LanIP: d.cfg.LanIP,
|
|
TailscaleIP: d.cfg.TailscaleIP,
|
|
HWAccel: d.cfg.HWAccel,
|
|
MaxTranscodeHeight: d.cfg.MaxTranscodeHeight,
|
|
FFmpegVersion: d.cfg.FFmpegVersion,
|
|
FFmpegPath: d.cfg.FFmpegPath,
|
|
HWEncoders: d.cfg.HWEncoders,
|
|
HWDevices: d.cfg.HWDevices,
|
|
VPNActive: d.vpnActive,
|
|
VPNMode: d.vpnMode,
|
|
VPNServer: d.vpnServer,
|
|
FunnelURL: d.funnelURL,
|
|
}
|
|
if free, total, err := DiskInfo(d.cfg.DownloadDir); err == nil {
|
|
req.DiskFreeBytes = free
|
|
req.DiskTotalBytes = total
|
|
}
|
|
|
|
const maxRetries = 5
|
|
backoff := 5 * time.Second
|
|
|
|
var resp *RegisterResponse
|
|
var err error
|
|
for attempt := range maxRetries {
|
|
resp, err = d.client.Register(ctx, req)
|
|
if err == nil {
|
|
break
|
|
}
|
|
if !isTransientError(err) {
|
|
return fmt.Errorf("register: %w", err)
|
|
}
|
|
log.Printf("Register failed (attempt %d/%d): %v - retrying in %v", attempt+1, maxRetries, err, backoff)
|
|
timer := time.NewTimer(backoff)
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return fmt.Errorf("register: %w", ctx.Err())
|
|
case <-timer.C:
|
|
}
|
|
backoff = min(backoff*2, 60*time.Second)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("register: %w (after %d retries)", err, maxRetries)
|
|
}
|
|
|
|
d.User = resp.User
|
|
d.Features = resp.Features
|
|
now := time.Now()
|
|
d.Info = AgentInfo{
|
|
ID: d.cfg.AgentID,
|
|
Name: d.cfg.AgentName,
|
|
User: resp.User,
|
|
Features: resp.Features,
|
|
StartedAt: now,
|
|
}
|
|
d.State = DaemonState{
|
|
AgentID: d.cfg.AgentID,
|
|
Status: "running",
|
|
Version: d.cfg.Version,
|
|
PID: os.Getpid(),
|
|
StartedAt: now,
|
|
MethodStats: make(map[string]int),
|
|
VPNActive: d.vpnActive,
|
|
VPNMode: d.vpnMode,
|
|
VPNServer: d.vpnServer,
|
|
FunnelURL: d.funnelURL,
|
|
}
|
|
WriteState(&d.State)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Run registers the agent and starts the sync loop.
|
|
// Blocks until ctx is cancelled.
|
|
func (d *Daemon) Run(ctx context.Context) error {
|
|
// Register
|
|
if err := d.Register(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("Agent registered: %s (%s) [%s]", d.User.Name, d.User.Email, d.User.Plan)
|
|
log.Printf("Features: torrent=%v debrid=%v usenet=%v", d.Features.Torrent, d.Features.Debrid, d.Features.Usenet)
|
|
|
|
// Usenet needs par2 (segment repair) + an extractor (RAR/7z) on the host.
|
|
// Without par2, a single bad segment corrupts the file silently; without
|
|
// an extractor, RAR-packed downloads can't be unpacked. Warn loudly at
|
|
// startup so the operator installs them before the first download fails.
|
|
if d.Features.Usenet {
|
|
if _, err := exec.LookPath("par2"); err != nil {
|
|
log.Printf("[usenet] WARNING: par2 not found in PATH — corrupted segments cannot be repaired and extraction may fail. Install par2 (apt install par2 / brew install par2).")
|
|
}
|
|
_, unrarErr := exec.LookPath("unrar")
|
|
_, sevenZErr := exec.LookPath("7z")
|
|
if unrarErr != nil && sevenZErr != nil {
|
|
log.Printf("[usenet] WARNING: no archive extractor (unrar or 7z) found — RAR-packed downloads cannot be unpacked. Install unrar or 7z.")
|
|
}
|
|
}
|
|
|
|
// Wire sync callbacks
|
|
d.sync.OnNewTasks = func(tasks []Task) {
|
|
if d.OnTasksClaimed != nil {
|
|
d.OnTasksClaimed(tasks)
|
|
}
|
|
}
|
|
d.sync.OnControl = func(action, taskID string, deleteFiles bool) {
|
|
if d.OnControlAction != nil {
|
|
d.OnControlAction(action, taskID, deleteFiles)
|
|
}
|
|
}
|
|
d.sync.OnStreamRequest = func(req StreamRequest) {
|
|
if d.OnStreamRequested != nil {
|
|
d.OnStreamRequested(req)
|
|
}
|
|
}
|
|
d.sync.OnStreamSession = func(sess StreamSession) {
|
|
if d.OnStreamSession != nil {
|
|
d.OnStreamSession(sess)
|
|
}
|
|
}
|
|
d.sync.OnUpgrade = func(version string) {
|
|
if version == d.lastNotifiedVersion {
|
|
return
|
|
}
|
|
d.lastNotifiedVersion = version
|
|
if !d.cfg.AutoUpgrade {
|
|
log.Printf("[upgrade] new version available: %s — auto_upgrade=false, run `unarr update` to apply", version)
|
|
return
|
|
}
|
|
log.Printf("[upgrade] new version available: %s — applying auto-upgrade", version)
|
|
go d.applyAutoUpgrade(version)
|
|
}
|
|
d.sync.OnScan = func() {
|
|
log.Printf("Library scan requested by server")
|
|
select {
|
|
case d.ScanNow <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
d.sync.OnWatchingChange = func(watching bool) {
|
|
d.Watching.Store(watching)
|
|
}
|
|
d.sync.GetVPNState = func() (bool, string, string) {
|
|
return d.vpnActive, d.vpnMode, d.vpnServer
|
|
}
|
|
d.sync.GetFunnelURL = func() string {
|
|
return d.funnelURL
|
|
}
|
|
d.sync.OnSyncSuccess = func() {
|
|
d.State.LastHeartbeat = time.Now()
|
|
if d.GetActiveCount != nil {
|
|
d.State.ActiveTasks = d.GetActiveCount()
|
|
}
|
|
WriteState(&d.State)
|
|
}
|
|
|
|
// Start sync loop (blocks)
|
|
return d.sync.Run(ctx)
|
|
}
|
|
|
|
// TriggerSync requests an immediate sync cycle.
|
|
func (d *Daemon) TriggerSync() {
|
|
d.sync.TriggerSync()
|
|
}
|
|
|
|
// Deregister notifies the server of graceful shutdown.
|
|
func (d *Daemon) Deregister() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := d.client.Deregister(ctx, d.cfg.AgentID); err != nil {
|
|
log.Printf("Deregister failed: %v", err)
|
|
} else {
|
|
log.Println("Agent deregistered")
|
|
}
|
|
RemoveState()
|
|
}
|
|
|
|
// applyAutoUpgrade downloads the target version and exits so the service
|
|
// supervisor (systemd Restart=always on Linux) respawns on the new binary.
|
|
// Triggered by the server's upgrade signal — opt-in flag set by the user from
|
|
// the web UI; the daemon never auto-upgrades on a passive version bump.
|
|
//
|
|
// Reports the outcome to /api/internal/agent/upgrade-result so the server
|
|
// clears `upgrade_requested`. Without this report the flag stays sticky and
|
|
// the daemon would loop on every sync — including the no-op case where it's
|
|
// already on the target version.
|
|
func (d *Daemon) applyAutoUpgrade(targetVersion string) {
|
|
currentClean := strings.TrimPrefix(d.cfg.Version, "v")
|
|
targetClean := strings.TrimPrefix(targetVersion, "v")
|
|
|
|
// No-op: server signal arrived but we're already running the target. This
|
|
// happens when the daemon restarts after a previous auto-upgrade before
|
|
// reportUpgradeResult cleared the flag, or when the operator manually
|
|
// installed the same version off-band. Skip Execute (which would also
|
|
// no-op) AND skip os.Exit, but DO clear the flag — otherwise we loop.
|
|
if currentClean == targetClean {
|
|
log.Printf("[upgrade] already on v%s — clearing server flag", currentClean)
|
|
ctxR, cancelR := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancelR()
|
|
if err := d.client.ReportUpgradeResult(ctxR, d.cfg.AgentID, true, currentClean, ""); err != nil {
|
|
log.Printf("[upgrade] report-result failed (will retry on next signal): %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
upgrader := &upgrade.Upgrader{
|
|
CurrentVersion: currentClean,
|
|
OnProgress: func(msg string) {
|
|
log.Printf("[upgrade] %s", msg)
|
|
},
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
|
defer cancel()
|
|
result := upgrader.Execute(ctx, targetVersion)
|
|
if !result.Success {
|
|
log.Printf("[upgrade] auto-upgrade failed: %v", result.Error)
|
|
errMsg := ""
|
|
if result.Error != nil {
|
|
errMsg = result.Error.Error()
|
|
}
|
|
ctxR, cancelR := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancelR()
|
|
if err := d.client.ReportUpgradeResult(ctxR, d.cfg.AgentID, false, targetClean, errMsg); err != nil {
|
|
log.Printf("[upgrade] report-result failed: %v", err)
|
|
}
|
|
return
|
|
}
|
|
log.Printf("[upgrade] upgraded v%s → v%s; reporting result + exiting so service supervisor restarts on new binary",
|
|
result.OldVersion, result.NewVersion)
|
|
ctxR, cancelR := context.WithTimeout(context.Background(), 10*time.Second)
|
|
if err := d.client.ReportUpgradeResult(ctxR, d.cfg.AgentID, true, result.NewVersion, ""); err != nil {
|
|
log.Printf("[upgrade] report-result failed: %v", err)
|
|
}
|
|
cancelR()
|
|
time.Sleep(500 * time.Millisecond)
|
|
os.Exit(0)
|
|
}
|
|
|
|
// isTransientError returns true for errors worth retrying (429, 5xx, network).
|
|
func isTransientError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
var httpErr *HTTPError
|
|
if errors.As(err, &httpErr) {
|
|
return httpErr.StatusCode == 429 || httpErr.StatusCode >= 500
|
|
}
|
|
lower := strings.ToLower(err.Error())
|
|
for _, keyword := range []string{"connection refused", "no such host", "timeout", "request failed"} {
|
|
if strings.Contains(lower, keyword) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|