Last active
October 1, 2023 20:15
-
-
Save giuliohome/bdcc41c2959c6410370914559ed2a364 to your computer and use it in GitHub Desktop.
Async Workers with Condition Handling
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Control.Concurrent.STM | |
import Control.Monad | |
import Control.Concurrent | |
-- | Block the calling thread until the given flag holds 'True'.
--
-- The whole wait is a single STM transaction: 'check' retries the
-- transaction while the flag is 'False', so the thread is suspended until
-- another transaction writes to the 'TVar'.
workerWaitForCondition :: TVar Bool -> IO ()
workerWaitForCondition condition =
  atomically (readTVar condition >>= check)
-- | Pause for two seconds (standing in for real work), then atomically set
-- the given flag to 'True'.
workerMakeConditionTrue :: TVar Bool -> IO ()
workerMakeConditionTrue condition =
  threadDelay 2000000 -- Simulate some work
    >> atomically (writeTVar condition True)
-- | Run the two-worker demo.
--
-- Worker 1 blocks until a shared 'TVar' flag becomes 'True'; worker 2 sets
-- the flag after a delay. The main thread waits for BOTH workers before
-- returning: a GHC program exits as soon as @main@ returns, so waiting only
-- for worker 1 could end the program before worker 2 printed its final
-- message.
main :: IO ()
main = do
  conditionVar <- newTVarIO False
  worker1Done <- newEmptyMVar
  worker2Done <- newEmptyMVar

  -- Define the two worker tasks.
  let task1 = do
        putStrLn "Worker 1 is waiting for the w2 condition..."
        workerWaitForCondition conditionVar
        putStrLn "Worker 1: Condition is met!"
        putMVar worker1Done "Forked thread has completed."
      task2 = do
        putStrLn "Worker 2 is making the w2 condition true."
        workerMakeConditionTrue conditionVar
        putStrLn "Worker 2 has set the w2 condition true."
        -- Signal completion so main does not exit before the line above
        -- has been written.
        putMVar worker2Done ()

  -- Start both workers on their own threads.
  threadId1 <- forkIO task1
  putStrLn ("Hello, task1 " ++ show threadId1 ++ "!")
  threadId2 <- forkIO task2
  putStrLn ("Hello, task2 " ++ show threadId2 ++ "!")
  putStrLn "waiting for the end of the workflow"

  -- Wait for both forked threads to finish by taking their MVars.
  endMessage <- takeMVar worker1Done
  takeMVar worker2Done
  putStrLn endMessage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
async def worker_wait_for_condition():
    """Wait on the module-level ``condition`` until another task notifies it.

    The condition's lock is held while the messages are printed; it is
    released for the duration of ``condition.wait()`` and re-acquired
    before the second message is printed.
    """
    await condition.acquire()
    try:
        print("Worker 1 is waiting for the condition...")
        await condition.wait()
        print("Worker 1: Condition is met!")
    finally:
        condition.release()
async def worker_make_condition_true():
    """Sleep for two seconds, then notify one waiter on the module-level ``condition``.

    ``notify()`` is called while holding the condition's lock; the final
    message is printed after the lock has been released.
    """
    print("Worker 2 is making the condition true.")
    await asyncio.sleep(2)  # Simulate some work
    await condition.acquire()
    try:
        condition.notify()
    finally:
        condition.release()
    print("Worker 2 has set the condition true.")
async def main():
    """Create the shared condition, start both workers, and wait for them to finish."""
    # The workers look up `condition` as a module-level name, so it is
    # published as a global. It is created here, inside the coroutine, so it
    # belongs to the event loop that runs the workers.
    global condition
    condition = asyncio.Condition()

    # Schedule the waiting worker first, then the notifying worker.
    pending = [
        asyncio.create_task(worker_wait_for_condition()),
        asyncio.create_task(worker_make_condition_true()),
    ]

    # Wait for both tasks to complete.
    await asyncio.gather(*pending)


if __name__ == "__main__":
    asyncio.run(main())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//> using dep "org.typelevel::cats-effect::3.5.2" | |
import cats.effect._ | |
import cats.syntax.all._ | |
import scala.concurrent.duration._ | |
/** Two-worker demo: one fiber waits on a `Deferred`, the other completes it
  * after a delay. Both are run in parallel by `run`.
  */
object Main extends IOApp.Simple {

  /** Suspend a `println` of `msg` as an `IO` action. */
  private def say(msg: String): IO[Unit] = IO.delay(println(msg))

  /** Announce the wait, block semantically until `condition` is completed,
    * then announce that the condition was met.
    */
  def workerWaitForCondition(condition: Deferred[IO, Unit]): IO[Unit] =
    say("Worker 1 is waiting for the condition...") *>
      condition.get *>
      say("Worker 1: Condition is met!")

  /** Announce the work, sleep for two seconds, complete `condition`, then
    * announce that the condition was set.
    */
  def workerMakeConditionTrue(condition: Deferred[IO, Unit]): IO[Unit] =
    say("Worker 2 is making the condition true.") *>
      IO.sleep(2.seconds) *> // Simulate some work
      condition.complete(()).void *>
      say("Worker 2 has set the condition true.")

  /** Create the shared `Deferred` and run both workers in parallel,
    * finishing once both have finished.
    */
  def run: IO[Unit] =
    Deferred[IO, Unit].flatMap { gate =>
      (workerWaitForCondition(gate), workerMakeConditionTrue(gate)).parTupled.void
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This Python code snippet (like its Haskell and Scala Cats Effect counterparts) demonstrates asynchronous workers that wait for a specific condition to be met, alongside workers that set that condition asynchronously. The Python version uses the asyncio library to achieve concurrent, non-blocking behavior.
Features:
Two asynchronous workers: One worker waits for a specified condition, while the other worker makes that condition true when necessary.
Proper synchronization: The code ensures proper synchronization between the workers to avoid race conditions or conflicts.
Asynchronous waiting: Workers use await to asynchronously wait for conditions, allowing efficient and non-blocking execution.
Condition setting: The code simulates the process of making a condition true, which can be replaced with any real-world task or event.
This code snippet serves as a foundation for building concurrent and asynchronous applications in Python, allowing you to create complex workflows that depend on specific conditions being met asynchronously.