|
#!/usr/bin/env python3 |
|
""" |
|
Shatel Traffic Checker |
|
This script manages tokens and fetches traffic data from Shatel API. |
|
""" |
|
|
|
import requests |
|
import os |
|
import sys |
|
import shelve |
|
from datetime import datetime, timedelta |
|
import base64 |
|
import json |
|
from typing import Optional, Dict, Any |
|
|
|
|
|
INITIAL_ACCESS_TOKEN = '' |
|
INITIAL_REFRESH_TOKEN = '' |
|
|
|
def app_folder(prog: str) -> str: |
|
"""Create and return application data folder path""" |
|
if sys.platform == "win32": |
|
directory = os.path.join(os.path.expanduser("~"), "AppData", "Local", prog) |
|
else: |
|
directory = os.path.join(os.path.expanduser("~"), "." + prog) |
|
if not os.path.exists(directory): |
|
os.makedirs(directory) |
|
return directory |
|
|
|
|
|
def save_key(key: str, value: Any) -> None: |
|
"""Save a key-value pair to the database""" |
|
path = os.path.join(app_folder('script_data'), '.shateldb') |
|
with shelve.open(path) as db: |
|
db[key] = value |
|
|
|
|
|
def load_key(key: str, default: Any = None) -> Any: |
|
"""Load a value from the database by key""" |
|
path = os.path.join(app_folder('script_data'), '.shateldb') |
|
with shelve.open(path) as db: |
|
try: |
|
value = db[key] |
|
except KeyError: |
|
value = None |
|
return default if value is None else value |
|
|
|
|
|
class ShatelTokenManager: |
|
"""Manages Shatel API tokens and handles authentication""" |
|
|
|
def __init__(self): |
|
self.client_id = "AppMyShatel" |
|
self.token_url = "https://account-api.shatel.ir/connect/token" |
|
self.traffic_url = "https://gateway.shatel.ir/api/v1.0/appmyshatel/traffic" |
|
|
|
def is_token_expired(self, access_token: str) -> bool: |
|
"""Check if the access token is expired""" |
|
try: |
|
# JWT format: header.payload.signature |
|
payload_b64 = access_token.split('.')[1] |
|
# Add padding if necessary |
|
padding = '=' * (-len(payload_b64) % 4) |
|
payload_b64 += padding |
|
payload_json = base64.urlsafe_b64decode(payload_b64) |
|
payload = json.loads(payload_json) |
|
exp_timestamp = payload.get('exp') |
|
if exp_timestamp: |
|
exp_datetime = datetime.fromtimestamp(exp_timestamp) |
|
buffer_time = exp_datetime - timedelta(minutes=5) |
|
return datetime.now() >= buffer_time |
|
return True # If no expiration found, consider it expired |
|
except Exception as e: |
|
print(f"β οΈ Error checking token expiration: {e}") |
|
return True # If we can't decode, consider it expired |
|
|
|
def refresh_access_token(self, refresh_token: str) -> Optional[Dict[str, Any]]: |
|
"""Refresh the access token using refresh token""" |
|
headers = { |
|
'accept': 'application/json', |
|
'accept-charset': 'UTF-8', |
|
'accept-encoding': 'gzip', |
|
'connection': 'Keep-Alive', |
|
'content-type': 'application/x-www-form-urlencoded; charset=UTF-8', |
|
'host': 'account-api.shatel.ir', |
|
'user-agent': 'Ktor client' |
|
} |
|
|
|
data = { |
|
'grant_type': 'refresh_token', |
|
'client_id': self.client_id, |
|
'refresh_token': refresh_token |
|
} |
|
|
|
try: |
|
print("π Refreshing access token...") |
|
response = requests.post(self.token_url, headers=headers, data=data) |
|
response.raise_for_status() |
|
|
|
token_data = response.json() |
|
print("β
Token refreshed successfully!") |
|
|
|
# Save new tokens to database |
|
save_key('access_token', token_data.get('access_token')) |
|
save_key('refresh_token', token_data.get('refresh_token')) |
|
|
|
return token_data |
|
|
|
except requests.exceptions.RequestException as e: |
|
print(f"β Error refreshing token: {e}") |
|
if hasattr(e, 'response') and e.response is not None: |
|
print(f"Response: {e.response.text}") |
|
return None |
|
|
|
def get_valid_access_token(self) -> Optional[str]: |
|
"""Get a valid access token, refreshing if necessary""" |
|
access_token = load_key('access_token') |
|
refresh_token = load_key('refresh_token') |
|
|
|
# If no tokens exist, prompt user to enter them |
|
if not access_token or not refresh_token: |
|
# check initial tokens |
|
if INITIAL_ACCESS_TOKEN and INITIAL_REFRESH_TOKEN: |
|
access_token = INITIAL_ACCESS_TOKEN |
|
refresh_token = INITIAL_REFRESH_TOKEN |
|
save_key('access_token', access_token) |
|
save_key('refresh_token', refresh_token) |
|
print("β
Initial tokens saved to database!") |
|
else: |
|
print("π No tokens found in database.") |
|
print("Put tokens in the code file at the start of the file and run it again") |
|
return None |
|
|
|
# Check if token is expired and refresh if needed |
|
if self.is_token_expired(access_token): |
|
print("β° Access token expired, refreshing...") |
|
token_data = self.refresh_access_token(refresh_token) |
|
if token_data: |
|
access_token = token_data.get('access_token') |
|
else: |
|
print("β Failed to refresh token. Clearing stored tokens...") |
|
# Delete invalid tokens from database |
|
save_key('access_token', None) |
|
save_key('refresh_token', None) |
|
print("ποΈ Stored tokens cleared. Please re-authenticate on next run.") |
|
return None |
|
|
|
return access_token |
|
|
|
def get_traffic_data(self) -> Optional[Dict[str, Any]]: |
|
"""Fetch traffic data from Shatel API""" |
|
access_token = self.get_valid_access_token() |
|
|
|
if not access_token: |
|
return None |
|
|
|
headers = { |
|
'accept': 'application/json', |
|
'accept-charset': 'UTF-8', |
|
'accept-encoding': 'gzip', |
|
'accept-language': 'fa,fa', |
|
'authorization': f'Bearer {access_token}', |
|
'connection': 'Keep-Alive', |
|
'host': 'gateway.shatel.ir', |
|
'user-agent': 'Ktor client' |
|
} |
|
|
|
try: |
|
print("π Fetching traffic data...") |
|
response = requests.get(self.traffic_url, headers=headers) |
|
response.raise_for_status() |
|
|
|
traffic_data = response.json() |
|
print("β
Traffic data retrieved successfully!") |
|
return traffic_data |
|
|
|
except requests.exceptions.RequestException as e: |
|
print(f"β Error fetching traffic data: {e}") |
|
if hasattr(e, 'response') and e.response is not None: |
|
print(f"Response: {e.response.text}") |
|
return None |
|
|
|
|
|
def format_bytes(bytes_value: int) -> str: |
|
"""Convert bytes to human readable format""" |
|
if bytes_value < 0: |
|
return "N/A" |
|
|
|
for unit in ['B', 'KB', 'MB', 'GB', 'TB']: |
|
if bytes_value < 1024.0: |
|
return f"{bytes_value:.2f} {unit}" |
|
bytes_value /= 1024.0 |
|
return f"{bytes_value:.2f} PB" |
|
|
|
|
|
def format_traffic_data(traffic_data: Dict[str, Any]) -> None: |
|
"""Format and display traffic data in a readable way""" |
|
if not traffic_data.get('isSuccess'): |
|
print("β API returned unsuccessful response") |
|
return |
|
|
|
results = traffic_data.get('result', []) |
|
if not results: |
|
print("π No traffic data found") |
|
return |
|
|
|
print("\n" + "=" * 80) |
|
print("π SHATEL TRAFFIC SUMMARY") |
|
print("=" * 80) |
|
|
|
total_credit = 0 |
|
total_usage = 0 |
|
|
|
for i, package in enumerate(results, 1): |
|
print(f"\nπ¦ Package {i}: {package.get('trafficDomainName', 'Unknown')} - {package.get('trafficPackageTypeName', 'Unknown')}") |
|
print("-" * 50) |
|
|
|
credit_kb = package.get('creditKB', 0) |
|
usage_kb = package.get('usageKB', 0) |
|
remaining_kb = credit_kb - usage_kb if credit_kb > 0 else 0 |
|
|
|
total_credit += credit_kb |
|
total_usage += usage_kb |
|
|
|
print(f"π³ Total Credit: {format_bytes(credit_kb * 1024)}") |
|
print(f"π Used: {format_bytes(usage_kb * 1024)}") |
|
print(f"π° Remaining: {format_bytes(remaining_kb * 1024)}") |
|
|
|
# Usage percentage |
|
if credit_kb > 0: |
|
usage_percent = (usage_kb / credit_kb) * 100 |
|
print(f"π Usage: {usage_percent:.1f}%") |
|
|
|
# Dates |
|
start_date = package.get('startDate', '') |
|
exp_date = package.get('expirationDate', '') |
|
last_usage = package.get('lastUsage', '') |
|
|
|
if start_date: |
|
print(f"π
Start Date: {start_date}") |
|
if exp_date and exp_date != "9999-12-31T23:59:59.9999999": |
|
print(f"β° Expiration: {exp_date}") |
|
if last_usage and last_usage != "9999-12-31T23:59:59.9999999": |
|
print(f"π Last Usage: {last_usage}") |
|
|
|
# Traffic details |
|
send_kb = package.get('sendKB', -1) |
|
receive_kb = package.get('receiveKB', -1) |
|
|
|
if send_kb >= 0: |
|
print(f"β¬οΈ Upload: {format_bytes(send_kb * 1024)}") |
|
if receive_kb >= 0: |
|
print(f"β¬οΈ Download: {format_bytes(receive_kb * 1024)}") |
|
|
|
# Total summary |
|
print("\n" + "=" * 80) |
|
print("π TOTAL SUMMARY") |
|
print("=" * 80) |
|
print(f"π³ Total Credit: {format_bytes(total_credit * 1024)}") |
|
print(f"π Total Used: {format_bytes(total_usage * 1024)}") |
|
print(f"π° Total Remaining: {format_bytes((total_credit - total_usage) * 1024)}") |
|
|
|
if total_credit > 0: |
|
total_usage_percent = (total_usage / total_credit) * 100 |
|
print(f"π Total Usage: {total_usage_percent:.1f}%") |
|
|
|
print("=" * 80) |
|
|
|
|
|
def main(): |
|
"""Main function""" |
|
print("π Shatel Traffic Checker") |
|
print("=" * 50) |
|
|
|
# Initialize token manager |
|
manager = ShatelTokenManager() |
|
traffic_data = manager.get_traffic_data() |
|
if traffic_data: |
|
# Display formatted traffic data |
|
format_traffic_data(traffic_data) |
|
|
|
# Raw JSON Response |
|
# print(json.dumps(traffic_data, indent=2, ensure_ascii=False)) |
|
|
|
else: |
|
print("β Failed to retrieve traffic data") |
|
return 1 |
|
|
|
return 0 |
|
|
|
|
|
if __name__ == "__main__": |
|
try: |
|
exit_code = main() |
|
sys.exit(exit_code) |
|
except KeyboardInterrupt: |
|
print("\n\nβΉοΈ Interrupted by user") |
|
sys.exit(1) |
|
except Exception as e: |
|
print(f"\nβ Unexpected error: {e}") |
|
sys.exit(1) |