Last active
March 22, 2021 19:26
-
-
Save daaku/58557e2545612df8f40b13b66b7d3bd0 to your computer and use it in GitHub Desktop.
Memory-mapped (mmap), parallel, zero-copy XML parsing of Wikipedia articles
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use aho_corasick::AhoCorasick; | |
use anyhow::Result; | |
use crossbeam_channel::{bounded, Receiver, Sender}; | |
use crossbeam_utils::thread; | |
use itertools::Itertools; | |
use memmap::Mmap; | |
use std::convert::TryInto; | |
use std::fs::File; | |
use std::str::from_utf8; | |
/// Number of worker threads parsing pages; also used as the capacity of the chunk channel.
const NUM_WORKERS: usize = 32;
/// Target size in bytes of each range handed to a worker. `read_dump` extends every range
/// past this size to the end of the next `</page>` tag.
const CHUNK_SIZE: usize = 10 * 1024 * 1024;
/// Path of the Wikipedia XML dump that is memory-mapped and parsed.
// NOTE(review): hard-coded local path; consider taking it from the command line.
const DUMP_FILE: &str =
    "/home/naitik/Downloads/enwiki-20210101-pages-articles-multistream.xml";
/// When true, `read_dump` stops early so only the beginning of the dump is processed.
const DEBUG: bool = true;
fn tag_text<'a>(doc: &'a roxmltree::Document, tag: &'static str) -> &'a str { | |
doc.descendants() | |
.find(|n| n.has_tag_name(tag)) | |
.expect("tag to be found") | |
.text() | |
.expect("tag to have text") | |
} | |
fn process_chunk(data: &Mmap, receiver: &Receiver<(usize, usize)>) -> Result<()> { | |
let ac = AhoCorasick::new_auto_configured(&["<page>", "</page>"]); | |
for (start, end) in receiver { | |
let chunk = &data[start..end]; | |
for (start, end) in ac.find_iter(&chunk).tuples() { | |
assert_eq!(start.pattern(), 0); | |
assert_eq!(end.pattern(), 1); | |
let text = from_utf8(&chunk[start.start()..end.end()])?; | |
let doc = roxmltree::Document::parse(text)?; | |
let id: u64 = tag_text(&doc, "id").parse()?; | |
let title = tag_text(&doc, "title"); | |
println!("id: {}, title: {}", id, title); | |
} | |
} | |
Ok(()) | |
} | |
fn read_dump(data: &Mmap, len: usize, sender: Sender<(usize, usize)>) -> Result<()> { | |
let ac = AhoCorasick::new_auto_configured(&["<page>"]); | |
let mut start = ac | |
.find(data.as_ref()) | |
.expect("to find start of page") | |
.start(); | |
let ac = AhoCorasick::new_auto_configured(&["</page>"]); | |
loop { | |
if DEBUG && start > 1_000_000 { | |
break; | |
} | |
if start >= len { | |
break; | |
} | |
let mut end = start + CHUNK_SIZE; | |
if end >= len { | |
end = len; | |
} | |
end = match ac.find(&data[end..]) { | |
Some(m) => end + m.end(), | |
None => match twoway::rfind_bytes(&data[start..], b"</page>") { | |
Some(v) => start + v + 7, | |
None => break, | |
}, | |
}; | |
sender.send((start, end))?; | |
start = end; | |
} | |
Ok(()) | |
} | |
fn main() -> Result<()> { | |
let file = File::open(DUMP_FILE)?; | |
let len: usize = file.metadata()?.len().try_into()?; | |
let data = unsafe { Mmap::map(&file)? }; | |
let (sender, receiver) = bounded(NUM_WORKERS); | |
thread::scope(|s| { | |
s.spawn(|_| read_dump(&data, len, sender).unwrap()); | |
for _ in 0..NUM_WORKERS { | |
s.spawn(|_| process_chunk(&data, &receiver).unwrap()); | |
} | |
}) | |
.unwrap(); | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment