Created
February 9, 2021 23:22
-
-
Save fteychene/ea1bb0d7ec205484216b3f1c55cec7c0 to your computer and use it in GitHub Desktop.
Lolilol - a test of https://github.com/akiradeveloper/lol usage
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "lolilol" | |
version = "0.1.0" | |
authors = ["fteychene <[email protected]>"] | |
edition = "2018" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
lol-core = "0.7.1" | |
async-trait = "0.1.42" | |
anyhow = "1.0" | |
tokio = { version = "1", features = ["full"] } | |
tonic = "0.4" | |
simple-logging = "2.0.2" | |
log = "0.4" | |
bytes = "1.0" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::Error; | |
use async_trait::async_trait; | |
use log::LevelFilter; | |
use lol_core::core_message; | |
use lol_core::proto_compiled; | |
use lol_core::proto_compiled::raft_client::RaftClient; | |
use lol_core::proto_compiled::{AddServerReq, CommitReq}; | |
use lol_core::snapshot::{SnapshotStream, SnapshotTag}; | |
use lol_core::{Config, MakeSnapshot, RaftApp, RaftCore, TunableConfig}; | |
use std::io::Read; | |
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; | |
use std::sync::Arc; | |
use std::{io, thread, time}; | |
struct LoliApp { | |
contents: bytes::Bytes, | |
} | |
#[async_trait] | |
impl RaftApp for LoliApp { | |
/// How state machine interacts with inputs from clients. | |
async fn process_message(&self, request: &[u8]) -> Result<Vec<u8>, Error> { | |
println!("process_message : {:?}", request); | |
unimplemented!() | |
} | |
/// Almost same as process_message but is called in log application path. | |
/// This function may return `MakeSnapshot` to make a new snapshot. | |
/// Note that the snapshot entry corresponding to the copy snapshot is not guaranteed to be made | |
/// due to possible I/O errors, etc. | |
async fn apply_message( | |
&self, | |
request: &[u8], | |
apply_index: u64, | |
) -> Result<(Vec<u8>, MakeSnapshot), Error> { | |
println!( | |
"apply_message: index={} request={:?}", | |
apply_index, | |
std::str::from_utf8(request) | |
); | |
Ok((vec![], MakeSnapshot::None)) | |
} | |
/// Special type of apply_message but when the entry is snapshot entry. | |
/// Snapshot is None happens iff apply_index is 1 which is the most initial snapshot. | |
async fn install_snapshot( | |
&self, | |
snapshot: Option<&SnapshotTag>, | |
apply_index: u64, | |
) -> Result<(), Error> { | |
Ok(()) | |
} | |
/// This function is called from compaction threads. | |
/// It should return new snapshot from accumulative compution with the old_snapshot and the subsequent log entries. | |
async fn fold_snapshot( | |
&self, | |
old_snapshot: Option<&SnapshotTag>, | |
requests: Vec<&[u8]>, | |
) -> Result<SnapshotTag, Error> { | |
Ok(SnapshotTag::from(vec![])) | |
} | |
/// Make a snapshot resource and returns the tag. | |
async fn from_snapshot_stream( | |
&self, | |
st: SnapshotStream, | |
snapshot_index: u64, | |
) -> Result<SnapshotTag, Error> { | |
unimplemented!() | |
} | |
/// Make a snapshot stream from a snapshot resource bound to the tag. | |
async fn to_snapshot_stream(&self, x: &SnapshotTag) -> SnapshotStream { | |
unimplemented!() | |
} | |
/// Delete a snapshot resource bound to the tag. | |
async fn delete_resource(&self, x: &SnapshotTag) -> Result<(), Error> { | |
unimplemented!() | |
} | |
} | |
#[tokio::main] | |
async fn main() -> Result<(), Error> { | |
simple_logging::log_to(std::io::stdout(), LevelFilter::Info); | |
println!("Lolilol project. Use lol"); | |
tokio::spawn(async { | |
let app = LoliApp { | |
contents: Default::default(), | |
}; | |
let storage = lol_core::storage::memory::Storage::new(); | |
let config = Config::new("http://127.0.0.1:5000".to_string()); | |
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5000); | |
let core = RaftCore::new( | |
app, | |
storage, | |
config, | |
TunableConfig { | |
compaction_delay_sec: 3, | |
compaction_interval_sec: 5, | |
}, | |
) | |
.await; | |
let service = lol_core::make_service(core); | |
tonic::transport::Server::builder() | |
.add_service(service) | |
.serve(socket) | |
.await | |
}); | |
tokio::spawn(async { | |
let app = LoliApp { | |
contents: Default::default(), | |
}; | |
let storage = lol_core::storage::memory::Storage::new(); | |
let config = Config::new("http://127.0.0.1:5001".to_string()); | |
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5001); | |
let core = RaftCore::new( | |
app, | |
storage, | |
config, | |
TunableConfig { | |
compaction_delay_sec: 3, | |
compaction_interval_sec: 5, | |
}, | |
) | |
.await; | |
let service = lol_core::make_service(core); | |
tonic::transport::Server::builder() | |
.add_service(service) | |
.serve(socket) | |
.await; | |
}); | |
tokio::spawn(async { | |
let app = LoliApp { | |
contents: Default::default(), | |
}; | |
let storage = lol_core::storage::memory::Storage::new(); | |
let config = Config::new("http://127.0.0.1:5002".to_string()); | |
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 5002); | |
let core = RaftCore::new( | |
app, | |
storage, | |
config, | |
TunableConfig { | |
compaction_delay_sec: 3, | |
compaction_interval_sec: 5, | |
}, | |
) | |
.await; | |
let service = lol_core::make_service(core); | |
tonic::transport::Server::builder() | |
.add_service(service) | |
.serve(socket) | |
.await; | |
}); | |
let ten_millis = time::Duration::from_millis(500); | |
thread::sleep(ten_millis); | |
let mut client = RaftClient::connect(tonic::transport::channel::Endpoint::from_static( | |
"http://127.0.0.1:5000", | |
)) | |
.await?; | |
let response = client | |
.add_server(tonic::Request::new(AddServerReq { | |
id: "http://127.0.0.1:5000".into(), | |
})) | |
.await?; | |
println!("adding 5000: {:?}", response); | |
let response = client | |
.add_server(tonic::Request::new(AddServerReq { | |
id: "http://127.0.0.1:5001".into(), | |
})) | |
.await?; | |
println!("adding 5001: {:?}", response); | |
let response = client | |
.add_server(tonic::Request::new(AddServerReq { | |
id: "http://127.0.0.1:5002".into(), | |
})) | |
.await?; | |
println!("adding 5002: {:?}", response); | |
let mut conn = lol_core::connection::connect(tonic::transport::channel::Endpoint::from_static( | |
"http://127.0.0.1:5000", | |
)) | |
.await | |
.unwrap(); | |
let msg = core_message::Req::ClusterInfo; | |
let req = proto_compiled::ProcessReq { | |
message: core_message::Req::serialize(&msg), | |
core: true, | |
}; | |
let res = conn | |
.request_process_locally(req) | |
.await | |
.unwrap() | |
.into_inner(); | |
let msg = core_message::Rep::deserialize(&res.message).unwrap(); | |
println!("clusterInfo: {:?}", msg); | |
let response = conn | |
.request_commit(proto_compiled::CommitReq { | |
core: false, | |
message: String::from("coucou from brest").as_bytes().to_vec(), | |
}) | |
.await?; | |
println!("requestCommit: {:?}", response); | |
let mut buffer = String::new(); | |
std::io::stdin().read_to_string(&mut buffer)?; | |
Ok(()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment