diff --git a/internal/agent/types.go b/internal/agent/types.go index 82d70a4..c16e194 100644 --- a/internal/agent/types.go +++ b/internal/agent/types.go @@ -351,12 +351,19 @@ type LibraryDeleteRequest struct { FilePath string `json:"filePath"` } -// WebRTCSession is a request to open a custom WebRTC DataChannel byte-stream -// to a browser player. The CLI must POST an SDP answer to -// /api/internal/stream/signal/ and serve bytes from FilePath -// (or, when only InfoHash is set, from a download_task on disk). +// WebRTCSession is a request to open a streaming session for a browser +// player. Transport selects the on-the-wire protocol: empty/"webrtc" runs the +// legacy custom WebRTC DataChannel pipeline; "hls" spawns an HLS session +// (ffmpeg producing fragmented MP4 served over HTTP). The CLI must POST an +// SDP answer to /api/internal/stream/signal/ for WebRTC sessions +// and register the HLS session in the StreamServer's HLS registry for HLS +// sessions; either way the source bytes come from FilePath (or, when only +// InfoHash is set, from a download_task on disk). type WebRTCSession struct { SessionID string `json:"sessionId"` + // Transport selects the streaming protocol. "" or "webrtc" → legacy + // WebRTC + MSE pipeline (Phase 1). "hls" → HLS over HTTP (Phase 2). + Transport string `json:"transport,omitempty"` FilePath string `json:"filePath,omitempty"` InfoHash string `json:"infoHash,omitempty"` TaskID string `json:"taskId,omitempty"` @@ -365,6 +372,9 @@ type WebRTCSession struct { // Quality target the daemon should aim for when transcoding. One of // "2160p" | "1080p" | "720p" | "480p" | "original" | "" (defer to config). Quality string `json:"quality,omitempty"` + // AudioIndex selects the source audio track (-map 0:a:N). -1 means + // "use the default/first track" (HLS) or ignored (WebRTC). + AudioIndex int `json:"audioIndex,omitempty"` } // SyncResponse is returned by the server with all pending actions for the CLI. diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 9845bd7..2710e20 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -444,6 +444,33 @@ func runDaemonStart() error { } filePath = found } + + // Branch on transport: HLS sessions register with the StreamServer + // HLS registry and serve over HTTP; default ("" or "webrtc") runs + // the legacy DataChannel pipeline. + if strings.EqualFold(sess.Transport, "hls") { + tcRuntime := buildTranscodeRuntime(ctx, cfg) + if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { + log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) + return + } + hlsCfg := engine.HLSSessionConfig{ + SessionID: sess.SessionID, + SourcePath: filePath, + FileName: sess.FileName, + Quality: sess.Quality, + AudioIndex: sess.AudioIndex, + Transcode: tcRuntime, + } + hsess, err := engine.StartHLSSession(ctx, hlsCfg) + if err != nil { + log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err) + return + } + streamSrv.HLS().Register(hsess) + return + } + sessCtx, sessCancel := context.WithCancel(ctx) //nolint:gosec // G118 cancel stored in registry webrtcRegistry.add(sess.SessionID, sessCancel) go func() { diff --git a/internal/engine/hls.go b/internal/engine/hls.go new file mode 100644 index 0000000..9ba0235 --- /dev/null +++ b/internal/engine/hls.go @@ -0,0 +1,788 @@ +// Package engine — hls.go implements the HLS streaming pipeline. +// +// Browser ↔ daemon over plain HTTP (LAN / Tailscale / UPnP). The daemon runs +// ffmpeg in `-f hls` mode, writing fragmented MP4 segments to a per-session +// tmpdir. Master + media playlists are pre-rendered from the probed source +// duration so the player knows the full timeline before any segment exists, +// which fixes the seek/duration/pause/multi-track problems we hit with the +// raw fMP4-over-WebRTC pipeline. +// +// One HLSSession == one browser playback. Sessions are registered in a +// process-wide map keyed by session ID; the StreamServer routes +// GET /hls//master.m3u8 +// GET /hls//video/index.m3u8 +// GET /hls//video/init.mp4 +// GET /hls//video/seg-.m4s +// GET /hls//subs/.vtt +// to the matching session. + +package engine + +import ( + "context" + "errors" + "fmt" + "io" + "log" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + "time" +) + +// hlsSegmentDuration is the target seconds per HLS fragment. Four seconds is +// the Plex/Apple default — short enough that seek granularity is acceptable, +// long enough that GOP overhead doesn't dominate. +const hlsSegmentDuration = 4 + +// hlsSessionTTL is how long a session can sit idle (no segment requests) +// before the manager kills ffmpeg + cleans the tmpdir. +const hlsSessionTTL = 30 * time.Minute + +// hlsTmpDirRoot returns the per-user tmpdir root for HLS sessions. +// +// Linux: ~/.cache/unarr/hls-sessions +// macOS: ~/Library/Caches/unarr/hls-sessions +// Windows: %LOCALAPPDATA%/unarr/hls-sessions +// +// Falls back to os.TempDir() if the user cache dir can't be resolved. +func hlsTmpDirRoot() string { + if dir, err := os.UserCacheDir(); err == nil { + return filepath.Join(dir, "unarr", "hls-sessions") + } + return filepath.Join(os.TempDir(), "unarr-hls-sessions") +} + +// HLSSessionConfig describes a single browser playback session driven by HLS. +type HLSSessionConfig struct { + SessionID string + SourcePath string + FileName string + Quality string // "2160p"|"1080p"|"720p"|"480p"|"original"|"" + AudioIndex int // 0-based ffmpeg audio stream selection (-map 0:a:N). -1 = default. + Transcode TranscodeRuntime +} + +// HLSSession owns a tmpdir + ffmpeg subprocess producing HLS fragments. +type HLSSession struct { + cfg HLSSessionConfig + probe *StreamProbe + + tmpDir string + durationSec float64 + segmentCount int + manifestVideo string // pre-rendered video media playlist + manifestRoot string // pre-rendered master playlist + + mu sync.Mutex + cmd *exec.Cmd + cancel context.CancelFunc + closed bool + startedAt time.Time + lastTouch time.Time + + // readyCond + readyMax track which segments ffmpeg has finished writing. + // Handlers waiting on a future segment block on readyCond until the + // poller advances readyMax past their index (or ffmpeg exits). + readyMu sync.Mutex + readyMax int // highest segment index whose .m4s file is fully written + exitErr error + exited bool + readyCh chan struct{} // closed + replaced each time readyMax advances +} + +// HLSSessionRegistry tracks active sessions keyed by ID. +type HLSSessionRegistry struct { + mu sync.RWMutex + sessions map[string]*HLSSession +} + +// NewHLSSessionRegistry returns an empty registry. +func NewHLSSessionRegistry() *HLSSessionRegistry { + return &HLSSessionRegistry{sessions: make(map[string]*HLSSession)} +} + +// Get fetches a session by ID; returns nil if not registered. +func (r *HLSSessionRegistry) Get(id string) *HLSSession { + r.mu.RLock() + defer r.mu.RUnlock() + return r.sessions[id] +} + +// Register adds a session under its ID. Replaces any previous session with +// the same ID (which is closed first to release ffmpeg + tmpdir). +func (r *HLSSessionRegistry) Register(s *HLSSession) { + r.mu.Lock() + defer r.mu.Unlock() + if prev, ok := r.sessions[s.cfg.SessionID]; ok { + _ = prev.Close() + } + r.sessions[s.cfg.SessionID] = s +} + +// Remove drops a session from the registry without closing it. +func (r *HLSSessionRegistry) Remove(id string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.sessions, id) +} + +// CloseAll terminates every active session. Call at daemon shutdown. +func (r *HLSSessionRegistry) CloseAll() { + r.mu.Lock() + sessions := make([]*HLSSession, 0, len(r.sessions)) + for _, s := range r.sessions { + sessions = append(sessions, s) + } + r.sessions = make(map[string]*HLSSession) + r.mu.Unlock() + for _, s := range sessions { + _ = s.Close() + } +} + +// SweepIdle closes sessions that have not been touched within hlsSessionTTL. +// Returns the number of sessions reaped. +func (r *HLSSessionRegistry) SweepIdle() int { + r.mu.Lock() + stale := make([]*HLSSession, 0) + for id, s := range r.sessions { + s.mu.Lock() + idle := time.Since(s.lastTouch) + s.mu.Unlock() + if idle > hlsSessionTTL { + stale = append(stale, s) + delete(r.sessions, id) + } + } + r.mu.Unlock() + for _, s := range stale { + _ = s.Close() + } + return len(stale) +} + +// StartHLSSession probes the source, builds the playlists, spawns ffmpeg, +// and returns a HLSSession ready to serve HTTP requests. Caller must register +// the session with a HLSSessionRegistry so the server can route to it. +func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, error) { + if cfg.SessionID == "" { + return nil, errors.New("hls: empty session id") + } + if cfg.SourcePath == "" { + return nil, errors.New("hls: empty source path") + } + if cfg.Transcode.FFmpegPath == "" || cfg.Transcode.FFprobePath == "" { + return nil, errors.New("hls: ffmpeg/ffprobe not available") + } + + probe, err := ProbeFile(ctx, cfg.Transcode.FFprobePath, cfg.SourcePath) + if err != nil { + return nil, fmt.Errorf("hls: probe: %w", err) + } + if probe.DurationSec <= 0 { + return nil, errors.New("hls: source has no duration") + } + + tmpDir := filepath.Join(hlsTmpDirRoot(), cfg.SessionID) + if err := os.MkdirAll(filepath.Join(tmpDir, "video"), 0o755); err != nil { + return nil, fmt.Errorf("hls: mkdir video: %w", err) + } + if err := os.MkdirAll(filepath.Join(tmpDir, "subs"), 0o755); err != nil { + return nil, fmt.Errorf("hls: mkdir subs: %w", err) + } + + segCount := int((probe.DurationSec + float64(hlsSegmentDuration) - 1) / float64(hlsSegmentDuration)) + if segCount < 1 { + segCount = 1 + } + + s := &HLSSession{ + cfg: cfg, + probe: probe, + tmpDir: tmpDir, + durationSec: probe.DurationSec, + segmentCount: segCount, + startedAt: time.Now(), + lastTouch: time.Now(), + readyCh: make(chan struct{}), + } + s.manifestVideo = renderVideoPlaylist(probe.DurationSec, segCount) + s.manifestRoot = renderMasterPlaylist(probe, cfg.Quality) + + // Spawn ffmpeg under a dedicated context so Close() can kill it without + // touching the parent ctx. + ffCtx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + args := buildHLSFFmpegArgs(cfg, probe, tmpDir) + cmd := exec.CommandContext(ffCtx, cfg.Transcode.FFmpegPath, args...) + cmd.Stderr = &hlsStderrCapture{owner: s} + if err := cmd.Start(); err != nil { + cancel() + _ = os.RemoveAll(tmpDir) + return nil, fmt.Errorf("hls: start ffmpeg: %w", err) + } + s.cmd = cmd + + go s.waitFFmpeg() + go s.pollSegments(ffCtx) + + if len(probe.SubtitleTracks) > 0 { + go s.extractSubtitles(ffCtx) + } + + log.Printf("[hls %s] started: %s, %.1fs, %d segs (quality=%s)", + shortHLSID(cfg.SessionID), filepath.Base(cfg.SourcePath), + probe.DurationSec, segCount, coalesce(cfg.Quality, "auto")) + return s, nil +} + +// shortHLSID truncates a session ID for log lines. +func shortHLSID(id string) string { + if len(id) > 8 { + return id[:8] + } + return id +} + +// MasterPlaylist returns the rendered master.m3u8 contents. +func (s *HLSSession) MasterPlaylist() string { return s.manifestRoot } + +// VideoPlaylist returns the rendered video media playlist contents. +func (s *HLSSession) VideoPlaylist() string { return s.manifestVideo } + +// DurationSeconds returns the source duration in seconds. +func (s *HLSSession) DurationSeconds() float64 { return s.durationSec } + +// Probe returns the probe metadata used to start the session. +func (s *HLSSession) Probe() *StreamProbe { return s.probe } + +// Touch updates the last-activity timestamp; the registry sweeper compares +// this against hlsSessionTTL. +func (s *HLSSession) Touch() { + s.mu.Lock() + s.lastTouch = time.Now() + s.mu.Unlock() +} + +// Close stops ffmpeg, deletes the tmpdir, and prevents further requests from +// blocking on segment readiness. Idempotent. +func (s *HLSSession) Close() error { + s.mu.Lock() + if s.closed { + s.mu.Unlock() + return nil + } + s.closed = true + cancel := s.cancel + tmpDir := s.tmpDir + s.mu.Unlock() + if cancel != nil { + cancel() + } + // Unblock any handler waiting on readyCh. + s.readyMu.Lock() + if s.readyCh != nil { + close(s.readyCh) + s.readyCh = nil + } + s.exited = true + s.readyMu.Unlock() + if tmpDir != "" { + _ = os.RemoveAll(tmpDir) + } + log.Printf("[hls %s] closed", shortHLSID(s.cfg.SessionID)) + return nil +} + +// waitFFmpeg reaps the ffmpeg process and records its exit error for handlers. +func (s *HLSSession) waitFFmpeg() { + err := s.cmd.Wait() + s.readyMu.Lock() + s.exitErr = err + s.exited = true + if s.readyCh != nil { + close(s.readyCh) + s.readyCh = nil + } + s.readyMu.Unlock() + if err != nil && !s.isClosed() { + log.Printf("[hls %s] ffmpeg exited: %v", shortHLSID(s.cfg.SessionID), err) + } +} + +// pollSegments watches the video tmpdir for newly-finished .m4s files and +// advances readyMax. ffmpeg writes a segment by first creating an empty +// file, then closing+renaming on completion (atomic-replace), so we use +// stat size > 0 + presence of the *next* segment as proof the previous one +// is done. For the last segment, ffmpeg's exit terminates the wait. +func (s *HLSSession) pollSegments(ctx context.Context) { + ticker := time.NewTicker(250 * time.Millisecond) + defer ticker.Stop() + videoDir := filepath.Join(s.tmpDir, "video") + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + // Walk segment files and find the highest contiguous index whose + // successor exists (which proves the segment is fully closed). + s.readyMu.Lock() + start := s.readyMax + exited := s.exited + s.readyMu.Unlock() + + highest := start + for i := start; i < s.segmentCount; i++ { + cur := filepath.Join(videoDir, fmt.Sprintf("seg-%d.m4s", i)) + next := filepath.Join(videoDir, fmt.Sprintf("seg-%d.m4s", i+1)) + ci, err := os.Stat(cur) + if err != nil || ci.Size() == 0 { + break + } + // Last segment is "ready" only when ffmpeg has exited (no successor + // can ever appear) or when a later segment exists. + if i == s.segmentCount-1 { + if !exited { + break + } + highest = i + 1 + break + } + if _, err := os.Stat(next); err != nil { + break + } + highest = i + 1 + } + if highest > start { + s.readyMu.Lock() + s.readyMax = highest + ch := s.readyCh + s.readyCh = make(chan struct{}) + s.readyMu.Unlock() + if ch != nil { + close(ch) + } + } + if exited && highest >= s.segmentCount { + return + } + } +} + +// waitForSegment blocks until segment idx has been fully written, ffmpeg +// has exited, or ctx is cancelled. Returns nil iff the segment file is +// safe to read at return time. +func (s *HLSSession) waitForSegment(ctx context.Context, idx int) error { + deadline := time.Now().Add(60 * time.Second) + for { + s.readyMu.Lock() + ready := idx < s.readyMax + exited := s.exited + ch := s.readyCh + exitErr := s.exitErr + s.readyMu.Unlock() + if ready { + return nil + } + if exited { + if exitErr != nil { + return fmt.Errorf("hls: ffmpeg exited: %w", exitErr) + } + return errors.New("hls: ffmpeg exited before segment ready") + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-ch: + // loop and re-check + case <-time.After(time.Until(deadline)): + return errors.New("hls: timeout waiting for segment") + } + if time.Now().After(deadline) { + return errors.New("hls: timeout waiting for segment") + } + } +} + +// isClosed reports whether Close() has been invoked. +func (s *HLSSession) isClosed() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.closed +} + +// ---- HTTP handlers ---- + +// ServeMaster writes master.m3u8 to w. +func (s *HLSSession) ServeMaster(w http.ResponseWriter, r *http.Request) { + s.Touch() + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + w.Header().Set("Cache-Control", "no-cache") + _, _ = io.WriteString(w, s.manifestRoot) +} + +// ServeVideoPlaylist writes the video media playlist (index.m3u8) to w. +func (s *HLSSession) ServeVideoPlaylist(w http.ResponseWriter, r *http.Request) { + s.Touch() + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + w.Header().Set("Cache-Control", "no-cache") + _, _ = io.WriteString(w, s.manifestVideo) +} + +// ServeInit writes init.mp4 (the fMP4 init segment) to w. +func (s *HLSSession) ServeInit(w http.ResponseWriter, r *http.Request) { + s.Touch() + path := filepath.Join(s.tmpDir, "video", "init.mp4") + // Init segment is the first thing ffmpeg writes — wait briefly for it. + deadline := time.Now().Add(30 * time.Second) + for { + if fi, err := os.Stat(path); err == nil && fi.Size() > 0 { + break + } + if s.isClosed() || time.Now().After(deadline) { + http.Error(w, "init segment unavailable", http.StatusServiceUnavailable) + return + } + time.Sleep(150 * time.Millisecond) + } + w.Header().Set("Content-Type", "video/mp4") + w.Header().Set("Cache-Control", "max-age=3600") + http.ServeFile(w, r, path) +} + +// ServeSegment writes the requested video segment, blocking until ffmpeg +// produces it (capped by waitForSegment timeout). +func (s *HLSSession) ServeSegment(w http.ResponseWriter, r *http.Request, idx int) { + s.Touch() + if idx < 0 || idx >= s.segmentCount { + http.Error(w, "segment out of range", http.StatusNotFound) + return + } + if err := s.waitForSegment(r.Context(), idx); err != nil { + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + path := filepath.Join(s.tmpDir, "video", fmt.Sprintf("seg-%d.m4s", idx)) + w.Header().Set("Content-Type", "video/mp4") + w.Header().Set("Cache-Control", "max-age=3600") + http.ServeFile(w, r, path) +} + +// ServeSubtitle writes the WebVTT subtitle for the requested track index, if +// extraction has finished. +func (s *HLSSession) ServeSubtitle(w http.ResponseWriter, r *http.Request, idx int) { + s.Touch() + if idx < 0 || idx >= len(s.probe.SubtitleTracks) { + http.Error(w, "subtitle track not found", http.StatusNotFound) + return + } + path := filepath.Join(s.tmpDir, "subs", fmt.Sprintf("sub-%d.vtt", idx)) + deadline := time.Now().Add(15 * time.Second) + for { + if fi, err := os.Stat(path); err == nil && fi.Size() > 0 { + break + } + if s.isClosed() || time.Now().After(deadline) { + http.Error(w, "subtitle not yet extracted", http.StatusServiceUnavailable) + return + } + time.Sleep(200 * time.Millisecond) + } + w.Header().Set("Content-Type", "text/vtt; charset=utf-8") + w.Header().Set("Cache-Control", "max-age=3600") + http.ServeFile(w, r, path) +} + +// ---- ffmpeg argument builders ---- + +// buildHLSFFmpegArgs returns the argv for the main HLS encode. It always +// re-encodes video + audio so segment boundaries align with -force_key_frames. +// Pure -c copy can be added later for h264+aac+mp4 sources where the GOP is +// already short enough; keeping it simple for the MVP. +func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) []string { + hwHint := cfg.Transcode.HWAccel + args := []string{"-y", "-hide_banner", "-loglevel", "warning"} + + switch hwHint { + case HWAccelNVENC: + args = append(args, "-hwaccel", "cuda") + case HWAccelQSV: + args = append(args, "-hwaccel", "qsv") + case HWAccelVAAPI: + args = append(args, "-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi") + case HWAccelNone, HWAccelVideoToolbox: + // No demuxer-side hint. + } + + args = append(args, "-i", cfg.SourcePath) + + // Map video + selected audio. Always use first video stream. + args = append(args, "-map", "0:v:0") + audioIdx := cfg.AudioIndex + if audioIdx < 0 { + audioIdx = 0 + for i, a := range probe.AudioTracks { + if a.Default { + audioIdx = i + break + } + } + } + args = append(args, "-map", fmt.Sprintf("0:a:%d?", audioIdx)) + + // Video encode. + codec := hwHint.FFmpegVideoCodec("h264") + args = append(args, "-c:v", codec) + if codec == "libx264" { + preset := cfg.Transcode.Preset + if preset == "" { + preset = "veryfast" + } + args = append(args, "-preset", preset) + } + args = append(args, "-profile:v", "main", "-level:v", "4.0") + + qcap := resolveQualityCap(cfg.Quality) + bitrate := qcap.VideoBitrate + if bitrate == "" { + bitrate = cfg.Transcode.VideoBitrate + } + if bitrate == "" { + bitrate = "5M" + } + args = append(args, "-b:v", bitrate, "-maxrate", bitrate, "-bufsize", bitrate) + + // Force keyframe alignment with segment boundaries. + args = append(args, "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", hlsSegmentDuration)) + + // Filter chain: optional scale, force 8-bit yuv420p, normalise color metadata. + maxH := qcap.MaxHeight + if maxH == 0 { + maxH = cfg.Transcode.MaxHeight + } + var filterChain string + if maxH > 0 && probe.Height > maxH { + filterChain = fmt.Sprintf( + "scale=-2:%d:force_original_aspect_ratio=decrease,format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv", + maxH, + ) + } else { + filterChain = "format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv" + } + args = append(args, "-vf", filterChain) + + // Audio: AAC stereo 48 kHz — broadest browser compatibility. + audioBitrate := cfg.Transcode.AudioBitrate + if audioBitrate == "" { + audioBitrate = "192k" + } + args = append(args, + "-c:a", "aac", + "-b:a", audioBitrate, + "-ar", "48000", + "-ac", "2", + ) + + // HLS muxer — fmp4 segments with pre-computed segment count. + videoDir := filepath.Join(tmpDir, "video") + args = append(args, + "-f", "hls", + "-hls_time", strconv.Itoa(hlsSegmentDuration), + "-hls_playlist_type", "vod", + "-hls_segment_type", "fmp4", + "-hls_list_size", "0", + "-hls_fmp4_init_filename", "init.mp4", + "-hls_segment_filename", filepath.Join(videoDir, "seg-%d.m4s"), + filepath.Join(videoDir, "ffmpeg.m3u8"), + ) + return args +} + +// extractSubtitles spawns short-lived ffmpeg jobs to convert each text-based +// subtitle track to WebVTT in parallel. Bitmap subs (PGS, DVB) are skipped — +// they would require burn-in into the video encode, which is out of scope. +func (s *HLSSession) extractSubtitles(ctx context.Context) { + subsDir := filepath.Join(s.tmpDir, "subs") + for i, sub := range s.probe.SubtitleTracks { + if !sub.IsTextSubtitle() { + continue + } + out := filepath.Join(subsDir, fmt.Sprintf("sub-%d.vtt", i)) + args := []string{ + "-y", "-hide_banner", "-loglevel", "warning", + "-i", s.cfg.SourcePath, + "-map", fmt.Sprintf("0:s:%d?", i), + "-c:s", "webvtt", + out, + } + // Run sequentially to avoid hammering the disk; subtitle extraction + // is fast enough that parallelism isn't worth the complexity. + cmd := exec.CommandContext(ctx, s.cfg.Transcode.FFmpegPath, args...) + if err := cmd.Run(); err != nil { + if ctx.Err() != nil { + return + } + log.Printf("[hls %s] subtitle %d (%s) extract failed: %v", + shortHLSID(s.cfg.SessionID), i, sub.Lang, err) + continue + } + } +} + +// ---- Manifest rendering ---- + +// renderVideoPlaylist builds the VOD media playlist for the video stream. +// Segment count is derived from the source duration — the player learns the +// total timeline from the manifest before any segment is fetched. +func renderVideoPlaylist(durationSec float64, segCount int) string { + var b strings.Builder + b.WriteString("#EXTM3U\n") + b.WriteString("#EXT-X-VERSION:7\n") + b.WriteString("#EXT-X-PLAYLIST-TYPE:VOD\n") + b.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", hlsSegmentDuration+1)) + b.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n") + b.WriteString(`#EXT-X-MAP:URI="init.mp4"` + "\n") + remaining := durationSec + for i := 0; i < segCount; i++ { + segDur := float64(hlsSegmentDuration) + if remaining < segDur { + segDur = remaining + } + b.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n", segDur)) + b.WriteString(fmt.Sprintf("seg-%d.m4s\n", i)) + remaining -= segDur + } + b.WriteString("#EXT-X-ENDLIST\n") + return b.String() +} + +// renderMasterPlaylist builds the top-level master playlist with the single +// video variant + every text subtitle as an EXT-X-MEDIA group. Audio is muxed +// into the video segments for the MVP — separate audio renditions can come +// later (they require a second ffmpeg pipeline producing audio-only segments). +func renderMasterPlaylist(probe *StreamProbe, qualityLabel string) string { + var b strings.Builder + b.WriteString("#EXTM3U\n") + b.WriteString("#EXT-X-VERSION:7\n") + + // Subtitle renditions. + hasSubs := false + for i, s := range probe.SubtitleTracks { + if !s.IsTextSubtitle() { + continue + } + hasSubs = true + lang := s.Lang + if lang == "" { + lang = "und" + } + name := s.Title + if name == "" { + name = strings.ToUpper(lang) + } + def := "NO" + if s.Forced || i == 0 { + def = "YES" + } + b.WriteString(fmt.Sprintf( + `#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME=%q,LANGUAGE=%q,DEFAULT=%s,AUTOSELECT=YES,FORCED=%s,URI="subs/sub-%d.m3u8"`+"\n", + name, lang, def, ynBool(s.Forced), i, + )) + } + + // Video variant. Bandwidth + resolution are best-effort estimates from probe. + bw := bitrateForQuality(qualityLabel) + w, h := scaledDimensions(probe.Width, probe.Height, qualityHeight(qualityLabel)) + codecs := `avc1.4D4028,mp4a.40.2` + streamInf := fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d,CODECS=%q", bw, w, h, codecs) + if hasSubs { + streamInf += `,SUBTITLES="subs"` + } + b.WriteString(streamInf + "\n") + b.WriteString("video/index.m3u8\n") + return b.String() +} + +func ynBool(b bool) string { + if b { + return "YES" + } + return "NO" +} + +// bitrateForQuality returns a synthetic bandwidth attribute for the master +// playlist's STREAM-INF — only used by ABR logic, which we don't run yet. +func bitrateForQuality(q string) int { + switch q { + case "2160p": + return 25_000_000 + case "1080p": + return 6_000_000 + case "720p": + return 3_500_000 + case "480p": + return 1_500_000 + } + return 6_000_000 +} + +func qualityHeight(q string) int { + switch q { + case "2160p": + return 2160 + case "1080p": + return 1080 + case "720p": + return 720 + case "480p": + return 480 + } + return 0 +} + +// scaledDimensions returns (width, height) after applying a height cap that +// preserves the source aspect ratio. capH=0 returns the original dims. +func scaledDimensions(srcW, srcH, capH int) (int, int) { + if srcW <= 0 || srcH <= 0 { + return 1920, 1080 + } + if capH == 0 || srcH <= capH { + return srcW, srcH + } + w := srcW * capH / srcH + if w%2 != 0 { + w++ + } + return w, capH +} + +// ---- Logger plumbing ---- + +// hlsStderrCapture forwards ffmpeg stderr lines to the daemon log prefixed by +// the session ID, so failures are visible without spelunking tmpdirs. +type hlsStderrCapture struct { + owner *HLSSession + buf strings.Builder +} + +func (c *hlsStderrCapture) Write(p []byte) (int, error) { + c.buf.Write(p) + for { + line, rest, ok := strings.Cut(c.buf.String(), "\n") + if !ok { + break + } + c.buf.Reset() + c.buf.WriteString(rest) + if line = strings.TrimSpace(line); line != "" { + log.Printf("[hls %s] ffmpeg: %s", shortHLSID(c.owner.cfg.SessionID), line) + } + } + return len(p), nil +} diff --git a/internal/engine/probe.go b/internal/engine/probe.go index 8e3e654..39ff374 100644 --- a/internal/engine/probe.go +++ b/internal/engine/probe.go @@ -15,6 +15,7 @@ type StreamProbe struct { // VideoCodec lowercased — e.g. "h264", "hevc", "av1", "vp9", "mpeg4". VideoCodec string // AudioCodec lowercased — e.g. "aac", "ac3", "dts", "eac3", "opus". + // Reflects the default/first audio track for legacy single-track callers. AudioCodec string // Width / Height of the primary video stream. Width int @@ -27,6 +28,43 @@ type StreamProbe struct { DurationSec float64 // Container is the file extension lowercased (".mp4", ".mkv", ".avi"). Container string + // AudioTracks lists every audio stream in source order. Index in this + // slice == ffmpeg `-map 0:a:N` index (where N starts at 0). + AudioTracks []ProbeAudioTrack + // SubtitleTracks lists every subtitle stream in source order. Index in + // this slice == ffmpeg `-map 0:s:N` index. + SubtitleTracks []ProbeSubtitleTrack +} + +// ProbeAudioTrack is a slimmed AudioTrack view tied to ffmpeg stream index. +type ProbeAudioTrack struct { + Index int // 0-based audio stream index (ffmpeg -map 0:a:Index) + Lang string // ISO 639-1 + Codec string // lowercased + Channels int + Title string + Default bool +} + +// ProbeSubtitleTrack is a slimmed SubtitleTrack view tied to ffmpeg stream index. +// Codec discriminates text (srt/ass/webvtt → extract to WebVTT) vs bitmap +// (pgs/dvbsub → require burn-in). +type ProbeSubtitleTrack struct { + Index int // 0-based subtitle stream index (ffmpeg -map 0:s:Index) + Lang string // ISO 639-1 + Codec string // lowercased — "subrip", "ass", "webvtt", "hdmv_pgs_subtitle", ... + Title string + Forced bool +} + +// IsTextSubtitle reports whether a subtitle codec can be extracted to WebVTT +// without re-rendering. Bitmap subs (PGS, DVB) need burn-in. +func (s ProbeSubtitleTrack) IsTextSubtitle() bool { + switch s.Codec { + case "subrip", "srt", "ass", "ssa", "webvtt", "mov_text": + return true + } + return false } // TranscodeAction tells the streaming pipeline how to feed the file to @@ -74,6 +112,29 @@ func ProbeFile(ctx context.Context, ffprobePath, filePath string) (*StreamProbe, } } probe.AudioCodec = strings.ToLower(picked.Codec) + probe.AudioTracks = make([]ProbeAudioTrack, 0, len(mi.Audio)) + for i, a := range mi.Audio { + probe.AudioTracks = append(probe.AudioTracks, ProbeAudioTrack{ + Index: i, + Lang: a.Lang, + Codec: strings.ToLower(a.Codec), + Channels: a.Channels, + Title: a.Title, + Default: a.Default, + }) + } + } + if len(mi.Subtitles) > 0 { + probe.SubtitleTracks = make([]ProbeSubtitleTrack, 0, len(mi.Subtitles)) + for i, s := range mi.Subtitles { + probe.SubtitleTracks = append(probe.SubtitleTracks, ProbeSubtitleTrack{ + Index: i, + Lang: s.Lang, + Codec: strings.ToLower(s.Codec), + Title: s.Title, + Forced: s.Forced, + }) + } } return probe, nil } diff --git a/internal/engine/stream_server.go b/internal/engine/stream_server.go index 2a6c72f..82b0a29 100644 --- a/internal/engine/stream_server.go +++ b/internal/engine/stream_server.go @@ -52,6 +52,8 @@ type StreamServer struct { upnpMapping *UPnPMapping disableUPnP bool + hls *HLSSessionRegistry // HLS sessions served on /hls//... + lastActivity atomic.Int64 maxByteOffset atomic.Int64 // highest sequential read position (main playback connection) totalFileSize atomic.Int64 @@ -64,15 +66,20 @@ type StreamServer struct { // NewStreamServer creates a stream server bound to the given port. // Call Listen() to start accepting connections, then SetFile() to serve content. func NewStreamServer(port int) *StreamServer { - return &StreamServer{port: port} + return &StreamServer{port: port, hls: NewHLSSessionRegistry()} } +// HLS returns the HLS session registry for this server. Daemon code uses it +// to register a session when the backend asks for HLS playback. +func (ss *StreamServer) HLS() *HLSSessionRegistry { return ss.hls } + // Listen starts the HTTP server on the configured port. Call once at daemon startup. func (ss *StreamServer) Listen(ctx context.Context) error { mux := http.NewServeMux() mux.HandleFunc("/stream", ss.handler) mux.HandleFunc("/health", ss.healthHandler) mux.HandleFunc("/playlist.m3u", ss.playlistHandler) + mux.HandleFunc("/hls/", ss.hlsHandler) // SO_REUSEADDR allows immediate rebind if the port is in TIME_WAIT (e.g. after agent restart) lc := net.ListenConfig{ @@ -230,12 +237,146 @@ func (ss *StreamServer) IdleSince() time.Duration { // Call only at daemon shutdown — NOT between file swaps. func (ss *StreamServer) Shutdown(ctx context.Context) error { ss.upnpMapping.Remove() + if ss.hls != nil { + ss.hls.CloseAll() + } if ss.server != nil { return ss.server.Shutdown(ctx) } return nil } +// hlsBaseURLs returns the per-network HLS base URLs for a given session. +// The web client picks the first reachable one — same fallback strategy as +// the legacy /stream URLs. +func (ss *StreamServer) hlsBaseURLs(sessionID string) StreamURLs { + var out StreamURLs + if ss.urls.LAN != "" { + out.LAN = strings.Replace(ss.urls.LAN, "/stream", "/hls/"+sessionID, 1) + } + if ss.urls.Tailscale != "" { + out.Tailscale = strings.Replace(ss.urls.Tailscale, "/stream", "/hls/"+sessionID, 1) + } + if ss.urls.Public != "" { + out.Public = strings.Replace(ss.urls.Public, "/stream", "/hls/"+sessionID, 1) + } + return out +} + +// HLSURLsJSON returns base URLs for an HLS session as a JSON string for the +// session response payload. +func (ss *StreamServer) HLSURLsJSON(sessionID string) string { + urls := ss.hlsBaseURLs(sessionID) + b, _ := json.Marshal(urls) + return string(b) +} + +// hlsHandler routes /hls// to the matching HLSSession. +// +// Recognised resources: +// +// master.m3u8 — top-level playlist +// video/index.m3u8 — video media playlist +// video/init.mp4 — fMP4 init segment +// video/seg-.m4s — video segment +// subs/sub-.m3u8 — per-subtitle media playlist (synthesised) +// subs/sub-.vtt — WebVTT subtitle (extracted by ffmpeg) +func (ss *StreamServer) hlsHandler(w http.ResponseWriter, r *http.Request) { + ss.lastActivity.Store(time.Now().UnixNano()) + + // CORS for app.torrentclaw.com → 127.0.0.1/Tailscale daemon. + if origin := r.Header.Get("Origin"); origin != "" { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Range") + w.Header().Set("Access-Control-Expose-Headers", "Content-Length, Content-Range, Accept-Ranges") + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + } + + rest := strings.TrimPrefix(r.URL.Path, "/hls/") + parts := strings.SplitN(rest, "/", 2) + if len(parts) == 0 || parts[0] == "" { + http.Error(w, "missing session id", http.StatusNotFound) + return + } + sessionID := parts[0] + session := ss.hls.Get(sessionID) + if session == nil { + http.Error(w, "hls session not found", http.StatusNotFound) + return + } + if len(parts) == 1 { + http.Error(w, "missing resource", http.StatusNotFound) + return + } + resource := parts[1] + + switch { + case resource == "master.m3u8": + session.ServeMaster(w, r) + case resource == "video/index.m3u8": + session.ServeVideoPlaylist(w, r) + case resource == "video/init.mp4": + session.ServeInit(w, r) + case strings.HasPrefix(resource, "video/seg-") && strings.HasSuffix(resource, ".m4s"): + idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "video/seg-"), ".m4s") + idx, err := strconv.Atoi(idxStr) + if err != nil { + http.Error(w, "bad segment index", http.StatusBadRequest) + return + } + session.ServeSegment(w, r, idx) + case strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".m3u8"): + idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".m3u8") + idx, err := strconv.Atoi(idxStr) + if err != nil { + http.Error(w, "bad subtitle index", http.StatusBadRequest) + return + } + ss.serveSubtitlePlaylist(w, r, session, idx) + case strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".vtt"): + idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".vtt") + idx, err := strconv.Atoi(idxStr) + if err != nil { + http.Error(w, "bad subtitle index", http.StatusBadRequest) + return + } + session.ServeSubtitle(w, r, idx) + default: + http.Error(w, "unknown hls resource", http.StatusNotFound) + } +} + +// serveSubtitlePlaylist generates a single-VTT-segment HLS playlist on the +// fly so hls.js can consume it as a regular subtitle rendition. The VTT file +// itself is extracted asynchronously by HLSSession.extractSubtitles. +func (ss *StreamServer) serveSubtitlePlaylist(w http.ResponseWriter, r *http.Request, session *HLSSession, idx int) { + if idx < 0 || idx >= len(session.probe.SubtitleTracks) { + http.Error(w, "subtitle out of range", http.StatusNotFound) + return + } + dur := session.durationSec + if dur < 1 { + dur = 1 + } + body := strings.Builder{} + body.WriteString("#EXTM3U\n") + body.WriteString("#EXT-X-VERSION:3\n") + body.WriteString("#EXT-X-PLAYLIST-TYPE:VOD\n") + body.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(dur)+1)) + body.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n") + body.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n", dur)) + body.WriteString(fmt.Sprintf("sub-%d.vtt\n", idx)) + body.WriteString("#EXT-X-ENDLIST\n") + + w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + w.Header().Set("Cache-Control", "no-cache") + _, _ = io.WriteString(w, body.String()) +} + // healthHandler responde con el estado del servidor en JSON. // Útil para diagnosticar conectividad desde redes remotas o Tailscale: //