Skip to content

Instantly share code, notes, and snippets.

@paralleltree
Created February 27, 2020 13:04
Show Gist options
  • Save paralleltree/abf08d2edfedca20c7c6ed8493106cde to your computer and use it in GitHub Desktop.
Save paralleltree/abf08d2edfedca20c7c6ed8493106cde to your computer and use it in GitHub Desktop.
import sys
import json
import argparse
from datetime import date, time, datetime
import itertools
import nfc
from binascii import hexlify
from struct import pack, unpack
class Suica:
    """Read the unencrypted data of a Suica card through an nfcpy reader.

    Constructing an instance opens the ``'usb'`` contactless frontend and
    stores the tag handed to the ``on-connect`` callback in ``self.tag``.
    The remaining methods decode the 16-byte blocks read from that tag.
    """

    SUICA_SYSTEM_CODE = 0x0003
    SERVICE_SUICA_ATTRS = 0x008b
    SERVICE_SUICA_GATE_HISTORY = 0x108f
    SERVICE_SUICA_PURCHASE_HISTORY = 0x090f

    # event_type values whose extra_data carries origin/destination codes.
    _RAIL_EVENTS = frozenset({0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x14})
    # event_type values whose extra_data carries bus company/stop codes.
    _BUS_EVENTS = frozenset({0x0d, 0x0f, 0x1f})
    # event_type values whose extra_data carries a time of day and device id.
    _SALE_EVENTS = frozenset({0x46, 0x4b})

    _EVENT_NAMES = {
        0x01: '改札出場',
        0x02: 'チャージ',
        0x03: '磁気券購入',
        0x04: '精算',
        0x05: '入場精算',
        0x06: '改札窓口処理',
        0x07: '新規発行',
        0x08: '窓口控除',
        0x0d: 'バス',
        0x0f: 'バス',
        0x11: '再発行処理',
        0x13: '支払(新幹線利用)',
        0x14: '入場時オートチャージ',
        0x15: '出場時オートチャージ',
        0x1f: 'バスチャージ',
        0x23: 'バス路面電車企画券購入',
        0x46: '物販',
        0x48: '特典チャージ',
        0x49: 'レジ入金',
        0x4a: '物販取消',
        0x4b: '入場物販',
        0xc6: '現金併用物販',
        0xcb: '入場現金併用物販',
        0x84: '他社精算',
        0x85: '他社入場精算',
    }

    _VENDOR_NAMES = {
        0x03: '精算機',
        0x05: '車載端末',
        0x07: '券売機',
        0x08: '券売機',
        0x09: '入金機',
        0x12: '券売機',
        0x14: '券売機等',
        0x15: '券売機等',
        0x16: '改札機',
        0x17: '簡易改札機',
        0x18: '窓口端末',
        0x19: '窓口端末',
        0x1a: '改札端末',
        0x1b: '携帯電話',
        0x1c: '乗継精算機',
        0x1d: '連絡改札機',
        0x1f: '簡易入金機',
        0x23: '新幹線改札機',
        0x46: 'VIEW ALTTE',
        0x48: 'VIEW ALTTE',
        0xc7: '物販端末',
        0xc8: '自販機',
    }

    _GATE_NAMES = {
        0x20: '出場',
        0x40: '出場(定期)',
        0xa0: '入場',
        0xc0: '入場(定期)',
        0x00: '清算',
    }

    def __init__(self, *, debug=False):
        """Connect to a card and keep its tag object in ``self.tag``.

        Args:
            debug: When true, progress messages are written to stderr.
        """
        self.debug = debug

        def on_connect(tag):
            if self.debug:
                print('connecting...', file=sys.stderr)
            # Poll with the Suica system code and pin the result on the tag so
            # later reads address that system.
            idm, pmm = tag.polling(system_code=self.SUICA_SYSTEM_CODE)
            tag.idm, tag.pmm, tag.sys = idm, pmm, self.SUICA_SYSTEM_CODE
            self.tag = tag
            # NOTE(review): returning None here presumably makes connect()
            # return without waiting for card removal -- confirm in nfcpy docs.

        if self.debug:
            print('initializing...', file=sys.stderr)
        nfc.ContactlessFrontend('usb').connect(rdwr={
            'on-connect': on_connect
        })
        if self.debug:
            print('connection established: (%s).' % self.tag, file=sys.stderr)

    def read_blocks(self, service_code):
        """Yield the data of block 0, 1, 2, ... of ``service_code``.

        Iteration stops at the first ``Type3TagCommandError`` raised by the
        tag, which is treated as "no more blocks".
        """
        for i in itertools.count():
            try:
                yield self.tag.read_without_encryption(
                    [self.create_service_code(service_code)],
                    [nfc.tag.tt3.BlockCode(i)])
            except nfc.tag.tt3.Type3TagCommandError:
                break

    def create_service_code(self, code):
        """Split a 16-bit service code into nfcpy's (number, attribute) pair."""
        return nfc.tag.tt3.ServiceCode(code >> 6, code & 0x3f)

    def idm(self):
        """Return the tag's IDm as a lowercase hexadecimal string."""
        return hexlify(self.tag.idm).decode()

    def balance(self):
        """Return the little-endian 16-bit value at bytes 11-12 of the first attribute block."""
        return unpack('<H', next(self.read_blocks(self.SERVICE_SUICA_ATTRS))[11:13])[0]

    @staticmethod
    def _unpack_date(packed):
        """Split a 16-bit packed date into ``(year, month, day)``.

        Layout used by both history services: 7 bits of year offset from
        2000, 4 bits of month, 5 bits of day.
        """
        return (packed >> 9) + 2000, (packed >> 5) & 0xf, packed & 0x1f

    @staticmethod
    def _bcd(value):
        """Decode one BCD byte (0x13 -> 13).

        Raises ValueError when a nibble is not a decimal digit.
        """
        return int('%x' % value)

    def purchase_history_raw(self):
        """Yield one dict per block of the purchase-history service.

        Every record carries ``vendor_type``, ``event_type``, ``payment_type``,
        ``gate_type``, ``date``, ``extra_data``, ``balance``, ``serial_number``
        and ``special_data``.  Depending on ``event_type`` the 32-bit
        ``extra_data`` is additionally decoded into station codes, bus codes,
        or a time of day plus device id.
        """
        for data in self.read_blocks(self.SERVICE_SUICA_PURCHASE_HISTORY):
            (vendor_type, event_type, payment_type, gate_type, packed_date,
             extra_data, _, _, serial_number, special_data) = unpack('>BBBBHIHBHB', data)
            record = {
                'vendor_type': vendor_type,
                'event_type': event_type,
                'payment_type': payment_type,
                'gate_type': gate_type,
                'date': date(*self._unpack_date(packed_date)),
                'extra_data': extra_data,
                # The balance is the only little-endian field (bytes 10-11).
                'balance': unpack('<H', data[10:12])[0],
                'serial_number': serial_number,
                'special_data': special_data
            }
            if event_type in self._RAIL_EVENTS:
                # extra_data is four consecutive one-byte codes, so each one is
                # extracted with a full-byte mask (a 0xf mask would drop the
                # upper nibble and yield wrong station/line codes).
                record['origin_line_code'] = extra_data >> 24
                record['origin_station_code'] = (extra_data >> 16) & 0xff
                record['dest_line_code'] = (extra_data >> 8) & 0xff
                record['dest_station_code'] = extra_data & 0xff
            elif event_type in self._BUS_EVENTS:
                # NOTE(review): only the low byte of each 16-bit half is kept;
                # confirm the real width of these codes against the card format.
                record['bus_company_code'] = (extra_data >> 16) & 0xff
                record['bus_station_code'] = extra_data & 0xff
            elif event_type in self._SALE_EVENTS:
                # Upper 16 bits: 5-bit hour, 6-bit minute, 5-bit seconds/2.
                record['time'] = datetime.combine(
                    record['date'],
                    time(extra_data >> 27,
                         (extra_data >> 21) & 0x3f,
                         ((extra_data >> 16) & 0x1f) * 2))
                record['device_id'] = extra_data & 0xff
            yield record

    def gate_history_raw(self):
        """Yield one dict per block of the gate-history service.

        Keys: ``gate_type``, ``line_code``, ``station_code``, ``device_code``,
        ``time`` (hour and minute are BCD bytes; seconds are always 0),
        ``nearest_pass_fare``, ``nearest_pass_line_code`` and
        ``nearest_pass_station_code``.
        """
        for data in self.read_blocks(self.SERVICE_SUICA_GATE_HISTORY):
            fields = unpack('>BBBBHHBBHHBB', data)
            year, month, day = self._unpack_date(fields[5])
            yield {
                'gate_type': fields[0],
                'line_code': fields[2],
                'station_code': fields[3],
                'device_code': fields[4],
                'time': datetime(year, month, day,
                                 self._bcd(fields[6]), self._bcd(fields[7]), 0),
                # The fare is little-endian (bytes 12-13).
                'nearest_pass_fare': unpack('<H', data[12:14])[0],
                'nearest_pass_line_code': fields[10],
                'nearest_pass_station_code': fields[11]
            }

    @classmethod
    def event_str(cls, code):
        """Return the label for an ``event_type`` code; raises KeyError if unknown."""
        return cls._EVENT_NAMES[code]

    @classmethod
    def vendor_str(cls, code):
        """Return the label for a ``vendor_type`` code; raises KeyError if unknown."""
        return cls._VENDOR_NAMES[code]

    @classmethod
    def gate_str(cls, code):
        """Return the label for a ``gate_type`` code; raises KeyError if unknown."""
        return cls._GATE_NAMES[code]
# Station-name table used by resolve_station().  Only the sub-table stored
# under the top-level key '0' of stations.json is used; when the file is
# absent the table is empty and every lookup resolves to None.
try:
    # JSON text is UTF-8; name the encoding so decoding the station names
    # does not depend on the platform's locale default.
    with open('stations.json', encoding='utf-8') as f:
        dic = json.load(f)['0']
except FileNotFoundError:
    dic = {}
def resolve_station(line_code, station_code):
    """Look up the entry for a (line, station) pair in the station table.

    Both codes are converted to strings before being used as keys of the
    module-level ``dic``.  Returns the stored entry, or None when either
    key is missing.
    """
    line_key, station_key = str(line_code), str(station_code)
    try:
        stations_on_line = dic[line_key]
        entry = stations_on_line[station_key]
    except KeyError:
        return None
    return entry
if __name__ == '__main__':
    # Command-line entry point: exactly one action flag is honoured, checked
    # in the order --idm, --balance, --gate-history, --purchase-history.
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('--raw', action='store_true')
    parser.add_argument('--idm', action='store_true')
    parser.add_argument('--balance', action='store_true')
    parser.add_argument('--gate-history', action='store_true')
    parser.add_argument('--purchase-history', action='store_true')
    args = parser.parse_args()

    # Reject an invocation without any action before touching the reader:
    # constructing Suica connects to a card, so the help text would otherwise
    # only appear after a card has been presented.
    if not (args.idm or args.balance or args.gate_history or args.purchase_history):
        parser.print_help()
        sys.exit(1)

    suica = Suica(debug=args.verbose)
    if args.idm:
        print(suica.idm())
    elif args.balance:
        print(suica.balance())
    elif args.gate_history:
        for record in suica.gate_history_raw():
            stamp = record['time'].strftime('%Y-%m-%dT%H:%M')
            if args.raw:
                # --raw prints the numeric codes instead of resolved names.
                print(stamp, record['gate_type'], record['line_code'], record['station_code'])
            else:
                print(stamp,
                      Suica.gate_str(record['gate_type']),
                      resolve_station(record['line_code'], record['station_code']),
                      record['nearest_pass_fare'],
                      resolve_station(record['nearest_pass_line_code'],
                                      record['nearest_pass_station_code']))
    else:
        # Only --purchase-history can remain.  The records are printed in the
        # reverse of the order in which the card returns them.
        for record in reversed(list(suica.purchase_history_raw())):
            day = record['date'].strftime('%Y-%m-%d')
            if args.raw:
                print(record['serial_number'], day, record['balance'],
                      record['event_type'], record['vendor_type'])
            else:
                print(record['serial_number'], day, record['balance'],
                      Suica.event_str(record['event_type']),
                      Suica.vendor_str(record['vendor_type']), end='')
                # Finish the line with whatever detail this record type carries.
                if 'origin_line_code' in record:
                    print(' ({} -> {})'.format(
                        resolve_station(record['origin_line_code'],
                                        record['origin_station_code']),
                        resolve_station(record['dest_line_code'],
                                        record['dest_station_code'])))
                elif 'time' in record:
                    print(' ({}) '.format(record['time'].strftime('%H:%M')))
                else:
                    print()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment