Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions lib/async/container/context.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Udi Oron.

module Async
module Container
# A single level of a worker's execution context.
# @attribute kind [Symbol] Either `:process` or `:thread`.
# @attribute num [Integer] The container-scoped ordinal of the worker at this level.
# @attribute name [String | Nil] The name the container was given for this worker.
Frame = Data.define(:kind, :num, :name)

# Mixed into each container's `Child::Instance` to expose its place in the worker
# hierarchy. The frame stack is built from the object graph (this instance plus its
# parent chain), so worker code that holds the instance - e.g. an async-service
# `prepare!(instance)` hook - can read its durable worker number with no process or
# thread globals.
module Context
# The instance of the worker this one is nested inside, or `nil` at the top level.
# A {Hybrid} thread's parent is its fork; a plain {Forked}/{Threaded} worker has none.
attr_accessor :parent

# The execution context as a {Frame} stack, outermost level first.
# @returns [Array(Frame)] e.g. `[process, thread]` for a Hybrid thread worker.
def context
(parent ? parent.context : []) + [Frame.new(kind: kind, num: num, name: name)]
end
end
end
end
37 changes: 27 additions & 10 deletions lib/async/container/forked.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

require_relative "generic"
require_relative "channel"
require_relative "context"
require_relative "notify/pipe"

module Async
Expand All @@ -22,10 +23,12 @@ def self.multiprocess?
class Child < Channel
# Represents a running child process from the point of view of the child process.
class Instance < Notify::Pipe
include Context

# Wrap an instance around the {Process} instance from within the forked child.
# @parameter process [Process] The process intance to wrap.
def self.for(process)
instance = self.new(process.out)
def self.for(process, instance_num: nil)
instance = self.new(process.out, num: instance_num)

# The child process won't be reading from the channel:
process.close_read
Expand All @@ -38,10 +41,19 @@ def self.for(process)
# Initialize the child process instance.
#
# @parameter io [IO] The IO object to use for communication.
def initialize(io)
super
def initialize(io, num: nil)
super(io)

@name = nil
@num = num
end

# @returns [Integer | Nil] The container-scoped ordinal of this worker.
attr :num

# @returns [Symbol] The kind of worker this instance represents.
def kind
:process
end

# Generate a hash representation of the process.
Expand All @@ -51,6 +63,7 @@ def as_json(...)
{
process_id: ::Process.pid,
name: @name,
num: @num,
}
end

Expand Down Expand Up @@ -98,9 +111,9 @@ def exec(*arguments, ready: true, **options)
# Fork a child process appropriate for a container.
#
# @returns [Process]
def self.fork(**options)
def self.fork(instance_num: nil, **options)
# $stderr.puts fork: caller
self.new(**options) do |process|
self.new(instance_num: instance_num, **options) do |process|
::Process.fork do
# We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly.
Signal.trap(:INT){::Thread.current.raise(Interrupt)}
Expand All @@ -109,7 +122,7 @@ def self.fork(**options)

# This could be a configuration option:
::Thread.handle_interrupt(SignalException => :immediate) do
yield Instance.for(process)
yield Instance.for(process, instance_num: instance_num)
rescue Interrupt
# Graceful exit.
rescue Exception => error
Expand Down Expand Up @@ -138,10 +151,11 @@ def self.spawn(*arguments, name: nil, **options)

# Initialize the process.
# @parameter name [String] The name to use for the child process.
def initialize(name: nil, **options)
def initialize(name: nil, instance_num: nil, **options)
super(**options)

@name = name
@instance_num = instance_num
@status = nil
@pid = nil

Expand Down Expand Up @@ -185,6 +199,9 @@ def name= value
# @attribute [Integer] The process identifier.
attr :pid

# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
attr :instance_num

# A human readable representation of the process.
# @returns [String]
def inspect
Expand Down Expand Up @@ -265,8 +282,8 @@ def wait(timeout = 0.1)
# Start a named child process and execute the provided block in it.
# @parameter name [String] The name (title) of the child process.
# @parameter block [Proc] The block to execute in the child process.
def start(name, &block)
Child.fork(name: name, &block)
def start(name, instance_num: nil, &block)
Child.fork(name: name, instance_num: instance_num, &block)
end
end
end
Expand Down
38 changes: 36 additions & 2 deletions lib/async/container/generic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def initialize(policy: Policy::DEFAULT, **options)
@policy = policy
@statistics = @policy.make_statistics
@keyed = {}
# Container-scoped allocation of worker ordinals (`instance_num`): a monotonic
# counter plus a free set, so a num released by a permanently exited worker is
# recycled, keeping the range compact (e.g. for multiprocess metric files).
@next_instance_num = 0
@free_instance_nums = Set.new
end

# @attribute [Group] The group of running children instances.
Expand Down Expand Up @@ -213,21 +218,27 @@ def stop(timeout = true)
# @parameter key [Symbol] A key used for reloading child instances.
# @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy.
# @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy.
def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block)
def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_timeout: nil, startup_timeout: nil, &block)
name ||= UNNAMED

if mark?(key)
Console.debug(self, "Reusing existing child.", child: {key: key, name: name})
return false
end

# Allocate before the fiber so the closure captures the num and it stays unchanged
# across a restart (which re-enters `start` in the same fiber). Only release a num
# we allocated ourselves, and only when the worker permanently exits.
owned = instance_num.nil?
instance_num ||= acquire_instance_num

@statistics.spawn!

fiber do
until @stopping
Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics)

child = self.start(name, &block)
child = self.start(name, instance_num: instance_num, &block)
state = insert(key, child)

# Notify policy of spawn
Expand Down Expand Up @@ -300,11 +311,34 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu
break
end
end
ensure
release_instance_num(instance_num) if owned
end.resume

return true
end

# Allocate a container-scoped worker ordinal, recycling the lowest released num.
# Allocation runs on the single cooperative reactor thread (acquire in the run loop,
# release in the fiber's ensure), so no synchronisation is required.
protected def acquire_instance_num
unless @free_instance_nums.empty?
num = @free_instance_nums.min
@free_instance_nums.delete(num)
return num
end

num = @next_instance_num
@next_instance_num += 1
num
end

# Return a worker ordinal to the free set once its worker has permanently exited. Using a
# Set makes release idempotent, so a double release can't hand the same num to two workers.
protected def release_instance_num(instance_num)
@free_instance_nums.add(instance_num)
end

# Run multiple instances of the same block in the container.
# @parameter count [Integer] The number of instances to start.
def run(count: Container.processor_count, **options, &block)
Expand Down
7 changes: 6 additions & 1 deletion lib/async/container/hybrid.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio
self.spawn(**options) do |instance|
container = Threaded.new

container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block)
# Link each inner thread worker to this fork, so the thread instance can reach
# the durable process num via `instance.parent` / `instance.context`.
container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker|
worker.parent = instance
block.call(worker)
end

container.wait_until_ready
instance.ready!
Expand Down
37 changes: 27 additions & 10 deletions lib/async/container/threaded.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

require_relative "generic"
require_relative "channel"
require_relative "context"
require_relative "notify/pipe"

module Async
Expand Down Expand Up @@ -41,21 +42,32 @@ def error

# Represents a running child thread from the point of view of the child thread.
class Instance < Notify::Pipe
include Context

# Wrap an instance around the {Thread} instance from within the threaded child.
# @parameter thread [Thread] The thread intance to wrap.
def self.for(thread)
instance = self.new(thread.out)
def self.for(thread, instance_num: nil)
instance = self.new(thread.out, num: instance_num)

return instance
end

# Initialize the child thread instance.
#
# @parameter io [IO] The IO object to use for communication with the parent.
def initialize(io)
def initialize(io, num: nil)
@thread = ::Thread.current
@num = num

super
super(io)
end

# @returns [Integer | Nil] The container-scoped ordinal of this worker.
attr :num

# @returns [Symbol] The kind of worker this instance represents.
def kind
:thread
end

# Generate a hash representation of the thread.
Expand All @@ -66,6 +78,7 @@ def as_json(...)
process_id: ::Process.pid,
thread_id: @thread.object_id,
name: @thread.name,
num: @num,
}
end

Expand Down Expand Up @@ -111,12 +124,12 @@ def exec(*arguments, ready: true, **options)
# Start a new child thread and execute the provided block in it.
#
# @parameter options [Hash] Additional options to to the new child instance.
def self.fork(**options)
self.new(**options) do |thread|
def self.fork(instance_num: nil, **options)
self.new(instance_num: instance_num, **options) do |thread|
::Thread.new do
# This could be a configuration option (see forked implementation too):
::Thread.handle_interrupt(SignalException => :immediate) do
yield Instance.for(thread)
yield Instance.for(thread, instance_num: instance_num)
end
end
end
Expand All @@ -125,9 +138,10 @@ def self.fork(**options)
# Initialize the thread.
#
# @parameter name [String] The name to use for the child thread.
def initialize(name: nil, **options)
def initialize(name: nil, instance_num: nil, **options)
super(**options)

@instance_num = instance_num
@status = nil

@thread = yield(self)
Expand All @@ -150,6 +164,9 @@ def initialize(name: nil, **options)
end
end

# @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents.
attr :instance_num

# Convert the child process to a hash, suitable for serialization.
#
# @returns [Hash] The request as a hash.
Expand Down Expand Up @@ -286,8 +303,8 @@ def finished(error = nil)
# Start a named child thread and execute the provided block in it.
# @parameter name [String] The name (title) of the child process.
# @parameter block [Proc] The block to execute in the child process.
def start(name, &block)
Child.fork(name: name, &block)
def start(name, instance_num: nil, &block)
Child.fork(name: name, instance_num: instance_num, &block)
end
end
end
Expand Down
64 changes: 64 additions & 0 deletions test/async/container/context.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2026, by Udi Oron.

require "async/container/threaded"
require "async/container/forked"
require "async/container/hybrid"
require "async/container/best"

# Have the worker serialise something about its instance to a pipe, one line per worker.
def report_from_worker(container, **run_options)
input, output = IO.pipe
container.run(**run_options) do |instance|
output.write(yield(instance) + "\n")
end
container.wait
output.close
input.read.lines.map(&:chomp)
ensure
input&.close unless input&.closed?
end

describe Async::Container::Threaded do
it "exposes a single :thread frame and no parent via instance.context" do
reported = report_from_worker(subject.new, count: 1) do |instance|
"#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}"
end

expect(reported).to be == ["thread:0|parent=nil"]
end
end

describe Async::Container::Forked do
it "exposes a single :process frame and no parent via instance.context" do
reported = report_from_worker(subject.new, count: 1) do |instance|
"#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}"
end

expect(reported).to be == ["process:0|parent=nil"]
end
end if Async::Container.fork?

describe Async::Container::Hybrid do
it "exposes [:process, :thread] frames via instance.context" do
reported = report_from_worker(subject.new, count: 1, forks: 1, threads: 1) do |instance|
instance.context.map {|f| f.kind}.join(",")
end

expect(reported).to be == ["process,thread"]
end

it "reaches the durable forked num through instance.parent (not the thread num)" do
reported = report_from_worker(subject.new, count: 2, forks: 2, threads: 1) do |instance|
"#{instance.kind}/#{instance.num} parent=#{instance.parent&.kind}/#{instance.parent&.num}"
end

# Both workers are thread num 0 within their fork; the durable forked num is on the parent.
expect(reported.sort).to be == [
"thread/0 parent=process/0",
"thread/0 parent=process/1",
]
end
end if Async::Container.fork?
Loading