feat(sync): replace WS+DO transport with unified HTTP sync

Replace the WebSocket + Cloudflare Durable Object architecture with a
single POST /sync endpoint. The CLI now operates autonomously with local
state (tasks.json) and syncs bidirectionally via adaptive-interval HTTP
polling (3s watching, 60s idle).

- Remove transport_ws, transport_hybrid, transport_http (~2,600 lines)
- Add SyncClient with adaptive interval loop
- Add LocalState for CLI-side task persistence
- Add TaskStateFromUpdate() helper (DRY)
- Extract finalize() to deduplicate processTask/processTaskRetry
- Consolidate shortID() into agent.ShortID (was in 3 packages)
- Wire GetActiveCount so `unarr status` shows active tasks
- Remove poll_interval, heartbeat_interval, ws_url from config
- Simplify ProgressReporter (sync replaces direct HTTP reporting)
This commit is contained in:
Deivid Soto 2026-04-08 18:50:59 +02:00
parent 2398707cc1
commit 5d4a67c7a2
26 changed files with 1320 additions and 3400 deletions

View file

@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
@ -27,13 +26,13 @@ func newStartCmd() *cobra.Command {
Short: "Start the download daemon (foreground)",
Long: `Start the unarr daemon in the foreground.
Registers with the server, receives download tasks via WebSocket (with
HTTP fallback), and executes them using the configured download method.
Registers with the server, receives download tasks via periodic sync,
and executes them using the configured download method.
Supports torrent, debrid, and usenet downloads concurrently.
The daemon sends periodic heartbeats and reports download progress back
to the web dashboard. Press Ctrl+C to stop gracefully active downloads
get up to 30 seconds to finish.
The daemon syncs state with the server every 3s when someone is viewing
the web dashboard, or every 60s when idle. Press Ctrl+C to stop
gracefully active downloads get up to 30 seconds to finish.
Requires: API key, agent ID, and download directory (run 'unarr init' first).
@ -127,85 +126,59 @@ func runDaemonStart() error {
bold.Println(" unarr Daemon")
fmt.Println()
// Parse intervals
pollInterval, _ := time.ParseDuration(cfg.Daemon.PollInterval)
if pollInterval == 0 {
pollInterval = 30 * time.Second
}
heartbeatInterval, _ := time.ParseDuration(cfg.Daemon.HeartbeatInterval)
if heartbeatInterval == 0 {
heartbeatInterval = 30 * time.Second
}
statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval)
if statusInterval == 0 {
statusInterval = 3 * time.Second
}
userAgent := "unarr/" + Version
// Create daemon config
daemonCfg := agent.DaemonConfig{
AgentID: cfg.Agent.ID,
AgentName: cfg.Agent.Name,
Version: Version,
DownloadDir: cfg.Download.Dir,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
StreamPort: cfg.Download.StreamPort,
LanIP: engine.LanIP(),
TailscaleIP: engine.TailscaleIP(),
AgentID: cfg.Agent.ID,
AgentName: cfg.Agent.Name,
Version: Version,
DownloadDir: cfg.Download.Dir,
StreamPort: cfg.Download.StreamPort,
LanIP: engine.LanIP(),
TailscaleIP: engine.TailscaleIP(),
}
// Create transport: Hybrid (WS + HTTP fallback) or HTTP-only
httpT := agent.NewHTTPTransport(cfg.Auth.APIURL, cfg.Auth.APIKey, userAgent)
wsURL := cfg.Auth.WSURL
if wsURL == "" {
wsURL = deriveWSURL(cfg.Auth.APIURL, cfg.Agent.ID)
}
var transport agent.Transport
if wsURL != "" {
wsT := agent.NewWSTransport(wsURL, cfg.Auth.APIKey, cfg.Agent.ID, userAgent)
transport = agent.NewHybridTransport(wsT, httpT)
log.Printf("Transport: WebSocket (fallback: HTTP) → %s", wsURL)
} else {
transport = httpT
log.Println("Transport: HTTP only")
}
// Create daemon — always uses Transport interface
d := agent.NewDaemon(daemonCfg, transport)
// Create agent client for watch progress reporting
// Create HTTP client — single communication channel
agentClient := agent.NewClient(cfg.Auth.APIURL, cfg.Auth.APIKey, userAgent)
log.Printf("Transport: HTTP sync → %s", cfg.Auth.APIURL)
// Create daemon
d := agent.NewDaemon(daemonCfg, agentClient)
// Start SIGUSR1 reload watcher (unix only, no-op on Windows)
startReloadWatcher(&ReloadableConfig{Daemon: d})
// Daemon-scoped context — cancelled on shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create progress reporter using transport
reporter := engine.NewProgressReporterWithTransport(transport, statusInterval)
reporter.SetWatchingFunc(func() bool { return d.Watching.Load() })
reporter.SetWatchingChangedHandler(func(watching bool) { d.Watching.Store(watching) })
// Parse speed limits
maxDl, _ := config.ParseSpeed(cfg.Download.MaxDownloadSpeed)
maxUl, _ := config.ParseSpeed(cfg.Download.MaxUploadSpeed)
// Parse torrent timeouts from config (default: 0 = unlimited, like qBittorrent)
// Parse torrent timeouts
metaTimeout, _ := time.ParseDuration(cfg.Download.MetadataTimeout)
stallTimeout, _ := time.ParseDuration(cfg.Download.StallTimeout)
// Create progress reporter — only used for stream tasks (handleStreamTask)
// The sync goroutine handles all regular progress reporting.
statusInterval, _ := time.ParseDuration(cfg.Daemon.StatusInterval)
if statusInterval == 0 {
statusInterval = 3 * time.Second
}
reporter := engine.NewProgressReporter(agentClient, statusInterval)
reporter.SetWatchingFunc(func() bool { return d.Watching.Load() })
// Create torrent downloader
torrentDl, err := engine.NewTorrentDownloader(engine.TorrentConfig{
DataDir: cfg.Download.Dir,
MetadataTimeout: metaTimeout, // 0 = unlimited (default)
StallTimeout: stallTimeout, // 0 = unlimited (default)
MaxTimeout: 0, // unlimited
MetadataTimeout: metaTimeout,
StallTimeout: stallTimeout,
MaxTimeout: 0,
MaxDownloadRate: maxDl,
MaxUploadRate: maxUl,
ListenPort: cfg.Download.ListenPort, // 0 = default 42069
ListenPort: cfg.Download.ListenPort,
SeedEnabled: false,
})
if err != nil {
@ -223,7 +196,7 @@ func runDaemonStart() error {
log.Printf("Speed limits: download=%s upload=%s", dlStr, ulStr)
}
// Create debrid downloader (HTTPS-based, no provider interaction needed)
// Create debrid downloader
debridDl := engine.NewDebridDownloader()
// Create download manager
@ -237,170 +210,53 @@ func runDaemonStart() error {
TVShowsDir: cfg.Organize.TVShowsDir,
OutputDir: cfg.Download.Dir,
},
}, reporter, torrentDl, debridDl, engine.NewUsenetDownloader(httpT.Client()))
}, reporter, torrentDl, debridDl, engine.NewUsenetDownloader(agentClient))
// Create persistent stream server — lives for the entire daemon lifecycle.
// One port, one server, swap files with SetFile(). No more port churn.
// Create persistent stream server
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
if err := streamSrv.Listen(ctx); err != nil {
return fmt.Errorf("start stream server: %w", err)
}
// Update heartbeat with actual port (may differ if configured port was busy)
d.UpdateStreamPort(streamSrv.Port())
// Wire state tracking
// Wire sync client callbacks
sc := d.SyncClient()
sc.GetFreeSlots = manager.FreeSlots
sc.GetTaskStates = manager.TaskStates
d.GetActiveCount = manager.ActiveCount
d.GetCleanableBytes = CleanableBytes
// Wire: server-side signals -> manager actions + stream tasks
reporter.SetCancelHandler(func(taskID string) {
manager.CancelTask(taskID)
cancelStreamTask(taskID)
})
reporter.SetPauseHandler(func(taskID string) {
manager.PauseTask(taskID)
cancelStreamTask(taskID)
})
reporter.SetDeleteFilesHandler(func(taskID string) {
manager.CancelAndDeleteFiles(taskID)
cancelStreamTask(taskID)
})
// Wire: stream requested on active download → set file on persistent server
reporter.SetStreamRequestedHandler(func(taskID string) {
task := manager.GetTask(taskID)
if task == nil {
log.Printf("[%s] stream requested but task not found in manager", taskID[:8])
return
}
if task.GetStreamURL() != "" {
return // already streaming
}
provider, err := torrentDl.GetStreamProvider(taskID)
if err != nil {
log.Printf("[%s] stream failed: %v", taskID[:8], err)
return
}
cancelStreamContexts()
streamSrv.SetFile(provider, taskID)
task.SetStreamURL(streamSrv.URLsJSON())
log.Printf("[%s] streaming active download: %s", taskID[:8], provider.FileName())
// Start watch progress reporter with cancellable context
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // cancel stored in streamRegistry, called by cancelStreamContexts()
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+taskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(watchCtx)
})
// Wire: daemon claimed tasks -> manager
// Trigger immediate sync when a download slot frees up
manager.OnTaskDone = func() { d.TriggerSync() }
// Wire: sync receives new tasks → submit to manager or handle stream
d.OnTasksClaimed = func(tasks []agent.Task) {
for _, t := range tasks {
if t.Mode == "stream" {
// Skip if already streaming this task
if isStreamingTask(t.ID) {
continue
}
// Only 1 stream at a time: cancel existing stream goroutines + clear file
cancelStreamContexts()
streamSrv.ClearFile()
// Reserve slot before spawning goroutine to prevent TOCTOU race.
streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel ownership transferred to streamRegistry
streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel stored in registry
streamRegistry.mu.Lock()
streamRegistry.cancels[t.ID] = streamCancel
streamRegistry.mu.Unlock()
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv)
} else if t.ForceStart || manager.HasCapacity() {
manager.Submit(ctx, t)
} else {
log.Printf("[%s] skipped: no capacity (max %d)", t.ID[:8], cfg.Download.MaxConcurrent)
manager.Submit(ctx, t)
}
}
}
// Wire: stream requests for completed downloads → set file on persistent server
d.OnStreamRequested = func(sr agent.StreamRequest) {
// Already serving this task — just notify server it's ready
if streamSrv.CurrentTaskID() == sr.TaskID {
go func() {
if _, err := transport.SendProgress(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready re-notify failed: %v", sr.TaskID[:8], err)
}
}()
return
}
filePath := sr.FilePath
info, err := os.Stat(filePath)
if err != nil {
log.Printf("[%s] stream request: file not found: %s", sr.TaskID[:8], filePath)
go func() {
if _, err := transport.SendProgress(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("file not found: %s", filePath),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", sr.TaskID[:8], err)
}
}()
return
}
// If filePath is a directory, find the largest video file inside
if info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
log.Printf("[%s] stream request: no video file in directory: %s", sr.TaskID[:8], filePath)
go func() {
if _, err := transport.SendProgress(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", sr.TaskID[:8], err)
}
}()
return
}
filePath = found
log.Printf("[%s] resolved directory to video file: %s", sr.TaskID[:8], filepath.Base(filePath))
}
// Cancel any active stream goroutines and swap file on the persistent server
cancelStreamContexts()
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
log.Printf("[%s] streaming from disk: %s → %s", sr.TaskID[:8], filepath.Base(filePath), streamSrv.URL())
// Start watch progress reporter with a cancellable context
// so it stops when the user switches to a different stream.
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // cancel stored in streamRegistry, called by cancelStreamContexts()
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+sr.TaskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(watchCtx)
// Notify server that stream is ready (clears streamRequested flag)
go func() {
if _, err := transport.SendProgress(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready report failed: %v", sr.TaskID[:8], err)
}
}()
}
// Wire: WS control actions (pause/cancel/stream pushed from server)
d.OnControlAction = func(action, taskID string) {
// Wire: sync receives control signals → act on manager
d.OnControlAction = func(action, taskID string, deleteFiles bool) {
switch action {
case "cancel":
manager.CancelTask(taskID)
if deleteFiles {
manager.CancelAndDeleteFiles(taskID)
} else {
manager.CancelTask(taskID)
}
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
@ -412,10 +268,9 @@ func runDaemonStart() error {
streamSrv.ClearFile()
}
case "resume":
log.Printf("[%s] resume requested via WebSocket, triggering poll", taskID[:8])
d.TriggerPoll()
log.Printf("[%s] resume requested, triggering sync", agent.ShortID(taskID))
d.TriggerSync()
case "stream":
// Skip if already streaming this task
if streamSrv.CurrentTaskID() == taskID {
return
}
@ -425,13 +280,19 @@ func runDaemonStart() error {
}
provider, err := torrentDl.GetStreamProvider(taskID)
if err != nil {
log.Printf("[%s] stream failed: %v", taskID[:8], err)
log.Printf("[%s] stream failed: %v", agent.ShortID(taskID), err)
return
}
cancelStreamContexts()
streamSrv.SetFile(provider, taskID)
task.SetStreamURL(streamSrv.URLsJSON())
log.Printf("[%s] streaming via WS: %s", taskID[:8], provider.FileName())
log.Printf("[%s] streaming: %s", agent.ShortID(taskID), provider.FileName())
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+taskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(watchCtx)
case "stop-stream":
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
@ -440,19 +301,77 @@ func runDaemonStart() error {
}
}
// Config hot-reload (SIGUSR1 on Unix, no-op on Windows)
// Tickers are initialized inside d.Run(), so we pass the daemon
// and the reload goroutine reads them when the signal arrives.
startReloadWatcher(&ReloadableConfig{Daemon: d})
// Wire: sync receives stream requests for completed downloads
d.OnStreamRequested = func(sr agent.StreamRequest) {
if streamSrv.CurrentTaskID() == sr.TaskID {
// Already serving — notify server it's ready
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready re-notify failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
// Signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
filePath := sr.FilePath
info, err := os.Stat(filePath)
if err != nil {
log.Printf("[%s] stream request: file not found: %s", agent.ShortID(sr.TaskID), filePath)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("file not found: %s", filePath),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
// Start progress reporter in background
go reporter.Run(ctx)
if info.IsDir() {
found := engine.FindVideoFile(filePath)
if found == "" {
log.Printf("[%s] stream request: no video file in directory: %s", agent.ShortID(sr.TaskID), filePath)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("no video file in directory: %s", filePath),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
return
}
filePath = found
log.Printf("[%s] resolved directory to video file: %s", agent.ShortID(sr.TaskID), filepath.Base(filePath))
}
// Periodic DHT node persistence (every 5 min) — protects against crash data loss
cancelStreamContexts()
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
log.Printf("[%s] streaming from disk: %s → %s", agent.ShortID(sr.TaskID), filepath.Base(filePath), streamSrv.URL())
watchCtx, watchCancel := context.WithCancel(ctx) //nolint:gosec // G118
streamRegistry.mu.Lock()
streamRegistry.cancels["watch:"+sr.TaskID] = watchCancel
streamRegistry.mu.Unlock()
go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(watchCtx)
go func() {
if _, err := agentClient.ReportStatus(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream ready report failed: %v", agent.ShortID(sr.TaskID), err)
}
}()
}
// Periodic DHT node persistence (every 5 min)
go func() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
@ -466,8 +385,7 @@ func runDaemonStart() error {
}
}()
// Start auto-scan goroutine (daily library scan + sync)
// Default scan_path to download dir so auto-scan works out of the box.
// Start auto-scan goroutine
scanPath := cfg.Library.ScanPath
if scanPath == "" {
scanPath = cfg.Download.Dir
@ -484,7 +402,10 @@ func runDaemonStart() error {
go runAutoScan(ctx, scanCfg, scanInterval, agentClient, d.ScanNow)
}
// Start daemon (blocks)
// Start reporter only for stream task handling
go reporter.Run(ctx)
// Start daemon (blocks — runs sync loop)
errCh := make(chan error, 1)
go func() {
errCh <- d.Run(ctx)
@ -493,6 +414,10 @@ func runDaemonStart() error {
// Start idle guard for the persistent stream server
go startIdleGuard(ctx, streamSrv)
// Signal handling
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// Wait for signal or error
select {
case sig := <-sigCh:
@ -506,6 +431,7 @@ func runDaemonStart() error {
defer shutdownCancel()
manager.Shutdown(shutdownCtx)
d.Deregister()
fmt.Println(" Daemon stopped.")
return nil
@ -517,41 +443,6 @@ func runDaemonStart() error {
}
}
// deriveWSURL derives a WebSocket URL from the API URL.
// https://torrentclaw.com → wss://unarr.torrentclaw.com/ws/{agentId}
// Returns "" for localhost/dev environments where WS gateway isn't available.
func deriveWSURL(apiURL, agentID string) string {
if apiURL == "" || agentID == "" {
return ""
}
// Parse domain from API URL
domain := apiURL
for _, prefix := range []string{"https://", "http://"} {
if len(domain) > len(prefix) && domain[:len(prefix)] == prefix {
domain = domain[len(prefix):]
break
}
}
// Strip trailing slash/path
for i := 0; i < len(domain); i++ {
if domain[i] == '/' {
domain = domain[:i]
break
}
}
// Strip port if present
if idx := strings.LastIndex(domain, ":"); idx > 0 {
domain = domain[:idx]
}
// Skip WS for localhost/dev — gateway only available in production
if domain == "localhost" || domain == "127.0.0.1" || domain == "0.0.0.0" {
return ""
}
return "wss://unarr." + domain + "/ws/" + agentID
}
func formatSpeedLog(bps int64) string {
switch {
case bps >= 1024*1024*1024:
@ -569,11 +460,9 @@ func formatSpeedLog(bps int64) string {
func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, ac *agent.Client, scanNow <-chan struct{}) {
log.Printf("[auto-scan] enabled: every %s, path: %s", interval, cfg.Library.ScanPath)
// Run first scan after a short delay (let daemon stabilize)
select {
case <-time.After(30 * time.Second):
case <-scanNow:
// Immediate scan requested before initial delay
case <-ctx.Done():
return
}
@ -608,7 +497,6 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
return
}
// Sync to server
items := library.BuildSyncItems(cache)
if len(items) == 0 {
log.Printf("[auto-scan] no items to sync")