From 4c8d251f6b2718175aad77131ff044549a97868e Mon Sep 17 00:00:00 2001 From: GitHub Copilot Date: Sat, 21 Feb 2026 03:35:20 +0000 Subject: [PATCH] Add AsyncSeq.chunkBy and AsyncSeq.chunkByAsync Groups consecutive elements sharing the same key, yielding AsyncSeq<'Key * 'T list>. Mirrors the existing chunkBySize pattern. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/FSharp.Control.AsyncSeq/AsyncSeq.fs | 27 ++++++++++++++++ src/FSharp.Control.AsyncSeq/AsyncSeq.fsi | 8 +++++ .../AsyncSeqTests.fs | 32 +++++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index f749ea7..e73a4d1 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -1558,6 +1558,33 @@ module AsyncSeq = let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> = chunkBySize bufferSize source + let chunkByAsync (projection:'T -> Async<'Key>) (source:AsyncSeq<'T>) : AsyncSeq<'Key * 'T list> = asyncSeq { + use ie = source.GetEnumerator() + let! move = ie.MoveNext() + let b = ref move + if b.Value.IsSome then + let! key0 = projection b.Value.Value + let mutable currentKey = key0 + let buffer = ResizeArray<'T>() + buffer.Add b.Value.Value + let! moveNext = ie.MoveNext() + b := moveNext + while b.Value.IsSome do + let! key = projection b.Value.Value + if key = currentKey then + buffer.Add b.Value.Value + else + yield (currentKey, buffer |> Seq.toList) + currentKey <- key + buffer.Clear() + buffer.Add b.Value.Value + let! moveNext = ie.MoveNext() + b := moveNext + yield (currentKey, buffer |> Seq.toList) } + + let chunkBy (projection:'T -> 'Key) (source:AsyncSeq<'T>) : AsyncSeq<'Key * 'T list> = + chunkByAsync (projection >> async.Return) source + #if !FABLE_COMPILER let toSortedSeq fn source = toArrayAsync source |> Async.map fn |> Async.RunSynchronously diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index 15c1bf5..a759fb6 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -484,6 +484,14 @@ module AsyncSeq = [] val bufferByCount : bufferSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []> + /// Groups consecutive elements of the async sequence that share the same key (as computed by an async projection) + /// and yields each group as a pair of the key and a list of elements. + val chunkByAsync<'T, 'Key when 'Key : equality> : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * 'T list> + + /// Groups consecutive elements of the async sequence that share the same key (as computed by a projection) + /// and yields each group as a pair of the key and a list of elements. + val chunkBy<'T, 'Key when 'Key : equality> : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * 'T list> + #if !FABLE_COMPILER /// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed. val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []> diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index 6a44cf4..964722c 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -2611,3 +2611,35 @@ let ``AsyncSeqOp.FoldAsync with exception in folder should propagate`` () = () // Expected } |> Async.RunSynchronously +[] +let ``AsyncSeq.chunkBy empty sequence returns empty`` () = + let result = AsyncSeq.chunkBy id AsyncSeq.empty |> AsyncSeq.toListSynchronously + Assert.AreEqual([], result) + +[] +let ``AsyncSeq.chunkBy single element`` () = + let source = asyncSeq { yield 42 } + let result = AsyncSeq.chunkBy id source |> AsyncSeq.toListSynchronously + Assert.AreEqual([(42, [42])], result) + +[] +let ``AsyncSeq.chunkBy groups consecutive equal keys`` () = + let source = asyncSeq { yield 1; yield 1; yield 2; yield 2; yield 1 } + let result = AsyncSeq.chunkBy id source |> AsyncSeq.toListSynchronously + Assert.AreEqual([(1, [1;1]); (2, [2;2]); (1, [1])], result) + +[] +let ``AsyncSeq.chunkBy with projection`` () = + let source = asyncSeq { yield 1; yield 3; yield 2; yield 4; yield 5 } + let result = AsyncSeq.chunkBy (fun x -> x % 2 = 0) source |> AsyncSeq.toListSynchronously + Assert.AreEqual([(false, [1;3]); (true, [2;4]); (false, [5])], result) + +[] +let ``AsyncSeq.chunkByAsync groups consecutive equal keys`` () = + async { + let source = asyncSeq { yield 1; yield 1; yield 2; yield 2 } + let result = AsyncSeq.chunkByAsync (fun x -> async { return x }) source |> AsyncSeq.toListSynchronously + Assert.AreEqual([(1, [1;1]); (2, [2;2])], result) + } |> Async.RunSynchronously + +