diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go
index d99f5c6..c605c59 100644
--- a/cmd/daemon/main.go
+++ b/cmd/daemon/main.go
@@ -6,6 +6,7 @@ import (
 	"flag"
 	"fmt"
 	"os"
+	"time"
 
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -14,6 +15,7 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
 
 	bootcv1alpha1 "github.com/jlebon/bootc-operator/api/v1alpha1"
@@ -32,6 +34,9 @@ func init() {
 }
 
 func main() {
+	var pollInterval time.Duration
+	flag.DurationVar(&pollInterval, "poll-interval", 5*time.Minute, "Interval for polling bootc status as a fallback to fsnotify")
+
 	opts := zap.Options{
 		Development: true,
 	}
@@ -62,17 +67,32 @@ func main() {
 		os.Exit(1)
 	}
 
+	statusChanged := make(chan event.GenericEvent, 1)
+
 	if err := (&daemon.BootcNodeReconciler{
-		Client:   mgr.GetClient(),
-		Scheme:   mgr.GetScheme(),
-		NodeName: nodeName,
-		Executor: bootc.NewHostExecutor(),
+		Client:        mgr.GetClient(),
+		Scheme:        mgr.GetScheme(),
+		NodeName:      nodeName,
+		Executor:      bootc.NewHostExecutor(),
+		StatusChanged: statusChanged,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "Failed to create controller", "controller", "bootcnode")
 		os.Exit(1)
 	}
 
-	setupLog.Info("Starting daemon", "node", nodeName)
+	watcher := &daemon.StatusWatcher{
+		PollInterval: pollInterval,
+		PrimaryPath:  daemon.DefaultPrimaryPath,
+		FallbackPath: daemon.DefaultFallbackPath,
+		Events:       statusChanged,
+		NodeName:     nodeName,
+	}
+	if err := mgr.Add(watcher); err != nil {
+		setupLog.Error(err, "Failed to add status watcher")
+		os.Exit(1)
+	}
+
+	setupLog.Info("Starting daemon", "node", nodeName, "pollInterval", pollInterval)
 	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
 		setupLog.Error(err, "Failed to run daemon")
 		os.Exit(1)
diff --git a/internal/daemon/reconciler.go b/internal/daemon/reconciler.go
index cecd60b..896c009 100644
--- a/internal/daemon/reconciler.go
+++ b/internal/daemon/reconciler.go
@@ -58,17 +58,25 @@ type BootcNodeReconciler struct {
 	stageDone chan event.GenericEvent
 	// rebootIssued tracks whether a reboot has been issued so classifyAction
 	// can distinguish the Staged→Rebooting.
 	rebootIssued bool
+	// StatusChanged, when non-nil, is registered by SetupWithManager as an
+	// additional raw channel source next to stageDone.
+	StatusChanged chan event.GenericEvent
 }
 
 func (r *BootcNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	r.stageDone = make(chan event.GenericEvent, 1)
 
-	return ctrl.NewControllerManagedBy(mgr).
+	builder := ctrl.NewControllerManagedBy(mgr).
 		For(&bootcv1alpha1.BootcNode{}).
 		WatchesRawSource(source.Channel(r.stageDone, &handler.EnqueueRequestForObject{})).
-		Named("bootcnode").
-		Complete(r)
+		Named("bootcnode")
+
+	if r.StatusChanged != nil {
+		builder = builder.WatchesRawSource(source.Channel(r.StatusChanged, &handler.EnqueueRequestForObject{}))
+	}
+
+	return builder.Complete(r)
 }
 
 func (r *BootcNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
diff --git a/internal/daemon/watcher.go b/internal/daemon/watcher.go
new file mode 100644
index 0000000..f0622a0
--- /dev/null
+++ b/internal/daemon/watcher.go
@@ -0,0 +1,182 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package daemon
+
+import (
+	"context"
+	"os"
+	"time"
+
+	"github.com/fsnotify/fsnotify"
+	"github.com/go-logr/logr"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	logf "sigs.k8s.io/controller-runtime/pkg/log"
+
+	bootcv1alpha1 "github.com/jlebon/bootc-operator/api/v1alpha1"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+const (
+	// DefaultPrimaryPath is the directory watched for the ostree backend.
+	DefaultPrimaryPath = "/proc/1/root/ostree/bootc"
+	// DefaultFallbackPath is the directory watched for the composefs backend.
+	DefaultFallbackPath = "/proc/1/root/sysroot/state/deploy"
+)
+
+// StatusWatcher sends a GenericEvent for the node's BootcNode whenever the
+// watched directory reports an attribute change, and on every PollInterval
+// tick whether or not fsnotify is available.
+type StatusWatcher struct {
+	// PollInterval is the polling period. Values <= 0 are replaced with five
+	// minutes when Start runs.
+	PollInterval time.Duration
+
+	// PrimaryPath is watched when it exists; otherwise FallbackPath is tried.
+	// When neither exists the watcher only polls.
+	PrimaryPath  string
+	FallbackPath string
+
+	// Events receives the generated events. Sends never block: an event is
+	// dropped when the channel is full.
+	Events chan event.GenericEvent
+
+	// NodeName is the name given to the BootcNode carried by each event.
+	NodeName string
+
+	// Ready, when non-nil, is closed by Start right before it enters its loop.
+	Ready chan struct{}
+}
+
+// Start runs the watch loop until ctx is cancelled and always returns nil.
+//
+// It sends an event on every PollInterval tick and, when PrimaryPath or
+// FallbackPath can be watched with fsnotify, on every Chmod event reported
+// for that path. If fsnotify reports an error or closes its channels, the
+// watcher is closed and the loop keeps running on the ticker alone.
+func (w *StatusWatcher) Start(ctx context.Context) error {
+	log := logf.FromContext(ctx).WithName("status-watcher")
+
+	watchPath := w.resolveWatchPath()
+
+	fsWatcher := w.setupFsnotify(log, watchPath)
+
+	closeFsWatcher := func() {
+		if fsWatcher != nil {
+			_ = fsWatcher.Close()
+			fsWatcher = nil
+		}
+	}
+	defer closeFsWatcher()
+
+	if w.PollInterval <= 0 {
+		w.PollInterval = 5 * time.Minute
+	}
+
+	ticker := time.NewTicker(w.PollInterval)
+	defer ticker.Stop()
+
+	// A nil channel blocks forever in select, so leaving these nil disables
+	// the fsnotify cases when no watcher is available.
+	var evCh <-chan fsnotify.Event
+	var errCh <-chan error
+	if fsWatcher != nil {
+		evCh = fsWatcher.Events
+		errCh = fsWatcher.Errors
+	}
+
+	// degrade closes the fsnotify watcher and disables its select cases so
+	// the loop continues with polling only.
+	degrade := func() {
+		closeFsWatcher()
+		evCh = nil
+		errCh = nil
+	}
+
+	if w.Ready != nil {
+		close(w.Ready)
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case ev, ok := <-evCh:
+			if !ok {
+				// A closed channel is always ready to receive and yields zero
+				// values; without this check the loop would spin on it.
+				log.Info("fsnotify events channel closed, degrading to polling only")
+				degrade()
+				continue
+			}
+			// bootc updates modify directory mtime, which inotify reports as IN_ATTRIB (Chmod).
+			if ev.Has(fsnotify.Chmod) {
+				log.V(1).Info("Detected bootc status change via fsnotify")
+				w.sendEvent()
+			}
+		case err, ok := <-errCh:
+			if !ok {
+				log.Info("fsnotify errors channel closed, degrading to polling only")
+				degrade()
+				continue
+			}
+			// Tear down fsnotify so the loop continues with polling only.
+			// A broken inotify fd never delivers events again, so without this
+			// the watcher silently stops reacting to filesystem changes.
+			log.Error(err, "fsnotify error, degrading to polling only")
+			degrade()
+		case <-ticker.C:
+			log.V(1).Info("Polling bootc status")
+			w.sendEvent()
+		}
+	}
+}
+
+// setupFsnotify creates an fsnotify watcher on watchPath. It returns nil, after
+// logging the reason, when watchPath is empty or the watcher cannot be set up.
+func (w *StatusWatcher) setupFsnotify(log logr.Logger, watchPath string) *fsnotify.Watcher {
+	if watchPath == "" {
+		log.Info("No bootc status path found, using polling only")
+		return nil
+	}
+
+	fsWatcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		log.Error(err, "Failed to create fsnotify watcher, falling back to polling")
+		return nil
+	}
+
+	if err := fsWatcher.Add(watchPath); err != nil {
+		log.Error(err, "Failed to watch path, falling back to polling", "path", watchPath)
+		_ = fsWatcher.Close()
+		return nil
+	}
+
+	log.Info("Watching path for bootc status changes", "path", watchPath)
+	return fsWatcher
+}
+
+// resolveWatchPath returns PrimaryPath if it can be stat'ed, else FallbackPath
+// if it can be stat'ed, else the empty string.
+func (w *StatusWatcher) resolveWatchPath() string {
+	if _, err := os.Stat(w.PrimaryPath); err == nil {
+		return w.PrimaryPath
+	}
+	if _, err := os.Stat(w.FallbackPath); err == nil {
+		return w.FallbackPath
+	}
+	return ""
+}
+
+// sendEvent offers an event for the BootcNode named NodeName on Events without
+// blocking; the event is dropped if the channel cannot accept it immediately.
+func (w *StatusWatcher) sendEvent() {
+	ev := event.GenericEvent{
+		Object: &bootcv1alpha1.BootcNode{
+			ObjectMeta: metav1.ObjectMeta{Name: w.NodeName},
+		},
+	}
+	select {
+	case w.Events <- ev:
+	default:
+	}
+}
diff --git a/internal/daemon/watcher_test.go b/internal/daemon/watcher_test.go
new file mode 100644
index 0000000..29de499
--- /dev/null
+++ b/internal/daemon/watcher_test.go
@@ -0,0 +1,137 @@
+// SPDX-License-Identifier: Apache-2.0
+
+package daemon
+
+import (
+	"context"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"sigs.k8s.io/controller-runtime/pkg/event"
+)
+
+func startWatcher(t *testing.T, w *StatusWatcher) (done <-chan error, cancel context.CancelFunc) {
+	t.Helper()
+	ctx, cancel := context.WithCancel(context.Background())
+	ch := make(chan error, 1)
+	go func() { ch <- w.Start(ctx) }()
+	<-w.Ready
+	return ch, cancel
+}
+
+func TestWatcherEvents(t *testing.T) {
+	tests := []struct {
+		name          string
+		mkPrimary     bool
+		mkFallback    bool
+		touchPrimary  bool
+		touchFallback bool
+		pollInterval  time.Duration
+	}{
+		{
+			name:         "Fsnotify",
+			mkPrimary:    true,
+			touchPrimary: true,
+			pollInterval: 10 * time.Minute,
+		},
+		{
+			name:          "FallbackPath",
+			mkFallback:    true,
+			touchFallback: true,
+			pollInterval:  10 * time.Minute,
+		},
+		{
+			name:         "PollOnly",
+			pollInterval: 200 * time.Millisecond,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			dir := t.TempDir()
+			primaryPath := filepath.Join(dir, "bootc")
+			fallbackPath := filepath.Join(dir, "deploy")
+
+			if tt.mkPrimary {
+				if err := os.Mkdir(primaryPath, 0o755); err != nil {
+					t.Fatal(err)
+				}
+			}
+			if tt.mkFallback {
+				if err := os.Mkdir(fallbackPath, 0o755); err != nil {
+					t.Fatal(err)
+				}
+			}
+
+			events := make(chan event.GenericEvent, 1)
+			w := &StatusWatcher{
+				PollInterval: tt.pollInterval,
+				PrimaryPath:  primaryPath,
+				FallbackPath: fallbackPath,
+				Events:       events,
+				NodeName:     "test-node",
+				Ready:        make(chan struct{}),
+			}
+
+			done, cancel := startWatcher(t, w)
+			defer cancel()
+
+			now := time.Now()
+			if tt.touchPrimary {
+				if err := os.Chtimes(primaryPath, now, now); err != nil {
+					t.Fatal(err)
+				}
+			}
+			if tt.touchFallback {
+				if err := os.Chtimes(fallbackPath, now, now); err != nil {
+					t.Fatal(err)
+				}
+			}
+
+			select {
+			case ev := <-events:
+				if ev.Object.GetName() != "test-node" {
+					t.Errorf("expected node name test-node, got %s", ev.Object.GetName())
+				}
+			case <-time.After(5 * time.Second):
+				t.Fatal("timed out waiting for event")
+			}
+
+			cancel()
+			if err := <-done; err != nil {
+				t.Fatalf("watcher returned error: %v", err)
+			}
+		})
+	}
+}
+
+func TestWatcherShutdown(t *testing.T) {
+	dir := t.TempDir()
+	watchDir := filepath.Join(dir, "bootc")
+	if err := os.Mkdir(watchDir, 0o755); err != nil {
+		t.Fatal(err)
+	}
+
+	w := &StatusWatcher{
+		PollInterval: 10 * time.Minute,
+		PrimaryPath:  watchDir,
+		FallbackPath: filepath.Join(dir, "nonexistent"),
+		Events:       make(chan event.GenericEvent, 1),
+		NodeName:     "test-node",
+		Ready:        make(chan struct{}),
+	}
+
+	done, cancel := startWatcher(t, w)
+	cancel()
+
+	select {
+	case err := <-done:
+		if err != nil {
+			t.Fatalf("watcher returned error on shutdown: %v", err)
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatal("timed out waiting for watcher to shut down")
+	}
+}