From 24ba0fcc69b7ecb04fa57df02b43457d60ab343e Mon Sep 17 00:00:00 2001
From: Piotr Konopka
Date: Thu, 6 Nov 2025 09:52:13 +0100
Subject: [PATCH] Drain backlog LHC events and load current fill info upon startup

When the AliECS core starts, we are not interested in old events except for
the latest one, which tells us the current beam conditions. This work will
allow us to get rid of BKP.RetrieveFillInfo() and have a corresponding
update call in the LHC plugin.
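
For illustration, this is the consumer pattern that the new Reader API
enables (a minimal sketch; the broker address, group ID and apply callback
are placeholders, and NewReaderWithTopic normally takes the broker list
from the kafkaEndpoints setting):

    package main

    import (
    	"context"
    	"fmt"

    	cmnevent "github.com/AliceO2Group/Control/common/event"
    	pb "github.com/AliceO2Group/Control/common/protos"
    	"github.com/spf13/viper"
    )

    func main() {
    	// Point the reader at a broker; placeholder address for the sketch.
    	viper.Set("kafkaEndpoints", []string{"localhost:9092"})

    	ctx := context.Background()
    	apply := func(e *pb.Event) {
    		fmt.Println("beam info:", e.GetBeamModeEvent().GetBeamInfo())
    	}

    	r := cmnevent.NewReaderWithTopic("dip.lhc.beam_mode", "example-group")
    	defer r.Close()

    	// Prime local state from the newest retained event, if any...
    	if last, err := r.Last(ctx); err == nil && last != nil {
    		apply(last)
    	}
    	// ...then follow new events until the reader is closed.
    	for {
    		evt, err := r.Next(ctx)
    		if err != nil {
    			return
    		}
    		apply(evt)
    	}
    }

Init() in the LHC plugin below follows a variant of this pattern: it first
drains the backlog for a bounded time, falls back to Last() if the state is
still empty, and only then starts the streaming goroutine.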

Closes OCTRL-1067.
---
 common/event/reader.go         | 151 +++++++++++++++++++++++++--------
 core/integration/lhc/plugin.go |  74 +++++++++++++---
 2 files changed, 176 insertions(+), 49 deletions(-)

diff --git a/common/event/reader.go b/common/event/reader.go
index 0b348caf..f163e692 100644
--- a/common/event/reader.go
+++ b/common/event/reader.go
@@ -27,44 +27,42 @@ package event
 import (
 	"context"
 	"fmt"
+
 	"github.com/AliceO2Group/Control/common/event/topic"
 	"github.com/AliceO2Group/Control/common/logger/infologger"
 	pb "github.com/AliceO2Group/Control/common/protos"
 	"github.com/segmentio/kafka-go"
 	"github.com/spf13/viper"
 	"google.golang.org/protobuf/proto"
-	"sync"
 )
 
 // Reader interface provides methods to read events.
 type Reader interface {
+	// Next should return the next event, or return early if the context is cancelled.
 	Next(ctx context.Context) (*pb.Event, error)
+	// Last should return the last event currently present on the topic (or nil if none),
+	// or return early if the context is cancelled.
+	Last(ctx context.Context) (*pb.Event, error)
 	Close() error
 }
 
 // DummyReader is an implementation of Reader that returns no events.
 type DummyReader struct{}
 
-// Next returns the next event or nil if there are no more events.
 func (*DummyReader) Next(context.Context) (*pb.Event, error) { return nil, nil }
-
-// Close closes the DummyReader.
-func (*DummyReader) Close() error { return nil }
+func (*DummyReader) Last(context.Context) (*pb.Event, error) { return nil, nil }
+func (*DummyReader) Close() error                            { return nil }
 
 // KafkaReader reads events from Kafka and provides a blocking, cancellable API to fetch events.
-// Consumption mode is chosen at creation time:
-//   - latestOnly=false: consume everything (from stored offsets or beginning depending on group state)
-//   - latestOnly=true: seek to latest offsets on start and only receive messages produced after start
 type KafkaReader struct {
 	*kafka.Reader
-	mu    sync.Mutex
-	topic string
+	topic   string
+	brokers []string
+	groupID string
 }
 
 // NewReaderWithTopic creates a KafkaReader for the provided topic and starts it.
-// If latestOnly is true the reader attempts to seek to the latest offsets on start so that
-// only new messages (produced after creation) are consumed.
-func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *KafkaReader {
+func NewReaderWithTopic(topic topic.Topic, groupID string) *KafkaReader {
 	cfg := kafka.ReaderConfig{
 		Brokers: viper.GetStringSlice("kafkaEndpoints"),
 		Topic:   string(topic),
@@ -74,39 +72,72 @@ func NewReaderWithTopic(topic topic.Topic, groupID string, latestOnly bool) *Kaf
 	}
 
 	rk := &KafkaReader{
-		Reader: kafka.NewReader(cfg),
-		topic:  string(topic),
-	}
-
-	if latestOnly {
-		// best-effort: set offset to last so we don't replay older messages
-		if err := rk.SetOffset(kafka.LastOffset); err != nil {
-			log.WithField(infologger.Level, infologger.IL_Devel).
-				Warnf("failed to set offset to last offset: %v", err)
-		}
+		Reader:  kafka.NewReader(cfg),
+		topic:   string(topic),
+		brokers: append([]string{}, cfg.Brokers...),
+		groupID: groupID,
 	}
-
 	return rk
 }
 
-// Next blocks until the next event is available or ctx is cancelled. It returns an error when the reader is closed
-// (io.EOF) or the context is cancelled. The caller is responsible for providing a cancellable ctx.
+// Next blocks until the next event is available or ctx is cancelled.
 func (r *KafkaReader) Next(ctx context.Context) (*pb.Event, error) {
 	if r == nil {
 		return nil, fmt.Errorf("nil reader")
 	}
-
 	msg, err := r.ReadMessage(ctx)
 	if err != nil {
 		return nil, err
 	}
+	return kafkaMessageToEvent(msg)
+}
 
-	event, err := kafkaMessageToEvent(msg)
+// Last fetches the last available message on the topic (considering all partitions).
+// If multiple partitions have data, the event with the greatest message timestamp is returned.
+func (r *KafkaReader) Last(ctx context.Context) (*pb.Event, error) {
+	if r == nil {
+		return nil, fmt.Errorf("nil reader")
+	}
+	partitions, err := r.readPartitions()
 	if err != nil {
 		return nil, err
 	}
-
-	return event, nil
+	var latestEvt *pb.Event
+	var latestEvtTimeNs int64
+	for _, p := range partitions {
+		if p.Topic != r.topic {
+			continue
+		}
+		first, last, err := r.readFirstAndLast(p.ID)
+		if err != nil {
+			log.WithField(infologger.Level, infologger.IL_Devel).WithError(err).
+				Warnf("failed to read offsets for %s[%d]", r.topic, p.ID)
+			continue
+		}
+		if last <= first {
+			continue
+		}
+		msg, err := r.readAtOffset(ctx, p.ID, last-1)
+		if err != nil {
+			log.WithError(err).
+				WithField(infologger.Level, infologger.IL_Devel).
+				Warnf("failed to read last message for %s[%d] at offset %d", r.topic, p.ID, last-1)
+			continue
+		}
+		evt, err := kafkaMessageToEvent(msg)
+		if err != nil {
+			log.WithError(err).
+				WithField(infologger.Level, infologger.IL_Devel).
+				Warnf("failed to decode last message for %s[%d]", r.topic, p.ID)
+			continue
+		}
+		currentEvtTimeNs := msg.Time.UnixNano()
+		if latestEvt == nil || currentEvtTimeNs > latestEvtTimeNs {
+			latestEvt = evt
+			latestEvtTimeNs = currentEvtTimeNs
+		}
+	}
+	return latestEvt, nil
 }
 
 // Close stops the reader.
@@ -114,13 +145,7 @@ func (r *KafkaReader) Close() error {
 	if r == nil {
 		return nil
 	}
-	// Close the underlying kafka reader which will cause ReadMessage to return an error
-	err := r.Reader.Close()
-	if err != nil {
-		log.WithField(infologger.Level, infologger.IL_Devel).
-			Errorf("failed to close kafka reader: %v", err)
-	}
-	return err
+	return r.Reader.Close()
 }
 
 func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
@@ -130,3 +155,55 @@ func kafkaMessageToEvent(m kafka.Message) (*pb.Event, error) {
 	}
 	return &evt, nil
 }
+
+func (r *KafkaReader) brokerAddr() (string, error) {
+	if len(r.brokers) == 0 {
+		return "", fmt.Errorf("no kafka brokers configured")
+	}
+	return r.brokers[0], nil
+}
+
+func (r *KafkaReader) readPartitions() ([]kafka.Partition, error) {
+	addr, err := r.brokerAddr()
+	if err != nil {
+		return nil, err
+	}
+	conn, err := kafka.Dial("tcp", addr)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Close()
+	return conn.ReadPartitions(r.topic)
+}
+
+func (r *KafkaReader) readFirstAndLast(partition int) (int64, int64, error) {
+	addr, err := r.brokerAddr()
+	if err != nil {
+		return 0, 0, err
+	}
+	conn, err := kafka.DialLeader(context.Background(), "tcp", addr, r.topic, partition)
+	if err != nil {
+		return 0, 0, err
+	}
+	defer conn.Close()
+	first, last, err := conn.ReadOffsets()
+	return first, last, err
+}
+
+func (r *KafkaReader) readAtOffset(ctx context.Context, partition int, offset int64) (kafka.Message, error) {
+	if offset < 0 {
+		return kafka.Message{}, fmt.Errorf("invalid offset %d", offset)
+	}
+	kr := kafka.NewReader(kafka.ReaderConfig{
+		Brokers:   append([]string{}, r.brokers...),
+		Topic:     r.topic,
+		Partition: partition,
+		MinBytes:  1,
+		MaxBytes:  10e6,
+	})
+	defer kr.Close()
+	if err := kr.SetOffset(offset); err != nil {
+		return kafka.Message{}, err
+	}
+	return kr.ReadMessage(ctx)
+}
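
A note on the offset arithmetic used by Last() and readFirstAndLast()
above: Conn.ReadOffsets() returns the first available offset and the
offset at which the next produced message will be written, so the newest
existing record sits at last-1 and last <= first means the partition is
currently empty. A self-contained sketch of the same per-partition lookup
(the broker address, topic and partition in main() are placeholder
values):

    package main

    import (
    	"context"
    	"fmt"

    	"github.com/segmentio/kafka-go"
    )

    // lastMessage returns the newest record of a single partition,
    // or ok=false if the partition currently holds no data.
    func lastMessage(ctx context.Context, broker, topic string, partition int) (kafka.Message, bool, error) {
    	conn, err := kafka.DialLeader(ctx, "tcp", broker, topic, partition)
    	if err != nil {
    		return kafka.Message{}, false, err
    	}
    	defer conn.Close()

    	// first: oldest retained offset; next: one past the newest record.
    	first, next, err := conn.ReadOffsets()
    	if err != nil || next <= first {
    		return kafka.Message{}, false, err
    	}

    	r := kafka.NewReader(kafka.ReaderConfig{
    		Brokers:   []string{broker},
    		Topic:     topic,
    		Partition: partition,
    		MinBytes:  1,
    		MaxBytes:  10e6,
    	})
    	defer r.Close()
    	if err := r.SetOffset(next - 1); err != nil {
    		return kafka.Message{}, false, err
    	}
    	msg, err := r.ReadMessage(ctx)
    	if err != nil {
    		return kafka.Message{}, false, err
    	}
    	return msg, true, nil
    }

    func main() {
    	msg, ok, err := lastMessage(context.Background(), "localhost:9092", "dip.lhc.beam_mode", 0)
    	fmt.Println(msg.Offset, ok, err)
    }

Last() above repeats this lookup for every partition of the topic and
keeps the event carrying the greatest message timestamp.
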
diff --git a/core/integration/lhc/plugin.go b/core/integration/lhc/plugin.go
index 1d5853d9..28cbcc74 100644
--- a/core/integration/lhc/plugin.go
+++ b/core/integration/lhc/plugin.go
@@ -28,16 +28,16 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
-	"github.com/AliceO2Group/Control/common/event/topic"
-	"github.com/AliceO2Group/Control/common/logger/infologger"
-	pb "github.com/AliceO2Group/Control/common/protos"
 	"io"
 	"strings"
 	"sync"
 	"time"
 
 	cmnevent "github.com/AliceO2Group/Control/common/event"
+	"github.com/AliceO2Group/Control/common/event/topic"
 	"github.com/AliceO2Group/Control/common/logger"
+	"github.com/AliceO2Group/Control/common/logger/infologger"
+	pb "github.com/AliceO2Group/Control/common/protos"
 	"github.com/AliceO2Group/Control/common/utils/uid"
 	"github.com/AliceO2Group/Control/core/environment"
 	"github.com/AliceO2Group/Control/core/integration"
@@ -51,10 +51,8 @@ var dipClientTopic topic.Topic = "dip.lhc.beam_mode"
 
 // Plugin implements integration.Plugin and listens for LHC updates.
 type Plugin struct {
-	endpoint string
-	ctx      context.Context
-	//cancel context.CancelFunc
-	//wg sync.WaitGroup
+	endpoint     string
+	ctx          context.Context
 	mu           sync.Mutex
 	currentState *pb.BeamInfo
 	reader       cmnevent.Reader
@@ -66,21 +64,73 @@ func NewPlugin(endpoint string) integration.Plugin {
 }
 
 func (p *Plugin) Init(_ string) error {
-	// use a background context for reader loop; Destroy will Close the reader
 	p.ctx = context.Background()
-	p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "o2-aliecs-core.lhc", true)
-
+	p.reader = cmnevent.NewReaderWithTopic(dipClientTopic, "o2-aliecs-core.lhc")
 	if p.reader == nil {
 		return errors.New("could not create a kafka reader for LHC plugin")
 	}
-	go p.readAndInjectLhcUpdates()
-	log.Debug("LHC plugin initialized (client started)")
 
+	// Always perform a short pre-drain to consume any backlog without injecting.
+	log.WithField(infologger.Level, infologger.IL_Devel).
+		Info("LHC plugin: draining any initial backlog")
+	p.drainBacklog(2 * time.Second)
+
+	// If the state is still empty, try reading the latest message once.
+	p.mu.Lock()
+	empty := p.currentState == nil || p.currentState.BeamMode == pb.BeamMode_UNKNOWN
+	p.mu.Unlock()
+	if empty {
+		if last, err := p.reader.Last(p.ctx); err != nil {
+			log.WithField(infologger.Level, infologger.IL_Support).WithError(err).Warn("failed to read last LHC state on init")
+		} else if last != nil {
+			if bmEvt := last.GetBeamModeEvent(); bmEvt != nil && bmEvt.GetBeamInfo() != nil {
+				p.mu.Lock()
+				p.currentState = bmEvt.GetBeamInfo()
+				p.mu.Unlock()
+			}
+		} else {
+			// nothing to retrieve from the topic, move on
+		}
+	}
+
+	go p.readAndInjectLhcUpdates()
+	log.WithField(infologger.Level, infologger.IL_Devel).Debug("LHC plugin initialized (client started)")
 	return nil
 }
 
+// drainBacklog reads messages for a limited time and only updates the plugin state, without injecting into the environment manager.
+func (p *Plugin) drainBacklog(timeout time.Duration) {
+	drainCtx, cancel := context.WithTimeout(p.ctx, timeout)
+	defer cancel()
+	for {
+		msg, err := p.reader.Next(drainCtx)
+		if err != nil {
+			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
+				break
+			}
+			// transient error: sleep briefly and retry until the timeout expires
+			time.Sleep(50 * time.Millisecond)
+			continue
+		}
+		if msg == nil {
+			continue
+		}
+		if beamModeEvent := msg.GetBeamModeEvent(); beamModeEvent != nil && beamModeEvent.GetBeamInfo() != nil {
+			beamInfo := beamModeEvent.GetBeamInfo()
+			log.WithField(infologger.Level, infologger.IL_Devel).
+				Debugf("new LHC update received while draining backlog: BeamMode=%s, FillNumber=%d, FillingScheme=%s, StableBeamsStart=%d, StableBeamsEnd=%d, BeamType=%s",
+					beamInfo.GetBeamMode().String(), beamInfo.GetFillNumber(), beamInfo.GetFillingSchemeName(),
+					beamInfo.GetStableBeamsStart(), beamInfo.GetStableBeamsEnd(), beamInfo.GetBeamType())
+
+			p.mu.Lock()
+			p.currentState = beamModeEvent.GetBeamInfo()
+			p.mu.Unlock()
+		}
+	}
+}
+
 func (p *Plugin) GetName() string { return "lhc" }
 
 func (p *Plugin) GetPrettyName() string { return "LHC (DIP/Kafka client)" }
 
 func (p *Plugin) GetEndpoint() string {