import argparse
import datetime
import hashlib
import filecmp
import glob
import json
import os
import pathlib
import re
import shutil
import subprocess
import sys
import tempfile
import urllib.parse  # the submodule must be imported explicitly for urllib.parse.unquote below
import zipfile

CHANNEL_MANIFEST_URL_RELEASE = "https://aka.ms/vs/17/release/channel"
CHANNEL_MANIFEST_URL_PREVIEW = "https://aka.ms/vs/17/pre/channel"
VS_MANIFEST_CHANNEL_ID_RELEASE = "Microsoft.VisualStudio.Manifests.VisualStudio"
VS_MANIFEST_CHANNEL_ID_PREVIEW = "Microsoft.VisualStudio.Manifests.VisualStudioPreview"

ALL_HOSTS = ("x64", "x86", "arm64")
ALL_TARGETS = ("x64", "x86", "arm", "arm64")

IS_WINDOWS = (sys.platform == "win32")
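
# Example invocations (illustrative only; the script filename and the version numbers
# below are placeholders, use --list-versions to see what is actually available):
#
#   python portable_msvc.py --download-directory dl --list-versions
#
#   python portable_msvc.py --download-directory dl --install-directory msvc ^
#       --host x64 --targets x64,arm64 --install-msvc 14.40.17.10 ^
#       --install-sdk 22621 --accept-license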
def log(msg):
    print(msg, file=sys.stderr)

def remove(path):
    try:
        os.remove(path)
    except FileNotFoundError:
        pass

def rmtree(path):
    if os.path.exists(path):
        if IS_WINDOWS:
            os.system('rmdir /S /Q "{}"'.format(path))
        else:
            shutil.rmtree(path)

def hashFile(filepath):
    with open(filepath, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()

def hashDirectory(dirpath):
    entries = []
    for root, dirs, files in os.walk(dirpath):
        dirs.sort()
        files.sort()
        rel_path = os.path.relpath(root, dirpath)
        if rel_path == ".":
            rel_path = ""
        dir_entry = (rel_path, "DIR")
        entries.append(dir_entry)
        for filename in files:
            filepath = os.path.join(root, filename)
            rel_filepath = os.path.join(rel_path, filename)
            file_hash = hashFile(filepath)
            entries.append((rel_filepath, file_hash))
    entries_str = "\n".join(f"{path}\t{hash_val}" for path, hash_val in sorted(entries))
    return hashlib.sha256(entries_str.encode()).hexdigest()
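
# For illustration (path separator shown as "/"): hashing a directory that contains a
# single file "a/b.txt" hashes the sorted, tab/newline-joined entry list
#     "\tDIR\na\tDIR\na/b.txt\t<sha256 of b.txt>"
# so the digest changes whenever any file's content, name, or location changes.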
def download(url, sha256, filepath):
    if os.path.exists(filepath):
        if not sha256:
            log(f"{filepath}: already downloaded")
            return
        computed_sha256 = hashFile(filepath)
        if computed_sha256 == sha256:
            log(f"{filepath}: already downloaded")
            return
        log(f"{filepath}: already downloaded but hash does not match, removing it")
        log(f" expected: {sha256}")
        log(f" computed: {computed_sha256}")
        os.remove(filepath)
    log(f"{filepath}: downloading from {url}...")
    dirname = os.path.dirname(filepath)
    if dirname:
        os.makedirs(dirname, exist_ok=True)
    tmp_filepath = filepath + ".downloading"
    remove(tmp_filepath)
    run(None, ["curl", "--location", url, "--output", tmp_filepath])
    if sha256:
        computed_sha256 = hashFile(tmp_filepath)
        if sha256 != computed_sha256:
            log(f"{filepath}: hash mismatch after download")
            log(f" expected: {sha256}")
            log(f" computed: {computed_sha256}")
            sys.exit(1)
    os.rename(tmp_filepath, filepath)
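
# A minimal fallback sketch, in case curl is not available on the machine. This helper is
# NOT wired into download() above and does no hash checking; it only illustrates how the
# same temp-file-then-rename pattern could be done with the standard library.
def downloadWithUrllib(url, filepath):
    import urllib.request  # local import: only needed if this fallback is ever used
    tmp_filepath = filepath + ".downloading"
    remove(tmp_filepath)
    with urllib.request.urlopen(url) as response, open(tmp_filepath, "wb") as f:
        shutil.copyfileobj(response, f)
    os.rename(tmp_filepath, filepath)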
def run(extra_env, *args, **kwargs):
    if 'check' not in kwargs:
        kwargs['check'] = True
    cmd_prefix = ''
    cmd_suffix = ''
    cd_prefix = ''
    if 'cwd' in kwargs:
        if IS_WINDOWS:
            cmd_prefix = 'cmd /c "'
            cmd_suffix = '"'
        else:
            cmd_prefix = '('
            cmd_suffix = ')'
        # TODO: should we use & or &&?
        cd_prefix = f"cd {kwargs['cwd']} && "
    env_prefix = ''
    if extra_env:
        kwargs['env'] = {**os.environ, **extra_env}
        if IS_WINDOWS:
            cmd_prefix = 'cmd /c "'
            cmd_suffix = '"'
            for e in extra_env:
                env_prefix += f"set {e}={extra_env[e]} && "
        else:
            for e in extra_env:
                env_prefix += f"{e}={extra_env[e]} "
    log("[RUN] "
        + cmd_prefix + cd_prefix + env_prefix
        + subprocess.list2cmdline(*args)
        + cmd_suffix)
    sys.stdout.flush()
    return subprocess.run(*args, **kwargs)
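
# The logged line is for display only; subprocess.run() receives the cwd/env kwargs
# directly. For example (illustrative values):
#   run({"FOO": "bar"}, ["git", "status"], cwd="some/dir")
# would log something like
#   [RUN] (cd some/dir && FOO=bar git status)                   on POSIX, or
#   [RUN] cmd /c "cd some/dir && set FOO=bar && git status"     on Windows.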
def update_json_file(filepath, obj):
    if os.path.exists(filepath):
        log(f"{filepath}: already exists")
        return
    tmp = filepath + ".writing"
    remove(tmp)
    with open(tmp, "w") as f:
        f.write(json.dumps(obj, indent=2))
    os.rename(tmp, filepath)
    log(f"{filepath}: updated")

def getInstallDir(args):
    if not args.install_directory:
        sys.exit("missing --install-directory cmdline option")
    return args.install_directory

def getTargets(args):
    if not args.targets:
        sys.exit("missing --targets cmdline option")
    targets = args.targets.split(',')
    for target in targets:
        if target not in ALL_TARGETS:
            sys.exit(f"Unknown target architecture '{target}'")
    return targets
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--download-directory", required=True, action="store")
    parser.add_argument("--host", help="Host architecture", choices=ALL_HOSTS)
    parser.add_argument("--targets", help=f"Target architectures, comma separated ({','.join(ALL_TARGETS)})")
    parser.add_argument("--install-directory", action="store")
    parser.add_argument("--install-msvc", action="store")
    parser.add_argument("--install-sdk", action="store")
    parser.add_argument("--sdk-include-signing", action="store_true")
    parser.add_argument("--install-source", nargs=3, metavar=("NAME", "PATH", "DEST_SUBDIR"), help="Install files from the given PATH to the given DEST_SUBDIR")
    parser.add_argument("--accept-license", action="store_true", help="Automatically accept the license")
    parser.add_argument("--list-versions", action="store_true", help="Show available MSVC and Windows SDK versions")
    parser.add_argument("--list-all-packages", action="store_true", help="List all available packages")
    parser.add_argument("--list-msvc-packages", action="store", help="List all MSVC packages for the configured host/targets and the given version")
    parser.add_argument("--list-sdk-installers", action="store", help="List all the SDK installers for the configured targets and the given version")
    parser.add_argument("--force-channel-update", action="store_true", help="Don't re-use a previous channel manifest")
    # parser.add_argument("--preview", action="store_true", help="Use preview channel for Preview versions")
    args = parser.parse_args()
    download_dir = args.download_directory

    # we insert the date so it will automatically redownload/update if it's a bit old
    today = datetime.datetime.now().strftime("%Y-%m-%d")
    channel_manifest_basename = f"msvc-channel-{today}.json"
    channel_manifest_filepath = os.path.join(download_dir, channel_manifest_basename)
    if args.force_channel_update:
        if os.path.exists(channel_manifest_filepath):
            log(f"deleting channel manifest {channel_manifest_filepath} due to --force-channel-update")
            os.remove(channel_manifest_filepath)
    if os.path.exists(channel_manifest_filepath):
        log(f"{channel_manifest_filepath}: up-to-date within the last 24 hours")
    else:
        log(f"{channel_manifest_filepath}: downloading")
        download(CHANNEL_MANIFEST_URL_RELEASE, None, channel_manifest_filepath)
        # TODO: should we clean up old channel manifests?
    with open(channel_manifest_filepath, "rb") as f:
        channel_manifest_string = f.read()
    channel_manifest_hash = hashlib.sha256(channel_manifest_string).hexdigest()
    channel_manifest = json.loads(channel_manifest_string)

    # save a "pretty", inspectable version
    pretty_channel_manifest = os.path.join(tempfile.gettempdir(), f"msvc-channel-{channel_manifest_hash}.json")
    update_json_file(pretty_channel_manifest, channel_manifest)

    manifests_item = None
    for item in channel_manifest["channelItems"]:
        if item["id"] == VS_MANIFEST_CHANNEL_ID_RELEASE:
            manifests_item = item
    if not manifests_item:
        sys.exit(f"error: no channelItem with id '{VS_MANIFEST_CHANNEL_ID_RELEASE}' in {pretty_channel_manifest}")
    payloads = manifests_item["payloads"]
    if len(payloads) != 1:
        sys.exit(f"{pretty_channel_manifest}: expected manifests item to have 1 payload but has {len(payloads)}")
    payload = payloads[0]
    vs_manifest_sha256 = payload["sha256"].lower()
    manifests_file = os.path.join(download_dir, f"vs-manifest-{vs_manifest_sha256}.json")
    # NOTE: for some reason the sha256 in the manifest doesn't match the actual sha256 of
    #       the file content
    download(payload["url"], None, manifests_file)
    with open(manifests_file, "rb") as f:
        vs_manifest = json.load(f)
    if args.install_directory:
        os.makedirs(args.install_directory, exist_ok=True)
        update_json_file(os.path.join(args.install_directory, f"vs-manifest-{vs_manifest_sha256}.json"), vs_manifest)
    packages = {}
    for pkg in vs_manifest["packages"]:
        pkg_id = pkg["id"].lower()
        packages.setdefault(pkg_id, []).append(pkg)

    msvc_version_to_pkg_id = {}
    sdk_version_to_pkg_id = {}
    vc_prefix = "Microsoft.VC.".lower()
    sdk_win10_prefix = "Microsoft.VisualStudio.Component.Windows10SDK.".lower()
    sdk_win11_prefix = "Microsoft.VisualStudio.Component.Windows11SDK.".lower()
    for pkg_id, p in packages.items():
        if pkg_id.startswith(vc_prefix) and pkg_id.endswith(".Tools.HostX64.TargetX64.base".lower()):
            if version := extractVersion(pkg_id[len(vc_prefix):]):
                msvc_version_to_pkg_id[version] = pkg_id
        elif pkg_id.startswith(sdk_win10_prefix):
            if version := extractVersion(pkg_id[len(sdk_win10_prefix):]):
                sdk_version_to_pkg_id[version] = pkg_id
        elif pkg_id.startswith(sdk_win11_prefix):
            if version := extractVersion(pkg_id[len(sdk_win11_prefix):]):
                sdk_version_to_pkg_id[version] = pkg_id

    if False:  # debug helper: dump the file list of every package (disabled)
        getPkgFiles(packages)

    msvc_versions = sorted(msvc_version_to_pkg_id.keys())
    sdk_versions = sorted(sdk_version_to_pkg_id.keys())
    sdk_choices = ", ".join(sdk_versions)

    if args.list_versions:
        print("MSVC versions:", ", ".join(msvc_versions))
        print("Windows SDK versions:", sdk_choices)
        sys.exit(0)

    if args.list_all_packages:
        for pkg_id in packages:
            print(f"{pkg_id}")
        sys.exit(0)

    if args.list_msvc_packages:
        msvc_version = args.list_msvc_packages
        checkMsvcVersion(msvc_version_to_pkg_id, msvc_version)
        host = args.host if args.host else sys.exit("missing --host cmdline option")
        targets = getTargets(args)
        pkgs = getMsvcPkgs(msvc_version, host, targets, packages)
        for pkg in pkgs:
            prefix = "PRESENT" if (pkg in packages) else "MISSING"
            print(f"{prefix} : {pkg}")
        sys.exit(0)

    if args.list_sdk_installers:
        sdk_version = args.list_sdk_installers
        sdk_pkg = getSdkPkg(packages, sdk_choices, sdk_version_to_pkg_id, sdk_version)
        targets = getTargets(args)
        installers = getSdkInstallers(targets, args.sdk_include_signing, sdk_pkg)
        for installer in installers:
            payload = first(sdk_pkg["payloads"], lambda p: p["fileName"] == f"Installers\\{installer}")
            prefix = "PRESENT" if payload else "MISSING"
            print(f"{prefix} : {installer}")
        sys.exit(0)

    tools = first(channel_manifest["channelItems"], lambda x: x["id"] == "Microsoft.VisualStudio.Product.BuildTools")
    resource = first(tools["localizedResources"], lambda x: x["language"] == "en-us")
    license = resource["license"]
    if not args.accept_license:
        accept = input(f"Do you accept the Visual Studio license at {license} [Y/N] ? ")
        if not accept or accept[0].lower() != "y":
            sys.exit(0)
    def resolve_config(install_dir, name, args_value):
        config_file = pathlib.Path(install_dir) / "config" / name
        if args_value:
            if config_file.exists():
                saved_config = config_file.read_bytes().decode("utf8")
                if saved_config != args_value:
                    sys.exit(f"cannot change {name} from '{saved_config}' to '{args_value}'")
            else:
                config_file.parent.mkdir(exist_ok=True)
                config_file.write_bytes(args_value.encode("utf8"))
        if config_file.exists():
            return config_file.read_bytes().decode("utf8")
        return None
    if args.install_directory:
        host = resolve_config(args.install_directory, "host", args.host)
        msvc_version = resolve_config(args.install_directory, "msvc-version", args.install_msvc)
        sdk_version = resolve_config(args.install_directory, "sdk-version", args.install_sdk)
        sdk_include_signing_string = str(args.sdk_include_signing)
        # compare against the saved string; bool() on a non-empty string like "False" is always True
        sdk_include_signing = (resolve_config(args.install_directory, "sdk-include-signing", sdk_include_signing_string) == str(True))
    if args.install_msvc:
        msvc_version = args.install_msvc
        checkMsvcVersion(msvc_version_to_pkg_id, msvc_version)
        host = args.host if args.host else sys.exit("missing --host cmdline option")
        targets = getTargets(args)
        pkgs = getMsvcPkgs(msvc_version, host, targets, packages)
        for pkg_id in pkgs:
            installMsvcPkg(download_dir, getInstallDir(args), packages, pkg_id)

    if args.install_sdk:
        sdk_version = args.install_sdk
        sdk_pkg = getSdkPkg(packages, sdk_choices, sdk_version_to_pkg_id, sdk_version)
        targets = getTargets(args)
        installers = getSdkInstallers(targets, sdk_include_signing, sdk_pkg)
        msis = []
        cab_payloads = set()
        for installer in installers:
            payload = first(sdk_pkg["payloads"], lambda p: p["fileName"] == f"Installers\\{installer}")
            if not payload:
                print(f"sdk installer '{installer}' does not exist")
                continue
            print(f"found sdk installer '{installer}'")
            msi = Msi(download_dir, sdk_version, os.path.basename(payload["fileName"]), payload["sha256"].lower())
            download(payload["url"], msi.sha256, msi.filepath)
            msis.append(msi)
            with open(msi.filepath, "rb") as f:
                for cab in getMsiCabs(sdk_pkg, f.read()):
                    cab_payloads.add(cab)
        for cab in cab_payloads:
            cab_file = os.path.join(download_dir, sdk_version, cab.basename)
            download(cab.url, cab.sha256, cab_file)
        install_dir = getInstallDir(args)
        install_dir_abs = os.path.abspath(install_dir)
        for msi in msis:
            installMsi(install_dir_abs, msi)

    if args.install_source:
        name, source_path, dest_path = args.install_source
        installSource(getInstallDir(args), name, source_path, dest_path)
    if args.install_msvc or args.install_sdk:
        install_dir = pathlib.Path(getInstallDir(args))
        targets = getTargets(args)
        msvc_glob = first((install_dir / "VC/Tools/MSVC").glob("*"))
        msvc_fs_version = msvc_glob.name if msvc_glob else None
        sdk_glob = first((install_dir / "Windows Kits/10/bin").glob("*"))
        sdk_fs_version = sdk_glob.name if sdk_glob else None
        if msvc_fs_version:
            vctools_dir = fr"VC\Tools\MSVC\{msvc_fs_version}"
        else:
            vctools_dir = None
        include_dirs = []
        if vctools_dir:
            include_dirs += [
                fr"{vctools_dir}\include",
            ]
        if sdk_fs_version:
            include_dirs += [
                fr"Windows Kits\10\Include\{sdk_fs_version}\ucrt",
                fr"Windows Kits\10\Include\{sdk_fs_version}\shared",
                fr"Windows Kits\10\Include\{sdk_fs_version}\um",
                fr"Windows Kits\10\Include\{sdk_fs_version}\winrt",
                fr"Windows Kits\10\Include\{sdk_fs_version}\cppwinrt",
            ]
        env_no_target = {
            "WindowsSdkBinPath": "Windows Kits\\10\\bin",
            "INCLUDE": include_dirs,
        }
        if host:
            env_no_target["VSCMD_ARG_HOST_ARCH"] = host
        if msvc_fs_version:
            env_no_target["VCToolsVersion"] = msvc_fs_version
        if vctools_dir:
            env_no_target["VCToolsInstallDir"] = vctools_dir
        with open(os.path.join(install_dir, "env.json"), "w") as f:
            f.write(json.dumps(env_no_target, indent=4))
        for target in targets:
            path_dirs = []
            if host and vctools_dir:
                path_dirs += [fr"{vctools_dir}\bin\Host{host}\{target}"]
            if sdk_fs_version:
                path_dirs += [
                    fr"Windows Kits\10\bin\{sdk_fs_version}\{host}",
                    fr"Windows Kits\10\bin\{sdk_fs_version}\ucrt",
                ]
            lib_dirs = []
            if vctools_dir:
                lib_dirs += [
                    fr"{vctools_dir}\lib\{target}",
                ]
            if sdk_fs_version:
                lib_dirs += [
                    fr"Windows Kits\10\Lib\{sdk_fs_version}\ucrt\{target}",
                    fr"Windows Kits\10\Lib\{sdk_fs_version}\um\{target}",
                ]
            generate_vcvars(
                install_dir=install_dir,
                host=host,
                msvc_fs_version=msvc_fs_version,
                sdk_fs_version=sdk_fs_version,
                vctools_dir=vctools_dir,
                include_dirs=include_dirs,
                path_dirs=path_dirs,
                lib_dirs=lib_dirs,
                target=target,
            )
            env = {
                "VSCMD_ARG_TGT_ARCH": target,
                "WindowsSdkBinPath": "Windows Kits\\10\\bin",
                "INCLUDE": include_dirs,
                "PATH": path_dirs,
                "LIB": lib_dirs,
            }
            if host:
                env["VSCMD_ARG_HOST_ARCH"] = host
            if msvc_fs_version:
                env["VCToolsVersion"] = msvc_fs_version
            if vctools_dir:
                env["VCToolsInstallDir"] = vctools_dir
            with open(os.path.join(install_dir, f"env_{target}.json"), "w") as f:
                f.write(json.dumps(env, indent=4))
            if msvc_fs_version:
                generate_cmake(
                    install_dir=install_dir,
                    msvc_fs_version=msvc_fs_version,
                    sdk_fs_version=sdk_fs_version,
                    include_dirs=include_dirs,
                    path_dirs=path_dirs,
                    lib_dirs=lib_dirs,
                    target=target,
                )
def generate_vcvars(*, install_dir, host, msvc_fs_version, sdk_fs_version, vctools_dir, include_dirs, path_dirs, lib_dirs, target):
    s = ""
    if host:
        s += f"set VSCMD_ARG_HOST_ARCH={host}\n"
    path_dirs_batch_rel = [f"%~dp0{p}" for p in path_dirs] + ["%PATH%"]
    include_dirs_batch_rel = [f"%~dp0{p}" for p in include_dirs]
    lib_dirs_batch_rel = [f"%~dp0{p}" for p in lib_dirs]
    s += fr"""set VSCMD_ARG_TGT_ARCH={target}
set VCToolsVersion={msvc_fs_version if msvc_fs_version else ''}
set WindowsSDKVersion={sdk_fs_version if sdk_fs_version else ''}
set VCToolsInstallDir={('%~dp0' + vctools_dir) if vctools_dir else ''}\
set WindowsSdkBinPath=%~dp0Windows Kits\10\bin\
set INCLUDE={';'.join(include_dirs_batch_rel)}
set PATH={';'.join(path_dirs_batch_rel)}
set LIB={';'.join(lib_dirs_batch_rel)}
"""
    with open(os.path.join(install_dir, f"vcvars_{target}.bat"), "w") as f:
        f.write(s)
    if target == "x64" and msvc_fs_version:
        with open(os.path.join(install_dir, "VC", "vcvarsall.bat"), "w") as f:
            # !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
            # TODO: I *think* this probably takes the ARCH as a cmdline arg
            #       and maybe without an arg it defaults to the host arch?
            f.write(r"@call %~dp0..\vcvars_x64.bat")
def generate_cmake(*, install_dir, msvc_fs_version, sdk_fs_version, include_dirs, path_dirs, lib_dirs, target):
    assert msvc_fs_version
    out_file = os.path.join(install_dir, "cmake", f"toolchain.{target}.cmake")
    os.makedirs(os.path.dirname(out_file), exist_ok=True)
    def findexe(exe, *, required):
        for p in path_dirs:
            full_path = os.path.join(install_dir, p, exe)
            if os.path.exists(full_path):
                return os.path.join(p, exe)
        if required:
            log(f"error: unable to find '{exe}' in any of the following {len(path_dirs)} paths:")
            for p in path_dirs:
                log(f" '{p}'")
            sys.exit(-1)
        return None
    cl = findexe("cl.exe", required=msvc_fs_version)
    rc = findexe("rc.exe", required=sdk_fs_version)
    mt = findexe("mt.exe", required=sdk_fs_version)
    with open(out_file, "w") as f:
        f.write('get_filename_component(MSVC_ROOT ${CMAKE_CURRENT_LIST_DIR} DIRECTORY)\n')
        if rc:
            f.write(f'set(CMAKE_RC_COMPILER {toCmakePath(rc)})\n')
        if mt:
            f.write(f'set(CMAKE_MT {toCmakePath(mt)})\n')
        for lang in ("C", "CXX"):
            f.write(f'set(CMAKE_{lang}_COMPILER {toCmakePath(cl)})\n')
            f.write(f'set(CMAKE_{lang}_STANDARD_INCLUDE_DIRECTORIES\n')
            for include_dir in include_dirs:
                f.write(f' {toCmakePath(include_dir)}\n')
            f.write(')\n')
        f.write('link_directories(\n')
        for lib_dir in lib_dirs:
            f.write(f' {toCmakePath(lib_dir)}\n')
        f.write(')\n')

def toCmakePath(p):
    return '"${MSVC_ROOT}/' + p.replace("\\", "/") + '"'
def installSource(install_dir, name, source_path, dest_path):
    sha256 = hashDirectory(source_path)
    install_manifest = getInstallManifest(install_dir, sha256, name)
    if os.path.exists(install_manifest):
        log(f"{sha256} {name} already installed")
        return
    startInstall(install_manifest)
    for subdir, dirs, files in os.walk(source_path):
        for f in files:
            full_path = os.path.join(subdir, f)
            relative_path = os.path.join(dest_path, os.path.relpath(full_path, source_path))
            out = pathlib.Path(install_dir) / relative_path
            if os.path.exists(out):
                if filecmp.cmp(full_path, out):
                    addInstallingConflict(install_manifest, relative_path)
                else:
                    sys.exit(f"file conflict! '{full_path}' does not match {out}")
            else:
                addInstallingNew(install_manifest, relative_path)
                out.parent.mkdir(parents=True, exist_ok=True)
                shutil.copyfile(full_path, out)
    endInstall(install_dir, install_manifest)
class Cab:
    def __init__(self, payload):
        self.url = payload["url"]
        self.sha256 = payload["sha256"].lower()
        self.basename = os.path.basename(payload["fileName"])
    def __eq__(self, other):
        if not isinstance(other, Cab):
            return False
        return self.url == other.url and self.sha256 == other.sha256 and self.basename == other.basename
    def __hash__(self):
        return hash((self.url, self.sha256, self.basename))

def getMsiCabs(pkg, msi):
    # a dumb way to see what cab files an msi needs: just go through
    # all the cab files in the package and then see if their name
    # appears somewhere in the msi file
    cabs = []
    payloads = pkg["payloads"]
    for payload in payloads:
        filename = payload["fileName"]
        if not filename.endswith(".cab"):
            continue
        basename = os.path.basename(filename)
        # bytes.find returns -1 (truthy) when not found, so compare explicitly
        if msi.find(basename.encode("utf8")) != -1:
            cabs.append(Cab(payload))
    return cabs
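
# A more precise (but more involved) alternative would be to read the MSI's Media table
# (e.g. with the Windows-only msilib module, which has been removed from newer Python
# standard libraries) instead of substring-scanning the raw bytes; the scan above errs on
# the side of downloading a cab that merely happens to be mentioned somewhere in the MSI.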
class Msi:
    def __init__(self, download_dir, sdk_version, basename, sha256):
        self.basename = basename
        self.sha256 = sha256
        self.filepath = os.path.join(download_dir, sdk_version, basename)

def installMsi(install_dir_abs, msi):
    install_manifest = getInstallManifest(install_dir_abs, msi.sha256, msi.basename)
    if os.path.exists(install_manifest):
        log(f"{msi.sha256} {msi.basename} already installed")
        return
    print(f"installing {msi.basename}")
    staging_dir = install_dir_abs + ".staging"
    rmtree(staging_dir)
    sys.stdout.flush()
    sys.stderr.flush()
    subprocess.run([
        "msiexec.exe",
        "/a",
        msi.filepath,
        "/quiet",
        "/qn",
        #"/?",
        #"/lv", "C:\\temp\\log.txt",
        f"TARGETDIR={staging_dir}",
    ], check=True)
    startInstall(install_manifest)
    found_msi = False
    for subdir, dirs, files in os.walk(staging_dir):
        for f in files:
            full_path = os.path.join(subdir, f)
            relative_path = os.path.relpath(full_path, staging_dir)
            # the msi installs itself to the TARGETDIR for some reason?
            if relative_path == msi.basename:
                found_msi = True
                continue
            out = pathlib.Path(install_dir_abs) / relative_path
            if os.path.exists(out):
                if filecmp.cmp(full_path, out):
                    addInstallingConflict(install_manifest, relative_path)
                else:
                    sys.exit(f"file conflict! '{full_path}' does not match {out}")
            else:
                addInstallingNew(install_manifest, relative_path)
                out.parent.mkdir(parents=True, exist_ok=True)
                os.rename(full_path, out)
    endInstall(install_dir_abs, install_manifest)
    rmtree(staging_dir)
def installMsvcPkg(download_dir, install_dir, packages, pkg_id):
    if pkg_id not in packages:
        print(f"msvc package '{pkg_id}' does not exist")
        return
    print(f"installing msvc package '{pkg_id}'...")
    pkg = first(packages[pkg_id], lambda p: p.get("language") in (None, "en-US"))
    for payload in pkg["payloads"]:
        installPayload(download_dir, install_dir, payload)

def installPayload(download_dir, install_dir, payload):
    filename = payload["fileName"]
    sha256 = payload["sha256"].lower()
    url = payload["url"]
    if "/" in filename or "\\" in filename:
        sys.exit(f"payload fileName contains slashes: {filename}")
    install_manifest = getInstallManifest(install_dir, sha256, filename)
    if os.path.exists(install_manifest):
        log(f"{sha256} {filename} already installed")
        return
    filepath = os.path.join(download_dir, filename)
    download(url, sha256, filepath)
    startInstall(install_manifest)
    with zipfile.ZipFile(filepath) as z:
        for name_encoded in z.namelist():
            if name_encoded.startswith("Contents/"):
                name_decoded = urllib.parse.unquote(name_encoded)
                relative_path = pathlib.Path(name_decoded).relative_to("Contents")
                out = pathlib.Path(install_dir) / relative_path
                if os.path.exists(out):
                    sys.exit(f"file conflict '{relative_path}'")
                addInstallingNew(install_manifest, relative_path)
                out.parent.mkdir(parents=True, exist_ok=True)
                out.write_bytes(z.read(name_encoded))
    endInstall(install_dir, install_manifest)
def getInstallManifest(install_dir, sha256, filename):
    return os.path.join(install_dir, "install", f"{sha256}-{filename}.manifest")

def startInstall(install_manifest):
    installing = install_manifest + ".installing"
    if os.path.exists(installing):
        log(f"removing old failed installation manifest {installing}")
        with open(installing, "r") as f:
            content = f.read()
        for line in content.splitlines():
            # strip the "new "/"add " prefix; the paths are relative to the install dir
            path = line[4:]
            if line.startswith("new "):
                log(f"removing {path}...")
            elif line.startswith("add "):
                log(f"leaving {path}...")
            else:
                sys.exit(f"invalid line: {line}")
        os.remove(installing)
    os.makedirs(os.path.dirname(installing), exist_ok=True)
    with open(installing, "w") as f:
        f.write("")

def endInstall(install_dir, install_manifest):
    installing = install_manifest + ".installing"
    assert not os.path.exists(install_manifest)
    assert os.path.exists(installing)
    with open(installing, "r") as f:
        content = f.read()
    for line in content.splitlines():
        path = os.path.join(install_dir, line[4:])
        if not os.path.exists(path):
            sys.exit(f"file '{path}' was in the manifest file but does not exist? {installing}")
    os.rename(installing, install_manifest)

def addInstallingNew(install_manifest, relative_path):
    installing = install_manifest + ".installing"
    with open(installing, "a") as f:
        f.write(f"new {relative_path}\n")

def addInstallingConflict(install_manifest, relative_path):
    installing = install_manifest + ".installing"
    with open(installing, "a") as f:
        f.write(f"add {relative_path}\n")
def checkMsvcVersion(msvc_version_to_pkg_id, version):
    if version not in msvc_version_to_pkg_id:
        versions = sorted(msvc_version_to_pkg_id.keys())
        choices = ", ".join(versions)
        sys.exit(f"error: unknown msvc version '{version}', choose one of: {choices}")

def getOnePkg(packages, pkg_id):
    matches = packages[pkg_id]
    if len(matches) != 1:
        for idx, pkg in enumerate(matches):
            log(f"package {idx+1} {json.dumps(pkg)}")
        log(f"expected exactly one package to have id '{pkg_id}' but found {len(matches)}")
        sys.exit(-1)
    return matches[0]

def getSdkPkg(packages, sdk_choices, sdk_version_to_pkg_id, version):
    sdk_pkg_id = sdk_version_to_pkg_id.get(version)
    if not sdk_pkg_id:
        sys.exit(f"error: unknown sdk version '{version}', choose one of: {sdk_choices}")
    sdk_pkg = getOnePkg(packages, sdk_pkg_id)
    dependency_ids = sdk_pkg["dependencies"]
    if len(dependency_ids) != 1:
        sys.exit(f"expected sdk package '{sdk_pkg_id}' to have exactly 1 dependency but has {len(dependency_ids)}: {dependency_ids}")
    return getOnePkg(packages, next(iter(dependency_ids)).lower())

def extractVersion(pkg_id):
    parts = pkg_id.split(".")
    i = 0
    while True:
        if i >= len(parts):
            return None
        if parts[i].isnumeric():
            break
        i += 1
    start = i
    while True:
        if i >= len(parts):
            break
        if not parts[i].isnumeric():
            break
        i += 1
    return ".".join(parts[start:i])
def getMsvcPkgs(version, host, targets, packages):
    pkgs = [
        "microsoft.visualcpp.dia.sdk",
        f"microsoft.vc.{version}.crt.headers.base",
        f"microsoft.vc.{version}.crt.source.base",
        f"microsoft.vc.{version}.asan.headers.base",
        f"microsoft.vc.{version}.pgo.headers.base",
    ]
    for target in targets:
        pkgs += [
            f"microsoft.vc.{version}.tools.host{host}.target{target}.base",
            f"microsoft.vc.{version}.tools.host{host}.target{target}.res.base",
            f"microsoft.vc.{version}.crt.{target}.desktop.base",
            f"microsoft.vc.{version}.crt.{target}.store.base",
            f"microsoft.vc.{version}.premium.tools.host{host}.target{target}.base",
            f"microsoft.vc.{version}.pgo.{target}.base",
        ]
        if target == "arm64":
            pkgs += [f"microsoft.vc.{version}.crt.{target}.desktop.debug.base"]
        if target in ["x86", "x64"]:
            pkgs += [f"microsoft.vc.{version}.asan.{target}.base"]
        redist_suffix = ".onecore.desktop" if target == "arm" else ""
        redist_pkg = f"microsoft.vc.{version}.crt.redist.{target}{redist_suffix}.base"
        if redist_pkg not in packages:
            redist_name = f"microsoft.visualcpp.crt.redist.{target}{redist_suffix}"
            redist = first(packages[redist_name])
            redist_pkg = first(redist["dependencies"], lambda dep: dep.endswith(".base")).lower()
        pkgs += [redist_pkg]
    return pkgs
def findSigningInstaller(sdk_pkg):
    payload_regex = "(Windows SDK Signing Tools-.*)"
    matches = []
    payloads = sdk_pkg["payloads"]
    for payload in payloads:
        fileName = payload["fileName"]
        if match := re.search(payload_regex, fileName):
            matches.append(match.group(1))
    if len(matches) == 1:
        return matches[0]
    if len(matches) == 0:
        print(f"error: unable to find the signing installer from the following {len(payloads)} payloads:")
        for i in range(0, len(payloads)):
            fileName = payloads[i]["fileName"]
            print(f" payload[{i}] = {fileName}")
        sys.exit(f"error: unable to find the signing installer in the above payloads with this regex '{payload_regex}'")
    print(f"error: our regex for the signing installer '{payload_regex}' matched {len(matches)} payloads:")
    for i in range(0, len(matches)):
        print(f" match {i} '{matches[i]}'")
    sys.exit("error: multiple sdk signing payload matches")

def getSdkInstallers(targets, sdk_include_signing, sdk_pkg):
    installers = getSdkInstallersBase(targets)
    if sdk_include_signing:
        installers += [findSigningInstaller(sdk_pkg)]
    return installers

def getSdkInstallersBase(targets):
    pkgs = [
        "Windows SDK for Windows Store Apps Tools-x86_en-us.msi",
        "Windows SDK for Windows Store Apps Headers-x86_en-us.msi",
        "Windows SDK for Windows Store Apps Headers OnecoreUap-x86_en-us.msi",
        "Windows SDK for Windows Store Apps Libs-x86_en-us.msi",
        "Universal CRT Headers Libraries and Sources-x86_en-us.msi",
    ]
    for target in ALL_TARGETS:
        pkgs += [
            f"Windows SDK Desktop Headers {target}-x86_en-us.msi",
            f"Windows SDK OnecoreUap Headers {target}-x86_en-us.msi",
        ]
    for target in targets:
        pkgs += [f"Windows SDK Desktop Libs {target}-x86_en-us.msi"]
    return pkgs
def getPkgFiles(packages):
    def writeFilesTo(manifest_file, pkg, payload):
        print(f"Grabbing files from '{payload}'")
        if False:
            pass
        elif payload.endswith(".msi"):
            with open(payload, "rb") as f:
                for cab in getMsiCabs(pkg, f.read()):
                    cab_file = os.path.join(os.path.dirname(payload), cab.basename)
                    download(cab.url, cab.sha256, cab_file)
            temppkg = r"C:\temp\temppkg"
            rmtree(temppkg)
            args = [
                "msiexec.exe",
                "/a",
                payload,
                "/qb",
                f"TARGETDIR={temppkg}",
            ]
            print(subprocess.list2cmdline(args))
            subprocess.run(args, check=True)
            for subdir, dirs, files in os.walk(temppkg):
                for f in files:
                    full_path = os.path.join(subdir, f)
                    relative_path = os.path.relpath(full_path, temppkg)
                    manifest_file.write(f"{relative_path}\n")
            rmtree(temppkg)
        elif payload.endswith(".zip"):
            with zipfile.ZipFile(payload) as z:
                for name in z.namelist():
                    manifest_file.write(f"{name}\n")
        elif payload.endswith(".nupkg"):
            with zipfile.ZipFile(payload) as z:
                for name in z.namelist():
                    if name.startswith("content/"):
                        relative_path = pathlib.Path(name).relative_to("content")
                        manifest_file.write(f"{relative_path}\n")
                    elif name.startswith("contentFiles/"):
                        relative_path = pathlib.Path(name).relative_to("contentFiles")
                        manifest_file.write(f"{relative_path}\n")
        elif payload.lower().endswith(".vsix"):
            with zipfile.ZipFile(payload) as z:
                for name_encoded in z.namelist():
                    if name_encoded.startswith("Contents/"):
                        name_decoded = urllib.parse.unquote(name_encoded)
                        relative_path = pathlib.Path(name_decoded).relative_to("Contents")
                        manifest_file.write(f"{relative_path}\n")
        else:
            sys.exit(f"TODO: handle payload extension '{payload}'")

    dl_dir = r"c:\temp\stage"
    manifest_dir = r"c:\temp\pkgmanifests"
    os.makedirs(manifest_dir, exist_ok=True)
    payload_count = 0
    bad_payloads = {
        "microsoft.visualstudio.devenv.msi|Microsoft.VisualStudio.Devenv.x64.Msi.msi",
        "microsoft.visualstudio.devenv.resources|payload.vsix",
        "microsoft.visualstudio.devenv.shared.msi|Microsoft.VisualStudio.Devenv.Shared.Msi.msi",
        "microsoft.visualstudio.testtools.codeduitest.framework.msi|Microsoft.VisualStudio.TestTools.CodedUITest.Framework.Msi.msi",
        "microsoft.visualstudio.testtools.cuit.ext.msi.targeted|Microsoft.VisualStudio.TestTools.CUIT.Ext.Msi.x64.msi",
        "microsoft.visualstudio.testtools.testagent.msi.targeted|Microsoft.VisualStudio.TestTools.TestAgent.Msi.x64.msi",
        "microsoft.visualstudio.testtools.tp.legacy.tips.msi|Microsoft.VisualStudio.TestTools.TP.Legacy.Tips.Msi.msi",
        "microsoft.visualstudio.testtools.webloadtest.webtestrecorder.msi.targeted|Microsoft.VisualStudio.TestTools.WebLoadTest.WebTestRecorder.Msi.x64.msi",
        "microsoft.visualstudio.wcf.diagnosticpack.msi|Microsoft.VisualStudio.WCF.DiagnosticPack.Msi.msi",
        "microsoft.visualstudio.workflowv2.debugger.msi|Microsoft.VisualStudio.WorkflowV2.Debugger.Msi.x64.msi",
        "unrealenginev1|EpicGameLauncher.msi",
        "win10sdk_10.0.18362|MSI Development Tools-x86_en-us.msi",
        "win10sdk_10.0.18362|SDK ARM Redistributables-x86_en-us.msi",
        "win10sdk_10.0.18362|Windows SDK Redistributables-x86_en-us.msi",
        "win10sdk_10.0.19041|MSI Development Tools-x86_en-us.msi",
        "win10sdk_10.0.19041|SDK ARM Redistributables-x86_en-us.msi",
    }
    for pkg_id in packages:
        group = packages[pkg_id]
        print(f"{len(group)} Packages with ID: {pkg_id}")
        for pkg in group[:1]:
            for payload in pkg.get("payloads", []):
                basename = os.path.basename(payload["fileName"])
                if basename.endswith(".cab"):
                    continue # should be handled by another msi I think
                if basename.endswith(".exe"):
                    continue # too hard for now
                if basename.endswith(".ps1"):
                    continue # too hard for now
                if basename.endswith(".msu"):
                    continue # ignore for now
                payload_key = f"{pkg_id}|{basename}"
                if payload_key in bad_payloads:
                    print("skipping problematic payload")
                    continue
                payload_count += 1
                print(f"Payload {payload_count}: {payload_key}")
                #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
                #continue
                sha256 = payload["sha256"].lower()
                url = payload["url"]
                file_manifest = os.path.join(manifest_dir, f"{pkg_id}_{basename}.txt")
                if os.path.exists(file_manifest):
                    continue
                filepath = os.path.join(dl_dir, f"{basename}")
                download(url, sha256, filepath)
                tmp = file_manifest + ".tmp"
                with open(tmp, "w") as f:
                    writeFilesTo(f, pkg, filepath)
                os.rename(tmp, file_manifest)
                remove(filepath)
    rmtree(dl_dir)
    print(f"{len(packages)} package ids, {payload_count} payloads")
    sys.exit("here")
def first(items, cond=lambda x: True):
    return next((item for item in items if cond(item)), None)
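
# e.g. first([3, 8, 2], lambda x: x > 4) -> 8, and first([]) -> None; several call sites
# above rely on the None default instead of catching StopIteration.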
main()