|
import logging |
|
from typing import ( |
|
Callable, |
|
Optional, |
|
TYPE_CHECKING, |
|
Tuple, |
|
) |
|
from urllib.parse import ( |
|
parse_qs, |
|
urlencode, |
|
urlparse, |
|
) |
|
|
|
# noinspection PyProtectedMember |
|
from playwright._impl._errors import TimeoutError |
|
from playwright.async_api import ( |
|
Browser, |
|
async_playwright, |
|
) |
|
from pyotp import TOTP |
|
|
|
if TYPE_CHECKING: |
|
# noinspection PyProtectedMember |
|
from playwright.async_api._context_manager import PlaywrightContextManager |
|
|
|
# noinspection PyProtectedMember |
|
from playwright.async_api._generated import Playwright as AsyncPlaywright |
|
|
|
__all__ = ["login", "refresh"] |
|
|
|
USER_AGENT = "PixivAndroidApp/5.0.234 (Android 11; Pixel 5)" |
|
REDIRECT_URL = "https://app-api.pixiv.net/web/v1/users/auth/pixiv/callback" |
|
LOGIN_URL = "https://app-api.pixiv.net/web/v1/login" |
|
LOGIN_VERIFY = "https://accounts.pixiv.net/ajax/login" |
|
LOGIN_TWO_FACTOR = "https://accounts.pixiv.net/ajax/login/two-factor-authentication" |
|
AUTH_TOKEN_URL = "https://oauth.secure.pixiv.net/auth/token" |
|
# noinspection SpellCheckingInspection |
|
CLIENT_ID = "MOBrBDS8blbauoSck0ZfDbtuzpyT" |
|
# noinspection SpellCheckingInspection |
|
CLIENT_SECRET = "lsACyCD94FhDUtGTXi3QzcFE2uU1hqtDaKeqrdwj" |
|
|
|
playwright_manager: Optional["PlaywrightContextManager"] = None |
|
playwright: Optional["AsyncPlaywright"] = None |
|
logger = logging.getLogger("pixiv_auth") |
|
|
|
|
|
async def close_browser() -> "PlaywrightContextManager": |
|
global playwright_manager, playwright |
|
await playwright_manager.__aexit__() |
|
del playwright |
|
return playwright_manager |
|
|
|
|
|
async def get_browser(headless: bool, proxy: Optional[str] = None) -> Browser: |
|
global playwright_manager, playwright |
|
playwright_manager = async_playwright() |
|
playwright = await playwright_manager.__aenter__() |
|
browser_context = playwright.firefox |
|
browser = await browser_context.launch( |
|
headless=headless, proxy={"server": proxy} if proxy is not None else None |
|
) |
|
return browser |
|
|
|
|
|
def s256(data: bytes) -> str: |
|
from hashlib import sha256 |
|
from base64 import urlsafe_b64encode |
|
|
|
return urlsafe_b64encode(sha256(data).digest()).rstrip(b"=").decode("ascii") |
|
|
|
|
|
def oauth_pkce(transform: Callable[[bytes], str]) -> Tuple[str, str]: |
|
from secrets import token_urlsafe |
|
|
|
code_verifier = token_urlsafe(32) |
|
code_challenge = transform(code_verifier.encode("ascii")) |
|
return code_verifier, code_challenge |
|
|
|
|
|
async def login( |
|
username: str, |
|
password: str, |
|
secret: Optional[str] = None, |
|
*, |
|
headless: bool = True, |
|
proxy: Optional[str] = None, |
|
) -> Optional[Tuple[str, str, int]]: |
|
"""pixiv_auth.login |
|
|
|
使用用户名、密码来登录 Pixiv 获取 token |
|
|
|
Args: |
|
username: str |
|
登录 Pixiv 时所需要的用户名或者邮箱 |
|
password: str |
|
登录 Pixiv 时所需要的密码 |
|
secret: str | None |
|
Pixiv 的两步验证 secret |
|
headless: bool |
|
是否以无头模式运行浏览器 |
|
proxy: Optional[str] |
|
访问 Pixiv 时所需的代理 |
|
|
|
Returns: |
|
token 和 expires |
|
""" |
|
totp = TOTP(secret) if secret else None |
|
|
|
browser = await get_browser(headless, proxy) |
|
context = await browser.new_context() |
|
api_request_context = context.request |
|
page = await context.new_page() |
|
|
|
async def raise_errors(_response) -> None: |
|
if errors := _response["body"].get("errors"): |
|
logger.debug(f"登录错误:{errors}") |
|
info_box = page.locator("form > p") |
|
if not (await info_box.count()): |
|
info_box = page.locator("form > div:first-child") |
|
information = await info_box.inner_text() |
|
raise Exception(information or "请检查输入的信息是否正确") |
|
|
|
# 访问登录页面 |
|
code_verifier, code_challenge = oauth_pkce(s256) |
|
await page.goto( |
|
urlparse(LOGIN_URL) |
|
._replace( |
|
query=urlencode( |
|
{ |
|
"code_challenge": code_challenge, |
|
"code_challenge_method": "S256", |
|
"client": "pixiv-android", |
|
} |
|
) |
|
) |
|
.geturl(), |
|
timeout=0, |
|
) |
|
|
|
# 输入用户名与密码 |
|
await page.locator('input[autocomplete="username"]').fill(username) |
|
await page.locator('input[type="password"]').fill(password) |
|
|
|
submit_button = page.locator("form:has(fieldset) button[type='submit']") |
|
|
|
# 验证登录 |
|
async with page.expect_response(LOGIN_VERIFY + "*") as future_response: |
|
await submit_button.click() |
|
response = await (await future_response.value).json() |
|
|
|
await raise_errors(response) |
|
|
|
# 获取code |
|
if response["body"].get("requireTwoFactorAuthentication"): |
|
ont_time_code_input = page.locator('input[autocomplete="one-time-code"]') |
|
await ont_time_code_input.fill( |
|
totp.now() |
|
if totp |
|
else input("由于您开启了两步验证,现请输入两步验证的验证码:") |
|
) |
|
|
|
async with page.expect_request(REDIRECT_URL + "*") as request: |
|
try: |
|
async with ( |
|
page.expect_response( |
|
LOGIN_TWO_FACTOR + "*", timeout=3000 |
|
) as future, |
|
): |
|
await submit_button.click() |
|
response = await (await future.value).json() |
|
await raise_errors(response) |
|
except TimeoutError: |
|
logger.debug("两步验证成功") |
|
url = urlparse((await request.value).url) |
|
else: |
|
async with page.expect_request(REDIRECT_URL + "*") as request: |
|
url = urlparse((await request.value).url) |
|
|
|
code = parse_qs(url.query)["code"][0] |
|
|
|
logger.debug("登录成功,正尝试获取 token") |
|
|
|
# 获取token |
|
response = await api_request_context.post( |
|
AUTH_TOKEN_URL, |
|
form={ |
|
"client_id": CLIENT_ID, |
|
"client_secret": CLIENT_SECRET, |
|
"code": code, |
|
"code_verifier": code_verifier, |
|
"grant_type": "authorization_code", |
|
"include_policy": "true", |
|
"redirect_uri": REDIRECT_URL, |
|
}, |
|
headers={ |
|
"Accept-Encoding": "gzip, deflate", |
|
"Content-Type": "application/x-www-form-urlencoded", |
|
"User-Agent": USER_AGENT, |
|
"Host": "oauth.secure.pixiv.net", |
|
}, |
|
timeout=0, |
|
) |
|
data = await response.json() |
|
await close_browser() |
|
return data["access_token"], data["refresh_token"], data["expires_in"] |
|
|
|
|
|
async def refresh( |
|
refresh_token: str, *, headless: bool = True, proxy: Optional[str] = None |
|
) -> tuple[str, str, int]: |
|
"""pixiv_auth.refresh |
|
|
|
使用 refresh token 登录 Pixiv 获取 token |
|
|
|
Args: |
|
refresh_token: str |
|
刷新 access token 时所需要的 token |
|
headless: bool |
|
是否以无头模式运行浏览器 |
|
proxy: Optional[str] |
|
访问 Pixiv 时所需的代理 |
|
|
|
Returns: |
|
token 和 expires |
|
""" |
|
browser = await get_browser(headless, proxy) |
|
context = await browser.new_context() |
|
response = await context.request.post( |
|
AUTH_TOKEN_URL, |
|
form={ |
|
"client_id": CLIENT_ID, |
|
"client_secret": CLIENT_SECRET, |
|
"grant_type": "refresh_token", |
|
"include_policy": "true", |
|
"refresh_token": refresh_token, |
|
}, |
|
headers={"User-Agent": USER_AGENT}, |
|
) |
|
data = await response.json() |
|
await close_browser() |
|
return data["access_token"], data["refresh_token"], data["expires_in"] |
|
|
|
|
|
async def main(): |
|
import os |
|
|
|
access_token, refresh_token, expires = await login( |
|
os.environ["PIXIV_USERNAME"], |
|
os.environ["PIXIV_PASSWORD"], |
|
os.environ.get("PIXIV_SECRET"), |
|
headless=True, |
|
proxy=os.environ.get("HTTP_PROXY"), |
|
) |
|
print(access_token, refresh_token, expires, sep="\n") |
|
|
|
access_token, refresh_token, expires = await refresh( |
|
refresh_token, headless=False, proxy=os.environ.get("HTTP_PROXY", None) |
|
) |
|
print(access_token, refresh_token, expires, sep="\n") |
|
|
|
|
|
if __name__ == "__main__": |
|
import asyncio |
|
import sys |
|
|
|
if (3, 8) <= sys.version_info <= (3, 10) and sys.platform.startswith("win"): |
|
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) |
|
cmds = [["C:/Windows/system32/HOSTNAME.EXE"]] |
|
else: |
|
cmds = [ |
|
["du", "-sh", "/Users/fredrik/Desktop"], |
|
["du", "-sh", "/Users/fredrik"], |
|
["du", "-sh", "/Users/fredrik/Pictures"], |
|
] |
|
|
|
asyncio.run(main()) |