Skip to content

Instantly share code, notes, and snippets.

@flq
Last active October 3, 2024 15:10
Show Gist options
  • Save flq/c476de9b51e6759b660a47783c040f47 to your computer and use it in GitHub Desktop.
Save flq/c476de9b51e6759b660a47783c040f47 to your computer and use it in GitHub Desktop.
A class that consumes an IAsyncEnumerable<byte[]> and exposes it as a read-only, forward-only Stream
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace Utilities;
/// <summary>
/// A read-only, forward-only <see cref="Stream"/> that serves the bytes produced by an
/// <see cref="IAsyncEnumerable{T}"/> of <c>byte[]</c> chunks, in the order the chunks are yielded.
/// </summary>
/// <remarks>
/// <para>
/// A read keeps pulling chunks from the source until the requested number of bytes has been
/// copied or the source is exhausted, so a single read may await several chunks.
/// </para>
/// <para>
/// Seeking, writing, <see cref="Length"/> and <see cref="Position"/> are not supported.
/// The type is not safe for concurrent reads.
/// </para>
/// </remarks>
/// <param name="source">The sequence of byte chunks to expose as a stream.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
public class AsyncEnumerableStream(IAsyncEnumerable<byte[]> source) : Stream
{
    // The enumerator is created once, up front; per-read cancellation tokens therefore cannot be
    // handed to the source and are checked explicitly between chunks instead.
    private readonly IAsyncEnumerator<byte[]> asyncEnumerator =
        (source ?? throw new ArgumentNullException(nameof(source))).GetAsyncEnumerator();

    // Chunk currently being served and the index of the next unread byte inside it.
    private byte[] currentBuffer = [];
    private int position;

    // Set once MoveNextAsync has reported the end of the source.
    private bool isCompleted;

    // Guards against disposing the enumerator more than once.
    private bool disposed;

    /// <inheritdoc/>
    public override bool CanRead => true;

    /// <inheritdoc/>
    public override bool CanSeek => false;

    /// <inheritdoc/>
    public override bool CanWrite => false;

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Length => throw new NotSupportedException();

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes into <paramref name="buffer"/> starting at
    /// <paramref name="offset"/>.
    /// </summary>
    /// <returns>
    /// The number of bytes copied. This is less than <paramref name="count"/> only when the source
    /// ran out of data or cancellation was requested after some bytes had already been copied;
    /// it is 0 once the source is exhausted.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="offset"/> or <paramref name="count"/> is outside the bounds of <paramref name="buffer"/>.
    /// </exception>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="OperationCanceledException">
    /// Cancellation was requested before any byte was copied.
    /// </exception>
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        // Fail fast on bad arguments instead of after chunks have already been consumed.
        ValidateBufferArguments(buffer, offset, count);
        return ReadAsync(buffer.AsMemory(offset, count), cancellationToken).AsTask();
    }

    /// <summary>
    /// Reads up to <c>buffer.Length</c> bytes into <paramref name="buffer"/>.
    /// </summary>
    /// <returns>
    /// The number of bytes copied; 0 once the source is exhausted (or when
    /// <paramref name="buffer"/> is empty).
    /// </returns>
    /// <exception cref="ObjectDisposedException">The stream has been disposed.</exception>
    /// <exception cref="OperationCanceledException">
    /// Cancellation was requested before any byte was copied.
    /// </exception>
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        ObjectDisposedException.ThrowIf(disposed, this);
        cancellationToken.ThrowIfCancellationRequested();

        if (isCompleted && position >= currentBuffer.Length)
        {
            return 0;
        }

        var totalBytesRead = 0;
        while (totalBytesRead < buffer.Length)
        {
            // Current chunk is used up: fetch the next one.
            if (position >= currentBuffer.Length)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    // Bytes already copied have been consumed from the source; report them
                    // rather than lose them by throwing.
                    if (totalBytesRead > 0)
                    {
                        break;
                    }

                    throw new OperationCanceledException(cancellationToken);
                }

                // IAsyncEnumerator<T>.MoveNextAsync takes no arguments.
                if (!await asyncEnumerator.MoveNextAsync().ConfigureAwait(false))
                {
                    isCompleted = true;
                    break;
                }

                // Treat a null chunk like an empty one instead of failing on it.
                currentBuffer = asyncEnumerator.Current ?? Array.Empty<byte>();
                position = 0;

                // Re-evaluate: the new chunk may be empty.
                continue;
            }

            var bytesToCopy = Math.Min(buffer.Length - totalBytesRead, currentBuffer.Length - position);
            currentBuffer.AsMemory(position, bytesToCopy).CopyTo(buffer.Slice(totalBytesRead));
            position += bytesToCopy;
            totalBytesRead += bytesToCopy;
        }

        return totalBytesRead;
    }

    /// <summary>
    /// Synchronous counterpart of <see cref="ReadAsync(byte[], int, int, CancellationToken)"/>.
    /// Blocks the calling thread until the asynchronous read completes; prefer the async
    /// overloads where possible.
    /// </summary>
    public override int Read(byte[] buffer, int offset, int count)
    {
        return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value) => throw new NotSupportedException();

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    /// <summary>No-op: the stream is read-only, so there is nothing to flush.</summary>
    public override void Flush() { }

    /// <summary>
    /// Disposes the source enumerator (blocking until its asynchronous disposal completes)
    /// the first time the stream is disposed; later calls skip the enumerator.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing && !disposed)
        {
            disposed = true;
            asyncEnumerator.DisposeAsync().AsTask().GetAwaiter().GetResult();
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Asynchronously disposes the source enumerator the first time the stream is disposed;
    /// later calls skip the enumerator.
    /// </summary>
    public override async ValueTask DisposeAsync()
    {
        if (!disposed)
        {
            // Set the flag first: the base implementation may route back through Dispose(bool),
            // which must not dispose the enumerator a second time.
            disposed = true;
            await asyncEnumerator.DisposeAsync().ConfigureAwait(false);
        }

        await base.DisposeAsync().ConfigureAwait(false);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment