Last active
November 18, 2023 06:10
-
-
Save straight-shoota/4437971943bae7000f03fabf3d814a2f to your computer and use it in GitHub Desktop.
Spindle - Structured Concurrency for Crystal (PoC)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def bob | |
10.times do | |
puts "B" | |
sleep 0.006 | |
end | |
end | |
def alice | |
3.times do | |
puts "A" | |
sleep 0.01 | |
end | |
raise "Alice aborted" | |
end | |
concurrent do |spindle| | |
spindle.spawn "Alice" do | |
alice | |
end | |
spindle.spawn "Bob", ->bob | |
end | |
def concurrent | |
spindle = Spindle.new | |
begin | |
yield spindle | |
ensure | |
spindle.spin | |
end | |
end | |
macro concurrent(first, *args) | |
concurrent do | |
spawn do | |
{{ first }} | |
end | |
{% for arg in args %} | |
spawn do | |
{{ arg }} | |
end | |
{% end %} | |
end | |
end | |
class Spindle | |
@fibers = [] of Fiber | |
@finished = Channel(Tuple(Fiber, Exception?)).new | |
def spawn(name = nil, &block : -> _) | |
spawn(name, block) | |
end | |
def spawn(name, proc) | |
puts "spindle spawning #{name}" | |
finished = @finished | |
fiber = Fiber.new(name) do | |
begin | |
proc.call | |
rescue exc | |
finished.send({Fiber.current, exc}) | |
else | |
finished.send({Fiber.current, nil}) | |
end | |
end | |
@fibers << fiber | |
Scheduler.enqueue fiber | |
end | |
def <<(fiber : Fiber) | |
@fibers << fiber | |
end | |
def spin | |
exception = nil | |
until @fibers.empty? | |
fiber, exc = @finished.receive | |
@fibers.delete fiber | |
case exc | |
when Fiber::CancelledException | |
puts "#{fiber.name} cancelled" | |
when Exception | |
@fibers.each &.cancel | |
puts "#{fiber.name} raised" | |
else | |
puts "#{fiber.name} finished" | |
end | |
exception = exc unless exception | |
end | |
raise exception if exception | |
end | |
end | |
class Fiber | |
# A `CancelRequestException` holds the callstack of `CancelledException` where | |
# `Fiber#cancel` was called. | |
class CancelRequestException < Exception | |
getter fiber : Fiber | |
def initialize(@callstack : CallStack, message : String = "Fiber cancel request") | |
super(message) | |
@fiber = Fiber.current | |
end | |
end | |
# A `CancelledException` is raised when a fiber is resumed after it was cancelled. | |
# See `Fiber#cancel` for details. | |
# | |
# If `cause` is `nil`, the fiber was cancelled while executing. | |
class CancelledException < Exception | |
getter fiber : Fiber | |
def initialize(@fiber : Fiber, cause : CancelRequestException? = nil) | |
super("Fiber cancelled: #{fiber}", cause) | |
end | |
def cause : CancelRequestException? | |
@cause.as(CancelRequestException?) | |
end | |
end | |
@cancel_request : CancelRequestException? = nil | |
# Stops this fiber from executing again and unwinds its stack. | |
# | |
# This method requests the fiber to raise a `CancelledException` the next time | |
# it is resumed and enqueues it in the scheduler. Therefore the unwinding will | |
# only take place the next time the scheduler reschedules. | |
# | |
# Raises `CancelledException` if this is the current fiber. | |
def cancel | |
if Fiber.current == self | |
# In case the current fiber is to be canceled, just raise the exception directly. | |
raise CancelledException.new self | |
else | |
# Otherwise register a cancel request, it will be evaluated on next resume. | |
@cancel_request ||= CancelRequestException.new(CallStack.new) | |
end | |
# Trigger scheduling | |
@resume_event.try &.free | |
Scheduler.enqueue(self) | |
end | |
def resume | |
previous_def | |
if cancel_request = Fiber.current.@cancel_request | |
raise CancelledException.new Fiber.current, cancel_request | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment