Created
March 4, 2025 09:09
-
-
Save Menziess/22d8a511f61c04a8142d81510a0db04b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Stream synchronization.

    $ pip install slipstream-async

The activity stream depends on the weather stream.
When the weather stream goes down, the activity stream is paused until the
weather stream recovers. Once it recovers, the activity topic seeks back to
the checkpointed offset and reprocesses the messages that were enriched with
stale weather data.
"""
from asyncio import run, sleep | |
from datetime import datetime as dt | |
from datetime import timedelta | |
from operator import attrgetter, itemgetter | |
from typing import cast | |
from slipstream import Cache, Topic, handle, stream | |
from slipstream.checkpointing import Checkpoint, Dependency | |
from slipstream.codecs import JsonCodec | |
from slipstream.core import READ_FROM_END | |
# The data that we'll use in our example: one weather report per hour.
weather_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10), 'value': '🌞'},
    {'timestamp': dt(2023, 1, 1, 11), 'value': '⛅'},
    {'timestamp': dt(2023, 1, 1, 12), 'value': '🌦️'},
    {'timestamp': dt(2023, 1, 1, 13), 'value': '🌧'},
])

# Activities; the trailing comment is the correct weather at the time of
# each event (the latest weather report at or before its timestamp).
activity_messages = iter([
    {'timestamp': dt(2023, 1, 1, 10, 30), 'value': 'swimming'},  # 🌞
    {'timestamp': dt(2023, 1, 1, 11, 30), 'value': 'walking home'},  # ⛅
    {'timestamp': dt(2023, 1, 1, 12, 30), 'value': 'shopping'},  # 🌦️
    {'timestamp': dt(2023, 1, 1, 13, 10), 'value': 'lunch'},  # 🌧
])
async def async_iterable(it, start=dt(2023, 1, 1, 9), time_scale=3600.0):
    """Make a synchronous iterable act like an async iterable.

    Messages are replayed with delays that mirror the gaps between their
    ``'timestamp'`` values (the first gap is measured from *start*),
    compressed by *time_scale*. With the default of 3600, one hour of
    event time takes one second of wall-clock time.

    :param it: Iterable of dicts, each holding a ``'timestamp'`` datetime.
    :param start: Event time the replay is considered to begin at.
    :param time_scale: Event-time seconds per wall-clock second; must be > 0.
    :raises ValueError: If *time_scale* is not positive (raised when the
        generator is first iterated).
    :yields: The messages of *it*, unchanged and in order.
    """
    if time_scale <= 0:
        raise ValueError(f'time_scale must be positive, got {time_scale!r}')
    last_ts = start
    for msg in it:
        ts = msg['timestamp']
        # Wait for the (scaled) event-time gap since the previous message.
        await sleep((ts - last_ts).total_seconds() / time_scale)
        last_ts = ts
        yield msg
# Wrap both plain iterators so they can be consumed as async message streams.
weather_stream, activity_stream = (
    async_iterable(weather_messages),
    async_iterable(activity_messages),
)
# The activity topic: the producer below sinks activity events into it and
# handle_activity consumes them back out.
activity = Topic(
    'activity',
    dict(
        bootstrap_servers='localhost:29091',
        auto_offset_reset='earliest',
        group_instance_id='activity',
        group_id='activity',
    ),
    codec=JsonCodec(),
    offset=READ_FROM_END,
)

# Caches under state/: one for checkpoint data, one for weather reports.
checkpoints_cache = Cache('state/checkpoints', target_table_size=1024)
weather_cache = Cache('state/weather')
async def downtime_callback(c: Checkpoint, d: Dependency) -> None:
    """Announce that the dependent stream has been paused.

    Registered as the checkpoint's ``downtime_callback``; neither argument
    is used.
    """
    message = '\tThe stream is automatically paused.'
    print(message)
async def recovery_callback(c: Checkpoint, d: Dependency) -> None:
    """Rewind the activity topic to the offsets saved by the checkpoint.

    ``d.checkpoint_state`` is treated as a mapping of partition number
    (as a string) to offset. The topic seeks back to those positions so
    that messages enriched while the dependency was down get reprocessed.
    """
    saved = cast(dict[str, int], d.checkpoint_state)
    print(f'\tDowntime resolved, going back to offset {saved} for reprocessing.')
    positions = {}
    for partition, position in saved.items():
        positions[int(partition)] = position
    await activity.seek(positions)
# Checkpoint for the activity topic that tracks the weather stream as a
# dependency; downtime_threshold (one hour) presumably bounds how long
# heartbeats may lag before the dependency counts as down.
checkpoint = Checkpoint(
    'activity',
    dependent=activity,
    dependencies=[
        Dependency(
            'weather_stream',
            weather_stream,
            downtime_threshold=timedelta(minutes=60),
        ),
    ],
    cache=checkpoints_cache,
    downtime_callback=downtime_callback,
    recovery_callback=recovery_callback,
)
@handle(weather_stream, sink=[weather_cache, print])
async def handle_weather(val):
    """Process a weather message.

    Sends a heartbeat to the checkpoint with the report's time, then emits
    ``(unix_timestamp, weather)`` to the sinks (the weather cache and
    ``print``). After the 11:00 report it stalls for 5 seconds to simulate
    the weather stream going down.
    """
    ts, weather = itemgetter('timestamp', 'value')(val)
    # Heartbeat with the event time (presumably marks the weather
    # dependency as alive up to ``ts``).
    await checkpoint.heartbeat(ts)
    unix_ts = ts.timestamp()
    yield unix_ts, weather

    # Cause downtime on purpose >:) -- keyed on the report's timestamp
    # rather than on its emoji value, so the trigger doesn't depend on the
    # exact bytes of an emoji literal surviving encoding round-trips.
    if ts == dt(2023, 1, 1, 11):
        print('\tkilling weather stream on purpose >:)')
        await sleep(5)
        print('\t0,o (weather stream up again)')
@handle(activity_stream, sink=[activity])
def producer(val):
    """Send each activity message to the activity topic.

    The message is yielded paired with a ``None`` first element (presumably
    the message key).
    """
    key = None
    yield key, val
@handle(activity, sink=[print])
async def handle_activity(msg):
    """Process an activity message, enriching it with the weather at that time.

    Parses the event timestamp and passes it to the checkpoint together with
    the message's ``{partition: offset}`` position. Then it looks up the
    latest cached weather report, iterating backwards from the activity's
    time.

    :param msg: Consumed record exposing ``partition``, ``offset`` and a
        decoded ``value`` dict with ``'timestamp'`` and ``'value'`` keys.
    :yields: ``'>>> The weather during <activity> was <weather>'``, or
        ``(activity, '?')`` when the cache has no matching weather.
    """
    partition, offset = attrgetter('partition', 'offset')(msg)
    # ``activity_name`` rather than ``activity``: the latter would shadow the
    # module-level ``activity`` topic inside this function.
    ts, activity_name = itemgetter('timestamp', 'value')(msg.value)
    # The timestamp arrives as text like '2023-01-01 10:30:00';
    # fromisoformat also accepts the fractional seconds str(datetime)
    # adds when microseconds are non-zero, which strptime would reject.
    ts = dt.fromisoformat(ts)
    unix_ts = ts.timestamp()

    # Handle weather stream downtime. The position is passed keyed by the
    # partition number as a string (presumably what recovery later reads
    # back as ``checkpoint_state``).
    if downtime := await checkpoint.check_pulse(ts, **{str(partition): offset}):
        print(
            f'\tDowntime detected: {downtime}, '
            '(could cause faulty enrichment)'
        )

    # Find the latest weather message as of activity time
    for weather in weather_cache.values(backwards=True, from_key=unix_ts):
        yield f'>>> The weather during {activity_name} was {weather}'
        return
    yield activity_name, '?'
if __name__ == '__main__':
    # Run slipstream's stream() coroutine, which presumably drives every
    # @handle-registered pipeline defined above.
    pipeline = stream()
    run(pipeline)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output:
After going back to offset 494 of partition 0, it reprocesses the "shopping" activity.
That activity was first wrongly enriched with ⛅, because the weather stream was down and the cached weather was stale.
After the weather stream recovered, it was corrected to 🌦️.