- Refactor download.go and stream.go with downloadDeps/streamDeps structs for dependency injection, enabling unit testing without real I/O - download_test.go: 15 tests — input validation, mock downloaders, method selection, cobra Args, deadlock detection - stream_test.go: input validation, noOpen flag, engine error handling - client_test.go: context cancellation, timeout, full Sync roundtrip, watch-progress and HTTP error unwrapping - sync_test.go: TriggerSync on watching transition, adjustInterval - torrent_test.go: TorrentDownloader lifecycle without network - stream_server_test.go: HTTP server lifecycle, SetFile/ClearFile, concurrent requests, Shutdown releases port, content-type - manager_integration_test.go: full pipeline — success, torrent→debrid fallback, all-fail, multi-concurrent, ForceStart, OnTaskDone, recent-finished drain, cancel mid-download, organize - usenet_test.go: Cancel/Pause race regression test (run with -race) - daemon_test.go: isAllowedStreamPath table tests - CI: split coverage gate to engine+agent only (50% threshold); cmd coverage still reported but not gated (interactive UI commands) - lefthook: add pre-push hook with go test -race -count=1 -timeout=120s
233 lines
6.3 KiB
Go
233 lines
6.3 KiB
Go
package cmd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"os"
|
|
"os/exec"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/fatih/color"
|
|
"github.com/spf13/cobra"
|
|
"github.com/torrentclaw/unarr/internal/engine"
|
|
"github.com/torrentclaw/unarr/internal/parser"
|
|
"github.com/torrentclaw/unarr/internal/ui"
|
|
)
|
|
|
|
// streamDeps agrupa las funciones constructoras usadas por runStream.
|
|
// Pueden sobreescribirse en tests para inyectar mocks.
|
|
type streamDeps struct {
|
|
newStreamEngine func(cfg engine.StreamConfig) (*engine.StreamEngine, error)
|
|
newStreamServer func(port int) *engine.StreamServer
|
|
openPlayer func(url, override string) (string, *exec.Cmd, error)
|
|
}
|
|
|
|
var defaultStreamDeps = streamDeps{
|
|
newStreamEngine: engine.NewStreamEngine,
|
|
newStreamServer: engine.NewStreamServer,
|
|
openPlayer: engine.OpenPlayer,
|
|
}
|
|
|
|
func newStreamCmd() *cobra.Command {
|
|
var (
|
|
port int
|
|
noOpen bool
|
|
playerCmd string
|
|
)
|
|
|
|
cmd := &cobra.Command{
|
|
Use: "stream <magnet|infohash>",
|
|
Short: "Stream a torrent directly to a media player",
|
|
Long: `Stream a torrent by info hash or magnet link without waiting for the full download.
|
|
|
|
Downloads pieces sequentially (prioritizing the beginning of the file) and serves
|
|
the video over a local HTTP server. Automatically detects and opens mpv, vlc, or
|
|
your default browser.
|
|
|
|
The stream server runs until you press Ctrl+C. Data is stored temporarily in your
|
|
download directory (or system temp if not configured).`,
|
|
Example: ` unarr stream abc123def456abc123def456abc123def456abc1
|
|
unarr stream "magnet:?xt=urn:btih:..." --port 8080
|
|
unarr stream <hash> --player mpv
|
|
unarr stream <hash> --no-open`,
|
|
Args: cobra.ExactArgs(1),
|
|
RunE: func(cmd *cobra.Command, args []string) error {
|
|
return runStream(args[0], port, noOpen, playerCmd)
|
|
},
|
|
}
|
|
|
|
cmd.Flags().IntVar(&port, "port", 0, "HTTP server port (default: random available)")
|
|
cmd.Flags().BoolVar(&noOpen, "no-open", false, "don't open a player, just print the URL")
|
|
cmd.Flags().StringVar(&playerCmd, "player", "", "media player command (default: auto-detect)")
|
|
cmd.RegisterFlagCompletionFunc("player", func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
|
|
return []string{"mpv\tmpv media player", "vlc\tVLC media player"}, cobra.ShellCompDirectiveNoFileComp
|
|
})
|
|
|
|
return cmd
|
|
}
|
|
|
|
func runStream(input string, port int, noOpen bool, playerCmd string) error {
|
|
return runStreamWithDeps(input, port, noOpen, playerCmd, defaultStreamDeps)
|
|
}
|
|
|
|
func runStreamWithDeps(input string, port int, noOpen bool, playerCmd string, deps streamDeps) error {
|
|
cfg := loadConfig()
|
|
bold := color.New(color.Bold)
|
|
green := color.New(color.FgGreen)
|
|
yellow := color.New(color.FgYellow)
|
|
dim := color.New(color.FgHiBlack)
|
|
|
|
// Parse input
|
|
parsed := parser.Parse(input)
|
|
magnetOrHash := input
|
|
if parsed.InfoHash != "" && !parsed.IsMagnet {
|
|
magnetOrHash = parsed.InfoHash
|
|
} else if parsed.InfoHash == "" {
|
|
trimmed := strings.TrimSpace(input)
|
|
if len(trimmed) == 40 {
|
|
magnetOrHash = strings.ToLower(trimmed)
|
|
} else if !strings.HasPrefix(trimmed, "magnet:") {
|
|
return fmt.Errorf("invalid input: provide a 40-char info hash or magnet URI")
|
|
}
|
|
}
|
|
|
|
// Data directory
|
|
dataDir := cfg.Download.Dir
|
|
if dataDir == "" {
|
|
dataDir = filepath.Join(os.TempDir(), "unarr-stream")
|
|
}
|
|
|
|
// Create engine
|
|
eng, err := deps.newStreamEngine(engine.StreamConfig{
|
|
DataDir: dataDir,
|
|
Port: port,
|
|
MetaTimeout: 60 * time.Second,
|
|
NoOpen: noOpen,
|
|
PlayerCmd: playerCmd,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("create stream engine: %w", err)
|
|
}
|
|
|
|
// Signal handling
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
sigCh := make(chan os.Signal, 1)
|
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() {
|
|
<-sigCh
|
|
fmt.Println("\n Shutting down...")
|
|
cancel()
|
|
}()
|
|
|
|
// Header
|
|
fmt.Println()
|
|
bold.Println(" unarr Stream")
|
|
fmt.Println()
|
|
|
|
// Start engine (metadata + file selection)
|
|
dim.Println(" Waiting for metadata...")
|
|
if err := eng.Start(ctx, magnetOrHash); err != nil {
|
|
eng.Shutdown(context.Background())
|
|
return err
|
|
}
|
|
|
|
fileName := eng.FileName()
|
|
fileSize := eng.FileLength()
|
|
bold.Printf(" File: %s (%s)\n", fileName, ui.FormatBytes(fileSize))
|
|
|
|
if !eng.IsVideoFile() {
|
|
yellow.Println(" Warning: no video files found, streaming largest file")
|
|
}
|
|
|
|
// Start HTTP server
|
|
srv := deps.newStreamServer(port)
|
|
if err := srv.Listen(ctx); err != nil {
|
|
eng.Shutdown(context.Background())
|
|
return fmt.Errorf("start server: %w", err)
|
|
}
|
|
srv.SetFile(eng, "cli-stream")
|
|
|
|
fmt.Printf(" URL: %s\n", srv.URL())
|
|
fmt.Println()
|
|
|
|
// Buffer before opening player
|
|
dim.Print(" Buffering...")
|
|
err = eng.WaitBuffer(ctx, func(buffered, target int64) {
|
|
pct := int(float64(buffered) / float64(target) * 100)
|
|
if pct > 100 {
|
|
pct = 100
|
|
}
|
|
fmt.Printf("\r Buffering: %d%% (%s / %s) ",
|
|
pct, ui.FormatBytes(buffered), ui.FormatBytes(target))
|
|
})
|
|
if err != nil {
|
|
srv.Shutdown(context.Background())
|
|
eng.Shutdown(context.Background())
|
|
return err
|
|
}
|
|
fmt.Println()
|
|
|
|
// Start progress tracking
|
|
eng.StartProgressLoop(ctx)
|
|
|
|
// Open player
|
|
if !noOpen {
|
|
playerName, _, openErr := deps.openPlayer(srv.URL(), playerCmd)
|
|
if openErr != nil {
|
|
yellow.Printf(" Could not open player: %s\n", openErr)
|
|
fmt.Printf(" Open this URL in your player: %s\n", srv.URL())
|
|
} else {
|
|
green.Printf(" Opened in %s\n", playerName)
|
|
}
|
|
} else {
|
|
fmt.Printf(" Open this URL in your player: %s\n", srv.URL())
|
|
}
|
|
fmt.Println()
|
|
|
|
// Progress loop until Ctrl+C
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
completed := false
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
goto shutdown
|
|
case <-ticker.C:
|
|
p := eng.Progress()
|
|
pct := 0
|
|
if p.TotalBytes > 0 {
|
|
pct = int(float64(p.DownloadedBytes) / float64(p.TotalBytes) * 100)
|
|
}
|
|
fmt.Printf("\r %d%% | %s/s | Peers: %d | Seeds: %d ",
|
|
pct, ui.FormatBytes(p.SpeedBps), p.Peers, p.Seeds)
|
|
|
|
if pct >= 100 && !completed {
|
|
completed = true
|
|
fmt.Println()
|
|
green.Println(" Download complete! Stream server still running. Ctrl+C to exit.")
|
|
}
|
|
}
|
|
}
|
|
|
|
shutdown:
|
|
fmt.Println()
|
|
fmt.Println()
|
|
dim.Println(" Cleaning up...")
|
|
|
|
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer shutdownCancel()
|
|
|
|
srv.Shutdown(shutdownCtx)
|
|
eng.Shutdown(shutdownCtx)
|
|
|
|
fmt.Println(" Done.")
|
|
fmt.Println()
|
|
return nil
|
|
}
|