Skip to content

Instantly share code, notes, and snippets.

@brianrodri
Last active January 15, 2021 01:27
Show Gist options
  • Save brianrodri/b6ae5df1bd4d518592dd9b138b0faa35 to your computer and use it in GitHub Desktop.
Save brianrodri/b6ae5df1bd4d518592dd9b138b0faa35 to your computer and use it in GitHub Desktop.
LATEST_VERSION_SENTINAL = object()
MIGRATION_FUNCTIONS_REGISTRY = {
exp_models.ExplorationModel: {
1: exp_services.migrate_schema_v1_to_v2,
2: exp_services.migrate_schema_v2_to_v3,
3: LATEST_VERSION_SENTINAL,
},
}
class SchemaMigrationTransform(beam.PTransform):
"""Transforms the input models into their latest schema version.
Example:
p = (
beam_jobs.GetModels(exp_models.ExplorationModel)
| beam_jobs.SchemaMigrationTransform()
| beam_jobs.PutModels())
"""
def expand(self, model_pipe):
"""Recursively updates models and returns pipeline with fully migrated versions."""
outputs = model_pipe | beam.FlatMap(self._migrate_model).with_outputs()
return [outputs.latest, outputs.needs_work | self] | beam.Flatten()
def _migrate_model(self, model):
"""Migrates schema if applicable and returns tagged result ('latest' or 'needs_work')."""
all_version_handlers = MIGRATION_FUNCTIONS_REGISTRY.get(type(model), None)
if all_version_handlers is None:
raise Exception
version_handler = all_version_handlers.get(model.schema_version, None)
if version_handler is None:
raise Exception
if version_handler is LATEST_VERSION_SENTINAL:
yield beam.pvalue.TaggedOutput('latest', model)
else:
model = model.clone()
version_handler(model)
model.schema_version += 1
yield beam.pvalue.TaggedOutput('needs_work', model)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment