Created
November 18, 2021 11:15
-
-
Save yai333/5d51a7a42a4cc75cba600fcac0b4364f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
 "cells": [
  {
   "cell_type": "markdown",
   "source": [
    "# Office 365 calendar ETL\n",
    "\n",
    "Reads calendar events (JSON) from an Azure Blob storage container, upserts them into a Delta table with Change Data Feed enabled, and exports the latest change set to an S3 bucket as CSV.\n",
    "\n",
    "<b>Mount a Blob storage container</b>"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "7bcfeb9d-2998-4d63-92c6-e95c0029592f"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "dbutils.widgets.text(\"inputPath\", \"\", \"\")\n",
    "input_path = dbutils.widgets.get(\"inputPath\")\n",
    "\n",
    "mnt_path = \"/mnt/raw/calendars\"\n",
    "delta_table_path = \"/mnt/delta/calendars\"\n",
    "\n",
    "# Unmount first so the cell can be re-run: mounting onto an already-mounted point raises an error.\n",
    "if any(mount.mountPoint == mnt_path for mount in dbutils.fs.mounts()):\n",
    "  dbutils.fs.unmount(mnt_path)\n",
    "\n",
    "# Source format is wasbs://<container>@<account>.blob.core.windows.net; the SAS config key\n",
    "# below must name the same container (demo) and account (demoStorage).\n",
    "dbutils.fs.mount(\n",
    "  source = \"wasbs://demo@demoStorage.blob.core.windows.net\",\n",
    "  mount_point = mnt_path,\n",
    "  extra_configs = {\"fs.azure.sas.demo.demoStorage.blob.core.windows.net\": dbutils.secrets.get(scope = \"demoBlogStorageScope\", key = \"blobStorageSAS\")}\n",
    ")"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "039e1e1c-f62f-419b-8739-e327a8413d86"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "<b>Transform calendar view data</b>"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "53bc5997-61dc-484a-9514-ae438f60f1f6"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "from delta.tables import DeltaTable\n",
    "from pyspark.sql.functions import col, udf\n",
    "from pyspark.sql.types import StringType\n",
    "\n",
    "\n",
    "def exists(obj, chain):\n",
    "  \"\"\"Follow the keys in `chain` into the nested mapping/Row `obj`.\n",
    "\n",
    "  Returns the value at the end of the chain, or None as soon as a level is\n",
    "  missing or null (e.g. an attendee without `emailAddress`, or a null `start`).\n",
    "  `chain` is not modified.\n",
    "  \"\"\"\n",
    "  current = obj\n",
    "  for key in chain:\n",
    "    if current is None or key not in current:\n",
    "      return None\n",
    "    current = current[key]\n",
    "  return current\n",
    "\n",
    "\n",
    "def extractEmails(attendees):\n",
    "  \"\"\"Return the attendees' `emailAddress.address` values joined by commas (empty string if none).\"\"\"\n",
    "  if not attendees:\n",
    "    return \"\"\n",
    "  addresses = (exists(attendee, [\"emailAddress\", \"address\"]) for attendee in attendees)\n",
    "  return \",\".join(address for address in addresses if address is not None)\n",
    "\n",
    "\n",
    "def extractUTCDate(date_col):\n",
    "  \"\"\"Return the `dateTime` field of a `start`/`end` struct, or None when it is missing.\"\"\"\n",
    "  return exists(date_col, [\"dateTime\"])\n",
    "\n",
    "\n",
    "def getLatestTableVersion():\n",
    "  \"\"\"Return the most recent commit version of the Delta table at `delta_table_path`.\"\"\"\n",
    "  return DeltaTable.forPath(spark, delta_table_path).history(1).select(\"version\").head()[0]\n",
    "\n",
    "\n",
    "# Both helpers return strings (StringType is also Spark's default UDF return type).\n",
    "getEmailsUDF = udf(extractEmails, StringType())\n",
    "getDateUDF = udf(extractUTCDate, StringType())"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "01266d4d-85d4-46f0-a132-f039e8ddf99a"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "code",
   "source": [
    "df = spark.read.json(f\"{mnt_path}/{input_path}\")\n",
    "\n",
    "df_calendar = (\n",
    "  df.select(\n",
    "    getEmailsUDF(col(\"attendees\")).alias(\"emails\"),\n",
    "    getDateUDF(col(\"start\")).alias(\"startDatetime\"),\n",
    "    getDateUDF(col(\"end\")).alias(\"endDatetime\"),\n",
    "    col(\"iCalUId\"),\n",
    "    col(\"isCancelled\"),\n",
    "  )\n",
    "  .dropDuplicates()\n",
    ")\n",
    "df_calendar.show(5, False)"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "e0011935-537f-49be-ae10-22b8935cf56f"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "<b>Create the Delta table (SCD Type 1) with Change Data Feed enabled, then merge the newly extracted data into it</b>"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "3aa00f7f-4396-48ae-98e7-a727594a9452"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "if not DeltaTable.isDeltaTable(spark, delta_table_path):\n",
    "  spark.sql(f\"CREATE TABLE delta.`{delta_table_path}` (iCalUId STRING, emails STRING, startDatetime STRING, endDatetime STRING, isCancelled Boolean) TBLPROPERTIES (delta.enableChangeDataFeed = true)\")\n",
    "\n",
    "# Null-safe comparisons (<=>): with plain <>, a value changing from or to NULL compares as\n",
    "# NULL rather than true, so the event would never be updated.\n",
    "changed_condition = (\n",
    "  \"NOT (calendar.emails <=> updates.emails)\"\n",
    "  \" OR NOT (calendar.startDatetime <=> updates.startDatetime)\"\n",
    "  \" OR NOT (calendar.endDatetime <=> updates.endDatetime)\"\n",
    ")\n",
    "\n",
    "# NOTE(review): MERGE raises an error if several df_calendar rows share an iCalUId;\n",
    "# dropDuplicates() above only removes rows that are identical in every column.\n",
    "deltaTable = DeltaTable.forPath(spark, delta_table_path)\n",
    "(\n",
    "  deltaTable.alias(\"calendar\")\n",
    "  .merge(df_calendar.alias(\"updates\"), \"calendar.iCalUId = updates.iCalUId\")\n",
    "  # Matched clauses are tried in order: cancelled events are deleted first...\n",
    "  .whenMatchedDelete(condition=\"updates.isCancelled = true\")\n",
    "  # ...otherwise the row is overwritten (SCD Type 1) only if a tracked column changed.\n",
    "  .whenMatchedUpdateAll(condition=changed_condition)\n",
    "  # New events are inserted unless they arrive already cancelled.\n",
    "  .whenNotMatchedInsertAll(condition=\"updates.isCancelled = false\")\n",
    "  .execute()\n",
    ")"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "913a1b7b-680c-48b9-bdfe-f16746f82843"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "code",
   "source": [
    "# Changes recorded in the latest table version (the MERGE above, when it ran).\n",
    "df_changefeed = spark.sql(f\"SELECT * FROM table_changes_by_path('{delta_table_path}', {getLatestTableVersion()})\")\n",
    "df_changefeed.show(5)"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "01dd432e-3ee0-49c2-b036-3481810412ff"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "markdown",
   "source": [
    "<b>Configure the AWS S3 connection and load the change feed data into the S3 bucket</b>"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "f2373c3d-a668-465b-8e15-ec02133fd059"
    }
   }
  },
  {
   "cell_type": "code",
   "source": [
    "AWS_REGION = \"ap-southeast-2\"  # NOTE(review): not referenced by any cell in this notebook\n",
    "AWS_ACCESS_KEY = dbutils.secrets.get(scope = \"demoBlogStorageScope\", key = \"aws-access-key\")\n",
    "AWS_SECRET_KEY = dbutils.secrets.get(scope = \"demoBlogStorageScope\", key = \"aws-secret-key\")\n",
    "\n",
    "# Credentials for the s3n:// filesystem used by the export cell below.\n",
    "sc._jsc.hadoopConfiguration().set(\"fs.s3n.awsAccessKeyId\", AWS_ACCESS_KEY)\n",
    "sc._jsc.hadoopConfiguration().set(\"fs.s3n.awsSecretAccessKey\", AWS_SECRET_KEY)"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "d26a60ed-bb47-456a-8a4c-5c9fb8ade56e"
    }
   },
   "outputs": [],
   "execution_count": 0
  },
  {
   "cell_type": "code",
   "source": [
    "if df_changefeed.first() is not None:\n",
    "  # mode(\"append\"): the default save mode (\"errorifexists\") fails on every run after the\n",
    "  # first because the output folder already exists.\n",
    "  df_changefeed.repartition(1).write.mode(\"append\").csv(\"s3n://aiyi.fuzzysearch/update\")\n",
    "else:\n",
    "  print(\"No calendar event loaded\")"
   ],
   "metadata": {
    "application/vnd.databricks.v1+cell": {
     "title": "",
     "showTitle": false,
     "inputWidgets": {},
     "nuid": "329b7d85-376f-4c64-94da-49b54e240657"
    }
   },
   "outputs": [],
   "execution_count": 0
  }
 ],
 "metadata": {
  "application/vnd.databricks.v1+notebook": {
   "notebookName": "Office365ETL (2)",
   "dashboards": [],
   "notebookMetadata": {
    "pythonIndentUnit": 2
   },
   "language": "python",
   "widgets": {
    "inputPath": {
     "nuid": "061391fa-e6f1-4328-8cc0-eb51cf961ae6",
     "currentValue": "",
     "widgetInfo": {
      "widgetType": "text",
      "name": "inputPath",
      "defaultValue": "",
      "label": "",
      "options": {
       "widgetType": "text",
       "validationRegex": null
      }
     }
    }
   },
   "notebookOrigID": 1990356396667153
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment