chore(deps): update all dependencies and GitHub Actions to latest
- Go deps: cobra 1.10.2, fatih/color 1.19, tablewriter 1.1.4, anacrolix/torrent 1.61, charmbracelet/huh 1.0, pion/webrtc 4.2.11 - GitHub Actions: checkout v6, setup-go v6, golangci-lint-action v9, codecov-action v5, ghaction-upx v4, goreleaser-action v7 - CI matrix: drop Go 1.22, test on 1.24 + 1.25 - Migrate tablewriter API from v0 to v1 (breaking change) - Fix data race in WSTransport.readLoop (pass conn as parameter) - Add file.Sync() before close in debrid and usenet downloaders - Improve progress tracker: dedup MarkDone, re-mark dirty on flush error
This commit is contained in:
parent
719429b06e
commit
c9bcb96dab
10 changed files with 346 additions and 352 deletions
|
|
@ -79,7 +79,7 @@ func (t *WSTransport) Connect(ctx context.Context) error {
|
|||
t.authDoneOnce = sync.Once{}
|
||||
t.mu.Unlock()
|
||||
|
||||
go t.readLoop()
|
||||
go t.readLoop(conn)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -240,9 +240,9 @@ func (t *WSTransport) send(msg any) error {
|
|||
return t.conn.WriteMessage(websocket.TextMessage, data)
|
||||
}
|
||||
|
||||
func (t *WSTransport) readLoop() {
|
||||
func (t *WSTransport) readLoop(conn *websocket.Conn) {
|
||||
for {
|
||||
_, msg, err := t.conn.ReadMessage()
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
if !t.closed.Load() {
|
||||
log.Printf("[ws] read error: %v", err)
|
||||
|
|
|
|||
|
|
@ -182,7 +182,12 @@ func (d *DebridDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("open file: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
defer func() {
|
||||
if err := file.Sync(); err != nil {
|
||||
log.Printf("[%s] sync warning: %v", shortID(task.ID), err)
|
||||
}
|
||||
file.Close()
|
||||
}()
|
||||
|
||||
// Download with progress reporting
|
||||
downloaded := startOffset
|
||||
|
|
|
|||
|
|
@ -136,9 +136,12 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("download NZB: %w", err)
|
||||
}
|
||||
// Cache for future resume
|
||||
os.MkdirAll(resumeDir, 0o755)
|
||||
os.WriteFile(nzbCachePath, nzbData, 0o644)
|
||||
// Cache for future resume (best-effort — download still works without cache)
|
||||
if mkErr := os.MkdirAll(resumeDir, 0o755); mkErr != nil {
|
||||
log.Printf("[%s] resume dir create failed: %v", shortID, mkErr)
|
||||
} else if wErr := os.WriteFile(nzbCachePath, nzbData, 0o644); wErr != nil {
|
||||
log.Printf("[%s] NZB cache write failed: %v", shortID, wErr)
|
||||
}
|
||||
} else {
|
||||
log.Printf("[%s] using cached NZB", shortID)
|
||||
}
|
||||
|
|
@ -162,6 +165,11 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s
|
|||
shortID, tracker.TotalCompleted(), totalSegs)
|
||||
}
|
||||
|
||||
// Always flush progress on exit — covers graceful shutdown, SIGTERM,
|
||||
// error returns, and shutdown-timeout scenarios. The atomic write
|
||||
// (tmp+rename) ensures the file is never corrupted even on hard kill.
|
||||
defer tracker.Flush()
|
||||
|
||||
// Step 4: Get NNTP credentials and connect
|
||||
creds, err := u.getCredentials(dlCtx)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/fatih/color"
|
||||
"github.com/olekukonko/tablewriter"
|
||||
"github.com/olekukonko/tablewriter/tw"
|
||||
tc "github.com/torrentclaw/go-client"
|
||||
)
|
||||
|
||||
|
|
@ -21,6 +22,22 @@ var (
|
|||
boldColor = color.New(color.Bold)
|
||||
)
|
||||
|
||||
// newCleanTable creates a borderless, minimal table with left-aligned columns.
|
||||
func newCleanTable(w io.Writer) *tablewriter.Table {
|
||||
t := tablewriter.NewWriter(w)
|
||||
t.Configure(func(cfg *tablewriter.Config) {
|
||||
cfg.Header = tw.CellConfig{
|
||||
Alignment: tw.CellAlignment{Global: tw.AlignLeft},
|
||||
Padding: tw.CellPadding{Global: tw.Padding{Left: " ", Right: " "}},
|
||||
}
|
||||
cfg.Row = tw.CellConfig{
|
||||
Alignment: tw.CellAlignment{Global: tw.AlignLeft},
|
||||
Padding: tw.CellPadding{Global: tw.Padding{Left: " ", Right: " "}},
|
||||
}
|
||||
})
|
||||
return t
|
||||
}
|
||||
|
||||
// PrintSearchResults renders search results as a colored table.
|
||||
func PrintSearchResults(resp *tc.SearchResponse) {
|
||||
if len(resp.Results) == 0 {
|
||||
|
|
@ -55,16 +72,8 @@ func printSearchResultEntry(w io.Writer, r tc.SearchResult) {
|
|||
return
|
||||
}
|
||||
|
||||
table := tablewriter.NewWriter(w)
|
||||
table.SetHeader([]string{"", "Quality", "Size", "Seeds", "Source", "Codec", "Lang", "Score"})
|
||||
table.SetBorder(false)
|
||||
table.SetColumnSeparator("")
|
||||
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetCenterSeparator("")
|
||||
table.SetRowSeparator("")
|
||||
table.SetTablePadding(" ")
|
||||
table.SetNoWhiteSpace(true)
|
||||
table := newCleanTable(w)
|
||||
table.Header([]string{"", "Quality", "Size", "Seeds", "Source", "Codec", "Lang", "Score"})
|
||||
|
||||
for _, t := range r.Torrents {
|
||||
quality := StringOrDash(t.Quality)
|
||||
|
|
@ -96,16 +105,8 @@ func PrintPopularItems(items []tc.PopularItem) {
|
|||
headerColor.Println(" 🔥 Popular on unarr")
|
||||
fmt.Println()
|
||||
|
||||
table := tablewriter.NewWriter(os.Stdout)
|
||||
table.SetHeader([]string{"#", "Title", "Year", "Type", "IMDb", "Seeds"})
|
||||
table.SetBorder(false)
|
||||
table.SetColumnSeparator("")
|
||||
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetCenterSeparator("")
|
||||
table.SetRowSeparator("")
|
||||
table.SetTablePadding(" ")
|
||||
table.SetNoWhiteSpace(true)
|
||||
table := newCleanTable(os.Stdout)
|
||||
table.Header([]string{"#", "Title", "Year", "Type", "IMDb", "Seeds"})
|
||||
|
||||
for i, item := range items {
|
||||
table.Append([]string{
|
||||
|
|
@ -133,16 +134,8 @@ func PrintRecentItems(items []tc.RecentItem) {
|
|||
headerColor.Println(" 🆕 Recently Added")
|
||||
fmt.Println()
|
||||
|
||||
table := tablewriter.NewWriter(os.Stdout)
|
||||
table.SetHeader([]string{"#", "Title", "Year", "Type", "IMDb", "Added"})
|
||||
table.SetBorder(false)
|
||||
table.SetColumnSeparator("")
|
||||
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetCenterSeparator("")
|
||||
table.SetRowSeparator("")
|
||||
table.SetTablePadding(" ")
|
||||
table.SetNoWhiteSpace(true)
|
||||
table := newCleanTable(os.Stdout)
|
||||
table.Header([]string{"#", "Title", "Year", "Type", "IMDb", "Added"})
|
||||
|
||||
for i, item := range items {
|
||||
table.Append([]string{
|
||||
|
|
@ -186,16 +179,8 @@ func PrintStats(stats *tc.StatsResponse) {
|
|||
fmt.Println()
|
||||
boldColor.Println(" Recent Ingestions:")
|
||||
|
||||
table := tablewriter.NewWriter(os.Stdout)
|
||||
table.SetHeader([]string{"", "Source", "Status", "Fetched", "New", "Updated"})
|
||||
table.SetBorder(false)
|
||||
table.SetColumnSeparator("")
|
||||
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetCenterSeparator("")
|
||||
table.SetRowSeparator("")
|
||||
table.SetTablePadding(" ")
|
||||
table.SetNoWhiteSpace(true)
|
||||
table := newCleanTable(os.Stdout)
|
||||
table.Header([]string{"", "Source", "Status", "Fetched", "New", "Updated"})
|
||||
|
||||
for _, ing := range stats.RecentIngestions {
|
||||
status := ing.Status
|
||||
|
|
@ -294,16 +279,8 @@ func PrintInspect(title string, year string, torrents []tc.TorrentInfo, magnetUR
|
|||
if len(torrents) > 1 {
|
||||
dimColor.Printf(" + %d more torrents available\n\n", len(torrents)-1)
|
||||
|
||||
table := tablewriter.NewWriter(os.Stdout)
|
||||
table.SetHeader([]string{"", "Quality", "Size", "Seeds", "Source", "Score"})
|
||||
table.SetBorder(false)
|
||||
table.SetColumnSeparator("")
|
||||
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetCenterSeparator("")
|
||||
table.SetRowSeparator("")
|
||||
table.SetTablePadding(" ")
|
||||
table.SetNoWhiteSpace(true)
|
||||
table := newCleanTable(os.Stdout)
|
||||
table.Header([]string{"", "Quality", "Size", "Seeds", "Source", "Score"})
|
||||
|
||||
for i, tt := range torrents[1:] {
|
||||
score := ""
|
||||
|
|
@ -387,16 +364,8 @@ func PrintWatchProviders(title string, year string, providers *tc.WatchProviders
|
|||
if len(torrents) > 0 {
|
||||
headerColor.Println(" 🏴☠️ TORRENT:")
|
||||
|
||||
table := tablewriter.NewWriter(os.Stdout)
|
||||
table.SetHeader([]string{"", "Quality", "Size", "Seeds", "Source", "Score"})
|
||||
table.SetBorder(false)
|
||||
table.SetColumnSeparator("")
|
||||
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetAlignment(tablewriter.ALIGN_LEFT)
|
||||
table.SetCenterSeparator("")
|
||||
table.SetRowSeparator("")
|
||||
table.SetTablePadding(" ")
|
||||
table.SetNoWhiteSpace(true)
|
||||
table := newCleanTable(os.Stdout)
|
||||
table.Header([]string{"", "Quality", "Size", "Seeds", "Source", "Score"})
|
||||
|
||||
for _, t := range torrents {
|
||||
score := ""
|
||||
|
|
|
|||
|
|
@ -91,6 +91,9 @@ func (d *Downloader) DownloadFile(ctx context.Context, file nzb.File, fileIndex
|
|||
if _, statErr := os.Stat(destPath); statErr == nil && tracker.CompletedSegments(fileIndex) > 0 {
|
||||
// Partial file exists and we have progress — open for read-write (no truncate)
|
||||
outFile, err = os.OpenFile(destPath, os.O_RDWR, 0o644)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("open file for resume: %w", err)
|
||||
}
|
||||
resuming = true
|
||||
}
|
||||
}
|
||||
|
|
@ -105,10 +108,13 @@ func (d *Downloader) DownloadFile(ctx context.Context, file nzb.File, fileIndex
|
|||
if totalBytes > 0 {
|
||||
outFile.Truncate(totalBytes)
|
||||
}
|
||||
} else if err != nil {
|
||||
return "", fmt.Errorf("open file for resume: %w", err)
|
||||
}
|
||||
defer outFile.Close()
|
||||
defer func() {
|
||||
if err := outFile.Sync(); err != nil {
|
||||
log.Printf("[usenet] sync warning: %v", err)
|
||||
}
|
||||
outFile.Close()
|
||||
}()
|
||||
|
||||
// Download segments using worker pool
|
||||
var downloaded atomic.Int64
|
||||
|
|
@ -329,7 +335,10 @@ func (d *Downloader) DownloadNZB(ctx context.Context, n *nzb.NZB, outputDir stri
|
|||
default:
|
||||
}
|
||||
|
||||
fileIdx := nzbFileIndex[file.Subject]
|
||||
fileIdx, ok := nzbFileIndex[file.Subject]
|
||||
if !ok {
|
||||
fileIdx = -1 // unknown index — tracker will treat as no-op
|
||||
}
|
||||
|
||||
// Skip fully completed files
|
||||
if tracker != nil && tracker.IsFileDone(fileIdx) {
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ type ProgressTracker struct {
|
|||
dir string // directory where progress files are stored
|
||||
files []fileProgress
|
||||
|
||||
mu sync.Mutex
|
||||
mu sync.RWMutex
|
||||
dirty bool
|
||||
lastFlush time.Time
|
||||
markCount int // marks since last flush
|
||||
|
|
@ -185,12 +185,16 @@ func (p *ProgressTracker) MarkDone(fileIndex, segIndex int) {
|
|||
}
|
||||
|
||||
p.mu.Lock()
|
||||
fp.completed[segIndex/8] |= 1 << uint(segIndex%8)
|
||||
fp.doneCount.Add(1)
|
||||
p.dirty = true
|
||||
p.markCount++
|
||||
mask := byte(1 << uint(segIndex%8))
|
||||
alreadyDone := fp.completed[segIndex/8]&mask != 0
|
||||
if !alreadyDone {
|
||||
fp.completed[segIndex/8] |= mask
|
||||
fp.doneCount.Add(1)
|
||||
p.dirty = true
|
||||
p.markCount++
|
||||
}
|
||||
|
||||
shouldFlush := p.markCount >= flushSegmentFreq || time.Since(p.lastFlush) >= flushInterval
|
||||
shouldFlush := !alreadyDone && (p.markCount >= flushSegmentFreq || time.Since(p.lastFlush) >= flushInterval)
|
||||
p.mu.Unlock()
|
||||
|
||||
if shouldFlush {
|
||||
|
|
@ -207,10 +211,10 @@ func (p *ProgressTracker) IsDone(fileIndex, segIndex int) bool {
|
|||
if segIndex < 0 || segIndex >= fp.segCount {
|
||||
return false
|
||||
}
|
||||
// Read without lock — single-byte read is atomic on aligned data,
|
||||
// and we only ever set bits (never clear), so a stale read just means
|
||||
// we might re-download a segment (harmless, WriteAt is idempotent).
|
||||
return fp.completed[segIndex/8]&(1<<uint(segIndex%8)) != 0
|
||||
p.mu.RLock()
|
||||
done := fp.completed[segIndex/8]&(1<<uint(segIndex%8)) != 0
|
||||
p.mu.RUnlock()
|
||||
return done
|
||||
}
|
||||
|
||||
// IsFileDone returns true if all segments of a file are completed.
|
||||
|
|
@ -294,16 +298,25 @@ func (p *ProgressTracker) Flush() error {
|
|||
|
||||
// Atomic write: tmp file + rename
|
||||
if err := os.MkdirAll(p.dir, 0o755); err != nil {
|
||||
p.mu.Lock()
|
||||
p.dirty = true // re-mark so next Flush retries
|
||||
p.mu.Unlock()
|
||||
return fmt.Errorf("create resume dir: %w", err)
|
||||
}
|
||||
|
||||
tmpPath := p.progressPath() + ".tmp"
|
||||
if err := os.WriteFile(tmpPath, buf, 0o644); err != nil {
|
||||
p.mu.Lock()
|
||||
p.dirty = true
|
||||
p.mu.Unlock()
|
||||
return fmt.Errorf("write progress tmp: %w", err)
|
||||
}
|
||||
|
||||
if err := os.Rename(tmpPath, p.progressPath()); err != nil {
|
||||
os.Remove(tmpPath)
|
||||
p.mu.Lock()
|
||||
p.dirty = true
|
||||
p.mu.Unlock()
|
||||
return fmt.Errorf("rename progress: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue