Created
February 1, 2021 23:40
-
-
Save CFSworks/db19c10f674ea82a1c1ff7314780790a to your computer and use it in GitHub Desktop.
Nintendo LP2P decryption tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import re | |
import argparse | |
import pathlib | |
import binascii | |
from dataclasses import dataclass | |
from scapy.all import * | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.exceptions import InvalidTag | |
decrypt_block = lambda k,b: Cipher(algorithms.AES(k), | |
modes.ECB()).decryptor().update(b) | |
class SwitchKeys: | |
def __init__(self): | |
self.keys = {} | |
self.lp2p_keys = {} | |
def load(self, filename): | |
with open(filename, 'r') as f: | |
r = re.compile(r'(\w+)\s*=\s*([0-9A-Fa-f]+)$') | |
for line in f: | |
m = r.match(line) | |
if m: | |
self.keys[m.group(1)] = binascii.unhexlify(m.group(2)) | |
def derive_lp2p(self): | |
kek_8 = decrypt_block(self.keys['master_key_08'], | |
self.keys['aes_kek_generation_source']) | |
# Trigger KeyError if we don't have at least these | |
aes_key_generation_source = self.keys['aes_key_generation_source'] | |
self.keys['lp2p_hardcoded_key_01'] | |
self.keys['lp2p_hardcoded_key_02'] | |
r = re.compile('lp2p_hardcoded_key_([0-9a-fA-F]+)$') | |
for k,v in self.keys.items(): | |
m = r.match(k) | |
if not m: continue | |
key = decrypt_block(decrypt_block(kek_8, v), | |
aes_key_generation_source) | |
self.lp2p_keys[int(m.group(1), 16)] = key | |
@dataclass | |
class IE0: | |
security_type: int | |
static_key: int | |
lcl_com_id: bytes | |
wrapped_key: bytes | |
nonce: bytes | |
mac: bytes | |
body: bytes | |
VERSION = 0x20 | |
@classmethod | |
def parse(cls, data: bytes): | |
if len(data) < 0x1c or data[0x0] != cls.VERSION: | |
return | |
security_type = data[0x01] | |
static_key = data[0x02] | |
lcl_com_id = data[0x04:0x0c][::-1] | |
wrapped_key = data[0x0c:0x1c] | |
if security_type == 0x02: | |
if len(data) < 0x30: | |
return | |
nonce = data[0x1c:0x20] | |
mac = data[0x20:0x30] | |
body = data[0x30:] | |
else: | |
nonce = None | |
mac = None | |
body = data[0x1c:] | |
return cls( | |
security_type=security_type, | |
static_key=static_key, | |
lcl_com_id=lcl_com_id, | |
wrapped_key=wrapped_key, | |
nonce=nonce, | |
mac=mac, | |
body=body, | |
) | |
def pack(self): | |
hdr = bytearray(0x1c) | |
hdr[0x0] = self.VERSION | |
hdr[0x1] = self.security_type | |
hdr[0x2] = self.static_key | |
hdr[0x4:0xc] = self.lcl_com_id[::-1] | |
hdr[0xc:0x1c] = self.wrapped_key | |
if self.security_type == 0x02: | |
hdr.extend(self.nonce) | |
hdr.extend(self.mac) | |
hdr.extend(self.body) | |
return bytes(hdr) | |
def try_decrypt(self, gcm): | |
if self.security_type != 0x02: | |
return False | |
iv = self.nonce + b'\0'*8 | |
ad = self.pack()[:0x20] | |
try: | |
body = gcm.decrypt(iv, self.body+self.mac, ad) | |
except InvalidTag: | |
return False | |
self.body = body | |
self.mac = None | |
self.nonce = None | |
self.security_type = 0x01 | |
return True | |
class LP2PDecryptor: | |
def __init__(self, lp2p_keys: dict, psks: list): | |
self.lp2p_keys = lp2p_keys | |
self.psks = psks | |
self.learned_ccmp_keys = set() | |
self.good_ie_0_stats = set() | |
self.bad_ie_0_stats = set() | |
self.bad_ie_1_stats = set() | |
@staticmethod | |
def key_derivation(key: bytes, tweak: bytes): | |
hs256 = lambda k,m: hmac.HMAC(digestmod='sha256', key=k, msg=m).digest() | |
two_keys = hs256(hs256(key, tweak), b'\x02\x01') | |
return two_keys[:0x10], two_keys[0x10:] | |
def decrypt_raw(self, data: bytes, *, use_key_d: bool = False): | |
ie = IE0.parse(data) | |
if not ie: | |
return data, None | |
assert ie.pack() == data | |
if ie.security_type != 0x02 or ie.static_key not in self.lp2p_keys: | |
return data, None | |
master_key = decrypt_block(self.lp2p_keys[ie.static_key], ie.wrapped_key) | |
key_b, key_d = self.key_derivation(master_key, ie.lcl_com_id) | |
key = key_d if use_key_d else key_b | |
gcm = AESGCM(key) | |
if ie.try_decrypt(gcm): | |
return ie.pack(), master_key | |
else: | |
return data, None | |
@staticmethod | |
def iter_nintendo_elts(pkt): | |
elt = pkt.getlayer(Dot11EltVendorSpecific) | |
while elt: | |
if elt.oui == 0x0022aa: | |
yield elt | |
elt = elt.payload.getlayer(Dot11EltVendorSpecific) | |
def decrypt_ie_0(self, pkt): | |
for elt in self.iter_nintendo_elts(pkt): | |
if elt.info.startswith(b'\x06\x00'): | |
break | |
else: | |
return None | |
body, key = self.decrypt_raw(elt.info[2:]) | |
if key: | |
self.good_ie_0_stats.add(elt.info) | |
elt.info = elt.info[:2] + body | |
elt.len = None | |
return key | |
else: | |
self.bad_ie_0_stats.add(elt.info) | |
def decrypt_ie_1(self, pkt, master_key): | |
for elt in self.iter_nintendo_elts(pkt): | |
if elt.info.startswith(b'\x06\x01'): | |
break | |
else: | |
return | |
if len(elt.info) < 0x16: | |
return | |
stats_key = master_key + elt.info | |
if stats_key in self.bad_ie_1_stats: | |
# We already failed this one, don't even try | |
return | |
ad = elt.info[2:6] | |
iv = elt.info[2:6] + b'\0'*8 | |
mac = elt.info[6:0x16] | |
body = elt.info[0x16:] | |
for psk in self.psks: | |
key_a, key_c = self.key_derivation(master_key, psk) | |
gcm = AESGCM(key_c) | |
try: | |
body = gcm.decrypt(iv, body+mac, ad) | |
except InvalidTag: | |
continue | |
else: | |
break | |
else: | |
self.bad_ie_1_stats.add(stats_key) | |
return | |
elt.info = elt.info[:2] + body | |
elt.len = None | |
self.learned_ccmp_keys.add(key_a) | |
@staticmethod | |
def make_mutable_copy(pkt): | |
pkt = pkt.copy() | |
pkt.len = None | |
if Dot11FCS in pkt: | |
pkt[Dot11FCS].fcs = None | |
return pkt | |
def filter_packet(self, pkt): | |
if Dot11 in pkt and pkt[Dot11].subtype == 13 and Raw in pkt: | |
# Action frame | |
if pkt[Raw].load.startswith(b'\x7f\x00\x22\xaa\x06\x00'): | |
load = pkt[Raw].load | |
data, key = self.decrypt_raw(load[6:], use_key_d=True) | |
if key: | |
pkt = self.make_mutable_copy(pkt) | |
pkt[Raw].load = load[:6] + data | |
elif Dot11EltVendorSpecific in pkt: | |
pkt_copy = self.make_mutable_copy(pkt) | |
master_key = self.decrypt_ie_0(pkt_copy) | |
if master_key: | |
pkt = pkt_copy | |
self.decrypt_ie_1(pkt, master_key) | |
return pkt | |
def main(): | |
try: | |
from tqdm import tqdm | |
except ModuleNotFoundError: | |
tqdm = lambda x, **kw: x | |
def psk(x): | |
x = binascii.unhexlify(x) | |
if len(x) != 0x20: raise ValueError('bad length') | |
return x | |
parser = argparse.ArgumentParser( | |
description="""\ | |
This script will process a (raw, 802.11) WiFi packet capture | |
containing Nintendo Switch LP2P traffic and decrypt the Beacon and | |
Action frames found within, writing plaintext versions to the output. | |
It can additionally accept one or more 0x20-byte LP2P pre-shared keys | |
(PSKs) and use them to decrypt the AdvertiseData field and derive the | |
CCMP keys used to encrypt data frames. This script will not perform | |
CCMP decryption. | |
""" | |
) | |
parser.add_argument('input', | |
help='Input PCAP(NG) file containing 802.11 traffic') | |
parser.add_argument('output', | |
help='Output PCAP file to receive decrypted traffic') | |
parser.add_argument('--psk', '-K', action='append', type=psk, default=[], | |
help='A pre-shared key which will be used to attempt ' | |
'AdvertiseData decryption and CCMP derivation; ' | |
'you may specify this option multiple times.') | |
parser.add_argument('--keyfile', '-f', action='append', default=[], | |
help='The path to your prod.keys file; ' | |
'you may specify this option multiple times.') | |
args = parser.parse_args() | |
if not args.keyfile: | |
args.keyfile.append(pathlib.Path.home().joinpath('.switch/prod.keys')) | |
sk = SwitchKeys() | |
for path in args.keyfile: | |
try: | |
sk.load(path) | |
except OSError as e: | |
print(f'[!] Could not load keyfile: {e}') | |
sys.exit(1) | |
try: | |
sk.derive_lp2p() | |
except KeyError as e: | |
key, = e.args | |
print(f'[!] Missing key: {key}') | |
sys.exit(1) | |
lp2p = LP2PDecryptor(sk.lp2p_keys, args.psk) | |
with PcapNgReader(args.input) as i: | |
with PcapWriter(args.output) as o: | |
for pkt in tqdm(i, unit='pkt'): | |
if pkt.time > (1<<32): | |
pkt.time /= 1000 # Workaround for reader bug | |
o.write(lp2p.filter_packet(pkt)) | |
pm = '-+'[bool(lp2p.good_ie_0_stats)] | |
print(f'[{pm}] Decrypted Beacons for {len(lp2p.good_ie_0_stats)} networks') | |
if lp2p.bad_ie_0_stats: | |
print(f'[!] Failed to decrypt Beacons for {len(lp2p.bad_ie_0_stats)} networks') | |
if not lp2p.good_ie_0_stats: | |
print(' Check that the keys in your keyfile are correct!') | |
if lp2p.bad_ie_1_stats: | |
print(f'[-] PSKs for {len(lp2p.bad_ie_1_stats)} networks not provided') | |
pm = '-+'[bool(lp2p.learned_ccmp_keys)] | |
cn = ':' if lp2p.learned_ccmp_keys else '' | |
print(f'[{pm}] Derived {len(lp2p.learned_ccmp_keys)} CCMP keys{cn}') | |
for key in lp2p.learned_ccmp_keys: | |
print(f' {binascii.hexlify(key).decode("ascii")}') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment