Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 11 additions & 122 deletions src/batcontrol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .forecastsolar import ForecastSolar as solar_factory

from .forecastconsumption import Consumption as consumption_factory
from .forecast_metrics import ForecastMetrics

ERROR_IGNORE_TIME = 600 # 10 Minutes
EVALUATIONS_EVERY_MINUTES = 3 # Every x minutes on the clock
Expand Down Expand Up @@ -604,6 +605,7 @@ def run(self):
# Factorize [0] to account for elapsed time
production[0] *= (1 - elapsed_in_current)
consumption[0] *= (1 - elapsed_in_current)
net_consumption = consumption - production

logger.debug(
'Current interval factorization: elapsed=%.3f, remaining=%.3f',
Expand Down Expand Up @@ -683,16 +685,21 @@ def run(self):
if self.mqtt_api is not None:
self.mqtt_api.publish_min_dynamic_price_diff(
calc_output.min_dynamic_price_difference)
solar_active, surplus_wh = self._compute_solar_active_and_surplus(
solar_active, surplus_wh = ForecastMetrics.solar_active_and_surplus(
production, consumption, calc_input.free_capacity
)
self.mqtt_api.publish_solar_active(solar_active)
self.mqtt_api.publish_solar_surplus(surplus_wh)
night_surplus_wh = self._compute_night_surplus(
production, consumption,
pv_start_wh = ForecastMetrics.pv_start_battery(
net_consumption,
calc_input.stored_usable_energy, calc_input.free_capacity
)
self.mqtt_api.publish_pv_start_battery(pv_start_wh)
forecast_min_wh = ForecastMetrics.forecast_min_battery(
net_consumption,
calc_input.stored_usable_energy, calc_input.free_capacity
)
self.mqtt_api.publish_night_surplus(night_surplus_wh)
self.mqtt_api.publish_forecast_min_battery(forecast_min_wh)

if self.discharge_blocked and not \
self.general_logic.is_discharge_always_allowed_soc(self.get_SOC()):
Expand Down Expand Up @@ -876,124 +883,6 @@ def get_reserved_energy(self) -> float:
""" Returns the reserved energy in Wh from last calculation """
return self.last_reserved_energy

def _compute_solar_active_and_surplus(
self,
production: np.ndarray,
consumption: np.ndarray,
free_capacity: float) -> tuple:
"""Compute solar-active flag and expected surplus energy.

Returns:
solar_active (bool): True iff solar is producing in slot 0
surplus_wh (float): Expected solar overflow in Wh (>0 = WP can run)

When solar is active, surplus is the overflow in the current production
window. Otherwise, surplus is the expected overflow at the end of the
next production window after the battery has bridged consumption until
solar restarts.
"""
net_consumption = consumption - production

# Find start and end of the FIRST production window only
production_start: Optional[int] = None
production_end_current: Optional[int] = None
for i, p in enumerate(production):
if p > 0:
if production_start is None:
production_start = i
production_end_current = i
elif production_start is not None:
break

solar_active = production_start == 0

if production_start is None:
surplus_wh = 0.0
else:
bridge_wh = max(0.0, float(np.sum(net_consumption[:production_start])))
end_idx = (production_end_current + 1) if production_end_current is not None \
else production_start + 1
solar_net_wh = float(-np.sum(net_consumption[production_start:end_idx]))
surplus_wh = max(0.0, solar_net_wh - free_capacity - bridge_wh)

logger.debug(
'Solar active: %s, surplus: %.1f Wh (free_cap=%.1f Wh)',
solar_active, surplus_wh, free_capacity
)
return solar_active, surplus_wh

def _compute_night_surplus(
self,
production: np.ndarray,
consumption: np.ndarray,
stored_usable_energy: float,
free_capacity: float) -> float:
"""Compute expected battery surplus at the start of the next production window.

Answers the question: after tonight's discharge, how much charge will remain
in the battery when tomorrow's solar production starts?

The calculation intentionally projects through the entire first production
window (including any solar charging) to obtain the battery level at production
end. From there it subtracts overnight consumption to arrive at the battery
level at the next morning's production start:

battery_at_production_end - night_consumption

When solar is currently inactive (e.g. early morning), this means net_delta
covers the bridge discharge AND the upcoming solar charging. This is deliberate:
stopping at production_start would give the battery level at dawn of today, not
at dusk — which is the wrong baseline for the overnight calculation.

If no second production window exists within the forecast horizon,
night_consumption covers the remaining forecast slots (best available proxy).

Returns 0.0 if no solar production window exists in the forecast at all.
"""
net_consumption = consumption - production

# Find start and end of the first production window
production_start: Optional[int] = None
production_end: Optional[int] = None
for i, p in enumerate(production):
if p > 0:
if production_start is None:
production_start = i
production_end = i
elif production_start is not None:
break

if production_start is None:
return 0.0

end_idx = production_end + 1 # type: ignore[operator]

# Project battery level at end of first production window (clamped to [0, max])
net_delta = float(-np.sum(net_consumption[0:end_idx]))
battery_at_end = stored_usable_energy + min(
free_capacity, max(-stored_usable_energy, net_delta)
)

# Find the start of the next (second) production window after the night gap
next_production_start: Optional[int] = None
for i in range(end_idx, len(production)):
if production[i] > 0:
next_production_start = i
break
night_end = next_production_start if next_production_start is not None \
else len(production)

night_consumption_wh = max(0.0, float(np.sum(net_consumption[end_idx:night_end])))

night_surplus_wh = max(0.0, battery_at_end - night_consumption_wh)

logger.debug(
'Night surplus: %.1f Wh (battery_at_production_end=%.1f Wh,'
' night_consumption=%.1f Wh, night_slots=%d)',
night_surplus_wh, battery_at_end, night_consumption_wh, night_end - end_idx
)
return night_surplus_wh

def set_stored_energy(self, stored_energy) -> None:
""" Set the stored energy in Wh """
self.last_stored_energy = stored_energy
Expand Down
117 changes: 117 additions & 0 deletions src/batcontrol/forecast_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
Forecast-derived battery metrics for state estimation and load-control decisions.

ForecastMetrics computes indicators from production/consumption forecast arrays
and current battery state. All methods are stateless with respect to object
state; they emit debug log messages but do not mutate any shared state.

Metrics:
solar_active_and_surplus -- solar-active flag + expected PV overflow (Wh)
pv_start_battery -- battery level (Wh) at next net-charging point
forecast_min_battery -- minimum battery level (Wh) over forecast horizon
"""
import logging
from typing import Optional, Tuple

import numpy as np

logger = logging.getLogger(__name__)


class ForecastMetrics:
"""Pure-function metrics derived from forecast arrays and battery state."""

@staticmethod
def solar_active_and_surplus(
production: np.ndarray,
consumption: np.ndarray,
free_capacity: float) -> Tuple[bool, float]:
"""Compute solar-active flag and expected surplus energy.

Returns:
solar_active (bool): True iff solar is producing in slot 0
surplus_wh (float): Expected solar overflow in Wh (>0 = WP can run)

When solar is active, surplus is the overflow in the current production
window. Otherwise, surplus is the expected overflow at the end of the
next production window after the battery has bridged consumption until
solar restarts.
"""
net_consumption = consumption - production

production_start: Optional[int] = None
production_end_current: Optional[int] = None
for i, p in enumerate(production):
if p > 0:
if production_start is None:
production_start = i
production_end_current = i
elif production_start is not None:
break

solar_active = production_start == 0

if production_start is None:
surplus_wh = 0.0
else:
bridge_wh = max(0.0, float(np.sum(net_consumption[:production_start])))
end_idx = (production_end_current + 1) if production_end_current is not None \
else production_start + 1
solar_net_wh = float(-np.sum(net_consumption[production_start:end_idx]))
surplus_wh = max(0.0, solar_net_wh - free_capacity - bridge_wh)

logger.debug(
'Solar active: %s, surplus: %.1f Wh (free_cap=%.1f Wh)',
solar_active, surplus_wh, free_capacity
)
return solar_active, surplus_wh

@staticmethod
def pv_start_battery(
net_consumption: np.ndarray,
stored_usable_energy: float,
free_capacity: float) -> float:
"""Battery level (Wh above MIN_SOC) at the start of the next net-charging window.

Simulates slot-by-slot discharge until the first slot where
net_consumption < 0 (solar production exceeds household consumption).
That crossing point is when the battery transitions from discharging to
charging and is the most meaningful reference for overnight planning.

Returns 0.0 if the battery reaches MIN_SOC before that point, or if no
net-charging slot exists in the forecast at all.
"""
battery = stored_usable_energy
max_battery = stored_usable_energy + free_capacity
for net in net_consumption:
if net < 0:
return battery
battery = max(0.0, min(max_battery, battery - net))
return 0.0

@staticmethod
def forecast_min_battery(
net_consumption: np.ndarray,
stored_usable_energy: float,
free_capacity: float) -> float:
"""Minimum battery level (Wh above MIN_SOC) over the entire forecast horizon.

Simulates slot-by-slot with proper floor (MIN_SOC = 0 usable) and ceiling
(MAX_SOC = stored_usable + free_capacity) clamping at each step.
Returns the lowest point reached during the simulation.

A value of 0 means the battery is expected to hit MIN_SOC at some point
in the forecast -- a signal to be conservative with flexible loads.
"""
battery = stored_usable_energy
max_battery = stored_usable_energy + free_capacity
min_battery = stored_usable_energy
for net in net_consumption:
battery = max(0.0, min(max_battery, battery - net))
if battery < min_battery:
min_battery = battery
logger.debug(
'Forecast min battery: %.1f Wh (stored=%.1f Wh, slots=%d)',
min_battery, stored_usable_energy, len(net_consumption)
)
return min_battery
53 changes: 42 additions & 11 deletions src/batcontrol/mqtt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
- /control_source: source that last selected the current control state (api or optimizer)
- /solar_surplus_wh: expected solar surplus energy in Wh (>0 means usable surplus available)
- /solar_active: bool indicating whether solar is currently producing (slot 0 > 0)
- /night_surplus_wh: expected battery surplus in Wh at start of next production window (>0 means leftover charge after overnight discharge)
- /pv_start_battery_wh: battery level in Wh (above MIN_SOC) at the next net-charging point (when PV first exceeds consumption)
- /forecast_min_battery_wh: minimum battery level in Wh (above MIN_SOC) over the entire forecast horizon (0 = shortage expected)

The following statistical arrays are published as JSON arrays:
- /FCST/production: forecasted production in W
Expand Down Expand Up @@ -499,16 +500,28 @@ def publish_solar_surplus(self, surplus_wh: float) -> None:
f'{surplus_wh:.1f}'
)

def publish_night_surplus(self, surplus_wh: float) -> None:
""" Publish the expected battery surplus at the start of the next production window.
/night_surplus_wh
Positive values mean the battery will still hold charge (above MIN_SOC)
when solar production resumes the next morning.
def publish_pv_start_battery(self, battery_wh: float) -> None:
""" Publish the battery level at the next net-charging point.
/pv_start_battery_wh
Energy in Wh above MIN_SOC at the moment PV production first exceeds
household consumption. 0 if battery hits MIN_SOC before that point.
"""
if self.client.is_connected():
self.client.publish(
self.base_topic + '/night_surplus_wh',
f'{surplus_wh:.1f}'
self.base_topic + '/pv_start_battery_wh',
f'{battery_wh:.1f}'
)

def publish_forecast_min_battery(self, battery_wh: float) -> None:
""" Publish the minimum battery level over the entire forecast horizon.
/forecast_min_battery_wh
Energy in Wh above MIN_SOC at the trough of the slot-by-slot simulation.
0 means the battery is expected to hit MIN_SOC at some point.
"""
if self.client.is_connected():
self.client.publish(
self.base_topic + '/forecast_min_battery_wh',
f'{battery_wh:.1f}'
)

def publish_solar_active(self, active: bool) -> None:
Expand Down Expand Up @@ -922,12 +935,20 @@ def send_mqtt_discovery_messages(self) -> None:
self.base_topic + "/solar_surplus_wh")

self.publish_mqtt_discovery_message(
"Night Surplus",
"batcontrol_night_surplus_wh",
"PV Start Battery",
"batcontrol_pv_start_battery_wh",
"sensor",
"energy",
"Wh",
self.base_topic + "/pv_start_battery_wh")

self.publish_mqtt_discovery_message(
"Forecast Min Battery",
"batcontrol_forecast_min_battery_wh",
"sensor",
"energy",
"Wh",
self.base_topic + "/night_surplus_wh")
self.base_topic + "/forecast_min_battery_wh")

self.publish_mqtt_discovery_message(
"Solar Active",
Expand All @@ -939,6 +960,16 @@ def send_mqtt_discovery_messages(self) -> None:
entity_category="diagnostic",
value_template="{% if value == 'true' %}ON{% else %}OFF{% endif %}")

# TODO(0.9.1): remove this tombstone block once brokers have been cleaned up.
# Remove legacy retained discovery config for the renamed metric.
# An empty retained payload deletes the HA entity from existing brokers.
if self.client.is_connected():
self.client.publish(
self.auto_discover_topic +
'/sensor/batcontrol/batcontrol_night_surplus_wh/config',
'',
retain=True)

def send_mqtt_discovery_for_mode(self) -> None:
""" Publish Home Assistant MQTT Auto Discovery message for mode"""
val_templ = (
Expand Down
2 changes: 2 additions & 0 deletions tests/batcontrol/test_mqtt_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def _make_discovery_stub():
api.send_mqtt_discovery_messages = (
MqttApi.send_mqtt_discovery_messages.__get__(api, MqttApi)
)
api.client = MagicMock()
api.auto_discover_topic = 'homeassistant'
return api


Expand Down
Loading