feat(hls): persistent fMP4 segment cache + integrity + stats (0.9.7)
Cache keyed by sha256(absPath|quality|audioIdx)[:8] with .complete marker; LRU + size-budget eviction; per-key writer-lock; pinned during play; startup orphan reap; integrity verify on HIT; subtitle-completeness gate; hit/miss counters + daily log line. New [downloads.hls_cache] block in config.toml (enabled/size_gb/dir, default 5GB). Smoke test: 2nd play of same source+quality is 23-31× faster (HIT path skips ffmpeg entirely).
This commit is contained in:
parent
834c58c25a
commit
7e96976257
10 changed files with 1295 additions and 9 deletions
33
CHANGELOG.md
33
CHANGELOG.md
|
|
@ -5,6 +5,39 @@ All notable changes to this project will be documented in this file.
|
|||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [0.9.7] - 2026-05-26
|
||||
|
||||
### Added
|
||||
|
||||
- **hls cache**: persistent fMP4 segment cache keyed by
|
||||
`(source, quality, audio_index)`. After a successful encode the segments
|
||||
+ `init.mp4` are kept under `~/.cache/unarr/hls-cache/{key}/` with a
|
||||
`.complete` marker. A second play of the same file at the same quality
|
||||
skips ffmpeg entirely (smoke-tested 23–31× faster than re-encode). LRU
|
||||
+ size-budget eviction; pinned during active play; per-key writer-lock
|
||||
prevents two concurrent encodes from corrupting each other. Startup
|
||||
reaps orphan dirs without `.complete` older than 10 min so a daemon
|
||||
crash doesn't leak disk indefinitely. New `[downloads.hls_cache]` block
|
||||
in `config.toml`: `enabled` (default true), `size_gb` (default 5,
|
||||
min 1), `dir` (default `~/.cache/unarr/hls-cache`).
|
||||
- **hls cache integrity check**: on HIT, the daemon stats `init.mp4` +
|
||||
last segment before reporting cache reuse — if a file was externally
|
||||
deleted, the entry is invalidated and re-encoded transparently.
|
||||
- **hls cache stats**: hit/miss counters surface via `cache.Stats()`
|
||||
(`Hits`, `Misses`, `EntryCount`, `TotalBytes`) and the sweeper logs a
|
||||
daily summary line `[hls_cache] day-stats: hits=N misses=M ratio=X%
|
||||
entries=Y size=ZMB`.
|
||||
- **subtitle integrity for cached replay**: `Close` waits up to 15 s for
|
||||
the subtitle extractor goroutine before sealing `.complete` so a HIT
|
||||
never serves half-written `.vtt` files. Timeout invalidates instead of
|
||||
sealing.
|
||||
|
||||
### Changed
|
||||
|
||||
- `[daemon] auto_upgrade` now appears in fresh `config.toml` files as
|
||||
`true` (it was always the implicit default; this just makes it visible
|
||||
in default-generated configs).
|
||||
|
||||
## [0.9.6] - 2026-05-26
|
||||
|
||||
### Added
|
||||
|
|
|
|||
87
README.md
87
README.md
|
|
@ -343,6 +343,58 @@ unarr self-update --force # reinstall even if up to date
|
|||
|
||||
`unarr doctor` checks: config file, API key, server connectivity (with latency), agent registration, download directory, disk space, and version.
|
||||
|
||||
### Updating unarr
|
||||
|
||||
unarr supports three update paths. Pick whichever fits your workflow.
|
||||
|
||||
**1. Manual self-update (always available).**
|
||||
|
||||
```bash
|
||||
unarr self-update # interactive update to latest
|
||||
unarr self-update --force # reinstall same version
|
||||
unarr self-update --allow-unsigned # accept releases without checksum signature
|
||||
```
|
||||
|
||||
The CLI downloads the new release archive over HTTPS (from
|
||||
`torrentclaw.com/releases/download/v<ver>/`), verifies SHA-256, swaps the
|
||||
binary in place (`.backup` kept next to it), and restarts the systemd
|
||||
user unit if the daemon is running.
|
||||
|
||||
**2. Auto-apply on server signal (default, since 0.9.6).**
|
||||
|
||||
When you press **"Force update now"** on the web (Settings → Agent → Force
|
||||
update), the server sets a flag your daemon polls every sync (~3 s). On
|
||||
the next sync the daemon downloads the new binary, replaces itself, and
|
||||
exits — `systemd Restart=always` respawns on the new version. No SSH, no
|
||||
terminal access required. Works headless on NAS / Docker.
|
||||
|
||||
The button shows an amber warning if your agent is below 0.9.6 (older
|
||||
daemons see the signal but only log "run unarr update" — the operator
|
||||
must run the command manually that one time).
|
||||
|
||||
**Opt out of auto-apply.** Some users prefer reviewing CHANGELOG before
|
||||
applying. Disable in `config.toml`:
|
||||
|
||||
```toml
|
||||
[daemon]
|
||||
auto_upgrade = false
|
||||
```
|
||||
|
||||
With `auto_upgrade = false`, pressing the web button still flags your
|
||||
agent (so the daemon logs the new version on next sync), but the daemon
|
||||
will not download / replace anything — you run `unarr self-update` when
|
||||
you're ready.
|
||||
|
||||
**3. Docker auto-restart with a new tag.**
|
||||
|
||||
```bash
|
||||
docker pull torrentclaw/unarr:latest
|
||||
docker compose up -d
|
||||
```
|
||||
|
||||
Tags published: `latest`, `0.9`, `0.9.7`, ... — pin to a minor (`0.9`)
|
||||
for opt-in patch updates without surprises.
|
||||
|
||||
## Clean
|
||||
|
||||
Remove temporary files, logs, resume data, and other artifacts generated by unarr. Shows what will be removed and asks for confirmation before deleting.
|
||||
|
|
@ -424,6 +476,7 @@ tv_shows_dir = "~/Media/TV Shows"
|
|||
[daemon]
|
||||
poll_interval = "30s"
|
||||
heartbeat_interval = "30s"
|
||||
auto_upgrade = true # apply server-flagged upgrades in-place (since 0.9.6)
|
||||
|
||||
[notifications]
|
||||
enabled = true
|
||||
|
|
@ -466,6 +519,40 @@ If `transcode.enabled = true` but `ffmpeg` / `ffprobe` aren't on PATH, the
|
|||
daemon logs a warning at startup and HLS sessions are rejected at runtime
|
||||
with a clear error — install ffmpeg or set `enabled = false`.
|
||||
|
||||
#### `[downloads.hls_cache]` — persistent HLS segment cache
|
||||
|
||||
```toml
|
||||
[downloads.hls_cache]
|
||||
enabled = true # on by default
|
||||
size_gb = 5 # disk budget; LRU eviction once exceeded
|
||||
dir = "" # custom path; empty = ~/.cache/unarr/hls-cache
|
||||
```
|
||||
|
||||
| Key | Type | Default | Notes |
|
||||
|-----|------|---------|-------|
|
||||
| `enabled` | bool | `true` | Persists finished HLS encodes per `(source, quality, audio_index)`. A second play of the same file at the same quality reuses the segments — no ffmpeg, near-zero CPU, instant playback. Set to `false` to delete segments on session close (original behavior). |
|
||||
| `size_gb` | int | `5` | Cache budget in gigabytes. When exceeded the LRU sweeper evicts the least-recently-used cached encodes hourly. Minimum 1 GB (smaller values are clamped up). |
|
||||
| `dir` | string | `""` | Custom storage path. Empty defaults to `~/.cache/unarr/hls-cache` (Linux/macOS) or the user cache dir (Windows). |
|
||||
|
||||
**What it does.** First play encodes normally (ffmpeg writes segments).
|
||||
On session close, if every segment is on disk and ffmpeg exited cleanly,
|
||||
the directory is sealed with a `.complete` marker and kept. Next time the
|
||||
same source + quality combo is requested, the daemon serves segments
|
||||
straight from disk — no transcode, no warm-up, no CPU cost.
|
||||
|
||||
**Why per (source, quality, audio).** Renaming the file or switching
|
||||
quality invalidates the entry: the segments are tied to the exact source
|
||||
bytes and the exact ffmpeg parameters. Re-encoding generates a new key.
|
||||
|
||||
**Eviction.** A background goroutine wakes every hour. If total cache size
|
||||
exceeds `size_gb`, it deletes the oldest entries (by mtime) until under
|
||||
budget. Active sessions are pinned — they never get evicted mid-play.
|
||||
|
||||
**Disable.** Either edit the TOML to set `enabled = false`, or remove the
|
||||
cache directory manually (it'll be recreated as needed). Disabling does
|
||||
not delete existing cached segments — drop `dir` (or `~/.cache/unarr/hls-cache`)
|
||||
to reclaim the space.
|
||||
|
||||
#### `[downloads.vpn]`
|
||||
|
||||
| Key | Type | Default | Notes |
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ type DaemonConfig struct {
|
|||
ScanPaths []string // configured scan paths for file deletion validation
|
||||
HWAccel string // detected encoder backend ("nvenc"/"qsv"/"vaapi"/"videotoolbox"/"none")
|
||||
MaxTranscodeHeight int // resolution cap the agent can transcode comfortably (px)
|
||||
AutoUpgrade bool // honor server-flagged upgrades by downloading + restarting (default: true)
|
||||
}
|
||||
|
||||
// Daemon manages agent registration and the sync loop.
|
||||
|
|
@ -237,6 +238,10 @@ func (d *Daemon) Run(ctx context.Context) error {
|
|||
return
|
||||
}
|
||||
d.lastNotifiedVersion = version
|
||||
if !d.cfg.AutoUpgrade {
|
||||
log.Printf("[upgrade] new version available: %s — auto_upgrade=false, run `unarr update` to apply", version)
|
||||
return
|
||||
}
|
||||
log.Printf("[upgrade] new version available: %s — applying auto-upgrade", version)
|
||||
go d.applyAutoUpgrade(version)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -162,6 +162,7 @@ func runDaemonStart() error {
|
|||
ScanPaths: library.ResolveScanPaths(cfg.Download.Dir, cfg.Organize.MoviesDir, cfg.Organize.TVShowsDir, cfg.Library.ScanPath),
|
||||
HWAccel: string(hwAccelPick),
|
||||
MaxTranscodeHeight: maxTranscodeHeight,
|
||||
AutoUpgrade: cfg.Daemon.AutoUpgradeEnabled(),
|
||||
}
|
||||
|
||||
// Create HTTP client with mirror failover so a `.com` block-out rolls
|
||||
|
|
@ -299,6 +300,32 @@ func runDaemonStart() error {
|
|||
if err := engine.CleanupHLSOrphanDirs(); err != nil {
|
||||
log.Printf("[hls] orphan tmpdir cleanup: %v", err)
|
||||
}
|
||||
|
||||
// Persistent HLS segment cache — survives across sessions so re-plays
|
||||
// of the same file at the same quality skip ffmpeg entirely. Off when
|
||||
// hls_cache.enabled = false; size cap from hls_cache.size_gb; path from
|
||||
// hls_cache.dir (defaults to ~/.cache/unarr/hls-cache).
|
||||
var hlsCache *engine.HLSCache
|
||||
if cfg.Download.HLSCache.Enabled {
|
||||
cacheDir := cfg.Download.HLSCache.Dir
|
||||
if cacheDir == "" {
|
||||
if base, err := os.UserCacheDir(); err == nil {
|
||||
cacheDir = filepath.Join(base, "unarr", "hls-cache")
|
||||
} else {
|
||||
cacheDir = filepath.Join(os.TempDir(), "unarr-hls-cache")
|
||||
}
|
||||
}
|
||||
c, err := engine.NewHLSCache(cacheDir, cfg.Download.HLSCache.SizeGB)
|
||||
if err != nil {
|
||||
log.Printf("[hls_cache] init failed (%v) — falling back to per-session tmpdirs", err)
|
||||
} else {
|
||||
hlsCache = c
|
||||
hlsCache.StartSweeper(ctx, time.Hour)
|
||||
log.Printf("[hls_cache] enabled: dir=%s budget=%dGB", cacheDir, cfg.Download.HLSCache.SizeGB)
|
||||
}
|
||||
} else {
|
||||
log.Printf("[hls_cache] disabled by config — every play re-encodes from scratch")
|
||||
}
|
||||
if err := streamSrv.Listen(ctx); err != nil {
|
||||
return fmt.Errorf("start stream server: %w", err)
|
||||
}
|
||||
|
|
@ -543,6 +570,7 @@ func runDaemonStart() error {
|
|||
Quality: sess.Quality,
|
||||
AudioIndex: sess.AudioIndex,
|
||||
Transcode: tcRuntime,
|
||||
Cache: hlsCache,
|
||||
}
|
||||
hsess, err := engine.StartHLSSession(hlsCtx, hlsCfg)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
package cmd
|
||||
|
||||
// Version is the CLI version. Overridden by goreleaser ldflags at release time.
|
||||
var Version = "0.9.6"
|
||||
var Version = "0.9.7"
|
||||
|
|
|
|||
|
|
@ -52,10 +52,22 @@ type DownloadConfig struct {
|
|||
EnableUPnP bool `toml:"enable_upnp"` // map StreamPort to the WAN via UPnP/NAT-PMP (default: false; opt-in because it exposes the unauthenticated /stream + /hls endpoints to the public internet)
|
||||
CORSExtraOrigins []string `toml:"cors_extra_origins"` // extra browser origins added on top of the baked-in allowlist (torrentclaw.com, app.torrentclaw.com, localhost:3030)
|
||||
Transcode TranscodeConfig `toml:"transcode"`
|
||||
HLSCache HLSCacheConfig `toml:"hls_cache"`
|
||||
VPN VPNConfig `toml:"vpn"`
|
||||
Funnel FunnelConfig `toml:"funnel"`
|
||||
}
|
||||
|
||||
// HLSCacheConfig controls the persistent HLS segment cache. A completed encode
|
||||
// is kept on disk so a second play of the same file at the same quality skips
|
||||
// ffmpeg entirely. Old entries are evicted (LRU) once the cache exceeds the
|
||||
// size budget. Enabled by default — disable to save disk space at the cost of
|
||||
// re-encoding every play.
|
||||
type HLSCacheConfig struct {
|
||||
Enabled bool `toml:"enabled"` // default: true
|
||||
SizeGB int `toml:"size_gb"` // size budget in gigabytes; default: 5; minimum: 1
|
||||
Dir string `toml:"dir"` // override storage path; default: ~/.cache/unarr/hls-cache
|
||||
}
|
||||
|
||||
// FunnelConfig gates the optional CloudFlare Quick Tunnel that exposes the
|
||||
// daemon's HLS server over a public HTTPS hostname (https://<random>.try
|
||||
// cloudflare.com). Enabling it lets the web player on torrentclaw.com play
|
||||
|
|
@ -101,8 +113,27 @@ type OrganizeConfig struct {
|
|||
|
||||
type DaemonConfig struct {
|
||||
StatusInterval string `toml:"status_interval"`
|
||||
// AutoUpgrade gates the daemon's response to a server-flagged upgrade
|
||||
// (set via the "Force update" button on the web). When true the daemon
|
||||
// downloads + replaces the binary in-place and exits so the service
|
||||
// supervisor respawns on the new version. When false the daemon only
|
||||
// logs "new version available" and the operator must run `unarr update`
|
||||
// manually. Default: true. Available since unarr 0.9.6.
|
||||
AutoUpgrade *bool `toml:"auto_upgrade"`
|
||||
}
|
||||
|
||||
// AutoUpgradeEnabled returns the resolved AutoUpgrade flag — defaults to true
|
||||
// when the user has not set it explicitly. Pointer-vs-bool because Go's
|
||||
// zero-value bool would collapse "unset" and "false" together.
|
||||
func (d DaemonConfig) AutoUpgradeEnabled() bool {
|
||||
if d.AutoUpgrade == nil {
|
||||
return true
|
||||
}
|
||||
return *d.AutoUpgrade
|
||||
}
|
||||
|
||||
func boolPtr(v bool) *bool { return &v }
|
||||
|
||||
type NotificationsConfig struct {
|
||||
Enabled bool `toml:"enabled"`
|
||||
}
|
||||
|
|
@ -156,11 +187,24 @@ func Default() Config {
|
|||
// `unarr funnel off` (sets enabled=false in the TOML).
|
||||
Enabled: true,
|
||||
},
|
||||
HLSCache: HLSCacheConfig{
|
||||
// On by default — second play of a recently watched file at the
|
||||
// same quality skips ffmpeg (instant start, near-zero CPU).
|
||||
// Users can opt out (hls_cache.enabled=false) or shrink the
|
||||
// budget (hls_cache.size_gb) when disk is tight.
|
||||
Enabled: true,
|
||||
SizeGB: 5,
|
||||
},
|
||||
},
|
||||
Daemon: DaemonConfig{
|
||||
// Pointer-to-true so Default() round-trips through TOML marshal
|
||||
// as `auto_upgrade = true` instead of an omitted key — keeps the
|
||||
// freshly-written config aligned with what README documents.
|
||||
AutoUpgrade: boolPtr(true),
|
||||
},
|
||||
Organize: OrganizeConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
Daemon: DaemonConfig{},
|
||||
Notifications: NotificationsConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
|
|
|
|||
|
|
@ -100,6 +100,11 @@ type HLSSessionConfig struct {
|
|||
Quality string // "2160p"|"1080p"|"720p"|"480p"|"original"|""
|
||||
AudioIndex int // 0-based ffmpeg audio stream selection (-map 0:a:N). -1 = default.
|
||||
Transcode TranscodeRuntime
|
||||
// Cache is an optional persistent segment cache keyed by (source, quality,
|
||||
// audio). When set, completed encodes are kept across sessions so re-plays
|
||||
// of the same file at the same quality skip ffmpeg entirely. nil disables
|
||||
// caching (per-session tmpdir, deleted on Close — original behavior).
|
||||
Cache *HLSCache
|
||||
}
|
||||
|
||||
// HLSSession owns a tmpdir + ffmpeg subprocess producing HLS fragments.
|
||||
|
|
@ -139,6 +144,19 @@ type HLSSession struct {
|
|||
exitErr error
|
||||
exited bool
|
||||
readyCh chan struct{} // closed + replaced each time readyMax advances
|
||||
|
||||
// Persistent cache state. cache==nil means caching disabled for this session.
|
||||
// fromCache=true means the session is replaying a completed encode and no
|
||||
// ffmpeg subprocess was spawned. writerLockHeld=true means this session
|
||||
// owns the per-key TryAcquireWriter claim — Close must ReleaseWriter.
|
||||
// subsDone closes when the subtitle extractor goroutine returns (or is
|
||||
// nil when the source had no subtitle tracks); MarkComplete waits on it
|
||||
// so a HIT replay never serves partial .vtt files.
|
||||
cache *HLSCache
|
||||
cacheKey string
|
||||
fromCache bool
|
||||
writerLockHeld bool
|
||||
subsDone chan struct{}
|
||||
}
|
||||
|
||||
// hlsSeekAhead is how many segments past the writer's current position the
|
||||
|
|
@ -263,11 +281,77 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er
|
|||
return nil, errors.New("hls: source has no duration")
|
||||
}
|
||||
|
||||
tmpDir := filepath.Join(hlsTmpDirRoot(), cfg.SessionID)
|
||||
// Resolve tmpDir + cache placement. Three states:
|
||||
// 1. cache disabled → per-session tmpdir, deleted on Close.
|
||||
// 2. cache HIT (.complete found) → read from cache dir, no ffmpeg, Pin.
|
||||
// 3. cache MISS, writer-lock OK → ffmpeg writes to cache dir, Pin + writer-lock.
|
||||
// 4. cache MISS, writer-lock NO → another session already writing this
|
||||
// key; fall back to private per-session tmpdir
|
||||
// (no caching for this session — second-writer
|
||||
// would corrupt the first one's segments).
|
||||
var (
|
||||
tmpDir string
|
||||
cacheKey string
|
||||
fromCache bool
|
||||
writerLockHeld bool
|
||||
)
|
||||
if cfg.Cache != nil {
|
||||
cacheKey = cfg.Cache.KeyFor(cfg.SourcePath, cfg.Quality, cfg.AudioIndex)
|
||||
// Integrity gate: HasComplete just stats the marker. If init.mp4 or
|
||||
// the last segment vanished (external rm, partial-disk failure), we
|
||||
// can't actually serve a HIT — drop the dir and re-encode.
|
||||
segCountForVerify := int((probe.DurationSec + float64(hlsSegmentDuration) - 1) / float64(hlsSegmentDuration))
|
||||
if segCountForVerify < 1 {
|
||||
segCountForVerify = 1
|
||||
}
|
||||
if cfg.Cache.HasComplete(cacheKey) && !cfg.Cache.VerifyComplete(cacheKey, segCountForVerify) {
|
||||
log.Printf("[hls %s] cache %s sealed but failed integrity check — re-encoding",
|
||||
shortHLSID(cfg.SessionID), cacheKey)
|
||||
_ = cfg.Cache.Invalidate(cacheKey)
|
||||
}
|
||||
if cfg.Cache.HasComplete(cacheKey) {
|
||||
// HIT: read-only replay — many concurrent HITs are fine.
|
||||
tmpDir = cfg.Cache.DirFor(cacheKey)
|
||||
cfg.Cache.Pin(cacheKey)
|
||||
fromCache = true
|
||||
cfg.Cache.RecordHit()
|
||||
_ = cfg.Cache.Touch(cacheKey)
|
||||
} else if cfg.Cache.TryAcquireWriter(cacheKey) {
|
||||
tmpDir = cfg.Cache.DirFor(cacheKey)
|
||||
cfg.Cache.Pin(cacheKey)
|
||||
writerLockHeld = true
|
||||
cfg.Cache.RecordMiss()
|
||||
} else {
|
||||
// Another session is writing this key — fall back to private
|
||||
// dir so we don't trample its segments.
|
||||
log.Printf("[hls %s] cache key %s busy, falling back to per-session tmpdir",
|
||||
shortHLSID(cfg.SessionID), cacheKey)
|
||||
tmpDir = filepath.Join(hlsTmpDirRoot(), cfg.SessionID)
|
||||
cacheKey = "" // disable caching for this session
|
||||
cfg.Cache.RecordMiss()
|
||||
}
|
||||
} else {
|
||||
tmpDir = filepath.Join(hlsTmpDirRoot(), cfg.SessionID)
|
||||
}
|
||||
|
||||
cleanupOnError := func() {
|
||||
if cfg.Cache != nil && cacheKey != "" {
|
||||
cfg.Cache.Unpin(cacheKey)
|
||||
if writerLockHeld {
|
||||
cfg.Cache.ReleaseWriter(cacheKey)
|
||||
_ = cfg.Cache.Invalidate(cacheKey)
|
||||
}
|
||||
} else {
|
||||
_ = os.RemoveAll(tmpDir)
|
||||
}
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Join(tmpDir, "video"), 0o755); err != nil {
|
||||
cleanupOnError()
|
||||
return nil, fmt.Errorf("hls: mkdir video: %w", err)
|
||||
}
|
||||
if err := os.MkdirAll(filepath.Join(tmpDir, "subs"), 0o755); err != nil {
|
||||
cleanupOnError()
|
||||
return nil, fmt.Errorf("hls: mkdir subs: %w", err)
|
||||
}
|
||||
|
||||
|
|
@ -285,10 +369,30 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er
|
|||
startedAt: time.Now(),
|
||||
lastTouch: time.Now(),
|
||||
readyCh: make(chan struct{}),
|
||||
cache: cfg.Cache,
|
||||
cacheKey: cacheKey,
|
||||
fromCache: fromCache,
|
||||
writerLockHeld: writerLockHeld,
|
||||
}
|
||||
s.manifestVideo = renderVideoPlaylist(probe.DurationSec, segCount)
|
||||
s.manifestRoot = renderMasterPlaylist(probe, cfg.Quality)
|
||||
|
||||
// Cache HIT: every segment + init.mp4 is already on disk. Skip ffmpeg
|
||||
// entirely and mark readyMax so handlers don't wait. Background subtitle
|
||||
// extraction is also unnecessary — subs were extracted on the original run.
|
||||
if fromCache {
|
||||
s.readyMu.Lock()
|
||||
s.readyMax = segCount - 1
|
||||
s.exited = true
|
||||
close(s.readyCh)
|
||||
s.readyCh = nil
|
||||
s.readyMu.Unlock()
|
||||
log.Printf("[hls %s] cache HIT %s: %s, %.1fs, %d segs (quality=%s)",
|
||||
shortHLSID(cfg.SessionID), cacheKey, filepath.Base(cfg.SourcePath),
|
||||
probe.DurationSec, segCount, coalesce(cfg.Quality, "auto"))
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Spawn ffmpeg under a dedicated context so Close() can kill it without
|
||||
// touching the parent ctx.
|
||||
ffCtx, cancel := context.WithCancel(context.Background())
|
||||
|
|
@ -298,7 +402,7 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er
|
|||
cmd.Stderr = &hlsStderrCapture{owner: s}
|
||||
if err := cmd.Start(); err != nil {
|
||||
cancel()
|
||||
_ = os.RemoveAll(tmpDir)
|
||||
cleanupOnError()
|
||||
return nil, fmt.Errorf("hls: start ffmpeg: %w", err)
|
||||
}
|
||||
s.cmd = cmd
|
||||
|
|
@ -307,12 +411,20 @@ func StartHLSSession(ctx context.Context, cfg HLSSessionConfig) (*HLSSession, er
|
|||
go s.pollSegments(ffCtx)
|
||||
|
||||
if len(probe.SubtitleTracks) > 0 {
|
||||
go s.extractSubtitles(ffCtx)
|
||||
s.subsDone = make(chan struct{})
|
||||
go func() {
|
||||
defer close(s.subsDone)
|
||||
s.extractSubtitles(ffCtx)
|
||||
}()
|
||||
}
|
||||
|
||||
log.Printf("[hls %s] started: %s, %.1fs, %d segs (quality=%s)",
|
||||
cachedNote := ""
|
||||
if cfg.Cache != nil {
|
||||
cachedNote = fmt.Sprintf(" (cache-miss %s)", cacheKey)
|
||||
}
|
||||
log.Printf("[hls %s] started: %s, %.1fs, %d segs (quality=%s)%s",
|
||||
shortHLSID(cfg.SessionID), filepath.Base(cfg.SourcePath),
|
||||
probe.DurationSec, segCount, coalesce(cfg.Quality, "auto"))
|
||||
probe.DurationSec, segCount, coalesce(cfg.Quality, "auto"), cachedNote)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
|
@ -385,8 +497,15 @@ func (s *HLSSession) Touch() {
|
|||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// Close stops ffmpeg, deletes the tmpdir, and prevents further requests from
|
||||
// blocking on segment readiness. Idempotent.
|
||||
// Close stops ffmpeg and prevents further requests from blocking on segment
|
||||
// readiness. Idempotent.
|
||||
//
|
||||
// Disk lifecycle:
|
||||
// - cache disabled → delete tmpDir (original behavior).
|
||||
// - cache enabled + this session was a HIT → keep dir, just unpin.
|
||||
// - cache enabled + this was a write session → if ffmpeg exited cleanly and
|
||||
// every segment is on disk, persist with .complete and keep dir. Otherwise
|
||||
// drop the dir so a half-written cache doesn't survive into the next play.
|
||||
func (s *HLSSession) Close() error {
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
|
|
@ -407,7 +526,47 @@ func (s *HLSSession) Close() error {
|
|||
s.readyCh = nil
|
||||
}
|
||||
s.exited = true
|
||||
exitErr := s.exitErr
|
||||
s.readyMu.Unlock()
|
||||
|
||||
if s.cache != nil && s.cacheKey != "" {
|
||||
defer s.cache.Unpin(s.cacheKey)
|
||||
if s.writerLockHeld {
|
||||
defer s.cache.ReleaseWriter(s.cacheKey)
|
||||
}
|
||||
if s.fromCache {
|
||||
log.Printf("[hls %s] closed (cache reuse)", shortHLSID(s.cfg.SessionID))
|
||||
return nil
|
||||
}
|
||||
// Wait briefly for the subtitle extractor to finish so a cached
|
||||
// replay never serves half-written .vtt files. Bounded so a stuck
|
||||
// extractor can't block Close indefinitely; on timeout we treat
|
||||
// the cache as incomplete and drop it.
|
||||
subsOK := true
|
||||
if s.subsDone != nil {
|
||||
select {
|
||||
case <-s.subsDone:
|
||||
case <-time.After(15 * time.Second):
|
||||
log.Printf("[hls %s] subtitle extractor timeout — not caching", shortHLSID(s.cfg.SessionID))
|
||||
subsOK = false
|
||||
}
|
||||
}
|
||||
if subsOK && exitErr == nil && s.allSegmentsPresent() {
|
||||
if err := s.cache.MarkComplete(s.cacheKey); err == nil {
|
||||
log.Printf("[hls %s] cache persisted %s", shortHLSID(s.cfg.SessionID), s.cacheKey)
|
||||
return nil
|
||||
} else {
|
||||
log.Printf("[hls %s] cache persist failed: %v", shortHLSID(s.cfg.SessionID), err)
|
||||
}
|
||||
}
|
||||
// Partial / failed → drop so we re-encode next time.
|
||||
if err := s.cache.Invalidate(s.cacheKey); err != nil {
|
||||
log.Printf("[hls %s] cache invalidate failed: %v", shortHLSID(s.cfg.SessionID), err)
|
||||
}
|
||||
log.Printf("[hls %s] closed (cache discarded)", shortHLSID(s.cfg.SessionID))
|
||||
return nil
|
||||
}
|
||||
|
||||
if tmpDir != "" {
|
||||
_ = os.RemoveAll(tmpDir)
|
||||
}
|
||||
|
|
@ -415,6 +574,31 @@ func (s *HLSSession) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// allSegmentsPresent reports whether every expected segment (and init.mp4) is
|
||||
// on disk AND validated by the segment poller. Used to decide whether a
|
||||
// finished session is cacheable. We trust readyMax (advanced by pollSegments
|
||||
// only after the next segment exists, proving the predecessor is fully closed)
|
||||
// over a naive Size>0 stat that could accept truncated mid-write files.
|
||||
func (s *HLSSession) allSegmentsPresent() bool {
|
||||
if fi, err := os.Stat(filepath.Join(s.tmpDir, "video", "init.mp4")); err != nil || fi.Size() == 0 {
|
||||
return false
|
||||
}
|
||||
s.readyMu.Lock()
|
||||
readyMax := s.readyMax
|
||||
s.readyMu.Unlock()
|
||||
if readyMax < s.segmentCount-1 {
|
||||
return false
|
||||
}
|
||||
for i := 0; i < s.segmentCount; i++ {
|
||||
path := filepath.Join(s.tmpDir, "video", fmt.Sprintf("seg-%d.m4s", i))
|
||||
fi, err := os.Stat(path)
|
||||
if err != nil || fi.Size() == 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// waitFFmpeg reaps the ffmpeg process and records its exit error for handlers.
|
||||
//
|
||||
// Auto-restart supervisor: if ffmpeg crashes (non-graceful exit) and the
|
||||
|
|
|
|||
410
internal/engine/hls_cache.go
Normal file
410
internal/engine/hls_cache.go
Normal file
|
|
@ -0,0 +1,410 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// HLSCache persists transcoded HLS segments per (source, quality, audio) so a
|
||||
// second play of the same file at the same quality skips ffmpeg entirely.
|
||||
//
|
||||
// Layout on disk:
|
||||
//
|
||||
// {root}/{key}/init.mp4
|
||||
// {root}/{key}/seg-0.m4s
|
||||
// {root}/{key}/seg-N.m4s
|
||||
// {root}/{key}/.complete
|
||||
//
|
||||
// Atomicity: the .complete marker is written only when ffmpeg exits 0 AND all
|
||||
// segments are on disk. A dir without .complete is treated as a partial run —
|
||||
// next session can reuse the segments already present, ffmpeg fills the gaps.
|
||||
//
|
||||
// Concurrency: Pin/Unpin increments a ref counter per key so the LRU sweeper
|
||||
// never evicts a directory that an active session is reading from.
|
||||
type HLSCache struct {
|
||||
root string
|
||||
maxBytes int64
|
||||
|
||||
mu sync.Mutex
|
||||
refs map[string]int
|
||||
writers map[string]bool // exclusive ffmpeg writer per key; nil entries are absent
|
||||
|
||||
// Counters surfaced via Stats() — useful for /api/internal/agent/cache-stats
|
||||
// and for the sweeper's daily log line. atomic so RecordHit/RecordMiss are
|
||||
// safe to call from any goroutine without taking the cache mutex.
|
||||
hits atomic.Uint64
|
||||
misses atomic.Uint64
|
||||
}
|
||||
|
||||
const (
|
||||
hlsCacheCompleteMarker = ".complete"
|
||||
// hlsCacheMinBudgetGB clamps absurd / zero / negative SizeGB values to
|
||||
// a sane floor. NOT a guarantee that any single encode fits — a long
|
||||
// 4K HEVC re-encode can exceed it. Operators should set size_gb based
|
||||
// on their actual workload.
|
||||
hlsCacheMinBudgetGB = 1
|
||||
// hlsCacheStartupOrphanAge: directories without .complete older than
|
||||
// this are removed on cache startup. Long enough that a daemon crash
|
||||
// during an in-progress encode (which legitimately leaves a partial
|
||||
// dir) doesn't get nuked too aggressively if the daemon restarts fast.
|
||||
hlsCacheStartupOrphanAge = 10 * time.Minute
|
||||
)
|
||||
|
||||
// NewHLSCache creates the cache rooted at the given dir with a size budget in
|
||||
// gigabytes. A budget < hlsCacheMinBudgetGB is clamped up so a single play
|
||||
// doesn't get instantly evicted mid-stream.
|
||||
func NewHLSCache(root string, sizeGB int) (*HLSCache, error) {
|
||||
if root == "" {
|
||||
return nil, errors.New("hls_cache: empty root")
|
||||
}
|
||||
if sizeGB < hlsCacheMinBudgetGB {
|
||||
sizeGB = hlsCacheMinBudgetGB
|
||||
}
|
||||
if err := os.MkdirAll(root, 0o755); err != nil {
|
||||
return nil, fmt.Errorf("hls_cache: mkdir root: %w", err)
|
||||
}
|
||||
c := &HLSCache{
|
||||
root: root,
|
||||
maxBytes: int64(sizeGB) * 1024 * 1024 * 1024,
|
||||
refs: make(map[string]int),
|
||||
writers: make(map[string]bool),
|
||||
}
|
||||
// Reap dirs left over from a crashed encode. A dir without .complete that
|
||||
// hasn't been touched recently was almost certainly orphaned by an
|
||||
// ungraceful daemon exit — keeping it just feeds the unbounded growth
|
||||
// pattern the hourly LRU is too slow to contain.
|
||||
if removed, err := c.cleanStartupOrphans(); err != nil {
|
||||
log.Printf("[hls_cache] startup orphan cleanup: %v", err)
|
||||
} else if removed > 0 {
|
||||
log.Printf("[hls_cache] startup: removed %d orphan dir(s) without .complete", removed)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// cleanStartupOrphans removes cache subdirectories that lack a .complete
|
||||
// marker AND haven't been modified within hlsCacheStartupOrphanAge. Called
|
||||
// once at construction. Safe at startup because no sessions are active yet,
|
||||
// so Pin can't race with us.
|
||||
func (c *HLSCache) cleanStartupOrphans() (int, error) {
|
||||
entries, err := os.ReadDir(c.root)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return 0, nil
|
||||
}
|
||||
return 0, err
|
||||
}
|
||||
cutoff := time.Now().Add(-hlsCacheStartupOrphanAge)
|
||||
removed := 0
|
||||
for _, e := range entries {
|
||||
if !e.IsDir() {
|
||||
continue
|
||||
}
|
||||
dir := filepath.Join(c.root, e.Name())
|
||||
if _, err := os.Stat(filepath.Join(dir, hlsCacheCompleteMarker)); err == nil {
|
||||
continue // sealed, keep
|
||||
}
|
||||
info, err := e.Info()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if info.ModTime().After(cutoff) {
|
||||
continue // too recent — might be a daemon that just restarted mid-encode
|
||||
}
|
||||
if err := os.RemoveAll(dir); err == nil {
|
||||
removed++
|
||||
}
|
||||
}
|
||||
return removed, nil
|
||||
}
|
||||
|
||||
// TryAcquireWriter attempts to claim exclusive ffmpeg-write access to a key.
|
||||
// Returns true on success — the caller is then responsible for ReleaseWriter
|
||||
// when ffmpeg exits / fails. Returns false if another session is already
|
||||
// writing this key, in which case the caller must fall back to a private
|
||||
// per-session tmpdir (no caching for that session).
|
||||
func (c *HLSCache) TryAcquireWriter(key string) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.writers[key] {
|
||||
return false
|
||||
}
|
||||
c.writers[key] = true
|
||||
return true
|
||||
}
|
||||
|
||||
// ReleaseWriter releases the writer claim acquired via TryAcquireWriter.
|
||||
// Idempotent on unknown keys.
|
||||
func (c *HLSCache) ReleaseWriter(key string) {
|
||||
c.mu.Lock()
|
||||
delete(c.writers, key)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// KeyFor derives a stable cache key for (source, quality, audioIndex). Using
|
||||
// the absolute source path means renaming a file invalidates the cache, which
|
||||
// is correct — segment content is tied to the encoded source.
|
||||
func (c *HLSCache) KeyFor(sourcePath, quality string, audioIndex int) string {
|
||||
abs, err := filepath.Abs(sourcePath)
|
||||
if err != nil {
|
||||
abs = sourcePath
|
||||
}
|
||||
h := sha256.Sum256([]byte(fmt.Sprintf("%s|%s|%d", abs, quality, audioIndex)))
|
||||
return hex.EncodeToString(h[:8]) // 16 hex chars — collision-safe enough for per-host cache
|
||||
}
|
||||
|
||||
// DirFor returns the on-disk directory for a cache key. Caller is responsible
|
||||
// for creating it.
|
||||
func (c *HLSCache) DirFor(key string) string {
|
||||
return filepath.Join(c.root, key)
|
||||
}
|
||||
|
||||
// HasComplete returns true when the .complete marker is present, meaning the
|
||||
// directory holds a full set of segments from a successful encode.
|
||||
func (c *HLSCache) HasComplete(key string) bool {
|
||||
if _, err := os.Stat(filepath.Join(c.DirFor(key), hlsCacheCompleteMarker)); err == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// MarkComplete writes the .complete marker. Call only after verifying ffmpeg
|
||||
// exited cleanly AND every expected segment is on disk. The dir must already
|
||||
// exist — StartHLSSession created it on the writer path.
|
||||
func (c *HLSCache) MarkComplete(key string) error {
|
||||
return os.WriteFile(filepath.Join(c.DirFor(key), hlsCacheCompleteMarker), nil, 0o644)
|
||||
}
|
||||
|
||||
// RecordHit increments the hit counter; called by StartHLSSession on a
|
||||
// cache-HIT path.
|
||||
func (c *HLSCache) RecordHit() { c.hits.Add(1) }
|
||||
|
||||
// RecordMiss increments the miss counter; called when a session has to
|
||||
// encode from scratch (or fails an integrity check on a stale HIT).
|
||||
func (c *HLSCache) RecordMiss() { c.misses.Add(1) }
|
||||
|
||||
// CacheStats is a snapshot of the cache's runtime counters + on-disk size.
|
||||
// The size fields are best-effort (computed via dirSize) so callers paying
|
||||
// for them should cache the result, not poll in a hot loop.
|
||||
type CacheStats struct {
|
||||
Hits uint64
|
||||
Misses uint64
|
||||
EntryCount int
|
||||
TotalBytes int64
|
||||
}
|
||||
|
||||
// Stats returns a snapshot of the cache counters and size. Walks the root
|
||||
// to total disk usage — O(N segments). Call at most every few minutes.
|
||||
func (c *HLSCache) Stats() CacheStats {
|
||||
s := CacheStats{
|
||||
Hits: c.hits.Load(),
|
||||
Misses: c.misses.Load(),
|
||||
}
|
||||
entries, err := os.ReadDir(c.root)
|
||||
if err != nil {
|
||||
return s
|
||||
}
|
||||
for _, e := range entries {
|
||||
if !e.IsDir() {
|
||||
continue
|
||||
}
|
||||
size, err := dirSize(filepath.Join(c.root, e.Name()))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
s.EntryCount++
|
||||
s.TotalBytes += size
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// hitRatePercent returns the current hit/(hit+miss) percentage rounded to
|
||||
// the nearest int; 0 when no calls have been recorded.
|
||||
func (c *HLSCache) hitRatePercent() int {
|
||||
h := c.hits.Load()
|
||||
m := c.misses.Load()
|
||||
total := h + m
|
||||
if total == 0 {
|
||||
return 0
|
||||
}
|
||||
return int((h*100 + total/2) / total)
|
||||
}
|
||||
|
||||
// VerifyComplete checks that the .complete marker is present AND the
|
||||
// essential files (init.mp4 + last segment) exist with non-zero size. A
|
||||
// dir that passes HasComplete but fails VerifyComplete is treated as
|
||||
// corrupted — typically external `rm` or a partial-disk-failure scenario.
|
||||
// When it returns false, callers should Invalidate and re-encode.
|
||||
func (c *HLSCache) VerifyComplete(key string, segmentCount int) bool {
|
||||
if !c.HasComplete(key) {
|
||||
return false
|
||||
}
|
||||
dir := c.DirFor(key)
|
||||
if fi, err := os.Stat(filepath.Join(dir, "video", "init.mp4")); err != nil || fi.Size() == 0 {
|
||||
return false
|
||||
}
|
||||
if segmentCount > 0 {
|
||||
lastSeg := filepath.Join(dir, "video", fmt.Sprintf("seg-%d.m4s", segmentCount-1))
|
||||
if fi, err := os.Stat(lastSeg); err != nil || fi.Size() == 0 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Pin increments the ref counter for a key. The sweeper checks this before
|
||||
// evicting, so a pinned dir is safe even if its mtime is old.
|
||||
func (c *HLSCache) Pin(key string) {
|
||||
c.mu.Lock()
|
||||
c.refs[key]++
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Unpin decrements; safe to call on unknown keys (no-op).
|
||||
func (c *HLSCache) Unpin(key string) {
|
||||
c.mu.Lock()
|
||||
if c.refs[key] > 0 {
|
||||
c.refs[key]--
|
||||
if c.refs[key] == 0 {
|
||||
delete(c.refs, key)
|
||||
}
|
||||
}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *HLSCache) isPinned(key string) bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.refs[key] > 0
|
||||
}
|
||||
|
||||
// Touch updates the directory mtime so LRU picks fresher entries as recently
|
||||
// used. Called when a session starts reading from a cached dir.
|
||||
func (c *HLSCache) Touch(key string) error {
|
||||
dir := c.DirFor(key)
|
||||
now := time.Now()
|
||||
return os.Chtimes(dir, now, now)
|
||||
}
|
||||
|
||||
// Sweep enforces the size budget by deleting the least-recently-used cache
|
||||
// dirs (ignoring pinned ones) until the total size is at or below maxBytes.
|
||||
// Returns the number of bytes freed.
|
||||
func (c *HLSCache) Sweep() (int64, error) {
|
||||
entries, err := os.ReadDir(c.root)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return 0, nil
|
||||
}
|
||||
return 0, fmt.Errorf("hls_cache: read root: %w", err)
|
||||
}
|
||||
|
||||
type item struct {
|
||||
key string
|
||||
path string
|
||||
size int64
|
||||
mtime time.Time
|
||||
}
|
||||
items := make([]item, 0, len(entries))
|
||||
var total, pinned int64
|
||||
for _, e := range entries {
|
||||
if !e.IsDir() {
|
||||
continue
|
||||
}
|
||||
info, err := e.Info()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
key := e.Name()
|
||||
path := filepath.Join(c.root, key)
|
||||
size, err := dirSize(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
items = append(items, item{key: key, path: path, size: size, mtime: info.ModTime()})
|
||||
total += size
|
||||
if c.isPinned(key) {
|
||||
pinned += size
|
||||
}
|
||||
}
|
||||
|
||||
if total <= c.maxBytes {
|
||||
return 0, nil
|
||||
}
|
||||
if pinned >= c.maxBytes {
|
||||
// Every pinned byte already exceeds the budget — even evicting
|
||||
// every unpinned dir won't bring us under. Warn loudly so the
|
||||
// operator knows to bump size_gb (or kill the long-running session).
|
||||
log.Printf("[hls_cache] warn: pinned bytes (%.1f MB) exceed budget (%.1f MB) — cannot enforce limit until sessions release",
|
||||
float64(pinned)/(1024*1024), float64(c.maxBytes)/(1024*1024))
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Oldest first.
|
||||
sort.Slice(items, func(i, j int) bool {
|
||||
return items[i].mtime.Before(items[j].mtime)
|
||||
})
|
||||
|
||||
var freed int64
|
||||
for _, it := range items {
|
||||
if total-freed <= c.maxBytes {
|
||||
break
|
||||
}
|
||||
if c.isPinned(it.key) {
|
||||
continue
|
||||
}
|
||||
if err := os.RemoveAll(it.path); err != nil {
|
||||
log.Printf("[hls_cache] evict %s failed: %v", it.key, err)
|
||||
continue
|
||||
}
|
||||
log.Printf("[hls_cache] evicted %s (%.1f MB, age %s)",
|
||||
it.key, float64(it.size)/(1024*1024), time.Since(it.mtime).Round(time.Second))
|
||||
freed += it.size
|
||||
}
|
||||
return freed, nil
|
||||
}
|
||||
|
||||
// StartSweeper kicks off the LRU sweeper goroutine. Cancels on ctx done.
|
||||
// In addition to enforcing the size budget, logs a daily summary of hit-rate
|
||||
// + disk usage so operators can see the cache's value at a glance.
|
||||
func (c *HLSCache) StartSweeper(ctx context.Context, interval time.Duration) {
|
||||
if interval <= 0 {
|
||||
interval = time.Hour
|
||||
}
|
||||
go func() {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
statsTick := time.NewTicker(24 * time.Hour)
|
||||
defer statsTick.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
if _, err := c.Sweep(); err != nil {
|
||||
log.Printf("[hls_cache] sweep error: %v", err)
|
||||
}
|
||||
case <-statsTick.C:
|
||||
s := c.Stats()
|
||||
log.Printf("[hls_cache] day-stats: hits=%d misses=%d ratio=%d%% entries=%d size=%.1fMB",
|
||||
s.Hits, s.Misses, c.hitRatePercent(), s.EntryCount,
|
||||
float64(s.TotalBytes)/(1024*1024))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Invalidate removes a cache entry — used when ffmpeg fails to encode the
|
||||
// source so we don't reuse a half-written dir next time.
|
||||
func (c *HLSCache) Invalidate(key string) error {
|
||||
return os.RemoveAll(c.DirFor(key))
|
||||
}
|
||||
|
||||
134
internal/engine/hls_cache_smoke_test.go
Normal file
134
internal/engine/hls_cache_smoke_test.go
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
//go:build smoke
|
||||
|
||||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestHLSCacheSmoke exercises the end-to-end cache flow against real ffmpeg:
|
||||
// - First session encodes a 5s test pattern; expect MISS, ffmpeg runs,
|
||||
// .complete written, MarkComplete logs.
|
||||
// - Second session for identical (source, quality, audio); expect HIT,
|
||||
// no ffmpeg, instant Start.
|
||||
//
|
||||
// Build tag `smoke` keeps it out of the default `go test ./...` run because
|
||||
// it depends on a working ffmpeg/ffprobe and takes ~5–10 s.
|
||||
//
|
||||
// go test -tags=smoke -run TestHLSCacheSmoke -v ./internal/engine/
|
||||
func TestHLSCacheSmoke(t *testing.T) {
|
||||
ffmpeg, err := exec.LookPath("ffmpeg")
|
||||
if err != nil {
|
||||
t.Skipf("ffmpeg not on PATH: %v", err)
|
||||
}
|
||||
ffprobe, err := exec.LookPath("ffprobe")
|
||||
if err != nil {
|
||||
t.Skipf("ffprobe not on PATH: %v", err)
|
||||
}
|
||||
|
||||
tmp := t.TempDir()
|
||||
source := filepath.Join(tmp, "source.mp4")
|
||||
t.Logf("generating 5 s test pattern → %s", source)
|
||||
if out, err := exec.Command(ffmpeg,
|
||||
"-y", "-loglevel", "error",
|
||||
"-f", "lavfi", "-i", "testsrc=duration=5:size=640x480:rate=30",
|
||||
"-f", "lavfi", "-i", "sine=frequency=1000:duration=5",
|
||||
"-c:v", "libx264", "-preset", "ultrafast", "-pix_fmt", "yuv420p",
|
||||
"-c:a", "aac",
|
||||
source,
|
||||
).CombinedOutput(); err != nil {
|
||||
t.Fatalf("ffmpeg generate: %v\n%s", err, out)
|
||||
}
|
||||
|
||||
cacheRoot := filepath.Join(tmp, "cache")
|
||||
cache, err := NewHLSCache(cacheRoot, 1)
|
||||
if err != nil {
|
||||
t.Fatalf("NewHLSCache: %v", err)
|
||||
}
|
||||
|
||||
cfg := HLSSessionConfig{
|
||||
SessionID: "smoke1",
|
||||
SourcePath: source,
|
||||
FileName: "source.mp4",
|
||||
Quality: "720p",
|
||||
AudioIndex: 0,
|
||||
Transcode: TranscodeRuntime{
|
||||
FFmpegPath: ffmpeg,
|
||||
FFprobePath: ffprobe,
|
||||
Preset: "ultrafast",
|
||||
},
|
||||
Cache: cache,
|
||||
}
|
||||
|
||||
// First run — expect MISS, ffmpeg runs.
|
||||
t.Log("session 1: expect MISS")
|
||||
t0 := time.Now()
|
||||
s1, err := StartHLSSession(context.Background(), cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("StartHLSSession #1: %v", err)
|
||||
}
|
||||
if s1.fromCache {
|
||||
t.Fatal("session 1 reported cache HIT on a fresh cache")
|
||||
}
|
||||
|
||||
// Wait for all segments to land. 5 s source @ 4 s segments → 2 segments.
|
||||
deadline := time.Now().Add(60 * time.Second)
|
||||
for {
|
||||
s1.readyMu.Lock()
|
||||
ready := s1.readyMax
|
||||
exited := s1.exited
|
||||
s1.readyMu.Unlock()
|
||||
if ready >= s1.segmentCount-1 && exited {
|
||||
break
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
_ = s1.Close()
|
||||
t.Fatalf("session 1 didn't finish in 60 s (readyMax=%d/%d, exited=%v)",
|
||||
ready, s1.segmentCount-1, exited)
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
if err := s1.Close(); err != nil {
|
||||
t.Fatalf("Close #1: %v", err)
|
||||
}
|
||||
encodeDur := time.Since(t0)
|
||||
t.Logf("session 1: MISS completed in %s", encodeDur.Round(time.Millisecond))
|
||||
|
||||
key := cache.KeyFor(source, "720p", 0)
|
||||
if !cache.HasComplete(key) {
|
||||
t.Fatalf("cache.HasComplete(%s) is false after successful encode", key)
|
||||
}
|
||||
|
||||
// Second run — expect HIT, no ffmpeg.
|
||||
t.Log("session 2: expect HIT")
|
||||
cfg.SessionID = "smoke2"
|
||||
t1 := time.Now()
|
||||
s2, err := StartHLSSession(context.Background(), cfg)
|
||||
if err != nil {
|
||||
t.Fatalf("StartHLSSession #2: %v", err)
|
||||
}
|
||||
if !s2.fromCache {
|
||||
t.Fatal("session 2 should have reported cache HIT")
|
||||
}
|
||||
if s2.cmd != nil {
|
||||
t.Fatal("session 2 should not have spawned ffmpeg (s.cmd != nil)")
|
||||
}
|
||||
hitDur := time.Since(t1)
|
||||
t.Logf("session 2: HIT in %s (%.1f× faster than MISS)",
|
||||
hitDur.Round(time.Millisecond), float64(encodeDur)/float64(hitDur))
|
||||
if hitDur > 500*time.Millisecond {
|
||||
t.Errorf("HIT path too slow: %s — expected <500 ms", hitDur)
|
||||
}
|
||||
if err := s2.Close(); err != nil {
|
||||
t.Fatalf("Close #2: %v", err)
|
||||
}
|
||||
|
||||
// After the HIT session closes, the cache dir + .complete must still exist.
|
||||
if !cache.HasComplete(key) {
|
||||
t.Fatal(".complete disappeared after HIT session closed")
|
||||
}
|
||||
}
|
||||
361
internal/engine/hls_cache_test.go
Normal file
361
internal/engine/hls_cache_test.go
Normal file
|
|
@ -0,0 +1,361 @@
|
|||
package engine
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func newTestCache(t *testing.T, sizeGB int) *HLSCache {
|
||||
t.Helper()
|
||||
root := t.TempDir()
|
||||
c, err := NewHLSCache(root, sizeGB)
|
||||
if err != nil {
|
||||
t.Fatalf("NewHLSCache: %v", err)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
func TestKeyForStable(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
k1 := c.KeyFor("/a/b/movie.mkv", "1080p", 0)
|
||||
k2 := c.KeyFor("/a/b/movie.mkv", "1080p", 0)
|
||||
if k1 != k2 {
|
||||
t.Fatalf("expected stable keys, got %q vs %q", k1, k2)
|
||||
}
|
||||
if c.KeyFor("/a/b/movie.mkv", "720p", 0) == k1 {
|
||||
t.Fatal("quality should change key")
|
||||
}
|
||||
if c.KeyFor("/a/b/movie.mkv", "1080p", 1) == k1 {
|
||||
t.Fatal("audio index should change key")
|
||||
}
|
||||
if c.KeyFor("/x/y/other.mkv", "1080p", 0) == k1 {
|
||||
t.Fatal("path should change key")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarkCompleteAndHas(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
key := "abc123"
|
||||
if c.HasComplete(key) {
|
||||
t.Fatal("fresh cache should not report complete")
|
||||
}
|
||||
// Production callers create the dir during StartHLSSession; MarkComplete
|
||||
// trusts that invariant and fails if the dir was wiped meanwhile.
|
||||
if err := os.MkdirAll(c.DirFor(key), 0o755); err != nil {
|
||||
t.Fatalf("mkdir: %v", err)
|
||||
}
|
||||
if err := c.MarkComplete(key); err != nil {
|
||||
t.Fatalf("MarkComplete: %v", err)
|
||||
}
|
||||
if !c.HasComplete(key) {
|
||||
t.Fatal("after MarkComplete, HasComplete must be true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMarkCompleteFailsWithoutDir(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
if err := c.MarkComplete("never-created"); err == nil {
|
||||
t.Fatal("MarkComplete should error when dir doesn't exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPinPreventsEviction(t *testing.T) {
|
||||
c := newTestCache(t, 1) // 1 GB budget, but min clamp keeps it usable
|
||||
c.maxBytes = 1024 // squeeze budget for the test
|
||||
|
||||
// Write two entries past the budget.
|
||||
for i, key := range []string{"old", "new"} {
|
||||
dir := c.DirFor(key)
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
t.Fatalf("mkdir %s: %v", dir, err)
|
||||
}
|
||||
path := filepath.Join(dir, "seg.bin")
|
||||
if err := os.WriteFile(path, make([]byte, 800), 0o644); err != nil {
|
||||
t.Fatalf("write %s: %v", path, err)
|
||||
}
|
||||
now := time.Now().Add(time.Duration(i) * time.Hour) // "old" mtime < "new"
|
||||
_ = os.Chtimes(dir, now, now)
|
||||
}
|
||||
|
||||
c.Pin("old") // protect the older one
|
||||
freed, err := c.Sweep()
|
||||
if err != nil {
|
||||
t.Fatalf("Sweep: %v", err)
|
||||
}
|
||||
if freed == 0 {
|
||||
t.Fatal("expected some eviction")
|
||||
}
|
||||
if _, err := os.Stat(c.DirFor("old")); err != nil {
|
||||
t.Fatal("pinned 'old' was evicted")
|
||||
}
|
||||
if _, err := os.Stat(c.DirFor("new")); err == nil {
|
||||
t.Fatal("'new' should have been evicted to make room")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSweepNoOpUnderBudget(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
dir := c.DirFor("small")
|
||||
_ = os.MkdirAll(dir, 0o755)
|
||||
_ = os.WriteFile(filepath.Join(dir, "x"), []byte("tiny"), 0o644)
|
||||
freed, err := c.Sweep()
|
||||
if err != nil {
|
||||
t.Fatalf("Sweep: %v", err)
|
||||
}
|
||||
if freed != 0 {
|
||||
t.Fatalf("expected 0 freed under budget, got %d", freed)
|
||||
}
|
||||
if _, err := os.Stat(dir); err != nil {
|
||||
t.Fatal("under-budget entry was wrongly evicted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSweepEmptyRoot(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
freed, err := c.Sweep()
|
||||
if err != nil {
|
||||
t.Fatalf("Sweep empty: %v", err)
|
||||
}
|
||||
if freed != 0 {
|
||||
t.Fatalf("freed=%d, want 0", freed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestInvalidateRemovesDir(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
key := "drop"
|
||||
dir := c.DirFor(key)
|
||||
_ = os.MkdirAll(dir, 0o755)
|
||||
_ = os.WriteFile(filepath.Join(dir, "x"), []byte("y"), 0o644)
|
||||
if err := c.Invalidate(key); err != nil {
|
||||
t.Fatalf("Invalidate: %v", err)
|
||||
}
|
||||
if _, err := os.Stat(dir); err == nil {
|
||||
t.Fatal("dir still present after Invalidate")
|
||||
}
|
||||
}
|
||||
|
||||
func TestTouchUpdatesMtime(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
key := "touch"
|
||||
dir := c.DirFor(key)
|
||||
_ = os.MkdirAll(dir, 0o755)
|
||||
old := time.Now().Add(-2 * time.Hour)
|
||||
_ = os.Chtimes(dir, old, old)
|
||||
|
||||
if err := c.Touch(key); err != nil {
|
||||
t.Fatalf("Touch: %v", err)
|
||||
}
|
||||
info, err := os.Stat(dir)
|
||||
if err != nil {
|
||||
t.Fatalf("stat: %v", err)
|
||||
}
|
||||
if !info.ModTime().After(old.Add(time.Minute)) {
|
||||
t.Fatalf("mtime not refreshed: %v", info.ModTime())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPinUnpinSymmetry(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
c.Pin("k")
|
||||
c.Pin("k")
|
||||
if !c.isPinned("k") {
|
||||
t.Fatal("Pin twice should leave pinned")
|
||||
}
|
||||
c.Unpin("k")
|
||||
if !c.isPinned("k") {
|
||||
t.Fatal("Unpin once should keep pinned (refs=1)")
|
||||
}
|
||||
c.Unpin("k")
|
||||
if c.isPinned("k") {
|
||||
t.Fatal("Unpin twice should drop pin")
|
||||
}
|
||||
c.Unpin("k") // safe no-op
|
||||
}
|
||||
|
||||
func TestConcurrentPinUnpin(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 100; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
c.Pin("race")
|
||||
time.Sleep(time.Microsecond)
|
||||
c.Unpin("race")
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
if c.isPinned("race") {
|
||||
t.Fatal("refs leaked")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSweeperLoopExits(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c.StartSweeper(ctx, 10*time.Millisecond)
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
cancel()
|
||||
// If StartSweeper doesn't exit on cancel the test would leak a goroutine;
|
||||
// the leak detector in the test runner will surface it.
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
}
|
||||
|
||||
func TestMinBudgetClamp(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
c, err := NewHLSCache(root, 0) // below floor
|
||||
if err != nil {
|
||||
t.Fatalf("NewHLSCache: %v", err)
|
||||
}
|
||||
if c.maxBytes != int64(hlsCacheMinBudgetGB)*1024*1024*1024 {
|
||||
t.Fatalf("budget not clamped to min: got %d", c.maxBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTryAcquireWriterExclusive(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
if !c.TryAcquireWriter("k") {
|
||||
t.Fatal("first acquire should succeed")
|
||||
}
|
||||
if c.TryAcquireWriter("k") {
|
||||
t.Fatal("second acquire for same key must fail")
|
||||
}
|
||||
if !c.TryAcquireWriter("other") {
|
||||
t.Fatal("different key should not conflict")
|
||||
}
|
||||
c.ReleaseWriter("k")
|
||||
if !c.TryAcquireWriter("k") {
|
||||
t.Fatal("acquire after release should succeed")
|
||||
}
|
||||
c.ReleaseWriter("k")
|
||||
c.ReleaseWriter("k") // idempotent
|
||||
}
|
||||
|
||||
func TestStartupOrphanCleanup(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
|
||||
// Pre-seed: one sealed dir + one orphan old enough + one orphan fresh.
|
||||
sealed := filepath.Join(root, "sealed")
|
||||
_ = os.MkdirAll(sealed, 0o755)
|
||||
_ = os.WriteFile(filepath.Join(sealed, hlsCacheCompleteMarker), nil, 0o644)
|
||||
|
||||
staleOrphan := filepath.Join(root, "stale_orphan")
|
||||
_ = os.MkdirAll(staleOrphan, 0o755)
|
||||
old := time.Now().Add(-2 * hlsCacheStartupOrphanAge)
|
||||
_ = os.Chtimes(staleOrphan, old, old)
|
||||
|
||||
freshOrphan := filepath.Join(root, "fresh_orphan")
|
||||
_ = os.MkdirAll(freshOrphan, 0o755)
|
||||
|
||||
if _, err := NewHLSCache(root, 1); err != nil {
|
||||
t.Fatalf("NewHLSCache: %v", err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(sealed); err != nil {
|
||||
t.Fatal("sealed dir was wrongly removed")
|
||||
}
|
||||
if _, err := os.Stat(staleOrphan); err == nil {
|
||||
t.Fatal("stale orphan should have been removed at startup")
|
||||
}
|
||||
if _, err := os.Stat(freshOrphan); err != nil {
|
||||
t.Fatal("fresh orphan should be kept (might be a mid-restart encode)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHitMissCounters(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
if s := c.Stats(); s.Hits != 0 || s.Misses != 0 {
|
||||
t.Fatalf("fresh cache stats not zero: %+v", s)
|
||||
}
|
||||
c.RecordHit()
|
||||
c.RecordHit()
|
||||
c.RecordMiss()
|
||||
s := c.Stats()
|
||||
if s.Hits != 2 || s.Misses != 1 {
|
||||
t.Fatalf("counters wrong: %+v", s)
|
||||
}
|
||||
// 2/3 = 67%
|
||||
if got := c.hitRatePercent(); got != 67 {
|
||||
t.Fatalf("hitRatePercent=%d, want 67", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatsEntryCount(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
for _, k := range []string{"a", "b", "c"} {
|
||||
dir := c.DirFor(k)
|
||||
_ = os.MkdirAll(dir, 0o755)
|
||||
_ = os.WriteFile(filepath.Join(dir, "x"), []byte("hello"), 0o644)
|
||||
}
|
||||
s := c.Stats()
|
||||
if s.EntryCount != 3 {
|
||||
t.Fatalf("EntryCount=%d, want 3", s.EntryCount)
|
||||
}
|
||||
if s.TotalBytes != 15 {
|
||||
t.Fatalf("TotalBytes=%d, want 15", s.TotalBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestVerifyCompleteRejectsMissingFiles(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
key := "v"
|
||||
dir := c.DirFor(key)
|
||||
_ = os.MkdirAll(filepath.Join(dir, "video"), 0o755)
|
||||
|
||||
// No .complete yet → reject.
|
||||
if c.VerifyComplete(key, 2) {
|
||||
t.Fatal("VerifyComplete should reject without .complete")
|
||||
}
|
||||
|
||||
// Mark complete but no files → reject.
|
||||
if err := c.MarkComplete(key); err != nil {
|
||||
t.Fatalf("MarkComplete: %v", err)
|
||||
}
|
||||
if c.VerifyComplete(key, 2) {
|
||||
t.Fatal("VerifyComplete should reject when init.mp4 missing")
|
||||
}
|
||||
|
||||
// Write init.mp4, last seg missing → reject.
|
||||
_ = os.WriteFile(filepath.Join(dir, "video", "init.mp4"), []byte("..."), 0o644)
|
||||
if c.VerifyComplete(key, 2) {
|
||||
t.Fatal("VerifyComplete should reject when last segment missing")
|
||||
}
|
||||
|
||||
// Write last seg → pass.
|
||||
_ = os.WriteFile(filepath.Join(dir, "video", "seg-1.m4s"), []byte("..."), 0o644)
|
||||
if !c.VerifyComplete(key, 2) {
|
||||
t.Fatal("VerifyComplete should pass with all files present")
|
||||
}
|
||||
|
||||
// Zero-size last seg → reject.
|
||||
_ = os.WriteFile(filepath.Join(dir, "video", "seg-1.m4s"), nil, 0o644)
|
||||
if c.VerifyComplete(key, 2) {
|
||||
t.Fatal("VerifyComplete should reject zero-size last segment")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSweepRespectsPinnedExceedsBudget(t *testing.T) {
|
||||
c := newTestCache(t, 1)
|
||||
c.maxBytes = 256 // squeeze
|
||||
|
||||
pinned := c.DirFor("pinned")
|
||||
_ = os.MkdirAll(pinned, 0o755)
|
||||
_ = os.WriteFile(filepath.Join(pinned, "x"), make([]byte, 1024), 0o644)
|
||||
c.Pin("pinned")
|
||||
|
||||
freed, err := c.Sweep()
|
||||
if err != nil {
|
||||
t.Fatalf("Sweep: %v", err)
|
||||
}
|
||||
if freed != 0 {
|
||||
t.Fatalf("nothing should have been freed: got %d", freed)
|
||||
}
|
||||
if _, err := os.Stat(pinned); err != nil {
|
||||
t.Fatal("pinned dir wrongly removed despite over-budget pin")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue