Mimoune djouallah

import duckdb

def duckdb_attach_lakehouse(workspace, LH):
    # needed only outside a Fabric notebook: authenticate to OneLake through the Azure CLI credential chain
    duckdb.sql("CREATE OR REPLACE PERSISTENT SECRET onelake (TYPE azure, PROVIDER credential_chain, CHAIN 'cli', ACCOUNT_NAME 'onelake');")
    sql_schema = set()
    sql_statements = set()
    # optional: list the files under the default dbo schema (the result is not used below)
    duckdb.sql(f""" SELECT * FROM glob('abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{LH}.Lakehouse/Tables/dbo/*') """).df()['file'].tolist()
    # locate every Delta table by finding its _delta_log folder
    list_tables = duckdb.sql(f""" SELECT DISTINCT split_part(file, '_delta_log', 1) AS tables FROM glob('abfss://{workspace}@onelake.dfs.fabric.microsoft.com/{LH}.Lakehouse/Tables/*/*/_delta_log/*.json') """).df()['tables'].tolist()
    for table_path in list_tables:
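        # (sketch) the gist preview ends inside this loop; assuming the goal is to
        # expose each Delta table as a DuckDB view via delta_scan. The derived view
        # name below is an assumption, not part of the original code.
        table_name = table_path.rstrip('/').split('/')[-1]
        duckdb.sql(f""" CREATE OR REPLACE VIEW {table_name} AS SELECT * FROM delta_scan('{table_path}') """)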
from datetime import datetime, timezone, timedelta
import json
import os
from azure.identity import DefaultAzureCredential
from azure.storage.blob import (
BlobClient,
BlobSasPermissions,
BlobServiceClient,
generate_blob_sas,
)
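These imports point at generating a SAS URL for a blob with a user delegation key. A minimal sketch under that assumption, using the imports above; the account, container, and blob names are placeholders, not from the original gist:

# hypothetical usage: build a read-only SAS URL for one blob (placeholder names)
account_url = "https://<account>.blob.core.windows.net"
credential = DefaultAzureCredential()
service = BlobServiceClient(account_url=account_url, credential=credential)

start = datetime.now(timezone.utc)
expiry = start + timedelta(hours=1)
delegation_key = service.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry)

sas = generate_blob_sas(
    account_name=service.account_name,
    container_name="<container>",          # placeholder
    blob_name="<path/to/blob>",            # placeholder
    user_delegation_key=delegation_key,
    permission=BlobSasPermissions(read=True),
    expiry=expiry,
)
blob_url = f"{account_url}/<container>/<path/to/blob>?{sas}"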
cache_httpfs_get_profile()
For temp profile collector and stats for on_disk_cache_reader (unit in milliseconds)
metadata cache hit count = 78
metadata cache miss count = 9
data block cache hit count = 9
data block cache miss count = 69
IO latency is Outliers latency with unit millisec: 1704.000000, 1599.000000, 1128.000000, 1299.000000, 2127.000000, 1248.000000, 1262.000000, 1262.000000, 1412.000000, 1760.000000, 1806.000000, 1908.000000
Max latency = 916.000000 millisec
Min latency = 235.000000 millisec
Mean latency = 366.859649 millisec
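The profile dump above comes from DuckDB's cache_httpfs community extension. A minimal setup sketch, assuming the profile function is invoked from SQL and that the profile is collected by default (both are assumptions):

import duckdb

con = duckdb.connect()
con.execute("INSTALL cache_httpfs FROM community")   # community extension
con.execute("LOAD cache_httpfs")

# ... run some remote reads over httpfs here, then dump the cache profile
print(con.sql("SELECT cache_httpfs_get_profile()").fetchall())   # assumed SQL invocation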
import boto3

s3_resource = boto3.resource(
    's3',
    region_name="us-east-1",
    endpoint_url="zzzzzzzzzzzzzzzzzzzz",
    aws_access_key_id="uuuuuuuuuuuuuuuuuu",
    aws_secret_access_key="xxxxxxxxxxxxxxxxxxxxxxxxxxx",
)
bucket_name = "uuuuu"
bucket = s3_resource.Bucket(bucket_name)
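A short usage sketch for the bucket handle above; the prefix, key, and local path are placeholders, not from the original gist:

# hypothetical usage: list object keys under a prefix and download one of them
for obj in bucket.objects.filter(Prefix='some/prefix/'):      # placeholder prefix
    print(obj.key, obj.size)

bucket.download_file('some/prefix/file.parquet', '/tmp/file.parquet')  # placeholder key and local path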
COPY (SELECT '' AS filename, NULL AS year) TO '/lakehouse/default/Files/scada' (FORMAT PARQUET, PARTITION_BY (year), OVERWRITE_OR_IGNORE);

SET VARIABLE list_of_files = (
    SELECT list(file) FROM glob('/lakehouse/default/Files/Daily_Reports/*.CSV')
    WHERE parse_filename(file) NOT IN
        (SELECT filename FROM read_parquet('/lakehouse/default/Files/scada/*/*.parquet'))
);

CREATE OR REPLACE VIEW raw AS (
    SELECT * FROM read_csv(getvariable('list_of_files'), skip=1, header=0, all_varchar=1,
    columns={
        'I': 'VARCHAR', 'UNIT': 'VARCHAR', 'XX': 'VARCHAR', 'VERSION': 'VARCHAR', 'SETTLEMENTDATE': 'VARCHAR', 'RUNNO': 'VARCHAR',
import requests
import time
import json
import base64
def get_notebook_content(notebook_id_or_name):
    # notebookutils is available as a built-in inside Fabric notebooks
    nb = notebookutils.notebook.get(notebook_id_or_name)
    workspaceId = nb['workspaceId']
    notebookId = nb['id']
    format = 'ipynb'
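    # (sketch) the preview ends here; the endpoint path, token audience, and response
    # shape below are assumptions about the Fabric REST getDefinition API, not the
    # original code.
    token = notebookutils.credentials.getToken("https://api.fabric.microsoft.com")  # assumed audience
    headers = {"Authorization": f"Bearer {token}"}
    url = (f"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}"
           f"/items/{notebookId}/getDefinition?format={format}")
    response = requests.post(url, headers=headers)
    # a 202 response indicates a long-running operation; a real implementation would
    # poll the operation URL (hence the time import) before reading the result
    definition = response.json()["definition"]
    # each definition part carries its content base64-encoded
    for part in definition["parts"]:
        if part["path"].endswith(".ipynb"):
            return json.loads(base64.b64decode(part["payload"]).decode("utf-8"))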
SELECT
    --Query01
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    AVG(l_quantity) AS avg_qty,
    AVG(l_extendedprice) AS avg_price,
    AVG(l_discount) AS avg_disc,
    COUNT(*) AS count_order
FROM lineitem
WHERE l_shipdate <= DATE '1998-12-01' - INTERVAL 90 DAY
GROUP BY l_returnflag, l_linestatus
ORDER BY l_returnflag, l_linestatus;
@djouallah
djouallah / delta.py
Created October 25, 2023 12:50
write to Onelake using Python
%%time
!pip install -q duckdb
!pip install -q deltalake
import duckdb
from deltalake.writer import write_deltalake
from trident_token_library_wrapper import PyTridentTokenLibrary

aadToken = PyTridentTokenLibrary.get_access_token("storage")
sf = 1
for x in range(0, sf):
    con = duckdb.connect()
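    # (sketch) the preview ends here; assuming the loop generates TPC-H data with
    # DuckDB's tpch extension and writes it to OneLake with write_deltalake. The
    # table choice, OneLake path, and storage options are assumptions, not the
    # original code.
    con.execute("INSTALL tpch")
    con.execute("LOAD tpch")
    con.execute(f"CALL dbgen(sf={sf})")
    df = con.sql("SELECT * FROM lineitem").arrow()        # assumed table
    write_deltalake(
        "abfss://<workspace>@onelake.dfs.fabric.microsoft.com/<lakehouse>.Lakehouse/Tables/lineitem",  # placeholder path
        df,
        mode="overwrite",
        storage_options={"bearer_token": aadToken, "use_fabric_endpoint": "true"},
    )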
@djouallah
djouallah / gist:998571cf7560fb697ed174d1ef65b7fe
Created July 5, 2023 11:14
load latest metadata iceberg
import boto3
import pandas as pd

s3_client = boto3.client('s3')
bucket = 'xxxxxxx'
prefix = 'zzzz/yyyyyy/metadata'
paginator = s3_client.get_paginator('list_objects_v2')
response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
file_names = pd.DataFrame(columns=['file', 'date'])
for response in response_iterator:
    for object_data in response['Contents']:
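        # (sketch) the preview ends here; assuming each object's key and modified
        # timestamp are collected so the newest metadata file can be found,
        # matching the gist title "load latest metadata iceberg".
        file_names.loc[len(file_names)] = [object_data['Key'], object_data['LastModified']]

# assumed final step: pick the most recently written *.metadata.json
latest = (file_names[file_names['file'].str.endswith('.metadata.json')]
          .sort_values('date')
          .iloc[-1]['file'])
print(latest)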
# V-Order and optimized-write settings; you can ignore these two lines
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
# Load from the default lakehouse; make sure it is pinned as the default
from pyspark.sql.types import *
df = spark.read.option("header", "true").format("csv").load("Files/csv/*.csv")
df.write.mode("overwrite").format("delta").save("Tables/tablecsv")
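The pyspark.sql.types wildcard import above is only needed when the CSV schema is declared explicitly instead of inferred from the header; a minimal sketch of that variant, where the column names and types are assumptions:

# hypothetical variant: declare the CSV schema explicitly instead of relying on the header
schema = StructType([
    StructField("id", IntegerType(), True),       # assumed column
    StructField("name", StringType(), True),      # assumed column
    StructField("value", DoubleType(), True),     # assumed column
])

df = spark.read.schema(schema).option("header", "true").csv("Files/csv/*.csv")
df.write.mode("overwrite").format("delta").save("Tables/tablecsv")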