Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,33 @@ module AsyncSeq =
let bufferByCount (bufferSize:int) (source:AsyncSeq<'T>) : AsyncSeq<'T[]> =
chunkBySize bufferSize source

let chunkByAsync (projection:'T -> Async<'Key>) (source:AsyncSeq<'T>) : AsyncSeq<'Key * 'T list> = asyncSeq {
use ie = source.GetEnumerator()
let! move = ie.MoveNext()
let b = ref move
if b.Value.IsSome then
let! key0 = projection b.Value.Value
let mutable currentKey = key0
let buffer = ResizeArray<'T>()
buffer.Add b.Value.Value
let! moveNext = ie.MoveNext()
b := moveNext
while b.Value.IsSome do
let! key = projection b.Value.Value
if key = currentKey then
buffer.Add b.Value.Value
else
yield (currentKey, buffer |> Seq.toList)
currentKey <- key
buffer.Clear()
buffer.Add b.Value.Value
let! moveNext = ie.MoveNext()
b := moveNext
yield (currentKey, buffer |> Seq.toList) }

let chunkBy (projection:'T -> 'Key) (source:AsyncSeq<'T>) : AsyncSeq<'Key * 'T list> =
chunkByAsync (projection >> async.Return) source

#if !FABLE_COMPILER
let toSortedSeq fn source =
toArrayAsync source |> Async.map fn |> Async.RunSynchronously
Expand Down
8 changes: 8 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,14 @@ module AsyncSeq =
[<Obsolete("Use AsyncSeq.chunkBySize instead")>]
val bufferByCount : bufferSize:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>

/// Groups consecutive elements of the async sequence that share the same key (as computed by an async projection)
/// and yields each group as a pair of the key and a list of elements.
val chunkByAsync<'T, 'Key when 'Key : equality> : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * 'T list>

/// Groups consecutive elements of the async sequence that share the same key (as computed by a projection)
/// and yields each group as a pair of the key and a list of elements.
val chunkBy<'T, 'Key when 'Key : equality> : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * 'T list>

#if !FABLE_COMPILER
/// Buffer items from the async sequence until a specified buffer size is reached or a specified amount of time is elapsed.
val bufferByCountAndTime : bufferSize:int -> timeoutMs:int -> source:AsyncSeq<'T> -> AsyncSeq<'T []>
Expand Down
32 changes: 32 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2611,3 +2611,35 @@ let ``AsyncSeqOp.FoldAsync with exception in folder should propagate`` () =
() // Expected
} |> Async.RunSynchronously

[<Test>]
let ``AsyncSeq.chunkBy empty sequence returns empty`` () =
let result = AsyncSeq.chunkBy id AsyncSeq.empty<int> |> AsyncSeq.toListSynchronously
Assert.AreEqual([], result)

[<Test>]
let ``AsyncSeq.chunkBy single element`` () =
let source = asyncSeq { yield 42 }
let result = AsyncSeq.chunkBy id source |> AsyncSeq.toListSynchronously
Assert.AreEqual([(42, [42])], result)

[<Test>]
let ``AsyncSeq.chunkBy groups consecutive equal keys`` () =
let source = asyncSeq { yield 1; yield 1; yield 2; yield 2; yield 1 }
let result = AsyncSeq.chunkBy id source |> AsyncSeq.toListSynchronously
Assert.AreEqual([(1, [1;1]); (2, [2;2]); (1, [1])], result)

[<Test>]
let ``AsyncSeq.chunkBy with projection`` () =
let source = asyncSeq { yield 1; yield 3; yield 2; yield 4; yield 5 }
let result = AsyncSeq.chunkBy (fun x -> x % 2 = 0) source |> AsyncSeq.toListSynchronously
Assert.AreEqual([(false, [1;3]); (true, [2;4]); (false, [5])], result)

[<Test>]
let ``AsyncSeq.chunkByAsync groups consecutive equal keys`` () =
async {
let source = asyncSeq { yield 1; yield 1; yield 2; yield 2 }
let result = AsyncSeq.chunkByAsync (fun x -> async { return x }) source |> AsyncSeq.toListSynchronously
Assert.AreEqual([(1, [1;1]); (2, [2;2])], result)
} |> Async.RunSynchronously