diff --git a/internal/cmd/daemon.go b/internal/cmd/daemon.go index b85c9c2..aa0dd63 100644 --- a/internal/cmd/daemon.go +++ b/internal/cmd/daemon.go @@ -451,6 +451,7 @@ func runDaemonStart() error { webrtcRegistry.remove(sess.SessionID) sessCancel() }() + tcRuntime := buildTranscodeRuntime(ctx, cfg) runCfg := engine.WebRTCStreamConfig{ SessionID: sess.SessionID, FilePath: filePath, @@ -459,6 +460,7 @@ func runDaemonStart() error { ICEServers: engine.BuildICEServers(cfg.Download.WebRTC), Signal: agentClient, Logger: stdLogger{}, + Transcode: tcRuntime, } log.Printf("[wrtc %s] starting session: %s", agent.ShortID(sess.SessionID), filepath.Base(filePath)) if err := engine.RunWebRTCStream(sessCtx, runCfg); err != nil { diff --git a/internal/cmd/webrtc_session_registry.go b/internal/cmd/webrtc_session_registry.go index b0ec0b7..a1bf37a 100644 --- a/internal/cmd/webrtc_session_registry.go +++ b/internal/cmd/webrtc_session_registry.go @@ -4,6 +4,10 @@ import ( "context" "log" "sync" + + "github.com/torrentclaw/unarr/internal/config" + "github.com/torrentclaw/unarr/internal/engine" + "github.com/torrentclaw/unarr/internal/library/mediainfo" ) // webrtcRegistry tracks per-session cancel funcs for active custom WebRTC @@ -60,3 +64,43 @@ type stdLogger struct{} func (stdLogger) Infof(format string, args ...any) { log.Printf(format, args...) } func (stdLogger) Warnf(format string, args ...any) { log.Printf("WARN: "+format, args...) } func (stdLogger) Errorf(format string, args ...any) { log.Printf("ERROR: "+format, args...) } + +// buildTranscodeRuntime resolves the ffmpeg/ffprobe binaries + config knobs +// for the WebRTC streaming pipeline. Failure to resolve a binary returns a +// runtime with empty paths so engine.RunWebRTCStream falls back to +// passthrough — the user gets a clearer codec error from the browser than a +// daemon-side abort. +func buildTranscodeRuntime(ctx context.Context, cfg config.Config) engine.TranscodeRuntime { + if !cfg.Download.Transcode.Enabled { + return engine.TranscodeRuntime{Disabled: true} + } + ffmpegPath, errF := mediainfo.ResolveFFmpeg(cfg.Library.FFmpegPath) + ffprobePath, errP := mediainfo.ResolveFFprobe(cfg.Library.FFprobePath) + if errF != nil || errP != nil { + return engine.TranscodeRuntime{Disabled: true} + } + hw := engine.HWAccelNone + switch cfg.Download.Transcode.HWAccel { + case "auto": + hw = engine.DetectHWAccel(ctx, ffmpegPath) + case "nvenc": + hw = engine.HWAccelNVENC + case "qsv": + hw = engine.HWAccelQSV + case "vaapi": + hw = engine.HWAccelVAAPI + case "videotoolbox": + hw = engine.HWAccelVideoToolbox + case "none", "": + hw = engine.HWAccelNone + } + return engine.TranscodeRuntime{ + FFmpegPath: ffmpegPath, + FFprobePath: ffprobePath, + HWAccel: hw, + Preset: cfg.Download.Transcode.Preset, + VideoBitrate: cfg.Download.Transcode.VideoBitrate, + AudioBitrate: cfg.Download.Transcode.AudioBitrate, + MaxHeight: cfg.Download.Transcode.MaxHeight, + } +} diff --git a/internal/config/config.go b/internal/config/config.go index bb7498c..b84655a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -34,17 +34,32 @@ type AgentConfig struct { } type DownloadConfig struct { - Dir string `toml:"dir"` - PreferredMethod string `toml:"preferred_method"` - PreferredQuality string `toml:"preferred_quality"` // "2160p", "1080p", "720p" — hint for auto-selection - MaxConcurrent int `toml:"max_concurrent"` - MaxDownloadSpeed string `toml:"max_download_speed"` // e.g. "10MB", "500KB", "0" = unlimited - MaxUploadSpeed string `toml:"max_upload_speed"` // e.g. "1MB", "0" = unlimited - MetadataTimeout string `toml:"metadata_timeout"` // e.g. "1h", "30m", "0" = unlimited (default: "0") - StallTimeout string `toml:"stall_timeout"` // e.g. "30m", "1h", "0" = unlimited (default: "30m") - ListenPort int `toml:"listen_port"` // fixed port for incoming peer connections (default: 42069, 0 = random) - StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818) - WebRTC WebRTCConfig `toml:"webrtc"` + Dir string `toml:"dir"` + PreferredMethod string `toml:"preferred_method"` + PreferredQuality string `toml:"preferred_quality"` // "2160p", "1080p", "720p" — hint for auto-selection + MaxConcurrent int `toml:"max_concurrent"` + MaxDownloadSpeed string `toml:"max_download_speed"` // e.g. "10MB", "500KB", "0" = unlimited + MaxUploadSpeed string `toml:"max_upload_speed"` // e.g. "1MB", "0" = unlimited + MetadataTimeout string `toml:"metadata_timeout"` // e.g. "1h", "30m", "0" = unlimited (default: "0") + StallTimeout string `toml:"stall_timeout"` // e.g. "30m", "1h", "0" = unlimited (default: "30m") + ListenPort int `toml:"listen_port"` // fixed port for incoming peer connections (default: 42069, 0 = random) + StreamPort int `toml:"stream_port"` // fixed port for streaming HTTP server (default: 11818) + WebRTC WebRTCConfig `toml:"webrtc"` + Transcode TranscodeConfig `toml:"transcode"` +} + +// TranscodeConfig controls real-time transcoding for the in-browser player +// when source codecs aren't browser-decodable (HEVC, AV1, AC3, DTS, etc.). +// Disabled by default; enabling requires ffmpeg + ffprobe on PATH (or +// explicit paths via the library config). +type TranscodeConfig struct { + Enabled bool `toml:"enabled"` // master switch + HWAccel string `toml:"hw_accel"` // "auto" | "none" | "nvenc" | "qsv" | "vaapi" | "videotoolbox" + Preset string `toml:"preset"` // libx264 preset; "veryfast" by default + VideoBitrate string `toml:"video_bitrate"` // e.g. "5M" + AudioBitrate string `toml:"audio_bitrate"` // e.g. "192k" + MaxHeight int `toml:"max_height"` // optional downscale cap (e.g. 720) + MaxConcurrent int `toml:"max_concurrent"` // safety cap on simultaneous transcoder processes } // WebRTCConfig opts the daemon into acting as a WebTorrent peer so browsers @@ -106,6 +121,14 @@ func Default() Config { Trackers: []string{"wss://tracker.torrentclaw.com"}, STUNServers: []string{"stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"}, }, + Transcode: TranscodeConfig{ + Enabled: true, + HWAccel: "auto", + Preset: "veryfast", + VideoBitrate: "5M", + AudioBitrate: "192k", + MaxConcurrent: 2, + }, }, Organize: OrganizeConfig{ Enabled: true, diff --git a/internal/engine/stream_source.go b/internal/engine/stream_source.go new file mode 100644 index 0000000..98aca74 --- /dev/null +++ b/internal/engine/stream_source.go @@ -0,0 +1,354 @@ +package engine + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" +) + +// streamSource abstracts the byte source served over the WebRTC DataChannel. +// Two implementations: +// - diskFileSource — direct passthrough of the on-disk file. +// - transcodeSource — ffmpeg writes a fragmented MP4 to a temp file in +// real time; reads block briefly when callers ask for bytes ahead of +// the writer. +type streamSource interface { + ReadAt(p []byte, off int64) (int, error) + // Size returns the currently known size. For transcoded sources this + // grows as ffmpeg produces output; on Final() it's the final size. + Size() int64 + // Final reports whether the source size is now stable (passthrough is + // always final, transcoder becomes final when ffmpeg exits). + Final() bool + // EstimatedSize returns the final size we expect to converge on. For + // passthrough it's the same as Size(). For transcoder it's a bitrate + // × duration estimate so the browser scrubber has something to anchor + // on; the real size will differ ±20%. + EstimatedSize() int64 + FileName() string + Transcoded() bool + Close() error +} + +// ───────────────────────────────────────────────────────────────────────────── +// disk passthrough +// ───────────────────────────────────────────────────────────────────────────── + +type diskFileSource struct { + f *os.File + size int64 + name string +} + +func newDiskFileSource(path string) (*diskFileSource, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("stream source: open %s: %w", path, err) + } + stat, err := f.Stat() + if err != nil { + f.Close() + return nil, fmt.Errorf("stream source: stat %s: %w", path, err) + } + return &diskFileSource{f: f, size: stat.Size(), name: filepath.Base(path)}, nil +} + +func (d *diskFileSource) ReadAt(p []byte, off int64) (int, error) { + return d.f.ReadAt(p, off) +} +func (d *diskFileSource) Size() int64 { return d.size } +func (d *diskFileSource) Final() bool { return true } +func (d *diskFileSource) EstimatedSize() int64 { return d.size } +func (d *diskFileSource) FileName() string { return d.name } +func (d *diskFileSource) Transcoded() bool { return false } +func (d *diskFileSource) Close() error { return d.f.Close() } + +// ───────────────────────────────────────────────────────────────────────────── +// transcode source — ffmpeg → tmp file +// ───────────────────────────────────────────────────────────────────────────── + +type transcodeSource struct { + tmpPath string + tmpFile *os.File + cmd *Transcoder + name string + estimate int64 + + mu sync.Mutex + cond *sync.Cond + size atomic.Int64 + final atomic.Bool + failure error + startedAt time.Time +} + +const ( + // readBlockTimeout caps how long ReadAt waits for bytes that haven't + // been transcoded yet before returning EOF/io.ErrUnexpectedEOF. The + // pump treats EOF as "respond with whatever we have so far + RangeEnd" + // so the browser can re-request once more bytes appear. + readBlockTimeout = 30 * time.Second +) + +func newTranscodeSource( + ctx context.Context, + srcPath string, + probe *StreamProbe, + action TranscodeAction, + opts TranscodeOpts, + displayName string, +) (*transcodeSource, error) { + tmpFile, err := os.CreateTemp("", "tc-stream-*.mp4") + if err != nil { + return nil, fmt.Errorf("transcode source: tmp file: %w", err) + } + tmpPath := tmpFile.Name() + tmpFile.Close() + + args := buildFFmpegArgs(srcPath, opts) + // Override -f mp4 pipe:1 with output to our tmp file path (last 3 args). + if len(args) >= 3 && args[len(args)-1] == "pipe:1" { + args[len(args)-1] = tmpPath + } + + // Spawn ffmpeg directly (not via NewTranscoder pipe) so it writes to + // disk in real time. We re-use the rest of TranscodeOpts wiring. + cmd := &Transcoder{} + cmd, err = startTranscoderToFile(ctx, opts.FFmpegPath, args, cmd) + if err != nil { + os.Remove(tmpPath) + return nil, err + } + + estimate := estimateOutputSize(probe, opts) + + t := &transcodeSource{ + tmpPath: tmpPath, + cmd: cmd, + name: displayName, + estimate: estimate, + startedAt: time.Now(), + } + t.cond = sync.NewCond(&t.mu) + + // Re-open the tmp file for reading; ffmpeg keeps writing to it. + rf, err := os.Open(tmpPath) + if err != nil { + _ = cmd.Close() + os.Remove(tmpPath) + return nil, fmt.Errorf("transcode source: reopen tmp: %w", err) + } + t.tmpFile = rf + + go t.watchSize(ctx) + go t.watchExit() + return t, nil +} + +// watchSize polls the temp file size every 200 ms and wakes any blocked +// ReadAt callers once new bytes arrive. +func (t *transcodeSource) watchSize(ctx context.Context) { + ticker := time.NewTicker(200 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + t.mu.Lock() + t.cond.Broadcast() + t.mu.Unlock() + return + case <-ticker.C: + } + if t.final.Load() { + t.mu.Lock() + t.cond.Broadcast() + t.mu.Unlock() + return + } + stat, err := os.Stat(t.tmpPath) + if err != nil { + continue + } + current := stat.Size() + if current > t.size.Load() { + t.size.Store(current) + t.mu.Lock() + t.cond.Broadcast() + t.mu.Unlock() + } + } +} + +// watchExit waits for ffmpeg to exit and locks in the final size. +func (t *transcodeSource) watchExit() { + err := t.cmd.cmd.Wait() + if err != nil && !isExpectedExit(err) { + t.mu.Lock() + t.failure = fmt.Errorf("ffmpeg exited: %w (%s)", err, t.cmd.Stderr()) + t.mu.Unlock() + } + if stat, err := os.Stat(t.tmpPath); err == nil { + t.size.Store(stat.Size()) + } + t.final.Store(true) + t.mu.Lock() + t.cond.Broadcast() + t.mu.Unlock() +} + +func isExpectedExit(err error) bool { + // Killed by Close() — pion DC closed, that's fine. + if err == nil { + return true + } + return false +} + +func (t *transcodeSource) ReadAt(p []byte, off int64) (int, error) { + if t.failure != nil { + return 0, t.failure + } + if int64(len(p)) == 0 { + return 0, nil + } + deadline := time.Now().Add(readBlockTimeout) + + for { + size := t.size.Load() + if off+int64(len(p)) <= size || t.final.Load() { + break + } + // Need to wait for ffmpeg to write more. + t.mu.Lock() + // Check again under lock to avoid lost wakeup. + size = t.size.Load() + if off+int64(len(p)) <= size || t.final.Load() { + t.mu.Unlock() + break + } + // Wait with timeout via a small sleep loop — sync.Cond doesn't + // support timed wait, and a goroutine-per-sleep pattern works fine + // for our scale. + waited := time.NewTimer(500 * time.Millisecond) + done := make(chan struct{}) + go func() { + t.cond.Wait() + close(done) + }() + t.mu.Unlock() + select { + case <-done: + case <-waited.C: + t.mu.Lock() + t.cond.Broadcast() // wake the goroutine so it can return + t.mu.Unlock() + <-done + } + if time.Now().After(deadline) { + break + } + } + + if t.failure != nil { + return 0, t.failure + } + + n, err := t.tmpFile.ReadAt(p, off) + // On growing file ReadAt returns io.EOF when reading past current size. + // Convert to io.ErrUnexpectedEOF only when we actually exceeded the + // final size; otherwise return n, nil so the pump sends what we have. + if err == io.EOF && !t.final.Load() { + if n > 0 { + return n, nil + } + return 0, errors.New("transcode source: read timed out waiting for ffmpeg output") + } + return n, err +} + +func (t *transcodeSource) Size() int64 { return t.size.Load() } +func (t *transcodeSource) Final() bool { return t.final.Load() } +func (t *transcodeSource) EstimatedSize() int64 { + if t.final.Load() { + return t.size.Load() + } + return t.estimate +} +func (t *transcodeSource) FileName() string { + // Keep the original extension stripped — output is always fragmented MP4. + base := t.name + if i := lastIndexByte(base, '.'); i >= 0 { + base = base[:i] + } + return base + ".mp4" +} +func (t *transcodeSource) Transcoded() bool { return true } +func (t *transcodeSource) Close() error { + _ = t.cmd.Close() + if t.tmpFile != nil { + _ = t.tmpFile.Close() + } + if t.tmpPath != "" { + _ = os.Remove(t.tmpPath) + } + return nil +} + +// estimateOutputSize converts probed bitrate × duration into a byte estimate +// so the browser scrubber has something to anchor on while transcoding. +func estimateOutputSize(probe *StreamProbe, opts TranscodeOpts) int64 { + if probe == nil || probe.DurationSec <= 0 { + return 0 + } + videoKbps := parseBitrateKbps(opts.VideoBitrate, 5000) + audioKbps := parseBitrateKbps(opts.AudioBitrate, 192) + totalKbps := videoKbps + audioKbps + bytesPerSec := int64(totalKbps) * 1000 / 8 + return int64(probe.DurationSec) * bytesPerSec +} + +// parseBitrateKbps converts ffmpeg-style bitrate strings ("5M", "192k") to +// kilobits per second. Unknown formats fall back to fallback. +func parseBitrateKbps(s string, fallback int) int { + if s == "" { + return fallback + } + last := s[len(s)-1] + num := s + mult := 1 + switch last { + case 'k', 'K': + num = s[:len(s)-1] + case 'M', 'm': + num = s[:len(s)-1] + mult = 1000 + default: + // already in bps? treat as kbps + } + v := 0 + for _, c := range num { + if c < '0' || c > '9' { + return fallback + } + v = v*10 + int(c-'0') + } + if v == 0 { + return fallback + } + return v * mult +} + +func lastIndexByte(s string, c byte) int { + for i := len(s) - 1; i >= 0; i-- { + if s[i] == c { + return i + } + } + return -1 +} diff --git a/internal/engine/transcoder.go b/internal/engine/transcoder.go index 45229b2..58f9d09 100644 --- a/internal/engine/transcoder.go +++ b/internal/engine/transcoder.go @@ -64,6 +64,26 @@ func NewTranscoder(ctx context.Context, filePath string, opts TranscodeOpts) (*T return t, nil } +// startTranscoderToFile spawns ffmpeg with a pre-built argv where the last +// argument is an output file path (instead of pipe:1). Used by streamSource +// when we want random-access reads against a growing temp file rather than +// sequential pipe consumption. +func startTranscoderToFile(ctx context.Context, ffmpegPath string, args []string, t *Transcoder) (*Transcoder, error) { + if ffmpegPath == "" { + return nil, fmt.Errorf("transcoder: empty ffmpeg path") + } + cmd := exec.CommandContext(ctx, ffmpegPath, args...) + if t == nil { + t = &Transcoder{} + } + t.cmd = cmd + cmd.Stderr = &errWriter{t: t} + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("transcoder: start ffmpeg: %w", err) + } + return t, nil +} + // Read implements io.Reader. func (t *Transcoder) Read(p []byte) (int, error) { return t.out.Read(p) } diff --git a/internal/engine/webrtc_stream.go b/internal/engine/webrtc_stream.go index 091d42f..3f04b3c 100644 --- a/internal/engine/webrtc_stream.go +++ b/internal/engine/webrtc_stream.go @@ -24,7 +24,6 @@ import ( "fmt" "io" "math" - "os" "path/filepath" "sync" "sync/atomic" @@ -62,6 +61,25 @@ type WebRTCStreamConfig struct { Signal *agent.Client // Logger receives diagnostic events; a nil logger swallows everything. Logger StreamLogger + // Transcode steers on-the-fly transcoding when source codecs are not + // browser-decodable (HEVC/AV1/AC3/DTS). Empty FFmpegPath disables it. + Transcode TranscodeRuntime +} + +// TranscodeRuntime carries the resolved ffmpeg/ffprobe paths + tunables so +// each session can decide whether to passthrough or pipe through ffmpeg. +type TranscodeRuntime struct { + FFmpegPath string + FFprobePath string + HWAccel HWAccel + Preset string + VideoBitrate string + AudioBitrate string + MaxHeight int + // Disabled forces passthrough for every file even when codecs are not + // browser-friendly. Useful when the user explicitly turns transcoding + // off in config. + Disabled bool } // StreamLogger is an injectable logger so tests can capture events. @@ -84,6 +102,48 @@ func logger(l StreamLogger) StreamLogger { return l } +// buildStreamSource picks between passthrough and transcoded source. ffprobe +// failure or missing ffmpeg falls back to passthrough — the browser surfaces +// a clearer codec error than us refusing to start. +func buildStreamSource( + ctx context.Context, + abs string, + displayName string, + cfg WebRTCStreamConfig, + log StreamLogger, +) (streamSource, error) { + tc := cfg.Transcode + if tc.Disabled || tc.FFmpegPath == "" || tc.FFprobePath == "" { + return newDiskFileSource(abs) + } + + probe, err := ProbeFile(ctx, tc.FFprobePath, abs) + if err != nil { + log.Warnf("[wrtc %s] probe failed (%v) — passthrough", agent.ShortID(cfg.SessionID), err) + return newDiskFileSource(abs) + } + action := DecideAction(probe) + if action == ActionPassthrough { + log.Infof("[wrtc %s] codec passthrough (%s + %s in %s)", + agent.ShortID(cfg.SessionID), probe.VideoCodec, probe.AudioCodec, probe.Container) + return newDiskFileSource(abs) + } + + log.Infof("[wrtc %s] transcoding %s/%s/%s → h264+aac (%s)", + agent.ShortID(cfg.SessionID), probe.Container, probe.VideoCodec, probe.AudioCodec, action) + + opts := TranscodeOpts{ + Action: action, + HWAccel: tc.HWAccel, + Preset: tc.Preset, + VideoBitrate: tc.VideoBitrate, + AudioBitrate: tc.AudioBitrate, + MaxHeight: tc.MaxHeight, + FFmpegPath: tc.FFmpegPath, + } + return newTranscodeSource(ctx, abs, probe, action, opts, displayName) +} + // RunWebRTCStream blocks until the session ends — either the DataChannel // closes, the peer connection drops, or ctx is cancelled. Always returns a // non-nil error explaining the termination reason. @@ -101,24 +161,20 @@ func RunWebRTCStream(ctx context.Context, cfg WebRTCStreamConfig) error { if err != nil { return fmt.Errorf("webrtc_stream: resolve path: %w", err) } - file, err := os.Open(abs) - if err != nil { - return fmt.Errorf("webrtc_stream: open file: %w", err) - } - defer file.Close() - stat, err := file.Stat() + displayName := cfg.FileName + if displayName == "" { + displayName = filepath.Base(abs) + } + + // Decide passthrough vs transcoding. Probe is best-effort: if ffprobe + // is missing or fails we fall back to passthrough (the browser will + // surface a clearer error than us guessing wrong). + source, err := buildStreamSource(ctx, abs, displayName, cfg, log) if err != nil { - return fmt.Errorf("webrtc_stream: stat: %w", err) - } - fileSize := stat.Size() - if cfg.FileSize > 0 && cfg.FileSize != fileSize { - log.Warnf("webrtc_stream: declared size %d != actual %d", cfg.FileSize, fileSize) - } - fileName := cfg.FileName - if fileName == "" { - fileName = filepath.Base(abs) + return fmt.Errorf("webrtc_stream: build source: %w", err) } + defer source.Close() // 1. Build PeerConnection. api := webrtc.NewAPI() @@ -188,10 +244,17 @@ func RunWebRTCStream(ctx context.Context, cfg WebRTCStreamConfig) error { } }) - // 2. Drive the SDP exchange. + // 2. Drive the SDP exchange. Any error from the loop (browser sent + // "bye", signal stream closed, etc.) cancels the session so we don't + // dangle on the DC waiting for a peer that's already gone. sdpDone := make(chan error, 1) go func() { - sdpDone <- runSDPExchange(sessionCtx, pc, cfg) + err := runSDPExchange(sessionCtx, pc, cfg) + sdpDone <- err + if err != nil && sessionCtx.Err() == nil { + log.Infof("[wrtc %s] signal loop ended: %v", agent.ShortID(cfg.SessionID), err) + cancelSession() + } }() // 3. Wait for either SDP error or DataChannel open. @@ -217,7 +280,7 @@ func RunWebRTCStream(ctx context.Context, cfg WebRTCStreamConfig) error { } // 4. Wire up the data channel pump. - pump := newDataChannelPump(dc, file, fileSize, fileName, log, cancelSession) + pump := newDataChannelPump(dc, source, log, cancelSession) dc.OnOpen(pump.onOpen) dc.OnMessage(pump.onMessage) dc.OnClose(func() { @@ -346,14 +409,12 @@ func handleSignal( return nil } -// dataChannelPump owns the DC + file handle and serves wire-protocol frames. +// dataChannelPump owns the DC + stream source and serves wire-protocol frames. type dataChannelPump struct { - dc *webrtc.DataChannel - file *os.File - fileSize int64 - fileName string - log StreamLogger - cancel context.CancelFunc + dc *webrtc.DataChannel + source streamSource + log StreamLogger + cancel context.CancelFunc // Flow control: writers wait on resumeCh when bufferedAmount goes high. paused atomic.Bool @@ -372,17 +433,13 @@ type dataChannelPump struct { func newDataChannelPump( dc *webrtc.DataChannel, - file *os.File, - fileSize int64, - fileName string, + source streamSource, log StreamLogger, cancel context.CancelFunc, ) *dataChannelPump { p := &dataChannelPump{ dc: dc, - file: file, - fileSize: fileSize, - fileName: fileName, + source: source, log: log, cancel: cancel, resumeCh: make(chan struct{}, 1), @@ -395,16 +452,25 @@ func newDataChannelPump( } func (p *dataChannelPump) onOpen() { + // Use estimated size for transcoded streams so the browser scrubber has + // something to anchor on. Real size is reflected by Range responses as + // ffmpeg writes more bytes; the estimate just bootstraps the UI. + announceSize := p.source.EstimatedSize() + transcoding := p.source.Transcoded() + // Seekable=true even for transcoded sources because we read from a tmp + // file (random access). Seek backwards just works; seek forward beyond + // what ffmpeg has produced will block briefly inside ReadAt. + seekable := true hello := wire.HelloPayload{ - FileSize: uint64(p.fileSize), - Transcoding: false, - Seekable: true, - FileName: p.fileName, + FileSize: uint64(announceSize), + Transcoding: transcoding, + Seekable: seekable, + FileName: p.source.FileName(), } payload := wire.EncodeHello(hello) frame := wire.EncodeFrame(wire.Header{ Type: wire.FrameHello, - Flags: wire.HelloFlags(false, true), + Flags: wire.HelloFlags(transcoding, seekable), StreamID: 0, Length: uint32(len(payload)), }, payload) @@ -487,19 +553,42 @@ func (p *dataChannelPump) serveRange(streamID uint32, req wire.RangeReqPayload) // Reject offsets above MaxInt64 — uint64→int64 narrowing would wrap to a // negative value and bypass the bounds check, then ReadAt would be called // with a negative offset. - if req.Offset > math.MaxInt64 || int64(req.Offset) >= p.fileSize { + currentSize := p.source.Size() + finalSize := p.source.EstimatedSize() + if req.Offset > math.MaxInt64 { p.sendRangeEnd(streamID, 2) // out of range return } + // For transcoded streams `currentSize` grows over time; only reject when + // the offset is past the *estimated* final size. + if int64(req.Offset) >= finalSize && p.source.Final() { + p.sendRangeEnd(streamID, 2) + return + } want := int64(req.Length) if req.Length > math.MaxInt64 { want = 0 // treat absurd length as "remainder of file" } - remaining := p.fileSize - int64(req.Offset) + // "Remainder" target: prefer current known size, fall back to estimate + // for transcoded streams so the browser can keep scrolling forward as + // ffmpeg produces output. + knownEnd := currentSize + if p.source.Final() { + knownEnd = finalSize + } + if knownEnd < int64(req.Offset) { + knownEnd = int64(req.Offset) + } + remaining := knownEnd - int64(req.Offset) if want <= 0 || want > remaining { want = remaining } + if want <= 0 { + // Nothing to serve right now (transcoder hasn't reached this offset). + p.sendRangeEnd(streamID, 0) + return + } ctx, cancel := context.WithCancel(context.Background()) p.activeMu.Lock() @@ -527,7 +616,7 @@ func (p *dataChannelPump) serveRange(streamID uint32, req wire.RangeReqPayload) if end-offset < chunkLen { chunkLen = end - offset } - n, rerr := p.file.ReadAt(buf[:chunkLen], offset) + n, rerr := p.source.ReadAt(buf[:chunkLen], offset) if n > 0 { // EOF on a short read means this is the final chunk — flag it so the // browser doesn't wait for more data before processing RangeEnd.