diff --git a/internal/agent/signal_client.go b/internal/agent/signal_client.go index b5424f6..27fe2e1 100644 --- a/internal/agent/signal_client.go +++ b/internal/agent/signal_client.go @@ -169,7 +169,11 @@ func (s *SignalEventStream) read() { if eventName == "" || eventName == "signal" { var msg SignalMessage if err := json.Unmarshal(dataBuf.Bytes(), &msg); err == nil { - s.events <- msg + select { + case s.events <- msg: + case <-s.resp.Request.Context().Done(): + return + } } } dataBuf.Reset() diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index 9bd0d9b..17ad49b 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -455,11 +455,16 @@ func runDaemonStart() error { // HLS registry and serve over HTTP; default ("" or "webrtc") runs // the legacy DataChannel pipeline. if strings.EqualFold(sess.Transport, "hls") { + if webrtcRegistry.has(sess.SessionID) { + return + } tcRuntime := buildTranscodeRuntime(ctx, cfg) if tcRuntime.FFmpegPath == "" || tcRuntime.FFprobePath == "" { log.Printf("[hls %s] rejected: ffmpeg/ffprobe unavailable", agent.ShortID(sess.SessionID)) return } + hlsCtx, hlsCancel := context.WithCancel(ctx) + webrtcRegistry.add(sess.SessionID, hlsCancel) hlsCfg := engine.HLSSessionConfig{ SessionID: sess.SessionID, SourcePath: filePath, @@ -468,8 +473,10 @@ func runDaemonStart() error { AudioIndex: sess.AudioIndex, Transcode: tcRuntime, } - hsess, err := engine.StartHLSSession(ctx, hlsCfg) + hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg) if err != nil { + webrtcRegistry.remove(sess.SessionID) + hlsCancel() log.Printf("[hls %s] start failed: %v", agent.ShortID(sess.SessionID), err) return } diff --git a/internal/engine/webrtc_stream.go b/internal/engine/webrtc_stream.go index e3c6378..63fe0fe 100644 --- a/internal/engine/webrtc_stream.go +++ b/internal/engine/webrtc_stream.go @@ -663,6 +663,12 @@ func (p *dataChannelPump) serveRange(streamID uint32, req wire.RangeReqPayload) ctx, cancel := context.WithCancel(context.Background()) p.activeMu.Lock() + if p.active == nil { + p.activeMu.Unlock() + cancel() + p.sendRangeEnd(streamID, 3) + return + } p.active[streamID] = cancel p.activeMu.Unlock() defer func() { diff --git a/internal/streaming/ffmpeg_args.go b/internal/streaming/ffmpeg_args.go deleted file mode 100644 index 1869864..0000000 --- a/internal/streaming/ffmpeg_args.go +++ /dev/null @@ -1,173 +0,0 @@ -package streaming - -import ( - "fmt" - "strconv" - "time" -) - -// StreamOptions controls a single transcode/remux invocation. -type StreamOptions struct { - // Quality caps the output resolution and bitrate when transcoding. - // Direct play ignores it (the source bitrate wins). One of: - // "2160p", "1080p", "720p", "480p", "" (= "1080p"). - Quality string - - // StartOffset seeks the input N seconds in before transcoding. Useful - // for resume / scrubbing. Zero means start from the beginning. - StartOffset time.Duration - - // HW selects the hardware encoder. "" (or "none") means software libx264. - HW HWAccel - - // AudioTrackIndex selects which audio track to keep (0-based, before - // the video stream is excluded). Zero is the default track. - AudioTrackIndex int -} - -// QualityProfile maps a Quality label to encoder constraints. -type QualityProfile struct { - Label string // "1080p" - MaxHeight int // 1080 - VideoBitrate int // bits/s for libx264 -b:v - AudioBitrate int // bits/s for AAC -} - -// qualityProfiles is the full ladder. We default to 1080p when unset. -var qualityProfiles = map[string]QualityProfile{ - "2160p": {Label: "2160p", MaxHeight: 2160, VideoBitrate: 25_000_000, AudioBitrate: 192_000}, - "1080p": {Label: "1080p", MaxHeight: 1080, VideoBitrate: 6_000_000, AudioBitrate: 160_000}, - "720p": {Label: "720p", MaxHeight: 720, VideoBitrate: 3_500_000, AudioBitrate: 128_000}, - "480p": {Label: "480p", MaxHeight: 480, VideoBitrate: 1_500_000, AudioBitrate: 96_000}, -} - -// ResolveQuality returns the QualityProfile for a label, falling back to -// 1080p when the label is empty / unknown. -func ResolveQuality(label string) QualityProfile { - if p, ok := qualityProfiles[label]; ok { - return p - } - return qualityProfiles["1080p"] -} - -// fragmentedMP4Movflags are the magic flags MSE needs to consume an -// ffmpeg pipe as it's produced — avoids the moov atom being written at the -// end of the file (which would force buffering the whole stream). -const fragmentedMP4Movflags = "frag_keyframe+empty_moov+default_base_moof" - -// BuildFFmpegArgs returns the argv (without the binary itself) for -// ffmpeg given the input file, stream options, and a compatibility report. -// -// Two recipes: -// -// - Direct play: -c copy on every selected stream + remux to fMP4. -// - Transcode: re-encode video (libx264 / hwaccel) + audio (aac). -// -// The result writes fMP4 fragments to stdout (`pipe:1`) so the HTTP -// handler can stream them directly to the browser without touching disk. -func BuildFFmpegArgs(inputPath string, report CompatibilityReport, opts StreamOptions) []string { - args := []string{ - "-hide_banner", - "-loglevel", "warning", - "-nostdin", - } - - if opts.HW.HasDecoder() { - args = append(args, opts.HW.DecoderArgs()...) - } - - if opts.StartOffset > 0 { - args = append(args, "-ss", formatDuration(opts.StartOffset)) - } - - args = append(args, "-i", inputPath) - - // Map first video + selected audio. Drop subtitles (browser handles - // them out-of-band; baking them in is a Phase 4.x decision). - args = append(args, - "-map", "0:v:0", - "-map", fmt.Sprintf("0:a:%d?", opts.AudioTrackIndex), - ) - - if report.DirectPlay { - // Cheap path: copy streams, just remux container. - args = append(args, "-c", "copy") - } else { - // Transcode path: pick encoder per HW. - profile := ResolveQuality(opts.Quality) - args = append(args, transcodeArgs(profile, opts.HW)...) - } - - args = append(args, - "-movflags", fragmentedMP4Movflags, - "-f", "mp4", - "pipe:1", - ) - return args -} - -// transcodeArgs returns the encoder + bitrate flags. Keeps the function -// flat so the BuildFFmpegArgs reader can scan the recipe top to bottom. -func transcodeArgs(profile QualityProfile, hw HWAccel) []string { - args := []string{} - - // Video encoder. - args = append(args, "-c:v", hw.VideoEncoder()) - - // Scale filter caps the long edge to MaxHeight, preserving aspect. - // `force_original_aspect_ratio=decrease` keeps it ≤ MaxHeight when - // the source is taller and leaves smaller sources untouched. The - // `force_divisible_by=2` keeps libx264 happy. - scale := fmt.Sprintf( - "scale=-2:%d:force_original_aspect_ratio=decrease:force_divisible_by=2", - profile.MaxHeight, - ) - if hw == HWAccelVAAPI { - // VAAPI needs frames in the GPU surface, scaling is done with - // scale_vaapi. We still upload via format=nv12. - scale = fmt.Sprintf("format=nv12,hwupload,scale_vaapi=-2:%d", profile.MaxHeight) - } - args = append(args, "-vf", scale) - - // Bitrate ceiling (variable bitrate with 2× burst). - args = append(args, - "-b:v", strconv.Itoa(profile.VideoBitrate), - "-maxrate", strconv.Itoa(profile.VideoBitrate*2), - "-bufsize", strconv.Itoa(profile.VideoBitrate*4), - ) - - // SW-only: tune for low latency + don't waste cycles on the deepest - // preset when we're feeding live playback. - if hw == HWAccelNone || hw == HWAccelUnset { - args = append(args, - "-preset", "veryfast", - "-tune", "zerolatency", - ) - } - - // Force yuv420p so MSE reliably plays the result (some libx264 - // configurations otherwise emit yuv422p for SD content). - args = append(args, "-pix_fmt", "yuv420p") - - // Audio: re-encode to AAC stereo. Mono / 5.1 sources are downmixed. - args = append(args, - "-c:a", "aac", - "-b:a", strconv.Itoa(profile.AudioBitrate), - "-ac", "2", - ) - - return args -} - -// formatDuration prints a Go Duration as ffmpeg's `-ss HH:MM:SS.mmm`. -func formatDuration(d time.Duration) string { - if d < 0 { - d = 0 - } - h := int(d / time.Hour) - d -= time.Duration(h) * time.Hour - m := int(d / time.Minute) - d -= time.Duration(m) * time.Minute - s := float64(d) / float64(time.Second) - return fmt.Sprintf("%02d:%02d:%06.3f", h, m, s) -} diff --git a/internal/streaming/hwaccel.go b/internal/streaming/hwaccel.go deleted file mode 100644 index 1c8dff6..0000000 --- a/internal/streaming/hwaccel.go +++ /dev/null @@ -1,144 +0,0 @@ -package streaming - -import ( - "context" - "os/exec" - "runtime" - "strings" - "sync" - "time" -) - -// HWAccel identifies which hardware encoder family the host can use. -type HWAccel string - -const ( - HWAccelUnset HWAccel = "" - HWAccelNone HWAccel = "none" // explicit software libx264 - HWAccelNVENC HWAccel = "nvenc" // NVIDIA GPUs - HWAccelQSV HWAccel = "qsv" // Intel Quick Sync (Linux/Win) - HWAccelVAAPI HWAccel = "vaapi" // Intel/AMD GPUs on Linux - HWAccelVideoToolbox HWAccel = "videotoolbox" // macOS native -) - -// VideoEncoder returns the ffmpeg `-c:v` argument for this accelerator. -func (h HWAccel) VideoEncoder() string { - switch h { - case HWAccelNVENC: - return "h264_nvenc" - case HWAccelQSV: - return "h264_qsv" - case HWAccelVAAPI: - return "h264_vaapi" - case HWAccelVideoToolbox: - return "h264_videotoolbox" - default: - return "libx264" - } -} - -// HasDecoder reports whether the accelerator also supports HW decode. -// We always feed encoders software-decoded frames except for VAAPI where -// the GPU pipeline expects HW-decoded surfaces end-to-end. -func (h HWAccel) HasDecoder() bool { - return h == HWAccelVAAPI -} - -// DecoderArgs returns the ffmpeg flags that enable HW decode for this -// accelerator. Only meaningful when HasDecoder() == true. -func (h HWAccel) DecoderArgs() []string { - if h == HWAccelVAAPI { - return []string{ - "-hwaccel", "vaapi", - "-hwaccel_device", "/dev/dri/renderD128", - "-hwaccel_output_format", "vaapi", - } - } - return nil -} - -// detectedHWAccel caches the result of DetectHWAccel so we don't fork -// ffmpeg on every transcode request. -var ( - detectedHWAccelOnce sync.Once - detectedHWAccel HWAccel -) - -// DetectHWAccel asks ffmpeg what encoders it supports and returns the -// best available. Result is cached for the process lifetime — callers -// should construct the Transcoder once and reuse it. -// -// Detection order (best perf → fallback): -// 1. NVENC (NVIDIA GPU + CUDA driver) -// 2. QSV (Intel iGPU/dGPU + libmfx/intel-media-driver) -// 3. VAAPI (Linux Intel/AMD via /dev/dri) -// 4. VideoToolbox (macOS only) -// 5. None (fallback to libx264 software) -func DetectHWAccel(ctx context.Context, ffmpegPath string) HWAccel { - detectedHWAccelOnce.Do(func() { - detectedHWAccel = doDetectHWAccel(ctx, ffmpegPath) - }) - return detectedHWAccel -} - -// ResetHWAccelCache forces the next DetectHWAccel call to re-probe. -// Intended for tests. -func ResetHWAccelCache() { - detectedHWAccelOnce = sync.Once{} - detectedHWAccel = HWAccelUnset -} - -func doDetectHWAccel(ctx context.Context, ffmpegPath string) HWAccel { - if ctx == nil { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second) - defer cancel() - } - - // macOS videotoolbox is reliable enough that we don't bother probing - // — every Apple Silicon Mac has it; Intel Macs since 10.13 do too. - if runtime.GOOS == "darwin" { - if encoderAvailable(ctx, ffmpegPath, "h264_videotoolbox") { - return HWAccelVideoToolbox - } - } - - for _, candidate := range []struct { - Name HWAccel - Encoder string - }{ - {HWAccelNVENC, "h264_nvenc"}, - {HWAccelQSV, "h264_qsv"}, - {HWAccelVAAPI, "h264_vaapi"}, - } { - if encoderAvailable(ctx, ffmpegPath, candidate.Encoder) { - return candidate.Name - } - } - - return HWAccelNone -} - -// encoderAvailable returns true when `ffmpeg -hide_banner -encoders` -// lists the named encoder. -// -// Note: this only verifies ffmpeg was COMPILED with the encoder. It does -// NOT guarantee the host hardware works at runtime — some users will see -// libx264 fall back at the first failed encode. That's OK; the worst -// case is a one-time slow request. -func encoderAvailable(ctx context.Context, ffmpegPath, encoder string) bool { - cmd := exec.CommandContext(ctx, ffmpegPath, "-hide_banner", "-encoders") - out, err := cmd.Output() - if err != nil { - return false - } - for _, line := range strings.Split(string(out), "\n") { - // `-encoders` output looks like: - // V..... libx264 libx264 H.264 / AVC / MPEG-4 AVC - fields := strings.Fields(line) - if len(fields) >= 2 && fields[1] == encoder { - return true - } - } - return false -} diff --git a/internal/streaming/integration_test.go b/internal/streaming/integration_test.go deleted file mode 100644 index 2cd0b21..0000000 --- a/internal/streaming/integration_test.go +++ /dev/null @@ -1,204 +0,0 @@ -package streaming - -import ( - "bytes" - "context" - "encoding/json" - "os" - "os/exec" - "path/filepath" - "testing" - "time" - - "github.com/torrentclaw/unarr/internal/library/mediainfo" -) - -// These tests need a real ffmpeg + ffprobe on PATH. They're skipped on -// CI runners that lack them — the unit tests already pin the recipes -// deterministically. Run locally when changing the transcoder pipeline. - -func resolveBins(t *testing.T) (string, string) { - t.Helper() - ffmpeg, err := exec.LookPath("ffmpeg") - if err != nil { - t.Skip("ffmpeg not on PATH — skipping integration test") - } - ffprobe, err := exec.LookPath("ffprobe") - if err != nil { - t.Skip("ffprobe not on PATH — skipping integration test") - } - return ffmpeg, ffprobe -} - -// generateTestVideo synthesises a short MP4 for the transcoder to chew on. -// vcodec/acodec let us exercise both direct-play and transcode branches. -func generateTestVideo(t *testing.T, ffmpeg, dir, vcodec, acodec, container string) string { - t.Helper() - out := filepath.Join(dir, "sample."+container) - args := []string{ - "-hide_banner", "-loglevel", "error", "-y", - "-f", "lavfi", "-i", "testsrc=duration=2:size=320x240:rate=15", - "-f", "lavfi", "-i", "sine=frequency=440:duration=2", - "-c:v", vcodec, - } - // libx265 needs at least one keyframe; 2s @ 15fps is fine. - if vcodec == "libx265" { - args = append(args, "-x265-params", "log-level=error") - } - args = append(args, "-c:a", acodec, "-shortest", out) - cmd := exec.Command(ffmpeg, args...) - if buf, err := cmd.CombinedOutput(); err != nil { - t.Skipf("could not synthesise test video (%s/%s/%s): %v\n%s", - vcodec, acodec, container, err, buf) - } - return out -} - -// probeOutput uses ffprobe to inspect the (synthesised) transcoder output -// and returns video + audio codec names. -func probeOutput(t *testing.T, ffprobe, path string) (string, string) { - t.Helper() - cmd := exec.Command(ffprobe, - "-hide_banner", "-loglevel", "error", - "-print_format", "json", "-show_streams", path) - buf, err := cmd.Output() - if err != nil { - t.Fatalf("ffprobe %s: %v", path, err) - } - var data struct { - Streams []struct { - CodecType string `json:"codec_type"` - CodecName string `json:"codec_name"` - } `json:"streams"` - } - if err := json.Unmarshal(buf, &data); err != nil { - t.Fatalf("ffprobe parse: %v", err) - } - var v, a string - for _, s := range data.Streams { - switch s.CodecType { - case "video": - v = s.CodecName - case "audio": - a = s.CodecName - } - } - return v, a -} - -// TestTranscoder_DirectPlayProducesH264 — H.264 + AAC source → direct play -// → output keeps both codecs, just remuxed to fMP4. -func TestTranscoder_DirectPlayProducesH264(t *testing.T) { - ffmpeg, ffprobe := resolveBins(t) - dir := t.TempDir() - src := generateTestVideo(t, ffmpeg, dir, "libx264", "aac", "mp4") - - tr, err := NewTranscoder(ffmpeg, ffprobe) - if err != nil { - t.Fatalf("NewTranscoder: %v", err) - } - - report, _, err := tr.Analyze(context.Background(), src) - if err != nil { - t.Fatalf("Analyze: %v", err) - } - if !report.DirectPlay { - t.Fatalf("h264+aac sample should be direct-playable, got %+v", report) - } - - out := filepath.Join(dir, "out.mp4") - f, err := os.Create(out) - if err != nil { - t.Fatalf("create out: %v", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - if err := tr.Stream(ctx, src, f, StreamOptions{HW: HWAccelNone}); err != nil { - f.Close() - t.Fatalf("Stream: %v", err) - } - f.Close() - - v, a := probeOutput(t, ffprobe, out) - if v != "h264" { - t.Fatalf("direct-play output video codec = %q want h264", v) - } - if a != "aac" { - t.Fatalf("direct-play output audio codec = %q want aac", a) - } -} - -// TestTranscoder_TranscodeHEVCToH264 — HEVC source → transcode → -// output is H.264 + AAC ready for the browser. -func TestTranscoder_TranscodeHEVCToH264(t *testing.T) { - ffmpeg, ffprobe := resolveBins(t) - dir := t.TempDir() - - // Verify libx265 available; some Alpine builds disable it. - if !encoderAvailable(context.Background(), ffmpeg, "libx265") { - t.Skip("ffmpeg lacks libx265 — skipping HEVC transcode integration") - } - src := generateTestVideo(t, ffmpeg, dir, "libx265", "ac3", "mkv") - - tr, err := NewTranscoder(ffmpeg, ffprobe) - if err != nil { - t.Fatalf("NewTranscoder: %v", err) - } - report, _, err := tr.Analyze(context.Background(), src) - if err != nil { - t.Fatalf("Analyze: %v", err) - } - if report.DirectPlay { - t.Fatalf("hevc+ac3 sample must NOT be direct-playable") - } - - var buf bytes.Buffer - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - if err := tr.Stream(ctx, src, &buf, StreamOptions{Quality: "480p", HW: HWAccelNone}); err != nil { - t.Fatalf("Stream: %v", err) - } - - out := filepath.Join(dir, "transcoded.mp4") - if err := os.WriteFile(out, buf.Bytes(), 0o644); err != nil { - t.Fatalf("persist transcode: %v", err) - } - - v, a := probeOutput(t, ffprobe, out) - if v != "h264" { - t.Fatalf("transcoded video codec = %q want h264", v) - } - if a != "aac" { - t.Fatalf("transcoded audio codec = %q want aac", a) - } -} - -// TestTranscoder_AnalyzeReportsRealMediaInfo validates that the Transcoder -// returns a usable MediaInfo on top of the report — the API handler will -// surface duration / resolution to the player UI. -func TestTranscoder_AnalyzeReportsRealMediaInfo(t *testing.T) { - ffmpeg, ffprobe := resolveBins(t) - dir := t.TempDir() - src := generateTestVideo(t, ffmpeg, dir, "libx264", "aac", "mp4") - - tr, err := NewTranscoder(ffmpeg, ffprobe) - if err != nil { - t.Fatalf("NewTranscoder: %v", err) - } - _, info, err := tr.Analyze(context.Background(), src) - if err != nil { - t.Fatalf("Analyze: %v", err) - } - if info == nil || info.Video == nil { - t.Fatalf("missing parsed mediainfo: %+v", info) - } - if info.Video.Width != 320 || info.Video.Height != 240 { - t.Errorf("dimensions = %dx%d want 320x240", info.Video.Width, info.Video.Height) - } - if info.Video.Duration < 1.5 || info.Video.Duration > 2.5 { - t.Errorf("duration ~2s expected, got %v", info.Video.Duration) - } - // Ensure the package types line up with mediainfo's exported model. - _ = mediainfo.MediaInfo{} -} diff --git a/internal/streaming/stream.go b/internal/streaming/stream.go deleted file mode 100644 index 67d956e..0000000 --- a/internal/streaming/stream.go +++ /dev/null @@ -1,131 +0,0 @@ -package streaming - -import ( - "context" - "errors" - "fmt" - "io" - "os/exec" - "sync" - - "github.com/torrentclaw/unarr/internal/library/mediainfo" -) - -// Transcoder owns the resolved ffmpeg / ffprobe binaries plus the -// detected hardware accelerator. One per process; safe for concurrent use. -type Transcoder struct { - ffmpegPath string - ffprobePath string - - hwOnce sync.Once - hw HWAccel -} - -// NewTranscoder constructs a Transcoder from explicit binary paths. -// Both must be non-empty; resolve them upstream via -// mediainfo.ResolveFFmpeg / ResolveFFprobe. -func NewTranscoder(ffmpegPath, ffprobePath string) (*Transcoder, error) { - if ffmpegPath == "" { - return nil, errors.New("streaming: ffmpeg path is required") - } - if ffprobePath == "" { - return nil, errors.New("streaming: ffprobe path is required") - } - return &Transcoder{ - ffmpegPath: ffmpegPath, - ffprobePath: ffprobePath, - }, nil -} - -// HWAccel returns the cached / detected hardware accelerator. First call -// runs `ffmpeg -encoders`; subsequent calls reuse the result. -func (t *Transcoder) HWAccel(ctx context.Context) HWAccel { - t.hwOnce.Do(func() { - t.hw = DetectHWAccel(ctx, t.ffmpegPath) - }) - return t.hw -} - -// Analyze runs ffprobe on the input file and returns a compatibility -// report so the caller can decide direct play vs transcode. -func (t *Transcoder) Analyze(ctx context.Context, inputPath string) (CompatibilityReport, *mediainfo.MediaInfo, error) { - info, err := mediainfo.ExtractMediaInfo(ctx, t.ffprobePath, inputPath) - if err != nil { - return CompatibilityReport{}, nil, fmt.Errorf("streaming: ffprobe failed: %w", err) - } - return AnalyzeCompatibility(info), info, nil -} - -// Stream runs ffmpeg with the right recipe for the given file + options -// and writes fragmented MP4 to dst. Blocks until ffmpeg exits or the -// context is cancelled. If ffmpeg's stderr captures something useful, it's -// included in the returned error. -func (t *Transcoder) Stream(ctx context.Context, inputPath string, dst io.Writer, opts StreamOptions) error { - report, _, err := t.Analyze(ctx, inputPath) - if err != nil { - return err - } - return t.StreamWithReport(ctx, inputPath, dst, opts, report) -} - -// StreamWithReport is the lower-level entry point — accepts a -// pre-computed CompatibilityReport so the API handler can inspect the -// decision before kicking off a transcode (useful for billing / -// telemetry / quality-fallback policies). -func (t *Transcoder) StreamWithReport( - ctx context.Context, - inputPath string, - dst io.Writer, - opts StreamOptions, - report CompatibilityReport, -) error { - if opts.HW == HWAccelUnset { - opts.HW = t.HWAccel(ctx) - } - - args := BuildFFmpegArgs(inputPath, report, opts) - cmd := exec.CommandContext(ctx, t.ffmpegPath, args...) - cmd.Stdout = dst - - stderrBuf := newCappedBuffer(8 * 1024) // last 8 KiB is plenty for diagnosing - cmd.Stderr = stderrBuf - - if err := cmd.Run(); err != nil { - // Cancellation looks like an exec error too; surface the cause - // so callers don't blame ffmpeg for client disconnects. - if ctxErr := ctx.Err(); ctxErr != nil { - return ctxErr - } - return fmt.Errorf("streaming: ffmpeg exited: %w (stderr tail: %s)", err, stderrBuf.String()) - } - return nil -} - -// cappedBuffer is an io.Writer that keeps only the last `cap` bytes -// written. Used to capture ffmpeg's tail stderr for error reporting -// without unbounded memory growth on long transcodes. -type cappedBuffer struct { - buf []byte - cap int -} - -func newCappedBuffer(cap int) *cappedBuffer { - return &cappedBuffer{cap: cap} -} - -func (c *cappedBuffer) Write(p []byte) (int, error) { - if len(p) >= c.cap { - c.buf = append(c.buf[:0], p[len(p)-c.cap:]...) - return len(p), nil - } - if len(c.buf)+len(p) > c.cap { - drop := len(c.buf) + len(p) - c.cap - c.buf = c.buf[drop:] - } - c.buf = append(c.buf, p...) - return len(p), nil -} - -func (c *cappedBuffer) String() string { - return string(c.buf) -} diff --git a/internal/streaming/transcoder.go b/internal/streaming/transcoder.go deleted file mode 100644 index 8daa786..0000000 --- a/internal/streaming/transcoder.go +++ /dev/null @@ -1,135 +0,0 @@ -// Package streaming wraps ffmpeg for the WebRTC-streaming pipeline. -// -// The browser-side reproductor lives on torrentclaw.com and consumes -// fragmented MP4 (fMP4) chunks via Media Source Extensions (MSE). MSE is -// strict about codecs: H.264 / VP8 / VP9 / AV1 video + AAC / Opus / MP3 -// audio + MP4 / WebM container. Anything else (HEVC/x265, MKV, EAC3, FLAC, -// 10-bit H.264, …) needs transcoding. -// -// The transcoder picks one of two paths per request: -// -// - Direct play — input is already MSE-compatible. Container is remuxed -// to fragmented MP4 with the audio + video streams copied. Cheap: -// ~no CPU, ~no memory. -// -// - Transcode — input is incompatible. Re-encode video to H.264 -// (libx264 sw / h264_nvenc / h264_qsv / h264_vaapi / h264_videotoolbox -// depending on what the host supports) and audio to AAC. Expensive: -// 1× core for 1080p sw, ~free with HW accel. -package streaming - -import ( - "github.com/torrentclaw/unarr/internal/library/mediainfo" -) - -// browserVideoCodecs lists video codecs the player can render natively -// without transcoding. Names match ffprobe's `codec_name`. -var browserVideoCodecs = map[string]struct{}{ - "h264": {}, - "vp8": {}, - "vp9": {}, - "av1": {}, -} - -// browserAudioCodecs lists audio codecs the player accepts natively. -var browserAudioCodecs = map[string]struct{}{ - "aac": {}, - "opus": {}, - "mp3": {}, -} - -// browserPixelFormats lists pixel formats MSE H.264 reliably decodes -// in-browser. 10-bit / 12-bit profiles are rejected because Safari + most -// Chromium versions software-decode them at 1-2 fps. -var browserPixelFormats = map[string]struct{}{ - "yuv420p": {}, - "yuvj420p": {}, -} - -// CompatibilityReport explains why a file is or isn't direct-playable. -// Returned by AnalyzeCompatibility so the caller can show actionable -// feedback (e.g. "transcoding video: HEVC → H.264"). -type CompatibilityReport struct { - DirectPlay bool - VideoCompat bool - AudioCompat bool - Container string // input container hint (best effort) - VideoCodec string - AudioCodec string - PixelFormat string - BitDepth int - Reasons []string // human-readable list of mismatches; empty when DirectPlay -} - -// AnalyzeCompatibility inspects a parsed mediainfo and decides whether the -// stream needs transcoding. It does NOT touch disk or run ffmpeg. -// -// Direct play requires ALL of: -// - Video codec ∈ {h264, vp8, vp9, av1} -// - Pixel format ∈ {yuv420p, yuvj420p} -// - Bit depth ≤ 8 -// - Audio codec ∈ {aac, opus, mp3} -// -// First audio track wins for the compatibility decision; later tracks are -// repacked along with it. Container is intentionally ignored — even MKV -// carrying H.264 + AAC can be remuxed to fMP4 cheaply, so it's not worth -// failing direct-play on container alone. -func AnalyzeCompatibility(info *mediainfo.MediaInfo) CompatibilityReport { - r := CompatibilityReport{} - if info == nil || info.Video == nil { - r.Reasons = append(r.Reasons, "missing video stream metadata") - return r - } - - r.VideoCodec = info.Video.Codec - r.PixelFormat = pixelFormatFor(info.Video) - r.BitDepth = info.Video.BitDepth - - _, vcOK := browserVideoCodecs[r.VideoCodec] - r.VideoCompat = vcOK - if !vcOK { - r.Reasons = append(r.Reasons, - "video codec "+r.VideoCodec+" not playable in browser") - } - if r.BitDepth > 8 { - r.VideoCompat = false - r.Reasons = append(r.Reasons, "video bit depth >8 (HDR / 10-bit)") - } - if r.PixelFormat != "" { - if _, ok := browserPixelFormats[r.PixelFormat]; !ok { - r.VideoCompat = false - r.Reasons = append(r.Reasons, - "pixel format "+r.PixelFormat+" not playable in browser") - } - } - - if len(info.Audio) > 0 { - r.AudioCodec = info.Audio[0].Codec - _, acOK := browserAudioCodecs[r.AudioCodec] - r.AudioCompat = acOK - if !acOK { - r.Reasons = append(r.Reasons, - "audio codec "+r.AudioCodec+" not playable in browser") - } - } else { - // No audio track — direct play allowed for video-only streams. - r.AudioCompat = true - } - - r.DirectPlay = r.VideoCompat && r.AudioCompat - return r -} - -// pixelFormatFor returns a best-effort pixel format string for a VideoInfo. -// mediainfo doesn't carry pix_fmt explicitly today, so we infer from the -// HDR flag: HDR streams are 10-bit yuv420p10le (incompatible by definition) -// while everything else is assumed yuv420p. -// -// Once mediainfo grows a PixFmt field we replace this heuristic with the -// raw value. -func pixelFormatFor(v *mediainfo.VideoInfo) string { - if v.HDR != "" || v.BitDepth >= 10 { - return "yuv420p10le" - } - return "yuv420p" -} diff --git a/internal/streaming/transcoder_test.go b/internal/streaming/transcoder_test.go deleted file mode 100644 index 42d4979..0000000 --- a/internal/streaming/transcoder_test.go +++ /dev/null @@ -1,267 +0,0 @@ -package streaming - -import ( - "strings" - "testing" - "time" - - "github.com/torrentclaw/unarr/internal/library/mediainfo" -) - -// AnalyzeCompatibility — direct play happy paths. -func TestAnalyzeCompatibility_DirectPlayH264AAC(t *testing.T) { - info := &mediainfo.MediaInfo{ - Video: &mediainfo.VideoInfo{Codec: "h264", BitDepth: 8}, - Audio: []mediainfo.AudioTrack{{Codec: "aac", Channels: 2}}, - } - r := AnalyzeCompatibility(info) - if !r.DirectPlay { - t.Fatalf("h264+aac must be direct-playable, got %+v", r) - } - if len(r.Reasons) != 0 { - t.Fatalf("direct play should have no reasons, got %v", r.Reasons) - } -} - -func TestAnalyzeCompatibility_DirectPlayVideoOnly(t *testing.T) { - info := &mediainfo.MediaInfo{ - Video: &mediainfo.VideoInfo{Codec: "vp9", BitDepth: 8}, - } - r := AnalyzeCompatibility(info) - if !r.DirectPlay { - t.Fatalf("video-only vp9 must be direct-playable, got %+v", r) - } -} - -// AnalyzeCompatibility — transcode required. -func TestAnalyzeCompatibility_TranscodeHEVC(t *testing.T) { - info := &mediainfo.MediaInfo{ - Video: &mediainfo.VideoInfo{Codec: "hevc", BitDepth: 8}, - Audio: []mediainfo.AudioTrack{{Codec: "aac"}}, - } - r := AnalyzeCompatibility(info) - if r.DirectPlay { - t.Fatalf("HEVC must NOT be direct-playable") - } - if !strings.Contains(strings.Join(r.Reasons, ";"), "hevc") { - t.Fatalf("expected reason mentioning hevc, got %v", r.Reasons) - } -} - -func TestAnalyzeCompatibility_TranscodeHDR10bit(t *testing.T) { - info := &mediainfo.MediaInfo{ - Video: &mediainfo.VideoInfo{Codec: "h264", BitDepth: 10, HDR: "HDR10"}, - Audio: []mediainfo.AudioTrack{{Codec: "aac"}}, - } - r := AnalyzeCompatibility(info) - if r.DirectPlay { - t.Fatalf("10-bit HDR10 must NOT be direct-playable") - } -} - -func TestAnalyzeCompatibility_TranscodeEAC3Audio(t *testing.T) { - info := &mediainfo.MediaInfo{ - Video: &mediainfo.VideoInfo{Codec: "h264", BitDepth: 8}, - Audio: []mediainfo.AudioTrack{{Codec: "eac3", Channels: 6}}, - } - r := AnalyzeCompatibility(info) - if r.DirectPlay { - t.Fatalf("EAC3 audio must trigger transcode") - } - if r.VideoCompat != true { - t.Fatalf("video stayed h264 — VideoCompat should still be true; got %+v", r) - } -} - -func TestAnalyzeCompatibility_NilGuard(t *testing.T) { - r := AnalyzeCompatibility(nil) - if r.DirectPlay { - t.Fatal("nil MediaInfo must not be direct-playable") - } - r2 := AnalyzeCompatibility(&mediainfo.MediaInfo{Video: nil}) - if r2.DirectPlay { - t.Fatal("MediaInfo without video must not be direct-playable") - } -} - -// ResolveQuality — fallback + table lookup. -func TestResolveQuality_FallbackTo1080p(t *testing.T) { - got := ResolveQuality("") - if got.Label != "1080p" { - t.Fatalf("empty label fallback wrong: %s", got.Label) - } - got = ResolveQuality("garbage") - if got.Label != "1080p" { - t.Fatalf("unknown label fallback wrong: %s", got.Label) - } -} - -func TestResolveQuality_KnownLabels(t *testing.T) { - cases := map[string]int{ - "480p": 480, - "720p": 720, - "1080p": 1080, - "2160p": 2160, - } - for label, height := range cases { - got := ResolveQuality(label) - if got.MaxHeight != height { - t.Errorf("ResolveQuality(%q).MaxHeight = %d want %d", label, got.MaxHeight, height) - } - } -} - -// BuildFFmpegArgs — recipe shape verified by argv content. -func TestBuildFFmpegArgs_DirectPlayUsesCopy(t *testing.T) { - report := CompatibilityReport{DirectPlay: true, VideoCompat: true, AudioCompat: true} - args := BuildFFmpegArgs("/tmp/movie.mp4", report, StreamOptions{}) - joined := strings.Join(args, " ") - - want := []string{"-i /tmp/movie.mp4", "-c copy", "-movflags " + fragmentedMP4Movflags, "-f mp4", "pipe:1"} - for _, w := range want { - if !strings.Contains(joined, w) { - t.Fatalf("direct-play argv missing %q\n got: %s", w, joined) - } - } - if strings.Contains(joined, "libx264") { - t.Fatalf("direct-play must NOT invoke libx264, got: %s", joined) - } -} - -func TestBuildFFmpegArgs_TranscodeUsesLibx264(t *testing.T) { - report := CompatibilityReport{DirectPlay: false, VideoCompat: false, AudioCompat: true} - args := BuildFFmpegArgs("/tmp/m.mkv", report, StreamOptions{Quality: "720p"}) - joined := strings.Join(args, " ") - - want := []string{ - "-c:v libx264", - "scale=-2:720", - "-b:v 3500000", - "-c:a aac", - "-b:a 128000", - "-pix_fmt yuv420p", - "-preset veryfast", - } - for _, w := range want { - if !strings.Contains(joined, w) { - t.Fatalf("720p transcode argv missing %q\n got: %s", w, joined) - } - } -} - -func TestBuildFFmpegArgs_NVENCSwapsEncoder(t *testing.T) { - report := CompatibilityReport{DirectPlay: false} - args := BuildFFmpegArgs("/tmp/m.mkv", report, StreamOptions{HW: HWAccelNVENC}) - joined := strings.Join(args, " ") - - if !strings.Contains(joined, "-c:v h264_nvenc") { - t.Fatalf("NVENC must use h264_nvenc, got: %s", joined) - } - if strings.Contains(joined, "-preset veryfast") { - t.Fatalf("HW accel skips libx264 preset, got: %s", joined) - } -} - -func TestBuildFFmpegArgs_VAAPIInjectsHwaccelDecoder(t *testing.T) { - report := CompatibilityReport{DirectPlay: false} - args := BuildFFmpegArgs("/tmp/m.mkv", report, StreamOptions{HW: HWAccelVAAPI}) - joined := strings.Join(args, " ") - - if !strings.Contains(joined, "-hwaccel vaapi") { - t.Fatalf("VAAPI must add -hwaccel vaapi, got: %s", joined) - } - if !strings.Contains(joined, "scale_vaapi") { - t.Fatalf("VAAPI must use scale_vaapi filter, got: %s", joined) - } -} - -func TestBuildFFmpegArgs_StartOffsetEmitsSS(t *testing.T) { - report := CompatibilityReport{DirectPlay: true} - args := BuildFFmpegArgs("/tmp/m.mp4", report, StreamOptions{StartOffset: 65*time.Second + 500*time.Millisecond}) - joined := strings.Join(args, " ") - - if !strings.Contains(joined, "-ss 00:01:05.500") { - t.Fatalf("expected -ss 00:01:05.500, got: %s", joined) - } -} - -// HWAccel encoders. -func TestHWAccel_VideoEncoder(t *testing.T) { - cases := map[HWAccel]string{ - HWAccelNone: "libx264", - HWAccelUnset: "libx264", - HWAccelNVENC: "h264_nvenc", - HWAccelQSV: "h264_qsv", - HWAccelVAAPI: "h264_vaapi", - HWAccelVideoToolbox: "h264_videotoolbox", - } - for hw, want := range cases { - if got := hw.VideoEncoder(); got != want { - t.Errorf("%s.VideoEncoder() = %q want %q", hw, got, want) - } - } -} - -func TestHWAccel_OnlyVAAPIHasDecoder(t *testing.T) { - for _, h := range []HWAccel{HWAccelNone, HWAccelNVENC, HWAccelQSV, HWAccelVideoToolbox} { - if h.HasDecoder() { - t.Errorf("%s shouldn't claim HW decoder", h) - } - } - if !HWAccelVAAPI.HasDecoder() { - t.Error("VAAPI should claim HW decoder") - } -} - -// formatDuration — boundary cases. -func TestFormatDuration(t *testing.T) { - cases := []struct { - in time.Duration - want string - }{ - {0, "00:00:00.000"}, - {500 * time.Millisecond, "00:00:00.500"}, - {65 * time.Second, "00:01:05.000"}, - {2*time.Hour + 3*time.Minute + 7*time.Second + 250*time.Millisecond, "02:03:07.250"}, - {-time.Second, "00:00:00.000"}, - } - for _, c := range cases { - if got := formatDuration(c.in); got != c.want { - t.Errorf("formatDuration(%v) = %q want %q", c.in, got, c.want) - } - } -} - -// cappedBuffer — overflow keeps only the tail. -func TestCappedBuffer_KeepsTail(t *testing.T) { - b := newCappedBuffer(10) - b.Write([]byte("hello ")) - b.Write([]byte("world")) - b.Write([]byte("!")) - // "hello " + "world" + "!" = 12 bytes; cap 10 → keep last 10 = "llo world!". - got := b.String() - if got != "llo world!" { - t.Fatalf("unexpected tail %q", got) - } -} - -func TestCappedBuffer_LargeSingleWrite(t *testing.T) { - b := newCappedBuffer(5) - b.Write([]byte("abcdefghij")) - if got := b.String(); got != "fghij" { - t.Fatalf("large write tail wrong: %q", got) - } -} - -// NewTranscoder rejects empty paths. -func TestNewTranscoder_RequiresBothBinaries(t *testing.T) { - if _, err := NewTranscoder("", "/usr/bin/ffprobe"); err == nil { - t.Error("expected error for empty ffmpeg path") - } - if _, err := NewTranscoder("/usr/bin/ffmpeg", ""); err == nil { - t.Error("expected error for empty ffprobe path") - } - if _, err := NewTranscoder("/usr/bin/ffmpeg", "/usr/bin/ffprobe"); err != nil { - t.Errorf("valid paths should not error: %v", err) - } -}