Skip to content

Instantly share code, notes, and snippets.

@jeriko
Last active August 31, 2023 07:59
Show Gist options
  • Save jeriko/0cdff384d770a5a0284048a98c2e44ab to your computer and use it in GitHub Desktop.
Save jeriko/0cdff384d770a5a0284048a98c2e44ab to your computer and use it in GitHub Desktop.
recombinary python quickstart
import requests
import os
import json
import time
from io import BytesIO
token_cache = None
def get_token():
global token_cache
if "RECOMBINARY_TOKEN" in os.environ:
return os.environ["RECOMBINARY_TOKEN"]
if "RECOMBINARY_CLIENT_ID" not in os.environ or "RECOMBINARY_CLIENT_SECRET" not in os.environ:
raise Exception(
"Please set either RECOMBINARY_TOKEN or both RECOMBINARY_CLIENT_ID and RECOMBINARY_CLIENT_SECRET."
)
url = 'https://recombinary.eu.auth0.com/oauth/token'
current_time = int(time.time())
one_hour_from_now = current_time + 3600
if token_cache and token_cache["expires_at"] > one_hour_from_now:
return token_cache["access_token"]
data = {
"client_id": os.environ["RECOMBINARY_CLIENT_ID"],
"client_secret": os.environ["RECOMBINARY_CLIENT_SECRET"],
"audience": "https://api.recombinary.com",
"grant_type": "client_credentials",
}
response = requests.post(url, json=data)
if response.status_code == 200 and "access_token" in response.json():
token_cache = response.json()
token_cache["expires_at"] = current_time + token_cache["expires_in"]
return token_cache["access_token"]
else:
raise Exception("No access token found in the response")
class API:
def __init__(self, domain, token=None):
self.domain = domain
self.token = token
def get_headers(self):
token = self.token or get_token()
return {"Authorization": f"Bearer {token}"}
def url(self, path=""):
return f"https://{self.domain}/{path}"
def get(self, path="", params={}):
headers = self.get_headers()
response = requests.get(self.url(path), headers=headers, params=params)
if response.status_code >= 400:
raise Exception(response.text)
content_type = response.headers.get('Content-Type')
if 'application/json' in content_type:
return response.json()
else:
return response.content
def post(self, path, payload):
headers = self.get_headers()
response = requests.post(self.url(path), headers=headers, json=payload)
if response.status_code >= 400:
raise Exception(response.text)
content_type = response.headers.get('Content-Type')
if 'application/json' in content_type:
return response.json()
else:
return response.content
def stream(self, path, payload):
headers = self.get_headers()
response = requests.post(self.url(path) + "?stream=true",
headers=headers,
json=payload,
stream=True)
if response.status_code >= 400:
raise Exception(response.text)
buffer = BytesIO()
for chunk in response.iter_content():
buffer.write(chunk)
buffer.seek(0)
return buffer
def connect(domain, token=None):
api = API(domain, token)
def callable_instance(*args):
if len(args) == 0:
return api.get()
if isinstance(args[0], str) and len(args) == 1:
return api.get(args[0])
if isinstance(args[0], dict):
return api.post("", args[0])
if isinstance(args[0], str) and isinstance(args[1], dict):
return api.post(args[0], args[1])
raise Exception("Invalid arguments")
callable_instance.get = api.get
callable_instance.post = api.post
callable_instance.stream = api.stream
return callable_instance
import mimetypes
import base64
import hashlib
import json
def encode(file_buffer, mime_type):
# if mime_type is None:
# # Here you would use a library to determine file type from buffer
# raise NotImplementedError(
# "Auto-detecting file type is not implemented. Please supply the extension as a second parameter e.g. 'png'"
# )
# mime_type, _ = mimetypes.guess_type(f".{ext}")
# if mime_type is None:
# raise ValueError("Could not determine mime type from extension")
base64_encoded = base64.b64encode(file_buffer).decode("utf-8")
return f"data:{mime_type};base64,{base64_encoded}"
def decode(base64_uri):
mime_part, base64_part = base64_uri.split(';base64,')
mime_type = mime_part.split(':')[1]
file_buffer = base64.b64decode(base64_part)
ext = mimetypes.guess_extension(mime_type)
if ext == ".json":
return json.loads(file_buffer.decode("utf-8"))
if ext in ['.txt', '.html', '.css', '.js', '.xml', '.csv', '.md']:
return file_buffer.decode("utf-8")
return file_buffer
def hash(buffer):
sha256_hash = hashlib.sha256()
sha256_hash.update(buffer)
hash_base64 = base64.b64encode(sha256_hash.digest()).decode("utf-8")
return hash_base64.replace('+', '-').replace('/', '_').rstrip('=')
cpu = connect('cpu.recombinary.com')
drive = connect('drive.recombinary.com')
wallet = connect('wallet.recombinary.com')
io = connect('io.recombinary.com')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment