Created
May 19, 2023 03:49
-
-
Save this-post/30570cebdb6d02e9bb52015813a27ffe to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, jsonify, request | |
from pymemcache.client import base # brew install memcached && pip install pymemcache | |
from cryptography import x509 | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import hashes, serialization | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.kdf.hkdf import HKDF | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
import ssl, sys, sqlite3, uuid, os, json | |
# Flask application object that the route decorators below attach to.
app = Flask(__name__)

# PlayFab title identifier; replace the placeholder with your own title ID.
playfab_title_id = 'TITLE_ID'
# Host name of the title-specific PlayFab API endpoint.
playfab_url = f'{playfab_title_id}.playfabapi.com'
@app.route('/getPinnedCertSha512', methods=['POST'])
def getPinnedCertSha512():
    """Return the SHA-512 fingerprint of the PlayFab TLS certificate.

    Expects a JSON body containing ``kid``, the key id under which a session
    key was stored in memcache. The fingerprint is wrapped in a JSON payload,
    encrypted with ``aes_encrypt`` for that ``kid`` and returned under the
    ``data`` key.

    Returns:
        ``('', 400)`` when the request body is not JSON,
        ``('Key not found', 403)`` when ``kid`` is missing, not a string, or
        has no entry in memcache, otherwise the JSON response with status 200.
    """
    if not request.is_json:
        return '', 400
    data = request.json
    kid = data.get('kid')
    # Reject a missing / non-string kid up front so it is answered with 403
    # instead of being handed to the memcache client as an invalid key.
    if not isinstance(kid, str) or mc_client.get(kid) is None:
        return 'Key not found', 403
    cert_pem = ssl.get_server_certificate((playfab_url, 443))
    cert = x509.load_pem_x509_certificate(cert_pem.encode('utf-8'), default_backend())
    # The endpoint and payload key both promise SHA-512, so hash with SHA-512
    # (the previous code computed a SHA-256 digest under the SHA-512 name).
    sha512fingerprint = cert.fingerprint(hashes.SHA512())
    payload = {
        'sha512fingerprint': sha512fingerprint.hex()
    }
    response = {
        'data': aes_encrypt(kid, json.dumps(payload).encode('utf-8'))
    }
    return jsonify(response), 200
@app.route('/getE2eeParams', methods=['POST'])
def keyExchange():
    """Perform the ECDH handshake and hand out the session parameters.

    Reads ``publicKey`` from the JSON body, stores it under a new key id,
    derives a session key from the ECDH shared secret and a random 16-byte
    salt, caches the session key in memcache under the key id, and answers
    with the key id, the salt (hex) and the server public key (DER, hex).

    Returns:
        ``('Unexpected content type', 400)`` when the body is not JSON or has
        no ``publicKey``, ``('Key exchange failed', 500)`` when the shared
        secret could not be computed, otherwise the JSON response with 200.
    """
    if not request.is_json:
        return 'Unexpected content type', 400

    client_public_key = request.json.get('publicKey')
    if client_public_key is None:
        return 'Unexpected content type', 400

    key_id = generate_kid(client_public_key)
    salt = os.urandom(16)

    ok, shared_secret = get_shared_secret(key_id)
    if not ok:
        return 'Key exchange failed', 500

    # Cache the derived session key so later requests can look it up by kid.
    mc_client.set(key_id, get_shared_derived_key(salt, shared_secret))

    server_public_der = server_private_key.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    body = {
        'kid': key_id,
        'salt': salt.hex(),
        'serverPublicKey': server_public_der.hex(),
    }
    return jsonify(body), 200
def get_shared_secret(kid):
    """Compute the ECDH shared secret for the client key stored under *kid*.

    Looks up the hex-encoded DER public key saved in the ``public_key`` table
    of ``data.db`` and runs ECDH between it and the server private key.

    Args:
        kid: Key id of the stored client public key.

    Returns:
        ``(True, shared_secret)`` when the key id exists, otherwise
        ``(False, "")``.

    Any exception raised while decoding the stored key or performing the
    exchange propagates to the caller.
    """
    sql_conn = sqlite3.connect('data.db')
    # try/finally guarantees the connection is released even when hex
    # decoding, key parsing or the exchange itself raises.
    try:
        cur = sql_conn.cursor()
        cur.execute("SELECT value FROM public_key WHERE id = ?", (kid, ))
        row = cur.fetchone()
        if row is None:
            return (False, "")
        public_key_hex, = row
        peer_public_key = serialization.load_der_public_key(bytes.fromhex(public_key_hex))
        shared_secret = server_private_key.exchange(ec.ECDH(), peer_public_key)
        return (True, shared_secret)
    finally:
        sql_conn.close()
def get_shared_derived_key(_salt, shared_secret):
    """Derive a 32-byte key from *shared_secret* with HKDF-SHA256.

    Args:
        _salt: Salt passed to HKDF.
        shared_secret: Input key material to derive from.

    Returns:
        The derived key bytes (length 32).
    """
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=_salt,
        info=b'handshake data',
    )
    return kdf.derive(shared_secret)
def aes_encrypt(kid, plain):
    """Encrypt *plain* with AES-GCM using the session key cached under *kid*.

    A fresh 16-byte nonce and 16 bytes of random associated data are generated
    for every call.

    Args:
        kid: Key id used to fetch the session key from memcache.
        plain: Plaintext bytes to encrypt.

    Returns:
        Hex string laid out as: 16-byte nonce + ciphertext + 16-byte AAD.
    """
    key = mc_client.get(kid)
    nonce = os.urandom(16)
    aad = os.urandom(16)
    ciphertext = AESGCM(key).encrypt(nonce, plain, aad)
    # Concatenate the hex form of each part in wire order.
    return ''.join(part.hex() for part in (nonce, ciphertext, aad))
def aes_decrypt(kid, cipher):
    """Decrypt a message produced in the ``aes_encrypt`` wire format.

    The message is a hex string laid out as
    16-byte nonce + ciphertext + 16-byte AAD.

    Args:
        kid: Key id used to fetch the session key from memcache.
        cipher: Hex-encoded message to decrypt.

    Returns:
        The decrypted plaintext bytes.

    Raises:
        ValueError: If *cipher* is not valid hex.
        Errors raised by ``AESGCM.decrypt`` (e.g. on authentication failure)
        propagate unchanged.
    """
    session_key = mc_client.get(kid)
    # Decode once, then slice on byte boundaries. The previous code sliced the
    # hex text, took everything *except* the tail as AAD, and passed the whole
    # undecoded message as ciphertext, so it could never succeed.
    raw = bytes.fromhex(cipher)
    nonce = raw[:16]
    aad = raw[-16:]
    ciphertext = raw[16:-16]
    aesgcm = AESGCM(session_key)
    plain = aesgcm.decrypt(nonce, ciphertext, aad)
    return plain
def generate_kid(public_key):
    """Store *public_key* under a freshly generated key id and return the id.

    Args:
        public_key: Client public key value to persist in the ``public_key``
            table of ``data.db``.

    Returns:
        The new key id, a random UUID4 rendered as a string.
    """
    kid = str(uuid.uuid4())
    sql_conn = sqlite3.connect('data.db')
    # try/finally closes the connection even if the INSERT fails (for example
    # when the value cannot be bound or the table does not exist).
    try:
        cur = sql_conn.cursor()
        cur.execute("INSERT INTO public_key(id, value) VALUES(?, ?)", (kid, public_key))
        sql_conn.commit()
    finally:
        sql_conn.close()
    return kid
def run_memcache():
    """Create the module-wide memcache client used to cache session keys."""
    global mc_client
    memcached_address = ('localhost', 11211)
    mc_client = base.Client(memcached_address)
def init():
    """Prepare server state: ECDH key pair, key table and memcache client.

    Generates a new SECP521R1 private key into the module-level
    ``server_private_key``, recreates the ``public_key`` table in ``data.db``
    (dropping any existing rows), and then sets up the memcache client.
    """
    global server_private_key
    server_private_key = ec.generate_private_key(ec.SECP521R1())
    sql_conn = sqlite3.connect('data.db')
    # try/finally releases the connection even if the schema reset fails.
    try:
        cur = sql_conn.cursor()
        cur.execute("DROP TABLE IF EXISTS public_key")
        cur.execute("CREATE TABLE public_key(id, value)")
        # Commit explicitly rather than relying on implicit DDL autocommit.
        sql_conn.commit()
    finally:
        sql_conn.close()
    run_memcache()
if __name__ == '__main__':
    # Set up keys, storage and cache before the server starts taking requests.
    init()
    app.run(
        host='127.0.0.1',
        port=1337,
        debug=False,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment