Created
August 3, 2019 13:49
-
-
Save leetking/7fc64ee800bb6f1170f3bbf1b915f5fa to your computer and use it in GitHub Desktop.
A little Python script to scrape Chengdu metro information
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import hashlib | |
import sqlite3 | |
import os | |
from functools import reduce | |
import requests | |
class DbCache:
    """SQLite-backed store mapping an integer ``time`` key to a text payload.

    Payloads are UTF-8 encoded and gzip-compressed before being written to
    the ``data`` blob column of the ``cdmetro`` table, and transparently
    decompressed on read.
    """
    # TODO implement get data by a range

    def __init__(self, dbpath):
        """Open (creating if necessary) the database file at *dbpath*.

        Missing parent directories are created. The ``cdmetro`` table is
        created on first use.
        """
        directory = os.path.dirname(dbpath)
        # dirname() is "" for a bare filename; os.makedirs("") would raise,
        # so only create directories when there is a directory component.
        if directory:
            os.makedirs(directory, exist_ok=True)
        self._db = sqlite3.connect(dbpath)
        cur = self._db.cursor()
        try:
            cur.execute("""create table if not exists `cdmetro`(
                `time` integer not null unique,
                `data` blob not null,
                primary key(`time`))""")
            self._db.commit()
        finally:
            cur.close()

    def __getitem__(self, time):
        """Return the text stored under *time*, or ``""`` if there is none."""
        import gzip
        cur = self._db.cursor()
        try:
            cur.execute("select `data` from `cdmetro` where `time` = ?", (time,))
            row = cur.fetchone()
        finally:
            cur.close()
        return gzip.decompress(row[0]).decode() if row else ""

    def __setitem__(self, time, data):
        """Store the string *data* under *time*.

        Raises ``sqlite3.IntegrityError`` if *time* is already present,
        because ``time`` is the table's unique primary key and this is a
        plain ``insert``.
        """
        import gzip
        payload = gzip.compress(data.encode())
        # The connection context commits on success and rolls back on error,
        # so a failed insert does not leave a transaction open.
        with self._db:
            cur = self._db.cursor()
            try:
                cur.execute("insert into `cdmetro`(`time`,`data`) values(?, ?)",
                            (time, payload))
            finally:
                cur.close()

    def close(self):
        """Close the underlying SQLite connection."""
        self._db.close()
def get_data():
    """POST to the metro API endpoint and return the response body as text.

    The request headers are signed: every header present at signing time
    (including the placeholder ``sign`` value) is rendered as ``key=value``,
    sorted by key, joined with ``&`` and MD5-hashed; the hex digest then
    replaces the ``sign`` header. ``Content-Type`` and ``User-Agent`` are
    added after signing and therefore do not take part in the digest.

    Returns:
        The response text, or ``''`` if the request timed out or failed
        with any other ``requests`` error.
    """
    from time import time

    HOST = "http://webapp.cocc.cdmetro.cn:10080"
    URL = "/api/realDmyjdSearch"
    t_out = 7   # unit: second
    headers = {
        'platformType': 'android',
        'Accept-APIVersion': '1.0',
        'mobileBrand': 'OnePlus',
        'appVersionNo': '77',
        'tokenId': '',
        'userId': '',
        'sign': 'p@ssw0rd',
        'platformVersion': '8.0.0',
        'mobileStandard': 'WIFI',
        'callTime': str(int(time()*1000)),
    }

    def generate_sign(headers):
        """Return the MD5 hex digest of the sorted ``key=value`` pairs."""
        data = '&'.join(key + '=' + headers[key] for key in sorted(headers))
        return hashlib.md5(data.encode()).hexdigest()

    headers['sign'] = generate_sign(headers)
    # Added after signing on purpose: these two are not part of the digest.
    headers['Content-Type'] = 'application/x-www-form-urlencoded'
    headers['User-Agent'] = 'okhttp/3.4.1'
    try:
        res = requests.post(HOST+URL, headers=headers, timeout=t_out)
    except requests.exceptions.Timeout:
        print("request timeout.")
        return ''
    except requests.exceptions.RequestException as err:
        # Connection errors etc. should not kill a long-running poller.
        print("request failed: {}".format(err))
        return ''
    return res.text
def main():
    """Poll the metro API once per ``gap`` seconds during service hours.

    Between 06:05 and 23:30 local time each iteration opens the database at
    ``./cdmetro.db3``, fetches one response with ``get_data()`` and, if it is
    non-empty, stores it under the current millisecond timestamp. Outside
    that window the loop only sleeps. The loop runs until interrupted with
    Ctrl-C, after which a farewell message is printed.
    """
    from time import sleep, time
    import datetime

    combine = datetime.datetime.combine
    DBPATH = "./cdmetro.db3"
    start = datetime.time(6, 5)     # 6:05
    end = datetime.time(23, 30)     # 23:30
    gap = 60                        # unit: second

    try:
        while True:
            now = datetime.datetime.now()
            # Derive the window from the same instant so the date cannot
            # change between computing the bounds and comparing against them.
            today = now.date()
            s = combine(today, start)
            e = combine(today, end)
            # the train is running
            if s <= now <= e:
                db = DbCache(DBPATH)
                try:
                    tm = int(time()*1000)
                    data = get_data()
                    if data != '':
                        db[tm] = data
                        print("get a datum.")
                finally:
                    # Always release the connection, even if the fetch or
                    # the insert raised.
                    db.close()
            elapsed = (datetime.datetime.now() - now).total_seconds()
            # sleep() rejects negative values; skip the pause if the
            # iteration already took longer than the gap.
            sleep(max(0.0, gap - elapsed))
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the loop; fall through to the message.
        pass
    print("Oops, I have to quit!")
# Script entry point: start the polling loop only when run directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment