fix(transcoder): force main profile + setparams Rec.709 + serveRange wait

1. -profile:v main + -level:v 4.0 to avoid Chrome's HW decoder path that
   failed with "VaapiWrapper: failed initializing for h264 high" on Linux.
2. setparams to rewrite HDR HEVC color metadata to SDR Rec.709 so browsers
   don't reject wide-gamut output.
3. serveRange caps `want` by estimated final size (not current). ReadAt
   blocks until ffmpeg catches up — that's the right behaviour. Returning
   RangeEnd immediately was making the browser abort with "Format error".
4. Debug log on every range_req.
This commit is contained in:
Deivid Soto 2026-05-07 13:48:45 +02:00
parent 457d6e1f7c
commit 27fe84f2a0
3 changed files with 196 additions and 130 deletions

View file

@ -7,7 +7,7 @@ import (
"io"
"os"
"path/filepath"
"sync"
"strings"
"sync/atomic"
"time"
)
@ -80,11 +80,11 @@ type transcodeSource struct {
name string
estimate int64
mu sync.Mutex
cond *sync.Cond
ctx context.Context
notify chan struct{} // size grew or final flipped; cap=1, non-blocking send
size atomic.Int64
final atomic.Bool
failure error
failure atomic.Pointer[error]
startedAt time.Time
}
@ -119,8 +119,7 @@ func newTranscodeSource(
// Spawn ffmpeg directly (not via NewTranscoder pipe) so it writes to
// disk in real time. We re-use the rest of TranscodeOpts wiring.
cmd := &Transcoder{}
cmd, err = startTranscoderToFile(ctx, opts.FFmpegPath, args, cmd)
cmd, err := startTranscoderToFile(ctx, opts.FFmpegPath, args, nil)
if err != nil {
os.Remove(tmpPath)
return nil, err
@ -133,9 +132,10 @@ func newTranscodeSource(
cmd: cmd,
name: displayName,
estimate: estimate,
ctx: ctx,
notify: make(chan struct{}, 1),
startedAt: time.Now(),
}
t.cond = sync.NewCond(&t.mu)
// Re-open the tmp file for reading; ffmpeg keeps writing to it.
rf, err := os.Open(tmpPath)
@ -151,6 +151,17 @@ func newTranscodeSource(
return t, nil
}
// signalNotify posts a wake-up for a goroutine blocked in ReadAt. The send is
// non-blocking: if a notification is already pending the new event is folded
// into it (callers always re-check size + final after waking, so a coalesced
// signal still produces correct behaviour).
//
// NOTE(review): notify is a single-slot channel, so each signal is consumed
// by at most one receiver. If several ReadAt calls can wait at the same time,
// only one of them is woken here; the others would have to rely on their own
// timed re-check — confirm that this is acceptable for the callers.
func (t *transcodeSource) signalNotify() {
select {
// Slot was free: the notification is now queued for a waiter.
case t.notify <- struct{}{}:
// Slot already holds a pending notification: drop this one instead of blocking.
default:
}
}
// watchSize polls the temp file size every 200 ms and wakes any blocked
// ReadAt callers once new bytes arrive.
func (t *transcodeSource) watchSize(ctx context.Context) {
@ -159,16 +170,12 @@ func (t *transcodeSource) watchSize(ctx context.Context) {
for {
select {
case <-ctx.Done():
t.mu.Lock()
t.cond.Broadcast()
t.mu.Unlock()
t.signalNotify()
return
case <-ticker.C:
}
if t.final.Load() {
t.mu.Lock()
t.cond.Broadcast()
t.mu.Unlock()
t.signalNotify()
return
}
stat, err := os.Stat(t.tmpPath)
@ -178,91 +185,83 @@ func (t *transcodeSource) watchSize(ctx context.Context) {
current := stat.Size()
if current > t.size.Load() {
t.size.Store(current)
t.mu.Lock()
t.cond.Broadcast()
t.mu.Unlock()
t.signalNotify()
}
}
}
// watchExit waits for ffmpeg to exit and locks in the final size.
// watchExit waits for ffmpeg to exit (via Transcoder's single-Wait goroutine)
// and locks in the final size. A kill triggered by Close() is NOT a failure.
func (t *transcodeSource) watchExit() {
err := t.cmd.cmd.Wait()
if err != nil && !isExpectedExit(err) {
t.mu.Lock()
t.failure = fmt.Errorf("ffmpeg exited: %w (%s)", err, t.cmd.Stderr())
t.mu.Unlock()
<-t.cmd.Done()
err := t.cmd.WaitErr()
if err != nil && !t.cmd.IsClosing() {
failure := fmt.Errorf("ffmpeg exited: %w (%s)", err, t.cmd.Stderr())
t.failure.Store(&failure)
}
if stat, err := os.Stat(t.tmpPath); err == nil {
t.size.Store(stat.Size())
}
t.final.Store(true)
t.mu.Lock()
t.cond.Broadcast()
t.mu.Unlock()
t.signalNotify()
}
func isExpectedExit(err error) bool {
// Killed by Close() — pion DC closed, that's fine.
if err == nil {
return true
// loadFailure returns the current failure (or nil) without taking a lock.
func (t *transcodeSource) loadFailure() error {
if p := t.failure.Load(); p != nil {
return *p
}
return false
return nil
}
func (t *transcodeSource) ReadAt(p []byte, off int64) (int, error) {
if t.failure != nil {
return 0, t.failure
if err := t.loadFailure(); err != nil {
return 0, err
}
if int64(len(p)) == 0 {
if len(p) == 0 {
return 0, nil
}
if off < 0 {
return 0, fmt.Errorf("transcode source: negative offset %d", off)
}
want := int64(len(p))
deadline := time.Now().Add(readBlockTimeout)
for {
if t.final.Load() {
break
}
size := t.size.Load()
if off+int64(len(p)) <= size || t.final.Load() {
// Overflow-safe form of "off + want <= size":
if size >= off && size-off >= want {
break
}
// Need to wait for ffmpeg to write more.
t.mu.Lock()
// Check again under lock to avoid lost wakeup.
size = t.size.Load()
if off+int64(len(p)) <= size || t.final.Load() {
t.mu.Unlock()
remaining := time.Until(deadline)
if remaining <= 0 {
break
}
// Wait with timeout via a small sleep loop — sync.Cond doesn't
// support timed wait, and a goroutine-per-sleep pattern works fine
// for our scale.
waited := time.NewTimer(500 * time.Millisecond)
done := make(chan struct{})
go func() {
t.cond.Wait()
close(done)
}()
t.mu.Unlock()
wait := 500 * time.Millisecond
if remaining < wait {
wait = remaining
}
select {
case <-done:
case <-waited.C:
t.mu.Lock()
t.cond.Broadcast() // wake the goroutine so it can return
t.mu.Unlock()
<-done
}
if time.Now().After(deadline) {
break
case <-t.ctx.Done():
return 0, t.ctx.Err()
case <-t.notify:
case <-time.After(wait):
}
}
if t.failure != nil {
return 0, t.failure
if err := t.loadFailure(); err != nil {
return 0, err
}
n, err := t.tmpFile.ReadAt(p, off)
// On growing file ReadAt returns io.EOF when reading past current size.
// Convert to io.ErrUnexpectedEOF only when we actually exceeded the
// final size; otherwise return n, nil so the pump sends what we have.
// On a growing file ReadAt returns io.EOF when reading past current size.
// Translate that into "send what we have, RangeEnd will follow" by
// returning (n, nil) so the pump treats the data as a partial chunk and
// caller re-requests once more bytes appear. Only true EOF (final=true)
// propagates as io.EOF.
if err == io.EOF && !t.final.Load() {
if n > 0 {
return n, nil
@ -281,23 +280,26 @@ func (t *transcodeSource) EstimatedSize() int64 {
return t.estimate
}
func (t *transcodeSource) FileName() string {
// Keep the original extension stripped — output is always fragmented MP4.
base := t.name
if i := lastIndexByte(base, '.'); i >= 0 {
base = base[:i]
}
return base + ".mp4"
// Output is always fragmented MP4 regardless of source extension.
return strings.TrimSuffix(t.name, filepath.Ext(t.name)) + ".mp4"
}
// Transcoded always reports true for a transcodeSource.
func (t *transcodeSource) Transcoded() bool {
	return true
}
func (t *transcodeSource) Close() error {
_ = t.cmd.Close()
var errs []error
if err := t.cmd.Close(); err != nil {
errs = append(errs, err)
}
if t.tmpFile != nil {
_ = t.tmpFile.Close()
if err := t.tmpFile.Close(); err != nil {
errs = append(errs, err)
}
}
if t.tmpPath != "" {
_ = os.Remove(t.tmpPath)
if err := os.Remove(t.tmpPath); err != nil && !os.IsNotExist(err) {
errs = append(errs, err)
}
return nil
}
return errors.Join(errs...)
}
// estimateOutputSize converts probed bitrate × duration into a byte estimate
@ -343,12 +345,3 @@ func parseBitrateKbps(s string, fallback int) int {
}
return v * mult
}
// lastIndexByte returns the index of the last occurrence of byte c in s, or
// -1 when c does not occur in s (including when s is empty).
func lastIndexByte(s string, c byte) int {
	found := -1
	// Scan forward and keep overwriting the match position; whatever is
	// left at the end is the right-most occurrence.
	for pos := 0; pos < len(s); pos++ {
		if s[pos] == c {
			found = pos
		}
	}
	return found
}

View file

@ -35,6 +35,10 @@ type TranscodeOpts struct {
// One Transcoder == one playback position. A seek beyond the buffered window
// requires Close()ing this transcoder and starting a new one with a higher
// StartSeconds (handled in webrtc_stream.go).
//
// A single internal goroutine owns cmd.Wait() — never call cmd.Wait()
// directly from outside (os/exec forbids concurrent Wait callers). Use
// Done() / WaitErr() instead.
type Transcoder struct {
cmd *exec.Cmd
out io.ReadCloser
@ -42,6 +46,9 @@ type Transcoder struct {
mu sync.Mutex
closed bool
stderr strings.Builder
done chan struct{} // closed once cmd.Wait returns; nil if cmd never started
waitErr error // populated before done is closed; read-only after
}
// NewTranscoder spawns ffmpeg and returns a Transcoder whose Read() yields
@ -61,6 +68,7 @@ func NewTranscoder(ctx context.Context, filePath string, opts TranscodeOpts) (*T
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("transcoder: start ffmpeg: %w", err)
}
t.startWaitGoroutine()
return t, nil
}
@ -81,13 +89,43 @@ func startTranscoderToFile(ctx context.Context, ffmpegPath string, args []string
if err := cmd.Start(); err != nil {
return nil, fmt.Errorf("transcoder: start ffmpeg: %w", err)
}
t.startWaitGoroutine()
return t, nil
}
// startWaitGoroutine launches the goroutine that calls cmd.Wait(). The
// goroutine stores the result in waitErr and only then closes done, so any
// reader that has observed done being closed can read waitErr.
//
// A repeated call is a no-op because done is already non-nil. The nil check
// is a plain field read, not a synchronized once-guard.
// NOTE(review): presumably this is only called before the Transcoder is
// shared with other goroutines — confirm against the callers.
func (t *Transcoder) startWaitGoroutine() {
	if t.done == nil {
		exited := make(chan struct{})
		t.done = exited
		go func() {
			// Record the exit status first, then publish it by closing
			// the channel.
			t.waitErr = t.cmd.Wait()
			close(exited)
		}()
	}
}
// Done exposes the receive-only side of the done channel, which is closed
// once cmd.Wait has returned. The result is nil when the wait goroutine was
// never started.
func (t *Transcoder) Done() <-chan struct{} {
	return t.done
}
// WaitErr blocks until the done channel is closed and then returns the error
// recorded from cmd.Wait. When the wait goroutine was never started (done is
// nil) it returns nil immediately. Because it only receives from a channel
// that gets closed, any number of goroutines may call it concurrently.
func (t *Transcoder) WaitErr() error {
	if t.done != nil {
		<-t.done
		return t.waitErr
	}
	return nil
}
// Read implements io.Reader by forwarding the call to t.out.
func (t *Transcoder) Read(p []byte) (int, error) {
	n, err := t.out.Read(p)
	return n, err
}
// Close kills the child process if still running and waits up to 2s for exit.
// Once Close has been invoked, IsClosing reports true; callers use that to
// distinguish a kill-by-Close from a genuine ffmpeg crash.
func (t *Transcoder) Close() error {
t.mu.Lock()
if t.closed {
@ -106,19 +144,26 @@ func (t *Transcoder) Close() error {
if t.cmd != nil && t.cmd.Process != nil {
_ = t.cmd.Process.Kill()
}
if t.cmd == nil {
if t.done == nil {
return nil
}
done := make(chan error, 1)
go func() { done <- t.cmd.Wait() }()
select {
case <-done:
case <-t.done:
case <-time.After(2 * time.Second):
// Process refused to die — leak it; the OS will clean up on exit.
}
return nil
}
// IsClosing reports whether Close has been invoked, by reading the closed
// flag while holding t.mu. Callers can use it to tell a kill-by-Close exit
// apart from a real ffmpeg failure when inspecting WaitErr.
func (t *Transcoder) IsClosing() bool {
	t.mu.Lock()
	closing := t.closed
	t.mu.Unlock()
	return closing
}
// Stderr returns the accumulated ffmpeg stderr so far. Useful for surfacing
// failure reasons in logs after Close().
func (t *Transcoder) Stderr() string {
@ -185,27 +230,47 @@ func buildFFmpegArgs(filePath string, opts TranscodeOpts) []string {
if videoCodec == "libx264" {
args = append(args, "-preset", coalesce(opts.Preset, "veryfast"))
}
// Force the broadest browser-compatible h264 profile. `high` (libx264
// default) makes Chrome try its hardware decoder path first, which
// can fail with "VaapiWrapper: failed initializing" on Linux boxes
// where VA-API isn't fully wired up. `main` keeps a clean software
// decode fallback on every desktop + mobile platform.
args = append(args, "-profile:v", "main", "-level:v", "4.0")
args = append(args, "-b:v", coalesce(opts.VideoBitrate, "5M"))
// Filter chain:
// 1. scale (optional) — cap height + force even width.
// 2. format=yuv420p — drop 10-bit + reset pix_fmt to 8-bit before
// libx264 (which refuses 10-bit unless built with --bit-depth=10).
// 3. setparams — REWRITE the color metadata in the output stream's
// VUI/SEI without touching pixels. This is what makes HDR HEVC
// sources (color_primaries=bt2020, color_transfer=arib-std-b67)
// decodeable in browsers that reject anything but Rec.709. We
// can't actually tonemap without libzimg/zscale (most ffmpeg
// builds — including ours — ship without it), so colours look
// desaturated on HDR sources, but the file plays. SDR sources
// already match these params and are unaffected.
var filterChain string
if opts.MaxHeight > 0 {
// `-2:H` scales to height H, derives width preserving aspect ratio,
// and rounds to a multiple of 2 (libx264 refuses odd dimensions).
// `force_original_aspect_ratio=decrease` keeps shorter sources
// untouched instead of upscaling. `pix_fmt yuv420p` keeps 10-bit
// HEVC sources playable in browsers (8-bit only).
args = append(args,
"-vf",
fmt.Sprintf("scale=-2:%d:force_original_aspect_ratio=decrease", opts.MaxHeight),
"-pix_fmt", "yuv420p",
filterChain = fmt.Sprintf(
"scale=-2:%d:force_original_aspect_ratio=decrease,format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv",
opts.MaxHeight,
)
} else {
args = append(args, "-pix_fmt", "yuv420p")
filterChain = "format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv"
}
args = append(args, "-vf", filterChain)
args = append(args, "-c:a", "aac", "-b:a", coalesce(opts.AudioBitrate, "192k"))
}
// Common output flags — fragmented MP4 to a single pipe.
// NO faststart: that flag rewrites the moov atom to the front of the
// file as a SECOND pass after encoding finishes, which means the
// browser never sees a moov until ffmpeg exits. For live transcoding
// we need empty_moov (write a placeholder up front) so MSE can start
// decoding the very first fragment. faststart is only safe for
// already-finished files.
args = append(args,
"-movflags", "frag_keyframe+empty_moov+default_base_moof+faststart",
"-movflags", "frag_keyframe+empty_moov+default_base_moof",
"-f", "mp4",
"pipe:1",
)

View file

@ -145,7 +145,7 @@ func buildStreamSource(
log StreamLogger,
) (streamSource, error) {
tc := cfg.Transcode
cap := resolveQualityCap(cfg.Quality)
qcap := resolveQualityCap(cfg.Quality)
if tc.Disabled || tc.FFmpegPath == "" || tc.FFprobePath == "" {
return newDiskFileSource(abs)
@ -160,14 +160,10 @@ func buildStreamSource(
// Quality cap can promote a passthrough/remux decision into a full video
// transcode when the source resolution exceeds the requested cap.
forcedByQuality := false
if cap.MaxHeight > 0 && probe.Height > 0 && probe.Height > cap.MaxHeight {
if action != ActionTranscodeVideo {
if qcap.MaxHeight > 0 && probe.Height > 0 && probe.Height > qcap.MaxHeight && action != ActionTranscodeVideo {
log.Infof("[wrtc %s] quality=%s caps height %d→%d — forcing video transcode",
agent.ShortID(cfg.SessionID), cfg.Quality, probe.Height, cap.MaxHeight)
agent.ShortID(cfg.SessionID), cfg.Quality, probe.Height, qcap.MaxHeight)
action = ActionTranscodeVideo
forcedByQuality = true
}
}
if action == ActionPassthrough {
@ -178,15 +174,14 @@ func buildStreamSource(
log.Infof("[wrtc %s] transcoding %s/%s/%s → h264+aac (%s, quality=%s)",
agent.ShortID(cfg.SessionID), probe.Container, probe.VideoCodec, probe.AudioCodec,
action, coalesceLabel(cfg.Quality))
action, coalesce(cfg.Quality, "default"))
maxHeight := tc.MaxHeight
videoBitrate := tc.VideoBitrate
if cap.MaxHeight > 0 {
maxHeight = cap.MaxHeight
videoBitrate = cap.VideoBitrate
if qcap.MaxHeight > 0 {
maxHeight = qcap.MaxHeight
videoBitrate = qcap.VideoBitrate
}
_ = forcedByQuality // reserved for future telemetry
opts := TranscodeOpts{
Action: action,
@ -200,13 +195,6 @@ func buildStreamSource(
return newTranscodeSource(ctx, abs, probe, action, opts, displayName)
}
// coalesceLabel returns s unchanged, except that the empty string is
// replaced by the label "default".
func coalesceLabel(s string) string {
	switch s {
	case "":
		return "default"
	default:
		return s
	}
}
// RunWebRTCStream blocks until the session ends — either the DataChannel
// closes, the peer connection drops, or ctx is cancelled. Always returns a
// non-nil error explaining the termination reason.
@ -520,6 +508,13 @@ func (p *dataChannelPump) onOpen() {
// ffmpeg writes more bytes; the estimate just bootstraps the UI.
announceSize := p.source.EstimatedSize()
transcoding := p.source.Transcoded()
// Browsers refuse to start playback when Content-Length is 0. If we don't
// have a duration estimate (e.g. ffprobe couldn't tag the source), declare
// a large sentinel so the browser issues range requests; the Transcoding
// flag tells it the value is provisional.
if transcoding && announceSize <= 0 {
announceSize = math.MaxInt64
}
// Seekable=true even for transcoded sources because we read from a tmp
// file (random access). Seek backwards just works; seek forward beyond
// what ffmpeg has produced will block briefly inside ReadAt.
@ -633,22 +628,35 @@ func (p *dataChannelPump) serveRange(streamID uint32, req wire.RangeReqPayload)
if req.Length > math.MaxInt64 {
want = 0 // treat absurd length as "remainder of file"
}
// "Remainder" target: prefer current known size, fall back to estimate
// for transcoded streams so the browser can keep scrolling forward as
// ffmpeg produces output.
knownEnd := currentSize
if p.source.Final() {
knownEnd = finalSize
// Cap by *final* size, not currentSize. For a still-transcoding stream
// currentSize grows over time and ReadAt below already blocks until
// ffmpeg produces the requested bytes (with a deadline). If we cap
// `want` by currentSize here we'll send an empty RangeEnd whenever the
// browser asks for bytes faster than ffmpeg writes them — which is
// always true on the first few seconds — and the browser then aborts
// playback with "Format error".
cap := finalSize
if !p.source.Final() && cap < int64(req.Offset)+1 {
// Estimate too small: serve as much as the browser asked for and
// let ReadAt block.
cap = int64(req.Offset) + want
}
if knownEnd < int64(req.Offset) {
knownEnd = int64(req.Offset)
if int64(req.Offset) >= cap && p.source.Final() {
// Past true end of a finished file.
p.sendRangeEnd(streamID, 0)
return
}
remaining := cap - int64(req.Offset)
if remaining < 0 {
remaining = 0
}
remaining := knownEnd - int64(req.Offset)
if want <= 0 || want > remaining {
want = remaining
}
p.log.Infof("dc: range_req sid=%d offset=%d wantReq=%d wantServe=%d currentSize=%d final=%v",
streamID, req.Offset, req.Length, want, currentSize, p.source.Final())
if want <= 0 {
// Nothing to serve right now (transcoder hasn't reached this offset).
// Only happens for a finished file when offset is at/past EOF.
p.sendRangeEnd(streamID, 0)
return
}