Created
May 1, 2017 21:59
-
-
Save janfait/ad7c212c211a9f4359fb2a74a1434840 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
"""
Created on Sun Apr 30 20:26:43 2017
@author: Jan Fait, [email protected]
"""
import os | |
import requests | |
import datetime | |
import pandas | |
class InvalidCredentialsException(Exception):
    """Error type for rejected credentials.

    The text is kept on ``self.message`` and is also handed to
    ``Exception`` so that ``str(exc)`` and tracebacks show it, including
    when the exception is built with the ``message=...`` keyword.
    """

    def __init__(self, message):
        # Without this call, keyword construction leaves ``args`` empty and
        # the exception prints as an empty string.
        super(InvalidCredentialsException, self).__init__(message)
        self.message = message
class MissingCredentialsException(Exception):
    """Error type for a missing username or password.

    The text is kept on ``self.message`` and is also handed to
    ``Exception`` so that ``str(exc)`` and tracebacks show it, including
    when the exception is built with the ``message=...`` keyword.
    """

    def __init__(self, message):
        # Without this call, keyword construction leaves ``args`` empty and
        # the exception prints as an empty string.
        super(MissingCredentialsException, self).__init__(message)
        self.message = message
class MissingParameterException(Exception):
    """Error type for a required parameter that was not supplied.

    Args:
        name: Name of the missing parameter; it is embedded in the message.
    """

    def __init__(self, name):
        # The separator was missing, which glued the name onto the sentence
        # ("Missing required parameterendpoint").
        self.message = 'Missing required parameter: ' + name
        # Pass the full sentence to Exception so str(exc) is informative.
        super(MissingParameterException, self).__init__(self.message)
class InvalidAttributeException(Exception):
    """Plain ``Exception`` subclass with no added behaviour.

    It is declared here but not raised anywhere in the visible code.
    """
class MappDmp:
    """Small client for the Mapp DMP HTTP API.

    Creating an instance stores the credentials and immediately posts them
    to the ``auth`` endpoint. The JSON answer is kept in ``self.session``
    and its ``token`` / ``csrf`` values are sent as headers on later calls.
    """

    def __init__(self, root=None, username=None, password=None, debug=False):
        """Store the configuration and log in.

        Args:
            root: Base URL of the API. When falsy,
                ``https://platform.flxone.com/api`` is used.
            username: Account user name (required).
            password: Account password (required).
            debug: When true, ``dprint`` prints timestamped messages.

        Raises:
            MissingCredentialsException: If username or password is falsy.
        """
        self.debug = debug
        self.data = {}
        self.authentication = {}
        self.session = {}
        self._constants = {}
        self.endpoints = {
            'auth': '/auth',
            'listexports': '/viz/list-exports',
            'export': '/viz/export'
        }
        self.dictionary = {}
        self.dprint('Initializing Mapp DMP API')
        if not username or not password:
            raise MissingCredentialsException(message='username and password attributes are required')
        self.authentication['username'] = username
        self.authentication['password'] = password
        self.endpoints['root'] = root if root else 'https://platform.flxone.com/api'
        self.login()

    def dprint(self, *args):
        """Print the arguments, space-joined and timestamped, when debugging is on."""
        if self.debug:
            text = " ".join(str(x) for x in args)
            stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            print("MappDmp at " + stamp + ': ' + text)

    def get_authentication(self):
        """Return the credentials as a ``username=...&password=...`` string.

        NOTE(review): the values are inserted verbatim (no URL encoding);
        confirm how the server parses the body before changing this.
        """
        username = '='.join(('username', self.authentication['username']))
        password = '='.join(('password', self.authentication['password']))
        return '&'.join((username, password))

    def login(self):
        """Post the credentials to the auth endpoint and store the session.

        Returns:
            The result of ``check_login()`` for the freshly stored session.
        """
        url = self.build_url('auth')
        body = self.get_authentication()
        # Log the user name only: the request body contains the password.
        self.dprint('Logging in as:', self.authentication['username'])
        response = requests.post(url=url, data=body)
        self.session = response.json()
        return self.check_login()

    def check_login(self):
        """Tell whether the stored session looks usable.

        The session is rejected when it is empty, when its status is
        ``'ERROR'``, when its ``debug.now`` timestamp is missing or
        malformed, or when that timestamp plus 30 minutes is not later
        than the current UTC time.

        Returns:
            True when the token is considered valid, False otherwise.
        """
        response = self.session.get('response') or {}
        if not response or response.get('status') == 'ERROR':
            self.dprint('Login failed')
            return False
        try:
            # NOTE(review): presumably the server reports 'now' in UTC - confirm.
            issued = datetime.datetime.strptime(self.session['debug']['now'], '%Y-%m-%d %H:%M:%S')
        except (KeyError, TypeError, ValueError):
            # An incomplete session is treated as not logged in rather than
            # crashing the caller.
            self.dprint('Login failed')
            return False
        now = datetime.datetime.utcnow()
        expiry = issued + datetime.timedelta(minutes=30)
        self.dprint('UTC timestamp is:', now)
        self.dprint('Token expires at:', expiry, 'UTC')
        if now >= expiry:
            self.dprint('Token expired')
            return False
        self.dprint('Token valid')
        return True

    def build_url(self, endpoint=None):
        """Return the root URL joined with the path registered for ``endpoint``.

        Raises:
            MissingParameterException: If ``endpoint`` is falsy.
            KeyError: If ``endpoint`` is not a key of ``self.endpoints``.
        """
        if not endpoint:
            # This used to *return* the exception class, which callers then
            # used as if it were a URL.
            raise MissingParameterException('endpoint')
        return ''.join((self.endpoints['root'], self.endpoints[endpoint]))

    def build_headers(self):
        """Return the request headers built from the stored session token and CSRF value."""
        return {
            'X-Auth': self.session['response']['token'],
            'X-CSRF': self.session['response']['csrf'],
            'Accept': 'application/json'
        }

    def call(self, endpoint=None, method='GET', params=None, body=None, stream=False):
        """Send a request to a registered endpoint and return the decoded JSON.

        Args:
            endpoint: Key of ``self.endpoints`` to call.
            method: ``'GET'`` issues a GET; any other value issues a POST.
            params: Optional query-string parameters.
            body: Optional request body for POST requests.
            stream: Passed through to ``requests``.

        Returns:
            The parsed JSON body of the response.
        """
        url = self.build_url(endpoint)
        # check_login must be *called*; the bare method object is always
        # truthy, so the session was never refreshed.
        if not self.check_login():
            self.login()
        # Build headers after the possible re-login so they carry the new token.
        headers = self.build_headers()
        self.dprint('Calling URL', url)
        self.dprint('With headers', headers)
        if method == 'GET':
            response = requests.get(url=url, params=params, headers=headers, stream=stream)
        else:
            response = requests.post(url=url, params=params, data=body, headers=headers, stream=stream)
            # A prepared request exposes its payload as ``body``.
            self.dprint('Sending request data', response.request.body)
        return response.json()

    def list_exports(self):
        """Return the JSON answer of the ``listexports`` endpoint."""
        return self.call('listexports')

    def get_export(self, export_id):
        """Download an export into ``MappDmpExport_<export_id>.txt``.

        The file is created in the current working directory and the
        response is streamed into it in 1024-byte chunks.

        Args:
            export_id: Identifier sent as the ``id`` query parameter.

        Returns:
            The ``raw`` attribute of the response (already consumed once
            the file has been written).
        """
        url = self.build_url('export')
        headers = self.build_headers()
        params = {'id': export_id}
        filename = 'MappDmpExport_' + str(export_id) + '.txt'
        r = requests.get(url, params=params, headers=headers, stream=True)
        self.dprint('Calling URL', r.request.url)
        # Write the bytes exactly as received: decoded chunks may still be
        # bytes when the response declares no encoding, which breaks a
        # text-mode file.
        with open(filename, 'wb') as f:
            self.dprint('Writing response content to ', filename)
            for chunk in r.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        return r.raw

    def get_data(self, dimensions=None, measures=None, filters=None, limit=None, batch=False):
        """Run the query arguments through the validators (unfinished).

        NOTE(review): ``validate_dimensions``, ``validate_measures`` and
        ``validate_filters`` are not defined in the visible part of this
        class, and ``limit`` / ``batch`` are not used yet. Nothing is
        returned.
        """
        dimensions = self.validate_dimensions(dimensions)
        measures = self.validate_measures(measures)
        filters = self.validate_filters(filters)
# Demo: log in, download export 44800 and print the beginning of the file.
# NOTE(review): this runs at import time; consider moving it under an
# ``if __name__ == "__main__":`` guard.
m = MappDmp(username='xxxxxxxx',password='xxxxxxxxxxxx',debug=True)
x = m.get_export(export_id=44800)
counter = 0
with open("MappDmpExport_44800.txt", "rb") as f:
    byte = f.read(1024)
    # An empty read marks end of file; comparing bytes with "" never did.
    while byte:
        # A fixed-size slice can cut a multi-byte character in half, so
        # undecodable bytes are replaced instead of raising.
        string = byte.decode('utf-8', errors='replace').strip()
        print(string)
        if counter > 20:
            break
        counter += 1
        # Read the next chunk only after the current one has been printed,
        # so the first chunk is no longer skipped.
        byte = f.read(1024)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment