Skip to content

Commit 49ab95a

Browse files
committed
Better Swift Concurrenty Integration
1 parent f46b963 commit 49ab95a

File tree

2 files changed

+135
-0
lines changed

2 files changed

+135
-0
lines changed

Sources/Signal.swift

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,11 +221,39 @@ extension Signal where Error == Swift.Error {
221221
public init(catching body: @escaping () throws -> Element) {
222222
self = Signal(result: Result(catching: body))
223223
}
224+
225+
/// Create a new signal by awaiting an async closure.
226+
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
227+
public init(await produce: @escaping () async throws -> Element) {
228+
self.init { observer in
229+
let task = Task {
230+
do {
231+
let element = try await produce()
232+
observer.receive(lastElement: element)
233+
} catch {
234+
observer.receive(completion: .failure(error))
235+
}
236+
}
237+
return BlockDisposable(task.cancel)
238+
}
239+
}
224240
}
225241

226242
#if !XCFRAMEWORK
227243

228244
extension Signal where Error == Never {
245+
246+
/// Create a new signal by awaiting an async closure.
247+
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
248+
public init(await produce: @escaping () async -> Element) {
249+
self.init { observer in
250+
let task = Task {
251+
let element = await produce()
252+
observer.receive(lastElement: element)
253+
}
254+
return BlockDisposable(task.cancel)
255+
}
256+
}
229257

230258
/// Create a new signal and assign its next element observer to the given variable.
231259
/// Calling the closure assigned to the varaible will send the next element on the signal.

Sources/SignalProtocol+Async.swift

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
//
2+
// The MIT License (MIT)
3+
//
4+
// Copyright (c) 2016-2019 Srdan Rasic (@srdanrasic)
5+
//
6+
// Permission is hereby granted, free of charge, to any person obtaining a copy
7+
// of this software and associated documentation files (the "Software"), to deal
8+
// in the Software without restriction, including without limitation the rights
9+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
// copies of the Software, and to permit persons to whom the Software is
11+
// furnished to do so, subject to the following conditions:
12+
//
13+
// The above copyright notice and this permission notice shall be included in
14+
// all copies or substantial portions of the Software.
15+
//
16+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22+
// THE SOFTWARE.
23+
//
24+
25+
import Foundation
26+
27+
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
28+
extension SignalProtocol where Error == Never {
29+
30+
public func toAsyncStream() -> AsyncStream<Element> {
31+
AsyncStream<Element> { continuation in
32+
let disposable = self.observe { event in
33+
switch event {
34+
case .next(let element):
35+
continuation.yield(element)
36+
case .completed:
37+
continuation.finish()
38+
}
39+
}
40+
continuation.onTermination = { @Sendable _ in
41+
disposable.dispose()
42+
}
43+
}
44+
}
45+
}
46+
47+
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
48+
extension SignalProtocol where Error == Swift.Error {
49+
50+
public func toAsyncThrowingStream() -> AsyncThrowingStream<Element, Error> {
51+
AsyncThrowingStream<Element, Error> { continuation in
52+
let disposable = self.observe { event in
53+
switch event {
54+
case .next(let element):
55+
continuation.yield(element)
56+
case .failed(let error):
57+
continuation.finish(throwing: error)
58+
case .completed:
59+
continuation.finish()
60+
}
61+
}
62+
continuation.onTermination = { @Sendable _ in
63+
disposable.dispose()
64+
}
65+
}
66+
}
67+
}
68+
69+
@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
70+
extension AsyncSequence {
71+
72+
public func toSignal() -> Signal<Element, Swift.Error> {
73+
Signal<Element, Swift.Error> { observer in
74+
let task = Task {
75+
do {
76+
for try await element in self {
77+
observer.receive(element)
78+
}
79+
observer.receive(completion: .finished)
80+
} catch {
81+
observer.receive(completion: .failure(error))
82+
}
83+
}
84+
return BlockDisposable(task.cancel)
85+
}
86+
}
87+
}
88+
89+
@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, *)
90+
extension AsyncSequence {
91+
92+
public func toFullyTypedSignal() -> Signal<Element, Failure> {
93+
Signal<Element, Failure> { observer in
94+
let task = Task {
95+
do {
96+
for try await element in self {
97+
observer.receive(element)
98+
}
99+
observer.receive(completion: .finished)
100+
} catch let error as Failure {
101+
observer.receive(completion: .failure(error))
102+
}
103+
}
104+
return BlockDisposable(task.cancel)
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)