Skip to content
34 changes: 25 additions & 9 deletions Sources/ScreenStatetKit/Actions/AsyncAction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,29 @@ public typealias AsyncActionPut<Input> = AsyncAction<Input,Void>
public struct AsyncAction<Input,Output>: Sendable
where Input: Sendable, Output: Sendable {

public typealias WorkAction = @Sendable (Input) async throws -> Output
public typealias WorkAction = @Sendable @isolated(any) (Input) async throws -> Output
public let name: String?

private let identifier = UUID().uuidString
private let identifier = UUID()
private let action: WorkAction

public init (_ action: @escaping WorkAction) {
public init (name: String? = .none,
_ action: @escaping WorkAction) {
self.name = name
self.action = action
}

@discardableResult
public func asyncExecute(_ input: Input) async throws -> Output {
public func asyncExecute(isolation: isolated (any Actor)? = #isolation,
_ input: Input) async throws -> Output {
try await action(input)
}
}

extension AsyncAction where Input == Void {

@discardableResult
public func asyncExecute() async throws -> Output {
public func asyncExecute(isolation: isolated (any Actor)? = #isolation) async throws -> Output {
try await action(Void())
}
}
Expand All @@ -42,8 +46,14 @@ extension AsyncAction where Input == Void {
extension AsyncAction where Output == Void {

public func execute(_ input: Input) {
Task {
try await action(input)
if #available(iOS 26.0, macOS 26.0, *) {
Task.immediate {
try await action(input)
}
} else {
Task {
try await action(input)
}
}
}
}
Expand All @@ -52,8 +62,14 @@ extension AsyncAction where Output == Void {
extension AsyncAction where Output == Void, Input == Void {

public func execute() {
Task {
try await action(Void())
if #available(iOS 26.0, macOS 26.0, *) {
Task.immediate {
try await action(Void())
}
} else {
Task {
try await action(Void())
}
}
}
}
Expand Down
75 changes: 62 additions & 13 deletions Sources/ScreenStatetKit/AsyncStream/StreamProducerType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ public actor StreamProducer<Element>: StreamProducerType where Element: Sendable

typealias Continuation = AsyncStream<Element>.Continuation

public let withLatest: Bool
private var continuations: [String:Continuation] = [:]
private let storage = StreamStorage()
private var latestElement: Element?

public let withLatest: Bool

/// Events stream
public var stream: AsyncStream<Element> {
AsyncStream { continuation in
Expand All @@ -46,41 +47,89 @@ public actor StreamProducer<Element>: StreamProducerType where Element: Sendable
if withLatest {
latestElement = element
}
continuations.values.forEach({ $0.yield(element) })
storage.emit(element: element)
}

public func finish() {
continuations.values.forEach({ $0.finish() })
continuations.removeAll()
storage.finish()
}

private func append(_ continuation: Continuation) {
let key = UUID().uuidString
continuation.onTermination = {[weak self] _ in
self?.onTermination(forKey: key)
}
continuations.updateValue(continuation, forKey: key)
storage.update(continuation, forKey: key)
}

private func removeContinuation(forKey key: String) {
continuations.removeValue(forKey: key)
storage.removeContinuation(forKey: key)
}

nonisolated private func onTermination(forKey key: String) {
Task(priority: .high) {
await removeContinuation(forKey: key)
if #available(iOS 26.0, macOS 26.0, *) {
Task.immediate {
await removeContinuation(forKey: key)
}
} else {
Task(priority: .high) {
await removeContinuation(forKey: key)
}
}
}

@available(*, deprecated, renamed: "finish", message: "The Stream will be automatically finished when deallocated. No need to call it manually.")
public nonisolated func nonIsolatedFinish() {
Task(priority: .high) {
await finish()
if #available(iOS 26.0, macOS 26.0, *) {
Task.immediate {
await finish()
}
} else {
Task(priority: .high) {
await finish()
}
}
}

public nonisolated func nonIsolatedEmit(_ element: Element) {
Task(priority: .high) {
await emit(element: element)
if #available(iOS 26.0, macOS 26.0, *) {
Task.immediate {
await emit(element: element)
}
} else {
Task(priority: .high) {
await emit(element: element)
}
}
}
}

//MARK: - Storage
extension StreamProducer {
private final class StreamStorage {

private var continuations: [String:Continuation] = [:]

func emit(element: Element) {
continuations.values.forEach({ $0.yield(element) })
}

func update(_ continuation: Continuation, forKey key: String) {
continuations.updateValue(continuation, forKey: key)
}

func removeContinuation(forKey key: String) {
continuations.removeValue(forKey: key)
}

func finish() {
continuations.values.forEach({ $0.finish() })
continuations.removeAll()
}

deinit {
finish()
}
}
}

Loading
Loading