Last active
August 23, 2016 10:28
-
-
Save kampta/8fca0ece5089535c423464d0a2ffac28 to your computer and use it in GitHub Desktop.
day to day utilities
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import string | |
import random | |
import os | |
import re | |
from unicodedata import normalize | |
from datetime import datetime | |
import tempfile | |
import requests | |
from urllib.parse import urlsplit | |
# One or more consecutive separator characters (tab, space and most ASCII
# punctuation). slugify() splits its input on this pattern.
_punct_re = re.compile(r'[\t !"#$%&\'()*\-/<=>?@\[\\\]^_`{|},.]+')

# Anchored, case-insensitive pattern for an absolute URL; isUrl() matches
# candidate strings against it.
_url_re = re.compile(
    r'^(?:http|ftp)s?://'  # scheme: http, https, ftp or ftps
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # dotted domain name...
    r'localhost|'  # ...or the literal host "localhost"...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or a dotted-quad IPv4 (octet range is not checked)
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)  # optional path/query without whitespace
def isInt(s):
    """Report whether ``int(s)`` succeeds.

    Anything the ``int()`` constructor accepts counts as an integer here,
    e.g. ``"42"``, ``" 7 "``, ``3.7`` or ``True``.

    Args:
        s: Value to probe.

    Returns:
        bool: True if ``int(s)`` does not fail, False otherwise.
    """
    try:
        int(s)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (None, list, ...).
        # ValueError: malformed literal or NaN.
        # OverflowError: infinite float.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return False
    return True
def isFloat(num):
    """Report whether ``float(num)`` succeeds.

    Anything the ``float()`` constructor accepts counts, including strings
    such as ``"1e5"``, ``"nan"`` and ``"inf"``.

    Args:
        num: Value to probe.

    Returns:
        bool: True if ``float(num)`` does not fail, False otherwise.
    """
    try:
        float(num)
    except (TypeError, ValueError, OverflowError):
        # TypeError: unsupported type (None, list, ...).
        # ValueError: malformed literal.
        # OverflowError: integer too large for a float.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return False
    return True
def isUrl(url):
    """Report whether ``url`` matches the module's ``_url_re`` pattern.

    Args:
        url: String to check.

    Returns:
        bool: True when the pattern matches from the start of ``url``.
    """
    # A match object is always truthy, so bool() distinguishes it from None.
    return bool(_url_re.match(url))
def get_current_time():
    """Return the current UTC time as a naive ``datetime``.

    The result carries no ``tzinfo``, exactly like ``datetime.utcnow()``,
    so existing comparisons against other naive UTC values keep working.

    Returns:
        datetime: Current UTC wall-clock time with ``tzinfo`` set to None.
    """
    # Function-scope import: the module header only imports ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # timestamp and strip the tzinfo to keep the historical naive result.
    return datetime.now(timezone.utc).replace(tzinfo=None)
def id_generator(size=10, chars=string.ascii_letters + string.digits):
    """Build a random string of ``size`` characters drawn from ``chars``.

    Uses the ``random`` module, so the output is not suitable where
    unpredictability matters.

    Args:
        size: Number of characters to produce.
        chars: Pool of characters to pick from (letters and digits by default).

    Returns:
        str: The generated identifier.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)
def make_dir(dir_path):
    """Create ``dir_path`` (and missing parents) unless it already exists.

    If something already exists at ``dir_path`` the call is a no-op.

    Args:
        dir_path: Path of the directory to create.

    Raises:
        OSError: If creation fails for any reason other than the path
            already existing (e.g. permissions, invalid path).
    """
    try:
        # Attempt the creation directly instead of checking os.path.exists()
        # first: another process may create the directory between the check
        # and the makedirs() call.
        os.makedirs(dir_path)
    except FileExistsError:
        # Path is already present -- nothing to do.
        pass
def smart_truncate(content, length=64, suffix=' ...'):
    """Shorten ``content`` to about ``length`` characters on a word boundary.

    Text that already fits is returned unchanged. Otherwise the first
    ``length`` characters are taken, everything after the last space in that
    slice is dropped, and ``suffix`` is appended. The result can therefore be
    longer than ``length``.

    Args:
        content: Text to shorten.
        length: Maximum number of characters kept before trimming.
        suffix: Marker appended to truncated text.

    Returns:
        str: ``content`` itself, or the trimmed head followed by ``suffix``.
    """
    if len(content) > length:
        head = content[:length]
        # With no space in the head, rsplit yields the head unchanged.
        kept = head.rsplit(' ', 1)[0]
        return kept + suffix
    return content
def get_base_url(url):
    """Return ``url`` with its query string removed.

    Only the query component is cleared; scheme, host, path and fragment
    are kept.

    Args:
        url: URL string to strip.

    Returns:
        str: The URL reassembled without its query string.
    """
    return urlsplit(url)._replace(query='').geturl()
def ordered(obj):
    """Recursively convert ``obj`` into a sorted, order-independent form.

    * ints (including bools) become the string of their absolute value;
    * floats are truncated to int, made absolute and stringified;
    * dicts become a sorted list of ``(key, ordered(value))`` pairs;
    * lists become a sorted list of their converted elements;
    * anything else is returned unchanged.

    Args:
        obj: Value to convert.

    Returns:
        The converted value as described above.
    """
    if isinstance(obj, int):
        return str(abs(obj))
    if isinstance(obj, float):
        whole = int(obj)
        return str(abs(whole))
    if isinstance(obj, dict):
        pairs = [(key, ordered(value)) for key, value in obj.items()]
        pairs.sort()
        return pairs
    if isinstance(obj, list):
        converted = [ordered(element) for element in obj]
        converted.sort()
        return converted
    return obj
def slugify(obj, delim='-'):
    """Build a slug from a string, number, list or dict.

    * int -> its absolute value (returned as an int, not a string);
    * float -> truncated to int, absolute value (returned as an int);
    * str -> lower-cased, split on ``_punct_re``, each piece NFKD-normalised,
      empty pieces dropped, remaining pieces joined with ``delim``;
    * list -> elements passed through ``ordered()`` and slugified, joined
      with ``delim``;
    * dict -> ``ordered()`` pairs, each rendered as ``key<delim>value``, all
      joined with ``delim``;
    * anything else -> the empty string.

    Args:
        obj: Value to slugify.
        delim: Separator placed between slug pieces.

    Returns:
        int for int/float input, otherwise str.
    """
    if isinstance(obj, int):
        return abs(obj)
    if isinstance(obj, float):
        return abs(int(obj))
    if isinstance(obj, str):
        pieces = (normalize('NFKD', part)
                  for part in _punct_re.split(obj.lower()))
        return delim.join(piece for piece in pieces if piece)
    if isinstance(obj, list):
        return delim.join(slugify(item, delim) for item in ordered(obj))
    if isinstance(obj, dict):
        # ordered() stringifies numeric *values* but leaves keys untouched,
        # so slugify(key) yields an int for numeric keys. str.join() rejects
        # non-strings, hence the explicit str() around the key slug.
        return delim.join(
            delim.join([str(slugify(key, delim)), slugify(value, delim)])
            for key, value in ordered(obj)
        )
    return ""
def flatten_tuple_list(l):
    """Flatten one level of nesting from an iterable of iterables.

    Args:
        l: Iterable whose elements are themselves iterable (e.g. a list of
            tuples).

    Returns:
        list: All inner items, in their original order.
    """
    flat = []
    for group in l:
        flat.extend(group)
    return flat
def random_id(size=6, chars=string.ascii_uppercase + string.digits):
    """Build a random string of ``size`` characters drawn from ``chars``.

    Uses the ``random`` module, so the output is not suitable where
    unpredictability matters.

    Args:
        size: Number of characters to produce.
        chars: Pool of characters to pick from (upper-case letters and digits
            by default).

    Returns:
        str: The generated identifier.
    """
    symbols = [random.choice(chars) for _step in range(size)]
    return ''.join(symbols)
# Databases | |
def get_or_create(session, model, **kwargs):
    """Fetch the first ``model`` row matching ``kwargs``, or stage a new one.

    Note the flag's meaning: it is True when an existing instance was FOUND
    and False when a new one was created.

    Args:
        session: Object exposing ``query(model).filter_by(**kwargs).first()``
            and ``add(instance)`` (presumably a SQLAlchemy session -- confirm).
        model: Class that is queried and, if needed, instantiated with
            ``kwargs``.
        **kwargs: Filter criteria, reused as constructor arguments.

    Returns:
        tuple: ``(instance, True)`` if a match was found, otherwise
        ``(instance, False)`` with the new instance added to the session.
        Nothing is committed here.
    """
    existing = session.query(model).filter_by(**kwargs).first()
    if not existing:
        created = model(**kwargs)
        session.add(created)
        return created, False
    return existing, True
def clear_schema(db):
    """Drop the PUBLIC schema with everything in it, then recreate it empty.

    Destructive: the DROP uses CASCADE.

    Args:
        db: Object whose ``engine.execute(sql)`` runs raw SQL statements.
    """
    statements = (
        "DROP SCHEMA IF EXISTS PUBLIC CASCADE",
        "CREATE SCHEMA PUBLIC",
    )
    for statement in statements:
        db.engine.execute(statement)
def clear_tables(db, *args):
    """Delete all rows from the tables whose names are listed in ``args``.

    Args:
        db: Object exposing ``metadata.sorted_tables`` and a ``session`` with
            ``execute()`` / ``commit()``.
        *args: Names of the tables to empty; other tables are left untouched.
    """
    meta = db.metadata
    # Walk sorted_tables in reverse; presumably this deletes dependent
    # (child) tables before the tables they reference -- confirm.
    for table in reversed(meta.sorted_tables):
        if table.name in args:
            print('Clear table %s' % table)
            db.session.execute(table.delete())
    # NOTE(review): a single commit after all deletes is assumed here --
    # confirm this is the intended placement.
    db.session.commit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment