Created
June 9, 2026 03:47
-
-
Save nateberkopec/d441eb8b1456ef3d25c17b042d1f8d3e to your computer and use it in GitHub Desktop.
Audit Gemfile.lock gems for human MFA enforcement and Trusted Publishing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env ruby | |
| # frozen_string_literal: true | |
| require "bundler" | |
| require "cgi" | |
| require "json" | |
| require "net/http" | |
| require "optparse" | |
| require "thread" | |
| require "uri" | |
# Default download count above which a gem is counted as MFA-covered (overridable with --threshold).
MFA_DOWNLOAD_THRESHOLD = 180_000_000
# Default number of HTTP worker threads (overridable with JOBS or --jobs).
DEFAULT_JOBS = 25
# Value sent in the User-Agent header of every request.
USER_AGENT = "gem-mfa-audit/1.0 (+https://rubygems.org)"

# Effective settings: built-in defaults, then the JOBS environment variable,
# then command-line flags (which win because they are applied last).
options = {
  jobs: Integer(ENV.fetch("JOBS", DEFAULT_JOBS)),
  threshold: MFA_DOWNLOAD_THRESHOLD
}

option_parser = OptionParser.new do |opts|
  opts.banner = "Usage: ruby gem_mfa_audit.rb [options] [Gemfile.lock]"

  opts.on("--jobs N", Integer, "HTTP worker threads (default: #{DEFAULT_JOBS})") do |count|
    options[:jobs] = count
  end

  opts.on("--threshold N", Integer, "RubyGems download threshold for account MFA (default: #{MFA_DOWNLOAD_THRESHOLD})") do |limit|
    options[:threshold] = limit
  end
end

# parse! removes recognised flags from ARGV, leaving only positional arguments.
option_parser.parse!
# Result of a single HTTP fetch: a body and numeric status on success, or an
# error description when no HTTP response was obtained.
Response = Struct.new(:body, :status, :url, :error, keyword_init: true) do
  # @return [Boolean, nil] true for a 2xx status, false for any other status,
  #   nil when no status was recorded
  def ok?
    return if status.nil?

    status.between?(200, 299)
  end
end
# Audit record for one gem: the collected facts plus the verdicts derived from them.
GemInfo = Struct.new(
  :name, :latest, :downloads, :metadata_mfa, :threshold_mfa, :tp_observed, :errors,
  keyword_init: true
) do
  # Truthy when either MFA signal (gem metadata or download threshold) is set.
  def human_mfa_enforced?
    metadata_mfa || threshold_mfa
  end

  # Truthy only when MFA is enforced and Trusted Publishing was not observed.
  def safe?
    human_mfa_enforced? && !tp_observed
  end

  # Logical negation of #safe?.
  def risky?
    !safe?
  end

  # @return [Array<String>] machine-readable reason codes, in a fixed order
  def reasons
    flagged = []
    flagged << "no_human_mfa_enforcement" unless human_mfa_enforced?
    flagged << "trusted_publishing_observed" if tp_observed
    flagged
  end
end
# Performs an HTTP GET and wraps the outcome in a Response.
#
# A 429 reply is retried (while attempts remain) after the number of seconds
# given in the Retry-After header, capped at +max_retry_after+. Any
# StandardError raised while building or sending the request is retried after
# a linearly growing pause; the last failure is returned as a Response that
# carries only +url+ and +error+.
#
# @param url [String] absolute http(s) URL
# @param attempts [Integer] maximum number of requests to send
# @param max_retry_after [Integer] upper bound, in seconds, on a server-requested delay
# @return [Response] always a Response, even when +attempts+ is zero or negative
def fetch(url, attempts: 3, max_retry_after: 60)
  attempts.times do |attempt|
    final_attempt = attempt == attempts - 1

    begin
      uri = URI(url)
      request = Net::HTTP::Get.new(uri)
      request["Accept"] = "application/json, text/html;q=0.9, */*;q=0.8"
      request["User-Agent"] = USER_AGENT
      response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https", open_timeout: 10, read_timeout: 20) do |http|
        http.request(request)
      end

      status = response.code.to_i
      rate_limited = status == 429 && !final_attempt
      return Response.new(body: response.body, status: status, url: url) unless rate_limited

      # Retry-After may be absent or an HTTP-date rather than a second count;
      # both cases fall back to 2 seconds instead of raising. The clamp keeps a
      # negative or very large value from breaking or stalling the worker.
      delay = Integer(response["Retry-After"], exception: false) || 2
      sleep(delay.clamp(0, max_retry_after))
    rescue StandardError => e
      return Response.new(url: url, error: "#{e.class}: #{e.message}") if final_attempt

      sleep(0.5 * (attempt + 1))
    end
  end

  # Only reachable when the loop body never ran (attempts < 1).
  Response.new(url: url, error: "no request attempted (attempts: #{attempts})")
end
# Fetches every URL in +requests+ using a pool of worker threads.
#
# @param requests [Hash, Array<Array>] pairs of (key, url)
# @param jobs [Integer] desired worker count; values below 1 are treated as 1
# @return [Hash] one entry per request, mapping each key to whatever fetch(url) returned
def fetch_all(requests, jobs:)
  return {} if requests.empty?

  queue = Queue.new
  requests.each { |key, url| queue << [key, url] }
  # A closed queue makes pop return nil once it is drained, so workers stop
  # without relying on a non-blocking pop raising ThreadError.
  queue.close

  results = {}
  mutex = Mutex.new
  worker_count = jobs.clamp(1, requests.size)

  workers = Array.new(worker_count) do
    Thread.new do
      while (job = queue.pop)
        key, url = job
        # The request runs outside the lock so workers actually overlap;
        # only the write to the shared hash is serialized.
        response = fetch(url)
        mutex.synchronize { results[key] = response }
      end
    end
  end

  workers.each(&:join)
  results
end
# True when the spec comes from a Bundler rubygems source that lists
# https://rubygems.org/ among its remotes.
#
# @param spec [#source] a lockfile spec
# @return [Boolean]
def rubygems_org_spec?(spec)
  source = spec.source
  return false unless source.is_a?(Bundler::Source::Rubygems)

  source.remotes.any? { |remote| remote.to_s == "https://rubygems.org/" }
end
# Builds the rubygems.org v1 JSON API URL for a gem name (name is CGI-escaped).
def gem_api_url(name)
  escaped = CGI.escape(name)
  "https://rubygems.org/api/v1/gems/#{escaped}.json"
end
# Builds the rubygems.org HTML page URL for a gem name (name is CGI-escaped).
def gem_page_url(name)
  escaped = CGI.escape(name)
  "https://rubygems.org/gems/#{escaped}"
end
# Renders any truthy value as "yes" and nil/false as "no".
def yes_no(value)
  return "yes" if value

  "no"
end
# Checks whether gem metadata sets rubygems_mfa_required to true (string or boolean).
#
# @param metadata [Hash, nil] the "metadata" object from the gem API payload
# @return [Boolean, nil] nil/false when +metadata+ itself is nil/false
def metadata_mfa?(metadata)
  return metadata unless metadata

  metadata["rubygems_mfa_required"].to_s == "true"
end
# Heuristic scan of a gem page: looks for the text "GitHub Actions" within the
# 1,500 characters that start at the first "Pushed by:" marker.
#
# @param response [#ok?, #body] fetched gem page
# @return [Boolean, nil] false for a non-OK response, nil when the marker is absent
def trusted_publishing_observed?(response)
  return false unless response.ok?

  html = response.body
  marker_at = html.index("Pushed by:")
  return nil unless marker_at

  html[marker_at, 1_500].include?("GitHub Actions")
end
# Decodes a response body as a JSON object.
#
# Every failure (non-OK response, missing or unparseable body, or a JSON value
# that is not an object) appends a message to +errors+ and yields an empty
# Hash, so callers can always treat the result as a Hash.
#
# @param response [#ok?, #body, #url, #status, #error] fetched API response
# @param errors [Array<String>] collector that failure messages are appended to
# @return [Hash] the parsed object, or {} on any failure
def json_body(response, errors)
  unless response.ok?
    errors << "#{response.url} failed: #{response.status || response.error}"
    return {}
  end

  parsed = JSON.parse(response.body)
  return parsed if parsed.is_a?(Hash)

  # Valid JSON that is not an object (array, null, scalar) would break
  # callers that index the result by key.
  errors << "#{response.url} returned unexpected JSON (#{parsed.class}, expected an object)"
  {}
rescue JSON::ParserError, TypeError => e
  # TypeError covers a nil body, which JSON.parse cannot convert to a String.
  errors << "#{response.url} returned invalid JSON: #{e.message}"
  {}
end
# Lockfile path: first positional argument, defaulting to ./Gemfile.lock.
lockfile = ARGV.fetch(0, "Gemfile.lock")
parser = Bundler::LockfileParser.new(File.read(lockfile))

# Only specs served from rubygems.org are audited; the rest are reported as skipped.
rubygems_specs, skipped_specs = parser.specs.partition { |spec| rubygems_org_spec?(spec) }
names = rubygems_specs.map(&:name).uniq.sort

# Two requests per gem, keyed by [kind, name]: the JSON API record and the HTML gem page.
requests = names.flat_map do |name|
  [
    [[:api, name], gem_api_url(name)],
    [[:page, name], gem_page_url(name)]
  ]
end.to_h

responses = fetch_all(requests, jobs: options[:jobs])

infos = names.map do |name|
  problems = []
  gem_data = json_body(responses.fetch([:api, name]), problems)

  page_response = responses.fetch([:page, name])
  unless page_response.ok?
    problems << "#{page_response.url} failed: #{page_response.status || page_response.error}"
  end

  # A missing or null "downloads" field is counted as zero downloads.
  download_count = Integer(gem_data.fetch("downloads", 0) || 0)

  GemInfo.new(
    name: name,
    latest: gem_data["version"],
    downloads: download_count,
    metadata_mfa: metadata_mfa?(gem_data["metadata"]),
    threshold_mfa: download_count > options[:threshold],
    tp_observed: trusted_publishing_observed?(page_response),
    errors: problems
  )
end
# Split the audited gems into the two verdict buckets, each ordered by name.
risky, safe = infos.partition(&:risky?)
safe = safe.sort_by(&:name)
risky = risky.sort_by(&:name)

# Gems for which at least one request or decode problem was recorded.
errors = infos.reject { |info| info.errors.empty? }

# Distinct (name, version, source class) rows for specs that were not audited.
skipped = skipped_specs
  .map { |spec| [spec.name, spec.version.to_s, spec.source.class.to_s] }
  .uniq
  .sort

total = safe.size + risky.size
safe_percent =
  if total.zero?
    0.0
  else
    safe.size.to_f / total * 100
  end
# Summary counts and the definitions legend, written to stdout.
puts <<~REPORT
  Total: #{total}
  (Safe) Human MFA Enforced, No TP Observed: #{safe.size}
  (Risky) MFA not present or TP observed: #{risky.size}
  Safe %: #{format('%.1f', safe_percent)}%
  Errors: #{errors.size}

  Definitions:
  - human_mfa_enforced: latest gem metadata has rubygems_mfa_required=true OR total downloads are > #{options[:threshold]}.
  - tp_observed: the current RubyGems gem page shows the latest version was pushed by a Trusted Publisher.
  - listed: NOT human_mfa_enforced OR tp_observed.

  Gems without human-only MFA coverage for future versions (or with observed Trusted Publishing):
REPORT

# Tab-separated table with one row per risky gem.
puts %w[name latest downloads metadata_mfa threshold_mfa tp_latest tp_historical reasons].join("\t")
risky.each do |info|
  columns = [
    info.name,
    info.latest,
    info.downloads,
    yes_no(info.metadata_mfa),
    yes_no(info.threshold_mfa),
    yes_no(info.tp_observed),
    "no", # tp_historical is not computed here; the column is always emitted as "no"
    info.reasons.join(",")
  ]
  puts columns.join("\t")
end

# Per-gem fetch/decode problems go to stderr so they stay out of the table.
if errors.any?
  warn "\nErrors:"
  errors.sort_by(&:name).each do |info|
    warn "#{info.name}:"
    info.errors.each { |message| warn " - #{message}" }
  end
end

if skipped.any?
  puts "\nNon-rubygems.org specs skipped:"
  skipped.each do |name, version, source_class|
    puts "#{name} #{version} # #{source_class}"
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment