@johngrimes
Last active June 1, 2025 00:27
Streaming SQL on FHIR example with Pathling and Kafka
#!/usr/bin/env python3
"""
Consumer module for processing FHIR resources from Kafka with ETL pipeline.
This module defines constants for the base directory and Spark checkpoint
directory, and implements functions to create Spark sessions, define FHIR
resource views, and start a Kafka consumer to process and persist FHIR data in
PostgreSQL.
"""
import os
from pathlib import Path
import click
from pathling import DataSource, PathlingContext
from pathling._version import (
__java_version__,
__scala_version__,
)
from pyspark import __version__ as __spark_version__
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import explode, from_json
BASE_DIR = Path(__file__).parent.parent.resolve()
TARGET_DIR = BASE_DIR / "target"
SPARK_CHECKPOINT_DIR = TARGET_DIR / "checkpoints"
def _get_or_create_spark() -> SparkSession:
"""
Create or retrieve a Spark session configured for FHIR data processing.
This function sets up the Spark environment with necessary dependencies,
including Kafka connectivity, Pathling, and PostgreSQL support.
"""
os.environ["SPARK_CONF_DIR"] = str(BASE_DIR / "conf" / "spark-conf")
spark_builder = SparkSession.builder.config(
"spark.jars.packages",
f"org.apache.spark:spark-sql-kafka-0-10_{__scala_version__}:{__spark_version__},"
f"au.csiro.pathling:library-runtime:{__java_version__},"
f"org.postgresql:postgresql:42.2.18",
).config("spark.sql.streaming.checkpointLocation", SPARK_CHECKPOINT_DIR)
return spark_builder.getOrCreate()
def view_patient(data: DataSource) -> DataFrame:
"""
Create a view of Patient resources with essential demographic information.
This view extracts the patient identifier and gender from the FHIR Patient
resource.
"""
return data.view(
"Patient",
select=[
{
"column": [
{
"description": "Patient ID",
"path": "getResourceKey()",
"name": "id",
"type": "string",
"collection": "false",
},
{
"description": "Gender",
"path": "gender",
"name": "gender",
"type": "code",
"collection": "false",
},
],
},
],
)
def view_diagnosis(data: DataSource) -> DataFrame:
"""
Create a view of Condition resources.
This view captures condition identifiers, patient references, SNOMED CT
codes, and specifically flags viral infections using a query to the
terminology server.
"""
return data.view(
"Condition",
select=[
{
"column": [
{
"description": "Condition ID",
"path": "getResourceKey()",
"name": "id",
"type": "string",
"collection": "false",
},
{
"description": "Patient ID",
"path": "subject.getReferenceKey()",
"name": "patient_id",
"type": "string",
"collection": "false",
},
{
"description": "SNOMED CT diagnosis code",
"path": "code.coding.where(system = 'http://snomed.info/sct').code",
"name": "sct_id",
"type": "code",
"collection": "false",
},
{
"decription": "SNOMED CT diagnosis display",
"path": "code.coding.where(system = 'http://snomed.info/sct').display()",
"name": "sct_display",
"type": "string",
"collection": "false",
},
{
"description": "ICD 10-AM diagnosis code",
"path": "code.translate('http://aehrc.com/fhir/ConceptMap/aehrc-snomap-starter', false, 'wider').first().code", # noqa: E501
"name": "icd10am_code",
"type": "code",
"collection": "false",
},
{
"description": "ICD 10-AM diagnosis display",
"path": "code.translate('http://aehrc.com/fhir/ConceptMap/aehrc-snomap-starter', false, 'wider').first().display()", # noqa: E501
"name": "icd10am_display",
"type": "string",
"collection": "false",
},
{
"description": "Viral infection",
"path": "code.subsumedBy(http://snomed.info/sct|34014006 combine http://snomed.info/sct|438508001)", # noqa: E501
"name": "viral_infection",
"type": "boolean",
"collection": "false",
},
],
},
],
)
def view_encounter(data: DataSource) -> DataFrame:
"""
Create a comprehensive view of Encounter resources with visit details.
This view includes encounter timing, service provider, visit type,
arrival mode, departure status, clinical priority, and more.
"""
return data.view(
"Encounter",
select=[
{
"column": [
{
"description": "Encounter ID",
"path": "getResourceKey()",
"name": "id",
"type": "string",
"collection": "false",
},
{
"description": "Patient ID",
"path": "subject.getReferenceKey()",
"name": "patient_id",
"type": "string",
"collection": "false",
},
{
"description": "Encounter start date",
"path": "period.start",
"name": "start_time",
"type": "dateTime",
"collection": "false",
},
{
"description": "Encounter end date",
"path": "period.end",
"name": "end_time",
"type": "dateTime",
"collection": "false",
},
{
"description": "Encounter service provider",
"path": "serviceProvider.getReferenceKey()",
"name": "service_provider",
"type": "string",
"collection": "false",
},
],
"select": [
{
"forEachOrNull": "type.coding.where(system = 'http://occio.qh/data/typeofvisit')",
"column": [
{
"description": "Type of visit code",
"path": "code",
"name": "type_of_visit_code",
"type": "code",
"collection": "false",
},
{
"description": "Type of visit description",
"path": "display",
"name": "type_of_visit_desc",
"type": "string",
"collection": "false",
},
],
},
{
"forEachOrNull": "type.coding.where(system = 'http://occio.qh/data/modeofarrival')",
"column": [
{
"description": "Mode of arrival code",
"path": "code",
"name": "mode_of_arrival_code",
"type": "code",
"collection": "false",
},
{
"description": "Mode of arrival description",
"path": "display",
"name": "mode_of_arrival_desc",
"type": "string",
"collection": "false",
},
],
},
{
"forEachOrNull": "type.coding.where(system = 'http://occio.qh/data/departurestatus')",
"column": [
{
"description": "Departure status code",
"path": "code",
"name": "departure_status_code",
"type": "code",
"collection": "false",
},
{
"description": "Departure status description",
"path": "display",
"name": "departure_status_desc",
"type": "string",
"collection": "false",
},
],
},
{
"forEachOrNull": "priority.coding.where(system = 'http://occio.qh/data/ats')",
"column": [
{
"description": "ATS code",
"path": "code",
"name": "ats_code",
"type": "code",
"collection": "false",
},
],
},
{
"forEachOrNull": "reasonCode.coding.where(system = 'http://occio.qh/data/presentingproblem')",
"column": [
{
"description": "Presenting problem code",
"path": "code",
"name": "presenting_problem_code",
"type": "code",
"collection": "false",
},
{
"description": "Presenting problem description", # noqa: E501
"path": "display",
"name": "presenting_problem_desc",
"type": "string",
"collection": "false",
},
],
},
{
"forEachOrNull": "diagnosis.where(use.coding.exists("
"system = 'Admission diagnosis'))",
"column": [
{
"description": "Admission diagnosis ID",
"path": "condition.getReferenceKey()",
"name": "admission_diagnosis_id",
"type": "string",
"collection": "false",
},
],
},
],
},
],
)
@click.command()
@click.option("--kafka-topic", help="Kafka topic to subscribe to")
@click.option("--kafka-bootstrap-servers", help="Kafka bootstrap servers")
@click.option("--db-name", help="Database name to write to")
@click.option("--schema", help="Database schema to use")
@click.option("--host", help="Database host")
@click.option("--user", help="Database user")
@click.option("--password", help="Database password")
def start_consumer(
kafka_topic: str,
kafka_bootstrap_servers: str,
db_name: str,
schema: str,
host: str,
user: str,
password: str,
) -> None:
"""
Start the main consumer process for FHIR resource ETL from Kafka.
This function orchestrates the entire ETL pipeline, from reading Kafka
messages to transforming FHIR resources and persisting them in PostgreSQL.
"""
def _subscribe_to_kafka_topic() -> DataFrame:
"""
Subscribe to the specified Kafka topic as a streaming source.
Configure the Spark streaming reader to consume messages from the
earliest offset available.
"""
return (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", kafka_bootstrap_servers)
.option("subscribe", kafka_topic)
.option("startingOffsets", "earliest")
.load()
)
def _to_resource_stream(
kafka_stream: DataFrame,
resource_type: str,
) -> DataFrame:
"""
Convert raw Kafka stream into a typed FHIR resource stream.
Extract FHIR resources from the bundle structure, filter by resource
type, and encode them with Pathling.
"""
json_stream = (
kafka_stream.selectExpr("CAST(value AS STRING) AS bundle")
.select(
explode(
from_json(
"bundle",
"STRUCT<entry:ARRAY<STRUCT<resource:STRING>>>",
).entry.resource,
).alias("resource"),
)
.filter(
from_json(
"resource",
"STRUCT<resourceType:STRING>",
).resourceType
== resource_type,
)
)
return pc.encode(json_stream, resource_type)
def write_postgresql(
df: DataFrame,
db_name: str,
schema: str,
view_name: str,
) -> None:
"""
Persist a DataFrame to PostgreSQL using an upsert strategy.
Insert new records and update existing ones while maintaining
consistency through primary keys.
"""
import psycopg2
columns = df.columns
insert_columns = ", ".join(columns)
insert_values = ", ".join(["%s"] * len(columns))
# Exclude 'id' from the update set to avoid updating the primary key.
update_set = ", ".join(
[f"{col} = EXCLUDED.{col}" for col in columns if col != "id"],
)
sql = f"""
INSERT INTO {schema}.{view_name} ({insert_columns})
VALUES ({insert_values})
ON CONFLICT (id) DO UPDATE SET {update_set}
""" # noqa: S608
def upsert_partition(partition: DataFrame) -> None:
"""Upsert records for a single partition into PostgreSQL."""
conn = psycopg2.connect(
host=host,
database=db_name,
user=user,
password=password,
)
cursor = conn.cursor()
data = list(partition)
if data:
cursor.executemany(sql, data)
conn.commit()
cursor.close()
conn.close()
# Apply the upsert function to each partition.
df.foreachPartition(upsert_partition)
click.echo(
f"Starting kafka listener on topic: {kafka_topic} at: "
f"{kafka_bootstrap_servers}",
)
click.echo(f"Writing to database: {db_name}")
spark = _get_or_create_spark()
pc = PathlingContext.create(
spark,
terminology_server_url="http://velonto-ontoserver-service/fhir",
)
spark.sparkContext.setLogLevel("ERROR")
update_stream = _subscribe_to_kafka_topic()
data = pc.read.datasets(
{
resource_type: _to_resource_stream(update_stream, resource_type)
for resource_type in ["Patient", "Encounter", "Condition"]
},
)
all_views = [view_patient, view_encounter, view_diagnosis]
console_sinks = []
postgresql_sinks = []
for view_f in all_views:
view_name = view_f.__name__
view_data = view_f(data)
# Console sink.
console_sink = (
view_data.writeStream.outputMode("append")
.format("console")
.start(f"console_{view_name}")
)
console_sinks.append(console_sink)
# PostgreSQL sink.
postgresql_sink = (
view_data.writeStream.foreachBatch(
lambda df, _, view_name=view_name: write_postgresql(
df,
db_name,
schema,
view_name,
),
)
.outputMode("append")
.start()
)
postgresql_sinks.append(postgresql_sink)
for sink in console_sinks + postgresql_sinks:
# Block until the streaming queries terminate.
sink.awaitTermination()
if __name__ == "__main__":
start_consumer()