Skip to content

Instantly share code, notes, and snippets.

@mganss
Created January 2, 2013 12:55

Revisions

  1. mganss created this gist Jan 2, 2013.
    63 changes: 63 additions & 0 deletions Pool.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,63 @@
    using System;
    using System.Collections.Concurrent;

    namespace XY
    {
    public interface IPool<T>
    {
    T Take();
    void Return(T t);
    }

    public interface IBufferPool : IPool<byte[]>
    {
    int BufferSize { get; }
    }

    /// <summary>
    /// A pool of reusable objects.
    /// </summary>
    /// <typeparam name="T">The type of objects in the pool.</typeparam>
    public class Pool<T> : IPool<T>
    {
    protected ConcurrentStack<T> _Pool = new ConcurrentStack<T>();
    protected Func<T> Allocator;

    public Pool(Func<T> allocator)
    {
    Allocator = allocator;
    }

    public T Take()
    {
    T t;
    if (!_Pool.TryPop(out t)) t = Allocator();
    return t;
    }

    public void Return(T t)
    {
    _Pool.Push(t);
    }
    }

    /// <summary>
    /// A pool of reusable byte array buffers.
    /// </summary>
    public class BufferPool : Pool<byte[]>, IBufferPool
    {
    private const int _BufferSize = 8192;

    public int BufferSize
    {
    get { return _BufferSize; }
    }

    public BufferPool()
    : base(() => new byte[_BufferSize])
    {
    }

    public static BufferPool Instance = new BufferPool();
    }
    }
    448 changes: 448 additions & 0 deletions PoolMemoryStream.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,448 @@
    using System;
    using System.IO;
    using System.Collections.Generic;

    namespace XY
    {
    /// <summary>
    /// An alternative to <see cref="System.IO.MemoryStream"/> that uses a number of buffers
    /// taken from a pool as its backing store instead of a single buffer.
    /// </summary>
    public class PoolMemoryStream : Stream
    {
    private List<byte[]> Buffers = new List<byte[]>();
    private IBufferPool _pool;
    public IBufferPool Pool
    {
    get { return _pool; }
    private set { _pool = value; }
    }

    public PoolMemoryStream(IBufferPool pool = null)
    {
    if (pool == null) pool = BufferPool.Instance;
    Pool = pool;
    }

    public PoolMemoryStream(int capacity, IBufferPool pool = null)
    {
    if (pool == null) pool = BufferPool.Instance;
    Pool = pool;
    SetCapacity(capacity);
    }

    public PoolMemoryStream(byte[] buffer, IBufferPool pool = null)
    {
    Constructor(buffer, 0, buffer.Length, true, false, pool);
    }

    public PoolMemoryStream(byte[] buffer, bool writable, IBufferPool pool = null)
    {
    Constructor(buffer, 0, buffer.Length, writable, false, pool);
    }

    public PoolMemoryStream(byte[] buffer, int index, int count, IBufferPool pool = null)
    {
    Constructor(buffer, index, count, true, false, pool);
    }

    public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, IBufferPool pool = null)
    {
    Constructor(buffer, index, count, writable, false, pool);
    }

    public PoolMemoryStream(byte[] buffer, int index, int count, bool writable, bool publiclyVisible, IBufferPool pool = null)
    {
    Constructor(buffer, index, count, writable, publiclyVisible, pool);
    }

    private void Constructor(byte[] buffer, int index, int count, bool writable, bool publiclyVisible, IBufferPool pool)
    {
    if (buffer == null)
    throw new ArgumentNullException("buffer");
    if (index < 0)
    throw new ArgumentOutOfRangeException("index", "index is negative");
    if (count < 0)
    throw new ArgumentOutOfRangeException("count", "count is negative");
    if (buffer.Length - index < count)
    throw new ArgumentException("index+count", "size of buffer is less than index + count");

    if (pool == null) pool = BufferPool.Instance;

    Pool = pool;
    Write(buffer, index, count);
    Position = 0;
    canWrite = writable;
    visible = publiclyVisible;
    expandable = false;
    }

    private bool expandable = true;
    private bool canWrite = true;
    private bool visible = true;

    public virtual byte[] ToArray()
    {
    var buffer = new byte[Length];
    var bufNum = 0;
    var posInBuf = 0;
    var bytesToRead = Length;
    var bytesRead = 0;
    var bytesLeft = Length;
    var offset = 0;

    while (bytesLeft > 0)
    {
    var bytesToCopy = (posInBuf + bytesLeft) < Pool.BufferSize ? (int)bytesLeft : Pool.BufferSize - posInBuf;
    var buf = Buffers[bufNum];
    Buffer.BlockCopy(buf, posInBuf, buffer, offset, bytesToCopy);
    Position += bytesToCopy;
    offset += bytesToCopy;
    bytesLeft -= bytesToCopy;
    bytesRead += bytesToCopy;
    bufNum++;
    posInBuf = 0;
    }

    return buffer;
    }

    public virtual byte[] GetBuffer()
    {
    if (!visible)
    throw new UnauthorizedAccessException();

    var buffer = new byte[Capacity];
    var offset = 0;

    foreach (var buf in Buffers)
    {
    Buffer.BlockCopy(buf, 0, buffer, offset, buf.Length);
    offset += buf.Length;
    }

    return buffer;
    }

    #region implemented abstract members of Stream

    public override void Flush()
    {
    }

    private void CheckIfDisposed()
    {
    if (_disposed)
    throw new ObjectDisposedException("PoolMemoryStream");
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
    if (buffer == null)
    throw new ArgumentNullException("buffer");
    if ((offset + count) > buffer.Length)
    throw new ArgumentException("buffer too small", "buffer");
    if (offset < 0)
    throw new ArgumentException("offset must be >= 0", "offset");
    if (count < 0)
    throw new ArgumentException("count must be >= 0", "count");

    CheckIfDisposed();

    if (Position >= Length || count == 0)
    return 0;

    var bufNum = (int)Position / Pool.BufferSize;
    var posInBuf = (int)Position - bufNum * Pool.BufferSize;
    var bytesToRead = Math.Min(count, (int)(Length - Position));
    var bytesRead = 0;
    var bytesLeft = bytesToRead - bytesRead;

    while (bytesLeft > 0)
    {
    var bytesToCopy = (posInBuf + bytesLeft) < Pool.BufferSize ? bytesLeft : Pool.BufferSize - posInBuf;
    var buf = Buffers[bufNum];
    Buffer.BlockCopy(buf, posInBuf, buffer, offset, bytesToCopy);
    Position += bytesToCopy;
    offset += bytesToCopy;
    bytesLeft -= bytesToCopy;
    bytesRead += bytesToCopy;
    bufNum++;
    posInBuf = 0;
    }

    return bytesToRead;
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
    CheckIfDisposed();

    if (offset > (long)int.MaxValue)
    throw new ArgumentOutOfRangeException("offset out of range. " + offset);

    switch (origin)
    {
    case SeekOrigin.Current:
    offset += Position;
    break;
    case SeekOrigin.End:
    offset += Length;
    break;
    case SeekOrigin.Begin:
    break;
    default:
    throw new ArgumentException("origin", "invalid SeekOrigin");
    }

    if (offset < 0)
    throw new IOException("Attempted to seek before start of PoolMemoryStream.");

    Position = offset;

    return Position;
    }

    public int Capacity
    {
    get
    {
    CheckIfDisposed();
    return Buffers.Count * Pool.BufferSize;
    }

    set
    {
    CheckIfDisposed();

    if (value < 0 || value < Length)
    throw new ArgumentOutOfRangeException("value", "capacity cannot be negative or smaller than length of stream.");

    SetCapacity(value);
    }
    }

    private bool dirty = true;

    public override void SetLength(long value)
    {
    if (!canWrite)
    throw new NotSupportedException("cannot write to stream");

    SetCapacity(value);

    if (value < _length)
    dirty = true;

    _length = value;

    if (Position > _length)
    Position = _length;
    }

    private void SetCapacity(long value)
    {
    CheckIfDisposed();

    if (!expandable && value > Capacity)
    throw new NotSupportedException("cannot expand stream");

    if (value < 0 || value > int.MaxValue)
    throw new ArgumentOutOfRangeException();

    if (value == 0)
    {
    foreach (var buf in Buffers)
    {
    Pool.Return(buf);
    }
    Buffers.Clear();
    return;
    }

    if (value == Capacity) return;

    int buffers = (int)(value / Pool.BufferSize);
    if ((value - buffers * Pool.BufferSize) > 0)
    buffers++;

    if (buffers < Buffers.Count)
    {
    for (int i = buffers + 1; i < Buffers.Count; i++)
    {
    Pool.Return(Buffers[i]);
    }

    Buffers.RemoveRange(buffers + 1, Buffers.Count - buffers);
    }
    else
    {
    if (dirty && Buffers.Count > 0)
    {
    var dirtyBytes = Capacity - Length;
    var lastBuf = Buffers[Buffers.Count - 1];
    for (var i = Pool.BufferSize - dirtyBytes; i < Pool.BufferSize; i++)
    {
    lastBuf[i] = 0;
    }

    dirty = false;
    }

    if (buffers > Buffers.Count)
    {
    for (int i = Buffers.Count; i < buffers; i++)
    {
    var buf = Pool.Take();
    Buffers.Add(buf);
    }
    }
    }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
    if (!canWrite)
    throw new NotSupportedException("cannot write to stream");
    if (buffer == null)
    throw new ArgumentNullException("buffer");
    if ((offset + count) > buffer.Length)
    throw new ArgumentException("buffer too small", "buffer");
    if (offset < 0)
    throw new ArgumentException("offset must be >= 0", "offset");
    if (count < 0)
    throw new ArgumentException("count must be >= 0", "count");

    CheckIfDisposed();

    if (Position > Length - count)
    SetLength(Position + count);

    var bufNum = (int)Position / Pool.BufferSize;
    var posInBuf = (int)Position - bufNum * Pool.BufferSize;
    var bytesWritten = 0;
    var bytesLeft = count - bytesWritten;

    while (bytesLeft > 0)
    {
    var bytesToCopy = (posInBuf + bytesLeft) < Pool.BufferSize ? bytesLeft : Pool.BufferSize - posInBuf;
    var buf = Buffers[bufNum];
    Buffer.BlockCopy(buffer, offset, buf, posInBuf, bytesToCopy);
    Position += bytesToCopy;
    offset += bytesToCopy;
    bytesLeft -= bytesToCopy;
    bytesWritten += bytesToCopy;
    bufNum++;
    posInBuf = 0;
    }
    }

    public override void WriteByte(byte value)
    {
    if (!canWrite)
    throw new NotSupportedException("cannot write to stream");

    CheckIfDisposed();

    if (Position >= Length)
    SetLength(Position + 1);

    var bufNum = (int)Position / Pool.BufferSize;
    var posInBuf = (int)Position - bufNum * Pool.BufferSize;

    var buf = Buffers[bufNum];
    buf[posInBuf] = value;

    Position++;
    }

    public override int ReadByte()
    {
    CheckIfDisposed();

    if (Position >= Length)
    return -1;

    var bufNum = (int)Position / Pool.BufferSize;
    var posInBuf = (int)Position - bufNum * Pool.BufferSize;

    var buf = Buffers[bufNum];
    Position++;

    return buf[posInBuf];
    }

    public override bool CanRead
    {
    get
    {
    return !_disposed;
    }
    }

    public override bool CanSeek
    {
    get
    {
    return !_disposed;
    }
    }

    public override bool CanWrite
    {
    get
    {
    return !_disposed && canWrite;
    }
    }

    private long _length;

    public override long Length
    {
    get
    {
    CheckIfDisposed();
    return _length;
    }
    }

    private long _position;

    public override long Position
    {
    get
    {
    CheckIfDisposed();
    return _position;
    }
    set
    {
    CheckIfDisposed();
    _position = value;
    }
    }

    #endregion

    private bool _disposed;

    protected override void Dispose(bool disposing)
    {
    if (!_disposed)
    {
    if (disposing)
    {
    foreach (var buffer in Buffers)
    {
    Pool.Return(buffer);
    }

    Buffers.Clear();
    }

    _disposed = true;
    }
    }
    }
    }