Created
July 1, 2020 23:47
-
-
Save fuzzy-focus/2e92919eca5d9f07c1bc902b381ce08d to your computer and use it in GitHub Desktop.
Test asyncio in conjunction with threading: worker threads answer requests whose results are awaited as asyncio futures.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import threading | |
import queue | |
import itertools | |
import time | |
class Bus(threading.Thread):
    """Daemon worker thread that answers requests taken from the bus queue.

    Each request is an ``(ind, msg, data)`` tuple.  A ``"read"`` request posts
    ``(ind, "read_resp", value)`` to the entity queue, where ``value`` is the
    attribute of this object named by ``data``.  A ``"write"`` request stores
    ``data`` in ``val1``.  Any other message is only printed.
    """

    def __init__(self, eq, bq):
        """Remember the entity queue ``eq`` (outgoing) and bus queue ``bq`` (incoming)."""
        super().__init__(daemon=True)
        self.eq = eq
        self.q = bq
        self.val1 = 0
        # Endless 0..9 counter; every read of ``val2`` consumes one value.
        self._val2 = itertools.cycle(range(10))

    @property
    def val2(self):
        """Return the next value of the 0..9 cycle (each access advances it)."""
        return next(self._val2)

    def run(self):
        """Serve requests from the bus queue forever."""
        while True:
            ind, msg, data = self.q.get()
            print(f"BUS {ind: <3} msg = {msg}, data = {data}")
            time.sleep(.1)
            self._on_message(ind, msg, data)
            # NOTE(review): the scraped source lost its indentation; this short
            # pause is assumed to run once per loop iteration -- confirm.
            time.sleep(.01)

    def _on_message(self, ind, msg, data):
        """Apply a single request; messages other than read/write are ignored."""
        if msg == "write":
            self.val1 = data
        elif msg == "read":
            self.eq.put((ind, "read_resp", getattr(self, data)))
class Entity(threading.Thread):
    """Daemon worker thread that consumes messages from the entity queue.

    Each message is an ``(ind, msg, data)`` tuple.  ``"read_resp"`` messages
    are forwarded to the handler's ``handle_read_resp``; ``"read"`` and
    ``"write"`` messages only print ``?!?``.
    """

    def __init__(self, eq, h):
        """Remember the entity queue ``eq`` and the handler ``h``."""
        super().__init__(daemon=True)
        self.h = h
        self.q = eq

    def run(self):
        """Consume messages from the entity queue forever."""
        while True:
            ind, msg, data = self.q.get()
            print(f"ENT {ind: <3} msg = {msg}, data = {data}")
            time.sleep(.1)
            if msg == "read_resp":
                self.h.handle_read_resp(ind, msg, data)
            elif msg in ("read", "write"):
                # These message kinds get no handling here beyond this marker.
                print("?!?")
class Data:
    """Attribute-style facade over a handler.

    Reading ``val1`` or ``val2`` returns whatever the handler's ``handle_read``
    returns for that name.  Assigning ``val1`` calls ``handle_write``;
    assigning ``val2`` raises ``AttributeError``.
    """

    def __init__(self, h):
        """Store the handler ``h`` that performs the actual reads and writes."""
        self.h = h

    @property
    def val2(self):
        """Delegate a read of ``"val2"`` to the handler."""
        return self.h.handle_read("val2")

    @val2.setter
    def val2(self, value):
        # Writing ``val2`` is rejected unconditionally.
        raise AttributeError()

    @property
    def val1(self):
        """Delegate a read of ``"val1"`` to the handler."""
        return self.h.handle_read("val1")

    @val1.setter
    def val1(self, value):
        self.h.handle_write(value)
class Handler:
    """Bridge between coroutine code and the queue-driven worker threads.

    Reads are issued by putting ``(index, "read", name)`` on the outgoing
    queue and handing the caller an asyncio future.  The future is resolved
    when ``handle_read_resp`` is called with the same index.  Writes are
    fire-and-forget ``(index, "write", value)`` messages.
    """

    def __init__(self, q):
        """Create a handler that sends its requests on queue ``q``."""
        # Pending read futures keyed by request index.
        self.futures = {}
        self.q = q
        self._i = 0

    @property
    def i(self):
        """Return the next request index; every access increments the counter."""
        i = self._i
        self._i += 1
        return i

    def handle_read(self, data):
        """Queue a read request for ``data`` and return a future for its result.

        The future is registered before the request is queued, so a response
        can never arrive for an index that is not yet known.
        """
        i = self.i
        f = asyncio.Future()
        self.futures[i] = f
        self.q.put((i, "read", data))
        return f

    def handle_read_resp(self, ind, msg, data):
        """Resolve the pending read ``ind`` with ``data``.

        May be called from any thread.  asyncio futures are not thread-safe,
        so the result is set on the future's own event loop through
        ``call_soon_threadsafe``, which also wakes the loop.  Responses for an
        unknown index, for a future that is already done (for example
        cancelled by a timeout), or for a loop that has been closed are
        dropped instead of raising in the calling thread.
        """
        print(f"HDL: {ind: <3} msg = {msg}, data = {data}, fut={self.futures}")
        f = self.futures.pop(ind, None)
        if f is None:
            print(f"HDL: {ind: <3} no pending read for this index, response dropped")
            return
        print(f)
        try:
            f.get_loop().call_soon_threadsafe(self._resolve, f, data)
        except RuntimeError:
            # call_soon_threadsafe raises RuntimeError once the loop is closed.
            print(f"HDL: {ind: <3} event loop is closed, response dropped")

    @staticmethod
    def _resolve(f, data):
        """Set the result on the loop thread unless the future is already done."""
        if not f.done():
            f.set_result(data)
        print(f)

    def handle_write(self, data):
        """Queue a write request carrying ``data``; no response is tracked."""
        self.q.put((self.i, "write", data))
async def sim(data):
    """Exercise ``data`` in an endless write/read cycle.

    Each round assigns 5 to ``data.val1``, awaits one ``val1`` read with a
    one-second timeout, then starts three ``val2`` reads and gathers them.
    One-second pauses separate the steps.  ``data.val1`` and ``data.val2``
    are expected to yield awaitables when read.
    """
    while True:
        # Single write followed by a single, time-limited read.
        print("writing val1")
        data.val1 = 5
        print("trying to read val1")
        v1 = data.val1
        print(f"reading v1: {v1}")
        v1 = await asyncio.wait_for(v1, 1)
        print(f"successfully read v1 = {v1}")
        await asyncio.sleep(1)

        # Three reads are started before any of them is awaited.
        print("read multiple")
        futures = []
        for _ in range(3):
            futures.append(data.val2)
        print("waiting for them to complete")
        await asyncio.sleep(1)
        vals = await asyncio.gather(*futures)
        print(f"returned {vals}")
        await asyncio.sleep(1)
def main():
    """Wire up the queues, handler and worker threads, then run the simulation.

    Ctrl-C (``KeyboardInterrupt``) or ``SystemExit`` ends the program with a
    short goodbye message.
    """
    entity_queue = queue.Queue()
    bus_queue = queue.Queue()
    handler = Handler(bus_queue)
    entity = Entity(entity_queue, handler)
    bus = Bus(entity_queue, bus_queue)
    data = Data(handler)

    for worker in (bus, entity):
        worker.start()

    ev = threading.Event()
    try:
        print("running all the threads")
        asyncio.run(sim(data))
        # Nothing in this function sets ``ev``; if ``sim`` ever returned, this
        # wait would block until the process is interrupted.
        ev.wait()
    except (KeyboardInterrupt, SystemExit):
        print("\nbye")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment