diff --git a/.changeset/consumer-heap-gc.md b/.changeset/consumer-heap-gc.md new file mode 100644 index 0000000000..b95e1a28bc --- /dev/null +++ b/.changeset/consumer-heap-gc.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Bound Shape.Consumer heap growth: make the consumer family's process spawn options (incl. `fullsweep_after`) configurable per process via `ELECTRIC_PROCESS_SPAWN_OPTS`, and add an opt-in adaptive GC that runs after a transaction fragment when the consumer's heap exceeds the runtime-tunable `ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD` (off by default). diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index f21efd3c8f..6c73b12631 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -291,6 +291,7 @@ config :electric, process_registry_partitions: env!("ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS", :integer, nil), process_spawn_opts: env!("ELECTRIC_PROCESS_SPAWN_OPTS", &Electric.Config.parse_spawn_opts!/1, %{}), + consumer_gc_heap_threshold: env!("ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD", :integer, nil), http_api_num_acceptors: env!("ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS", :integer, 100), conn_max_requests: env!("ELECTRIC_TWEAKS_CONN_MAX_REQUESTS", :integer, nil), handler_fullsweep_after: env!("ELECTRIC_TWEAKS_HANDLER_FULLSWEEP_AFTER", :integer, nil), diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 12628472ed..63a4ed7ffe 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -156,7 +156,8 @@ defmodule Electric.Application do shape_suspend_after: get_env(opts, :shape_suspend_after), conn_max_requests: get_env(opts, :conn_max_requests), handler_fullsweep_after: get_env(opts, :handler_fullsweep_after), - process_spawn_opts: get_env(opts, :process_spawn_opts) + process_spawn_opts: get_env(opts, :process_spawn_opts), + consumer_gc_heap_threshold: get_env(opts, :consumer_gc_heap_threshold) ], manual_table_publishing?: get_env(opts, :manual_table_publishing?), shape_db_opts: [ diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 290fbfa4f3..c6d77a830c 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -114,6 +114,9 @@ defmodule Electric.Config do # # e.g. %{shape_log_collector: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024]} process_spawn_opts: %{}, + # Heap-size threshold (in BYTES) above which a consumer runs :erlang.garbage_collect() + # after processing a transaction fragment. + consumer_gc_heap_threshold: nil, ## Misc process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0, feature_flags: if(Mix.env() == :test, do: @known_feature_flags, else: []), diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index bbbe5474ff..6ee86ca229 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -38,6 +38,12 @@ defmodule Electric.Shapes.Consumer do @default_snapshot_timeout 45_000 @stop_and_clean_timeout 30_000 @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() + @word_size :erlang.system_info(:wordsize) + + # Minimum wall-clock interval (ms) between consumer-forced full GC sweeps. Caps how much CPU + # a busy consumer spends on full sweeps to at most one per @gc_min_interval_ms regardless of + # fragment rate. + @gc_min_interval_ms 1_000 @type initialize_shape_opts() :: %{ :action => :create | :restore, @@ -110,9 +116,24 @@ defmodule Electric.Shapes.Consumer do ConsumerRegistry.whereis(stack_id, shape_handle) end + @doc """ + Set the adaptive-GC heap threshold (bytes, or nil to disable) for a single stack. + + Consumers cache this value at startup (see `State.new/2`), so the new threshold only + applies to consumers started after this call — already-running consumers keep the + threshold they read when they booted. Safe to call from IEx. + """ + @spec set_gc_heap_threshold(Electric.stack_id(), non_neg_integer() | nil) :: :ok + def set_gc_heap_threshold(stack_id, threshold_bytes) + when is_nil(threshold_bytes) or (is_integer(threshold_bytes) and threshold_bytes >= 0) do + Electric.StackConfig.put(stack_id, :consumer_gc_heap_threshold, threshold_bytes) + :ok + end + def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle}, - name: name(stack_id, shape_handle) + name: name(stack_id, shape_handle), + spawn_opt: Electric.StackConfig.spawn_opts(stack_id, :consumer) ) end @@ -170,10 +191,19 @@ defmodule Electric.Shapes.Consumer do if state.terminating? do {:noreply, state, {:continue, :stop_and_clean}} else - {:noreply, state, state.hibernate_after} + {:noreply, state, {:continue, :maybe_gc}} end end + # Deferred adaptive GC. Reached via {:continue, :maybe_gc} after a fragment (or a + # full buffer drain) has been processed, so a forced full sweep runs off the + # reply/critical path rather than blocking the ShapeLogCollector. Re-establishes + # the hibernate_after timeout that the {:continue, …} return could not carry. + def handle_continue(:maybe_gc, state) do + state = maybe_garbage_collect(state) + {:noreply, state, state.hibernate_after} + end + @impl GenServer # Any incoming message counts as activity: cancel the pending suspend timer (if # any) and recurse for actual handling of the call. @@ -204,7 +234,7 @@ defmodule Electric.Shapes.Consumer do {:reply, :ok, state, {:continue, :stop_and_clean}} state -> - {:reply, :ok, state, state.hibernate_after} + {:reply, :ok, state, {:continue, :maybe_gc}} end end @@ -520,6 +550,56 @@ defmodule Electric.Shapes.Consumer do handle_txn_fragment(txn_fragment, state) end + # Adaptive GC check. Always invoked from a {:continue, :maybe_gc} handler so the + # forced full sweep runs off the ShapeLogCollector's synchronous publish path: the + # SLC has already received :ok by the time this runs. Because the SLC publishes + # fragments to a given consumer sequentially, the continue typically completes + # before the next fragment arrives, so it does not block steady-state throughput. + + # Fast path: adaptive GC is disabled — skip all process_info/time calls. + defp maybe_garbage_collect(%State{gc_heap_threshold: nil} = state), do: state + + defp maybe_garbage_collect(%State{gc_heap_threshold: threshold_bytes} = state) do + {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + heap_bytes = heap_words * @word_size + now = System.monotonic_time(:millisecond) + + if should_force_gc?(heap_bytes, threshold_bytes, state.last_forced_gc_at, now) do + :erlang.garbage_collect() + %{state | last_forced_gc_at: now} + else + state + end + end + + @doc false + # Decide whether to force a full GC sweep: heap (bytes) must be over the + # threshold (bytes) AND at least @gc_min_interval_ms must have elapsed since the + # last forced GC. last_gc_at / now_ms are monotonic milliseconds; last_gc_at is + # nil if this consumer has never forced a GC (always fire on first over-threshold + # event). Passing explicit min_interval_ms enables deterministic unit tests. + @spec should_force_gc?( + non_neg_integer(), + non_neg_integer() | nil, + integer() | nil, + integer(), + non_neg_integer() + ) :: boolean() + def should_force_gc?( + heap_bytes, + threshold_bytes, + last_gc_at, + now_ms, + min_interval_ms \\ @gc_min_interval_ms + ) + + def should_force_gc?(_heap_bytes, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false + + def should_force_gc?(heap_bytes, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do + heap_bytes > threshold_bytes and + (is_nil(last_gc_at) or now_ms - last_gc_at >= min_interval_ms) + end + # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). # In this phase we have to buffer incoming txn fragments because we can't yet decide what to # do with the transaction: skip it or write it to the shape log. diff --git a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex index c98dc4c1f0..08c6c97bf5 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex @@ -111,7 +111,10 @@ defmodule Electric.Shapes.Consumer.Materializer do do: subscribe(%{stack_id: stack_id, shape_handle: shape_handle}) def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: name(opts)) + GenServer.start_link(__MODULE__, opts, + name: name(opts), + spawn_opt: Electric.StackConfig.spawn_opts(opts.stack_id, :consumer_materializer) + ) end def init(opts) do diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 8ff3aa14be..2a64f916f9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -20,7 +20,10 @@ defmodule Electric.Shapes.Consumer.Snapshotter do end def start_link(config) when is_map(config) do - GenServer.start_link(__MODULE__, config, name: name(config)) + GenServer.start_link(__MODULE__, config, + name: name(config), + spawn_opt: Electric.StackConfig.spawn_opts(config.stack_id, :consumer_snapshotter) + ) end def init(config) do diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 799cf0d8a6..c3aa0f460c 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -47,7 +47,13 @@ defmodule Electric.Shapes.Consumer.State do # as any message arrives (activity), so at most one is ever live at a time. suspend_timer: nil, # How long after hibernation to suspend (in ms) - suspend_after: nil + suspend_after: nil, + # Monotonic millisecond timestamp of the last consumer-forced GC (nil if never). + # Used by hysteresis logic in maybe_garbage_collect/1 to cap forced-GC frequency. + last_forced_gc_at: nil, + # Adaptive-GC heap threshold (bytes) cached at consumer startup, or nil when + # disabled. + gc_heap_threshold: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -107,6 +113,7 @@ defmodule Electric.Shapes.Consumer.State do :shape_suspend_after, Electric.Config.default(:shape_suspend_after) ), + gc_heap_threshold: Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil), buffering?: true } end diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index 462d474953..cf51621418 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -33,7 +33,8 @@ defmodule Electric.StackConfig do shape_suspend_after: Electric.Config.default(:shape_suspend_after), chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), feature_flags: [], - process_spawn_opts: %{} + process_spawn_opts: %{}, + consumer_gc_heap_threshold: Electric.Config.default(:consumer_gc_heap_threshold) ] end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b6d0111669..3d8a35b078 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -154,6 +154,10 @@ defmodule Electric.StackSupervisor do default: nil ], process_spawn_opts: [type: :map, default: %{}], + consumer_gc_heap_threshold: [ + type: {:or, [:non_neg_integer, nil]}, + default: Electric.Config.default(:consumer_gc_heap_threshold) + ], consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil] ] ], @@ -357,6 +361,7 @@ defmodule Electric.StackSupervisor do shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) shape_suspend_after = Keyword.fetch!(config.tweaks, :shape_suspend_after) process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts) + consumer_gc_heap_threshold = Keyword.fetch!(config.tweaks, :consumer_gc_heap_threshold) shape_cache_opts = [ stack_id: stack_id @@ -407,6 +412,7 @@ defmodule Electric.StackSupervisor do shape_enable_suspend?: shape_enable_suspend?, shape_suspend_after: shape_suspend_after, process_spawn_opts: process_spawn_opts, + consumer_gc_heap_threshold: consumer_gc_heap_threshold, feature_flags: Map.get(config, :feature_flags, []) ]}, {Electric.AsyncDeleter, diff --git a/packages/sync-service/test/electric/config_test.exs b/packages/sync-service/test/electric/config_test.exs index 437b82605d..d5790eaea2 100644 --- a/packages/sync-service/test/electric/config_test.exs +++ b/packages/sync-service/test/electric/config_test.exs @@ -130,4 +130,43 @@ defmodule Electric.ConfigTest do end end end + + describe "tweaks propagation" do + setup do + initial_config = Application.get_all_env(:electric) + + for {key, _} <- initial_config do + Application.delete_env(:electric, key) + end + + on_exit(fn -> + Application.put_all_env([{:electric, initial_config}]) + end) + + [initial_config: initial_config] + end + + test "consumer_gc_heap_threshold opt is threaded into tweaks", ctx do + threshold = 209_715_200 + + config = + Electric.Application.configuration( + Keyword.merge( + Keyword.take(ctx.initial_config, [:replication_connection_opts]), + consumer_gc_heap_threshold: threshold + ) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == threshold + end + + test "consumer_gc_heap_threshold defaults to nil in tweaks", ctx do + config = + Electric.Application.configuration( + Keyword.take(ctx.initial_config, [:replication_connection_opts]) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == nil + end + end end diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index af5a82a43b..a2554f54d9 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2652,6 +2652,332 @@ defmodule Electric.Shapes.ConsumerTest do ) end + describe "process gc configuration" do + setup [ + :with_registry, + :with_in_memory_storage, + :with_shape_status, + :with_lsn_tracker, + :with_persistent_kv, + :with_status_monitor, + :with_dynamic_consumer_supervisor, + :with_noop_publication_manager, + :with_shape_cleaner + ] + + setup ctx do + start_link_supervised!({ + ShapeLogCollector.Supervisor, + stack_id: ctx.stack_id, persistent_kv: ctx.persistent_kv, inspector: @base_inspector + }) + + ShapeLogCollector.mark_as_ready(ctx.stack_id) + [shape_position: @shape_position] + end + + @tag process_spawn_opts: %{consumer: [fullsweep_after: 4, priority: :high]} + test "spawn_opts are correctly passed to consumer process", ctx do + support_test_storage_wrap(ctx, @shape_handle1, @shape1) + + {:ok, consumer} = + start_supervised( + {Consumer, + %{ + shape_handle: @shape_handle1, + stack_id: ctx.stack_id + }}, + id: {Consumer, @shape_handle1} + ) + + Consumer.initialize_shape(consumer, @shape1, %{action: :create}) + assert_receive {Support.TestStorage, :init_writer!, @shape_handle1, @shape1} + :started = Consumer.await_snapshot_start(ctx.stack_id, @shape_handle1) + + info = Process.info(consumer) + + assert info[:priority] == :high + assert info[:garbage_collection][:fullsweep_after] == 4 + end + end + + defp support_test_storage_wrap(ctx, shape_handle, shape) do + %{snapshot_xmin: xmin} = shape_status(shape_handle, ctx) + shapes = %{shape_handle => shape} + + storage = + Support.TestStorage.wrap(ctx.storage, %{ + shape_handle => [ + {:mark_snapshot_as_started, []}, + {:set_pg_snapshot, [%{xmin: xmin, xmax: xmin + 1, xip_list: [xmin]}]} + ] + }) + + Electric.StackConfig.put(ctx.stack_id, Electric.ShapeCache.Storage, storage) + Electric.StackConfig.put(ctx.stack_id, :inspector, @base_inspector) + + patch_shape_status(fetch_shape_by_handle: fn _, sh -> Map.fetch(shapes, sh) end) + + Support.TestUtils.activate_mocks_for_descendant_procs(Consumer) + Support.TestUtils.activate_mocks_for_descendant_procs(Electric.ShapeCache.ShapeCleaner) + :ok + end + + describe "should_force_gc?/5" do + # All tests pass explicit now_ms / last_gc_at / min_interval_ms so they are + # fully deterministic and do not depend on wall-clock time. + + test "false when threshold is nil (adaptive GC disabled)" do + refute Electric.Shapes.Consumer.should_force_gc?(1_000_000, nil, nil, 5_000, 1_000) + end + + test "true when heap over threshold and consumer has never forced a GC (last_gc_at nil)" do + # 1_000 bytes > threshold of 1 byte + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, nil, 5_000, 1_000) + end + + test "false when heap over threshold but interval has not elapsed" do + # last_gc_at=4_500, now=5_000 → delta=500 < min_interval=1_000 → no GC + refute Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_500, 5_000, 1_000) + end + + test "true when heap over threshold and interval has elapsed" do + # last_gc_at=3_000, now=5_000 → delta=2_000 >= min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 3_000, 5_000, 1_000) + end + + test "true at exactly the min interval boundary" do + # last_gc_at=4_000, now=5_000 → delta=1_000 == min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_000, 5_000, 1_000) + end + + test "false when heap is under threshold regardless of timing" do + # heap=1 byte; threshold=1_000 bytes → under + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, nil, 5_000, 1_000) + end + + test "false when heap is under threshold even if interval would have elapsed" do + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, 0, 5_000, 1_000) + end + + test "false when heap exactly equals threshold (strict comparison)" do + refute Electric.Shapes.Consumer.should_force_gc?(1_000, 1_000, nil, 5_000, 1_000) + end + end + + describe "adaptive GC after fragment processing" do + @describetag :tmp_dir + + setup do + %{inspector: @base_inspector, pool: nil} + end + + setup [ + :with_registry, + :with_pure_file_storage, + :with_shape_status, + :with_lsn_tracker, + :with_log_chunking, + :with_persistent_kv, + :with_async_deleter, + :with_shape_cleaner, + :with_shape_log_collector, + :with_noop_publication_manager, + :with_status_monitor + ] + + setup(ctx) do + delay_snapshot_creation? = Map.get(ctx, :delay_snapshot_creation?) + test_pid = self() + + patch_snapshotter(fn parent, shape_handle, _shape, %{snapshot_fun: snapshot_fun} -> + if delay_snapshot_creation? do + receive do + {^test_pid, :resume} -> :ok + end + end + + pg_snapshot = {10, 11, [10]} + GenServer.cast(parent, {:pg_snapshot_known, shape_handle, pg_snapshot}) + GenServer.cast(parent, {:snapshot_started, shape_handle}) + snapshot_fun.([]) + end) + + Electric.StackConfig.put(ctx.stack_id, :shape_hibernate_after, 10_000) + :ok + end + + setup ctx do + %{consumer_supervisor: consumer_supervisor, shape_cache: shape_cache} = + Support.ComponentSetup.with_shape_cache(ctx) + + %{ + consumer_supervisor: consumer_supervisor, + shape_cache: shape_cache + } + end + + test "GC runs when heap exceeds tiny threshold", ctx do + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + # GC runs in the deferred {:continue, :maybe_gc} after the reply. A synchronous + # call is queued behind the pending continue, so :sys.get_state returns only + # once the GC has run — and lets us read last_forced_gc_at, which the consumer + # stamps iff it forced a sweep. That is a direct signal of our decision, + # immune to natural BEAM GCs and heap-size timing. + assert %{last_forced_gc_at: forced_at} = :sys.get_state(consumer_pid) + + refute is_nil(forced_at), + "threshold=1 keeps the heap over threshold, so a forced GC should be recorded" + end + + test "GC does not run when threshold is very large", ctx do + # 1 GB threshold — the consumer heap will never reach this, so GC must NOT fire. + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1_000_000_000) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + # Flush the deferred :maybe_gc continue and read the state. The heap stays well + # under the 1 GB threshold, so the consumer must not have forced a sweep. + assert %{last_forced_gc_at: forced_at} = :sys.get_state(consumer_pid) + + assert is_nil(forced_at), + "no forced GC should be recorded while under threshold, got #{inspect(forced_at)}" + end + + test "no GC by default (threshold=nil)", ctx do + # Ensure no threshold is set (default behaviour) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + # Should process without error even when no GC threshold is configured + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + end + + @tag delay_snapshot_creation?: true + test "GC runs during buffered-fragment drain when heap exceeds threshold", ctx do + # threshold=1 forces a GC once the buffered fragments are drained. The consumer + # starts with buffering?=true; fragments sent before pg_snapshot_known land in the + # buffer. When we unblock the snapshotter it fires pg_snapshot_known which triggers + # :consume_buffer → drains the buffer → {:continue, :maybe_gc} runs the GC once. + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + + # The snapshotter is now running but blocked on `receive {^test_pid, :resume}`. + assert_receive {:snapshot, ^shape_handle, snapshotter_pid} + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + + # Send a large-payload fragment while buffering?=true — it goes into the buffer. + large_binary = :binary.copy(<<0>>, 200_000) + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Unblock the snapshotter: fires pg_snapshot_known → :consume_buffer → drains the + # buffer, then defers a single GC via {:continue, :maybe_gc}. + send(snapshotter_pid, {self(), :resume}) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + # The deferred GC runs in the :maybe_gc continue after the drain. Flush it with a + # synchronous call (queued behind the continue), then check last_forced_gc_at — + # the consumer stamps it iff it forced a sweep for the drained fragment. + assert %{last_forced_gc_at: forced_at} = :sys.get_state(consumer_pid) + + refute is_nil(forced_at), + "expected a forced GC to be recorded after the buffered-fragment drain" + end + end + + describe "set_gc_heap_threshold helpers" do + # with_stack_id_from_test (line 87) already starts ProcessRegistry + StackConfig + # for ctx.stack_id — no heavier setup is needed for these pure-config tests. + + test "set_gc_heap_threshold/2 writes the value into StackConfig", ctx do + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 2_000_000) + + assert 2_000_000 == + Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + + test "set_gc_heap_threshold/2 accepts nil to disable", ctx do + Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 123) + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, nil) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end