diff --git a/internal/engine/stream_server.go b/internal/engine/stream_server.go index 35bf613..492bf7a 100644 --- a/internal/engine/stream_server.go +++ b/internal/engine/stream_server.go @@ -53,8 +53,12 @@ type StreamServer struct { disableUPnP bool lastActivity atomic.Int64 - maxByteOffset atomic.Int64 + maxByteOffset atomic.Int64 // highest sequential read position (main playback connection) totalFileSize atomic.Int64 + bitrateBps atomic.Int64 // video bitrate in bits/sec (from ffprobe, 0 = unknown) + durationSec atomic.Int64 // video duration in seconds (from ffprobe, 0 = unknown) + topReaderID atomic.Int64 // ID of the reader that set maxByteOffset (only it can advance it) + readerCounter atomic.Int64 // monotonic counter for assigning reader IDs } // NewStreamServer creates a stream server bound to the given port. @@ -153,6 +157,23 @@ func (ss *StreamServer) SetFile(provider FileProvider, taskID string) { ss.totalFileSize.Store(provider.FileSize()) ss.lastActivity.Store(time.Now().UnixNano()) ss.maxByteOffset.Store(0) + ss.topReaderID.Store(0) + ss.bitrateBps.Store(0) + ss.durationSec.Store(0) + + // Probe bitrate + duration synchronously so rate-limiting and duration + // are available before the first HTTP request arrives. + if dp, ok := provider.(*diskFileProvider); ok { + pm := probeMediaInfo(dp.path) + if pm.bitrateBps > 0 { + ss.bitrateBps.Store(pm.bitrateBps) + log.Printf("[stream] detected bitrate: %.1f Mbps → throttle at %.1f Mbps", + float64(pm.bitrateBps)/1e6, float64(pm.bitrateBps)*2/1e6) + } + if pm.durationSec > 0 { + ss.durationSec.Store(pm.durationSec) + } + } } // ClearFile stops serving any file. Subsequent requests return 404. @@ -163,6 +184,9 @@ func (ss *StreamServer) ClearFile() { ss.mu.Unlock() ss.totalFileSize.Store(0) ss.maxByteOffset.Store(0) + ss.topReaderID.Store(0) + ss.bitrateBps.Store(0) + ss.durationSec.Store(0) } // CurrentTaskID returns the task ID of the file currently being served. @@ -213,18 +237,6 @@ func (ss *StreamServer) Shutdown(ctx context.Context) error { func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { ss.lastActivity.Store(time.Now().UnixNano()) - // Track Range header for watch progress estimation - if rangeHeader := r.Header.Get("Range"); rangeHeader != "" { - if start := parseRangeStart(rangeHeader); start >= 0 { - for { - cur := ss.maxByteOffset.Load() - if start <= cur || ss.maxByteOffset.CompareAndSwap(cur, start) { - break - } - } - } - } - // Get current provider (may be nil if no file is being served) ss.mu.RLock() provider := ss.provider @@ -248,12 +260,34 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { } } - reader := provider.NewFileReader(r.Context()) - if reader == nil { + rawReader := provider.NewFileReader(r.Context()) + if rawReader == nil { http.Error(w, "file not found", http.StatusNotFound) return } - defer reader.Close() + defer rawReader.Close() + + // Wrap reader to track bytes read for progress estimation + rate limit. + // Rate limiting at ~2x bitrate ensures VLC can't download far ahead of + // playback, so bytes-read ≈ playback position (like Netflix/YouTube). + bps := ss.bitrateBps.Load() + var bytesPerSec int64 + if bps > 0 { + bytesPerSec = bps / 8 * 2 // 2x bitrate in bytes/sec + } + var burstSize int64 + if bytesPerSec > 0 { + burstSize = bytesPerSec * 30 + } + reader := &trackingReader{ + inner: rawReader, + server: ss, + id: ss.readerCounter.Add(1), + bytesPerSec: bytesPerSec, + burstSize: burstSize, + tokens: burstSize, + lastFill: time.Now(), + } w.Header().Set("Content-Type", mimeTypeFromExt(provider.FileName())) // "inline" for play requests (VLC/mpv), "attachment" for download requests. @@ -272,35 +306,19 @@ func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { http.ServeContent(w, r, provider.FileName(), time.Time{}, reader) } -// EstimatedProgress returns an estimated watch progress based on HTTP Range requests. -func (ss *StreamServer) EstimatedProgress() (position int, duration int) { +// EstimatedProgress returns estimated watch progress percentage (0-100) +// and the total duration in seconds (0 if unknown). +func (ss *StreamServer) EstimatedProgress() (pct int, durationSec int) { total := ss.totalFileSize.Load() if total <= 0 { return 0, 0 } maxOffset := ss.maxByteOffset.Load() - pct := int(float64(maxOffset) / float64(total) * 100) - if pct > 100 { - pct = 100 + p := int(float64(maxOffset) / float64(total) * 100) + if p > 100 { + p = 100 } - return pct, 100 -} - -// parseRangeStart extracts the start byte from a "Range: bytes=START-" header. -func parseRangeStart(rangeHeader string) int64 { - after, found := strings.CutPrefix(rangeHeader, "bytes=") - if !found { - return -1 - } - dashIdx := strings.IndexByte(after, '-') - if dashIdx < 0 { - return -1 - } - start, err := strconv.ParseInt(after[:dashIdx], 10, 64) - if err != nil { - return -1 - } - return start + return p, int(ss.durationSec.Load()) } // --- File Providers --- @@ -322,7 +340,7 @@ type diskFileProvider struct { func (p *diskFileProvider) NewFileReader(_ context.Context) io.ReadSeekCloser { f, err := os.Open(p.path) if err != nil { - log.Printf("stream: failed to open %q: %v", p.path, err) + log.Printf("[stream] failed to open %q: %v", p.path, err) return nil } return f @@ -333,7 +351,7 @@ func (p *diskFileProvider) FileName() string { return p.name } func (p *diskFileProvider) FileSize() int64 { fi, err := os.Stat(p.path) if err != nil { - log.Printf("stream: failed to stat %q: %v", p.path, err) + log.Printf("[stream] failed to stat %q: %v", p.path, err) return 0 } return fi.Size() @@ -416,6 +434,174 @@ func TailscaleIP() string { return ip } +// trackingReader wraps an io.ReadSeekCloser with: +// - Progress tracking: atomically updates maxByteOffset on Read (not Seek). +// - Rate limiting: token bucket throttle at ~2x video bitrate so that +// bytes-read ≈ playback position. Without this, local/NAS files get +// downloaded instantly and progress jumps to 100%. +// +// Rate limiting happens AFTER each Read (sleep to pace), never before. +// This ensures the client always receives data and never times out. +type trackingReader struct { + inner io.ReadSeekCloser + server *StreamServer + id int64 // unique ID for this reader + pos int64 // current read position + bytesRead int64 // total bytes read by THIS connection (measures sequential progress) + bytesPerSec int64 // 0 = unlimited (remote/torrent), >0 = throttled (local disk) + + // Token bucket state + tokens int64 // available bytes to serve (can go negative = we're ahead) + lastFill time.Time // last time tokens were replenished + burstSize int64 // max token accumulation (caps how far ahead VLC can buffer) +} + +func (t *trackingReader) Read(p []byte) (int, error) { + // Always read immediately — never block before serving data to the client. + n, err := t.inner.Read(p) + if n > 0 { + t.pos += int64(n) + t.bytesRead += int64(n) + + // Only the reader that has read the most bytes can update progress. + // This prevents VLC's metadata/index requests (which read near EOF) + // from inflating progress to 100%. + if t.server.topReaderID.Load() == t.id { + // We own the progress — advance it (never regress) + for { + cur := t.server.maxByteOffset.Load() + if t.pos <= cur || t.server.maxByteOffset.CompareAndSwap(cur, t.pos) { + break + } + } + } else { + // Try to take over if we've read more than the current progress. + // CAS loop prevents two goroutines from interleaving their stores. + for { + cur := t.server.maxByteOffset.Load() + if t.bytesRead <= cur { + break + } + if t.server.maxByteOffset.CompareAndSwap(cur, t.pos) { + t.server.topReaderID.Store(t.id) + break + } + } + } + + // Rate limit: sleep AFTER read to pace throughput. + if t.bytesPerSec > 0 { + t.fillTokens() + t.tokens -= int64(n) + if t.tokens < 0 { + deficit := -t.tokens + sleepNs := (deficit * int64(time.Second)) / t.bytesPerSec + if sleepNs > int64(time.Second) { + sleepNs = int64(time.Second) + } + time.Sleep(time.Duration(sleepNs)) + } + } + } + return n, err +} + +func (t *trackingReader) Seek(offset int64, whence int) (int64, error) { + newPos, err := t.inner.Seek(offset, whence) + if err == nil { + t.pos = newPos + // Don't update maxByteOffset on Seek — http.ServeContent seeks to EOF + // to determine size, which would instantly mark progress as 100%. + // Don't reset tokens — prevents clients from bypassing rate limiting + // by issuing repeated seeks to refill the token bucket. + } + return newPos, err +} + +func (t *trackingReader) Close() error { return t.inner.Close() } + +func (t *trackingReader) fillTokens() { + now := time.Now() + elapsed := now.Sub(t.lastFill) + if elapsed <= 0 { + return + } + newTokens := int64(elapsed.Seconds() * float64(t.bytesPerSec)) + t.tokens += newTokens + if t.tokens > t.burstSize { + t.tokens = t.burstSize + } + t.lastFill = now +} + +// probeMedia holds bitrate and duration extracted by ffprobe. +type probeMedia struct { + bitrateBps int64 // bits per second + durationSec int64 // seconds +} + +// probeBitrate uses ffprobe to detect the video bitrate and duration. +// Returns zero values if ffprobe is not available or the file can't be probed. +func probeMediaInfo(filePath string) probeMedia { + // Defense-in-depth: only probe regular files (not FIFOs, devices, etc.) + if fi, err := os.Stat(filePath); err != nil || !fi.Mode().IsRegular() { + return probeMedia{} + } + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + out, err := exec.CommandContext(ctx, "ffprobe", + "-v", "quiet", + "-print_format", "json", + "-show_format", + filePath, + ).Output() + if err != nil { + return probeMedia{} + } + + var result struct { + Format struct { + BitRate string `json:"bit_rate"` + Duration string `json:"duration"` + Size string `json:"size"` + } `json:"format"` + } + if err := json.Unmarshal(out, &result); err != nil { + return probeMedia{} + } + + var pm probeMedia + + // Parse duration + if result.Format.Duration != "" { + dur, _ := strconv.ParseFloat(result.Format.Duration, 64) + if dur > 0 { + pm.durationSec = int64(dur) + } + } + + // Prefer explicit bit_rate from ffprobe + if result.Format.BitRate != "" { + bps, _ := strconv.ParseInt(result.Format.BitRate, 10, 64) + if bps > 0 { + pm.bitrateBps = bps + return pm + } + } + + // Fallback: estimate bitrate from size / duration + if result.Format.Size != "" && pm.durationSec > 0 { + size, _ := strconv.ParseInt(result.Format.Size, 10, 64) + if size > 0 { + pm.bitrateBps = int64(float64(size) * 8 / float64(pm.durationSec)) + } + } + + return pm +} + func mimeTypeFromExt(filename string) string { ext := strings.ToLower(filepath.Ext(filename)) switch ext {