feat(cli): upgrade command, rich status, and version cache
- Replace `upgrade` stub with real command (alias for `self-update`) - Also register `update` as alias: `unarr update` works too - Rewrite `status` to show full config, disk usage, daemon state, and update availability with colored sections - Add version check cache (1h TTL) so `status` is instant on repeat runs - Guard against division by zero on empty filesystems - Guard against negative durations from clock skew - Guard against stale PID via heartbeat recency check (2 min) - Add comprehensive test coverage across agent, engine, upgrade, usenet, arr, library, mediaserver, and UI packages - Improve Makefile coverage target to exclude cmd/ glue code - Fix stream handler resource cleanup and ffprobe error handling
This commit is contained in:
parent
01d62ffa13
commit
3e0f3a5a64
33 changed files with 7084 additions and 65 deletions
|
|
@ -2,6 +2,7 @@ package engine
|
|||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
|
@ -83,3 +84,223 @@ func TestManagerShutdown(t *testing.T) {
|
|||
mgr.Shutdown(ctx)
|
||||
// Should not hang
|
||||
}
|
||||
|
||||
func TestManagerDefaultConcurrency(t *testing.T) {
|
||||
reporter := NewProgressReporter(
|
||||
agent.NewClient("http://localhost", "test", "test"),
|
||||
1*time.Second,
|
||||
)
|
||||
mgr := NewManager(ManagerConfig{MaxConcurrent: 0}, reporter)
|
||||
if cap(mgr.sem) != 3 {
|
||||
t.Errorf("default MaxConcurrent should be 3, got %d", cap(mgr.sem))
|
||||
}
|
||||
}
|
||||
|
||||
func TestManagerGetTask(t *testing.T) {
|
||||
reporter := NewProgressReporter(
|
||||
agent.NewClient("http://localhost", "test", "test"),
|
||||
1*time.Second,
|
||||
)
|
||||
mgr := NewManager(ManagerConfig{MaxConcurrent: 2}, reporter)
|
||||
|
||||
// No task added
|
||||
if task := mgr.GetTask("nonexistent"); task != nil {
|
||||
t.Error("expected nil for nonexistent task")
|
||||
}
|
||||
}
|
||||
|
||||
func TestManagerActiveTasks(t *testing.T) {
|
||||
reporter := NewProgressReporter(
|
||||
agent.NewClient("http://localhost", "test", "test"),
|
||||
1*time.Second,
|
||||
)
|
||||
mgr := NewManager(ManagerConfig{MaxConcurrent: 2}, reporter)
|
||||
|
||||
tasks := mgr.ActiveTasks()
|
||||
if len(tasks) != 0 {
|
||||
t.Errorf("expected 0 active tasks, got %d", len(tasks))
|
||||
}
|
||||
}
|
||||
|
||||
func TestManagerSubmitCompletesWithValidFile(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
// Create a file that verify() will accept
|
||||
filePath := dir + "/movie.mkv"
|
||||
os.WriteFile(filePath, make([]byte, 1024), 0o644)
|
||||
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: 100 * time.Millisecond,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
dl := &resultMockDownloader{
|
||||
method: MethodTorrent,
|
||||
result: &Result{
|
||||
FilePath: filePath,
|
||||
FileName: "movie.mkv",
|
||||
Method: MethodTorrent,
|
||||
Size: 1024,
|
||||
},
|
||||
}
|
||||
|
||||
mgr := NewManager(ManagerConfig{
|
||||
MaxConcurrent: 2,
|
||||
OutputDir: dir,
|
||||
}, pr, dl)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go pr.Run(ctx)
|
||||
|
||||
mgr.Submit(ctx, agent.Task{
|
||||
ID: "task-complete-test1",
|
||||
InfoHash: "abc123def456abc123def456abc123def456abc1",
|
||||
Title: "Test Movie",
|
||||
PreferredMethod: "torrent",
|
||||
})
|
||||
|
||||
mgr.Wait()
|
||||
cancel()
|
||||
|
||||
// Task should have completed successfully
|
||||
// (we can't check directly since it's removed from active map after processing)
|
||||
}
|
||||
|
||||
func TestManagerCancelTask(t *testing.T) {
|
||||
reporter := NewProgressReporter(
|
||||
agent.NewClient("http://localhost", "test", "test"),
|
||||
1*time.Second,
|
||||
)
|
||||
|
||||
dl := &slowMockDownloader{method: MethodTorrent}
|
||||
mgr := NewManager(ManagerConfig{
|
||||
MaxConcurrent: 2,
|
||||
OutputDir: t.TempDir(),
|
||||
}, reporter, dl)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go reporter.Run(ctx)
|
||||
|
||||
mgr.Submit(ctx, agent.Task{
|
||||
ID: "task-cancel-test12",
|
||||
InfoHash: "abc123def456abc123def456abc123def456abc1",
|
||||
Title: "Cancel Me",
|
||||
PreferredMethod: "torrent",
|
||||
})
|
||||
|
||||
// Give it time to start
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
mgr.CancelTask("task-cancel-test12")
|
||||
mgr.Wait()
|
||||
}
|
||||
|
||||
func TestManagerPauseTask(t *testing.T) {
|
||||
reporter := NewProgressReporter(
|
||||
agent.NewClient("http://localhost", "test", "test"),
|
||||
1*time.Second,
|
||||
)
|
||||
|
||||
dl := &slowMockDownloader{method: MethodTorrent}
|
||||
mgr := NewManager(ManagerConfig{
|
||||
MaxConcurrent: 2,
|
||||
OutputDir: t.TempDir(),
|
||||
}, reporter, dl)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go reporter.Run(ctx)
|
||||
|
||||
mgr.Submit(ctx, agent.Task{
|
||||
ID: "task-pause-test123",
|
||||
InfoHash: "abc123def456abc123def456abc123def456abc1",
|
||||
Title: "Pause Me",
|
||||
PreferredMethod: "torrent",
|
||||
})
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
mgr.PauseTask("task-pause-test123")
|
||||
mgr.Wait()
|
||||
}
|
||||
|
||||
func TestManagerCancelAndDeleteFiles(t *testing.T) {
|
||||
reporter := NewProgressReporter(
|
||||
agent.NewClient("http://localhost", "test", "test"),
|
||||
1*time.Second,
|
||||
)
|
||||
|
||||
dl := &slowMockDownloader{method: MethodTorrent}
|
||||
mgr := NewManager(ManagerConfig{
|
||||
MaxConcurrent: 2,
|
||||
OutputDir: t.TempDir(),
|
||||
}, reporter, dl)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go reporter.Run(ctx)
|
||||
|
||||
mgr.Submit(ctx, agent.Task{
|
||||
ID: "task-delfile-test12",
|
||||
InfoHash: "abc123def456abc123def456abc123def456abc1",
|
||||
Title: "Delete Me",
|
||||
PreferredMethod: "torrent",
|
||||
})
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
mgr.CancelAndDeleteFiles("task-delfile-test12")
|
||||
mgr.Wait()
|
||||
}
|
||||
|
||||
func TestManagerCancelNonexistent(t *testing.T) {
|
||||
reporter := NewProgressReporter(
|
||||
agent.NewClient("http://localhost", "test", "test"),
|
||||
1*time.Second,
|
||||
)
|
||||
mgr := NewManager(ManagerConfig{MaxConcurrent: 2}, reporter)
|
||||
// Should not panic
|
||||
mgr.CancelTask("nonexistent")
|
||||
mgr.PauseTask("nonexistent")
|
||||
mgr.CancelAndDeleteFiles("nonexistent")
|
||||
}
|
||||
|
||||
// resultMockDownloader returns a configurable result
|
||||
type resultMockDownloader struct {
|
||||
method DownloadMethod
|
||||
result *Result
|
||||
}
|
||||
|
||||
func (m *resultMockDownloader) Method() DownloadMethod { return m.method }
|
||||
func (m *resultMockDownloader) Available(_ context.Context, _ *Task) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
func (m *resultMockDownloader) Download(_ context.Context, _ *Task, _ string, _ chan<- Progress) (*Result, error) {
|
||||
return m.result, nil
|
||||
}
|
||||
func (m *resultMockDownloader) Pause(_ string) error { return nil }
|
||||
func (m *resultMockDownloader) Cancel(_ string) error { return nil }
|
||||
func (m *resultMockDownloader) Shutdown(_ context.Context) error { return nil }
|
||||
|
||||
// slowMockDownloader blocks until context is cancelled
|
||||
type slowMockDownloader struct {
|
||||
method DownloadMethod
|
||||
}
|
||||
|
||||
func (m *slowMockDownloader) Method() DownloadMethod { return m.method }
|
||||
func (m *slowMockDownloader) Available(_ context.Context, _ *Task) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
func (m *slowMockDownloader) Download(ctx context.Context, _ *Task, _ string, _ chan<- Progress) (*Result, error) {
|
||||
<-ctx.Done()
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
func (m *slowMockDownloader) Pause(_ string) error { return nil }
|
||||
func (m *slowMockDownloader) Cancel(_ string) error { return nil }
|
||||
func (m *slowMockDownloader) Shutdown(_ context.Context) error { return nil }
|
||||
|
|
|
|||
50
internal/engine/method_test.go
Normal file
50
internal/engine/method_test.go
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
package engine
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestDownloadMethodConstants(t *testing.T) {
|
||||
if MethodTorrent != "torrent" {
|
||||
t.Errorf("MethodTorrent = %q, want torrent", MethodTorrent)
|
||||
}
|
||||
if MethodDebrid != "debrid" {
|
||||
t.Errorf("MethodDebrid = %q, want debrid", MethodDebrid)
|
||||
}
|
||||
if MethodUsenet != "usenet" {
|
||||
t.Errorf("MethodUsenet = %q, want usenet", MethodUsenet)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressStruct(t *testing.T) {
|
||||
p := Progress{
|
||||
DownloadedBytes: 1024,
|
||||
TotalBytes: 2048,
|
||||
SpeedBps: 512,
|
||||
ETA: 10,
|
||||
Peers: 5,
|
||||
Seeds: 3,
|
||||
FileName: "movie.mkv",
|
||||
}
|
||||
|
||||
if p.DownloadedBytes != 1024 {
|
||||
t.Errorf("DownloadedBytes = %d, want 1024", p.DownloadedBytes)
|
||||
}
|
||||
if p.FileName != "movie.mkv" {
|
||||
t.Errorf("FileName = %q, want movie.mkv", p.FileName)
|
||||
}
|
||||
}
|
||||
|
||||
func TestResultStruct(t *testing.T) {
|
||||
r := Result{
|
||||
FilePath: "/downloads/movie.mkv",
|
||||
FileName: "movie.mkv",
|
||||
Method: MethodTorrent,
|
||||
Size: 1073741824,
|
||||
}
|
||||
|
||||
if r.Method != MethodTorrent {
|
||||
t.Errorf("Method = %q, want torrent", r.Method)
|
||||
}
|
||||
if r.Size != 1073741824 {
|
||||
t.Errorf("Size = %d, want 1073741824", r.Size)
|
||||
}
|
||||
}
|
||||
181
internal/engine/organize_expand_test.go
Normal file
181
internal/engine/organize_expand_test.go
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestReplaceFile(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
backupDir := filepath.Join(tmp, "backups")
|
||||
|
||||
// Create "old" file
|
||||
oldPath := filepath.Join(tmp, "movie.mkv")
|
||||
os.WriteFile(oldPath, []byte("old content"), 0o644)
|
||||
|
||||
// Create "new" file
|
||||
newPath := filepath.Join(tmp, "movie-new.mkv")
|
||||
os.WriteFile(newPath, []byte("new better content"), 0o644)
|
||||
|
||||
err := replaceFile(oldPath, newPath, backupDir)
|
||||
if err != nil {
|
||||
t.Fatalf("replaceFile: %v", err)
|
||||
}
|
||||
|
||||
// Old path should now contain new content
|
||||
data, err := os.ReadFile(oldPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read old path: %v", err)
|
||||
}
|
||||
if string(data) != "new better content" {
|
||||
t.Errorf("old path content = %q, want 'new better content'", string(data))
|
||||
}
|
||||
|
||||
// Backup should exist
|
||||
entries, _ := os.ReadDir(backupDir)
|
||||
if len(entries) != 1 {
|
||||
t.Errorf("expected 1 backup file, got %d", len(entries))
|
||||
}
|
||||
|
||||
// New file should be gone
|
||||
if _, err := os.Stat(newPath); !os.IsNotExist(err) {
|
||||
t.Error("new file should have been moved/deleted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplaceFileOldNotFound(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
err := replaceFile(filepath.Join(tmp, "nonexistent.mkv"), filepath.Join(tmp, "new.mkv"), "")
|
||||
if err == nil {
|
||||
t.Error("expected error when old file doesn't exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCopyFile(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
src := filepath.Join(tmp, "source.txt")
|
||||
dst := filepath.Join(tmp, "dest.txt")
|
||||
|
||||
content := []byte("hello world copy test")
|
||||
os.WriteFile(src, content, 0o644)
|
||||
|
||||
err := copyFile(src, dst)
|
||||
if err != nil {
|
||||
t.Fatalf("copyFile: %v", err)
|
||||
}
|
||||
|
||||
data, err := os.ReadFile(dst)
|
||||
if err != nil {
|
||||
t.Fatalf("read dest: %v", err)
|
||||
}
|
||||
if string(data) != string(content) {
|
||||
t.Errorf("dest content = %q, want %q", string(data), string(content))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCopyFileSrcNotFound(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
err := copyFile(filepath.Join(tmp, "nope.txt"), filepath.Join(tmp, "out.txt"))
|
||||
if err == nil {
|
||||
t.Error("expected error when source doesn't exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrganizeNoDirs(t *testing.T) {
|
||||
r := &Result{FilePath: "/tmp/file.mkv", FileName: "file.mkv"}
|
||||
task := &Task{Title: "Movie"}
|
||||
|
||||
path, err := organize(r, task, OrganizeConfig{Enabled: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if path != "/tmp/file.mkv" {
|
||||
t.Errorf("should return original path when no dirs configured, got %q", path)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrganizeNilResult(t *testing.T) {
|
||||
task := &Task{Title: "Movie"}
|
||||
path, err := organize(&Result{}, task, OrganizeConfig{Enabled: true})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if path != "" {
|
||||
t.Errorf("expected empty path for empty result, got %q", path)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrganizeMovieDirectory(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
srcDir := filepath.Join(tmp, "src", "MovieDir")
|
||||
os.MkdirAll(srcDir, 0o755)
|
||||
os.WriteFile(filepath.Join(srcDir, "movie.mkv"), []byte("data"), 0o644)
|
||||
|
||||
moviesDir := filepath.Join(tmp, "Movies")
|
||||
|
||||
r := &Result{FilePath: srcDir, FileName: "MovieDir"}
|
||||
task := &Task{Title: "My Movie 2023"}
|
||||
|
||||
path, err := organize(r, task, OrganizeConfig{
|
||||
Enabled: true,
|
||||
MoviesDir: moviesDir,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if path == srcDir {
|
||||
t.Error("directory should have moved")
|
||||
}
|
||||
if _, err := os.Stat(path); err != nil {
|
||||
t.Errorf("organized directory should exist at %s", path)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrganizeSeasonOnly(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
srcFile := filepath.Join(tmp, "Show.S01.Complete.mkv")
|
||||
os.WriteFile(srcFile, []byte("data"), 0o644)
|
||||
|
||||
tvDir := filepath.Join(tmp, "TV")
|
||||
|
||||
r := &Result{FilePath: srcFile, FileName: "Show.S01.Complete.mkv"}
|
||||
task := &Task{Title: "Show S01"}
|
||||
|
||||
path, err := organize(r, task, OrganizeConfig{
|
||||
Enabled: true,
|
||||
TVShowsDir: tvDir,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
dir := filepath.Dir(path)
|
||||
if filepath.Base(dir) != "Season 01" {
|
||||
t.Errorf("expected Season 01 directory, got %q", filepath.Base(dir))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanTitleEdgeCases(t *testing.T) {
|
||||
tests := []struct {
|
||||
input string
|
||||
want string
|
||||
}{
|
||||
{"", ""},
|
||||
{"Simple Title", "Simple Title"},
|
||||
{"Title (2023) 1080p BluRay", "Title"},
|
||||
{"Title 720p HDTV", "Title"},
|
||||
{"Title x264 HEVC", "Title"},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.input, func(t *testing.T) {
|
||||
got := cleanTitle(tt.input)
|
||||
if got != tt.want {
|
||||
t.Errorf("cleanTitle(%q) = %q, want %q", tt.input, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
419
internal/engine/progress_test.go
Normal file
419
internal/engine/progress_test.go
Normal file
|
|
@ -0,0 +1,419 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/torrentclaw/unarr/internal/agent"
|
||||
)
|
||||
|
||||
// mockStatusReporter records calls to ReportStatus.
|
||||
type mockStatusReporter struct {
|
||||
mu sync.Mutex
|
||||
calls []agent.StatusUpdate
|
||||
resp *agent.StatusResponse
|
||||
respErr error
|
||||
}
|
||||
|
||||
func (m *mockStatusReporter) ReportStatus(_ context.Context, update agent.StatusUpdate) (*agent.StatusResponse, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.calls = append(m.calls, update)
|
||||
if m.resp != nil {
|
||||
return m.resp, m.respErr
|
||||
}
|
||||
return &agent.StatusResponse{}, m.respErr
|
||||
}
|
||||
|
||||
// mockBatchReporter records batch calls.
|
||||
type mockBatchReporter struct {
|
||||
mockStatusReporter
|
||||
batchCalls [][]agent.StatusUpdate
|
||||
batchResp *agent.BatchStatusResponse
|
||||
}
|
||||
|
||||
func (m *mockBatchReporter) BatchReportStatus(_ context.Context, updates []agent.StatusUpdate) (*agent.BatchStatusResponse, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.batchCalls = append(m.batchCalls, updates)
|
||||
if m.batchResp != nil {
|
||||
return m.batchResp, nil
|
||||
}
|
||||
results := make([]agent.StatusResponse, len(updates))
|
||||
return &agent.BatchStatusResponse{Results: results}, nil
|
||||
}
|
||||
|
||||
func TestProgressReporter_TrackUntrack(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-001", Status: StatusDownloading}
|
||||
pr.Track(task)
|
||||
|
||||
pr.mu.Lock()
|
||||
if _, ok := pr.latest["task-001"]; !ok {
|
||||
t.Error("task should be tracked")
|
||||
}
|
||||
pr.mu.Unlock()
|
||||
|
||||
pr.Untrack("task-001")
|
||||
|
||||
pr.mu.Lock()
|
||||
if _, ok := pr.latest["task-001"]; ok {
|
||||
t.Error("task should be untracked")
|
||||
}
|
||||
pr.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestProgressReporter_FlushReportsFinalStates(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
completed := &Task{ID: "task-completed-1234", Status: StatusCompleted}
|
||||
pr.Track(completed)
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
reporter.mu.Lock()
|
||||
defer reporter.mu.Unlock()
|
||||
if len(reporter.calls) != 1 {
|
||||
t.Fatalf("expected 1 report, got %d", len(reporter.calls))
|
||||
}
|
||||
if reporter.calls[0].TaskID != "task-completed-1234" {
|
||||
t.Errorf("reported wrong task: %s", reporter.calls[0].TaskID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_FlushSkipsWhenNotWatching(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
isWatching: func() bool { return false },
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
lastCheckAt: time.Now(), // not due for control check
|
||||
}
|
||||
|
||||
// Active downloading task, already reported as downloading
|
||||
task := &Task{ID: "task-active-12345678", Status: StatusDownloading}
|
||||
pr.Track(task)
|
||||
pr.mu.Lock()
|
||||
pr.lastReported["task-active-12345678"] = StatusDownloading
|
||||
pr.mu.Unlock()
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
reporter.mu.Lock()
|
||||
defer reporter.mu.Unlock()
|
||||
if len(reporter.calls) != 0 {
|
||||
t.Errorf("expected 0 reports when not watching (no transition), got %d", len(reporter.calls))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_FlushReportsTransitions(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
isWatching: func() bool { return false },
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
lastCheckAt: time.Now(),
|
||||
}
|
||||
|
||||
// Task transitioning from resolving to downloading
|
||||
task := &Task{ID: "task-trans-12345678", Status: StatusDownloading}
|
||||
pr.Track(task)
|
||||
pr.mu.Lock()
|
||||
pr.lastReported["task-trans-12345678"] = StatusResolving
|
||||
pr.mu.Unlock()
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
reporter.mu.Lock()
|
||||
defer reporter.mu.Unlock()
|
||||
if len(reporter.calls) != 1 {
|
||||
t.Fatalf("expected 1 report for transition, got %d", len(reporter.calls))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_FlushActiveWhenWatching(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
isWatching: func() bool { return true },
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-watch-12345678", Status: StatusDownloading}
|
||||
pr.Track(task)
|
||||
pr.mu.Lock()
|
||||
pr.lastReported["task-watch-12345678"] = StatusDownloading
|
||||
pr.mu.Unlock()
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
reporter.mu.Lock()
|
||||
defer reporter.mu.Unlock()
|
||||
if len(reporter.calls) != 1 {
|
||||
t.Fatalf("expected 1 report when watching active task, got %d", len(reporter.calls))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_HandleResponseCancel(t *testing.T) {
|
||||
reporter := &mockStatusReporter{
|
||||
resp: &agent.StatusResponse{Cancelled: true},
|
||||
}
|
||||
|
||||
var cancelledID string
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
onCancel: func(id string) { cancelledID = id },
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-cancel-1234567", Status: StatusCompleted}
|
||||
pr.Track(task)
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
if cancelledID != "task-cancel-1234567" {
|
||||
t.Errorf("expected cancel handler called with task ID, got %q", cancelledID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_HandleResponsePause(t *testing.T) {
|
||||
reporter := &mockStatusReporter{
|
||||
resp: &agent.StatusResponse{Paused: true},
|
||||
}
|
||||
|
||||
var pausedID string
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
onPause: func(id string) { pausedID = id },
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-paused-1234567", Status: StatusCompleted}
|
||||
pr.Track(task)
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
if pausedID != "task-paused-1234567" {
|
||||
t.Errorf("expected pause handler called, got %q", pausedID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_HandleResponseDeleteFiles(t *testing.T) {
|
||||
reporter := &mockStatusReporter{
|
||||
resp: &agent.StatusResponse{Cancelled: true, DeleteFiles: true},
|
||||
}
|
||||
|
||||
var deletedID string
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
onDeleteFiles: func(id string) { deletedID = id },
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-delete-1234567", Status: StatusCompleted}
|
||||
pr.Track(task)
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
if deletedID != "task-delete-1234567" {
|
||||
t.Errorf("expected deleteFiles handler called, got %q", deletedID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_HandleResponseStream(t *testing.T) {
|
||||
reporter := &mockStatusReporter{
|
||||
resp: &agent.StatusResponse{StreamRequested: true},
|
||||
}
|
||||
|
||||
var streamID string
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
onStreamRequested: func(id string) { streamID = id },
|
||||
}
|
||||
|
||||
// Task with no stream URL yet
|
||||
task := &Task{ID: "task-stream-1234567", Status: StatusCompleted}
|
||||
pr.Track(task)
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
if streamID != "task-stream-1234567" {
|
||||
t.Errorf("expected stream handler called, got %q", streamID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_HandleResponseWatchingChanged(t *testing.T) {
|
||||
reporter := &mockStatusReporter{
|
||||
resp: &agent.StatusResponse{Watching: true},
|
||||
}
|
||||
|
||||
var watchingValue bool
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
onWatchingChanged: func(w bool) { watchingValue = w },
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-watch2-1234567", Status: StatusCompleted}
|
||||
pr.Track(task)
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
if !watchingValue {
|
||||
t.Error("expected watchingChanged called with true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_BatchFlush(t *testing.T) {
|
||||
batcher := &mockBatchReporter{
|
||||
batchResp: &agent.BatchStatusResponse{
|
||||
Results: []agent.StatusResponse{{}, {}},
|
||||
},
|
||||
}
|
||||
|
||||
pr := &ProgressReporter{
|
||||
reporter: batcher,
|
||||
interval: time.Second,
|
||||
isWatching: func() bool { return true },
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
pr.Track(&Task{ID: "task-batch1-1234567", Status: StatusDownloading})
|
||||
pr.Track(&Task{ID: "task-batch2-1234567", Status: StatusDownloading})
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
batcher.mu.Lock()
|
||||
defer batcher.mu.Unlock()
|
||||
|
||||
if len(batcher.batchCalls) != 1 {
|
||||
t.Fatalf("expected 1 batch call, got %d", len(batcher.batchCalls))
|
||||
}
|
||||
if len(batcher.batchCalls[0]) != 2 {
|
||||
t.Errorf("expected 2 updates in batch, got %d", len(batcher.batchCalls[0]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_RunStopsOnCancel(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: 50 * time.Millisecond,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
err := pr.Run(ctx)
|
||||
if err != nil {
|
||||
t.Errorf("Run should return nil on context cancel, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_ReportFinal(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-final-12345678", Status: StatusCompleted}
|
||||
pr.Track(task)
|
||||
|
||||
pr.ReportFinal(context.Background(), task)
|
||||
|
||||
reporter.mu.Lock()
|
||||
defer reporter.mu.Unlock()
|
||||
if len(reporter.calls) != 1 {
|
||||
t.Fatalf("expected 1 final report, got %d", len(reporter.calls))
|
||||
}
|
||||
|
||||
// Should be untracked after final report
|
||||
pr.mu.Lock()
|
||||
if _, ok := pr.latest["task-final-12345678"]; ok {
|
||||
t.Error("task should be untracked after ReportFinal")
|
||||
}
|
||||
pr.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestProgressReporter_SetHandlers(t *testing.T) {
|
||||
pr := &ProgressReporter{
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
}
|
||||
|
||||
pr.SetCancelHandler(func(id string) {})
|
||||
pr.SetPauseHandler(func(id string) {})
|
||||
pr.SetDeleteFilesHandler(func(id string) {})
|
||||
pr.SetStreamRequestedHandler(func(id string) {})
|
||||
pr.SetWatchingFunc(func() bool { return true })
|
||||
pr.SetWatchingChangedHandler(func(w bool) {})
|
||||
|
||||
if pr.onCancel == nil || pr.onPause == nil || pr.onDeleteFiles == nil ||
|
||||
pr.onStreamRequested == nil || pr.isWatching == nil || pr.onWatchingChanged == nil {
|
||||
t.Error("expected all handlers to be set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressReporter_ControlCheckDue(t *testing.T) {
|
||||
reporter := &mockStatusReporter{}
|
||||
pr := &ProgressReporter{
|
||||
reporter: reporter,
|
||||
interval: time.Second,
|
||||
isWatching: func() bool { return false },
|
||||
latest: make(map[string]*Task),
|
||||
lastReported: make(map[string]TaskStatus),
|
||||
lastCheckAt: time.Now().Add(-31 * time.Second), // 31s ago - due for control check
|
||||
}
|
||||
|
||||
task := &Task{ID: "task-ctrl-123456789", Status: StatusDownloading}
|
||||
pr.Track(task)
|
||||
pr.mu.Lock()
|
||||
pr.lastReported["task-ctrl-123456789"] = StatusDownloading
|
||||
pr.mu.Unlock()
|
||||
|
||||
pr.flush(context.Background())
|
||||
|
||||
reporter.mu.Lock()
|
||||
defer reporter.mu.Unlock()
|
||||
if len(reporter.calls) != 1 {
|
||||
t.Errorf("expected 1 report for control check, got %d", len(reporter.calls))
|
||||
}
|
||||
}
|
||||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"os/exec"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/torrent"
|
||||
|
|
@ -24,11 +25,12 @@ type fileProvider interface {
|
|||
|
||||
// StreamServer serves a torrent file over HTTP with Range request support.
|
||||
type StreamServer struct {
|
||||
provider fileProvider
|
||||
server *http.Server
|
||||
port int
|
||||
url string
|
||||
upnpMapping *UPnPMapping
|
||||
provider fileProvider
|
||||
server *http.Server
|
||||
port int
|
||||
url string
|
||||
upnpMapping *UPnPMapping
|
||||
lastActivity atomic.Int64 // UnixNano of last HTTP request
|
||||
}
|
||||
|
||||
// NewStreamServer creates a new HTTP server for streaming via StreamEngine.
|
||||
|
|
@ -93,11 +95,38 @@ func NewStreamServerFromDisk(filePath string, port int) *StreamServer {
|
|||
}
|
||||
}
|
||||
|
||||
// Start begins serving the file on all interfaces. Returns the best reachable URL:
|
||||
// 1. UPnP public IP (accessible from anywhere on the internet)
|
||||
// 2. Tailscale IP (accessible from any device in the tailnet)
|
||||
// 3. LAN IP (accessible from local network)
|
||||
// FindVideoFile scans a directory (recursively) for the largest video file.
|
||||
// Returns empty string if no video file found.
|
||||
func FindVideoFile(dir string) string {
|
||||
var best string
|
||||
var bestSize int64
|
||||
|
||||
filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
|
||||
if err != nil || d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
ext := strings.ToLower(filepath.Ext(d.Name()))
|
||||
if !VideoExts[ext] {
|
||||
return nil
|
||||
}
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
if info.Size() > bestSize {
|
||||
best = path
|
||||
bestSize = info.Size()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return best
|
||||
}
|
||||
|
||||
// Start begins serving the file on all interfaces. Returns the best reachable URL.
|
||||
// The file is served as-is — the user's media player (VLC, mpv, etc.) handles decoding.
|
||||
func (ss *StreamServer) Start(ctx context.Context) (string, error) {
|
||||
ss.lastActivity.Store(time.Now().UnixNano())
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/stream", ss.handler)
|
||||
|
||||
|
|
@ -107,19 +136,9 @@ func (ss *StreamServer) Start(ctx context.Context) (string, error) {
|
|||
return "", fmt.Errorf("listen on %s: %w", addr, err)
|
||||
}
|
||||
|
||||
// Extract actual port (important when port=0)
|
||||
ss.port = listener.Addr().(*net.TCPAddr).Port
|
||||
|
||||
// Try UPnP for public internet access (like Plex Remote Access)
|
||||
if mapping, upnpErr := setupUPnP(ss.port); upnpErr == nil {
|
||||
ss.upnpMapping = mapping
|
||||
ss.url = fmt.Sprintf("http://%s:%d/stream", mapping.ExternalIP, mapping.ExternalPort)
|
||||
log.Printf("stream: UPnP mapped %s:%d -> local:%d", mapping.ExternalIP, mapping.ExternalPort, ss.port)
|
||||
} else {
|
||||
// Fallback: Tailscale IP > LAN IP > 127.0.0.1
|
||||
ss.url = fmt.Sprintf("http://%s:%d/stream", reachableIP(), ss.port)
|
||||
log.Printf("stream: UPnP unavailable (%v), using %s", upnpErr, ss.url)
|
||||
}
|
||||
ss.url = fmt.Sprintf("http://%s:%d/stream", reachableIP(), ss.port)
|
||||
log.Printf("stream: serving on %s", ss.url)
|
||||
|
||||
ss.server = &http.Server{
|
||||
Handler: mux,
|
||||
|
|
@ -141,6 +160,15 @@ func (ss *StreamServer) URL() string { return ss.url }
|
|||
// Port returns the bound port.
|
||||
func (ss *StreamServer) Port() int { return ss.port }
|
||||
|
||||
// IdleSince returns how long since the last HTTP request was received.
|
||||
func (ss *StreamServer) IdleSince() time.Duration {
|
||||
last := ss.lastActivity.Load()
|
||||
if last == 0 {
|
||||
return 0
|
||||
}
|
||||
return time.Since(time.Unix(0, last))
|
||||
}
|
||||
|
||||
// Shutdown gracefully stops the HTTP server and removes the UPnP port mapping.
|
||||
func (ss *StreamServer) Shutdown(ctx context.Context) error {
|
||||
ss.upnpMapping.Remove()
|
||||
|
|
@ -151,6 +179,8 @@ func (ss *StreamServer) Shutdown(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
|
||||
ss.lastActivity.Store(time.Now().UnixNano())
|
||||
|
||||
// CORS headers — only when browser sends Origin (HTTPS site → localhost)
|
||||
if origin := r.Header.Get("Origin"); origin != "" {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue