unarr/internal/agent/sync_test.go
Deivid Soto 78c16c295e test: add comprehensive test suite for engine, agent and cmd packages
- Refactor download.go and stream.go with downloadDeps/streamDeps structs
  for dependency injection, enabling unit testing without real I/O
- download_test.go: 15 tests — input validation, mock downloaders, method
  selection, cobra Args, deadlock detection
- stream_test.go: input validation, noOpen flag, engine error handling
- client_test.go: context cancellation, timeout, full Sync roundtrip,
  watch-progress and HTTP error unwrapping
- sync_test.go: TriggerSync on watching transition, adjustInterval
- torrent_test.go: TorrentDownloader lifecycle without network
- stream_server_test.go: HTTP server lifecycle, SetFile/ClearFile,
  concurrent requests, Shutdown releases port, content-type
- manager_integration_test.go: full pipeline — success, torrent→debrid
  fallback, all-fail, multi-concurrent, ForceStart, OnTaskDone,
  recent-finished drain, cancel mid-download, organize
- usenet_test.go: Cancel/Pause race regression test (run with -race)
- daemon_test.go: isAllowedStreamPath table tests
- CI: split coverage gate to engine+agent only (50% threshold); cmd
  coverage still reported but not gated (interactive UI commands)
- lefthook: add pre-push hook with go test -race -count=1 -timeout=120s
2026-04-08 23:36:00 +02:00

542 lines
14 KiB
Go

package agent
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
)
func newTestSyncClient(url string) (*SyncClient, *Client) {
client := NewClient(url, "test-key", "test-agent/1.0")
cfg := DaemonConfig{
AgentID: "test-agent",
AgentName: "Test",
Version: "1.0.0",
DownloadDir: "/tmp/downloads",
}
state := NewLocalState()
sc := NewSyncClient(client, cfg, state)
return sc, client
}
func TestSyncClient_NewDefaults(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
if sc.Watching() {
t.Error("should not be watching initially")
}
if sc.currentInterval() != SyncIntervalIdle {
t.Errorf("expected idle interval %v, got %v", SyncIntervalIdle, sc.currentInterval())
}
}
func TestSyncClient_AdjustInterval_Watching(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
sc.adjustInterval(true)
if sc.currentInterval() != SyncIntervalWatching {
t.Errorf("expected watching interval %v, got %v", SyncIntervalWatching, sc.currentInterval())
}
if !sc.Watching() {
t.Error("expected watching=true")
}
}
func TestSyncClient_AdjustInterval_NotWatching(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
// First set watching, then unset
sc.adjustInterval(true)
sc.adjustInterval(false)
if sc.currentInterval() != SyncIntervalIdle {
t.Errorf("expected idle interval %v, got %v", SyncIntervalIdle, sc.currentInterval())
}
if sc.Watching() {
t.Error("expected watching=false")
}
}
func TestSyncClient_AdjustInterval_CallsOnWatchingChange(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var changes []bool
sc.OnWatchingChange = func(w bool) { changes = append(changes, w) }
sc.adjustInterval(true)
sc.adjustInterval(true) // no change
sc.adjustInterval(false) // change
if len(changes) != 2 {
t.Fatalf("expected 2 changes, got %d: %v", len(changes), changes)
}
if !changes[0] {
t.Error("first change should be true")
}
if changes[1] {
t.Error("second change should be false")
}
}
func TestSyncClient_TriggerSync_NonBlocking(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
// Fill the channel
sc.TriggerSync()
// Should not block
sc.TriggerSync()
sc.TriggerSync()
// Drain
select {
case <-sc.SyncNow:
default:
t.Error("expected a sync trigger in channel")
}
}
func TestSyncClient_ProcessResponse_NewTasks(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var received []Task
sc.OnNewTasks = func(tasks []Task) { received = tasks }
sc.processResponse(&SyncResponse{
NewTasks: []Task{
{ID: "t1", Title: "Movie 1", InfoHash: "abc"},
{ID: "t2", Title: "Movie 2", InfoHash: "def"},
},
})
if len(received) != 2 {
t.Fatalf("expected 2 tasks, got %d", len(received))
}
if received[0].Title != "Movie 1" {
t.Errorf("expected Movie 1, got %s", received[0].Title)
}
}
func TestSyncClient_ProcessResponse_NoTasks(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var called bool
sc.OnNewTasks = func(tasks []Task) { called = true }
sc.processResponse(&SyncResponse{NewTasks: nil})
if called {
t.Error("OnNewTasks should not be called with empty tasks")
}
}
func TestSyncClient_ProcessResponse_Controls(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var actions []string
var taskIDs []string
sc.OnControl = func(action, taskID string, deleteFiles bool) {
actions = append(actions, action)
taskIDs = append(taskIDs, taskID)
}
sc.processResponse(&SyncResponse{
Controls: []ControlAction{
{Action: "cancel", TaskID: "task-1234-5678"},
{Action: "pause", TaskID: "task-abcd-efgh"},
},
})
if len(actions) != 2 {
t.Fatalf("expected 2 controls, got %d", len(actions))
}
if actions[0] != "cancel" {
t.Errorf("expected cancel, got %s", actions[0])
}
if actions[1] != "pause" {
t.Errorf("expected pause, got %s", actions[1])
}
}
func TestSyncClient_ProcessResponse_Upgrade(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var version string
sc.OnUpgrade = func(v string) { version = v }
sc.processResponse(&SyncResponse{
Upgrade: &UpgradeSignal{Version: "2.0.0"},
})
if version != "2.0.0" {
t.Errorf("expected 2.0.0, got %s", version)
}
}
func TestSyncClient_ProcessResponse_UpgradeEmpty(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var called bool
sc.OnUpgrade = func(v string) { called = true }
sc.processResponse(&SyncResponse{
Upgrade: &UpgradeSignal{Version: ""},
})
if called {
t.Error("OnUpgrade should not be called with empty version")
}
}
func TestSyncClient_ProcessResponse_Scan(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var called bool
sc.OnScan = func() { called = true }
sc.processResponse(&SyncResponse{Scan: true})
if !called {
t.Error("OnScan should have been called")
}
}
func TestSyncClient_ProcessResponse_StreamRequests(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
var received []StreamRequest
sc.OnStreamRequest = func(sr StreamRequest) { received = append(received, sr) }
sc.processResponse(&SyncResponse{
StreamRequests: []StreamRequest{
{TaskID: "t1", FilePath: "/tmp/movie.mkv"},
},
})
if len(received) != 1 {
t.Fatalf("expected 1 stream request, got %d", len(received))
}
if received[0].FilePath != "/tmp/movie.mkv" {
t.Errorf("expected /tmp/movie.mkv, got %s", received[0].FilePath)
}
}
func TestSyncClient_BuildRequest_WithGetTaskStates(t *testing.T) {
sc, _ := newTestSyncClient("http://localhost")
sc.GetTaskStates = func() []TaskState {
return []TaskState{
{TaskID: "t1", Status: "downloading", Progress: 50},
}
}
sc.GetFreeSlots = func() int { return 2 }
req := sc.buildRequest()
if req.AgentID != "test-agent" {
t.Errorf("expected test-agent, got %s", req.AgentID)
}
if len(req.Tasks) != 1 {
t.Fatalf("expected 1 task, got %d", len(req.Tasks))
}
if req.Tasks[0].Progress != 50 {
t.Errorf("expected progress 50, got %d", req.Tasks[0].Progress)
}
if req.FreeSlots != 2 {
t.Errorf("expected 2 free slots, got %d", req.FreeSlots)
}
}
func TestSyncClient_BuildRequest_FallbackToState(t *testing.T) {
client := NewClient("http://localhost", "key", "ua")
state := NewLocalState()
state.Update(TaskState{TaskID: "t1", Status: "completed", Progress: 100})
sc := NewSyncClient(client, DaemonConfig{AgentID: "a1", Version: "1.0"}, state)
// GetTaskStates is nil — should fall back to state.Snapshot()
req := sc.buildRequest()
if len(req.Tasks) != 1 {
t.Fatalf("expected 1 task from state fallback, got %d", len(req.Tasks))
}
}
func TestSyncClient_DoSync_Success(t *testing.T) {
var syncCount atomic.Int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
syncCount.Add(1)
json.NewEncoder(w).Encode(SyncResponse{
Watching: true,
NewTasks: []Task{{ID: "t1", Title: "Test Movie", InfoHash: "abc"}},
})
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
var tasksReceived []Task
sc.OnNewTasks = func(tasks []Task) { tasksReceived = tasks }
sc.doSync(context.Background())
if syncCount.Load() != 1 {
t.Errorf("expected 1 sync call, got %d", syncCount.Load())
}
if len(tasksReceived) != 1 {
t.Fatalf("expected 1 task, got %d", len(tasksReceived))
}
if !sc.Watching() {
t.Error("expected watching=true after sync")
}
if sc.currentInterval() != SyncIntervalWatching {
t.Errorf("expected watching interval after sync")
}
}
func TestSyncClient_DoSync_Error(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
// Should not panic on error
sc.doSync(context.Background())
}
func TestSyncClient_Run_CancelStopsLoop(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(SyncResponse{})
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
err := sc.Run(ctx)
if err != nil {
t.Errorf("expected nil error, got %v", err)
}
}
// ---------------------------------------------------------------------------
// runWakeListener tests
// ---------------------------------------------------------------------------
func TestRunWakeListener_TriggersSyncOnWake(t *testing.T) {
// Server responds immediately with wake=true on the first call
var wakeCallCount atomic.Int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/internal/agent/wake" {
wakeCallCount.Add(1)
json.NewEncoder(w).Encode(map[string]bool{"wake": true})
return
}
// sync endpoint — just respond OK
json.NewEncoder(w).Encode(SyncResponse{})
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
ctx, cancel := context.WithCancel(context.Background())
go sc.runWakeListener(ctx)
// Give the listener time to receive the wake and call TriggerSync
time.Sleep(200 * time.Millisecond)
cancel()
if wakeCallCount.Load() < 1 {
t.Error("expected at least one wake request")
}
// TriggerSync puts something in the buffered channel
select {
case <-sc.SyncNow:
// good — listener triggered a sync
default:
// channel may have been drained by Run (not running here) — check count
// The important thing is that wakeCallCount > 0 (request was made)
}
}
func TestRunWakeListener_ReconnectsAfterTimeout(t *testing.T) {
// Server returns wake=false (timeout) then wake=true on reconnect
callCount := atomic.Int32{}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/internal/agent/wake" {
json.NewEncoder(w).Encode(SyncResponse{})
return
}
n := callCount.Add(1)
if n == 1 {
// First call: timeout
json.NewEncoder(w).Encode(map[string]bool{"wake": false})
} else {
// Second call: wake
json.NewEncoder(w).Encode(map[string]bool{"wake": true})
}
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go sc.runWakeListener(ctx)
// Wait for at least 2 wake calls (reconnect after timeout)
deadline := time.Now().Add(1500 * time.Millisecond)
for time.Now().Before(deadline) {
if callCount.Load() >= 2 {
break
}
time.Sleep(20 * time.Millisecond)
}
if callCount.Load() < 2 {
t.Errorf("expected at least 2 wake requests (reconnect after timeout), got %d", callCount.Load())
}
}
func TestRunWakeListener_RetriesAfterNetworkError(t *testing.T) {
// Server that refuses connections initially, then starts accepting
callCount := atomic.Int32{}
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/internal/agent/wake" {
json.NewEncoder(w).Encode(SyncResponse{})
return
}
callCount.Add(1)
json.NewEncoder(w).Encode(map[string]bool{"wake": false})
}))
defer srv.Close()
// Use a bad URL first, then switch — we can't easily switch URL, so
// test with a server that always errors (closed connection) via a custom transport
badClient := NewClient("http://127.0.0.1:1", "test-key", "unarr-test")
cfg := DaemonConfig{AgentID: "test-agent", Version: "1.0.0", DownloadDir: "/tmp"}
state := NewLocalState()
sc := NewSyncClient(badClient, cfg, state)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// Should not panic — just log errors and retry
done := make(chan struct{})
go func() {
sc.runWakeListener(ctx)
close(done)
}()
select {
case <-done:
// Good — listener exited when ctx was cancelled
case <-time.After(2 * time.Second):
t.Error("runWakeListener did not exit after context cancellation")
}
}
func TestRunWakeListener_StopsOnContextCancel(t *testing.T) {
// Server blocks until client disconnects
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/internal/agent/wake" {
<-r.Context().Done()
return
}
json.NewEncoder(w).Encode(SyncResponse{})
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
sc.runWakeListener(ctx)
close(done)
}()
// Let it connect and block
time.Sleep(50 * time.Millisecond)
cancel()
select {
case <-done:
// Good
case <-time.After(2 * time.Second):
t.Error("runWakeListener did not stop when context was cancelled")
}
}
func TestRunWakeListener_DoesNotTriggerSyncOnTimeout(t *testing.T) {
// Server always returns wake=false — SyncNow channel should stay empty
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/api/internal/agent/wake" {
json.NewEncoder(w).Encode(map[string]bool{"wake": false})
return
}
json.NewEncoder(w).Encode(SyncResponse{})
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
go sc.runWakeListener(ctx)
<-ctx.Done()
// SyncNow should be empty (no wake triggered)
select {
case <-sc.SyncNow:
t.Error("expected no sync trigger on timeout response")
default:
// Good
}
}
func TestSyncClient_Run_ImmediateSyncOnTrigger(t *testing.T) {
var syncCount atomic.Int32
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
syncCount.Add(1)
json.NewEncoder(w).Encode(SyncResponse{})
}))
defer srv.Close()
sc, _ := newTestSyncClient(srv.URL)
// Set interval to something long so only triggers cause syncs
sc.interval.Store(int64(10 * time.Second))
ctx, cancel := context.WithCancel(context.Background())
go func() {
// Wait for initial sync, then trigger 2 more
time.Sleep(50 * time.Millisecond)
sc.TriggerSync()
time.Sleep(50 * time.Millisecond)
sc.TriggerSync()
time.Sleep(50 * time.Millisecond)
cancel()
}()
sc.Run(ctx)
// Initial sync (1) + 2 triggers + final sync = 4
count := syncCount.Load()
if count < 3 {
t.Errorf("expected at least 3 syncs (initial + 2 triggers), got %d", count)
}
}