Skip to content

Instantly share code, notes, and snippets.

@wolfspider
Created February 19, 2024 06:36
Show Gist options
  • Save wolfspider/0c890683734f1c4304029c72da2f84d9 to your computer and use it in GitHub Desktop.
Save wolfspider/0c890683734f1c4304029c72da2f84d9 to your computer and use it in GitHub Desktop.
Tokio SSE Client
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
async fn read_stream(stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
let mut buf_reader = BufReader::new(stream);
let mut is_200_response = false;
let mut is_sse_mode = false;
loop {
let mut line = String::new();
// Read until a newline
buf_reader.read_line(&mut line).await?;
match !is_200_response {
true => {
// Check if response is success
match line.contains("HTTP/1.1 200 OK") {
true => {
is_200_response = true;
}
false => (),
}
// Continue reading HTTP headers
continue;
}
false => (),
}
match !is_sse_mode {
true => {
// Check if we've reached the SSE mode
match line.contains("Content-Type: text/event-stream") {
true => {
is_sse_mode = true;
}
false => (),
}
// Continue reading HTTP headers
continue;
}
false => (),
}
match line.contains("Date:")
|| line.contains("Transfer-Encoding:")
|| line == "\r\n"
|| line == "\n"
{
true => continue,
false => (),
}
// Parse the length of the next message
let message_length = line.trim_end_matches("\r\n").parse::<usize>()?;
println!("Message Length: {} bytes", message_length);
// Create a buffer to read the message
let mut message_buffer = vec![0u8];
buf_reader.read_until(b'\r', &mut message_buffer).await?;
// Process the message
let message = String::from_utf8_lossy(&message_buffer);
println!("Received Message: {}", message.trim_end_matches("\n\r"));
}
Ok(())
}
const CLIENT_ID: &str = r#"GET / HTTP/1.1
Host: localhost:3000
User-Agent: curl/7.81.0
Accept: */*
"#;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect("127.0.0.1:3000").await?;
stream.set_nodelay(true)?;
stream.set_linger(None)?;
stream.write(CLIENT_ID.as_bytes()).await?;
read_stream(stream).await?;
Ok(())
}
@jbennettufl
Copy link

jbennettufl commented Feb 19, 2024

The blank line on line 75 has to be there! https://gist.github.com/wolfspider/0c890683734f1c4304029c72da2f84d9#file-main-rs-L75 I may switch back to not using this raw string because cutting and pasting from github removes the blank line and caused me to test out several versions of Tokio and strace the application which gets stuck on a futex indefinitely. I almost reinstalled Rust completely until I saw the problem.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment