@geobabbler
Created February 20, 2025 19:29
GeoPandas FGDB export for Databricks
# Install dependencies first (in a Databricks notebook, run this in its own cell before the imports)
%pip install geopandas shapely geojsonio

import os
import zipfile
from datetime import datetime

import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
def export_delta_to_filegdb(spark, delta_table_name, output_path):
    """
    Export a Delta table with X (longitude), Y (latitude), and survey_date columns
    to a File Geodatabase.

    Parameters:
        spark: SparkSession object
        delta_table_name: str, name of the Delta table
        output_path: str, DBFS path where the zipped File Geodatabase will be saved

    Returns:
        None
    """
    # Read the Delta table into a Spark DataFrame
    df_spark = spark.table(delta_table_name)

    # Convert to a pandas DataFrame (collects all rows to the driver)
    df_pandas = df_spark.toPandas()
    print(f"Total records to process: {len(df_pandas)}")

    # Create the geometry column from the X and Y coordinates while they are still
    # numeric, so Point() receives floats rather than strings
    geometry = [Point(xy) for xy in zip(df_pandas['x'].astype(float), df_pandas['y'].astype(float))]

    # Convert x, y, z attribute columns to string format if they exist
    for col in ['x', 'y', 'z']:
        if col in df_pandas.columns:
            df_pandas[col] = df_pandas[col].astype(str)

    # Use all columns in the DataFrame as attributes
    attribute_data = df_pandas.copy()

    # Create GeoDataFrame with all attribute data (coordinates assumed to be WGS84)
    gdf = gpd.GeoDataFrame(
        attribute_data,
        geometry=geometry,
        crs="EPSG:4326"
    )
    # Create a temporary local path for the File Geodatabase
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    local_temp_path = f"/tmp/gdb_{timestamp}"

    # Create the directory if it does not exist
    os.makedirs(local_temp_path, exist_ok=True)

    # Create the File Geodatabase (writing with the OpenFileGDB driver requires GDAL 3.6+)
    gdb_filename = "exported_data.gdb"
    gdf.to_file(f"{local_temp_path}/{gdb_filename}", driver="OpenFileGDB")
    print(f"Exported File Geodatabase to: {local_temp_path}/{gdb_filename}")
    # Create a ZIP archive of the File Geodatabase directory
    zip_filename = f"gdb_{timestamp}.zip"
    zip_filepath = f"/tmp/{zip_filename}"
    with zipfile.ZipFile(zip_filepath, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(local_temp_path):
            for file in files:
                file_path = os.path.join(root, file)
                zipf.write(file_path, os.path.relpath(file_path, local_temp_path))
    print(f"Created ZIP archive: {zip_filepath}")

    # Copy the ZIP file to DBFS, translating the /dbfs/ mount prefix into a dbfs:/ URI
    dbfs_zip_path = f"{output_path}/{zip_filename}".replace('/dbfs/', 'dbfs:/')
    dbutils.fs.cp(f"file:{zip_filepath}", dbfs_zip_path)
    print(f"Copied ZIP file to: {output_path}/{zip_filename}")
if __name__ == "__main__":
    # Make sure you have an active SparkSession (and dbutils), as provided in a Databricks notebook
    delta_table_name = "database.table"
    output_path = "/dbfs/mnt/object/storage"
    export_delta_to_filegdb(spark, delta_table_name, output_path)