feat(streaming): ffmpeg transcoding pipeline (direct play / fMP4 / HW accel)

The browser-side WebRTC reproductor needs MP4 / H.264 / AAC / yuv420p to
keep MSE happy. This package decides per request whether to:

  • direct-play  — input already MSE-compatible, just remux to fMP4
  • transcode    — re-encode video (libx264 / NVENC / QSV / VAAPI /
                   VideoToolbox) + audio (AAC), fragment to fMP4

Pieces:

- internal/streaming/transcoder.go — AnalyzeCompatibility decides the
  recipe from a parsed mediainfo. CompatibilityReport carries the reasons
  so the player UI can show "transcoding video: HEVC → H.264".

- internal/streaming/ffmpeg_args.go — BuildFFmpegArgs assembles the argv
  for ffmpeg. Direct play uses `-c copy`; transcode uses libx264 or the
  selected HW encoder. Output is always fragmented MP4 piped to stdout
  (-movflags frag_keyframe+empty_moov+default_base_moof) so the HTTP
  handler can stream straight to the browser without disk I/O.

  Quality ladder: 480p (1.5Mb), 720p (3.5Mb), 1080p (6Mb), 2160p (25Mb).
  Default 1080p when unset / unknown. -ss seek for resume / scrubbing.

- internal/streaming/hwaccel.go — DetectHWAccel runs `ffmpeg -encoders`
  once per process and caches the best available. Order: NVENC → QSV →
  VAAPI → VideoToolbox → libx264. VAAPI is the only family that wires up
  HW decode too (`-hwaccel vaapi`); the others software-decode and HW-
  encode (works fine and avoids /dev/dri permission rabbit holes).

- internal/streaming/stream.go — Transcoder facade wires Analyze + Stream
  together for the API handler in Fase 4. Captures the last 8 KiB of
  ffmpeg stderr for diagnosable errors without unbounded memory.

Tests (20 unit, all green):
- AnalyzeCompatibility: h264+aac direct, video-only direct, HEVC →
  transcode, 10-bit HDR → transcode, EAC3 audio → transcode, nil guards
- ResolveQuality: empty + unknown fallback to 1080p, 4-step ladder
- BuildFFmpegArgs: direct play -c copy, transcode libx264 + bitrate +
  scale, NVENC swaps encoder & drops preset, VAAPI injects -hwaccel +
  scale_vaapi, -ss timestamp formatting
- HWAccel: encoder-name table, VAAPI is the only one with HW decode
- formatDuration: zero, sub-second, HH:MM:SS, negative-clamped
- cappedBuffer: tail retention through multi-write and large-write paths
- NewTranscoder: rejects empty paths
This commit is contained in:
Deivid Soto 2026-05-06 11:34:57 +02:00
parent e68b127acc
commit 75dcc0f1cb
5 changed files with 850 additions and 0 deletions

View file

@ -0,0 +1,131 @@
package streaming
import (
"context"
"errors"
"fmt"
"io"
"os/exec"
"sync"
"github.com/torrentclaw/unarr/internal/library/mediainfo"
)
// Transcoder owns the resolved ffmpeg / ffprobe binaries plus the
// detected hardware accelerator. One per process; safe for concurrent use.
type Transcoder struct {
ffmpegPath string
ffprobePath string
hwOnce sync.Once
hw HWAccel
}
// NewTranscoder constructs a Transcoder from explicit binary paths.
// Both must be non-empty; resolve them upstream via
// mediainfo.ResolveFFmpeg / ResolveFFprobe.
func NewTranscoder(ffmpegPath, ffprobePath string) (*Transcoder, error) {
if ffmpegPath == "" {
return nil, errors.New("streaming: ffmpeg path is required")
}
if ffprobePath == "" {
return nil, errors.New("streaming: ffprobe path is required")
}
return &Transcoder{
ffmpegPath: ffmpegPath,
ffprobePath: ffprobePath,
}, nil
}
// HWAccel returns the cached / detected hardware accelerator. First call
// runs `ffmpeg -encoders`; subsequent calls reuse the result.
func (t *Transcoder) HWAccel(ctx context.Context) HWAccel {
t.hwOnce.Do(func() {
t.hw = DetectHWAccel(ctx, t.ffmpegPath)
})
return t.hw
}
// Analyze runs ffprobe on the input file and returns a compatibility
// report so the caller can decide direct play vs transcode.
func (t *Transcoder) Analyze(ctx context.Context, inputPath string) (CompatibilityReport, *mediainfo.MediaInfo, error) {
info, err := mediainfo.ExtractMediaInfo(ctx, t.ffprobePath, inputPath)
if err != nil {
return CompatibilityReport{}, nil, fmt.Errorf("streaming: ffprobe failed: %w", err)
}
return AnalyzeCompatibility(info), info, nil
}
// Stream runs ffmpeg with the right recipe for the given file + options
// and writes fragmented MP4 to dst. Blocks until ffmpeg exits or the
// context is cancelled. If ffmpeg's stderr captures something useful, it's
// included in the returned error.
func (t *Transcoder) Stream(ctx context.Context, inputPath string, dst io.Writer, opts StreamOptions) error {
report, _, err := t.Analyze(ctx, inputPath)
if err != nil {
return err
}
return t.StreamWithReport(ctx, inputPath, dst, opts, report)
}
// StreamWithReport is the lower-level entry point — accepts a
// pre-computed CompatibilityReport so the API handler can inspect the
// decision before kicking off a transcode (useful for billing /
// telemetry / quality-fallback policies).
func (t *Transcoder) StreamWithReport(
ctx context.Context,
inputPath string,
dst io.Writer,
opts StreamOptions,
report CompatibilityReport,
) error {
if opts.HW == HWAccelUnset {
opts.HW = t.HWAccel(ctx)
}
args := BuildFFmpegArgs(inputPath, report, opts)
cmd := exec.CommandContext(ctx, t.ffmpegPath, args...)
cmd.Stdout = dst
stderrBuf := newCappedBuffer(8 * 1024) // last 8 KiB is plenty for diagnosing
cmd.Stderr = stderrBuf
if err := cmd.Run(); err != nil {
// Cancellation looks like an exec error too; surface the cause
// so callers don't blame ffmpeg for client disconnects.
if ctxErr := ctx.Err(); ctxErr != nil {
return ctxErr
}
return fmt.Errorf("streaming: ffmpeg exited: %w (stderr tail: %s)", err, stderrBuf.String())
}
return nil
}
// cappedBuffer is an io.Writer that keeps only the last `cap` bytes
// written. Used to capture ffmpeg's tail stderr for error reporting
// without unbounded memory growth on long transcodes.
type cappedBuffer struct {
buf []byte
cap int
}
func newCappedBuffer(cap int) *cappedBuffer {
return &cappedBuffer{cap: cap}
}
func (c *cappedBuffer) Write(p []byte) (int, error) {
if len(p) >= c.cap {
c.buf = append(c.buf[:0], p[len(p)-c.cap:]...)
return len(p), nil
}
if len(c.buf)+len(p) > c.cap {
drop := len(c.buf) + len(p) - c.cap
c.buf = c.buf[drop:]
}
c.buf = append(c.buf, p...)
return len(p), nil
}
func (c *cappedBuffer) String() string {
return string(c.buf)
}