Skip to content

Instantly share code, notes, and snippets.

@MarkPflug
Created May 16, 2024 19:57
Show Gist options
  • Select an option

  • Save MarkPflug/caab490eae67364eadcefe50b89e4552 to your computer and use it in GitHub Desktop.

Select an option

Save MarkPflug/caab490eae67364eadcefe50b89e4552 to your computer and use it in GitHub Desktop.
.NET Stream Inversion
// Include nupkgs: Sylvan.Data, Sylvan.Data.Csv
using Sylvan.Data;
using Sylvan.Data.Csv;
// How many times the three sample records are repeated (the CSV gets 3 * recordRepeats rows).
int recordRepeats = 10_000_000;
const string outputPath = "dump.csv";

// Adapt the push-style CSV producer into a pull-style, readable stream.
using Stream csvStream = new InvertedStream(sink => WriteCsvToStream(sink, recordRepeats));

// Play the part of the consumer: drain the readable stream into a file.
using (FileStream fileOut = File.Create(outputPath))
{
    csvStream.CopyTo(fileOut);
}

// Report the peak memory of this process next to the size of what was produced.
var currentProcess = System.Diagnostics.Process.GetCurrentProcess();
Console.WriteLine($"Mem used: {currentProcess.PeakWorkingSet64}");
Console.WriteLine($"File size: {new FileInfo(outputPath).Length}");
// This represents the code that pulls from the database and writes CSV data to a stream.
// Writes 3 * count CSV rows (three sample records repeated `count` times) to `s`.
// Disposing the StreamWriter at the end flushes the remaining text and closes `s`
// (StreamWriter does not leave the underlying stream open by default).
static void WriteCsvToStream(Stream s, int count)
{
    // get some data records
    var data = new[]
    {
        new { Id = 1, Name = "Alpha", Date = new DateTime(2020, 1, 11) },
        new { Id = 2, Name = "Beta", Date = new DateTime(2022, 2, 13) },
        new { Id = 3, Name = "Delta", Date = new DateTime(2024, 11, 29) },
    };

    // Repeat the 3 sample records `count` times and expose them as a data reader (Sylvan.Data).
    // DbDataReader is IDisposable, so it is disposed once the CSV has been written.
    using System.Data.Common.DbDataReader reader =
        Enumerable.Range(0, count)
            .SelectMany(_ => data)
            .AsDataReader();

    // Write the records to the provided Stream,
    // which will be inverted into a readable stream for the consuming code.
    using var tw = new StreamWriter(s);
    using var w = CsvDataWriter.Create(tw);
    w.Write(reader);
}
/// <summary>
/// Creates a Stream that can turn code that writes to a Stream, into a Stream that can be read from,
/// without requiring the entire stream be buffered in memory.
/// </summary>
/// <remarks>
/// <para>
/// The writer delegate is started on a thread-pool thread by the first call to
/// <see cref="Read(byte[], int, int)"/>. Both sides exchange data through one fixed-size buffer
/// guarded by a private lock: the writer blocks while the buffer is full, and the reader blocks
/// while it is empty.
/// </para>
/// <para>
/// The end of the stream is reached once the writer closes its stream or the delegate returns.
/// If the delegate throws, the exception is rethrown from <see cref="Read(byte[], int, int)"/> after
/// all data written before the failure has been returned. Disposing this stream while the writer is
/// still running makes the writer's next write fail with an <see cref="IOException"/>, so the writer
/// thread is not left blocked forever.
/// </para>
/// </remarks>
public sealed class InvertedStream : Stream
{
    // Guards all mutable state below and is the monitor both sides Wait/Pulse on.
    // A private object is used instead of locking a (MarshalByRefObject) Stream instance.
    readonly object sync = new object();
    readonly Action<Stream> writer;
    readonly InvertedWriterStream writerStream;
    readonly byte[] buffer;
    // Unread data occupies buffer[bufferPos..bufferEnd).
    int bufferPos;
    int bufferEnd;
    // Set once the writer closed its stream, or the writer delegate returned or threw.
    bool writerComplete;
    // Set when this (reading) stream is disposed; tells a running writer to stop.
    bool disposed;
    // Failure raised by the writer delegate, rethrown to the reader.
    System.Runtime.ExceptionServices.ExceptionDispatchInfo? writerError;
    Task? writerTask;

    /// <summary>
    /// Creates a new InvertedStream.
    /// </summary>
    /// <param name="writer">
    /// The code that writes the stream content. It is invoked once, on a thread-pool thread,
    /// the first time this stream is read.
    /// </param>
    /// <param name="bufferSize">The size of internal buffer to use. Must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="writer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
    public InvertedStream(Action<Stream> writer, int bufferSize = 0x10000)
    {
        // A zero-length buffer would make the writer spin forever without making progress.
        if (bufferSize <= 0) throw new ArgumentOutOfRangeException(nameof(bufferSize));
        this.writer = writer ?? throw new ArgumentNullException(nameof(writer));
        this.writerStream = new InvertedWriterStream(this);
        this.buffer = new byte[bufferSize];
    }

    /// <inheritdoc/>
    public override bool CanRead => true;
    /// <inheritdoc/>
    public override bool CanSeek => false;
    /// <inheritdoc/>
    public override bool CanWrite => false;
    /// <inheritdoc/>
    public override long Length => throw new NotSupportedException();
    /// <inheritdoc/>
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void Flush()
    {
        // doesn't make sense to flush a reader stream, but also should be harmless to call.
    }

    /// <inheritdoc/>
    /// <remarks>
    /// Blocks until <paramref name="count"/> bytes have been copied or the writer has completed.
    /// Returns 0 only at the end of the stream. If the writer delegate failed, its exception is
    /// rethrown by the first call that has no buffered data left to return.
    /// </remarks>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateBufferArguments(buffer, offset, count);
        lock (sync)
        {
            if (disposed) throw new ObjectDisposedException(nameof(InvertedStream));

            // Lazily start the producer the first time data is requested.
            writerTask ??= Task.Run(() => RunWriter());

            int copied = 0;
            while (copied < count)
            {
                if (bufferPos == bufferEnd)
                {
                    if (writerComplete)
                    {
                        // Only report a failure when this call returns no bytes,
                        // so data produced before the failure is not lost.
                        if (copied == 0) writerError?.Throw();
                        break;
                    }
                    // Buffer drained: wake a writer waiting for space, then wait for more data.
                    Monitor.PulseAll(sync);
                    Monitor.Wait(sync);
                    continue;
                }

                int len = Math.Min(bufferEnd - bufferPos, count - copied);
                Buffer.BlockCopy(this.buffer, bufferPos, buffer, offset + copied, len);
                bufferPos += len;
                copied += len;
            }
            return copied;
        }
    }

    /// <summary>
    /// Runs the writer delegate on the background thread and publishes its completion
    /// (and any exception it threw) to the reader.
    /// </summary>
    void RunWriter()
    {
        System.Runtime.ExceptionServices.ExceptionDispatchInfo? error = null;
        try
        {
            writer(writerStream);
        }
        catch (Exception ex)
        {
            // Hand the failure to the reader instead of leaving it unobserved on the task,
            // which would leave the reader waiting forever.
            error = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex);
        }

        lock (sync)
        {
            writerError = error;
            // Signal end-of-data even if the delegate never closed its stream.
            writerComplete = true;
            Monitor.PulseAll(sync);
        }
    }

    /// <inheritdoc/>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            lock (sync)
            {
                disposed = true;
                // Wake a writer blocked on a full buffer or in Flush so it can observe the disposal.
                Monitor.PulseAll(sync);
            }
        }
        base.Dispose(disposing);
    }

    #region NotSupported
    /// <inheritdoc/>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <inheritdoc/>
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
    #endregion

    /// <summary>
    /// The write-only stream handed to the writer delegate; it fills the owning
    /// <see cref="InvertedStream"/>'s buffer. All state access happens under the owner's lock.
    /// </summary>
    sealed class InvertedWriterStream : Stream
    {
        readonly InvertedStream s;

        public InvertedWriterStream(InvertedStream s)
        {
            this.s = s;
        }

        public override bool CanRead => false;
        public override bool CanSeek => false;
        public override bool CanWrite => true;
        public override long Length => throw new NotSupportedException();
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }

        /// <summary>
        /// Copies the bytes into the shared buffer, blocking while the buffer is full of unread data.
        /// </summary>
        /// <exception cref="IOException">The reading stream has been disposed.</exception>
        /// <exception cref="ObjectDisposedException">This writer stream has already been closed.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            ValidateBufferArguments(buffer, offset, count);
            lock (s.sync)
            {
                while (count > 0)
                {
                    if (s.disposed) throw new IOException("The InvertedStream being written to has been disposed.");
                    if (s.writerComplete) throw new ObjectDisposedException(nameof(InvertedWriterStream));

                    MakeRoom();
                    int avail = s.buffer.Length - s.bufferEnd;
                    if (avail == 0)
                    {
                        // Buffer is full of unread data: let the reader consume some of it.
                        Monitor.PulseAll(s.sync);
                        Monitor.Wait(s.sync);
                        continue;
                    }

                    int c = Math.Min(avail, count);
                    Buffer.BlockCopy(buffer, offset, s.buffer, s.bufferEnd, c);
                    s.bufferEnd += c;
                    // Advance through the source so a write spanning several hand-offs copies every byte once.
                    offset += c;
                    count -= c;
                }

                if (s.bufferEnd == s.buffer.Length)
                {
                    // The buffer just filled up; let a waiting reader start draining it now.
                    Monitor.PulseAll(s.sync);
                }
            }
        }

        /// <summary>
        /// Reclaims buffer space the reader has already consumed. Caller must hold the owner's lock.
        /// </summary>
        void MakeRoom()
        {
            if (s.bufferPos == s.bufferEnd)
            {
                // Everything has been read: restart at the front without copying.
                s.bufferPos = 0;
                s.bufferEnd = 0;
            }
            else if (s.bufferPos > 0 && s.bufferEnd == s.buffer.Length)
            {
                // Shift the unread bytes to the front. Array.Copy is documented to handle
                // overlapping source and destination ranges within the same array.
                int unread = s.bufferEnd - s.bufferPos;
                Array.Copy(s.buffer, s.bufferPos, s.buffer, 0, unread);
                s.bufferPos = 0;
                s.bufferEnd = unread;
            }
        }

        /// <summary>
        /// Blocks until the reader has consumed everything written so far, or the reader has been disposed.
        /// </summary>
        public override void Flush()
        {
            lock (s.sync)
            {
                while (s.bufferPos < s.bufferEnd && !s.disposed)
                {
                    Monitor.PulseAll(s.sync);
                    Monitor.Wait(s.sync);
                }
            }
        }

        /// <summary>
        /// Marks the end of the data. Bytes still buffered remain readable. This call does not wait
        /// for the reader, so the writer thread is never left blocked after closing.
        /// </summary>
        public override void Close()
        {
            lock (s.sync)
            {
                s.writerComplete = true;
                // Wake a reader waiting for data so it can observe end-of-stream.
                Monitor.PulseAll(s.sync);
            }
            base.Close();
        }

        #region NotSupported
        public override int Read(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }
        #endregion
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment