Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions cmd/daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"os"
"time"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -14,6 +15,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

bootcv1alpha1 "github.com/jlebon/bootc-operator/api/v1alpha1"
Expand All @@ -32,6 +34,9 @@ func init() {
}

func main() {
var pollInterval time.Duration
flag.DurationVar(&pollInterval, "poll-interval", 5*time.Minute, "Interval for polling bootc status as a fallback to fsnotify")

opts := zap.Options{
Development: true,
}
Expand Down Expand Up @@ -62,17 +67,32 @@ func main() {
os.Exit(1)
}

statusChanged := make(chan event.GenericEvent, 1)

if err := (&daemon.BootcNodeReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
NodeName: nodeName,
Executor: bootc.NewHostExecutor(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
NodeName: nodeName,
Executor: bootc.NewHostExecutor(),
StatusChanged: statusChanged,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Failed to create controller", "controller", "bootcnode")
os.Exit(1)
}

setupLog.Info("Starting daemon", "node", nodeName)
watcher := &daemon.StatusWatcher{
PollInterval: pollInterval,
PrimaryPath: daemon.DefaultPrimaryPath,
FallbackPath: daemon.DefaultFallbackPath,
Events: statusChanged,
NodeName: nodeName,
}
if err := mgr.Add(watcher); err != nil {
setupLog.Error(err, "Failed to add status watcher")
os.Exit(1)
}

setupLog.Info("Starting daemon", "node", nodeName, "pollInterval", pollInterval)
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "Failed to run daemon")
os.Exit(1)
Expand Down
14 changes: 10 additions & 4 deletions internal/daemon/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,23 @@ type BootcNodeReconciler struct {
stageDone chan event.GenericEvent
// rebootIssued tracks whether a reboot has been issued so classifyAction
// can distinguish the Staged→Rebooting.
rebootIssued bool
rebootIssued bool
StatusChanged chan event.GenericEvent
}

func (r *BootcNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
r.stageDone = make(chan event.GenericEvent, 1)

return ctrl.NewControllerManagedBy(mgr).
builder := ctrl.NewControllerManagedBy(mgr).
For(&bootcv1alpha1.BootcNode{}).
WatchesRawSource(source.Channel(r.stageDone, &handler.EnqueueRequestForObject{})).
Named("bootcnode").
Complete(r)
Named("bootcnode")

if r.StatusChanged != nil {
builder = builder.WatchesRawSource(source.Channel(r.StatusChanged, &handler.EnqueueRequestForObject{}))
}

return builder.Complete(r)
}

func (r *BootcNodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down
135 changes: 135 additions & 0 deletions internal/daemon/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// SPDX-License-Identifier: Apache-2.0

package daemon

import (
"context"
"os"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"

bootcv1alpha1 "github.com/jlebon/bootc-operator/api/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/event"
)

// Default directories handed to StatusWatcher as PrimaryPath and FallbackPath.
// NOTE(review): the /proc/1/root prefix presumably resolves through PID 1's
// root so the host filesystem is reachable from a container — confirm.
const (
	// DefaultPrimaryPath is the default StatusWatcher.PrimaryPath, the
	// directory tried first when picking what to watch (ostree backend).
	DefaultPrimaryPath = "/proc/1/root/ostree/bootc"
	// DefaultFallbackPath is the default StatusWatcher.FallbackPath, used when
	// the primary path cannot be stat'ed (composefs backend).
	DefaultFallbackPath = "/proc/1/root/sysroot/state/deploy"
)

// StatusWatcher sends a GenericEvent on Events whenever fsnotify reports a
// Chmod event for the watched bootc status directory, and additionally on
// every PollInterval tick as a fallback.
type StatusWatcher struct {
	// PollInterval is the period between fallback poll events. Start replaces
	// a non-positive value with 5 minutes.
	PollInterval time.Duration
	// PrimaryPath is the directory watched when it can be stat'ed.
	PrimaryPath string
	// FallbackPath is the directory watched when PrimaryPath cannot be
	// stat'ed but this path can.
	FallbackPath string
	// Events receives one GenericEvent per detected change or poll tick.
	// Sends never block: an event is dropped when the channel cannot accept it.
	Events chan event.GenericEvent
	// NodeName is the name set on the BootcNode object carried by each event.
	NodeName string
	// Ready, when non-nil, is closed by Start once the fsnotify watch (if any)
	// and the poll ticker have been set up.
	Ready chan struct{}
}

// Start runs the watcher until ctx is cancelled and then returns nil.
//
// It watches the directory chosen by resolveWatchPath with fsnotify and sends
// an event for every Chmod notification. Independently, it sends an event on
// every PollInterval tick (a non-positive PollInterval is replaced with five
// minutes). If fsnotify cannot be set up, reports an error, or closes one of
// its channels, the watcher keeps running in polling-only mode.
//
// If Ready is non-nil it is closed once the watch and the ticker are in place.
func (w *StatusWatcher) Start(ctx context.Context) error {
	log := logf.FromContext(ctx).WithName("status-watcher")

	fsWatcher := w.setupFsnotify(log, w.resolveWatchPath())

	// Receiving from a nil channel blocks forever, so leaving these nil simply
	// disables the fsnotify cases of the select below.
	var evCh <-chan fsnotify.Event
	var errCh <-chan error
	if fsWatcher != nil {
		evCh = fsWatcher.Events
		errCh = fsWatcher.Errors
	}

	// stopFsnotify releases the fsnotify watcher (if any) and disables its
	// select cases; it is safe to call more than once.
	stopFsnotify := func() {
		if fsWatcher != nil {
			// Best effort: nothing useful can be done if Close fails.
			_ = fsWatcher.Close()
			fsWatcher = nil
		}
		evCh = nil
		errCh = nil
	}
	defer stopFsnotify()

	if w.PollInterval <= 0 {
		w.PollInterval = 5 * time.Minute
	}

	ticker := time.NewTicker(w.PollInterval)
	defer ticker.Stop()

	if w.Ready != nil {
		close(w.Ready)
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case ev, ok := <-evCh:
			if !ok {
				// A closed channel is always ready to receive; without this
				// check the loop would spin on zero-value events.
				log.Info("fsnotify events channel closed, degrading to polling only")
				stopFsnotify()
				continue
			}
			// bootc updates modify directory mtime, which inotify reports as IN_ATTRIB (Chmod).
			if ev.Has(fsnotify.Chmod) {
				log.V(1).Info("Detected bootc status change via fsnotify")
				w.sendEvent()
			}
		case err, ok := <-errCh:
			// Tear down fsnotify so the loop continues with polling only.
			// A broken inotify fd never delivers events again, so without this
			// the watcher silently stops reacting to filesystem changes.
			if ok {
				log.Error(err, "fsnotify error, degrading to polling only")
			} else {
				log.Info("fsnotify errors channel closed, degrading to polling only")
			}
			stopFsnotify()
		case <-ticker.C:
			log.V(1).Info("Polling bootc status")
			w.sendEvent()
		}
	}
}

// setupFsnotify creates an fsnotify watcher on watchPath.
//
// It returns nil, after logging the reason, when watchPath is empty, when the
// watcher cannot be created, or when the path cannot be added to it. A nil
// result means the caller has to rely on polling alone.
func (w *StatusWatcher) setupFsnotify(log logr.Logger, watchPath string) *fsnotify.Watcher {
	if watchPath == "" {
		log.Info("No bootc status path found, using polling only")
		return nil
	}

	notifier, createErr := fsnotify.NewWatcher()
	if createErr != nil {
		log.Error(createErr, "Failed to create fsnotify watcher, falling back to polling")
		return nil
	}

	addErr := notifier.Add(watchPath)
	if addErr == nil {
		log.Info("Watching path for bootc status changes", "path", watchPath)
		return notifier
	}

	log.Error(addErr, "Failed to watch path, falling back to polling", "path", watchPath)
	// The watcher is useless without its path; release it (best effort).
	_ = notifier.Close()
	return nil
}

// resolveWatchPath picks the directory to watch: PrimaryPath if it can be
// stat'ed, otherwise FallbackPath if that can be stat'ed, otherwise "".
func (w *StatusWatcher) resolveWatchPath() string {
	// Candidates in priority order; the first one that exists wins.
	for _, candidate := range []string{w.PrimaryPath, w.FallbackPath} {
		if _, statErr := os.Stat(candidate); statErr == nil {
			return candidate
		}
	}
	return ""
}

// sendEvent offers a GenericEvent carrying a BootcNode named NodeName to the
// Events channel. The send never blocks: if the channel cannot accept the
// event right now, the event is dropped.
func (w *StatusWatcher) sendEvent() {
	meta := metav1.ObjectMeta{Name: w.NodeName}
	notification := event.GenericEvent{
		Object: &bootcv1alpha1.BootcNode{ObjectMeta: meta},
	}

	select {
	case w.Events <- notification:
	default:
		// Channel not ready to receive; skip this notification.
	}
}
137 changes: 137 additions & 0 deletions internal/daemon/watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// SPDX-License-Identifier: Apache-2.0

package daemon

import (
"context"
"os"
"path/filepath"
"testing"
"time"

"sigs.k8s.io/controller-runtime/pkg/event"
)

// startWatcher runs w.Start in a goroutine and blocks until w.Ready is closed.
// It returns a channel that receives Start's return value and the function
// that cancels the context Start is running under. w.Ready must be non-nil.
func startWatcher(t *testing.T, w *StatusWatcher) (done <-chan error, cancel context.CancelFunc) {
	t.Helper()

	runCtx, stop := context.WithCancel(context.Background())

	// Buffered so the goroutine can exit even if nobody reads the result.
	result := make(chan error, 1)
	go func() {
		result <- w.Start(runCtx)
	}()

	<-w.Ready
	return result, stop
}

// TestWatcherEvents checks that the watcher delivers an event named after the
// node in three situations: a touch of the primary directory, a touch of the
// fallback directory when the primary does not exist, and a plain poll tick
// when neither directory exists.
func TestWatcherEvents(t *testing.T) {
	cases := []struct {
		name          string
		mkPrimary     bool
		mkFallback    bool
		touchPrimary  bool
		touchFallback bool
		pollInterval  time.Duration
	}{
		{
			name:         "Fsnotify",
			mkPrimary:    true,
			touchPrimary: true,
			pollInterval: 10 * time.Minute,
		},
		{
			name:          "FallbackPath",
			mkFallback:    true,
			touchFallback: true,
			pollInterval:  10 * time.Minute,
		},
		{
			name:         "PollOnly",
			pollInterval: 200 * time.Millisecond,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			root := t.TempDir()
			primaryPath := filepath.Join(root, "bootc")
			fallbackPath := filepath.Join(root, "deploy")

			// Create only the directories this case asks for.
			mkdirIf := func(wanted bool, path string) {
				if !wanted {
					return
				}
				if err := os.Mkdir(path, 0o755); err != nil {
					t.Fatal(err)
				}
			}
			mkdirIf(tc.mkPrimary, primaryPath)
			mkdirIf(tc.mkFallback, fallbackPath)

			events := make(chan event.GenericEvent, 1)
			watcher := &StatusWatcher{
				PollInterval: tc.pollInterval,
				PrimaryPath:  primaryPath,
				FallbackPath: fallbackPath,
				Events:       events,
				NodeName:     "test-node",
				Ready:        make(chan struct{}),
			}

			done, cancel := startWatcher(t, watcher)
			defer cancel()

			// Updating the timestamps is what triggers the fsnotify path.
			stamp := time.Now()
			touchIf := func(wanted bool, path string) {
				if !wanted {
					return
				}
				if err := os.Chtimes(path, stamp, stamp); err != nil {
					t.Fatal(err)
				}
			}
			touchIf(tc.touchPrimary, primaryPath)
			touchIf(tc.touchFallback, fallbackPath)

			deadline := time.After(5 * time.Second)
			select {
			case got := <-events:
				if name := got.Object.GetName(); name != "test-node" {
					t.Errorf("expected node name test-node, got %s", got.Object.GetName())
				}
			case <-deadline:
				t.Fatal("timed out waiting for event")
			}

			cancel()
			if err := <-done; err != nil {
				t.Fatalf("watcher returned error: %v", err)
			}
		})
	}
}

// TestWatcherShutdown checks that Start returns nil promptly once its context
// is cancelled while an fsnotify watch is active.
func TestWatcherShutdown(t *testing.T) {
	root := t.TempDir()
	watched := filepath.Join(root, "bootc")
	if mkErr := os.Mkdir(watched, 0o755); mkErr != nil {
		t.Fatal(mkErr)
	}

	watcher := &StatusWatcher{
		PollInterval: 10 * time.Minute,
		PrimaryPath:  watched,
		FallbackPath: filepath.Join(root, "nonexistent"),
		Events:       make(chan event.GenericEvent, 1),
		NodeName:     "test-node",
		Ready:        make(chan struct{}),
	}

	done, cancel := startWatcher(t, watcher)
	// Cancel immediately; the watcher should wind down on its own.
	cancel()

	deadline := time.After(5 * time.Second)
	select {
	case stopErr := <-done:
		if stopErr != nil {
			t.Fatalf("watcher returned error on shutdown: %v", stopErr)
		}
	case <-deadline:
		t.Fatal("timed out waiting for watcher to shut down")
	}
}