Skip to content

Instantly share code, notes, and snippets.

Created August 5, 2014 07:34
Show Gist options
  • Save anonymous/dcc05221c7b227df0cb6 to your computer and use it in GitHub Desktop.
Save anonymous/dcc05221c7b227df0cb6 to your computer and use it in GitHub Desktop.
import os
import re
import zlib
import functools
import urllib.request
from mmap import mmap
from struct import unpack
from ipaddress import IPv4Address
from collections import namedtuple
class QQWayIPSeeker():
Location = namedtuple('Location', ['start_ip', 'end_ip', 'country', 'area'])
def __init__(self, path: str, charset: str='GBK'):
self.__path = path
self.__charset = charset
if os.path.exists(path):
with open(self.__path, 'rb') as fp:
self.__fp = mmap(fp.fileno(), 0, access=1)
self.__fp.seek(0)
self.__index_base_offset, = unpack('<L', self.__fp.read(4)) # 索引区基址
self.__index_count = (unpack('<L', self.__fp.read(4))[0] - self.__index_base_offset) // 7 # 索引数-1
else:
raise FileNotFoundError
def __lookup(self, ip: IPv4Address):
ip = int(ip)
start_index = 0
end_index = self.__index_count
if ip < self.__read_index(start_index)[0]:
raise LookupError('IP not found.')
elif ip >= self.__read_index(end_index)[0]:
start_index = end_index
else: # keep start_index <= ip < end_index
while (start_index + 1) < end_index:
middle_index = (start_index + end_index) // 2
if self.__read_index(middle_index)[0] <= ip:
start_index = middle_index
else:
end_index = middle_index
if ip > int(self[start_index].end_ip):
raise LookupError('IP not found.')
else:
return self[start_index]
def reload(self):
with open(self.__path, 'rb') as fp:
self.__fp.close()
self.__fp = mmap(fp.fileno(), 0, access=1)
@property
def meta(self):
meta_info = {
'version': self[self.__index_count].area[:-4],
'record_count': len(self),
}
meta_info['date'] = tuple(int(x) for x in re.findall('\d+', meta_info['version']))
return meta_info
def __len__(self):
return self.__index_count + 1
@functools.lru_cache()
def __getitem__(self, ip):
if isinstance(ip, int):
if 0 <= ip <= self.__index_count:
start_ip, offset = self.__read_index(ip)
start_ip = IPv4Address(start_ip)
self.__fp.seek(offset)
end_ip, = unpack('<L', self.__fp.read(4))
end_ip = IPv4Address(end_ip)
country, area = self.__read_record()
area = None if area == ' CZ88.NET' else area
return QQWayIPSeeker.Location(start_ip, end_ip, country, area)
else:
raise KeyError('index out of range.')
elif isinstance(ip, str):
return self.__lookup(IPv4Address(ip))
elif isinstance(ip, IPv4Address):
return self.__lookup(ip)
else:
raise TypeError('wrong key type.')
def __read_index(self, index: int):
self.__fp.seek(self.__index_base_offset + 7 * index)
return unpack('<LL', self.__fp.read(7) + b'\x00')
def __read_record(self, only_one: bool=False):
mode, = unpack('B', self.__fp.read(1))
if mode in [0x01, 0x02]:
offset_record, = unpack('<L', self.__fp.read(3) + b'\x00')
offset = self.__fp.tell()
self.__fp.seek(offset_record)
record = self.__read_record(True)
self.__fp.seek(offset)
if not only_one and mode == 0x02:
record[1] = self.__read_record(True)[0]
else:
self.__fp.seek(-1, 1)
record = [self.__read_c_string(), None]
if not only_one:
record[1] = self.__read_record(True)[0]
return record
def __read_c_string(self):
if self.__fp.tell() == 0:
return 'unknown'
start = self.__fp.tell()
end = self.__fp.find(b'\x00')
if end < 0:
raise Exception('fail to read C string')
self.__fp.seek(end + 1)
return self.__fp[start:end].decode(self.__charset, errors='replace')
def __del__(self):
self.__fp.close()
class QQWayIPSeekerUpdate():
@staticmethod
def __download_meta():
url = 'http://update.cz88.net/ip/copywrite.rar'
with urllib.request.urlopen(url) as fp:
data = fp.read()
return QQWayIPSeekerUpdate.__unpack_meta(data)
@staticmethod
def __download_database(path, key):
url = 'http://update.cz88.net/ip/qqwry.rar'
with urllib.request.urlopen(url) as fp:
data = fp.read()
data = QQWayIPSeekerUpdate.__decipher_data(data, key)
with open(path, 'wb') as fp:
fp.write(data)
@staticmethod
def __unpack_meta(data):
# http://microcai.org/2014/05/11/qqwry_dat_download.html
sign, version, _, size, _, key, text, link = unpack('<4sIIIII128s128s', data)
text = text.rstrip(b'\x00').decode('GB18030')
return {
'version': version,
'size': size,
'key': key,
'text': text,
'date': tuple(int(x) for x in re.findall('\d+', text)),
'link': link.rstrip(b'\x00').decode('GB18030'),
}
@staticmethod
def __decipher_data(data: bytes, key: int):
data = bytearray(data)
for index in range(0, 0x200):
key *= 0x805
key += 1
key &= 0xFF
data[index] ^= key
return zlib.decompress(data)
@staticmethod
def check_update(path: str):
meta = QQWayIPSeekerUpdate.__download_meta()
return meta['date'] >= QQWayIPSeeker(path).meta['date']
@staticmethod
def update(path: str):
if os.path.exists(path):
QQWayIPSeekerUpdate.upgrade(path)
else:
QQWayIPSeekerUpdate.download(path)
@staticmethod
def upgrade(path: str):
meta = QQWayIPSeekerUpdate.__download_meta()
if meta['date'] >= QQWayIPSeeker(path).meta['date']:
QQWayIPSeekerUpdate.__download_database(path, meta['key'])
@staticmethod
def download(path: str):
meta = QQWayIPSeekerUpdate.__download_meta()
QQWayIPSeekerUpdate.__download_database(path, meta['key'])
class SeventeenMonIPSeeker():
def __init__(self, path: str):
self.__path = path
if not os.path.exists(path):
raise FileNotFoundError
with open(self.__path, 'rb') as fp:
self.__fp = mmap(fp.fileno(), 0, access=1)
self.__fp.seek(0)
self.__data_offset = 4
self.__index_offset, = unpack('>L', self.__fp.read(4))
self.__max_comp_length = self.__data_offset + (self.__index_offset - 1028)
def __locate(self, ip: bytes):
begin, = unpack('<L', self.__fp.read(4))
begin = self.__data_offset + (begin * 8) + 1024
while begin < self.__max_comp_length:
self.__fp.seek(begin)
if self.__fp.read(4) >= ip:
offset, = unpack('<L', self.__fp.read(3) + b'\0')
length = int.from_bytes(self.__fp.read(1), 'big')
return offset, length
begin += 8
def __lookup(self, ip: IPv4Address):
ip = int(ip).to_bytes(4, 'big')
self.__fp.seek(self.__data_offset + (ip[0] * 4))
offset, length = self.__locate(ip)
if offset == 0:
return
self.__fp.seek(self.__index_offset + offset - 1024)
return self.__fp.read(length).decode('UTF-8').split('\t')
@functools.lru_cache()
def __getitem__(self, ip):
if isinstance(ip, int):
return self.__lookup(IPv4Address(ip))
elif isinstance(ip, str):
return self.__lookup(IPv4Address(ip))
elif isinstance(ip, IPv4Address):
return self.__lookup(ip)
else:
raise TypeError('wrong key type.')
def reload(self):
with open(self.__path, 'rb') as fp:
self.__fp.close()
self.__fp = mmap(fp.fileno(), 0, access=1)
def __del__(self):
self.__fp.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment