import re
from functools import reduce

from pyspark.sql import DataFrame


# BASE
class Base(object):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        self._spark = spark
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._database = database

    def run_query(self, query, return_pandasDF=True):
        raise NotImplementedError('This method must be implemented')
# TERADATA
class TeradataWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read.format("jdbc") \
            .option("url", f'jdbc:teradata://{self._host}/Database={self._database},LOGMECH=LDAP') \
            .option("user", self._username) \
            .option("password", self._password) \
            .option("driver", "com.teradata.jdbc.TeraDriver")

    def run_query(self, query, return_pandasDF=True):
        # Direct single-query path, kept for reference:
        # spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        # if return_pandasDF:
        #     return spark_df.toPandas()
        # else:
        #     return spark_df
        # Instead, split the query on UNION ALL and run each part separately.
        return self.split_query_and_run_individually(query, r'union all', return_pandasDF)

    def run_queries_and_union_all(self, queries, return_pandasDF=True):
        dataframes = []
        for each_query in queries:
            try:
                spark_df = self._reader.option('dbtable', f"({each_query}) as tbl").load()
                dataframes.append(spark_df)
            except Exception as e:
                # Skip the failing query, but report why it failed.
                print(f'Error while reading the query {each_query}: {e}')
        if not dataframes:
            raise ValueError('None of the queries could be read')
        # Union the pieces positionally, as UNION ALL would.
        concat_sparkDf = reduce(DataFrame.unionAll, dataframes)
        if return_pandasDF:
            return concat_sparkDf.toPandas()
        else:
            return concat_sparkDf

    def split_query_and_run_individually(self, query, separator=r'union all', return_pandasDF=True):
        # Naive split: assumes the separator never appears inside a subquery or string literal.
        queries = re.split(separator, query, flags=re.IGNORECASE)
        return self.run_queries_and_union_all(queries, return_pandasDF)
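
# Example usage for TeradataWithSpark (a minimal sketch: the host, database,
# and credential values are placeholders, and `spark` is assumed to be an
# existing SparkSession with the Teradata JDBC driver on its classpath):
#
# td = TeradataWithSpark(spark, host='td-host', database='my_db',
#                        username='user', password='secret')
# # The UNION ALL query is split and each part is loaded as its own JDBC read:
# pdf = td.run_query('select * from t1 union all select * from t2')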
# POSTGRESQL
class PostgresWithSpark(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)
        self._reader = self._spark.read \
            .format("jdbc") \
            .option("url", f"jdbc:postgresql://{self._host}:{self._port}/{self._database}") \
            .option("driver", "org.postgresql.Driver") \
            .option("user", self._username) \
            .option("password", self._password)

    def run_query(self, query, return_pandasDF=True):
        spark_df = self._reader.option('dbtable', f"({query}) as tbl").load()
        if return_pandasDF:
            return spark_df.toPandas()
        else:
            return spark_df
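
# Example usage for PostgresWithSpark (a sketch: connection details are
# placeholders, and the PostgreSQL JDBC driver is assumed to be available,
# e.g. via the spark.jars.packages setting with an org.postgresql:postgresql
# artifact):
#
# pg = PostgresWithSpark(spark, host='localhost', port=5432, database='my_db',
#                        username='user', password='secret')
# spark_df = pg.run_query('select id, name from customers', return_pandasDF=False)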
# HIVE
class Hive(Base):
    def __init__(self, spark, host=None, port=None, database=None, username=None, password=None):
        super().__init__(spark, host, port, database, username, password)

    def run_query(self, query, return_pandasDF=True):
        # Hive queries go through the SparkSession directly; no JDBC reader is needed.
        if return_pandasDF:
            return self._spark.sql(query).toPandas()
        else:
            return self._spark.sql(query)
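
# Example usage for Hive (a sketch: assumes the SparkSession was built with
# .enableHiveSupport() so that spark.sql can reach the Hive metastore):
#
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName('hive-demo').enableHiveSupport().getOrCreate()
# hive = Hive(spark)
# pdf = hive.run_query('select * from my_db.my_table limit 10')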