fix(agent): add retry with backoff and WebSocket connect for daemon registration
This commit is contained in:
parent
8388220dae
commit
6f81a2f3ea
4 changed files with 73 additions and 6 deletions
|
|
@ -246,14 +246,14 @@ func (c *Client) handleResponse(resp *http.Response, dst any) error {
|
||||||
// Try to parse as JSON error
|
// Try to parse as JSON error
|
||||||
var errResp ErrorResponse
|
var errResp ErrorResponse
|
||||||
if json.Unmarshal(body, &errResp) == nil && errResp.Error != "" {
|
if json.Unmarshal(body, &errResp) == nil && errResp.Error != "" {
|
||||||
return fmt.Errorf("API error %d: %s", resp.StatusCode, errResp.Error)
|
return &HTTPError{StatusCode: resp.StatusCode, Message: errResp.Error}
|
||||||
}
|
}
|
||||||
// Non-JSON response (e.g. HTML error page) — truncate to something readable
|
// Non-JSON response (e.g. HTML error page) — truncate to something readable
|
||||||
msg := string(body)
|
msg := string(body)
|
||||||
if len(msg) > 120 || strings.Contains(msg, "<html") || strings.Contains(msg, "<!DOCTYPE") {
|
if len(msg) > 120 || strings.Contains(msg, "<html") || strings.Contains(msg, "<!DOCTYPE") {
|
||||||
msg = fmt.Sprintf("server returned %s (non-JSON response, likely a server error)", resp.Status)
|
msg = fmt.Sprintf("server returned %s (non-JSON response, likely a server error)", resp.Status)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("API error %d: %s", resp.StatusCode, msg)
|
return &HTTPError{StatusCode: resp.StatusCode, Message: msg}
|
||||||
}
|
}
|
||||||
|
|
||||||
if dst != nil {
|
if dst != nil {
|
||||||
|
|
|
||||||
|
|
@ -2,10 +2,12 @@ package agent
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
@ -76,6 +78,7 @@ func NewDaemon(cfg DaemonConfig, transport Transport) *Daemon {
|
||||||
func (d *Daemon) Transport() Transport { return d.transport }
|
func (d *Daemon) Transport() Transport { return d.transport }
|
||||||
|
|
||||||
// Register registers the agent and fetches user info + features.
|
// Register registers the agent and fetches user info + features.
|
||||||
|
// Retries with exponential backoff on transient errors (429, 5xx, network).
|
||||||
func (d *Daemon) Register(ctx context.Context) error {
|
func (d *Daemon) Register(ctx context.Context) error {
|
||||||
req := RegisterRequest{
|
req := RegisterRequest{
|
||||||
AgentID: d.cfg.AgentID,
|
AgentID: d.cfg.AgentID,
|
||||||
|
|
@ -90,9 +93,32 @@ func (d *Daemon) Register(ctx context.Context) error {
|
||||||
req.DiskTotalBytes = total
|
req.DiskTotalBytes = total
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := d.transport.Register(ctx, req)
|
const maxRetries = 5
|
||||||
|
backoff := 5 * time.Second
|
||||||
|
|
||||||
|
var resp *RegisterResponse
|
||||||
|
var err error
|
||||||
|
for attempt := range maxRetries {
|
||||||
|
resp, err = d.transport.Register(ctx, req)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Only retry on transient errors (429, 5xx, network failures)
|
||||||
|
if !isTransientError(err) {
|
||||||
|
return fmt.Errorf("register: %w", err)
|
||||||
|
}
|
||||||
|
log.Printf("Register failed (attempt %d/%d): %v - retrying in %v", attempt+1, maxRetries, err, backoff)
|
||||||
|
timer := time.NewTimer(backoff)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
timer.Stop()
|
||||||
|
return fmt.Errorf("register: %w", ctx.Err())
|
||||||
|
case <-timer.C:
|
||||||
|
}
|
||||||
|
backoff = min(backoff*2, 60*time.Second)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("register: %w", err)
|
return fmt.Errorf("register: %w (after %d retries)", err, maxRetries)
|
||||||
}
|
}
|
||||||
|
|
||||||
d.User = resp.User
|
d.User = resp.User
|
||||||
|
|
@ -118,8 +144,14 @@ func (d *Daemon) Register(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts the main daemon loop. Blocks until ctx is cancelled.
|
// Run connects the transport, registers the agent, and starts the main loop.
|
||||||
|
// Blocks until ctx is cancelled. Callers must NOT call transport.Connect before Run.
|
||||||
func (d *Daemon) Run(ctx context.Context) error {
|
func (d *Daemon) Run(ctx context.Context) error {
|
||||||
|
// Connect transport (establishes WebSocket if available, falls back to HTTP)
|
||||||
|
if err := d.transport.Connect(ctx); err != nil {
|
||||||
|
return fmt.Errorf("connect transport: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Register
|
// Register
|
||||||
if err := d.Register(ctx); err != nil {
|
if err := d.Register(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -265,6 +297,26 @@ func (d *Daemon) deregister() {
|
||||||
RemoveState()
|
RemoveState()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isTransientError returns true for errors worth retrying (429, 5xx, network).
|
||||||
|
func isTransientError(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// Structured check: HTTPError carries the status code directly
|
||||||
|
var httpErr *HTTPError
|
||||||
|
if errors.As(err, &httpErr) {
|
||||||
|
return httpErr.StatusCode == 429 || httpErr.StatusCode >= 500
|
||||||
|
}
|
||||||
|
// Fallback: network-level errors (no HTTP response received)
|
||||||
|
lower := strings.ToLower(err.Error())
|
||||||
|
for _, keyword := range []string{"connection refused", "no such host", "timeout", "request failed"} {
|
||||||
|
if strings.Contains(lower, keyword) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Daemon) poll(ctx context.Context) {
|
func (d *Daemon) poll(ctx context.Context) {
|
||||||
resp, err := d.transport.ClaimTasks(ctx, d.cfg.AgentID)
|
resp, err := d.transport.ClaimTasks(ctx, d.cfg.AgentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import "context"
|
||||||
// Both WebSocket (via CF Durable Object) and HTTP (direct to origin) implement this.
|
// Both WebSocket (via CF Durable Object) and HTTP (direct to origin) implement this.
|
||||||
type Transport interface {
|
type Transport interface {
|
||||||
// Connect establishes the transport connection.
|
// Connect establishes the transport connection.
|
||||||
|
// Called internally by Daemon.Run — callers must NOT call Connect separately.
|
||||||
Connect(ctx context.Context) error
|
Connect(ctx context.Context) error
|
||||||
|
|
||||||
// Close tears down the connection gracefully.
|
// Close tears down the connection gracefully.
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,9 @@
|
||||||
package agent
|
package agent
|
||||||
|
|
||||||
import "time"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
// RegisterRequest is sent by the CLI on startup to register itself.
|
// RegisterRequest is sent by the CLI on startup to register itself.
|
||||||
type RegisterRequest struct {
|
type RegisterRequest struct {
|
||||||
|
|
@ -147,6 +150,17 @@ type ErrorResponse struct {
|
||||||
Details any `json:"details,omitempty"`
|
Details any `json:"details,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HTTPError represents an HTTP API error with a status code.
|
||||||
|
// Use errors.As to extract the status code for retry decisions.
|
||||||
|
type HTTPError struct {
|
||||||
|
StatusCode int
|
||||||
|
Message string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *HTTPError) Error() string {
|
||||||
|
return fmt.Sprintf("API error %d: %s", e.StatusCode, e.Message)
|
||||||
|
}
|
||||||
|
|
||||||
// AgentInfo holds metadata about the running agent for display.
|
// AgentInfo holds metadata about the running agent for display.
|
||||||
type AgentInfo struct {
|
type AgentInfo struct {
|
||||||
ID string
|
ID string
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue