From 994f6675f5c02ee73f0fe614651622a4e64ffb89 Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 13:46:47 -0400 Subject: [PATCH 01/11] WIP: vec_log per-agent population mean to remove completion-rate bias Replaces the existing 'sum-over-completed-episodes / completed-episode-count' aggregation in vec_log with a two-stage per-agent mean: each agent slot contributes one window-mean (mean over its completions since last emit), then those means are averaged across agents. An agent that completes 10 short infraction-terminated episodes now has the same weight in the metric as an agent that completes one full clean episode. drive.h: - New per-env state: per_agent_log_sum[], per_agent_log_count[], per_agent_log_capacity. Grow-only realloc handles REPLAY scenarios where active_agent_count can vary across c_reset. - add_log now stamps the completing agent's slot instead of env->log, and bumps per_agent_log_count[i] instead of env->log.n. - New prepare_log: drains each agent's slot to (sum / count), sums across agents into env->log, sets env->log.n = #agents-with-data. Resets the per-agent buffers in place. - env_static_car_count and static_car_count are folded into each agent's slot so the population mean recovers the env's per-scenario value. drive/binding.c: - New vec_prepare_log binding (MY_METHODS) that calls prepare_log on every env in a VecEnv. drive/drive.py: - Calls binding.vec_prepare_log right before binding.vec_log every report_interval steps. env_binding.h: - Relaxes the vec_log gate from `aggregate.n < num_agents` (wait until enough completed episodes) to `aggregate.n < 1` (emit if anyone has data). For Drive this means "emit if any env has at least one agent with a window-mean"; for other ocean envs sharing this header it means smaller batches emitted more often, which the Python-side mean_and_log in pufferl.py already amortizes via its 0.25s rate-limit. - num_agents arg kept on the C signature for caller-API compatibility, but is now unused. Validated locally: `python setup.py build_ext --inplace --force` clean, existing scenario-length and single-agent-yaml tests pass, and a 64-agent smoke run emits one log dict per scenario_length window with n=num_agents. Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/ocean/drive/binding.c | 23 ++++- pufferlib/ocean/drive/drive.h | 162 +++++++++++++++++++++++--------- pufferlib/ocean/drive/drive.py | 5 + pufferlib/ocean/env_binding.h | 21 ++--- 4 files changed, 152 insertions(+), 59 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index d568c3fe5b..7b90b05a32 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -25,14 +25,35 @@ static PyObject *map_cache_live_count_py( return PyLong_FromLong(live); } +// Forward declare so vec_prepare_log_py can call it before env_binding.h's +// unpack_vecenv definition is visible; the definition lives in drive.h. +static void prepare_log(Drive *env); + +// Drain each env's per-agent log buffers into env->log so the subsequent +// vec_log call sees a per-agent-mean-weighted aggregate. Must be called +// once per intended vec_log emission. +static PyObject *vec_prepare_log_py(PyObject *self __attribute__((unused)), PyObject *args); + // clang-format off #define MY_METHODS \ {"map_cache_size", map_cache_size_py, METH_NOARGS, "Map cache slot count."}, \ - {"map_cache_live_count", map_cache_live_count_py, METH_NOARGS, "Map cache live count."} + {"map_cache_live_count", map_cache_live_count_py, METH_NOARGS, "Map cache live count."}, \ + {"vec_prepare_log", vec_prepare_log_py, METH_VARARGS, "Aggregate per-agent log buffers into each env->log."} // clang-format on #include "../env_binding.h" +static PyObject *vec_prepare_log_py(PyObject *self __attribute__((unused)), PyObject *args) { + VecEnv *vec = unpack_vecenv(args); + if (!vec) { + return NULL; + } + for (int i = 0; i < vec->num_envs; i++) { + prepare_log((Drive *) vec->envs[i]); + } + Py_RETURN_NONE; +} + static int my_put(Env *env, PyObject *args, PyObject *kwargs) { PyObject *obs = PyDict_GetItemString(kwargs, "observations"); if (!PyObject_TypeCheck(obs, &PyArray_Type)) { diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 20d29a392e..174dfa436e 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -454,6 +454,16 @@ struct Drive { int next_episode_index; int completed_episodes_count; CompletedEpisodeSummary completed_episodes[COMPLETED_EPISODE_QUEUE_CAPACITY]; + + // Per-agent window accumulators for vec_log: each slot holds the running + // sum and count of completed-episode Log values for one agent. Drained + // and zeroed in vec_prepare_log() each time a vec_log emission fires, so + // every agent contributes equal weight to the cross-agent mean regardless + // of how many episodes it completed within the window. Sized at + // per_agent_log_capacity = max active-controllable slots seen so far. + Log *per_agent_log_sum; + int *per_agent_log_count; + int per_agent_log_capacity; }; typedef struct { @@ -2572,64 +2582,113 @@ static float calculate_puffer_score(Log *log_agent, float duration_steps, float return log_agent->puffer_score; } +// Grow-only per-agent log buffer. Active slot count can vary across c_reset +// (REPLAY scenarios with different agent populations); we never shrink so +// pending data from a prior window survives. +static void ensure_per_agent_log_capacity(Drive *env, int needed) { + if (env->per_agent_log_capacity >= needed) { + return; + } + int old_cap = env->per_agent_log_capacity; + env->per_agent_log_sum = (Log *) realloc(env->per_agent_log_sum, needed * sizeof(Log)); + env->per_agent_log_count = (int *) realloc(env->per_agent_log_count, needed * sizeof(int)); + memset(&env->per_agent_log_sum[old_cap], 0, (needed - old_cap) * sizeof(Log)); + memset(&env->per_agent_log_count[old_cap], 0, (needed - old_cap) * sizeof(int)); + env->per_agent_log_capacity = needed; +} + +// Drain per-agent accumulators into env->log so the shared vec_log can do +// its usual sum-across-envs / divide-by-aggregate.n step. Each contributing +// agent's window-mean (sum_of_completed_episode_values / count) becomes one +// term in env->log; env->log.n is the count of agents in this env that have +// at least one completed episode this window. After vec_log aggregates and +// divides, every metric is a population-mean: one weight per agent regardless +// of how many episodes that agent completed. +static void prepare_log(Drive *env) { + memset(&env->log, 0, sizeof(Log)); + if (env->per_agent_log_capacity == 0) { + return; + } + int num_keys = sizeof(Log) / sizeof(float); + int num_with_data = 0; + for (int a = 0; a < env->per_agent_log_capacity; a++) { + int c = env->per_agent_log_count[a]; + if (c == 0) { + continue; + } + float inv_c = 1.0f / (float) c; + float *slot = (float *) &env->per_agent_log_sum[a]; + float *dst = (float *) &env->log; + for (int j = 0; j < num_keys; j++) { + dst[j] += slot[j] * inv_c; + } + num_with_data++; + } + env->log.n = (float) num_with_data; + memset(env->per_agent_log_sum, 0, env->per_agent_log_capacity * sizeof(Log)); + memset(env->per_agent_log_count, 0, env->per_agent_log_capacity * sizeof(int)); +} + static void add_log(Drive *env) { int safe_timestep = (env->timestep > 0) ? env->timestep : 1; + ensure_per_agent_log_capacity(env, env->active_agent_count); for (int i = 0; i < env->active_agent_count; i++) { Agent *agent = &env->agents[env->active_agent_indices[i]]; + Log *slot = &env->per_agent_log_sum[i]; float episode_duration_s = env->logs[i].episode_length * env->dt; float reference_progress_distance = PUFFER_PROGRESS_REFERENCE_SPEED * episode_duration_s; reference_progress_distance = fmaxf(reference_progress_distance, 1.0f); env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance; int offroad = env->logs[i].offroad_rate; - env->log.offroad_rate += offroad; + slot->offroad_rate += offroad; int collided = env->logs[i].collision_rate; - env->log.collision_rate += collided; + slot->collision_rate += collided; int red_light_violations = env->logs[i].red_light_violation_rate; - env->log.red_light_violation_rate += red_light_violations; + slot->red_light_violation_rate += red_light_violations; int total_infractions = (offroad || collided || red_light_violations) ? 1 : 0; float avg_speed_per_agent = env->logs[i].avg_speed_per_agent; - env->log.avg_speed_per_agent += avg_speed_per_agent / safe_timestep; + slot->avg_speed_per_agent += avg_speed_per_agent / safe_timestep; int num_waypoints_reached = env->logs[i].num_waypoints_reached; - env->log.num_waypoints_reached += num_waypoints_reached; + slot->num_waypoints_reached += num_waypoints_reached; int num_goals_reached = env->logs[i].num_goals_reached; - env->log.num_goals_reached += num_goals_reached; + slot->num_goals_reached += num_goals_reached; // Score: 1 per agent that reached all 3 target waypoints without // being removed/stopped. Was hardcoded to >=4, unreachable given // num_target_waypoints=3 in the ini, so score was always 0. if (num_goals_reached >= 3 && !agent->removed && !agent->stopped) { - env->log.score += 1.0f; + slot->score += 1.0f; } if (!offroad && !collided && !red_light_violations && num_waypoints_reached < 1) { - env->log.dnf_rate += 1.0f; + slot->dnf_rate += 1.0f; } - env->log.total_distance_travelled += agent->distance_since_spawn; + slot->total_distance_travelled += agent->distance_since_spawn; if (total_infractions > 0) { - env->log.total_infractions += 1.0f; + slot->total_infractions += 1.0f; } float displacement_error = env->logs[i].avg_displacement_error; - env->log.avg_displacement_error += displacement_error; - env->log.episode_length += env->logs[i].episode_length; - env->log.episode_return += env->logs[i].episode_return; + slot->avg_displacement_error += displacement_error; + slot->episode_length += env->logs[i].episode_length; + slot->episode_return += env->logs[i].episode_return; // Per-component reward sums (mirrors compute_rewards' env->rewards[i]+= sites). - env->log.reward_collision += env->logs[i].reward_collision; - env->log.reward_offroad += env->logs[i].reward_offroad; - env->log.reward_red_light += env->logs[i].reward_red_light; - env->log.reward_goal += env->logs[i].reward_goal; - env->log.reward_lane_align += env->logs[i].reward_lane_align; - env->log.reward_lane_center += env->logs[i].reward_lane_center; - env->log.reward_comfort += env->logs[i].reward_comfort; - env->log.reward_velocity += env->logs[i].reward_velocity; - env->log.reward_timestep += env->logs[i].reward_timestep; - env->log.reward_reverse += env->logs[i].reward_reverse; - env->log.reward_overspeed += env->logs[i].reward_overspeed; - env->log.reward_ade += env->logs[i].reward_ade; + slot->reward_collision += env->logs[i].reward_collision; + slot->reward_offroad += env->logs[i].reward_offroad; + slot->reward_red_light += env->logs[i].reward_red_light; + slot->reward_goal += env->logs[i].reward_goal; + slot->reward_lane_align += env->logs[i].reward_lane_align; + slot->reward_lane_center += env->logs[i].reward_lane_center; + slot->reward_comfort += env->logs[i].reward_comfort; + slot->reward_velocity += env->logs[i].reward_velocity; + slot->reward_timestep += env->logs[i].reward_timestep; + slot->reward_reverse += env->logs[i].reward_reverse; + slot->reward_overspeed += env->logs[i].reward_overspeed; + slot->reward_ade += env->logs[i].reward_ade; // Comfort and velocity metrics (normalized per timestep) - env->log.comfort_violation_count += env->logs[i].comfort_violation_count / safe_timestep; - env->log.velocity_progress_sum += env->logs[i].velocity_progress_sum / safe_timestep; + slot->comfort_violation_count += env->logs[i].comfort_violation_count / safe_timestep; + slot->velocity_progress_sum += env->logs[i].velocity_progress_sum / safe_timestep; // Lane metrics (normalized per timestep for average per episode) - env->log.lane_center_rate += env->logs[i].lane_center_rate / safe_timestep; - env->log.lane_heading_aligned_rate += env->logs[i].lane_heading_aligned_rate / safe_timestep; + slot->lane_center_rate += env->logs[i].lane_center_rate / safe_timestep; + slot->lane_heading_aligned_rate += env->logs[i].lane_heading_aligned_rate / safe_timestep; if (env->compute_eval_metrics) { env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance; env->logs[i].comfort_score = calculate_duration_scaled_violation_score( @@ -2637,35 +2696,39 @@ static void add_log(Drive *env) { env->logs[i].episode_length, env->dt); calculate_puffer_score(&env->logs[i], env->logs[i].episode_length, env->dt); - env->log.at_fault_collision_rate += env->logs[i].at_fault_collision_rate; - env->log.ttc_within_bound_rate += env->logs[i].ttc_within_bound_rate; - env->log.wrong_way_distance += env->logs[i].wrong_way_distance; - env->log.speed_violation_sum += env->logs[i].speed_violation_sum; - env->log.progress_ratio += env->logs[i].progress_ratio; - env->log.comfort_score += env->logs[i].comfort_score; - env->log.ttc_violations += env->logs[i].ttc_violations; - env->log.ttc_samples += env->logs[i].ttc_samples; - env->log.multi_lane_time += env->logs[i].multi_lane_time; - env->log.multi_lane_score += env->logs[i].multi_lane_score; + slot->at_fault_collision_rate += env->logs[i].at_fault_collision_rate; + slot->ttc_within_bound_rate += env->logs[i].ttc_within_bound_rate; + slot->wrong_way_distance += env->logs[i].wrong_way_distance; + slot->speed_violation_sum += env->logs[i].speed_violation_sum; + slot->progress_ratio += env->logs[i].progress_ratio; + slot->comfort_score += env->logs[i].comfort_score; + slot->ttc_violations += env->logs[i].ttc_violations; + slot->ttc_samples += env->logs[i].ttc_samples; + slot->multi_lane_time += env->logs[i].multi_lane_time; + slot->multi_lane_score += env->logs[i].multi_lane_score; float wrong_dist = env->logs[i].wrong_way_distance; float direction_score = (wrong_dist <= 2.0f) ? 1.0f : (wrong_dist <= 6.0f) ? 0.5f : 0.0f; - env->log.driving_direction_score += direction_score; + slot->driving_direction_score += direction_score; float T = safe_timestep * env->dt; float speed_compliance = fmaxf(0.0f, 1.0f - env->logs[i].speed_violation_sum / fmaxf(T, 1e-3f)); - env->log.speed_limit_compliance += speed_compliance; + slot->speed_limit_compliance += speed_compliance; float making_progress = (env->logs[i].progress_ratio > 0.2f) ? 1.0f : 0.0f; - env->log.making_progress_rate += making_progress; - env->log.puffer_score += env->logs[i].puffer_score; + slot->making_progress_rate += making_progress; + slot->puffer_score += env->logs[i].puffer_score; } - env->log.n += 1; + // Env-level composition counts: fold once per agent's per-episode + // contribution so the per-agent mean recovers the env's value + // (constant within the scenario), and cross-agent averaging in + // vec_log gives a population-weighted mean. + slot->expert_static_car_count += env->expert_static_agent_count; + slot->static_car_count += env->static_agent_count; + + env->per_agent_log_count[i] += 1; } - // Log composition counts per agent so vec_log averaging recovers the per-env value - env->log.expert_static_car_count += env->expert_static_agent_count; - env->log.static_car_count += env->static_agent_count; if (env->emit_completed_episodes && env->completed_episodes_count < COMPLETED_EPISODE_QUEUE_CAPACITY) { // Snapshot per-episode aggregates from env->logs[] before c_reset @@ -3520,6 +3583,9 @@ void init(Drive *env) { env->human_agent_idx = 0; env->timestep = 0; env->shared_map = NULL; + env->per_agent_log_sum = NULL; + env->per_agent_log_count = NULL; + env->per_agent_log_capacity = 0; struct SharedMapData *shared = env->use_map_cache ? map_cache_lookup(env->map_name) : NULL; if (shared != NULL) { @@ -3683,6 +3749,8 @@ void c_close(Drive *env) { free(env->tracks_to_predict); free(env->map_name); free(env->ini_file); + free(env->per_agent_log_sum); + free(env->per_agent_log_count); } static int compute_observation_size(Drive *env) { diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 577d42252e..2244d573d7 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -474,6 +474,11 @@ def step(self, actions): self._reset_compact_replay_buffer(env_slot, scenarios_after[env_slot]) info.append(tagged) if self.tick % self.report_interval == 0: + # Drain per-agent accumulators into each env->log so the shared + # vec_log produces a per-agent population mean (every agent slot + # contributes one term, regardless of how many episodes it + # completed within the window). + binding.vec_prepare_log(self.c_envs) log = binding.vec_log(self.c_envs, self.num_agents) if log: info.append(log) diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index 9c6cb56f6b..8d70ff7bbe 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -705,7 +705,7 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { // Iterates over logs one float at a time. Will break // horribly if Log has non-float data. PyObject *num_agents_arg = PyTuple_GetItem(args, 1); - float num_agents = (float) PyLong_AsLong(num_agents_arg); + (void) num_agents_arg; // Kept for caller-API compatibility; gate is now aggregate.n < 1. int num_keys = sizeof(Log) / sizeof(float); Env *env = vec->envs[0]; @@ -758,16 +758,15 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { PyObject *dict = PyDict_New(); - // Only log if we have at least num_agents worth of data - Env *env = vec->envs[0]; - if (env->eval_mode) { - if (aggregate.n == 0) { - return dict; - } - } else { - if (aggregate.n < num_agents) { - return dict; - } + // Emit whenever any env has data. With Drive's per-agent prepare_log + // path, aggregate.n is the cross-env count of agents that contributed + // a window-mean (not completed-episode count), so the meaningful gate + // is "at least one contribution." Other ocean envs that don't run + // prepare_log retain completed-episode-count semantics and now emit + // smaller batches more often — the Python-side mean_and_log + // (pufferl.py) re-averages across emissions in its rate-limit window. + if (aggregate.n < 1) { + return dict; } // Got enough data. Reset logs and return metrics From dda8ccc3aa330bc192a569c9b9287efb5b802ecc Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 13:51:26 -0400 Subject: [PATCH 02/11] drive/binding: drop forward-decl wiring comments Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/ocean/drive/binding.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 7b90b05a32..36359b7fc8 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -25,13 +25,7 @@ static PyObject *map_cache_live_count_py( return PyLong_FromLong(live); } -// Forward declare so vec_prepare_log_py can call it before env_binding.h's -// unpack_vecenv definition is visible; the definition lives in drive.h. static void prepare_log(Drive *env); - -// Drain each env's per-agent log buffers into env->log so the subsequent -// vec_log call sees a per-agent-mean-weighted aggregate. Must be called -// once per intended vec_log emission. static PyObject *vec_prepare_log_py(PyObject *self __attribute__((unused)), PyObject *args); // clang-format off From ff6289930ef94481fe213e352111cfe431f95efe Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 13:54:48 -0400 Subject: [PATCH 03/11] env_binding: focus vec_log gate comment on the divide-by-zero invariant Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/ocean/env_binding.h | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index 8d70ff7bbe..3b9c9abe80 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -758,13 +758,9 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { PyObject *dict = PyDict_New(); - // Emit whenever any env has data. With Drive's per-agent prepare_log - // path, aggregate.n is the cross-env count of agents that contributed - // a window-mean (not completed-episode count), so the meaningful gate - // is "at least one contribution." Other ocean envs that don't run - // prepare_log retain completed-episode-count semantics and now emit - // smaller batches more often — the Python-side mean_and_log - // (pufferl.py) re-averages across emissions in its rate-limit window. + // aggregate.n is the divisor for every field below; skip the emission + // when no env has contributed any data (n=0 would divide by zero and + // the dict would carry no signal anyway). if (aggregate.n < 1) { return dict; } From e952bcba7c16347259568622c60540158c86ce62 Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 14:03:16 -0400 Subject: [PATCH 04/11] env_binding: drop unused num_agents extraction in vec_log Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/ocean/env_binding.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index 3b9c9abe80..ee04fd1112 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -704,8 +704,6 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { // Iterates over logs one float at a time. Will break // horribly if Log has non-float data. - PyObject *num_agents_arg = PyTuple_GetItem(args, 1); - (void) num_agents_arg; // Kept for caller-API compatibility; gate is now aggregate.n < 1. int num_keys = sizeof(Log) / sizeof(float); Env *env = vec->envs[0]; From 0d02e3a41f9ab0b8fe5245de4dc4db9e849006de Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 14:23:56 -0400 Subject: [PATCH 05/11] drive: switch per-agent log aggregation from window-mean to EMA Replaces the window-mean per-agent state (sum + count, reset on every emit) with a per-agent EMA that persists across emissions. Each agent slot tracks slot = alpha*slot + (1 - alpha)*new_episode_log on every completion, with the first completion seeding the slot directly. prepare_log then emits the population mean across agents flagged has_data, never resetting the EMAs. Removes the residual completion-rate bias that the previous PR design still carried: fast-completers no longer dominate the multi-emission average that pufferl.py's mean_and_log produces, because every agent now contributes a single smoothed value to every emission rather than appearing in some windows and not others. New env config knob log_ema_alpha (default 0.95, half-life ~14 episodes per agent) controls smoothing. alpha=0 collapses to "most recent episode only", alpha->1 freezes the slot at its first observation. Files: - drive.h: rename per_agent_log_sum to per_agent_log_ema, add per_agent_has_data flag, add log_ema_alpha field. add_log builds a fresh episode_log snapshot, then seeds the slot or EMA-blends. prepare_log skips no-data slots and sums (no division) into env->log; vec_log divides by env->log.n = num_with_data downstream. - binding.c: unpack log_ema_alpha kwarg. - drive.py: __init__ takes log_ema_alpha=0.95, plumbs through env_kwargs. - drive.ini: log_ema_alpha = 0.95 under [env]. Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/config/ocean/drive.ini | 8 ++ pufferlib/ocean/drive/binding.c | 1 + pufferlib/ocean/drive/drive.h | 183 +++++++++++++++++-------------- pufferlib/ocean/drive/drive.py | 3 + 4 files changed, 113 insertions(+), 82 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index 9652099f38..18d174aaa4 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -104,6 +104,14 @@ reward_timestep = 0.000025 reward_overspeed = 0.05 reward_ade = 0.0 +; --- Logging --- +; EMA coefficient for per-agent log aggregation. Each agent slot tracks a +; smoothed estimate of its completed-episode metrics; on every completion +; slot = alpha*slot + (1 - alpha)*new_episode. alpha=0 collapses to "most +; recent episode only", alpha->1 freezes the slot at its first observation. +; alpha=0.95 has a half-life of ~14 episodes per agent. +log_ema_alpha = 0.95 + ; --- Map --- ; Path to map used for training map_dir = "pufferlib/resources/drive/binaries/carla" diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 36359b7fc8..5551eebdc1 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -1993,6 +1993,7 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) { env->reward_randomization = (bool) unpack(kwargs, "reward_randomization"); env->compute_eval_metrics = (bool) unpack(kwargs, "compute_eval_metrics"); env->eval_mode = (int) unpack(kwargs, "eval_mode"); + env->log_ema_alpha = (float) unpack(kwargs, "log_ema_alpha"); env->obs_norm_goal_offset_m = (float) unpack(kwargs, "obs_norm_goal_offset_m"); env->obs_norm_xy_offset_m = (float) unpack(kwargs, "obs_norm_xy_offset_m"); env->obs_norm_veh_length_m = (float) unpack(kwargs, "obs_norm_veh_length_m"); diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 174dfa436e..75e808c833 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -455,15 +455,19 @@ struct Drive { int completed_episodes_count; CompletedEpisodeSummary completed_episodes[COMPLETED_EPISODE_QUEUE_CAPACITY]; - // Per-agent window accumulators for vec_log: each slot holds the running - // sum and count of completed-episode Log values for one agent. Drained - // and zeroed in vec_prepare_log() each time a vec_log emission fires, so - // every agent contributes equal weight to the cross-agent mean regardless - // of how many episodes it completed within the window. Sized at - // per_agent_log_capacity = max active-controllable slots seen so far. - Log *per_agent_log_sum; - int *per_agent_log_count; + // Per-agent EMA of completed-episode Log values. Each slot holds one + // agent's smoothed estimate, updated on every termination of that agent's + // episode as slot = alpha*slot + (1 - alpha)*new_episode_log, with the + // very first completion seeding the slot directly. prepare_log emits the + // population mean across slots flagged has_data, so every agent that has + // ever completed an episode contributes equal weight to the cross-agent + // metric regardless of completion frequency. log_ema_alpha is the + // smoothing coefficient (0 = use latest only, 1 = never update). + Log *per_agent_log_ema; + int *per_agent_log_count; // lifetime count of completed episodes per slot + int *per_agent_has_data; // 1 once this slot has been seeded by a first completion int per_agent_log_capacity; + float log_ema_alpha; }; typedef struct { @@ -2584,26 +2588,27 @@ static float calculate_puffer_score(Log *log_agent, float duration_steps, float // Grow-only per-agent log buffer. Active slot count can vary across c_reset // (REPLAY scenarios with different agent populations); we never shrink so -// pending data from a prior window survives. +// EMA state from prior windows survives. static void ensure_per_agent_log_capacity(Drive *env, int needed) { if (env->per_agent_log_capacity >= needed) { return; } int old_cap = env->per_agent_log_capacity; - env->per_agent_log_sum = (Log *) realloc(env->per_agent_log_sum, needed * sizeof(Log)); + env->per_agent_log_ema = (Log *) realloc(env->per_agent_log_ema, needed * sizeof(Log)); env->per_agent_log_count = (int *) realloc(env->per_agent_log_count, needed * sizeof(int)); - memset(&env->per_agent_log_sum[old_cap], 0, (needed - old_cap) * sizeof(Log)); + env->per_agent_has_data = (int *) realloc(env->per_agent_has_data, needed * sizeof(int)); + memset(&env->per_agent_log_ema[old_cap], 0, (needed - old_cap) * sizeof(Log)); memset(&env->per_agent_log_count[old_cap], 0, (needed - old_cap) * sizeof(int)); + memset(&env->per_agent_has_data[old_cap], 0, (needed - old_cap) * sizeof(int)); env->per_agent_log_capacity = needed; } -// Drain per-agent accumulators into env->log so the shared vec_log can do -// its usual sum-across-envs / divide-by-aggregate.n step. Each contributing -// agent's window-mean (sum_of_completed_episode_values / count) becomes one -// term in env->log; env->log.n is the count of agents in this env that have -// at least one completed episode this window. After vec_log aggregates and -// divides, every metric is a population-mean: one weight per agent regardless -// of how many episodes that agent completed. +// Emit the cross-agent population mean: sum each agent's current EMA into +// env->log, divide later in vec_log by env->log.n (count of agents that have +// ever completed at least one episode). Per-agent EMA state is preserved +// across emissions so frequent completers do not dominate the long-run +// signal -- their slot simply tracks the smoothed estimate alongside everyone +// else. static void prepare_log(Drive *env) { memset(&env->log, 0, sizeof(Log)); if (env->per_agent_log_capacity == 0) { @@ -2612,83 +2617,85 @@ static void prepare_log(Drive *env) { int num_keys = sizeof(Log) / sizeof(float); int num_with_data = 0; for (int a = 0; a < env->per_agent_log_capacity; a++) { - int c = env->per_agent_log_count[a]; - if (c == 0) { + if (!env->per_agent_has_data[a]) { continue; } - float inv_c = 1.0f / (float) c; - float *slot = (float *) &env->per_agent_log_sum[a]; + float *slot = (float *) &env->per_agent_log_ema[a]; float *dst = (float *) &env->log; for (int j = 0; j < num_keys; j++) { - dst[j] += slot[j] * inv_c; + dst[j] += slot[j]; } num_with_data++; } env->log.n = (float) num_with_data; - memset(env->per_agent_log_sum, 0, env->per_agent_log_capacity * sizeof(Log)); - memset(env->per_agent_log_count, 0, env->per_agent_log_capacity * sizeof(int)); } static void add_log(Drive *env) { int safe_timestep = (env->timestep > 0) ? env->timestep : 1; ensure_per_agent_log_capacity(env, env->active_agent_count); + float alpha = env->log_ema_alpha; + float one_minus_alpha = 1.0f - alpha; + int num_log_keys = sizeof(Log) / sizeof(float); for (int i = 0; i < env->active_agent_count; i++) { Agent *agent = &env->agents[env->active_agent_indices[i]]; - Log *slot = &env->per_agent_log_sum[i]; float episode_duration_s = env->logs[i].episode_length * env->dt; float reference_progress_distance = PUFFER_PROGRESS_REFERENCE_SPEED * episode_duration_s; reference_progress_distance = fmaxf(reference_progress_distance, 1.0f); env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance; + // Build a fresh per-episode Log snapshot. All field writes below are + // direct assignments (no accumulation): the snapshot represents this + // single completed episode in canonical units. We then either seed + // the agent's slot (first completion) or EMA-blend into it. + Log episode_log = {0}; + int offroad = env->logs[i].offroad_rate; - slot->offroad_rate += offroad; + episode_log.offroad_rate = offroad; int collided = env->logs[i].collision_rate; - slot->collision_rate += collided; + episode_log.collision_rate = collided; int red_light_violations = env->logs[i].red_light_violation_rate; - slot->red_light_violation_rate += red_light_violations; + episode_log.red_light_violation_rate = red_light_violations; int total_infractions = (offroad || collided || red_light_violations) ? 1 : 0; - float avg_speed_per_agent = env->logs[i].avg_speed_per_agent; - slot->avg_speed_per_agent += avg_speed_per_agent / safe_timestep; + episode_log.avg_speed_per_agent = env->logs[i].avg_speed_per_agent / safe_timestep; int num_waypoints_reached = env->logs[i].num_waypoints_reached; - slot->num_waypoints_reached += num_waypoints_reached; + episode_log.num_waypoints_reached = num_waypoints_reached; int num_goals_reached = env->logs[i].num_goals_reached; - slot->num_goals_reached += num_goals_reached; - // Score: 1 per agent that reached all 3 target waypoints without - // being removed/stopped. Was hardcoded to >=4, unreachable given - // num_target_waypoints=3 in the ini, so score was always 0. + episode_log.num_goals_reached = num_goals_reached; + // Score: 1 if the agent reached all 3 target waypoints without + // being removed/stopped, else 0. Was hardcoded to >=4, unreachable + // given num_target_waypoints=3 in the ini, so score was always 0. if (num_goals_reached >= 3 && !agent->removed && !agent->stopped) { - slot->score += 1.0f; + episode_log.score = 1.0f; } if (!offroad && !collided && !red_light_violations && num_waypoints_reached < 1) { - slot->dnf_rate += 1.0f; + episode_log.dnf_rate = 1.0f; } - slot->total_distance_travelled += agent->distance_since_spawn; + episode_log.total_distance_travelled = agent->distance_since_spawn; if (total_infractions > 0) { - slot->total_infractions += 1.0f; + episode_log.total_infractions = 1.0f; } - float displacement_error = env->logs[i].avg_displacement_error; - slot->avg_displacement_error += displacement_error; - slot->episode_length += env->logs[i].episode_length; - slot->episode_return += env->logs[i].episode_return; + episode_log.avg_displacement_error = env->logs[i].avg_displacement_error; + episode_log.episode_length = env->logs[i].episode_length; + episode_log.episode_return = env->logs[i].episode_return; // Per-component reward sums (mirrors compute_rewards' env->rewards[i]+= sites). - slot->reward_collision += env->logs[i].reward_collision; - slot->reward_offroad += env->logs[i].reward_offroad; - slot->reward_red_light += env->logs[i].reward_red_light; - slot->reward_goal += env->logs[i].reward_goal; - slot->reward_lane_align += env->logs[i].reward_lane_align; - slot->reward_lane_center += env->logs[i].reward_lane_center; - slot->reward_comfort += env->logs[i].reward_comfort; - slot->reward_velocity += env->logs[i].reward_velocity; - slot->reward_timestep += env->logs[i].reward_timestep; - slot->reward_reverse += env->logs[i].reward_reverse; - slot->reward_overspeed += env->logs[i].reward_overspeed; - slot->reward_ade += env->logs[i].reward_ade; + episode_log.reward_collision = env->logs[i].reward_collision; + episode_log.reward_offroad = env->logs[i].reward_offroad; + episode_log.reward_red_light = env->logs[i].reward_red_light; + episode_log.reward_goal = env->logs[i].reward_goal; + episode_log.reward_lane_align = env->logs[i].reward_lane_align; + episode_log.reward_lane_center = env->logs[i].reward_lane_center; + episode_log.reward_comfort = env->logs[i].reward_comfort; + episode_log.reward_velocity = env->logs[i].reward_velocity; + episode_log.reward_timestep = env->logs[i].reward_timestep; + episode_log.reward_reverse = env->logs[i].reward_reverse; + episode_log.reward_overspeed = env->logs[i].reward_overspeed; + episode_log.reward_ade = env->logs[i].reward_ade; // Comfort and velocity metrics (normalized per timestep) - slot->comfort_violation_count += env->logs[i].comfort_violation_count / safe_timestep; - slot->velocity_progress_sum += env->logs[i].velocity_progress_sum / safe_timestep; + episode_log.comfort_violation_count = env->logs[i].comfort_violation_count / safe_timestep; + episode_log.velocity_progress_sum = env->logs[i].velocity_progress_sum / safe_timestep; // Lane metrics (normalized per timestep for average per episode) - slot->lane_center_rate += env->logs[i].lane_center_rate / safe_timestep; - slot->lane_heading_aligned_rate += env->logs[i].lane_heading_aligned_rate / safe_timestep; + episode_log.lane_center_rate = env->logs[i].lane_center_rate / safe_timestep; + episode_log.lane_heading_aligned_rate = env->logs[i].lane_heading_aligned_rate / safe_timestep; if (env->compute_eval_metrics) { env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance; env->logs[i].comfort_score = calculate_duration_scaled_violation_score( @@ -2696,37 +2703,47 @@ static void add_log(Drive *env) { env->logs[i].episode_length, env->dt); calculate_puffer_score(&env->logs[i], env->logs[i].episode_length, env->dt); - slot->at_fault_collision_rate += env->logs[i].at_fault_collision_rate; - slot->ttc_within_bound_rate += env->logs[i].ttc_within_bound_rate; - slot->wrong_way_distance += env->logs[i].wrong_way_distance; - slot->speed_violation_sum += env->logs[i].speed_violation_sum; - slot->progress_ratio += env->logs[i].progress_ratio; - slot->comfort_score += env->logs[i].comfort_score; - slot->ttc_violations += env->logs[i].ttc_violations; - slot->ttc_samples += env->logs[i].ttc_samples; - slot->multi_lane_time += env->logs[i].multi_lane_time; - slot->multi_lane_score += env->logs[i].multi_lane_score; + episode_log.at_fault_collision_rate = env->logs[i].at_fault_collision_rate; + episode_log.ttc_within_bound_rate = env->logs[i].ttc_within_bound_rate; + episode_log.wrong_way_distance = env->logs[i].wrong_way_distance; + episode_log.speed_violation_sum = env->logs[i].speed_violation_sum; + episode_log.progress_ratio = env->logs[i].progress_ratio; + episode_log.comfort_score = env->logs[i].comfort_score; + episode_log.ttc_violations = env->logs[i].ttc_violations; + episode_log.ttc_samples = env->logs[i].ttc_samples; + episode_log.multi_lane_time = env->logs[i].multi_lane_time; + episode_log.multi_lane_score = env->logs[i].multi_lane_score; float wrong_dist = env->logs[i].wrong_way_distance; float direction_score = (wrong_dist <= 2.0f) ? 1.0f : (wrong_dist <= 6.0f) ? 0.5f : 0.0f; - slot->driving_direction_score += direction_score; + episode_log.driving_direction_score = direction_score; float T = safe_timestep * env->dt; float speed_compliance = fmaxf(0.0f, 1.0f - env->logs[i].speed_violation_sum / fmaxf(T, 1e-3f)); - slot->speed_limit_compliance += speed_compliance; + episode_log.speed_limit_compliance = speed_compliance; float making_progress = (env->logs[i].progress_ratio > 0.2f) ? 1.0f : 0.0f; - slot->making_progress_rate += making_progress; - slot->puffer_score += env->logs[i].puffer_score; + episode_log.making_progress_rate = making_progress; + episode_log.puffer_score = env->logs[i].puffer_score; } - // Env-level composition counts: fold once per agent's per-episode - // contribution so the per-agent mean recovers the env's value - // (constant within the scenario), and cross-agent averaging in - // vec_log gives a population-weighted mean. - slot->expert_static_car_count += env->expert_static_agent_count; - slot->static_car_count += env->static_agent_count; + // Env-level composition counts: snapshot per agent so the per-agent + // EMA tracks the env's value (constant within a scenario) and the + // cross-agent mean gives a population-weighted view. + episode_log.expert_static_car_count = env->expert_static_agent_count; + episode_log.static_car_count = env->static_agent_count; + Log *slot = &env->per_agent_log_ema[i]; + if (!env->per_agent_has_data[i]) { + *slot = episode_log; + env->per_agent_has_data[i] = 1; + } else { + float *slot_f = (float *) slot; + float *ep_f = (float *) &episode_log; + for (int j = 0; j < num_log_keys; j++) { + slot_f[j] = alpha * slot_f[j] + one_minus_alpha * ep_f[j]; + } + } env->per_agent_log_count[i] += 1; } @@ -3583,8 +3600,9 @@ void init(Drive *env) { env->human_agent_idx = 0; env->timestep = 0; env->shared_map = NULL; - env->per_agent_log_sum = NULL; + env->per_agent_log_ema = NULL; env->per_agent_log_count = NULL; + env->per_agent_has_data = NULL; env->per_agent_log_capacity = 0; struct SharedMapData *shared = env->use_map_cache ? map_cache_lookup(env->map_name) : NULL; @@ -3749,8 +3767,9 @@ void c_close(Drive *env) { free(env->tracks_to_predict); free(env->map_name); free(env->ini_file); - free(env->per_agent_log_sum); + free(env->per_agent_log_ema); free(env->per_agent_log_count); + free(env->per_agent_has_data); } static int compute_observation_size(Drive *env) { diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 2244d573d7..9f15dd1b1d 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -74,6 +74,7 @@ def __init__( init_step=0, eval_mode=0, num_eval_scenarios=16, + log_ema_alpha=0.95, init_mode="create_all_valid", control_mode="control_vehicles", map_dir=None, @@ -167,6 +168,7 @@ def __init__( raise ValueError(f"dynamics_model must be 'classic' or 'jerk'. Got: {dynamics_model}") self.eval_mode = eval_mode self.num_eval_scenarios = num_eval_scenarios + self.log_ema_alpha = log_ema_alpha self.termination_mode = termination_mode self.inactive_agent_threshold = inactive_agent_threshold self.rng = np.random.default_rng(seed) @@ -415,6 +417,7 @@ def _env_init_kwargs(self, map_file, max_agents): "reward_randomization": self.reward_randomization, "compute_eval_metrics": self.compute_eval_metrics, "eval_mode": self.eval_mode, + "log_ema_alpha": self.log_ema_alpha, "obs_norm_goal_offset_m": self.obs_norm_goal_offset_m, "obs_norm_xy_offset_m": self.obs_norm_xy_offset_m, "obs_norm_veh_length_m": self.obs_norm_veh_length_m, From 16c866d4caed48b87714b94c6dec9d8255fcf176 Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 14:31:39 -0400 Subject: [PATCH 06/11] tests: cover the per-agent EMA logging contract T1: gate skips emissions until the first agent completes an episode; first log dict appears exactly when the scenario truncates. T2: once every agent has completed at least one episode, dict["n"] equals num_agents (population fully represented in the cross-agent mean). T3: prepare_log preserves per-agent EMA state across emissions -- two consecutive emissions with no intervening completions produce identical metric values, locking in that the EMA buffers are not reset on emit. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_drive_per_agent_logging.py | 122 ++++++++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 tests/test_drive_per_agent_logging.py diff --git a/tests/test_drive_per_agent_logging.py b/tests/test_drive_per_agent_logging.py new file mode 100644 index 0000000000..8c3d4a9309 --- /dev/null +++ b/tests/test_drive_per_agent_logging.py @@ -0,0 +1,122 @@ +"""Behavioral tests for the per-agent EMA logging path in Drive's vec_log. + +Covered: +- T1 (gate): vec_log emits nothing while no agent has completed an episode; + the first emission appears exactly when the scenario truncates. +- T2 (steady-state n): after enough steps for every agent to complete at + least one episode, dict["n"] equals num_agents. +- T3 (state persistence): two consecutive emissions with no new completions + between them produce identical metric values, proving prepare_log no + longer resets the per-agent EMAs. +""" + +from pathlib import Path + +import numpy as np +import pytest + +from pufferlib.ocean.drive.drive import Drive + +MAP_DIR = Path(__file__).resolve().parents[1] / "pufferlib/resources/drive/binaries/carla" + +NUM_AGENTS = 32 +SCENARIO_LENGTH = 5 + + +def _log_dicts(info): + """Filter info for the vec_log aggregate dict (carries both n and episode_return).""" + return [d for d in info if isinstance(d, dict) and "n" in d and "episode_return" in d] + + +def _make_env(): + if not MAP_DIR.is_dir() or not any(MAP_DIR.glob("*.bin")): + pytest.skip(f"Drive map binaries not available at {MAP_DIR}") + return Drive( + num_agents=NUM_AGENTS, + num_maps=1, + min_agents_per_env=1, + max_agents_per_env=8, + scenario_length=SCENARIO_LENGTH, + report_interval=1, + map_dir=str(MAP_DIR), + log_ema_alpha=0.95, + ) + + +def test_gate_skips_emissions_until_first_completion(): + """T1: vec_log returns no log dict until at least one agent has completed an + episode. With synced agents under null actions, completions first happen + when timestep reaches scenario_length.""" + env = _make_env() + env.reset(seed=0) + + # Steps 1..(L-1): no agent has truncated yet, gate sees aggregate.n == 0. + for step_idx in range(SCENARIO_LENGTH - 1): + _, _, _, _, info = env.step(np.zeros_like(env.actions)) + assert not _log_dicts(info), ( + f"Unexpected emission at step {step_idx + 1} (timestep={step_idx + 1}); " + "gate should skip before any agent has completed." + ) + + # Step L: all agents truncate at timestep == scenario_length. + _, _, _, _, info = env.step(np.zeros_like(env.actions)) + logs = _log_dicts(info) + assert len(logs) == 1, f"Expected exactly one log dict at scenario end, got {len(logs)}" + assert logs[0]["n"] >= 1, f"Expected n>=1 at first emission, got n={logs[0]['n']}" + + env.close() + + +def test_steady_state_n_equals_num_agents(): + """T2: once every agent has completed at least one episode, the emitted + n is the full population. With synced agents this is true from the first + emission onward.""" + env = _make_env() + env.reset(seed=0) + + # Run multiple scenarios so every agent has contributed. + last_log = None + for _ in range(4 * SCENARIO_LENGTH): + _, _, _, _, info = env.step(np.zeros_like(env.actions)) + logs = _log_dicts(info) + if logs: + last_log = logs[-1] + + assert last_log is not None, "Expected at least one emission across 4 scenarios" + assert last_log["n"] == NUM_AGENTS, ( + f"Expected steady-state n={NUM_AGENTS}, got n={last_log['n']} " + "(some agents missing from the population mean)" + ) + + env.close() + + +def test_emissions_identical_when_no_new_completions(): + """T3: prepare_log preserves per-agent EMA state across emissions. Two + consecutive emissions with no intervening completions must produce + bit-for-bit identical metric values.""" + env = _make_env() + env.reset(seed=0) + + # Reach the first completion at timestep == scenario_length. + for _ in range(SCENARIO_LENGTH): + env.step(np.zeros_like(env.actions)) + + # Next two emissions sit between completions (no agent truncates between + # timestep=1 and timestep=scenario_length-1 of the next scenario). + _, _, _, _, info_a = env.step(np.zeros_like(env.actions)) + _, _, _, _, info_b = env.step(np.zeros_like(env.actions)) + + log_a = _log_dicts(info_a) + log_b = _log_dicts(info_b) + assert log_a and log_b, "Both consecutive steps should emit" + log_a, log_b = log_a[-1], log_b[-1] + + assert log_a["n"] == log_b["n"], f"n changed without new completions: {log_a['n']} vs {log_b['n']}" + for key in ("episode_return", "episode_length", "collision_rate", "offroad_rate"): + assert log_a[key] == pytest.approx(log_b[key], rel=0, abs=1e-6), ( + f"{key} changed without new completions: {log_a[key]} vs {log_b[key]} " + "(prepare_log may be resetting per-agent state)" + ) + + env.close() From 36a1dffe4856789a5b7d051fa89ac86cfa896a12 Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 14:38:41 -0400 Subject: [PATCH 07/11] drive: allocate per-agent log buffers once at init, drop dead grow path active_agent_count is set once by set_active_agents (called only from init() and never from c_reset), so the grow-only ensure_per_agent_log_capacity helper only ever did real work on its first call and the comment justifying it as defense against varying populations was inaccurate. Allocate the three per-agent arrays in init() right after set_active_agents and drop the helper, the per-step ensure call in add_log, and the now-unused per_agent_log_capacity field. prepare_log iterates active_agent_count directly. Also trim the struct-field, prepare_log, episode_log-snapshot, and env-composition comments that were either describing wiring history or referencing downstream consumers. Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/ocean/drive/drive.h | 59 +++++++---------------------------- 1 file changed, 12 insertions(+), 47 deletions(-) diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 75e808c833..383e410110 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -455,18 +455,11 @@ struct Drive { int completed_episodes_count; CompletedEpisodeSummary completed_episodes[COMPLETED_EPISODE_QUEUE_CAPACITY]; - // Per-agent EMA of completed-episode Log values. Each slot holds one - // agent's smoothed estimate, updated on every termination of that agent's - // episode as slot = alpha*slot + (1 - alpha)*new_episode_log, with the - // very first completion seeding the slot directly. prepare_log emits the - // population mean across slots flagged has_data, so every agent that has - // ever completed an episode contributes equal weight to the cross-agent - // metric regardless of completion frequency. log_ema_alpha is the - // smoothing coefficient (0 = use latest only, 1 = never update). + // Per-agent EMA state, one slot per active controllable agent. Allocated + // in init() once active_agent_count is known. Log *per_agent_log_ema; - int *per_agent_log_count; // lifetime count of completed episodes per slot - int *per_agent_has_data; // 1 once this slot has been seeded by a first completion - int per_agent_log_capacity; + int *per_agent_log_count; + int *per_agent_has_data; float log_ema_alpha; }; @@ -2586,37 +2579,14 @@ static float calculate_puffer_score(Log *log_agent, float duration_steps, float return log_agent->puffer_score; } -// Grow-only per-agent log buffer. Active slot count can vary across c_reset -// (REPLAY scenarios with different agent populations); we never shrink so -// EMA state from prior windows survives. -static void ensure_per_agent_log_capacity(Drive *env, int needed) { - if (env->per_agent_log_capacity >= needed) { - return; - } - int old_cap = env->per_agent_log_capacity; - env->per_agent_log_ema = (Log *) realloc(env->per_agent_log_ema, needed * sizeof(Log)); - env->per_agent_log_count = (int *) realloc(env->per_agent_log_count, needed * sizeof(int)); - env->per_agent_has_data = (int *) realloc(env->per_agent_has_data, needed * sizeof(int)); - memset(&env->per_agent_log_ema[old_cap], 0, (needed - old_cap) * sizeof(Log)); - memset(&env->per_agent_log_count[old_cap], 0, (needed - old_cap) * sizeof(int)); - memset(&env->per_agent_has_data[old_cap], 0, (needed - old_cap) * sizeof(int)); - env->per_agent_log_capacity = needed; -} - -// Emit the cross-agent population mean: sum each agent's current EMA into -// env->log, divide later in vec_log by env->log.n (count of agents that have -// ever completed at least one episode). Per-agent EMA state is preserved -// across emissions so frequent completers do not dominate the long-run -// signal -- their slot simply tracks the smoothed estimate alongside everyone -// else. +// Sum each agent's current EMA slot into env->log and set env->log.n to the +// number of slots that have ever been seeded. vec_log divides by aggregate.n +// downstream to produce the cross-agent mean. static void prepare_log(Drive *env) { memset(&env->log, 0, sizeof(Log)); - if (env->per_agent_log_capacity == 0) { - return; - } int num_keys = sizeof(Log) / sizeof(float); int num_with_data = 0; - for (int a = 0; a < env->per_agent_log_capacity; a++) { + for (int a = 0; a < env->active_agent_count; a++) { if (!env->per_agent_has_data[a]) { continue; } @@ -2632,7 +2602,6 @@ static void prepare_log(Drive *env) { static void add_log(Drive *env) { int safe_timestep = (env->timestep > 0) ? env->timestep : 1; - ensure_per_agent_log_capacity(env, env->active_agent_count); float alpha = env->log_ema_alpha; float one_minus_alpha = 1.0f - alpha; int num_log_keys = sizeof(Log) / sizeof(float); @@ -2643,10 +2612,6 @@ static void add_log(Drive *env) { reference_progress_distance = fmaxf(reference_progress_distance, 1.0f); env->logs[i].progress_ratio = agent->distance_since_spawn / reference_progress_distance; - // Build a fresh per-episode Log snapshot. All field writes below are - // direct assignments (no accumulation): the snapshot represents this - // single completed episode in canonical units. We then either seed - // the agent's slot (first completion) or EMA-blend into it. Log episode_log = {0}; int offroad = env->logs[i].offroad_rate; @@ -2727,9 +2692,6 @@ static void add_log(Drive *env) { episode_log.puffer_score = env->logs[i].puffer_score; } - // Env-level composition counts: snapshot per agent so the per-agent - // EMA tracks the env's value (constant within a scenario) and the - // cross-agent mean gives a population-weighted view. episode_log.expert_static_car_count = env->expert_static_agent_count; episode_log.static_car_count = env->static_agent_count; @@ -3603,7 +3565,6 @@ void init(Drive *env) { env->per_agent_log_ema = NULL; env->per_agent_log_count = NULL; env->per_agent_has_data = NULL; - env->per_agent_log_capacity = 0; struct SharedMapData *shared = env->use_map_cache ? map_cache_lookup(env->map_name) : NULL; if (shared != NULL) { @@ -3653,6 +3614,10 @@ void init(Drive *env) { env->logs_capacity = 0; set_active_agents(env); env->logs_capacity = env->active_agent_count; + + env->per_agent_log_ema = (Log *) calloc(env->active_agent_count, sizeof(Log)); + env->per_agent_log_count = (int *) calloc(env->active_agent_count, sizeof(int)); + env->per_agent_has_data = (int *) calloc(env->active_agent_count, sizeof(int)); if (env->simulation_mode == SIMULATION_REPLAY) { remove_bad_trajectories(env); } From 7232ea67ad818f4b6dc8b43d9c14a0e97d7307fa Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 15:21:11 -0400 Subject: [PATCH 08/11] env_binding: guard eval-branch divide against per-env n=0 The eval branch of vec_log only checked env[0]'s log.n before dividing every env's log fields by their own n, so envs other than env[0] with no data would divide by zero. The empty-data short-circuit also returned the unused outer dict instead of the half-built list, leaking it to the GC. Rewrite the eval branch to per-env-check inside the loop: skip the divide-and-emit for envs with n=0 and leave their list slot as an empty dict, so the list shape stays vec->num_envs regardless of which envs have contributed. Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/ocean/env_binding.h | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index ee04fd1112..ac30638029 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -709,32 +709,25 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { Env *env = vec->envs[0]; if (env->eval_mode) { PyObject *list = PyList_New(vec->num_envs); - PyObject *dict = PyDict_New(); - if (env->log.n == 0) { - return dict; - } - - // Got enough data. Reset logs and return metrics for (int i = 0; i < vec->num_envs; i++) { PyObject *dict = PyDict_New(); - Env *env = vec->envs[i]; - float n = env->log.n; - // Average across agents - for (int i = 0; i < num_keys; i++) { - ((float *) &env->log)[i] /= n; - } - my_log(dict, env, &env->log, n); - assign_to_dict(dict, "n", n); - // Add map_name to dict - if (env->map_name) { - PyObject *s = PyUnicode_FromString(env->map_name); - if (s != NULL) { - PyDict_SetItemString(dict, "map_name", s); - Py_DECREF(s); + Env *env_i = vec->envs[i]; + float n = env_i->log.n; + if (n > 0) { + for (int j = 0; j < num_keys; j++) { + ((float *) &env_i->log)[j] /= n; + } + my_log(dict, env_i, &env_i->log, n); + assign_to_dict(dict, "n", n); + if (env_i->map_name) { + PyObject *s = PyUnicode_FromString(env_i->map_name); + if (s != NULL) { + PyDict_SetItemString(dict, "map_name", s); + Py_DECREF(s); + } } } - PyList_SetItem(list, i, dict); } // Reset logs to 0 after extracting metrics (prevents accumulation across episodes) From 6c4fe4ce445e9a88903af748203efb2975ffa8da Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 15:21:11 -0400 Subject: [PATCH 09/11] ci: include per-agent-logging and drive-training tests in utest test_drive_train called load_config which reads sys.argv; under pytest that contains pytest's own flags and the argparser bailed with SystemExit(2). Patch sys.argv to a single-element list inside the test so pytest discovery works (the existing train-ci.yml bare-script invocation is unaffected). Add tests/test_drive_per_agent_logging.py and tests/test_drive_train.py to the utest pytest pipeline. test_drive_train runs in its own pytest step because the test calls os._exit(0) on success to dodge worker-thread hangs, which would otherwise mask any earlier-test failures with exit 0. Co-Authored-By: Claude Opus 4.7 (1M context) --- .github/workflows/utest.yml | 6 ++++++ tests/test_drive_train.py | 6 +++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/.github/workflows/utest.yml b/.github/workflows/utest.yml index 7cb33c103d..58963b8f06 100644 --- a/.github/workflows/utest.yml +++ b/.github/workflows/utest.yml @@ -62,6 +62,7 @@ jobs: pip install pytest python -m pytest -v \ tests/test_drive_map_types.py \ + tests/test_drive_per_agent_logging.py \ tests/test_drive_scenario_length.py \ tests/test_eval_manager.py \ tests/test_validation_replay_html.py \ @@ -69,3 +70,8 @@ jobs: pufferlib/ocean/benchmark/test_map_metrics.py \ pufferlib/ocean/benchmark/test_road_edges.py \ pufferlib/ocean/benchmark/test_ttc.py + + - name: Run drive training smoke + env: + PUFFER_CPU: 1 + run: python -m pytest -v tests/test_drive_train.py diff --git a/tests/test_drive_train.py b/tests/test_drive_train.py index 1f4e93d4e1..6a0a6ffd08 100644 --- a/tests/test_drive_train.py +++ b/tests/test_drive_train.py @@ -7,6 +7,7 @@ import os import sys import time +from unittest.mock import patch sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) @@ -19,7 +20,10 @@ def test_drive_training(): try: env_name = "puffer_drive" - args = load_config(env_name) + # load_config reads sys.argv; under pytest that holds pytest's own flags + # which pufferl's argparser rejects with SystemExit(2). Strip it down. + with patch.object(sys, "argv", ["pufferl.py"]): + args = load_config(env_name) args["train"].update( { From ddc5ff06c9bdf82d8d9ab27d7a16dd7b1ff969df Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 15:27:35 -0400 Subject: [PATCH 10/11] env_binding: skip empty entries entirely in eval-branch list Returning vec->num_envs dicts with empty entries for no-data envs poisons downstream eval aggregation: pufferlib/ocean/benchmark/evaluators/base.py treats a missing "n" key as 1, so each empty dict inflates total_n and the weighted-mean denominator without contributing any numerator -- per-metric means get diluted. Worse, multi_scenario._should_stop counts emissions against num_scenarios, so cold-start steps with all-empty lists could terminate the eval loop early with garbage data. Return a list of only the populated dicts (length <= vec->num_envs). When no env has data the list is empty, drive.py's `if log: info.append(log)` guard skips it, and downstream counters keep ticking on real emissions only. Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/ocean/env_binding.h | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index ac30638029..029b4c1daf 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -708,27 +708,29 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { Env *env = vec->envs[0]; if (env->eval_mode) { - PyObject *list = PyList_New(vec->num_envs); + PyObject *list = PyList_New(0); for (int i = 0; i < vec->num_envs; i++) { - PyObject *dict = PyDict_New(); Env *env_i = vec->envs[i]; float n = env_i->log.n; - if (n > 0) { - for (int j = 0; j < num_keys; j++) { - ((float *) &env_i->log)[j] /= n; - } - my_log(dict, env_i, &env_i->log, n); - assign_to_dict(dict, "n", n); - if (env_i->map_name) { - PyObject *s = PyUnicode_FromString(env_i->map_name); - if (s != NULL) { - PyDict_SetItemString(dict, "map_name", s); - Py_DECREF(s); - } + if (n == 0) { + continue; + } + for (int j = 0; j < num_keys; j++) { + ((float *) &env_i->log)[j] /= n; + } + PyObject *dict = PyDict_New(); + my_log(dict, env_i, &env_i->log, n); + assign_to_dict(dict, "n", n); + if (env_i->map_name) { + PyObject *s = PyUnicode_FromString(env_i->map_name); + if (s != NULL) { + PyDict_SetItemString(dict, "map_name", s); + Py_DECREF(s); } } - PyList_SetItem(list, i, dict); + PyList_Append(list, dict); + Py_DECREF(dict); } // Reset logs to 0 after extracting metrics (prevents accumulation across episodes) for (int i = 0; i < vec->num_envs; i++) { From 338542ada62f985e26900e0a320ce0a19c3d65ad Mon Sep 17 00:00:00 2001 From: Eugene Vinitsky Date: Sat, 30 May 2026 15:33:21 -0400 Subject: [PATCH 11/11] drive: shorten log_ema_alpha half-life to 2 episodes; fix ruff-format log_ema_alpha = 0.707 (was 0.95) so the per-agent EMA's effective half-life is ~2 episodes -- responsive enough to track training progress within a few emissions per agent. Drive.__init__ default tracks drive.ini. Also join a split f-string in test_drive_per_agent_logging.py to make ruff-format happy. Co-Authored-By: Claude Opus 4.7 (1M context) --- pufferlib/config/ocean/drive.ini | 4 ++-- pufferlib/ocean/drive/drive.py | 2 +- tests/test_drive_per_agent_logging.py | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index 18d174aaa4..b315a889c4 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -109,8 +109,8 @@ reward_ade = 0.0 ; smoothed estimate of its completed-episode metrics; on every completion ; slot = alpha*slot + (1 - alpha)*new_episode. alpha=0 collapses to "most ; recent episode only", alpha->1 freezes the slot at its first observation. -; alpha=0.95 has a half-life of ~14 episodes per agent. -log_ema_alpha = 0.95 +; alpha=0.707 has a half-life of ~2 episodes per agent. +log_ema_alpha = 0.707 ; --- Map --- ; Path to map used for training diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 9f15dd1b1d..ecbf1d6027 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -74,7 +74,7 @@ def __init__( init_step=0, eval_mode=0, num_eval_scenarios=16, - log_ema_alpha=0.95, + log_ema_alpha=0.707, init_mode="create_all_valid", control_mode="control_vehicles", map_dir=None, diff --git a/tests/test_drive_per_agent_logging.py b/tests/test_drive_per_agent_logging.py index 8c3d4a9309..0ac7470073 100644 --- a/tests/test_drive_per_agent_logging.py +++ b/tests/test_drive_per_agent_logging.py @@ -84,8 +84,7 @@ def test_steady_state_n_equals_num_agents(): assert last_log is not None, "Expected at least one emission across 4 scenarios" assert last_log["n"] == NUM_AGENTS, ( - f"Expected steady-state n={NUM_AGENTS}, got n={last_log['n']} " - "(some agents missing from the population mean)" + f"Expected steady-state n={NUM_AGENTS}, got n={last_log['n']} (some agents missing from the population mean)" ) env.close()