Last active
November 8, 2022 11:26
-
-
Save fredkingham/16749e304c4e19ce944291ebaa436888 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from django.db import transaction | |
from collections import defaultdict | |
from django.utils import timezone | |
from elcid.utils import timing | |
from elcid import models | |
from django.conf import settings | |
from django.db.models import DateTimeField, DateField | |
from opal.core.fields import ForeignKeyOrFreeText | |
import csv | |
import pytds | |
from intrahospital_api.apis.prod_api import ProdApi as ProdAPI | |
from intrahospital_api import update_demographics | |
# Name of the CSV file associated with the master file load.
FILE_NAME = "master_file.csv"
def get_column_names():
    """
    Return the column names of the upstream patient master file view.

    Fetches a single row from VIEW_CRS_Patient_Masterfile and returns
    the keys of that row. Raises IndexError if the view returns no rows.
    """
    sample_query = """
    SELECT TOP(1) * FROM VIEW_CRS_Patient_Masterfile WITH (NOLOCK)
    """
    rows = ProdAPI().execute_hospital_query(sample_query)
    first_row = rows[0]
    return first_row.keys()
def get_count():
    """
    Return the total number of rows in the upstream patient master file view.

    The count covers every row in VIEW_CRS_Patient_Masterfile; no filter
    is applied.
    """
    count_query = """
    SELECT count(*) FROM VIEW_CRS_Patient_Masterfile WITH (NOLOCK)
    """
    rows = ProdAPI().execute_hospital_query(count_query)
    # NOTE(review): get_column_names() treats a row as a mapping (`.keys()`),
    # whereas this indexes the row by position 0 -- confirm which shape
    # execute_hospital_query returns for an unnamed aggregate column.
    only_row = rows[0]
    return only_row[0]
# Upstream ETHNIC_GROUP code -> the ethnicity description we store.
# Codes that are not listed here translate to None (see the `.get` lookup
# in translate_demographics).
ETHNICITY_MAPPING = {
    "99": "Other - Not Known",
    "A": "White - British",
    "B": "White - Irish",
    "C": "White - Any Other White Background",
    "D": "Mixed - White and Black Caribbean",
    "E": "Mixed - White and Black African",
    "F": "Mixed - White and Asian",
    "G": "Mixed - Any Other Mixed Background",
    "H": "Asian or Asian British - Indian",
    "J": "Asian or Asian British - Pakistani",
    "K": "Asian or Asian British - Bangladeshi",
    "L": "Asian - Any Other Asian Background",
    "M": "Black or Black British - Caribbean",
    "N": "Black or Black British - African",
    "P": "Black - Any Other Black Background",
    "R": "Other - Chinese",
    "S": "Other - Any Other Ethnic Group",
    "Z": "Other - Not Stated",
}
# Demographics model field -> upstream master file column.
DEMOGRAPHICS_INFORMATION_MAPPING = {
    # identifiers
    "hospital_number": "PATIENT_NUMBER",
    "nhs_number": "NHS_NUMBER",
    # name
    "first_name": "FORENAME1",
    "middle_name": "FORENAME2",
    "surname": "SURNAME",
    "title": "TITLE",
    # background
    "religion": "RELIGION",
    "marital_status": "MARITAL_STATUS",
    "nationality": "NATIONALITY",
    "main_language": "MAIN_LANGUAGE",
    "date_of_birth": "DOB",
    "sex": "SEX",
    "ethnicity": "ETHNIC_GROUP",
    # death
    "date_of_death": "DATE_OF_DEATH",
    "death_indicator": "DEATH_FLAG",
}
# ContactInformation model field -> upstream master file column.
CONTACT_INFORMATION_MAPPING = {
    # postal address
    "address_line_1": "ADDRESS_LINE1",
    "address_line_2": "ADDRESS_LINE2",
    "address_line_3": "ADDRESS_LINE3",
    "address_line_4": "ADDRESS_LINE4",
    "postcode": "POSTCODE",
    # phone / email
    "home_telephone": "HOME_TELEPHONE",
    "work_telephone": "WORK_TELEPHONE",
    "mobile_telephone": "MOBILE_TELEPHONE",
    "email": "EMAIL",
}
# NextOfKinDetails model field -> upstream master file column.
# NOTE(review): the upstream column names below are mixed case, unlike the
# other mappings -- presumably they mirror the view's actual column names;
# confirm against the keys of the rows returned by the hospital database.
NOK_MAPPING = {
    "nok_type": "NOK_TYPE",
    # name
    "surname": "NOK_SURNAME",
    "forename_1": "NOK_FORENAME1",
    "forename_2": "NOK_FORENAME2",
    "relationship": "NOK_relationship",
    # postal address
    "address_1": "NOK_address1",
    "address_2": "NOK_address2",
    "address_3": "NOK_address3",
    "address_4": "NOK_address4",
    "postcode": "NOK_Postcode",
    # phone
    "home_telephone": "nok_home_telephone",
    "work_telephone": "nok_work_telephone",
}
# GPDetails model field -> upstream master file column.
GP_DETAILS = {
    # identifiers
    "crs_gp_masterfile_id": "CRS_GP_MASTERFILE_ID",
    "national_code": "GP_NATIONAL_CODE",
    "practice_code": "GP_PRACTICE_CODE",
    # name
    "title": "gp_title",
    "initials": "GP_INITIALS",
    "surname": "GP_SURNAME",
    # practice address and phone
    "address_1": "GP_ADDRESS1",
    "address_2": "GP_ADDRESS2",
    "address_3": "GP_ADDRESS3",
    "address_4": "GP_ADDRESS4",
    "postcode": "GP_POSTCODE",
    "telephone": "GP_TELEPHONE",
}
# MasterFileMeta model field -> upstream master file column.
# These columns describe the upstream record itself rather than the patient.
MASTERFILE_META = {
    "insert_date": "INSERT_DATE",
    "last_updated": "LAST_UPDATED",
    "merged": "MERGED",
    "merge_comments": "MERGE_COMMENTS",
    "active_inactive": "ACTIVE_INACTIVE",
}
# Every model populated from the master file, paired with its
# "our field -> upstream column" mapping. update_rows() iterates the keys
# to clear existing rows; translate_model() looks up a model's mapping here.
MODEL_MAPPING = {
    models.Demographics: DEMOGRAPHICS_INFORMATION_MAPPING,
    models.ContactInformation: CONTACT_INFORMATION_MAPPING,
    models.NextOfKinDetails: NOK_MAPPING,
    models.GPDetails: GP_DETAILS,
    models.MasterFileMeta: MASTERFILE_META,
}
def stream_result():
    """
    Stream the upstream patient master file and load it in batches.

    Runs a single query for every row of VIEW_CRS_Patient_Masterfile that
    has a non-empty Patient_Number, fetches the results 10,000 rows at a
    time and hands each batch to update_rows(). Progress is printed before
    and after each batch.

    The loop stops as soon as a fetch returns no rows, so an empty batch is
    never passed to update_rows() nor reported as saved.
    """
    query = """
    SELECT * FROM VIEW_CRS_Patient_Masterfile
    WHERE Patient_Number is not null
    AND Patient_Number <> ''
    ORDER BY Patient_Number
    """
    size = 10000
    # get_count() counts every row in the view, including those the query
    # above filters out, so the batch total is an upper-bound estimate that
    # is only used for the progress output.
    count = get_count()
    iterations = (count + size - 1) // size  # ceiling division
    with pytds.connect(
        settings.HOSPITAL_DB["ip_address"],
        settings.HOSPITAL_DB["database"],
        settings.HOSPITAL_DB["username"],
        settings.HOSPITAL_DB["password"],
        as_dict=True
    ) as conn:
        cnt = 0
        with conn.cursor() as cur:
            cur.execute(query)
            while True:
                result = cur.fetchmany(size)
                if not result:
                    # Cursor exhausted: nothing left to save.
                    break
                cnt += 1
                print(f'saving {cnt}/{iterations} at {datetime.datetime.now()}')
                update_rows(result)
                print(f'finished saving {cnt}/{iterations} at {datetime.datetime.now()}')
def is_fk_or_ft(model, field):
    """
    Return True when the attribute `field` of `model` is a
    ForeignKeyOrFreeText, otherwise False.
    """
    # NOTE(review): the callers in this file pass a model *instance*, while
    # is_datetime()/is_date() look the attribute up on the class. If
    # ForeignKeyOrFreeText is a descriptor that resolves to the stored value
    # on instances (presumably it is -- confirm against Opal), this returns
    # False for every instance and the cache branch in the translate_*
    # functions is never taken.
    attribute = getattr(model, field)
    return isinstance(attribute, ForeignKeyOrFreeText)
def is_datetime(instance, field):
    """
    Return True when `field` on the instance's model is a DateTimeField.

    Returns None (falsy) when the attribute is a ForeignKeyOrFreeText or
    when the model field is any other kind of field.
    """
    model_cls = instance.__class__
    # ForeignKeyOrFreeText attributes are ruled out before asking _meta
    # for the field.
    if isinstance(getattr(model_cls, field), ForeignKeyOrFreeText):
        return None
    if isinstance(model_cls._meta.get_field(field), DateTimeField):
        return True
    return None
def is_date(instance, field):
    """
    Return True when `field` on the instance's model is a DateField.

    Returns None (falsy) when the attribute is a ForeignKeyOrFreeText or
    when the model field is not a DateField. Because the check uses
    isinstance, subclasses of DateField also count.
    """
    model_cls = instance.__class__
    # ForeignKeyOrFreeText attributes are ruled out before asking _meta
    # for the field.
    if isinstance(getattr(model_cls, field), ForeignKeyOrFreeText):
        return None
    if isinstance(model_cls._meta.get_field(field), DateField):
        return True
    return None
def to_datetime(v):
    """
    Parse the string `v` into a timezone-aware datetime.

    The formats are tried in order: two ISO-like layouts (without and with
    fractional seconds), then the first entry of the DATETIME_INPUT_FORMATS
    and DATE_INPUT_FORMATS settings. The first format that parses wins; if
    none does, the error raised for the last format propagates.
    """
    candidate_formats = [
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M:%S.%f",
        settings.DATETIME_INPUT_FORMATS[0],
        settings.DATE_INPUT_FORMATS[0],
    ]
    final_format = candidate_formats[-1]
    for fmt in candidate_formats:
        try:
            parsed = datetime.datetime.strptime(v, fmt)
        except Exception:
            # Only give up once the last candidate has failed as well.
            if fmt == final_format:
                raise
            continue
        # A successfully parsed datetime is always truthy, so stop here.
        break
    return timezone.make_aware(parsed)
class FKorFTCache(object):
    """
    Remembers how values resolve for "foreign key or free text" fields so
    the related lookup table is queried at most once per distinct value.

    For each related model (keyed by its `get_api_name()`):
      * related_cache maps a value to the id of the related row whose
        name matches it case-insensitively;
      * free_text_cache holds values for which no related row was found.
    """
    # Class-level on purpose-or-not: both caches are shared by every
    # instance of this class for the lifetime of the process.
    related_cache = defaultdict(dict)
    free_text_cache = defaultdict(set)

    def set(self, instance, field, value):
        """
        Populate `<field>_fk_id` / `<field>_ft` on `instance` for `value`.

        Both attributes are first reset to None. If `value` matches a row
        of the field's related model (case-insensitive name match) the id
        of that row is stored in `<field>_fk_id`; otherwise `value` is
        stored in `<field>_ft`.

        instance -- model instance whose class exposes `field` as an
                    attribute with a `foreign_model`
        field    -- name of the foreign-key-or-free-text attribute
        value    -- the raw value to resolve
        """
        model = instance.__class__
        related_model = getattr(model, field).foreign_model
        related_api_name = related_model.get_api_name()
        fk_attr = f"{field}_fk_id"
        ft_attr = f"{field}_ft"
        setattr(instance, fk_attr, None)
        setattr(instance, ft_attr, None)

        known_ids = self.related_cache[related_api_name]
        known_free_text = self.free_text_cache[related_api_name]

        if value in known_ids:
            # Store the cached id of the related row, not the name itself.
            setattr(instance, fk_attr, known_ids[value])
        elif value in known_free_text:
            setattr(instance, ft_attr, value)
        else:
            # First time we see this value: a single query decides which
            # cache it belongs to.
            match = related_model.objects.filter(name__iexact=value).first()
            if match is not None:
                known_ids[value] = match.id
                setattr(instance, fk_attr, match.id)
            else:
                known_free_text.add(value)
                setattr(instance, ft_attr, value)
def translate_demographics(row, patient_id, cache):
    """
    Build an unsaved Demographics for `patient_id` from a master file row.

    Each column named in DEMOGRAPHICS_INFORMATION_MAPPING is copied onto
    the new instance, with these translations:
      * SEX: "M" -> "Male", "F" -> "Female", anything else unchanged;
      * ETHNIC_GROUP: looked up in ETHNICITY_MAPPING (None if unknown);
      * DEATH_FLAG: False when the value is "ALIVE", True otherwise;
      * DATE_OF_DEATH: parsed from "dd/mm/YYYY" into a date, or None
        when empty.
    Truthy values for DateTimeFields are made timezone aware.

    row        -- mapping of upstream column name to value
    patient_id -- id of the patient the demographics belong to
    cache      -- FKorFTCache used for foreign-key-or-free-text fields
    """
    demographics = models.Demographics(patient_id=patient_id)
    for our_field, their_key in DEMOGRAPHICS_INFORMATION_MAPPING.items():
        val = row[their_key]

        if their_key == 'SEX':
            if val == "M":
                val = "Male"
            elif val == "F":
                val = "Female"
        elif their_key == 'ETHNIC_GROUP':
            val = ETHNICITY_MAPPING.get(val)
        elif their_key == "DEATH_FLAG":
            # NOTE(review): anything other than "ALIVE" -- including a
            # missing flag -- is recorded as dead; confirm this is intended.
            val = val != "ALIVE"
        elif their_key == "DATE_OF_DEATH":
            # date of death is stored as a string upstream
            if val:
                val = datetime.datetime.strptime(val, "%d/%m/%Y").date()
            else:
                val = None

        if is_datetime(demographics, our_field) and val:
            val = timezone.make_aware(val)

        # NOTE(review): is_fk_or_ft() receives the instance here rather
        # than the model class -- confirm it can detect the field that way.
        if is_fk_or_ft(demographics, our_field):
            cache.set(demographics, our_field, val)
        else:
            setattr(demographics, our_field, val)
    return demographics
def translate_model(model, row, patient_id, cache):
    """
    Build an unsaved `model` instance for `patient_id` from a master
    file row.

    Every field listed for `model` in MODEL_MAPPING is populated from the
    corresponding upstream column. Foreign-key-or-free-text fields are
    resolved through `cache`; all other fields are assigned directly.

    model      -- a model class that is a key of MODEL_MAPPING
    row        -- mapping of upstream column name to value
    patient_id -- id of the patient the new instance belongs to
    cache      -- FKorFTCache used for foreign-key-or-free-text fields
    """
    instance = model(patient_id=patient_id)
    for our_field, their_field in MODEL_MAPPING[model].items():
        value = row[their_field]
        if is_fk_or_ft(instance, our_field):
            # cache.set() needs the value to resolve; it was previously
            # called without it, which would raise a TypeError.
            cache.set(instance, our_field, value)
        else:
            setattr(instance, our_field, value)
    return instance
@timing
@transaction.atomic
def update_rows(rows):
    """
    Replace our copy of the master file data for the patients in `rows`.

    Patients are matched by hospital number against existing Demographics.
    For every (row, matching patient) pair a fresh instance of each model
    in MODEL_MAPPING is built. The existing rows of those models for the
    matched patients are then deleted and the new instances bulk created,
    all inside one transaction.

    Rows whose PATIENT_NUMBER matches no existing Demographics are ignored.

    rows -- list of mappings of upstream column name to value
    """
    hospital_numbers = [
        row["PATIENT_NUMBER"] for row in rows if row["PATIENT_NUMBER"]
    ]
    existing_demographics = list(
        models.Demographics.objects.filter(hospital_number__in=hospital_numbers)
    )

    # One hospital number can belong to more than one of our patients.
    hn_to_patient_ids = defaultdict(set)
    existing_patient_ids = []
    for existing in existing_demographics:
        hn_to_patient_ids[existing.hospital_number].add(existing.patient_id)
        existing_patient_ids.append(existing.patient_id)

    # Models other than Demographics, in the order their instances are
    # translated for each patient (also the order they are created in).
    generic_models = (
        models.ContactInformation,
        models.GPDetails,
        models.NextOfKinDetails,
        models.MasterFileMeta,
    )
    to_create = {models.Demographics: []}
    for generic_model in generic_models:
        to_create[generic_model] = []

    cache = FKorFTCache()
    # NOTE(review): if the same PATIENT_NUMBER appears in more than one
    # row, one set of instances is created per row for that patient.
    for row in rows:
        patient_ids = list(hn_to_patient_ids.get(row["PATIENT_NUMBER"], set()))
        for patient_id in patient_ids:
            to_create[models.Demographics].append(
                translate_demographics(row, patient_id, cache)
            )
            for generic_model in generic_models:
                to_create[generic_model].append(
                    translate_model(generic_model, row, patient_id, cache)
                )

    # Clear out the old rows before writing the replacements.
    for model in MODEL_MAPPING.keys():
        model.objects.filter(patient_id__in=existing_patient_ids).delete()

    for model, new_instances in to_create.items():
        model.objects.bulk_create(new_instances)
@transaction.atomic
def delete_duplicates():
    """
    Delete surplus master file rows for patients with duplicated
    demographics.

    Looks at (up to 100,000) patients that have more than one Demographics
    row. For each of them, and for each model loaded from the master file
    (Demographics, ContactInformation, GPDetails, NextOfKinDetails and
    MasterFileMeta), the first related row is kept and the rest are
    deleted. The number of deleted rows per model is printed.

    MasterFileMeta is included because update_rows() creates it alongside
    the other four models, so it is duplicated in the same way.

    Everything runs inside a single transaction.
    """
    from opal.models import Patient
    from django.db.models import Count

    # (reverse accessor on Patient, model, label used in the output)
    targets = (
        ("demographics_set", models.Demographics, "demographics"),
        ("contactinformation_set", models.ContactInformation, "contact information"),
        ("gpdetails_set", models.GPDetails, "GP Details"),
        ("nextofkindetails_set", models.NextOfKinDetails, "Next of kind details"),
        ("masterfilemeta_set", models.MasterFileMeta, "master file meta"),
    )
    accessors = [accessor for accessor, _, _ in targets]

    qs = Patient.objects.annotate(
        cnt=Count('demographics')
    ).filter(cnt__gt=1).prefetch_related(*accessors)[:100000]

    ids_to_delete = {accessor: [] for accessor in accessors}
    for patient in qs:
        for accessor in accessors:
            # Keep the first related row, mark every other one for deletion.
            # NOTE(review): no ordering is applied, so which row counts as
            # "first" depends on the database's default ordering.
            surplus = list(getattr(patient, accessor).all())[1:]
            ids_to_delete[accessor].extend(row.id for row in surplus)

    for accessor, model, label in targets:
        ids = ids_to_delete[accessor]
        model.objects.filter(id__in=ids).delete()
        print(f'Deleted {len(ids)} {label}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment