Created
August 23, 2018 14:35
-
-
Save hartym/51ed4681abeaf80026ebdd2dfb602e5d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
import bonobo | |
from bonobo.config import use_context_processor | |
def rolling_deque(self, context):
    """Context processor that supplies a ``deque`` and flushes it when resumed.

    The generator yields a fresh, empty ``deque`` exactly once. When it is
    resumed via ``send(...)``, the object sent back is used as the buffer and
    every item still in it is emitted through ``context.send``.

    NOTE(review): presumably bonobo resumes this generator at teardown with
    the very deque that was yielded -- confirm against bonobo's
    context-processor documentation.

    Args:
        self: Unused by this function body.
        context: Object exposing ``send(*values)``; it receives one call per
            item left in the buffer.

    Yields:
        collections.deque: The empty buffer to be shared with the transform.
    """
    buffer = yield deque()
    # Drain the leftovers oldest-first. Each emitted row has four fields:
    # the items still buffered after the pop, the popped item, and two
    # ``None`` placeholders.
    while buffer:
        retval = buffer.popleft()
        context.send(list(buffer), retval, None, None)
@use_context_processor(rolling_deque)
def transform(buffer: deque, value):
    """Push ``value`` into the rolling buffer and emit a row once it overflows.

    Args:
        buffer: The deque provided by the ``rolling_deque`` context processor.
        value: The incoming item; it is appended to ``buffer`` and added to
            the evicted item in the emitted row.

    Yields:
        tuple: ``(remaining_items, evicted, value, evicted + value)``, only
        when the buffer holds more than three items after the append.
    """
    buffer.append(value)
    if len(buffer) <= 3:
        # Buffer has not overflowed yet: nothing is emitted for this value.
        return
    oldest = buffer.popleft()
    remaining = list(buffer)
    yield remaining, oldest, value, oldest + value
def get_graph(**options):
    """Build the bonobo graph: ``range(10)`` -> ``transform`` -> ``print``.

    Args:
        **options: Accepted for the caller's convenience; not read here.

    Returns:
        The ``bonobo.Graph`` instance with the three-node chain attached.
    """
    pipeline = bonobo.Graph()
    # Attach the source first, then hang the remaining nodes off that cursor.
    after_source = pipeline.get_cursor() >> range(10)
    after_source >> transform >> print
    return pipeline
# The __main__ block actually executes the graph.
if __name__ == "__main__":
    # Parse command-line arguments with bonobo's parser and pass the
    # resulting options on to the graph factory.
    with bonobo.parse_args(bonobo.get_argument_parser()) as options:
        graph = get_graph(**options)
        bonobo.run(graph)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment