Created
August 4, 2017 19:14
-
-
Save kardeiz/1d0b50dacbeaefe9c37ecf3315134461 to your computer and use it in GitHub Desktop.
hyper async + mysql async
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate hyper; | |
extern crate mysql_async as my; | |
extern crate net2; | |
extern crate tokio_core; | |
extern crate futures; | |
use std::net::SocketAddr; | |
use hyper::server::Service; | |
use hyper::server::Http; | |
use hyper::server::{Request, Response}; | |
use hyper::header::{ContentType, ContentLength}; | |
use hyper::StatusCode; | |
mod cored { | |
use std::cell::RefCell; | |
use tokio_core::reactor::{Core, Handle}; | |
struct Container { | |
core: RefCell<Option<Core>>, | |
handle: RefCell<Handle> | |
} | |
thread_local!(static CONTAINER: Container = { | |
let core = Core::new().unwrap(); | |
let handle = core.handle(); | |
Container { | |
core: RefCell::new(Some(core)), | |
handle: RefCell::new(handle) | |
} | |
}); | |
pub fn take_local_core() -> Option<Core> { | |
CONTAINER.with(|o| o.core.borrow_mut().take() ) | |
} | |
pub fn with_local_handle<T, F: FnOnce(&Handle) -> T>(cls: F) -> T { | |
CONTAINER.with(|o| cls(&*o.handle.borrow())) | |
} | |
} | |
struct App; | |
impl App { | |
pub fn quick_serve(self, num_threads: usize, addr: SocketAddr) { | |
use std::sync::Arc; | |
use net2::unix::UnixTcpBuilderExt; | |
use futures::Stream; | |
fn inner(addr: &SocketAddr, protocol: Arc<Http>, router: Arc<App>) { | |
let mut core = cored::take_local_core().unwrap(); | |
let hdl = core.handle(); | |
let listener = net2::TcpBuilder::new_v4() | |
.unwrap() | |
.reuse_port(true) | |
.unwrap() | |
.bind(addr) | |
.unwrap() | |
.listen(128) | |
.unwrap(); | |
let listener = | |
tokio_core::net::TcpListener::from_listener(listener, addr, &hdl).unwrap(); | |
core.run(listener.incoming().for_each(|(socket, addr)| { | |
protocol.bind_connection(&hdl, socket, addr, router.clone()); | |
Ok(()) | |
})).unwrap(); | |
} | |
let protocol = Arc::new(Http::new()); | |
let router = Arc::new(self); | |
for _ in 0..(num_threads - 1) { | |
let protocol_c = protocol.clone(); | |
let router_c = router.clone(); | |
std::thread::spawn(move || inner(&addr, protocol_c, router_c)); | |
} | |
inner(&addr, protocol, router); | |
} | |
} | |
impl Service for App { | |
type Request = Request; | |
type Response = Response; | |
type Error = hyper::Error; | |
type Future = Box<futures::future::Future<Item=Response, Error=hyper::Error>>; | |
fn call(&self, req: Request) -> Self::Future { | |
use futures::Future; | |
use my::prelude::Queryable; | |
let db = ::std::env::var("DATABASE_URL").unwrap(); | |
let pool = cored::with_local_handle(|hdl| my::Pool::new(&db, hdl)); | |
let body = pool.get_conn() | |
.and_then(|conn| conn.prep_exec("SELECT id FROM clips", ())) | |
.and_then(|result| { | |
result.map_and_drop(|row| { | |
let (id,): (i32,) = my::from_row(row); | |
id.to_string() | |
}) | |
}) | |
.and_then(|(_, ids)| pool.disconnect().map(|_| ids)) | |
.map(|ids| { | |
let body = ids.join(", "); | |
Response::new() | |
.with_header(ContentLength(body.len() as u64)) | |
.with_body(body) | |
}) | |
.map_err(|_| unimplemented!() ); | |
Box::new(body) | |
} | |
} | |
fn main() { | |
App.quick_serve(2, "127.0.0.1:3000".parse().unwrap()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment