Skip to content

Instantly share code, notes, and snippets.

@hhblaze
Last active April 3, 2025 07:22
Show Gist options
  • Save hhblaze/a499268565d129d4ed322b17c8127812 to your computer and use it in GitHub Desktop.
Save hhblaze/a499268565d129d4ed322b17c8127812 to your computer and use it in GitHub Desktop.
Reading parquet files with C#

//This example reads embedding vectors stored in Parquet files. Data source: https://huggingface.co/datasets/KShivendu/dbpedia-entities-openai-1M/tree/main/data

  • System.Linq.Async (NuGet) is required for .Take(...) on the async stream
  • Parquet.Net 5.1.1 (NuGet)
//Example of usage
 // Directory that holds the downloaded *.parquet files.
 string dirPath = @"D:\Temp\DBPedia\";
 List<string> parquetFiles = new List<string>()
 {
     System.IO.Path.Combine(dirPath, @"train-00000-of-00026-3c7b99d1c7eda36e.parquet"),
     System.IO.Path.Combine(dirPath, @"train-00001-of-00026-2b24035a6390fdcb.parquet")
     //...etc
 };


 // Upper bound on the number of rows pulled from the combined stream.
 const int take = 90000;

 // The reader method and the row type are members of ParquetReaderExample, so they are
 // qualified here; the qualified names also remain valid if this snippet is pasted inside that class.
 await foreach (ParquetReaderExample.DataCorpus001 el in ParquetReaderExample.ReadParquetFilesAsync(parquetFiles).Take(take))  //For take use System.Linq.Async package
 {
     // Process el (Id, Title, Text, Embedding) here.
 }


//------Parquet reader

using Parquet;
using Parquet.Data;
using Parquet.Schema;

 /// <summary>
 /// Streams rows of the "dbpedia-entities-openai-1M" corpus out of Parquet files,
 /// exposing each row's embedding either as <c>double[]</c> or as <c>float[]</c>.
 /// </summary>
 /// <remarks>
 /// Data from https://huggingface.co/datasets/KShivendu/dbpedia-entities-openai-1M/tree/main/data
 /// According to the original author's notes the corpus consists of 26 files with 38462 rows each
 /// (26 * 38462 = 1,000,012 embeddings).
 /// </remarks>
 /// <example>
 /// <code>
 /// string dirPath = @"D:\Temp\DBPedia\";
 /// string fileName = System.IO.Path.Combine(dirPath, @"train-00000-of-00026-3c7b99d1c7eda36e.parquet"); //38462 rows
 ///
 /// int total = 0;
 /// await foreach (ParquetReaderExample.DataCorpus001 item in ParquetReaderExample.ReadParquetFileAsync(fileName))
 /// {
 ///     total++;
 /// }
 /// </code>
 /// </example>
 internal class ParquetReaderExample // Renamed the class to avoid conflict with Parquet.ParquetReader
 {
     /// <summary>
     /// Expected number of values per embedding. The flattened "item" column is sliced
     /// into consecutive chunks of this size, one chunk per row.
     /// </summary>
     private const int EmbeddingSize = 1536;

     /// <summary>
     /// One corpus row with the embedding kept in double precision.
     /// </summary>
     public class DataCorpus001
     {
         /// <summary>Value of the "_id" column.</summary>
         public string Id { get; set; }

         /// <summary>Value of the "title" column.</summary>
         public string Title { get; set; }

         /// <summary>Value of the "text" column.</summary>
         public string Text { get; set; }

         /// <summary>The row's slice of the "item" column (<see cref="EmbeddingSize"/> values).</summary>
         public double[] Embedding { get; set; }
     }

     /// <summary>
     /// One corpus row with the embedding narrowed to single precision.
     /// </summary>
     public class DataCorpus001_float
     {
         /// <summary>Value of the "_id" column.</summary>
         public string Id { get; set; }

         /// <summary>Value of the "title" column.</summary>
         public string Title { get; set; }

         /// <summary>Value of the "text" column.</summary>
         public string Text { get; set; }

         /// <summary>The row's slice of the "item" column, each value cast to <c>float</c>.</summary>
         public float[] Embedding { get; set; }
     }

     /// <summary>
     /// Reads several Parquet files one after another and streams their rows in file order.
     /// </summary>
     /// <param name="fileNames">Paths of the Parquet files to read, in the order they should be streamed.</param>
     /// <returns>A lazy stream of rows; nothing is opened until enumeration starts.</returns>
     /// <exception cref="ArgumentNullException"><paramref name="fileNames"/> is null (raised when enumeration starts).</exception>
     /// <exception cref="InvalidDataException">A file does not have the expected schema or column lengths.</exception>
     public static async IAsyncEnumerable<DataCorpus001> ReadParquetFilesAsync(List<string> fileNames) // Changed to IAsyncEnumerable
     {
         if (fileNames is null)
         {
             throw new ArgumentNullException(nameof(fileNames));
         }

         foreach (string fileName in fileNames)
         {
             await foreach (DataCorpus001 item in ReadParquetFileAsync(fileName))
             {
                 yield return item;
             }
         }
     }

     /// <summary>
     /// Streams the rows of a single Parquet file with double-precision embeddings.
     /// </summary>
     /// <param name="filePath">Path of the Parquet file.</param>
     /// <returns>A lazy stream of rows; the file is opened when enumeration starts.</returns>
     /// <exception cref="InvalidDataException">The file does not have the expected schema or column lengths.</exception>
     public static IAsyncEnumerable<DataCorpus001> ReadParquetFileAsync(string filePath) // Changed to IAsyncEnumerable
     {
         return ReadRowsAsync<DataCorpus001>(filePath, CreateDoubleRow);
     }

     /// <summary>
     /// Reads several Parquet files one after another and streams their rows in file order,
     /// with embeddings narrowed to <c>float</c>.
     /// </summary>
     /// <param name="fileNames">Paths of the Parquet files to read, in the order they should be streamed.</param>
     /// <returns>A lazy stream of rows; nothing is opened until enumeration starts.</returns>
     /// <exception cref="ArgumentNullException"><paramref name="fileNames"/> is null (raised when enumeration starts).</exception>
     /// <exception cref="InvalidDataException">A file does not have the expected schema or column lengths.</exception>
     public static async IAsyncEnumerable<DataCorpus001_float> ReadParquetFilesAsync_float(List<string> fileNames) // Changed to IAsyncEnumerable
     {
         if (fileNames is null)
         {
             throw new ArgumentNullException(nameof(fileNames));
         }

         foreach (string fileName in fileNames)
         {
             await foreach (DataCorpus001_float item in ReadParquetFileAsync_float(fileName))
             {
                 yield return item;
             }
         }
     }

     /// <summary>
     /// Streams the rows of a single Parquet file with embeddings narrowed to <c>float</c>.
     /// </summary>
     /// <param name="filePath">Path of the Parquet file.</param>
     /// <returns>A lazy stream of rows; the file is opened when enumeration starts.</returns>
     /// <exception cref="InvalidDataException">The file does not have the expected schema or column lengths.</exception>
     public static IAsyncEnumerable<DataCorpus001_float> ReadParquetFileAsync_float(string filePath) // Changed to IAsyncEnumerable
     {
         return ReadRowsAsync<DataCorpus001_float>(filePath, CreateFloatRow);
     }

     /// <summary>
     /// Shared reader: opens the file, validates the schema, then walks every row group and
     /// hands each row (plus the flattened embedding column and the row's offset into it)
     /// to <paramref name="createRow"/>.
     /// </summary>
     /// <param name="filePath">Path of the Parquet file.</param>
     /// <param name="createRow">
     /// Factory receiving (id, title, text, flattened embedding column, offset of this row's first value).
     /// </param>
     private static async IAsyncEnumerable<TRow> ReadRowsAsync<TRow>(
         string filePath,
         Func<string, string, string, double[], int, TRow> createRow)
     {
         using (Stream fileStream = File.OpenRead(filePath))
         using (ParquetReader parquetReader = await ParquetReader.CreateAsync(fileStream))
         {
             DataField[] dataFields = parquetReader.Schema.GetDataFields();
             EnsureExpectedSchema(dataFields);

             for (int groupIndex = 0; groupIndex < parquetReader.RowGroupCount; groupIndex++)
             {
                 using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(groupIndex))
                 {
                     // Read columns (same order as the schema: _id, title, text, item).
                     string[] ids = ToTypedArray<string>((await groupReader.ReadColumnAsync(dataFields[0])).Data);
                     string[] titles = ToTypedArray<string>((await groupReader.ReadColumnAsync(dataFields[1])).Data);
                     string[] texts = ToTypedArray<string>((await groupReader.ReadColumnAsync(dataFields[2])).Data);
                     double[] flatEmbeddings = ToTypedArray<double>((await groupReader.ReadColumnAsync(dataFields[3])).Data);

                     // Fail up front instead of yielding misaligned rows or hitting an
                     // ArgumentException from Array.Copy somewhere in the middle of the group.
                     if (titles.Length != ids.Length || texts.Length != ids.Length)
                     {
                         throw new InvalidDataException(
                             $"Row group {groupIndex} of '{filePath}' has columns of different lengths " +
                             $"(_id: {ids.Length}, title: {titles.Length}, text: {texts.Length}).");
                     }

                     long expectedValueCount = (long)ids.Length * EmbeddingSize;
                     if (flatEmbeddings.LongLength != expectedValueCount)
                     {
                         throw new InvalidDataException(
                             $"Row group {groupIndex} of '{filePath}' holds {flatEmbeddings.LongLength} embedding values, " +
                             $"expected {expectedValueCount} ({ids.Length} rows x {EmbeddingSize}).");
                     }

                     // Process rows
                     for (int row = 0; row < ids.Length; row++)
                     {
                         // Yield each item as it's created
                         yield return createRow(ids[row], titles[row], texts[row], flatEmbeddings, row * EmbeddingSize);
                     }
                 }
             }
         }
     }

     /// <summary>
     /// Verifies that the file exposes exactly the four data fields this reader relies on:
     /// "_id", "title", "text" (strings) and "item" (double).
     /// </summary>
     /// <exception cref="InvalidDataException">The schema differs from the expected one.</exception>
     private static void EnsureExpectedSchema(DataField[] dataFields)
     {
         bool matches =
             dataFields.Length == 4 &&
             dataFields[0].Name == "_id" && dataFields[0].ClrType == typeof(string) &&
             dataFields[1].Name == "title" && dataFields[1].ClrType == typeof(string) &&
             dataFields[2].Name == "text" && dataFields[2].ClrType == typeof(string) &&
             dataFields[3].Name == "item" && dataFields[3].ClrType == typeof(double);

         if (!matches)
         {
             throw new InvalidDataException("Parquet schema does not match expected schema.");
         }
     }

     /// <summary>
     /// Returns the column data as <c>T[]</c>. When the underlying array already has that exact
     /// type it is returned as-is (no copy); otherwise the elements are cast one by one.
     /// </summary>
     private static T[] ToTypedArray<T>(Array data)
     {
         return (data as T[]) ?? data.Cast<T>().ToArray();
     }

     /// <summary>
     /// Builds a double-precision row by copying <see cref="EmbeddingSize"/> values starting at
     /// <paramref name="offset"/> out of the flattened embedding column.
     /// </summary>
     private static DataCorpus001 CreateDoubleRow(string id, string title, string text, double[] flatEmbeddings, int offset)
     {
         double[] embedding = new double[EmbeddingSize];
         Array.Copy(flatEmbeddings, offset, embedding, 0, EmbeddingSize);

         return new DataCorpus001
         {
             Id = id,
             Title = title,
             Text = text,
             Embedding = embedding,
         };
     }

     /// <summary>
     /// Builds a single-precision row by narrowing <see cref="EmbeddingSize"/> values starting at
     /// <paramref name="offset"/> directly from the flattened embedding column.
     /// </summary>
     private static DataCorpus001_float CreateFloatRow(string id, string title, string text, double[] flatEmbeddings, int offset)
     {
         float[] embedding = new float[EmbeddingSize];
         for (int k = 0; k < embedding.Length; k++)
         {
             // Explicit double -> float conversion: rounds to the nearest float; magnitudes
             // beyond the float range become +/-Infinity and tiny values become signed zero.
             embedding[k] = (float)flatEmbeddings[offset + k];
         }

         return new DataCorpus001_float
         {
             Id = id,
             Title = title,
             Text = text,
             Embedding = embedding,
         };
     }

 }//eoc
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment