Created
March 2, 2019 15:39
-
-
Save jbarton311/df1f5e876ff97eb848a41edcbdcb5e6c to your computer and use it in GitHub Desktop.
Script to call the Spotify API. Functions for the current track and top historical artists & songs.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import os | |
import logging | |
# Local file that contains API keys and secrets | |
from config import Config | |
# Module-level logger that writes DEBUG-and-above records to the console.
# NOTE: the logger is named after this file's path (__file__) rather than the
# more conventional __name__; that is kept so the %(name)s field in existing
# log output does not change.
logger = logging.getLogger(__file__)
logger.setLevel(logging.DEBUG)

# Console handler and its formatter: timestamp - logger name - level - message.
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)

# logging.getLogger() hands back the same logger object every time this module
# is executed (importlib.reload, re-running a notebook cell). Attaching the
# handler unconditionally would stack handlers and emit every record several
# times, so only attach when the logger has none yet.
if not logger.handlers:
    logger.addHandler(ch)
def get_fresh_access_key():
    '''
    Request a new Spotify access token using the stored refresh token.

    Reads the client id, client secret and refresh token from ``Config``,
    posts them to the Spotify accounts token endpoint and caches the
    returned token on ``Config.access``.

    Returns the new access token.

    Raises ``requests.HTTPError`` if the token endpoint answers with an
    error status, and ``requests.Timeout`` if it does not answer in time.
    '''
    logger.info("Refreshing access key...")

    payload = {
        'client_id': Config.spotify_client_id,
        'client_secret': Config.spotify_client_secret,
        'refresh_token': Config.spotify_refresh,
        'grant_type': 'refresh_token',
    }

    # Without a timeout requests waits forever on an unresponsive server.
    response = requests.post(
        'https://accounts.spotify.com/api/token',
        data=payload,
        timeout=10,
    )
    # Surface a rejected refresh as the real HTTP error instead of an opaque
    # KeyError on 'access_token' further down.
    response.raise_for_status()

    # Cache the token on Config so the API helpers can build their
    # Authorization header from it.
    access = response.json()['access_token']
    Config.access = access
    return access
def make_api_call(func, **kwargs):
    '''
    Call ``func(**kwargs)`` and translate its HTTP response into data.

    ``func`` is one of the ``api_*`` helpers; it must return a response
    object exposing ``status_code``, ``json()`` and ``content``.

    An access token is fetched first if ``Config`` does not hold one, and
    a 401 response triggers one token refresh followed by a single retry.

    Returns the decoded JSON body for a 200 response and ``None`` for
    every other status (204, 403 and anything unexpected are logged).
    '''
    # getattr: Config.access is assigned dynamically by
    # get_fresh_access_key(), so it may not exist before the first refresh.
    if not getattr(Config, 'access', None):
        Config.access = get_fresh_access_key()

    response = func(**kwargs)

    # Refresh token and try again (once only, to avoid looping on a bad key).
    if response.status_code == 401:
        logger.info("Token likely expired. Refreshing token now")
        get_fresh_access_key()
        response = func(**kwargs)

    code = response.status_code
    if code == 200:
        logger.info("We are all good")
        return response.json()
    if code == 204:
        logger.info("No current track being played")
        return None
    if code == 403:
        logger.warning("403 - Forbidden - The server understood the request, but is refusing to fulfill it.")
        return None

    # Any other status: log the raw body to help diagnose it.
    logger.info(f"Code = {code}. Response.content: \n")
    logger.info(response.content)
    return None
def api_current_track():
    '''
    Query the Spotify player endpoint (``/v1/me/player``).

    Sends a GET request authorised with ``Config.access`` and the
    ``market=ES`` query parameter.

    Returns the raw ``requests`` response; status handling is left to
    the caller. Raises ``requests.Timeout`` if the API does not answer
    in time.
    '''
    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {Config.access}',
    }
    params = (
        ('market', 'ES'),
    )
    # Without a timeout requests waits forever on an unresponsive server.
    return requests.get(
        'https://api.spotify.com/v1/me/player',
        headers=headers,
        params=params,
        timeout=10,
    )
def current_track_data():
    '''
    Build a small dictionary describing the current playback.

    Fetches the player state through ``make_api_call(api_current_track)``
    and extracts ``artist``, ``album``, ``release_date``, ``song``,
    ``album_art`` and ``time_elapsed`` (formatted as ``M:SS``).

    If the API call yields no data, or the payload lacks the expected
    fields, the result carries ``status = 'issue with data clean up'``.
    When the payload reports an ``episode``, the artist, album, song and
    status fields are all set to ``'podcast'``.

    Returns the dictionary (never raises for a missing payload).
    '''
    data = make_api_call(api_current_track)
    jboy = {}

    # make_api_call returns None for 204/403/unexpected statuses; indexing
    # into None used to escape the handler below as an uncaught TypeError.
    if not data:
        jboy['status'] = 'issue with data clean up'
        return jboy

    try:
        item = data['item']
        album = item['album']
        jboy['artist'] = item['artists'][0]['name']
        jboy['album'] = album['name']
        jboy['release_date'] = album['release_date']
        jboy['song'] = item['name']
        # Second entry of the album's image list.
        jboy['album_art'] = album['images'][1]['url']

        # Calculate time into song
        total_seconds = data['progress_ms'] / 1000.0
        minutes = int(total_seconds // 60)
        seconds = str(int(total_seconds % 60)).zfill(2)
        jboy['time_elapsed'] = f"{minutes}:{seconds}"
    except (KeyError, IndexError, TypeError) as e:
        # KeyError/IndexError: a field or list entry is missing.
        # TypeError: a value such as data['item'] or progress_ms is None.
        logger.info(e)
        jboy['status'] = 'issue with data clean up'

        # .get(): the payload is already known to be incomplete here, so do
        # not assume this key is present either.
        if data.get('currently_playing_type') == 'episode':
            jboy['artist'] = 'podcast'
            jboy['album'] = 'podcast'
            jboy['song'] = 'podcast'
            jboy['status'] = 'podcast'

    return jboy
def api_top_user_data(limit=20, time_range='short_term', data_requested='artists'):
    '''
    Query the Spotify "top items" endpoint (``/v1/me/top/<data_requested>``).

    limit          -- number of items to request, 1 to 50 inclusive
    time_range     -- 'short_term', 'medium_term' or 'long_term'
    data_requested -- 'artists' or 'tracks'

    Arguments are validated before any request is made.

    Returns the raw ``requests`` response; status handling is left to
    the caller.

    Raises ``ValueError`` for an argument outside the ranges above, and
    ``requests.Timeout`` if the API does not answer in time.
    '''
    if data_requested not in ('artists', 'tracks'):
        raise ValueError("data_requested argument must be ['artists','tracks']")

    if time_range not in ('short_term', 'medium_term', 'long_term'):
        raise ValueError("time_range argument must be ['short_term','medium_term','long_term']")

    if not 1 <= limit <= 50:
        raise ValueError("Limit must be between 1 and 50")

    headers = {
        'Authorization': f"Bearer {Config.access}",
    }
    params = {
        'limit': f'{limit}',
        'time_range': f'{time_range}',
    }

    # Without a timeout requests waits forever on an unresponsive server.
    response = requests.get(
        f'https://api.spotify.com/v1/me/top/{data_requested}',
        headers=headers,
        params=params,
        timeout=10,
    )
    return response
def top_artists(limit=1, time_range='short_term', data_requested='artists'):
    '''
    Will pull data for the top artists played.

    Arguments are forwarded to ``api_top_user_data`` through
    ``make_api_call``.

    Returns a dictionary keyed by rank (starting at 1, in the order the
    API returned the items); each value holds ``artist``, ``popularity``
    and ``image``. Returns an empty dictionary when the API call yields
    no data.
    '''
    data = make_api_call(api_top_user_data, limit=limit, time_range=time_range, data_requested=data_requested)

    # make_api_call returns None for any non-200 response; report "no
    # results" instead of failing on data['items'].
    if not data:
        return {}

    art_dict = {}
    for rank, result in enumerate(data['items'], start=1):
        entry = {
            'artist': result['name'],
            'popularity': result['popularity'],
        }
        # Third entry of the artist's image list; fall back to a
        # placeholder when fewer images are supplied.
        try:
            entry['image'] = result['images'][2]['url']
        except IndexError:
            entry['image'] = 'Sorry no image'
        art_dict[rank] = entry

    return art_dict
def top_tracks(limit=1, time_range='short_term', data_requested='tracks'):
    '''
    Will pull data for the top tracks played.

    Arguments are forwarded to ``api_top_user_data`` through
    ``make_api_call``.

    Returns a dictionary keyed by rank (starting at 1, in the order the
    API returned the items); each value holds ``song_name``,
    ``artist_name`` (first listed artist), ``album_name`` and ``image``.
    Returns an empty dictionary when the API call yields no data.
    '''
    data = make_api_call(api_top_user_data, limit=limit, time_range=time_range, data_requested=data_requested)

    # make_api_call returns None for any non-200 response; report "no
    # results" instead of failing on data['items'].
    if not data:
        return {}

    track_dict = {}
    for rank, result in enumerate(data['items'], start=1):
        album = result['album']
        entry = {
            'song_name': result['name'],
            'artist_name': result['artists'][0]['name'],
            'album_name': album['name'],
        }
        # Third entry of the album's image list; fall back to a
        # placeholder when fewer images are supplied.
        try:
            entry['image'] = album['images'][2]['url']
        except IndexError:
            entry['image'] = 'Sorry no image'
        track_dict[rank] = entry

    return track_dict
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment