From f1b4f2e3279372bde2483865962bfc5493d796e9 Mon Sep 17 00:00:00 2001 From: Deivid Soto Date: Thu, 9 Apr 2026 16:15:41 +0200 Subject: [PATCH] fix(stream): fix black screen on remote/Tailscale streaming Three root-cause fixes for VLC showing a black screen when opening a stream from a different network or via Tailscale: 1. PrioritizeTail: when VLC opens an MKV/MP4 stream it immediately seeks to the end of the file to read the container index (seekhead/moov atom). For active torrents those end-pieces aren't downloaded yet, so the reader blocks indefinitely. PrioritizeTail() opens a background reader positioned at the last 5 MB, keeping those pieces at high priority until ctx is cancelled or they finish downloading. 2. /health endpoint: GET /health returns a lightweight JSON response {"status":"ok","streaming":bool,...} so connectivity can be tested with a simple curl from any device before involving VLC. 3. Per-request logging: every incoming /stream request now logs the client IP and Range header, making it trivial to confirm whether remote/Tailscale clients are reaching the server at all. --- internal/cmd/stream_handler.go | 7 +++ internal/engine/stream.go | 32 ++++++++++++ internal/engine/stream_server.go | 44 ++++++++++++++++ internal/engine/stream_server_test.go | 74 +++++++++++++++++++++++++++ internal/engine/stream_test.go | 28 ++++++++++ 5 files changed, 185 insertions(+) diff --git a/internal/cmd/stream_handler.go b/internal/cmd/stream_handler.go index aec884b..fa61220 100644 --- a/internal/cmd/stream_handler.go +++ b/internal/cmd/stream_handler.go @@ -148,6 +148,13 @@ func handleStreamTask(parentCtx context.Context, at agent.Task, reporter *engine task.StreamURL = srv.URLsJSON() log.Printf("[%s] stream ready: %s (url: %s)", at.ID[:8], eng.FileName(), srv.URL()) + // Pre-descargar los últimos 5 MB del archivo para que el moov atom (MP4) + // o el seekhead (MKV) estén disponibles cuando VLC los pida al abrir el + // stream. Sin esto, VLC busca el final del archivo, el lector bloquea + // esperando piezas no descargadas, y el resultado es pantalla negra en + // redes remotas donde la latencia amplifica el efecto. + eng.PrioritizeTail(ctx, 5*1024*1024) + // 5. Start watch progress reporter if agentClient != nil { watchReporter := engine.NewWatchReporter(agentClient, srv, at.ID) diff --git a/internal/engine/stream.go b/internal/engine/stream.go index af644b7..1414f15 100644 --- a/internal/engine/stream.go +++ b/internal/engine/stream.go @@ -303,6 +303,38 @@ func (s *StreamEngine) FileSize() int64 { return s.totalBytes } // BufferTarget returns the buffer threshold in bytes. func (s *StreamEngine) BufferTarget() int64 { return s.bufferTarget } +// PrioritizeTail abre un lector posicionado cerca del final del archivo para +// forzar la descarga anticipada de los metadatos del container (moov atom en +// MP4, seekhead en MKV). Sin esto, VLC busca el final del archivo al abrirlo +// y el lector bloquea indefinidamente si esas piezas aún no están descargadas, +// resultando en pantalla negra en redes lentas o remotas. +// +// Se ejecuta en una goroutine y se cancela cuando ctx expira. +func (s *StreamEngine) PrioritizeTail(ctx context.Context, tailBytes int64) { + if s.file == nil || s.totalBytes <= tailBytes*2 { + return + } + go func() { + reader := s.file.NewReader() + defer reader.Close() + + seekPos := s.totalBytes - tailBytes + reader.Seek(seekPos, io.SeekStart) //nolint:errcheck + reader.SetReadahead(tailBytes) + reader.SetContext(ctx) + + // Leer continuamente para mantener las piezas priorizadas hasta que + // ctx se cancele o el final del archivo esté completamente descargado. + buf := make([]byte, 32*1024) + for { + _, err := reader.Read(buf) + if err != nil { + return + } + } + }() +} + // Shutdown gracefully closes the torrent and client. func (s *StreamEngine) Shutdown(_ context.Context) error { if s.tor != nil { diff --git a/internal/engine/stream_server.go b/internal/engine/stream_server.go index 492bf7a..359d0b1 100644 --- a/internal/engine/stream_server.go +++ b/internal/engine/stream_server.go @@ -71,6 +71,7 @@ func NewStreamServer(port int) *StreamServer { func (ss *StreamServer) Listen(ctx context.Context) error { mux := http.NewServeMux() mux.HandleFunc("/stream", ss.handler) + mux.HandleFunc("/health", ss.healthHandler) // SO_REUSEADDR allows immediate rebind if the port is in TIME_WAIT (e.g. after agent restart) lc := net.ListenConfig{ @@ -234,9 +235,52 @@ func (ss *StreamServer) Shutdown(ctx context.Context) error { return nil } +// healthHandler responde con el estado del servidor en JSON. +// Útil para diagnosticar conectividad desde redes remotas o Tailscale: +// +// curl http://:/health +func (ss *StreamServer) healthHandler(w http.ResponseWriter, r *http.Request) { + ss.mu.RLock() + provider := ss.provider + taskID := ss.taskID + ss.mu.RUnlock() + + clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) + + type healthResponse struct { + Status string `json:"status"` + Streaming bool `json:"streaming"` + File string `json:"file,omitempty"` + Task string `json:"task,omitempty"` + Port int `json:"port"` + Client string `json:"client"` + } + resp := healthResponse{ + Status: "ok", + Port: ss.port, + Client: clientIP, + } + if provider != nil { + resp.Streaming = true + resp.File = provider.FileName() + resp.Task = taskID + if len(resp.Task) > 8 { + resp.Task = resp.Task[:8] + } + } + + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", "no-cache") + json.NewEncoder(w).Encode(resp) //nolint:errcheck +} + func (ss *StreamServer) handler(w http.ResponseWriter, r *http.Request) { ss.lastActivity.Store(time.Now().UnixNano()) + // Log every incoming request — essential for diagnosing remote/Tailscale issues. + clientIP, _, _ := net.SplitHostPort(r.RemoteAddr) + log.Printf("[stream] %s /stream from %s Range:%q", r.Method, clientIP, r.Header.Get("Range")) + // Get current provider (may be nil if no file is being served) ss.mu.RLock() provider := ss.provider diff --git a/internal/engine/stream_server_test.go b/internal/engine/stream_server_test.go index 8802ff9..623a16d 100644 --- a/internal/engine/stream_server_test.go +++ b/internal/engine/stream_server_test.go @@ -305,6 +305,80 @@ func TestStreamServer_SetFile_SwapsProvider(t *testing.T) { } } +// TestStreamServer_Health_NoFile verifica que /health devuelve streaming:false +// cuando no hay archivo configurado. +func TestStreamServer_Health_NoFile(t *testing.T) { + srv := NewStreamServer(0) + ctx := context.Background() + + if err := srv.Listen(ctx); err != nil { + t.Fatalf("Listen() error: %v", err) + } + defer srv.Shutdown(ctx) + + healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", srv.Port()) + resp, err := http.Get(healthURL) + if err != nil { + t.Fatalf("GET /health: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Errorf("status = %d, want 200", resp.StatusCode) + } + ct := resp.Header.Get("Content-Type") + if !strings.Contains(ct, "application/json") { + t.Errorf("Content-Type = %q, want application/json", ct) + } + + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + if !strings.Contains(bodyStr, `"streaming":false`) { + t.Errorf("body = %q, want streaming:false", bodyStr) + } + if !strings.Contains(bodyStr, `"status":"ok"`) { + t.Errorf("body = %q, want status:ok", bodyStr) + } +} + +// TestStreamServer_Health_WithFile verifica que /health devuelve streaming:true +// y el nombre del archivo cuando hay un archivo configurado. +func TestStreamServer_Health_WithFile(t *testing.T) { + srv := NewStreamServer(0) + ctx := context.Background() + + if err := srv.Listen(ctx); err != nil { + t.Fatalf("Listen() error: %v", err) + } + defer srv.Shutdown(ctx) + + provider := newFakeProvider("pelicula.mkv", []byte("contenido de prueba")) + srv.SetFile(provider, "task-health-test") + + healthURL := fmt.Sprintf("http://127.0.0.1:%d/health", srv.Port()) + resp, err := http.Get(healthURL) + if err != nil { + t.Fatalf("GET /health: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Errorf("status = %d, want 200", resp.StatusCode) + } + + body, _ := io.ReadAll(resp.Body) + bodyStr := string(body) + if !strings.Contains(bodyStr, `"streaming":true`) { + t.Errorf("body = %q, want streaming:true", bodyStr) + } + if !strings.Contains(bodyStr, "pelicula.mkv") { + t.Errorf("body = %q, want file name pelicula.mkv", bodyStr) + } + if !strings.Contains(bodyStr, "task-hea") { // primeros 8 chars de "task-health-test" + t.Errorf("body = %q, want task short ID", bodyStr) + } +} + // TestStreamServer_MKV_ContentType verifica que el Content-Type para .mkv // es el correcto. func TestStreamServer_MKV_ContentType(t *testing.T) { diff --git a/internal/engine/stream_test.go b/internal/engine/stream_test.go index 61e1612..df473a0 100644 --- a/internal/engine/stream_test.go +++ b/internal/engine/stream_test.go @@ -380,3 +380,31 @@ func (r *responseRecorder) ReadFrom(src io.Reader) (int64, error) { n, err := io.Copy(r.body, src) return n, err } + +// TestPrioritizeTail_SmallFile verifica que PrioritizeTail no lanza goroutine +// cuando el archivo es demasiado pequeño (≤ 2×tailBytes). +func TestPrioritizeTail_SmallFile(t *testing.T) { + s := &StreamEngine{ + totalBytes: 5 * 1024 * 1024, // 5 MB — menor que 2×5 MB + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // No debe entrar en pánico ni bloquear con file == nil + s.PrioritizeTail(ctx, 5*1024*1024) + // Si llega aquí sin pánico, el test pasa +} + +// TestPrioritizeTail_NilFile verifica que PrioritizeTail es seguro cuando +// file es nil (engine no inicializado). +func TestPrioritizeTail_NilFile(t *testing.T) { + s := &StreamEngine{ + totalBytes: 100 * 1024 * 1024, + file: nil, + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.PrioritizeTail(ctx, 5*1024*1024) + // No debe entrar en pánico +}