fix(usenet): fsync delivered files before reporting complete
Symmetric hardening with the debrid fix (2026-06-15 NFS write-back race): the prod download dir is a network mount and usenet post-processing reads files back for par2 from the page cache while the write-back to the server can still lag. A later open (organize/stream/ffprobe) would then see a short file. fsync the delivered file (or every regular file in a multi-file release directory) and surface any write-back error before returning. Also report the real content size for directory deliveries (walk instead of the dir inode size) and fail an empty delivery. Add syncTree/syncFile + tests.
This commit is contained in:
parent
f3c9648bce
commit
271413e0f9
2 changed files with 103 additions and 2 deletions
|
|
@ -288,10 +288,28 @@ func (u *UsenetDownloader) Download(ctx context.Context, task *Task, outputDir s
|
||||||
finalPath = taskDir
|
finalPath = taskDir
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get final file size
|
// Force the delivered file(s) to durable storage before reporting success.
|
||||||
|
// Symmetric with the debrid path (2026-06-15 NFS incident): the prod download
|
||||||
|
// dir is a network mount, and post-processing reads the data back for par2 from
|
||||||
|
// the page cache while the write-back to the server can still lag — a later open
|
||||||
|
// (organize, stream, ffprobe) would then see a short file. fsync commits it now
|
||||||
|
// and surfaces a write-back error here, where it's actionable.
|
||||||
|
if err := syncTree(finalPath); err != nil {
|
||||||
|
return nil, fmt.Errorf("flush to disk failed (write-back/network-mount error): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get final file size — after the durable flush, so the size is real. Walk
|
||||||
|
// directories (multi-file releases) instead of reporting the dir inode size.
|
||||||
var finalSize int64
|
var finalSize int64
|
||||||
if fi, err := os.Stat(finalPath); err == nil {
|
if fi, err := os.Stat(finalPath); err == nil {
|
||||||
finalSize = fi.Size()
|
if fi.IsDir() {
|
||||||
|
finalSize, _ = dirSize(finalPath)
|
||||||
|
} else {
|
||||||
|
finalSize = fi.Size()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if finalSize == 0 {
|
||||||
|
return nil, fmt.Errorf("usenet delivery is empty after post-processing: %s", finalPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up resume state on successful completion
|
// Clean up resume state on successful completion
|
||||||
|
|
@ -497,3 +515,42 @@ func sanitizeDir(name string) string {
|
||||||
}
|
}
|
||||||
return name
|
return name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// syncTree fsyncs path so its data is durable before the download is treated as
|
||||||
|
// complete. For a directory (multi-file release) it fsyncs every regular file
|
||||||
|
// underneath. A Sync error is returned, not swallowed — on a network mount a
|
||||||
|
// failed write-back must fail the download instead of leaving a truncated file.
|
||||||
|
func syncTree(path string) error {
|
||||||
|
fi, err := os.Stat(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !fi.IsDir() {
|
||||||
|
return syncFile(path)
|
||||||
|
}
|
||||||
|
return filepath.Walk(path, func(p string, info os.FileInfo, walkErr error) error {
|
||||||
|
if walkErr != nil {
|
||||||
|
return walkErr
|
||||||
|
}
|
||||||
|
if info.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return syncFile(p)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncFile flushes a single file's dirty pages to durable storage. fsync flushes
|
||||||
|
// the inode's cached writes regardless of the (read-only) open mode, so it commits
|
||||||
|
// data the post-processing library wrote and already closed.
|
||||||
|
func syncFile(path string) error {
|
||||||
|
f, err := os.Open(path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
syncErr := f.Sync()
|
||||||
|
closeErr := f.Close()
|
||||||
|
if syncErr != nil {
|
||||||
|
return syncErr
|
||||||
|
}
|
||||||
|
return closeErr
|
||||||
|
}
|
||||||
|
|
|
||||||
44
internal/engine/usenet_sync_test.go
Normal file
44
internal/engine/usenet_sync_test.go
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
package engine
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestSyncTreeFile fsyncs a single delivered file without error.
|
||||||
|
func TestSyncTreeFile(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
p := filepath.Join(dir, "movie.mkv")
|
||||||
|
if err := os.WriteFile(p, []byte("payload"), 0o644); err != nil {
|
||||||
|
t.Fatalf("write: %v", err)
|
||||||
|
}
|
||||||
|
if err := syncTree(p); err != nil {
|
||||||
|
t.Errorf("syncTree(file) = %v, want nil", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSyncTreeDir fsyncs every regular file in a multi-file release directory,
|
||||||
|
// skipping subdirectories, without error.
|
||||||
|
func TestSyncTreeDir(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
if err := os.MkdirAll(filepath.Join(dir, "sub"), 0o755); err != nil {
|
||||||
|
t.Fatalf("mkdir: %v", err)
|
||||||
|
}
|
||||||
|
for _, name := range []string{"e01.mkv", "e02.mkv", "sub/e03.mkv"} {
|
||||||
|
if err := os.WriteFile(filepath.Join(dir, name), []byte("x"), 0o644); err != nil {
|
||||||
|
t.Fatalf("write %s: %v", name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := syncTree(dir); err != nil {
|
||||||
|
t.Errorf("syncTree(dir) = %v, want nil", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSyncTreeMissing surfaces a stat error for a path that does not exist
|
||||||
|
// (a failed write-back must fail the download, not be swallowed).
|
||||||
|
func TestSyncTreeMissing(t *testing.T) {
|
||||||
|
if err := syncTree(filepath.Join(t.TempDir(), "nope.mkv")); err == nil {
|
||||||
|
t.Error("syncTree(missing) = nil, want error")
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue