diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
index 12a17a1..0f826c2 100644
--- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
+++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs
@@ -925,6 +925,34 @@ module AsyncSeq =
| Choice1Of2 value -> return value
| Choice2Of2 ex -> return raise ex })
}
+
+ let mapAsyncUnorderedParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
+ use mb = MailboxProcessor.Start (fun _ -> async.Return())
+ use sm = new SemaphoreSlim(parallelism)
+ let! err =
+ s
+ |> iterAsync (fun a -> async {
+ do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
+ let! b = Async.StartChild (async {
+ try
+ let! result = f a
+ sm.Release() |> ignore
+ return Choice1Of2 result
+ with ex ->
+ sm.Release() |> ignore
+ return Choice2Of2 ex
+ })
+ mb.Post (Some b) })
+ |> Async.map (fun _ -> mb.Post None)
+ |> Async.StartChildAsTask
+ yield!
+ replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
+ |> mapAsync (fun childAsync -> async {
+ let! result = childAsync
+ match result with
+ | Choice1Of2 value -> return value
+ | Choice2Of2 ex -> return raise ex })
+ }
#endif
let chooseAsync f (source:AsyncSeq<'T>) =
diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
index 7bb2017..a1afd9f 100644
--- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
+++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
@@ -543,6 +543,14 @@ module AsyncSeq =
/// Parallelism is bound by the ThreadPool.
val mapAsyncUnorderedParallel : mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
+ /// Builds a new asynchronous sequence whose elements are generated by
+ /// applying the specified function to all elements of the input sequence,
+ /// with at most parallelism mapping operations running concurrently.
+ ///
+ /// The function is applied to elements in parallel (throttled), and results are emitted
+ /// in the order they complete (unordered), without preserving the original order.
+ val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
+
/// Applies a key-generating function to each element and returns an async sequence containing unique keys
/// and async sequences containing elements corresponding to the key.
///
diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
index 964722c..ffb9c10 100644
--- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
+++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
@@ -1717,6 +1717,59 @@ let ``AsyncSeq.mapAsyncUnorderedParallel should not preserve order`` () =
Assert.IsTrue(allPresent, "All input elements should be present in results")
+[]
+let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should produce all results`` () =
+ let input = [1; 2; 3; 4; 5]
+ let expected = [2; 4; 6; 8; 10] |> Set.ofList
+
+ let actual =
+ input
+ |> AsyncSeq.ofSeq
+ |> AsyncSeq.mapAsyncUnorderedParallelThrottled 3 (fun x -> async {
+ do! Async.Sleep(10)
+ return x * 2
+ })
+ |> AsyncSeq.toListAsync
+ |> runTest
+ |> Set.ofList
+
+ Assert.AreEqual(expected, actual)
+
+[]
+let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should propagate handler exception`` () =
+ let res =
+ AsyncSeq.init 100L id
+ |> AsyncSeq.mapAsyncUnorderedParallelThrottled 10 (fun i -> async {
+ if i = 50L then return failwith "oh no"
+ else return i * 2L
+ })
+ |> AsyncSeq.toListAsync
+ |> Async.Catch
+ |> (fun x -> Async.RunSynchronously (x, timeout = 10000))
+
+ match res with
+ | Choice2Of2 _ -> ()
+ | Choice1Of2 _ -> Assert.Fail ("error expected")
+
+[]
+let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () =
+ let count = ref 0
+ let parallelism = 5
+
+ let result =
+ AsyncSeq.init 50L id
+ |> AsyncSeq.mapAsyncUnorderedParallelThrottled parallelism (fun i -> async {
+ let c = Interlocked.Increment count
+ if c > parallelism then
+ return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
+ do! Async.Sleep 5
+ Interlocked.Decrement count |> ignore
+ return i * 2L })
+ |> AsyncSeq.toListAsync
+ |> Async.RunSynchronously
+
+ Assert.AreEqual(50, result.Length)
+
//[]
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
// let ls = List.init 500 id