feat(streaming): add HLS transport pipeline (daemon side)

Introduces an HLS-over-HTTP path as Plan B for in-browser streaming. The
WebRTC + MSE pipeline keeps working untouched; the new path is selected
when the backend sets transport="hls" on a streaming session.

Daemon scope:
- engine/hls.go: HLSSession + HLSSessionRegistry. Spawns ffmpeg with
  -f hls -hls_segment_type fmp4 + force_key_frames aligned with 4 s
  segments. Pre-renders master + media playlists from the probe duration
  so the browser knows the total timeline before any segment exists,
  fixing seek/duration/pause/multi-track issues seen with the live fMP4
  pipe.
- engine/probe.go: enumerate every audio + subtitle track instead of
  collapsing to a single default audio track.
- engine/stream_server.go: route /hls/<id>/{master.m3u8,video/...,
  subs/...} to the matching session. Emit a synthesised single-VTT
  subtitle playlist per text track; bitmap subs (PGS/DVB) skip silently.
- cmd/daemon.go: branch on WebRTCSession.Transport == "hls" to register
  an HLS session instead of running the legacy DataChannel pump.
- agent/types.go: WebRTCSession.Transport + AudioIndex fields.

Backend + web sides land in a follow-up commit.
This commit is contained in:
Deivid Soto 2026-05-07 16:10:22 +02:00
parent 81abc4acca
commit 0fc0e1c21a
5 changed files with 1032 additions and 5 deletions

788
internal/engine/hls.go Normal file
View file

@ -0,0 +1,788 @@
// Package engine — hls.go implements the HLS streaming pipeline.
//
// Browser ↔ daemon over plain HTTP (LAN / Tailscale / UPnP). The daemon runs
// ffmpeg in `-f hls` mode, writing fragmented MP4 segments to a per-session
// tmpdir. Master + media playlists are pre-rendered from the probed source
// duration so the player knows the full timeline before any segment exists,
// which fixes the seek/duration/pause/multi-track problems we hit with the
// raw fMP4-over-WebRTC pipeline.
//
// One HLSSession == one browser playback. Sessions are registered in a
// process-wide map keyed by session ID; the StreamServer routes
// GET /hls/<id>/master.m3u8
// GET /hls/<id>/video/index.m3u8
// GET /hls/<id>/video/init.mp4
// GET /hls/<id>/video/seg-<n>.m4s
// GET /hls/<id>/subs/<lang>.vtt
// to the matching session.
package engine
import (
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
// hlsSegmentDuration is the target seconds per HLS fragment. Four seconds is
// the Plex/Apple default — short enough that seek granularity is acceptable,
// long enough that GOP overhead doesn't dominate.
const hlsSegmentDuration = 4
// hlsSessionTTL is how long a session can sit idle (no segment requests)
// before the manager kills ffmpeg + cleans the tmpdir.
const hlsSessionTTL = 30 * time.Minute
// hlsTmpDirRoot returns the per-user tmpdir root for HLS sessions.
//
// Linux: ~/.cache/unarr/hls-sessions
// macOS: ~/Library/Caches/unarr/hls-sessions
// Windows: %LOCALAPPDATA%/unarr/hls-sessions
//
// Falls back to os.TempDir() if the user cache dir can't be resolved.
func hlsTmpDirRoot() string {
if dir, err := os.UserCacheDir(); err == nil {
return filepath.Join(dir, "unarr", "hls-sessions")
}
return filepath.Join(os.TempDir(), "unarr-hls-sessions")
}
// HLSSessionConfig describes a single browser playback session driven by HLS.
type HLSSessionConfig struct {
SessionID string
SourcePath string
FileName string
Quality string // "2160p"|"1080p"|"720p"|"480p"|"original"|""
AudioIndex int // 0-based ffmpeg audio stream selection (-map 0:a:N). -1 = default.
Transcode TranscodeRuntime
}
// HLSSession owns a tmpdir + ffmpeg subprocess producing HLS fragments.
type HLSSession struct {
cfg HLSSessionConfig
probe *StreamProbe
tmpDir string
durationSec float64
segmentCount int
manifestVideo string // pre-rendered video media playlist
manifestRoot string // pre-rendered master playlist
mu sync.Mutex
cmd *exec.Cmd
cancel context.CancelFunc
closed bool
startedAt time.Time
lastTouch time.Time
// readyCond + readyMax track which segments ffmpeg has finished writing.
// Handlers waiting on a future segment block on readyCond until the
// poller advances readyMax past their index (or ffmpeg exits).
readyMu sync.Mutex
readyMax int // highest segment index whose .m4s file is fully written
exitErr error
exited bool
readyCh chan struct{} // closed + replaced each time readyMax advances
}
// HLSSessionRegistry tracks active sessions keyed by ID.
type HLSSessionRegistry struct {
mu sync.RWMutex
sessions map[string]*HLSSession
}
// NewHLSSessionRegistry returns an empty registry.
func NewHLSSessionRegistry() *HLSSessionRegistry {
return &HLSSessionRegistry{sessions: make(map[string]*HLSSession)}
}
// Get fetches a session by ID; returns nil if not registered.
func (r *HLSSessionRegistry) Get(id string) *HLSSession {
r.mu.RLock()
defer r.mu.RUnlock()
return r.sessions[id]
}
// Register adds a session under its ID. Replaces any previous session with
// the same ID (which is closed first to release ffmpeg + tmpdir).
func (r *HLSSessionRegistry) Register(s *HLSSession) {
r.mu.Lock()
defer r.mu.Unlock()
if prev, ok := r.sessions[s.cfg.SessionID]; ok {
_ = prev.Close()
}
r.sessions[s.cfg.SessionID] = s
}
// Remove drops a session from the registry without closing it.
func (r *HLSSessionRegistry) Remove(id string) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.sessions, id)
}
// CloseAll terminates every active session. Call at daemon shutdown.
func (r *HLSSessionRegistry) CloseAll() {
r.mu.Lock()
sessions := make([]*HLSSession, 0, len(r.sessions))
for _, s := range r.sessions {
sessions = append(sessions, s)
}
r.sessions = make(map[string]*HLSSession)
r.mu.Unlock()
for _, s := range sessions {
_ = s.Close()
}
}
// SweepIdle closes sessions that have not been touched within hlsSessionTTL.
// Returns the number of sessions reaped.
func (r *HLSSessionRegistry) SweepIdle() int {
r.mu.Lock()
stale := make([]*HLSSession, 0)
for id, s := range r.sessions {
s.mu.Lock()
idle := time.Since(s.lastTouch)
s.mu.Unlock()
if idle > hlsSessionTTL {
stale = append(stale, s)
delete(r.sessions, id)
}
}
r.mu.Unlock()
for _, s := range stale {
_ = s.Close()
}
return len(stale)
}
// StartHLSSession probes the source, builds the playlists, spawns ffmpeg,
// and returns a HLSSession ready to serve HTTP requests. Caller must register
// the session with a HLSSessionRegistry so the server can route to it.
func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, error) {
if cfg.SessionID == "" {
return nil, errors.New("hls: empty session id")
}
if cfg.SourcePath == "" {
return nil, errors.New("hls: empty source path")
}
if cfg.Transcode.FFmpegPath == "" || cfg.Transcode.FFprobePath == "" {
return nil, errors.New("hls: ffmpeg/ffprobe not available")
}
probe, err := ProbeFile(ctx, cfg.Transcode.FFprobePath, cfg.SourcePath)
if err != nil {
return nil, fmt.Errorf("hls: probe: %w", err)
}
if probe.DurationSec <= 0 {
return nil, errors.New("hls: source has no duration")
}
tmpDir := filepath.Join(hlsTmpDirRoot(), cfg.SessionID)
if err := os.MkdirAll(filepath.Join(tmpDir, "video"), 0o755); err != nil {
return nil, fmt.Errorf("hls: mkdir video: %w", err)
}
if err := os.MkdirAll(filepath.Join(tmpDir, "subs"), 0o755); err != nil {
return nil, fmt.Errorf("hls: mkdir subs: %w", err)
}
segCount := int((probe.DurationSec + float64(hlsSegmentDuration) - 1) / float64(hlsSegmentDuration))
if segCount < 1 {
segCount = 1
}
s := &HLSSession{
cfg: cfg,
probe: probe,
tmpDir: tmpDir,
durationSec: probe.DurationSec,
segmentCount: segCount,
startedAt: time.Now(),
lastTouch: time.Now(),
readyCh: make(chan struct{}),
}
s.manifestVideo = renderVideoPlaylist(probe.DurationSec, segCount)
s.manifestRoot = renderMasterPlaylist(probe, cfg.Quality)
// Spawn ffmpeg under a dedicated context so Close() can kill it without
// touching the parent ctx.
ffCtx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
args := buildHLSFFmpegArgs(cfg, probe, tmpDir)
cmd := exec.CommandContext(ffCtx, cfg.Transcode.FFmpegPath, args...)
cmd.Stderr = &hlsStderrCapture{owner: s}
if err := cmd.Start(); err != nil {
cancel()
_ = os.RemoveAll(tmpDir)
return nil, fmt.Errorf("hls: start ffmpeg: %w", err)
}
s.cmd = cmd
go s.waitFFmpeg()
go s.pollSegments(ffCtx)
if len(probe.SubtitleTracks) > 0 {
go s.extractSubtitles(ffCtx)
}
log.Printf("[hls %s] started: %s, %.1fs, %d segs (quality=%s)",
shortHLSID(cfg.SessionID), filepath.Base(cfg.SourcePath),
probe.DurationSec, segCount, coalesce(cfg.Quality, "auto"))
return s, nil
}
// shortHLSID truncates a session ID for log lines.
func shortHLSID(id string) string {
if len(id) > 8 {
return id[:8]
}
return id
}
// MasterPlaylist returns the rendered master.m3u8 contents.
func (s *HLSSession) MasterPlaylist() string { return s.manifestRoot }
// VideoPlaylist returns the rendered video media playlist contents.
func (s *HLSSession) VideoPlaylist() string { return s.manifestVideo }
// DurationSeconds returns the source duration in seconds.
func (s *HLSSession) DurationSeconds() float64 { return s.durationSec }
// Probe returns the probe metadata used to start the session.
func (s *HLSSession) Probe() *StreamProbe { return s.probe }
// Touch updates the last-activity timestamp; the registry sweeper compares
// this against hlsSessionTTL.
func (s *HLSSession) Touch() {
s.mu.Lock()
s.lastTouch = time.Now()
s.mu.Unlock()
}
// Close stops ffmpeg, deletes the tmpdir, and prevents further requests from
// blocking on segment readiness. Idempotent.
func (s *HLSSession) Close() error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return nil
}
s.closed = true
cancel := s.cancel
tmpDir := s.tmpDir
s.mu.Unlock()
if cancel != nil {
cancel()
}
// Unblock any handler waiting on readyCh.
s.readyMu.Lock()
if s.readyCh != nil {
close(s.readyCh)
s.readyCh = nil
}
s.exited = true
s.readyMu.Unlock()
if tmpDir != "" {
_ = os.RemoveAll(tmpDir)
}
log.Printf("[hls %s] closed", shortHLSID(s.cfg.SessionID))
return nil
}
// waitFFmpeg reaps the ffmpeg process and records its exit error for handlers.
func (s *HLSSession) waitFFmpeg() {
err := s.cmd.Wait()
s.readyMu.Lock()
s.exitErr = err
s.exited = true
if s.readyCh != nil {
close(s.readyCh)
s.readyCh = nil
}
s.readyMu.Unlock()
if err != nil && !s.isClosed() {
log.Printf("[hls %s] ffmpeg exited: %v", shortHLSID(s.cfg.SessionID), err)
}
}
// pollSegments watches the video tmpdir for newly-finished .m4s files and
// advances readyMax. ffmpeg writes a segment by first creating an empty
// file, then closing+renaming on completion (atomic-replace), so we use
// stat size > 0 + presence of the *next* segment as proof the previous one
// is done. For the last segment, ffmpeg's exit terminates the wait.
func (s *HLSSession) pollSegments(ctx context.Context) {
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
videoDir := filepath.Join(s.tmpDir, "video")
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
}
// Walk segment files and find the highest contiguous index whose
// successor exists (which proves the segment is fully closed).
s.readyMu.Lock()
start := s.readyMax
exited := s.exited
s.readyMu.Unlock()
highest := start
for i := start; i < s.segmentCount; i++ {
cur := filepath.Join(videoDir, fmt.Sprintf("seg-%d.m4s", i))
next := filepath.Join(videoDir, fmt.Sprintf("seg-%d.m4s", i+1))
ci, err := os.Stat(cur)
if err != nil || ci.Size() == 0 {
break
}
// Last segment is "ready" only when ffmpeg has exited (no successor
// can ever appear) or when a later segment exists.
if i == s.segmentCount-1 {
if !exited {
break
}
highest = i + 1
break
}
if _, err := os.Stat(next); err != nil {
break
}
highest = i + 1
}
if highest > start {
s.readyMu.Lock()
s.readyMax = highest
ch := s.readyCh
s.readyCh = make(chan struct{})
s.readyMu.Unlock()
if ch != nil {
close(ch)
}
}
if exited && highest >= s.segmentCount {
return
}
}
}
// waitForSegment blocks until segment idx has been fully written, ffmpeg
// has exited, or ctx is cancelled. Returns nil iff the segment file is
// safe to read at return time.
func (s *HLSSession) waitForSegment(ctx context.Context, idx int) error {
deadline := time.Now().Add(60 * time.Second)
for {
s.readyMu.Lock()
ready := idx < s.readyMax
exited := s.exited
ch := s.readyCh
exitErr := s.exitErr
s.readyMu.Unlock()
if ready {
return nil
}
if exited {
if exitErr != nil {
return fmt.Errorf("hls: ffmpeg exited: %w", exitErr)
}
return errors.New("hls: ffmpeg exited before segment ready")
}
select {
case <-ctx.Done():
return ctx.Err()
case <-ch:
// loop and re-check
case <-time.After(time.Until(deadline)):
return errors.New("hls: timeout waiting for segment")
}
if time.Now().After(deadline) {
return errors.New("hls: timeout waiting for segment")
}
}
}
// isClosed reports whether Close() has been invoked.
func (s *HLSSession) isClosed() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.closed
}
// ---- HTTP handlers ----
// ServeMaster writes master.m3u8 to w.
func (s *HLSSession) ServeMaster(w http.ResponseWriter, r *http.Request) {
s.Touch()
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
w.Header().Set("Cache-Control", "no-cache")
_, _ = io.WriteString(w, s.manifestRoot)
}
// ServeVideoPlaylist writes the video media playlist (index.m3u8) to w.
func (s *HLSSession) ServeVideoPlaylist(w http.ResponseWriter, r *http.Request) {
s.Touch()
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
w.Header().Set("Cache-Control", "no-cache")
_, _ = io.WriteString(w, s.manifestVideo)
}
// ServeInit writes init.mp4 (the fMP4 init segment) to w.
func (s *HLSSession) ServeInit(w http.ResponseWriter, r *http.Request) {
s.Touch()
path := filepath.Join(s.tmpDir, "video", "init.mp4")
// Init segment is the first thing ffmpeg writes — wait briefly for it.
deadline := time.Now().Add(30 * time.Second)
for {
if fi, err := os.Stat(path); err == nil && fi.Size() > 0 {
break
}
if s.isClosed() || time.Now().After(deadline) {
http.Error(w, "init segment unavailable", http.StatusServiceUnavailable)
return
}
time.Sleep(150 * time.Millisecond)
}
w.Header().Set("Content-Type", "video/mp4")
w.Header().Set("Cache-Control", "max-age=3600")
http.ServeFile(w, r, path)
}
// ServeSegment writes the requested video segment, blocking until ffmpeg
// produces it (capped by waitForSegment timeout).
func (s *HLSSession) ServeSegment(w http.ResponseWriter, r *http.Request, idx int) {
s.Touch()
if idx < 0 || idx >= s.segmentCount {
http.Error(w, "segment out of range", http.StatusNotFound)
return
}
if err := s.waitForSegment(r.Context(), idx); err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
path := filepath.Join(s.tmpDir, "video", fmt.Sprintf("seg-%d.m4s", idx))
w.Header().Set("Content-Type", "video/mp4")
w.Header().Set("Cache-Control", "max-age=3600")
http.ServeFile(w, r, path)
}
// ServeSubtitle writes the WebVTT subtitle for the requested track index, if
// extraction has finished.
func (s *HLSSession) ServeSubtitle(w http.ResponseWriter, r *http.Request, idx int) {
s.Touch()
if idx < 0 || idx >= len(s.probe.SubtitleTracks) {
http.Error(w, "subtitle track not found", http.StatusNotFound)
return
}
path := filepath.Join(s.tmpDir, "subs", fmt.Sprintf("sub-%d.vtt", idx))
deadline := time.Now().Add(15 * time.Second)
for {
if fi, err := os.Stat(path); err == nil && fi.Size() > 0 {
break
}
if s.isClosed() || time.Now().After(deadline) {
http.Error(w, "subtitle not yet extracted", http.StatusServiceUnavailable)
return
}
time.Sleep(200 * time.Millisecond)
}
w.Header().Set("Content-Type", "text/vtt; charset=utf-8")
w.Header().Set("Cache-Control", "max-age=3600")
http.ServeFile(w, r, path)
}
// ---- ffmpeg argument builders ----
// buildHLSFFmpegArgs returns the argv for the main HLS encode. It always
// re-encodes video + audio so segment boundaries align with -force_key_frames.
// Pure -c copy can be added later for h264+aac+mp4 sources where the GOP is
// already short enough; keeping it simple for the MVP.
func buildHLSFFmpegArgs(cfg HLSSessionConfig, probe *StreamProbe, tmpDir string) []string {
hwHint := cfg.Transcode.HWAccel
args := []string{"-y", "-hide_banner", "-loglevel", "warning"}
switch hwHint {
case HWAccelNVENC:
args = append(args, "-hwaccel", "cuda")
case HWAccelQSV:
args = append(args, "-hwaccel", "qsv")
case HWAccelVAAPI:
args = append(args, "-hwaccel", "vaapi", "-hwaccel_output_format", "vaapi")
case HWAccelNone, HWAccelVideoToolbox:
// No demuxer-side hint.
}
args = append(args, "-i", cfg.SourcePath)
// Map video + selected audio. Always use first video stream.
args = append(args, "-map", "0:v:0")
audioIdx := cfg.AudioIndex
if audioIdx < 0 {
audioIdx = 0
for i, a := range probe.AudioTracks {
if a.Default {
audioIdx = i
break
}
}
}
args = append(args, "-map", fmt.Sprintf("0:a:%d?", audioIdx))
// Video encode.
codec := hwHint.FFmpegVideoCodec("h264")
args = append(args, "-c:v", codec)
if codec == "libx264" {
preset := cfg.Transcode.Preset
if preset == "" {
preset = "veryfast"
}
args = append(args, "-preset", preset)
}
args = append(args, "-profile:v", "main", "-level:v", "4.0")
qcap := resolveQualityCap(cfg.Quality)
bitrate := qcap.VideoBitrate
if bitrate == "" {
bitrate = cfg.Transcode.VideoBitrate
}
if bitrate == "" {
bitrate = "5M"
}
args = append(args, "-b:v", bitrate, "-maxrate", bitrate, "-bufsize", bitrate)
// Force keyframe alignment with segment boundaries.
args = append(args, "-force_key_frames", fmt.Sprintf("expr:gte(t,n_forced*%d)", hlsSegmentDuration))
// Filter chain: optional scale, force 8-bit yuv420p, normalise color metadata.
maxH := qcap.MaxHeight
if maxH == 0 {
maxH = cfg.Transcode.MaxHeight
}
var filterChain string
if maxH > 0 && probe.Height > maxH {
filterChain = fmt.Sprintf(
"scale=-2:%d:force_original_aspect_ratio=decrease,format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv",
maxH,
)
} else {
filterChain = "format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv"
}
args = append(args, "-vf", filterChain)
// Audio: AAC stereo 48 kHz — broadest browser compatibility.
audioBitrate := cfg.Transcode.AudioBitrate
if audioBitrate == "" {
audioBitrate = "192k"
}
args = append(args,
"-c:a", "aac",
"-b:a", audioBitrate,
"-ar", "48000",
"-ac", "2",
)
// HLS muxer — fmp4 segments with pre-computed segment count.
videoDir := filepath.Join(tmpDir, "video")
args = append(args,
"-f", "hls",
"-hls_time", strconv.Itoa(hlsSegmentDuration),
"-hls_playlist_type", "vod",
"-hls_segment_type", "fmp4",
"-hls_list_size", "0",
"-hls_fmp4_init_filename", "init.mp4",
"-hls_segment_filename", filepath.Join(videoDir, "seg-%d.m4s"),
filepath.Join(videoDir, "ffmpeg.m3u8"),
)
return args
}
// extractSubtitles spawns short-lived ffmpeg jobs to convert each text-based
// subtitle track to WebVTT in parallel. Bitmap subs (PGS, DVB) are skipped —
// they would require burn-in into the video encode, which is out of scope.
func (s *HLSSession) extractSubtitles(ctx context.Context) {
subsDir := filepath.Join(s.tmpDir, "subs")
for i, sub := range s.probe.SubtitleTracks {
if !sub.IsTextSubtitle() {
continue
}
out := filepath.Join(subsDir, fmt.Sprintf("sub-%d.vtt", i))
args := []string{
"-y", "-hide_banner", "-loglevel", "warning",
"-i", s.cfg.SourcePath,
"-map", fmt.Sprintf("0:s:%d?", i),
"-c:s", "webvtt",
out,
}
// Run sequentially to avoid hammering the disk; subtitle extraction
// is fast enough that parallelism isn't worth the complexity.
cmd := exec.CommandContext(ctx, s.cfg.Transcode.FFmpegPath, args...)
if err := cmd.Run(); err != nil {
if ctx.Err() != nil {
return
}
log.Printf("[hls %s] subtitle %d (%s) extract failed: %v",
shortHLSID(s.cfg.SessionID), i, sub.Lang, err)
continue
}
}
}
// ---- Manifest rendering ----
// renderVideoPlaylist builds the VOD media playlist for the video stream.
// Segment count is derived from the source duration — the player learns the
// total timeline from the manifest before any segment is fetched.
func renderVideoPlaylist(durationSec float64, segCount int) string {
var b strings.Builder
b.WriteString("#EXTM3U\n")
b.WriteString("#EXT-X-VERSION:7\n")
b.WriteString("#EXT-X-PLAYLIST-TYPE:VOD\n")
b.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", hlsSegmentDuration+1))
b.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n")
b.WriteString(`#EXT-X-MAP:URI="init.mp4"` + "\n")
remaining := durationSec
for i := 0; i < segCount; i++ {
segDur := float64(hlsSegmentDuration)
if remaining < segDur {
segDur = remaining
}
b.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n", segDur))
b.WriteString(fmt.Sprintf("seg-%d.m4s\n", i))
remaining -= segDur
}
b.WriteString("#EXT-X-ENDLIST\n")
return b.String()
}
// renderMasterPlaylist builds the top-level master playlist with the single
// video variant + every text subtitle as an EXT-X-MEDIA group. Audio is muxed
// into the video segments for the MVP — separate audio renditions can come
// later (they require a second ffmpeg pipeline producing audio-only segments).
func renderMasterPlaylist(probe *StreamProbe, qualityLabel string) string {
var b strings.Builder
b.WriteString("#EXTM3U\n")
b.WriteString("#EXT-X-VERSION:7\n")
// Subtitle renditions.
hasSubs := false
for i, s := range probe.SubtitleTracks {
if !s.IsTextSubtitle() {
continue
}
hasSubs = true
lang := s.Lang
if lang == "" {
lang = "und"
}
name := s.Title
if name == "" {
name = strings.ToUpper(lang)
}
def := "NO"
if s.Forced || i == 0 {
def = "YES"
}
b.WriteString(fmt.Sprintf(
`#EXT-X-MEDIA:TYPE=SUBTITLES,GROUP-ID="subs",NAME=%q,LANGUAGE=%q,DEFAULT=%s,AUTOSELECT=YES,FORCED=%s,URI="subs/sub-%d.m3u8"`+"\n",
name, lang, def, ynBool(s.Forced), i,
))
}
// Video variant. Bandwidth + resolution are best-effort estimates from probe.
bw := bitrateForQuality(qualityLabel)
w, h := scaledDimensions(probe.Width, probe.Height, qualityHeight(qualityLabel))
codecs := `avc1.4D4028,mp4a.40.2`
streamInf := fmt.Sprintf("#EXT-X-STREAM-INF:BANDWIDTH=%d,RESOLUTION=%dx%d,CODECS=%q", bw, w, h, codecs)
if hasSubs {
streamInf += `,SUBTITLES="subs"`
}
b.WriteString(streamInf + "\n")
b.WriteString("video/index.m3u8\n")
return b.String()
}
func ynBool(b bool) string {
if b {
return "YES"
}
return "NO"
}
// bitrateForQuality returns a synthetic bandwidth attribute for the master
// playlist's STREAM-INF — only used by ABR logic, which we don't run yet.
func bitrateForQuality(q string) int {
switch q {
case "2160p":
return 25_000_000
case "1080p":
return 6_000_000
case "720p":
return 3_500_000
case "480p":
return 1_500_000
}
return 6_000_000
}
func qualityHeight(q string) int {
switch q {
case "2160p":
return 2160
case "1080p":
return 1080
case "720p":
return 720
case "480p":
return 480
}
return 0
}
// scaledDimensions returns (width, height) after applying a height cap that
// preserves the source aspect ratio. capH=0 returns the original dims.
func scaledDimensions(srcW, srcH, capH int) (int, int) {
if srcW <= 0 || srcH <= 0 {
return 1920, 1080
}
if capH == 0 || srcH <= capH {
return srcW, srcH
}
w := srcW * capH / srcH
if w%2 != 0 {
w++
}
return w, capH
}
// ---- Logger plumbing ----
// hlsStderrCapture forwards ffmpeg stderr lines to the daemon log prefixed by
// the session ID, so failures are visible without spelunking tmpdirs.
type hlsStderrCapture struct {
owner *HLSSession
buf strings.Builder
}
func (c *hlsStderrCapture) Write(p []byte) (int, error) {
c.buf.Write(p)
for {
line, rest, ok := strings.Cut(c.buf.String(), "\n")
if !ok {
break
}
c.buf.Reset()
c.buf.WriteString(rest)
if line = strings.TrimSpace(line); line != "" {
log.Printf("[hls %s] ffmpeg: %s", shortHLSID(c.owner.cfg.SessionID), line)
}
}
return len(p), nil
}

View file

@ -15,6 +15,7 @@ type StreamProbe struct {
// VideoCodec lowercased — e.g. "h264", "hevc", "av1", "vp9", "mpeg4".
VideoCodec string
// AudioCodec lowercased — e.g. "aac", "ac3", "dts", "eac3", "opus".
// Reflects the default/first audio track for legacy single-track callers.
AudioCodec string
// Width / Height of the primary video stream.
Width int
@ -27,6 +28,43 @@ type StreamProbe struct {
DurationSec float64
// Container is the file extension lowercased (".mp4", ".mkv", ".avi").
Container string
// AudioTracks lists every audio stream in source order. Index in this
// slice == ffmpeg `-map 0:a:N` index (where N starts at 0).
AudioTracks []ProbeAudioTrack
// SubtitleTracks lists every subtitle stream in source order. Index in
// this slice == ffmpeg `-map 0:s:N` index.
SubtitleTracks []ProbeSubtitleTrack
}
// ProbeAudioTrack is a slimmed AudioTrack view tied to ffmpeg stream index.
type ProbeAudioTrack struct {
Index int // 0-based audio stream index (ffmpeg -map 0:a:Index)
Lang string // ISO 639-1
Codec string // lowercased
Channels int
Title string
Default bool
}
// ProbeSubtitleTrack is a slimmed SubtitleTrack view tied to ffmpeg stream index.
// Codec discriminates text (srt/ass/webvtt → extract to WebVTT) vs bitmap
// (pgs/dvbsub → require burn-in).
type ProbeSubtitleTrack struct {
Index int // 0-based subtitle stream index (ffmpeg -map 0:s:Index)
Lang string // ISO 639-1
Codec string // lowercased — "subrip", "ass", "webvtt", "hdmv_pgs_subtitle", ...
Title string
Forced bool
}
// IsTextSubtitle reports whether a subtitle codec can be extracted to WebVTT
// without re-rendering. Bitmap subs (PGS, DVB) need burn-in.
func (s ProbeSubtitleTrack) IsTextSubtitle() bool {
switch s.Codec {
case "subrip", "srt", "ass", "ssa", "webvtt", "mov_text":
return true
}
return false
}
// TranscodeAction tells the streaming pipeline how to feed the file to
@ -74,6 +112,29 @@ func ProbeFile(ctx context.Context, ffprobePath, filePath string) (*StreamProbe,
}
}
probe.AudioCodec = strings.ToLower(picked.Codec)
probe.AudioTracks = make([]ProbeAudioTrack, 0, len(mi.Audio))
for i, a := range mi.Audio {
probe.AudioTracks = append(probe.AudioTracks, ProbeAudioTrack{
Index: i,
Lang: a.Lang,
Codec: strings.ToLower(a.Codec),
Channels: a.Channels,
Title: a.Title,
Default: a.Default,
})
}
}
if len(mi.Subtitles) > 0 {
probe.SubtitleTracks = make([]ProbeSubtitleTrack, 0, len(mi.Subtitles))
for i, s := range mi.Subtitles {
probe.SubtitleTracks = append(probe.SubtitleTracks, ProbeSubtitleTrack{
Index: i,
Lang: s.Lang,
Codec: strings.ToLower(s.Codec),
Title: s.Title,
Forced: s.Forced,
})
}
}
return probe, nil
}

View file

@ -52,6 +52,8 @@ type StreamServer struct {
upnpMapping *UPnPMapping
disableUPnP bool
hls *HLSSessionRegistry // HLS sessions served on /hls/<id>/...
lastActivity atomic.Int64
maxByteOffset atomic.Int64 // highest sequential read position (main playback connection)
totalFileSize atomic.Int64
@ -64,15 +66,20 @@ type StreamServer struct {
// NewStreamServer creates a stream server bound to the given port.
// Call Listen() to start accepting connections, then SetFile() to serve content.
func NewStreamServer(port int) *StreamServer {
return &StreamServer{port: port}
return &StreamServer{port: port, hls: NewHLSSessionRegistry()}
}
// HLS returns the HLS session registry for this server. Daemon code uses it
// to register a session when the backend asks for HLS playback.
func (ss *StreamServer) HLS() *HLSSessionRegistry { return ss.hls }
// Listen starts the HTTP server on the configured port. Call once at daemon startup.
func (ss *StreamServer) Listen(ctx context.Context) error {
mux := http.NewServeMux()
mux.HandleFunc("/stream", ss.handler)
mux.HandleFunc("/health", ss.healthHandler)
mux.HandleFunc("/playlist.m3u", ss.playlistHandler)
mux.HandleFunc("/hls/", ss.hlsHandler)
// SO_REUSEADDR allows immediate rebind if the port is in TIME_WAIT (e.g. after agent restart)
lc := net.ListenConfig{
@ -230,12 +237,146 @@ func (ss *StreamServer) IdleSince() time.Duration {
// Call only at daemon shutdown — NOT between file swaps.
func (ss *StreamServer) Shutdown(ctx context.Context) error {
ss.upnpMapping.Remove()
if ss.hls != nil {
ss.hls.CloseAll()
}
if ss.server != nil {
return ss.server.Shutdown(ctx)
}
return nil
}
// hlsBaseURLs returns the per-network HLS base URLs for a given session.
// The web client picks the first reachable one — same fallback strategy as
// the legacy /stream URLs.
func (ss *StreamServer) hlsBaseURLs(sessionID string) StreamURLs {
var out StreamURLs
if ss.urls.LAN != "" {
out.LAN = strings.Replace(ss.urls.LAN, "/stream", "/hls/"+sessionID, 1)
}
if ss.urls.Tailscale != "" {
out.Tailscale = strings.Replace(ss.urls.Tailscale, "/stream", "/hls/"+sessionID, 1)
}
if ss.urls.Public != "" {
out.Public = strings.Replace(ss.urls.Public, "/stream", "/hls/"+sessionID, 1)
}
return out
}
// HLSURLsJSON returns base URLs for an HLS session as a JSON string for the
// session response payload.
func (ss *StreamServer) HLSURLsJSON(sessionID string) string {
urls := ss.hlsBaseURLs(sessionID)
b, _ := json.Marshal(urls)
return string(b)
}
// hlsHandler routes /hls/<sessionID>/<resource> to the matching HLSSession.
//
// Recognised resources:
//
// master.m3u8 — top-level playlist
// video/index.m3u8 — video media playlist
// video/init.mp4 — fMP4 init segment
// video/seg-<n>.m4s — video segment
// subs/sub-<n>.m3u8 — per-subtitle media playlist (synthesised)
// subs/sub-<n>.vtt — WebVTT subtitle (extracted by ffmpeg)
func (ss *StreamServer) hlsHandler(w http.ResponseWriter, r *http.Request) {
ss.lastActivity.Store(time.Now().UnixNano())
// CORS for app.torrentclaw.com → 127.0.0.1/Tailscale daemon.
if origin := r.Header.Get("Origin"); origin != "" {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Range")
w.Header().Set("Access-Control-Expose-Headers", "Content-Length, Content-Range, Accept-Ranges")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
}
rest := strings.TrimPrefix(r.URL.Path, "/hls/")
parts := strings.SplitN(rest, "/", 2)
if len(parts) == 0 || parts[0] == "" {
http.Error(w, "missing session id", http.StatusNotFound)
return
}
sessionID := parts[0]
session := ss.hls.Get(sessionID)
if session == nil {
http.Error(w, "hls session not found", http.StatusNotFound)
return
}
if len(parts) == 1 {
http.Error(w, "missing resource", http.StatusNotFound)
return
}
resource := parts[1]
switch {
case resource == "master.m3u8":
session.ServeMaster(w, r)
case resource == "video/index.m3u8":
session.ServeVideoPlaylist(w, r)
case resource == "video/init.mp4":
session.ServeInit(w, r)
case strings.HasPrefix(resource, "video/seg-") && strings.HasSuffix(resource, ".m4s"):
idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "video/seg-"), ".m4s")
idx, err := strconv.Atoi(idxStr)
if err != nil {
http.Error(w, "bad segment index", http.StatusBadRequest)
return
}
session.ServeSegment(w, r, idx)
case strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".m3u8"):
idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".m3u8")
idx, err := strconv.Atoi(idxStr)
if err != nil {
http.Error(w, "bad subtitle index", http.StatusBadRequest)
return
}
ss.serveSubtitlePlaylist(w, r, session, idx)
case strings.HasPrefix(resource, "subs/sub-") && strings.HasSuffix(resource, ".vtt"):
idxStr := strings.TrimSuffix(strings.TrimPrefix(resource, "subs/sub-"), ".vtt")
idx, err := strconv.Atoi(idxStr)
if err != nil {
http.Error(w, "bad subtitle index", http.StatusBadRequest)
return
}
session.ServeSubtitle(w, r, idx)
default:
http.Error(w, "unknown hls resource", http.StatusNotFound)
}
}
// serveSubtitlePlaylist generates a single-VTT-segment HLS playlist on the
// fly so hls.js can consume it as a regular subtitle rendition. The VTT file
// itself is extracted asynchronously by HLSSession.extractSubtitles.
func (ss *StreamServer) serveSubtitlePlaylist(w http.ResponseWriter, r *http.Request, session *HLSSession, idx int) {
if idx < 0 || idx >= len(session.probe.SubtitleTracks) {
http.Error(w, "subtitle out of range", http.StatusNotFound)
return
}
dur := session.durationSec
if dur < 1 {
dur = 1
}
body := strings.Builder{}
body.WriteString("#EXTM3U\n")
body.WriteString("#EXT-X-VERSION:3\n")
body.WriteString("#EXT-X-PLAYLIST-TYPE:VOD\n")
body.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(dur)+1))
body.WriteString("#EXT-X-MEDIA-SEQUENCE:0\n")
body.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n", dur))
body.WriteString(fmt.Sprintf("sub-%d.vtt\n", idx))
body.WriteString("#EXT-X-ENDLIST\n")
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
w.Header().Set("Cache-Control", "no-cache")
_, _ = io.WriteString(w, body.String())
}
// healthHandler responde con el estado del servidor en JSON.
// Útil para diagnosticar conectividad desde redes remotas o Tailscale:
//