Skip to content

Instantly share code, notes, and snippets.

@junnlikestea
Created June 4, 2020 15:15
Show Gist options
  • Save junnlikestea/4e5b953bc56ef7a6fc0eaa3daa59f554 to your computer and use it in GitHub Desktop.
Save junnlikestea/4e5b953bc56ef7a6fc0eaa3daa59f554 to your computer and use it in GitHub Desktop.
Simple Tokio Example.
# Cargo manifest for this gist (in a real project this lives in its own
# Cargo.toml file, separate from the Rust source that follows it here).
[package]
name = "xatul"
version = "0.1.0"
authors = ["Junn <[email protected]>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# tokio 0.2: async runtime; "rt-threaded" enables the multi-threaded
# scheduler used by main(), "macros"/"stream" support the stream pipeline.
tokio = { version = "0.2", features = ["process", "macros", "io-util", "stream", "rt-threaded"] }
# futures 0.3: stream::iter / StreamExt::buffer_unordered in concurrent_fetches().
futures-util = "0.3.5"
futures = "0.3.5"
# NOTE(review): futures-macro, reqwest and bytes do not appear to be used by
# the code below — candidates for removal; confirm before pruning.
futures-macro = "0.3.5"
reqwest = "0.10.6"
# surf: the HTTP client actually used in fetch().
surf = "2.0.0-alpha.3"
bytes = "0.5.4"
# md-5 crate (imported under the name `md5` in the source).
md-5 = "0.8.0"
use futures::stream::{StreamExt};
use md5::digest::generic_array::typenum::U16;
use md5::digest::generic_array::GenericArray;
use md5::{Digest, Md5};
use tokio::time::Instant;
// might as well make a type for the error so we don't have to keep typing that monster
// Shorthand Result over a boxed error trait object; the Send + Sync + 'static
// bounds let errors cross tokio task/thread boundaries (tasks here are
// spawned onto a multi-threaded runtime).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync + 'static>>;
/// Entry point. Builds a 4-thread tokio runtime explicitly — the manual
/// equivalent of the `#[tokio::main]` attribute — and drives the whole
/// fetch pipeline to completion on it.
///
/// Output per request: status code, URL, MIME type, protocol used
/// (http/1.1 or http/2) and the md5sum of the response body.
pub fn main() -> Result<()> {
    let mut runtime = tokio::runtime::Builder::new()
        .threaded_scheduler()
        .enable_all()
        .thread_name("multi-threaded")
        .core_threads(4)
        .max_threads(4)
        .build()?;
    runtime.block_on(concurrent_fetches());
    Ok(())
}
/// MD5-hash `input` and return the raw 16-byte digest.
///
/// Generalized from `Vec<u8>` to `impl AsRef<[u8]>` so callers can pass
/// `&[u8]`, `&str`, `Vec<u8>`, etc. without being forced to allocate an
/// owned buffer first; existing callers passing `Vec<u8>` still compile
/// unchanged (`Vec<u8>: AsRef<[u8]>`).
fn md5_hash(input: impl AsRef<[u8]>) -> GenericArray<u8, U16> {
    let mut hasher = Md5::new();
    hasher.input(input.as_ref());
    hasher.result()
}
// You could add whatever return types you want later — just make a struct.
// This uses surf, but reqwest would work just as well.
//
// Fetches `url`, prints its status / MIME type / protocol and the MD5 of
// the response body, and returns a string identifying the worker thread
// that serviced the request (for debugging the task distribution).
async fn fetch(url: String) -> Result<String> {
    println!("Started Request");
    let mut resp = surf::get(&url).await?;
    let status = resp.status();
    // FIX: was `resp.mime().unwrap()`, which panics — killing the whole
    // spawned task — on any response without a parseable Content-Type.
    // Propagate a proper error instead.
    let mime = resp.mime().ok_or("response carried no MIME type")?;
    let protocol = resp.version();
    println!(
        "Url:{}\nStatus:{}\tMIMEType:{}\tProtocol:{:?}",
        &url, &status, &mime, &protocol
    );
    // body_bytes() needs `&mut resp`, hence the mutable binding above.
    let hash = md5_hash(resp.body_bytes().await?);
    println!("\nHash:{:x}", hash);
    // Debug which thread ran this task, to show tasks spread over threads.
    let res = format!(
        "current thread {:?} | thread name {}",
        std::thread::current().id(),
        // FIX: `.get_or_insert(...)` on a temporary Option was a roundabout
        // way to supply a default; `unwrap_or` states the intent directly.
        std::thread::current().name().unwrap_or("default_thread_name"),
    );
    Ok(res)
}
/// Spawns one tokio task per URL and drives at most `ACTIVE_REQUESTS` of
/// them concurrently, printing each result as it finishes plus the total
/// elapsed time.
async fn concurrent_fetches() {
    // In other code this gets raised to 50 or even 250 when fetching
    // hundreds of pages (e.g. waybackurls output).
    const ACTIVE_REQUESTS: usize = 2;
    let started_at = Instant::now();

    // Could just as easily be a file you read in, or a stream from stdin —
    // only this part would need modifying.
    let targets = vec![
        "https://hackerone.com",
        "https://google.com",
        "http://google.com",
    ];

    // Turn the URLs into a stream of spawned tasks (tokio::task is roughly
    // analogous to a goroutine), capped at ACTIVE_REQUESTS in flight.
    let task_stream = futures::stream::iter(
        targets
            .into_iter()
            .map(|target| tokio::spawn(async move { fetch(target.to_string()).await })),
    );

    task_stream
        .buffer_unordered(ACTIVE_REQUESTS) // this is your concurrency threshold
        .map(|joined| {
            let summary = match joined.unwrap() {
                Ok(line) => line,
                Err(_) => String::from("Bad"),
            };
            println!("finished request: {}", summary);
        })
        .collect::<Vec<_>>()
        .await;

    println!("elapsed time: {:.2?}", started_at.elapsed());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment