Created
January 5, 2024 16:05
-
-
Save MarkPflug/2c3c4fc39b1716788fd18093d64ab9da to your computer and use it in GitHub Desktop.
CsvPostgresBulkInsert
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Npgsql; | |
using NpgsqlTypes; | |
using Sylvan.Data.Csv; | |
using System.Collections.ObjectModel; | |
using System.Data.Common; | |
/// <summary>
/// Sample that bulk-loads a CSV file into a PostgreSQL table using Npgsql's
/// binary COPY import. The CSV columns are typed by handing the target table's
/// column schema to Sylvan's <c>CsvDataReader</c>.
/// </summary>
class Program
{
    /// <summary>
    /// Opens the connection, reads the column schema of the target table,
    /// opens the CSV file with that schema and streams its rows into the table.
    /// </summary>
    static async Task Main()
    {
        /*
        CREATE TABLE contacts
        (
            id serial primary key,
            firstname VARCHAR(40) not null,
            lastname varchar(40) not null,
            dateofbirth timestamp not null
        );
        */
        var tableName = "contacts";

        /*
        FirstName,LastName,DateOfBirth
        tony,eltigre,1973-01-01
        capn,crunch,1953-11-07
        */
        var path = "contacts.csv";

        // Dispose the connection when Main exits, including on failure.
        await using var conn = new NpgsqlConnection();
        conn.ConnectionString = new NpgsqlConnectionStringBuilder
        {
            Host = "localhost",
            Database = "test",
        }.ConnectionString;
        await conn.OpenAsync();

        ReadOnlyCollection<DbColumn> sqlTableSchema;
        using (var cmd = conn.CreateCommand())
        {
            // limit 0. Don't need any data, just the schema.
            cmd.CommandText = $"select * from {tableName} limit 0";

            // The reader is scoped to this block so that it is closed before
            // the COPY operation is started on the same connection.
            await using (var sqlReader = await cmd.ExecuteReaderAsync())
            {
                // this must be called through the interface
                // because NpgsqlDataReader exposes a different GetColumnSchema method.
                sqlTableSchema = ((IDbColumnSchemaGenerator)sqlReader).GetColumnSchema();
            }
        }

        var csvReadOpt = new CsvDataReaderOptions
        {
            Schema = new CsvSchema(sqlTableSchema)
        };

        using var edr = await CsvDataReader.CreateAsync(path, csvReadOpt);
        WriteData(conn, tableName, edr);
    }

    /// <summary>
    /// Streams every row of <paramref name="data"/> into <paramref name="tableName"/>
    /// through a binary <c>COPY ... FROM STDIN</c> import.
    /// </summary>
    /// <param name="conn">An open connection on which the import is started.</param>
    /// <param name="tableName">
    /// Target table name. It is placed into the COPY command verbatim (not quoted
    /// or escaped), so it must come from a trusted source.
    /// </param>
    /// <param name="data">
    /// Reader supplying the rows. Its column schema provides the column names
    /// listed in the COPY command and the CLR type used to write each field.
    /// </param>
    /// <returns>The value returned by the importer's <c>Complete()</c>, converted to <see cref="long"/>.</returns>
    /// <exception cref="NotSupportedException">
    /// A non-null value belongs to a column whose CLR type has no mapping in
    /// <see cref="GetType(Type)"/>, or whose mapped type has no writer here.
    /// </exception>
    static long WriteData(NpgsqlConnection conn, string tableName, DbDataReader data)
    {
        var schema = data.GetColumnSchema();

        // Quote every column name, doubling embedded quotes so that a name
        // containing '"' cannot terminate the identifier early.
        var quotedColumns = new string[schema.Count];
        for (int i = 0; i < schema.Count; i++)
        {
            quotedColumns[i] = "\"" + schema[i].ColumnName.Replace("\"", "\"\"") + "\"";
        }

        var cmd = $"copy {tableName} ({string.Join(", ", quotedColumns)}) from stdin (format binary);";

        // The mapping depends only on the column, so resolve it once per column
        // instead of once per field. It is filled lazily (on the first non-null
        // value) so a column that only ever holds nulls never has to be mapped.
        var columnTypes = new NpgsqlDbType?[schema.Count];

        // Disposing the importer ends the COPY operation even when a row fails,
        // so the connection is not left stuck in COPY state.
        using var bi = conn.BeginBinaryImport(cmd);

        while (data.Read())
        {
            bi.StartRow();
            for (var i = 0; i < data.FieldCount; i++)
            {
                if (data.IsDBNull(i))
                {
                    bi.WriteNull();
                    continue;
                }

                var dbType = columnTypes[i] ??= GetType(schema[i].DataType);
                switch (dbType)
                {
                    case NpgsqlDbType.Text:
                    case NpgsqlDbType.Char:
                        bi.Write(data.GetString(i), dbType);
                        break;
                    case NpgsqlDbType.Smallint:
                        // TODO: need to figure out "tinyint" scenario. postgres doesn't support it.
                        bi.Write(data.GetInt16(i), dbType);
                        break;
                    case NpgsqlDbType.Integer:
                        bi.Write(data.GetInt32(i), dbType);
                        break;
                    case NpgsqlDbType.Bigint:
                        bi.Write(data.GetInt64(i), dbType);
                        break;
                    case NpgsqlDbType.Boolean:
                        bi.Write(data.GetBoolean(i), dbType);
                        break;
                    case NpgsqlDbType.Double:
                        bi.Write(data.GetDouble(i), dbType);
                        break;
                    case NpgsqlDbType.Real:
                        bi.Write(data.GetFloat(i), dbType);
                        break;
                    case NpgsqlDbType.Money:
                    case NpgsqlDbType.Numeric:
                        bi.Write(data.GetDecimal(i), dbType);
                        break;
                    case NpgsqlDbType.Timestamp:
                        bi.Write(data.GetDateTime(i), dbType);
                        break;
                    case NpgsqlDbType.Uuid:
                        bi.Write(data.GetGuid(i), dbType);
                        break;
                    case NpgsqlDbType.Bytea:
                        bi.Write((byte[])data.GetValue(i), dbType);
                        break;
                    default:
                        throw new NotSupportedException(
                            $"No binary writer for NpgsqlDbType '{dbType}' (column '{schema[i].ColumnName}').");
                }
            }
        }

        return (long)bi.Complete();
    }

    /// <summary>
    /// Maps a CLR type to the <see cref="NpgsqlDbType"/> used to write its values.
    /// </summary>
    /// <param name="type">The CLR type reported by the reader's column schema.</param>
    /// <returns>The matching <see cref="NpgsqlDbType"/>.</returns>
    /// <exception cref="NotSupportedException">
    /// <paramref name="type"/> is not one of the types handled below.
    /// </exception>
    static NpgsqlDbType GetType(Type type)
    {
        if (type == typeof(string))
            return NpgsqlDbType.Text;
        // byte is mapped to Smallint, the same target as short.
        if (type == typeof(byte))
            return NpgsqlDbType.Smallint;
        if (type == typeof(int))
            return NpgsqlDbType.Integer;
        if (type == typeof(short))
            return NpgsqlDbType.Smallint;
        if (type == typeof(long))
            return NpgsqlDbType.Bigint;
        if (type == typeof(bool))
            return NpgsqlDbType.Boolean;
        if (type == typeof(float))
            return NpgsqlDbType.Real;
        if (type == typeof(double))
            return NpgsqlDbType.Double;
        if (type == typeof(decimal))
            return NpgsqlDbType.Numeric;
        if (type == typeof(DateTime))
            return NpgsqlDbType.Timestamp;
        if (type == typeof(byte[]))
            return NpgsqlDbType.Bytea;
        if (type == typeof(Guid))
            return NpgsqlDbType.Uuid;

        throw new NotSupportedException(
            $"No NpgsqlDbType mapping for CLR type '{type?.FullName ?? "(unknown)"}'.");
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment