diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs index 3be0452..47d91fa 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fs +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fs @@ -19,14 +19,73 @@ open System.Threading.Channels // ---------------------------------------------------------------------------- -type IAsyncEnumerator<'T> = +/// Internal pull-based enumerator; not part of the public API. +/// Use AsyncSeq<'T> = System.Collections.Generic.IAsyncEnumerable<'T> as the public type. +[] +type IAsyncSeqEnumerator<'T> = abstract MoveNext : unit -> Async<'T option> inherit IDisposable -type IAsyncEnumerable<'T> = - abstract GetEnumerator : unit -> IAsyncEnumerator<'T> - -type AsyncSeq<'T> = IAsyncEnumerable<'T> +#if FABLE_COMPILER +/// AsyncSeq<'T> for Fable: a library-specific interface that avoids ValueTask. +[] +type AsyncSeq<'T> = + abstract GetEnumerator : unit -> IAsyncSeqEnumerator<'T> + +/// Adapter: wraps an internal pull-enumerator factory into an AsyncSeq<'T>. +[] +type AsyncSeqImpl<'T>(getEnum: unit -> IAsyncSeqEnumerator<'T>) = + member _.GetInternalEnumerator() = getEnum() + interface AsyncSeq<'T> with + member _.GetEnumerator() = getEnum() +#else +/// AsyncSeq<'T> is now the BCL IAsyncEnumerable<'T>. +type AsyncSeq<'T> = System.Collections.Generic.IAsyncEnumerable<'T> + +/// Adapter: wraps an internal pull-enumerator factory into a BCL IAsyncEnumerable<'T>. +/// This is the canonical way to create AsyncSeq<'T> values from internal generators. +/// Not part of the public API (hidden by .fsi file). +[] +type AsyncSeqImpl<'T>(getEnum: unit -> IAsyncSeqEnumerator<'T>) = + member _.GetInternalEnumerator() = getEnum() + interface System.Collections.Generic.IAsyncEnumerable<'T> with + member _.GetAsyncEnumerator(ct) = + let inner = getEnum() + let mutable current = Unchecked.defaultof<'T> + { new System.Collections.Generic.IAsyncEnumerator<'T> with + member _.Current = current + member _.MoveNextAsync() = + Async.StartAsTask( + async { + let! res = inner.MoveNext() + match res with + | Some v -> current <- v; return true + | None -> return false + }, cancellationToken=ct) |> System.Threading.Tasks.ValueTask + member _.DisposeAsync() = + inner.Dispose() + System.Threading.Tasks.ValueTask() } + + +/// Extension adding GetEnumerator() to AsyncSeq<'T> for internal use. +/// This preserves internal pull-based iteration without touching all call sites. +[] +module AsyncSeqEnumeratorExtensions = + type System.Collections.Generic.IAsyncEnumerable<'T> with + /// Gets an internal pull-based enumerator. For internal use only. + member this.GetEnumerator() : IAsyncSeqEnumerator<'T> = + match this with + | :? AsyncSeqImpl<'T> as x -> x.GetInternalEnumerator() + | _ -> + let e = this.GetAsyncEnumerator(System.Threading.CancellationToken.None) + { new IAsyncSeqEnumerator<'T> with + member _.MoveNext() = async { + let! moved = e.MoveNextAsync().AsTask() |> Async.AwaitTask + if moved then return Some e.Current + else return None } + interface System.IDisposable with + member _.Dispose() = e.DisposeAsync() |> ignore } +#endif #if !FABLE_COMPILER type AsyncSeqSrc<'a> = private { tail : AsyncSeqSrcNode<'a> ref } @@ -222,7 +281,7 @@ module AsyncGenerator = let mutable g = g let mutable fin = false member __.Generator = g - interface IAsyncEnumerator<'a> with + interface IAsyncSeqEnumerator<'a> with member x.MoveNext () = async { let! step = appG g match step with @@ -233,13 +292,13 @@ module AsyncGenerator = return Some a | Goto next -> g <- next - return! (x :> IAsyncEnumerator<_>).MoveNext() } + return! (x :> IAsyncSeqEnumerator<_>).MoveNext() } member __.Dispose () = disposeG g /// Converts an enumerator to a generator. /// The resulting generator will either yield or stop. - type AsyncEnumeratorGenerator<'a> (enum:IAsyncEnumerator<'a>) = + type AsyncEnumeratorGenerator<'a> (enum:IAsyncSeqEnumerator<'a>) = member __.Enumerator = enum interface AsyncGenerator<'a> with member __.Apply () = async { @@ -251,23 +310,21 @@ module AsyncGenerator = return Stop } member __.Disposer = Some ((fun () -> (enum :> IDisposable).Dispose())) - let enumeratorFromGenerator (g:AsyncGenerator<'a>) : IAsyncEnumerator<'a> = + let enumeratorFromGenerator (g:AsyncGenerator<'a>) : IAsyncSeqEnumerator<'a> = match g with | :? AsyncEnumeratorGenerator<'a> as g -> g.Enumerator | _ -> (new AsyncGeneratorEnumerator<_>(g) :> _) - let generatorFromEnumerator (e:IAsyncEnumerator<'a>) : AsyncGenerator<'a> = + let generatorFromEnumerator (e:IAsyncSeqEnumerator<'a>) : AsyncGenerator<'a> = match e with | :? AsyncGeneratorEnumerator<'a> as e -> e.Generator | _ -> (new AsyncEnumeratorGenerator<_>(e) :> _) let delay (f:unit -> AsyncSeq<'T>) : AsyncSeq<'T> = - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = f().GetEnumerator() } + AsyncSeqImpl(fun () -> (f()).GetEnumerator()) :> AsyncSeq<'T> - let emitEnum (e:IAsyncEnumerator<'a>) : AsyncSeq<'a> = - { new IAsyncEnumerable<_> with - member __.GetEnumerator () = e } + let emitEnum (e:IAsyncSeqEnumerator<'a>) : AsyncSeq<'a> = + AsyncSeqImpl(fun () -> e) :> AsyncSeq<'a> let fromGeneratorDelay (f:unit -> AsyncGenerator<'a>) : AsyncSeq<'a> = delay (fun () -> emitEnum (enumeratorFromGenerator (f ()))) @@ -298,7 +355,7 @@ module AsyncSeqOp = let mutable currentState = init let mutable disposed = false - interface IAsyncEnumerator<'T> with + interface IAsyncSeqEnumerator<'T> with member __.MoveNext () : Async<'T option> = if disposed then async.Return None else async { @@ -357,9 +414,16 @@ module AsyncSeqOp = | None -> return None } new UnfoldAsyncEnumerator<'S, 'U> (h, init) :> _ - interface IAsyncEnumerable<'T> with - member __.GetEnumerator () = - new OptimizedUnfoldEnumerator<'S, 'T>(f, init) :> IAsyncEnumerator<'T> +#if FABLE_COMPILER + interface AsyncSeq<'T> with + member __.GetEnumerator() = + new OptimizedUnfoldEnumerator<'S, 'T>(f, init) :> IAsyncSeqEnumerator<'T> +#else + interface System.Collections.Generic.IAsyncEnumerable<'T> with + member __.GetAsyncEnumerator(ct) = + (AsyncSeqImpl(fun () -> new OptimizedUnfoldEnumerator<'S, 'T>(f, init) :> IAsyncSeqEnumerator<'T>) + :> System.Collections.Generic.IAsyncEnumerable<'T>).GetAsyncEnumerator(ct) +#endif @@ -369,89 +433,79 @@ module AsyncSeq = [] let empty<'T> : AsyncSeq<'T> = - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = - { new IAsyncEnumerator<'T> with - member x.MoveNext() = async { return None } - member x.Dispose() = () } } + AsyncSeqImpl(fun () -> + { new IAsyncSeqEnumerator<'T> with + member _.MoveNext() = async { return None } + interface System.IDisposable with + member _.Dispose() = () }) :> AsyncSeq<'T> let emptyAsync<'T> (action : Async) : AsyncSeq<'T> = - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = - { new IAsyncEnumerator<'T> with - member x.MoveNext() = - async { - do! action - return None - } - member x.Dispose() = () } } + AsyncSeqImpl(fun () -> + { new IAsyncSeqEnumerator<'T> with + member _.MoveNext() = async { + do! action + return None } + interface System.IDisposable with + member _.Dispose() = () }) :> AsyncSeq<'T> let singleton (v:'T) : AsyncSeq<'T> = - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = - let state = ref 0 - { new IAsyncEnumerator<'T> with - member x.MoveNext() = async { let res = state.Value = 0 - incr state; - return (if res then Some v else None) } - member x.Dispose() = () } } + AsyncSeqImpl(fun () -> + let state = ref 0 + { new IAsyncSeqEnumerator<'T> with + member _.MoveNext() = async { + let res = state.Value = 0 + incr state + return (if res then Some v else None) } + interface System.IDisposable with + member _.Dispose() = () }) :> AsyncSeq<'T> let append (inp1: AsyncSeq<'T>) (inp2: AsyncSeq<'T>) : AsyncSeq<'T> = // Optimized append implementation that doesn't create generator chains // This fixes the memory leak issue in Issue #35 - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = - let mutable currentEnumerator : IAsyncEnumerator<'T> option = None - let mutable useSecond = false - { new IAsyncEnumerator<'T> with - member x.MoveNext() = async { - match currentEnumerator with - | None -> - // Start with the first sequence - let enum1 = inp1.GetEnumerator() - currentEnumerator <- Some enum1 - return! x.MoveNext() - | Some enum when not useSecond -> - // Try to get next element from first sequence - let! result = enum.MoveNext() - match result with - | Some v -> return Some v - | None -> - // First sequence is exhausted, switch to second - dispose enum - let enum2 = inp2.GetEnumerator() - currentEnumerator <- Some enum2 - useSecond <- true - return! x.MoveNext() - | Some enum -> - // Get elements from second sequence - return! enum.MoveNext() - } - member x.Dispose() = - match currentEnumerator with - | Some enum -> dispose enum - | None -> () - } - } + AsyncSeqImpl(fun () -> + let mutable currentEnumerator : IAsyncSeqEnumerator<'T> option = None + let mutable useSecond = false + let rec moveNext (self: IAsyncSeqEnumerator<'T>) = async { + match currentEnumerator with + | None -> + let enum1 = inp1.GetEnumerator() + currentEnumerator <- Some enum1 + return! moveNext self + | Some enum when not useSecond -> + let! result = enum.MoveNext() + match result with + | Some v -> return Some v + | None -> + dispose enum + let enum2 = inp2.GetEnumerator() + currentEnumerator <- Some enum2 + useSecond <- true + return! moveNext self + | Some enum -> + return! enum.MoveNext() + } + { new IAsyncSeqEnumerator<'T> with + member x.MoveNext() = moveNext x + interface System.IDisposable with + member _.Dispose() = + match currentEnumerator with + | Some enum -> dispose enum + | None -> () }) :> AsyncSeq<'T> let inline delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> = AsyncGenerator.delay f let bindAsync (f:'T -> AsyncSeq<'U>) (inp:Async<'T>) : AsyncSeq<'U> = - { new IAsyncEnumerable<'U> with - member x.GetEnumerator () = - { new AsyncGenerator.AsyncGenerator<'U> with - member x.Apply () = async { - let! v = inp - let cont = + AsyncSeqImpl(fun () -> + { new AsyncGenerator.AsyncGenerator<'U> with + member _.Apply() = async { + let! v = inp + let cont = (f v).GetEnumerator() |> AsyncGenerator.generatorFromEnumerator - return AsyncGenerator.Goto cont - } - member x.Disposer = None - } - |> AsyncGenerator.enumeratorFromGenerator - } + return AsyncGenerator.Goto cont } + member _.Disposer = None } + |> AsyncGenerator.enumeratorFromGenerator) :> AsyncSeq<'U> @@ -475,7 +529,7 @@ module AsyncSeq = let asyncSeq = new AsyncSeqBuilder() - let emitEnumerator (ie: IAsyncEnumerator<'T>) = asyncSeq { + let emitEnumerator (ie: IAsyncSeqEnumerator<'T>) = asyncSeq { let! moven = ie.MoveNext() let b = ref moven while b.Value.IsSome do @@ -486,17 +540,16 @@ module AsyncSeq = [] type TryWithState<'T> = | NotStarted of AsyncSeq<'T> - | HaveBodyEnumerator of IAsyncEnumerator<'T> - | HaveHandlerEnumerator of IAsyncEnumerator<'T> + | HaveBodyEnumerator of IAsyncSeqEnumerator<'T> + | HaveHandlerEnumerator of IAsyncSeqEnumerator<'T> | Finished /// Implements the 'TryWith' functionality for computation builder let tryWith (inp: AsyncSeq<'T>) (handler : exn -> AsyncSeq<'T>) : AsyncSeq<'T> = // Note: this is put outside the object deliberately, so the object doesn't permanently capture inp1 and inp2 - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = - let state = ref (TryWithState.NotStarted inp) - { new IAsyncEnumerator<'T> with + AsyncSeqImpl(fun () -> + let state = ref (TryWithState.NotStarted inp) + { new IAsyncSeqEnumerator<'T> with member x.MoveNext() = async { match !state with | TryWithState.NotStarted inp -> @@ -548,22 +601,21 @@ module AsyncSeq = | TryWithState.HaveBodyEnumerator e | TryWithState.HaveHandlerEnumerator e -> state := TryWithState.Finished dispose e - | _ -> () } } + | _ -> () }) :> AsyncSeq<'T> [] type TryFinallyState<'T> = | NotStarted of AsyncSeq<'T> - | HaveBodyEnumerator of IAsyncEnumerator<'T> + | HaveBodyEnumerator of IAsyncSeqEnumerator<'T> | Finished // This pushes the handler through all the async computations // The (synchronous) compensation is run when the Dispose() is called let tryFinally (inp: AsyncSeq<'T>) (compensation : unit -> unit) : AsyncSeq<'T> = - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = - let state = ref (TryFinallyState.NotStarted inp) - { new IAsyncEnumerator<'T> with + AsyncSeqImpl(fun () -> + let state = ref (TryFinallyState.NotStarted inp) + { new IAsyncSeqEnumerator<'T> with member x.MoveNext() = async { match !state with | TryFinallyState.NotStarted inp -> @@ -586,21 +638,21 @@ module AsyncSeq = state := TryFinallyState.Finished dispose e compensation() - | _ -> () } } + | _ -> () }) :> AsyncSeq<'T> [] type CollectState<'T,'U> = | NotStarted of AsyncSeq<'T> - | HaveInputEnumerator of IAsyncEnumerator<'T> - | HaveInnerEnumerator of IAsyncEnumerator<'T> * IAsyncEnumerator<'U> + | HaveInputEnumerator of IAsyncSeqEnumerator<'T> + | HaveInnerEnumerator of IAsyncSeqEnumerator<'T> * IAsyncSeqEnumerator<'U> | Finished // Optimized collect implementation using direct field access instead of ref cells type OptimizedCollectEnumerator<'T, 'U>(f: 'T -> AsyncSeq<'U>, inp: AsyncSeq<'T>) = // Mutable fields instead of ref cells to reduce allocations - let mutable inputEnumerator: IAsyncEnumerator<'T> option = None - let mutable innerEnumerator: IAsyncEnumerator<'U> option = None + let mutable inputEnumerator: IAsyncSeqEnumerator<'T> option = None + let mutable innerEnumerator: IAsyncSeqEnumerator<'U> option = None let mutable disposed = false // Tail-recursive optimization to avoid deep continuation chains @@ -636,7 +688,7 @@ module AsyncSeq = return! moveNextLoop () } - interface IAsyncEnumerator<'U> with + interface IAsyncSeqEnumerator<'U> with member _.MoveNext() = moveNextLoop () member _.Dispose() = if not disposed then @@ -649,9 +701,7 @@ module AsyncSeq = | None -> () let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> = - { new IAsyncEnumerable<'U> with - member _.GetEnumerator() = - new OptimizedCollectEnumerator<'T, 'U>(f, inp) :> IAsyncEnumerator<'U> } + AsyncSeqImpl(fun () -> new OptimizedCollectEnumerator<'T, 'U>(f, inp) :> IAsyncSeqEnumerator<'U>) :> AsyncSeq<'U> // let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> = // AsyncGenerator.collect f inp @@ -660,15 +710,14 @@ module AsyncSeq = type CollectSeqState<'T,'U> = | NotStarted of seq<'T> | HaveInputEnumerator of IEnumerator<'T> - | HaveInnerEnumerator of IEnumerator<'T> * IAsyncEnumerator<'U> + | HaveInnerEnumerator of IEnumerator<'T> * IAsyncSeqEnumerator<'U> | Finished // Like collect, but the input is a sequence, where no bind is required on each step of the enumeration let collectSeq (f: 'T -> AsyncSeq<'U>) (inp: seq<'T>) : AsyncSeq<'U> = - { new IAsyncEnumerable<'U> with - member x.GetEnumerator() = - let state = ref (CollectSeqState.NotStarted inp) - { new IAsyncEnumerator<'U> with + AsyncSeqImpl(fun () -> + let state = ref (CollectSeqState.NotStarted inp) + { new IAsyncSeqEnumerator<'U> with member x.MoveNext() = async { match !state with | CollectSeqState.NotStarted inp -> @@ -705,7 +754,7 @@ module AsyncSeq = dispose e2 dispose e1 x.Dispose() - | _ -> () } } + | _ -> () }) :> AsyncSeq<'U> [] type MapState<'T> = @@ -714,10 +763,9 @@ module AsyncSeq = | Finished let ofSeq (inp: seq<'T>) : AsyncSeq<'T> = - { new IAsyncEnumerable<'T> with - member x.GetEnumerator() = - let state = ref (MapState.NotStarted inp) - { new IAsyncEnumerator<'T> with + AsyncSeqImpl(fun () -> + let state = ref (MapState.NotStarted inp) + { new IAsyncSeqEnumerator<'T> with member x.MoveNext() = async { match !state with | MapState.NotStarted inp -> @@ -737,10 +785,10 @@ module AsyncSeq = | MapState.HaveEnumerator e -> state := MapState.Finished dispose e - | _ -> () } } + | _ -> () }) :> AsyncSeq<'T> // Optimized iterAsync implementation to reduce allocations - type internal OptimizedIterAsyncEnumerator<'T>(enumerator: IAsyncEnumerator<'T>, f: 'T -> Async) = + type internal OptimizedIterAsyncEnumerator<'T>(enumerator: IAsyncSeqEnumerator<'T>, f: 'T -> Async) = let mutable disposed = false member _.IterateAsync() = @@ -761,7 +809,7 @@ module AsyncSeq = enumerator.Dispose() // Optimized iteriAsync implementation with direct tail recursion - type internal OptimizedIteriAsyncEnumerator<'T>(enumerator: IAsyncEnumerator<'T>, f: int -> 'T -> Async) = + type internal OptimizedIteriAsyncEnumerator<'T>(enumerator: IAsyncSeqEnumerator<'T>, f: int -> 'T -> Async) = let mutable disposed = false member _.IterateAsync() = @@ -855,10 +903,10 @@ module AsyncSeq = // Additional combinators (implemented as async/asyncSeq computations) // Optimized mapAsync enumerator that avoids computation builder overhead - type private OptimizedMapAsyncEnumerator<'T, 'TResult>(source: IAsyncEnumerator<'T>, f: 'T -> Async<'TResult>) = + type private OptimizedMapAsyncEnumerator<'T, 'TResult>(source: IAsyncSeqEnumerator<'T>, f: 'T -> Async<'TResult>) = let mutable disposed = false - interface IAsyncEnumerator<'TResult> with + interface IAsyncSeqEnumerator<'TResult> with member _.MoveNext() = async { let! moveResult = source.MoveNext() match moveResult with @@ -877,9 +925,7 @@ module AsyncSeq = match source with | :? AsyncSeqOp<'T> as source -> source.MapAsync f | _ -> - { new IAsyncEnumerable<'TResult> with - member _.GetEnumerator() = - new OptimizedMapAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncEnumerator<'TResult> } + AsyncSeqImpl(fun () -> new OptimizedMapAsyncEnumerator<'T, 'TResult>(source.GetEnumerator(), f) :> IAsyncSeqEnumerator<'TResult>) :> AsyncSeq<'TResult> let mapiAsync f (source : AsyncSeq<'T>) : AsyncSeq<'TResult> = asyncSeq { let i = ref 0L @@ -1405,27 +1451,21 @@ module AsyncSeq = #endif let takeWhileInclusive (f : 'a -> bool) (s : AsyncSeq<'a>) : AsyncSeq<'a> = - { new IAsyncEnumerable<'a> with - member __.GetEnumerator() = - let en = s.GetEnumerator() - let fin = ref false - { new IAsyncEnumerator<'a> with - - member __.MoveNext() = - async { - if !fin then return None - else - let! next = en.MoveNext() - match next with - | None -> return None - | Some a -> - if f a then return Some a - else - fin := true - return Some a - } - - member __.Dispose() = en.Dispose() } } + AsyncSeqImpl(fun () -> + let en = s.GetEnumerator() + let fin = ref false + { new IAsyncSeqEnumerator<'a> with + member _.MoveNext() = async { + if !fin then return None + else + let! next = en.MoveNext() + match next with + | None -> return None + | Some a -> + if f a then return Some a + else fin := true; return Some a } + interface System.IDisposable with + member _.Dispose() = en.Dispose() }) :> AsyncSeq<'a> let skipWhileAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq { use ie = source.GetEnumerator() @@ -1698,7 +1738,7 @@ module AsyncSeq = yield! loop (Some next, None) } yield! loop (None, None) } - let private mergeChoiceEnum (ie1:IAsyncEnumerator<'T1>) (ie2:IAsyncEnumerator<'T2>) : AsyncSeq> = asyncSeq { + let private mergeChoiceEnum (ie1:IAsyncSeqEnumerator<'T1>) (ie2:IAsyncSeqEnumerator<'T2>) : AsyncSeq> = asyncSeq { let! move1T = Async.StartChildAsTask (ie1.MoveNext()) let! move2T = Async.StartChildAsTask (ie2.MoveNext()) let! move = Async.chooseTasks move1T move2T @@ -1900,49 +1940,16 @@ module AsyncSeq = #if (NETSTANDARD || NET) #if !FABLE_COMPILER - let ofAsyncEnum (source: Collections.Generic.IAsyncEnumerable<_>) = asyncSeq { - let! ct = Async.CancellationToken - let e = source.GetAsyncEnumerator(ct) - use _ = - { new IDisposable with - member __.Dispose() = - // Fire-and-forget: avoids Async.RunSynchronously which deadlocks - // on single-threaded runtimes such as Blazor WASM (see issue #152). - e.DisposeAsync() |> ignore } - - let mutable currentResult = true - while currentResult do - let! r = e.MoveNextAsync().AsTask() |> Async.AwaitTask - currentResult <- r - if r then yield e.Current - } + /// Converts a BCL IAsyncEnumerable to AsyncSeq. Identity function since AsyncSeq<'T> IS IAsyncEnumerable<'T> in v4+. + [ is now identical to IAsyncEnumerable<'T>. This function is a no-op and can be removed.")>] + let ofAsyncEnum (source: System.Collections.Generic.IAsyncEnumerable<'T>) : AsyncSeq<'T> = source - let toAsyncEnum (source: AsyncSeq<'a>) = { - new Collections.Generic.IAsyncEnumerable<'a> with - member __.GetAsyncEnumerator(cancellationToken: CancellationToken) = - let mutable current = Unchecked.defaultof<_> - let enumerator = source.GetEnumerator() - { new Collections.Generic.IAsyncEnumerator<'a> with - member __.Current = current - member __.MoveNextAsync() = - let moveNextAsync = async { - let! enumerationResult = enumerator.MoveNext() - match enumerationResult with - | Some(v) -> - current <- v - return true - | _ -> return false - } - - Async.StartAsTask(moveNextAsync, cancellationToken = cancellationToken) |> ValueTask - member __.DisposeAsync() = - enumerator.Dispose() - ValueTask() - } - } + /// Returns the AsyncSeq as a BCL IAsyncEnumerable<'a>. Identity function since AsyncSeq<'a> IS IAsyncEnumerable<'a> in v4+. + [ is now identical to IAsyncEnumerable<'T>. This function is a no-op and can be removed.")>] + let toAsyncEnum (source: AsyncSeq<'a>) : System.Collections.Generic.IAsyncEnumerable<'a> = source - let ofIQueryable (query : IQueryable<'a>) = - query :?> Collections.Generic.IAsyncEnumerable<'a> |> ofAsyncEnum + let ofIQueryable (query : IQueryable<'a>) : AsyncSeq<'a> = + query :?> Collections.Generic.IAsyncEnumerable<'a> module AsyncSeqSrcImpl = diff --git a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi index a1afd9f..2d3309a 100644 --- a/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi +++ b/src/FSharp.Control.AsyncSeq/AsyncSeq.fsi @@ -3,19 +3,23 @@ namespace FSharp.Control open System -/// An enumerator for pulling results asynchronously -type IAsyncEnumerator<'T> = - abstract MoveNext : unit -> Async<'T option> - inherit IDisposable - -/// An asynchronous sequence represents a delayed computation that can be -/// started to give an enumerator for pulling results asynchronously -type IAsyncEnumerable<'T> = - abstract GetEnumerator : unit -> IAsyncEnumerator<'T> - -/// An asynchronous sequence represents a delayed computation that can be -/// started to give an enumerator for pulling results asynchronously -type AsyncSeq<'T> = IAsyncEnumerable<'T> +#if FABLE_COMPILER +/// Internal pull-based enumerator used by AsyncSeq<'T> in Fable builds. +[] +type IAsyncSeqEnumerator<'T> = + abstract MoveNext : unit -> Async<'T option> + inherit IDisposable + +/// An asynchronous sequence. +[] +type AsyncSeq<'T> = + abstract GetEnumerator : unit -> IAsyncSeqEnumerator<'T> +#else +/// An asynchronous sequence; equivalent to System.Collections.Generic.IAsyncEnumerable<'T>. +/// Use the asyncSeq { ... } computation expression to create values, and the AsyncSeq module +/// for combinators. +type AsyncSeq<'T> = System.Collections.Generic.IAsyncEnumerable<'T> +#endif [] module AsyncSeq = @@ -522,7 +526,9 @@ module AsyncSeq = /// Returns an async sequence which contains no contiguous duplicate elements. val distinctUntilChanged : source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality +#if FABLE_COMPILER [] +#endif val getIterator : source:AsyncSeq<'T> -> (unit -> Async<'T option>) #if !FABLE_COMPILER @@ -565,12 +571,14 @@ module AsyncSeq = /// completion of sub-sequences depends on completion of other sub-sequences. val groupBy<'T, 'Key when 'Key : equality> : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> AsyncSeq<'Key * AsyncSeq<'T>> - #if (NETSTANDARD2_1 || NETCOREAPP3_0) + #if (NETSTANDARD || NET) - /// Creates an asynchronous computation that asynchronously yields results from the provided .NET IAsyncEnumerable. + /// Returns the input AsyncSeq as a BCL IAsyncEnumerable<'T>. Identity since AsyncSeq<'T> IS IAsyncEnumerable<'T> in v4. + [ is now identical to IAsyncEnumerable<'T>. This function is a no-op and can be removed.")>] val ofAsyncEnum<'T> : source: Collections.Generic.IAsyncEnumerable<'T> -> AsyncSeq<'T> - /// Creates an .NET IAsyncEnumerable from the provided AsyncSeq computation. + /// Returns the input AsyncSeq as a BCL IAsyncEnumerable<'T>. Identity since AsyncSeq<'T> IS IAsyncEnumerable<'T> in v4. + [ is now identical to IAsyncEnumerable<'T>. This function is a no-op and can be removed.")>] val toAsyncEnum<'T> : source: AsyncSeq<'T> -> Collections.Generic.IAsyncEnumerable<'T> val ofIQueryable<'T> : source: Linq.IQueryable<'T> -> AsyncSeq<'T> diff --git a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs index ffb9c10..e13a842 100644 --- a/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs +++ b/tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs @@ -1227,18 +1227,16 @@ let ``AsyncSeq.ofObservableBuffered should work (one, take)``() = [] let ``AsyncSeq.getIterator should work``() = let s1 = [1..2] |> AsyncSeq.ofSeq - use i = s1.GetEnumerator() - match i.MoveNext() |> Async.RunSynchronously with - | None -> Assert.Fail("expected Some") - | Some v -> - Assert.AreEqual(v,1) - match i.MoveNext() |> Async.RunSynchronously with - | None -> Assert.Fail("expected Some") - | Some v -> - Assert.AreEqual(v,2) - match i.MoveNext() |> Async.RunSynchronously with - | None -> () - | Some _ -> Assert.Fail("expected None") + let i = s1.GetAsyncEnumerator(System.Threading.CancellationToken.None) + try + let move () = i.MoveNextAsync().AsTask() |> Async.AwaitTask |> Async.RunSynchronously + Assert.True(move(), "expected first element") + Assert.AreEqual(i.Current, 1) + Assert.True(move(), "expected second element") + Assert.AreEqual(i.Current, 2) + Assert.False(move(), "expected end of sequence") + finally + i.DisposeAsync() |> ignore diff --git a/tests/fable/package-lock.json b/tests/fable/package-lock.json index 4ba1812..b9920ff 100644 --- a/tests/fable/package-lock.json +++ b/tests/fable/package-lock.json @@ -42,6 +42,7 @@ "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.15.8.tgz", "integrity": "sha512-3UG9dsxvYBMYwRv+gS41WKHno4K60/9GPy1CJaH6xy3Elq8CTtvtjT5R5jmNhXfCYLX2mTw+7/aq5ak/gOE0og==", "dev": true, + "peer": true, "dependencies": { "@babel/code-frame": "^7.15.8", "@babel/generator": "^7.15.8", @@ -9313,6 +9314,7 @@ "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.15.8.tgz", "integrity": "sha512-3UG9dsxvYBMYwRv+gS41WKHno4K60/9GPy1CJaH6xy3Elq8CTtvtjT5R5jmNhXfCYLX2mTw+7/aq5ak/gOE0og==", "dev": true, + "peer": true, "requires": { "@babel/code-frame": "^7.15.8", "@babel/generator": "^7.15.8", diff --git a/version.props b/version.props index ca16dc0..23ed282 100644 --- a/version.props +++ b/version.props @@ -1,5 +1,5 @@ - 3.3.1 + 4.0.0-alpha.1