4 changes: 0 additions & 4 deletions .golangci.yml
@@ -29,10 +29,6 @@ linters:
     - usestdlibvars
     - whitespace
     - wsl
-  settings:
-    lll:
-      line-length: 80
-      tab-width: 4
   exclusions:
     generated: lax
     presets:
11 changes: 11 additions & 0 deletions README.md
@@ -287,6 +287,17 @@ Changes to following annotations may lead to new connections timing out until th
 - `k8s.cloudscale.ch/loadbalancer-health-monitor-type`
 - `k8s.cloudscale.ch/loadbalancer-health-monitor-http`
 
+Changes to the `k8s.cloudscale.ch/loadbalancer-node-selector` annotation are generally safe,
+as long as the selector is valid and matches at least one node.
+When using `externalTrafficPolicy: Local`, ensure the selector targets nodes that are
+actually running the backend pods; otherwise, traffic will be dropped.
+
+Unlike the well-known node label [`node.kubernetes.io/exclude-from-external-load-balancers=true`](https://kubernetes.io/docs/reference/labels-annotations-taints/#node-kubernetes-io-exclude-from-external-load-balancers),
+which globally excludes nodes from *all* LoadBalancer services, this annotation allows
+targeting a specific subset of nodes on a per-service basis.
+Note that the `exclude-from-external-load-balancers` label is applied first: nodes with
+this label are excluded before the `loadbalancer-node-selector` is evaluated.
+
 ##### Listener Port Changes
 
 Changes to the outward bound service port have a downtime ranging from 15s to 120s, depending on the action. Since the name of the port is used to avoid expensive pool recreation, the impact is minimal if the port name does not change.
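The ordering note above can be read as a two-stage filter: the generic cloud-provider machinery drops globally excluded nodes before this CCM ever sees them, and the annotation's selector is then applied to whatever remains. A minimal sketch of that order, assuming the `k8s.io/api` and `k8s.io/apimachinery` packages; `eligibleNodes` is a hypothetical helper for illustration, not part of this PR:

```go
package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// eligibleNodes mimics the documented ordering: the well-known exclude
// label is honoured first (upstream treats the label's presence as
// exclusion), then the per-service selector from the annotation is
// evaluated against the remaining nodes.
func eligibleNodes(all []*v1.Node, selector labels.Selector) []*v1.Node {
	var out []*v1.Node
	for _, n := range all {
		// Stage 1: global exclusion, applied for *all* LoadBalancer
		// services by the generic cloud-provider node lister.
		if _, ok := n.Labels["node.kubernetes.io/exclude-from-external-load-balancers"]; ok {
			continue
		}
		// Stage 2: the per-service loadbalancer-node-selector.
		if !selector.Matches(labels.Set(n.Labels)) {
			continue
		}
		out = append(out, n)
	}
	return out
}
```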
56 changes: 56 additions & 0 deletions examples/nginx-hello-nodeselector.yml
@@ -0,0 +1,56 @@
+# Deploys the docker.io/nginxdemos/hello:plain-text container and creates a
+# loadbalancer service with a node-selector annotation for it:
+#
+#   export KUBECONFIG=path/to/kubeconfig
+#   kubectl apply -f nginx-hello-nodeselector.yml
+#
+# Wait for `kubectl describe service hello` to show "Loadbalancer Ensured",
+# then use the IP address found under "LoadBalancer Ingress" to connect to the
+# service.
+#
+# You can also use the following shortcut:
+#
+#   curl http://$(kubectl get service hello -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
+#
+# If you adjust the nodeSelector of the deployment, or the `loadbalancer-node-selector` annotation on the service,
+# so that they no longer match, requests to the service will fail.
+#
+---
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: hello
+spec:
+  replicas: 2
+  selector:
+    matchLabels:
+      app: hello
+  template:
+    metadata:
+      labels:
+        app: hello
+    spec:
+      containers:
+      - name: hello
+        image: docker.io/nginxdemos/hello:plain-text
+      nodeSelector:
+        kubernetes.io/hostname: k8test-worker-2
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: hello
+  annotations:
+    k8s.cloudscale.ch/loadbalancer-node-selector: "kubernetes.io/hostname=k8test-worker-2"
+  name: hello
+spec:
+  ports:
+  - port: 80
+    protocol: TCP
+    targetPort: 80
+    name: http
+  selector:
+    app: hello
+  type: LoadBalancer
+  externalTrafficPolicy: Local
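The jsonpath shortcut in the comments above has a straightforward client-go equivalent. A sketch under the assumption that the service lives in the `default` namespace and the kubeconfig path mirrors the `export KUBECONFIG=...` step:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Assumed kubeconfig path, matching the export in the manifest comments.
	config, err := clientcmd.BuildConfigFromFlags("", "path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	svc, err := client.CoreV1().Services("default").
		Get(context.Background(), "hello", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// Same data the jsonpath expression extracts with kubectl.
	for _, ingress := range svc.Status.LoadBalancer.Ingress {
		fmt.Println(ingress.IP)
	}
}
```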
32 changes: 28 additions & 4 deletions pkg/cloudscale_ccm/cloud.go
@@ -9,9 +9,12 @@ import (
 	"strings"
 	"time"
 
-	cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v6"
+	"github.com/cloudscale-ch/cloudscale-go-sdk/v6"
 	"golang.org/x/oauth2"
-	"k8s.io/client-go/kubernetes"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 
 	cloudprovider "k8s.io/cloud-provider"
@@ -32,6 +35,8 @@ const (
 type cloud struct {
 	instances    *instances
 	loadbalancer *loadbalancer
+
+	eventRecorder record.EventRecorder
 }
 
 // Register this provider with Kubernetes.
@@ -112,8 +117,27 @@ func (c *cloud) Initialize(
 
 	// This cannot be configured earlier, even though it seems better situated
 	// in newCloudscaleClient
-	c.loadbalancer.k8s = kubernetes.NewForConfigOrDie(
-		clientBuilder.ConfigOrDie("cloudscale-cloud-controller-manager"))
+	c.loadbalancer.k8s = clientBuilder.ClientOrDie(
+		"cloudscale-cloud-controller-manager",
+	)
+
+	eventBroadcaster := record.NewBroadcaster()
+	eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{
+		Interface: c.loadbalancer.k8s.CoreV1().Events(""),
+	})
+	c.eventRecorder = eventBroadcaster.NewRecorder(scheme.Scheme,
+		corev1.EventSource{
+			Component: "cloudscale-cloud-controller-manager",
+		},
+	)
+
+	go func() {
+		// wait until the stop channel closes
+		<-stop
+		eventBroadcaster.Shutdown()
+	}()
+
+	c.loadbalancer.recorder = c.eventRecorder
 }
 
 // LoadBalancer returns a balancer interface. Also returns true if the
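With the recorder wired into the loadbalancer struct, the `NoValidNodes` warnings emitted below surface as ordinary Events on the Service object (visible via `kubectl describe service`). A hedged sketch of fetching them with the same clientset; the `default` namespace, the `hello` service name, and the helper itself are illustrative assumptions:

```go
package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// printNoValidNodesEvents lists the warnings the CCM recorded for a
// Service, filtered server-side with a field selector.
func printNoValidNodesEvents(client kubernetes.Interface) error {
	events, err := client.CoreV1().Events("default").List(
		context.Background(),
		metav1.ListOptions{
			FieldSelector: "involvedObject.kind=Service," +
				"involvedObject.name=hello,reason=NoValidNodes",
		},
	)
	if err != nil {
		return err
	}
	for _, e := range events.Items {
		fmt.Printf("%s: %s\n", e.Reason, e.Message)
	}
	return nil
}
```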
95 changes: 78 additions & 17 deletions pkg/cloudscale_ccm/loadbalancer.go
@@ -2,18 +2,20 @@ package cloudscale_ccm
 
 import (
 	"context"
 	"errors"
 	"fmt"
 	"slices"
 	"strings"
 
-	"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil"
 	"github.com/cloudscale-ch/cloudscale-go-sdk/v6"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 
+	"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/kubeutil"
 )
 
 // Annotations used by the loadbalancer integration of cloudscale_ccm. Those
@@ -208,7 +210,7 @@
 	// connections timing out while the monitor is updated.
 	LoadBalancerHealthMonitorTimeoutS = "k8s.cloudscale.ch/loadbalancer-health-monitor-timeout-s"
 
-	// LoadBalancerHealthMonitorDownThreshold is the number of the checks that
+	// LoadBalancerHealthMonitorUpThreshold is the number of checks that
 	// need to succeed before a pool member is considered up. Defaults to 2.
 	LoadBalancerHealthMonitorUpThreshold = "k8s.cloudscale.ch/loadbalancer-health-monitor-up-threshold"
@@ -278,7 +280,7 @@ const (
 	// Changing this annotation on an established service is considered safe.
 	LoadBalancerListenerTimeoutMemberDataMS = "k8s.cloudscale.ch/loadbalancer-timeout-member-data-ms"
 
-	// LoadBalancerSubnetLimit is a JSON list of subnet UUIDs that the
+	// LoadBalancerListenerAllowedSubnets is a JSON list of subnet UUIDs that the
 	// loadbalancer should use. By default, all subnets of a node are used:
 	//
 	// * `[]` means that anyone is allowed to connect (default).
@@ -291,12 +293,21 @@
 	// This is an advanced feature, useful if you have nodes that are in
 	// multiple private subnets.
 	LoadBalancerListenerAllowedSubnets = "k8s.cloudscale.ch/loadbalancer-listener-allowed-subnets"
+
+	// LoadBalancerNodeSelector restricts which nodes are added to the
+	// LB pool. It accepts a standard Kubernetes label selector string.
+	//
+	// N.B.: If the selector doesn't match any nodes, all members are
+	// removed from the LB pool, effectively causing downtime on the LB.
+	// Verify the node selector carefully before setting it.
+	LoadBalancerNodeSelector = "k8s.cloudscale.ch/loadbalancer-node-selector"
 )
 
 type loadbalancer struct {
-	lbs lbMapper
-	srv serverMapper
-	k8s kubernetes.Interface
+	lbs      lbMapper
+	srv      serverMapper
+	k8s      kubernetes.Interface
+	recorder record.EventRecorder
 }
 
 // GetLoadBalancer returns whether the specified load balancer exists, and
@@ -387,24 +398,34 @@ func (l *loadbalancer) EnsureLoadBalancer(
 		return nil, err
 	}
 
-	// Refuse to do anything if there are no nodes
-	if len(nodes) == 0 {
-		return nil, errors.New(
-			"no valid nodes for service found, please verify there is " +
-				"at least one that allows load balancers",
+	filteredNodes, err := filterNodesBySelector(serviceInfo, nodes)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(filteredNodes) == 0 {
+		l.recorder.Event(
+			service,
+			v1.EventTypeWarning,
+			"NoValidNodes",
+			fmt.Sprintf("No valid nodes for service found, "+
+				"double-check node-selector annotation: %s: %s",
+				LoadBalancerNodeSelector,
+				serviceInfo.annotation(LoadBalancerNodeSelector),
+			),
 		)
 	}
 
 	// Reconcile
-	err := reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
+	err = reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
 		// Get the desired state from Kubernetes
-		servers, err := l.srv.mapNodes(ctx, nodes).All()
+		servers, err := l.srv.mapNodes(ctx, filteredNodes).All()
 		if err != nil {
 			return nil, fmt.Errorf(
 				"unable to get load balancer for %s: %w", service.Name, err)
 		}
 
-		return desiredLbState(serviceInfo, nodes, servers)
+		return desiredLbState(serviceInfo, filteredNodes, servers)
 	}, func() (*lbState, error) {
 		// Get the current state from cloudscale.ch
 		return actualLbState(ctx, &l.lbs, serviceInfo)
@@ -442,6 +463,28 @@
 	return result, nil
 }
 
+func filterNodesBySelector(
+	serviceInfo *serviceInfo,
+	nodes []*v1.Node,
+) ([]*v1.Node, error) {
+	selector := labels.Everything()
+	if v := serviceInfo.annotation(LoadBalancerNodeSelector); v != "" {
+		var err error
+		selector, err = labels.Parse(v)
+		if err != nil {
+			return nil, fmt.Errorf("unable to parse selector: %w", err)
+		}
+	}
+	selectedNodes := make([]*v1.Node, 0, len(nodes))
+	for _, node := range nodes {
+		if selector.Matches(labels.Set(node.Labels)) {
+			selectedNodes = append(selectedNodes, node)
+		}
+	}
+
+	return selectedNodes, nil
+}
+
 // UpdateLoadBalancer updates hosts under the specified load balancer.
 // Implementations must treat the *v1.Service and *v1.Node
 // parameters as read-only and not modify them.
@@ -461,16 +504,34 @@ func (l *loadbalancer) UpdateLoadBalancer(
 		return err
 	}
 
+	filteredNodes, err := filterNodesBySelector(serviceInfo, nodes)
+	if err != nil {
+		return err
+	}
+
+	if len(filteredNodes) == 0 {
+		l.recorder.Event(
+			service,
+			v1.EventTypeWarning,
+			"NoValidNodes",
+			fmt.Sprintf("No valid nodes for service found, "+
+				"double-check node-selector annotation: %s: %s",
+				LoadBalancerNodeSelector,
+				serviceInfo.annotation(LoadBalancerNodeSelector),
+			),
+		)
+	}
+
 	// Reconcile
 	return reconcileLbState(ctx, l.lbs.client, func() (*lbState, error) {
 		// Get the desired state from Kubernetes
-		servers, err := l.srv.mapNodes(ctx, nodes).All()
+		servers, err := l.srv.mapNodes(ctx, filteredNodes).All()
 		if err != nil {
 			return nil, fmt.Errorf(
 				"unable to get load balancer for %s: %w", service.Name, err)
 		}
 
-		return desiredLbState(serviceInfo, nodes, servers)
+		return desiredLbState(serviceInfo, filteredNodes, servers)
 	}, func() (*lbState, error) {
 		// Get the current state from cloudscale.ch
 		return actualLbState(ctx, &l.lbs, serviceInfo)
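The N.B. in the new annotation's doc comment is worth pinning down in a test: a selector that matches nothing empties the pool instead of failing. A small test-style sketch of that behaviour using the same `labels` package; the node fixture and test name are assumptions, and it re-implements the matching loop rather than calling the unexported helper:

```go
package cloudscale_ccm

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func TestTypoedSelectorMatchesNothing(t *testing.T) {
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name:   "k8test-worker-2",
		Labels: map[string]string{"kubernetes.io/hostname": "k8test-worker-2"},
	}}

	// A single typo in the annotation value selects zero nodes.
	selector, err := labels.Parse("kubernetes.io/hostname=k8test-w0rker-2")
	if err != nil {
		t.Fatal(err)
	}

	if selector.Matches(labels.Set(node.Labels)) {
		t.Fatal("expected the typoed selector to match nothing")
	}
	// With zero matches, EnsureLoadBalancer proceeds with an empty node
	// list: every pool member is removed and a NoValidNodes warning is
	// emitted, rather than returning an error as the old code did.
}
```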