Created
March 9, 2020 10:22
-
-
Save Sherlock-Holo/a9e084310535581682cb0186604d6bbe to your computer and use it in GitHub Desktop.
async-std hyper listener and connector
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::future::Future; | |
use std::io; | |
use std::io::ErrorKind; | |
use std::io::Result; | |
use std::pin::Pin; | |
use std::sync::Arc; | |
use std::task::{Context, Poll}; | |
use async_std::io::{Read, Write}; | |
use async_std::net::{TcpListener, TcpStream}; | |
use async_std::stream::Stream; | |
use async_std::task; | |
use async_tls::{TlsAcceptor, TlsConnector}; | |
use async_tls::client::TlsStream as ClientTlsStream; | |
use async_tls::server::TlsStream as ServerTlsStream; | |
use futures::future::ready; | |
use hyper::client::connect::{Connected, Connection}; | |
use hyper::service::Service; | |
use rustls::ClientConfig; | |
use tonic::transport::Uri; | |
#[derive(Clone)] | |
pub struct HyperExecutor; | |
impl<F> hyper::rt::Executor<F> for HyperExecutor | |
where | |
F: Future + Send + 'static, | |
F::Output: Send + 'static, | |
{ | |
fn execute(&self, fut: F) { | |
task::spawn(fut); | |
} | |
} | |
pub struct HyperListener { | |
tls_acceptor: TlsAcceptor, | |
tcp_listener: TcpListener, | |
} | |
impl hyper::server::accept::Accept for HyperListener { | |
type Conn = HyperStream<ServerTlsStream<TcpStream>>; | |
type Error = io::Error; | |
fn poll_accept( | |
mut self: Pin<&mut Self>, | |
cx: &mut Context, | |
) -> Poll<Option<Result<Self::Conn>>> { | |
let stream = task::ready!(Pin::new(&mut self.tcp_listener.incoming()).poll_next(cx)).unwrap()?; | |
let stream = task::ready!(Pin::new(&mut self.tls_acceptor.accept(stream)).poll(cx)); | |
match stream { | |
Err(err) => Poll::Ready(Some(Err(err))), | |
Ok(stream) => Poll::Ready(Some(Ok(HyperStream(stream)))) | |
} | |
} | |
} | |
pub struct HyperStream<T>(pub T); | |
impl<T> tokio::io::AsyncRead for HyperStream<T> | |
where T: Read + Unpin | |
{ | |
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> { | |
Pin::new(&mut self.0).poll_read(cx, buf) | |
} | |
} | |
impl<T> tokio::io::AsyncWrite for HyperStream<T> | |
where T: Write + Unpin | |
{ | |
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { | |
Pin::new(&mut self.0).poll_write(cx, buf) | |
} | |
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | |
Pin::new(&mut self.0).poll_flush(cx) | |
} | |
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { | |
Pin::new(&mut self.0).poll_close(cx) | |
} | |
} | |
impl Connection for HyperStream<ClientTlsStream<TcpStream>> { | |
fn connected(&self) -> Connected { | |
Connected::new() | |
} | |
} | |
/*pub struct HyperServerStream(pub ServerTlsStream<TcpStream>); | |
impl tokio::io::AsyncRead for HyperServerStream { | |
fn poll_read( | |
mut self: Pin<&mut Self>, | |
cx: &mut Context, | |
buf: &mut [u8], | |
) -> Poll<io::Result<usize>> { | |
Pin::new(&mut self.0).poll_read(cx, buf) | |
} | |
} | |
impl tokio::io::AsyncWrite for HyperServerStream { | |
fn poll_write( | |
mut self: Pin<&mut Self>, | |
cx: &mut Context, | |
buf: &[u8], | |
) -> Poll<io::Result<usize>> { | |
Pin::new(&mut self.0).poll_write(cx, buf) | |
} | |
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
Pin::new(&mut self.0).poll_flush(cx) | |
} | |
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
Pin::new(&mut self.0).poll_close(cx) | |
} | |
} | |
pub struct HyperClientStream(pub ClientTlsStream<TcpStream>); | |
impl tokio::io::AsyncRead for HyperClientStream { | |
fn poll_read( | |
mut self: Pin<&mut Self>, | |
cx: &mut Context, | |
buf: &mut [u8], | |
) -> Poll<io::Result<usize>> { | |
Pin::new(&mut self.0).poll_read(cx, buf) | |
} | |
} | |
impl tokio::io::AsyncWrite for HyperClientStream { | |
fn poll_write( | |
mut self: Pin<&mut Self>, | |
cx: &mut Context, | |
buf: &[u8], | |
) -> Poll<io::Result<usize>> { | |
Pin::new(&mut self.0).poll_write(cx, buf) | |
} | |
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
Pin::new(&mut self.0).poll_flush(cx) | |
} | |
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> { | |
Pin::new(&mut self.0).poll_close(cx) | |
} | |
} | |
impl Connection for HyperClientStream { | |
fn connected(&self) -> Connected { | |
let connected = Connected::new(); | |
if let Ok(remote_addr) = self.0.get_ref().peer_addr() { | |
connected.extra(remote_addr) | |
} else { | |
connected | |
} | |
} | |
}*/ | |
#[derive(Clone)] | |
pub struct HyperConnector { | |
tls_connector: TlsConnector, | |
} | |
impl Service<Uri> for HyperConnector { | |
type Response = HyperStream<ClientTlsStream<TcpStream>>; | |
type Error = std::io::Error; | |
type Future = Pin<Box<dyn Future<Output=io::Result<Self::Response>>>>; | |
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> { | |
Poll::Ready(Ok(())) | |
} | |
fn call(&mut self, req: Uri) -> Self::Future { | |
match req.authority() { | |
None => Box::pin(ready(Err(io::Error::new(ErrorKind::AddrNotAvailable, format!("{} is invalid", req)).into()))), | |
Some(authority) => { | |
let host = authority.host().to_string(); | |
let authority = authority.to_string(); | |
let tls_connector = self.tls_connector.clone(); | |
Box::pin(async move { | |
let stream = TcpStream::connect(authority).await?; | |
let tls_stream = tls_connector.connect(host, stream).await?; | |
Ok(HyperStream(tls_stream)) | |
}) | |
} | |
} | |
} | |
} | |
impl From<ClientConfig> for HyperConnector { | |
fn from(cfg: ClientConfig) -> Self { | |
Self { | |
tls_connector: TlsConnector::from(Arc::new(cfg)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment