diff --git a/pkg/container/watch.go b/pkg/container/watch.go
index e4dc2345..2e81d8f3 100644
--- a/pkg/container/watch.go
+++ b/pkg/container/watch.go
@@ -10,7 +10,6 @@ import (
 	"github.com/containerd/typeurl/v2"
 	"github.com/patrickmn/go-cache"
 	"github.com/rs/zerolog/log"
-	"github.com/threefoldtech/zosbase/pkg/stubs"
 )
 
 func (c *Module) handlerEventTaskExit(ctx context.Context, ns string, event *events.TaskExit) {
@@ -55,17 +54,18 @@ func (c *Module) handlerEventTaskExit(ctx context.Context, ns string, event *eve
 		<-time.After(restartDelay) // wait for 2 seconds
 		reason = c.start(ns, event.ContainerID)
 	} else {
-		reason = fmt.Errorf("deleting container due to so many crashes")
+		reason = fmt.Errorf("restarting container failed after too many crashes")
 	}
+	log.Debug().Err(reason).Msg("failed to restart container")
 
-	if reason != nil {
-		log.Debug().Err(reason).Msg("deleting container due to restart error")
+	// if reason != nil {
+	// 	log.Debug().Err(reason).Msg("deleting container due to restart error")
 
-		stub := stubs.NewProvisionStub(c.client)
-		if err := stub.DecommissionCached(ctx, event.ContainerID, reason.Error()); err != nil {
-			log.Error().Err(err).Msg("failed to decommission reservation")
-		}
-	}
+	// 	stub := stubs.NewProvisionStub(c.client)
+	// 	if err := stub.DecommissionCached(ctx, event.ContainerID, reason.Error()); err != nil {
+	// 		log.Error().Err(err).Msg("failed to decommission reservation")
+	// 	}
+	// }
 }
 
 func (c *Module) handleEvent(ctx context.Context, ns string, event interface{}) {
diff --git a/pkg/gridtypes/deployment.go b/pkg/gridtypes/deployment.go
index 71d10ef1..6414aa89 100644
--- a/pkg/gridtypes/deployment.go
+++ b/pkg/gridtypes/deployment.go
@@ -129,8 +129,8 @@ func (d *Deployment) IsActive() bool {
 	active := false
 	for i := range d.Workloads {
 		wl := &d.Workloads[i]
-		if !wl.Result.State.IsAny(StateDeleted, StateError) {
-			// not delete or error so is probably active
+		if !wl.Result.State.IsAny(StateDeleted) {
+			// not deleted, so it is still active (includes StateError, which needs a retry)
 			return true
 		}
 	}
diff --git a/pkg/provision.go b/pkg/provision.go
index 8169aa84..5e21c606 100644
--- a/pkg/provision.go
+++ b/pkg/provision.go
@@ -15,6 +15,10 @@ type Provision interface {
 	// GetWorkloadStatus: returns status, bool(true if workload exits otherwise it is false), error
 	GetWorkloadStatus(id string) (gridtypes.ResultState, bool, error)
 	CreateOrUpdate(twin uint32, deployment gridtypes.Deployment, update bool) error
+	// SetWorkloadError updates workload state to error without decommissioning it
+	SetWorkloadError(id string, errorMsg string) error
+	// SetWorkloadOk updates workload state to ok without decommissioning it
+	SetWorkloadOk(id string) error
 	Get(twin uint32, contractID uint64) (gridtypes.Deployment, error)
 	List(twin uint32) ([]gridtypes.Deployment, error)
 	Changes(twin uint32, contractID uint64) ([]gridtypes.Workload, error)
diff --git a/pkg/provision/engine.go b/pkg/provision/engine.go
index 9e1f36b0..501584e9 100644
--- a/pkg/provision/engine.go
+++ b/pkg/provision/engine.go
@@ -1025,6 +1025,72 @@ func (e *NativeEngine) DecommissionCached(id string, reason string) error {
 	return err
 }
 
+// SetWorkloadError updates workload state to error without decommissioning it
+func (e *NativeEngine) SetWorkloadError(id string, errorMsg string) error {
+	log.Info().Str("workload-id", id).Str("error", errorMsg).Msg("setting workload state to error")
+
+	globalID := gridtypes.WorkloadID(id)
+	twin, dlID, name, err := globalID.Parts()
+	if err != nil {
+		log.Error().Err(err).Str("workload-id", id).Msg("failed to parse workload ID")
+		return err
+	}
+	log.Debug().Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("parsed workload ID")
+	wl, err := e.storage.Current(twin, dlID, name)
+	if err != nil {
+		log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("failed to get workload from storage")
+		return err
+	}
+
+	if wl.Result.State == gridtypes.StateDeleted ||
+		wl.Result.State == gridtypes.StateError {
+		// nothing to do!
+		return nil
+	}
+
+	// Update result to StateError without uninstalling
+	result := gridtypes.Result{
+		Created: gridtypes.Now(),
+		State:   gridtypes.StateError,
+		Error:   errorMsg,
+	}
+
+	return e.storage.Transaction(twin, dlID, wl.WithResults(result))
+}
+
+// SetWorkloadOk updates workload state to ok without decommissioning it
+func (e *NativeEngine) SetWorkloadOk(id string) error {
+	log.Info().Str("workload-id", id).Msg("setting workload state to ok")
+
+	globalID := gridtypes.WorkloadID(id)
+	twin, dlID, name, err := globalID.Parts()
+	if err != nil {
+		log.Error().Err(err).Str("workload-id", id).Msg("failed to parse workload ID")
+		return err
+	}
+	log.Debug().Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("parsed workload ID")
+	wl, err := e.storage.Current(twin, dlID, name)
+	if err != nil {
+		log.Error().Err(err).Uint32("twin", twin).Uint64("deployment", dlID).Str("name", string(name)).Msg("failed to get workload from storage")
+		return err
+	}
+
+	if wl.Result.State == gridtypes.StateDeleted ||
+		wl.Result.State == gridtypes.StateOk {
+		// nothing to do!
+		return nil
+	}
+
+	// Update result to StateOk without uninstalling
+	result := gridtypes.Result{
+		Created: gridtypes.Now(),
+		State:   gridtypes.StateOk,
+		Error:   "",
+	}
+
+	return e.storage.Transaction(twin, dlID, wl.WithResults(result))
+}
+
 func (n *NativeEngine) CreateOrUpdate(twin uint32, deployment gridtypes.Deployment, update bool) error {
 	if err := deployment.Valid(); err != nil {
 		return err
diff --git a/pkg/stubs/provision_stub.go b/pkg/stubs/provision_stub.go
index cc24258f..46db6717 100644
--- a/pkg/stubs/provision_stub.go
+++ b/pkg/stubs/provision_stub.go
@@ -176,3 +176,33 @@ func (s *ProvisionStub) ListTwins(ctx context.Context) (ret0 []uint32, ret1 erro
 	}
 	return
 }
+
+func (s *ProvisionStub) SetWorkloadError(ctx context.Context, arg0 string, arg1 string) (ret0 error) {
+	args := []interface{}{arg0, arg1}
+	result, err := s.client.RequestContext(ctx, s.module, s.object, "SetWorkloadError", args...)
+	if err != nil {
+		panic(err)
+	}
+	result.PanicOnError()
+	ret0 = result.CallError()
+	loader := zbus.Loader{}
+	if err := result.Unmarshal(&loader); err != nil {
+		panic(err)
+	}
+	return
+}
+
+func (s *ProvisionStub) SetWorkloadOk(ctx context.Context, arg0 string) (ret0 error) {
+	args := []interface{}{arg0}
+	result, err := s.client.RequestContext(ctx, s.module, s.object, "SetWorkloadOk", args...)
+	if err != nil {
+		panic(err)
+	}
+	result.PanicOnError()
+	ret0 = result.CallError()
+	loader := zbus.Loader{}
+	if err := result.Unmarshal(&loader); err != nil {
+		panic(err)
+	}
+	return
+}
diff --git a/pkg/vm/monitor.go b/pkg/vm/monitor.go
index 2128bbe0..20640640 100644
--- a/pkg/vm/monitor.go
+++ b/pkg/vm/monitor.go
@@ -22,6 +22,8 @@ const (
 	monitorEvery   = 10 * time.Second
 	logrotateEvery = 10 * time.Minute
 	cleanupEvery   = 10 * time.Minute
+	// cooldownPeriod is the time to wait after a burst of failures before retrying
+	cooldownPeriod = 6 * time.Hour
 )
 
 var (
@@ -36,6 +38,13 @@ var (
 	)
 )
 
+// vmFailureState tracks the failure count and cooldown state for a VM
+type vmFailureState struct {
+	Count           int
+	CooldownUntil   time.Time
+	LastCooldownLog time.Time
+}
+
 func (m *Module) logrotate(ctx context.Context) error {
 	log.Debug().Msg("running log rotations for vms")
 	running, err := FindAll()
@@ -148,6 +157,14 @@ func (m *Module) cleanupCidata() error {
 	for _, file := range files {
 		name := file.Name()
 		if _, ok := running[name]; !ok {
+			// Check if VM is being monitored/retried before deleting
+			marker, exists := m.failures.Get(name)
+			if exists && marker != permanent {
+				// VM is in failure tracking (cooldown/retry), don't delete its cloud-init
+				log.Debug().Str("vm-id", name).Msg("skipping cloud-init cleanup for VM in retry state")
+				continue
+			}
+			log.Debug().Str("vm-id", name).Msg("removing cloud-init for non-running VM")
 			_ = os.Remove(filepath.Join(dir, name))
 			continue
 		}
@@ -164,11 +181,11 @@ func (m *Module) monitorID(ctx context.Context, running map[string]Process, id s
 		return nil
 	}
 	if ps, ok := running[id]; ok {
-		state, exists, err := stub.GetWorkloadStatus(ctx, id)
+		workloadState, exists, err := stub.GetWorkloadStatus(ctx, id)
 		if err != nil {
 			return errors.Wrapf(err, "failed to get workload status for vm:%s ", id)
 		}
-		if !exists || state.IsAny(gridtypes.StateDeleted, gridtypes.StateError) {
+		if !exists || workloadState.IsAny(gridtypes.StateDeleted) {
 			log.Debug().Str("name", id).Msg("deleting running vm with no active workload")
 			m.removeConfig(id)
 			_ = syscall.Kill(ps.Pid, syscall.SIGKILL)
@@ -182,26 +199,86 @@ func (m *Module) monitorID(ctx context.Context, running map[string]Process, id s
 	marker, ok := m.failures.Get(id)
 	if !ok {
 		// no previous value. so this is the first failure
-		m.failures.Set(id, int(0), cache.DefaultExpiration)
+		m.failures.Set(id, &vmFailureState{Count: 0, CooldownUntil: time.Time{}, LastCooldownLog: time.Time{}}, cache.DefaultExpiration)
+		marker, _ = m.failures.Get(id)
 	}
 
 	if marker == permanent {
 		// if the marker is permanent. it means that this vm
-		// is being deleted or not monitored. we don't need to take any more action here
+		// is being deleted or not monitored. we don't need to take any action here
 		// (don't try to restart or delete)
 		m.removeConfig(id)
 		return nil
 	}
 
-	count, err := m.failures.IncrementInt(id, 1)
-	if err != nil {
-		// this should never happen because we make sure value
-		// is set
-		return errors.Wrap(err, "failed to check number of failure for the vm")
+	// Check if marker is a vmFailureState or old int format
+	var state *vmFailureState
+	switch v := marker.(type) {
+	case *vmFailureState:
+		state = v
+	case int:
+		// Migrate old int format to new struct format
+		state = &vmFailureState{Count: v, CooldownUntil: time.Time{}, LastCooldownLog: time.Time{}}
+		m.failures.Set(id, state, cache.DefaultExpiration)
+	default:
+		// Unknown format, reset
+		state = &vmFailureState{Count: 0, CooldownUntil: time.Time{}, LastCooldownLog: time.Time{}}
+		m.failures.Set(id, state, cache.DefaultExpiration)
+	}
+
+	// Check if we're in cooldown period
+	now := time.Now()
+	if !state.CooldownUntil.IsZero() && now.Before(state.CooldownUntil) {
+		// Only log cooldown message every 30 minutes
+		if state.LastCooldownLog.IsZero() || now.Sub(state.LastCooldownLog) >= 30*time.Minute {
+			log.Debug().Str("name", id).
+				Time("cooldown_until", state.CooldownUntil).
+				Dur("remaining", state.CooldownUntil.Sub(now)).
+				Msg("vm in cooldown period, skipping restart attempt")
+			state.LastCooldownLog = now
+			m.failures.Set(id, state, cache.NoExpiration)
+		}
+		return nil
+	}
+
+	// Set error state before attempting restart (if not in cooldown)
+	// This ensures the database reflects the actual state
+	log.Info().Str("name", id).Msg("vm detected as down, setting state to error")
+	log.Debug().Str("workload-id", id).Msg("attempting to set vm state to error")
+	if err := stub.SetWorkloadError(ctx, id, "vm detected as down"); err != nil {
+		// Check if deployment actually doesn't exist
+		if strings.Contains(err.Error(), "deployment does not exist") {
+			// Verify by checking workload status
+			_, exists, checkErr := stub.GetWorkloadStatus(ctx, id)
+			if checkErr == nil && !exists {
+				log.Warn().Str("workload-id", id).Msg("vm deployment confirmed deleted, stopping monitoring")
+				// Set permanent marker to prevent further restart attempts
+				m.failures.Set(id, permanent, cache.NoExpiration)
+				return nil
+			}
+			// If we can't confirm or it exists, treat as transient error and continue
+			log.Warn().Err(err).Str("workload-id", id).Msg("failed to set vm error state, but continuing retry")
+		} else {
+			log.Error().Err(err).Str("workload-id", id).Msg("failed to set vm error state")
+		}
+	}
+
+	// If we just exited cooldown, reset the failure count for a new burst
+	if !state.CooldownUntil.IsZero() && now.After(state.CooldownUntil) {
+		log.Info().Str("name", id).Msg("cooldown period expired, resetting failure count for new burst")
+		state.Count = 0
+		state.CooldownUntil = time.Time{}
+		state.LastCooldownLog = time.Time{} // Reset cooldown log timer
+		// Go back to using DefaultExpiration after cooldown expires
+		m.failures.Set(id, state, cache.DefaultExpiration)
 	}
+
+	// Increment failure count
+	state.Count++
+	m.failures.Set(id, state, cache.DefaultExpiration)
+
 	var reason error
-	if count < failuresBeforeDestroy {
+	if state.Count < failuresBeforeDestroy {
 
 		vm, err := MachineFromFile(m.configPath(id))
 		if err != nil {
@@ -216,21 +293,48 @@ func (m *Module) monitorID(ctx context.Context, running map[string]Process, id s
 			return nil
 		}
 
-		log.Debug().Str("name", id).Msg("trying to restart the vm")
+		log.Debug().Str("name", id).Int("attempt", state.Count).Msg("trying to restart the vm")
 		if _, err = vm.Run(ctx, m.socketPath(id), m.logsPath(id)); err != nil {
 			reason = m.withLogs(m.logsPath(id), err)
+			log.Warn().Err(reason).Str("name", id).Int("failures", state.Count).Msg("vm restart failed")
+		} else {
+			// Success! Reset failure count and set state to OK
+			log.Info().Str("name", id).Msg("vm restarted successfully")
+			state.Count = 0
+			state.CooldownUntil = time.Time{}
+			m.failures.Set(id, state, cache.DefaultExpiration)
+
+			// Update workload state to OK since VM is running
+			log.Debug().Str("workload-id", id).Msg("attempting to set vm state to ok")
+			if err := stub.SetWorkloadOk(ctx, id); err != nil {
+				// Check if deployment actually doesn't exist
+				if strings.Contains(err.Error(), "deployment does not exist") {
+					// Verify by checking workload status
+					_, exists, checkErr := stub.GetWorkloadStatus(ctx, id)
+					if checkErr == nil && !exists {
+						log.Warn().Str("workload-id", id).Msg("vm deployment confirmed deleted, will be stopped on next monitor cycle")
+						// Set permanent marker - next monitor cycle will kill VM and clean up config
+						m.failures.Set(id, permanent, cache.NoExpiration)
+						return nil
+					}
+					// If we can't confirm or it exists, treat as transient error and continue
+					log.Warn().Err(err).Str("workload-id", id).Msg("failed to set vm state to ok, but VM is running")
+				} else {
+					log.Error().Err(err).Str("workload-id", id).Msg("failed to set vm state to ok")
+				}
+			}
 		}
 	} else {
-		reason = fmt.Errorf("deleting vm due to so many crashes")
-	}
-
-	if reason != nil {
-		log.Debug().Err(reason).Msg("deleting vm due to restart error")
-		m.removeConfig(id)
-
-		if err := stub.DecommissionCached(ctx, id, reason.Error()); err != nil {
-			return errors.Wrapf(err, "failed to decommission reservation '%s'", id)
-		}
+		// Reached failure limit, enter cooldown period instead of decommissioning
+		state.CooldownUntil = now.Add(cooldownPeriod)
+		// Use NoExpiration to ensure cooldown state persists beyond cache default expiration
+		m.failures.Set(id, state, cache.NoExpiration)
+		log.Warn().Str("name", id).
+			Int("failures", state.Count).
+			Time("cooldown_until", state.CooldownUntil).
+			Dur("cooldown_duration", cooldownPeriod).
+			Msg("vm reached failure limit, entering cooldown period before next retry burst")
+		// Do NOT call DecommissionCached or removeConfig - keep the VM for retry
 	}
 
 	return nil