Conversation
✅ Deploy Preview for electric-next ready!
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff           @@
##             main    #4066   +/-   ##
=======================================
  Coverage   88.67%   88.67%
=======================================
  Files          25       25
  Lines        2438     2438
  Branches      613      610       -3
=======================================
  Hits         2162     2162
  Misses        274      274
  Partials        2        2
```
Claude Code Review

Summary: This PR replaces the top-level `PartitionSupervisor` with a two-tier `DynamicSupervisor` arrangement so that shape consumers are shut down in parallel.

Issues Found: None.

Note on @alco's Inline Comment: discussed in the resolved thread on `packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex`.

Issue Conformance: No linked issue; the PR description and benchmarks are self-contained and provide sufficient justification.

Previous Review Status: All issues from prior iterations have been resolved.

This PR is ready to merge.

Review iteration: 4 | 2026-03-27
Force-pushed from bbb8ade to 04d0139.
alco left a comment:
Fabulously impressive! Let's get this in!
Inline comment on `packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex` (outdated, resolved).
Some investigation shows that yes, shutting down a DynamicSupervisor with lots of children is **slow**.

Calling `Process.exit(pid, :shutdown)` on a simple `DynamicSupervisor` with 200,000 child processes (`Agent`s) takes ~10 **minutes** to complete on my machine. This is despite the fact that `DynamicSupervisor`s terminate their children in parallel, sending `:EXIT` messages to all child processes without waiting for any to terminate. From the [erlang docs](https://www.erlang.org/doc/system/sup_princ.html#simplified-one_for_one-supervisors):

> Because a simple_one_for_one supervisor can have many children, it
> shuts them all down asynchronously. This means that the children will do
> their cleanup in parallel and therefore the order in which they are
> stopped is not defined.

`DynamicSupervisor` inherited its shutdown behaviour from this supervisor strategy.

Using a `PartitionSupervisor` helps and roughly reduces the time to shut down by ~O(number of partitions), but this does not scale well with the number of running child processes. For instance, with 200,000 children over 8 partitions the shutdown time is reduced to ~30s, but if you increase the number of children to 500,000 there is a lower bound of ~7s below which you can never go, no matter how many partitions.

The problem is that the `PartitionSupervisor` terminates its children sequentially. So as you increase the number of partitions, you're just increasing the number of children that are terminated sequentially, even if each child `DynamicSupervisor` terminates its children in parallel.

This PR solves that by replacing the top-level `PartitionSupervisor` with another `DynamicSupervisor`. On shutdown, all partition supervisors are terminated in parallel, and the children of those partition supervisors are also terminated in parallel.

Here are some numbers from my benchmark showing the time required to shut down a supervisor (tree) with 200,000 running processes.

Our larger servers have 16 cores, so we're running a `PartitionSupervisor` with 16 partitions.

```
=========================================
 Partitioned (PartitionSupervisor with 16 partitions)
=========================================
200000 processes
       memory: 72.7734375KiB
        start: 2.6s
     shutdown: 12.4s
max queue len: 12480
```

So 12 seconds even with a very simple process with no `terminate/2` callback. We could just increase the number of partitions...

```
=========================================
 Partitioned (PartitionSupervisor with 50 partitions)
=========================================
200000 processes
       memory: 276.609375KiB
        start: 2.7s
     shutdown: 5.4s
max queue len: 3936
```

Which is better, but we've nearly tripled the number of supervisors while only just over halving the shutdown time, so you start to see the tradeoff.

Now with the new 2-tier `DynamicSupervisor`:

```
=========================================
 DynamicPartitioned (DynamicSupervisor of 50 DynamicSupervisors)
=========================================
200000 processes
       memory: 180.84375KiB
        start: 2.8s
     shutdown: 0.5s
max queue len: 3763
```

So a 10x improvement on the previous config and a 25x improvement on the current setup.

The number of partitions can be set using a new env var, `ELECTRIC_CONSUMER_PARTITIONS`. If that's not set then the partitions scale by `max_shapes`, if that's known. If not, we just use the number of cores.

This is the shutdown time for our production stack with the fallback partition config, so nearly a 6x improvement. I've opted for a conservative default (could have gone with some multiple of the number of cores) but went with the lower-memory option.

```
=========================================
 DynamicPartitioned (DynamicSupervisor of 16 DynamicSupervisors)
=========================================
200000 processes
       memory: 58.09375KiB
        start: 2.8s
     shutdown: 2.1s
max queue len: 12555
```

This also scales to 500,000 shapes, where it starts 125 partitions by default and shuts down in 1.5s (the original version took 70s):

```
500000 processes
       memory: 435.21875KiB
     shutdown: 1.5s
```

This is the benchmarking script:

```elixir
time = fn action ->
  {t, result} = :timer.tc(action, :microsecond)
  s = Float.round(t / 1_000_000.0, 1)
  {t, s, result}
end

time_colour = fn time ->
  if time > 1.0, do: :red, else: :green
end

{:ok, _} = Registry.start_link(name: ShutdownRegistry, keys: :unique)

defmodule Simple do
  def desc(_config, _processes), do: "direct DynamicSupervisor"

  def start_supervisor(_config, _processes) do
    DynamicSupervisor.start_link(name: Simple.S, strategy: :one_for_one)
  end

  def start_child(supervisor, _i, child_spec) do
    DynamicSupervisor.start_child(supervisor, child_spec)
  end

  def supervisor_pids(supervisor_pid), do: [supervisor_pid]
end

defmodule Partitioned do
  @name Partitioned.S

  def desc(config, _processes), do: "PartitionSupervisor with #{config[:partitions]} partitions"

  def start_supervisor(config, _processes) do
    PartitionSupervisor.start_link(
      child_spec: DynamicSupervisor.child_spec(strategy: :one_for_one),
      partitions: Keyword.fetch!(config, :partitions),
      name: @name
    )
  end

  def start_child(_supervisor, i, child_spec) do
    DynamicSupervisor.start_child(
      {:via, PartitionSupervisor, {@name, i}},
      child_spec
    )
  end

  def supervisor_pids(_supervisor_pid) do
    for {_id, pid, :supervisor, _module} <- PartitionSupervisor.which_children(@name) do
      pid
    end
  end
end

defmodule DynamicPartitioned do
  use DynamicSupervisor

  @name DynamicPartitioned.S
  @table __MODULE__
  @target_per_partition 4_000

  def name(id) do
    {:via, Registry, {ShutdownRegistry, {__MODULE__, id}}}
  end

  def desc(config, processes),
    do: "DynamicSupervisor of #{partition_count(config, processes)} DynamicSupervisors"

  def start_supervisor(config, processes) do
    # partitions = Keyword.fetch!(config, :partitions)
    partitions = partition_count(config, processes)

    with {:ok, sup} <-
           DynamicSupervisor.start_link(
             __MODULE__,
             [stack_id: "stack_id", partitions: partitions],
             name: @name
           ) do
      pids =
        for i <- 1..partitions do
          {:ok, pid} =
            DynamicSupervisor.start_child(
              sup,
              Supervisor.child_spec(
                {DynamicSupervisor, strategy: :one_for_one, name: name(i - 1)},
                id: {__MODULE__, i}
              )
            )

          pid
        end

      # only needed for `supervisor_pids/1`
      :persistent_term.put({__MODULE__, :partitions}, List.to_tuple(pids))
      # :persistent_term.put({__MODULE__, :partition_count}, partitions)

      {:ok, sup}
    end
  end

  defp partition_count(config, max_processes) do
    Keyword.get(config, :partitions) || calculate_partition_count(max_processes)
  end

  defp calculate_partition_count(max_processes) do
    cores = System.schedulers_online()
    max(cores, div(max_processes + @target_per_partition - 1, @target_per_partition))
  end

  def start_child(_supervisor, i, child_spec) do
    DynamicSupervisor.start_child(partition(i), child_spec)
  end

  defp partition(i) do
    partitions = :ets.lookup_element(@table, :partition_count, 2)
    idx = :erlang.phash2(i, partitions)
    name(idx)
  end

  def supervisor_pids(_supervisor_pid) do
    partitions = :persistent_term.get({__MODULE__, :partitions})
    Tuple.to_list(partitions)
  end

  def init(init_args) do
    partitions = Keyword.fetch!(init_args, :partitions)
    table = :ets.new(@table, [:named_table, :public, read_concurrency: true])
    true = :ets.insert(table, [{:partition_count, partitions}])
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

defmodule Stats do
  def monitor_message_queue(pids) do
    Stream.repeatedly(fn ->
      Enum.map(pids, fn pid ->
        case Process.info(pid, :message_queue_len) do
          {:message_queue_len, len} -> len
          nil -> 0
        end
      end)
    end)
    |> Stream.flat_map(& &1)
    |> Enum.reduce_while(0, fn len, max ->
      receive do
        {:report, parent} ->
          send(parent, {:max_len, max})
          {:halt, max}
      after
        5 -> {:cont, if(len > max, do: len, else: max)}
      end
    end)
  end
end

impls = [
  # {Simple, []},
  {Partitioned, [partitions: 16]},
  {Partitioned, [partitions: 50]},
  {DynamicPartitioned, []},
  {DynamicPartitioned, [partitions: 16]}
]

p = [
  # 100,
  # 1_000,
  # 10_000,
  # 20_000,
  200_000
  # 500_000
]

for {impl, config} <- impls do
  for n <- p do
    IO.puts(
      IO.ANSI.format([
        "=========================================\n",
        [:cyan, " ", inspect(impl), :reset, " (", impl.desc(config, n), ")\n"],
        "=========================================\n"
      ])
    )

    {:ok, super} = impl.start_supervisor(config, n)

    pids = Enum.uniq([super | impl.supervisor_pids(super)])

    supervisor_memory =
      pids
      |> Enum.map(fn pid ->
        {:memory, mem} = Process.info(pid, :memory)
        mem
      end)
      |> Enum.sum()

    {_start_us, start_s, _pids} =
      time.(fn ->
        Enum.map(1..n, fn i ->
          {:ok, pid} =
            impl.start_child(
              super,
              i,
              Supervisor.child_spec({Agent, fn -> i end}, id: {:agent, i})
            )

          pid
        end)
      end)

    Process.unlink(super)
    ref = Process.monitor(super)

    IO.write(
      IO.ANSI.format([
        [:bright, String.pad_leading(to_string(n), 6, " "), :reset, " processes\n"],
        ["       memory: ", to_string(supervisor_memory / 1024), "KiB\n", :reset],
        ["        start: ", time_colour.(start_s), to_string(start_s), "s\n", :reset]
      ])
    )

    {_stop_us, stop_s, qlen} =
      time.(fn ->
        pids = impl.supervisor_pids(super)
        qpid = spawn(fn -> Stats.monitor_message_queue(pids) end)

        Process.exit(super, :shutdown)

        receive do
          {:DOWN, ^ref, :process, ^super, :shutdown} -> :ok
          msg -> raise "unexpected message: #{inspect(msg)}"
        end

        send(qpid, {:report, self()})

        receive do
          {:max_len, len} -> len
        end
      end)

    IO.write(
      IO.ANSI.format([
        ["     shutdown: ", :bright, time_colour.(stop_s), to_string(stop_s), "s\n", :reset],
        ["max queue len: ", :bright, to_string(qlen), "\n", :reset],
        "\n"
      ])
    )
  end
end
```
Force-pushed from 04d0139 to ef7899d.