Last active
February 26, 2026 06:29
-
-
Save ksss/1f8a95a7dea32a5b8ccc4b255e2742a6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env ruby | |
require 'fileutils'
require 'pathname'
require 'prism'
require 'rbs'
require 'stringio'
| class RBS::Prototype::RBI | |
# Convenience constructor: wraps +buffer+ and the Prism parse result in an
# RBS::InlineParser::Result, runs the RBI parser over the Prism tree and
# returns the populated prototype instance (see #parse, which returns self).
def self.parse(buffer, prism)
  prototype = new(RBS::InlineParser::Result.new(buffer, prism))
  prototype.parse(prism)
end
# Keeps the result object; #parse hands it to the Parser and #write reads
# its declarations.
def initialize(result)
  @result = result
end
# Walks the root node of the Prism parse result (+prism.value+) with a
# Parser bound to this prototype's result object.
#
# Returns self so calls can be chained (e.g. +parse(...).write(out:)+).
def parse(prism)
  Parser.new(@result).visit(prism.value)
  self
end
# Writes every collected declaration to +out+ through a Writer.
#
# Returns +out+ itself.
def write(out:)
  Writer.new(out:).write(@result.declarations)
  out
end
# Serializes parsed declarations as RBS source text onto an IO-like object.
#
# Declarations or members of a kind the writer does not know are reported
# with +warn+ and skipped, so one unexpected node does not stop the run.
class Writer
  attr_reader :out

  # out::    object responding to #puts; receives the generated text.
  # indent:: string emitted once per nesting level (defaults to one space).
  def initialize(out:, indent: " ")
    @out = out
    @indent_unit = indent
    @indent = 0
  end

  # Runs the block one nesting level deeper; the level is restored even if
  # the block raises.
  def nesting
    @indent += 1
    yield
  ensure
    @indent -= 1
  end

  # Emits +string+ as one line, prefixed with the current indentation.
  # NOTE: shadows Kernel#puts inside this class on purpose.
  def puts(string)
    @out.puts(@indent_unit * @indent + string)
  end

  # Writes each top-level declaration in order.
  def write(decls)
    decls.each { |decl| write_decl(decl) }
  end

  # Dispatches a single declaration to the matching writer method.
  def write_decl(decl)
    case decl
    when RBS::AST::Ruby::Declarations::ModuleDecl
      write_module(decl)
    when RBS::AST::Ruby::Declarations::ClassDecl
      write_class(decl)
    when RBS::AST::Ruby::Declarations::ConstantDecl
      write_constant(decl)
    else
      warn "Unsupported declaration: #{decl.class}"
    end
  end

  def write_module(decl)
    puts("module #{decl.module_name}")
    nesting { write_members(decl.members) }
    puts("end")
  end

  def write_class(decl)
    superclass_str = decl.super_class ? " < #{decl.super_class.type_name}" : ""
    puts("class #{decl.class_name}#{superclass_str}")
    nesting { write_members(decl.members) }
    puts("end")
  end

  def write_constant(decl)
    puts("#{decl.constant_name}: #{decl.type}")
  end

  # Writes the members of a module or class body. Nested declarations
  # (modules, classes, constants) go back through #write_decl.
  def write_members(members)
    members.each do |member|
      case member
      when RBS::AST::Ruby::Declarations::ModuleDecl,
           RBS::AST::Ruby::Declarations::ClassDecl,
           RBS::AST::Ruby::Declarations::ConstantDecl
        write_decl(member)
      when RBS::AST::Ruby::Members::DefMember
        # Only the first type annotation of the method is written.
        method_type = member.method_type.type_annotations.first.method_type
        puts("def #{member.name}: #{method_type}")
      else
        warn "Unsupported member: #{member.class}"
      end
    end
  end
end
# Mutable record of what a Sorbet `sig { ... }` chain declared:
# generic type parameter names, per-argument types keyed by argument name,
# and the return type (nil until a returns/void call is seen).
class Sig
  attr_accessor :type_parameters, :keyward_arguments, :returns

  def initialize
    @type_parameters, @keyward_arguments, @returns = [], {}, nil
  end
end
# Doc-style method type annotation with an extra hash slot for per-argument
# types.
# NOTE(review): nothing in the visible script instantiates this class —
# confirm whether it is still needed.
class DocStyle < RBS::AST::Ruby::Members::MethodTypeAnnotation::DocStyle
  attr_accessor :keyward_arguments

  def initialize
    super
    self.keyward_arguments = {}
  end
end
| class Parser < ::RBS::InlineParser::Parser | |
# Sets up visitor state without calling the superclass initializer:
# the result being filled, an empty module-nesting stack, and a comment
# association built from an empty comment list.
def initialize(result)
  @comments = ::RBS::InlineParser::CommentAssociation.new([])
  @module_nesting = []
  @result = result
end
# Translates a Prism node holding a Sorbet type expression into an RBS type.
#
# Handled forms:
#   Foo / Foo::Bar            -> class instance (NilClass -> nil, T::Boolean -> bool)
#   T.any / T.all / T.nilable -> union / intersection / optional
#   T.untyped                 -> untyped
#   T.class_of(Foo)           -> singleton(Foo)
#   T.type_parameter(:U)      -> type variable U
#   T::Array[Foo]             -> ::Array[Foo] (any T::Name[...] maps to ::Name[...])
#   [A, B] / { a: A }         -> tuple / record
#   T.proc...                 -> proc type (any chain rooted at T.proc)
#
# Anything else is reported with +warn+ and mapped to +untyped+, so the
# method always returns a type and never drops into a debugger.
def node_to_type(node)
  case node
  when Prism::ConstantReadNode
    sorbet_constant_type(node.name.to_s)
  when Prism::ConstantPathNode
    sorbet_constant_type(node.full_name)
  when Prism::CallNode
    sorbet_call_type(node)
  when Prism::ArrayNode
    # [type, type]
    ::RBS::Types::Tuple.new(
      types: node.elements.map { |element| node_to_type(element) },
      location: nil
    )
  when Prism::HashNode
    # { a: type, b: type }
    fields = node.elements.to_h { |assoc| [assoc.key.value.to_sym, node_to_type(assoc.value)] }
    ::RBS::Types::Record.new(fields:, location: nil)
  else
    unsupported_type(node)
  end
end

# Maps a constant name to a base type or a plain class instance type.
def sorbet_constant_type(name)
  case name
  when 'NilClass'
    ::RBS::Types::Bases::Nil.new(location: nil)
  when 'T::Boolean'
    ::RBS::Types::Bases::Bool.new(location: nil)
  else
    ::RBS::Types::ClassInstance.new(name: ::RBS::TypeName.parse(name), args: [], location: nil)
  end
end

# Dispatches a call expression on the shape of its receiver.
def sorbet_call_type(node)
  case (receiver = node.receiver)
  when Prism::ConstantReadNode
    receiver.name == :T ? sorbet_builtin_type(node) : unsupported_type(node)
  when Prism::ConstantPathNode
    sorbet_generic_type(node)
  when Prism::CallNode
    sorbet_proc_type(node)
  else
    unsupported_type(node)
  end
end

# Handles direct calls on T: T.any, T.all, T.nilable, T.untyped,
# T.class_of and T.type_parameter.
def sorbet_builtin_type(node)
  args = node.arguments&.arguments || []
  case node.name
  when :any
    ::RBS::Types::Union.new(types: args.map { |arg| node_to_type(arg) }, location: nil)
  when :all
    ::RBS::Types::Intersection.new(types: args.map { |arg| node_to_type(arg) }, location: nil)
  when :nilable
    return unsupported_type(node) if args.empty?

    ::RBS::Types::Optional.new(type: node_to_type(args.first), location: nil)
  when :untyped
    ::RBS::Types::Bases::Any.new(location: nil)
  when :class_of
    inner = args.empty? ? nil : node_to_type(args.first)
    # A singleton type needs a class name; anything else cannot be expressed.
    return unsupported_type(node) unless inner.is_a?(::RBS::Types::ClassInstance)

    ::RBS::Types::ClassSingleton.new(name: inner.name, location: nil)
  when :type_parameter
    name_node = args.first
    return unsupported_type(node) unless name_node.respond_to?(:value)

    ::RBS::Types::Variable.new(name: name_node.value.to_sym, location: nil)
  else
    unsupported_type(node)
  end
end

# Handles T::Name[args]: the constant directly under T becomes ::Name and
# the bracket arguments become its type arguments.
def sorbet_generic_type(node)
  constant = node.receiver
  parent = constant.parent
  return unsupported_type(node) unless parent.respond_to?(:name) && parent.name == :T

  args = node.arguments&.arguments || []
  ::RBS::Types::ClassInstance.new(
    name: ::RBS::TypeName.parse("::#{constant.name}"),
    args: args.map { |arg| node_to_type(arg) },
    location: nil
  )
end

# Handles call chains rooted at T.proc (e.g. T.proc.void,
# T.proc.params(a: A).returns(B)). The chain is summarized by
# #resolve_proc; declared params become required positionals.
def sorbet_proc_type(node)
  return unsupported_type(node) unless sorbet_proc_chain?(node)

  proc_sig = resolve_proc(node)
  positionals = proc_sig.keyward_arguments.map do |name, type|
    ::RBS::Types::Function::Param.new(type:, name:, location: nil)
  end
  ::RBS::Types::Proc.new(
    type: ::RBS::Types::Function.new(
      required_positionals: positionals,
      optional_positionals: [],
      rest_positionals: nil,
      trailing_positionals: [],
      required_keywords: {},
      optional_keywords: {},
      rest_keywords: nil,
      # A chain without returns/void has no declared result; use untyped.
      return_type: proc_sig.returns || ::RBS::Types::Bases::Any.new(location: nil)
    ),
    block: nil,
    self_type: nil,
    location: nil
  )
end

# True when the innermost call of the chain is `T.proc`.
def sorbet_proc_chain?(node)
  root = node
  root = root.receiver while root.receiver.is_a?(Prism::CallNode)
  root.name == :proc &&
    root.receiver.is_a?(Prism::ConstantReadNode) &&
    root.receiver.name == :T
end

# Reports an expression this converter cannot translate and falls back to
# `untyped` so generation can continue.
def unsupported_type(node)
  source = node.respond_to?(:slice) ? node.slice : node.inspect
  warn "Unsupported type expression: #{source}"
  ::RBS::Types::Bases::Any.new(location: nil)
end

private :sorbet_constant_type, :sorbet_call_type, :sorbet_builtin_type,
        :sorbet_generic_type, :sorbet_proc_type, :sorbet_proc_chain?, :unsupported_type
# Yields +node+ and then each successive receiver of a call chain,
# outermost call first (for `a.b.c`: the `c` call, the `b` call, then `a`).
#
# The walk stops at the first object that has no +receiver+ method or
# whose receiver is nil. A +node+ that is not a call at all is yielded once
# on its own rather than raising NoMethodError.
#
# Returns an Enumerator when called without a block.
def each_method_chain(node)
  return enum_for(:each_method_chain, node) unless block_given?

  current = node
  while current
    yield current
    current = current.respond_to?(:receiver) ? current.receiver : nil
  end
end
# Summarizes a Sorbet signature chain (the body of `sig { ... }` or a
# `T.proc...` expression) into a Sig.
#
# Every call in the chain is inspected using its OWN arguments, so the
# builder calls may appear in any order and at any depth
# (e.g. `type_parameters(:U).returns(...)` or `returns(...).override`).
# Unknown pieces are reported with +warn+ and otherwise ignored.
def resolve_proc(body)
  Sig.new.tap do |sig|
    each_method_chain(body) do |node|
      case node
      when Prism::ConstantReadNode
        # The only constant expected at the root of a chain is T (T.proc...).
        warn "Unknown receiver in sig chain: #{node.name}" unless node.name == :T
      when Prism::CallNode
        apply_sig_call(sig, node)
      else
        warn "Unsupported node in sig chain: #{node.class}"
      end
    end
  end
end

# Applies one builder call of the chain to +sig+.
def apply_sig_call(sig, node)
  args = node.arguments&.arguments || []
  case node.name
  when :params
    # sig { params(name: type).returns(type) }
    args.each { |arg| collect_sig_params(sig, arg) }
  when :override, :overridable, :abstract, :proc
    # Modifiers and the `proc` root carry no type information.
  when :type_parameters
    # sig { type_parameters(:U) }
    sig.type_parameters.concat(args.map { |arg| arg.value.to_sym })
  when :returns
    # sig { returns(type) }
    raise "returns expects exactly one type, got #{args.size}" unless args.size == 1

    sig.returns = node_to_type(args.first)
  when :void
    # sig { void }
    sig.returns = ::RBS::Types::Bases::Void.new(location: nil)
  else
    warn "Unknown call node: T.#{node.name}"
  end
end

# Records the `name: type` pairs of one `params(...)` argument.
def collect_sig_params(sig, arg)
  unless arg.is_a?(Prism::KeywordHashNode)
    warn "Unsupported params argument: #{arg.class}"
    return
  end

  arg.elements.each do |element|
    if element.is_a?(Prism::AssocNode)
      sig.keyward_arguments[element.key.value.to_sym] = node_to_type(element.value)
    else
      warn "Unsupported params element: #{element.class}"
    end
  end
end

private :apply_sig_call, :collect_sig_params
# Visitor hook for call nodes. Only `sig { ... }` calls are of interest:
# each statement in the block is resolved into a Sig and stored in @sig
# (the last statement wins). Other calls are ignored and their children
# are not visited.
#
# A `sig` with an empty block, or one whose block is not a literal block
# with a statement list (e.g. `sig(&blk)`), leaves @sig untouched instead
# of raising NoMethodError.
def visit_call_node(node)
  return unless node.name == :sig

  block = node.block
  return unless block.respond_to?(:body)

  statements = block.body
  return unless statements.respond_to?(:body)

  statements.body.each { |statement| @sig = resolve_proc(statement) }
end
# Visitor hook for method definitions. Adds a DefMember, typed from the
# most recently seen `sig`, to the module/class currently being visited.
#
# Definitions with an explicit receiver (`def self.foo`) and definitions
# outside any module are skipped. In every case the pending sig is cleared
# afterwards, so it can never leak onto a later, unrelated `def`.
#
# NOTE(review): +current_module+ and +buffer+ are not defined in this file;
# presumably they come from RBS::InlineParser::Parser — confirm.
def visit_def_node(node)
  return if node.receiver
  return if current_module.nil?

  method_type = ::RBS::AST::Ruby::Members::MethodTypeAnnotation.new(
    type_annotations: [
      RBS::AST::Ruby::Annotations::ColonMethodTypeAnnotation.new(
        method_type: build_method_type(node),
        prefix_location: nil,
        annotations: [],
        location: nil
      )
    ]
  )
  defn = ::RBS::AST::Ruby::Members::DefMember.new(buffer, node.name, node, method_type, nil)
  current_module.members << defn
ensure
  # The sig belongs to exactly one definition; consume it.
  @sig = nil
end

# Builds the RBS::MethodType for a `def` node from the pending sig.
#
# Parameter names come from the definition and types from the sig (looked
# up by name). A missing sig, or a parameter the sig does not mention,
# yields `untyped`. Rest (`*a`), trailing and keyword-rest (`**k`)
# parameters are included; block parameters are not represented.
def build_method_type(node)
  sig = @sig || Sig.new
  untyped = RBS::Types::Bases::Any.new(location: nil)
  # label: nil is used for keywords, whose name is the hash key instead.
  param_for = lambda do |name, label: name|
    ::RBS::Types::Function::Param.new(
      type: sig.keyward_arguments[name] || untyped,
      name: label,
      location: nil
    )
  end

  required_positionals = []
  optional_positionals = []
  rest_positionals = nil
  trailing_positionals = []
  required_keywords = {}
  optional_keywords = {}
  rest_keywords = nil

  if (pa = node.parameters)
    required_positionals = pa.requireds.map { |param| param_for.(param.name) }
    optional_positionals = pa.optionals.map { |param| param_for.(param.name) }
    # Only named-capable rest nodes are typed; `...` and friends have no name method.
    rest_positionals = param_for.(pa.rest.name) if pa.rest.respond_to?(:name)
    trailing_positionals = pa.posts.map { |param| param_for.(param.name) }
    pa.keywords.each do |keyword|
      case keyword
      when Prism::RequiredKeywordParameterNode
        required_keywords[keyword.name] = param_for.(keyword.name, label: nil)
      when Prism::OptionalKeywordParameterNode
        optional_keywords[keyword.name] = param_for.(keyword.name, label: nil)
      else
        warn "Unsupported keyword parameter: #{keyword.class}"
      end
    end
    rest_keywords = param_for.(pa.keyword_rest.name) if pa.keyword_rest.respond_to?(:name)
  end

  RBS::MethodType.new(
    type: RBS::Types::Function.new(
      required_positionals:,
      optional_positionals:,
      rest_positionals:,
      trailing_positionals:,
      required_keywords:,
      optional_keywords:,
      rest_keywords:,
      return_type: sig.returns || untyped
    ),
    type_params: sig.type_parameters,
    block: nil,
    location: nil
  )
end
| end | |
| end | |
# Usage:
#   bundle exec ruby rbs-prototype-rbi.rb foo.rbi [bar.rbi ...]
#   bundle exec ruby rbs-prototype-rbi.rb test   # run the built-in self-checks
#
# Each *.rbi argument is converted and written under sig/ with the same
# relative path and an .rbs extension; the generated text is also echoed to
# stdout. Arguments that do not end in .rbi are ignored.
outdir = Pathname.new('sig')

if ARGV.delete('test')
  # Wraps a sig and a def in a module, converts it, and compares the
  # resulting method type (as printed by RBS) with +rbs+.
  def assert_sig(rbi, rb, rbs)
    content = "module Test\n #{rbi}\n #{rb}\n end\nend"
    buffer = RBS::Buffer.new(name: 'test.rbi', content:)
    result = Prism.parse(content)
    out = StringIO.new
    RBS::Prototype::RBI.parse(buffer, result).write(out:)
    actual_rbs = ::RBS::Parser.parse_signature(out.string)[2][0].members.first.overloads.first.method_type.to_s
    return if rbs == actual_rbs

    raise "Expected:\n#{rbs}\nGot:\n#{actual_rbs}"
  end

  # Converts a single Sorbet type expression and compares its RBS rendering.
  def assert_type(rbi, rbs)
    result = Prism.parse(rbi)
    node = result.value.statements.body.first
    type = ::RBS::Prototype::RBI::Parser.new(result).node_to_type(node)
    return if type.to_s == rbs

    raise "Expected:\n#{rbs}\nGot:\n#{type.to_s}"
  end

  assert_sig 'sig { void }', 'def test', '() -> void'
  assert_sig 'sig { returns(String) }', 'def test', '() -> String'
  assert_sig 'sig { params(type: T.untyped).returns(String) }', 'def test(type)', '(untyped type) -> String'
  assert_sig 'sig { params(type: T.untyped).returns(String) }', 'def test(type:)', '(type: untyped) -> String'
  assert_sig 'sig { type_parameters(:U).params(foo: T.type_parameter(:U)).returns(T.type_parameter(:U)) }', 'def test(foo)', '[U] (U foo) -> U'
  assert_type 'String', 'String'
  assert_type 'NilClass', 'nil'
  assert_type 'T.untyped', 'untyped'
  assert_type 'T::Boolean', 'bool'
  assert_type 'T.nilable(String)', 'String?'
  assert_type 'T.class_of(String)', 'singleton(String)'
  assert_type 'T.any(Integer, String)', 'Integer | String'
  assert_type 'T.all(Integer, String)', 'Integer & String'
  assert_type 'T::Array[String]', '::Array[String]'
  assert_type '[T.untyped, String]', '[ untyped, String ]'
  assert_type '{ a: Foo, b: Bar }', '{ a: Foo, b: Bar }'
  assert_type 'T.proc.void', '^() -> void'
  assert_type 'T.proc.returns(String)', '^() -> String'
  assert_type 'T.proc.params(arg: String).void', '^(String arg) -> void'
  assert_type 'T.proc.params(arg: String).returns(Integer)', '^(String arg) -> Integer'
  exit 0 # ok
end

# Only .rbi inputs are converted; the summary below counts exactly these.
rbi_files = ARGV.select { |name| name.end_with?('.rbi') }

rbi_files.each do |name|
  content = File.read(name)
  buffer = RBS::Buffer.new(name:, content:)
  result = Prism.parse(content)
  out = StringIO.new
  RBS::Prototype::RBI.parse(buffer, result).write(out:)

  # foo/bar.rbi -> sig/foo/bar.rbs
  target = outdir / "#{name.delete_suffix('.rbi')}.rbs"
  FileUtils.mkdir_p(target.dirname)
  File.write(target, out.string)
  # print '.'
  puts out.string
end

puts "Generated #{rbi_files.size} RBS files to #{outdir}/" unless rbi_files.empty?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment