Created
April 14, 2020 21:41
-
-
Save dignifiedquire/fc98b855b2d470cc5f5d595e0f4e14ca to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use async_std::task::Poll; | |
use byte_pool::{Block, BytePool}; | |
use byteorder::{BigEndian, ByteOrder}; | |
use jetscii::bytes; | |
use std::io::{Cursor, Result}; | |
use std::pin::Pin; | |
use std::task::Context; | |
use subslice::SubsliceExt; | |
fn main() { | |
let pool = BytePool::<Vec<u8>>::new(); | |
let source = vec![1; 1024 * 1024]; | |
let incoming_bytes = Cursor::new(&source); | |
let mut dest = Vec::new(); | |
incoming_bytes | |
.chunks(&pool, 1024) | |
.map(|mut chunk| { | |
// wrap the chunk into something | |
chunk.insert(0, 0); | |
chunk.push(255); | |
chunk | |
}) | |
.pipe(&mut dest) | |
.unwrap(); | |
assert_eq!(dest.len(), source.len() + (source.len() / 1024) * 2); | |
// Parse incoming data to `u32` | |
let incoming_bytes = Cursor::new(&source); | |
let u32s = incoming_bytes | |
.chunks(&pool, 4) | |
.map(|chunk| BigEndian::read_u32(&chunk)) | |
.collect::<Vec<u32>>(); | |
assert_eq!(u32s[0], u32::from_le_bytes([1, 1, 1, 1])); | |
} | |
/// Decodes an HTTP chunked-encoding body (including a trailer section) with
/// the async `BlockBuffer` helpers and checks the reassembled payload.
#[test]
fn test_async() {
    use async_std::{io::Cursor, stream::StreamExt, task};

    let pool = BytePool::<Vec<u8>>::new();
    task::block_on(async move {
        // Three data chunks, the terminating zero-sized chunk, one trailer.
        let encoded = "7\r\n\
                       Mozilla\r\n\
                       9\r\n\
                       Developer\r\n\
                       7\r\n\
                       Network\r\n\
                       0\r\n\
                       Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\n\
                       \r\n"
            .as_bytes();
        let mut incoming_bytes = BlockBuffer::new(&pool, Cursor::new(encoded));

        let mut body = Vec::new();
        loop {
            // Every chunk starts with its size on a line of its own.
            let size_line: Block = incoming_bytes.read_until(b"\r\n").await.unwrap();
            let chunk_len = parse_chunk_size(&size_line[..]);
            if chunk_len == 0 {
                // The zero-sized chunk ends the data section.
                break;
            }

            // The chunk payload itself.
            let payload: Block = incoming_bytes.take(chunk_len).await.unwrap();
            body.extend_from_slice(&payload[..]);

            // Each payload is followed by a CRLF.
            let terminator = incoming_bytes.take(2).await.unwrap();
            assert_eq!(&terminator[..], "\r\n".as_bytes());
        }

        // Whatever follows the last chunk is the trailer section: one
        // `name: value` pair per CRLF-terminated line.
        let trailers: Vec<(String, String)> = incoming_bytes
            .split_at(b"\r\n")
            .map(|line: Result<Block>| {
                let line = line?;
                // Locate the separator between name and value.
                // NOTE(review): `bytes!(':', ' ')` looks like a byte-set search
                // (first ':' or ' '), not a search for the sequence ": " - confirm.
                let sep = bytes!(':', ' ').find(&line[..]).unwrap();
                let (name, rest) = line.split_at(sep);
                // `rest` still starts with the two separator bytes.
                Ok((
                    String::from_utf8(name.into()).unwrap(),
                    String::from_utf8(rest[2..].into()).unwrap(),
                ))
            })
            .collect::<Result<_>>()
            .await
            .unwrap();

        assert_eq!(body, "MozillaDeveloperNetwork".as_bytes());
        assert_eq!(
            trailers,
            vec![(
                "Expires".to_string(),
                "Wed, 21 Oct 2015 07:28:00 GMT".to_string()
            )]
        );
    });
}
fn parse_chunk_size(data: &[u8]) -> usize { | |
let mut data = data.to_vec(); | |
data.extend_from_slice(b"\r\n"); | |
httparse::parse_chunk_size(&data).unwrap().unwrap().1 as usize | |
} | |
pub struct BlockBuffer<'a, T> { | |
pool: &'a BytePool<Vec<u8>>, | |
source: T, | |
buffer: Block<'a, Vec<u8>>, | |
pos: usize, | |
done: bool, | |
} | |
impl<'a, T: async_std::io::Read + Unpin> BlockBuffer<'a, T> { | |
pub fn new(pool: &'a BytePool<Vec<u8>>, source: T) -> Self { | |
Self { | |
pool, | |
source, | |
buffer: pool.alloc(1024), | |
pos: 0, | |
done: false, | |
} | |
} | |
fn make_room(&mut self) { | |
let pos = self.pos; | |
if self.buffer.len() < 1024 || (pos + 5 >= self.buffer.len() && pos * 2 <= 1024 * 1024 * 10) | |
{ | |
let size = std::cmp::max(1024, pos * 2); | |
self.buffer.realloc(size); | |
} | |
} | |
pub fn split_at<'b, 'c>(&'b mut self, predicate: &'c [u8]) -> SplitAt<'a, 'b, 'c, T> { | |
SplitAt { | |
source: self, | |
predicate, | |
done: false, | |
first: true, | |
pos: 0, | |
} | |
} | |
pub fn read_until<'b, 'c>(&'b mut self, predicate: &'c [u8]) -> ReadUntil<'a, 'b, 'c, T> { | |
ReadUntil { | |
source: self, | |
predicate, | |
first: true, | |
} | |
} | |
pub fn take<'b>(&'b mut self, n: usize) -> Take<'a, 'b, T> { | |
Take { source: self, n } | |
} | |
} | |
pub struct SplitAt<'a, 'b, 'c, T: async_std::io::Read> { | |
source: &'b mut BlockBuffer<'a, T>, | |
predicate: &'c [u8], | |
done: bool, | |
first: bool, | |
pos: usize, | |
} | |
impl<'a, 'b, 'c, T: async_std::io::Read + Unpin> async_std::stream::Stream | |
for SplitAt<'a, 'b, 'c, T> | |
{ | |
type Item = Result<Block<'a, Vec<u8>>>; | |
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | |
if self.done || self.source.done { | |
return Poll::Ready(None); | |
} | |
let pos = self.source.pos; | |
// make sure we have enough room in the buffer | |
self.source.make_room(); | |
let mut new_buffer = self.source.pool.alloc(1024); | |
new_buffer.resize(1024, 0); | |
let mut buffer = std::mem::replace(&mut self.source.buffer, new_buffer); | |
// fill buffer | |
let read = if !self.first || self.source.pos == 0 { | |
match Pin::new(&mut self.source.source).poll_read(cx, &mut buffer[pos..]) { | |
Poll::Pending => { | |
// put buffer back | |
self.source.buffer = buffer; | |
return Poll::Pending; | |
} | |
Poll::Ready(Err(err)) => { | |
// put buffer back | |
self.source.buffer = buffer; | |
self.done = true; | |
return Poll::Ready(Some(Err(err))); | |
} | |
Poll::Ready(Ok(r)) => r, | |
} | |
} else { | |
0 | |
}; | |
if read == 0 && !self.first { | |
self.done = true; | |
self.source.done = true; | |
} | |
self.first = false; | |
// buffer filled, use it | |
if let Some(i) = (&buffer[self.pos..pos + read]).find(self.predicate) { | |
let i = self.pos + i; | |
// success, found one | |
let read_end = read + pos; | |
let end = i + self.predicate.len(); | |
// copy the the unneeded parts over into our current buffer | |
let rest_len = read_end - end; | |
self.source.buffer[..rest_len].copy_from_slice(&buffer[end..read_end]); | |
self.source.pos = pos + end; | |
self.pos += end; | |
// resize buffer to return | |
buffer.truncate(i); | |
return Poll::Ready(Some(Ok(buffer))); | |
} | |
// nothing found, next round | |
self.source.pos = pos + read; | |
self.source.buffer = buffer; | |
if self.done || self.source.done { | |
Poll::Ready(None) | |
} else { | |
Poll::Pending | |
} | |
} | |
} | |
pub struct ReadUntil<'a, 'b, 'c, T: async_std::io::Read> { | |
source: &'b mut BlockBuffer<'a, T>, | |
predicate: &'c [u8], | |
first: bool, | |
} | |
impl<'a, 'b, 'c, T: async_std::io::Read + Unpin> async_std::future::Future | |
for ReadUntil<'a, 'b, 'c, T> | |
{ | |
type Output = Result<Block<'a, Vec<u8>>>; | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | |
if self.source.done { | |
return Poll::Ready(Err(std::io::Error::new( | |
std::io::ErrorKind::Other, | |
"no found", | |
))); | |
} | |
let pos = self.source.pos; | |
self.source.make_room(); | |
let mut new_buffer = self.source.pool.alloc(1024); | |
new_buffer.resize(1024, 0); | |
let mut buffer = std::mem::replace(&mut self.source.buffer, new_buffer); | |
// fill buffer | |
let read: usize = if !self.first || self.source.pos == 0 { | |
match Pin::new(&mut self.source.source).poll_read(cx, &mut buffer[pos..]) { | |
Poll::Pending => { | |
// put buffer back | |
self.source.buffer = buffer; | |
return Poll::Pending; | |
} | |
Poll::Ready(Err(err)) => { | |
// put buffer back | |
self.source.buffer = buffer; | |
return Poll::Ready(Err(err)); | |
} | |
Poll::Ready(Ok(r)) => r, | |
} | |
} else { | |
0 | |
}; | |
self.first = false; | |
// buffer filled, use it | |
if let Some(i) = (&buffer[..pos + read]).find(self.predicate) { | |
// success, found one | |
let read_end = read + pos; | |
let end = i + self.predicate.len(); | |
// copy over the bytes we read too much | |
let rest_len = read_end - end; | |
self.source.buffer[..rest_len].copy_from_slice(&buffer[end..read_end]); | |
self.source.pos = rest_len; | |
// resize buffer to return | |
buffer.truncate(i); | |
return Poll::Ready(Ok(buffer)); | |
} | |
// nothing found, next round | |
self.source.buffer = buffer; | |
self.source.pos = pos + read; | |
Poll::Pending | |
} | |
} | |
pub struct Take<'a, 'b, T: async_std::io::Read> { | |
source: &'b mut BlockBuffer<'a, T>, | |
n: usize, | |
} | |
impl<'a, 'b, T: async_std::io::Read + Unpin> async_std::future::Future for Take<'a, 'b, T> { | |
type Output = Result<Block<'a, Vec<u8>>>; | |
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { | |
if self.source.done { | |
return Poll::Ready(Err(std::io::Error::new( | |
std::io::ErrorKind::Other, | |
"no found", | |
))); | |
} | |
let pos = self.source.pos; | |
// make sure we have enough room in the buffer | |
self.source.make_room(); | |
let mut new_buffer = self.source.pool.alloc(1024); | |
new_buffer.resize(1024, 0); | |
let mut buffer = std::mem::replace(&mut self.source.buffer, new_buffer); | |
// fill buffer | |
let read: usize = if self.n > pos { | |
match Pin::new(&mut self.source.source).poll_read(cx, &mut buffer[pos..]) { | |
Poll::Pending => { | |
// put buffer back | |
self.source.buffer = buffer; | |
return Poll::Pending; | |
} | |
Poll::Ready(Err(err)) => { | |
// put buffer back | |
self.source.buffer = buffer; | |
return Poll::Ready(Err(err)); | |
} | |
Poll::Ready(Ok(r)) => r, | |
} | |
} else { | |
0 | |
}; | |
// buffer filled, use it | |
if read + pos >= self.n { | |
// success, enough | |
let read_end = pos + read; | |
let end = self.n; | |
// copy over the bytes we read too much | |
let rest_len = read_end - end; | |
self.source.buffer[..rest_len].copy_from_slice(&buffer[end..read_end]); | |
self.source.pos = rest_len; | |
// resize buffer to return | |
buffer.truncate(end); | |
return Poll::Ready(Ok(buffer)); | |
} | |
// nothing found, next round | |
self.source.buffer = buffer; | |
self.source.pos = pos + read; | |
Poll::Pending | |
} | |
} | |
pub trait ChunksExt: std::io::Read + Sized { | |
fn chunks<'a>(self, pool: &'a BytePool<Vec<u8>>, chunk_size: usize) -> Chunks<'a, Self> { | |
Chunks::new(pool, self, chunk_size) | |
} | |
} | |
impl<T: std::io::Read> ChunksExt for T {} | |
pub trait PipeExt<'a>: Iterator<Item = Block<'a, Vec<u8>>> + Sized { | |
fn pipe<T: std::io::Write>(mut self, mut dest: T) -> Result<()> { | |
while let Some(val) = self.next() { | |
dest.write_all(&val)?; | |
} | |
Ok(()) | |
} | |
} | |
impl<'a, T: Iterator<Item = Block<'a, Vec<u8>>>> PipeExt<'a> for T {} | |
#[derive(Debug)] | |
pub struct Chunks<'a, T: std::io::Read> { | |
pool: &'a BytePool<Vec<u8>>, | |
source: T, | |
chunk_size: usize, | |
current: Option<(Block<'a, Vec<u8>>, usize)>, | |
done: bool, | |
} | |
impl<'a, T: std::io::Read> Chunks<'a, T> { | |
pub fn new(pool: &'a BytePool<Vec<u8>>, source: T, chunk_size: usize) -> Self { | |
Self { | |
pool, | |
source, | |
chunk_size, | |
current: None, | |
done: false, | |
} | |
} | |
} | |
impl<'a, T: std::io::Read> Iterator for Chunks<'a, T> { | |
type Item = Block<'a, Vec<u8>>; | |
#[inline] | |
fn next(&mut self) -> Option<Self::Item> { | |
if self.done { | |
return None; | |
} | |
let mut buf = self.pool.alloc(self.chunk_size); | |
buf.truncate(self.chunk_size); | |
let mut pos = 0; | |
while pos < self.chunk_size { | |
let read = self.source.read(&mut buf[pos..]).unwrap(); // TODO: handle error | |
if read == 0 { | |
self.done = true; | |
break; | |
} | |
pos += read; | |
} | |
if pos == 0 { | |
return None; | |
} | |
if pos + 1 < buf.len() { | |
// shrink buffer | |
buf.realloc(pos + 1); | |
} | |
Some(buf) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment