Created
September 3, 2025 19:58
-
-
Save guiathayde/bc3a7082300efdb07df3227381f87f43 to your computer and use it in GitHub Desktop.
Google Drive Uploader Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import mimetypes | |
| from typing import Optional | |
| import os | |
| from urllib.parse import urlencode, quote | |
| # ==== Exceptions ==== | |
class TokenExchangeError(Exception):
    """Raised when the token endpoint rejects an authorization-code exchange."""


class AccessTokenError(Exception):
    """Raised when no access token could be obtained."""


class UploadError(Exception):
    """Raised when the upload request does not succeed."""
# ==== CONFIGURATION ====
# The '###' values are placeholders to be replaced before running the script.
CLIENT_ID = "###"
CLIENT_SECRET = "###"
REFRESH_TOKEN = "###"
FOLDER_ID = "###"
UPLOAD_FILE = "###"  # Change this to your file

# ==== CONSTANTS ====
TOKEN_URL = "https://oauth2.googleapis.com/token"
UPLOAD_URL = "https://www.googleapis.com/upload/drive/v3/files?uploadType=multipart"
AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth"
SCOPES = [
    "https://www.googleapis.com/auth/drive",
    "https://www.googleapis.com/auth/drive.file",
]
# Used for loopback; the code will arrive in the URL after consent.
REDIRECT_PORT = 8765
def _build_auth_url(client_id: str, redirect_uri: str) -> str:
    """Return the consent URL for the given client and redirect URI.

    The query string asks for an authorization code covering every entry
    in SCOPES, with offline access and a forced consent prompt. Values are
    percent-encoded with ``quote`` (spaces become ``%20`` rather than ``+``).
    """
    query = urlencode(
        {
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": " ".join(SCOPES),
            "access_type": "offline",
            "prompt": "consent",
        },
        quote_via=quote,
    )
    return AUTH_URL + "?" + query
def _exchange_code_for_tokens(code: str, client_id: str, client_secret: str, redirect_uri: str) -> dict:
    """POST an authorization code to TOKEN_URL and return the decoded JSON reply.

    Raises:
        RuntimeError: if the ``requests`` package cannot be imported.
        TokenExchangeError: if the endpoint answers with a status other than 200.
    """
    try:
        import requests  # type: ignore
    except ImportError as exc:
        raise RuntimeError("The 'requests' package is required. Run: pip install -r requirements.txt") from exc

    form = dict(
        code=code,
        client_id=client_id,
        client_secret=client_secret,
        redirect_uri=redirect_uri,
        grant_type="authorization_code",
    )
    reply = requests.post(TOKEN_URL, data=form, timeout=30)
    if reply.status_code == 200:
        return reply.json()
    raise TokenExchangeError(f"Falha ao trocar code por tokens: {reply.status_code} {reply.text}")
def _oauth_error_fields(response) -> tuple:
    """Return ``(error, error_description)`` taken from an error response body.

    Both values are ``''`` when the body is not JSON, is not a JSON object,
    or lacks the corresponding key.
    """
    try:
        payload = response.json()
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        payload = None
    if not isinstance(payload, dict):
        return '', ''
    return str(payload.get('error') or ''), str(payload.get('error_description') or '')


def _store_refresh_token(refresh_token: str) -> None:
    """Best-effort save of the refresh token to gd_uploader_credentials.json beside this file."""
    cred_path = os.path.join(os.path.dirname(__file__), 'gd_uploader_credentials.json')
    try:
        with open(cred_path, 'w', encoding='utf-8') as fh:
            json.dump({'refresh_token': refresh_token}, fh)
    except OSError as exc:
        # Persisting is optional, so do not abort; but tell the user the
        # token was not saved instead of hiding the failure.
        print(f"Aviso: não foi possível salvar o refresh_token em {cred_path}: {exc}")
    else:
        print(f"Novo refresh_token salvo em {cred_path}.")


def get_access_token(client_id: str, client_secret: str, refresh_token: str) -> str:
    """Retrieve a new access token using the refresh token.

    The refresh token is posted to TOKEN_URL. When the endpoint answers 400
    or 401 with an ``invalid_grant`` / ``revoked`` error, the function falls
    back to an interactive flow: it prints the consent URL, reads the
    authorization code from standard input, exchanges it for tokens and, if
    a new refresh token comes back, tries to save it for later runs.

    Args:
        client_id: OAuth client id.
        client_secret: OAuth client secret.
        refresh_token: Refresh token to exchange for an access token.

    Returns:
        A non-empty access token.

    Raises:
        RuntimeError: if the ``requests`` package cannot be imported.
        TokenExchangeError: if the interactive code exchange is rejected.
        AccessTokenError: if the endpoint fails for any other reason, or a
            response does not contain an access token.
    """
    try:
        import requests  # type: ignore
    except ImportError as e:
        raise RuntimeError("The 'requests' package is required. Run: pip install -r requirements.txt") from e

    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    data = {
        'grant_type': 'refresh_token',
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
    }
    response = requests.post(TOKEN_URL, headers=headers, data=data, timeout=30)
    if response.status_code == 200:
        access_token = response.json().get('access_token')
        if not access_token:
            # Same contract as the interactive path below: never hand back
            # None/'' to a caller that expects a usable token.
            raise AccessTokenError('Falha: resposta não continha access_token.')
        return access_token

    err, err_desc = _oauth_error_fields(response)
    grant_rejected = 'invalid_grant' in err or 'invalid_grant' in err_desc or 'revoked' in err_desc
    if response.status_code not in (400, 401) or not grant_rejected:
        # Any other error
        raise AccessTokenError(f"Failed to get access token: {response.status_code} {response.text}")

    # Interactive fallback: the refresh token is invalid or revoked
    print('Refresh token inválido ou revogado. Será necessário reautorizar o acesso.')
    redirect_uri = 'https://<your_webapp>/oauth/callback'
    auth_url = _build_auth_url(client_id, redirect_uri)
    print('\nAbra esta URL no navegador, conclua o consentimento e copie o valor do parâmetro "code" da URL de retorno:')
    print(auth_url, '\n')
    code = input("Cole o 'code' aqui: ").strip()
    tokens = _exchange_code_for_tokens(code, client_id, client_secret, redirect_uri)

    new_rt = tokens.get('refresh_token')
    if new_rt:
        # Optional: persist it so the next run can skip the consent step
        _store_refresh_token(new_rt)
    else:
        print('Aviso: resposta não continha refresh_token. Verifique se o consentimento foi exibido (prompt=consent).')

    at = tokens.get('access_token')
    if not at:
        raise AccessTokenError('Falha: resposta não continha access_token.')
    return at
| def _guess_mime_type(file_path: str) -> str: | |
| """Guess a sensible MIME type for the file being uploaded.""" | |
| guessed, _ = mimetypes.guess_type(file_path) | |
| return guessed or 'application/octet-stream' | |
def upload_to_drive(access_token: str, file_path: str, folder_id: str, timeout: Optional[float] = 60) -> None:
    """Upload a file to Google Drive, keeping the file handle open during the POST.

    The file is opened in the same scope where the POST is executed, which
    avoids 'read of closed file'. On success the server reply is printed.

    Args:
        access_token: Bearer token sent in the Authorization header.
        file_path: Path of the local file to upload.
        folder_id: Id placed in the metadata ``parents`` list.
        timeout: Timeout in seconds passed to ``requests.post``.

    Raises:
        RuntimeError: if the ``requests`` package cannot be imported.
        OSError: if the file cannot be opened.
        UploadError: if the response status is neither 200 nor 201.
    """
    # Check the dependency first so a missing package is reported before
    # the file is touched.
    try:
        import requests  # type: ignore
    except ImportError as e:
        raise RuntimeError("The 'requests' package is required. Run: pip install -r requirements.txt") from e

    # os.path.basename follows the platform's separator rules; splitting on
    # '/' alone would keep the whole directory part of a Windows-style path
    # in the uploaded file's name.
    file_name = os.path.basename(file_path)
    metadata = {
        'name': file_name,
        'parents': [folder_id],
    }
    mime_type = _guess_mime_type(file_path)
    headers = {
        'Authorization': f'Bearer {access_token}',
    }

    # Keep the file open while performing the request
    with open(file_path, 'rb') as f:
        files = {
            'metadata': ('metadata', json.dumps(metadata), 'application/json; charset=UTF-8'),
            'file': (file_name, f, mime_type),
        }
        response = requests.post(UPLOAD_URL, headers=headers, files=files, timeout=timeout)

    if response.status_code not in (200, 201):
        raise UploadError(f"Failed to upload file: {response.text}")

    print("Upload successful!")
    try:
        print("Response:", response.json())
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        print("Response (non-JSON):", response.text)
| def _load_persisted_refresh_token(default_rt: str) -> str: | |
| cred_path = os.path.join(os.path.dirname(__file__), 'gd_uploader_credentials.json') | |
| try: | |
| with open(cred_path, 'r', encoding='utf-8') as fh: | |
| data = json.load(fh) | |
| return data.get('refresh_token') or default_rt | |
| except (FileNotFoundError, json.JSONDecodeError, OSError): | |
| return default_rt | |
def main():
    """Resolve the refresh token, obtain an access token and upload UPLOAD_FILE into FOLDER_ID."""
    refresh_token = _load_persisted_refresh_token(REFRESH_TOKEN)
    upload_to_drive(
        get_access_token(CLIENT_ID, CLIENT_SECRET, refresh_token),
        UPLOAD_FILE,
        FOLDER_ID,
    )


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment