Skip to content

Instantly share code, notes, and snippets.

@brianrodri
Last active June 27, 2021 12:35
Show Gist options
  • Save brianrodri/05e1112ecac73d30397aa1970febe5e1 to your computer and use it in GitHub Desktop.
# FILE: jobs/blog_validation_jobs.py
from jobs import base_jobs
from jobs.io import ndb_io
from jobs.types import blog_validation_errors
import apache_beam as beam
class BlogUniquenessJob(base_jobs.JobBase):
    """Audit job that reports blog post models whose titles are not unique."""

    def run(self):
        """Builds the pipeline that flags blog posts with duplicate titles.

        Every BlogPostModel is keyed by its title. The models are grouped by
        that key, and each model in a group of two or more produces its own
        DuplicateBlogTitleError.

        Returns:
            PCollection. One DuplicateBlogTitleError per model whose title is
            shared with at least one other model.
        """
        # NOTE(review): blog_models was referenced but never imported in this
        # file, so run() raised NameError. It is resolved here through the
        # model registry; confirm this matches the convention used elsewhere
        # in jobs/ (usually a module-level Registry.import_models call).
        from core.platform import models
        (blog_models,) = models.Registry.import_models([models.NAMES.blog])

        blog_model_pcoll = (
            self.pipeline
            | 'Get every Blog Model' >> (
                ndb_io.GetModels(blog_models.BlogPostModel.query()))
        )

        blogs_with_duplicate_titles = (
            blog_model_pcoll
            # NOTE(review): every model with an empty title would share the ''
            # key and be reported as a duplicate. Confirm whether such models
            # exist (e.g. drafts) and should be filtered out before grouping.
            | 'Generate (title, model) key value pairs' >> (
                beam.WithKeys(lambda blog_model: blog_model.title))
            | 'Group pairs by their title' >> beam.GroupByKey()
            | 'Discard keys' >> beam.Values()
            # GroupByKey only guarantees each group is an iterable; it may be
            # lazy and not support len(). Materialize it once so the size
            # check and the FlatMap below can both consume it safely.
            | 'Materialize grouped models' >> beam.Map(list)
            | 'Discard models with unique titles' >> (
                beam.Filter(lambda title_group: len(title_group) > 1))
        )

        return (
            blogs_with_duplicate_titles
            | 'Flatten models into a list of errors' >> beam.FlatMap(
                lambda title_group: [
                    blog_validation_errors.DuplicateBlogTitleError(model)
                    for model in title_group
                ])
        )
# FILE: jobs/types/blog_validation_errors.py
from jobs.types import base_validation_errors
import utils
class DuplicateBlogTitleError(base_validation_errors.BaseAuditError):
    """Error class for blogs with duplicate titles.

    The error message embeds the offending model's title, quoted with
    utils.quoted, and the model itself is forwarded to BaseAuditError.
    """

    def __init__(self, model):
        """Initializes the error for the given blog model.

        Args:
            model: The blog model whose title is not unique. Its ``title``
                attribute is read to build the message.
        """
        quoted_title = utils.quoted(model.title)
        super(DuplicateBlogTitleError, self).__init__(
            'title=%s is not unique' % quoted_title, model)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment