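A drop-in Amazon Athena query runner for Redash (1.0.0.b2521), built on PyAthenaJDBC, plus the Dockerfile and supervisord.conf that bake it into the official Redash image. First, the query runner itself, athena.py: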
import json
import logging

from redash.query_runner import (BaseQueryRunner, register,
                                 TYPE_DATETIME, TYPE_DATE, TYPE_STRING,
                                 TYPE_BOOLEAN, TYPE_FLOAT, TYPE_INTEGER)
from redash.utils import JSONEncoder

logger = logging.getLogger(__name__)

try:
    from pyathenajdbc import connect
    enabled = True
except ImportError:
    enabled = False

# JDBC type codes (java.sql.Types constants) -> Redash column types.
_ATHENA_TYPES_MAPPING = {
    -6: TYPE_INTEGER,   # TINYINT
    5: TYPE_INTEGER,    # SMALLINT
    4: TYPE_INTEGER,    # INTEGER
    -5: TYPE_INTEGER,   # BIGINT
    8: TYPE_FLOAT,      # DOUBLE
    16: TYPE_BOOLEAN,   # BOOLEAN
    -16: TYPE_STRING,   # LONGNVARCHAR
    91: TYPE_DATE,      # DATE
    93: TYPE_DATETIME,  # TIMESTAMP
}

class Athena(BaseQueryRunner):
    noop_query = 'SELECT 1'

    @classmethod
    def name(cls):
        return "Amazon Athena"

    @classmethod
    def configuration_schema(cls):
        return {
            'type': 'object',
            'properties': {
                'region': {
                    'type': 'string',
                    'title': 'AWS Region'
                },
                'aws_access_key': {
                    'type': 'string',
                    'title': 'AWS Access Key'
                },
                'aws_secret_key': {
                    'type': 'string',
                    'title': 'AWS Secret Key'
                },
                's3_staging_dir': {
                    'type': 'string',
                    'title': 'S3 Staging Path'
                },
                'schema': {
                    'type': 'string',
                    'title': 'Schema'
                },
                'jvm_path': {
                    'type': 'string',
                    'title': 'JVM Path'
                },
                'jvm_options': {
                    'type': 'string',
                    'title': 'JVM Options'
                }
            },
            'required': ['region', 'aws_access_key', 'aws_secret_key', 's3_staging_dir'],
            'secret': ['aws_secret_key']
        }
    def get_schema(self, get_stats=False):
        schema = {}
        query = """
        SELECT table_schema, table_name, column_name
        FROM information_schema.columns
        WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
        """
        results, error = self.run_query(query, None)
        if error is not None:
            raise Exception("Failed getting schema.")
        results = json.loads(results)
        for row in results['rows']:
            table_name = '{}.{}'.format(row['table_schema'], row['table_name'])
            if table_name not in schema:
                schema[table_name] = {'name': table_name, 'columns': []}
            schema[table_name]['columns'].append(row['column_name'])
        return schema.values()
    def run_query(self, query, user):
        jvm_options = self.configuration.get('jvm_options', None)
        conn = connect(s3_staging_dir=self.configuration['s3_staging_dir'],
                       region_name=self.configuration['region'],
                       access_key=self.configuration['aws_access_key'],
                       secret_key=self.configuration['aws_secret_key'],
                       schema_name=self.configuration.get('schema', 'default'),
                       jvm_path=self.configuration.get('jvm_path', None),
                       jvm_options=jvm_options.split(',') if jvm_options else None)
        try:
            with conn.cursor() as cursor:
                cursor.execute(query)
                # cursor.description yields (name, jdbc_type_code, ...) tuples.
                column_tuples = [(i[0], _ATHENA_TYPES_MAPPING.get(i[1], None))
                                 for i in cursor.description]
                columns = self.fetch_columns(column_tuples)
                rows = [dict(zip([c['name'] for c in columns], r))
                        for r in cursor.fetchall()]
                data = {'columns': columns, 'rows': rows}
                json_data = json.dumps(data, cls=JSONEncoder)
                error = None
        except Exception as e:
            json_data = None
            error = str(e)
        finally:
            if conn:
                conn.close()
        return json_data, error


register(Athena)
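For a quick sanity check outside Redash, the same PyAthenaJDBC calls the runner makes can be exercised directly. A minimal sketch, assuming a hypothetical staging bucket, region and credentials (substitute your own):

from pyathenajdbc import connect

# All connection values below are placeholders, not real resources.
conn = connect(s3_staging_dir='s3://YOUR_BUCKET/athena-staging/',
               region_name='us-west-2',
               access_key='YOUR_AWS_ACCESS_KEY',
               secret_key='YOUR_AWS_SECRET_KEY',
               schema_name='default')
try:
    with conn.cursor() as cursor:
        cursor.execute('SELECT 1')
        # description carries the JDBC type codes that
        # _ATHENA_TYPES_MAPPING above maps to Redash types.
        print(cursor.description)
        print(cursor.fetchall())
finally:
    conn.close()

The Dockerfile below bakes the runner into the official Redash 1.0 image and installs the OpenJDK 8 runtime that the Athena JDBC driver needs: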
FROM redash/redash:1.0.0.b2521
COPY ./supervisord.conf /opt/redash/supervisord/supervisord.conf
COPY ./athena.py /opt/redash/current/redash/query_runner/athena.py
# The sed patch turns the 'Copy of ...' literal in models.py into a unicode
# string, so forking a query with a non-ASCII name does not raise
# UnicodeEncodeError under Python 2.
RUN sed -i -e "s/'Copy of/u'Copy of/g" /opt/redash/current/redash/models.py \
    && apt-get update \
    && apt-get install -y software-properties-common \
    && add-apt-repository ppa:openjdk-r/ppa \
    && apt-get update \
    && apt-get install -y openjdk-8-jdk ca-certificates-java \
    && update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/* \
    && pip install PyAthenaJDBC
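Finally, the supervisord.conf the Dockerfile copies in. It runs the Redash web server and two Celery workers, one for ad-hoc queries and one for scheduled refreshes: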
[supervisord]
nodaemon=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
directory=/opt/redash/current
[inet_http_server]
port = 0.0.0.0:9001
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[program:redash_server]
command=gunicorn -b 0.0.0.0:5000 --name redash -w 4 --max-requests 1000 redash.wsgi:app
directory=/opt/redash/current
process_name=redash_server
numprocs=1
priority=999
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
user=redash
# There are two queue types here: one for ad-hoc queries and one for the refresh
# of scheduled queries (note that "scheduled_queries" appears only in the queue
# list of "redash_celery_scheduled"). Concurrency is 5 (-c5) for the ad-hoc
# worker and 1 (-c1) for the scheduled worker; adjust to your machine's resources.
[program:redash_celery]
command=celery worker --app=redash.worker --beat -c5 -Qqueries,celery --maxtasksperchild=10 -Ofair
directory=/opt/redash/current
process_name=redash_celery
numprocs=1
priority=999
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
user=redash
[program:redash_celery_scheduled]
command=celery worker --app=redash.worker -c1 -Qscheduled_queries --maxtasksperchild=10 -Ofair
directory=/opt/redash/current
process_name=redash_celery_scheduled
numprocs=1
priority=999
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
user=redash