Last active
March 18, 2024 21:53
-
-
Save grahamwren/e6f785a04ea6895a52f01ebcc0f2bcd6 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'csv' | |
require 'set' | |
require 'bigdecimal' | |
require 'pg' | |
require 'optparse' | |
require 'logger' | |
require 'json' | |
# Postgres types ordered from most specific to most general;
# detect_type_of_col picks the most general type a column needs.
DATA_TYPES = %i[null boolean bigint numeric uuid jsonb text].freeze
# Raw CSV values treated as SQL NULL.
VALID_NULL = [nil, 'nil', 'null', '', '""'].to_set.freeze
IS_NULL = VALID_NULL.method(:include?)
VALID_BOOLEANS = [true, false, 0, 1, 'true', 'false', '0', '1'].to_set.freeze
IS_BOOL = VALID_BOOLEANS.method(:include?)
# Integers, including strings that round-trip through to_i unchanged
# (so '007' is NOT an int — it falls through to numeric).
IS_INT = ->(v) { v.is_a?(Integer) || v.is_a?(String) && v.to_i.to_s == v }
IS_NUMERIC = lambda { |v|
  v.is_a?(Numeric) || v.is_a?(String) && begin
    !!BigDecimal(v)
  rescue StandardError
    false
  end
}
IS_UUID = ->(v) { v.is_a?(String) && v.match?(/\A[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}\z/i) }
IS_JSON = lambda do |v|
  JSON.parse(v)
  true
rescue StandardError
  false
end

# Returns the most specific DATA_TYPES symbol matching +val+.
# Predicates are checked in specificity order via case/=== (each
# lambda/method responds to ===).
def detect_pg_type_of_val(val)
  case val
  when IS_NULL    then :null
  when IS_BOOL    then :boolean
  when IS_INT     then :bigint
  when IS_NUMERIC then :numeric
  when IS_UUID    then :uuid
  when IS_JSON    then :jsonb
  else                 :text
  end
end

# Returns [type, nullable] for a whole column: the most general type
# any value required, and whether any value was null. An all-null
# column defaults to a nullable boolean.
def detect_type_of_col(column)
  types = column.map(&method(:detect_pg_type_of_val))
  # max_by replaces the previous sort_by!.last — same result, O(n).
  type = types.max_by { |t| DATA_TYPES.index(t) }
  if type == :null
    [:boolean, true]
  else
    [type, types.include?(:null)]
  end
end

# Converts a raw CSV value into a Ruby object suitable to bind as +type+.
def coerce_val_to_pg_val(type, val)
  return nil if IS_NULL[val]
  # BUG FIX: '1' is accepted by VALID_BOOLEANS, so it must coerce to
  # true — previously it was missing here and coerced to false.
  return [true, 'true', '1', 1].include?(val) if type == :boolean
  return val.to_i if type == :bigint
  return BigDecimal(val) if type == :numeric
  val
end
# Thin wrapper around a PG connection for the import script.
# Logs every statement when SQL_LOGGING is set; otherwise discards logs.
class PGHelper
  # Connects to a local Postgres instance on the default port.
  # Raises ArgumentError with a setup hint when the target database
  # does not exist; re-raises any other connection failure.
  def initialize(dbname:)
    @conn = PG.connect(host: 'localhost', port: '5432', dbname: dbname)
    @sql_logger = if ENV['SQL_LOGGING']
                    Logger.new('/dev/stdout')
                  else
                    Logger.new('/dev/null')
                  end
  rescue PG::ConnectionBad => e
    # BUG FIX: the hint previously hard-coded "csv_imports" even though
    # dbname is a parameter; use the requested name so the message (and
    # the match) is correct for any target database.
    if e.message.include?(%(database "#{dbname}" does not exist))
      raise ArgumentError,
            "Database \"#{dbname}\" does not exist. Please create it with `createdb #{dbname}`"
    end
    raise
  end

  # Creates the table unless it already exists.
  # columns: array of hashes with :name, :type and optional :constraint.
  def create_table(table_name, columns)
    return if table_exists?(table_name)
    columns_str = columns.map do |column|
      name, type, constraint = column.values_at(:name, :type, :constraint)
      "#{sanitize(name)} #{sanitize(type)} #{constraint || ''}"
    end.join(",\n")
    execute(<<~SQL)
      CREATE TABLE #{sanitize(table_name)} (
        #{sanitize(columns_str)}
      )
    SQL
  end

  # True when a table with this name is visible in information_schema.
  def table_exists?(name)
    result = execute(<<~SQL, name)
      SELECT 1
      FROM information_schema.tables
      WHERE table_name = $1
    SQL
    result.any?
  end

  # Bulk-inserts an array of row hashes in one parameterized INSERT.
  # Rows may have differing keys; missing keys bind as NULL.
  def insert(table_name, values)
    keys = values.flat_map(&:keys).uniq
    i = 0
    # Build "($1,$2),($3,$4),..." — one placeholder group per row.
    # BUG FIX: the block previously took |_acc, _h| (a leftover reduce
    # signature); map yields a single row hash here.
    values_str = values.map do |_row|
      vals = keys.map do
        i += 1
        "$#{i}"
      end.join(',')
      "(#{vals})"
    end.join(',')
    # Flatten row values in the same key order as the placeholders.
    values_params = values.flat_map { |row| keys.map { |k| row[k] } }
    execute(<<~SQL, *values_params)
      INSERT INTO #{sanitize(table_name)}
      (#{sanitize(keys.join(','))})
      VALUES #{sanitize(values_str)}
    SQL
  end

  private

  attr_reader :conn, :sql_logger

  # Runs a parameterized statement, logging it first.
  def execute(sql, *params)
    sql_logger.info { "execute(#{sql}, #{params})" }
    conn.exec_params(sql, params)
  end

  # NOTE(review): escape_string escapes string *literals*, not SQL
  # identifiers — table/column names should really go through
  # conn.quote_ident. Left as-is because quote_ident changes the output
  # (adds double quotes around the identifier); flagged for follow-up.
  def sanitize(str)
    conn.escape_string(str.to_s)
  end
end
# CLI options. BUG FIX: the primary-key default must be the *string*
# 'id' — CSV headers are strings, so the previous symbol default :id
# never matched a header, which caused a duplicate serial "id" column
# to be appended and CREATE TABLE to fail for CSVs that already have
# an id column.
options = { primary_key: 'id' }
OptionParser.new do |opts|
  opts.banner = 'Usage: import_csv_to_postgres.rb [options]'
  opts.on('-f', '--file [FILE]', 'Path to the csv file') do |f|
    options[:file] = f
  end
  opts.on('-p', '--primary-key [PRIMARY_KEY]', 'Primary key column name') do |p|
    options[:primary_key] = p
  end
  opts.on('-t', '--table [TABLE]', 'Table name') do |t|
    options[:table] = t
  end
end.parse!
# Read the CSV text from the given file (if any) or from STDIN.
input =
  if (path = options[:file])
    File.read(File.expand_path(path))
  else
    $stdin.read
  end

# Derive a table name: explicit flag wins, then the file's base name
# up to the first dot (dashes become underscores), then a timestamped
# fallback for piped input.
table_name = options[:table]
table_name ||=
  if (path = options[:file])
    path.split('/').last.split('.').first.tr('-', '_')
  else
    "import_#{Time.now.to_i}"
  end

# Parse the CSV and strip surrounding whitespace from every value
# in place (CSV::Table#each returns the table itself).
csv_table = CSV.parse(input, quote_char: '"', liberal_parsing: true, headers: true)
csv_table.each do |row|
  row.each { |_, v| v&.strip! }
end
headers = csv_table.headers
# Detect a Postgres type for every CSV column and build the column
# definitions consumed by PGHelper#create_table.
columns = headers.map do |h|
  pg_type, nullable = detect_type_of_col(csv_table[h])
  # BUG FIX: compare as strings — CSV headers are always strings, but
  # the primary-key option may arrive as a symbol (its default), which
  # would otherwise never match.
  pk_constraint = 'PRIMARY KEY' if h == options[:primary_key].to_s
  raise ArgumentError, 'Primary key cannot include null values' if pk_constraint && nullable
  {
    name: h,
    type: pg_type,
    constraint: pk_constraint || (nullable ? '' : 'NOT NULL')
  }
end
# if we don't already have a primary key, add one
unless columns.find { |c| c[:constraint] == 'PRIMARY KEY' }
  columns.push(
    name: options[:primary_key],
    type: :serial,
    constraint: 'PRIMARY KEY'
  )
end
data = csv_table.map(&:to_h)
# Connect, create the destination table if needed, then load the rows
# in batches of 1,000 per INSERT statement.
db = PGHelper.new(dbname: 'csv_imports')
db.create_table(table_name, columns)
data.each_slice(1_000) { |batch| db.insert(table_name, batch) }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment