Created
April 22, 2022 10:43
-
-
Save algon-320/ed8efe501dd06d0151e499e0ed48640a to your computer and use it in GitHub Desktop.
TCP echo server implemented with actix
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//# actix = "0.13.0" | |
//# tokio = { version = "1.17.0", features = ["net"] } | |
//# tokio-util = { version = "0.7.1", features = ["codec"] } | |
//# bytes = "1.1.0" | |
use actix::prelude::*; | |
use std::net::SocketAddr; | |
use tokio::net::{ | |
tcp::{OwnedReadHalf, OwnedWriteHalf}, | |
TcpListener, TcpStream, | |
}; | |
use actix::io::{SinkWrite, WriteHandler}; | |
use bytes::{Bytes, BytesMut}; | |
use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite}; | |
type BytesWriter = SinkWrite<Bytes, FramedWrite<OwnedWriteHalf, BytesCodec>>; | |
struct Connection { | |
peer_addr: SocketAddr, | |
writer: BytesWriter, | |
} | |
impl Actor for Connection { | |
type Context = Context<Self>; | |
fn started(&mut self, _: &mut Self::Context) { | |
println!("{}: started", self.peer_addr); | |
} | |
fn stopped(&mut self, _: &mut Self::Context) { | |
println!("{}: stopped", self.peer_addr); | |
} | |
} | |
impl Connection { | |
fn new(stream: TcpStream) -> Addr<Connection> { | |
let peer_addr = stream.peer_addr().unwrap(); | |
let (rhalf, whalf): (OwnedReadHalf, OwnedWriteHalf) = stream.into_split(); | |
// Create a context in advance (without attaching any actor yet.) | |
let mut ctx = Context::new(); | |
// Stream<Item = Result<BytesMut, std::io::Error>> | |
// is implemented for FramedRead. | |
let stream = FramedRead::new(rhalf, BytesCodec::default()); | |
// By attaching the stream to the context here, | |
// we can receive the item on StreamHandler::handle | |
// when Item (BytesMut) is produced from the stream. | |
Connection::add_stream(stream, &mut ctx); | |
// impl Sink<Item = Bytes, Error = std::io::Error> | |
let sink = FramedWrite::new(whalf, BytesCodec::default()); | |
// SinkWrite is a wrapper around the sink, | |
// which absorbs async handling in synchronous handlers. | |
// NOTE: SinkWrite requires `WriteHandler<Sink::Error>` impl for the actor. | |
let writer = SinkWrite::new(sink, &mut ctx); | |
// Create and run an actor. | |
let tcp = Connection { peer_addr, writer }; | |
ctx.run(tcp) | |
} | |
} | |
impl StreamHandler<Result<BytesMut, std::io::Error>> for Connection { | |
fn handle(&mut self, res: Result<BytesMut, std::io::Error>, _: &mut Self::Context) { | |
match res { | |
Ok(bytes) => { | |
// Convert BytesMut to Bytes | |
let bytes = bytes.freeze(); | |
println!("{}: echo {} bytes", self.peer_addr, bytes.len()); | |
// Wirte to underlying sink | |
self.writer.write(bytes).expect("closed sink"); | |
} | |
Err(err) => { | |
println!("{}: error: {}", self.peer_addr, err); | |
} | |
} | |
} | |
} | |
// Handler for asynchronous errors generated by SinkWrite::write | |
impl WriteHandler<std::io::Error> for Connection { | |
fn error(&mut self, err: std::io::Error, _: &mut Self::Context) -> Running { | |
println!("{}: error: {}", self.peer_addr, err); | |
Running::Continue | |
} | |
} | |
#[actix::main] | |
async fn main() { | |
let addr: SocketAddr = "127.0.0.1:12121".parse().unwrap(); | |
println!("listening on {}", addr); | |
let listener = TcpListener::bind(addr).await.expect("bind"); | |
// Wait for a client | |
while let Ok((stream, addr)) = listener.accept().await { | |
println!("new connection: {}", addr); | |
// Spawn a dedicated task for the client | |
actix::spawn(async move { Connection::new(stream) }); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment