feat(stream): real-time transcoding for non-browser-decodable codecs
Source files in HEVC, AV1, AC3, DTS, EAC3, etc. now transcode through ffmpeg
to fragmented MP4 (h264 + aac) on-the-fly when the browser would otherwise
play silent black. Decision matrix lives in engine.DecideAction:
passthrough → remux → audio-transcode → full video-transcode.
Architecture — temp file + growing-size source:
- engine.streamSource interface abstracts byte source. Two impls:
* diskFileSource: passthrough when codecs are already browser-friendly.
* transcodeSource: spawns ffmpeg writing to a /tmp/tc-stream-*.mp4 file.
A ticker polls file size and wakes blocked ReadAt callers as ffmpeg
produces output. Estimate of final size (bitrate × duration) is
announced over the wire so the browser's scrubber has something to
anchor on.
- dataChannelPump now reads from streamSource instead of *os.File. HELLO
carries Transcoding=true + an estimated total size; Seekable=true (we
read random-access from the temp file even while writing).
- Transcoder runtime resolved per session by buildTranscodeRuntime in
cmd/daemon: ffmpeg/ffprobe path lookup + HWAccel auto-detection
(NVENC/QSV/VAAPI/VideoToolbox).
- New [downloads.transcode] TOML section: enabled (default true), hw_accel
(auto), preset (veryfast), video_bitrate (5M), audio_bitrate (192k),
max_height (optional downscale), max_concurrent (safety cap).
Falls back to passthrough if ffprobe is missing, fails, or codecs are
already browser-friendly. tmp file is cleaned up on session shutdown.
This commit is contained in:
parent
4314c06c5c
commit
66ac79664b
6 changed files with 583 additions and 51 deletions
|
|
@ -451,6 +451,7 @@ func runDaemonStart() error {
|
||||||
webrtcRegistry.remove(sess.SessionID)
|
webrtcRegistry.remove(sess.SessionID)
|
||||||
sessCancel()
|
sessCancel()
|
||||||
}()
|
}()
|
||||||
|
tcRuntime := buildTranscodeRuntime(ctx, cfg)
|
||||||
runCfg := engine.WebRTCStreamConfig{
|
runCfg := engine.WebRTCStreamConfig{
|
||||||
SessionID: sess.SessionID,
|
SessionID: sess.SessionID,
|
||||||
FilePath: filePath,
|
FilePath: filePath,
|
||||||
|
|
@ -459,6 +460,7 @@ func runDaemonStart() error {
|
||||||
ICEServers: engine.BuildICEServers(cfg.Download.WebRTC),
|
ICEServers: engine.BuildICEServers(cfg.Download.WebRTC),
|
||||||
Signal: agentClient,
|
Signal: agentClient,
|
||||||
Logger: stdLogger{},
|
Logger: stdLogger{},
|
||||||
|
Transcode: tcRuntime,
|
||||||
}
|
}
|
||||||
log.Printf("[wrtc %s] starting session: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath))
|
log.Printf("[wrtc %s] starting session: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath))
|
||||||
if err := engine.RunWebRTCStream(sessCtx, runCfg); err != nil {
|
if err := engine.RunWebRTCStream(sessCtx, runCfg); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/torrentclaw/unarr/internal/config"
|
||||||
|
"github.com/torrentclaw/unarr/internal/engine"
|
||||||
|
"github.com/torrentclaw/unarr/internal/library/mediainfo"
|
||||||
)
|
)
|
||||||
|
|
||||||
// webrtcRegistry tracks per-session cancel funcs for active custom WebRTC
|
// webrtcRegistry tracks per-session cancel funcs for active custom WebRTC
|
||||||
|
|
@ -60,3 +64,43 @@ type stdLogger struct{}
|
||||||
func (stdLogger) Infof(format string, args ...any) { log.Printf(format, args...) }
|
func (stdLogger) Infof(format string, args ...any) { log.Printf(format, args...) }
|
||||||
func (stdLogger) Warnf(format string, args ...any) { log.Printf("WARN: "+format, args...) }
|
func (stdLogger) Warnf(format string, args ...any) { log.Printf("WARN: "+format, args...) }
|
||||||
func (stdLogger) Errorf(format string, args ...any) { log.Printf("ERROR: "+format, args...) }
|
func (stdLogger) Errorf(format string, args ...any) { log.Printf("ERROR: "+format, args...) }
|
||||||
|
|
||||||
|
// buildTranscodeRuntime resolves the ffmpeg/ffprobe binaries + config knobs
|
||||||
|
// for the WebRTC streaming pipeline. Failure to resolve a binary returns a
|
||||||
|
// runtime with empty paths so engine.RunWebRTCStream falls back to
|
||||||
|
// passthrough — the user gets a clearer codec error from the browser than a
|
||||||
|
// daemon-side abort.
|
||||||
|
func buildTranscodeRuntime(ctx context.Context, cfg config.Config) engine.TranscodeRuntime {
|
||||||
|
if !cfg.Download.Transcode.Enabled {
|
||||||
|
return engine.TranscodeRuntime{Disabled: true}
|
||||||
|
}
|
||||||
|
ffmpegPath, errF := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath)
|
||||||
|
ffprobePath, errP := mediainfo.ResolveFFprobe(cfg.Library.FFprobePath)
|
||||||
|
if errF != nil || errP != nil {
|
||||||
|
return engine.TranscodeRuntime{Disabled: true}
|
||||||
|
}
|
||||||
|
hw := engine.HWAccelNone
|
||||||
|
switch cfg.Download.Transcode.HWAccel {
|
||||||
|
case "auto":
|
||||||
|
hw = engine.DetectHWAccel(ctx, ffmpegPath)
|
||||||
|
case "nvenc":
|
||||||
|
hw = engine.HWAccelNVENC
|
||||||
|
case "qsv":
|
||||||
|
hw = engine.HWAccelQSV
|
||||||
|
case "vaapi":
|
||||||
|
hw = engine.HWAccelVAAPI
|
||||||
|
case "videotoolbox":
|
||||||
|
hw = engine.HWAccelVideoToolbox
|
||||||
|
case "none", "":
|
||||||
|
hw = engine.HWAccelNone
|
||||||
|
}
|
||||||
|
return engine.TranscodeRuntime{
|
||||||
|
FFmpegPath: ffmpegPath,
|
||||||
|
FFprobePath: ffprobePath,
|
||||||
|
HWAccel: hw,
|
||||||
|
Preset: cfg.Download.Transcode.Preset,
|
||||||
|
VideoBitrate: cfg.Download.Transcode.VideoBitrate,
|
||||||
|
AudioBitrate: cfg.Download.Transcode.AudioBitrate,
|
||||||
|
MaxHeight: cfg.Download.Transcode.MaxHeight,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,17 +34,32 @@ type AgentConfig struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type DownloadConfig struct {
|
type DownloadConfig struct {
|
||||||
Dir string `toml:"dir"`
|
Dir string `toml:"dir"`
|
||||||
PreferredMethod string `toml:"preferred_method"`
|
PreferredMethod string `toml:"preferred_method"`
|
||||||
PreferredQuality string `toml:"preferred_quality"` // "2160p", "1080p", "720p" — hint for auto-selection
|
PreferredQuality string `toml:"preferred_quality"` // "2160p", "1080p", "720p" — hint for auto-selection
|
||||||
MaxConcurrent int `toml:"max_concurrent"`
|
MaxConcurrent int `toml:"max_concurrent"`
|
||||||
MaxDownloadSpeed string `toml:"max_download_speed"` // e.g. "10MB", "500KB", "0" = unlimited
|
MaxDownloadSpeed string `toml:"max_download_speed"` // e.g. "10MB", "500KB", "0" = unlimited
|
||||||
MaxUploadSpeed string `toml:"max_upload_speed"` // e.g. "1MB", "0" = unlimited
|
MaxUploadSpeed string `toml:"max_upload_speed"` // e.g. "1MB", "0" = unlimited
|
||||||
MetadataTimeout string `toml:"metadata_timeout"` // e.g. "1h", "30m", "0" = unlimited (default: "0")
|
MetadataTimeout string `toml:"metadata_timeout"` // e.g. "1h", "30m", "0" = unlimited (default: "0")
|
||||||
StallTimeout string `toml:"stall_timeout"` // e.g. "30m", "1h", "0" = unlimited (default: "30m")
|
StallTimeout string `toml:"stall_timeout"` // e.g. "30m", "1h", "0" = unlimited (default: "30m")
|
||||||
ListenPort int `toml:"listen_port"` // fixed port for incoming peer connections (default: 42069, 0 = random)
|
ListenPort int `toml:"listen_port"` // fixed port for incoming peer connections (default: 42069, 0 = random)
|
||||||
StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818)
|
StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818)
|
||||||
WebRTC WebRTCConfig `toml:"webrtc"`
|
WebRTC WebRTCConfig `toml:"webrtc"`
|
||||||
|
Transcode TranscodeConfig `toml:"transcode"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// TranscodeConfig controls real-time transcoding for the in-browser player
|
||||||
|
// when source codecs aren't browser-decodable (HEVC, AV1, AC3, DTS, etc.).
|
||||||
|
// Disabled by default; enabling requires ffmpeg + ffprobe on PATH (or
|
||||||
|
// explicit paths via the library config).
|
||||||
|
type TranscodeConfig struct {
|
||||||
|
Enabled bool `toml:"enabled"` // master switch
|
||||||
|
HWAccel string `toml:"hw_accel"` // "auto" | "none" | "nvenc" | "qsv" | "vaapi" | "videotoolbox"
|
||||||
|
Preset string `toml:"preset"` // libx264 preset; "veryfast" by default
|
||||||
|
VideoBitrate string `toml:"video_bitrate"` // e.g. "5M"
|
||||||
|
AudioBitrate string `toml:"audio_bitrate"` // e.g. "192k"
|
||||||
|
MaxHeight int `toml:"max_height"` // optional downscale cap (e.g. 720)
|
||||||
|
MaxConcurrent int `toml:"max_concurrent"` // safety cap on simultaneous transcoder processes
|
||||||
}
|
}
|
||||||
|
|
||||||
// WebRTCConfig opts the daemon into acting as a WebTorrent peer so browsers
|
// WebRTCConfig opts the daemon into acting as a WebTorrent peer so browsers
|
||||||
|
|
@ -106,6 +121,14 @@ func Default() Config {
|
||||||
Trackers: []string{"wss://tracker.torrentclaw.com"},
|
Trackers: []string{"wss://tracker.torrentclaw.com"},
|
||||||
STUNServers: []string{"stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"},
|
STUNServers: []string{"stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"},
|
||||||
},
|
},
|
||||||
|
Transcode: TranscodeConfig{
|
||||||
|
Enabled: true,
|
||||||
|
HWAccel: "auto",
|
||||||
|
Preset: "veryfast",
|
||||||
|
VideoBitrate: "5M",
|
||||||
|
AudioBitrate: "192k",
|
||||||
|
MaxConcurrent: 2,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Organize: OrganizeConfig{
|
Organize: OrganizeConfig{
|
||||||
Enabled: true,
|
Enabled: true,
|
||||||
|
|
|
||||||
354
internal/engine/stream_source.go
Normal file
354
internal/engine/stream_source.go
Normal file
|
|
@ -0,0 +1,354 @@
|
||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// streamSource abstracts the byte source served over the WebRTC DataChannel.
|
||||||
|
// Two implementations:
|
||||||
|
// - diskFileSource — direct passthrough of the on-disk file.
|
||||||
|
// - transcodeSource — ffmpeg writes a fragmented MP4 to a temp file in
|
||||||
|
// real time; reads block briefly when callers ask for bytes ahead of
|
||||||
|
// the writer.
|
||||||
|
type streamSource interface {
|
||||||
|
ReadAt(p []byte, off int64) (int, error)
|
||||||
|
// Size returns the currently known size. For transcoded sources this
|
||||||
|
// grows as ffmpeg produces output; on Final() it's the final size.
|
||||||
|
Size() int64
|
||||||
|
// Final reports whether the source size is now stable (passthrough is
|
||||||
|
// always final, transcoder becomes final when ffmpeg exits).
|
||||||
|
Final() bool
|
||||||
|
// EstimatedSize returns the final size we expect to converge on. For
|
||||||
|
// passthrough it's the same as Size(). For transcoder it's a bitrate
|
||||||
|
// × duration estimate so the browser scrubber has something to anchor
|
||||||
|
// on; the real size will differ ±20%.
|
||||||
|
EstimatedSize() int64
|
||||||
|
FileName() string
|
||||||
|
Transcoded() bool
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
// disk passthrough
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
type diskFileSource struct {
|
||||||
|
f *os.File
|
||||||
|
size int64
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDiskFileSource(path string) (*diskFileSource, error) {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("stream source: open %s: %w", path, err)
|
||||||
|
}
|
||||||
|
stat, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
f.Close()
|
||||||
|
return nil, fmt.Errorf("stream source: stat %s: %w", path, err)
|
||||||
|
}
|
||||||
|
return &diskFileSource{f: f, size: stat.Size(), name: filepath.Base(path)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *diskFileSource) ReadAt(p []byte, off int64) (int, error) {
|
||||||
|
return d.f.ReadAt(p, off)
|
||||||
|
}
|
||||||
|
func (d *diskFileSource) Size() int64 { return d.size }
|
||||||
|
func (d *diskFileSource) Final() bool { return true }
|
||||||
|
func (d *diskFileSource) EstimatedSize() int64 { return d.size }
|
||||||
|
func (d *diskFileSource) FileName() string { return d.name }
|
||||||
|
func (d *diskFileSource) Transcoded() bool { return false }
|
||||||
|
func (d *diskFileSource) Close() error { return d.f.Close() }
|
||||||
|
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
// transcode source — ffmpeg → tmp file
|
||||||
|
// ─────────────────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
type transcodeSource struct {
|
||||||
|
tmpPath string
|
||||||
|
tmpFile *os.File
|
||||||
|
cmd *Transcoder
|
||||||
|
name string
|
||||||
|
estimate int64
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
cond *sync.Cond
|
||||||
|
size atomic.Int64
|
||||||
|
final atomic.Bool
|
||||||
|
failure error
|
||||||
|
startedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// readBlockTimeout caps how long ReadAt waits for bytes that haven't
|
||||||
|
// been transcoded yet before returning EOF/io.ErrUnexpectedEOF. The
|
||||||
|
// pump treats EOF as "respond with whatever we have so far + RangeEnd"
|
||||||
|
// so the browser can re-request once more bytes appear.
|
||||||
|
readBlockTimeout = 30 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
func newTranscodeSource(
|
||||||
|
ctx context.Context,
|
||||||
|
srcPath string,
|
||||||
|
probe *StreamProbe,
|
||||||
|
action TranscodeAction,
|
||||||
|
opts TranscodeOpts,
|
||||||
|
displayName string,
|
||||||
|
) (*transcodeSource, error) {
|
||||||
|
tmpFile, err := os.CreateTemp("", "tc-stream-*.mp4")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("transcode source: tmp file: %w", err)
|
||||||
|
}
|
||||||
|
tmpPath := tmpFile.Name()
|
||||||
|
tmpFile.Close()
|
||||||
|
|
||||||
|
args := buildFFmpegArgs(srcPath, opts)
|
||||||
|
// Override -f mp4 pipe:1 with output to our tmp file path (last 3 args).
|
||||||
|
if len(args) >= 3 && args[len(args)-1] == "pipe:1" {
|
||||||
|
args[len(args)-1] = tmpPath
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawn ffmpeg directly (not via NewTranscoder pipe) so it writes to
|
||||||
|
// disk in real time. We re-use the rest of TranscodeOpts wiring.
|
||||||
|
cmd := &Transcoder{}
|
||||||
|
cmd, err = startTranscoderToFile(ctx, opts.FFmpegPath, args, cmd)
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
estimate := estimateOutputSize(probe, opts)
|
||||||
|
|
||||||
|
t := &transcodeSource{
|
||||||
|
tmpPath: tmpPath,
|
||||||
|
cmd: cmd,
|
||||||
|
name: displayName,
|
||||||
|
estimate: estimate,
|
||||||
|
startedAt: time.Now(),
|
||||||
|
}
|
||||||
|
t.cond = sync.NewCond(&t.mu)
|
||||||
|
|
||||||
|
// Re-open the tmp file for reading; ffmpeg keeps writing to it.
|
||||||
|
rf, err := os.Open(tmpPath)
|
||||||
|
if err != nil {
|
||||||
|
_ = cmd.Close()
|
||||||
|
os.Remove(tmpPath)
|
||||||
|
return nil, fmt.Errorf("transcode source: reopen tmp: %w", err)
|
||||||
|
}
|
||||||
|
t.tmpFile = rf
|
||||||
|
|
||||||
|
go t.watchSize(ctx)
|
||||||
|
go t.watchExit()
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// watchSize polls the temp file size every 200 ms and wakes any blocked
|
||||||
|
// ReadAt callers once new bytes arrive.
|
||||||
|
func (t *transcodeSource) watchSize(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(200 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.mu.Lock()
|
||||||
|
t.cond.Broadcast()
|
||||||
|
t.mu.Unlock()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
}
|
||||||
|
if t.final.Load() {
|
||||||
|
t.mu.Lock()
|
||||||
|
t.cond.Broadcast()
|
||||||
|
t.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
stat, err := os.Stat(t.tmpPath)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
current := stat.Size()
|
||||||
|
if current > t.size.Load() {
|
||||||
|
t.size.Store(current)
|
||||||
|
t.mu.Lock()
|
||||||
|
t.cond.Broadcast()
|
||||||
|
t.mu.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// watchExit waits for ffmpeg to exit and locks in the final size.
|
||||||
|
func (t *transcodeSource) watchExit() {
|
||||||
|
err := t.cmd.cmd.Wait()
|
||||||
|
if err != nil && !isExpectedExit(err) {
|
||||||
|
t.mu.Lock()
|
||||||
|
t.failure = fmt.Errorf("ffmpeg exited: %w (%s)", err, t.cmd.Stderr())
|
||||||
|
t.mu.Unlock()
|
||||||
|
}
|
||||||
|
if stat, err := os.Stat(t.tmpPath); err == nil {
|
||||||
|
t.size.Store(stat.Size())
|
||||||
|
}
|
||||||
|
t.final.Store(true)
|
||||||
|
t.mu.Lock()
|
||||||
|
t.cond.Broadcast()
|
||||||
|
t.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func isExpectedExit(err error) bool {
|
||||||
|
// Killed by Close() — pion DC closed, that's fine.
|
||||||
|
if err == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *transcodeSource) ReadAt(p []byte, off int64) (int, error) {
|
||||||
|
if t.failure != nil {
|
||||||
|
return 0, t.failure
|
||||||
|
}
|
||||||
|
if int64(len(p)) == 0 {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
deadline := time.Now().Add(readBlockTimeout)
|
||||||
|
|
||||||
|
for {
|
||||||
|
size := t.size.Load()
|
||||||
|
if off+int64(len(p)) <= size || t.final.Load() {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Need to wait for ffmpeg to write more.
|
||||||
|
t.mu.Lock()
|
||||||
|
// Check again under lock to avoid lost wakeup.
|
||||||
|
size = t.size.Load()
|
||||||
|
if off+int64(len(p)) <= size || t.final.Load() {
|
||||||
|
t.mu.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Wait with timeout via a small sleep loop — sync.Cond doesn't
|
||||||
|
// support timed wait, and a goroutine-per-sleep pattern works fine
|
||||||
|
// for our scale.
|
||||||
|
waited := time.NewTimer(500 * time.Millisecond)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
t.cond.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
t.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-waited.C:
|
||||||
|
t.mu.Lock()
|
||||||
|
t.cond.Broadcast() // wake the goroutine so it can return
|
||||||
|
t.mu.Unlock()
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if t.failure != nil {
|
||||||
|
return 0, t.failure
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := t.tmpFile.ReadAt(p, off)
|
||||||
|
// On growing file ReadAt returns io.EOF when reading past current size.
|
||||||
|
// Convert to io.ErrUnexpectedEOF only when we actually exceeded the
|
||||||
|
// final size; otherwise return n, nil so the pump sends what we have.
|
||||||
|
if err == io.EOF && !t.final.Load() {
|
||||||
|
if n > 0 {
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
return 0, errors.New("transcode source: read timed out waiting for ffmpeg output")
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *transcodeSource) Size() int64 { return t.size.Load() }
|
||||||
|
func (t *transcodeSource) Final() bool { return t.final.Load() }
|
||||||
|
func (t *transcodeSource) EstimatedSize() int64 {
|
||||||
|
if t.final.Load() {
|
||||||
|
return t.size.Load()
|
||||||
|
}
|
||||||
|
return t.estimate
|
||||||
|
}
|
||||||
|
func (t *transcodeSource) FileName() string {
|
||||||
|
// Keep the original extension stripped — output is always fragmented MP4.
|
||||||
|
base := t.name
|
||||||
|
if i := lastIndexByte(base, '.'); i >= 0 {
|
||||||
|
base = base[:i]
|
||||||
|
}
|
||||||
|
return base + ".mp4"
|
||||||
|
}
|
||||||
|
func (t *transcodeSource) Transcoded() bool { return true }
|
||||||
|
func (t *transcodeSource) Close() error {
|
||||||
|
_ = t.cmd.Close()
|
||||||
|
if t.tmpFile != nil {
|
||||||
|
_ = t.tmpFile.Close()
|
||||||
|
}
|
||||||
|
if t.tmpPath != "" {
|
||||||
|
_ = os.Remove(t.tmpPath)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// estimateOutputSize converts probed bitrate × duration into a byte estimate
|
||||||
|
// so the browser scrubber has something to anchor on while transcoding.
|
||||||
|
func estimateOutputSize(probe *StreamProbe, opts TranscodeOpts) int64 {
|
||||||
|
if probe == nil || probe.DurationSec <= 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
videoKbps := parseBitrateKbps(opts.VideoBitrate, 5000)
|
||||||
|
audioKbps := parseBitrateKbps(opts.AudioBitrate, 192)
|
||||||
|
totalKbps := videoKbps + audioKbps
|
||||||
|
bytesPerSec := int64(totalKbps) * 1000 / 8
|
||||||
|
return int64(probe.DurationSec) * bytesPerSec
|
||||||
|
}
|
||||||
|
|
||||||
|
// parseBitrateKbps converts ffmpeg-style bitrate strings ("5M", "192k") to
|
||||||
|
// kilobits per second. Unknown formats fall back to fallback.
|
||||||
|
func parseBitrateKbps(s string, fallback int) int {
|
||||||
|
if s == "" {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
last := s[len(s)-1]
|
||||||
|
num := s
|
||||||
|
mult := 1
|
||||||
|
switch last {
|
||||||
|
case 'k', 'K':
|
||||||
|
num = s[:len(s)-1]
|
||||||
|
case 'M', 'm':
|
||||||
|
num = s[:len(s)-1]
|
||||||
|
mult = 1000
|
||||||
|
default:
|
||||||
|
// already in bps? treat as kbps
|
||||||
|
}
|
||||||
|
v := 0
|
||||||
|
for _, c := range num {
|
||||||
|
if c < '0' || c > '9' {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
v = v*10 + int(c-'0')
|
||||||
|
}
|
||||||
|
if v == 0 {
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
return v * mult
|
||||||
|
}
|
||||||
|
|
||||||
|
func lastIndexByte(s string, c byte) int {
|
||||||
|
for i := len(s) - 1; i >= 0; i-- {
|
||||||
|
if s[i] == c {
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
@ -64,6 +64,26 @@ func NewTranscoder(ctx context.Context, filePath string, opts TranscodeOpts) (*T
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// startTranscoderToFile spawns ffmpeg with a pre-built argv where the last
|
||||||
|
// argument is an output file path (instead of pipe:1). Used by streamSource
|
||||||
|
// when we want random-access reads against a growing temp file rather than
|
||||||
|
// sequential pipe consumption.
|
||||||
|
func startTranscoderToFile(ctx context.Context, ffmpegPath string, args []string, t *Transcoder) (*Transcoder, error) {
|
||||||
|
if ffmpegPath == "" {
|
||||||
|
return nil, fmt.Errorf("transcoder: empty ffmpeg path")
|
||||||
|
}
|
||||||
|
cmd := exec.CommandContext(ctx, ffmpegPath, args...)
|
||||||
|
if t == nil {
|
||||||
|
t = &Transcoder{}
|
||||||
|
}
|
||||||
|
t.cmd = cmd
|
||||||
|
cmd.Stderr = &errWriter{t: t}
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
return nil, fmt.Errorf("transcoder: start ffmpeg: %w", err)
|
||||||
|
}
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Read implements io.Reader.
|
// Read implements io.Reader.
|
||||||
func (t *Transcoder) Read(p []byte) (int, error) { return t.out.Read(p) }
|
func (t *Transcoder) Read(p []byte) (int, error) { return t.out.Read(p) }
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,6 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
@ -62,6 +61,25 @@ type WebRTCStreamConfig struct {
|
||||||
Signal *agent.Client
|
Signal *agent.Client
|
||||||
// Logger receives diagnostic events; a nil logger swallows everything.
|
// Logger receives diagnostic events; a nil logger swallows everything.
|
||||||
Logger StreamLogger
|
Logger StreamLogger
|
||||||
|
// Transcode steers on-the-fly transcoding when source codecs are not
|
||||||
|
// browser-decodable (HEVC/AV1/AC3/DTS). Empty FFmpegPath disables it.
|
||||||
|
Transcode TranscodeRuntime
|
||||||
|
}
|
||||||
|
|
||||||
|
// TranscodeRuntime carries the resolved ffmpeg/ffprobe paths + tunables so
|
||||||
|
// each session can decide whether to passthrough or pipe through ffmpeg.
|
||||||
|
type TranscodeRuntime struct {
|
||||||
|
FFmpegPath string
|
||||||
|
FFprobePath string
|
||||||
|
HWAccel HWAccel
|
||||||
|
Preset string
|
||||||
|
VideoBitrate string
|
||||||
|
AudioBitrate string
|
||||||
|
MaxHeight int
|
||||||
|
// Disabled forces passthrough for every file even when codecs are not
|
||||||
|
// browser-friendly. Useful when the user explicitly turns transcoding
|
||||||
|
// off in config.
|
||||||
|
Disabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// StreamLogger is an injectable logger so tests can capture events.
|
// StreamLogger is an injectable logger so tests can capture events.
|
||||||
|
|
@ -84,6 +102,48 @@ func logger(l StreamLogger) StreamLogger {
|
||||||
return l
|
return l
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildStreamSource picks between passthrough and transcoded source. ffprobe
|
||||||
|
// failure or missing ffmpeg falls back to passthrough — the browser surfaces
|
||||||
|
// a clearer codec error than us refusing to start.
|
||||||
|
func buildStreamSource(
|
||||||
|
ctx context.Context,
|
||||||
|
abs string,
|
||||||
|
displayName string,
|
||||||
|
cfg WebRTCStreamConfig,
|
||||||
|
log StreamLogger,
|
||||||
|
) (streamSource, error) {
|
||||||
|
tc := cfg.Transcode
|
||||||
|
if tc.Disabled || tc.FFmpegPath == "" || tc.FFprobePath == "" {
|
||||||
|
return newDiskFileSource(abs)
|
||||||
|
}
|
||||||
|
|
||||||
|
probe, err := ProbeFile(ctx, tc.FFprobePath, abs)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("[wrtc %s] probe failed (%v) — passthrough", agent.ShortID(cfg.SessionID), err)
|
||||||
|
return newDiskFileSource(abs)
|
||||||
|
}
|
||||||
|
action := DecideAction(probe)
|
||||||
|
if action == ActionPassthrough {
|
||||||
|
log.Infof("[wrtc %s] codec passthrough (%s + %s in %s)",
|
||||||
|
agent.ShortID(cfg.SessionID), probe.VideoCodec, probe.AudioCodec, probe.Container)
|
||||||
|
return newDiskFileSource(abs)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("[wrtc %s] transcoding %s/%s/%s → h264+aac (%s)",
|
||||||
|
agent.ShortID(cfg.SessionID), probe.Container, probe.VideoCodec, probe.AudioCodec, action)
|
||||||
|
|
||||||
|
opts := TranscodeOpts{
|
||||||
|
Action: action,
|
||||||
|
HWAccel: tc.HWAccel,
|
||||||
|
Preset: tc.Preset,
|
||||||
|
VideoBitrate: tc.VideoBitrate,
|
||||||
|
AudioBitrate: tc.AudioBitrate,
|
||||||
|
MaxHeight: tc.MaxHeight,
|
||||||
|
FFmpegPath: tc.FFmpegPath,
|
||||||
|
}
|
||||||
|
return newTranscodeSource(ctx, abs, probe, action, opts, displayName)
|
||||||
|
}
|
||||||
|
|
||||||
// RunWebRTCStream blocks until the session ends — either the DataChannel
|
// RunWebRTCStream blocks until the session ends — either the DataChannel
|
||||||
// closes, the peer connection drops, or ctx is cancelled. Always returns a
|
// closes, the peer connection drops, or ctx is cancelled. Always returns a
|
||||||
// non-nil error explaining the termination reason.
|
// non-nil error explaining the termination reason.
|
||||||
|
|
@ -101,24 +161,20 @@ func RunWebRTCStream(ctx context.Context, cfg WebRTCStreamConfig) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("webrtc_stream: resolve path: %w", err)
|
return fmt.Errorf("webrtc_stream: resolve path: %w", err)
|
||||||
}
|
}
|
||||||
file, err := os.Open(abs)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("webrtc_stream: open file: %w", err)
|
|
||||||
}
|
|
||||||
defer file.Close()
|
|
||||||
|
|
||||||
stat, err := file.Stat()
|
displayName := cfg.FileName
|
||||||
|
if displayName == "" {
|
||||||
|
displayName = filepath.Base(abs)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decide passthrough vs transcoding. Probe is best-effort: if ffprobe
|
||||||
|
// is missing or fails we fall back to passthrough (the browser will
|
||||||
|
// surface a clearer error than us guessing wrong).
|
||||||
|
source, err := buildStreamSource(ctx, abs, displayName, cfg, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("webrtc_stream: stat: %w", err)
|
return fmt.Errorf("webrtc_stream: build source: %w", err)
|
||||||
}
|
|
||||||
fileSize := stat.Size()
|
|
||||||
if cfg.FileSize > 0 && cfg.FileSize != fileSize {
|
|
||||||
log.Warnf("webrtc_stream: declared size %d != actual %d", cfg.FileSize, fileSize)
|
|
||||||
}
|
|
||||||
fileName := cfg.FileName
|
|
||||||
if fileName == "" {
|
|
||||||
fileName = filepath.Base(abs)
|
|
||||||
}
|
}
|
||||||
|
defer source.Close()
|
||||||
|
|
||||||
// 1. Build PeerConnection.
|
// 1. Build PeerConnection.
|
||||||
api := webrtc.NewAPI()
|
api := webrtc.NewAPI()
|
||||||
|
|
@ -188,10 +244,17 @@ func RunWebRTCStream(ctx context.Context, cfg WebRTCStreamConfig) error {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// 2. Drive the SDP exchange.
|
// 2. Drive the SDP exchange. Any error from the loop (browser sent
|
||||||
|
// "bye", signal stream closed, etc.) cancels the session so we don't
|
||||||
|
// dangle on the DC waiting for a peer that's already gone.
|
||||||
sdpDone := make(chan error, 1)
|
sdpDone := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
sdpDone <- runSDPExchange(sessionCtx, pc, cfg)
|
err := runSDPExchange(sessionCtx, pc, cfg)
|
||||||
|
sdpDone <- err
|
||||||
|
if err != nil && sessionCtx.Err() == nil {
|
||||||
|
log.Infof("[wrtc %s] signal loop ended: %v", agent.ShortID(cfg.SessionID), err)
|
||||||
|
cancelSession()
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 3. Wait for either SDP error or DataChannel open.
|
// 3. Wait for either SDP error or DataChannel open.
|
||||||
|
|
@ -217,7 +280,7 @@ func RunWebRTCStream(ctx context.Context, cfg WebRTCStreamConfig) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Wire up the data channel pump.
|
// 4. Wire up the data channel pump.
|
||||||
pump := newDataChannelPump(dc, file, fileSize, fileName, log, cancelSession)
|
pump := newDataChannelPump(dc, source, log, cancelSession)
|
||||||
dc.OnOpen(pump.onOpen)
|
dc.OnOpen(pump.onOpen)
|
||||||
dc.OnMessage(pump.onMessage)
|
dc.OnMessage(pump.onMessage)
|
||||||
dc.OnClose(func() {
|
dc.OnClose(func() {
|
||||||
|
|
@ -346,14 +409,12 @@ func handleSignal(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// dataChannelPump owns the DC + file handle and serves wire-protocol frames.
|
// dataChannelPump owns the DC + stream source and serves wire-protocol frames.
|
||||||
type dataChannelPump struct {
|
type dataChannelPump struct {
|
||||||
dc *webrtc.DataChannel
|
dc *webrtc.DataChannel
|
||||||
file *os.File
|
source streamSource
|
||||||
fileSize int64
|
log StreamLogger
|
||||||
fileName string
|
cancel context.CancelFunc
|
||||||
log StreamLogger
|
|
||||||
cancel context.CancelFunc
|
|
||||||
|
|
||||||
// Flow control: writers wait on resumeCh when bufferedAmount goes high.
|
// Flow control: writers wait on resumeCh when bufferedAmount goes high.
|
||||||
paused atomic.Bool
|
paused atomic.Bool
|
||||||
|
|
@ -372,17 +433,13 @@ type dataChannelPump struct {
|
||||||
|
|
||||||
func newDataChannelPump(
|
func newDataChannelPump(
|
||||||
dc *webrtc.DataChannel,
|
dc *webrtc.DataChannel,
|
||||||
file *os.File,
|
source streamSource,
|
||||||
fileSize int64,
|
|
||||||
fileName string,
|
|
||||||
log StreamLogger,
|
log StreamLogger,
|
||||||
cancel context.CancelFunc,
|
cancel context.CancelFunc,
|
||||||
) *dataChannelPump {
|
) *dataChannelPump {
|
||||||
p := &dataChannelPump{
|
p := &dataChannelPump{
|
||||||
dc: dc,
|
dc: dc,
|
||||||
file: file,
|
source: source,
|
||||||
fileSize: fileSize,
|
|
||||||
fileName: fileName,
|
|
||||||
log: log,
|
log: log,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
resumeCh: make(chan struct{}, 1),
|
resumeCh: make(chan struct{}, 1),
|
||||||
|
|
@ -395,16 +452,25 @@ func newDataChannelPump(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *dataChannelPump) onOpen() {
|
func (p *dataChannelPump) onOpen() {
|
||||||
|
// Use estimated size for transcoded streams so the browser scrubber has
|
||||||
|
// something to anchor on. Real size is reflected by Range responses as
|
||||||
|
// ffmpeg writes more bytes; the estimate just bootstraps the UI.
|
||||||
|
announceSize := p.source.EstimatedSize()
|
||||||
|
transcoding := p.source.Transcoded()
|
||||||
|
// Seekable=true even for transcoded sources because we read from a tmp
|
||||||
|
// file (random access). Seek backwards just works; seek forward beyond
|
||||||
|
// what ffmpeg has produced will block briefly inside ReadAt.
|
||||||
|
seekable := true
|
||||||
hello := wire.HelloPayload{
|
hello := wire.HelloPayload{
|
||||||
FileSize: uint64(p.fileSize),
|
FileSize: uint64(announceSize),
|
||||||
Transcoding: false,
|
Transcoding: transcoding,
|
||||||
Seekable: true,
|
Seekable: seekable,
|
||||||
FileName: p.fileName,
|
FileName: p.source.FileName(),
|
||||||
}
|
}
|
||||||
payload := wire.EncodeHello(hello)
|
payload := wire.EncodeHello(hello)
|
||||||
frame := wire.EncodeFrame(wire.Header{
|
frame := wire.EncodeFrame(wire.Header{
|
||||||
Type: wire.FrameHello,
|
Type: wire.FrameHello,
|
||||||
Flags: wire.HelloFlags(false, true),
|
Flags: wire.HelloFlags(transcoding, seekable),
|
||||||
StreamID: 0,
|
StreamID: 0,
|
||||||
Length: uint32(len(payload)),
|
Length: uint32(len(payload)),
|
||||||
}, payload)
|
}, payload)
|
||||||
|
|
@ -487,19 +553,42 @@ func (p *dataChannelPump) serveRange(streamID uint32, req wire.RangeReqPayload)
|
||||||
// Reject offsets above MaxInt64 — uint64→int64 narrowing would wrap to a
|
// Reject offsets above MaxInt64 — uint64→int64 narrowing would wrap to a
|
||||||
// negative value and bypass the bounds check, then ReadAt would be called
|
// negative value and bypass the bounds check, then ReadAt would be called
|
||||||
// with a negative offset.
|
// with a negative offset.
|
||||||
if req.Offset > math.MaxInt64 || int64(req.Offset) >= p.fileSize {
|
currentSize := p.source.Size()
|
||||||
|
finalSize := p.source.EstimatedSize()
|
||||||
|
if req.Offset > math.MaxInt64 {
|
||||||
p.sendRangeEnd(streamID, 2) // out of range
|
p.sendRangeEnd(streamID, 2) // out of range
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// For transcoded streams `currentSize` grows over time; only reject when
|
||||||
|
// the offset is past the *estimated* final size.
|
||||||
|
if int64(req.Offset) >= finalSize && p.source.Final() {
|
||||||
|
p.sendRangeEnd(streamID, 2)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
want := int64(req.Length)
|
want := int64(req.Length)
|
||||||
if req.Length > math.MaxInt64 {
|
if req.Length > math.MaxInt64 {
|
||||||
want = 0 // treat absurd length as "remainder of file"
|
want = 0 // treat absurd length as "remainder of file"
|
||||||
}
|
}
|
||||||
remaining := p.fileSize - int64(req.Offset)
|
// "Remainder" target: prefer current known size, fall back to estimate
|
||||||
|
// for transcoded streams so the browser can keep scrolling forward as
|
||||||
|
// ffmpeg produces output.
|
||||||
|
knownEnd := currentSize
|
||||||
|
if p.source.Final() {
|
||||||
|
knownEnd = finalSize
|
||||||
|
}
|
||||||
|
if knownEnd < int64(req.Offset) {
|
||||||
|
knownEnd = int64(req.Offset)
|
||||||
|
}
|
||||||
|
remaining := knownEnd - int64(req.Offset)
|
||||||
if want <= 0 || want > remaining {
|
if want <= 0 || want > remaining {
|
||||||
want = remaining
|
want = remaining
|
||||||
}
|
}
|
||||||
|
if want <= 0 {
|
||||||
|
// Nothing to serve right now (transcoder hasn't reached this offset).
|
||||||
|
p.sendRangeEnd(streamID, 0)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
p.activeMu.Lock()
|
p.activeMu.Lock()
|
||||||
|
|
@ -527,7 +616,7 @@ func (p *dataChannelPump) serveRange(streamID uint32, req wire.RangeReqPayload)
|
||||||
if end-offset < chunkLen {
|
if end-offset < chunkLen {
|
||||||
chunkLen = end - offset
|
chunkLen = end - offset
|
||||||
}
|
}
|
||||||
n, rerr := p.file.ReadAt(buf[:chunkLen], offset)
|
n, rerr := p.source.ReadAt(buf[:chunkLen], offset)
|
||||||
if n > 0 {
|
if n > 0 {
|
||||||
// EOF on a short read means this is the final chunk — flag it so the
|
// EOF on a short read means this is the final chunk — flag it so the
|
||||||
// browser doesn't wait for more data before processing RangeEnd.
|
// browser doesn't wait for more data before processing RangeEnd.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue