diff --git a/internal/agent/daemon.go b/internal/agent/daemon.go index c160da3..3fe8a75 100644 --- a/internal/agent/daemon.go +++ b/internal/agent/daemon.go @@ -20,6 +20,9 @@ type DaemonConfig struct { DownloadDir string PollInterval time.Duration HeartbeatInterval time.Duration + StreamPort int // port for the HTTP stream server (reported in heartbeat) + LanIP string // LAN IP (reported in heartbeat for stream URL resolution) + TailscaleIP string // Tailscale IP (reported in heartbeat for stream URL resolution) } // Daemon manages the main loop: register, heartbeat, poll tasks. @@ -211,6 +214,9 @@ func (d *Daemon) heartbeat(ctx context.Context) { Version: d.cfg.Version, OS: runtime.GOOS, DownloadDir: d.cfg.DownloadDir, + StreamPort: d.cfg.StreamPort, + LanIP: d.cfg.LanIP, + TailscaleIP: d.cfg.TailscaleIP, } if free, total, err := DiskInfo(d.cfg.DownloadDir); err == nil { req.DiskFreeBytes = free @@ -297,6 +303,12 @@ func (d *Daemon) handleEvent(event ServerEvent) { } } +// UpdateStreamPort updates the stream port reported in heartbeats. +// Called after the persistent stream server binds (actual port may differ from configured). +func (d *Daemon) UpdateStreamPort(port int) { + d.cfg.StreamPort = port +} + // TriggerPoll requests an immediate task poll cycle. // Used when a resume event is received to pick up re-pending tasks faster. func (d *Daemon) TriggerPoll() { diff --git a/internal/agent/transport_ws.go b/internal/agent/transport_ws.go index 65c9870..9d50f9e 100644 --- a/internal/agent/transport_ws.go +++ b/internal/agent/transport_ws.go @@ -178,6 +178,7 @@ func (t *WSTransport) SendProgress(_ context.Context, update StatusUpdate) (*Sta FileName string `json:"fileName,omitempty"` FilePath string `json:"filePath,omitempty"` StreamURL string `json:"streamUrl,omitempty"` + StreamReady bool `json:"streamReady,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` }{ Type: "progress", @@ -192,6 +193,7 @@ func (t *WSTransport) SendProgress(_ context.Context, update StatusUpdate) (*Sta FileName: update.FileName, FilePath: update.FilePath, StreamURL: update.StreamURL, + StreamReady: update.StreamReady, ErrorMessage: update.ErrorMessage, } diff --git a/internal/agent/types.go b/internal/agent/types.go index 7cc8781..51cef2b 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -56,6 +56,9 @@ type HeartbeatRequest struct { DownloadDir string `json:"downloadDir,omitempty"` DiskFreeBytes int64 `json:"diskFreeBytes,omitempty"` DiskTotalBytes int64 `json:"diskTotalBytes,omitempty"` + StreamPort int `json:"streamPort,omitempty"` + LanIP string `json:"lanIp,omitempty"` + TailscaleIP string `json:"tailscaleIp,omitempty"` } // Task represents a download task claimed from the server. @@ -107,6 +110,7 @@ type StatusUpdate struct { FileName string `json:"fileName,omitempty"` FilePath string `json:"filePath,omitempty"` StreamURL string `json:"streamUrl,omitempty"` + StreamReady bool `json:"streamReady,omitempty"` ErrorMessage string `json:"errorMessage,omitempty"` } diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 55b37c5..c1887e2 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -151,6 +151,9 @@ func runDaemonStart() error { DownloadDir: cfg.Download.Dir, PollInterval: pollInterval, HeartbeatInterval: heartbeatInterval, + StreamPort: cfg.Download.StreamPort, + LanIP: engine.LanIP(), + TailscaleIP: engine.TailscaleIP(), } // Create transport: Hybrid (WS + HTTP fallback) or HTTP-only @@ -236,6 +239,15 @@ func runDaemonStart() error { }, }, reporter, torrentDl, debridDl, engine.NewUsenetDownloader(httpT.Client())) + // Create persistent stream server — lives for the entire daemon lifecycle. + // One port, one server, swap files with SetFile(). No more port churn. + streamSrv := engine.NewStreamServer(cfg.Download.StreamPort) + if err := streamSrv.Listen(ctx); err != nil { + return fmt.Errorf("start stream server: %w", err) + } + // Update heartbeat with actual port (may differ if configured port was busy) + d.UpdateStreamPort(streamSrv.Port()) + // Wire state tracking d.GetActiveCount = manager.ActiveCount d.GetCleanableBytes = CleanableBytes @@ -254,7 +266,7 @@ func runDaemonStart() error { cancelStreamTask(taskID) }) - // Wire: stream requested on active download → start HTTP server + // Wire: stream requested on active download → set file on persistent server reporter.SetStreamRequestedHandler(func(taskID string) { task := manager.GetTask(taskID) if task == nil { @@ -264,19 +276,18 @@ func runDaemonStart() error { if task.GetStreamURL() != "" { return // already streaming } - srv, err := torrentDl.StartStream(taskID) + provider, err := torrentDl.GetStreamProvider(taskID) if err != nil { log.Printf("[%s] stream failed: %v", taskID[:8], err) return } - // Register server before setting URL to avoid TOCTOU race - streamRegistry.mu.Lock() - streamRegistry.servers[taskID] = srv - streamRegistry.mu.Unlock() - task.SetStreamURL(srv.URL()) + cancelStreamContexts() + streamSrv.SetFile(provider, taskID) + task.SetStreamURL(streamSrv.URLsJSON()) + log.Printf("[%s] streaming active download: %s", taskID[:8], provider.FileName()) // Start watch progress reporter - go engine.NewWatchReporter(agentClient, srv, taskID).Run(ctx) + go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(ctx) }) // Wire: daemon claimed tasks -> manager @@ -288,15 +299,15 @@ func runDaemonStart() error { if isStreamingTask(t.ID) { continue } - // Only 1 stream at a time: cancel all existing streams - cancelAllStreams() + // Only 1 stream at a time: cancel existing stream goroutines + clear file + cancelStreamContexts() + streamSrv.ClearFile() // Reserve slot before spawning goroutine to prevent TOCTOU race. - // streamCancel is stored in streamRegistry and called by cancelAllStreams/cancelStreamTask. streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel ownership transferred to streamRegistry streamRegistry.mu.Lock() streamRegistry.cancels[t.ID] = streamCancel streamRegistry.mu.Unlock() - go handleStreamTask(streamCtx, t, reporter, cfg, agentClient) + go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv) } else if t.ForceStart || manager.HasCapacity() { manager.Submit(ctx, t) } else { @@ -305,16 +316,13 @@ func runDaemonStart() error { } } - // Wire: stream requests for completed downloads → serve file from disk + // Wire: stream requests for completed downloads → set file on persistent server d.OnStreamRequested = func(sr agent.StreamRequest) { - // Skip if already streaming this task - if isStreamingTask(sr.TaskID) { + // Skip if already serving this task + if streamSrv.CurrentTaskID() == sr.TaskID { return } - // Only 1 stream at a time: cancel all existing streams - cancelAllStreams() - filePath := sr.FilePath info, err := os.Stat(filePath) if err != nil { @@ -351,43 +359,24 @@ func runDaemonStart() error { log.Printf("[%s] resolved directory to video file: %s", sr.TaskID[:8], filepath.Base(filePath)) } - srv := engine.NewStreamServerFromDisk(filePath, cfg.Download.StreamPort) - streamURL, err := srv.Start(ctx) - if err != nil { - log.Printf("[%s] stream failed: %v", sr.TaskID[:8], err) - go func() { - if _, err := transport.SendProgress(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - Status: "failed", - ErrorMessage: fmt.Sprintf("stream server start failed: %v", err), - }); err != nil { - log.Printf("[%s] stream error report failed: %v", sr.TaskID[:8], err) - } - }() - return - } + // Cancel any active stream goroutines and swap file on the persistent server + cancelStreamContexts() + streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID) - streamRegistry.mu.Lock() - streamRegistry.servers[sr.TaskID] = srv - streamRegistry.mu.Unlock() - - log.Printf("[%s] streaming from disk: %s → %s", sr.TaskID[:8], filepath.Base(sr.FilePath), streamURL) + log.Printf("[%s] streaming from disk: %s → %s", sr.TaskID[:8], filepath.Base(filePath), streamSrv.URL()) // Start watch progress reporter - go engine.NewWatchReporter(agentClient, srv, sr.TaskID).Run(ctx) + go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(ctx) - // Report stream URL back to the server via transport + // Notify server that stream is ready (clears streamRequested flag) go func() { if _, err := transport.SendProgress(ctx, agent.StatusUpdate{ - TaskID: sr.TaskID, - StreamURL: streamURL, + TaskID: sr.TaskID, + StreamReady: true, }); err != nil { - log.Printf("[%s] stream URL report failed: %v", sr.TaskID[:8], err) + log.Printf("[%s] stream ready report failed: %v", sr.TaskID[:8], err) } }() - - // Auto-shutdown after 30 min of idle (no HTTP requests) - go startIdleGuard(ctx, srv, sr.TaskID) } // Wire: WS control actions (pause/cancel/stream pushed from server) @@ -396,34 +385,41 @@ func runDaemonStart() error { case "cancel": manager.CancelTask(taskID) cancelStreamTask(taskID) + if streamSrv.CurrentTaskID() == taskID { + streamSrv.ClearFile() + } case "pause": manager.PauseTask(taskID) cancelStreamTask(taskID) + if streamSrv.CurrentTaskID() == taskID { + streamSrv.ClearFile() + } case "resume": log.Printf("[%s] resume requested via WebSocket, triggering poll", taskID[:8]) d.TriggerPoll() case "stream": // Skip if already streaming this task - if isStreamingTask(taskID) { + if streamSrv.CurrentTaskID() == taskID { return } task := manager.GetTask(taskID) if task == nil || task.GetStreamURL() != "" { return } - // Only 1 stream at a time: cancel all existing streams - cancelAllStreams() - srv, err := torrentDl.StartStream(taskID) + provider, err := torrentDl.GetStreamProvider(taskID) if err != nil { log.Printf("[%s] stream failed: %v", taskID[:8], err) return } - streamRegistry.mu.Lock() - streamRegistry.servers[taskID] = srv - streamRegistry.mu.Unlock() - task.SetStreamURL(srv.URL()) + cancelStreamContexts() + streamSrv.SetFile(provider, taskID) + task.SetStreamURL(streamSrv.URLsJSON()) + log.Printf("[%s] streaming via WS: %s", taskID[:8], provider.FileName()) case "stop-stream": cancelStreamTask(taskID) + if streamSrv.CurrentTaskID() == taskID { + streamSrv.ClearFile() + } } } @@ -477,10 +473,15 @@ func runDaemonStart() error { errCh <- d.Run(ctx) }() + // Start idle guard for the persistent stream server + go startIdleGuard(ctx, streamSrv) + // Wait for signal or error select { case sig := <-sigCh: fmt.Printf("\n Received %s, shutting down...\n", sig) + cancelStreamContexts() + streamSrv.Shutdown(context.Background()) cancel() // Give active downloads 30s to finish @@ -492,6 +493,8 @@ func runDaemonStart() error { return nil case err := <-errCh: + cancelStreamContexts() + streamSrv.Shutdown(context.Background()) cancel() return err } diff --git a/internal/cmd/stream.go b/internal/cmd/stream.go index 91d2fea..52af14e 100644 --- a/internal/cmd/stream.go +++ b/internal/cmd/stream.go @@ -127,14 +127,14 @@ func runStream(input string, port int, noOpen bool, playerCmd string) error { } // Start HTTP server - srv := engine.NewStreamServer(eng, port) - streamURL, err := srv.Start(ctx) - if err != nil { + srv := engine.NewStreamServer(port) + if err := srv.Listen(ctx); err != nil { eng.Shutdown(context.Background()) return fmt.Errorf("start server: %w", err) } + srv.SetFile(eng, "cli-stream") - fmt.Printf(" URL: %s\n", streamURL) + fmt.Printf(" URL: %s\n", srv.URL()) fmt.Println() // Buffer before opening player @@ -159,15 +159,15 @@ func runStream(input string, port int, noOpen bool, playerCmd string) error { // Open player if !noOpen { - playerName, _, openErr := engine.OpenPlayer(streamURL, playerCmd) + playerName, _, openErr := engine.OpenPlayer(srv.URL(), playerCmd) if openErr != nil { yellow.Printf(" Could not open player: %s\n", openErr) - fmt.Printf(" Open this URL in your player: %s\n", streamURL) + fmt.Printf(" Open this URL in your player: %s\n", srv.URL()) } else { green.Printf(" Opened in %s\n", playerName) } } else { - fmt.Printf(" Open this URL in your player: %s\n", streamURL) + fmt.Printf(" Open this URL in your player: %s\n", srv.URL()) } fmt.Println() diff --git a/internal/cmd/stream_handler.go b/internal/cmd/stream_handler.go index 0c8e3af..aec884b 100644 --- a/internal/cmd/stream_handler.go +++ b/internal/cmd/stream_handler.go @@ -16,8 +16,8 @@ import ( const streamIdleTimeout = 30 * time.Minute -// startIdleGuard monitors a stream server and cancels the task after inactivity. -func startIdleGuard(ctx context.Context, srv *engine.StreamServer, taskID string) { +// startIdleGuard monitors the persistent stream server and clears the file after inactivity. +func startIdleGuard(ctx context.Context, srv *engine.StreamServer) { ticker := time.NewTicker(60 * time.Second) defer ticker.Stop() for { @@ -25,78 +25,69 @@ func startIdleGuard(ctx context.Context, srv *engine.StreamServer, taskID string case <-ctx.Done(): return case <-ticker.C: - if srv.IdleSince() > streamIdleTimeout { - log.Printf("[%s] stream idle timeout (%v no HTTP requests), shutting down", taskID[:8], streamIdleTimeout) - cancelStreamTask(taskID) - return + if srv.HasFile() && srv.IdleSince() > streamIdleTimeout { + taskID := srv.CurrentTaskID() + short := taskID + if len(short) > 8 { + short = short[:8] + } + log.Printf("[%s] stream idle timeout (%v no HTTP requests), clearing file", short, streamIdleTimeout) + cancelStreamContexts() + srv.ClearFile() } } } } -// streamRegistry tracks active stream tasks and servers for cancellation. +// streamRegistry tracks active stream goroutine contexts for cancellation. +// There is only ONE persistent StreamServer — no per-task servers. var streamRegistry = struct { mu sync.Mutex cancels map[string]context.CancelFunc - servers map[string]*engine.StreamServer // servers for active download streams }{ cancels: make(map[string]context.CancelFunc), - servers: make(map[string]*engine.StreamServer), } -// cancelAllStreams cancels all active stream tasks and servers (only 1 stream at a time). -func cancelAllStreams() { +// cancelStreamContexts cancels all active stream goroutines (download engines, etc.). +// Does NOT touch the persistent server — call srv.ClearFile() separately if needed. +func cancelStreamContexts() { streamRegistry.mu.Lock() cancels := make(map[string]context.CancelFunc, len(streamRegistry.cancels)) for k, v := range streamRegistry.cancels { cancels[k] = v delete(streamRegistry.cancels, k) } - servers := make(map[string]*engine.StreamServer, len(streamRegistry.servers)) - for k, v := range streamRegistry.servers { - servers[k] = v - delete(streamRegistry.servers, k) - } streamRegistry.mu.Unlock() for _, cancel := range cancels { cancel() } - for _, srv := range servers { - srv.Shutdown(context.Background()) - } } -// isStreamingTask returns true if there is an active stream (goroutine or server) for the given task. +// isStreamingTask returns true if there is an active stream goroutine for the given task. func isStreamingTask(taskID string) bool { streamRegistry.mu.Lock() defer streamRegistry.mu.Unlock() - _, inCancels := streamRegistry.cancels[taskID] - _, inServers := streamRegistry.servers[taskID] - return inCancels || inServers + _, ok := streamRegistry.cancels[taskID] + return ok } -// cancelStreamTask cancels a running stream task and shuts down any stream server. +// cancelStreamTask cancels a specific stream goroutine. func cancelStreamTask(taskID string) { streamRegistry.mu.Lock() - cancel, hasCancel := streamRegistry.cancels[taskID] + cancel, ok := streamRegistry.cancels[taskID] delete(streamRegistry.cancels, taskID) - srv, hasSrv := streamRegistry.servers[taskID] - delete(streamRegistry.servers, taskID) streamRegistry.mu.Unlock() - if hasCancel { + if ok { cancel() } - if hasSrv { - srv.Shutdown(context.Background()) - } } -// handleStreamTask manages a streaming task lifecycle outside the Manager. -// It creates a StreamEngine, buffers, starts an HTTP server, and reports -// progress until the task is cancelled or the download completes. -func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine.ProgressReporter, cfg config.Config, agentClient *agent.Client) { +// handleStreamTask manages a streaming task lifecycle for active torrent downloads. +// It creates a StreamEngine, buffers, sets the file on the persistent server, +// and reports progress until the task is cancelled or the download completes. +func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine.ProgressReporter, cfg config.Config, agentClient *agent.Client, srv *engine.StreamServer) { ctx, cancel := context.WithCancel(parentCtx) defer cancel() @@ -108,6 +99,10 @@ func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine streamRegistry.mu.Lock() delete(streamRegistry.cancels, at.ID) streamRegistry.mu.Unlock() + // Clear file from persistent server if we're still the current task + if srv.CurrentTaskID() == at.ID { + srv.ClearFile() + } }() task := engine.NewTaskFromAgent(at) @@ -148,36 +143,18 @@ func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine return } - // 4. Start HTTP server - srv := engine.NewStreamServer(eng, cfg.Download.StreamPort) - streamURL, err := srv.Start(ctx) - if err != nil { - task.ErrorMessage = "start HTTP server: " + err.Error() - task.Transition(engine.StatusFailed) - return - } - streamRegistry.mu.Lock() - streamRegistry.servers[at.ID] = srv - streamRegistry.mu.Unlock() - defer func() { - srv.Shutdown(context.Background()) - streamRegistry.mu.Lock() - delete(streamRegistry.servers, at.ID) - streamRegistry.mu.Unlock() - }() - - // 5. Report stream URLs — JSON with all network options for smart resolution + // 4. Set file on the persistent stream server (instant, no port binding) + srv.SetFile(eng, at.ID) task.StreamURL = srv.URLsJSON() - log.Printf("[%s] stream ready: %s (primary: %s)", at.ID[:8], task.StreamURL, streamURL) + log.Printf("[%s] stream ready: %s (url: %s)", at.ID[:8], eng.FileName(), srv.URL()) - // 5b. Start watch progress reporter (tracks Range requests for playback position) + // 5. Start watch progress reporter if agentClient != nil { watchReporter := engine.NewWatchReporter(agentClient, srv, at.ID) go watchReporter.Run(ctx) } - // 6. Start idle guard + progress loop - go startIdleGuard(ctx, srv, at.ID) + // 6. Progress loop until download completes or cancelled eng.StartProgressLoop(ctx) progressTicker := time.NewTicker(3 * time.Second) defer progressTicker.Stop() diff --git a/internal/engine/stream.go b/internal/engine/stream.go index bfb131d..af644b7 100644 --- a/internal/engine/stream.go +++ b/internal/engine/stream.go @@ -297,7 +297,7 @@ func (s *StreamEngine) FileName() string { return s.fileName } // FileLength returns the total size of the selected file in bytes. func (s *StreamEngine) FileLength() int64 { return s.totalBytes } -// FileSize implements fileProvider for StreamServer compatibility. +// FileSize implements FileProvider for StreamServer compatibility. func (s *StreamEngine) FileSize() int64 { return s.totalBytes } // BufferTarget returns the buffer threshold in bytes. diff --git a/internal/engine/stream_server.go b/internal/engine/stream_server.go index c504366..ebd3f67 100644 --- a/internal/engine/stream_server.go +++ b/internal/engine/stream_server.go @@ -13,7 +13,9 @@ import ( "path/filepath" "strconv" "strings" + "sync" "sync/atomic" + "syscall" "time" "github.com/anacrolix/torrent" @@ -28,151 +30,83 @@ type StreamURLs struct { Public string `json:"pub,omitempty"` } -// fileProvider abstracts where to get a file reader for streaming. -type fileProvider interface { +// FileProvider abstracts where to get a file reader for streaming. +type FileProvider interface { NewFileReader(ctx context.Context) io.ReadSeekCloser FileName() string FileSize() int64 } -// StreamServer serves a torrent file over HTTP with Range request support. +// StreamServer is a persistent HTTP server that serves one file at a time. +// Start it once with Listen(), then swap files with SetFile()/ClearFile(). +// The server stays alive for the entire daemon lifecycle — no port churn. type StreamServer struct { - provider fileProvider - server *http.Server - port int - url string // best single URL (backward compat) - urls StreamURLs // all available URLs by network type - upnpMapping *UPnPMapping - disableUPnP bool // for testing - lastActivity atomic.Int64 // UnixNano of last HTTP request - maxByteOffset atomic.Int64 // highest byte offset served (for watch progress estimation) - totalFileSize int64 // total file size in bytes (set on Start) + mu sync.RWMutex + provider FileProvider + taskID string // current task being streamed + + server *http.Server + port int + url string // best single URL (backward compat) + urls StreamURLs // all available URLs by network type + upnpMapping *UPnPMapping + disableUPnP bool + + lastActivity atomic.Int64 + maxByteOffset atomic.Int64 + totalFileSize atomic.Int64 } -// NewStreamServer creates a new HTTP server for streaming via StreamEngine. -func NewStreamServer(engine *StreamEngine, port int) *StreamServer { - return &StreamServer{ - provider: engine, - port: port, - } +// NewStreamServer creates a stream server bound to the given port. +// Call Listen() to start accepting connections, then SetFile() to serve content. +func NewStreamServer(port int) *StreamServer { + return &StreamServer{port: port} } -// NewStreamServerFromFile creates a server that streams directly from a torrent.File. -// Used for streaming an active download without a separate StreamEngine. -func NewStreamServerFromFile(file *torrent.File, port int) *StreamServer { - return &StreamServer{ - provider: &torrentFileProvider{file: file}, - port: port, - } -} - -// torrentFileProvider wraps a torrent.File to implement fileProvider. -type torrentFileProvider struct { - file *torrent.File -} - -func (p *torrentFileProvider) NewFileReader(ctx context.Context) io.ReadSeekCloser { - reader := p.file.NewReader() - reader.SetResponsive() - reader.SetReadahead(5 * 1024 * 1024) - reader.SetContext(ctx) - return reader -} - -func (p *torrentFileProvider) FileName() string { - return filepath.Base(p.file.DisplayPath()) -} - -func (p *torrentFileProvider) FileSize() int64 { - return p.file.Length() -} - -// diskFileProvider serves a file from disk. -type diskFileProvider struct { - path string - name string -} - -func (p *diskFileProvider) NewFileReader(_ context.Context) io.ReadSeekCloser { - f, err := os.Open(p.path) - if err != nil { - log.Printf("stream: failed to open %q: %v", p.path, err) - return nil - } - return f -} - -func (p *diskFileProvider) FileName() string { return p.name } - -func (p *diskFileProvider) FileSize() int64 { - fi, err := os.Stat(p.path) - if err != nil { - log.Printf("stream: failed to stat %q: %v", p.path, err) - return 0 - } - return fi.Size() -} - -// NewStreamServerFromDisk creates a server that streams a file from disk. -func NewStreamServerFromDisk(filePath string, port int) *StreamServer { - return &StreamServer{ - provider: &diskFileProvider{ - path: filePath, - name: filepath.Base(filePath), - }, - port: port, - } -} - -// FindVideoFile scans a directory (recursively) for the largest video file. -// Returns empty string if no video file found. -func FindVideoFile(dir string) string { - var best string - var bestSize int64 - - filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { - if err != nil || d.IsDir() { - return nil - } - ext := strings.ToLower(filepath.Ext(d.Name())) - if !VideoExts[ext] { - return nil - } - info, err := d.Info() - if err != nil { - return nil - } - if info.Size() > bestSize { - best = path - bestSize = info.Size() - } - return nil - }) - return best -} - -// Start begins serving the file on all interfaces. Returns the best reachable URL. -// The file is served as-is — the user's media player (VLC, mpv, etc.) handles decoding. -func (ss *StreamServer) Start(ctx context.Context) (string, error) { - ss.lastActivity.Store(time.Now().UnixNano()) - ss.totalFileSize = ss.provider.FileSize() - +// Listen starts the HTTP server on the configured port. Call once at daemon startup. +func (ss *StreamServer) Listen(ctx context.Context) error { mux := http.NewServeMux() mux.HandleFunc("/stream", ss.handler) - addr := fmt.Sprintf("0.0.0.0:%d", ss.port) - listener, err := net.Listen("tcp", addr) - if err != nil { - return "", fmt.Errorf("listen on %s: %w", addr, err) + // SO_REUSEADDR allows immediate rebind if the port is in TIME_WAIT (e.g. after agent restart) + lc := net.ListenConfig{ + Control: func(network, address string, c syscall.RawConn) error { + return c.Control(func(fd uintptr) { + _ = syscall.SetsockoptInt(int(fd), syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1) + }) + }, + } + + // Try configured port; if busy, try next ports (heartbeat reports actual port to web) + var listener net.Listener + var listenErr error + basePort := ss.port + for attempt := 0; attempt < 10; attempt++ { + addr := fmt.Sprintf("0.0.0.0:%d", ss.port) + listener, listenErr = lc.Listen(ctx, "tcp", addr) + if listenErr == nil { + break + } + if !strings.Contains(listenErr.Error(), "address already in use") { + return fmt.Errorf("stream server listen on %s: %w", addr, listenErr) + } + ss.port++ + log.Printf("[stream] port %d in use, trying %d", ss.port-1, ss.port) + } + if listenErr != nil { + return fmt.Errorf("stream server: all ports busy (%d-%d): %w", basePort, ss.port, listenErr) + } + if ss.port != basePort { + log.Printf("[stream] using port %d (configured %d was busy)", ss.port, basePort) } ss.port = listener.Addr().(*net.TCPAddr).Port // Collect all reachable URLs by network type - if lanIP := lanIP(); lanIP != "" { + if lanIP := LanIP(); lanIP != "" { ss.urls.LAN = fmt.Sprintf("http://%s:%d/stream", lanIP, ss.port) } - if tsIP := tailscaleIP(); tsIP != "" { + if tsIP := TailscaleIP(); tsIP != "" { ss.urls.Tailscale = fmt.Sprintf("http://%s:%d/stream", tsIP, ss.port) } if !ss.disableUPnP { @@ -206,15 +140,49 @@ func (ss *StreamServer) Start(ctx context.Context) (string, error) { } }() - return ss.url, nil + log.Printf("[stream] server listening on port %d", ss.port) + return nil +} + +// SetFile atomically swaps the file being served and resets progress tracking. +func (ss *StreamServer) SetFile(provider FileProvider, taskID string) { + ss.mu.Lock() + ss.provider = provider + ss.taskID = taskID + ss.mu.Unlock() + ss.totalFileSize.Store(provider.FileSize()) + ss.lastActivity.Store(time.Now().UnixNano()) + ss.maxByteOffset.Store(0) +} + +// ClearFile stops serving any file. Subsequent requests return 404. +func (ss *StreamServer) ClearFile() { + ss.mu.Lock() + ss.provider = nil + ss.taskID = "" + ss.mu.Unlock() + ss.totalFileSize.Store(0) + ss.maxByteOffset.Store(0) +} + +// CurrentTaskID returns the task ID of the file currently being served. +func (ss *StreamServer) CurrentTaskID() string { + ss.mu.RLock() + defer ss.mu.RUnlock() + return ss.taskID +} + +// HasFile returns true if a file is currently being served. +func (ss *StreamServer) HasFile() bool { + ss.mu.RLock() + defer ss.mu.RUnlock() + return ss.provider != nil } // URL returns the best single stream URL (backward compat). func (ss *StreamServer) URL() string { return ss.url } // URLsJSON returns all available stream URLs as a JSON string. -// Stored in the stream_url DB field so the web API can resolve -// the best URL based on the browser's network. func (ss *StreamServer) URLsJSON() string { b, _ := json.Marshal(ss.urls) return string(b) @@ -233,6 +201,7 @@ func (ss *StreamServer) IdleSince() time.Duration { } // Shutdown gracefully stops the HTTP server and removes the UPnP port mapping. +// Call only at daemon shutdown — NOT between file swaps. func (ss *StreamServer) Shutdown(ctx context.Context) error { ss.upnpMapping.Remove() if ss.server != nil { @@ -256,6 +225,16 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { } } + // Get current provider (may be nil if no file is being served) + ss.mu.RLock() + provider := ss.provider + ss.mu.RUnlock() + + if provider == nil { + http.Error(w, "no active stream", http.StatusNotFound) + return + } + // CORS headers — only when browser sends Origin (HTTPS site → localhost) if origin := r.Header.Get("Origin"); origin != "" { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -269,21 +248,20 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { } } - reader := ss.provider.NewFileReader(r.Context()) + reader := provider.NewFileReader(r.Context()) if reader == nil { http.Error(w, "file not found", http.StatusNotFound) return } defer reader.Close() - w.Header().Set("Content-Type", mimeTypeFromExt(ss.provider.FileName())) + w.Header().Set("Content-Type", mimeTypeFromExt(provider.FileName())) // "inline" for play requests (VLC/mpv), "attachment" for download requests. - // Browser download via window.open() relies on "attachment" to trigger save dialog. disposition := "inline" if r.URL.Query().Get("download") == "1" { disposition = "attachment" } - downloadName := ss.provider.FileName() + downloadName := provider.FileName() if disposition == "attachment" { ext := filepath.Ext(downloadName) downloadName = strings.TrimSuffix(downloadName, ext) + " [TorrentClaw]" + ext @@ -291,13 +269,12 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Disposition", fmt.Sprintf("%s; filename=%q", disposition, downloadName)) w.Header().Set("Accept-Ranges", "bytes") - http.ServeContent(w, r, ss.provider.FileName(), time.Time{}, reader) + http.ServeContent(w, r, provider.FileName(), time.Time{}, reader) } // EstimatedProgress returns an estimated watch progress based on HTTP Range requests. -// Returns (position, duration) where both are 0-100 scale (percentage-based). func (ss *StreamServer) EstimatedProgress() (position int, duration int) { - total := ss.totalFileSize + total := ss.totalFileSize.Load() if total <= 0 { return 0, 0 } @@ -311,7 +288,6 @@ func (ss *StreamServer) EstimatedProgress() (position int, duration int) { // parseRangeStart extracts the start byte from a "Range: bytes=START-" header. func parseRangeStart(rangeHeader string) int64 { - // Format: "bytes=START-" or "bytes=START-END" after, found := strings.CutPrefix(rangeHeader, "bytes=") if !found { return -1 @@ -327,8 +303,98 @@ func parseRangeStart(rangeHeader string) int64 { return start } -// lanIP returns the machine's LAN IP, or "" if unavailable. -func lanIP() string { +// --- File Providers --- + +// NewDiskFileProvider creates a FileProvider that serves a file from disk. +func NewDiskFileProvider(filePath string) FileProvider { + return &diskFileProvider{ + path: filePath, + name: filepath.Base(filePath), + } +} + +// diskFileProvider serves a file from disk. +type diskFileProvider struct { + path string + name string +} + +func (p *diskFileProvider) NewFileReader(_ context.Context) io.ReadSeekCloser { + f, err := os.Open(p.path) + if err != nil { + log.Printf("stream: failed to open %q: %v", p.path, err) + return nil + } + return f +} + +func (p *diskFileProvider) FileName() string { return p.name } + +func (p *diskFileProvider) FileSize() int64 { + fi, err := os.Stat(p.path) + if err != nil { + log.Printf("stream: failed to stat %q: %v", p.path, err) + return 0 + } + return fi.Size() +} + +// NewTorrentFileProvider creates a FileProvider from an active torrent file. +func NewTorrentFileProvider(file *torrent.File) FileProvider { + return &torrentFileProvider{file: file} +} + +// torrentFileProvider wraps a torrent.File to implement FileProvider. +type torrentFileProvider struct { + file *torrent.File +} + +func (p *torrentFileProvider) NewFileReader(ctx context.Context) io.ReadSeekCloser { + reader := p.file.NewReader() + reader.SetResponsive() + reader.SetReadahead(5 * 1024 * 1024) + reader.SetContext(ctx) + return reader +} + +func (p *torrentFileProvider) FileName() string { + return filepath.Base(p.file.DisplayPath()) +} + +func (p *torrentFileProvider) FileSize() int64 { + return p.file.Length() +} + +// --- Utility functions --- + +// FindVideoFile scans a directory (recursively) for the largest video file. +func FindVideoFile(dir string) string { + var best string + var bestSize int64 + + filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { + if err != nil || d.IsDir() { + return nil + } + ext := strings.ToLower(filepath.Ext(d.Name())) + if !VideoExts[ext] { + return nil + } + info, err := d.Info() + if err != nil { + return nil + } + if info.Size() > bestSize { + best = path + bestSize = info.Size() + } + return nil + }) + return best +} + +// LanIP returns the machine's LAN IP, or "" if unavailable. +func LanIP() string { conn, err := net.Dial("udp", "8.8.8.8:80") if err != nil { return "" @@ -337,8 +403,8 @@ func lanIP() string { return conn.LocalAddr().(*net.UDPAddr).IP.String() } -// tailscaleIP returns the Tailscale IPv4 address, or "" if Tailscale isn't running. -func tailscaleIP() string { +// TailscaleIP returns the Tailscale IPv4 address, or "" if Tailscale isn't running. +func TailscaleIP() string { out, err := exec.Command("tailscale", "ip", "-4").Output() if err != nil { return "" diff --git a/internal/engine/stream_test.go b/internal/engine/stream_test.go index 8357a5a..61e1612 100644 --- a/internal/engine/stream_test.go +++ b/internal/engine/stream_test.go @@ -189,16 +189,28 @@ func TestStreamServerStartShutdown(t *testing.T) { totalBytes: 1024, } - srv := NewStreamServer(s, 0) + srv := NewStreamServer(0) if srv.Port() != 0 { t.Errorf("initial port should be 0, got %d", srv.Port()) } - // We can't Start() because NewFileReader needs a real torrent File - // But we can test that Shutdown on an un-started server doesn't panic + // Test that Shutdown on an un-started server doesn't panic if err := srv.Shutdown(context.Background()); err != nil { t.Errorf("shutdown of un-started server should not error: %v", err) } + + // Test SetFile/ClearFile + srv.SetFile(s, "test-task-id") + if !srv.HasFile() { + t.Error("HasFile should be true after SetFile") + } + if srv.CurrentTaskID() != "test-task-id" { + t.Errorf("CurrentTaskID = %q, want %q", srv.CurrentTaskID(), "test-task-id") + } + srv.ClearFile() + if srv.HasFile() { + t.Error("HasFile should be false after ClearFile") + } } // --------------------------------------------------------------------------- diff --git a/internal/engine/torrent.go b/internal/engine/torrent.go index 16d4150..9a916df 100644 --- a/internal/engine/torrent.go +++ b/internal/engine/torrent.go @@ -502,10 +502,9 @@ func (d *TorrentDownloader) SaveDhtNodes() { saveDhtNodesBinary(d.client) } -// StartStream starts an HTTP server for an active torrent download. -// It selects the largest video file and serves it via HTTP Range requests. -// Returns the running server (caller is responsible for shutdown). -func (d *TorrentDownloader) StartStream(taskID string) (*StreamServer, error) { +// GetStreamProvider returns a FileProvider for the largest video file in an active torrent. +// Used with the persistent StreamServer's SetFile() method. +func (d *TorrentDownloader) GetStreamProvider(taskID string) (FileProvider, error) { d.activeMu.Lock() t, ok := d.active[taskID] d.activeMu.Unlock() @@ -535,14 +534,7 @@ func (d *TorrentDownloader) StartStream(taskID string) (*StreamServer, error) { return nil, fmt.Errorf("torrent has no files") } - srv := NewStreamServerFromFile(video, 0) - url, err := srv.Start(context.Background()) - if err != nil { - return nil, fmt.Errorf("start stream server: %w", err) - } - - log.Printf("[%s] stream started: %s → %s", taskID[:8], filepath.Base(video.DisplayPath()), url) - return srv, nil + return NewTorrentFileProvider(video), nil } // VideoExts is the canonical set of video file extensions used for file selection. diff --git a/internal/engine/watch_reporter_test.go b/internal/engine/watch_reporter_test.go index 2965914..8cd0878 100644 --- a/internal/engine/watch_reporter_test.go +++ b/internal/engine/watch_reporter_test.go @@ -47,7 +47,8 @@ func TestEstimatedProgress_NoFile(t *testing.T) { } func TestEstimatedProgress_HalfWay(t *testing.T) { - ss := &StreamServer{totalFileSize: 1000} + ss := &StreamServer{} + ss.totalFileSize.Store(1000) ss.maxByteOffset.Store(500) pos, dur := ss.EstimatedProgress() @@ -57,7 +58,8 @@ func TestEstimatedProgress_HalfWay(t *testing.T) { } func TestEstimatedProgress_CapsAt100(t *testing.T) { - ss := &StreamServer{totalFileSize: 1000} + ss := &StreamServer{} + ss.totalFileSize.Store(1000) ss.maxByteOffset.Store(1500) pos, dur := ss.EstimatedProgress() @@ -71,7 +73,8 @@ func TestEstimatedProgress_CapsAt100(t *testing.T) { // --------------------------------------------------------------------------- func TestMaxByteOffsetNeverRegresses(t *testing.T) { - ss := &StreamServer{totalFileSize: 10000} + ss := &StreamServer{} + ss.totalFileSize.Store(10000) offsets := []int64{0, 2000, 5000, 3000, 8000, 4000} for _, off := range offsets { @@ -103,14 +106,15 @@ func TestStreamServerRangeTracking(t *testing.T) { t.Fatal(err) } - srv := NewStreamServerFromDisk(tmpFile, 0) + srv := NewStreamServer(0) srv.disableUPnP = true ctx := context.Background() - url, err := srv.Start(ctx) - if err != nil { - t.Fatalf("start: %v", err) + if err := srv.Listen(ctx); err != nil { + t.Fatalf("listen: %v", err) } defer srv.Shutdown(ctx) + srv.SetFile(NewDiskFileProvider(tmpFile), "test-task") + url := srv.URL() // 1. Non-range GET — maxByteOffset stays 0 resp, err := http.Get(url)