fix(sync-service): Improve shutdown times #4066

Open

magnetised wants to merge 1 commit into main from magnetised/pzwpmurupwos
Conversation

@magnetised (Contributor)

Some investigation shows that yes, shutting down a `DynamicSupervisor` with lots of children is **slow**.

Calling `Process.exit(pid, :shutdown)` on a simple `DynamicSupervisor` with 200,000 child processes (`Agent`s) takes ~10 **minutes** to complete on my machine. This is despite the fact that `DynamicSupervisor`s terminate their children in parallel, sending `:EXIT` messages to all child processes without waiting for any to terminate.

From the [erlang docs](https://www.erlang.org/doc/system/sup_princ.html#simplified-one_for_one-supervisors):

> Because a simple_one_for_one supervisor can have many children, it
> shuts them all down asynchronously. This means that the children will do
> their cleanup in parallel and therefore the order in which they are
> stopped is not defined.

`DynamicSupervisor` inherited its shutdown behaviour from this supervisor strategy.
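The effect is easy to reproduce. Here's a minimal standalone sketch (hypothetical script, not part of this PR; run with `elixir repro.exs`) that times a `:shutdown` exit against a `DynamicSupervisor` with many `Agent` children:

```elixir
# Hypothetical repro: one DynamicSupervisor, many Agent children,
# then time how long the :shutdown exit takes to complete.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

n = 10_000

for i <- 1..n do
  {:ok, _pid} = DynamicSupervisor.start_child(sup, {Agent, fn -> i end})
end

# Detach so the supervisor's exit doesn't take this process down with it.
Process.unlink(sup)
ref = Process.monitor(sup)

{us, :ok} =
  :timer.tc(fn ->
    Process.exit(sup, :shutdown)

    receive do
      {:DOWN, ^ref, :process, _pid, :shutdown} -> :ok
    end
  end)

IO.puts("shutdown of #{n} children took #{Float.round(us / 1_000_000, 2)}s")
```

Scaling `n` up shows the super-linear growth described above.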

Using a `PartitionSupervisor` helps, reducing the shutdown time by roughly a factor of the number of partitions, but this does not scale well with the number of running child processes.

For instance, with 200,000 children over 8 partitions the shutdown time drops to ~30s, but if you increase the number of children to 500,000 there is a lower bound of ~7s below which you can never go, no matter how many partitions you add.

The problem is that the `PartitionSupervisor` terminates its children sequentially. So as you increase the number of partitions, you're just increasing the number of children that are terminated sequentially, even if each child `DynamicSupervisor` terminates its children in parallel.

This PR solves that by replacing the top-level `PartitionSupervisor` with another `DynamicSupervisor`. On shutdown, all partition supervisors are terminated in parallel, and the children of those partition supervisors are also terminated in parallel, removing the bottleneck.
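A condensed sketch of that shape (hypothetical module and names, not the PR's actual implementation): a top-level `DynamicSupervisor` whose children are themselves `DynamicSupervisor`s, with child starts routed to a partition by hashing a key.

```elixir
defmodule TwoTier do
  # Start a top-level DynamicSupervisor with `partitions` DynamicSupervisor
  # children. On shutdown, the top level exits all partitions in parallel,
  # and each partition exits its own children in parallel.
  def start_link(partitions) do
    {:ok, top} = DynamicSupervisor.start_link(strategy: :one_for_one)

    parts =
      for _ <- 1..partitions do
        {:ok, pid} =
          DynamicSupervisor.start_child(top, {DynamicSupervisor, strategy: :one_for_one})

        pid
      end

    {:ok, top, List.to_tuple(parts)}
  end

  # Route a child to a stable partition by hashing its key.
  def start_child(parts, key, child_spec) do
    idx = :erlang.phash2(key, tuple_size(parts))
    DynamicSupervisor.start_child(elem(parts, idx), child_spec)
  end
end

{:ok, _top, parts} = TwoTier.start_link(4)
{:ok, _pid} = TwoTier.start_child(parts, "shape-1", {Agent, fn -> 0 end})
IO.puts("partitions: #{tuple_size(parts)}")
```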

Here are some numbers from my benchmark showing the time required to shutdown a supervisor (tree) with 200,000 running processes.

Our larger servers have 16 cores, so we're running a `PartitionSupervisor` with 16 partitions.

```
=========================================
 Partitioned (PartitionSupervisor with 16 partitions)
=========================================

200000 processes
       memory: 72.7734375KiB
        start: 2.6s
     shutdown: 12.4s
max queue len: 12480
```

So 12 seconds even with a very simple process with no `terminate/2` callback.

We could just increase the number of partitions...

```
=========================================
 Partitioned (PartitionSupervisor with 50 partitions)
=========================================

200000 processes
       memory: 276.609375KiB
        start: 2.7s
     shutdown: 5.4s
max queue len: 3936
```

That's better, but we've nearly tripled the number of supervisors while only just over halving the shutdown time, so you start to see the tradeoff.

Now with the new 2-tier `DynamicSupervisor`:

```
=========================================
 DynamicPartitioned (DynamicSupervisor of 50 DynamicSupervisors)
=========================================

200000 processes
       memory: 180.84375KiB
        start: 2.8s
     shutdown: 0.5s
max queue len: 3763
```

So 10x improvement on the previous config and 25x on the current setup.

The number of partitions can be set using a new env var `ELECTRIC_CONSUMER_PARTITIONS`. If that's not set, the partition count scales with `max_shapes` if that's known; otherwise we just use the number of cores.
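To make the fallback concrete, here is the shape of that calculation as a sketch, using the `@target_per_partition 4_000` constant from the benchmark script and an assumed 16-core machine (the production constant may differ): ceiling-divide the shape count by the per-partition target, floored at the core count.

```elixir
# Assumed values: 4_000 children per partition target, 16 cores.
target_per_partition = 4_000
cores = 16

# Ceiling division without floats: ceil(a / b) == div(a + b - 1, b)
partition_count = fn max_shapes ->
  max(cores, div(max_shapes + target_per_partition - 1, target_per_partition))
end

IO.inspect(partition_count.(200_000), label: "200k shapes")
IO.inspect(partition_count.(500_000), label: "500k shapes")
```

With these assumptions, 200,000 shapes gives 50 partitions and 500,000 gives 125.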

I've opted for a conservative default (I could have gone with some multiple of the number of cores) but went with the lower-memory option.

This is the shutdown time for our production stack with the fallback partition config, so nearly a 6x improvement:

```
=========================================
 DynamicPartitioned (DynamicSupervisor of 16 DynamicSupervisors)
=========================================

200000 processes
       memory: 58.09375KiB
        start: 2.8s
     shutdown: 2.1s
max queue len: 12555
```

This also scales to 500,000 shapes, where it starts 125 partitions by default and shuts down in 1.5s (the original version took 70s):

```
500000 processes
       memory: 435.21875KiB
     shutdown: 1.5s
```

This is the benchmarking script:

```elixir
time = fn action ->
  # :timer.tc/2 with a time-unit argument requires OTP 26+
  {t, result} = :timer.tc(action, :microsecond)
  s = Float.round(t / 1_000_000.0, 1)
  {t, s, result}
end

time_colour = fn time ->
  if time > 1.0, do: :red, else: :green
end

{:ok, _} = Registry.start_link(name: ShutdownRegistry, keys: :unique)

defmodule Simple do
  def desc(_config, _processes), do: "direct DynamicSupervisor"

  def start_supervisor(_config, _processes) do
    DynamicSupervisor.start_link(name: Simple.S, strategy: :one_for_one)
  end

  def start_child(supervisor, _i, child_spec) do
    DynamicSupervisor.start_child(supervisor, child_spec)
  end

  def supervisor_pids(supervisor_pid), do: [supervisor_pid]
end

defmodule Partitioned do
  @name Partitioned.S

  def desc(config, _processes), do: "PartitionSupervisor with #{config[:partitions]} partitions"

  def start_supervisor(config, _processes) do
    PartitionSupervisor.start_link(
      child_spec: DynamicSupervisor.child_spec(strategy: :one_for_one),
      partitions: Keyword.fetch!(config, :partitions),
      name: @name
    )
  end

  def start_child(_supervisor, i, child_spec) do
    DynamicSupervisor.start_child(
      {:via, PartitionSupervisor, {@name, i}},
      child_spec
    )
  end

  def supervisor_pids(_supervisor_pid) do
    for {_id, pid, :supervisor, _module} <- PartitionSupervisor.which_children(@name) do
      pid
    end
  end
end

defmodule DynamicPartitioned do
  use DynamicSupervisor

  @name DynamicPartitioned.S
  @table __MODULE__
  @target_per_partition 4_000

  def name(id) do
    {:via, Registry, {ShutdownRegistry, {__MODULE__, id}}}
  end

  def desc(config, processes),
    do: "DynamicSupervisor of #{partition_count(config, processes)} DynamicSupervisors"

  def start_supervisor(config, processes) do
    partitions = partition_count(config, processes)

    with {:ok, sup} <-
           DynamicSupervisor.start_link(
             __MODULE__,
             [stack_id: "stack_id", partitions: partitions],
             name: @name
           ) do
      pids =
        for i <- 1..partitions do
          {:ok, pid} =
            DynamicSupervisor.start_child(
              sup,
              Supervisor.child_spec(
                {DynamicSupervisor, strategy: :one_for_one, name: name(i - 1)},
                id: {__MODULE__, i}
              )
            )

          pid
        end

      # only needed for `supervisor_pids/1`
      :persistent_term.put({__MODULE__, :partitions}, List.to_tuple(pids))

      {:ok, sup}
    end
  end

  defp partition_count(config, max_processes) do
    Keyword.get(config, :partitions) || calculate_partition_count(max_processes)
  end

  defp calculate_partition_count(max_processes) do
    cores = System.schedulers_online()
    max(cores, div(max_processes + @target_per_partition - 1, @target_per_partition))
  end

  def start_child(_supervisor, i, child_spec) do
    DynamicSupervisor.start_child(partition(i), child_spec)
  end

  defp partition(i) do
    partitions = :ets.lookup_element(@table, :partition_count, 2)
    idx = :erlang.phash2(i, partitions)
    name(idx)
  end

  def supervisor_pids(_supervisor_pid) do
    partitions = :persistent_term.get({__MODULE__, :partitions})
    Tuple.to_list(partitions)
  end

  def init(init_args) do
    partitions = Keyword.fetch!(init_args, :partitions)
    table = :ets.new(@table, [:named_table, :public, read_concurrency: true])
    true = :ets.insert(table, [{:partition_count, partitions}])
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

defmodule Stats do
  def monitor_message_queue(pids) do
    Stream.repeatedly(fn ->
      Enum.map(pids, fn pid ->
        case Process.info(pid, :message_queue_len) do
          {:message_queue_len, len} -> len
          nil -> 0
        end
      end)
    end)
    |> Stream.flat_map(& &1)
    |> Enum.reduce_while(0, fn len, max ->
      receive do
        {:report, parent} ->
          send(parent, {:max_len, max})
          {:halt, max}
      after
        5 ->
          {:cont, if(len > max, do: len, else: max)}
      end
    end)
  end
end

impls = [
  # {Simple, []},
  {Partitioned, [partitions: 16]},
  {Partitioned, [partitions: 50]},
  {DynamicPartitioned, []},
  {DynamicPartitioned, [partitions: 16]}
]

p = [
  # 100,
  # 1_000,
  # 10_000,
  # 20_000,
  200_000
  # 500_000
]

for {impl, config} <- impls do
  for n <- p do
    IO.puts(
      IO.ANSI.format([
        "=========================================\n",
        [:cyan, " ", inspect(impl), :reset, " (", impl.desc(config, n), ")\n"],
        "=========================================\n"
      ])
    )

    {:ok, super} = impl.start_supervisor(config, n)

    pids = Enum.uniq([super | impl.supervisor_pids(super)])

    supervisor_memory =
      pids
      |> Enum.map(fn pid ->
        {:memory, mem} = Process.info(pid, :memory)
        mem
      end)
      |> Enum.sum()

    {_start_us, start_s, _pids} =
      time.(fn ->
        Enum.map(1..n, fn i ->
          {:ok, pid} =
            impl.start_child(
              super,
              i,
              Supervisor.child_spec({Agent, fn -> i end}, id: {:agent, i})
            )

          pid
        end)
      end)

    Process.unlink(super)
    ref = Process.monitor(super)

    IO.write(
      IO.ANSI.format([
        [:bright, String.pad_leading(to_string(n), 6, " "), :reset, " processes\n"],
        ["       memory: ", to_string(supervisor_memory / 1024), "KiB\n", :reset],
        ["        start: ", time_colour.(start_s), to_string(start_s), "s\n", :reset]
      ])
    )

    {_stop_us, stop_s, qlen} =
      time.(fn ->
        pids = impl.supervisor_pids(super)

        qpid =
          spawn(fn ->
            Stats.monitor_message_queue(pids)
          end)

        Process.exit(super, :shutdown)

        receive do
          {:DOWN, ^ref, :process, ^super, :shutdown} -> :ok
          msg -> raise "unexpected message: #{inspect(msg)}"
        end

        send(qpid, {:report, self()})

        receive do
          {:max_len, len} -> len
        end
      end)

    IO.write(
      IO.ANSI.format([
        ["     shutdown: ", :bright, time_colour.(stop_s), to_string(stop_s), "s\n", :reset],
        ["max queue len: ", :bright, to_string(qlen), "\n", :reset],
        "\n"
      ])
    )
  end
end
```

@magnetised self-assigned this Mar 26, 2026
@netlify

netlify bot commented Mar 26, 2026

Deploy Preview for electric-next ready!

🔨 Latest commit: ef7899d
🔍 Latest deploy log: https://app.netlify.com/projects/electric-next/deploys/69c6930a7501600008e64226
😎 Deploy Preview: https://deploy-preview-4066--electric-next.netlify.app

@codecov

codecov bot commented Mar 26, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 88.67%. Comparing base (0b45e63) to head (ef7899d).
✅ All tests successful. No failed tests found.

Additional details and impacted files
```
@@           Coverage Diff           @@
##             main    #4066   +/-   ##
=======================================
  Coverage   88.67%   88.67%
=======================================
  Files          25       25
  Lines        2438     2438
  Branches      613      610    -3
=======================================
  Hits         2162     2162
  Misses        274      274
  Partials        2        2
```

| Flag | Coverage Δ |
|---|---|
| packages/experimental | 87.73% <ø> (ø) |
| packages/react-hooks | 86.48% <ø> (ø) |
| packages/start | 82.83% <ø> (ø) |
| packages/typescript-client | 93.81% <ø> (ø) |
| packages/y-electric | 56.05% <ø> (ø) |
| typescript | 88.67% <ø> (ø) |
| unit-tests | 88.67% <ø> (ø) |


@claude

claude bot commented Mar 26, 2026

Claude Code Review

Summary

This PR replaces the PartitionSupervisor-based consumer supervision tree with a two-tier DynamicSupervisor hierarchy, delivering 6–25x shutdown time improvements. The implementation is sound and all previously raised issues have been resolved.

What’s Working Well

  • Two-tier DynamicSupervisor design — correct and well-benchmarked; all partition supervisors and their children terminate in parallel on shutdown, eliminating the sequential bottleneck from PartitionSupervisor.
  • ETS table for partition count — owned by the DynamicConsumerSupervisor process, automatically cleaned up on exit; no leak on restart.
  • Partition supervisor init is convention-compliant: `PartitionDynamicSupervisor.init/1` sets `Process.set_label`, `Logger.metadata`, and `Sentry.set_tags_context`, matching the project's GenServer init requirements.
  • is_stack_id guards on the public API functions (start_shape_consumer, start_snapshotter, start_materializer) make the contract explicit.
  • Error path cleanup: `DynamicSupervisor.stop(supervisor_pid, :shutdown)` is called before returning `{:error, _}` from `start_link`, preventing an orphaned supervisor from blocking future restarts.

Issues Found

None.

Note on @alco’s Inline Comment

File: packages/sync-service/lib/electric/shapes/dynamic_consumer_supervisor.ex

The comment asks: “Why not DynamicSupervisor.stop(supervisor_pid, :failed_to_start_partition)?”

The current :shutdown reason is the correct choice here. Using :shutdown signals a clean, expected termination to the BEAM runtime — no crash report is generated and supervisor monitors will not treat it as an unexpected crash. Since the outer DynamicSupervisor is being stopped before start_link returns (the parent supervisor has no PID reference yet), the stop reason has no effect on restart logic regardless.

Using a custom atom like :failed_to_start_partition would cause the process to exit with an abnormal reason, which can generate spurious log warnings and potentially trigger Sentry alerts. :shutdown is semantically accurate: this is a deliberate, orderly stop — not a crash.
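The distinction is directly observable (a quick standalone sketch, independent of this PR's code): a monitor reports whatever reason the process exited with, and OTP treats `:shutdown`, unlike an arbitrary atom, as an orderly stop rather than a crash.

```elixir
# Start an unlinked Agent so its exit can't take this process down.
{:ok, pid} = Agent.start(fn -> :ok end)
ref = Process.monitor(pid)

# An untrapped :shutdown exit terminates the process with reason :shutdown,
# which supervisors and error loggers treat as a clean stop, not a crash.
Process.exit(pid, :shutdown)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> IO.inspect(reason)
end
```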

Issue Conformance

No linked issue. The PR description and benchmarks are self-contained and provide sufficient justification.

Previous Review Status

All issues from prior iterations have been resolved:

  1. Orphaned supervisor on failed partition startupDynamicSupervisor.stop(supervisor_pid, :shutdown) is called in the {:error, _} path.
  2. API simplificationsupervisor_ref replaced with stack_id on public functions.
  3. NimbleOptions validationconsumer_partitions validated at startup.
  4. ETS table lifecycle — owned by supervisor process, auto-cleaned on exit.

This PR is ready to merge.


Review iteration: 4 | 2026-03-27

@magnetised force-pushed the magnetised/pzwpmurupwos branch 2 times, most recently from bbb8ade to 04d0139 on March 26, 2026 at 17:24
@alco (Member) left a comment
Fabulously impressive! Let's get this in!

@magnetised force-pushed the magnetised/pzwpmurupwos branch from 04d0139 to ef7899d on March 27, 2026 at 14:24