diff --git a/internal/engine/stream_source.go b/internal/engine/stream_source.go index 98aca74..2dc1d3c 100644 --- a/internal/engine/stream_source.go +++ b/internal/engine/stream_source.go @@ -7,7 +7,7 @@ import ( "io" "os" "path/filepath" - "sync" + "strings" "sync/atomic" "time" ) @@ -80,11 +80,11 @@ type transcodeSource struct { name string estimate int64 - mu sync.Mutex - cond *sync.Cond + ctx context.Context + notify chan struct{} // size grew or final flipped; cap=1, non-blocking send size atomic.Int64 final atomic.Bool - failure error + failure atomic.Pointer[error] startedAt time.Time } @@ -119,8 +119,7 @@ func newTranscodeSource( // Spawn ffmpeg directly (not via NewTranscoder pipe) so it writes to // disk in real time. We re-use the rest of TranscodeOpts wiring. - cmd := &Transcoder{} - cmd, err = startTranscoderToFile(ctx, opts.FFmpegPath, args, cmd) + cmd, err := startTranscoderToFile(ctx, opts.FFmpegPath, args, nil) if err != nil { os.Remove(tmpPath) return nil, err @@ -133,9 +132,10 @@ func newTranscodeSource( cmd: cmd, name: displayName, estimate: estimate, + ctx: ctx, + notify: make(chan struct{}, 1), startedAt: time.Now(), } - t.cond = sync.NewCond(&t.mu) // Re-open the tmp file for reading; ffmpeg keeps writing to it. rf, err := os.Open(tmpPath) @@ -151,6 +151,17 @@ func newTranscodeSource( return t, nil } +// signalNotify wakes one goroutine blocked in ReadAt (any other waiters fall +// back to their timed wait). Non-blocking: a signal sent while one is already +// pending is coalesced with it, which is safe because waiters always +// re-check size + final after waking. +func (t *transcodeSource) signalNotify() { + select { + case t.notify <- struct{}{}: + default: + } +} + // watchSize polls the temp file size every 200 ms and wakes any blocked // ReadAt callers once new bytes arrive. 
func (t *transcodeSource) watchSize(ctx context.Context) { @@ -159,16 +170,12 @@ func (t *transcodeSource) watchSize(ctx context.Context) { for { select { case <-ctx.Done(): - t.mu.Lock() - t.cond.Broadcast() - t.mu.Unlock() + t.signalNotify() return case <-ticker.C: } if t.final.Load() { - t.mu.Lock() - t.cond.Broadcast() - t.mu.Unlock() + t.signalNotify() return } stat, err := os.Stat(t.tmpPath) @@ -178,91 +185,83 @@ func (t *transcodeSource) watchSize(ctx context.Context) { current := stat.Size() if current > t.size.Load() { t.size.Store(current) - t.mu.Lock() - t.cond.Broadcast() - t.mu.Unlock() + t.signalNotify() } } } -// watchExit waits for ffmpeg to exit and locks in the final size. +// watchExit waits for ffmpeg to exit (via Transcoder's single-Wait goroutine) +// and locks in the final size. A kill triggered by Close() is NOT a failure. func (t *transcodeSource) watchExit() { - err := t.cmd.cmd.Wait() - if err != nil && !isExpectedExit(err) { - t.mu.Lock() - t.failure = fmt.Errorf("ffmpeg exited: %w (%s)", err, t.cmd.Stderr()) - t.mu.Unlock() + <-t.cmd.Done() + err := t.cmd.WaitErr() + if err != nil && !t.cmd.IsClosing() { + failure := fmt.Errorf("ffmpeg exited: %w (%s)", err, t.cmd.Stderr()) + t.failure.Store(&failure) } if stat, err := os.Stat(t.tmpPath); err == nil { t.size.Store(stat.Size()) } t.final.Store(true) - t.mu.Lock() - t.cond.Broadcast() - t.mu.Unlock() + t.signalNotify() } -func isExpectedExit(err error) bool { - // Killed by Close() — pion DC closed, that's fine. - if err == nil { - return true +// loadFailure returns the current failure (or nil) without taking a lock. 
+func (t *transcodeSource) loadFailure() error { + if p := t.failure.Load(); p != nil { + return *p } - return false + return nil } func (t *transcodeSource) ReadAt(p []byte, off int64) (int, error) { - if t.failure != nil { - return 0, t.failure + if err := t.loadFailure(); err != nil { + return 0, err } - if int64(len(p)) == 0 { + if len(p) == 0 { return 0, nil } - deadline := time.Now().Add(readBlockTimeout) + if off < 0 { + return 0, fmt.Errorf("transcode source: negative offset %d", off) + } + want := int64(len(p)) + deadline := time.Now().Add(readBlockTimeout) for { + if t.final.Load() { + break + } size := t.size.Load() - if off+int64(len(p)) <= size || t.final.Load() { + // Overflow-safe form of "off + want <= size": + if size >= off && size-off >= want { break } - // Need to wait for ffmpeg to write more. - t.mu.Lock() - // Check again under lock to avoid lost wakeup. - size = t.size.Load() - if off+int64(len(p)) <= size || t.final.Load() { - t.mu.Unlock() + remaining := time.Until(deadline) + if remaining <= 0 { break } - // Wait with timeout via a small sleep loop — sync.Cond doesn't - // support timed wait, and a goroutine-per-sleep pattern works fine - // for our scale. - waited := time.NewTimer(500 * time.Millisecond) - done := make(chan struct{}) - go func() { - t.cond.Wait() - close(done) - }() - t.mu.Unlock() + wait := 500 * time.Millisecond + if remaining < wait { + wait = remaining + } select { - case <-done: - case <-waited.C: - t.mu.Lock() - t.cond.Broadcast() // wake the goroutine so it can return - t.mu.Unlock() - <-done - } - if time.Now().After(deadline) { - break + case <-t.ctx.Done(): + return 0, t.ctx.Err() + case <-t.notify: + case <-time.After(wait): } } - if t.failure != nil { - return 0, t.failure + if err := t.loadFailure(); err != nil { + return 0, err } n, err := t.tmpFile.ReadAt(p, off) - // On growing file ReadAt returns io.EOF when reading past current size. 
- // Convert to io.ErrUnexpectedEOF only when we actually exceeded the - // final size; otherwise return n, nil so the pump sends what we have. + // On a growing file ReadAt returns io.EOF when reading past current size. + // Translate that into "send what we have, RangeEnd will follow" by + // returning (n, nil) so the pump treats the data as a partial chunk and + // caller re-requests once more bytes appear. Only true EOF (final=true) + // propagates as io.EOF. if err == io.EOF && !t.final.Load() { if n > 0 { return n, nil @@ -281,23 +280,26 @@ func (t *transcodeSource) EstimatedSize() int64 { return t.estimate } func (t *transcodeSource) FileName() string { - // Keep the original extension stripped — output is always fragmented MP4. - base := t.name - if i := lastIndexByte(base, '.'); i >= 0 { - base = base[:i] - } - return base + ".mp4" + // Output is always fragmented MP4 regardless of source extension. + return strings.TrimSuffix(t.name, filepath.Ext(t.name)) + ".mp4" } func (t *transcodeSource) Transcoded() bool { return true } func (t *transcodeSource) Close() error { - _ = t.cmd.Close() + var errs []error + if err := t.cmd.Close(); err != nil { + errs = append(errs, err) + } if t.tmpFile != nil { - _ = t.tmpFile.Close() + if err := t.tmpFile.Close(); err != nil { + errs = append(errs, err) + } } if t.tmpPath != "" { - _ = os.Remove(t.tmpPath) + if err := os.Remove(t.tmpPath); err != nil && !os.IsNotExist(err) { + errs = append(errs, err) + } } - return nil + return errors.Join(errs...) 
} // estimateOutputSize converts probed bitrate × duration into a byte estimate @@ -343,12 +345,3 @@ func parseBitrateKbps(s string, fallback int) int { } return v * mult } - -func lastIndexByte(s string, c byte) int { - for i := len(s) - 1; i >= 0; i-- { - if s[i] == c { - return i - } - } - return -1 -} diff --git a/internal/engine/transcoder.go b/internal/engine/transcoder.go index 9f38cd6..1752d6d 100644 --- a/internal/engine/transcoder.go +++ b/internal/engine/transcoder.go @@ -35,6 +35,10 @@ type TranscodeOpts struct { // One Transcoder == one playback position. A seek beyond the buffered window // requires Close()ing this transcoder and starting a new one with a higher // StartSeconds (handled in webrtc_stream.go). +// +// A single internal goroutine owns cmd.Wait() — never call cmd.Wait() +// directly from outside (os/exec forbids concurrent Wait callers). Use +// Done() / WaitErr() instead. type Transcoder struct { cmd *exec.Cmd out io.ReadCloser @@ -42,6 +46,9 @@ type Transcoder struct { mu sync.Mutex closed bool stderr strings.Builder + + done chan struct{} // closed once cmd.Wait returns; nil if cmd never started + waitErr error // populated before done is closed; read-only after } // NewTranscoder spawns ffmpeg and returns a Transcoder whose Read() yields @@ -61,6 +68,7 @@ func NewTranscoder(ctx context.Context, filePath string, opts TranscodeOpts) (*T if err := cmd.Start(); err != nil { return nil, fmt.Errorf("transcoder: start ffmpeg: %w", err) } + t.startWaitGoroutine() return t, nil } @@ -81,13 +89,43 @@ func startTranscoderToFile(ctx context.Context, ffmpegPath string, args []string if err := cmd.Start(); err != nil { return nil, fmt.Errorf("transcoder: start ffmpeg: %w", err) } + t.startWaitGoroutine() return t, nil } +// startWaitGoroutine launches the single goroutine that owns cmd.Wait(). +// Idempotent via a plain nil check on done — not safe for concurrent calls. 
+func (t *Transcoder) startWaitGoroutine() { + if t.done != nil { + return + } + t.done = make(chan struct{}) + go func() { + t.waitErr = t.cmd.Wait() + close(t.done) + }() +} + +// Done returns a channel that closes when ffmpeg exits. Returns nil for a +// Transcoder whose cmd never started. +func (t *Transcoder) Done() <-chan struct{} { return t.done } + +// WaitErr blocks until ffmpeg exits and returns the wait error. Safe to +// call concurrently from multiple goroutines. +func (t *Transcoder) WaitErr() error { + if t.done == nil { + return nil + } + <-t.done + return t.waitErr +} + // Read implements io.Reader. func (t *Transcoder) Read(p []byte) (int, error) { return t.out.Read(p) } // Close kills the child process if still running and waits up to 2s for exit. +// Once Close has been invoked IsClosing reports true; transcodeSource.watchExit +// uses that to tell a kill-by-Close from a genuine ffmpeg crash. func (t *Transcoder) Close() error { t.mu.Lock() if t.closed { @@ -106,19 +144,26 @@ func (t *Transcoder) Close() error { if t.cmd != nil && t.cmd.Process != nil { _ = t.cmd.Process.Kill() } - if t.cmd == nil { + if t.done == nil { return nil } - done := make(chan error, 1) - go func() { done <- t.cmd.Wait() }() select { - case <-done: + case <-t.done: case <-time.After(2 * time.Second): // Process refused to die — leak it; the OS will clean up on exit. } return nil } +// IsClosing reports whether Close has been invoked. It takes t.mu to read +// the flag (a mutex, not an atomic); callers use it to distinguish a +// kill-by-Close exit from a real ffmpeg failure when reading WaitErr. +func (t *Transcoder) IsClosing() bool { + t.mu.Lock() + defer t.mu.Unlock() + return t.closed +} + // Stderr returns the accumulated ffmpeg stderr so far. Useful for surfacing // failure reasons in logs after Close(). 
func (t *Transcoder) Stderr() string { @@ -185,27 +230,47 @@ func buildFFmpegArgs(filePath string, opts TranscodeOpts) []string { if videoCodec == "libx264" { args = append(args, "-preset", coalesce(opts.Preset, "veryfast")) } + // Force the broadest browser-compatible h264 profile. `high` (libx264 + // default) makes Chrome try its hardware decoder path first, which + // can fail with "VaapiWrapper: failed initializing" on Linux boxes + // where VA-API isn't fully wired up. `main` keeps a clean software + // decode fallback on every desktop + mobile platform. + args = append(args, "-profile:v", "main", "-level:v", "4.0") args = append(args, "-b:v", coalesce(opts.VideoBitrate, "5M")) + // Filter chain: + // 1. scale (optional) — cap height + force even width. + // 2. format=yuv420p — drop 10-bit + reset pix_fmt to 8-bit before + // libx264 (which refuses 10-bit unless built with --bit-depth=10). + // 3. setparams — REWRITE the color metadata in the output stream's + // VUI/SEI without touching pixels. This is what makes HDR HEVC + // sources (color_primaries=bt2020, color_transfer=arib-std-b67) + // decodeable in browsers that reject anything but Rec.709. We + // can't actually tonemap without libzimg/zscale (most ffmpeg + // builds — including ours — ship without it), so colours look + // desaturated on HDR sources, but the file plays. SDR sources + // already match these params and are unaffected. + var filterChain string if opts.MaxHeight > 0 { - // `-2:H` scales to height H, derives width preserving aspect ratio, - // and rounds to a multiple of 2 (libx264 refuses odd dimensions). - // `force_original_aspect_ratio=decrease` keeps shorter sources - // untouched instead of upscaling. `pix_fmt yuv420p` keeps 10-bit - // HEVC sources playable in browsers (8-bit only). 
- args = append(args, - "-vf", - fmt.Sprintf("scale=-2:%d:force_original_aspect_ratio=decrease", opts.MaxHeight), - "-pix_fmt", "yuv420p", + filterChain = fmt.Sprintf( + "scale=-2:%d:force_original_aspect_ratio=decrease,format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv", + opts.MaxHeight, ) } else { - args = append(args, "-pix_fmt", "yuv420p") + filterChain = "format=yuv420p,setparams=colorspace=bt709:color_trc=bt709:color_primaries=bt709:range=tv" } + args = append(args, "-vf", filterChain) args = append(args, "-c:a", "aac", "-b:a", coalesce(opts.AudioBitrate, "192k")) } // Common output flags — fragmented MP4 to a single pipe. + // NO faststart: that flag rewrites the moov atom to the front of the + // file as a SECOND pass after encoding finishes, which means the + // browser never sees a moov until ffmpeg exits. For live transcoding + // we need empty_moov (write a placeholder up front) so MSE can start + // decoding the very first fragment. faststart is only safe for + // already-finished files. args = append(args, - "-movflags", "frag_keyframe+empty_moov+default_base_moof+faststart", + "-movflags", "frag_keyframe+empty_moov+default_base_moof", "-f", "mp4", "pipe:1", ) diff --git a/internal/engine/webrtc_stream.go b/internal/engine/webrtc_stream.go index 691d5a3..e3c6378 100644 --- a/internal/engine/webrtc_stream.go +++ b/internal/engine/webrtc_stream.go @@ -145,7 +145,7 @@ func buildStreamSource( log StreamLogger, ) (streamSource, error) { tc := cfg.Transcode - cap := resolveQualityCap(cfg.Quality) + qcap := resolveQualityCap(cfg.Quality) if tc.Disabled || tc.FFmpegPath == "" || tc.FFprobePath == "" { return newDiskFileSource(abs) @@ -160,14 +160,10 @@ func buildStreamSource( // Quality cap can promote a passthrough/remux decision into a full video // transcode when the source resolution exceeds the requested cap. 
- forcedByQuality := false - if cap.MaxHeight > 0 && probe.Height > 0 && probe.Height > cap.MaxHeight { - if action != ActionTranscodeVideo { - log.Infof("[wrtc %s] quality=%s caps height %d→%d — forcing video transcode", - agent.ShortID(cfg.SessionID), cfg.Quality, probe.Height, cap.MaxHeight) - action = ActionTranscodeVideo - forcedByQuality = true - } + if qcap.MaxHeight > 0 && probe.Height > 0 && probe.Height > qcap.MaxHeight && action != ActionTranscodeVideo { + log.Infof("[wrtc %s] quality=%s caps height %d→%d — forcing video transcode", + agent.ShortID(cfg.SessionID), cfg.Quality, probe.Height, qcap.MaxHeight) + action = ActionTranscodeVideo } if action == ActionPassthrough { @@ -178,15 +174,14 @@ func buildStreamSource( log.Infof("[wrtc %s] transcoding %s/%s/%s → h264+aac (%s, quality=%s)", agent.ShortID(cfg.SessionID), probe.Container, probe.VideoCodec, probe.AudioCodec, - action, coalesceLabel(cfg.Quality)) + action, coalesce(cfg.Quality, "default")) maxHeight := tc.MaxHeight videoBitrate := tc.VideoBitrate - if cap.MaxHeight > 0 { - maxHeight = cap.MaxHeight - videoBitrate = cap.VideoBitrate + if qcap.MaxHeight > 0 { + maxHeight = qcap.MaxHeight + videoBitrate = qcap.VideoBitrate } - _ = forcedByQuality // reserved for future telemetry opts := TranscodeOpts{ Action: action, @@ -200,13 +195,6 @@ func buildStreamSource( return newTranscodeSource(ctx, abs, probe, action, opts, displayName) } -func coalesceLabel(s string) string { - if s == "" { - return "default" - } - return s -} - // RunWebRTCStream blocks until the session ends — either the DataChannel // closes, the peer connection drops, or ctx is cancelled. Always returns a // non-nil error explaining the termination reason. @@ -520,6 +508,13 @@ func (p *dataChannelPump) onOpen() { // ffmpeg writes more bytes; the estimate just bootstraps the UI. announceSize := p.source.EstimatedSize() transcoding := p.source.Transcoded() + // Browsers refuse to start playback when Content-Length is 0. 
If we don't + // have a duration estimate (e.g. ffprobe couldn't tag the source), declare + // a large sentinel so the browser issues range requests; the Transcoding + // flag tells it the value is provisional. + if transcoding && announceSize <= 0 { + announceSize = math.MaxInt64 + } // Seekable=true even for transcoded sources because we read from a tmp // file (random access). Seek backwards just works; seek forward beyond // what ffmpeg has produced will block briefly inside ReadAt. @@ -633,22 +628,35 @@ func (p *dataChannelPump) serveRange(streamID uint32, req wire.RangeReqPayload) if req.Length > math.MaxInt64 { want = 0 // treat absurd length as "remainder of file" } - // "Remainder" target: prefer current known size, fall back to estimate - // for transcoded streams so the browser can keep scrolling forward as - // ffmpeg produces output. - knownEnd := currentSize - if p.source.Final() { - knownEnd = finalSize + // Cap by *final* size, not currentSize. For a still-transcoding stream + // currentSize grows over time and ReadAt below already blocks until + // ffmpeg produces the requested bytes (with a deadline). If we cap + // `want` by currentSize here we'll send an empty RangeEnd whenever the + // browser asks for bytes faster than ffmpeg writes them — which is + // always true on the first few seconds — and the browser then aborts + // playback with "Format error". + cap := finalSize + if !p.source.Final() && cap < int64(req.Offset)+1 { + // Estimate too small: serve as much as the browser asked for and + // let ReadAt block. + cap = int64(req.Offset) + want } - if knownEnd < int64(req.Offset) { - knownEnd = int64(req.Offset) + if int64(req.Offset) >= cap && p.source.Final() { + // Past true end of a finished file. 
+ p.sendRangeEnd(streamID, 0) + return + } + remaining := cap - int64(req.Offset) + if remaining < 0 { + remaining = 0 } - remaining := knownEnd - int64(req.Offset) if want <= 0 || want > remaining { want = remaining } + p.log.Infof("dc: range_req sid=%d offset=%d wantReq=%d wantServe=%d currentSize=%d final=%v", + streamID, req.Offset, req.Length, want, currentSize, p.source.Final()) if want <= 0 { - // Nothing to serve right now (transcoder hasn't reached this offset). + // Only happens for a finished file when offset is at/past EOF. p.sendRangeEnd(streamID, 0) return }