Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions service/api/sse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"errors"
"strings"
"sync/atomic"

"github.com/splitio/go-split-commons/v9/conf"
"github.com/splitio/go-split-commons/v9/dtos"
Expand All @@ -18,6 +19,12 @@
keepAlive = 70
)

const (
StateIdle int32 = 0 // Client is idle/not started
StateRunning int32 = 1 // Client is running
StateDestroyed int32 = -1 // Client is destroyed/stopped
)

// StreamingClient interface
type StreamingClient interface {
ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage))
Expand All @@ -32,6 +39,7 @@
lifecycle lifecycle.Manager
metadata dtos.Metadata
clientKey *string
state atomic.Int32 // Atomic state: 0=Idle, 1=Running, -1=Destroyed
}

// Status constants
Expand All @@ -54,15 +62,20 @@
metadata: metadata,
clientKey: clientKey,
}
client.state.Store(StateIdle)
client.lifecycle.Setup()
return client
}

// ConnectStreaming connects to streaming
func (s *StreamingClientImpl) ConnectStreaming(token string, streamingStatus chan int, channelList []string, handleIncomingMessage func(IncomingMessage)) {

Check failure on line 71 in service/api/sse/client.go

View check run for this annotation

SonarQube Pull Requests / SonarQube Code Analysis

Refactor this method to reduce its Cognitive Complexity from 21 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonar.harness.io/project/issues?id=go-split-commons&pullRequest=274&issues=ca93a96e-c2de-4c59-9a84-d95636506102&open=ca93a96e-c2de-4c59-9a84-d95636506102

if !s.state.CompareAndSwap(StateIdle, StateRunning) {
s.logger.Info("Client is not in idle state (already running or destroyed). Ignoring")
return
}
if !s.lifecycle.BeginInitialization() {
s.logger.Info("Connection is already in process/running. Ignoring")
s.state.Store(StateIdle) // Reset state since lifecycle check failed
return
}

Expand All @@ -72,10 +85,27 @@
params["v"] = version

go func() {
defer s.lifecycle.ShutdownComplete()
defer func() {
s.lifecycle.ShutdownComplete()
// Reset to idle if not destroyed
if s.state.Load() != StateDestroyed {
s.state.Store(StateIdle)
}
}()

// Early exit if client was destroyed while goroutine was starting
if s.state.Load() <= StateIdle {
s.logger.Info("Client state is not valid (destroyed or idle). Exiting goroutine")
return
}
if !s.lifecycle.InitializationComplete() {
return
}
// Final check before starting connection - prevent race if Destroy was called
if s.state.Load() != StateRunning {
s.logger.Info("Client destroyed before connection started. Exiting goroutine")
return
}
firstEventReceived := gtSync.NewAtomicBool(false)
out := s.sseClient.Do(params, api.AddMetadataToHeaders(s.metadata, nil, s.clientKey), func(m IncomingMessage) {
if firstEventReceived.TestAndSet() && !m.IsError() {
Expand Down Expand Up @@ -113,6 +143,8 @@

// StopStreaming stops streaming
func (s *StreamingClientImpl) StopStreaming() {
// Set atomic state to destroyed immediately to prevent new goroutines
s.state.Store(StateDestroyed)
if !s.lifecycle.BeginShutdown() {
s.logger.Info("SSE client wrapper not running. Ignoring")
return
Expand All @@ -124,5 +156,5 @@

// IsRunning returns true if the client is running
func (s *StreamingClientImpl) IsRunning() bool {
return s.lifecycle.IsRunning()
return s.lifecycle.IsRunning() && s.state.Load() == StateRunning
}
Loading