diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 12a17a1..0f826c2 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -925,6 +925,34 @@ module AsyncSeq = | Choice1Of2 value -> return value | Choice2Of2 ex -> return raise ex }) } + + let mapAsyncUnorderedParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq { + use mb = MailboxProcessor.Start (fun _ -> async.Return()) + use sm = new SemaphoreSlim(parallelism) + let! err = + s + |> iterAsync (fun a -> async { + do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError + let! b = Async.StartChild (async { + try + let! result = f a + sm.Release() |> ignore + return Choice1Of2 result + with ex -> + sm.Release() |> ignore + return Choice2Of2 ex + }) + mb.Post (Some b) }) + |> Async.map (fun _ -> mb.Post None) + |> Async.StartChildAsTask + yield! + replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive)) + |> mapAsync (fun childAsync -> async { + let! result = childAsync + match result with + | Choice1Of2 value -> return value + | Choice2Of2 ex -> return raise ex }) + } #endif let chooseAsync f (source:AsyncSeq<'T>) = diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 7bb2017..a1afd9f 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -543,6 +543,14 @@ module AsyncSeq = /// Parallelism is bound by the ThreadPool. val mapAsyncUnorderedParallel : mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U> + /// Builds a new asynchronous sequence whose elements are generated by + /// applying the specified function to all elements of the input sequence, + /// with at most parallelism mapping operations running concurrently. + /// + /// The function is applied to elements in parallel (throttled), and results are emitted + /// in the order they complete (unordered), without preserving the original order. + val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U> + /// Applies a key-generating function to each element and returns an async sequence containing unique keys /// and async sequences containing elements corresponding to the key. /// diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 964722c..ffb9c10 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -1717,6 +1717,59 @@ let ``AsyncSeq.mapAsyncUnorderedParallel should not preserve order`` () = Assert.IsTrue(allPresent, "All input elements should be present in results") +[] +let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should produce all results`` () = + let input = [1; 2; 3; 4; 5] + let expected = [2; 4; 6; 8; 10] |> Set.ofList + + let actual = + input + |> AsyncSeq.ofSeq + |> AsyncSeq.mapAsyncUnorderedParallelThrottled 3 (fun x -> async { + do! Async.Sleep(10) + return x * 2 + }) + |> AsyncSeq.toListAsync + |> runTest + |> Set.ofList + + Assert.AreEqual(expected, actual) + +[] +let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should propagate handler exception`` () = + let res = + AsyncSeq.init 100L id + |> AsyncSeq.mapAsyncUnorderedParallelThrottled 10 (fun i -> async { + if i = 50L then return failwith "oh no" + else return i * 2L + }) + |> AsyncSeq.toListAsync + |> Async.Catch + |> (fun x -> Async.RunSynchronously (x, timeout = 10000)) + + match res with + | Choice2Of2 _ -> () + | Choice1Of2 _ -> Assert.Fail ("error expected") + +[] +let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () = + let count = ref 0 + let parallelism = 5 + + let result = + AsyncSeq.init 50L id + |> AsyncSeq.mapAsyncUnorderedParallelThrottled parallelism (fun i -> async { + let c = Interlocked.Increment count + if c > parallelism then + return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism) + do! Async.Sleep 5 + Interlocked.Decrement count |> ignore + return i * 2L }) + |> AsyncSeq.toListAsync + |> Async.RunSynchronously + + Assert.AreEqual(50, result.Length) + //[] //let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () = // let ls = List.init 500 id