Skip to content

Instantly share code, notes, and snippets.

@chklauser
Last active May 22, 2024 19:36
Show Gist options
  • Save chklauser/e2eadb8ade49fff444cacde02e07a947 to your computer and use it in GitHub Desktop.
Save chklauser/e2eadb8ade49fff444cacde02e07a947 to your computer and use it in GitHub Desktop.
// Task builder for F# that compiles to allocation-free paths for synchronous code.
//
// Originally written in 2016 by Robert Peele ([email protected])
// New operator-based overload resolution for F# 4.0 compatibility by Gustavo Leon in 2018.
// Revised for insertion into FSharp.Core by Microsoft, 2019.
//
// Original notice:
// To the extent possible under law, the author(s) have dedicated all copyright and related and neighboring rights
// to this software to the public domain worldwide. This software is distributed without any warranty.
//
// Updates:
// Copyright (c) Microsoft Corporation. All Rights Reserved. See License.txt in the project root for license information.
// Adapted from https://github.com/dotnet/fsharp/blob/main/src/FSharp.Core/tasks.fs to include the Result monad.
#nowarn "3513"
namespace Chabis.FSharp.TaskProcess
open System
open System.Runtime.CompilerServices
open System.Threading.Tasks
open Microsoft.FSharp.Core
open Microsoft.FSharp.Core.CompilerServices
open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers
open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
open Microsoft.FSharp.Collections
/// The extra data stored in ResumableStateMachine for tasks
///
/// 'T is the type of the value produced on success; 'TAbort is the type of the
/// value carried when the process is aborted. The builder's run logic reads
/// these fields once the code has finished to decide which `Result` case to
/// hand to the method builder.
[<Struct; NoComparison; NoEquality>]
type TaskProcessStateMachineData<'T, 'TAbort> =

    /// Abort value, if any. The builder stores `Some error` here when the
    /// computation yields/aborts or binds an `Error`; the run logic treats
    /// `None` as "not aborted" and then uses `Result` instead.
    val mutable Abort: 'TAbort option

    /// The value stored by `Return`; reported as `Ok` when `Abort` is `None`.
    [<DefaultValue(false)>]
    val mutable Result: 'T

    /// The method builder that owns the resulting `Task<Result<'T, 'TAbort>>`
    /// and receives the final result or exception.
    [<DefaultValue(false)>]
    val mutable MethodBuilder: AsyncTaskMethodBuilder<Result<'T, 'TAbort>>

/// `ResumableStateMachine` specialised to the task-process data.
and TaskProcessStateMachine<'TOverall, 'TAbortOverall> = ResumableStateMachine<TaskProcessStateMachineData<'TOverall, 'TAbortOverall>>
/// `ResumptionFunc` specialised to the task-process data (used by the dynamic implementation).
and TaskProcessResumptionFunc<'TOverall, 'TAbortOverall> = ResumptionFunc<TaskProcessStateMachineData<'TOverall, 'TAbortOverall>>
/// `ResumptionDynamicInfo` specialised to the task-process data (used by the dynamic implementation).
and TaskProcessResumptionDynamicInfo<'TOverall, 'TAbortOverall> = ResumptionDynamicInfo<TaskProcessStateMachineData<'TOverall, 'TAbortOverall>>
/// A step of a task process: resumable code over the task-process data that produces a 'T.
/// 'TOverall / 'TAbortOverall are the success and abort types of the whole process.
and TaskProcessCode<'TOverall,'TAbortOverall, 'T> = ResumableCode<TaskProcessStateMachineData<'TOverall, 'TAbortOverall>, 'T>
/// Computation-expression builder for "task processes": resumable, task-like
/// workflows whose overall outcome is a `Task<Result<'T, 'TAbort>>`.
///
/// A step signals completion by returning `true`. Success values are stored in
/// `sm.Data.Result`, abort values in `sm.Data.Abort`; `Run`/`RunDynamic` turn
/// those two fields into `Ok`/`Error` once the code has finished.
///
/// NOTE(review): `Yield`/`Abort` (and the `Error` branches of the `Bind`
/// overloads) only store the abort value and report the step as finished.
/// Nothing in this builder checks `Abort` before the second half of a
/// `Combine` or the next iteration of `While`/`For`, so code sequenced after
/// an abort presumably still runs — confirm against the `ResumableCode`
/// combinator semantics.
type TaskProcessBuilder() =

    /// Defers building the step: `generator` is only called when the step is invoked.
    member inline _.Delay(generator: unit -> TaskProcessCode<'TOverall, 'TAbortOverall, 'T>) : TaskProcessCode<'TOverall, 'TAbortOverall, 'T> =
        TaskProcessCode<'TOverall, 'TAbortOverall, 'T>(fun sm -> (generator ()).Invoke(&sm))

    /// Used to represent no-ops like the implicit empty "else" branch of an "if" expression.
    [<DefaultValue>]
    member inline _.Zero() : TaskProcessCode<'TOverall, 'TAbortOverall, unit> =
        ResumableCode.Zero()

    /// Stores `value` as the overall result and reports the step as finished.
    member inline _.Return(value: 'T) : TaskProcessCode<'T, 'TAbortOverall, 'T> =
        TaskProcessCode<'T, 'TAbortOverall, _>(fun sm ->
            sm.Data.Result <- value
            true)

    /// Stores `error` as the abort value and reports the step as finished.
    /// Note that `'unit` in the signature is a type variable, not the `unit` type.
    member inline _.Yield(error: 'TAbortOverall) : TaskProcessCode<'T, 'TAbortOverall, 'unit> =
        TaskProcessCode<'T, 'TAbortOverall, 'unit>(fun sm ->
            sm.Data.Abort <- Some error
            true)

    /// Stores `error` as the abort value and reports the step as finished.
    /// Exposed to computation expressions under the name `abort`.
    [<CustomOperation("abort")>]
    member inline _.Abort(error: 'TAbortOverall) : TaskProcessCode<'T, 'TAbortOverall, 'T> =
        TaskProcessCode<'T, 'TAbortOverall, _>(fun sm ->
            sm.Data.Abort <- Some error
            true)

    /// Chains together a step with its following step.
    /// Note that this requires that the first step has no result.
    /// This prevents constructs like `task { return 1; return 2; }`.
    member inline _.Combine
        (
            task1: TaskProcessCode<'TOverall, 'TAbortOverall, unit>,
            task2: TaskProcessCode<'TOverall, 'TAbortOverall, 'T>
        ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'T> =
        ResumableCode.Combine(task1, task2)

    /// Builds a step that executes the body while the condition predicate is true.
    member inline _.While
        (
            [<InlineIfLambda>] condition: unit -> bool,
            body: TaskProcessCode<'TOverall, 'TAbortOverall, unit>
        ) : TaskProcessCode<'TOverall, 'TAbortOverall, unit> =
        ResumableCode.While(condition, body)

    /// Wraps a step in a try/with. This catches exceptions both in the evaluation of the function
    /// to retrieve the step, and in the continuation of the step (if any).
    member inline _.TryWith
        (
            body: TaskProcessCode<'TOverall, 'TAbortOverall, 'T>,
            catch: exn -> TaskProcessCode<'TOverall, 'TAbortOverall, 'T>
        ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'T> =
        ResumableCode.TryWith(body, catch)

    /// Wraps a step in a try/finally. This catches exceptions both in the evaluation of the function
    /// to retrieve the step, and in the continuation of the step (if any).
    member inline _.TryFinally
        (
            body: TaskProcessCode<'TOverall, 'TAbortOverall, 'T>,
            [<InlineIfLambda>] compensation: unit -> unit
        ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'T> =
        ResumableCode.TryFinally(
            body,
            // The compensation is wrapped as a synchronous step that always finishes.
            ResumableCode<_, _>(fun _sm ->
                compensation ()
                true)
        )

    /// Builds a step that runs `body` for each element of `sequence`.
    member inline _.For(sequence: seq<'T>, body: 'T -> TaskProcessCode<'TOverall, 'TAbortOverall, unit>) : TaskProcessCode<'TOverall, 'TAbortOverall, unit> =
        ResumableCode.For(sequence, body)

    // This is the dynamic implementation - this is not used
    // for statically compiled tasks. An executor (resumptionFuncExecutor) is
    // registered with the state machine, plus the initial resumption.
    // The executor stays constant throughout the execution, it wraps each step
    // of the execution in a try/with. The resumption is changed at each step
    // to represent the continuation of the computation.
    /// Runs `code` through the dynamic resumption machinery and returns the task
    /// that completes with `Error abort` when an abort value was stored, and with
    /// `Ok result` otherwise. Exceptions are reported through `SetException`.
    static member RunDynamic(code: TaskProcessCode<'T, 'TAbortOverall, 'T>) : Task<Result<'T, 'TAbortOverall>> =
        let mutable sm = TaskProcessStateMachine<'T, 'TAbortOverall>()
        let initialResumptionFunc = TaskProcessResumptionFunc<'T, 'TAbortOverall>(fun sm -> code.Invoke(&sm))
        let resumptionInfo =
            { new TaskProcessResumptionDynamicInfo<'T, 'TAbortOverall>(initialResumptionFunc) with
                member info.MoveNext(sm) =
                    let mutable savedExn = null
                    try
                        sm.ResumptionDynamicInfo.ResumptionData <- null
                        let step = info.ResumptionFunc.Invoke(&sm)
                        if step then
                            // Finished: an abort value takes precedence over the stored result.
                            // Matching directly avoids building an intermediate option.
                            match sm.Data.Abort with
                            | Some error -> sm.Data.MethodBuilder.SetResult(Result.Error error)
                            | None -> sm.Data.MethodBuilder.SetResult(Result.Ok sm.Data.Result)
                        else
                            // Suspended: the bind that yielded left its awaiter in ResumptionData.
                            let mutable awaiter =
                                sm.ResumptionDynamicInfo.ResumptionData :?> ICriticalNotifyCompletion
                            assert not (isNull awaiter)
                            sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm)
                    with exn ->
                        savedExn <- exn
                    // Run SetException outside the stack unwind, see https://github.com/dotnet/roslyn/issues/26567
                    match savedExn with
                    | null -> ()
                    | exn -> sm.Data.MethodBuilder.SetException exn
                member _.SetStateMachine(sm, state) =
                    sm.Data.MethodBuilder.SetStateMachine(state)
            }
        sm.ResumptionDynamicInfo <- resumptionInfo
        sm.Data.MethodBuilder <- AsyncTaskMethodBuilder<Result<'T, 'TAbortOverall>>.Create()
        sm.Data.MethodBuilder.Start(&sm)
        sm.Data.MethodBuilder.Task

    /// Turns the code of a whole computation expression into a started
    /// `Task<Result<'T, 'TAbortOverall>>`. Uses a compiled state machine when
    /// `__useResumableCode` holds and falls back to `RunDynamic` otherwise.
    /// The task completes with `Error abort` when an abort value was stored,
    /// and with `Ok result` otherwise.
    member inline _.Run(code: TaskProcessCode<'T, 'TAbortOverall, 'T>) : Task<Result<'T, 'TAbortOverall>> =
        if __useResumableCode then
            __stateMachine<TaskProcessStateMachineData<'T, 'TAbortOverall>, Task<Result<'T, 'TAbortOverall>>>
                (MoveNextMethodImpl<_>(fun sm ->
                    //-- RESUMABLE CODE START
                    __resumeAt sm.ResumptionPoint
                    let mutable __stack_exn: Exception = null
                    try
                        let __stack_code_fin = code.Invoke(&sm)
                        if __stack_code_fin then
                            // Finished: an abort value takes precedence over the stored result.
                            // Matching directly avoids building an intermediate option.
                            match sm.Data.Abort with
                            | Some error -> sm.Data.MethodBuilder.SetResult(Result.Error error)
                            | None -> sm.Data.MethodBuilder.SetResult(Result.Ok sm.Data.Result)
                    with exn ->
                        __stack_exn <- exn
                    // Run SetException outside the stack unwind, see https://github.com/dotnet/roslyn/issues/26567
                    match __stack_exn with
                    | null -> ()
                    | exn -> sm.Data.MethodBuilder.SetException exn
                //-- RESUMABLE CODE END
                ))
                (SetStateMachineMethodImpl<_>(fun sm state -> sm.Data.MethodBuilder.SetStateMachine(state)))
                (AfterCode<_, _>(fun sm ->
                    sm.Data.MethodBuilder <- AsyncTaskMethodBuilder<Result<'T, 'TAbortOverall>>.Create()
                    sm.Data.MethodBuilder.Start(&sm)
                    sm.Data.MethodBuilder.Task))
        else
            TaskProcessBuilder.RunDynamic(code)
/// Auto-opened entry points for writing task processes.
[<AutoOpen>]
module TaskProcessBuilder =

    /// The builder instance behind `taskProcess { ... }` expressions.
    let taskProcess = TaskProcessBuilder()

    // TODO: this method should not be necessary. Overload resolution should help us
    // find the correct implementation.
    /// Awaits `t` and wraps its value in `Ok`, producing a task of `Result`
    /// whose error type is left generic.
    let infallible (t: Task<'T>) : Task<Result<'T, 'TAbort>> =
        task {
            let! value = t
            return Result.Ok value
        }
namespace Chabis.FSharp.TaskProcess.TaskProcessBuilderExtensions
open Chabis.FSharp.TaskProcess
open System
open System.Runtime.CompilerServices
open System.Threading.Tasks
open Microsoft.FSharp.Core
open Microsoft.FSharp.Core.CompilerServices
open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers
open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
/// Bind/ReturnFrom support for any awaitable shape (a `GetAwaiter` method whose
/// awaiter exposes `IsCompleted` and `GetResult`). The awaited value is handed
/// to the continuation unchanged, i.e. the awaitable is treated as infallible.
///
/// NOTE(review): the "low priority" naming presumably relies on extension
/// members from later-opened modules winning overload resolution — confirm.
[<AutoOpen>]
module LiftInfallibleLowPriority =
    type TaskProcessBuilder with

        /// Dynamic (non-state-machine) bind for a generic awaitable.
        ///
        /// Obtains the awaiter and builds a resumption function that reads the
        /// awaiter's result and invokes `continuation` with it. If the awaiter
        /// has already completed, that function is invoked right away and its
        /// result returned; otherwise the awaiter and the resumption function
        /// are stored in `sm.ResumptionDynamicInfo` and `false` is returned.
        [<NoEagerConstraintApplication>]
        static member inline BindDynamic< ^TaskLike, 'TResult1, 'TResult2, ^Awaiter, 'TOverall, 'TAbortOverall
            when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
            and ^Awaiter :> ICriticalNotifyCompletion
            and ^Awaiter: (member get_IsCompleted: unit -> bool)
            and ^Awaiter: (member GetResult: unit -> 'TResult1)>
            (
                sm: byref<_>,
                task: ^TaskLike,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : bool =
            let mutable awaiter = (^TaskLike: (member GetAwaiter: unit -> ^Awaiter) task)
            let cont =
                TaskProcessResumptionFunc<'TOverall, 'TAbortOverall>(fun sm ->
                    let result = (^Awaiter: (member GetResult: unit -> 'TResult1) awaiter)
                    (continuation result).Invoke(&sm))
            // shortcut to continue immediately
            if (^Awaiter: (member get_IsCompleted: unit -> bool) awaiter) then
                cont.Invoke(&sm)
            else
                // Not completed: hand the awaiter and the continuation to the dynamic executor.
                sm.ResumptionDynamicInfo.ResumptionData <- (awaiter :> ICriticalNotifyCompletion)
                sm.ResumptionDynamicInfo.ResumptionFunc <- cont
                false

        /// Binds a generic awaitable: awaits `task` and passes its result to
        /// `continuation`. Uses the resumable-code path when `__useResumableCode`
        /// holds and delegates to `BindDynamic` otherwise.
        [<NoEagerConstraintApplication>]
        member inline _.Bind< ^TaskLike, 'TResult1, 'TResult2, ^Awaiter, 'TOverall, 'TAbortOverall
            when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
            and ^Awaiter :> ICriticalNotifyCompletion
            and ^Awaiter: (member get_IsCompleted: unit -> bool)
            and ^Awaiter: (member GetResult: unit -> 'TResult1)>
            (
                task: ^TaskLike,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2> =
            TaskProcessCode<'TOverall, 'TAbortOverall, _>(fun sm ->
                if __useResumableCode then
                    //-- RESUMABLE CODE START
                    // Get an awaiter from the awaitable
                    let mutable awaiter = (^TaskLike: (member GetAwaiter: unit -> ^Awaiter) task)
                    let mutable __stack_fin = true
                    if not (^Awaiter: (member get_IsCompleted: unit -> bool) awaiter) then
                        // This will yield with __stack_yield_fin = false
                        // This will resume with __stack_yield_fin = true
                        let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm)
                        __stack_fin <- __stack_yield_fin
                    if __stack_fin then
                        // Completed (immediately or after resuming): continue with the result.
                        let result = (^Awaiter: (member GetResult: unit -> 'TResult1) awaiter)
                        (continuation result).Invoke(&sm)
                    else
                        // Suspending: register this state machine to be resumed by the awaiter.
                        sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm)
                        false
                else
                    TaskProcessBuilder.BindDynamic< ^TaskLike, 'TResult1, 'TResult2, ^Awaiter, 'TOverall, 'TAbortOverall>(
                        &sm,
                        task,
                        continuation
                    )
            //-- RESUMABLE CODE END
            )

        /// `return!` for a generic awaitable: binds `task` and returns its result
        /// through `this.Return`.
        [<NoEagerConstraintApplication>]
        member inline this.ReturnFrom< ^TaskLike, ^Awaiter, 'T, 'TAbortOverall
            when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
            and ^Awaiter :> ICriticalNotifyCompletion
            and ^Awaiter: (member get_IsCompleted: unit -> bool)
            and ^Awaiter: (member GetResult: unit -> 'T)>
            (task: ^TaskLike)
            : TaskProcessCode<'T, 'TAbortOverall, 'T> =
            this.Bind(task, this.Return)
/// Bind/ReturnFrom support for any awaitable shape whose awaiter yields a
/// `Result<'TResult1, 'TAbortOverall>`, plus `Using` for `IDisposable` resources.
/// An `Ok` value is passed on to the continuation; an `Error` value is stored
/// as the abort value and the continuation is not invoked.
///
/// NOTE(review): the "low priority" naming presumably relies on extension
/// members from later-opened modules winning overload resolution — confirm.
[<AutoOpen>]
module LowPriority =
    // Low priority extensions
    type TaskProcessBuilder with

        /// Dynamic (non-state-machine) bind for a generic awaitable of `Result`.
        ///
        /// Obtains the awaiter and builds a resumption function that reads the
        /// awaiter's result: `Ok` invokes `continuation`, `Error` stores the
        /// error in `sm.Data.Abort` and reports the step as finished. If the
        /// awaiter has already completed, that function is invoked right away;
        /// otherwise the awaiter and the resumption function are stored in
        /// `sm.ResumptionDynamicInfo` and `false` is returned.
        [<NoEagerConstraintApplication>]
        static member inline BindDynamic< ^TaskLike, 'TResult1, 'TResult2, ^Awaiter, 'TOverall, 'TAbortOverall
            when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
            and ^Awaiter :> ICriticalNotifyCompletion
            and ^Awaiter: (member get_IsCompleted: unit -> bool)
            and ^Awaiter: (member GetResult: unit -> Result<'TResult1, 'TAbortOverall>)>
            (
                sm: byref<_>,
                task: ^TaskLike,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : bool =
            let mutable awaiter = (^TaskLike: (member GetAwaiter: unit -> ^Awaiter) task)
            let cont =
                TaskProcessResumptionFunc<'TOverall, 'TAbortOverall>(fun sm ->
                    match (^Awaiter: (member GetResult: unit -> Result<'TResult1, 'TAbortOverall>) awaiter) with
                    | Ok result -> (continuation result).Invoke(&sm)
                    | Error error ->
                        // Record the abort value; the continuation is skipped.
                        sm.Data.Abort <- Some error
                        true
                )
            // shortcut to continue immediately
            if (^Awaiter: (member get_IsCompleted: unit -> bool) awaiter) then
                cont.Invoke(&sm)
            else
                // Not completed: hand the awaiter and the continuation to the dynamic executor.
                sm.ResumptionDynamicInfo.ResumptionData <- (awaiter :> ICriticalNotifyCompletion)
                sm.ResumptionDynamicInfo.ResumptionFunc <- cont
                false

        /// Binds a generic awaitable of `Result`: awaits `task`, then either
        /// continues with the `Ok` value or stores the `Error` value as the
        /// abort value. Uses the resumable-code path when `__useResumableCode`
        /// holds and delegates to `BindDynamic` otherwise.
        [<NoEagerConstraintApplication>]
        member inline _.Bind< ^TaskLike, 'TResult1, 'TResult2, ^Awaiter, 'TOverall, 'TAbortOverall
            when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
            and ^Awaiter :> ICriticalNotifyCompletion
            and ^Awaiter: (member get_IsCompleted: unit -> bool)
            and ^Awaiter: (member GetResult: unit -> Result<'TResult1, 'TAbortOverall>)>
            (
                task: ^TaskLike,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2> =
            TaskProcessCode<'TOverall, 'TAbortOverall, _>(fun sm ->
                if __useResumableCode then
                    //-- RESUMABLE CODE START
                    // Get an awaiter from the awaitable
                    let mutable awaiter = (^TaskLike: (member GetAwaiter: unit -> ^Awaiter) task)
                    let mutable __stack_fin = true
                    if not (^Awaiter: (member get_IsCompleted: unit -> bool) awaiter) then
                        // This will yield with __stack_yield_fin = false
                        // This will resume with __stack_yield_fin = true
                        let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm)
                        __stack_fin <- __stack_yield_fin
                    if __stack_fin then
                        match (^Awaiter: (member GetResult: unit -> Result<'TResult1, 'TAbortOverall>) awaiter) with
                        | Ok result -> (continuation result).Invoke(&sm)
                        | Error error ->
                            // Record the abort value; the continuation is skipped.
                            sm.Data.Abort <- Some error
                            true
                    else
                        // Suspending: register this state machine to be resumed by the awaiter.
                        sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm)
                        false
                else
                    TaskProcessBuilder.BindDynamic< ^TaskLike, 'TResult1, 'TResult2, ^Awaiter, 'TOverall, 'TAbortOverall>(
                        &sm,
                        task,
                        continuation
                    )
            //-- RESUMABLE CODE END
            )

        /// `return!` for a generic awaitable of `Result`: binds `task` and
        /// returns its `Ok` value through `this.Return`.
        [<NoEagerConstraintApplication>]
        member inline this.ReturnFrom< ^TaskLike, ^Awaiter, 'T, 'TAbortOverall
            when ^TaskLike: (member GetAwaiter: unit -> ^Awaiter)
            and ^Awaiter :> ICriticalNotifyCompletion
            and ^Awaiter: (member get_IsCompleted: unit -> bool)
            and ^Awaiter: (member GetResult: unit -> Result<'T, 'TAbortOverall>)>
            (task: ^TaskLike)
            : TaskProcessCode<'T, 'TAbortOverall, 'T> =
            this.Bind(task, this.Return)

        /// `use` support for `IDisposable` resources; delegates to `ResumableCode.Using`.
        member inline _.Using<'Resource, 'TOverall, 'TAbortOverall, 'T when 'Resource :> IDisposable>
            (
                resource: 'Resource,
                body: 'Resource -> TaskProcessCode<'TOverall, 'TAbortOverall, 'T>
            ) =
            ResumableCode.Using(resource, body)
/// Bind/ReturnFrom support specialised to `Task<'TResult1>`. The awaited value
/// is handed to the continuation unchanged, i.e. the task is treated as
/// infallible.
///
/// NOTE(review): the "high priority" naming presumably relies on this module
/// being opened after the low-priority ones so its overloads are preferred —
/// confirm.
[<AutoOpen>]
module LiftInfallibleHighPriority =
    type TaskProcessBuilder with

        /// Dynamic (non-state-machine) bind for `Task<'TResult1>`.
        ///
        /// Builds a resumption function that reads the awaiter's result and
        /// invokes `continuation` with it. If the task has already completed,
        /// that function is invoked right away; otherwise the awaiter and the
        /// resumption function are stored in `sm.ResumptionDynamicInfo` and
        /// `false` is returned.
        static member BindDynamic
            (
                sm: byref<_>,
                task: Task<'TResult1>,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : bool =
            let mutable awaiter = task.GetAwaiter()
            let cont =
                TaskProcessResumptionFunc<'TOverall, 'TAbortOverall>(fun sm ->
                    let result = awaiter.GetResult()
                    (continuation result).Invoke(&sm))
            // shortcut to continue immediately
            if awaiter.IsCompleted then
                cont.Invoke(&sm)
            else
                // Not completed: hand the awaiter and the continuation to the dynamic executor.
                sm.ResumptionDynamicInfo.ResumptionData <- (awaiter :> ICriticalNotifyCompletion)
                sm.ResumptionDynamicInfo.ResumptionFunc <- cont
                false

        /// Binds a `Task<'TResult1>`: awaits `task` and passes its result to
        /// `continuation`. Uses the resumable-code path when `__useResumableCode`
        /// holds and delegates to `BindDynamic` otherwise.
        member inline _.Bind
            (
                task: Task<'TResult1>,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2> =
            TaskProcessCode<'TOverall, 'TAbortOverall, _>(fun sm ->
                if __useResumableCode then
                    //-- RESUMABLE CODE START
                    // Get an awaiter from the task
                    let mutable awaiter = task.GetAwaiter()
                    let mutable __stack_fin = true
                    if not awaiter.IsCompleted then
                        // This will yield with __stack_yield_fin = false
                        // This will resume with __stack_yield_fin = true
                        let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm)
                        __stack_fin <- __stack_yield_fin
                    if __stack_fin then
                        // Completed (immediately or after resuming): continue with the result.
                        let result = awaiter.GetResult()
                        (continuation result).Invoke(&sm)
                    else
                        // Suspending: register this state machine to be resumed by the awaiter.
                        sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm)
                        false
                else
                    TaskProcessBuilder.BindDynamic(&sm, task, continuation)
            //-- RESUMABLE CODE END
            )

        /// `return!` for `Task<'T>`: binds `task` and returns its result through `this.Return`.
        member inline this.ReturnFrom(task: Task<'T>) : TaskProcessCode<'T, 'TAbortOverall, 'T> =
            this.Bind(task, this.Return)
/// Bind/ReturnFrom support specialised to `Task<Result<'TResult1, 'TAbortOverall>>`.
/// An `Ok` value is passed on to the continuation; an `Error` value is stored
/// as the abort value and the continuation is not invoked.
///
/// NOTE(review): the "high priority" naming presumably relies on this module
/// being opened last among the auto-opened modules so its overloads are
/// preferred — confirm.
[<AutoOpen>]
module HighPriority =
    // High priority extensions
    type TaskProcessBuilder with

        /// Dynamic (non-state-machine) bind for a task of `Result`.
        ///
        /// Builds a resumption function that reads the awaiter's result: `Ok`
        /// invokes `continuation`, `Error` stores the error in `sm.Data.Abort`
        /// and reports the step as finished. If the task has already completed,
        /// that function is invoked right away; otherwise the awaiter and the
        /// resumption function are stored in `sm.ResumptionDynamicInfo` and
        /// `false` is returned.
        static member BindDynamic
            (
                sm: byref<_>,
                task: Task<Result<'TResult1, 'TAbortOverall>>,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : bool =
            let mutable awaiter = task.GetAwaiter()
            let cont =
                TaskProcessResumptionFunc<'TOverall, 'TAbortOverall>(fun sm ->
                    match awaiter.GetResult() with
                    | Ok result -> (continuation result).Invoke(&sm)
                    | Error error ->
                        // Record the abort value; the continuation is skipped.
                        sm.Data.Abort <- Some error
                        true
                )
            // shortcut to continue immediately
            if awaiter.IsCompleted then
                cont.Invoke(&sm)
            else
                // Not completed: hand the awaiter and the continuation to the dynamic executor.
                sm.ResumptionDynamicInfo.ResumptionData <- (awaiter :> ICriticalNotifyCompletion)
                sm.ResumptionDynamicInfo.ResumptionFunc <- cont
                false

        /// Binds a task of `Result`: awaits `task`, then either continues with
        /// the `Ok` value or stores the `Error` value as the abort value. Uses
        /// the resumable-code path when `__useResumableCode` holds and delegates
        /// to `BindDynamic` otherwise.
        member inline _.Bind
            (
                task: Task<Result<'TResult1, 'TAbortOverall>>,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2> =
            TaskProcessCode<'TOverall, 'TAbortOverall, _>(fun sm ->
                if __useResumableCode then
                    //-- RESUMABLE CODE START
                    // Get an awaiter from the task
                    let mutable awaiter = task.GetAwaiter()
                    let mutable __stack_fin = true
                    if not awaiter.IsCompleted then
                        // This will yield with __stack_yield_fin = false
                        // This will resume with __stack_yield_fin = true
                        let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm)
                        __stack_fin <- __stack_yield_fin
                    if __stack_fin then
                        match awaiter.GetResult() with
                        | Ok result -> (continuation result).Invoke(&sm)
                        | Error error ->
                            // Record the abort value; the continuation is skipped.
                            sm.Data.Abort <- Some error
                            true
                    else
                        // Suspending: register this state machine to be resumed by the awaiter.
                        sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm)
                        false
                else
                    TaskProcessBuilder.BindDynamic(&sm, task, continuation)
            //-- RESUMABLE CODE END
            )

        /// `return!` for a task of `Result`: binds `task` and returns its `Ok`
        /// value through `this.Return`.
        member inline this.ReturnFrom(task: Task<Result<'T, 'TAbortOverall>>) : TaskProcessCode<'T, 'TAbortOverall, 'T> =
            this.Bind(task, this.Return)
/// `Async<'T>` support: the async computation is started as a task and then
/// bound through the `Task<'T>` overloads, so its value reaches the
/// continuation unchanged.
///
/// NOTE(review): unlike the low/high priority modules this one carries no
/// `[<AutoOpen>]`, so callers have to open it explicitly — confirm that this
/// is intentional.
module LiftInfallibleMediumPriority =
    open LiftInfallibleHighPriority

    // Medium priority extensions
    type TaskProcessBuilder with

        /// Binds an `Async<'TResult1>` by starting it immediately as a task and
        /// binding that task.
        member inline this.Bind
            (
                computation: Async<'TResult1>,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2> =
            let startedTask: Task<'TResult1> = Async.StartImmediateAsTask computation
            this.Bind(startedTask, continuation)

        /// `return!` for an `Async<'T>`: starts it immediately as a task and
        /// returns that task's result.
        member inline this.ReturnFrom(computation: Async<'T>) : TaskProcessCode<'T, 'TAbortOverall, 'T> =
            let startedTask: Task<'T> = Async.StartImmediateAsTask computation
            this.ReturnFrom(startedTask)
/// `Async<Result<'T, 'TAbort>>` support: the async computation is started as a
/// task and then bound through the `Task<Result<...>>` overloads.
///
/// NOTE(review): unlike the low/high priority modules this one carries no
/// `[<AutoOpen>]`, so callers have to open it explicitly — confirm that this
/// is intentional.
module MediumPriority =
    open HighPriority

    // Medium priority extensions
    type TaskProcessBuilder with

        /// Binds an `Async` of `Result` by starting it immediately as a task
        /// and binding that task.
        member inline this.Bind
            (
                computation: Async<Result<'TResult1, 'TAbortOverall>>,
                continuation: 'TResult1 -> TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2>
            ) : TaskProcessCode<'TOverall, 'TAbortOverall, 'TResult2> =
            let startedTask: Task<Result<'TResult1, 'TAbortOverall>> =
                Async.StartImmediateAsTask computation
            this.Bind(startedTask, continuation)

        /// `return!` for an `Async` of `Result`: starts it immediately as a
        /// task and returns through the task-based `ReturnFrom`.
        member inline this.ReturnFrom(computation: Async<Result<'T, 'TAbortOverall>>) : TaskProcessCode<'T, 'TAbortOverall, 'T> =
            let startedTask: Task<Result<'T, 'TAbortOverall>> =
                Async.StartImmediateAsTask computation
            this.ReturnFrom(startedTask)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment