Created
May 7, 2020 12:36
-
-
Save nngogol/15d51db5b979c8f9ae05ea1bd7a67754 to your computer and use it in GitHub Desktop.
(async)Racing show in less then 100 lines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Made at 2020-05-07 15:31:52 | |
Asyncio Racing Show | |
run with: | |
$ clear; python3 -uB comp_simple.py | |
''' | |
import asyncio, random, queue | |
failed_runners = queue.Queue() | |
flag = False | |
# first runner | |
async def runner1(name): | |
global flag, failed_runners | |
for i in range(5): | |
# print(f'RUNNING Mr.{name} at {i} meters') | |
await asyncio.sleep(random.randint(1,2)/100) | |
if not flag: # if nobody grabbed the flag -> I won. | |
flag = f' Mister {name}' | |
print(f' flag is taken by Mr.{name}\n') | |
else: # if flag is gone -> I failed. | |
failed_runners.put(f'Finished, but failed "Mr.{name}"') | |
# other runners | |
async def runner2(name): | |
# thesame code, as in runner1 | |
global flag, failed_runners | |
for i in range(5): | |
# print(f'RUNNING Mr.{name} at {i} meters') | |
await asyncio.sleep(random.randint(1,2)/100) | |
if not flag: # if nobody grabbed the flag -> I won. | |
flag = f' Mister {name}' | |
print(f' flag is taken by Mr.{name}\n') | |
else: # if flag is gone -> I failed. | |
failed_runners.put(f'Finished, but failed "Mr.{name}"') | |
async def runner3(name): | |
# thesame code, as in runner1 | |
global flag, failed_runners | |
for i in range(5): | |
# print(f'RUNNING Mr.{name} at {i} meters') | |
await asyncio.sleep(random.randint(1,2)/100) | |
if not flag: # if nobody grabbed the flag -> I won. | |
flag = f' Mister {name}' | |
print(f' flag is taken by Mr.{name}\n') | |
else: # if flag is gone -> I failed. | |
failed_runners.put(f'Finished, but failed "Mr.{name}"') | |
# a while-true checker + little async sleep. | |
async def checker(): | |
global flag, failed_runners | |
print('START of COMPETITION') | |
# Let the race BEGIN! | |
while not flag: | |
# let's wait for flag state more | |
await asyncio.sleep(.001) | |
# Race Finished! Let's do conclusions: | |
# show winner | |
print(' ', '-'*30, f'\n WINNER is {flag.strip()}! Congrats!') | |
# IF there are failers - show them | |
if not failed_runners.empty(): | |
print(' list of failers:') | |
while not failed_runners.empty(): | |
item = failed_runners.get() | |
if item is not None: print(f' >>> {item}') | |
else: break | |
print('\nEND OF COMPETITION') | |
print('global START\n') | |
loop = asyncio.get_event_loop() | |
# in 4 tasks in asyncs loop | |
loop.run_until_complete(asyncio.gather( | |
runner1(1), runner2(2), runner3(3) | |
,checker())) | |
loop.close() | |
print('\nglobal END') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment