Created
April 11, 2019 08:44
-
-
Save daidokoro/437683f2ba671f77aed8132a58632b2b to your computer and use it in GitHub Desktop.
Beam Stuff
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Apache Beam Python batching example.

Defines a ``DoFn`` that buffers the elements of a PCollection, hands them
to an action ``n`` at a time, and flushes any remaining partial batch when
the bundle finishes.

NOTE(review): the example subclasses ``beam.DoFn`` but no import of ``beam``
is visible here; the surrounding code presumably does
``import apache_beam as beam`` -- confirm before running.
"""
class Batch(beam.DoFn):
    """Buffer elements of a PCollection and process them ``n`` at a time.

    Elements are collected in an in-memory list. Each time the list reaches
    ``n`` items it is passed to ``_action`` and every item ``_action``
    returns is emitted downstream individually. Items still buffered when
    the bundle ends are flushed by ``finish_bundle``, so the last batch of a
    bundle may be smaller than ``n``. Batches never span bundles.

    NOTE(review): ``finish_bundle`` re-emits leftovers in the global window
    (see below), and values yielded from ``process`` presumably inherit the
    window/timestamp of the element that completed the batch. This DoFn is
    therefore only appropriate for globally-windowed input -- confirm.

    Args:
        n: Number of elements per batch; must be at least 1.

    Raises:
        ValueError: If ``n`` is less than 1 (such a batch could never fill,
            so everything would silently wait for ``finish_bundle``).
    """

    def __init__(self, n):
        super(Batch, self).__init__()
        if n < 1:
            raise ValueError("batch size n must be >= 1, got %r" % (n,))
        self._n = n
        # Buffer of elements waiting to be processed.
        self._batch = []

    def start_bundle(self):
        """Start every bundle with an empty buffer.

        Why: a DoFn instance may be reused across bundles; if a previous
        bundle aborted before ``finish_bundle`` ran, its buffered items
        must not leak into this one.
        """
        self._batch = []

    def process(self, e):
        """Append ``e`` to the buffer and flush it once it holds ``n`` items.

        Yields:
            Each item returned by ``_action`` for a full batch; nothing
            while the batch is still filling.
        """
        self._batch.append(e)
        if len(self._batch) >= self._n:
            # Detach the full batch before yielding so the buffer is already
            # reset even if consumption of the yielded items is interrupted.
            batch, self._batch = self._batch, []
            for item in self._action(batch):
                # Alternatively, yield the whole batch as a single element.
                yield item

    def _action(self, elems):
        """Process one batch (full, or the final partial one of a bundle).

        Placeholder for the real work, e.g. updating the batch, pushing it
        to a database, or making a batched API call. This example only
        prints the batch size and returns the items unchanged.

        Args:
            elems: The buffered items.

        Returns:
            A new list of the items to emit downstream, one at a time.
        """
        print("batch size: %s" % len(elems))
        return list(elems)

    def finish_bundle(self):
        """Flush any items still buffered when the bundle ends.

        Outputs from ``finish_bundle`` are not produced in response to an
        input element, so each one must be wrapped in a ``WindowedValue``.
        Here every item is placed in the global window with timestamp -1.
        """
        # Local imports, as in the rest of this example; presumably so the
        # DoFn stays self-contained when pickled to workers -- confirm.
        from apache_beam.utils.windowed_value import WindowedValue
        from apache_beam import window

        if not self._batch:
            return
        batch, self._batch = self._batch, []
        # Built once and shared; a tuple so no output can mutate it.
        windows = (window.GlobalWindow(),)
        # Fixed: this previously called the nonexistent ``self._classify``,
        # raising AttributeError whenever a partial batch remained.
        for item in self._action(batch):
            yield WindowedValue(item, -1, windows)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment