diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index 2ed8997004..39c1ee0eed 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -108,6 +108,14 @@ reward_timestep = 0.000025 reward_overspeed = 0.05 reward_ade = 0.0 +; --- Logging --- +; EMA coefficient for per-agent log aggregation. Each agent slot tracks a +; smoothed estimate of its completed-episode metrics: the first completion +; seeds the slot, later ones apply slot = alpha*slot + (1 - alpha)*new_episode. +; alpha=0 keeps only the most recent episode; alpha->1 freezes the slot at +; its first observation. alpha=0.707 has a half-life of ~2 episodes per agent. +log_ema_alpha = 0.707 + ; --- Map --- ; Path to map used for training map_dir = "pufferlib/resources/drive/binaries/carla" diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index ec7f34d2c1..8a66a2b798 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -25,14 +25,29 @@ static PyObject *map_cache_live_count_py( return PyLong_FromLong(live); } +static void prepare_log(Drive *env); +static PyObject *vec_prepare_log_py(PyObject *self __attribute__((unused)), PyObject *args); + // clang-format off #define MY_METHODS \ {"map_cache_size", map_cache_size_py, METH_NOARGS, "Map cache slot count."}, \ - {"map_cache_live_count", map_cache_live_count_py, METH_NOARGS, "Map cache live count."} + {"map_cache_live_count", map_cache_live_count_py, METH_NOARGS, "Map cache live count."}, \ + {"vec_prepare_log", vec_prepare_log_py, METH_VARARGS, "Aggregate per-agent log buffers into each env->log."} // clang-format on #include "../env_binding.h" +static PyObject *vec_prepare_log_py(PyObject *self __attribute__((unused)), PyObject *args) { + VecEnv *vec = unpack_vecenv(args); + if (!vec) { + return NULL; + } + for (int i = 0; i < vec->num_envs; i++) { + prepare_log((Drive *) vec->envs[i]); + } + Py_RETURN_NONE; +} + static int my_put(Env *env, PyObject *args, PyObject 
*kwargs) { PyObject *obs = PyDict_GetItemString(kwargs, "observations"); if (!PyObject_TypeCheck(obs, &PyArray_Type)) { @@ -1979,6 +1994,7 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) { env->reward_randomization = (bool) unpack(kwargs, "reward_randomization"); env->compute_eval_metrics = (bool) unpack(kwargs, "compute_eval_metrics"); env->eval_mode = (int) unpack(kwargs, "eval_mode"); + env->log_ema_alpha = (float) unpack(kwargs, "log_ema_alpha"); env->obs_norm_goal_offset_m = (float) unpack(kwargs, "obs_norm_goal_offset_m"); env->obs_norm_xy_offset_m = (float) unpack(kwargs, "obs_norm_xy_offset_m"); env->obs_norm_veh_length_m = (float) unpack(kwargs, "obs_norm_veh_length_m"); diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 61cc1ec42b..017747c6fe 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -456,6 +456,13 @@ struct Drive { int next_episode_index; int completed_episodes_count; CompletedEpisodeSummary completed_episodes[COMPLETED_EPISODE_QUEUE_CAPACITY]; + + // Per-agent EMA state, one slot per active controllable agent. Allocated + // in init() once active_agent_count is known. + Log *per_agent_log_ema; + int *per_agent_log_count; + int *per_agent_has_data; + float log_ema_alpha; }; typedef struct { @@ -2693,8 +2700,32 @@ static float calculate_puffer_score(Log *log_agent, float duration_steps, float return log_agent->puffer_score; } +// Sum each agent's current EMA slot into env->log and set env->log.n to the +// number of slots that have ever been seeded. vec_log divides by aggregate.n +// downstream to produce the cross-agent mean. 
+static void prepare_log(Drive *env) { + memset(&env->log, 0, sizeof(Log)); + int num_keys = sizeof(Log) / sizeof(float); + int num_with_data = 0; + for (int a = 0; a < env->active_agent_count; a++) { + if (!env->per_agent_has_data[a]) { + continue; + } + float *slot = (float *) &env->per_agent_log_ema[a]; + float *dst = (float *) &env->log; + for (int j = 0; j < num_keys; j++) { + dst[j] += slot[j]; + } + num_with_data++; + } + env->log.n = (float) num_with_data; +} + static void add_log(Drive *env) { int safe_timestep = (env->timestep > 0) ? env->timestep : 1; + float alpha = fminf(fmaxf(env->log_ema_alpha, 0.0f), 1.0f); // clamp to [0, 1]: outside that range the EMA blend stops being a weighted average and can diverge + float one_minus_alpha = 1.0f - alpha; + int num_log_keys = sizeof(Log) / sizeof(float); for (int i = 0; i < env->active_agent_count; i++) { Agent *agent = &env->agents[env->active_agent_indices[i]]; float episode_duration_s = env->logs[i].episode_length * env->dt; @@ -2702,55 +2733,55 @@ static void add_log(Drive *env) { reference_progress_distance = fmaxf(reference_progress_distance, 1.0f); env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance; + Log episode_log = {0}; + int offroad = env->logs[i].offroad_rate; - env->log.offroad_rate += offroad; + episode_log.offroad_rate = offroad; int collided = env->logs[i].collision_rate; - env->log.collision_rate += collided; + episode_log.collision_rate = collided; int red_light_violations = env->logs[i].red_light_violation_rate; - env->log.red_light_violation_rate += red_light_violations; + episode_log.red_light_violation_rate = red_light_violations; int total_infractions = (offroad || collided || red_light_violations) ? 
1 : 0; - float avg_speed_per_agent = env->logs[i].avg_speed_per_agent; - env->log.avg_speed_per_agent += avg_speed_per_agent / safe_timestep; + episode_log.avg_speed_per_agent = env->logs[i].avg_speed_per_agent / safe_timestep; int num_waypoints_reached = env->logs[i].num_waypoints_reached; - env->log.num_waypoints_reached += num_waypoints_reached; + episode_log.num_waypoints_reached = num_waypoints_reached; int num_goals_reached = env->logs[i].num_goals_reached; - env->log.num_goals_reached += num_goals_reached; - // Score: 1 per agent that reached all 3 target waypoints without - // being removed/stopped. Was hardcoded to >=4, unreachable given - // num_target_waypoints=3 in the ini, so score was always 0. + episode_log.num_goals_reached = num_goals_reached; + // Score: 1 if the agent reached all 3 target waypoints without + // being removed/stopped, else 0. Was hardcoded to >=4, unreachable + // given num_target_waypoints=3 in the ini, so score was always 0. if (num_goals_reached >= 3 && !agent->removed && !agent->stopped) { - env->log.score += 1.0f; + episode_log.score = 1.0f; } if (!offroad && !collided && !red_light_violations && num_waypoints_reached < 1) { - env->log.dnf_rate += 1.0f; + episode_log.dnf_rate = 1.0f; } - env->log.total_distance_travelled += agent->distance_since_spawn; + episode_log.total_distance_travelled = agent->distance_since_spawn; if (total_infractions > 0) { - env->log.total_infractions += 1.0f; + episode_log.total_infractions = 1.0f; } - float displacement_error = env->logs[i].avg_displacement_error; - env->log.avg_displacement_error += displacement_error; - env->log.episode_length += env->logs[i].episode_length; - env->log.episode_return += env->logs[i].episode_return; + episode_log.avg_displacement_error = env->logs[i].avg_displacement_error; + episode_log.episode_length = env->logs[i].episode_length; + episode_log.episode_return = env->logs[i].episode_return; // Per-component reward sums (mirrors compute_rewards' env->rewards[i]+= 
sites). - env->log.reward_collision += env->logs[i].reward_collision; - env->log.reward_offroad += env->logs[i].reward_offroad; - env->log.reward_red_light += env->logs[i].reward_red_light; - env->log.reward_goal += env->logs[i].reward_goal; - env->log.reward_lane_align += env->logs[i].reward_lane_align; - env->log.reward_lane_center += env->logs[i].reward_lane_center; - env->log.reward_comfort += env->logs[i].reward_comfort; - env->log.reward_velocity += env->logs[i].reward_velocity; - env->log.reward_timestep += env->logs[i].reward_timestep; - env->log.reward_reverse += env->logs[i].reward_reverse; - env->log.reward_overspeed += env->logs[i].reward_overspeed; - env->log.reward_ade += env->logs[i].reward_ade; + episode_log.reward_collision = env->logs[i].reward_collision; + episode_log.reward_offroad = env->logs[i].reward_offroad; + episode_log.reward_red_light = env->logs[i].reward_red_light; + episode_log.reward_goal = env->logs[i].reward_goal; + episode_log.reward_lane_align = env->logs[i].reward_lane_align; + episode_log.reward_lane_center = env->logs[i].reward_lane_center; + episode_log.reward_comfort = env->logs[i].reward_comfort; + episode_log.reward_velocity = env->logs[i].reward_velocity; + episode_log.reward_timestep = env->logs[i].reward_timestep; + episode_log.reward_reverse = env->logs[i].reward_reverse; + episode_log.reward_overspeed = env->logs[i].reward_overspeed; + episode_log.reward_ade = env->logs[i].reward_ade; // Comfort and velocity metrics (normalized per timestep) - env->log.comfort_violation_count += env->logs[i].comfort_violation_count / safe_timestep; - env->log.velocity_progress_sum += env->logs[i].velocity_progress_sum / safe_timestep; + episode_log.comfort_violation_count = env->logs[i].comfort_violation_count / safe_timestep; + episode_log.velocity_progress_sum = env->logs[i].velocity_progress_sum / safe_timestep; // Lane metrics (normalized per timestep for average per episode) - env->log.lane_center_rate += 
env->logs[i].lane_center_rate / safe_timestep; - env->log.lane_heading_aligned_rate += env->logs[i].lane_heading_aligned_rate / safe_timestep; + episode_log.lane_center_rate = env->logs[i].lane_center_rate / safe_timestep; + episode_log.lane_heading_aligned_rate = env->logs[i].lane_heading_aligned_rate / safe_timestep; if (env->compute_eval_metrics) { env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance; env->logs[i].comfort_score = calculate_duration_scaled_violation_score( @@ -2758,35 +2789,46 @@ static void add_log(Drive *env) { env->logs[i].episode_length, env->dt); calculate_puffer_score(&env->logs[i], env->logs[i].episode_length, env->dt); - env->log.at_fault_collision_rate += env->logs[i].at_fault_collision_rate; - env->log.ttc_within_bound_rate += env->logs[i].ttc_within_bound_rate; - env->log.wrong_way_distance += env->logs[i].wrong_way_distance; - env->log.speed_violation_sum += env->logs[i].speed_violation_sum; - env->log.progress_ratio += env->logs[i].progress_ratio; - env->log.comfort_score += env->logs[i].comfort_score; - env->log.ttc_violations += env->logs[i].ttc_violations; - env->log.ttc_samples += env->logs[i].ttc_samples; - env->log.multi_lane_time += env->logs[i].multi_lane_time; - env->log.multi_lane_score += env->logs[i].multi_lane_score; + episode_log.at_fault_collision_rate = env->logs[i].at_fault_collision_rate; + episode_log.ttc_within_bound_rate = env->logs[i].ttc_within_bound_rate; + episode_log.wrong_way_distance = env->logs[i].wrong_way_distance; + episode_log.speed_violation_sum = env->logs[i].speed_violation_sum; + episode_log.progress_ratio = env->logs[i].progress_ratio; + episode_log.comfort_score = env->logs[i].comfort_score; + episode_log.ttc_violations = env->logs[i].ttc_violations; + episode_log.ttc_samples = env->logs[i].ttc_samples; + episode_log.multi_lane_time = env->logs[i].multi_lane_time; + episode_log.multi_lane_score = env->logs[i].multi_lane_score; float wrong_dist = 
env->logs[i].wrong_way_distance; float direction_score = (wrong_dist <= 2.0f) ? 1.0f : (wrong_dist <= 6.0f) ? 0.5f : 0.0f; - env->log.driving_direction_score += direction_score; + episode_log.driving_direction_score = direction_score; float T = safe_timestep * env->dt; float speed_compliance = fmaxf(0.0f, 1.0f - env->logs[i].speed_violation_sum / fmaxf(T, 1e-3f)); - env->log.speed_limit_compliance += speed_compliance; + episode_log.speed_limit_compliance = speed_compliance; float making_progress = (env->logs[i].progress_ratio > 0.2f) ? 1.0f : 0.0f; - env->log.making_progress_rate += making_progress; - env->log.puffer_score += env->logs[i].puffer_score; + episode_log.making_progress_rate = making_progress; + episode_log.puffer_score = env->logs[i].puffer_score; } - env->log.n += 1; + episode_log.expert_static_car_count = env->expert_static_agent_count; + episode_log.static_car_count = env->static_agent_count; + + Log *slot = &env->per_agent_log_ema[i]; + if (!env->per_agent_has_data[i]) { + *slot = episode_log; + env->per_agent_has_data[i] = 1; + } else { + float *slot_f = (float *) slot; + float *ep_f = (float *) &episode_log; + for (int j = 0; j < num_log_keys; j++) { + slot_f[j] = alpha * slot_f[j] + one_minus_alpha * ep_f[j]; + } + } + env->per_agent_log_count[i] += 1; } - // Log composition counts per agent so vec_log averaging recovers the per-env value - env->log.expert_static_car_count += env->expert_static_agent_count; - env->log.static_car_count += env->static_agent_count; if (env->emit_completed_episodes && env->completed_episodes_count < COMPLETED_EPISODE_QUEUE_CAPACITY) { // Snapshot per-episode aggregates from env->logs[] before c_reset @@ -3649,6 +3691,9 @@ void init(Drive *env) { env->human_agent_idx = 0; env->timestep = 0; env->shared_map = NULL; + env->per_agent_log_ema = NULL; + env->per_agent_log_count = NULL; + env->per_agent_has_data = NULL; struct SharedMapData *shared = env->use_map_cache ? 
map_cache_lookup(env->map_name) : NULL; if (shared != NULL) { @@ -3698,6 +3743,10 @@ void init(Drive *env) { env->logs_capacity = 0; set_active_agents(env); env->logs_capacity = env->active_agent_count; + + env->per_agent_log_ema = (Log *) calloc(env->active_agent_count, sizeof(Log)); + env->per_agent_log_count = (int *) calloc(env->active_agent_count, sizeof(int)); + env->per_agent_has_data = (int *) calloc(env->active_agent_count, sizeof(int)); if (env->simulation_mode == SIMULATION_REPLAY) { remove_bad_trajectories(env); } @@ -3812,6 +3861,9 @@ void c_close(Drive *env) { free(env->tracks_to_predict); free(env->map_name); free(env->ini_file); + free(env->per_agent_log_ema); + free(env->per_agent_log_count); + free(env->per_agent_has_data); } static int compute_observation_size(Drive *env) { diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index c26d2bd8eb..66901bc639 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -74,6 +74,7 @@ def __init__( init_step=0, eval_mode=0, num_eval_scenarios=16, + log_ema_alpha=0.707, init_mode="create_all_valid", control_mode="control_vehicles", map_dir=None, @@ -169,6 +170,7 @@ def __init__( raise ValueError(f"dynamics_model must be 'classic' or 'jerk'. 
Got: {dynamics_model}") self.eval_mode = eval_mode self.num_eval_scenarios = num_eval_scenarios + self.log_ema_alpha = log_ema_alpha self.termination_mode = termination_mode self.inactive_agent_threshold = inactive_agent_threshold self.rng = np.random.default_rng(seed) @@ -418,6 +420,7 @@ def _env_init_kwargs(self, map_file, max_agents): "reward_randomization": self.reward_randomization, "compute_eval_metrics": self.compute_eval_metrics, "eval_mode": self.eval_mode, + "log_ema_alpha": self.log_ema_alpha, "obs_norm_goal_offset_m": self.obs_norm_goal_offset_m, "obs_norm_xy_offset_m": self.obs_norm_xy_offset_m, "obs_norm_veh_length_m": self.obs_norm_veh_length_m, @@ -477,6 +480,11 @@ def step(self, actions): self._reset_compact_replay_buffer(env_slot, scenarios_after[env_slot]) info.append(tagged) if self.tick % self.report_interval == 0: + # Sum each agent's EMA slot into its env->log (the slots themselves + # are left untouched) so the shared vec_log produces a per-agent + # population mean: every agent that has completed at least one + # episode contributes exactly one term. + binding.vec_prepare_log(self.c_envs) log = binding.vec_log(self.c_envs, self.num_agents) if log: info.append(log) diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index 9c6cb56f6b..029b4c1daf 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -704,40 +704,33 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { // Iterates over logs one float at a time. Will break // horribly if Log has non-float data. - PyObject *num_agents_arg = PyTuple_GetItem(args, 1); - float num_agents = (float) PyLong_AsLong(num_agents_arg); int num_keys = sizeof(Log) / sizeof(float); Env *env = vec->envs[0]; if (env->eval_mode) { - PyObject *list = PyList_New(vec->num_envs); - PyObject *dict = PyDict_New(); - - if (env->log.n == 0) { - return dict; - } + PyObject *list = PyList_New(0); - // Got enough data. 
Reset logs and return metrics for (int i = 0; i < vec->num_envs; i++) { - PyObject *dict = PyDict_New(); - Env *env = vec->envs[i]; - float n = env->log.n; - // Average across agents - for (int i = 0; i < num_keys; i++) { - ((float *) &env->log)[i] /= n; + Env *env_i = vec->envs[i]; + float n = env_i->log.n; + if (n == 0) { + continue; } - my_log(dict, env, &env->log, n); + for (int j = 0; j < num_keys; j++) { + ((float *) &env_i->log)[j] /= n; + } + PyObject *dict = PyDict_New(); + my_log(dict, env_i, &env_i->log, n); assign_to_dict(dict, "n", n); - // Add map_name to dict - if (env->map_name) { - PyObject *s = PyUnicode_FromString(env->map_name); + if (env_i->map_name) { + PyObject *s = PyUnicode_FromString(env_i->map_name); if (s != NULL) { PyDict_SetItemString(dict, "map_name", s); Py_DECREF(s); } } - - PyList_SetItem(list, i, dict); + PyList_Append(list, dict); + Py_DECREF(dict); } // Reset logs to 0 after extracting metrics (prevents accumulation across episodes) for (int i = 0; i < vec->num_envs; i++) { @@ -758,16 +751,11 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { PyObject *dict = PyDict_New(); - // Only log if we have at least num_agents worth of data - Env *env = vec->envs[0]; - if (env->eval_mode) { - if (aggregate.n == 0) { - return dict; - } - } else { - if (aggregate.n < num_agents) { - return dict; - } + // aggregate.n is the divisor for every field below; skip the emission + // when no env has contributed any data (n=0 would divide by zero and + // the dict would carry no signal anyway). + if (aggregate.n < 1) { + return dict; } // Got enough data. Reset logs and return metrics diff --git a/tests/test_drive_per_agent_logging.py b/tests/test_drive_per_agent_logging.py new file mode 100644 index 0000000000..0ac7470073 --- /dev/null +++ b/tests/test_drive_per_agent_logging.py @@ -0,0 +1,121 @@ +"""Behavioral tests for the per-agent EMA logging path in Drive's vec_log. 
+ +Covered: +- T1 (gate): vec_log emits nothing while no agent has completed an episode; + the first emission appears exactly when the scenario truncates. +- T2 (steady-state n): after enough steps for every agent to complete at + least one episode, dict["n"] equals num_agents. +- T3 (state persistence): two consecutive emissions with no new completions + between them produce identical metric values, proving prepare_log no + longer resets the per-agent EMAs. +""" + +from pathlib import Path + +import numpy as np +import pytest + +from pufferlib.ocean.drive.drive import Drive + +MAP_DIR = Path(__file__).resolve().parents[1] / "pufferlib/resources/drive/binaries/carla" + +NUM_AGENTS = 32 +SCENARIO_LENGTH = 5 + + +def _log_dicts(info): + """Filter info for the vec_log aggregate dict (carries both n and episode_return).""" + return [d for d in info if isinstance(d, dict) and "n" in d and "episode_return" in d] + + +def _make_env(): + if not MAP_DIR.is_dir() or not any(MAP_DIR.glob("*.bin")): + pytest.skip(f"Drive map binaries not available at {MAP_DIR}") + return Drive( + num_agents=NUM_AGENTS, + num_maps=1, + min_agents_per_env=1, + max_agents_per_env=8, + scenario_length=SCENARIO_LENGTH, + report_interval=1, + map_dir=str(MAP_DIR), + log_ema_alpha=0.95, + ) + + +def test_gate_skips_emissions_until_first_completion(): + """T1: vec_log returns no log dict until at least one agent has completed an + episode. With synced agents under null actions, completions first happen + when timestep reaches scenario_length.""" + env = _make_env() + env.reset(seed=0) + + # Steps 1..(L-1): no agent has truncated yet, gate sees aggregate.n == 0. + for step_idx in range(SCENARIO_LENGTH - 1): + _, _, _, _, info = env.step(np.zeros_like(env.actions)) + assert not _log_dicts(info), ( + f"Unexpected emission at step {step_idx + 1} (timestep={step_idx + 1}); " + "gate should skip before any agent has completed." + ) + + # Step L: all agents truncate at timestep == scenario_length. 
+ _, _, _, _, info = env.step(np.zeros_like(env.actions)) + logs = _log_dicts(info) + assert len(logs) == 1, f"Expected exactly one log dict at scenario end, got {len(logs)}" + assert logs[0]["n"] >= 1, f"Expected n>=1 at first emission, got n={logs[0]['n']}" + + env.close() + + +def test_steady_state_n_equals_num_agents(): + """T2: once every agent has completed at least one episode, the emitted + n is the full population. With synced agents this is true from the first + emission onward.""" + env = _make_env() + env.reset(seed=0) + + # Run multiple scenarios so every agent has contributed. + last_log = None + for _ in range(4 * SCENARIO_LENGTH): + _, _, _, _, info = env.step(np.zeros_like(env.actions)) + logs = _log_dicts(info) + if logs: + last_log = logs[-1] + + assert last_log is not None, "Expected at least one emission across 4 scenarios" + assert last_log["n"] == NUM_AGENTS, ( + f"Expected steady-state n={NUM_AGENTS}, got n={last_log['n']} (some agents missing from the population mean)" + ) + + env.close() + + +def test_emissions_identical_when_no_new_completions(): + """T3: prepare_log preserves per-agent EMA state across emissions. Two + consecutive emissions with no intervening completions must produce + the same n and the same metric values (metrics compared to within 1e-6).""" + env = _make_env() + env.reset(seed=0) + + # Reach the first completion at timestep == scenario_length. + for _ in range(SCENARIO_LENGTH): + env.step(np.zeros_like(env.actions)) + + # Next two emissions sit between completions (no agent truncates between + # timestep=1 and timestep=scenario_length-1 of the next scenario). 
+ _, _, _, _, info_a = env.step(np.zeros_like(env.actions)) + _, _, _, _, info_b = env.step(np.zeros_like(env.actions)) + + log_a = _log_dicts(info_a) + log_b = _log_dicts(info_b) + assert log_a and log_b, "Both consecutive steps should emit" + log_a, log_b = log_a[-1], log_b[-1] + + assert log_a["n"] == log_b["n"], f"n changed without new completions: {log_a['n']} vs {log_b['n']}" + for key in ("episode_return", "episode_length", "collision_rate", "offroad_rate"): + assert log_a[key] == pytest.approx(log_b[key], rel=0, abs=1e-6), ( + f"{key} changed without new completions: {log_a[key]} vs {log_b[key]} " + "(prepare_log may be resetting per-agent state)" + ) + + env.close()