{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "view-in-github",
"colab_type": "text"
},
"source": [
"<a href=\"https://colab.research.google.com/gist/a-agmon/d0f8d61786a019ca7ae1b0e8ebb49de0/copy-of-interview-answers.ipynb\" target=\"_parent\"><img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In Colab\"/></a>"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "A94t8YSBzNyy",
"outputId": "eace3431-3cd4-447f-b9d1-45e5b9990e60"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Reading package lists... Done\n",
"Building dependency tree... Done\n",
"Reading state information... Done\n",
"tree is already the newest version (2.0.2-1).\n",
"0 upgraded, 0 newly installed, 0 to remove and 51 not upgraded.\n",
"Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease\n",
"Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64 InRelease\n",
"Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease\n",
"Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease\n",
"Ign:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease\n",
"Hit:6 https://r2u.stat.illinois.edu/ubuntu jammy Release\n",
"Hit:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease\n",
"Hit:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease\n",
"Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease\n",
"Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease\n",
"Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease\n",
"Reading package lists... Done\n",
"Building dependency tree... Done\n",
"Reading state information... Done\n",
"51 packages can be upgraded. Run 'apt list --upgradable' to see them.\n",
"\u001b[1;33mW: \u001b[0mSkipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)\u001b[0m\n"
]
}
],
"source": [
"!apt-get install tree\n",
"!sudo apt update\n",
"!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n",
"!wget -q https://dlcdn.apache.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz\n",
"!tar xf spark-3.4.4-bin-hadoop3.tgz"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "OvB2Zqkzz-6L",
"outputId": "926fa563-5824-49e8-f1a3-af5a0aaca84e"
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Requirement already satisfied: pyspark==3.4.4 in /usr/local/lib/python3.10/dist-packages (3.4.4)\n",
"Requirement already satisfied: py4j==0.10.9.7 in /usr/local/lib/python3.10/dist-packages (from pyspark==3.4.4) (0.10.9.7)\n"
]
}
],
"source": [
"!pip install -q findspark\n",
"!pip install pyspark==3.4.4"
]
},
{
"cell_type": "code",
"source": [
"import pyspark\n",
"print(pyspark.__version__)"
],
"metadata": {
"id": "FFqg7hyA5OH7",
"outputId": "a3553117-0f67-4ee3-d051-02ca0f84692d",
"colab": {
"base_uri": "https://localhost:8080/"
}
},
"execution_count": 3,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"3.4.4\n"
]
}
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"id": "ivG5oW6o0XmB"
},
"outputs": [],
"source": [
"import os\n",
"os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n",
"os.environ[\"SPARK_HOME\"] = \"/content/spark-3.4.4-bin-hadoop3\"\n",
"\n",
"import pandas as pd\n",
"import numpy as np\n",
"import findspark\n",
"findspark.init()\n",
"from pyspark.sql import SparkSession\n",
"from pyspark import SparkConf\n",
"import pyspark.sql.functions as F\n",
"from pyspark.sql.types import IntegerType"
]
},
{
"cell_type": "code",
"source": [
"#!pip install deltalake"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "rVsV6MqyqfG_",
"outputId": "d96e4998-79a1-417b-9015-1dfae680e612"
},
"execution_count": 5,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Requirement already satisfied: deltalake in /usr/local/lib/python3.10/dist-packages (0.21.0)\n",
"Requirement already satisfied: pyarrow>=16 in /usr/local/lib/python3.10/dist-packages (from deltalake) (16.1.0)\n",
"Requirement already satisfied: numpy>=1.16.6 in /usr/local/lib/python3.10/dist-packages (from pyarrow>=16->deltalake) (1.26.4)\n"
]
}
]
},
{
"cell_type": "code",
"source": [
"# First cell - Installation\n",
"#!pip install delta-spark==2.4.0 pyspark==3.4.0 pandas\n",
"!pip install delta-spark"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "i1RBuWp6r85e",
"outputId": "c9989255-989a-4ef4-f9bb-0af035850e57"
},
"execution_count": 5,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Requirement already satisfied: delta-spark in /usr/local/lib/python3.10/dist-packages (2.4.0)\n",
"Requirement already satisfied: pyspark<3.5.0,>=3.4.0 in /usr/local/lib/python3.10/dist-packages (from delta-spark) (3.4.4)\n",
"Requirement already satisfied: importlib-metadata>=1.0.0 in /usr/local/lib/python3.10/dist-packages (from delta-spark) (8.5.0)\n",
"Requirement already satisfied: zipp>=3.20 in /usr/local/lib/python3.10/dist-packages (from importlib-metadata>=1.0.0->delta-spark) (3.20.2)\n",
"Requirement already satisfied: py4j==0.10.9.7 in /usr/local/lib/python3.10/dist-packages (from pyspark<3.5.0,>=3.4.0->delta-spark) (0.10.9.7)\n"
]
}
]
},
{
"cell_type": "code",
"source": [
"from pyspark.sql import SparkSession\n",
"from delta import *\n",
"import pandas as pd\n",
"\n",
"# Initialize Spark with Delta Lake support\n",
"builder = SparkSession.builder.appName(\"DeltaMergeExample\") \\\n",
" .config(\"spark.jars.packages\", \"io.delta:delta-core_2.12:2.4.0\") \\\n",
" .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\") \\\n",
" .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\n",
"\n",
"spark = configure_spark_with_delta_pip(builder).getOrCreate()"
],
"metadata": {
"id": "Zv0DPyNnrhhF"
},
"execution_count": 6,
"outputs": []
},
{
"cell_type": "code",
"source": [
"spark"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 219
},
"id": "xB1lo6kVriDL",
"outputId": "4ed84c7d-30de-4a37-b21e-447651dc7e4b"
},
"execution_count": 7,
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x7e6b9facaf80>"
],
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - in-memory</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://98ea5d9f8a44:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v3.4.4</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>local[*]</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>DeltaMergeExample</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
]
},
"metadata": {},
"execution_count": 7
}
]
},
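{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a minimal sanity check, `spark.conf.get` can read back the two Delta settings supplied to the builder; both calls should echo the values configured above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Confirm the Delta SQL extension and catalog are active on this session\n",
"print(spark.conf.get(\"spark.sql.extensions\"))\n",
"print(spark.conf.get(\"spark.sql.catalog.spark_catalog\"))"
]
},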
{
"cell_type": "code",
"source": [
"# Create initial data\n",
"initial_data = pd.DataFrame({\n",
" 'id': [1, 2, 3, 4],\n",
" 'name': ['Alice', 'Bob', 'Charlie', 'David'],\n",
" 'age': [25, 30, 35, 40]\n",
"})\n",
"\n",
"# Convert to Spark DataFrame\n",
"spark_df = spark.createDataFrame(initial_data)\n",
"\n",
"# Define the path for our Delta table\n",
"delta_table_path = \"./my_delta_table\"\n",
"\n",
"# Write initial data as Delta table\n",
"spark_df.write.format(\"delta\").mode(\"overwrite\").save(delta_table_path)\n",
"\n",
"# Show initial data\n",
"print(\"Initial Data:\")\n",
"DeltaTable.forPath(spark, delta_table_path).toDF().show()"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "A8a9aH_Bupeq",
"outputId": "7f93556e-ce31-4daf-fc12-c81b007439db"
},
"execution_count": 8,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"Initial Data:\n",
"+---+-------+---+\n",
"| id| name|age|\n",
"+---+-------+---+\n",
"| 3|Charlie| 35|\n",
"| 4| David| 40|\n",
"| 1| Alice| 25|\n",
"| 2| Bob| 30|\n",
"+---+-------+---+\n",
"\n"
]
}
]
},
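{
"cell_type": "markdown",
"metadata": {},
"source": [
"Besides `DeltaTable.forPath`, the same table can be read through the standard DataFrame reader, where `delta` is just another data source format. A minimal sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Read the Delta table back via the generic DataFrame reader API\n",
"spark.read.format(\"delta\").load(delta_table_path).orderBy(\"id\").show()"
]
},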
{
"cell_type": "code",
"source": [
"# Create new data with some updates and some new records\n",
"new_data = pd.DataFrame({\n",
" 'id': [3, 3, 5, 6],\n",
" 'name': ['Charlie Updated', 'Charlie Updated 2', 'Eve', 'Frank'],\n",
" 'age': [36, 41, 45, 50]\n",
"})\n",
"\n",
"# Convert to Spark DataFrame\n",
"new_spark_df = spark.createDataFrame(new_data)\n",
"\n",
"new_spark_df = new_spark_df.dropDuplicates(['id'])\n",
"\n",
"# Get the Delta table\n",
"delta_table = DeltaTable.forPath(spark, delta_table_path)\n",
"\n",
"# Perform merge operation\n",
"(delta_table.alias(\"target\")\n",
" .merge(\n",
" new_spark_df.alias(\"source\"),\n",
" \"target.id = source.id\"\n",
" )\n",
" .whenMatchedUpdate(set={\n",
" \"name\": \"source.name\",\n",
" \"age\": \"source.age\"\n",
" })\n",
" .whenNotMatchedInsert(values={\n",
" \"id\": \"source.id\",\n",
" \"name\": \"source.name\",\n",
" \"age\": \"source.age\"\n",
" })\n",
" .execute())\n",
"\n",
"# Show updated data\n",
"print(\"\\nAfter Merge:\")\n",
"delta_table.toDF().orderBy(\"id\").show()"
],
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"id": "iP-oBAluu6wY",
"outputId": "b87337f6-ee2a-4df1-a154-dd94a9320445"
},
"execution_count": 10,
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"text": [
"\n",
"After Merge:\n",
"+---+---------------+---+\n",
"| id| name|age|\n",
"+---+---------------+---+\n",
"| 1| Alice| 25|\n",
"| 2| Bob| 30|\n",
"| 3|Charlie Updated| 36|\n",
"| 4| David| 40|\n",
"| 5| Eve| 45|\n",
"| 6| Frank| 50|\n",
"+---+---------------+---+\n",
"\n"
]
}
]
},
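{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each write and merge is committed as a new version in the Delta transaction log. The sketch below inspects the log and time-travels to the pre-merge snapshot; it assumes the overwrite above was the table's first commit (version 0)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Inspect the table's commit history (one row per version)\n",
"delta_table.history().select(\"version\", \"timestamp\", \"operation\").show(truncate=False)\n",
"\n",
"# Time travel: read the snapshot as it was before the merge\n",
"# (versionAsOf 0 assumes the initial overwrite was the first commit)\n",
"spark.read.format(\"delta\").option(\"versionAsOf\", 0).load(delta_table_path).orderBy(\"id\").show()"
]
},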
{
"cell_type": "code",
"source": [
"from google.colab import drive\n",
"drive.mount('/content/drive')"
],
"metadata": {
"id": "G8nGZ2370TOw"
},
"execution_count": null,
"outputs": []
}
],
"metadata": {
"colab": {
"name": "Copy of Interview-Answers.ipynb",
"provenance": [],
"include_colab_link": true
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}