From a9b00713d2ed7f646541fe0d1c4ec6afbb837579 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 13:53:28 +0200 Subject: [PATCH 01/14] feat(consumer): make consumer-family spawn_opts configurable per process --- .../lib/electric/shapes/consumer.ex | 3 +- .../electric/shapes/consumer/materializer.ex | 5 +- .../electric/shapes/consumer/snapshotter.ex | 5 +- .../test/electric/shapes/consumer_test.exs | 71 +++++++++++++++++++ 4 files changed, 81 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index bbbe5474ff..06c47ca91c 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -112,7 +112,8 @@ defmodule Electric.Shapes.Consumer do def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle}, - name: name(stack_id, shape_handle) + name: name(stack_id, shape_handle), + spawn_opt: Electric.StackConfig.spawn_opts(stack_id, :consumer) ) end diff --git a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex index c98dc4c1f0..08c6c97bf5 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex @@ -111,7 +111,10 @@ defmodule Electric.Shapes.Consumer.Materializer do do: subscribe(%{stack_id: stack_id, shape_handle: shape_handle}) def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: name(opts)) + GenServer.start_link(__MODULE__, opts, + name: name(opts), + spawn_opt: Electric.StackConfig.spawn_opts(opts.stack_id, :consumer_materializer) + ) end def init(opts) do diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 8ff3aa14be..2a64f916f9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -20,7 +20,10 @@ defmodule Electric.Shapes.Consumer.Snapshotter do end def start_link(config) when is_map(config) do - GenServer.start_link(__MODULE__, config, name: name(config)) + GenServer.start_link(__MODULE__, config, + name: name(config), + spawn_opt: Electric.StackConfig.spawn_opts(config.stack_id, :consumer_snapshotter) + ) end def init(config) do diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index af5a82a43b..b64fd079a2 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2652,6 +2652,77 @@ defmodule Electric.Shapes.ConsumerTest do ) end + describe "process gc configuration" do + setup [ + :with_registry, + :with_in_memory_storage, + :with_shape_status, + :with_lsn_tracker, + :with_persistent_kv, + :with_status_monitor, + :with_dynamic_consumer_supervisor, + :with_noop_publication_manager, + :with_shape_cleaner + ] + + setup ctx do + start_link_supervised!({ + ShapeLogCollector.Supervisor, + stack_id: ctx.stack_id, persistent_kv: ctx.persistent_kv, inspector: @base_inspector + }) + + ShapeLogCollector.mark_as_ready(ctx.stack_id) + [shape_position: @shape_position] + end + + @tag process_spawn_opts: %{consumer: [fullsweep_after: 4, priority: :high]} + test "spawn_opts are correctly passed to consumer process", ctx do + support_test_storage_wrap(ctx, @shape_handle1, @shape1) + + {:ok, consumer} = + start_supervised( + {Consumer, + %{ + shape_handle: @shape_handle1, + stack_id: ctx.stack_id + }}, + id: {Consumer, @shape_handle1} + ) + + Consumer.initialize_shape(consumer, @shape1, %{action: :create}) + assert_receive {Support.TestStorage, :init_writer!, @shape_handle1, @shape1} + :started = Consumer.await_snapshot_start(ctx.stack_id, @shape_handle1) + + pid = Consumer.name(ctx.stack_id, @shape_handle1) |> GenServer.whereis() + info = Process.info(pid) + + assert info[:priority] == :high + assert info[:garbage_collection][:fullsweep_after] == 4 + end + end + + defp support_test_storage_wrap(ctx, shape_handle, shape) do + %{snapshot_xmin: xmin} = shape_status(shape_handle, ctx) + shapes = %{shape_handle => shape} + + storage = + Support.TestStorage.wrap(ctx.storage, %{ + shape_handle => [ + {:mark_snapshot_as_started, []}, + {:set_pg_snapshot, [%{xmin: xmin, xmax: xmin + 1, xip_list: [xmin]}]} + ] + }) + + Electric.StackConfig.put(ctx.stack_id, Electric.ShapeCache.Storage, storage) + Electric.StackConfig.put(ctx.stack_id, :inspector, @base_inspector) + + patch_shape_status(fetch_shape_by_handle: fn _, sh -> Map.fetch(shapes, sh) end) + + Support.TestUtils.activate_mocks_for_descendant_procs(Consumer) + Support.TestUtils.activate_mocks_for_descendant_procs(Electric.ShapeCache.ShapeCleaner) + :ok + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end From 51b897f11ad1674bc27077f3aa3c819307bd6b7a Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 13:57:09 +0200 Subject: [PATCH 02/14] test(consumer): use supervised pid directly in spawn_opts test --- packages/sync-service/test/electric/shapes/consumer_test.exs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index b64fd079a2..a536bd87ba 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2693,8 +2693,7 @@ defmodule Electric.Shapes.ConsumerTest do assert_receive {Support.TestStorage, :init_writer!, @shape_handle1, @shape1} :started = Consumer.await_snapshot_start(ctx.stack_id, @shape_handle1) - pid = Consumer.name(ctx.stack_id, @shape_handle1) |> GenServer.whereis() - info = Process.info(pid) + info = Process.info(consumer) assert info[:priority] == :high assert info[:garbage_collection][:fullsweep_after] == 4 From 42548eb9330fca9d19c048711e57505f119873db Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:00:34 +0200 Subject: [PATCH 03/14] feat(consumer): add consumer_gc_heap_threshold stack config (default nil) --- packages/sync-service/config/runtime.exs | 1 + packages/sync-service/lib/electric/config.ex | 4 ++++ packages/sync-service/lib/electric/stack_config.ex | 3 ++- packages/sync-service/lib/electric/stack_supervisor.ex | 6 ++++++ 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index f21efd3c8f..6c73b12631 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -291,6 +291,7 @@ config :electric, process_registry_partitions: env!("ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS", :integer, nil), process_spawn_opts: env!("ELECTRIC_PROCESS_SPAWN_OPTS", &Electric.Config.parse_spawn_opts!/1, %{}), + consumer_gc_heap_threshold: env!("ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD", :integer, nil), http_api_num_acceptors: env!("ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS", :integer, 100), conn_max_requests: env!("ELECTRIC_TWEAKS_CONN_MAX_REQUESTS", :integer, nil), handler_fullsweep_after: env!("ELECTRIC_TWEAKS_HANDLER_FULLSWEEP_AFTER", :integer, nil), diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 290fbfa4f3..15f2082793 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -114,6 +114,10 @@ defmodule Electric.Config do # # e.g. %{shape_log_collector: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024]} process_spawn_opts: %{}, + # Heap-size threshold (in BYTES) above which a consumer runs :erlang.garbage_collect() + # after processing a transaction fragment. nil disables adaptive GC. Looked up at + # runtime via StackConfig so it can be changed from a live IEx shell. + consumer_gc_heap_threshold: nil, ## Misc process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0, feature_flags: if(Mix.env() == :test, do: @known_feature_flags, else: []), diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index 462d474953..cf51621418 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -33,7 +33,8 @@ defmodule Electric.StackConfig do shape_suspend_after: Electric.Config.default(:shape_suspend_after), chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), feature_flags: [], - process_spawn_opts: %{} + process_spawn_opts: %{}, + consumer_gc_heap_threshold: Electric.Config.default(:consumer_gc_heap_threshold) ] end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index b6d0111669..3d8a35b078 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -154,6 +154,10 @@ defmodule Electric.StackSupervisor do default: nil ], process_spawn_opts: [type: :map, default: %{}], + consumer_gc_heap_threshold: [ + type: {:or, [:non_neg_integer, nil]}, + default: Electric.Config.default(:consumer_gc_heap_threshold) + ], consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil] ] ], @@ -357,6 +361,7 @@ defmodule Electric.StackSupervisor do shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) shape_suspend_after = Keyword.fetch!(config.tweaks, :shape_suspend_after) process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts) + consumer_gc_heap_threshold = Keyword.fetch!(config.tweaks, :consumer_gc_heap_threshold) shape_cache_opts = [ stack_id: stack_id @@ -407,6 +412,7 @@ defmodule Electric.StackSupervisor do shape_enable_suspend?: shape_enable_suspend?, shape_suspend_after: shape_suspend_after, process_spawn_opts: process_spawn_opts, + consumer_gc_heap_threshold: consumer_gc_heap_threshold, feature_flags: Map.get(config, :feature_flags, []) ]}, {Electric.AsyncDeleter, From c53cfca5a924b8a706e1994d367a07ae959edc5a Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:03:26 +0200 Subject: [PATCH 04/14] fix(consumer): thread consumer_gc_heap_threshold env into stack tweaks --- .../sync-service/lib/electric/application.ex | 3 +- .../test/electric/config_test.exs | 39 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index 12628472ed..63a4ed7ffe 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -156,7 +156,8 @@ defmodule Electric.Application do shape_suspend_after: get_env(opts, :shape_suspend_after), conn_max_requests: get_env(opts, :conn_max_requests), handler_fullsweep_after: get_env(opts, :handler_fullsweep_after), - process_spawn_opts: get_env(opts, :process_spawn_opts) + process_spawn_opts: get_env(opts, :process_spawn_opts), + consumer_gc_heap_threshold: get_env(opts, :consumer_gc_heap_threshold) ], manual_table_publishing?: get_env(opts, :manual_table_publishing?), shape_db_opts: [ diff --git a/packages/sync-service/test/electric/config_test.exs b/packages/sync-service/test/electric/config_test.exs index 437b82605d..d5790eaea2 100644 --- a/packages/sync-service/test/electric/config_test.exs +++ b/packages/sync-service/test/electric/config_test.exs @@ -130,4 +130,43 @@ defmodule Electric.ConfigTest do end end end + + describe "tweaks propagation" do + setup do + initial_config = Application.get_all_env(:electric) + + for {key, _} <- initial_config do + Application.delete_env(:electric, key) + end + + on_exit(fn -> + Application.put_all_env([{:electric, initial_config}]) + end) + + [initial_config: initial_config] + end + + test "consumer_gc_heap_threshold opt is threaded into tweaks", ctx do + threshold = 209_715_200 + + config = + Electric.Application.configuration( + Keyword.merge( + Keyword.take(ctx.initial_config, [:replication_connection_opts]), + consumer_gc_heap_threshold: threshold + ) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == threshold + end + + test "consumer_gc_heap_threshold defaults to nil in tweaks", ctx do + config = + Electric.Application.configuration( + Keyword.take(ctx.initial_config, [:replication_connection_opts]) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == nil + end + end end From 98b812bdba411b20b7c1150ea643376dba3c95bb Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:09:46 +0200 Subject: [PATCH 05/14] feat(consumer): adaptive GC after fragment when heap exceeds threshold --- .../lib/electric/shapes/consumer.ex | 27 +++- .../test/electric/shapes/consumer_test.exs | 131 ++++++++++++++++++ 2 files changed, 157 insertions(+), 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 06c47ca91c..73a7b61b7f 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -518,7 +518,32 @@ defmodule Electric.Shapes.Consumer do defp handle_event(%TransactionFragment{} = txn_fragment, state) do Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end) - handle_txn_fragment(txn_fragment, state) + + txn_fragment + |> handle_txn_fragment(state) + |> maybe_garbage_collect() + end + + defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do + threshold = Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) + + if not is_nil(threshold) do + {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + + if over_heap_threshold?(heap_words, threshold) do + :erlang.garbage_collect() + end + end + + state + end + + @doc false + # heap_words: process total_heap_size in words; threshold_bytes: configured byte threshold (or nil) + def over_heap_threshold?(_heap_words, nil), do: false + + def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do + heap_words * :erlang.system_info(:wordsize) > threshold_bytes end # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index a536bd87ba..266deef5e1 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2722,6 +2722,137 @@ defmodule Electric.Shapes.ConsumerTest do :ok end + describe "over_heap_threshold?/2" do + test "false when threshold is nil" do + refute Electric.Shapes.Consumer.over_heap_threshold?(1_000_000, nil) + end + + test "false when heap (words) is below threshold (bytes)" do + refute Electric.Shapes.Consumer.over_heap_threshold?(10, 1024) + end + + test "true when heap (words) exceeds threshold (bytes)" do + assert Electric.Shapes.Consumer.over_heap_threshold?(1000, 1) + end + + test "exactly-equal is not over threshold" do + wordsize = :erlang.system_info(:wordsize) + refute Electric.Shapes.Consumer.over_heap_threshold?(1, wordsize) + end + end + + describe "adaptive GC after fragment processing" do + @describetag :tmp_dir + + setup do + %{inspector: @base_inspector, pool: nil} + end + + setup [ + :with_registry, + :with_pure_file_storage, + :with_shape_status, + :with_lsn_tracker, + :with_log_chunking, + :with_persistent_kv, + :with_async_deleter, + :with_shape_cleaner, + :with_shape_log_collector, + :with_noop_publication_manager, + :with_status_monitor + ] + + setup(ctx) do + patch_snapshotter(fn parent, shape_handle, _shape, %{snapshot_fun: snapshot_fun} -> + pg_snapshot = {10, 11, [10]} + GenServer.cast(parent, {:pg_snapshot_known, shape_handle, pg_snapshot}) + GenServer.cast(parent, {:snapshot_started, shape_handle}) + snapshot_fun.([]) + end) + + Electric.StackConfig.put(ctx.stack_id, :shape_hibernate_after, 10_000) + :ok + end + + setup ctx do + %{consumer_supervisor: consumer_supervisor, shape_cache: shape_cache} = + Support.ComponentSetup.with_shape_cache(ctx) + + %{ + consumer_supervisor: consumer_supervisor, + shape_cache: shape_cache + } + end + + test "GC runs when heap exceeds tiny threshold", ctx do + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + {:total_heap_size, heap_before} = :erlang.process_info(consumer_pid, :total_heap_size) + + # Inflate the consumer's heap by sending a large binary payload + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Wait for the consumer to process the fragment + assert_receive {^ref, :new_changes, _}, @receive_timeout + + {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) + + # GC should have run (heap_after <= heap_before is the indicator), + # or at minimum the heap hasn't grown unboundedly relative to what we sent. + # Since threshold=1 forces GC on every fragment, the heap post-GC should + # be well below the inflated size. We verify the GC fired by checking that + # the post-fragment heap is not larger than the pre-fragment heap (i.e., GC ran). + assert heap_after <= heap_before * 2, + "Expected GC to reclaim heap after large fragment (before=#{heap_before} words, after=#{heap_after} words)" + end + + test "no GC by default (threshold=nil)", ctx do + # Ensure no threshold is set (default behaviour) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + # Should process without error even when no GC threshold is configured + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + end + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end From 5aa7fddc233e6e49f86ac58f96324a0816dde2f3 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:19:47 +0200 Subject: [PATCH 06/14] refactor(consumer): cleaner adaptive-GC nil path + payload-relative GC test + negative test --- .../lib/electric/shapes/consumer.ex | 19 ++++--- .../test/electric/shapes/consumer_test.exs | 54 ++++++++++++++++--- 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 73a7b61b7f..7d2f30240f 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -38,6 +38,7 @@ defmodule Electric.Shapes.Consumer do @default_snapshot_timeout 45_000 @stop_and_clean_timeout 30_000 @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() + @word_size :erlang.system_info(:wordsize) @type initialize_shape_opts() :: %{ :action => :create | :restore, @@ -525,17 +526,15 @@ defmodule Electric.Shapes.Consumer do end defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do - threshold = Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) - - if not is_nil(threshold) do - {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + case Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) do + nil -> + state - if over_heap_threshold?(heap_words, threshold) do - :erlang.garbage_collect() - end + threshold -> + {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + if over_heap_threshold?(heap_words, threshold), do: :erlang.garbage_collect() + state end - - state end @doc false @@ -543,7 +542,7 @@ defmodule Electric.Shapes.Consumer do def over_heap_threshold?(_heap_words, nil), do: false def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do - heap_words * :erlang.system_info(:wordsize) > threshold_bytes + heap_words * @word_size > threshold_bytes end # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 266deef5e1..2262e212ac 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2796,7 +2796,46 @@ defmodule Electric.Shapes.ConsumerTest do xid = 11 lsn = Lsn.from_integer(10) - {:total_heap_size, heap_before} = :erlang.process_info(consumer_pid, :total_heap_size) + # Inflate the consumer's heap by sending a large binary payload + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Wait for the consumer to process the fragment + assert_receive {^ref, :new_changes, _}, @receive_timeout + + {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) + + # threshold=1 forces a full GC after this fragment. Because the ~200 KB payload is + # transient garbage (not live state), the post-GC heap must be far below the payload + # size. Without GC the heap grows ~185x (observed), so this assertion fails loudly. + payload_words = div(200_000, :erlang.system_info(:wordsize)) + + assert heap_after < payload_words, + "heap_after (#{heap_after} words) should be far below payload (#{payload_words} words) after GC" + end + + test "GC does not run when threshold is very large", ctx do + # 1 GB threshold — the consumer heap will never reach this, so GC must NOT fire. + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1_000_000_000) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) # Inflate the consumer's heap by sending a large binary payload large_binary = :binary.copy(<<0>>, 200_000) @@ -2817,13 +2856,12 @@ defmodule Electric.Shapes.ConsumerTest do {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) - # GC should have run (heap_after <= heap_before is the indicator), - # or at minimum the heap hasn't grown unboundedly relative to what we sent. - # Since threshold=1 forces GC on every fragment, the heap post-GC should - # be well below the inflated size. We verify the GC fired by checking that - # the post-fragment heap is not larger than the pre-fragment heap (i.e., GC ran). - assert heap_after <= heap_before * 2, - "Expected GC to reclaim heap after large fragment (before=#{heap_before} words, after=#{heap_after} words)" + # GC was NOT triggered (threshold too high), so the heap still reflects + # the retained payload — it must be >= payload_words. + payload_words = div(200_000, :erlang.system_info(:wordsize)) + + assert heap_after >= payload_words, + "heap_after (#{heap_after} words) should be >= payload (#{payload_words} words) when GC is skipped" end test "no GC by default (threshold=nil)", ctx do From 5ae3307b227d1a630eabbe0a10bdd964bf2948e1 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:23:20 +0200 Subject: [PATCH 07/14] feat(consumer): add IEx helpers to set GC heap threshold per-stack and fleet-wide --- .../lib/electric/shapes/consumer.ex | 61 +++++++++++++++++++ .../test/electric/shapes/consumer_test.exs | 26 ++++++++ 2 files changed, 87 insertions(+) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 7d2f30240f..a55bab8139 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -111,6 +111,67 @@ defmodule Electric.Shapes.Consumer do ConsumerRegistry.whereis(stack_id, shape_handle) end + @doc """ + Set the adaptive-GC heap threshold (bytes, or nil to disable) for a single stack. + Takes effect immediately for that stack's consumers — safe to call from IEx. + """ + def set_gc_heap_threshold(stack_id, threshold_bytes) + when is_nil(threshold_bytes) or (is_integer(threshold_bytes) and threshold_bytes >= 0) do + Electric.StackConfig.put(stack_id, :consumer_gc_heap_threshold, threshold_bytes) + :ok + end + + @doc """ + Set the adaptive-GC heap threshold for every live stack on this node. + Returns `{:ok, number_of_stacks_updated}`. Pass nil to disable everywhere. + Intended for live experimentation from an IEx shell. + """ + def set_gc_heap_threshold_all_stacks(threshold_bytes) do + stack_ids = list_stack_ids() + + # Guard against a stack dying between enumeration and the put: if the + # StackConfig ETS table vanishes, StackConfig.put/3 raises ArgumentError. + # We skip such stale entries rather than crashing the operator call. + Enum.each(stack_ids, fn stack_id -> + try do + set_gc_heap_threshold(stack_id, threshold_bytes) + rescue + ArgumentError -> :ok + end + end) + + {:ok, length(stack_ids)} + end + + # Enumerate live stacks by scanning ETS tables whose names match the + # Electric.StackConfig table-name prefix ("Electric.StackConfig:"). + # This is the most direct approach: StackConfig creates one named ETS table per + # stack, so the set of live tables IS the set of live stacks. + # No first-class listing API exists in the codebase (grep confirmed). + # A race (stack dies mid-iteration) is harmless: StackConfig.put/3 on a vanished + # table would raise ArgumentError, which we rescue and skip. + defp list_stack_ids do + prefix = "#{inspect(Electric.StackConfig)}:" + prefix_len = byte_size(prefix) + + :ets.all() + |> Enum.flat_map(fn tab -> + case :ets.info(tab, :name) do + :undefined -> + [] + + name -> + name_str = Atom.to_string(name) + + if String.starts_with?(name_str, prefix) do + [binary_part(name_str, prefix_len, byte_size(name_str) - prefix_len)] + else + [] + end + end + end) + end + def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle}, name: name(stack_id, shape_handle), diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 2262e212ac..2b10e3bdb8 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2891,6 +2891,32 @@ defmodule Electric.Shapes.ConsumerTest do end end + describe "set_gc_heap_threshold helpers" do + # with_stack_id_from_test (line 87) already starts ProcessRegistry + StackConfig + # for ctx.stack_id — no heavier setup is needed for these pure-config tests. + + test "set_gc_heap_threshold/2 writes the value into StackConfig", ctx do + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 2_000_000) + + assert 2_000_000 == + Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + + test "set_gc_heap_threshold/2 accepts nil to disable", ctx do + Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 123) + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, nil) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + + test "set_gc_heap_threshold_all_stacks/1 sets the live stack", ctx do + assert {:ok, n} = Electric.Shapes.Consumer.set_gc_heap_threshold_all_stacks(3_000_000) + assert n >= 1 + + assert 3_000_000 == + Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end From 074956c200caed12e18238104ea69488580fcbac Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:29:11 +0200 Subject: [PATCH 08/14] fix(consumer): count only successfully-updated stacks in all-stacks helper --- .../lib/electric/shapes/consumer.ex | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index a55bab8139..3b738569a1 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -132,15 +132,19 @@ defmodule Electric.Shapes.Consumer do # Guard against a stack dying between enumeration and the put: if the # StackConfig ETS table vanishes, StackConfig.put/3 raises ArgumentError. # We skip such stale entries rather than crashing the operator call. - Enum.each(stack_ids, fn stack_id -> - try do - set_gc_heap_threshold(stack_id, threshold_bytes) - rescue - ArgumentError -> :ok - end - end) + # Only successfully-written stacks are counted so the returned value + # reflects reality even when stacks die mid-iteration. + count = + Enum.reduce(stack_ids, 0, fn stack_id, acc -> + try do + set_gc_heap_threshold(stack_id, threshold_bytes) + acc + 1 + rescue + ArgumentError -> acc + end + end) - {:ok, length(stack_ids)} + {:ok, count} end # Enumerate live stacks by scanning ETS tables whose names match the From 261bf217bb27aeaee8ea31f64c2f7c8152dfbfe4 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 14:55:19 +0200 Subject: [PATCH 09/14] fix(consumer): apply adaptive GC during buffered-fragment drain + changeset --- .changeset/consumer-heap-gc.md | 5 ++ .../lib/electric/shapes/consumer.ex | 5 +- .../test/electric/shapes/consumer_test.exs | 73 +++++++++++++++++++ 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 .changeset/consumer-heap-gc.md diff --git a/.changeset/consumer-heap-gc.md b/.changeset/consumer-heap-gc.md new file mode 100644 index 0000000000..b95e1a28bc --- /dev/null +++ b/.changeset/consumer-heap-gc.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Bound Shape.Consumer heap growth: make the consumer family's process spawn options (incl. `fullsweep_after`) configurable per process via `ELECTRIC_PROCESS_SPAWN_OPTS`, and add an opt-in adaptive GC that runs after a transaction fragment when the consumer's heap exceeds the runtime-tunable `ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD` (off by default). diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 3b738569a1..580ad7320d 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -897,7 +897,10 @@ defmodule Electric.Shapes.Consumer do {txn_fragments, state} = State.pop_buffered(state) Enum.reduce_while(txn_fragments, state, fn txn_fragment, state -> - state = handle_txn_fragment(txn_fragment, state) + state = + txn_fragment + |> handle_txn_fragment(state) + |> maybe_garbage_collect() if state.terminating? do {:halt, state} diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 2b10e3bdb8..97eabb6b82 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2763,7 +2763,16 @@ defmodule Electric.Shapes.ConsumerTest do ] setup(ctx) do + delay_snapshot_creation? = Map.get(ctx, :delay_snapshot_creation?) + test_pid = self() + patch_snapshotter(fn parent, shape_handle, _shape, %{snapshot_fun: snapshot_fun} -> + if delay_snapshot_creation? do + receive do + {^test_pid, :resume} -> :ok + end + end + pg_snapshot = {10, 11, [10]} GenServer.cast(parent, {:pg_snapshot_known, shape_handle, pg_snapshot}) GenServer.cast(parent, {:snapshot_started, shape_handle}) @@ -2889,6 +2898,70 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) assert_receive {^ref, :new_changes, _}, @receive_timeout end + + @tag delay_snapshot_creation?: true + test "GC runs during buffered-fragment drain when heap exceeds threshold", ctx do + # threshold=1 forces GC after every fragment processed during the buffer drain. + # The consumer starts with buffering?=true; fragments sent before pg_snapshot_known + # land in the buffer. When we unblock the snapshotter it fires pg_snapshot_known + # which triggers :consume_buffer → process_buffered_txn_fragments (our new GC call). + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + + # The snapshotter is now running but blocked on `receive {^test_pid, :resume}`. + assert_receive {:snapshot, ^shape_handle, snapshotter_pid} + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + + # Trace GC events on the consumer to count full GCs fired during the drain. + :erlang.trace(consumer_pid, true, [:garbage_collection]) + + # Send a large-payload fragment while buffering?=true — it goes into the buffer. + large_binary = :binary.copy(<<0>>, 200_000) + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Unblock the snapshotter: fires pg_snapshot_known → :consume_buffer → + # process_buffered_txn_fragments where our new maybe_garbage_collect() call fires. + send(snapshotter_pid, {self(), :resume}) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + :erlang.trace(consumer_pid, false, [:garbage_collection]) + + # Count how many full GC (garbage_collect) trace messages arrived. + # :garbage_collection traces emit {:trace, pid, :gc_major_start, info} / + # :gc_major_end pairs (one per full :erlang.garbage_collect() call). + gc_events = + Stream.repeatedly(fn -> + receive do + {:trace, ^consumer_pid, :gc_major_start, _} -> :gc + {:trace, ^consumer_pid, :gc_minor_start, _} -> :minor + {:trace, ^consumer_pid, :gc_major_end, _} -> :skip + {:trace, ^consumer_pid, :gc_minor_end, _} -> :skip + after + 0 -> :done + end + end) + |> Stream.take_while(&(&1 != :done)) + |> Enum.count(&(&1 == :gc)) + + assert gc_events >= 1, + "expected at least one full GC during buffered-fragment drain, got #{gc_events}" + end end describe "set_gc_heap_threshold helpers" do From 8a626f7d443a6b4ea3a54dc8b0273c6be716ad9a Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Tue, 9 Jun 2026 15:18:49 +0200 Subject: [PATCH 10/14] feat(consumer): add hysteresis + critical-path docs for adaptive GC --- .../lib/electric/shapes/consumer.ex | 73 ++++++++++++++++++- .../lib/electric/shapes/consumer/state.ex | 5 +- .../test/electric/shapes/consumer_test.exs | 38 ++++++++++ 3 files changed, 113 insertions(+), 3 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 580ad7320d..f0b9ecfd21 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -40,6 +40,14 @@ defmodule Electric.Shapes.Consumer do @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() @word_size :erlang.system_info(:wordsize) + # Minimum wall-clock interval (ms) between consumer-forced full GC sweeps. + # Prevents GC-thrashing on the replication critical path: the ShapeLogCollector + # blocks until every consumer replies, so a forced GC on every fragment (e.g. + # during a buffered-fragment drain) would add publish latency proportional to + # the number of fragments. Hysteresis caps the worst-case frequency to at most + # one forced sweep per @gc_min_interval_ms regardless of fragment rate. + @gc_min_interval_ms 1_000 + @type initialize_shape_opts() :: %{ :action => :create | :restore, optional(:otel_ctx) => OpenTelemetry.otel_ctx() | nil, @@ -114,7 +122,16 @@ defmodule Electric.Shapes.Consumer do @doc """ Set the adaptive-GC heap threshold (bytes, or nil to disable) for a single stack. Takes effect immediately for that stack's consumers — safe to call from IEx. + + **Critical-path note**: this GC runs synchronously on the replication path — + the ShapeLogCollector blocks until every consumer replies. Prefer a conservative + (high) threshold to minimise added publish latency. The per-process + `fullsweep_after` spawn-opt (configured via `ELECTRIC_PROCESS_SPAWN_OPTS`) is + the lower-risk lever for steady-state heap bounding; this adaptive GC is a + targeted, runtime-tunable backstop. Forced-GC frequency is capped to at most + once per `@gc_min_interval_ms` (see `should_force_gc?/5`). """ + @spec set_gc_heap_threshold(Electric.stack_id(), non_neg_integer() | nil) :: :ok def set_gc_heap_threshold(stack_id, threshold_bytes) when is_nil(threshold_bytes) or (is_integer(threshold_bytes) and threshold_bytes >= 0) do Electric.StackConfig.put(stack_id, :consumer_gc_heap_threshold, threshold_bytes) @@ -126,6 +143,7 @@ defmodule Electric.Shapes.Consumer do Returns `{:ok, number_of_stacks_updated}`. Pass nil to disable everywhere. Intended for live experimentation from an IEx shell. """ + @spec set_gc_heap_threshold_all_stacks(non_neg_integer() | nil) :: {:ok, non_neg_integer()} def set_gc_heap_threshold_all_stacks(threshold_bytes) do stack_ids = list_stack_ids() @@ -590,26 +608,77 @@ defmodule Electric.Shapes.Consumer do |> maybe_garbage_collect() end + # NOTE: this runs synchronously on the replication critical path — the + # ShapeLogCollector blocks until every consumer replies. A forced + # :erlang.garbage_collect() can add measurable publish latency, especially + # during a buffered-fragment drain where maybe_garbage_collect/1 is called + # for every queued fragment in a tight loop. + # + # Two safeguards limit the impact: + # 1. The nil fast-path exits immediately when no threshold is configured. + # 2. Hysteresis (@gc_min_interval_ms) prevents back-to-back full sweeps even + # when the consumer sits just above the threshold across many fragments. + # + # Operators should prefer a CONSERVATIVE (high) threshold. For steady-state + # heap bounding the per-process `fullsweep_after` spawn-opt (set via + # ELECTRIC_PROCESS_SPAWN_OPTS) is a lower-risk alternative; this adaptive GC + # is a targeted, runtime-tunable backstop. defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do case Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) do nil -> + # Fast path: adaptive GC is disabled — skip all process_info/time calls. state threshold -> {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) - if over_heap_threshold?(heap_words, threshold), do: :erlang.garbage_collect() - state + now = System.monotonic_time(:millisecond) + + if should_force_gc?(heap_words, threshold, state.last_forced_gc_at, now) do + :erlang.garbage_collect() + %{state | last_forced_gc_at: now} + else + state + end end end @doc false # heap_words: process total_heap_size in words; threshold_bytes: configured byte threshold (or nil) + @spec over_heap_threshold?(non_neg_integer(), non_neg_integer() | nil) :: boolean() def over_heap_threshold?(_heap_words, nil), do: false def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do heap_words * @word_size > threshold_bytes end + @doc false + # Decide whether to force a full GC sweep: heap must be over the threshold AND + # at least @gc_min_interval_ms must have elapsed since the last forced GC. + # last_gc_at / now_ms are monotonic milliseconds; last_gc_at is nil if this + # consumer has never forced a GC (always fire on first over-threshold event). + # Passing explicit min_interval_ms enables deterministic unit tests. + @spec should_force_gc?( + non_neg_integer(), + non_neg_integer() | nil, + integer() | nil, + integer(), + non_neg_integer() + ) :: boolean() + def should_force_gc?( + heap_words, + threshold_bytes, + last_gc_at, + now_ms, + min_interval_ms \\ @gc_min_interval_ms + ) + + def should_force_gc?(_heap_words, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false + + def should_force_gc?(heap_words, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do + over_heap_threshold?(heap_words, threshold_bytes) and + (is_nil(last_gc_at) or now_ms - last_gc_at >= min_interval_ms) + end + # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). # In this phase we have to buffer incoming txn fragments because we can't yet decide what to # do with the transaction: skip it or write it to the shape log. diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 799cf0d8a6..ec582c305b 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -47,7 +47,10 @@ defmodule Electric.Shapes.Consumer.State do # as any message arrives (activity), so at most one is ever live at a time. suspend_timer: nil, # How long after hibernation to suspend (in ms) - suspend_after: nil + suspend_after: nil, + # Monotonic millisecond timestamp of the last consumer-forced GC (nil if never). + # Used by hysteresis logic in maybe_garbage_collect/1 to cap forced-GC frequency. + last_forced_gc_at: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 97eabb6b82..1fda472eba 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2741,6 +2741,44 @@ defmodule Electric.Shapes.ConsumerTest do end end + describe "should_force_gc?/5" do + # All tests pass explicit now_ms / last_gc_at / min_interval_ms so they are + # fully deterministic and do not depend on wall-clock time. + + test "false when threshold is nil (adaptive GC disabled)" do + refute Electric.Shapes.Consumer.should_force_gc?(1_000_000, nil, nil, 5_000, 1_000) + end + + test "true when heap over threshold and consumer has never forced a GC (last_gc_at nil)" do + # 1 000 words * 8 bytes/word = 8 000 bytes > threshold of 1 byte + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, nil, 5_000, 1_000) + end + + test "false when heap over threshold but interval has not elapsed" do + # last_gc_at=4_500, now=5_000 → delta=500 < min_interval=1_000 → no GC + refute Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_500, 5_000, 1_000) + end + + test "true when heap over threshold and interval has elapsed" do + # last_gc_at=3_000, now=5_000 → delta=2_000 >= min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 3_000, 5_000, 1_000) + end + + test "true at exactly the min interval boundary" do + # last_gc_at=4_000, now=5_000 → delta=1_000 == min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_000, 5_000, 1_000) + end + + test "false when heap is under threshold regardless of timing" do + # heap_words=1 * wordsize (8) = 8 bytes; threshold=1_000 bytes → under + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, nil, 5_000, 1_000) + end + + test "false when heap is under threshold even if interval would have elapsed" do + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, 0, 5_000, 1_000) + end + end + describe "adaptive GC after fragment processing" do @describetag :tmp_dir From f8e75789160c5191b82820dde91387fd6d88a441 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 15 Jun 2026 14:17:46 +0200 Subject: [PATCH 11/14] refactor(consumer): cache GC threshold at startup, work in bytes Cache the adaptive-GC heap threshold in consumer state once at init instead of doing a StackConfig ETS lookup on every txn fragment, and convert total_heap_size to bytes at the query site so the "words" jargon no longer threads through over_heap_threshold?/should_force_gc?. set_gc_heap_threshold/2 now only affects consumers started after the call; docstring updated accordingly. Co-Authored-By: Claude Opus 4.8 (1M context) refactor(consumer): populate cached GC threshold in State.new/2 Move the consumer_gc_heap_threshold lookup into State.new/2 alongside the existing hibernate_after/suspend_after lookups, instead of patching the field onto the struct in Consumer.init/1. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../lib/electric/shapes/consumer.ex | 125 ++++-------------- .../lib/electric/shapes/consumer/state.ex | 9 +- .../test/electric/shapes/consumer_test.exs | 19 +-- 3 files changed, 36 insertions(+), 117 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index f0b9ecfd21..d1438ce7f4 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -121,15 +121,10 @@ defmodule Electric.Shapes.Consumer do @doc """ Set the adaptive-GC heap threshold (bytes, or nil to disable) for a single stack. - Takes effect immediately for that stack's consumers — safe to call from IEx. - - **Critical-path note**: this GC runs synchronously on the replication path — - the ShapeLogCollector blocks until every consumer replies. Prefer a conservative - (high) threshold to minimise added publish latency. The per-process - `fullsweep_after` spawn-opt (configured via `ELECTRIC_PROCESS_SPAWN_OPTS`) is - the lower-risk lever for steady-state heap bounding; this adaptive GC is a - targeted, runtime-tunable backstop. Forced-GC frequency is capped to at most - once per `@gc_min_interval_ms` (see `should_force_gc?/5`). + + Consumers cache this value at startup (see `State.new/2`), so the new threshold only + applies to consumers started after this call — already-running consumers keep the + threshold they read when they booted. Safe to call from IEx. """ @spec set_gc_heap_threshold(Electric.stack_id(), non_neg_integer() | nil) :: :ok def set_gc_heap_threshold(stack_id, threshold_bytes) @@ -138,62 +133,6 @@ defmodule Electric.Shapes.Consumer do :ok end - @doc """ - Set the adaptive-GC heap threshold for every live stack on this node. - Returns `{:ok, number_of_stacks_updated}`. Pass nil to disable everywhere. - Intended for live experimentation from an IEx shell. - """ - @spec set_gc_heap_threshold_all_stacks(non_neg_integer() | nil) :: {:ok, non_neg_integer()} - def set_gc_heap_threshold_all_stacks(threshold_bytes) do - stack_ids = list_stack_ids() - - # Guard against a stack dying between enumeration and the put: if the - # StackConfig ETS table vanishes, StackConfig.put/3 raises ArgumentError. - # We skip such stale entries rather than crashing the operator call. - # Only successfully-written stacks are counted so the returned value - # reflects reality even when stacks die mid-iteration. - count = - Enum.reduce(stack_ids, 0, fn stack_id, acc -> - try do - set_gc_heap_threshold(stack_id, threshold_bytes) - acc + 1 - rescue - ArgumentError -> acc - end - end) - - {:ok, count} - end - - # Enumerate live stacks by scanning ETS tables whose names match the - # Electric.StackConfig table-name prefix ("Electric.StackConfig:"). - # This is the most direct approach: StackConfig creates one named ETS table per - # stack, so the set of live tables IS the set of live stacks. - # No first-class listing API exists in the codebase (grep confirmed). - # A race (stack dies mid-iteration) is harmless: StackConfig.put/3 on a vanished - # table would raise ArgumentError, which we rescue and skip. - defp list_stack_ids do - prefix = "#{inspect(Electric.StackConfig)}:" - prefix_len = byte_size(prefix) - - :ets.all() - |> Enum.flat_map(fn tab -> - case :ets.info(tab, :name) do - :undefined -> - [] - - name -> - name_str = Atom.to_string(name) - - if String.starts_with?(name_str, prefix) do - [binary_part(name_str, prefix_len, byte_size(name_str) - prefix_len)] - else - [] - end - end - end) - end - def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle}, name: name(stack_id, shape_handle), @@ -608,47 +547,29 @@ defmodule Electric.Shapes.Consumer do |> maybe_garbage_collect() end - # NOTE: this runs synchronously on the replication critical path — the - # ShapeLogCollector blocks until every consumer replies. A forced - # :erlang.garbage_collect() can add measurable publish latency, especially - # during a buffered-fragment drain where maybe_garbage_collect/1 is called - # for every queued fragment in a tight loop. - # - # Two safeguards limit the impact: - # 1. The nil fast-path exits immediately when no threshold is configured. - # 2. Hysteresis (@gc_min_interval_ms) prevents back-to-back full sweeps even - # when the consumer sits just above the threshold across many fragments. - # - # Operators should prefer a CONSERVATIVE (high) threshold. For steady-state - # heap bounding the per-process `fullsweep_after` spawn-opt (set via - # ELECTRIC_PROCESS_SPAWN_OPTS) is a lower-risk alternative; this adaptive GC - # is a targeted, runtime-tunable backstop. - defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do - case Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) do - nil -> - # Fast path: adaptive GC is disabled — skip all process_info/time calls. - state + # Fast path: adaptive GC is disabled — skip all process_info/time calls. + defp maybe_garbage_collect(%State{gc_heap_threshold: nil} = state), do: state - threshold -> - {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) - now = System.monotonic_time(:millisecond) + defp maybe_garbage_collect(%State{gc_heap_threshold: threshold_bytes} = state) do + {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + heap_bytes = heap_words * @word_size + now = System.monotonic_time(:millisecond) - if should_force_gc?(heap_words, threshold, state.last_forced_gc_at, now) do - :erlang.garbage_collect() - %{state | last_forced_gc_at: now} - else - state - end + if should_force_gc?(heap_bytes, threshold_bytes, state.last_forced_gc_at, now) do + :erlang.garbage_collect() + %{state | last_forced_gc_at: now} + else + state end end @doc false - # heap_words: process total_heap_size in words; threshold_bytes: configured byte threshold (or nil) + # heap_bytes: process total_heap_size in bytes; threshold_bytes: configured byte threshold (or nil) @spec over_heap_threshold?(non_neg_integer(), non_neg_integer() | nil) :: boolean() - def over_heap_threshold?(_heap_words, nil), do: false + def over_heap_threshold?(_heap_bytes, nil), do: false - def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do - heap_words * @word_size > threshold_bytes + def over_heap_threshold?(heap_bytes, threshold_bytes) when is_integer(threshold_bytes) do + heap_bytes > threshold_bytes end @doc false @@ -665,17 +586,17 @@ defmodule Electric.Shapes.Consumer do non_neg_integer() ) :: boolean() def should_force_gc?( - heap_words, + heap_bytes, threshold_bytes, last_gc_at, now_ms, min_interval_ms \\ @gc_min_interval_ms ) - def should_force_gc?(_heap_words, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false + def should_force_gc?(_heap_bytes, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false - def should_force_gc?(heap_words, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do - over_heap_threshold?(heap_words, threshold_bytes) and + def should_force_gc?(heap_bytes, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do + over_heap_threshold?(heap_bytes, threshold_bytes) and (is_nil(last_gc_at) or now_ms - last_gc_at >= min_interval_ms) end diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index ec582c305b..6009d87aa3 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -50,7 +50,13 @@ defmodule Electric.Shapes.Consumer.State do suspend_after: nil, # Monotonic millisecond timestamp of the last consumer-forced GC (nil if never). # Used by hysteresis logic in maybe_garbage_collect/1 to cap forced-GC frequency. - last_forced_gc_at: nil + last_forced_gc_at: nil, + # Adaptive-GC heap threshold (bytes) cached at consumer startup, or nil when + # disabled. Read once from StackConfig in new/2 so the per-fragment GC check on + # the replication critical path stays a plain struct field access instead of an + # ETS lookup. Changing the StackConfig value only affects consumers started + # afterwards (see Consumer.set_gc_heap_threshold/2). + gc_heap_threshold: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() @@ -110,6 +116,7 @@ defmodule Electric.Shapes.Consumer.State do :shape_suspend_after, Electric.Config.default(:shape_suspend_after) ), + gc_heap_threshold: Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil), buffering?: true } end diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 1fda472eba..63c75ddfce 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2727,17 +2727,16 @@ defmodule Electric.Shapes.ConsumerTest do refute Electric.Shapes.Consumer.over_heap_threshold?(1_000_000, nil) end - test "false when heap (words) is below threshold (bytes)" do + test "false when heap is below threshold" do refute Electric.Shapes.Consumer.over_heap_threshold?(10, 1024) end - test "true when heap (words) exceeds threshold (bytes)" do + test "true when heap exceeds threshold" do assert Electric.Shapes.Consumer.over_heap_threshold?(1000, 1) end test "exactly-equal is not over threshold" do - wordsize = :erlang.system_info(:wordsize) - refute Electric.Shapes.Consumer.over_heap_threshold?(1, wordsize) + refute Electric.Shapes.Consumer.over_heap_threshold?(1024, 1024) end end @@ -2750,7 +2749,7 @@ defmodule Electric.Shapes.ConsumerTest do end test "true when heap over threshold and consumer has never forced a GC (last_gc_at nil)" do - # 1 000 words * 8 bytes/word = 8 000 bytes > threshold of 1 byte + # 1_000 bytes > threshold of 1 byte assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, nil, 5_000, 1_000) end @@ -2770,7 +2769,7 @@ defmodule Electric.Shapes.ConsumerTest do end test "false when heap is under threshold regardless of timing" do - # heap_words=1 * wordsize (8) = 8 bytes; threshold=1_000 bytes → under + # heap=1 byte; threshold=1_000 bytes → under refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, nil, 5_000, 1_000) end @@ -3018,14 +3017,6 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, nil) assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) end - - test "set_gc_heap_threshold_all_stacks/1 sets the live stack", ctx do - assert {:ok, n} = Electric.Shapes.Consumer.set_gc_heap_threshold_all_stacks(3_000_000) - assert n >= 1 - - assert 3_000_000 == - Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) - end end defp get_log_items_from_storage(offset, shape_storage) do From c78ba0c4ba8c13456768048893dbf9907e5ac566 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 15 Jun 2026 14:25:10 +0200 Subject: [PATCH 12/14] refactor(consumer): inline over_heap_threshold? into should_force_gc? The helper had a single caller now that the threshold lives in bytes, so fold the comparison into should_force_gc?/5 and drop the standalone function and its tests (boundary coverage moved into should_force_gc?). Co-Authored-By: Claude Opus 4.8 (1M context) --- .../lib/electric/shapes/consumer.ex | 21 +++++------------- .../test/electric/shapes/consumer_test.exs | 22 ++++--------------- 2 files changed, 10 insertions(+), 33 deletions(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index d1438ce7f4..7e58b7b904 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -564,20 +564,11 @@ defmodule Electric.Shapes.Consumer do end @doc false - # heap_bytes: process total_heap_size in bytes; threshold_bytes: configured byte threshold (or nil) - @spec over_heap_threshold?(non_neg_integer(), non_neg_integer() | nil) :: boolean() - def over_heap_threshold?(_heap_bytes, nil), do: false - - def over_heap_threshold?(heap_bytes, threshold_bytes) when is_integer(threshold_bytes) do - heap_bytes > threshold_bytes - end - - @doc false - # Decide whether to force a full GC sweep: heap must be over the threshold AND - # at least @gc_min_interval_ms must have elapsed since the last forced GC. - # last_gc_at / now_ms are monotonic milliseconds; last_gc_at is nil if this - # consumer has never forced a GC (always fire on first over-threshold event). - # Passing explicit min_interval_ms enables deterministic unit tests. + # Decide whether to force a full GC sweep: heap (bytes) must be over the + # threshold (bytes) AND at least @gc_min_interval_ms must have elapsed since the + # last forced GC. last_gc_at / now_ms are monotonic milliseconds; last_gc_at is + # nil if this consumer has never forced a GC (always fire on first over-threshold + # event). Passing explicit min_interval_ms enables deterministic unit tests. @spec should_force_gc?( non_neg_integer(), non_neg_integer() | nil, @@ -596,7 +587,7 @@ defmodule Electric.Shapes.Consumer do def should_force_gc?(_heap_bytes, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false def should_force_gc?(heap_bytes, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do - over_heap_threshold?(heap_bytes, threshold_bytes) and + heap_bytes > threshold_bytes and (is_nil(last_gc_at) or now_ms - last_gc_at >= min_interval_ms) end diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 63c75ddfce..8f98694a4f 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2722,24 +2722,6 @@ defmodule Electric.Shapes.ConsumerTest do :ok end - describe "over_heap_threshold?/2" do - test "false when threshold is nil" do - refute Electric.Shapes.Consumer.over_heap_threshold?(1_000_000, nil) - end - - test "false when heap is below threshold" do - refute Electric.Shapes.Consumer.over_heap_threshold?(10, 1024) - end - - test "true when heap exceeds threshold" do - assert Electric.Shapes.Consumer.over_heap_threshold?(1000, 1) - end - - test "exactly-equal is not over threshold" do - refute Electric.Shapes.Consumer.over_heap_threshold?(1024, 1024) - end - end - describe "should_force_gc?/5" do # All tests pass explicit now_ms / last_gc_at / min_interval_ms so they are # fully deterministic and do not depend on wall-clock time. @@ -2776,6 +2758,10 @@ defmodule Electric.Shapes.ConsumerTest do test "false when heap is under threshold even if interval would have elapsed" do refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, 0, 5_000, 1_000) end + + test "false when heap exactly equals threshold (strict comparison)" do + refute Electric.Shapes.Consumer.should_force_gc?(1_000, 1_000, nil, 5_000, 1_000) + end end describe "adaptive GC after fragment processing" do From b06599795a6974079bf71c616c79b31921e72564 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 15 Jun 2026 14:44:57 +0200 Subject: [PATCH 13/14] feat(consumer): defer adaptive GC off the reply path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Run the adaptive GC check via {:continue, :maybe_gc} after replying to the ShapeLogCollector, so a forced full sweep no longer blocks the SLC's synchronous publish (design doc §6). The continue re-establishes the hibernate_after timeout the {:continue, …} return cannot carry. process_buffered_txn_fragments no longer GCs per fragment; the :consume_buffer continue defers a single GC once the whole buffer is drained. Switch the GC integration tests to assert on State.last_forced_gc_at (the direct signal of our forced-GC decision) instead of heap-size magnitude or GC-event traces, which were fragile against off-heap binaries and natural BEAM major GCs. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/sync-service/lib/electric/config.ex | 3 +- .../lib/electric/shapes/consumer.ex | 37 +++++---- .../lib/electric/shapes/consumer/state.ex | 5 +- .../test/electric/shapes/consumer_test.exs | 77 ++++++------------- 4 files changed, 49 insertions(+), 73 deletions(-) diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index 15f2082793..c6d77a830c 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -115,8 +115,7 @@ defmodule Electric.Config do # e.g. %{shape_log_collector: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024]} process_spawn_opts: %{}, # Heap-size threshold (in BYTES) above which a consumer runs :erlang.garbage_collect() - # after processing a transaction fragment. nil disables adaptive GC. Looked up at - # runtime via StackConfig so it can be changed from a live IEx shell. + # after processing a transaction fragment. consumer_gc_heap_threshold: nil, ## Misc process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0, diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 7e58b7b904..e5b15a1f67 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -40,12 +40,9 @@ defmodule Electric.Shapes.Consumer do @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() @word_size :erlang.system_info(:wordsize) - # Minimum wall-clock interval (ms) between consumer-forced full GC sweeps. - # Prevents GC-thrashing on the replication critical path: the ShapeLogCollector - # blocks until every consumer replies, so a forced GC on every fragment (e.g. - # during a buffered-fragment drain) would add publish latency proportional to - # the number of fragments. Hysteresis caps the worst-case frequency to at most - # one forced sweep per @gc_min_interval_ms regardless of fragment rate. + # Minimum wall-clock interval (ms) between consumer-forced full GC sweeps. Caps how much CPU + # a busy consumer spends on full sweeps to at most one per @gc_min_interval_ms regardless of + # fragment rate. @gc_min_interval_ms 1_000 @type initialize_shape_opts() :: %{ @@ -194,10 +191,19 @@ defmodule Electric.Shapes.Consumer do if state.terminating? do {:noreply, state, {:continue, :stop_and_clean}} else - {:noreply, state, state.hibernate_after} + {:noreply, state, {:continue, :maybe_gc}} end end + # Deferred adaptive GC. Reached via {:continue, :maybe_gc} after a fragment (or a + # full buffer drain) has been processed, so a forced full sweep runs off the + # reply/critical path rather than blocking the ShapeLogCollector. Re-establishes + # the hibernate_after timeout that the {:continue, …} return could not carry. + def handle_continue(:maybe_gc, state) do + state = maybe_garbage_collect(state) + {:noreply, state, state.hibernate_after} + end + @impl GenServer # Any incoming message counts as activity: cancel the pending suspend timer (if # any) and recurse for actual handling of the call. @@ -228,7 +234,7 @@ defmodule Electric.Shapes.Consumer do {:reply, :ok, state, {:continue, :stop_and_clean}} state -> - {:reply, :ok, state, state.hibernate_after} + {:reply, :ok, state, {:continue, :maybe_gc}} end end @@ -542,11 +548,15 @@ defmodule Electric.Shapes.Consumer do defp handle_event(%TransactionFragment{} = txn_fragment, state) do Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end) - txn_fragment - |> handle_txn_fragment(state) - |> maybe_garbage_collect() + handle_txn_fragment(txn_fragment, state) end + # Adaptive GC check. Always invoked from a {:continue, :maybe_gc} handler so the + # forced full sweep runs off the ShapeLogCollector's synchronous publish path: the + # SLC has already received :ok by the time this runs. Because the SLC publishes + # fragments to a given consumer sequentially, the continue typically completes + # before the next fragment arrives, so it does not block steady-state throughput. + # Fast path: adaptive GC is disabled — skip all process_info/time calls. defp maybe_garbage_collect(%State{gc_heap_threshold: nil} = state), do: state @@ -878,10 +888,7 @@ defmodule Electric.Shapes.Consumer do {txn_fragments, state} = State.pop_buffered(state) Enum.reduce_while(txn_fragments, state, fn txn_fragment, state -> - state = - txn_fragment - |> handle_txn_fragment(state) - |> maybe_garbage_collect() + state = handle_txn_fragment(txn_fragment, state) if state.terminating? do {:halt, state} diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 6009d87aa3..c3aa0f460c 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -52,10 +52,7 @@ defmodule Electric.Shapes.Consumer.State do # Used by hysteresis logic in maybe_garbage_collect/1 to cap forced-GC frequency. last_forced_gc_at: nil, # Adaptive-GC heap threshold (bytes) cached at consumer startup, or nil when - # disabled. Read once from StackConfig in new/2 so the per-fragment GC check on - # the replication critical path stays a plain struct field access instead of an - # ETS lookup. Changing the StackConfig value only affects consumers started - # afterwards (see Consumer.set_gc_heap_threshold/2). + # disabled. gc_heap_threshold: nil ] diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 8f98694a4f..a2554f54d9 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2827,8 +2827,6 @@ defmodule Electric.Shapes.ConsumerTest do xid = 11 lsn = Lsn.from_integer(10) - - # Inflate the consumer's heap by sending a large binary payload large_binary = :binary.copy(<<0>>, 200_000) txn = @@ -2841,19 +2839,17 @@ defmodule Electric.Shapes.ConsumerTest do ]) assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) - - # Wait for the consumer to process the fragment assert_receive {^ref, :new_changes, _}, @receive_timeout - {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) - - # threshold=1 forces a full GC after this fragment. Because the ~200 KB payload is - # transient garbage (not live state), the post-GC heap must be far below the payload - # size. Without GC the heap grows ~185x (observed), so this assertion fails loudly. - payload_words = div(200_000, :erlang.system_info(:wordsize)) + # GC runs in the deferred {:continue, :maybe_gc} after the reply. A synchronous + # call is queued behind the pending continue, so :sys.get_state returns only + # once the GC has run — and lets us read last_forced_gc_at, which the consumer + # stamps iff it forced a sweep. That is a direct signal of our decision, + # immune to natural BEAM GCs and heap-size timing. + assert %{last_forced_gc_at: forced_at} = :sys.get_state(consumer_pid) - assert heap_after < payload_words, - "heap_after (#{heap_after} words) should be far below payload (#{payload_words} words) after GC" + refute is_nil(forced_at), + "threshold=1 keeps the heap over threshold, so a forced GC should be recorded" end test "GC does not run when threshold is very large", ctx do @@ -2868,8 +2864,6 @@ defmodule Electric.Shapes.ConsumerTest do xid = 11 lsn = Lsn.from_integer(10) - - # Inflate the consumer's heap by sending a large binary payload large_binary = :binary.copy(<<0>>, 200_000) txn = @@ -2882,18 +2876,14 @@ defmodule Electric.Shapes.ConsumerTest do ]) assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) - - # Wait for the consumer to process the fragment assert_receive {^ref, :new_changes, _}, @receive_timeout - {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) - - # GC was NOT triggered (threshold too high), so the heap still reflects - # the retained payload — it must be >= payload_words. - payload_words = div(200_000, :erlang.system_info(:wordsize)) + # Flush the deferred :maybe_gc continue and read the state. The heap stays well + # under the 1 GB threshold, so the consumer must not have forced a sweep. + assert %{last_forced_gc_at: forced_at} = :sys.get_state(consumer_pid) - assert heap_after >= payload_words, - "heap_after (#{heap_after} words) should be >= payload (#{payload_words} words) when GC is skipped" + assert is_nil(forced_at), + "no forced GC should be recorded while under threshold, got #{inspect(forced_at)}" end test "no GC by default (threshold=nil)", ctx do @@ -2924,10 +2914,10 @@ defmodule Electric.Shapes.ConsumerTest do @tag delay_snapshot_creation?: true test "GC runs during buffered-fragment drain when heap exceeds threshold", ctx do - # threshold=1 forces GC after every fragment processed during the buffer drain. - # The consumer starts with buffering?=true; fragments sent before pg_snapshot_known - # land in the buffer. When we unblock the snapshotter it fires pg_snapshot_known - # which triggers :consume_buffer → process_buffered_txn_fragments (our new GC call). + # threshold=1 forces a GC once the buffered fragments are drained. The consumer + # starts with buffering?=true; fragments sent before pg_snapshot_known land in the + # buffer. When we unblock the snapshotter it fires pg_snapshot_known which triggers + # :consume_buffer → drains the buffer → {:continue, :maybe_gc} runs the GC once. Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) @@ -2937,9 +2927,6 @@ defmodule Electric.Shapes.ConsumerTest do consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) - # Trace GC events on the consumer to count full GCs fired during the drain. - :erlang.trace(consumer_pid, true, [:garbage_collection]) - # Send a large-payload fragment while buffering?=true — it goes into the buffer. large_binary = :binary.copy(<<0>>, 200_000) xid = 11 @@ -2956,34 +2943,20 @@ defmodule Electric.Shapes.ConsumerTest do assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) - # Unblock the snapshotter: fires pg_snapshot_known → :consume_buffer → - # process_buffered_txn_fragments where our new maybe_garbage_collect() call fires. + # Unblock the snapshotter: fires pg_snapshot_known → :consume_buffer → drains the + # buffer, then defers a single GC via {:continue, :maybe_gc}. send(snapshotter_pid, {self(), :resume}) ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) assert_receive {^ref, :new_changes, _}, @receive_timeout - :erlang.trace(consumer_pid, false, [:garbage_collection]) - - # Count how many full GC (garbage_collect) trace messages arrived. - # :garbage_collection traces emit {:trace, pid, :gc_major_start, info} / - # :gc_major_end pairs (one per full :erlang.garbage_collect() call). - gc_events = - Stream.repeatedly(fn -> - receive do - {:trace, ^consumer_pid, :gc_major_start, _} -> :gc - {:trace, ^consumer_pid, :gc_minor_start, _} -> :minor - {:trace, ^consumer_pid, :gc_major_end, _} -> :skip - {:trace, ^consumer_pid, :gc_minor_end, _} -> :skip - after - 0 -> :done - end - end) - |> Stream.take_while(&(&1 != :done)) - |> Enum.count(&(&1 == :gc)) + # The deferred GC runs in the :maybe_gc continue after the drain. Flush it with a + # synchronous call (queued behind the continue), then check last_forced_gc_at — + # the consumer stamps it iff it forced a sweep for the drained fragment. + assert %{last_forced_gc_at: forced_at} = :sys.get_state(consumer_pid) - assert gc_events >= 1, - "expected at least one full GC during buffered-fragment drain, got #{gc_events}" + refute is_nil(forced_at), + "expected a forced GC to be recorded after the buffered-fragment drain" end end From bb6425dca88e7665ee9bcd71cb919bffec4d8c73 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Mon, 15 Jun 2026 14:58:41 +0200 Subject: [PATCH 14/14] Remove superfluous blank line --- packages/sync-service/lib/electric/shapes/consumer.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index e5b15a1f67..6ee86ca229 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -547,7 +547,6 @@ defmodule Electric.Shapes.Consumer do defp handle_event(%TransactionFragment{} = txn_fragment, state) do Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end) - handle_txn_fragment(txn_fragment, state) end