Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 95 additions & 1 deletion internal/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/ptr"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand All @@ -67,6 +72,41 @@ type Reconciler struct {
localCRD *apiextensionsv1.CustomResourceDefinition
stateNamespace string
agentName string
relatedIndex *sync.RelatedObjectIndex
}

// relatedResourceEventHandler enqueues the primary object whenever a kcp-origin
// related resource changes. It uses the RelatedObjectIndex to reverse-map the
// changed related object back to its owning primary object.
type relatedResourceEventHandler struct {
clusterName string
relatedIndex *sync.RelatedObjectIndex
group string
resource string
}

func (h *relatedResourceEventHandler) Create(_ context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
h.enqueue(evt.Object, q)
}

func (h *relatedResourceEventHandler) Update(_ context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
h.enqueue(evt.ObjectNew, q)
}

func (h *relatedResourceEventHandler) Delete(_ context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
h.enqueue(evt.Object, q)
}

func (h *relatedResourceEventHandler) Generic(_ context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
h.enqueue(evt.Object, q)
}

func (h *relatedResourceEventHandler) enqueue(obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
req, ok := h.relatedIndex.Get(h.clusterName, h.group, h.resource, obj.GetNamespace(), obj.GetName())
if !ok {
return
}
q.Add(req)
}

// Create creates a new controller and importantly does *not* add it to the manager,
Expand Down Expand Up @@ -120,6 +160,7 @@ func Create(
stateNamespace: stateNamespace,
agentName: agentName,
localCRD: localCRD,
relatedIndex: sync.NewRelatedObjectIndex(),
}

ctrlOptions := mccontroller.Options{
Expand Down Expand Up @@ -161,6 +202,59 @@ func Create(
return nil, fmt.Errorf("failed to setup local-side watch: %w", err)
}

// Watch origin:kcp related resources in the virtual workspace so that changes
// to them trigger reconciliation of their owning primary object.
watchedGVKs := sets.New[schema.GroupVersionKind]()
for _, relRes := range pubRes.Spec.Related {
if relRes.Origin != syncagentv1alpha1.RelatedResourceOriginKcp {
continue
}

gvr := schema.GroupVersionResource{
Group: relRes.Group,
Version: relRes.Version,
Resource: relRes.Resource,
}

// Use the local REST mapper to determine the Kind. Core resources (ConfigMap,
// Secret, …) and CRDs installed on the service cluster are covered by this.
gvk, err := localManager.GetRESTMapper().KindFor(gvr)
if err != nil {
log.Warnw("failed to determine Kind for origin:kcp related resource, skipping watch", "gvr", gvr, "error", err)
continue
}

// Deduplicate: only set up one watch per GVK.
if watchedGVKs.Has(gvk) {
continue
}

watchedGVKs.Insert(gvk)

relatedDummy := &unstructured.Unstructured{}
relatedDummy.SetGroupVersionKind(gvk)

group := relRes.Group
resource := relRes.Resource

enqueueForRelated := mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request](
func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
return &relatedResourceEventHandler{
clusterName: clusterName,
relatedIndex: reconciler.relatedIndex,
group: group,
resource: resource,
}
},
)

if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
return nil, fmt.Errorf("failed to setup watch for origin:kcp related resource %v: %w", gvk, err)
}

log.Infow("Set up watch for origin:kcp related resource", "gvk", gvk)
}

log.Info("Done setting up unmanaged controller.")

return c, nil
Expand Down Expand Up @@ -226,7 +320,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, request mcreconcile.Request)
}

// sync main object
syncer, err := sync.NewResourceSyncer(log, r.localClient, vwClient, r.pubRes, r.localCRD, mutation.NewMutator, r.stateNamespace, r.agentName)
syncer, err := sync.NewResourceSyncer(log, r.localClient, vwClient, r.pubRes, r.localCRD, mutation.NewMutator, r.stateNamespace, r.agentName, r.relatedIndex)
if err != nil {
recorder.Event(remoteObj, corev1.EventTypeWarning, "ReconcilingError", "Failed to process object: a provider-side issue has occurred.")
return reconcile.Result{}, fmt.Errorf("failed to create syncer: %w", err)
Expand Down
61 changes: 61 additions & 0 deletions internal/sync/related_index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2025 The KCP Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sync

import (
gosync "sync"

mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
)

type relatedObjectKey struct {
cluster string
group string
resource string
namespace string
name string
}

// RelatedObjectIndex maps kcp-side related objects back to their owning primary objects.
// It is populated during reconciliation and used by watch handlers to trigger
// reconciliation when a related resource changes in kcp.
type RelatedObjectIndex struct {
mu gosync.RWMutex
index map[relatedObjectKey]mcreconcile.Request
}

// NewRelatedObjectIndex creates a new empty RelatedObjectIndex.
func NewRelatedObjectIndex() *RelatedObjectIndex {
return &RelatedObjectIndex{
index: make(map[relatedObjectKey]mcreconcile.Request),
}
}

// Set stores the mapping from a related object to its owning primary object.
func (i *RelatedObjectIndex) Set(cluster, group, resource, namespace, name string, primary mcreconcile.Request) {
i.mu.Lock()
defer i.mu.Unlock()
i.index[relatedObjectKey{cluster, group, resource, namespace, name}] = primary
}

// Get looks up the primary object that owns the given related object.
func (i *RelatedObjectIndex) Get(cluster, group, resource, namespace, name string) (mcreconcile.Request, bool) {
i.mu.RLock()
defer i.mu.RUnlock()
req, ok := i.index[relatedObjectKey{cluster, group, resource, namespace, name}]
return req, ok
}
6 changes: 6 additions & 0 deletions internal/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type ResourceSyncer struct {

agentName string

// relatedIndex is populated during reconciliation to enable watches on origin:kcp
// related resources to trigger primary object reconciliation.
relatedIndex *RelatedObjectIndex

// newObjectStateStore is used for testing purposes
newObjectStateStore newObjectStateStoreFunc
}
Expand All @@ -69,6 +73,7 @@ func NewResourceSyncer(
mutatorCreator MutatorCreatorFunc,
stateNamespace string,
agentName string,
relatedIndex *RelatedObjectIndex,
) (*ResourceSyncer, error) {
// create a dummy that represents the type used on the local service cluster
localGVK, err := projection.PublishedResourceSourceGVK(localCRD, pubRes)
Expand Down Expand Up @@ -127,6 +132,7 @@ func NewResourceSyncer(
primaryMutator: primaryMutator,
relatedMutators: relatedMutators,
agentName: agentName,
relatedIndex: relatedIndex,
newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace),
}, nil
}
Expand Down
25 changes: 25 additions & 0 deletions internal/sync/syncer_related.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
mcreconcile "sigs.k8s.io/multicluster-runtime/pkg/reconcile"
)

func (s *ResourceSyncer) processRelatedResources(ctx context.Context, log *zap.SugaredLogger, stateStore ObjectStateStore, remote, local syncSide, primaryDeleting bool) (requeue bool, err error) {
Expand Down Expand Up @@ -91,6 +93,29 @@ func (s *ResourceSyncer) processRelatedResource(ctx context.Context, log *zap.Su
return false, nil
}

// Populate the reverse index for origin:kcp related resources so that watches
// on those resources can trigger reconciliation of the owning primary object.
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp && s.relatedIndex != nil {
for _, resolved := range resolvedObjects {
s.relatedIndex.Set(
string(remote.clusterName),
relRes.Group,
relRes.Resource,
resolved.original.GetNamespace(),
resolved.original.GetName(),
mcreconcile.Request{
ClusterName: string(remote.clusterName),
Request: reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: remote.object.GetNamespace(),
Name: remote.object.GetName(),
},
},
},
)
}
}

slices.SortStableFunc(resolvedObjects, func(a, b resolvedObject) int {
aKey := ctrlruntimeclient.ObjectKeyFromObject(a.original).String()
bKey := ctrlruntimeclient.ObjectKeyFromObject(b.original).String()
Expand Down
2 changes: 2 additions & 0 deletions internal/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
},
stateNamespace,
"textor-the-doctor",
nil,
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand Down Expand Up @@ -1231,6 +1232,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
},
stateNamespace,
"textor-the-doctor",
nil,
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand Down
Loading