From 7a039f96a5c2736e32394d75301fa36c024da60a Mon Sep 17 00:00:00 2001 From: Iskren Date: Thu, 12 Mar 2026 18:49:46 +0200 Subject: [PATCH] watch origin:kcp related resources and sync changes to service cluster --- internal/controller/sync/controller.go | 96 +++++++++++++++++++++++++- internal/sync/related_index.go | 61 ++++++++++++++++ internal/sync/syncer.go | 6 ++ internal/sync/syncer_related.go | 25 +++++++ internal/sync/syncer_test.go | 2 + 5 files changed, 189 insertions(+), 1 deletion(-) create mode 100644 internal/sync/related_index.go diff --git a/internal/controller/sync/controller.go b/internal/controller/sync/controller.go index 509d8f4..6e6b452 100644 --- a/internal/controller/sync/controller.go +++ b/internal/controller/sync/controller.go @@ -39,9 +39,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" "k8s.io/utils/ptr" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -67,6 +72,41 @@ type Reconciler struct { localCRD *apiextensionsv1.CustomResourceDefinition stateNamespace string agentName string + relatedIndex *sync.RelatedObjectIndex +} + +// relatedResourceEventHandler enqueues the primary object whenever a kcp-origin +// related resource changes. It uses the RelatedObjectIndex to reverse-map the +// changed related object back to its owning primary object. +type relatedResourceEventHandler struct { + clusterName string + relatedIndex *sync.RelatedObjectIndex + group string + resource string +} + +func (h *relatedResourceEventHandler) Create(_ context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { + h.enqueue(evt.Object, q) +} + +func (h *relatedResourceEventHandler) Update(_ context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { + h.enqueue(evt.ObjectNew, q) +} + +func (h *relatedResourceEventHandler) Delete(_ context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { + h.enqueue(evt.Object, q) +} + +func (h *relatedResourceEventHandler) Generic(_ context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { + h.enqueue(evt.Object, q) +} + +func (h *relatedResourceEventHandler) enqueue(obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) { + req, ok := h.relatedIndex.Get(h.clusterName, h.group, h.resource, obj.GetNamespace(), obj.GetName()) + if !ok { + return + } + q.Add(req) } // Create creates a new controller and importantly does *not* add it to the manager, @@ -120,6 +160,7 @@ func Create( stateNamespace: stateNamespace, agentName: agentName, localCRD: localCRD, + relatedIndex: sync.NewRelatedObjectIndex(), } ctrlOptions := mccontroller.Options{ @@ -161,6 +202,59 @@ func Create( return nil, fmt.Errorf("failed to setup local-side watch: %w", err) } + // Watch origin:kcp related resources in the virtual workspace so that changes + // to them trigger reconciliation of their owning primary object. + watchedGVKs := sets.New[schema.GroupVersionKind]() + for _, relRes := range pubRes.Spec.Related { + if relRes.Origin != syncagentv1alpha1.RelatedResourceOriginKcp { + continue + } + + gvr := schema.GroupVersionResource{ + Group: relRes.Group, + Version: relRes.Version, + Resource: relRes.Resource, + } + + // Use the local REST mapper to determine the Kind. Core resources (ConfigMap, + // Secret, …) and CRDs installed on the service cluster are covered by this. + gvk, err := localManager.GetRESTMapper().KindFor(gvr) + if err != nil { + log.Warnw("failed to determine Kind for origin:kcp related resource, skipping watch", "gvr", gvr, "error", err) + continue + } + + // Deduplicate: only set up one watch per GVK. + if watchedGVKs.Has(gvk) { + continue + } + + watchedGVKs.Insert(gvk) + + relatedDummy := &unstructured.Unstructured{} + relatedDummy.SetGroupVersionKind(gvk) + + group := relRes.Group + resource := relRes.Resource + + enqueueForRelated := mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request]( + func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] { + return &relatedResourceEventHandler{ + clusterName: clusterName, + relatedIndex: reconciler.relatedIndex, + group: group, + resource: resource, + } + }, + ) + + if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil { + return nil, fmt.Errorf("failed to setup watch for origin:kcp related resource %v: %w", gvk, err) + } + + log.Infow("Set up watch for origin:kcp related resource", "gvk", gvk) + } + log.Info("Done setting up unmanaged controller.") return c, nil @@ -226,7 +320,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request) } // sync main object - syncer, err := sync.NewResourceSyncer(log, r.localClient, vwClient, r.pubRes, r.localCRD, mutation.NewMutator, r.stateNamespace, r.agentName) + syncer, err := sync.NewResourceSyncer(log, r.localClient, vwClient, r.pubRes, r.localCRD, mutation.NewMutator, r.stateNamespace, r.agentName, r.relatedIndex) if err != nil { recorder.Event(remoteObj, corev1.EventTypeWarning, "ReconcilingError", "Failed to process object: a provider-side issue has occurred.") return reconcile.Result{}, fmt.Errorf("failed to create syncer: %w", err) diff --git a/internal/sync/related_index.go b/internal/sync/related_index.go new file mode 100644 index 0000000..8a492fc --- /dev/null +++ b/internal/sync/related_index.go @@ -0,0 +1,61 @@ +/* +Copyright 2025 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sync + +import ( + gosync "sync" + + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" +) + +type relatedObjectKey struct { + cluster string + group string + resource string + namespace string + name string +} + +// RelatedObjectIndex maps kcp-side related objects back to their owning primary objects. +// It is populated during reconciliation and used by watch handlers to trigger +// reconciliation when a related resource changes in kcp. +type RelatedObjectIndex struct { + mu gosync.RWMutex + index map[relatedObjectKey]mcreconcile.Request +} + +// NewRelatedObjectIndex creates a new empty RelatedObjectIndex. +func NewRelatedObjectIndex() *RelatedObjectIndex { + return &RelatedObjectIndex{ + index: make(map[relatedObjectKey]mcreconcile.Request), + } +} + +// Set stores the mapping from a related object to its owning primary object. +func (i *RelatedObjectIndex) Set(cluster, group, resource, namespace, name string, primary mcreconcile.Request) { + i.mu.Lock() + defer i.mu.Unlock() + i.index[relatedObjectKey{cluster, group, resource, namespace, name}] = primary +} + +// Get looks up the primary object that owns the given related object. +func (i *RelatedObjectIndex) Get(cluster, group, resource, namespace, name string) (mcreconcile.Request, bool) { + i.mu.RLock() + defer i.mu.RUnlock() + req, ok := i.index[relatedObjectKey{cluster, group, resource, namespace, name}] + return req, ok +} diff --git a/internal/sync/syncer.go b/internal/sync/syncer.go index 7108fca..d2e51b9 100644 --- a/internal/sync/syncer.go +++ b/internal/sync/syncer.go @@ -54,6 +54,10 @@ type ResourceSyncer struct { agentName string + // relatedIndex is populated during reconciliation to enable watches on origin:kcp + // related resources to trigger primary object reconciliation. + relatedIndex *RelatedObjectIndex + // newObjectStateStore is used for testing purposes newObjectStateStore newObjectStateStoreFunc } @@ -69,6 +73,7 @@ func NewResourceSyncer( mutatorCreator MutatorCreatorFunc, stateNamespace string, agentName string, + relatedIndex *RelatedObjectIndex, ) (*ResourceSyncer, error) { // create a dummy that represents the type used on the local service cluster localGVK, err := projection.PublishedResourceSourceGVK(localCRD, pubRes) @@ -127,6 +132,7 @@ func NewResourceSyncer( primaryMutator: primaryMutator, relatedMutators: relatedMutators, agentName: agentName, + relatedIndex: relatedIndex, newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace), }, nil } diff --git a/internal/sync/syncer_related.go b/internal/sync/syncer_related.go index 35aceb6..b553cc3 100644 --- a/internal/sync/syncer_related.go +++ b/internal/sync/syncer_related.go @@ -38,6 +38,8 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile" ) func (s *ResourceSyncer) processRelatedResources(ctx context.Context, log *zap.SugaredLogger, stateStore ObjectStateStore, remote, local syncSide, primaryDeleting bool) (requeue bool, err error) { @@ -91,6 +93,29 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su return false, nil } + // Populate the reverse index for origin:kcp related resources so that watches + // on those resources can trigger reconciliation of the owning primary object. + if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp && s.relatedIndex != nil { + for _, resolved := range resolvedObjects { + s.relatedIndex.Set( + string(remote.clusterName), + relRes.Group, + relRes.Resource, + resolved.original.GetNamespace(), + resolved.original.GetName(), + mcreconcile.Request{ + ClusterName: string(remote.clusterName), + Request: reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: remote.object.GetNamespace(), + Name: remote.object.GetName(), + }, + }, + }, + ) + } + } + slices.SortStableFunc(resolvedObjects, func(a, b resolvedObject) int { aKey := ctrlruntimeclient.ObjectKeyFromObject(a.original).String() bKey := ctrlruntimeclient.ObjectKeyFromObject(b.original).String() diff --git a/internal/sync/syncer_test.go b/internal/sync/syncer_test.go index 75055e8..b050231 100644 --- a/internal/sync/syncer_test.go +++ b/internal/sync/syncer_test.go @@ -923,6 +923,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) { }, stateNamespace, "textor-the-doctor", + nil, ) if err != nil { t.Fatalf("Failed to create syncer: %v", err) @@ -1231,6 +1232,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) { }, stateNamespace, "textor-the-doctor", + nil, ) if err != nil { t.Fatalf("Failed to create syncer: %v", err)