Created
November 8, 2024 21:57
-
-
Save dzmitry-savitski/d8a80520246f3b0c10c37f81a6c3030e to your computer and use it in GitHub Desktop.
Python Oauth 2.0 device code grant flow with PKCE client
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import hashlib | |
import base64 | |
import time | |
import urllib.parse | |
# Configuration | |
CLIENT_ID = 'your_client_id' | |
AUTHORIZATION_SERVER = 'https://authorization-server.com' | |
DEVICE_CODE_URL = f'{AUTHORIZATION_SERVER}/oauth/device/code' | |
TOKEN_URL = f'{AUTHORIZATION_SERVER}/oauth/token' | |
# PKCE helper functions | |
def generate_code_verifier(): | |
return base64.urlsafe_b64encode(hashlib.sha256().digest()).decode('utf-8').rstrip('=') | |
def generate_code_challenge(code_verifier): | |
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() | |
return base64.urlsafe_b64encode(code_challenge).decode('utf-8').rstrip('=') | |
# Generate PKCE parameters | |
code_verifier = generate_code_verifier() | |
code_challenge = generate_code_challenge(code_verifier) | |
# Step 1: Request a device code | |
device_code_response = requests.post(DEVICE_CODE_URL, data={ | |
'client_id': CLIENT_ID, | |
'code_challenge': code_challenge, | |
'code_challenge_method': 'S256', | |
}) | |
if device_code_response.status_code != 200: | |
print("Failed to get device code:", device_code_response.json()) | |
exit(1) | |
device_data = device_code_response.json() | |
print("Please visit:", device_data['verification_uri']) | |
print("And enter the code:", device_data['user_code']) | |
# Step 2: Poll for token using device code | |
while True: | |
time.sleep(device_data['interval']) | |
token_response = requests.post(TOKEN_URL, data={ | |
'client_id': CLIENT_ID, | |
'device_code': device_data['device_code'], | |
'grant_type': 'urn:ietf:params:oauth:grant-type:device_code', | |
'code_verifier': code_verifier, | |
}) | |
if token_response.status_code == 200: | |
tokens = token_response.json() | |
print("Access Token:", tokens['access_token']) | |
break | |
elif token_response.status_code == 400: | |
error = token_response.json().get('error') | |
if error == 'authorization_pending': | |
print("Waiting for user authorization...") | |
continue | |
elif error == 'slow_down': | |
time.sleep(5) | |
else: | |
print("Error:", error) | |
break | |
else: | |
print("Failed to retrieve token:", token_response.json()) | |
break |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment