diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1d683e2..55a3094 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Added
+
+- **Cluster Group Mode (HA Failover)**: Added per-group `group_settings` with `mode: cluster` so a group can connect through one active profile and automatically fail over to the next healthy candidate.
+- **Group Mode Controls in Profiles UI**: Added a mode selector to the Edit Group dialog and a cluster mode marker in the Connection Profiles list.
+
+### Fixed
+
+- **Tasks Panel Focus Freeze**: Fixed an immediate freeze when tabbing from Task History to Active Operations by keeping task table cells selectable and avoiding a tview table selection loop.
+- **Cluster Failover Refresh Re-entrancy**: Triggered failover refresh asynchronously to avoid nested `QueueUpdateDraw` callback paths.
+
 ## [1.0.19] - 2026-02-21
 
 ### Added
diff --git a/internal/config/config.go b/internal/config/config.go
index c910010..5afeed9 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -119,6 +119,7 @@ type Config struct {
     Theme     ThemeConfig  `yaml:"theme"`
     Plugins   PluginConfig `yaml:"plugins"`
     ShowIcons bool         `yaml:"show_icons"`
+    GroupSettings map[string]GroupSettingsConfig `yaml:"group_settings,omitempty"`
     // Deprecated: legacy single-profile fields for migration
     Addr string `yaml:"addr"`
     User string `yaml:"user"`
@@ -405,6 +406,7 @@ func (c *Config) MergeWithFile(path string) error {
             Enabled []string `yaml:"enabled"`
         } `yaml:"plugins"`
         ShowIcons *bool `yaml:"show_icons"`
+        GroupSettings map[string]GroupSettingsConfig `yaml:"group_settings"`
         // Legacy fields for migration
         Addr string `yaml:"addr"`
         User string `yaml:"user"`
@@ -667,6 +669,14 @@ func (c *Config) MergeWithFile(path string) error {
         c.Theme.Colors[k] = v
     }
+
+    // Merge group_settings configuration if provided
+    if fileConfig.GroupSettings != nil {
+        c.GroupSettings = make(map[string]GroupSettingsConfig, len(fileConfig.GroupSettings))
+        for k, v := range fileConfig.GroupSettings {
+            c.GroupSettings[k] = v
+        }
+    }
     // Decrypt sensitive fields if not using SOPS
     // SOPS handles encryption/decryption itself, so we only decrypt age-encrypted fields
     if !IsSOPSEncrypted(path, data) {
diff --git a/internal/config/marshal.go b/internal/config/marshal.go
index 01bde95..2195196 100644
--- a/internal/config/marshal.go
+++ b/internal/config/marshal.go
@@ -10,6 +10,7 @@ type marshaledConfig struct {
     KeyBindings KeyBindings  `yaml:"key_bindings,omitempty"`
     Theme       ThemeConfig  `yaml:"theme,omitempty"`
     Plugins     PluginConfig `yaml:"plugins"`
+    GroupSettings map[string]GroupSettingsConfig `yaml:"group_settings,omitempty"`
     Addr     string `yaml:"addr,omitempty"`
     User     string `yaml:"user,omitempty"`
     Password string `yaml:"password,omitempty"`
@@ -39,6 +40,7 @@ func (cfg *Config) MarshalYAML() (any, error) {
         KeyBindings: cfg.KeyBindings,
         Theme:       cfg.Theme,
         Plugins:     cfg.Plugins,
+        GroupSettings: cfg.GroupSettings,
     }
 
     if len(cfg.Profiles) == 0 {
diff --git a/internal/config/profiles.go b/internal/config/profiles.go
index bd97ec7..63b5e61 100644
--- a/internal/config/profiles.go
+++ b/internal/config/profiles.go
@@ -38,6 +38,27 @@ type ProfileConfig struct {
     Groups []string `yaml:"groups,omitempty"`
 }
 
+// GroupMode constants define the operational mode for a group.
+const (
+    // GroupModeAggregate is the default mode: all profiles connect simultaneously
+    // and their data is merged into a unified view (multi-cluster).
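+    //
+    // A group with no group_settings entry runs in this mode. In YAML, an
+    // explicit entry is a sketch like this (the group name is illustrative):
+    //
+    //	group_settings:
+    //	  homelab:
+    //	    mode: aggregate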
+    GroupModeAggregate = "aggregate"
+
+    // GroupModeCluster connects to a single profile at a time with automatic
+    // failover to the next profile when the active one becomes unreachable.
+    // This is intended for multiple nodes of the same Proxmox cluster.
+    GroupModeCluster = "cluster"
+)
+
+// GroupSettingsConfig holds per-group configuration options.
+type GroupSettingsConfig struct {
+    // Mode determines the operational mode for the group.
+    // "aggregate" (default): connect to all profiles, merge data.
+    // "cluster": connect to one profile at a time with HA failover.
+    Mode string `yaml:"mode"`
+}
+
 // ApplyProfile applies the settings from a named profile to the main config.
 func (c *Config) ApplyProfile(profileName string) error {
     if c.Profiles == nil {
@@ -259,9 +280,9 @@ func (c *Config) HasGroups() bool {
 }
 
 // ValidateGroups checks that group configurations are valid.
+// This validates naming conflicts and group_settings entries.
 func (c *Config) ValidateGroups() error {
     groups := c.GetGroups()
-
     for groupName := range groups {
         // Check for naming conflicts between profiles and groups
         if _, exists := c.Profiles[groupName]; exists {
         }
     }
 
+    // Validate group_settings entries
+    for name, settings := range c.GroupSettings {
+        // Group settings must reference an actual group
+        if _, exists := groups[name]; !exists {
+            return fmt.Errorf("group_settings '%s' does not match any group", name)
+        }
+
+        // Validate mode value
+        switch settings.Mode {
+        case GroupModeAggregate, GroupModeCluster, "":
+            // valid
+        default:
+            return fmt.Errorf("group_settings '%s' has invalid mode '%s' (must be '%s' or '%s')",
+                name, settings.Mode, GroupModeAggregate, GroupModeCluster)
+        }
+    }
     return nil
 }
@@ -286,3 +324,33 @@ func (c *Config) FindGroupProfileNameConflicts() []string {
     sort.Strings(conflicts)
     return conflicts
 }
+
+// GetGroupMode returns the operational mode for a group.
+// Returns GroupModeCluster if configured, otherwise GroupModeAggregate (default).
+func (c *Config) GetGroupMode(groupName string) string {
+    if settings, exists := c.GroupSettings[groupName]; exists {
+        if settings.Mode == GroupModeCluster {
+            return GroupModeCluster
+        }
+    }
+    return GroupModeAggregate
+}
+
+// IsClusterGroup returns true if the named group is configured in cluster (HA failover) mode.
+func (c *Config) IsClusterGroup(groupName string) bool {
+    return c.GetGroupMode(groupName) == GroupModeCluster
+}
+
+// SetGroupMode sets the operational mode for a group.
+// Creates the GroupSettings map if needed.
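+//
+// A minimal usage sketch (the group name is illustrative):
+//
+//	c.SetGroupMode("homelab", GroupModeCluster)   // stores {mode: cluster}
+//	c.SetGroupMode("homelab", GroupModeAggregate) // deletes the entry (default)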
+func (c *Config) SetGroupMode(groupName string, mode string) {
+    if c.GroupSettings == nil {
+        c.GroupSettings = make(map[string]GroupSettingsConfig)
+    }
+    if mode == GroupModeAggregate || mode == "" {
+        // Aggregate is the default — remove the entry to keep config clean
+        delete(c.GroupSettings, groupName)
+        return
+    }
+    c.GroupSettings[groupName] = GroupSettingsConfig{Mode: mode}
+}
diff --git a/internal/config/profiles_test.go b/internal/config/profiles_test.go
index 9de9bd6..8990a1e 100644
--- a/internal/config/profiles_test.go
+++ b/internal/config/profiles_test.go
@@ -67,3 +67,33 @@ func TestGetProfilesInGroup(t *testing.T) {
     pNames3 := cfg.GetProfileNamesInGroup("group3")
     assert.Empty(t, pNames3)
 }
+
+func TestValidateGroupsRejectsStaleGroupSettings(t *testing.T) {
+    cfg := &Config{
+        Profiles: map[string]ProfileConfig{
+            "p1": {Groups: []string{"group1"}},
+        },
+        GroupSettings: map[string]GroupSettingsConfig{
+            "deleted-group": {Mode: GroupModeCluster},
+        },
+    }
+
+    err := cfg.ValidateGroups()
+    assert.Error(t, err)
+    assert.Contains(t, err.Error(), "group_settings 'deleted-group' does not match any group")
+}
+
+func TestValidateGroupsAcceptsClusterModeSettingForExistingGroup(t *testing.T) {
+    cfg := &Config{
+        Profiles: map[string]ProfileConfig{
+            "p1": {Groups: []string{"group1"}},
+        },
+        GroupSettings: map[string]GroupSettingsConfig{
+            "group1": {Mode: GroupModeCluster},
+        },
+    }
+
+    err := cfg.ValidateGroups()
+    assert.NoError(t, err)
+    assert.True(t, cfg.IsClusterGroup("group1"))
+}
diff --git a/internal/ui/components/app.go b/internal/ui/components/app.go
index b3fd281..2a55696 100644
--- a/internal/ui/components/app.go
+++ b/internal/ui/components/app.go
@@ -27,6 +27,8 @@ type App struct {
     groupManager *api.GroupClientManager
     isGroupMode  bool
     groupName    string
+    clusterClient *api.ClusterClient
+    isClusterMode bool
     config     config.Config
     configPath string
     vncService *vnc.Service
@@ -477,6 +479,16 @@ func (a *App) GroupManager() *api.GroupClientManager {
     return a.groupManager
 }
 
+// IsClusterMode returns whether the app is running in cluster (HA failover) mode.
+func (a *App) IsClusterMode() bool {
+    return a.isClusterMode
+}
+
+// ClusterClient returns the cluster client if in cluster mode, nil otherwise.
+func (a *App) ClusterClient() *api.ClusterClient {
+    return a.clusterClient
+}
+
 // Header returns the header component instance.
 func (a *App) Header() HeaderComponent {
     return a.header
diff --git a/internal/ui/components/app_lifecycle.go b/internal/ui/components/app_lifecycle.go
index da50047..cd9f28e 100644
--- a/internal/ui/components/app_lifecycle.go
+++ b/internal/ui/components/app_lifecycle.go
@@ -15,6 +15,10 @@ func (a *App) Run() error {
     defer func() {
         a.stopAutoRefresh()
+        // Stop cluster health checks on exit
+        if a.clusterClient != nil {
+            a.clusterClient.Close()
+        }
         a.cancel()
     }()
 
@@ -37,12 +41,15 @@ func (a *App) Run() error {
 
 // updateHeaderWithActiveProfile updates the header to show the current active profile or group.
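+// For example, cluster mode renders "Cluster: homelab (via pve1)" and aggregate
+// mode renders "Group: homelab" (group and profile names are illustrative).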
 func (a *App) updateHeaderWithActiveProfile() {
-    if a.isGroupMode {
-        // In group mode, show "Group: <name>"
+    if a.isClusterMode && a.clusterClient != nil {
+        // In cluster mode, show "Cluster: <group> (via <profile>)"
+        activeProfile := a.clusterClient.GetActiveProfileName()
+        a.header.ShowActiveProfile(fmt.Sprintf("Cluster: %s (via %s)", a.groupName, activeProfile))
+    } else if a.isGroupMode {
+        // In aggregate group mode, show "Group: <name>"
         a.header.ShowActiveProfile(fmt.Sprintf("Group: %s", a.groupName))
     } else {
         profileName := a.config.GetActiveProfile()
-
         if profileName == "" {
             a.header.ShowActiveProfile("")
         } else {
diff --git a/internal/ui/components/connection_profiles.go b/internal/ui/components/connection_profiles.go
index 81d7e97..ce24012 100644
--- a/internal/ui/components/connection_profiles.go
+++ b/internal/ui/components/connection_profiles.go
@@ -83,16 +83,18 @@ func (a *App) showConnectionProfilesDialog() {
     for _, groupName := range groupNames {
         profileNames := groups[groupName]
-        displayName := fmt.Sprintf("▸ %s (%d profiles)", groupName, len(profileNames))
-
-        // Check if currently connected to this group
-        if a.isGroupMode && a.groupName == groupName {
+        // Determine group mode indicator
+        var modeTag string
+        if a.config.IsClusterGroup(groupName) {
+            modeTag = " [accent]cluster[-]"
+        }
+
+        displayName := fmt.Sprintf("▸ %s (%d profiles)%s", groupName, len(profileNames), modeTag)
+        // Check if currently connected to this group (aggregate or cluster mode)
+        if (a.isGroupMode || a.isClusterMode) && a.groupName == groupName {
             displayName = "⚡ " + displayName
         }
-
         if groupName == a.config.DefaultProfile {
             displayName = displayName + " ⭐"
         }
@@ -574,6 +576,14 @@ func (a *App) showDeleteGroupDialog(groupName string) {
         hasChanges = true
     }
 
+    // Remove per-group settings to avoid stale group_settings entries.
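+    // (ValidateGroups rejects group_settings entries that reference a missing
+    // group, so a stale entry would make the saved config fail validation.)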
+    if a.config.GroupSettings != nil {
+        if _, exists := a.config.GroupSettings[groupName]; exists {
+            delete(a.config.GroupSettings, groupName)
+            hasChanges = true
+        }
+    }
+
     if hasChanges {
         // Save the config
 
@@ -979,6 +989,15 @@ func (a *App) showEditGroupDialog(groupName string) {
     form.SetBorderColor(theme.Colors.Border)
 
+    // Mode selector (aggregate vs cluster)
+    modeOptions := []string{config.GroupModeAggregate, config.GroupModeCluster}
+    currentMode := a.config.GetGroupMode(groupName)
+    currentModeIndex := 0
+    if currentMode == config.GroupModeCluster {
+        currentModeIndex = 1
+    }
+    form.AddDropDown("Mode", modeOptions, currentModeIndex, nil)
+
     // Collect and sort profile names
     profileNames := make([]string, 0)
 
@@ -1039,6 +1058,13 @@ func (a *App) showEditGroupDialog(groupName string) {
     hasChanges := false
 
+    // Persist mode selection
+    _, selectedMode := form.GetFormItemByLabel("Mode").(*tview.DropDown).GetCurrentOption()
+    if selectedMode != a.config.GetGroupMode(groupName) {
+        a.config.SetGroupMode(groupName, selectedMode)
+        hasChanges = true
+    }
+
     for name, checked := range selections {
         profile := a.config.Profiles[name]
diff --git a/internal/ui/components/connection_profiles_operations.go b/internal/ui/components/connection_profiles_operations.go
index 77a9c8b..d2fcbc4 100644
--- a/internal/ui/components/connection_profiles_operations.go
+++ b/internal/ui/components/connection_profiles_operations.go
@@ -14,6 +14,36 @@ import (
     "github.com/devnullvoid/pvetui/pkg/api"
 )
 
+func (a *App) deactivateGroupModes(uiLogger interface {
+    Debug(format string, args ...interface{})
+}) {
+    if a.isGroupMode {
+        uiLogger.Debug("Disabling group mode")
+        if a.groupManager != nil {
+            a.groupManager.Close()
+        }
+        a.groupManager = nil
+        a.isGroupMode = false
+    }
+
+    if a.isClusterMode {
+        uiLogger.Debug("Disabling cluster mode")
+        if a.clusterClient != nil {
+            a.clusterClient.Close()
+        }
+        a.clusterClient = nil
+        a.isClusterMode = false
+    }
+
+    if a.groupName != "" {
+        a.groupName = ""
+    }
+
+    if a.tasksList != nil {
+        a.tasksList.Clear()
+    }
+}
+
 // applyConnectionProfile applies the selected connection profile.
 func (a *App) applyConnectionProfile(profileName string) {
     // Show loading indicator
@@ -59,18 +89,8 @@ func (a *App) applyConnectionProfile(profileName string) {
         a.vncService.UpdateClient(client)
     }
 
-    // Clear group mode state
-    if a.isGroupMode {
-        uiLogger.Debug("Disabling group mode")
-        if a.groupManager != nil {
-            a.groupManager.Close()
-        }
-        a.groupManager = nil
-        a.isGroupMode = false
-        a.groupName = ""
-        // Clear tasks list to remove group tasks
-        a.tasksList.Clear()
-    }
+    // Leaving either group mode must tear down mode-specific background state.
+    a.deactivateGroupModes(uiLogger)
 
     a.QueueUpdateDraw(func() {
         // Update the header to show the new active profile
@@ -89,15 +109,22 @@ func (a *App) applyConnectionProfile(profileName string) {
     }()
 }
 
-// switchToGroup switches to a group cluster view.
+// switchToGroup switches to a group view (aggregate or cluster mode).
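+// Cluster-mode groups are routed to switchToClusterGroup below; everything
+// else follows the aggregate path that connects every profile in the group.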
 func (a *App) switchToGroup(groupName string) {
-    // Show loading indicator
+    // Check if this is a cluster (HA failover) group
+    if a.config.IsClusterGroup(groupName) {
+        a.switchToClusterGroup(groupName)
+        return
+    }
+
+    // Show loading indicator (aggregate mode)
     a.header.ShowLoading(fmt.Sprintf("Connecting to group '%s'...", groupName))
 
     // Run group initialization in goroutine to avoid blocking UI
     go func() {
         uiLogger := models.GetUILogger()
         uiLogger.Debug("Starting group switch to: %s", groupName)
+        a.deactivateGroupModes(uiLogger)
 
         // Get profile names for this group
         profileNames := a.config.GetProfileNamesInGroup(groupName)
@@ -267,6 +294,138 @@ func (a *App) switchToGroup(groupName string) {
     }()
 }
 
+// switchToClusterGroup switches to a cluster (HA failover) group.
+// Unlike aggregate mode, which connects to ALL profiles, cluster mode connects
+// to ONE profile at a time and fails over to the next candidate if the active
+// node becomes unreachable. The app behaves as a normal single-profile connection.
+func (a *App) switchToClusterGroup(groupName string) {
+    // Show loading indicator
+    a.header.ShowLoading(fmt.Sprintf("Connecting to cluster '%s'...", groupName))
+
+    // Run cluster initialization in goroutine to avoid blocking UI
+    go func() {
+        uiLogger := models.GetUILogger()
+        uiLogger.Debug("Starting cluster group switch to: %s", groupName)
+        a.deactivateGroupModes(uiLogger)
+
+        // Get profile names for this group
+        profileNames := a.config.GetProfileNamesInGroup(groupName)
+        if len(profileNames) == 0 {
+            uiLogger.Error("No profiles found for cluster group %s", groupName)
+            a.QueueUpdateDraw(func() {
+                a.header.ShowError(fmt.Sprintf("No profiles found for cluster group '%s'", groupName))
+            })
+            return
+        }
+
+        uiLogger.Debug("Found %d profiles in cluster group %s: %v", len(profileNames), groupName, profileNames)
+
+        // Create cluster client
+        cc := api.NewClusterClient(
+            groupName,
+            models.GetUILogger(),
+            a.client.GetCache(), // Use existing cache
+        )
+
+        // Build profile entries
+        var profiles []api.ProfileEntry
+        for _, name := range profileNames {
+            profile, exists := a.config.Profiles[name]
+            if !exists {
+                uiLogger.Debug("Profile %s not found in config, skipping", name)
+                continue
+            }
+
+            // Create a config object from the profile for the adapter
+            profileConfig := &config.Config{
+                Addr:        profile.Addr,
+                User:        profile.User,
+                Password:    profile.Password,
+                TokenID:     profile.TokenID,
+                TokenSecret: profile.TokenSecret,
+                Realm:       profile.Realm,
+                ApiPath:     profile.ApiPath,
+                Insecure:    profile.Insecure,
+                SSHUser:     profile.SSHUser,
+                VMSSHUser:   profile.VMSSHUser,
+                CacheDir:    a.config.CacheDir,
+                Debug:       a.config.Debug,
+            }
+
+            profiles = append(profiles, api.ProfileEntry{
+                Name:   name,
+                Config: adapters.NewConfigAdapter(profileConfig),
+            })
+        }
+
+        if len(profiles) == 0 {
+            uiLogger.Error("No valid profiles to initialize for cluster group %s", groupName)
+            a.QueueUpdateDraw(func() {
+                a.header.ShowError("No valid profiles found")
+            })
+            return
+        }
+
+        // Initialize cluster client (connects to first available candidate)
+        ctx := context.Background()
+        uiLogger.Debug("Initializing cluster client with %d candidates", len(profiles))
+
+        if err := cc.Initialize(ctx, profiles); err != nil {
+            uiLogger.Error("Failed to initialize cluster group %s: %v", groupName, err)
+            a.QueueUpdateDraw(func() {
+                a.header.ShowError(fmt.Sprintf("Failed to connect to any candidate: %v", err))
+            })
+            return
+        }
+
+        uiLogger.Debug("Cluster group initialized, active profile: %s",
+            cc.GetActiveProfileName())
+
+        // Register failover callback — updates the app when failover occurs
+        cc.SetOnFailover(func(oldProfile, newProfile string) {
+            a.QueueUpdateDraw(func() {
+                if !a.isClusterMode || a.clusterClient != cc {
+                    uiLogger.Debug("[CLUSTER] Ignoring stale failover callback for inactive cluster client (%s -> %s)", oldProfile, newProfile)
+                    return
+                }
+                uiLogger.Info("[CLUSTER] Failover callback: %s -> %s", oldProfile, newProfile)
+                a.client = cc.GetActiveClient()
+                if a.client == nil {
+                    uiLogger.Error("[CLUSTER] Failover callback has nil active client for %s", newProfile)
+                    return
+                }
+                if a.vncService != nil {
+                    a.vncService.UpdateClient(a.client)
+                }
+                a.updateHeaderWithActiveProfile()
+                a.header.ShowWarning(fmt.Sprintf("Failover: %s \u2192 %s", oldProfile, newProfile))
+                go a.manualRefresh()
+            })
+        })
+
+        // Start health checks
+        cc.StartHealthCheck()
+
+        // Update app state on UI thread
+        a.QueueUpdateDraw(func() {
+            // Set cluster mode state
+            a.clusterClient = cc
+            a.isClusterMode = true
+            a.groupName = groupName
+            a.client = cc.GetActiveClient()
+            if a.vncService != nil {
+                a.vncService.UpdateClient(a.client)
+            }
+
+            // Update header to show cluster mode
+            a.updateHeaderWithActiveProfile()
+            a.header.ShowSuccess(fmt.Sprintf("Connected to cluster '%s' via %s", groupName, cc.GetActiveProfileName()))
+        })
+
+        // Trigger refresh to load data through normal single-profile flow
+        a.manualRefresh()
+    }()
+}
+
 // showDeleteProfileDialog displays a confirmation dialog for deleting a profile.
 func (a *App) showDeleteProfileDialog(profileName string) {
     // Store last focused primitive
diff --git a/internal/ui/components/tasks_list.go b/internal/ui/components/tasks_list.go
index e9b79f6..6150831 100644
--- a/internal/ui/components/tasks_list.go
+++ b/internal/ui/components/tasks_list.go
@@ -173,7 +173,6 @@ func (tl *TasksList) Refresh() {
         tc := tview.NewTableCell(column.title).
             SetTextColor(theme.Colors.HeaderText).
             SetAlign(tview.AlignLeft).
-            SetSelectable(false).
             SetExpansion(column.expansion)
         tl.activeTable.SetCell(0, i, tc)
     }
@@ -230,7 +229,6 @@ func (tl *TasksList) Refresh() {
         tl.activeTable.SetCell(1, 0, tview.NewTableCell("No active operations").
             SetTextColor(theme.Colors.Secondary).
             SetAlign(tview.AlignCenter).
-            SetSelectable(false).
             SetExpansion(1))
     }
 
@@ -257,7 +255,6 @@ func (tl *TasksList) updateHistoryTable() {
         tc := tview.NewTableCell(column.title).
             SetTextColor(theme.Colors.HeaderText).
             SetAlign(tview.AlignLeft).
-            SetSelectable(false).
             SetExpansion(column.expansion)
         tl.historyTable.SetCell(0, i, tc)
     }
diff --git a/pkg/api/cluster_client.go b/pkg/api/cluster_client.go
new file mode 100644
index 0000000..37f811c
--- /dev/null
+++ b/pkg/api/cluster_client.go
@@ -0,0 +1,321 @@
+// Package api provides Proxmox API client functionality.
+package api
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "time"
+
+    "github.com/devnullvoid/pvetui/pkg/api/interfaces"
+)
+
+// Default health check configuration for cluster mode.
+const (
+    // DefaultHealthCheckInterval is the default interval between health check pings.
+    DefaultHealthCheckInterval = 30 * time.Second
+
+    // DefaultHealthCheckTimeout is the timeout for a single health check request.
+    DefaultHealthCheckTimeout = 10 * time.Second
+
+    // healthCheckPath is the lightweight Proxmox API endpoint used for health checks.
+    healthCheckPath = "/version"
+)
+
+// ClusterClient manages a group of Proxmox nodes from the same cluster,
+// connecting to one at a time with automatic failover. Unlike GroupClientManager,
+// which connects to ALL profiles simultaneously (aggregate mode), ClusterClient
+// maintains a single active connection and fails over to the next candidate
+// when the active node becomes unreachable.
+type ClusterClient struct {
+    groupName     string
+    candidates    []ProfileEntry // ordered list of failover candidates
+    activeClient  *Client
+    activeProfile string
+    activeIndex   int
+    logger        interfaces.Logger
+    cache         interfaces.Cache
+
+    // Health check state
+    healthTicker   *time.Ticker
+    healthStop     chan struct{}
+    healthInterval time.Duration
+
+    // Failover callback — called after a successful failover.
+    // Parameters: oldProfile, newProfile.
+    onFailover func(oldProfile, newProfile string)
+
+    mu sync.RWMutex
+}
+
+// NewClusterClient creates a new cluster client manager for HA failover mode.
+// The candidates list defines the failover order: first entry is preferred,
+// subsequent entries are tried in order when the active node is unreachable.
+func NewClusterClient(
+    groupName string,
+    logger interfaces.Logger,
+    cache interfaces.Cache,
+) *ClusterClient {
+    return &ClusterClient{
+        groupName:      groupName,
+        logger:         logger,
+        cache:          cache,
+        activeIndex:    -1,
+        healthInterval: DefaultHealthCheckInterval,
+    }
+}
+
+// Initialize connects to the first available candidate in order.
+// It tries each candidate sequentially until one succeeds.
+// Returns an error only if ALL candidates fail to connect.
+func (cc *ClusterClient) Initialize(ctx context.Context, profiles []ProfileEntry) error {
+    cc.mu.Lock()
+    defer cc.mu.Unlock()
+
+    if len(profiles) == 0 {
+        return fmt.Errorf("no profiles provided for cluster group '%s'", cc.groupName)
+    }
+
+    cc.candidates = profiles
+    cc.activeClient = nil
+    cc.activeProfile = ""
+    cc.activeIndex = -1
+
+    // Try each candidate in order
+    var lastErr error
+    for i, entry := range profiles {
+        cc.logger.Info("[CLUSTER] Trying candidate %d/%d: %s", i+1, len(profiles), entry.Name)
+
+        client, err := NewClient(entry.Config,
+            WithLogger(cc.logger),
+            WithCache(cc.cache),
+        )
+        if err != nil {
+            cc.logger.Error("[CLUSTER] Failed to connect to %s: %v", entry.Name, err)
+            lastErr = fmt.Errorf("profile %s: %w", entry.Name, err)
+            continue
+        }
+
+        cc.activeClient = client
+        cc.activeProfile = entry.Name
+        cc.activeIndex = i
+        cc.logger.Info("[CLUSTER] Connected to %s (candidate %d/%d)",
+            entry.Name, i+1, len(profiles))
+        return nil
+    }
+
+    return fmt.Errorf("failed to connect to any candidate in cluster group '%s': %w",
+        cc.groupName, lastErr)
+}
+
+// GetActiveClient returns the currently active API client.
+// This is the primary method used by the app — it returns a single *Client
+// that can be used identically to a regular single-profile connection.
+func (cc *ClusterClient) GetActiveClient() *Client {
+    cc.mu.RLock()
+    defer cc.mu.RUnlock()
+    return cc.activeClient
+}
+
+// GetActiveProfileName returns the name of the currently active profile.
+func (cc *ClusterClient) GetActiveProfileName() string {
+    cc.mu.RLock()
+    defer cc.mu.RUnlock()
+    return cc.activeProfile
+}
+
+// GetGroupName returns the cluster group name.
+func (cc *ClusterClient) GetGroupName() string {
+    return cc.groupName
+}
+
+// SetOnFailover registers a callback that fires after a successful failover.
+// The callback receives the old and new profile names.
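+// The callback runs on its own goroutine (see failoverLocked). A minimal
+// sketch that just logs the transition:
+//
+//	cc.SetOnFailover(func(oldProfile, newProfile string) {
+//		log.Printf("failover: %s -> %s", oldProfile, newProfile)
+//	})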
+func (cc *ClusterClient) SetOnFailover(callback func(oldProfile, newProfile string)) {
+    cc.mu.Lock()
+    defer cc.mu.Unlock()
+    cc.onFailover = callback
+}
+
+// SetHealthCheckInterval configures the interval between health check pings.
+// Must be called before StartHealthCheck.
+func (cc *ClusterClient) SetHealthCheckInterval(interval time.Duration) {
+    cc.mu.Lock()
+    defer cc.mu.Unlock()
+    cc.healthInterval = interval
+}
+
+// Failover attempts to connect to the next available candidate.
+// It tries candidates in round-robin order starting from the one after
+// the currently active profile. Returns an error if no candidate is reachable.
+func (cc *ClusterClient) Failover(ctx context.Context) error {
+    cc.mu.Lock()
+    defer cc.mu.Unlock()
+
+    return cc.failoverLocked(ctx)
+}
+
+// failoverLocked performs the failover while holding the write lock.
+func (cc *ClusterClient) failoverLocked(ctx context.Context) error {
+    if len(cc.candidates) == 0 {
+        return fmt.Errorf("no candidates available for failover in cluster group '%s'", cc.groupName)
+    }
+
+    oldProfile := cc.activeProfile
+    numCandidates := len(cc.candidates)
+
+    cc.logger.Info("[CLUSTER] Initiating failover from %s", oldProfile)
+
+    // Try each candidate starting from the next one in order
+    for attempt := 1; attempt <= numCandidates; attempt++ {
+        idx := (cc.activeIndex + attempt) % numCandidates
+        entry := cc.candidates[idx]
+
+        cc.logger.Info("[CLUSTER] Failover attempt %d/%d: trying %s",
+            attempt, numCandidates, entry.Name)
+
+        client, err := NewClient(entry.Config,
+            WithLogger(cc.logger),
+            WithCache(cc.cache),
+        )
+        if err != nil {
+            cc.logger.Error("[CLUSTER] Failover to %s failed: %v", entry.Name, err)
+            continue
+        }
+
+        cc.activeClient = client
+        cc.activeProfile = entry.Name
+        cc.activeIndex = idx
+        cc.logger.Info("[CLUSTER] Failover successful: %s -> %s", oldProfile, entry.Name)
+
+        // Notify callback outside the lock to avoid deadlocks
+        if cc.onFailover != nil {
+            callback := cc.onFailover
+            newProfile := entry.Name
+            go callback(oldProfile, newProfile)
+        }
+
+        return nil
+    }
+
+    return fmt.Errorf("failover exhausted: no reachable candidate in cluster group '%s'", cc.groupName)
+}
+
+// StartHealthCheck begins periodic health checking of the active node.
+// If the active node becomes unreachable, it automatically triggers failover.
+// The health check pings the /api2/json/version endpoint, which is lightweight.
+func (cc *ClusterClient) StartHealthCheck() {
+    cc.mu.Lock()
+    defer cc.mu.Unlock()
+
+    // Stop any existing health check
+    cc.stopHealthCheckLocked()
+
+    cc.healthStop = make(chan struct{})
+    cc.healthTicker = time.NewTicker(cc.healthInterval)
+
+    go cc.healthCheckLoop()
+
+    cc.logger.Info("[CLUSTER] Health check started (interval: %s)", cc.healthInterval)
+}
+
+// StopHealthCheck stops the periodic health checking.
+func (cc *ClusterClient) StopHealthCheck() {
+    cc.mu.Lock()
+    defer cc.mu.Unlock()
+    cc.stopHealthCheckLocked()
+}
+
+// stopHealthCheckLocked stops the health check while holding the lock.
+func (cc *ClusterClient) stopHealthCheckLocked() {
+    if cc.healthStop != nil {
+        close(cc.healthStop)
+        cc.healthStop = nil
+    }
+    if cc.healthTicker != nil {
+        cc.healthTicker.Stop()
+        cc.healthTicker = nil
+    }
+}
+
+// healthCheckLoop runs the periodic health check in a goroutine.
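+// The ticker and stop channel are snapshotted once up front, so a concurrent
+// StopHealthCheck (which nils both fields) cannot race the loop's channel reads.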
+func (cc *ClusterClient) healthCheckLoop() {
+    cc.mu.RLock()
+    ticker := cc.healthTicker
+    stop := cc.healthStop
+    cc.mu.RUnlock()
+
+    if ticker == nil || stop == nil {
+        return
+    }
+
+    for {
+        select {
+        case <-stop:
+            return
+        case <-ticker.C:
+            cc.performHealthCheck()
+        }
+    }
+}
+
+// performHealthCheck pings the active node and triggers failover if unreachable.
+func (cc *ClusterClient) performHealthCheck() {
+    cc.mu.RLock()
+    client := cc.activeClient
+    profile := cc.activeProfile
+    cc.mu.RUnlock()
+
+    if client == nil {
+        return
+    }
+
+    // Use a short timeout for health checks
+    ctx, cancel := context.WithTimeout(context.Background(), DefaultHealthCheckTimeout)
+    defer cancel()
+
+    // Ping the version endpoint — lightweight, no auth issues
+    var result map[string]interface{}
+    err := client.httpClient.GetWithRetry(ctx, healthCheckPath, &result, 1)
+    if err != nil {
+        cc.logger.Error("[CLUSTER] Health check failed for %s: %v", profile, err)
+        cc.logger.Info("[CLUSTER] Triggering automatic failover")
+
+        // Trigger failover
+        cc.mu.Lock()
+        failoverErr := cc.failoverLocked(ctx)
+        cc.mu.Unlock()
+
+        if failoverErr != nil {
+            cc.logger.Error("[CLUSTER] Automatic failover failed: %v", failoverErr)
+        }
+        return
+    }
+
+    cc.logger.Debug("[CLUSTER] Health check OK for %s", profile)
+}
+
+// Close stops health checks and cleans up resources.
+func (cc *ClusterClient) Close() {
+    cc.StopHealthCheck()
+
+    cc.mu.Lock()
+    defer cc.mu.Unlock()
+
+    cc.activeClient = nil
+    cc.activeProfile = ""
+    cc.activeIndex = -1
+    cc.logger.Info("[CLUSTER] Cluster client closed for group '%s'", cc.groupName)
+}
+
+// GetCandidateNames returns the names of all failover candidates in order.
+func (cc *ClusterClient) GetCandidateNames() []string {
+    cc.mu.RLock()
+    defer cc.mu.RUnlock()
+
+    names := make([]string, len(cc.candidates))
+    for i, c := range cc.candidates {
+        names[i] = c.Name
+    }
+    return names
+}
\ No newline at end of file
diff --git a/pkg/api/grouped-cluster.go b/pkg/api/grouped-cluster.go
index f933729..11a6d53 100644
--- a/pkg/api/grouped-cluster.go
+++ b/pkg/api/grouped-cluster.go
@@ -5,8 +5,80 @@ import (
     "context"
     "fmt"
     "sort"
+    "strings"
 )
 
+func deduplicateGroupNodes(nodes []*Node) []*Node {
+    seen := make(map[string]*Node, len(nodes))
+    order := make([]string, 0, len(nodes))
+
+    for _, node := range nodes {
+        if node == nil {
+            continue
+        }
+
+        // Keep placeholders unique per source profile.
+        if strings.HasPrefix(node.ID, "offline-") {
+            key := "placeholder|" + node.ID
+            if _, exists := seen[key]; !exists {
+                seen[key] = node
+                order = append(order, key)
+            }
+            continue
+        }
+
+        key := fmt.Sprintf("node|%s|%s", node.Name, node.IP)
+        if key == "node||" {
+            key = fmt.Sprintf("node|%s|%s", node.ID, node.SourceProfile)
+        }
+
+        existing, exists := seen[key]
+        if !exists {
+            seen[key] = node
+            order = append(order, key)
+            continue
+        }
+
+        // Prefer richer online data when de-duplicating.
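+        // (For example, one profile may report a node it can no longer reach
+        // while another profile still sees it online; the online copy wins.)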
+        if !existing.Online && node.Online {
+            seen[key] = node
+        }
+    }
+
+    out := make([]*Node, 0, len(order))
+    for _, key := range order {
+        out = append(out, seen[key])
+    }
+    return out
+}
+
+func deduplicateGroupVMs(vms []*VM) []*VM {
+    seen := make(map[string]*VM, len(vms))
+    order := make([]string, 0, len(vms))
+
+    for _, vm := range vms {
+        if vm == nil {
+            continue
+        }
+
+        key := fmt.Sprintf("vm|%s|%d|%s", vm.Type, vm.ID, vm.Node)
+        if key == "vm||0|" {
+            key = fmt.Sprintf("vm|fallback|%s|%s|%s", vm.Name, vm.Node, vm.SourceProfile)
+        }
+
+        if _, exists := seen[key]; !exists {
+            seen[key] = vm
+            order = append(order, key)
+        }
+    }
+
+    out := make([]*VM, 0, len(order))
+    for _, key := range order {
+        out = append(out, seen[key])
+    }
+    return out
+}
+
 // GetGroupNodes retrieves nodes from all connected profiles in the group.
 // Each node's SourceProfile field is set to identify which profile it came from.
 // Returns a combined list of all nodes across all profiles.
@@ -35,6 +107,7 @@ func (m *GroupClientManager) GetGroupNodes(ctx context.Context) ([]*Node, error)
             allNodes = append(allNodes, nodes...)
         }
     }
+    allNodes = deduplicateGroupNodes(allNodes)
 
     // Sort nodes by name for consistent ordering
     sort.Slice(allNodes, func(i, j int) bool {
@@ -83,12 +156,16 @@ func (m *GroupClientManager) GetGroupNodes(ctx context.Context) ([]*Node, error)
         if !representedProfiles[pc.ProfileName] {
             status, lastErr := pc.GetStatus()
 
+            // Only add placeholders for disconnected/error profiles.
+            // In aggregate mode, de-duplication can intentionally collapse duplicate
+            // nodes from multiple connected profiles in the same cluster.
+            if status == ProfileStatusConnected {
+                continue
+            }
+
             // Determine reason for missing data
             var errorMsg string
-            if status == ProfileStatusConnected {
-                // Connected but returned no nodes?
-                errorMsg = "No Data"
-            } else if lastErr != nil {
+            if lastErr != nil {
                 // Use a short error message
                 errorMsg = "Connection Failed"
             } else {
@@ -157,6 +234,7 @@ func (m *GroupClientManager) GetGroupVMs(ctx context.Context) ([]*VM, error) {
             allVMs = append(allVMs, vms...)
         }
     }
+    allVMs = deduplicateGroupVMs(allVMs)
 
     // Sort VMs by profile name, then by node, then by ID
     sort.Slice(allVMs, func(i, j int) bool {
diff --git a/pkg/api/grouped_cluster_test.go b/pkg/api/grouped_cluster_test.go
index 00ab12b..63c1366 100644
--- a/pkg/api/grouped_cluster_test.go
+++ b/pkg/api/grouped_cluster_test.go
@@ -14,12 +14,78 @@ import (
     "github.com/devnullvoid/pvetui/pkg/api/testutils"
 )
 
+const (
+    testAPIClusterResourcesPath = "/api2/json/cluster/resources"
+    testAPIClusterStatusPath    = "/api2/json/cluster/status"
+    testAPINodesPath            = "/api2/json/nodes"
+)
+
+func newSingleNodeClusterServer(ticket, csrfToken, nodeName string, cpu float64) *httptest.Server {
+    return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        if r.URL.Path == testAPITicketPath {
+            w.Header().Set("Content-Type", "application/json")
+            _ = json.NewEncoder(w).Encode(map[string]interface{}{
+                "data": map[string]interface{}{
+                    "ticket":              ticket,
+                    "CSRFPreventionToken": csrfToken,
+                },
+            })
+            return
+        }
+        if r.URL.Path == testAPIClusterResourcesPath {
+            w.Header().Set("Content-Type", "application/json")
+            _ = json.NewEncoder(w).Encode(map[string]interface{}{
+                "data": []interface{}{
+                    map[string]interface{}{
+                        "type":    "node",
+                        "node":    nodeName,
+                        "cpu":     cpu,
+                        "maxcpu":  8,
+                        "mem":     1024.0,
+                        "maxmem":  2048.0,
+                        "disk":    1024.0,
+                        "maxdisk": 2048.0,
+                        "uptime":  100,
+                    },
+                },
+            })
+            return
+        }
+        if r.URL.Path == testAPIClusterStatusPath {
+            w.Header().Set("Content-Type", "application/json")
+            _ = json.NewEncoder(w).Encode(map[string]interface{}{
+                "data": []interface{}{
+                    map[string]interface{}{
+                        "id":     "node/" + nodeName,
+                        "name":   nodeName,
+                        "type":   "node",
+                        "online": 1,
+                    },
+                },
+            })
+            return
+        }
+        if r.URL.Path == testAPINodesPath {
+            w.Header().Set("Content-Type", "application/json")
+            _ = json.NewEncoder(w).Encode(map[string]interface{}{
+                "data": []interface{}{
+                    map[string]interface{}{
+                        "node":   nodeName,
+                        "status": "online",
+                    },
+                },
+            })
+            return
+        }
+    }))
+}
+
 func TestGroupClientManager_GetGroupNodes_WithOfflineProfile(t *testing.T) {
     // 1. Setup mock servers
     // Server 1: Online
     server1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-        if r.URL.Path == "/api2/json/access/ticket" {
+        if r.URL.Path == testAPITicketPath {
             // Auth
             w.Header().Set("Content-Type", "application/json")
             _ = json.NewEncoder(w).Encode(map[string]interface{}{
@@ -30,7 +96,7 @@ func TestGroupClientManager_GetGroupNodes_WithOfflineProfile(t *testing.T) {
             })
             return
         }
-        if r.URL.Path == "/api2/json/cluster/resources" {
+        if r.URL.Path == testAPIClusterResourcesPath {
             w.Header().Set("Content-Type", "application/json")
             _ = json.NewEncoder(w).Encode(map[string]interface{}{
                 "data": []interface{}{
@@ -49,7 +115,7 @@ func TestGroupClientManager_GetGroupNodes_WithOfflineProfile(t *testing.T) {
             })
             return
         }
-        if r.URL.Path == "/api2/json/cluster/status" {
+        if r.URL.Path == testAPIClusterStatusPath {
             w.Header().Set("Content-Type", "application/json")
             _ = json.NewEncoder(w).Encode(map[string]interface{}{
                 "data": []interface{}{
@@ -63,7 +129,7 @@ func TestGroupClientManager_GetGroupNodes_WithOfflineProfile(t *testing.T) {
             })
             return
         }
-        if r.URL.Path == "/api2/json/nodes" {
+        if r.URL.Path == testAPINodesPath {
             w.Header().Set("Content-Type", "application/json")
             _ = json.NewEncoder(w).Encode(map[string]interface{}{
                 "data": []interface{}{
@@ -156,3 +222,68 @@ func (m *MockCache) Get(key string, dest interface{}) (bool, error)
 func (m *MockCache) Set(key string, value interface{}, ttl time.Duration) error { return nil }
 func (m *MockCache) Delete(key string) error { return nil }
 func (m *MockCache) Clear() error { return nil }
+
+func TestDeduplicateGroupNodes(t *testing.T) {
+    nodes := []*Node{
+        {Name: "pve1", IP: "10.0.0.1", SourceProfile: "a", Online: true},
+        {Name: "pve1", IP: "10.0.0.1", SourceProfile: "b", Online: true},
+        {Name: "pve2", IP: "10.0.0.2", SourceProfile: "a", Online: true},
+        {ID: "offline-x", Name: "x", SourceProfile: "x", Online: false},
+        {ID: "offline-y", Name: "y", SourceProfile: "y", Online: false},
+    }
+
+    got := deduplicateGroupNodes(nodes)
+    require.Len(t, got, 4)
+}
+
+func TestDeduplicateGroupVMs(t *testing.T) {
+    vms := []*VM{
+        {ID: 100, Type: VMTypeQemu, Node: "pve1", Name: "vm100", SourceProfile: "a"},
+        {ID: 100, Type: VMTypeQemu, Node: "pve1", Name: "vm100", SourceProfile: "b"},
+        {ID: 101, Type: VMTypeQemu, Node: "pve1", Name: "vm101", SourceProfile: "a"},
+    }
+
+    got := deduplicateGroupVMs(vms)
+    require.Len(t, got, 2)
+}
+
+func TestGroupClientManager_GetGroupNodes_DedupedProfilesDoNotCreateFakePlaceholders(t *testing.T) {
+    server1 := newSingleNodeClusterServer("ticket1", "csrf1", "pve-main", 0.1)
+    defer server1.Close()
+
+    server2 := newSingleNodeClusterServer("ticket2", "csrf2", "pve-main", 0.2)
+    defer server2.Close()
+
+    logger := testutils.NewTestLogger()
+    manager := NewGroupClientManager("all", logger, &MockCache{})
+    profiles := []ProfileEntry{
+        {
+            Name: "default",
+            Config: &MockConfig{
+                Addr:     server1.URL,
+                User:     "user",
+                Password: "password",
+            },
+        },
+        {
+            Name: "backup",
+            Config: &MockConfig{
+                Addr:     server2.URL,
+                User:     "user",
+                Password: "password",
+            },
+        },
+    }
+
+    err := manager.Initialize(context.Background(), profiles)
+    require.NoError(t, err)
+
+    nodes, err := manager.GetGroupNodes(context.Background())
+    require.NoError(t, err)
+
+    // One real deduplicated node; no fake "default"/"backup" placeholder nodes.
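+    // (Without the connected-profile check in GetGroupNodes, the collapsed
+    // duplicate would previously have surfaced as a "No Data" placeholder.)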
+    require.Len(t, nodes, 1)
+    assert.Equal(t, "pve-main", nodes[0].Name)
+    assert.NotEqual(t, "default", nodes[0].Name)
+    assert.NotEqual(t, "backup", nodes[0].Name)
+}