feat(stream): /speedtest endpoint for agent-path bandwidth probing

The web player measured bandwidth against the web origin, which says nothing
about the path the video actually travels (LAN-direct, tailnet, or the CF
funnel) — on a fast LAN where the web server is the slow link it falsely
recommended a lower resolution. Serve a fixed-size, incompressible payload
from the agent so the web can measure the REAL stream path.

- GET /speedtest?size=N (clamped 64KB–4MB, default 2MB), HEAD supported
- CORS-gated like the other endpoints; no auth (carries no data)
- single-flight guard (atomic): one measurement at a time → a concurrent
  request gets 429, bounding the bandwidth an unauthenticated caller can
  drain over the public funnel
This commit is contained in:
Deivid Soto 2026-06-02 22:52:25 +02:00
parent 2b5a45674a
commit ea152a2276

View file

@ -120,6 +120,7 @@ type StreamServer struct {
durationSec atomic.Int64 // video duration in seconds (from ffprobe, 0 = unknown) durationSec atomic.Int64 // video duration in seconds (from ffprobe, 0 = unknown)
topReaderID atomic.Int64 // ID of the reader that set maxByteOffset (only it can advance it) topReaderID atomic.Int64 // ID of the reader that set maxByteOffset (only it can advance it)
readerCounter atomic.Int64 // monotonic counter for assigning reader IDs readerCounter atomic.Int64 // monotonic counter for assigning reader IDs
speedtestActive atomic.Bool // single-flight guard for /speedtest (unauth + public via funnel)
} }
// NewStreamServer creates a stream server bound to the given port. // NewStreamServer creates a stream server bound to the given port.
@ -280,6 +281,7 @@ func (ss *StreamServer) Listen(ctx context.Context) error {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/stream", ss.handler) mux.HandleFunc("/stream", ss.handler)
mux.HandleFunc("/health", ss.healthHandler) mux.HandleFunc("/health", ss.healthHandler)
mux.HandleFunc("/speedtest", ss.speedtestHandler)
mux.HandleFunc("/playlist.m3u", ss.playlistHandler) mux.HandleFunc("/playlist.m3u", ss.playlistHandler)
mux.HandleFunc("/hls/", ss.hlsHandler) mux.HandleFunc("/hls/", ss.hlsHandler)
mux.HandleFunc("/thumbnail", ss.thumbnailHandler) mux.HandleFunc("/thumbnail", ss.thumbnailHandler)
@ -765,6 +767,66 @@ func (ss *StreamServer) healthHandler(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(resp) //nolint:errcheck json.NewEncoder(w).Encode(resp) //nolint:errcheck
} }
// speedtestHandler streams a fixed-size, incompressible payload so the web
// player can measure REAL throughput to THIS agent — the path the stream
// actually travels (LAN-direct, tailnet, or the CF funnel). The web origin's
// link tells us nothing about that path; measuring it here is the only honest
// signal for the pre-play quality suggestion. No auth or active stream needed:
// the bytes carry no information. CORS-gated like the other endpoints so the
// cross-origin fetch can read + time the body.
func (ss *StreamServer) speedtestHandler(w http.ResponseWriter, r *http.Request) {
ss.lastActivity.Store(time.Now().UnixNano())
if ss.writeCORSHeaders(w, r, "") {
return
}
// Single-flight: this endpoint is unauthenticated (it carries no data) and
// reachable over the public cloudflared funnel, so bound the bandwidth a
// caller can drain — only one measurement runs at a time, a concurrent
// request gets 429 instead of stacking another multi-MB transfer.
if !ss.speedtestActive.CompareAndSwap(false, true) {
http.Error(w, "speedtest busy", http.StatusTooManyRequests)
return
}
defer ss.speedtestActive.Store(false)
const defaultSize = 2 * 1024 * 1024
const maxSize = 4 * 1024 * 1024 // matches the web /api/v1/speed-test cap
size := defaultSize
if v := r.URL.Query().Get("size"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
if n < 64*1024 {
n = 64 * 1024
} else if n > maxSize {
n = maxSize
}
size = n
}
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", strconv.Itoa(size))
w.Header().Set("Cache-Control", "no-store")
if r.Method == http.MethodHead {
w.WriteHeader(http.StatusOK)
return
}
// Reuse one non-repeating chunk (incompressible enough that gzip can't skew
// the measurement) to avoid per-write allocation.
const chunk = 64 * 1024
buf := make([]byte, chunk)
for i := range buf {
buf[i] = byte((i*31 + 7) & 0xff)
}
for remaining := size; remaining > 0; {
n := chunk
if remaining < n {
n = remaining
}
if _, err := w.Write(buf[:n]); err != nil {
return // client disconnected mid-measure
}
remaining -= n
}
}
// playlistHandler generates an M3U playlist for VLC with #EXTVLCOPT language hints. // playlistHandler generates an M3U playlist for VLC with #EXTVLCOPT language hints.
// Query params: audioLangs (comma-sep), subLangs (comma-sep), resumeSec, title, streamUrl. // Query params: audioLangs (comma-sep), subLangs (comma-sep), resumeSec, title, streamUrl.
// If streamUrl is omitted, uses the current best stream URL. // If streamUrl is omitted, uses the current best stream URL.