Skip to content

Instantly share code, notes, and snippets.

@duzhuoshanwai
Created August 18, 2025 23:05
Show Gist options
  • Save duzhuoshanwai/f00f06d5c9b70cb3e277128f89dd7a5e to your computer and use it in GitHub Desktop.
雅思哥 IELTS 口语话题 爬取
-- topicList (master table): one row per IELTS speaking topic.
CREATE TABLE topicList (
oralTopicId TEXT PRIMARY KEY, -- topic identifier from the upstream API
oralTopicName TEXT NOT NULL,
part INTEGER NOT NULL, -- speaking part this topic belongs to
questionCount INTEGER DEFAULT 0,
oralQuestion TEXT,
timeTag TEXT, -- season/time tag as supplied by the API
oralTopCategoryName TEXT, -- English category label (person/thing/event/location)
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- topicDetail (detail table): one row per question of a topic.
CREATE TABLE topicDetail (
oralTopicId TEXT NOT NULL,
oralTopicName TEXT NOT NULL,
oralTopCatalog INTEGER,
oralTopCategoryName TEXT,
oralQuestion TEXT NOT NULL,
oralPart INTEGER,
createdAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (oralTopicId, oralQuestion), -- composite primary key: a question appears once per topic
-- NOTE(review): the cascade only takes effect if the connection runs
-- PRAGMA foreign_keys = ON (off by default in SQLite).
FOREIGN KEY (oralTopicId) REFERENCES topicList(oralTopicId) ON DELETE CASCADE
);
-- Indexes to speed up the common lookup patterns (by part / by category).
CREATE INDEX idx_topicList_part ON topicList(part);
CREATE INDEX idx_topicDetail_category ON topicDetail(oralTopCategoryName);
CREATE INDEX idx_topicDetail_part ON topicDetail(oralPart);
-- Trigger: keep topicList.updatedAt current whenever a row is updated.
-- NOTE(review): the inner UPDATE would re-fire this trigger if
-- PRAGMA recursive_triggers were enabled (it is off by default); a
-- WHEN guard would make this robust — confirm before changing.
CREATE TRIGGER update_topicList_timestamp
AFTER UPDATE ON topicList
FOR EACH ROW
BEGIN
UPDATE topicList SET updatedAt = CURRENT_TIMESTAMP WHERE oralTopicId = NEW.oralTopicId;
END;
import sqlite3
import httpx
import asyncio
import time
from datetime import datetime
import itertools
# Scraper configuration.
CONFIG = {
    # Root URL of the proxy in front of the IELTS-bro API.
    "base_url": "https://ielts-bro-proxy.duzhuo.icu/ielts-bro",
    # Topic catalogs to crawl (person / thing / event / location).
    "catalogs": ["人物", "事物", "事件", "地点"],
    # Speaking parts, as string codes expected by the API.
    "parts": ["0", "1"],
    # Maximum retry attempts per HTTP request.
    "max_retries": 3,
    # SQLite database file name.
    "db_name": "ielts_topics.db",
    # Number of concurrent in-flight requests.
    "concurrency": 4,
}
class IELTSSpider:
    """Concurrent scraper for IELTS speaking topics.

    Fetches topic lists and per-topic question details from the proxy API
    configured in CONFIG and persists them into a local SQLite database.
    The database schema must already exist (see init_db.sql).
    """

    def __init__(self):
        self.conn = self.init_db()
        # Cap the pooled connections so the HTTP client itself never
        # exceeds the configured concurrency.
        limits = httpx.Limits(
            max_connections=CONFIG['concurrency'],
            max_keepalive_connections=CONFIG['concurrency'],
        )
        self.session = httpx.AsyncClient(
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            },
            timeout=30.0,
            limits=limits,
        )

    def init_db(self):
        """Open and return the SQLite connection.

        The schema is NOT created here; initialize it first with:
            sqlite3 ielts_topics.db < init_db.sql
        NOTE(review): foreign keys are not enabled (PRAGMA foreign_keys is
        off by default), so the topicDetail FK/cascade is not enforced —
        enabling it would make INSERT OR REPLACE on topicList cascade-delete
        details, so this is left as-is deliberately.
        """
        return sqlite3.connect(CONFIG['db_name'])

    async def make_request(self, url, params=None, retry=0):
        """GET *url* and return the parsed JSON body, retrying on failure.

        Retries up to CONFIG['max_retries'] times with a linearly growing
        delay (2s, 4s, 6s, ...; the original comment mislabeled this as
        exponential). Returns None when every attempt fails.
        """
        try:
            response = await self.session.get(url, params=params)
            response.raise_for_status()
            return response.json()
        except (httpx.RequestError, httpx.HTTPStatusError) as e:
            if retry < CONFIG['max_retries']:
                print(f"请求失败,第 {retry+1} 次重试... (URL: {url})")
                await asyncio.sleep(2 * (retry + 1))  # linear backoff: 2s, 4s, 6s
                return await self.make_request(url, params, retry + 1)
            print(f"请求最终失败: {e} (URL: {url})")
            return None

    async def fetch_topic_list(self, catalog, part):
        """Fetch the topic list for one (catalog, part) combination."""
        url = f"{CONFIG['base_url']}/topic-list"
        params = {"catalog": catalog, "part": part}
        print(f"正在获取主题列表: catalog={catalog}, part={part}")
        return await self.make_request(url, params)

    async def fetch_topic_detail(self, topic_id, part):
        """Fetch the question details of a single topic."""
        url = f"{CONFIG['base_url']}/topic-detail"
        params = {"topicID": topic_id, "part": part}
        return await self.make_request(url, params)

    def save_topic_list(self, catalog, part, topic_data):
        """Upsert every topic of one list response into topicList.

        Maps the Chinese catalog name to its English category label.
        NOTE(review): INSERT OR REPLACE rewrites the whole row, so the
        createdAt default is reset on every refresh.
        """
        cursor = self.conn.cursor()
        saved_count = 0
        category_map = {"人物": "person", "事物": "thing", "事件": "event", "地点": "location"}
        category_name = category_map.get(catalog)
        for topic in topic_data['content']['list']:
            try:
                cursor.execute('''
                    INSERT OR REPLACE INTO topicList (
                        oralTopicId, oralTopicName, part, questionCount,
                        oralQuestion, timeTag, oralTopCategoryName, updatedAt
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    topic['oralTopicId'],
                    topic['oralTopicName'],
                    topic['part'],
                    topic['questionCount'],
                    topic['oralQuestion'],
                    topic['timeTag'],
                    category_name,
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                ))
                saved_count += 1
            except sqlite3.Error as e:
                print(f"保存主题列表失败 (topicID: {topic['oralTopicId']}): {e}")
        self.conn.commit()
        print(f"已保存 {saved_count} 个主题 (catalog={catalog}, part={part})")

    def save_topic_detail(self, topic_id, detail_data):
        """Upsert every question of one detail response into topicDetail."""
        cursor = self.conn.cursor()
        for detail in detail_data['content']['oralQuestionDetailVOList']:
            try:
                cursor.execute('''
                    INSERT OR REPLACE INTO topicDetail (
                        oralTopicId, oralTopicName, oralTopCatalog,
                        oralTopCategoryName, oralQuestion, oralPart
                    ) VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    detail['oralTopicId'],
                    detail['oralTopicName'],
                    detail['oralTopCatalog'],
                    detail['oralTopCategoryName'],
                    detail['oralQuestion'],
                    detail['oralPart'],
                ))
            except sqlite3.Error as e:
                print(f"保存主题详情失败 (topicID: {detail['oralTopicId']}, question: {detail['oralQuestion']}): {e}")
        self.conn.commit()

    async def run(self):
        """Run the full crawl: all topic lists, then all details, then save."""
        print("=" * 50)
        print("开始爬取雅思口语题目数据 (httpx 并发版)")
        print(f"参数组合: catalogs={CONFIG['catalogs']}, parts={CONFIG['parts']}")
        print(f"并发数: {CONFIG['concurrency']}")
        print("=" * 50)
        start_time = time.monotonic()
        try:
            # 1. Fetch every (catalog, part) topic list concurrently.
            param_combinations = list(itertools.product(CONFIG['catalogs'], CONFIG['parts']))
            list_tasks = [self.fetch_topic_list(c, p) for c, p in param_combinations]
            topic_lists_results = await asyncio.gather(*list_tasks)

            # 2. Save the lists and collect every topic needing a detail fetch.
            all_topics_to_fetch_details = []
            for (catalog, part), topic_list_data in zip(param_combinations, topic_lists_results):
                if topic_list_data and topic_list_data.get('status') == 0:
                    self.save_topic_list(catalog, part, topic_list_data)
                    for topic in topic_list_data.get('content', {}).get('list', []):
                        all_topics_to_fetch_details.append((topic, part))
                else:
                    print(f"获取主题列表失败或返回状态不正确 (catalog={catalog}, part={part})")
            print(f"\n共找到 {len(all_topics_to_fetch_details)} 个主题, 开始获取详情...")

            # 3. Fetch all details concurrently, bounded by a semaphore.
            semaphore = asyncio.Semaphore(CONFIG['concurrency'])

            async def fetch_with_semaphore(topic, part):
                async with semaphore:
                    return await self.fetch_topic_detail(topic['oralTopicId'], part)

            detail_tasks = [fetch_with_semaphore(t, p) for t, p in all_topics_to_fetch_details]
            detail_results = await asyncio.gather(*detail_tasks)
            print(f"\n获取详情完成, 开始保存到数据库...")

            # 4. Persist the details.
            saved_details_count = 0
            for detail_data in detail_results:
                if detail_data and detail_data.get('status') == 0 and detail_data.get('content', {}).get('oralQuestionDetailVOList'):
                    topic_id = detail_data['content']['oralQuestionDetailVOList'][0]['oralTopicId']
                    self.save_topic_detail(topic_id, detail_data)
                    saved_details_count += 1
            print(f"共保存 {saved_details_count} 个主题的详情。")
        finally:
            # Fix: the original leaked the DB connection and HTTP client
            # when any awaited task raised; always release them.
            self.conn.close()
            await self.session.aclose()
        duration = time.monotonic() - start_time
        print("\n" + "=" * 50)
        print(f"爬取完成! 总耗时: {duration:.2f} 秒")
        print("=" * 50)
if __name__ == "__main__":
print("提示: 此脚本使用 httpx, 请确保已安装 (pip install 'httpx' 'httpx[http2]')")
spider = IELTSSpider()
asyncio.run(spider.run())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment