unarr/internal/cmd/stream.go
Deivid Soto 78c16c295e test: add comprehensive test suite for engine, agent and cmd packages
- Refactor download.go and stream.go with downloadDeps/streamDeps structs
  for dependency injection, enabling unit testing without real I/O
- download_test.go: 15 tests — input validation, mock downloaders, method
  selection, cobra Args, deadlock detection
- stream_test.go: input validation, noOpen flag, engine error handling
- client_test.go: context cancellation, timeout, full Sync roundtrip,
  watch-progress and HTTP error unwrapping
- sync_test.go: TriggerSync on watching transition, adjustInterval
- torrent_test.go: TorrentDownloader lifecycle without network
- stream_server_test.go: HTTP server lifecycle, SetFile/ClearFile,
  concurrent requests, Shutdown releases port, content-type
- manager_integration_test.go: full pipeline — success, torrent→debrid
  fallback, all-fail, multi-concurrent, ForceStart, OnTaskDone,
  recent-finished drain, cancel mid-download, organize
- usenet_test.go: Cancel/Pause race regression test (run with -race)
- daemon_test.go: isAllowedStreamPath table tests
- CI: split coverage gate to engine+agent only (50% threshold); cmd
  coverage still reported but not gated (interactive UI commands)
- lefthook: add pre-push hook with go test -race -count=1 -timeout=120s
2026-04-08 23:36:00 +02:00

233 lines
6.3 KiB
Go

package cmd
import (
"context"
"fmt"
"os"
"os/exec"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/fatih/color"
"github.com/spf13/cobra"
"github.com/torrentclaw/unarr/internal/engine"
"github.com/torrentclaw/unarr/internal/parser"
"github.com/torrentclaw/unarr/internal/ui"
)
// streamDeps groups the constructor functions used by runStream.
// They can be overridden in tests to inject mocks.
type streamDeps struct {
	// newStreamEngine builds the stream engine from the given configuration.
	newStreamEngine func(cfg engine.StreamConfig) (*engine.StreamEngine, error)
	// newStreamServer builds the HTTP stream server for the requested port.
	newStreamServer func(port int) *engine.StreamServer
	// openPlayer is called with the stream URL and the --player override; the
	// returned string is printed as the name of the player that was opened.
	openPlayer func(url, override string) (string, *exec.Cmd, error)
}
// defaultStreamDeps wires streamDeps to the implementations exported by the
// engine package. runStream passes it to runStreamWithDeps.
var defaultStreamDeps = streamDeps{
	newStreamEngine: engine.NewStreamEngine,
	newStreamServer: engine.NewStreamServer,
	openPlayer: engine.OpenPlayer,
}
// newStreamCmd assembles the "stream" subcommand. It declares the --port,
// --no-open and --player flags, registers shell-completion candidates for
// --player, and forwards the single positional argument to runStream.
func newStreamCmd() *cobra.Command {
	// Flag destinations; cobra fills these in before RunE executes.
	var opts struct {
		port   int
		noOpen bool
		player string
	}

	streamCmd := &cobra.Command{
		Use:   "stream <magnet|infohash>",
		Short: "Stream a torrent directly to a media player",
		Long: `Stream a torrent by info hash or magnet link without waiting for the full download.
Downloads pieces sequentially (prioritizing the beginning of the file) and serves
the video over a local HTTP server. Automatically detects and opens mpv, vlc, or
your default browser.
The stream server runs until you press Ctrl+C. Data is stored temporarily in your
download directory (or system temp if not configured).`,
		Example: ` unarr stream abc123def456abc123def456abc123def456abc1
unarr stream "magnet:?xt=urn:btih:..." --port 8080
unarr stream <hash> --player mpv
unarr stream <hash> --no-open`,
		// Exactly one argument: the info hash or magnet link.
		Args: cobra.ExactArgs(1),
		RunE: func(_ *cobra.Command, args []string) error {
			return runStream(args[0], opts.port, opts.noOpen, opts.player)
		},
	}

	flags := streamCmd.Flags()
	flags.IntVar(&opts.port, "port", 0, "HTTP server port (default: random available)")
	flags.BoolVar(&opts.noOpen, "no-open", false, "don't open a player, just print the URL")
	flags.StringVar(&opts.player, "player", "", "media player command (default: auto-detect)")

	// Completion for --player: a fixed candidate list, with file-name
	// completion disabled.
	completePlayer := func(_ *cobra.Command, _ []string, _ string) ([]string, cobra.ShellCompDirective) {
		candidates := []string{"mpv\tmpv media player", "vlc\tVLC media player"}
		return candidates, cobra.ShellCompDirectiveNoFileComp
	}
	streamCmd.RegisterFlagCompletionFunc("player", completePlayer)

	return streamCmd
}
func runStream(input string, port int, noOpen bool, playerCmd string) error {
return runStreamWithDeps(input, port, noOpen, playerCmd, defaultStreamDeps)
}
// runStreamWithDeps streams the torrent identified by input and blocks until
// the process receives SIGINT or SIGTERM.
//
// It resolves input to an info hash or magnet URI, builds the engine through
// deps.newStreamEngine, waits for metadata, serves the selected file through
// the server returned by deps.newStreamServer, waits for the initial buffer,
// opens a player unless noOpen is set, and then prints a progress line once
// per second until the context is cancelled by a signal.
//
// Parameters:
//   - input: a 40-character info hash or a magnet URI.
//   - port: HTTP server port, forwarded to the engine config and the server.
//   - noOpen: when true, only the stream URL is printed; no player is opened.
//   - playerCmd: player override forwarded to deps.openPlayer.
//   - deps: constructors for the engine, the server and the player launcher.
//
// It returns an error when input is neither a hash nor a magnet URI, when the
// engine cannot be created or started, when the server cannot listen, or when
// buffering fails. Once the progress loop is running, shutdown returns nil.
func runStreamWithDeps(input string, port int, noOpen bool, playerCmd string, deps streamDeps) error {
	cfg := loadConfig()

	bold := color.New(color.Bold)
	green := color.New(color.FgGreen)
	yellow := color.New(color.FgYellow)
	dim := color.New(color.FgHiBlack)

	// Resolve the input. A bare hash recognised by the parser is used in its
	// parsed form; a magnet is passed through as given. When the parser finds
	// no hash, fall back to accepting any 40-character string (lower-cased) or
	// anything that starts with "magnet:".
	parsed := parser.Parse(input)
	magnetOrHash := input
	if parsed.InfoHash != "" && !parsed.IsMagnet {
		magnetOrHash = parsed.InfoHash
	} else if parsed.InfoHash == "" {
		trimmed := strings.TrimSpace(input)
		if len(trimmed) == 40 {
			magnetOrHash = strings.ToLower(trimmed)
		} else if !strings.HasPrefix(trimmed, "magnet:") {
			return fmt.Errorf("invalid input: provide a 40-char info hash or magnet URI")
		}
	}

	// Data directory: the configured download dir, else a folder under the
	// system temp directory.
	dataDir := cfg.Download.Dir
	if dataDir == "" {
		dataDir = filepath.Join(os.TempDir(), "unarr-stream")
	}

	// Create engine.
	eng, err := deps.newStreamEngine(engine.StreamConfig{
		DataDir:     dataDir,
		Port:        port,
		MetaTimeout: 60 * time.Second,
		NoOpen:      noOpen,
		PlayerCmd:   playerCmd,
	})
	if err != nil {
		return fmt.Errorf("create stream engine: %w", err)
	}

	// Signal handling: the first SIGINT/SIGTERM cancels ctx.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	// Unregister the channel on return so the handler does not outlive this
	// call. Deferred after cancel, so it runs before it.
	defer signal.Stop(sigCh)
	go func() {
		// Also watch ctx so the goroutine ends when the function returns
		// without a signal (error paths), instead of blocking forever.
		select {
		case <-sigCh:
			fmt.Println("\n Shutting down...")
			cancel()
		case <-ctx.Done():
		}
	}()

	// Header.
	fmt.Println()
	bold.Println(" unarr Stream")
	fmt.Println()

	// Start engine (metadata + file selection).
	dim.Println(" Waiting for metadata...")
	if err := eng.Start(ctx, magnetOrHash); err != nil {
		eng.Shutdown(context.Background())
		return err
	}

	fileName := eng.FileName()
	fileSize := eng.FileLength()
	bold.Printf(" File: %s (%s)\n", fileName, ui.FormatBytes(fileSize))
	if !eng.IsVideoFile() {
		yellow.Println(" Warning: no video files found, streaming largest file")
	}

	// Start HTTP server.
	srv := deps.newStreamServer(port)
	if err := srv.Listen(ctx); err != nil {
		eng.Shutdown(context.Background())
		return fmt.Errorf("start server: %w", err)
	}
	srv.SetFile(eng, "cli-stream")
	fmt.Printf(" URL: %s\n", srv.URL())
	fmt.Println()

	// Buffer before opening the player.
	dim.Print(" Buffering...")
	err = eng.WaitBuffer(ctx, func(buffered, target int64) {
		// Only divide when target is positive: a zero target would yield
		// NaN/Inf, whose conversion to int is implementation-defined. A
		// non-positive target or buffered >= target is reported as 100%.
		pct := 100
		if target > 0 && buffered < target {
			pct = int(float64(buffered) / float64(target) * 100)
		}
		fmt.Printf("\r Buffering: %d%% (%s / %s) ",
			pct, ui.FormatBytes(buffered), ui.FormatBytes(target))
	})
	if err != nil {
		srv.Shutdown(context.Background())
		eng.Shutdown(context.Background())
		return err
	}
	fmt.Println()

	// Start progress tracking.
	eng.StartProgressLoop(ctx)

	// Open player. A failure here is not fatal: the URL is printed so the
	// user can open it manually.
	if noOpen {
		fmt.Printf(" Open this URL in your player: %s\n", srv.URL())
	} else {
		playerName, _, openErr := deps.openPlayer(srv.URL(), playerCmd)
		if openErr != nil {
			yellow.Printf(" Could not open player: %s\n", openErr)
			fmt.Printf(" Open this URL in your player: %s\n", srv.URL())
		} else {
			green.Printf(" Opened in %s\n", playerName)
		}
	}
	fmt.Println()

	// Progress loop until Ctrl+C.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	completed := false
progress:
	for {
		select {
		case <-ctx.Done():
			break progress
		case <-ticker.C:
			p := eng.Progress()
			pct := 0
			if p.TotalBytes > 0 {
				pct = int(float64(p.DownloadedBytes) / float64(p.TotalBytes) * 100)
			}
			fmt.Printf("\r %d%% | %s/s | Peers: %d | Seeds: %d ",
				pct, ui.FormatBytes(p.SpeedBps), p.Peers, p.Seeds)
			// Announce completion once; the server keeps running afterwards.
			if pct >= 100 && !completed {
				completed = true
				fmt.Println()
				green.Println(" Download complete! Stream server still running. Ctrl+C to exit.")
			}
		}
	}

	// Shutdown: server and engine share one 5-second deadline.
	fmt.Println()
	fmt.Println()
	dim.Println(" Cleaning up...")
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()
	srv.Shutdown(shutdownCtx)
	eng.Shutdown(shutdownCtx)
	fmt.Println(" Done.")
	fmt.Println()
	return nil
}