
@amingilani
Last active August 16, 2024 07:13
A helper class I wrote for when I have to run data migrations.

Django Batch Update

Overview

After re-inventing data migrations for several years, I decided to create a helper for myself.

This script provides an efficient way to run batch updates on Django model records. By processing records in batches, it avoids long-lived table locks and unbounded memory usage. The tool also includes progress tracking and optional validation after updates.

N.B. Both the script and this documentation were developed with help from generative AI (for validation, restructuring, debating alternatives, and more). I have not yet tested this code, so error reports are appreciated.

Usage Instructions

Licensing

This code is MIT-licensed. Please attribute accordingly when using.

Sample Code

from your_module import BatchUpdate
from your_app.models import YourModel

def your_update_function(record):
    # Apply your update logic
    record.field1 = 'some_new_value'
    record.field2 = record.another_field
    return record

queryset = YourModel.objects.filter(your_conditions)
fields_to_update = ['field1', 'field2']

batch_update = BatchUpdate(
    queryset=queryset,
    record_updater=your_update_function,
    fields_to_update=fields_to_update,
    batch_size=2000,
    validation_query=queryset.filter(validation_conditions),
    write_database='default',
    read_database='replica'
)

batch_update()

Parameters

  • queryset: QuerySet

    • The initial queryset from which records will be fetched for updating.
  • record_updater: typing.Callable[[Model], Model]

    • A function that takes a record as input and returns an updated record.
  • fields_to_update: list[str]

    • A list of model field names that should be updated.
  • batch_size: int = 2000

    • The number of records to process in each batch. Default is 2000.
  • validation_query: typing.Optional[QuerySet] = None

    • An optional queryset used to validate the update after it completes. It should match zero records once all updates have been applied; if it matches any records, the process raises a BackfillError.
  • write_database: typing.Optional[str] = None

    • The alias of the database where the updates should be written. Default is None, meaning the default database is used.
  • read_database: typing.Optional[str] = None

    • The alias of the database from which records should be read. Default is None, meaning the default database is used.
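The record_updater parameter is just a pure function from record to record. As a quick illustration, here is the contract sketched with a plain dataclass standing in for a Django model instance (FakeRecord and its fields are hypothetical names for this example only):

```python
from dataclasses import dataclass


@dataclass
class FakeRecord:
    # Stand-in for a model instance; not a real Django model.
    field1: str = ""
    another_field: str = "copied"
    field2: str = ""


def your_update_function(record):
    # Mutate the in-memory record; BatchUpdate persists it via bulk_update.
    record.field1 = "some_new_value"
    record.field2 = record.another_field
    return record


updated = your_update_function(FakeRecord())
print(updated.field1, updated.field2)  # some_new_value copied
```

The function must return the record it was given, since the batches passed to bulk_update are built from its return values.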

Deep Dive

Problem with Data Migrations

Running data migrations, especially on large datasets, can present several challenges:

  1. Database Locking: Large updates can lock tables for extended periods, leading to downtime or delayed processing.
  2. Memory Usage: Processing vast numbers of records in one go can lead to high memory consumption, potentially causing the application to crash.
  3. Transaction Size: Large transactions can overwhelm the database, leading to timeouts or rollback failures.
  4. Progress Tracking: Monitoring the progress of large migrations is difficult, creating uncertainty about the process.
  5. Validation: Ensuring all records are correctly updated after the migration is challenging.

Considerations for Optimal Data Migration

To address these challenges effectively:

  1. Batch Processing: Break down large updates into smaller batches to avoid locking issues and reduce memory usage.
  2. Error Handling: Implement robust error handling to manage partial updates and avoid inconsistent states.
  3. Progress Monitoring: Use tools like tqdm for progress tracking to maintain visibility over the migration process.
  4. Validation: After completing updates, validate changes to ensure that all records have been updated as expected.
  5. Database Management: Use separate read and write databases to distribute the load and minimize impact on live systems.
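The batching idea in point 1 can be sketched in plain Python, independent of Django. This mirrors the generator-plus-islice pattern the script itself uses, applied here to a simple range instead of a queryset:

```python
from itertools import islice


def in_batches(iterable, batch_size):
    # Pull items lazily and yield them as fixed-size lists;
    # the final batch may be shorter than batch_size.
    iterator = iter(iterable)
    while batch := list(islice(iterator, batch_size)):
        yield batch


batches = list(in_batches(range(11), 5))
print(batches)  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10]]
```

Because islice consumes the shared iterator, each batch picks up exactly where the previous one stopped, and only one batch is held in memory at a time.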

How This Script Handles These Challenges

The BatchUpdate class is designed to manage these challenges effectively:

  1. Batch Size Control: The batch size is adjustable, allowing you to control how many records are processed at a time, reducing the risk of database locks and high memory usage.
  2. Custom Record Updater: The record_updater function offers flexibility to apply custom logic to each record before saving it back to the database.
  3. Progress Reporting: The script utilizes tqdm for a visual progress bar. If tqdm is unavailable, it logs progress manually.
  4. Validation Option: After the batch update, you can provide a validation query (one that should match no records once the update succeeds) to confirm that all records were updated. If it matches any records, the process raises a BackfillError.
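The batch count reported during pre-flight comes from integer ceiling division, the same arithmetic the script's _calculate_counts uses (the numbers below are illustrative, not from a real run):

```python
def batch_count(records_count, batch_size):
    # Integer ceiling division: adding (batch_size - 1) before floor
    # division rounds up whenever there is a partial final batch.
    return (records_count + batch_size - 1) // batch_size


print(batch_count(10, 5))  # 2 (two full batches)
print(batch_count(11, 5))  # 3 (two full batches, one partial)
print(batch_count(14, 5))  # 3 (two full batches, one partial)
```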

Conclusion

This script simplifies large-scale data migrations in Django applications by processing records in batches, providing detailed progress reporting, and including error handling and validation. It offers a flexible, efficient solution with minimal disruption to your application, particularly in environments that require distributed read and write operations.

import logging
import typing
from itertools import islice

from django.db.models import Model, QuerySet

logger = logging.getLogger(__name__)


def fallback_progress_reporter(iterable: typing.Iterable, total: int) -> typing.Iterable:
    """
    Fallback progress reporter that logs progress after each iteration.

    Args:
        iterable: The iterable being processed (here, an iterable of batches).
        total: The total number of iterations expected, used for progress logging.

    Yields:
        The next item from the iterable.
    """
    processed_count = 0
    for item in iterable:
        yield item
        processed_count += 1
        logger.info(f"Processed {processed_count}/{total} batches so far...")


class BackfillError(Exception):
    """
    Raised when a backfill or batch update process fails or fails validation.
    """


class BatchUpdate:
    def __init__(
        self,
        queryset: QuerySet,
        record_updater: typing.Callable[[Model], Model],
        fields_to_update: list[str],
        batch_size: int = 2000,
        validation_query: typing.Optional[QuerySet] = None,
        write_database: typing.Optional[str] = None,
        read_database: typing.Optional[str] = None,
    ):
        """
        Initialize the batch update process.

        Args:
            queryset: The initial queryset from which records will be fetched for updating.
            record_updater: A function that takes a record as input and returns an updated record.
            fields_to_update: A list of model field names that should be updated.
            batch_size: The number of records to process in each batch. Default is 2000.
            validation_query: An optional queryset that should match zero records after a
                successful update; if it matches any records, validation fails.
            write_database: The alias of the database where the updates should be written.
                Default is None, meaning the default database is used.
            read_database: The alias of the database from which records should be read.
                Default is None, meaning the default database is used.
        """
        self.record_updater = record_updater
        self.fields_to_update = fields_to_update
        self.batch_size = batch_size
        self.validation_query = validation_query

        self.read_queryset = queryset
        if read_database:
            self.read_queryset = queryset.using(read_database)

        self.write_queryset = queryset
        if write_database:
            self.write_queryset = queryset.using(write_database)

        # Attempt to import tqdm for a progress bar; use the fallback if unavailable.
        try:
            from tqdm import tqdm  # noqa

            self.progress_logger = tqdm
        except ImportError:
            logger.warning("tqdm not available. Progress will be logged manually.")
            self.progress_logger = fallback_progress_reporter

    def __call__(self) -> None:
        """
        Execute the batch update process.

        Orchestrates the batch update by calculating the total number of records,
        generating updated records, and applying those updates in batches. After
        updating, it optionally validates that all records were updated correctly.

        Raises:
            BackfillError: If a batch fails to write or validation fails.
        """
        # Pre-flight: prepare counts and log initial information.
        records_count, batch_count = self._calculate_counts(self.read_queryset, self.batch_size)
        logger.info(f"Total records to update: {records_count}")
        logger.info(f"Total batches to process: {batch_count}")

        # Flight: generate updated records and write them in batches.
        updated_records = self._generate_updated_records(
            self.read_queryset, self.record_updater, self.batch_size
        )
        for batch in self.progress_logger(updated_records, total=batch_count):
            try:
                self.write_queryset.bulk_update(batch, self.fields_to_update, batch_size=self.batch_size)
            except Exception as e:
                logger.error(f"An error occurred during batch update: {e}")
                raise BackfillError("An error occurred during batch update.") from e

        # Post-flight: validate that all records were updated correctly.
        if self.validation_query is not None and self.validation_query.exists():
            failure_message = "Validation failed. Some records may not have been updated correctly."
            logger.error(failure_message)
            raise BackfillError(failure_message)
        logger.info("All records were updated successfully!")

    def _calculate_counts(self, queryset: QuerySet, batch_size: int) -> typing.Tuple[int, int]:
        """
        Calculate the total number of records and batches.

        Args:
            queryset: The queryset whose records will be counted.
            batch_size: The number of records per batch.

        Returns:
            A tuple of (total number of records, total number of batches).
        """
        records_count = queryset.count()
        # Ceiling division: adding (batch_size - 1) before integer division
        # rounds up whenever records_count is not an exact multiple of batch_size.
        #
        # Examples:
        #   records_count = 10, batch_size = 5: (10 + 4) // 5 = 2 batches (2 full)
        #   records_count = 11, batch_size = 5: (11 + 4) // 5 = 3 batches (2 full, 1 partial)
        #   records_count = 14, batch_size = 5: (14 + 4) // 5 = 3 batches (2 full, 1 partial)
        batch_count = (records_count + batch_size - 1) // batch_size
        return records_count, batch_count

    def _generate_updated_records(
        self, queryset: QuerySet, record_updater: typing.Callable[[Model], Model], batch_size: int
    ) -> typing.Iterable[list]:
        """
        Generate a sequence of record batches to update.

        Args:
            queryset: The queryset from which records are fetched and updated in memory.
            record_updater: A function that updates a record and returns the updated record.
            batch_size: The number of records to process in each batch.

        Yields:
            Lists of updated records, each at most batch_size long.
        """
        chunked_records = queryset.iterator(chunk_size=batch_size)
        updated_records = (record_updater(record) for record in chunked_records)
        while batch := list(islice(updated_records, batch_size)):
            yield batch

MIT License

Copyright (c) 2024 SM Amin Gilani [email protected]

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
