diff --git a/lib/async/container/context.rb b/lib/async/container/context.rb new file mode 100644 index 0000000..9c9d056 --- /dev/null +++ b/lib/async/container/context.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +module Async + module Container + # A single level of a worker's execution context. + # @attribute kind [Symbol] Either `:process` or `:thread`. + # @attribute num [Integer] The container-scoped ordinal of the worker at this level. + # @attribute name [String | Nil] The name the container was given for this worker. + Frame = Data.define(:kind, :num, :name) + + # Mixed into each container's `Child::Instance` to expose its place in the worker + # hierarchy. The frame stack is built from the object graph (this instance plus its + # parent chain), so worker code that holds the instance - e.g. an async-service + # `prepare!(instance)` hook - can read its durable worker number with no process or + # thread globals. + module Context + # The instance of the worker this one is nested inside, or `nil` at the top level. + # A {Hybrid} thread's parent is its fork; a plain {Forked}/{Threaded} worker has none. + attr_accessor :parent + + # The execution context as a {Frame} stack, outermost level first. + # @returns [Array(Frame)] e.g. `[process, thread]` for a Hybrid thread worker. + def context + (parent ? parent.context : []) + [Frame.new(kind: kind, num: num, name: name)] + end + end + end +end diff --git a/lib/async/container/forked.rb b/lib/async/container/forked.rb index 15cc05f..91a0db5 100644 --- a/lib/async/container/forked.rb +++ b/lib/async/container/forked.rb @@ -7,6 +7,7 @@ require_relative "generic" require_relative "channel" +require_relative "context" require_relative "notify/pipe" module Async @@ -22,10 +23,12 @@ def self.multiprocess? class Child < Channel # Represents a running child process from the point of view of the child process. class Instance < Notify::Pipe + include Context + # Wrap an instance around the {Process} instance from within the forked child. # @parameter process [Process] The process intance to wrap. - def self.for(process) - instance = self.new(process.out) + def self.for(process, instance_num: nil) + instance = self.new(process.out, num: instance_num) # The child process won't be reading from the channel: process.close_read @@ -38,10 +41,19 @@ def self.for(process) # Initialize the child process instance. # # @parameter io [IO] The IO object to use for communication. - def initialize(io) - super + def initialize(io, num: nil) + super(io) @name = nil + @num = num + end + + # @returns [Integer | Nil] The container-scoped ordinal of this worker. + attr :num + + # @returns [Symbol] The kind of worker this instance represents. + def kind + :process end # Generate a hash representation of the process. @@ -51,6 +63,7 @@ def as_json(...) { process_id: ::Process.pid, name: @name, + num: @num, } end @@ -98,9 +111,9 @@ def exec(*arguments, ready: true, **options) # Fork a child process appropriate for a container. # # @returns [Process] - def self.fork(**options) + def self.fork(instance_num: nil, **options) # $stderr.puts fork: caller - self.new(**options) do |process| + self.new(instance_num: instance_num, **options) do |process| ::Process.fork do # We use `Thread.current.raise(...)` so that exceptions are filtered through `Thread.handle_interrupt` correctly. Signal.trap(:INT){::Thread.current.raise(Interrupt)} @@ -109,7 +122,7 @@ def self.fork(**options) # This could be a configuration option: ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(process) + yield Instance.for(process, instance_num: instance_num) rescue Interrupt # Graceful exit. rescue Exception => error @@ -138,10 +151,11 @@ def self.spawn(*arguments, name: nil, **options) # Initialize the process. # @parameter name [String] The name to use for the child process. - def initialize(name: nil, **options) + def initialize(name: nil, instance_num: nil, **options) super(**options) @name = name + @instance_num = instance_num @status = nil @pid = nil @@ -185,6 +199,9 @@ def name= value # @attribute [Integer] The process identifier. attr :pid + # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. + attr :instance_num + # A human readable representation of the process. # @returns [String] def inspect @@ -265,8 +282,8 @@ def wait(timeout = 0.1) # Start a named child process and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, &block) - Child.fork(name: name, &block) + def start(name, instance_num: nil, &block) + Child.fork(name: name, instance_num: instance_num, &block) end end end diff --git a/lib/async/container/generic.rb b/lib/async/container/generic.rb index d7e1551..22af78d 100644 --- a/lib/async/container/generic.rb +++ b/lib/async/container/generic.rb @@ -54,6 +54,11 @@ def initialize(policy: Policy::DEFAULT, **options) @policy = policy @statistics = @policy.make_statistics @keyed = {} + # Container-scoped allocation of worker ordinals (`instance_num`): a monotonic + # counter plus a free set, so a num released by a permanently exited worker is + # recycled, keeping the range compact (e.g. for multiprocess metric files). + @next_instance_num = 0 + @free_instance_nums = Set.new end # @attribute [Group] The group of running children instances. @@ -213,7 +218,7 @@ def stop(timeout = true) # @parameter key [Symbol] A key used for reloading child instances. # @parameter health_check_timeout [Numeric | Nil] The maximum time a child instance can run without updating its state, before it is terminated as unhealthy. # @parameter startup_timeout [Numeric | Nil] The maximum time a child instance can run without becoming ready, before it is terminated as unhealthy. - def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startup_timeout: nil, &block) + def spawn(name: nil, restart: false, key: nil, instance_num: nil, health_check_timeout: nil, startup_timeout: nil, &block) name ||= UNNAMED if mark?(key) @@ -221,13 +226,19 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu return false end + # Allocate before the fiber so the closure captures the num and it stays unchanged + # across a restart (which re-enters `start` in the same fiber). Only release a num + # we allocated ourselves, and only when the worker permanently exits. + owned = instance_num.nil? + instance_num ||= acquire_instance_num + @statistics.spawn! fiber do until @stopping Console.debug(self, "Starting child...", child: {key: key, name: name, restart: restart, health_check_timeout: health_check_timeout}, statistics: @statistics) - child = self.start(name, &block) + child = self.start(name, instance_num: instance_num, &block) state = insert(key, child) # Notify policy of spawn @@ -300,11 +311,34 @@ def spawn(name: nil, restart: false, key: nil, health_check_timeout: nil, startu break end end + ensure + release_instance_num(instance_num) if owned end.resume return true end + # Allocate a container-scoped worker ordinal, recycling the lowest released num. + # Allocation runs on the single cooperative reactor thread (acquire in the run loop, + # release in the fiber's ensure), so no synchronisation is required. + protected def acquire_instance_num + unless @free_instance_nums.empty? + num = @free_instance_nums.min + @free_instance_nums.delete(num) + return num + end + + num = @next_instance_num + @next_instance_num += 1 + num + end + + # Return a worker ordinal to the free set once its worker has permanently exited. Using a + # Set makes release idempotent, so a double release can't hand the same num to two workers. + protected def release_instance_num(instance_num) + @free_instance_nums.add(instance_num) + end + # Run multiple instances of the same block in the container. # @parameter count [Integer] The number of instances to start. def run(count: Container.processor_count, **options, &block) diff --git a/lib/async/container/hybrid.rb b/lib/async/container/hybrid.rb index 8c6f4f5..e23270f 100644 --- a/lib/async/container/hybrid.rb +++ b/lib/async/container/hybrid.rb @@ -26,7 +26,12 @@ def run(count: nil, forks: nil, threads: nil, health_check_timeout: nil, **optio self.spawn(**options) do |instance| container = Threaded.new - container.run(count: threads, health_check_timeout: health_check_timeout, **options, &block) + # Link each inner thread worker to this fork, so the thread instance can reach + # the durable process num via `instance.parent` / `instance.context`. + container.run(count: threads, health_check_timeout: health_check_timeout, **options) do |worker| + worker.parent = instance + block.call(worker) + end container.wait_until_ready instance.ready! diff --git a/lib/async/container/threaded.rb b/lib/async/container/threaded.rb index b1be095..5bc6af0 100644 --- a/lib/async/container/threaded.rb +++ b/lib/async/container/threaded.rb @@ -5,6 +5,7 @@ require_relative "generic" require_relative "channel" +require_relative "context" require_relative "notify/pipe" module Async @@ -41,10 +42,12 @@ def error # Represents a running child thread from the point of view of the child thread. class Instance < Notify::Pipe + include Context + # Wrap an instance around the {Thread} instance from within the threaded child. # @parameter thread [Thread] The thread intance to wrap. - def self.for(thread) - instance = self.new(thread.out) + def self.for(thread, instance_num: nil) + instance = self.new(thread.out, num: instance_num) return instance end @@ -52,10 +55,19 @@ def self.for(thread) # Initialize the child thread instance. # # @parameter io [IO] The IO object to use for communication with the parent. - def initialize(io) + def initialize(io, num: nil) @thread = ::Thread.current + @num = num - super + super(io) + end + + # @returns [Integer | Nil] The container-scoped ordinal of this worker. + attr :num + + # @returns [Symbol] The kind of worker this instance represents. + def kind + :thread end # Generate a hash representation of the thread. @@ -66,6 +78,7 @@ def as_json(...) process_id: ::Process.pid, thread_id: @thread.object_id, name: @thread.name, + num: @num, } end @@ -111,12 +124,12 @@ def exec(*arguments, ready: true, **options) # Start a new child thread and execute the provided block in it. # # @parameter options [Hash] Additional options to to the new child instance. - def self.fork(**options) - self.new(**options) do |thread| + def self.fork(instance_num: nil, **options) + self.new(instance_num: instance_num, **options) do |thread| ::Thread.new do # This could be a configuration option (see forked implementation too): ::Thread.handle_interrupt(SignalException => :immediate) do - yield Instance.for(thread) + yield Instance.for(thread, instance_num: instance_num) end end end @@ -125,9 +138,10 @@ def self.fork(**options) # Initialize the thread. # # @parameter name [String] The name to use for the child thread. - def initialize(name: nil, **options) + def initialize(name: nil, instance_num: nil, **options) super(**options) + @instance_num = instance_num @status = nil @thread = yield(self) @@ -150,6 +164,9 @@ def initialize(name: nil, **options) end end + # @attribute [Integer | Nil] The container-scoped ordinal of the worker this child represents. + attr :instance_num + # Convert the child process to a hash, suitable for serialization. # # @returns [Hash] The request as a hash. @@ -286,8 +303,8 @@ def finished(error = nil) # Start a named child thread and execute the provided block in it. # @parameter name [String] The name (title) of the child process. # @parameter block [Proc] The block to execute in the child process. - def start(name, &block) - Child.fork(name: name, &block) + def start(name, instance_num: nil, &block) + Child.fork(name: name, instance_num: instance_num, &block) end end end diff --git a/test/async/container/context.rb b/test/async/container/context.rb new file mode 100644 index 0000000..cd50056 --- /dev/null +++ b/test/async/container/context.rb @@ -0,0 +1,64 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +require "async/container/threaded" +require "async/container/forked" +require "async/container/hybrid" +require "async/container/best" + +# Have the worker serialise something about its instance to a pipe, one line per worker. +def report_from_worker(container, **run_options) + input, output = IO.pipe + container.run(**run_options) do |instance| + output.write(yield(instance) + "\n") + end + container.wait + output.close + input.read.lines.map(&:chomp) +ensure + input&.close unless input&.closed? +end + +describe Async::Container::Threaded do + it "exposes a single :thread frame and no parent via instance.context" do + reported = report_from_worker(subject.new, count: 1) do |instance| + "#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}" + end + + expect(reported).to be == ["thread:0|parent=nil"] + end +end + +describe Async::Container::Forked do + it "exposes a single :process frame and no parent via instance.context" do + reported = report_from_worker(subject.new, count: 1) do |instance| + "#{instance.context.map {|f| "#{f.kind}:#{f.num}"}.join(",")}|parent=#{instance.parent.inspect}" + end + + expect(reported).to be == ["process:0|parent=nil"] + end +end if Async::Container.fork? + +describe Async::Container::Hybrid do + it "exposes [:process, :thread] frames via instance.context" do + reported = report_from_worker(subject.new, count: 1, forks: 1, threads: 1) do |instance| + instance.context.map {|f| f.kind}.join(",") + end + + expect(reported).to be == ["process,thread"] + end + + it "reaches the durable forked num through instance.parent (not the thread num)" do + reported = report_from_worker(subject.new, count: 2, forks: 2, threads: 1) do |instance| + "#{instance.kind}/#{instance.num} parent=#{instance.parent&.kind}/#{instance.parent&.num}" + end + + # Both workers are thread num 0 within their fork; the durable forked num is on the parent. + expect(reported.sort).to be == [ + "thread/0 parent=process/0", + "thread/0 parent=process/1", + ] + end +end if Async::Container.fork? diff --git a/test/async/container/instance_num.rb b/test/async/container/instance_num.rb new file mode 100644 index 0000000..af25ac2 --- /dev/null +++ b/test/async/container/instance_num.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2026, by Udi Oron. + +require "async/container/threaded" +require "async/container/forked" +require "async/container/best" + +# Collect what each worker reports about its own instance context. +def collect_worker_context(container, count: 3, **options) + input, output = IO.pipe + container.run(count: count, **options) do |instance| + output.write("#{instance.num}:#{instance.kind}\n") + end + container.wait + output.close + input.read.lines.map(&:chomp) +ensure + input&.close unless input&.closed? +end + +describe Async::Container::Generic do + let(:container) {Async::Container::Threaded.new} + + with "#acquire_instance_num" do + it "assigns sequential nums starting at 0" do + expect(container.send(:acquire_instance_num)).to be == 0 + expect(container.send(:acquire_instance_num)).to be == 1 + expect(container.send(:acquire_instance_num)).to be == 2 + end + + it "reuses the lowest released num before extending the range" do + 3.times {container.send(:acquire_instance_num)} # => 0, 1, 2 + container.send(:release_instance_num, 1) + + expect(container.send(:acquire_instance_num)).to be == 1 # reused + expect(container.send(:acquire_instance_num)).to be == 3 # then extends + end + + it "does not hand out the same num twice when a num is released more than once" do + 2.times {container.send(:acquire_instance_num)} # => 0, 1 + container.send(:release_instance_num, 0) + container.send(:release_instance_num, 0) # double release must be idempotent + + expect(container.send(:acquire_instance_num)).to be == 0 # the recycled num + expect(container.send(:acquire_instance_num)).to be == 2 # not 0 again + end + end + + with "keyed reuse" do + it "does not allocate a num for a reused keyed child" do + container.spawn(key: :web) {sleep} # allocates 0, registers the key + reused = container.spawn(key: :web) {sleep} # mark? hit => returns before allocating + + expect(reused).to be == false + # If the second spawn had allocated, the next free num would be 2: + expect(container.send(:acquire_instance_num)).to be == 1 + ensure + container.stop(false) + end + end +end + +describe Async::Container::Threaded do + let(:container) {subject.new} + + it "exposes instance.num and instance.kind to the worker (kind: :thread)" do + reported = collect_worker_context(container, count: 3) + + nums = reported.map {|line| line.split(":").first.to_i} + kinds = reported.map {|line| line.split(":").last}.uniq + + expect(nums.sort).to be == [0, 1, 2] + expect(kinds).to be == ["thread"] + expect(container.statistics).to have_attributes(failures: be == 0) + end + + it "preserves instance.num across a restart" do + trigger = IO.pipe + nums = IO.pipe + + runner = Thread.new do + container.spawn(restart: true) do |instance| + nums.last.puts(instance.num.to_s) + trigger.first.gets # block until told to exit, then the worker restarts + end + container.wait + end + + reported = [] + 2.times do + reported << nums.first.gets.to_i + trigger.last.puts("die") + end + + runner.kill + runner.join + + # Same num allocated for both incarnations (num is captured outside the restart loop): + expect(reported).to be == [reported.first, reported.first] + end +end + +describe Async::Container::Forked do + let(:container) {subject.new} + + it "exposes instance.num and instance.kind to the worker (kind: :process)" do + reported = collect_worker_context(container, count: 3) + + nums = reported.map {|line| line.split(":").first.to_i} + kinds = reported.map {|line| line.split(":").last}.uniq + + expect(nums.sort).to be == [0, 1, 2] + expect(kinds).to be == ["process"] + expect(container.statistics).to have_attributes(failures: be == 0) + end +end if Async::Container.fork?