From 0dafeaa70d61024d89c151da2b19ae291927731a Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Wed, 1 Apr 2026 12:16:45 +0200 Subject: [PATCH] feat(stream): report watch progress to API via HTTP Range tracking Track the highest byte offset served by the stream server to estimate playback progress (0-100%). A WatchReporter goroutine sends progress to POST /api/internal/agent/watch-progress every 10s during streaming. - Add maxByteOffset + totalFileSize to StreamServer for Range tracking - Add FileSize() to fileProvider interface (all 3 providers) - New WatchReporter: periodic progress reporter tied to daemon context - New WatchProgressUpdate type with optional progress/position/duration - Wire reporter into all 3 stream paths (task stream, disk stream, active download stream) --- internal/agent/client.go | 9 ++ internal/agent/types.go | 21 +++ internal/cmd/daemon.go | 17 ++- internal/cmd/stream_handler.go | 8 +- internal/engine/stream.go | 3 + internal/engine/stream_server.go | 74 ++++++++++- internal/engine/watch_reporter.go | 68 ++++++++++ internal/engine/watch_reporter_test.go | 176 +++++++++++++++++++++++++ 8 files changed, 366 insertions(+), 10 deletions(-) create mode 100644 internal/engine/watch_reporter.go create mode 100644 internal/engine/watch_reporter_test.go diff --git a/internal/agent/client.go b/internal/agent/client.go index 9fd6ec8..7da6fcd 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -178,6 +178,15 @@ func (c *Client) SyncLibrary(ctx context.Context, req LibrarySyncRequest) (*Libr return &resp, nil } +// ReportWatchProgress sends playback position to the server for watch tracking. +func (c *Client) ReportWatchProgress(ctx context.Context, update WatchProgressUpdate) error { + var resp WatchProgressResponse + if err := c.doPost(ctx, "/api/internal/agent/watch-progress", update, &resp); err != nil { + return fmt.Errorf("watch progress: %w", err) + } + return nil +} + // doPost sends a JSON POST request and decodes the response. func (c *Client) doPost(ctx context.Context, path string, body any, dst any) error { jsonBody, err := json.Marshal(body) diff --git a/internal/agent/types.go b/internal/agent/types.go index 616f23f..09bf9bf 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -304,3 +304,24 @@ type LibrarySyncResponse struct { Matched int `json:"matched"` Removed int `json:"removed"` } + +// --------------------------------------------------------------------------- +// Watch progress types (used by stream tracking) +// --------------------------------------------------------------------------- + +// WatchProgressUpdate reports playback position during streaming. +// Two modes: +// - Estimated (range): set Progress (0-100). Position/Duration omitted. +// - Precise (browser): set Position + Duration in seconds. Progress computed server-side. +type WatchProgressUpdate struct { + TaskID string `json:"taskId"` + Source string `json:"source"` // "range" or "browser" + Progress *int `json:"progress,omitempty"` // 0-100 (range source) + Position *int `json:"position,omitempty"` // seconds (browser source) + Duration *int `json:"duration,omitempty"` // seconds (browser source) +} + +// WatchProgressResponse is returned after reporting watch progress. +type WatchProgressResponse struct { + Success bool `json:"success"` +} diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 4024311..61ca65e 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -174,6 +174,13 @@ func runDaemonStart() error { // Create daemon — always uses Transport interface d := agent.NewDaemon(daemonCfg, transport) + // Create agent client for watch progress reporting + agentClient := agent.NewClient(cfg.Auth.APIURL, cfg.Auth.APIKey, userAgent) + + // Daemon-scoped context — cancelled on shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create progress reporter using transport reporter := engine.NewProgressReporterWithTransport(transport, statusInterval) reporter.SetWatchingFunc(func() bool { return d.Watching.Load() }) @@ -266,18 +273,19 @@ func runDaemonStart() error { streamRegistry.servers[taskID] = srv streamRegistry.mu.Unlock() task.SetStreamURL(srv.URL()) + + // Start watch progress reporter + go engine.NewWatchReporter(agentClient, srv, taskID).Run(ctx) }) // Wire: daemon claimed tasks -> manager - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() d.OnTasksClaimed = func(tasks []agent.Task) { for _, t := range tasks { if t.Mode == "stream" { // Only 1 stream at a time: cancel all existing streams cancelAllStreams() - go handleStreamTask(ctx, t, reporter, cfg) + go handleStreamTask(ctx, t, reporter, cfg, agentClient) } else if t.ForceStart || manager.HasCapacity() { manager.Submit(ctx, t) } else { @@ -322,6 +330,9 @@ func runDaemonStart() error { log.Printf("[%s] streaming from disk: %s → %s", sr.TaskID[:8], filepath.Base(sr.FilePath), streamURL) + // Start watch progress reporter + go engine.NewWatchReporter(agentClient, srv, sr.TaskID).Run(ctx) + // Report stream URL back to the server via transport go func() { if _, err := transport.SendProgress(ctx, agent.StatusUpdate{ diff --git a/internal/cmd/stream_handler.go b/internal/cmd/stream_handler.go index 88f3111..7a2705a 100644 --- a/internal/cmd/stream_handler.go +++ b/internal/cmd/stream_handler.go @@ -55,7 +55,7 @@ func cancelStreamTask(taskID string) { // handleStreamTask manages a streaming task lifecycle outside the Manager. // It creates a StreamEngine, buffers, starts an HTTP server, and reports // progress until the task is cancelled or the download completes. -func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine.ProgressReporter, cfg config.Config) { +func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine.ProgressReporter, cfg config.Config, agentClient *agent.Client) { ctx, cancel := context.WithCancel(parentCtx) defer cancel() @@ -121,6 +121,12 @@ func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine task.StreamURL = streamURL log.Printf("[%s] stream ready: %s", at.ID[:8], streamURL) + // 5b. Start watch progress reporter (tracks Range requests for playback position) + if agentClient != nil { + watchReporter := engine.NewWatchReporter(agentClient, srv, at.ID) + go watchReporter.Run(ctx) + } + // 6. Unified progress + idle timeout loop eng.StartProgressLoop(ctx) progressTicker := time.NewTicker(3 * time.Second) diff --git a/internal/engine/stream.go b/internal/engine/stream.go index aa69e43..bfb131d 100644 --- a/internal/engine/stream.go +++ b/internal/engine/stream.go @@ -297,6 +297,9 @@ func (s *StreamEngine) FileName() string { return s.fileName } // FileLength returns the total size of the selected file in bytes. func (s *StreamEngine) FileLength() int64 { return s.totalBytes } +// FileSize implements fileProvider for StreamServer compatibility. +func (s *StreamEngine) FileSize() int64 { return s.totalBytes } + // BufferTarget returns the buffer threshold in bytes. func (s *StreamEngine) BufferTarget() int64 { return s.bufferTarget } diff --git a/internal/engine/stream_server.go b/internal/engine/stream_server.go index e85cb13..33995fa 100644 --- a/internal/engine/stream_server.go +++ b/internal/engine/stream_server.go @@ -10,6 +10,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "strings" "sync/atomic" "time" @@ -21,16 +22,19 @@ import ( type fileProvider interface { NewFileReader(ctx context.Context) io.ReadSeekCloser FileName() string + FileSize() int64 } // StreamServer serves a torrent file over HTTP with Range request support. type StreamServer struct { - provider fileProvider - server *http.Server - port int - url string - upnpMapping *UPnPMapping - lastActivity atomic.Int64 // UnixNano of last HTTP request + provider fileProvider + server *http.Server + port int + url string + upnpMapping *UPnPMapping + lastActivity atomic.Int64 // UnixNano of last HTTP request + maxByteOffset atomic.Int64 // highest byte offset served (for watch progress estimation) + totalFileSize int64 // total file size in bytes (set on Start) } // NewStreamServer creates a new HTTP server for streaming via StreamEngine. @@ -67,6 +71,10 @@ func (p *torrentFileProvider) FileName() string { return filepath.Base(p.file.DisplayPath()) } +func (p *torrentFileProvider) FileSize() int64 { + return p.file.Length() +} + // diskFileProvider serves a file from disk. type diskFileProvider struct { path string @@ -84,6 +92,14 @@ func (p *diskFileProvider) NewFileReader(_ context.Context) io.ReadSeekCloser { func (p *diskFileProvider) FileName() string { return p.name } +func (p *diskFileProvider) FileSize() int64 { + fi, err := os.Stat(p.path) + if err != nil { + return 0 + } + return fi.Size() +} + // NewStreamServerFromDisk creates a server that streams a file from disk. func NewStreamServerFromDisk(filePath string, port int) *StreamServer { return &StreamServer{ @@ -126,6 +142,7 @@ func FindVideoFile(dir string) string { // The file is served as-is — the user's media player (VLC, mpv, etc.) handles decoding. func (ss *StreamServer) Start(ctx context.Context) (string, error) { ss.lastActivity.Store(time.Now().UnixNano()) + ss.totalFileSize = ss.provider.FileSize() mux := http.NewServeMux() mux.HandleFunc("/stream", ss.handler) @@ -181,6 +198,18 @@ func (ss *StreamServer) Shutdown(ctx context.Context) error { func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { ss.lastActivity.Store(time.Now().UnixNano()) + // Track Range header for watch progress estimation + if rangeHeader := r.Header.Get("Range"); rangeHeader != "" { + if start := parseRangeStart(rangeHeader); start >= 0 { + for { + cur := ss.maxByteOffset.Load() + if start <= cur || ss.maxByteOffset.CompareAndSwap(cur, start) { + break + } + } + } + } + // CORS headers — only when browser sends Origin (HTTPS site → localhost) if origin := r.Header.Get("Origin"); origin != "" { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -206,6 +235,39 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { http.ServeContent(w, r, ss.provider.FileName(), time.Time{}, reader) } +// EstimatedProgress returns an estimated watch progress based on HTTP Range requests. +// Returns (position, duration) where both are 0-100 scale (percentage-based). +func (ss *StreamServer) EstimatedProgress() (position int, duration int) { + total := ss.totalFileSize + if total <= 0 { + return 0, 0 + } + maxOffset := ss.maxByteOffset.Load() + pct := int(float64(maxOffset) / float64(total) * 100) + if pct > 100 { + pct = 100 + } + return pct, 100 +} + +// parseRangeStart extracts the start byte from a "Range: bytes=START-" header. +func parseRangeStart(rangeHeader string) int64 { + // Format: "bytes=START-" or "bytes=START-END" + after, found := strings.CutPrefix(rangeHeader, "bytes=") + if !found { + return -1 + } + dashIdx := strings.IndexByte(after, '-') + if dashIdx < 0 { + return -1 + } + start, err := strconv.ParseInt(after[:dashIdx], 10, 64) + if err != nil { + return -1 + } + return start +} + // reachableIP returns the best IP to use for the stream URL, in priority order: // 1. Tailscale IP (100.x.x.x) — accessible from anywhere via Tailscale mesh // 2. LAN IP — accessible from local network diff --git a/internal/engine/watch_reporter.go b/internal/engine/watch_reporter.go new file mode 100644 index 0000000..e7fa4da --- /dev/null +++ b/internal/engine/watch_reporter.go @@ -0,0 +1,68 @@ +package engine + +import ( + "context" + "log" + "time" + + "github.com/torrentclaw/unarr/internal/agent" +) + +// WatchReporter periodically sends watch progress to the API based on +// HTTP Range request tracking from the StreamServer. +type WatchReporter struct { + client *agent.Client + server *StreamServer + taskID string + lastSentPct int // last progress percentage reported (0-100) +} + +// NewWatchReporter creates a reporter that tracks playback progress via Range offsets. +func NewWatchReporter(client *agent.Client, server *StreamServer, taskID string) *WatchReporter { + return &WatchReporter{ + client: client, + server: server, + taskID: taskID, + } +} + +// Run reports watch progress every 10 seconds until the context is cancelled. +// A final report is sent on shutdown using a short independent timeout. +func (wr *WatchReporter) Run(ctx context.Context) { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + // Final report on shutdown — use background context since parent is cancelled. + finalCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + wr.sendReport(finalCtx) + cancel() + return + case <-ticker.C: + wr.sendReport(ctx) + } + } +} + +func (wr *WatchReporter) sendReport(ctx context.Context) { + pct, _ := wr.server.EstimatedProgress() + if pct == 0 || pct == wr.lastSentPct { + return + } + + wr.lastSentPct = pct + update := agent.WatchProgressUpdate{ + TaskID: wr.taskID, + Source: "range", + Progress: &pct, + } + + reportCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + if err := wr.client.ReportWatchProgress(reportCtx, update); err != nil { + log.Printf("[%s] watch-progress: report failed: %v", wr.taskID[:8], err) + } +} diff --git a/internal/engine/watch_reporter_test.go b/internal/engine/watch_reporter_test.go new file mode 100644 index 0000000..80a6e78 --- /dev/null +++ b/internal/engine/watch_reporter_test.go @@ -0,0 +1,176 @@ +package engine + +import ( + "context" + "net/http" + "os" + "testing" +) + +// --------------------------------------------------------------------------- +// parseRangeStart +// --------------------------------------------------------------------------- + +func TestParseRangeStart(t *testing.T) { + tests := []struct { + header string + want int64 + }{ + {"bytes=0-", 0}, + {"bytes=1024-", 1024}, + {"bytes=5000-9999", 5000}, + {"bytes=1048576-", 1048576}, + {"", -1}, + {"invalid", -1}, + {"bytes=", -1}, + {"bytes=-500", -1}, + } + + for _, tc := range tests { + got := parseRangeStart(tc.header) + if got != tc.want { + t.Errorf("parseRangeStart(%q) = %d, want %d", tc.header, got, tc.want) + } + } +} + +// --------------------------------------------------------------------------- +// StreamServer.EstimatedProgress +// --------------------------------------------------------------------------- + +func TestEstimatedProgress_NoFile(t *testing.T) { + ss := &StreamServer{} + pos, dur := ss.EstimatedProgress() + if pos != 0 || dur != 0 { + t.Errorf("expected (0, 0), got (%d, %d)", pos, dur) + } +} + +func TestEstimatedProgress_HalfWay(t *testing.T) { + ss := &StreamServer{totalFileSize: 1000} + ss.maxByteOffset.Store(500) + + pos, dur := ss.EstimatedProgress() + if pos != 50 || dur != 100 { + t.Errorf("expected (50, 100), got (%d, %d)", pos, dur) + } +} + +func TestEstimatedProgress_CapsAt100(t *testing.T) { + ss := &StreamServer{totalFileSize: 1000} + ss.maxByteOffset.Store(1500) + + pos, dur := ss.EstimatedProgress() + if pos != 100 || dur != 100 { + t.Errorf("expected (100, 100), got (%d, %d)", pos, dur) + } +} + +// --------------------------------------------------------------------------- +// maxByteOffset only increases (simulated Range tracking) +// --------------------------------------------------------------------------- + +func TestMaxByteOffsetNeverRegresses(t *testing.T) { + ss := &StreamServer{totalFileSize: 10000} + + offsets := []int64{0, 2000, 5000, 3000, 8000, 4000} + for _, off := range offsets { + for { + cur := ss.maxByteOffset.Load() + if off <= cur || ss.maxByteOffset.CompareAndSwap(cur, off) { + break + } + } + } + + if ss.maxByteOffset.Load() != 8000 { + t.Errorf("expected 8000, got %d", ss.maxByteOffset.Load()) + } +} + +// --------------------------------------------------------------------------- +// End-to-end: real HTTP server with Range requests +// --------------------------------------------------------------------------- + +func TestStreamServerRangeTracking(t *testing.T) { + // Create temp file (10 KB) + tmpFile := t.TempDir() + "/test.mp4" + data := make([]byte, 10240) + for i := range data { + data[i] = byte(i % 256) + } + if err := os.WriteFile(tmpFile, data, 0o644); err != nil { + t.Fatal(err) + } + + srv := NewStreamServerFromDisk(tmpFile, 0) + ctx := context.Background() + url, err := srv.Start(ctx) + if err != nil { + t.Fatalf("start: %v", err) + } + defer srv.Shutdown(ctx) + + // 1. Non-range GET — maxByteOffset stays 0 + resp, err := http.Get(url) + if err != nil { + t.Fatalf("GET: %v", err) + } + resp.Body.Close() + + if srv.maxByteOffset.Load() != 0 { + t.Errorf("non-range: expected 0, got %d", srv.maxByteOffset.Load()) + } + + // 2. Range: bytes=5000- → offset 5000 + req, _ := http.NewRequest("GET", url, nil) + req.Header.Set("Range", "bytes=5000-") + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Range GET: %v", err) + } + resp.Body.Close() + + if resp.StatusCode != http.StatusPartialContent { + t.Errorf("expected 206, got %d", resp.StatusCode) + } + if srv.maxByteOffset.Load() != 5000 { + t.Errorf("expected 5000, got %d", srv.maxByteOffset.Load()) + } + + // 3. Higher offset + req, _ = http.NewRequest("GET", url, nil) + req.Header.Set("Range", "bytes=8000-") + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Range GET 2: %v", err) + } + resp.Body.Close() + + if srv.maxByteOffset.Load() != 8000 { + t.Errorf("expected 8000, got %d", srv.maxByteOffset.Load()) + } + + // 4. Lower offset does NOT regress + req, _ = http.NewRequest("GET", url, nil) + req.Header.Set("Range", "bytes=2000-") + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Range GET 3: %v", err) + } + resp.Body.Close() + + if srv.maxByteOffset.Load() != 8000 { + t.Errorf("expected still 8000, got %d", srv.maxByteOffset.Load()) + } + + // 5. Verify progress estimate + pos, dur := srv.EstimatedProgress() + // 8000/10240 = 78.1% → 78 + if pos < 78 || pos > 79 { + t.Errorf("expected pos ~78, got %d", pos) + } + if dur != 100 { + t.Errorf("expected dur=100, got %d", dur) + } +}