OnUpgrade now downloads + replaces the binary and exits in a background goroutine; the service supervisor (systemd Restart=always) respawns on the new version. Removes the "run unarr update" manual step after pressing the web's Force update button.
329 lines
9.9 KiB
Go
329 lines
9.9 KiB
Go
package agent
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/exec"
|
|
"runtime"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/torrentclaw/unarr/internal/upgrade"
|
|
)
|
|
|
|
// DaemonConfig holds daemon runtime settings.
|
|
type DaemonConfig struct {
|
|
AgentID string
|
|
AgentName string
|
|
Version string
|
|
DownloadDir string
|
|
StreamPort int // port for the HTTP stream server
|
|
LanIP string // LAN IP (reported in sync for stream URL resolution)
|
|
TailscaleIP string // Tailscale IP (reported in sync for stream URL resolution)
|
|
CanDelete bool // library.allow_delete is enabled
|
|
ScanPaths []string // configured scan paths for file deletion validation
|
|
HWAccel string // detected encoder backend ("nvenc"/"qsv"/"vaapi"/"videotoolbox"/"none")
|
|
MaxTranscodeHeight int // resolution cap the agent can transcode comfortably (px)
|
|
}
|
|
|
|
// Daemon manages agent registration and the sync loop.
|
|
type Daemon struct {
|
|
cfg DaemonConfig
|
|
client *Client
|
|
sync *SyncClient
|
|
state *LocalState
|
|
|
|
// Callbacks — set by cmd/daemon.go before calling Run.
|
|
OnTasksClaimed func(tasks []Task)
|
|
OnStreamRequested func(req StreamRequest)
|
|
OnStreamSession func(sess StreamSession)
|
|
OnControlAction func(action, taskID string, deleteFiles bool)
|
|
GetActiveCount func() int // returns number of active downloads (wired from manager)
|
|
|
|
// State
|
|
User UserInfo
|
|
Features FeatureFlags
|
|
Info AgentInfo
|
|
State DaemonState
|
|
lastNotifiedVersion string
|
|
|
|
// Managed-VPN split-tunnel state, set by cmd/daemon.go before Run and folded
|
|
// into DaemonState on every write so external tools (`unarr vpn status`) see it.
|
|
vpnActive bool
|
|
vpnMode string
|
|
vpnServer string
|
|
|
|
// CloudFlare Quick Tunnel public URL; folded into DaemonState + heartbeat
|
|
// so the web can prefer it over Tailscale/LAN for in-browser playback.
|
|
funnelURL string
|
|
|
|
// Watching tracks whether a user is viewing download progress in the web UI.
|
|
Watching atomic.Bool
|
|
|
|
// ScanNow triggers an immediate library scan.
|
|
ScanNow chan struct{}
|
|
}
|
|
|
|
// NewDaemon creates a daemon with an HTTP client for sync-based communication.
|
|
func NewDaemon(cfg DaemonConfig, client *Client) *Daemon {
|
|
state := NewLocalState()
|
|
return &Daemon{
|
|
cfg: cfg,
|
|
client: client,
|
|
state: state,
|
|
sync: NewSyncClient(client, cfg, state),
|
|
ScanNow: make(chan struct{}, 1),
|
|
}
|
|
}
|
|
|
|
// SyncClient returns the sync client for external wiring.
|
|
func (d *Daemon) SyncClient() *SyncClient { return d.sync }
|
|
|
|
// SetVPNState records the managed-VPN split-tunnel state so it's reflected in the
|
|
// daemon state file (read by `unarr vpn status`). Call before Run.
|
|
func (d *Daemon) SetVPNState(active bool, mode, server string) {
|
|
d.vpnActive = active
|
|
d.vpnMode = mode
|
|
d.vpnServer = server
|
|
}
|
|
|
|
// SetFunnelURL records the CloudFlare Quick Tunnel hostname so it's reflected
|
|
// in the daemon state file (read by `unarr funnel status`) and in heartbeat
|
|
// requests (so the web prefers it over Tailscale/LAN). Pass "" to clear.
|
|
func (d *Daemon) SetFunnelURL(url string) {
|
|
d.funnelURL = url
|
|
d.State.FunnelURL = url
|
|
WriteState(&d.State)
|
|
}
|
|
|
|
// UpdateStreamPort updates the stream port reported in sync requests.
|
|
func (d *Daemon) UpdateStreamPort(port int) {
|
|
d.cfg.StreamPort = port
|
|
d.sync.cfg.StreamPort = port
|
|
}
|
|
|
|
// Register registers the agent and fetches user info + features.
|
|
// Retries with exponential backoff on transient errors (429, 5xx, network).
|
|
func (d *Daemon) Register(ctx context.Context) error {
|
|
req := RegisterRequest{
|
|
AgentID: d.cfg.AgentID,
|
|
Name: d.cfg.AgentName,
|
|
OS: runtime.GOOS,
|
|
Arch: runtime.GOARCH,
|
|
Version: d.cfg.Version,
|
|
DownloadDir: d.cfg.DownloadDir,
|
|
StreamPort: d.cfg.StreamPort,
|
|
LanIP: d.cfg.LanIP,
|
|
TailscaleIP: d.cfg.TailscaleIP,
|
|
HWAccel: d.cfg.HWAccel,
|
|
MaxTranscodeHeight: d.cfg.MaxTranscodeHeight,
|
|
VPNActive: d.vpnActive,
|
|
VPNMode: d.vpnMode,
|
|
VPNServer: d.vpnServer,
|
|
FunnelURL: d.funnelURL,
|
|
}
|
|
if free, total, err := DiskInfo(d.cfg.DownloadDir); err == nil {
|
|
req.DiskFreeBytes = free
|
|
req.DiskTotalBytes = total
|
|
}
|
|
|
|
const maxRetries = 5
|
|
backoff := 5 * time.Second
|
|
|
|
var resp *RegisterResponse
|
|
var err error
|
|
for attempt := range maxRetries {
|
|
resp, err = d.client.Register(ctx, req)
|
|
if err == nil {
|
|
break
|
|
}
|
|
if !isTransientError(err) {
|
|
return fmt.Errorf("register: %w", err)
|
|
}
|
|
log.Printf("Register failed (attempt %d/%d): %v - retrying in %v", attempt+1, maxRetries, err, backoff)
|
|
timer := time.NewTimer(backoff)
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return fmt.Errorf("register: %w", ctx.Err())
|
|
case <-timer.C:
|
|
}
|
|
backoff = min(backoff*2, 60*time.Second)
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("register: %w (after %d retries)", err, maxRetries)
|
|
}
|
|
|
|
d.User = resp.User
|
|
d.Features = resp.Features
|
|
now := time.Now()
|
|
d.Info = AgentInfo{
|
|
ID: d.cfg.AgentID,
|
|
Name: d.cfg.AgentName,
|
|
User: resp.User,
|
|
Features: resp.Features,
|
|
StartedAt: now,
|
|
}
|
|
d.State = DaemonState{
|
|
AgentID: d.cfg.AgentID,
|
|
Status: "running",
|
|
Version: d.cfg.Version,
|
|
PID: os.Getpid(),
|
|
StartedAt: now,
|
|
MethodStats: make(map[string]int),
|
|
VPNActive: d.vpnActive,
|
|
VPNMode: d.vpnMode,
|
|
VPNServer: d.vpnServer,
|
|
FunnelURL: d.funnelURL,
|
|
}
|
|
WriteState(&d.State)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Run registers the agent and starts the sync loop.
|
|
// Blocks until ctx is cancelled.
|
|
func (d *Daemon) Run(ctx context.Context) error {
|
|
// Register
|
|
if err := d.Register(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Printf("Agent registered: %s (%s) [%s]", d.User.Name, d.User.Email, d.User.Plan)
|
|
log.Printf("Features: torrent=%v debrid=%v usenet=%v", d.Features.Torrent, d.Features.Debrid, d.Features.Usenet)
|
|
|
|
// Usenet needs par2 (segment repair) + an extractor (RAR/7z) on the host.
|
|
// Without par2, a single bad segment corrupts the file silently; without
|
|
// an extractor, RAR-packed downloads can't be unpacked. Warn loudly at
|
|
// startup so the operator installs them before the first download fails.
|
|
if d.Features.Usenet {
|
|
if _, err := exec.LookPath("par2"); err != nil {
|
|
log.Printf("[usenet] WARNING: par2 not found in PATH — corrupted segments cannot be repaired and extraction may fail. Install par2 (apt install par2 / brew install par2).")
|
|
}
|
|
_, unrarErr := exec.LookPath("unrar")
|
|
_, sevenZErr := exec.LookPath("7z")
|
|
if unrarErr != nil && sevenZErr != nil {
|
|
log.Printf("[usenet] WARNING: no archive extractor (unrar or 7z) found — RAR-packed downloads cannot be unpacked. Install unrar or 7z.")
|
|
}
|
|
}
|
|
|
|
// Wire sync callbacks
|
|
d.sync.OnNewTasks = func(tasks []Task) {
|
|
if d.OnTasksClaimed != nil {
|
|
d.OnTasksClaimed(tasks)
|
|
}
|
|
}
|
|
d.sync.OnControl = func(action, taskID string, deleteFiles bool) {
|
|
if d.OnControlAction != nil {
|
|
d.OnControlAction(action, taskID, deleteFiles)
|
|
}
|
|
}
|
|
d.sync.OnStreamRequest = func(req StreamRequest) {
|
|
if d.OnStreamRequested != nil {
|
|
d.OnStreamRequested(req)
|
|
}
|
|
}
|
|
d.sync.OnStreamSession = func(sess StreamSession) {
|
|
if d.OnStreamSession != nil {
|
|
d.OnStreamSession(sess)
|
|
}
|
|
}
|
|
d.sync.OnUpgrade = func(version string) {
|
|
if version == d.lastNotifiedVersion {
|
|
return
|
|
}
|
|
d.lastNotifiedVersion = version
|
|
log.Printf("[upgrade] new version available: %s — applying auto-upgrade", version)
|
|
go d.applyAutoUpgrade(version)
|
|
}
|
|
d.sync.OnScan = func() {
|
|
log.Printf("Library scan requested by server")
|
|
select {
|
|
case d.ScanNow <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
d.sync.OnWatchingChange = func(watching bool) {
|
|
d.Watching.Store(watching)
|
|
}
|
|
d.sync.GetVPNState = func() (bool, string, string) {
|
|
return d.vpnActive, d.vpnMode, d.vpnServer
|
|
}
|
|
d.sync.GetFunnelURL = func() string {
|
|
return d.funnelURL
|
|
}
|
|
d.sync.OnSyncSuccess = func() {
|
|
d.State.LastHeartbeat = time.Now()
|
|
if d.GetActiveCount != nil {
|
|
d.State.ActiveTasks = d.GetActiveCount()
|
|
}
|
|
WriteState(&d.State)
|
|
}
|
|
|
|
// Start sync loop (blocks)
|
|
return d.sync.Run(ctx)
|
|
}
|
|
|
|
// TriggerSync requests an immediate sync cycle.
|
|
func (d *Daemon) TriggerSync() {
|
|
d.sync.TriggerSync()
|
|
}
|
|
|
|
// Deregister notifies the server of graceful shutdown.
|
|
func (d *Daemon) Deregister() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := d.client.Deregister(ctx, d.cfg.AgentID); err != nil {
|
|
log.Printf("Deregister failed: %v", err)
|
|
} else {
|
|
log.Println("Agent deregistered")
|
|
}
|
|
RemoveState()
|
|
}
|
|
|
|
// applyAutoUpgrade downloads the target version and exits so the service
|
|
// supervisor (systemd Restart=always on Linux) respawns on the new binary.
|
|
// Triggered by the server's upgrade signal — opt-in flag set by the user from
|
|
// the web UI; the daemon never auto-upgrades on a passive version bump.
|
|
func (d *Daemon) applyAutoUpgrade(targetVersion string) {
|
|
currentClean := strings.TrimPrefix(d.cfg.Version, "v")
|
|
upgrader := &upgrade.Upgrader{
|
|
CurrentVersion: currentClean,
|
|
OnProgress: func(msg string) {
|
|
log.Printf("[upgrade] %s", msg)
|
|
},
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
|
defer cancel()
|
|
result := upgrader.Execute(ctx, targetVersion)
|
|
if !result.Success {
|
|
log.Printf("[upgrade] auto-upgrade failed: %v", result.Error)
|
|
return
|
|
}
|
|
log.Printf("[upgrade] upgraded v%s → v%s; exiting so service supervisor restarts on new binary",
|
|
result.OldVersion, result.NewVersion)
|
|
time.Sleep(500 * time.Millisecond)
|
|
os.Exit(0)
|
|
}
|
|
|
|
// isTransientError returns true for errors worth retrying (429, 5xx, network).
|
|
func isTransientError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
var httpErr *HTTPError
|
|
if errors.As(err, &httpErr) {
|
|
return httpErr.StatusCode == 429 || httpErr.StatusCode >= 500
|
|
}
|
|
lower := strings.ToLower(err.Error())
|
|
for _, keyword := range []string{"connection refused", "no such host", "timeout", "request failed"} {
|
|
if strings.Contains(lower, keyword) {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|