Skip to content

Instantly share code, notes, and snippets.

@crhan
Created June 27, 2025 06:40
Show Gist options
  • Save crhan/144811b5f2c2b688f377474e7b9ecb17 to your computer and use it in GitHub Desktop.
Save crhan/144811b5f2c2b688f377474e7b9ecb17 to your computer and use it in GitHub Desktop.
Test bambulab login in china region
#!/usr/bin/env python3
"""
完整的 Bambu Lab 中国区登录测试脚本,绕过 Cloudflare 并测试完整流程
"""
import json
import argparse
import time
from curl_cffi import requests as curl_requests
import cloudscraper
import requests
class BambuLoginTester:
def __init__(self, username, password):
self.username = username
self.password = password
self.login_url = "https://api.bambulab.cn/v1/user-service/user/login"
self.sms_code_url = "https://api.bambulab.cn/v1/user-service/user/sendsmscode"
self.email_code_url = "https://api.bambulab.cn/v1/user-service/user/sendemail/code"
self.tfa_url = "https://bambulab.cn/api/sign-in/tfa"
self.bind_url = "https://api.bambulab.cn/v1/iot-service/api/user/bind"
self.access_token = None
self.tfa_key = None
def initial_login_curl_cffi(self):
"""使用 curl_cffi 进行初始登录尝试"""
print("\n=== 测试 curl_cffi 方法 - 初始登录 ===")
auth_payload = {
"account": self.username,
"password": self.password,
"apiError": ""
}
try:
response = curl_requests.post(
self.login_url,
json=auth_payload,
impersonate="chrome120",
timeout=30
)
print(f"状态码: {response.status_code}")
if response.status_code == 403:
if 'cloudflare' in response.text.lower():
print("❌ 被 Cloudflare 拦截")
return None
else:
print("❌ 403 错误但不是 Cloudflare")
print(f"响应内容: {response.text[:200]}...")
return None
elif response.status_code == 200:
try:
data = response.json()
print(f"✅ 请求成功,响应: {json.dumps(data, indent=2, ensure_ascii=False)}")
# 首先检查 loginType,如果存在且不为空,说明需要额外验证步骤
login_type = data.get('loginType', '')
access_token = data.get('accessToken', '')
if login_type == 'verifyCode':
print("✅ 需要验证码")
return 'verifyCode'
elif login_type == 'tfa':
print("✅ 需要 2FA 验证")
self.tfa_key = data.get('tfaKey', '')
return 'tfa'
elif access_token and access_token.strip():
# 只有当 accessToken 不为空且不是空字符串时才认为登录成功
print("✅ 直接登录成功!")
self.access_token = access_token
return 'success'
else:
print(f"⚠️ 响应格式异常或登录失败:")
print(f" loginType: '{login_type}'")
print(f" accessToken: '{access_token}'")
return None
except json.JSONDecodeError:
print(f"❌ 无法解析 JSON 响应: {response.text[:200]}...")
return None
else:
print(f"❌ 意外的状态码: {response.status_code}")
print(f"响应内容: {response.text[:200]}...")
return None
except Exception as e:
print(f"❌ 请求异常: {e}")
return None
def send_verification_code_curl_cffi(self):
"""发送验证码"""
print("\n=== 发送验证码 ===")
# 判断是手机号还是邮箱
if '@' in self.username:
# 邮箱验证码
code_payload = {
"email": self.username,
"type": "codeLogin"
}
url = self.email_code_url
print(f"发送邮箱验证码到: {self.username}")
else:
# 手机验证码
code_payload = {
"phone": self.username,
"type": "codeLogin"
}
url = self.sms_code_url
print(f"发送短信验证码到: {self.username}")
try:
response = curl_requests.post(
url,
json=code_payload,
impersonate="chrome120",
timeout=30
)
print(f"发送验证码状态码: {response.status_code}")
if response.status_code == 200:
print("✅ 验证码发送成功")
return True
else:
print(f"❌ 验证码发送失败: {response.text}")
return False
except Exception as e:
print(f"❌ 发送验证码异常: {e}")
return False
def login_with_verification_code_curl_cffi(self, code):
"""使用验证码登录"""
print(f"\n=== 使用验证码登录: {code} ===")
verify_payload = {
"account": self.username,
"code": code
}
try:
response = curl_requests.post(
self.login_url,
json=verify_payload,
impersonate="chrome120",
timeout=30
)
print(f"验证码登录状态码: {response.status_code}")
if response.status_code == 200:
try:
data = response.json()
print(f"验证码登录响应: {json.dumps(data, indent=2, ensure_ascii=False)}")
access_token = data.get('accessToken', '')
if access_token and access_token.strip():
print("✅ 验证码登录成功!")
self.access_token = access_token
return True
else:
print(f"❌ 验证码登录失败,accessToken 为空: {data}")
return False
except json.JSONDecodeError:
print(f"❌ 无法解析验证码登录响应: {response.text}")
return False
else:
print(f"❌ 验证码登录失败,状态码: {response.status_code}")
print(f"响应: {response.text}")
return False
except Exception as e:
print(f"❌ 验证码登录异常: {e}")
return False
def login_with_tfa_code_curl_cffi(self, tfa_code):
"""使用 2FA 代码登录"""
print(f"\n=== 使用 2FA 代码登录: {tfa_code} ===")
if not self.tfa_key:
print("❌ 没有 TFA Key")
return False
tfa_payload = {
"tfaKey": self.tfa_key,
"tfaCode": tfa_code
}
try:
response = curl_requests.post(
self.tfa_url,
json=tfa_payload,
impersonate="chrome120",
timeout=30
)
print(f"2FA 登录状态码: {response.status_code}")
if response.status_code == 200:
# 2FA 成功通常返回 token 在 cookies 中
cookies = response.cookies.get_dict()
if 'token' in cookies:
print("✅ 2FA 登录成功!")
self.access_token = cookies['token']
return True
else:
print(f"❌ 2FA 响应中没有找到 token: {response.text}")
return False
else:
print(f"❌ 2FA 登录失败,状态码: {response.status_code}")
print(f"响应: {response.text}")
return False
except Exception as e:
print(f"❌ 2FA 登录异常: {e}")
return False
def test_api_access(self):
"""测试 API 访问能力"""
print(f"\n=== 测试 API 访问 ===")
if not self.access_token:
print("❌ 没有访问令牌")
return False
headers = {
'Authorization': f'Bearer {self.access_token}',
'accept': 'application/json',
'Content-Type': 'application/json'
}
try:
# 测试获取绑定设备列表
response = curl_requests.get(
self.bind_url,
headers=headers,
impersonate="chrome120",
timeout=30
)
print(f"设备列表 API 状态码: {response.status_code}")
if response.status_code == 200:
try:
data = response.json()
if 'devices' in data:
devices = data['devices']
print(f"✅ 成功获取设备列表,共 {len(devices)} 个设备")
for i, device in enumerate(devices[:3]): # 只显示前3个设备
print(f" 设备 {i+1}: {device.get('name', 'Unknown')} ({device.get('dev_id', 'No ID')})")
return True
else:
print(f"✅ API 访问成功但没有设备: {data}")
return True
except json.JSONDecodeError:
print(f"❌ 无法解析设备列表响应: {response.text[:200]}...")
return False
else:
print(f"❌ 设备列表 API 失败,状态码: {response.status_code}")
print(f"响应: {response.text[:200]}...")
return False
except Exception as e:
print(f"❌ API 访问异常: {e}")
return False
def run_complete_test(self):
"""运行完整的登录测试流程"""
print("Bambu Lab 中国区完整登录流程测试")
print("=" * 60)
print(f"用户名: {self.username}")
print("=" * 60)
# 第一步:初始登录尝试
login_result = self.initial_login_curl_cffi()
if login_result is None:
print("\n❌ 初始登录失败,可能被 Cloudflare 拦截或其他错误")
return False
elif login_result == 'success':
print("\n🎉 直接登录成功!")
elif login_result == 'verifyCode':
print("\n📱 需要验证码,开始验证码流程...")
# 发送验证码
if self.send_verification_code_curl_cffi():
# 等待用户输入验证码
try:
code = input("\n请输入收到的验证码: ").strip()
if not code:
print("❌ 验证码不能为空")
return False
# 使用验证码登录
if not self.login_with_verification_code_curl_cffi(code):
print("❌ 验证码登录失败")
return False
except KeyboardInterrupt:
print("\n❌ 用户取消")
return False
else:
print("❌ 验证码发送失败")
return False
elif login_result == 'tfa':
print("\n🔐 需要 2FA 验证,开始 2FA 流程...")
try:
tfa_code = input("\n请输入 2FA 验证码: ").strip()
if not tfa_code:
print("❌ 2FA 验证码不能为空")
return False
# 使用 2FA 代码登录
if not self.login_with_tfa_code_curl_cffi(tfa_code):
print("❌ 2FA 登录失败")
return False
except KeyboardInterrupt:
print("\n❌ 用户取消")
return False
# 测试 API 访问
print(f"\n✅ 登录成功!Access Token: {self.access_token[:20]}...")
if self.test_api_access():
print("\n🎉 完整登录流程测试成功!")
print("✅ Cloudflare 绕过成功")
print("✅ 登录流程验证成功")
print("✅ API 访问正常")
return True
else:
print("\n⚠️ 登录成功但 API 访问有问题")
return False
def quick_test_cloudflare_bypass(username, password):
"""快速测试 Cloudflare 绕过能力"""
print("\n=== 快速测试 Cloudflare 绕过 ===")
login_url = "https://api.bambulab.cn/v1/user-service/user/login"
auth_payload = {
"account": username,
"password": password,
"apiError": ""
}
methods = [
("curl_cffi", lambda: curl_requests.post(login_url, json=auth_payload, impersonate="chrome120", timeout=10)),
("cloudscraper", lambda: cloudscraper.create_scraper().post(login_url, json=auth_payload, timeout=10))
]
for method_name, request_func in methods:
try:
print(f"\n测试 {method_name}...")
response = request_func()
if response.status_code == 403 and 'cloudflare' in response.text.lower():
print(f"❌ {method_name}: 被 Cloudflare 拦截")
elif response.status_code == 200:
print(f"✅ {method_name}: 成功绕过 Cloudflare")
return method_name
else:
print(f"⚠️ {method_name}: 状态码 {response.status_code}")
except Exception as e:
print(f"❌ {method_name}: 异常 {e}")
return None
def main():
parser = argparse.ArgumentParser(description="测试 Bambu Lab 中国区完整登录流程和 Cloudflare 绕过")
parser.add_argument("--username", required=True, help="Bambu Lab 用户名(手机号或邮箱)")
parser.add_argument("--password", required=True, help="Bambu Lab 密码")
parser.add_argument("--quick", action="store_true", help="只进行快速 Cloudflare 绕过测试")
args = parser.parse_args()
if args.quick:
# 快速测试模式
print("快速 Cloudflare 绕过测试")
print("=" * 40)
working_method = quick_test_cloudflare_bypass(args.username, args.password)
if working_method:
print(f"\n🎉 {working_method} 方法能够绕过 Cloudflare!")
print("修改后的代码应该可以正常工作。")
else:
print("\n💥 所有方法都被 Cloudflare 拦截")
print("建议:")
print("1. 更换网络环境或 IP")
print("2. 使用代理服务器")
print("3. 稍后再试")
else:
# 完整测试模式
print("开始完整的 Bambu Lab 中国区登录流程测试...")
print("这将测试 Cloudflare 绕过 + 完整登录流程 + API 访问")
print("=" * 60)
tester = BambuLoginTester(args.username, args.password)
success = tester.run_complete_test()
if success:
print("\n" + "=" * 60)
print("🎉🎉🎉 完整测试成功!")
print("✅ Cloudflare 绕过:成功")
print("✅ 登录流程验证:成功")
print("✅ API 访问测试:成功")
print("\n💡 你的 Home Assistant 集成应该能够正常工作了!")
print("确保在 HA 环境中安装了: pip install curl_cffi")
else:
print("\n" + "=" * 60)
print("❌ 测试失败")
print("\n可能的解决方案:")
print("1. 检查用户名和密码是否正确")
print("2. 更换网络环境")
print("3. 使用代理服务器")
print("4. 运行快速测试: --quick 参数")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment