Skip to content

Commit cbd57d9

Browse files
authored
Merge pull request #383 from capocchi/version-5.1
Version 5.1
2 parents 5bcf8f7 + 7825f27 commit cbd57d9

19 files changed

+1669
-2056
lines changed
Lines changed: 100 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,36 @@
1-
# ---------------------------------------------------------------------------
2-
# Kafka-based IN-MEMORY worker (thread) Class
3-
# ---------------------------------------------------------------------------
4-
# Requires: pip install confluent-kafka
5-
import threading
6-
import json
1+
# -*- coding: utf-8 -*-
2+
3+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
4+
# InMemoryKafkaWorker.py ---
5+
# --------------------------------
6+
# Copyright (c) 2025
7+
# L. CAPOCCHI (capocchi@univ-corse.fr)
8+
# SPE Lab - SISU Group - University of Corsica
9+
# --------------------------------
10+
# Version 1.0 last modified: 12/22/25
11+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
12+
#
13+
# GENERAL NOTES AND REMARKS:
14+
#
15+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
16+
17+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
18+
#
19+
# GLOBAL VARIABLES AND FUNCTIONS
20+
#
21+
## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ## ##
22+
23+
# ------------------------------------------------------------------
24+
# Kafka-specific implementation
25+
# ------------------------------------------------------------------
26+
727
import logging
828
import time
929

10-
from abc import ABC, abstractmethod
30+
from DEVSKernel.KafkaDEVS.InMemoryMessagingWorker import InMemoryMessagingWorker, MessageAdapter, MessageConsumer, MessageProducer
31+
from typing import Dict, Any
1132

12-
from DEVSKernel.KafkaDEVS.logconfig import LOGGING_LEVEL, worker_kafka_logger
33+
from DEVSKernel.KafkaDEVS.logconfig import LOGGING_LEVEL
1334

1435
logger = logging.getLogger("DEVSKernel.KafkaDEVS.InMemoryKafkaWorker")
1536
logger.setLevel(LOGGING_LEVEL)
@@ -20,152 +41,74 @@
2041
Producer = None
2142
Consumer = None
2243

23-
class InMemoryKafkaWorker(threading.Thread):
24-
"""Worker thread that manages one atomic model in memory."""
25-
26-
def __init__(self, model_name, aDEVS, bootstrap_server, in_topic=None, out_topic=None):
27-
super().__init__(daemon=True)
28-
self.aDEVS = aDEVS
29-
self.model_name = model_name
30-
self.bootstrap_server = bootstrap_server
31-
self.running = True
32-
33-
# Topics explicitement fournis par la stratégie
34-
self.in_topic = in_topic ### from coodinator
35-
self.out_topic = out_topic ### to coordinator
36-
37-
group_id = f"worker-thread-{self.aDEVS.myID}-{int(time.time() * 1000)}"
38-
39-
# Kafka consumer pour le topic de travail dédié
40-
self.consumer = Consumer({
41-
"bootstrap.servers": bootstrap_server,
42-
"group.id": group_id,
43-
"auto.offset.reset": "latest",
44-
"enable.auto.commit": True,
45-
})
46-
self.consumer.subscribe([self.in_topic])
47-
48-
# Kafka producer pour renvoyer les réponses
49-
self.producer = Producer({
50-
"bootstrap.servers": bootstrap_server
51-
})
52-
53-
logger.info(
54-
" [Thread-%s] Created for model %s (in topic=%s, out topic=%s)",
55-
self.aDEVS.myID, self.model_name, self.in_topic, self.out_topic
56-
)
57-
58-
def get_model(self):
59-
"""Returns the atomic DEVS model managed by this worker."""
60-
return self.aDEVS
61-
62-
def get_model_label(self):
63-
return self.model_name
64-
65-
def get_model_time_next(self):
66-
return self.aDEVS.timeNext
67-
68-
# ------------------------------------------------------------------
69-
# Traduction message DEVS -> appels sur le modèle
70-
# ------------------------------------------------------------------
71-
72-
def do_initialize(self, t:float):
73-
"""Initialise le modèle atomique avant de démarrer la boucle."""
74-
self.aDEVS.sigma = 0.0
75-
self.aDEVS.timeLast = 0.0
76-
self.aDEVS.myTimeAdvance = self.aDEVS.timeAdvance()
77-
self.aDEVS.timeNext = self.aDEVS.timeLast + self.aDEVS.myTimeAdvance
78-
79-
if self.aDEVS.myTimeAdvance != float("inf"):
80-
self.aDEVS.myTimeAdvance += t
81-
82-
def do_external_transition(self, t, msg):
83-
"""Effectue une transition interne sur le modèle atomique."""
84-
85-
port_inputs = {}
86-
87-
# Construire dict {port_obj -> Message(value, time)}
88-
from DomainInterface.Object import Message
89-
for pv in msg.portValueList:
90-
# pv.portIdentifier doit matcher le nom du port d'entrée
91-
for iport in self.aDEVS.IPorts:
92-
if iport.name == pv.portIdentifier:
93-
m = Message(pv.value, t)
94-
port_inputs[iport] = m
95-
break
96-
97-
self.aDEVS.myInput = port_inputs
98-
99-
# update elapsed time. This is necessary for the call to the external
100-
# transition function, which is used to update the DEVS' state.
101-
self.aDEVS.elapsed = t - self.aDEVS.timeLast
102-
103-
self.aDEVS.extTransition()
104-
105-
# Udpate time variables:
106-
self.aDEVS.timeLast = t
107-
self.aDEVS.myTimeAdvance = self.aDEVS.timeAdvance()
108-
self.aDEVS.timeNext = self.aDEVS.timeLast + self.aDEVS.myTimeAdvance
109-
if self.aDEVS.myTimeAdvance != float("inf"): self.aDEVS.myTimeAdvance += t
110-
self.aDEVS.elapsed = 0
111-
112-
def do_internal_transition(self, t:float):
113-
"""Effectue une transition interne sur le modèle atomique."""
114-
115-
time_last = self.aDEVS.timeLast
116-
self.aDEVS.elapsed = t - time_last
117-
118-
self.aDEVS.intTransition()
119-
120-
self.aDEVS.timeLast = t
121-
self.aDEVS.myTimeAdvance = self.aDEVS.timeAdvance()
122-
self.aDEVS.timeNext = self.aDEVS.timeLast + self.aDEVS.myTimeAdvance
123-
if self.aDEVS.myTimeAdvance != float('inf'): self.aDEVS.myTimeAdvance += t
124-
self.aDEVS.elapsed = 0
125-
126-
def do_output_function(self):
127-
"""Appelle outputFnc() sur le modèle atomique et retourne les sorties."""
128-
129-
self.aDEVS.outputFnc()
130-
131-
@abstractmethod
132-
def _process_standard(self, data):
133-
"""
134-
Process standard DEVS message format.
135-
Must be implemented by subclasses.
136-
137-
Args:
138-
data (dict): Parsed JSON message data
139-
"""
140-
pass
141-
142-
# ------------------------------------------------------------------
143-
# Boucle principale
144-
# ------------------------------------------------------------------
145-
146-
def run(self):
147-
logger.info(f" [Thread-{self.aDEVS.myID}] Started")
148-
149-
while self.running:
150-
msg = self.consumer.poll(timeout=0.5)
151-
if msg is None or msg.error():
152-
continue
153-
154-
try:
155-
raw = msg.value().decode("utf-8")
156-
data = json.loads(raw)
157-
158-
worker_kafka_logger.debug(f"[Thread-{self.aDEVS.myID}] IN: topic={msg.topic()} value={raw}")
159-
160-
self._process_standard(data)
161-
162-
except Exception as e:
163-
logger.exception("[Thread-%s] Error in run loop: %s", self.aDEVS.myID, e)
164-
165-
self.consumer.close()
166-
logger.info(f" [Thread-{self.aDEVS.myID}] Stopped")
167-
168-
169-
def stop(self):
170-
"""Stop the worker thread."""
171-
self.running = False
44+
class KafkaMessageAdapter(MessageAdapter):
45+
"""Kafka-specific implementation of MessageAdapter"""
46+
47+
def create_consumer(self, config: Dict[str, Any]) -> MessageConsumer:
48+
"""Create a Kafka consumer"""
49+
return Consumer(config)
50+
51+
def create_producer(self, config: Dict[str, Any]) -> MessageProducer:
52+
"""Create a Kafka producer"""
53+
return Producer(config)
54+
55+
def extract_message_value(self, message: Any) -> bytes:
56+
"""Extract value from Kafka message"""
57+
return message.value()
58+
59+
def has_error(self, message: Any) -> bool:
60+
"""Check if Kafka message has an error"""
61+
return message.error() is not None
62+
63+
def get_topic(self, message: Any) -> str:
64+
"""Get topic from Kafka message"""
65+
return message.topic()
66+
67+
class InMemoryKafkaWorker(InMemoryMessagingWorker):
68+
"""
69+
Kafka-specific worker thread that manages one atomic model in memory.
70+
This is a concrete implementation of InMemoryMessagingWorker for Kafka.
71+
"""
72+
73+
def __init__(
74+
self,
75+
model_name: str,
76+
aDEVS,
77+
bootstrap_server: str,
78+
in_topic: str = None,
79+
out_topic: str = None
80+
):
81+
"""
82+
Initialize the Kafka worker.
83+
84+
Args:
85+
model_name: Name of the DEVS model
86+
aDEVS: The atomic DEVS model instance
87+
bootstrap_server: Kafka bootstrap server address
88+
in_topic: Topic to receive messages from coordinator
89+
out_topic: Topic to send messages to coordinator
90+
"""
91+
group_id = f"worker-thread-{aDEVS.myID}-{int(time.time() * 1000)}"
92+
93+
consumer_config = {
94+
"bootstrap.servers": bootstrap_server,
95+
"group.id": group_id,
96+
"auto.offset.reset": "latest",
97+
"enable.auto.commit": True,
98+
}
99+
100+
producer_config = {
101+
"bootstrap.servers": bootstrap_server
102+
}
103+
104+
adapter = KafkaMessageAdapter()
105+
106+
super().__init__(
107+
model_name=model_name,
108+
aDEVS=aDEVS,
109+
adapter=adapter,
110+
consumer_config=consumer_config,
111+
producer_config=producer_config,
112+
in_topic=in_topic,
113+
out_topic=out_topic
114+
)

0 commit comments

Comments
 (0)