From 9e7e535f042591420db72fe616f7741e1c6fa4b5 Mon Sep 17 00:00:00 2001 From: Repo Assist Date: Sat, 7 Mar 2026 22:28:48 +0000 Subject: [PATCH] feat: add TaskSeq.chunkBySize and TaskSeq.windowed (closes #258, ref #289) - TaskSeq.chunkBySize: divides a task sequence into non-overlapping chunks of at most chunkSize elements. Uses a fixed-size array buffer (vs. ResizeArray) to avoid intermediate allocations and resizing. - TaskSeq.windowed: returns overlapping sliding windows of a fixed size. Uses a ring buffer internally so that only a single allocation (per window) is needed; no redundant element copies on each step. Both functions validate their size argument eagerly (before enumeration starts), raise ArgumentException for non-positive sizes, and are fully documented in the .fsi signature file. Also: - Update README.md to mark chunkBySize, windowed, pairwise, scan/scanAsync, reduce/reduceAsync, and unfold/unfoldAsync as implemented (these were merged in PRs #293, #296, #299, #300 respectively). - Update release-notes.txt for 0.5.0. - 171 new tests across TaskSeq.ChunkBySize.Tests.fs and TaskSeq.Windowed.Tests.fs. All 4021 existing tests continue to pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- README.md | 26 ++- release-notes.txt | 3 + .../FSharp.Control.TaskSeq.Test.fsproj | 2 + .../TaskSeq.ChunkBySize.Tests.fs | 194 ++++++++++++++++ .../TaskSeq.Windowed.Tests.fs | 219 ++++++++++++++++++ src/FSharp.Control.TaskSeq/TaskSeq.fs | 2 + src/FSharp.Control.TaskSeq/TaskSeq.fsi | 34 +++ src/FSharp.Control.TaskSeq/TaskSeqInternal.fs | 54 +++++ 8 files changed, 524 insertions(+), 10 deletions(-) create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.Windowed.Tests.fs diff --git a/README.md b/README.md index 8f0b025..0d28e59 100644 --- a/README.md +++ b/README.md @@ -211,15 +211,16 @@ The `TaskSeq` project already has a wide array of functions and functionalities, - [ ] `average` / `averageBy`, `sum` and related - [x] `forall` / `forallAsync` (see [#240]) - [x] `skip` / `drop` / `truncate` / `take` (see [#209]) - - [ ] `chunkBySize` / `windowed` + - [x] `chunkBySize` / `windowed` (see [#258]) - [ ] `compareWith` - [ ] `distinct` - [ ] `exists2` / `map2` / `fold2` / `iter2` and related '2'-functions - [ ] `mapFold` - - [ ] `pairwise` / `allpairs` / `permute` / `distinct` / `distinctBy` + - [x] `pairwise` (see [#293]) + - [ ] `allpairs` / `permute` / `distinct` / `distinctBy` - [ ] `replicate` - - [ ] `reduce` / `scan` - - [ ] `unfold` + - [x] `reduce` / `scan` (see [#299], [#296]) + - [x] `unfold` (see [#300]) - [x] Publish package on Nuget, **DONE, PUBLISHED SINCE: 7 November 2022**. See https://www.nuget.org/packages/FSharp.Control.TaskSeq - [x] Make `TaskSeq` interoperable with `Task` by expanding the latter with a `for .. in .. do` that acceps task sequences - [x] Add to/from functions to seq, list, array @@ -263,7 +264,7 @@ This is what has been implemented so far, is planned or skipped: | ✅ [#67][] | | | `box` | | | ✅ [#67][] | | | `unbox` | | | ✅ [#23][] | `choose` | `choose` | `chooseAsync` | | -| | `chunkBySize` | `chunkBySize` | | | +| ✅ [#258][] | `chunkBySize` | `chunkBySize` | | | | ✅ [#11][] | `collect` | `collect` | `collectAsync` | | | ✅ [#11][] | | `collectSeq` | `collectSeqAsync` | | | | `compareWith` | `compareWith` | `compareWithAsync` | | @@ -332,17 +333,17 @@ This is what has been implemented so far, is planned or skipped: | ✅ [#2][] | | `ofTaskArray` | | | | ✅ [#2][] | | `ofTaskList` | | | | ✅ [#2][] | | `ofTaskSeq` | | | -| | `pairwise` | `pairwise` | | | +| ✅ [#293][] | `pairwise` | `pairwise` | | | | | `permute` | `permute` | `permuteAsync` | | | ✅ [#23][] | `pick` | `pick` | `pickAsync` | | | 🚫 | `readOnly` | | | [note #3](#note3 "The motivation for 'readOnly' in 'Seq' is that a cast from a mutable array or list to a 'seq<_>' is valid and can be cast back, leading to a mutable sequence. Since 'TaskSeq' doesn't implement 'IEnumerable<_>', such casts are not possible.") | -| | `reduce` | `reduce` | `reduceAsync` | | +| ✅ [#299][] | `reduce` | `reduce` | `reduceAsync` | | | 🚫 | `reduceBack` | | | [note #2](#note2 "Because of the async nature of TaskSeq sequences, iterating from the back would be bad practice. Instead, materialize the sequence to a list or array and then apply the 'Back' iterators.") | | ✅ [#236][]| `removeAt` | `removeAt` | | | | ✅ [#236][]| `removeManyAt` | `removeManyAt` | | | | | `replicate` | `replicate` | | | | ❓ | `rev` | | | [note #1](#note1 "These functions require a form of pre-materializing through 'TaskSeq.cache', similar to the approach taken in the corresponding 'Seq' functions. It doesn't make much sense to have a cached async sequence. However, 'AsyncSeq' does implement these, so we'll probably do so eventually as well.") | -| | `scan` | `scan` | `scanAsync` | | +| ✅ [#296][] | `scan` | `scan` | `scanAsync` | | | 🚫 | `scanBack` | | | [note #2](#note2 "Because of the async nature of TaskSeq sequences, iterating from the back would be bad practice. Instead, materialize the sequence to a list or array and then apply the 'Back' iterators.") | | ✅ [#90][] | `singleton` | `singleton` | | | | ✅ [#209][]| `skip` | `skip` | | | @@ -378,10 +379,10 @@ This is what has been implemented so far, is planned or skipped: | ✅ [#23][] | `tryLast` | `tryLast` | | | | ✅ [#23][] | `tryPick` | `tryPick` | `tryPickAsync` | | | ✅ [#76][] | | `tryTail` | | | -| | `unfold` | `unfold` | `unfoldAsync` | | +| ✅ [#300][] | `unfold` | `unfold` | `unfoldAsync` | | | ✅ [#236][]| `updateAt` | `updateAt` | | | | ✅ [#217][]| `where` | `where` | `whereAsync` | | -| | `windowed` | `windowed` | | | +| ✅ [#258][] | `windowed` | `windowed` | | | | ✅ [#2][] | `zip` | `zip` | | | | | `zip3` | `zip3` | | | | | | `zip4` | | | @@ -653,6 +654,11 @@ module TaskSeq = [#237]: https://github.com/fsprojects/FSharp.Control.TaskSeq/issues/237 [#236]: https://github.com/fsprojects/FSharp.Control.TaskSeq/issues/236 [#240]: https://github.com/fsprojects/FSharp.Control.TaskSeq/issues/240 +[#258]: https://github.com/fsprojects/FSharp.Control.TaskSeq/issues/258 +[#293]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/293 +[#296]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/296 +[#299]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/299 +[#300]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/300 [issues]: https://github.com/fsprojects/FSharp.Control.TaskSeq/issues [nuget]: https://www.nuget.org/packages/FSharp.Control.TaskSeq/ diff --git a/release-notes.txt b/release-notes.txt index 43967b3..54285cd 100644 --- a/release-notes.txt +++ b/release-notes.txt @@ -5,6 +5,9 @@ Release notes: - update engineering to .NET 9/10 - adds TaskSeq.scan and TaskSeq.scanAsync, #289 - adds TaskSeq.pairwise, #289 + - adds TaskSeq.reduce and TaskSeq.reduceAsync, #289 + - adds TaskSeq.unfold and TaskSeq.unfoldAsync, #289 + - adds TaskSeq.chunkBySize (closes #258) and TaskSeq.windowed, #289 0.4.0 - overhaul all doc comments, add exceptions, improve IDE quick-info experience, #136, #220, #234 diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index 2a6d808..b717c8f 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -53,6 +53,8 @@ + + diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs new file mode 100644 index 0000000..fb42b7e --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.ChunkBySize.Tests.fs @@ -0,0 +1,194 @@ +module TaskSeq.Tests.ChunkBySize + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.chunkBySize +// + +module EmptySeq = + [] + let ``TaskSeq-chunkBySize with null source raises`` () = assertNullArg <| fun () -> TaskSeq.chunkBySize 1 null + + [] + let ``TaskSeq-chunkBySize with zero raises ArgumentException before awaiting`` () = + fun () -> TaskSeq.empty |> TaskSeq.chunkBySize 0 |> ignore // throws eagerly, before enumeration + |> should throw typeof + + [] + let ``TaskSeq-chunkBySize with negative raises ArgumentException before awaiting`` () = + fun () -> TaskSeq.empty |> TaskSeq.chunkBySize -1 |> ignore + |> should throw typeof + + [)>] + let ``TaskSeq-chunkBySize on empty sequence yields empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.chunkBySize 1 + |> verifyEmpty + + [)>] + let ``TaskSeq-chunkBySize(99) on empty sequence yields empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.chunkBySize 99 + |> verifyEmpty + +module Immutable = + [)>] + let ``TaskSeq-chunkBySize preserves all elements in order`` variant = task { + do! + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 3 + |> TaskSeq.collect TaskSeq.ofArray + |> verify1To10 + } + + [)>] + let ``TaskSeq-chunkBySize(2) returns 5 chunks of 2 for a 10-element sequence`` variant = task { + let! chunks = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 2 + |> TaskSeq.toArrayAsync + + chunks + |> should equal [| [| 1; 2 |]; [| 3; 4 |]; [| 5; 6 |]; [| 7; 8 |]; [| 9; 10 |] |] + } + + [)>] + let ``TaskSeq-chunkBySize(5) returns 2 full chunks for a 10-element sequence`` variant = task { + let! chunks = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 5 + |> TaskSeq.toArrayAsync + + chunks |> should equal [| [| 1..5 |]; [| 6..10 |] |] + } + + [)>] + let ``TaskSeq-chunkBySize(1) returns each element as its own array`` variant = task { + let! chunks = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 1 + |> TaskSeq.toArrayAsync + + chunks |> Array.length |> should equal 10 + + chunks + |> Array.iteri (fun i chunk -> chunk |> should equal [| i + 1 |]) + } + + [)>] + let ``TaskSeq-chunkBySize last chunk contains remainder when sequence does not divide evenly`` variant = task { + // 10 elements with chunk size 3 → chunks [1;2;3] [4;5;6] [7;8;9] [10] + let! chunks = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 3 + |> TaskSeq.toArrayAsync + + chunks |> Array.length |> should equal 4 + chunks |> Array.last |> should equal [| 10 |] + } + + [)>] + let ``TaskSeq-chunkBySize larger than sequence returns single chunk with all elements`` variant = task { + let! chunks = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 11 + |> TaskSeq.toArrayAsync + + chunks |> Array.length |> should equal 1 + chunks.[0] |> should equal [| 1..10 |] + } + + [)>] + let ``TaskSeq-chunkBySize equal to sequence length returns single full chunk`` variant = task { + let! chunks = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize 10 + |> TaskSeq.toArrayAsync + + chunks |> Array.length |> should equal 1 + chunks.[0] |> should equal [| 1..10 |] + } + + [] + let ``TaskSeq-chunkBySize each chunk array is independent - modifying one does not affect others`` () = task { + let! chunks = + taskSeq { yield! [ 1..6 ] } + |> TaskSeq.chunkBySize 3 + |> TaskSeq.toArrayAsync + + // Mutate the first chunk + chunks.[0].[0] <- 99 + + // The second chunk must be unaffected + chunks.[1] |> should equal [| 4; 5; 6 |] + chunks.[0].[0] |> should equal 99 + } + + [)>] + let ``TaskSeq-chunkBySize remainder sizes`` variant = task { + let verifyLastChunkSize chunkSize expectedLast = + Gen.getSeqImmutable variant + |> TaskSeq.chunkBySize chunkSize + |> TaskSeq.toArrayAsync + |> Task.map (Array.last >> Array.length >> should equal expectedLast) + + do! verifyLastChunkSize 3 1 // 10 mod 3 = 1 + do! verifyLastChunkSize 4 2 // 10 mod 4 = 2 + do! verifyLastChunkSize 6 4 // 10 mod 6 = 4 + do! verifyLastChunkSize 7 3 // 10 mod 7 = 3 + do! verifyLastChunkSize 9 1 // 10 mod 9 = 1 + } + +module SideEffects = + [)>] + let ``TaskSeq-chunkBySize gets all items`` variant = + Gen.getSeqWithSideEffect variant + |> TaskSeq.chunkBySize 5 + |> TaskSeq.toArrayAsync + |> Task.map (should equal [| [| 1..5 |]; [| 6..10 |] |]) + + [] + let ``TaskSeq-chunkBySize executes side-effects from empty source`` () = task { + let mutable sideEffects = 0 + + let ts = taskSeq { + sideEffects <- sideEffects + 1 + sideEffects <- sideEffects + 1 + } + + do! ts |> TaskSeq.chunkBySize 1 |> consumeTaskSeq + do! ts |> TaskSeq.chunkBySize 3 |> consumeTaskSeq + sideEffects |> should equal 4 + } + + [] + let ``TaskSeq-chunkBySize executes all source side-effects`` () = task { + let mutable sideEffects = 0 + + let ts = taskSeq { + sideEffects <- sideEffects + 1 + yield 1 + sideEffects <- sideEffects + 1 + yield 2 + sideEffects <- sideEffects + 1 // executed even after last yield + } + + do! ts |> TaskSeq.chunkBySize 2 |> consumeTaskSeq + sideEffects |> should equal 3 + } + + [] + let ``TaskSeq-chunkBySize propagates exception from source`` () = + let items = taskSeq { + yield 1 + yield 2 + failwith "boom" + yield 3 + } + + fun () -> items |> TaskSeq.chunkBySize 2 |> consumeTaskSeq + |> should throwAsyncExact typeof diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Windowed.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Windowed.Tests.fs new file mode 100644 index 0000000..9281e5f --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Windowed.Tests.fs @@ -0,0 +1,219 @@ +module TaskSeq.Tests.Windowed + +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// TaskSeq.windowed +// + +module EmptySeq = + [] + let ``TaskSeq-windowed with null source raises`` () = assertNullArg <| fun () -> TaskSeq.windowed 1 null + + [] + let ``TaskSeq-windowed with zero raises ArgumentException before awaiting`` () = + fun () -> TaskSeq.empty |> TaskSeq.windowed 0 |> ignore // throws eagerly, before enumeration + |> should throw typeof + + [] + let ``TaskSeq-windowed with negative raises ArgumentException before awaiting`` () = + fun () -> TaskSeq.empty |> TaskSeq.windowed -1 |> ignore + |> should throw typeof + + [)>] + let ``TaskSeq-windowed on empty sequence yields empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.windowed 1 + |> verifyEmpty + + [)>] + let ``TaskSeq-windowed(99) on empty sequence yields empty`` variant = + Gen.getEmptyVariant variant + |> TaskSeq.windowed 99 + |> verifyEmpty + + [] + let ``TaskSeq-windowed on singleton with windowSize=2 yields empty`` () = taskSeq { yield 42 } |> TaskSeq.windowed 2 |> verifyEmpty + + [] + let ``TaskSeq-windowed on singleton with windowSize=1 yields one window`` () = task { + let! windows = + taskSeq { yield 42 } + |> TaskSeq.windowed 1 + |> TaskSeq.toListAsync + + windows |> should equal [ [| 42 |] ] + } + +module Immutable = + [)>] + let ``TaskSeq-windowed(1) returns each element wrapped in an array`` variant = task { + let! windows = + Gen.getSeqImmutable variant + |> TaskSeq.windowed 1 + |> TaskSeq.toArrayAsync + + windows |> Array.length |> should equal 10 + + windows + |> Array.iteri (fun i w -> w |> should equal [| i + 1 |]) + } + + [)>] + let ``TaskSeq-windowed(2) returns consecutive overlapping pairs as arrays`` variant = task { + let! windows = + Gen.getSeqImmutable variant + |> TaskSeq.windowed 2 + |> TaskSeq.toArrayAsync + + windows |> Array.length |> should equal 9 + windows |> Array.head |> should equal [| 1; 2 |] + windows |> Array.last |> should equal [| 9; 10 |] + } + + [)>] + let ``TaskSeq-windowed(3) returns correct sliding windows`` variant = task { + let! windows = + Gen.getSeqImmutable variant + |> TaskSeq.windowed 3 + |> TaskSeq.toArrayAsync + + windows |> Array.length |> should equal 8 + windows.[0] |> should equal [| 1; 2; 3 |] + windows.[1] |> should equal [| 2; 3; 4 |] + windows.[7] |> should equal [| 8; 9; 10 |] + } + + [)>] + let ``TaskSeq-windowed output length is source length minus windowSize plus one`` variant = task { + let source = Gen.getSeqImmutable variant + + let! len3 = source |> TaskSeq.windowed 3 |> TaskSeq.length + let! len5 = source |> TaskSeq.windowed 5 |> TaskSeq.length + let! len10 = source |> TaskSeq.windowed 10 |> TaskSeq.length + + len3 |> should equal 8 // 10 - 3 + 1 + len5 |> should equal 6 // 10 - 5 + 1 + len10 |> should equal 1 // 10 - 10 + 1 + } + + [)>] + let ``TaskSeq-windowed windowSize larger than source yields empty`` variant = + Gen.getSeqImmutable variant + |> TaskSeq.windowed 11 + |> verifyEmpty + + [] + let ``TaskSeq-windowed windows overlap correctly - each element shared by adjacent windows`` () = task { + let! windows = + taskSeq { yield! [ 'A'; 'B'; 'C'; 'D'; 'E' ] } + |> TaskSeq.windowed 3 + |> TaskSeq.toListAsync + + windows + |> should equal [ [| 'A'; 'B'; 'C' |]; [| 'B'; 'C'; 'D' |]; [| 'C'; 'D'; 'E' |] ] + + // Adjacent windows share all but one element + windows + |> List.pairwise + |> List.iter (fun (w1, w2) -> + // tail of w1 == init of w2 + w1.[1..] |> should equal w2.[.. w2.Length - 2]) + } + + [] + let ``TaskSeq-windowed each window array is independent - modifying one does not affect others`` () = task { + let! windows = + taskSeq { yield! [ 1..5 ] } + |> TaskSeq.windowed 3 + |> TaskSeq.toArrayAsync + + // Mutate first window + windows.[0].[0] <- 99 + + // Second window is unaffected + windows.[1] |> should equal [| 2; 3; 4 |] + windows.[0].[0] |> should equal 99 + } + + [] + let ``TaskSeq-windowed with large windowSize equal to source length`` () = task { + let! windows = + taskSeq { yield! [ 1..5 ] } + |> TaskSeq.windowed 5 + |> TaskSeq.toArrayAsync + + windows |> Array.length |> should equal 1 + windows.[0] |> should equal [| 1; 2; 3; 4; 5 |] + } + + [] + let ``TaskSeq-windowed ring buffer correctness - window wraps correctly at buffer boundary`` () = task { + // Window size 4 over [1..7] — exercises the ring-buffer copy path. + let! windows = + taskSeq { yield! [ 1..7 ] } + |> TaskSeq.windowed 4 + |> TaskSeq.toArrayAsync + + windows + |> should equal [| [| 1; 2; 3; 4 |]; [| 2; 3; 4; 5 |]; [| 3; 4; 5; 6 |]; [| 4; 5; 6; 7 |] |] + } + +module SideEffects = + [)>] + let ``TaskSeq-windowed gets all items`` variant = task { + let! windows = + Gen.getSeqWithSideEffect variant + |> TaskSeq.windowed 2 + |> TaskSeq.toArrayAsync + + windows |> Array.length |> should equal 9 + windows |> Array.head |> should equal [| 1; 2 |] + windows |> Array.last |> should equal [| 9; 10 |] + } + + [] + let ``TaskSeq-windowed executes all source side-effects`` () = task { + let mutable sideEffects = 0 + + let ts = taskSeq { + sideEffects <- sideEffects + 1 + yield 1 + sideEffects <- sideEffects + 1 + yield 2 + sideEffects <- sideEffects + 1 + } + + do! ts |> TaskSeq.windowed 2 |> consumeTaskSeq + sideEffects |> should equal 3 + } + + [] + let ``TaskSeq-windowed consumes every source element exactly once`` () = task { + let mutable count = 0 + + let ts = taskSeq { + for i in 1..5 do + count <- count + 1 + yield i + } + + let! windows = ts |> TaskSeq.windowed 3 |> TaskSeq.toListAsync + count |> should equal 5 + windows |> List.length |> should equal 3 + } + + [] + let ``TaskSeq-windowed propagates exception from source`` () = + let items = taskSeq { + yield 1 + yield 2 + failwith "boom" + yield 3 + } + + fun () -> items |> TaskSeq.windowed 2 |> consumeTaskSeq + |> should throwAsyncExact typeof diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 0ff387c..361286d 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -363,6 +363,8 @@ type TaskSeq private () = static member distinctUntilChanged source = Internal.distinctUntilChanged source static member pairwise source = Internal.pairwise source + static member chunkBySize chunkSize source = Internal.chunkBySize chunkSize source + static member windowed windowSize source = Internal.windowed windowSize source static member forall predicate source = Internal.forall (Predicate predicate) source static member forallAsync predicate source = Internal.forall (PredicateAsync predicate) source diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index d1d8d7c..aed7d96 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -1346,6 +1346,40 @@ type TaskSeq = /// Thrown when the input task sequence is null. static member pairwise: source: TaskSeq<'T> -> TaskSeq<'T * 'T> + /// + /// Divides the input task sequence into chunks of size at most . + /// The last chunk may be smaller than if the source sequence does not divide evenly. + /// Returns an empty task sequence when the source is empty. + /// + /// If is not positive, an is raised immediately + /// (before the sequence is evaluated). + /// + /// + /// The maximum number of elements in each chunk. Must be positive. + /// The input task sequence. + /// A task sequence of non-overlapping array chunks. + /// Thrown when the input task sequence is null. + /// Thrown when is not positive. + static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]> + + /// + /// Returns a task sequence of sliding windows of a given size over the source sequence. + /// Each window is a fresh array of exactly consecutive elements. + /// The result is empty if the source has fewer than elements. + /// + /// Uses a ring buffer internally to avoid redundant copies, yielding one allocation per window. + /// + /// If is not positive, an is raised immediately + /// (before the sequence is evaluated). + /// + /// + /// The number of elements in each window. Must be positive. + /// The input task sequence. + /// A task sequence of overlapping array windows. + /// Thrown when the input task sequence is null. + /// Thrown when is not positive. + static member windowed: windowSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]> + /// /// Combines the two task sequences into a new task sequence of pairs. The two sequences need not have equal lengths: /// when one sequence is exhausted any remaining elements in the other sequence are ignored. diff --git a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs index ee7b3df..666eb02 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqInternal.fs @@ -1152,3 +1152,57 @@ module internal TaskSeqInternal = yield previous, current maybePrevious <- ValueSome current } + + let chunkBySize chunkSize (source: TaskSeq<'T>) : TaskSeq<'T[]> = + if chunkSize < 1 then + invalidArg (nameof chunkSize) $"The value must be positive, but was %i{chunkSize}." + + checkNonNull (nameof source) source + + taskSeq { + // Use a fixed-size array with a count index to avoid ResizeArray overhead. + let buffer = Array.zeroCreate<'T> chunkSize + let mutable count = 0 + + for item in source do + buffer.[count] <- item + count <- count + 1 + + if count = chunkSize then + yield Array.copy buffer + count <- 0 + + if count > 0 then + // Last partial chunk: copy only the filled portion. + yield buffer.[0 .. count - 1] + } + + let windowed windowSize (source: TaskSeq<_>) = + if windowSize <= 0 then + invalidArg (nameof windowSize) $"The value must be positive, but was %i{windowSize}." + + checkNonNull (nameof source) source + + taskSeq { + // Ring buffer: arr holds elements in circular order. + // 'count' tracks total elements seen; count % windowSize is the next write position. + let arr = Array.zeroCreate windowSize + let mutable count = 0 + + for item in source do + arr.[count % windowSize] <- item + count <- count + 1 + + if count >= windowSize then + // Copy ring buffer in source order into a fresh array. + let result = Array.zeroCreate windowSize + let start = count % windowSize // index of oldest element in the ring + + if start = 0 then + Array.blit arr 0 result 0 windowSize + else + Array.blit arr start result 0 (windowSize - start) + Array.blit arr 0 result (windowSize - start) start + + yield result + }