Last active
May 9, 2024 20:39
-
-
Save giuliohome/f19838b6636edfae6ada0d6e8254e8c3 to your computer and use it in GitHub Desktop.
The script simulates a scenario in which some activities are weak dependents of others: an activity that has already completed is restarted when one of its weak dependencies completes afterwards.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio

# Weak dependencies, listed as (dependency, dependent) pairs.
# E.g. (2, 3) says that activity 2 updates the input of activity 3, so
# activity 3 is a weak dependent of activity 2; likewise (4, 1) makes
# activity 1 a weak dependent of activity 4.
weak_deps = [(2, 3), (4, 1)]
async def gather_with_first_completed(awaitable_results, *, deps=None, restart_delay=2):
    """Run activities concurrently, restarting weak dependents when needed.

    Every awaitable is wrapped in a task and the tasks are awaited with
    ``asyncio.wait(..., return_when=FIRST_COMPLETED)`` until none is left.
    Whenever an activity finishes, each ``(dependency, dependent)`` pair whose
    dependency equals the finished result is checked: if the dependent has
    already completed, it is restarted (simulated here by
    ``asyncio.sleep(restart_delay, result=dependent)``).

    Args:
        awaitable_results: iterable of awaitables; each one's result
            identifies the activity.
        deps: iterable of ``(dependency, dependent)`` pairs. Defaults to the
            module-level ``weak_deps`` table.
        restart_delay: seconds a restarted (simulated) activity takes.

    Returns:
        The activity results in completion order. A restarted activity
        appears once for every time it completed.

    Raises:
        Whatever exception an activity raised, re-raised by ``task.result()``;
        tasks still pending at that point are not cancelled by this function.

    NOTE: tasks that finish within the same wake-up are processed in the
    iteration order of the ``done`` set, so whether a dependent finishing
    "together" with its dependency gets restarted depends on that order.
    NOTE: a cyclic table such as ``[(a, b), (b, a)]`` makes the two
    activities restart each other without end.
    """
    if deps is None:
        # Fall back to the module-level table so existing callers keep working.
        deps = weak_deps

    completed_results = []
    tasks = [asyncio.create_task(t) for t in awaitable_results]
    while tasks:
        # Wake up as soon as at least one task has finished.
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            curr_result = task.result()
            print(f'activity {curr_result} completed')
            # Restart already-completed activities whose input was updated by
            # the activity that has just finished.
            for weak_depcy, weak_depdent in deps:
                if curr_result == weak_depcy and weak_depdent in completed_results:
                    pending.add(
                        asyncio.create_task(
                            asyncio.sleep(restart_delay, result=weak_depdent)
                        )
                    )
                    print(
                        f'activity {weak_depdent} restarted because its weak '
                        f'dependency {weak_depcy} completed'
                    )
            # Record the completion only after the restart check, so an
            # activity never triggers on its own first completion.
            completed_results.append(curr_result)
        # Keep waiting on the unfinished tasks plus any restarted ones.
        tasks = pending
    return completed_results
async def main_loop():
    """Run five simulated activities and print the order in which they finished.

    Each activity is an ``asyncio.sleep(2, result=i)`` for ``i`` in
    ``range(5)``; the list returned by ``gather_with_first_completed`` is
    printed after all of them (including restarts) are done.
    """
    awaitable_results = [asyncio.sleep(2, result=i) for i in range(5)]
    result_list = await gather_with_first_completed(awaitable_results)
    print('all activities done')
    print(result_list)
# Run the simulation only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main_loop())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
async def gather_with_first_completed(*awaitable_results):
    """Await all awaitables concurrently and return results in completion order.

    Unlike ``asyncio.gather``, which returns results in argument order, the
    returned list is ordered by when each task finished. Tasks that finish
    within the same wake-up are appended in the iteration order of the
    ``done`` set returned by ``asyncio.wait``.

    Args:
        *awaitable_results: the awaitables to run; each is wrapped in a task.

    Returns:
        A list with one result per awaitable, in completion order (empty when
        called without arguments).

    Raises:
        Whatever exception a task raised, re-raised by ``task.result()``;
        tasks still pending at that point are not cancelled by this function.
    """
    completed_results = []
    tasks = [asyncio.create_task(t) for t in awaitable_results]
    while tasks:
        # Wake up as soon as at least one task has finished.
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            completed_results.append(task.result())
        # Keep waiting only on the tasks that have not finished yet.
        tasks = pending
    return completed_results
# usage
async def t1():
    """Sleep for 1 second, then return the label ``'t1'``."""
    await asyncio.sleep(1)
    return 't1'
async def t5():
    """Sleep for 5 seconds, then return the label ``'t5'``."""
    await asyncio.sleep(5)
    return 't5'
async def t10():
    """Sleep for 10 seconds, then return the label ``'t10'``."""
    await asyncio.sleep(10)
    return 't10'
async def usage():
    """Contrast completion-order gathering with ``asyncio.gather``.

    Runs the same three coroutines through ``gather_with_first_completed``
    and through ``asyncio.gather`` and prints both result lists; the expected
    output of each call is shown in the comment beneath it.
    """
    my_gather = await gather_with_first_completed(t5(), t10(), t1())
    print('my gather: ', my_gather)
    # ['t1', 't5', 't10']
    std_gather = await asyncio.gather(t5(), t10(), t1())
    print('standard gather: ', std_gather)
    # ['t5', 't10', 't1']
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(usage())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example of output