Created
March 9, 2024 12:04
-
-
Save rksm/e6739610d9aba825ba395f8fe2fb5913 to your computer and use it in GitHub Desktop.
streaming rust command with tokio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async fn run_speechpipeline_command( | |
job: Job, | |
) -> Result<( | |
mpsc::Receiver<Either<String, Result<Response>>>, | |
tokio::sync::oneshot::Sender<()>, | |
)> { | |
let payload = serde_json::to_string(&job)?; | |
let payload = shellwords::escape(&payload); | |
let cmd = format!( | |
r#" | |
source /media/robert/LINUX_DATA/python-venv/podwriter-speechpipeline/bin/activate; | |
python /home/robert/projects/biz/podwriter/transcription-pipeline/main.py {payload} | |
"# | |
); | |
println!("cmd: {}", cmd); | |
let mut proc = Command::new("bash") | |
.arg("-c") | |
.arg(cmd) | |
.stdout(Stdio::piped()) | |
.stderr(Stdio::piped()) | |
.spawn()?; | |
let stdout = proc.stdout.take().expect("stdout"); | |
let stderr = proc.stderr.take().expect("stderr"); | |
let (tx, rx) = mpsc::channel(256); | |
let (exit_tx, mut exit_rx) = tokio::sync::oneshot::channel(); | |
tokio::spawn(async move { | |
let mut stdout = tokio::io::BufReader::new(stdout).lines(); | |
let mut stderr = tokio::io::BufReader::new(stderr).lines(); | |
loop { | |
let (msg, cont) = tokio::select! { | |
status = proc.wait() => { | |
let err = eyre!("process exited unexpectedly with status: {status:?}"); | |
error!("{err}"); | |
(Some(either::Either::Right(Err(err))), false) | |
} | |
exit = &mut exit_rx => { | |
match exit { | |
Ok(()) => { | |
info!("received signal to kill process"); | |
} | |
Err(_) => { | |
warn!("process/exit channel closed before process completed, killing process"); | |
} | |
} | |
if let Err(err) = proc.kill().await { | |
error!("failed to kill process: {err}"); | |
} | |
(None, false) | |
} | |
line = stdout.next_line() => { | |
if let Ok(Some(line)) = line { | |
if let Ok(output) = serde_json::from_str(&line) { | |
(Some(either::Either::Right(Ok(output))), true) | |
} else { | |
(Some(either::Either::Left(line)), true) | |
} | |
} else { | |
warn!("stdout closed"); | |
(None, false) | |
} | |
} | |
line = stderr.next_line() => { | |
if let Ok(Some(line)) = line { | |
(Some(either::Either::Left(line)), true) | |
} else { | |
warn!("stderr closed"); | |
(None, false) | |
} | |
} | |
}; | |
if let Some(msg) = msg { | |
if (tx.send(msg).await).is_err() { | |
warn!("process output channel closed, exiting"); | |
break; | |
} | |
} | |
if !cont { | |
break; | |
} | |
} | |
debug!("stop reading from process"); | |
}); | |
Ok((rx, exit_tx)) | |
} | |
#[instrument(level = "debug", skip(input), fields(id = id.to_string(), task = input.name()))] | |
async fn spechpipeline_command_stream( | |
id: impl ToString, | |
input: Task, | |
) -> Result<impl Stream<Item = Either<SpeechpipelineUpdate, Response>>> { | |
let id = id.to_string(); | |
let task = input.name(); | |
info!(%id, %task, "starting task"); | |
let (mut rx, exit_tx) = run_speechpipeline_command(Job { | |
id: id.clone(), | |
input, | |
}) | |
.await?; | |
let mut record = false; | |
let mut recorded_output = String::new(); | |
let stream = async_stream::stream! { | |
let _exit_tx = exit_tx; // keep the sender alive, when the stream is cancelled, the process will be killed | |
while let Some(item) = rx.recv().await { | |
match item { | |
either::Either::Left(line) if record || line.to_lowercase().contains("error") => { | |
record = true; | |
recorded_output.push_str(&line); | |
recorded_output.push('\n'); | |
debug!("{line}"); | |
} | |
either::Either::Left(line) => { | |
debug!("{line}"); | |
} | |
either::Either::Right(Ok(response)) => { | |
trace!("got response"); | |
yield Either::Right(response); | |
return; | |
} | |
either::Either::Right(Err(err)) => { | |
yield Either::Left(SpeechpipelineUpdate::Failed(err.into())); | |
} | |
} | |
} | |
warn!("unexpected end of stream: {recorded_output}"); | |
yield Either::Left(SpeechpipelineUpdate::Failed(eyre!("unexpected end of stream: {recorded_output}").into())); | |
}; | |
Ok(stream) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment