Last active
October 16, 2017 13:58
-
-
Save keimpema/8552968b2da9c4bca2dc077a9e03d1cf to your computer and use it in GitHub Desktop.
PipeStream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.IO; | |
namespace Usenet.Util | |
{ | |
public abstract class AbstractBaseStream : Stream | |
{ | |
public override void Flush() {} | |
public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException(); | |
public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException(); | |
public override void SetLength(long value) => throw new NotImplementedException(); | |
public override void Write(byte[] buffer, int offset, int count) => throw new NotImplementedException(); | |
public override bool CanRead => false; | |
public override bool CanSeek => false; | |
public override bool CanWrite => false; | |
public override long Length => throw new NotSupportedException(); | |
public override long Position | |
{ | |
get => throw new NotSupportedException(); | |
set => throw new NotSupportedException(); | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using Microsoft.Extensions.Logging; | |
namespace Usenet.Util | |
{ | |
public class PipeStream : AbstractBaseStream | |
{ | |
private class Chunk | |
{ | |
public byte[] Data { get; set; } | |
public int Offset { get; set; } | |
public int Length => Data.Length; | |
} | |
private static readonly ILogger log = Logger.Create<PipeStream>(); | |
private readonly BlockingCollection<Chunk> chunks; | |
private Chunk currentChunk; | |
private long length; | |
private int readTimeout = Timeout.Infinite; | |
public PipeStream() | |
{ | |
chunks = new BlockingCollection<Chunk>(); | |
} | |
public override int Read(byte[] buffer, int offset, int count) | |
{ | |
Guard.ThrowIfNull(buffer, nameof(buffer)); | |
if (offset < 0 || offset >= buffer.Length) | |
{ | |
throw new ArgumentOutOfRangeException(nameof(offset)); | |
} | |
if (count < 0 || offset + count > buffer.Length) | |
{ | |
throw new ArgumentOutOfRangeException(nameof(count)); | |
} | |
var total = 0; | |
while (count > 0) | |
{ | |
if (currentChunk == null || currentChunk.Offset >= currentChunk.Length) | |
{ | |
// need a new chunk | |
log.LogDebug("Taking chunk from pipe"); | |
if (!chunks.TryTake(out currentChunk, ReadTimeout)) | |
{ | |
// no more chunks available | |
log.LogDebug("Pipe is empty"); | |
return total; | |
} | |
log.LogDebug("Took chunk from pipe: {Size} bytes", currentChunk.Length); | |
} | |
int copyCount = Math.Min(count, currentChunk.Length - currentChunk.Offset); | |
log.LogDebug("Reading from current chunk: {Count} bytes", copyCount); | |
Buffer.BlockCopy(currentChunk.Data, currentChunk.Offset, buffer, offset, copyCount); | |
currentChunk.Offset += copyCount; | |
offset += copyCount; | |
total += copyCount; | |
count -= copyCount; | |
length -= copyCount; | |
} | |
return total; | |
} | |
public override void Write(byte[] buffer, int offset, int count) | |
{ | |
Guard.ThrowIfNull(buffer, nameof(buffer)); | |
if (offset < 0 || offset >= buffer.Length) | |
{ | |
throw new ArgumentOutOfRangeException(nameof(offset)); | |
} | |
if (count < 0 || offset + count > buffer.Length) | |
{ | |
throw new ArgumentOutOfRangeException(nameof(count)); | |
} | |
var chunk = new Chunk {Data = new byte[count]}; | |
log.LogDebug("Writing chunk to pipe: {Size} bytes", count); | |
Buffer.BlockCopy(buffer, offset, chunk.Data, 0, count); | |
chunks.Add(chunk); | |
length += count; | |
} | |
public override bool CanTimeout => true; | |
public override int ReadTimeout | |
{ | |
get => readTimeout; | |
set => readTimeout = value < 0 ? Timeout.Infinite : value; | |
} | |
public override bool CanRead => true; | |
public override bool CanWrite => true; | |
public override long Length => length; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Usenet.Util; | |
using Xunit; | |
namespace UsenetTests.Util | |
{ | |
public class PipeStreamTests | |
{ | |
[Fact] | |
public void SingleWriteSingleRead() | |
{ | |
var count = 10; | |
byte[] expected = GetBuffer(count); | |
var actual = new byte[count]; | |
var pipe = new PipeStream(); | |
pipe.Write(expected, 0, expected.Length); | |
int actualCount = pipe.Read(actual, 0, count); | |
Assert.Equal(count, actualCount); | |
Assert.Equal(expected, actual); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void MultipleWritesSingleRead() | |
{ | |
var count = 10; | |
byte[] expected = GetBuffer(count); | |
var actual = new byte[count]; | |
var pipe = new PipeStream(); | |
pipe.Write(expected, 0, 3); | |
pipe.Write(expected, 3, 3); | |
pipe.Write(expected, 6, 4); | |
int actualCount = pipe.Read(actual, 0, 10); | |
Assert.Equal(count, actualCount); | |
Assert.Equal(expected, actual); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void MultipleWritesMultipleReads() | |
{ | |
const int count = 10; | |
byte[] expected = GetBuffer(count); | |
var actual = new byte[count]; | |
var pipe = new PipeStream(); | |
pipe.Write(expected, 0, 3); | |
pipe.Write(expected, 3, 3); | |
pipe.Write(expected, 6, 4); | |
int actualCount = pipe.Read(actual, 0, 5); | |
actualCount += pipe.Read(actual, 5, 5); | |
Assert.Equal(count, actualCount); | |
Assert.Equal(expected, actual); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void SingleWriteMultipleReads() | |
{ | |
const int count = 10; | |
byte[] expected = GetBuffer(count); | |
var actual = new byte[count]; | |
var pipe = new PipeStream(); | |
pipe.Write(expected, 0, count); | |
int actualCount = pipe.Read(actual, 0, 3); | |
actualCount += pipe.Read(actual, 3, 3); | |
actualCount += pipe.Read(actual, 6, 4); | |
Assert.Equal(count, actualCount); | |
Assert.Equal(expected, actual); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void ReadMoreThanWrittenShouldTimeout() | |
{ | |
const int count = 10; | |
byte[] expected = GetBuffer(count); | |
var actual = new byte[count+1]; | |
var pipe = new PipeStream {ReadTimeout = 0}; | |
pipe.Write(expected, 0, count); | |
int actualCount = pipe.Read(actual, 0, actual.Length); | |
Assert.Equal(count, actualCount); | |
//Assert.Equal(expected, actual); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void ReadShouldWaitForIncommingData() | |
{ | |
const int count = 10; | |
byte[] expected = GetBuffer(count); | |
var actual = new byte[count]; | |
var pipe = new PipeStream(); | |
Task<int> readTask = Task.Run(() => pipe.Read(actual, 0, actual.Length)); | |
pipe.Write(expected, 0, count); | |
Assert.Equal(count, readTask.Result); | |
Assert.Equal(expected, actual); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void ReadShouldWaitForMultipleWrites() | |
{ | |
const int count = 10; | |
byte[] expected = GetBuffer(count); | |
var actual = new byte[count]; | |
var pipe = new PipeStream(); | |
Task<int> readTask = Task.Run(() => pipe.Read(actual, 0, actual.Length)); | |
pipe.Write(expected, 0, 3); | |
pipe.Write(expected, 3, 3); | |
pipe.Write(expected, 6, 4); | |
Assert.Equal(count, readTask.Result); | |
Assert.Equal(expected, actual); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void ReadShouldTimeout() | |
{ | |
const int count = 10; | |
var actual = new byte[count]; | |
var pipe = new PipeStream {ReadTimeout = 0}; | |
int actualCount = pipe.Read(actual, 0, actual.Length); | |
Assert.Equal(0, actualCount); | |
Assert.Equal(0, pipe.Length); | |
} | |
[Fact] | |
public void NegativeReadTimeoutShouldResultInInfinte() | |
{ | |
var pipe = new PipeStream { ReadTimeout = -12345 }; | |
Assert.Equal(Timeout.Infinite, pipe.ReadTimeout); | |
} | |
[Fact] | |
public void ZeroReadTimeoutShouldBeSetCorrectly() | |
{ | |
const int expected = 0; | |
var pipe = new PipeStream { ReadTimeout = expected }; | |
Assert.Equal(expected, pipe.ReadTimeout); | |
} | |
[Fact] | |
public void PositiveReadTimeoutShouldBeSetCorrectly() | |
{ | |
const int expected = 1234; | |
var pipe = new PipeStream { ReadTimeout = expected }; | |
Assert.Equal(expected, pipe.ReadTimeout); | |
} | |
private static byte[] GetBuffer(int count) | |
{ | |
var random = new Random((int)DateTimeOffset.UtcNow.UtcTicks); | |
var buffer = new byte[count]; | |
random.NextBytes(buffer); | |
return buffer; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment