feat(stream): persistent stream server with file swapping

This commit is contained in:
Deivid Soto 2026-04-07 19:08:37 +02:00
parent 080fdf4d76
commit 5994a30447
11 changed files with 354 additions and 282 deletions

View file

@ -151,6 +151,9 @@ func runDaemonStart() error {
DownloadDir: cfg.Download.Dir,
PollInterval: pollInterval,
HeartbeatInterval: heartbeatInterval,
StreamPort: cfg.Download.StreamPort,
LanIP: engine.LanIP(),
TailscaleIP: engine.TailscaleIP(),
}
// Create transport: Hybrid (WS + HTTP fallback) or HTTP-only
@ -236,6 +239,15 @@ func runDaemonStart() error {
},
}, reporter, torrentDl, debridDl, engine.NewUsenetDownloader(httpT.Client()))
// Create persistent stream server — lives for the entire daemon lifecycle.
// One port, one server, swap files with SetFile(). No more port churn.
streamSrv := engine.NewStreamServer(cfg.Download.StreamPort)
if err := streamSrv.Listen(ctx); err != nil {
return fmt.Errorf("start stream server: %w", err)
}
// Update heartbeat with actual port (may differ if configured port was busy)
d.UpdateStreamPort(streamSrv.Port())
// Wire state tracking
d.GetActiveCount = manager.ActiveCount
d.GetCleanableBytes = CleanableBytes
@ -254,7 +266,7 @@ func runDaemonStart() error {
cancelStreamTask(taskID)
})
// Wire: stream requested on active download → start HTTP server
// Wire: stream requested on active download → set file on persistent server
reporter.SetStreamRequestedHandler(func(taskID string) {
task := manager.GetTask(taskID)
if task == nil {
@ -264,19 +276,18 @@ func runDaemonStart() error {
if task.GetStreamURL() != "" {
return // already streaming
}
srv, err := torrentDl.StartStream(taskID)
provider, err := torrentDl.GetStreamProvider(taskID)
if err != nil {
log.Printf("[%s] stream failed: %v", taskID[:8], err)
return
}
// Register server before setting URL to avoid TOCTOU race
streamRegistry.mu.Lock()
streamRegistry.servers[taskID] = srv
streamRegistry.mu.Unlock()
task.SetStreamURL(srv.URL())
cancelStreamContexts()
streamSrv.SetFile(provider, taskID)
task.SetStreamURL(streamSrv.URLsJSON())
log.Printf("[%s] streaming active download: %s", taskID[:8], provider.FileName())
// Start watch progress reporter
go engine.NewWatchReporter(agentClient, srv, taskID).Run(ctx)
go engine.NewWatchReporter(agentClient, streamSrv, taskID).Run(ctx)
})
// Wire: daemon claimed tasks -> manager
@ -288,15 +299,15 @@ func runDaemonStart() error {
if isStreamingTask(t.ID) {
continue
}
// Only 1 stream at a time: cancel all existing streams
cancelAllStreams()
// Only 1 stream at a time: cancel existing stream goroutines + clear file
cancelStreamContexts()
streamSrv.ClearFile()
// Reserve slot before spawning goroutine to prevent TOCTOU race.
// streamCancel is stored in streamRegistry and called by cancelAllStreams/cancelStreamTask.
streamCtx, streamCancel := context.WithCancel(ctx) //nolint:gosec // G118: cancel ownership transferred to streamRegistry
streamRegistry.mu.Lock()
streamRegistry.cancels[t.ID] = streamCancel
streamRegistry.mu.Unlock()
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient)
go handleStreamTask(streamCtx, t, reporter, cfg, agentClient, streamSrv)
} else if t.ForceStart || manager.HasCapacity() {
manager.Submit(ctx, t)
} else {
@ -305,16 +316,13 @@ func runDaemonStart() error {
}
}
// Wire: stream requests for completed downloads → serve file from disk
// Wire: stream requests for completed downloads → set file on persistent server
d.OnStreamRequested = func(sr agent.StreamRequest) {
// Skip if already streaming this task
if isStreamingTask(sr.TaskID) {
// Skip if already serving this task
if streamSrv.CurrentTaskID() == sr.TaskID {
return
}
// Only 1 stream at a time: cancel all existing streams
cancelAllStreams()
filePath := sr.FilePath
info, err := os.Stat(filePath)
if err != nil {
@ -351,43 +359,24 @@ func runDaemonStart() error {
log.Printf("[%s] resolved directory to video file: %s", sr.TaskID[:8], filepath.Base(filePath))
}
srv := engine.NewStreamServerFromDisk(filePath, cfg.Download.StreamPort)
streamURL, err := srv.Start(ctx)
if err != nil {
log.Printf("[%s] stream failed: %v", sr.TaskID[:8], err)
go func() {
if _, err := transport.SendProgress(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
Status: "failed",
ErrorMessage: fmt.Sprintf("stream server start failed: %v", err),
}); err != nil {
log.Printf("[%s] stream error report failed: %v", sr.TaskID[:8], err)
}
}()
return
}
// Cancel any active stream goroutines and swap file on the persistent server
cancelStreamContexts()
streamSrv.SetFile(engine.NewDiskFileProvider(filePath), sr.TaskID)
streamRegistry.mu.Lock()
streamRegistry.servers[sr.TaskID] = srv
streamRegistry.mu.Unlock()
log.Printf("[%s] streaming from disk: %s → %s", sr.TaskID[:8], filepath.Base(sr.FilePath), streamURL)
log.Printf("[%s] streaming from disk: %s → %s", sr.TaskID[:8], filepath.Base(filePath), streamSrv.URL())
// Start watch progress reporter
go engine.NewWatchReporter(agentClient, srv, sr.TaskID).Run(ctx)
go engine.NewWatchReporter(agentClient, streamSrv, sr.TaskID).Run(ctx)
// Report stream URL back to the server via transport
// Notify server that stream is ready (clears streamRequested flag)
go func() {
if _, err := transport.SendProgress(ctx, agent.StatusUpdate{
TaskID: sr.TaskID,
StreamURL: streamURL,
TaskID: sr.TaskID,
StreamReady: true,
}); err != nil {
log.Printf("[%s] stream URL report failed: %v", sr.TaskID[:8], err)
log.Printf("[%s] stream ready report failed: %v", sr.TaskID[:8], err)
}
}()
// Auto-shutdown after 30 min of idle (no HTTP requests)
go startIdleGuard(ctx, srv, sr.TaskID)
}
// Wire: WS control actions (pause/cancel/stream pushed from server)
@ -396,34 +385,41 @@ func runDaemonStart() error {
case "cancel":
manager.CancelTask(taskID)
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
case "pause":
manager.PauseTask(taskID)
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
case "resume":
log.Printf("[%s] resume requested via WebSocket, triggering poll", taskID[:8])
d.TriggerPoll()
case "stream":
// Skip if already streaming this task
if isStreamingTask(taskID) {
if streamSrv.CurrentTaskID() == taskID {
return
}
task := manager.GetTask(taskID)
if task == nil || task.GetStreamURL() != "" {
return
}
// Only 1 stream at a time: cancel all existing streams
cancelAllStreams()
srv, err := torrentDl.StartStream(taskID)
provider, err := torrentDl.GetStreamProvider(taskID)
if err != nil {
log.Printf("[%s] stream failed: %v", taskID[:8], err)
return
}
streamRegistry.mu.Lock()
streamRegistry.servers[taskID] = srv
streamRegistry.mu.Unlock()
task.SetStreamURL(srv.URL())
cancelStreamContexts()
streamSrv.SetFile(provider, taskID)
task.SetStreamURL(streamSrv.URLsJSON())
log.Printf("[%s] streaming via WS: %s", taskID[:8], provider.FileName())
case "stop-stream":
cancelStreamTask(taskID)
if streamSrv.CurrentTaskID() == taskID {
streamSrv.ClearFile()
}
}
}
@ -477,10 +473,15 @@ func runDaemonStart() error {
errCh <- d.Run(ctx)
}()
// Start idle guard for the persistent stream server
go startIdleGuard(ctx, streamSrv)
// Wait for signal or error
select {
case sig := <-sigCh:
fmt.Printf("\n Received %s, shutting down...\n", sig)
cancelStreamContexts()
streamSrv.Shutdown(context.Background())
cancel()
// Give active downloads 30s to finish
@ -492,6 +493,8 @@ func runDaemonStart() error {
return nil
case err := <-errCh:
cancelStreamContexts()
streamSrv.Shutdown(context.Background())
cancel()
return err
}