fix(stream): fix black screen on remote/Tailscale streaming
Three root-cause fixes for VLC showing a black screen when opening a
stream from a different network or via Tailscale:
1. PrioritizeTail: when VLC opens an MKV/MP4 stream it immediately seeks
to the end of the file to read the container index (seekhead/moov
atom). For active torrents those end-pieces aren't downloaded yet, so
the reader blocks indefinitely. PrioritizeTail() opens a background
reader positioned at the last 5 MB, keeping those pieces at high
priority until ctx is cancelled or they finish downloading.
2. /health endpoint: GET /health returns a lightweight JSON response
{"status":"ok","streaming":bool,...} so connectivity can be tested
with a simple curl from any device before involving VLC.
3. Per-request logging: every incoming /stream request now logs the
client IP and Range header, making it trivial to confirm whether
remote/Tailscale clients are reaching the server at all.
This commit is contained in:
parent
7eaf357680
commit
f1b4f2e327
5 changed files with 185 additions and 0 deletions
|
|
@ -148,6 +148,13 @@ func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine
|
|||
task.StreamURL = srv.URLsJSON()
|
||||
log.Printf("[%s] stream ready: %s (url: %s)", at.ID[:8], eng.FileName(), srv.URL())
|
||||
|
||||
// Pre-descargar los últimos 5 MB del archivo para que el moov atom (MP4)
|
||||
// o el seekhead (MKV) estén disponibles cuando VLC los pida al abrir el
|
||||
// stream. Sin esto, VLC busca el final del archivo, el lector bloquea
|
||||
// esperando piezas no descargadas, y el resultado es pantalla negra en
|
||||
// redes remotas donde la latencia amplifica el efecto.
|
||||
eng.PrioritizeTail(ctx, 5*1024*1024)
|
||||
|
||||
// 5. Start watch progress reporter
|
||||
if agentClient != nil {
|
||||
watchReporter := engine.NewWatchReporter(agentClient, srv, at.ID)
|
||||
|
|
|
|||
|
|
@ -303,6 +303,38 @@ func (s *StreamEngine) FileSize() int64 { return s.totalBytes }
|
|||
// BufferTarget returns the buffer threshold in bytes.
|
||||
func (s *StreamEngine) BufferTarget() int64 { return s.bufferTarget }
|
||||
|
||||
// PrioritizeTail abre un lector posicionado cerca del final del archivo para
|
||||
// forzar la descarga anticipada de los metadatos del container (moov atom en
|
||||
// MP4, seekhead en MKV). Sin esto, VLC busca el final del archivo al abrirlo
|
||||
// y el lector bloquea indefinidamente si esas piezas aún no están descargadas,
|
||||
// resultando en pantalla negra en redes lentas o remotas.
|
||||
//
|
||||
// Se ejecuta en una goroutine y se cancela cuando ctx expira.
|
||||
func (s *StreamEngine) PrioritizeTail(ctx context.Context, tailBytes int64) {
|
||||
if s.file == nil || s.totalBytes <= tailBytes*2 {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
reader := s.file.NewReader()
|
||||
defer reader.Close()
|
||||
|
||||
seekPos := s.totalBytes - tailBytes
|
||||
reader.Seek(seekPos, io.SeekStart) //nolint:errcheck
|
||||
reader.SetReadahead(tailBytes)
|
||||
reader.SetContext(ctx)
|
||||
|
||||
// Leer continuamente para mantener las piezas priorizadas hasta que
|
||||
// ctx se cancele o el final del archivo esté completamente descargado.
|
||||
buf := make([]byte, 32*1024)
|
||||
for {
|
||||
_, err := reader.Read(buf)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Shutdown gracefully closes the torrent and client.
|
||||
func (s *StreamEngine) Shutdown(_ context.Context) error {
|
||||
if s.tor != nil {
|
||||
|
|
|
|||
|
|
@ -71,6 +71,7 @@ func NewStreamServer(port int) *StreamServer {
|
|||
func (ss *StreamServer) Listen(ctx context.Context) error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/stream", ss.handler)
|
||||
mux.HandleFunc("/health", ss.healthHandler)
|
||||
|
||||
// SO_REUSEADDR allows immediate rebind if the port is in TIME_WAIT (e.g. after agent restart)
|
||||
lc := net.ListenConfig{
|
||||
|
|
@ -234,9 +235,52 @@ func (ss *StreamServer) Shutdown(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// healthHandler responde con el estado del servidor en JSON.
|
||||
// Útil para diagnosticar conectividad desde redes remotas o Tailscale:
|
||||
//
|
||||
// curl http://<tailscale-ip>:<port>/health
|
||||
func (ss *StreamServer) healthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ss.mu.RLock()
|
||||
provider := ss.provider
|
||||
taskID := ss.taskID
|
||||
ss.mu.RUnlock()
|
||||
|
||||
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
|
||||
|
||||
type healthResponse struct {
|
||||
Status string `json:"status"`
|
||||
Streaming bool `json:"streaming"`
|
||||
File string `json:"file,omitempty"`
|
||||
Task string `json:"task,omitempty"`
|
||||
Port int `json:"port"`
|
||||
Client string `json:"client"`
|
||||
}
|
||||
resp := healthResponse{
|
||||
Status: "ok",
|
||||
Port: ss.port,
|
||||
Client: clientIP,
|
||||
}
|
||||
if provider != nil {
|
||||
resp.Streaming = true
|
||||
resp.File = provider.FileName()
|
||||
resp.Task = taskID
|
||||
if len(resp.Task) > 8 {
|
||||
resp.Task = resp.Task[:8]
|
||||
}
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
json.NewEncoder(w).Encode(resp) //nolint:errcheck
|
||||
}
|
||||
|
||||
func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) {
|
||||
ss.lastActivity.Store(time.Now().UnixNano())
|
||||
|
||||
// Log every incoming request — essential for diagnosing remote/Tailscale issues.
|
||||
clientIP, _, _ := net.SplitHostPort(r.RemoteAddr)
|
||||
log.Printf("[stream] %s /stream from %s Range:%q", r.Method, clientIP, r.Header.Get("Range"))
|
||||
|
||||
// Get current provider (may be nil if no file is being served)
|
||||
ss.mu.RLock()
|
||||
provider := ss.provider
|
||||
|
|
|
|||
|
|
@ -305,6 +305,80 @@ func TestStreamServer_SetFile_SwapsProvider(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestStreamServer_Health_NoFile verifica que /health devuelve streaming:false
|
||||
// cuando no hay archivo configurado.
|
||||
func TestStreamServer_Health_NoFile(t *testing.T) {
|
||||
srv := NewStreamServer(0)
|
||||
ctx := context.Background()
|
||||
|
||||
if err := srv.Listen(ctx); err != nil {
|
||||
t.Fatalf("Listen() error: %v", err)
|
||||
}
|
||||
defer srv.Shutdown(ctx)
|
||||
|
||||
healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", srv.Port())
|
||||
resp, err := http.Get(healthURL)
|
||||
if err != nil {
|
||||
t.Fatalf("GET /health: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Errorf("status = %d, want 200", resp.StatusCode)
|
||||
}
|
||||
ct := resp.Header.Get("Content-Type")
|
||||
if !strings.Contains(ct, "application/json") {
|
||||
t.Errorf("Content-Type = %q, want application/json", ct)
|
||||
}
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"streaming":false`) {
|
||||
t.Errorf("body = %q, want streaming:false", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, `"status":"ok"`) {
|
||||
t.Errorf("body = %q, want status:ok", bodyStr)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStreamServer_Health_WithFile verifica que /health devuelve streaming:true
|
||||
// y el nombre del archivo cuando hay un archivo configurado.
|
||||
func TestStreamServer_Health_WithFile(t *testing.T) {
|
||||
srv := NewStreamServer(0)
|
||||
ctx := context.Background()
|
||||
|
||||
if err := srv.Listen(ctx); err != nil {
|
||||
t.Fatalf("Listen() error: %v", err)
|
||||
}
|
||||
defer srv.Shutdown(ctx)
|
||||
|
||||
provider := newFakeProvider("pelicula.mkv", []byte("contenido de prueba"))
|
||||
srv.SetFile(provider, "task-health-test")
|
||||
|
||||
healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", srv.Port())
|
||||
resp, err := http.Get(healthURL)
|
||||
if err != nil {
|
||||
t.Fatalf("GET /health: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Errorf("status = %d, want 200", resp.StatusCode)
|
||||
}
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
bodyStr := string(body)
|
||||
if !strings.Contains(bodyStr, `"streaming":true`) {
|
||||
t.Errorf("body = %q, want streaming:true", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, "pelicula.mkv") {
|
||||
t.Errorf("body = %q, want file name pelicula.mkv", bodyStr)
|
||||
}
|
||||
if !strings.Contains(bodyStr, "task-hea") { // primeros 8 chars de "task-health-test"
|
||||
t.Errorf("body = %q, want task short ID", bodyStr)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStreamServer_MKV_ContentType verifica que el Content-Type para .mkv
|
||||
// es el correcto.
|
||||
func TestStreamServer_MKV_ContentType(t *testing.T) {
|
||||
|
|
|
|||
|
|
@ -380,3 +380,31 @@ func (r *responseRecorder) ReadFrom(src io.Reader) (int64, error) {
|
|||
n, err := io.Copy(r.body, src)
|
||||
return n, err
|
||||
}
|
||||
|
||||
// TestPrioritizeTail_SmallFile verifica que PrioritizeTail no lanza goroutine
|
||||
// cuando el archivo es demasiado pequeño (≤ 2×tailBytes).
|
||||
func TestPrioritizeTail_SmallFile(t *testing.T) {
|
||||
s := &StreamEngine{
|
||||
totalBytes: 5 * 1024 * 1024, // 5 MB — menor que 2×5 MB
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// No debe entrar en pánico ni bloquear con file == nil
|
||||
s.PrioritizeTail(ctx, 5*1024*1024)
|
||||
// Si llega aquí sin pánico, el test pasa
|
||||
}
|
||||
|
||||
// TestPrioritizeTail_NilFile verifica que PrioritizeTail es seguro cuando
|
||||
// file es nil (engine no inicializado).
|
||||
func TestPrioritizeTail_NilFile(t *testing.T) {
|
||||
s := &StreamEngine{
|
||||
totalBytes: 100 * 1024 * 1024,
|
||||
file: nil,
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
s.PrioritizeTail(ctx, 5*1024*1024)
|
||||
// No debe entrar en pánico
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue