unarr/internal/engine/stream_source.go
Deivid Soto ca7de23a56
Some checks failed
Release / release (push) Failing after 0s
Release / docker (push) Has been skipped
Release / virustotal (push) Failing after 0s
feat(stream)!: retire WebRTC, HLS-only, bump 0.9.4
Drops the custom WebRTC DataChannel pipeline + pion deps + WSS signaling
client + wire framing. Every in-browser playback now uses HLS over HTTP
from the daemon (Tailscale/LAN/UPnP). Browser P2P never re-enabled.

Wire renames (incompatible with web < 2026-05-26): agent.WebRTCSession
=> agent.StreamSession, SyncResponse.WebRTCSessions (JSON: webrtcSessions)
=> StreamSessions (JSON: streamSessions). MIN_AGENT_VERSION is bumped
to 0.9.4 on the web side so older agents see an upgrade card.

Also fixes the libx264 'VBV bitrate > level limit' abort by clamping
the encoder bitrate to the effective output height instead of the
requested label (carried over from the prior 0.9.3 unreleased work).

The seed_file vertical (mode=seed_file handler + engine.SeedFile) was
retired with the in-browser P2P player. [downloads.webrtc] config block
deleted; existing TOML files with the section still parse fine.
2026-05-26 18:04:35 +02:00

347 lines
9.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
)
// streamSource abstracts the byte source consumed by the HLS transcoder.
// Two implementations:
// - diskFileSource — direct passthrough of the on-disk file.
// - transcodeSource — ffmpeg writes a fragmented MP4 to a temp file in
// real time; reads block briefly when callers ask for bytes ahead of
// the writer.
type streamSource interface {
ReadAt(p []byte, off int64) (int, error)
// Size returns the currently known size. For transcoded sources this
// grows as ffmpeg produces output; on Final() it's the final size.
Size() int64
// Final reports whether the source size is now stable (passthrough is
// always final, transcoder becomes final when ffmpeg exits).
Final() bool
// EstimatedSize returns the final size we expect to converge on. For
// passthrough it's the same as Size(). For transcoder it's a bitrate
// × duration estimate so the browser scrubber has something to anchor
// on; the real size will differ ±20%.
EstimatedSize() int64
FileName() string
Transcoded() bool
Close() error
}
// ─────────────────────────────────────────────────────────────────────────────
// disk passthrough
// ─────────────────────────────────────────────────────────────────────────────
type diskFileSource struct {
f *os.File
size int64
name string
}
func newDiskFileSource(path string) (*diskFileSource, error) {
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("stream source: open %s: %w", path, err)
}
stat, err := f.Stat()
if err != nil {
f.Close()
return nil, fmt.Errorf("stream source: stat %s: %w", path, err)
}
return &diskFileSource{f: f, size: stat.Size(), name: filepath.Base(path)}, nil
}
func (d *diskFileSource) ReadAt(p []byte, off int64) (int, error) {
return d.f.ReadAt(p, off)
}
func (d *diskFileSource) Size() int64 { return d.size }
func (d *diskFileSource) Final() bool { return true }
func (d *diskFileSource) EstimatedSize() int64 { return d.size }
func (d *diskFileSource) FileName() string { return d.name }
func (d *diskFileSource) Transcoded() bool { return false }
func (d *diskFileSource) Close() error { return d.f.Close() }
// ─────────────────────────────────────────────────────────────────────────────
// transcode source — ffmpeg → tmp file
// ─────────────────────────────────────────────────────────────────────────────
type transcodeSource struct {
tmpPath string
tmpFile *os.File
cmd *Transcoder
name string
estimate int64
ctx context.Context
notify chan struct{} // size grew or final flipped; cap=1, non-blocking send
size atomic.Int64
final atomic.Bool
failure atomic.Pointer[error]
startedAt time.Time
}
const (
// readBlockTimeout caps how long ReadAt waits for bytes that haven't
// been transcoded yet before returning EOF/io.ErrUnexpectedEOF. The
// pump treats EOF as "respond with whatever we have so far + RangeEnd"
// so the browser can re-request once more bytes appear.
readBlockTimeout = 30 * time.Second
)
func newTranscodeSource(
ctx context.Context,
srcPath string,
probe *StreamProbe,
action TranscodeAction,
opts TranscodeOpts,
displayName string,
) (*transcodeSource, error) {
tmpFile, err := os.CreateTemp("", "tc-stream-*.mp4")
if err != nil {
return nil, fmt.Errorf("transcode source: tmp file: %w", err)
}
tmpPath := tmpFile.Name()
tmpFile.Close()
args := buildFFmpegArgs(srcPath, opts)
// Override -f mp4 pipe:1 with output to our tmp file path (last 3 args).
if len(args) >= 3 && args[len(args)-1] == "pipe:1" {
args[len(args)-1] = tmpPath
}
// Spawn ffmpeg directly (not via NewTranscoder pipe) so it writes to
// disk in real time. We re-use the rest of TranscodeOpts wiring.
cmd, err := startTranscoderToFile(ctx, opts.FFmpegPath, args, nil)
if err != nil {
os.Remove(tmpPath)
return nil, err
}
estimate := estimateOutputSize(probe, opts)
t := &transcodeSource{
tmpPath: tmpPath,
cmd: cmd,
name: displayName,
estimate: estimate,
ctx: ctx,
notify: make(chan struct{}, 1),
startedAt: time.Now(),
}
// Re-open the tmp file for reading; ffmpeg keeps writing to it.
rf, err := os.Open(tmpPath)
if err != nil {
_ = cmd.Close()
os.Remove(tmpPath)
return nil, fmt.Errorf("transcode source: reopen tmp: %w", err)
}
t.tmpFile = rf
go t.watchSize(ctx)
go t.watchExit()
return t, nil
}
// signalNotify wakes any goroutine blocked in ReadAt. Non-blocking: if a
// notification is already pending the new event is folded into it (callers
// always re-check size + final after waking, so a coalesced signal still
// produces correct behaviour).
func (t *transcodeSource) signalNotify() {
select {
case t.notify <- struct{}{}:
default:
}
}
// watchSize polls the temp file size every 200 ms and wakes any blocked
// ReadAt callers once new bytes arrive.
func (t *transcodeSource) watchSize(ctx context.Context) {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
t.signalNotify()
return
case <-ticker.C:
}
if t.final.Load() {
t.signalNotify()
return
}
stat, err := os.Stat(t.tmpPath)
if err != nil {
continue
}
current := stat.Size()
if current > t.size.Load() {
t.size.Store(current)
t.signalNotify()
}
}
}
// watchExit waits for ffmpeg to exit (via Transcoder's single-Wait goroutine)
// and locks in the final size. A kill triggered by Close() is NOT a failure.
func (t *transcodeSource) watchExit() {
<-t.cmd.Done()
err := t.cmd.WaitErr()
if err != nil && !t.cmd.IsClosing() {
failure := fmt.Errorf("ffmpeg exited: %w (%s)", err, t.cmd.Stderr())
t.failure.Store(&failure)
}
if stat, err := os.Stat(t.tmpPath); err == nil {
t.size.Store(stat.Size())
}
t.final.Store(true)
t.signalNotify()
}
// loadFailure returns the current failure (or nil) without taking a lock.
func (t *transcodeSource) loadFailure() error {
if p := t.failure.Load(); p != nil {
return *p
}
return nil
}
func (t *transcodeSource) ReadAt(p []byte, off int64) (int, error) {
if err := t.loadFailure(); err != nil {
return 0, err
}
if len(p) == 0 {
return 0, nil
}
if off < 0 {
return 0, fmt.Errorf("transcode source: negative offset %d", off)
}
want := int64(len(p))
deadline := time.Now().Add(readBlockTimeout)
for {
if t.final.Load() {
break
}
size := t.size.Load()
// Overflow-safe form of "off + want <= size":
if size >= off && size-off >= want {
break
}
remaining := time.Until(deadline)
if remaining <= 0 {
break
}
wait := 500 * time.Millisecond
if remaining < wait {
wait = remaining
}
select {
case <-t.ctx.Done():
return 0, t.ctx.Err()
case <-t.notify:
case <-time.After(wait):
}
}
if err := t.loadFailure(); err != nil {
return 0, err
}
n, err := t.tmpFile.ReadAt(p, off)
// On a growing file ReadAt returns io.EOF when reading past current size.
// Translate that into "send what we have, RangeEnd will follow" by
// returning (n, nil) so the pump treats the data as a partial chunk and
// caller re-requests once more bytes appear. Only true EOF (final=true)
// propagates as io.EOF.
if err == io.EOF && !t.final.Load() {
if n > 0 {
return n, nil
}
return 0, errors.New("transcode source: read timed out waiting for ffmpeg output")
}
return n, err
}
func (t *transcodeSource) Size() int64 { return t.size.Load() }
func (t *transcodeSource) Final() bool { return t.final.Load() }
func (t *transcodeSource) EstimatedSize() int64 {
if t.final.Load() {
return t.size.Load()
}
return t.estimate
}
func (t *transcodeSource) FileName() string {
// Output is always fragmented MP4 regardless of source extension.
return strings.TrimSuffix(t.name, filepath.Ext(t.name)) + ".mp4"
}
func (t *transcodeSource) Transcoded() bool { return true }
func (t *transcodeSource) Close() error {
var errs []error
if err := t.cmd.Close(); err != nil {
errs = append(errs, err)
}
if t.tmpFile != nil {
if err := t.tmpFile.Close(); err != nil {
errs = append(errs, err)
}
}
if t.tmpPath != "" {
if err := os.Remove(t.tmpPath); err != nil && !os.IsNotExist(err) {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
// estimateOutputSize converts probed bitrate × duration into a byte estimate
// so the browser scrubber has something to anchor on while transcoding.
func estimateOutputSize(probe *StreamProbe, opts TranscodeOpts) int64 {
if probe == nil || probe.DurationSec <= 0 {
return 0
}
videoKbps := parseBitrateKbps(opts.VideoBitrate, 5000)
audioKbps := parseBitrateKbps(opts.AudioBitrate, 192)
totalKbps := videoKbps + audioKbps
bytesPerSec := int64(totalKbps) * 1000 / 8
return int64(probe.DurationSec) * bytesPerSec
}
// parseBitrateKbps converts ffmpeg-style bitrate strings ("5M", "192k") to
// kilobits per second. Unknown formats fall back to fallback.
func parseBitrateKbps(s string, fallback int) int {
if s == "" {
return fallback
}
last := s[len(s)-1]
num := s
mult := 1
switch last {
case 'k', 'K':
num = s[:len(s)-1]
case 'M', 'm':
num = s[:len(s)-1]
mult = 1000
default:
// already in bps? treat as kbps
}
v := 0
for _, c := range num {
if c < '0' || c > '9' {
return fallback
}
v = v*10 + int(c-'0')
}
if v == 0 {
return fallback
}
return v * mult
}