feat(daemon): add on-demand library scan via heartbeat and WebSocket
This commit is contained in:
parent
4cf07c411c
commit
a9179dc758
4 changed files with 56 additions and 15 deletions
|
|
@ -55,6 +55,9 @@ type Daemon struct {
|
||||||
|
|
||||||
// pollNow triggers an immediate poll (e.g. on resume)
|
// pollNow triggers an immediate poll (e.g. on resume)
|
||||||
pollNow chan struct{}
|
pollNow chan struct{}
|
||||||
|
|
||||||
|
// ScanNow triggers an immediate library scan (from heartbeat or WebSocket control event)
|
||||||
|
ScanNow chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDaemon creates a daemon with the given transport.
|
// NewDaemon creates a daemon with the given transport.
|
||||||
|
|
@ -71,6 +74,7 @@ func NewDaemon(cfg DaemonConfig, transport Transport) *Daemon {
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
transport: transport,
|
transport: transport,
|
||||||
pollNow: make(chan struct{}, 1),
|
pollNow: make(chan struct{}, 1),
|
||||||
|
ScanNow: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -236,6 +240,15 @@ func (d *Daemon) heartbeat(ctx context.Context) {
|
||||||
}
|
}
|
||||||
WriteState(&d.State)
|
WriteState(&d.State)
|
||||||
|
|
||||||
|
// Trigger library scan if requested
|
||||||
|
if resp.Scan {
|
||||||
|
log.Printf("Library scan requested by server")
|
||||||
|
select {
|
||||||
|
case d.ScanNow <- struct{}{}:
|
||||||
|
default: // scan already pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Log once per version when server suggests an upgrade
|
// Log once per version when server suggests an upgrade
|
||||||
if resp.Upgrade != nil && resp.Upgrade.Version != "" && resp.Upgrade.Version != d.lastNotifiedVersion {
|
if resp.Upgrade != nil && resp.Upgrade.Version != "" && resp.Upgrade.Version != d.lastNotifiedVersion {
|
||||||
d.lastNotifiedVersion = resp.Upgrade.Version
|
d.lastNotifiedVersion = resp.Upgrade.Version
|
||||||
|
|
@ -266,9 +279,17 @@ func (d *Daemon) handleEvent(event ServerEvent) {
|
||||||
}
|
}
|
||||||
|
|
||||||
case "control":
|
case "control":
|
||||||
if event.Control != nil && d.OnControlAction != nil {
|
if event.Control != nil {
|
||||||
log.Printf("Control action via WebSocket: %s task %s", event.Control.Action, event.Control.TaskID)
|
log.Printf("Control action via WebSocket: %s task %s", event.Control.Action, event.Control.TaskID)
|
||||||
d.OnControlAction(event.Control.Action, event.Control.TaskID)
|
if event.Control.Action == "scan" {
|
||||||
|
select {
|
||||||
|
case d.ScanNow <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if d.OnControlAction != nil {
|
||||||
|
d.OnControlAction(event.Control.Action, event.Control.TaskID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case "disconnected":
|
case "disconnected":
|
||||||
|
|
|
||||||
|
|
@ -137,6 +137,7 @@ type HeartbeatResponse struct {
|
||||||
Success bool `json:"success"`
|
Success bool `json:"success"`
|
||||||
Upgrade *UpgradeSignal `json:"upgrade,omitempty"`
|
Upgrade *UpgradeSignal `json:"upgrade,omitempty"`
|
||||||
Watching bool `json:"watching,omitempty"` // true when a user is viewing download progress in the web UI
|
Watching bool `json:"watching,omitempty"` // true when a user is viewing download progress in the web UI
|
||||||
|
Scan bool `json:"scan,omitempty"` // true when user triggered a library scan from the web UI
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpgradeSignal tells the agent to upgrade to a specific version.
|
// UpgradeSignal tells the agent to upgrade to a specific version.
|
||||||
|
|
@ -290,9 +291,10 @@ type DebridAccount struct {
|
||||||
|
|
||||||
// LibrarySyncRequest sends scanned media items to the server.
|
// LibrarySyncRequest sends scanned media items to the server.
|
||||||
type LibrarySyncRequest struct {
|
type LibrarySyncRequest struct {
|
||||||
Items []LibrarySyncItem `json:"items"`
|
Items []LibrarySyncItem `json:"items"`
|
||||||
ScanPath string `json:"scanPath"`
|
ScanPath string `json:"scanPath"`
|
||||||
IsLastBatch bool `json:"isLastBatch"`
|
IsLastBatch bool `json:"isLastBatch"`
|
||||||
|
SyncStartedAt string `json:"syncStartedAt,omitempty"` // ISO-8601; same for all batches in a session
|
||||||
}
|
}
|
||||||
|
|
||||||
// LibrarySyncItem is a single scanned media file with ffprobe metadata.
|
// LibrarySyncItem is a single scanned media file with ffprobe metadata.
|
||||||
|
|
|
||||||
|
|
@ -416,14 +416,21 @@ func runDaemonStart() error {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Start auto-scan goroutine (daily library scan + sync)
|
// Start auto-scan goroutine (daily library scan + sync)
|
||||||
if cfg.Library.ScanPath != "" && cfg.Library.AutoScan {
|
// Default scan_path to download dir so auto-scan works out of the box.
|
||||||
|
scanPath := cfg.Library.ScanPath
|
||||||
|
if scanPath == "" {
|
||||||
|
scanPath = cfg.Download.Dir
|
||||||
|
}
|
||||||
|
if scanPath != "" && cfg.Library.AutoScan {
|
||||||
|
scanCfg := cfg
|
||||||
|
scanCfg.Library.ScanPath = scanPath
|
||||||
scanInterval := 24 * time.Hour
|
scanInterval := 24 * time.Hour
|
||||||
if cfg.Library.ScanInterval != "" {
|
if cfg.Library.ScanInterval != "" {
|
||||||
if parsed, err := time.ParseDuration(cfg.Library.ScanInterval); err == nil && parsed > 0 {
|
if parsed, err := time.ParseDuration(cfg.Library.ScanInterval); err == nil && parsed > 0 {
|
||||||
scanInterval = parsed
|
scanInterval = parsed
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
go runAutoScan(ctx, cfg, scanInterval, agentClient)
|
go runAutoScan(ctx, scanCfg, scanInterval, agentClient, d.ScanNow)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start daemon (blocks)
|
// Start daemon (blocks)
|
||||||
|
|
@ -500,13 +507,15 @@ func formatSpeedLog(bps int64) string {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// runAutoScan runs a library scan + sync on a timer.
|
// runAutoScan runs a library scan + sync on a timer or on-demand via scanNow channel.
|
||||||
func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, ac *agent.Client) {
|
func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration, ac *agent.Client, scanNow <-chan struct{}) {
|
||||||
log.Printf("[auto-scan] enabled: every %s, path: %s", interval, cfg.Library.ScanPath)
|
log.Printf("[auto-scan] enabled: every %s, path: %s", interval, cfg.Library.ScanPath)
|
||||||
|
|
||||||
// Run first scan after a short delay (let daemon stabilize)
|
// Run first scan after a short delay (let daemon stabilize)
|
||||||
select {
|
select {
|
||||||
case <-time.After(30 * time.Second):
|
case <-time.After(30 * time.Second):
|
||||||
|
case <-scanNow:
|
||||||
|
// Immediate scan requested before initial delay
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -549,6 +558,7 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
const batchSize = 100
|
const batchSize = 100
|
||||||
|
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
|
||||||
for i := 0; i < len(items); i += batchSize {
|
for i := 0; i < len(items); i += batchSize {
|
||||||
end := i + batchSize
|
end := i + batchSize
|
||||||
if end > len(items) {
|
if end > len(items) {
|
||||||
|
|
@ -557,9 +567,10 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
|
||||||
isLast := end >= len(items)
|
isLast := end >= len(items)
|
||||||
|
|
||||||
_, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
_, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
||||||
Items: items[i:end],
|
Items: items[i:end],
|
||||||
ScanPath: cache.Path,
|
ScanPath: cache.Path,
|
||||||
IsLastBatch: isLast,
|
IsLastBatch: isLast,
|
||||||
|
SyncStartedAt: syncStartedAt,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[auto-scan] sync failed: %v", err)
|
log.Printf("[auto-scan] sync failed: %v", err)
|
||||||
|
|
@ -579,6 +590,10 @@ func runAutoScan(ctx context.Context, cfg config.Config, interval time.Duration,
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
doScan()
|
doScan()
|
||||||
|
case <-scanNow:
|
||||||
|
log.Printf("[auto-scan] on-demand scan triggered")
|
||||||
|
ticker.Reset(interval)
|
||||||
|
doScan()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/fatih/color"
|
"github.com/fatih/color"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
@ -165,6 +166,7 @@ func syncToServer(ctx context.Context, cfg config.Config, cache *library.Library
|
||||||
totalSynced := 0
|
totalSynced := 0
|
||||||
totalMatched := 0
|
totalMatched := 0
|
||||||
totalRemoved := 0
|
totalRemoved := 0
|
||||||
|
syncStartedAt := time.Now().UTC().Format(time.RFC3339)
|
||||||
|
|
||||||
for i := 0; i < len(items); i += batchSize {
|
for i := 0; i < len(items); i += batchSize {
|
||||||
end := i + batchSize
|
end := i + batchSize
|
||||||
|
|
@ -177,9 +179,10 @@ func syncToServer(ctx context.Context, cfg config.Config, cache *library.Library
|
||||||
fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", end, len(items))
|
fmt.Fprintf(os.Stderr, "\r Syncing %d/%d items...\033[K", end, len(items))
|
||||||
|
|
||||||
resp, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
resp, err := ac.SyncLibrary(ctx, agent.LibrarySyncRequest{
|
||||||
Items: batch,
|
Items: batch,
|
||||||
ScanPath: cache.Path,
|
ScanPath: cache.Path,
|
||||||
IsLastBatch: isLast,
|
IsLastBatch: isLast,
|
||||||
|
SyncStartedAt: syncStartedAt,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sync failed: %w", err)
|
return fmt.Errorf("sync failed: %w", err)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue