Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions src/code.cloudfoundry.org/gorouter/mbus/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type RegistryMessageOpts struct {
HashBalance float64 `json:"hash_balance,string"`
}

func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, error) {
func (rm *RegistryMessage) makeEndpoint(http2Enabled bool, globalRoutingAlgo string) (*route.Endpoint, error) {
port, useTLS, err := rm.port()
if err != nil {
return nil, err
Expand All @@ -61,6 +61,11 @@ func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, err
protocol = "http1"
}

lbAlgo := globalRoutingAlgo
if rm.Options.LoadBalancingAlgorithm != "" {
lbAlgo = rm.Options.LoadBalancingAlgorithm
}

return route.NewEndpoint(&route.EndpointOpts{
AppId: rm.App,
AvailabilityZone: rm.AvailabilityZone,
Expand All @@ -77,7 +82,7 @@ func (rm *RegistryMessage) makeEndpoint(http2Enabled bool) (*route.Endpoint, err
IsolationSegment: rm.IsolationSegment,
UseTLS: useTLS,
UpdatedAt: updatedAt,
LoadBalancingAlgorithm: rm.Options.LoadBalancingAlgorithm,
LoadBalancingAlgorithm: lbAlgo,
HashHeaderName: rm.Options.HashHeaderName,
HashBalanceFactor: rm.Options.HashBalance,
}), nil
Expand Down Expand Up @@ -107,7 +112,8 @@ type Subscriber struct {

params startMessageParams

logger *slog.Logger
logger *slog.Logger
globalRoutingAlgo string
}

type startMessageParams struct {
Expand Down Expand Up @@ -137,10 +143,11 @@ func NewSubscriber(
minimumRegisterIntervalInSeconds: int(c.StartResponseDelayInterval.Seconds()),
pruneThresholdInSeconds: int(c.DropletStaleThreshold.Seconds()),
},
reconnected: reconnected,
natsPendingLimit: c.NatsClientMessageBufferSize,
logger: l,
http2Enabled: c.EnableHTTP2,
reconnected: reconnected,
natsPendingLimit: c.NatsClientMessageBufferSize,
logger: l,
http2Enabled: c.EnableHTTP2,
globalRoutingAlgo: c.LoadBalance,
}
}

Expand Down Expand Up @@ -243,7 +250,7 @@ func (s *Subscriber) subscribeRoutes() (*nats.Subscription, error) {
}

func (s *Subscriber) registerEndpoint(msg *RegistryMessage) {
endpoint, err := msg.makeEndpoint(s.http2Enabled)
endpoint, err := msg.makeEndpoint(s.http2Enabled, s.globalRoutingAlgo)
if err != nil {
s.logger.Error("Unable to register route",
log.ErrAttr(err),
Expand All @@ -258,7 +265,7 @@ func (s *Subscriber) registerEndpoint(msg *RegistryMessage) {
}

func (s *Subscriber) unregisterEndpoint(msg *RegistryMessage) {
endpoint, err := msg.makeEndpoint(s.http2Enabled)
endpoint, err := msg.makeEndpoint(s.http2Enabled, s.globalRoutingAlgo)
if err != nil {
s.logger.Error("Unable to unregister route",
log.ErrAttr(err),
Expand Down
12 changes: 6 additions & 6 deletions src/code.cloudfoundry.org/gorouter/mbus/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,14 +587,14 @@ var _ = Describe("Subscriber", func() {
})
})

Context("when the message contains an empty load balancing algorithm option", func() {
Context("when the message contains an empty or absent load balancing algorithm option", func() {
JustBeforeEach(func() {
sub = mbus.NewSubscriber(natsClient, registry, cfg, reconnected, logger.Logger)
process = ifrit.Invoke(sub)
Eventually(process.Ready()).Should(BeClosed())
})

It("endpoint is constructed with the empty string load balancing algorithm", func() {
It("endpoint is constructed with the global default load balancing algorithm", func() {
var msg = mbus.RegistryMessage{
Host: "host",
App: "app",
Expand All @@ -610,14 +610,14 @@ var _ = Describe("Subscriber", func() {
Eventually(registry.RegisterCallCount).Should(Equal(1))
_, originalEndpoint := registry.RegisterArgsForCall(0)
expectedEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host",
AppId: "app",
Protocol: "http2",
Host: "host",
AppId: "app",
Protocol: "http2",
LoadBalancingAlgorithm: config.LOAD_BALANCE_RR,
})

Expect(originalEndpoint).To(Equal(expectedEndpoint))
})

})

Context("when the message contains hash-based load balancing options", func() {
Expand Down
10 changes: 6 additions & 4 deletions src/code.cloudfoundry.org/gorouter/route/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,17 +668,19 @@ func (p *EndpointPool) MarshalJSON() ([]byte, error) {

// setPoolLoadBalancingAlgorithm overwrites the load balancing algorithm of a pool by that of a specified endpoint, if that is valid.
func (p *EndpointPool) setPoolLoadBalancingAlgorithm(endpoint *Endpoint) {
if endpoint.LoadBalancingAlgorithm == "" {
return
}

if endpoint.LoadBalancingAlgorithm != p.LoadBalancingAlgorithm {
if config.IsLoadBalancingAlgorithmValid(endpoint.LoadBalancingAlgorithm) {
previousAlgorithm := p.LoadBalancingAlgorithm
p.LoadBalancingAlgorithm = endpoint.LoadBalancingAlgorithm
p.logger.Debug("setting-pool-load-balancing-algorithm-to-that-of-an-endpoint",
slog.String("endpointLBAlgorithm", endpoint.LoadBalancingAlgorithm),
slog.String("poolLBAlgorithm", p.LoadBalancingAlgorithm))

// Clean up hash-based routing state when switching away from HB
if previousAlgorithm == config.LOAD_BALANCE_HB && p.LoadBalancingAlgorithm != config.LOAD_BALANCE_HB {
p.HashLookupTable = nil
p.HashRoutingProperties = nil
}
} else {
p.logger.Error("invalid-endpoint-load-balancing-algorithm-provided-keeping-pool-lb-algo",
slog.String("endpointLBAlgorithm", endpoint.LoadBalancingAlgorithm),
Expand Down
49 changes: 33 additions & 16 deletions src/code.cloudfoundry.org/gorouter/route/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,20 +375,6 @@ var _ = Describe("EndpointPool", func() {
Eventually(logger).Should(gbytes.Say(`setting-pool-load-balancing-algorithm-to-that-of-an-endpoint`))
})

It("is an empty string and the load balancing algorithm of a pool is kept", func() {
expectedLBAlgo := config.LOAD_BALANCE_RR
pool := route.NewPool(&route.PoolOpts{
Logger: logger.Logger,
LoadBalancingAlgorithm: expectedLBAlgo,
})
endpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host-1", Port: 1234,
RouteServiceUrl: "url",
})
pool.Put(endpoint)
Expect(pool.LoadBalancingAlgorithm).To(Equal(expectedLBAlgo))
})

It("is not specified in the endpoint options and the load balancing algorithm of a pool is kept", func() {
expectedLBAlgo := config.LOAD_BALANCE_RR
pool := route.NewPool(&route.PoolOpts{
Expand Down Expand Up @@ -486,9 +472,40 @@ var _ = Describe("EndpointPool", func() {

})

})
Context("When removing a per-route load balancing algorithm", func() {
It("reverts the pool to the platform default and cleans up HB state", func() {
pool := route.NewPool(&route.PoolOpts{
Logger: logger.Logger,
LoadBalancingAlgorithm: config.LOAD_BALANCE_RR,
})

Context("RouteServiceUrl", func() {
// Set up HB routing on the pool
hbEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host-1",
Port: 1234,
PrivateInstanceId: "id-1",
LoadBalancingAlgorithm: config.LOAD_BALANCE_HB,
HashBalanceFactor: 1.25,
HashHeaderName: "X-Tenant",
})
pool.Put(hbEndpoint)
Expect(pool.LoadBalancingAlgorithm).To(Equal(config.LOAD_BALANCE_HB))
Expect(pool.HashLookupTable).ToNot(BeNil())
Expect(pool.HashRoutingProperties).ToNot(BeNil())

// Revert to the platform default (as the subscriber would do when the option is removed)
resetEndpoint := route.NewEndpoint(&route.EndpointOpts{
Host: "host-1",
Port: 1234,
PrivateInstanceId: "id-1",
LoadBalancingAlgorithm: config.LOAD_BALANCE_RR,
})
pool.Put(resetEndpoint)
Expect(pool.LoadBalancingAlgorithm).To(Equal(config.LOAD_BALANCE_RR))
Expect(pool.HashLookupTable).To(BeNil())
Expect(pool.HashRoutingProperties).To(BeNil())
})
})
It("returns the route_service_url associated with the pool", func() {
endpoint := &route.Endpoint{}
endpointRS := &route.Endpoint{RouteServiceUrl: "my-url"}
Expand Down