Created
July 28, 2024 13:20
-
-
Save jerrylususu/7711da4f6e03c08adb2aa89278ca6be8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import logging | |
import requests | |
import subprocess | |
from pathlib import Path | |
from datetime import datetime | |
import time | |
import json | |
import time | |
# Custom logging handler
class CustomHTTPHandler(logging.Handler):
    """Logging handler that POSTs every record as a JSON document to a URL.

    Each record is sent on its own, synchronously, as
    ``{"logs": [{"contents": {...}, "time": <epoch seconds>}]}``.
    Delivery failures never propagate to the code that logged; they are
    routed to ``logging.Handler.handleError``.
    """

    # Upper bound (seconds) for one upload, so an unresponsive endpoint
    # cannot stall the caller on every log statement.
    REQUEST_TIMEOUT = 10

    # Top-level logger names used by the HTTP client itself. Shipping their
    # records would perform another HTTP request, which may log again.
    _HTTP_CLIENT_LOGGERS = frozenset({"urllib3", "requests"})

    def __init__(self, url):
        """Create a handler that uploads records to *url*."""
        super().__init__()
        self.url = url

    def _build_payload(self, record):
        """Return the JSON-serialisable upload document for *record*."""
        now = int(time.time())
        return {
            "logs": [{
                "contents": {
                    "source": "myvm",
                    "time": str(now),
                    "task": "github-bak",
                    "message": str(self.format(record)),
                    "level": str(record.levelname),
                    "line_no": str(record.lineno),
                    "file_name": str(record.filename),
                    "function_name": str(record.funcName),
                },
                "time": now,  # current Unix timestamp, same value as contents.time
            }]
        }

    def emit(self, record):
        """Upload *record*; on any failure defer to ``handleError``."""
        # Skip the HTTP client's own log records to avoid feeding the
        # handler with records produced by its own uploads.
        if record.name.partition(".")[0] in self._HTTP_CLIENT_LOGGERS:
            return
        try:
            body = json.dumps(self._build_payload(record))
            requests.post(
                self.url,
                headers={'Content-Type': 'application/json'},
                data=body,
                timeout=self.REQUEST_TIMEOUT,
            )
        except Exception:
            # Logging must never take the application down.
            self.handleError(record)
# Configure console logging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Attach the custom HTTP log handler to the root logger.
# TODO: fill in region and topic id
region = "TODO"
topic_id = "TODO"
# Python f-strings interpolate with plain {name}; a "${name}" form would put
# a literal "$" in front of the topic id in the query string.
url = f"https://ap-{region}.cls.tencentcs.com/tracklog?topic_id={topic_id}"
custom_handler = CustomHTTPHandler(url)
logging.getLogger().addHandler(custom_handler)
logging.info("开始处理")
# Read the GitHub personal access token from the environment.
GITHUB_PAT = os.getenv('GITHUB_PAT')
if not GITHUB_PAT:
    logging.error("GITHUB_PAT环境变量未设置")
    # SystemExit instead of exit(): exit() is injected by the site module
    # and is not available in every interpreter configuration.
    raise SystemExit(1)
# GitHub API URL
GITHUB_API_URL = "https://api.github.com/user"
# Fetch the profile of the user the token belongs to.
try:
    response = requests.get(
        GITHUB_API_URL,
        headers={"Authorization": f"token {GITHUB_PAT}"},
        timeout=30,  # do not hang forever on a stalled connection
    )
except requests.RequestException as exc:
    # Network-level failure (DNS, connect, timeout): log it like the
    # non-200 case instead of dying with an unlogged traceback.
    logging.error(f"无法获取用户信息,请求失败: {exc}")
    raise SystemExit(1)
if response.status_code != 200:
    logging.error(f"无法获取用户信息,状态码: {response.status_code}")
    raise SystemExit(1)
user_info = response.json()
username = user_info['login']
logging.info(f"登录成功,用户名 {username}")
# Collect every repository returned for the token, 100 per page, until an
# empty page is returned.
GITHUB_REPOS_URL = "https://api.github.com/user/repos"
page = 1
repos = []
while True:
    logging.info(f"获取 repo 页码:{page}")
    try:
        response = requests.get(
            GITHUB_REPOS_URL,
            headers={"Authorization": f"token {GITHUB_PAT}"},
            params={"per_page": 100, "page": page},
            timeout=30,  # do not hang forever on a stalled connection
        )
    except requests.RequestException as exc:
        # Network-level failure: log it like the non-200 case instead of
        # dying with an unlogged traceback.
        logging.error(f"无法获取仓库列表,请求失败: {exc}")
        raise SystemExit(1)
    if response.status_code != 200:
        logging.error(f"无法获取仓库列表,状态码: {response.status_code}")
        # SystemExit instead of exit(): exit() comes from the site module
        # and is not available in every interpreter configuration.
        raise SystemExit(1)
    page_repos = response.json()
    if not page_repos:
        break
    repos.extend(page_repos)
    page += 1
# Keep only the repositories owned by the authenticated user.
user_repos = [repo for repo in repos if repo['owner']['login'] == username]
total_repos = len(user_repos)
logging.info(f"repo 总数:{total_repos}")
# Back up every repository: update existing working copies, clone new ones.
def _run_git_step(repo_name, step, command):
    """Run one git *command* for *repo_name* and log duration and exit code.

    *step* is the human-readable command name used in the log messages.
    Returns True when git exits with status 0, False otherwise (the failure
    is logged with git's stderr).
    """
    started = time.monotonic()
    # errors="replace": git output that is not valid in the locale encoding
    # must not abort the backup with a UnicodeDecodeError.
    result = subprocess.run(command, capture_output=True, text=True, errors="replace")
    elapsed = time.monotonic() - started
    logging.info(f"仓库 {repo_name}: {step} 耗时: {elapsed:.2f} 秒, 返回码: {result.returncode}")
    if result.returncode != 0:
        # git's error output may contain the remote URL, which embeds the
        # token; redact it before it reaches the console and remote logs.
        stderr = result.stderr.replace(GITHUB_PAT, "***")
        logging.error(f"仓库 {repo_name}: {step} 失败: {stderr}")
        return False
    return True


for i, repo in enumerate(user_repos, 1):
    repo_name = repo['name']
    # NOTE(review): the token is embedded in the clone URL — presumably git
    # also stores it in the clone's .git/config as the remote URL; confirm
    # this is acceptable for the backup host.
    repo_url = f"https://{username}:{GITHUB_PAT}@github.com/{username}/{repo_name}.git"
    local_path = Path(repo_name)
    logging.info(f"正在处理仓库 {repo_name} ({i}/{total_repos})")
    try:
        if local_path.exists():
            # Existing working copy: discard local changes, then pull.
            logging.info(f"仓库 {repo_name} 已存在,准备执行 git reset --hard")
            if not _run_git_step(repo_name, "git reset --hard",
                                 ['git', '-C', str(local_path), 'reset', '--hard']):
                continue
            logging.info(f"仓库 {repo_name}: 准备执行 git pull")
            if not _run_git_step(repo_name, "git pull",
                                 ['git', '-C', str(local_path), 'pull']):
                continue
        else:
            logging.info(f"仓库 {repo_name} 不存在,准备执行 git clone")
            if not _run_git_step(repo_name, "git clone",
                                 ['git', 'clone', repo_url, str(local_path)]):
                continue
        logging.info(f"成功处理仓库 {repo_name}")
    except (OSError, subprocess.SubprocessError) as e:
        # subprocess.run is called without check=True, so a non-zero exit
        # never raises; what can raise is failing to start git at all
        # (e.g. git not installed), which is an OSError.
        detail = str(e).replace(GITHUB_PAT, "***")
        logging.error(f"处理仓库 {repo_name} 时出错: {detail}")
        raise SystemExit(1)
logging.info("所有仓库处理完毕")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment