ica db sqlite -> pg

Dependencies: lib_not_dr, psycopg
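Usage: fill in the psycopg conninfo and the SCHEM / DB_NAME placeholders in the script, then run it directly with Python; passing `-v` or `-vv` enables more verbose logging.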
import sys
import time
import json
import sqlite3
import traceback
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from typing import Generator

import psycopg
from lib_not_dr import loggers
from lib_not_dr.loggers.outstream import FileCacheOutputStream
from lib_not_dr.types.options import Options

logger = loggers.config.get_logger("transfer")
log_file_name = f"{time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime())}-transfer.log"
logger.add_output(FileCacheOutputStream(file_name=log_file_name, level=0))

if "-v" in sys.argv:
    logger.global_level = 10
if "-vv" in sys.argv:
    logger.global_level = 5

schem = "SCHEM"
DB_NAME = "DB_NAME"
def get_pgdb() -> tuple[psycopg.Connection, psycopg.Cursor]:
    connect = psycopg.connect(
        # "dbname= user= password= host= port=", database=""
    )
    cur = connect.cursor()
    return connect, cur


def get_sqlitedb() -> tuple[sqlite3.Connection, sqlite3.Cursor]:
    connect = sqlite3.connect(DB_NAME)
    cur = connect.cursor()
    cur.execute("PRAGMA synchronous = 2;")  # synchronous writes
    cur.execute("PRAGMA mmap_size = 10240000;")  # use memory mapping
    return connect, cur
class Room(Options):
    roomId: int
    roomName: str
    index: int
    unreadCount: int
    priority: int
    utime: int
    users: str
    lastMessage: str
    at: str = "false"
    autoDownload: bool | None = None
    downloadPath: str | None = None

    def __init__(self, room: list):
        # logger.info(f"creating room {room}", tag="room")
        kwargs = {
            "roomId": int(round(float(room[0]))),
            "roomName": room[1],
            "index": int(room[2]),
            "unreadCount": int(room[3]),
            "priority": int(room[4]),
            "utime": int(room[5]),
            "users": room[6],
            "lastMessage": room[7],
            "at": room[8] if room[8] else None,
            "autoDownload": room[9] if room[9] else None,
            "downloadPath": room[10] if room[10] else None,
        }
        super().__init__(**kwargs)
        # logger.debug(self.as_markdown())
        # logger.debug(self.option())

    def output(self) -> list:
        return [
            self.roomId,
            self.roomName,
            self.index,
            self.unreadCount,
            self.priority,
            self.utime,
            self.users,
            self.lastMessage,
            self.at,
            self.autoDownload,
            self.downloadPath,
        ]
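# Hypothetical illustration of the list -> Room mapping above (column
# values are made up; real rows come from the sqlite `rooms` table in
# this order: roomId, roomName, index, unreadCount, priority, utime,
# users, lastMessage, at, autoDownload, downloadPath):
#   Room([123456.0, "a room", 0, 0, 0, 1700000000, "[]", "{}", "", 0, ""])
# roomId is rounded from sqlite's float form to an int, and the falsy
# trailing columns (at, autoDownload, downloadPath) are mapped to None
# before Options sees them.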
def read_rooms():
    sql_connect, sql_cur = get_sqlitedb()
    sql_cur.execute("SELECT * from rooms")
    return sql_cur.fetchall()
def room():
    pg_connect, pg_cur = get_pgdb()
    pg_cur.execute(f"SELECT * from {schem}.rooms")
    exist_rooms = pg_cur.fetchall()
    exist_rooms = [int(item[0]) for item in exist_rooms]
    # print(exist_rooms)
    data_room = read_rooms()
    # roomId,roomName,index,unreadCount,priority,utime,users,lastMessage,at,autoDownload,downloadPath
    for room in data_room:
        # preprocess the row
        room_data = Room(room)
        if room_data.roomId not in exist_rooms:
            logger.info(f"adding room {room_data.roomId}", tag="room")
            pg_cur.execute(
                f"INSERT INTO {schem}.rooms VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)",
                room_data.output(),
            )
        else:
            logger.info(f"room {room_data.roomId} already exists, skipping", tag="room")
    pg_connect.commit()
    pg_cur.close()
    pg_connect.close()
def translate_sqlite_data(data: tuple | list) -> list:
    # sqlite stores the boolean flags as 0/1 integers; coerce them to
    # real bools so psycopg maps them onto postgres boolean columns
    if isinstance(data, tuple):
        data = list(data)
    data[-12] = bool(data[-12])  # deleted
    data[-11] = bool(data[-11])  # system
    data[-9] = bool(data[-9])  # reveal
    data[-8] = bool(data[-8])  # flash
    data[-3] = bool(data[-3])  # hide
    return data
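# A minimal sanity-check sketch for the flag coercion above
# (hypothetical helper, not called anywhere in the script):
def _demo_translate_sqlite_data() -> None:
    row: list = [None] * 25  # same width as the messages table
    row[-12] = 1  # deleted, stored as a 0/1 integer in sqlite
    row[-3] = 0  # hide
    out = translate_sqlite_data(tuple(row))
    assert out[-12] is True and out[-3] is False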
def verify():
    connect, cur = get_pgdb()
    cur.execute(f"SELECT * from {schem}.rooms")
    exist_rooms = cur.fetchall()
    # lastMessage
    json_data = [item[-3] for item in exist_rooms]
    len_count = 0
    for item in json_data:
        logger.debug(item)
        try:
            json.loads(item)
        except Exception as e:
            len_count += 1
            logger.error(f"json error {e}\n{item}", tag="room")
    logger.info(len_count, tag="room")
def insert_message(data: list, t_id: int):
    """
    This function runs inside a worker thread; it opens its own
    pg connection instead of sharing one across the pool.
    """
    logger.trace(f"{data[0][-6]}", tag=f"start-{t_id}")  # data[0][-6] is the roomId
    try:
        conn, cur = get_pgdb()
        for item in data:
            cur.execute(
                f"INSERT INTO {schem}.messages VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) on conflict do nothing",
                item,
            )
        conn.commit()
        cur.close()
        conn.close()
    except Exception:
        logger.error(f"error {traceback.format_exc()}", tag="done")
    finally:
        print(f"|{t_id}", end="", flush=True, file=sys.stdout)
        logger.trace(f"{data[0][-6]}", tag=f"done-{t_id}")
def batch_get_message(sql_cur: sqlite3.Cursor, batch_size: int, room_id) -> Generator:
    # stream one room's messages in fetchmany-sized batches; the loop
    # ends naturally on the first empty fetch, so no trailing empty
    # batch is yielded (an empty batch would crash insert_message)
    sql_cur.execute("SELECT * from messages where roomId = ?", (room_id,))
    while data := sql_cur.fetchmany(batch_size):
        yield data
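# Hypothetical standalone use of the batching generator above (the room
# id is made up; messages() below is the real consumer):
def _demo_batch_get_message() -> None:
    _, cur = get_sqlitedb()
    for batch in batch_get_message(cur, 1000, 123456):
        print(len(batch))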
def messages():
    sql_connect, sql_cur = get_sqlitedb()
    pg_connect, pg_cur = get_pgdb()
    # fetch the rooms from pg
    pg_cur.execute(f"SELECT * from {schem}.rooms")
    rooms = (item[0] for item in pg_cur.fetchall())
    # count how many messages each room has in sqlite
    room_count: list[list[int]] = []
    for a_room in rooms:
        sql_cur.execute("SELECT COUNT(*) from messages where roomId = ?", (a_room,))
        count = sql_cur.fetchall()
        count = count[0][0]
        logger.debug(f"room {a_room} count {count}", tag="message")
        if count > 0:
            room_count.append([a_room, count])
    # sort ascending by message count
    room_count = sorted(room_count, key=lambda x: x[1], reverse=False)
    logger.info(room_count, tag="message")
    batch_limit = 1000
    for [room_id, count] in room_count:
        # if count > batch_limit:
        #     continue
        if count > batch_limit:
            # process in batches
            # painful, but necessary
            logger.info(
                f"room {room_id} count {count} exceeds {batch_limit}, splitting into {count // batch_limit} batches",
                tag="message",
            )
            with ThreadPoolExecutor(max_workers=40) as pool:
                tasks = []
                t_id = 1
                for data in batch_get_message(sql_cur, batch_limit, room_id):
                    data = [translate_sqlite_data(item) for item in data]
                    tasks.append(pool.submit(insert_message, data, t_id))
                    print(f":{t_id}", end="", flush=True)
                    t_id += 1
                print()
                logger.info(f"waiting for {len(tasks)} tasks to finish", tag="message")
                # while data := sql_cur.fetchmany(batch_limit):
                #     tasks.append(pool.submit(insert_message, data, pg_cur, pg_connect, t_id))
                #     t_id += 1
                wait(tasks, return_when=ALL_COMPLETED)
                print()
        else:
            logger.info(
                f"room {room_id} count {count} is at most {batch_limit}, inserting in one go",
                tag="message",
            )
            sql_cur.execute("SELECT * from messages where roomId = ?", (room_id,))
            data = sql_cur.fetchall()
            # logger.trace(data, tag="message")
            for item in data:
                item = translate_sqlite_data(item)
                pg_cur.execute(
                    f"INSERT INTO {schem}.messages VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) on conflict do nothing",
                    item,
                )
            pg_connect.commit()
    # SELECT _id, "senderId", username, content, code, "timestamp", date, role, file, files, "time", "replyMessage", at, deleted, system, mirai, reveal, flash, title, "roomId", "anonymousId", anonymousflag, hide, bubble_id, subid
    sql_cur.close()
    sql_connect.close()
    pg_cur.close()
    pg_connect.close()
if __name__ == "__main__":
    room()
    # print(read_rooms())
    # verify()
    messages()
    # reconfirm()
    ...