Last active
March 4, 2025 09:07
-
-
Save Menziess/1a450d06851cbd00292b2a99c77cc854 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Stream synchronization. | |

$ pip install slipstream-async

The first stream depends on the second, which depends on the third,
and the third in turn depends on the second:

    1 <- 2 <- 3
    3 <- 2

When the third goes down, the second will pause, after which the first
will pause. When the third and second catch up and recover, the first
will recover. If the second goes down, it pauses the first and also
pauses the third.
""" | |
from asyncio import run, sleep | |
from datetime import UTC, timedelta | |
from datetime import datetime as dt | |
from operator import itemgetter | |
from slipstream import handle, stream | |
from slipstream.caching import Cache | |
from slipstream.checkpointing import Checkpoint, Dependency | |
async def emoji(): | |
offset = 0 | |
ts = timestamp | |
while True: | |
for emoji in 'ππππ': | |
yield {'offset': offset, 'emoji': emoji, 'timestamp': ts} | |
offset += 1 | |
ts += timedelta(seconds=1) | |
await sleep(0.2) | |
# Three independent emoji generators, one per stream. They are created
# in the order first, second, third.
first, second, third = emoji(), emoji(), emoji()
async def recovery_callback(c: Checkpoint, d: Dependency) -> None:
    """Print a notice naming the recovered dependency and its dependent.

    This function is handed to every ``Checkpoint`` in this file as
    ``recovery_callback``; presumably slipstream invokes it once a
    dependency's downtime is over (confirm against the library docs).

    Args:
        c: Checkpoint whose ``name`` is reported as the resuming stream.
        d: Dependency whose ``name`` is reported as the recovered stream.
    """
    message = f'Downtime resolved (in {d.name} stream), resuming {c.name}'
    print(message)
# Sink for the values yielded by the second stream's handler.
emoji_cache = Cache('state/emoji')
# Single cache instance handed to all three checkpoints below.
checkpoints_cache = Cache('state/checkpoints', target_table_size=1024)

# first <- second
first_checkpoint = Checkpoint(
    'first',
    dependent=first,
    dependencies=[
        Dependency('second', second, downtime_threshold=timedelta(seconds=3)),
    ],
    cache=checkpoints_cache,
    recovery_callback=recovery_callback,
)

# second <- third
second_checkpoint = Checkpoint(
    'second',
    dependent=second,
    dependencies=[
        Dependency('third', third, downtime_threshold=timedelta(seconds=3)),
    ],
    cache=checkpoints_cache,
    recovery_callback=recovery_callback,
)

# third <- second
third_checkpoint = Checkpoint(
    'third',
    dependent=third,
    dependencies=[
        Dependency('second', second, downtime_threshold=timedelta(seconds=3)),
    ],
    cache=checkpoints_cache,
    recovery_callback=recovery_callback,
)

# Current UTC time truncated to whole seconds; the emoji generators read
# this name when they are first iterated.
timestamp = dt.now(tz=UTC).replace(microsecond=0)
@handle(first, sink=[print])
async def first_handler(val):
    """Handle one message of the first stream (depends on second).

    Reads ``'offset'``, ``'emoji'`` and ``'timestamp'`` from ``val``,
    passes the timestamp and offset to ``first_checkpoint.check_pulse``
    and prints a notice when that call returns a truthy downtime value.

    Yields:
        A ``(ts.timestamp(), emoji + ' (first)')`` pair.
    """
    offset = val['offset']
    symbol = val['emoji']
    ts = val['timestamp']

    downtime = await first_checkpoint.check_pulse(ts, offset=offset)
    if downtime:
        print(
            f'Downtime detected (in second stream): {downtime}, '
            'pausing first'
        )

    yield ts.timestamp(), symbol + ' (first)'
@handle(second, sink=[emoji_cache, print])
async def second_handler(val):
    """Handle one message of the second stream.

    The second stream is a dependency of both first and third, so the
    message timestamp is reported to ``first_checkpoint`` and
    ``third_checkpoint`` via ``heartbeat``. It is also a dependent of
    third, so ``second_checkpoint.check_pulse`` is consulted and a
    notice is printed when it returns a truthy downtime value.

    Yields:
        A ``(ts.timestamp(), emoji + ' (second)')`` pair.
    """
    offset = val['offset']
    symbol = val['emoji']
    ts = val['timestamp']

    # Report progress to the checkpoints that list 'second' as dependency.
    await first_checkpoint.heartbeat(ts, 'second')
    await third_checkpoint.heartbeat(ts, 'second')

    downtime = await second_checkpoint.check_pulse(ts, offset=offset)
    if downtime:
        print(
            f'Downtime detected (in third stream): {downtime}, '
            'pausing second'
        )

    yield ts.timestamp(), symbol + ' (second)'

    # Causing downtime on purpose >:)
    if offset != 17:
        return
    print('>>> ZZZZzzzzzZZZZzzzzz (second stream down)')
    await sleep(3)
    print('>>> 0,o (second stream up again)')
@handle(third, sink=[print])
async def third_handler(val):
    """Handle one message of the third stream.

    The third stream is a dependency of second, so the message
    timestamp is reported to ``second_checkpoint`` via ``heartbeat``.
    It is also a dependent of second: after yielding,
    ``third_checkpoint.check_pulse`` is consulted and a notice is
    printed when it returns a truthy downtime value.

    Yields:
        A ``(ts.timestamp(), emoji + ' (third)')`` pair.
    """
    offset = val['offset']
    symbol = val['emoji']
    ts = val['timestamp']

    await second_checkpoint.heartbeat(ts, 'third')

    yield ts.timestamp(), symbol + ' (third)'

    # The pulse check deliberately stays after the yield, as in the original.
    downtime = await third_checkpoint.check_pulse(ts, offset=offset)
    if downtime:
        print(
            f'Downtime detected (in second stream): {downtime}, '
            'pausing third'
        )

    # Causing downtime on purpose >:)
    if offset != 5:
        return
    print('>>> ZZZZzzzzzZZZZzzzzz (third stream down)')
    await sleep(5)
    print('>>> 0,o (third stream up again)')
# Script entry point: hand the coroutine returned by stream() to
# asyncio's run().
if __name__ == '__main__':
    run(
        stream()
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output: