diff --git a/stream-composition/src/main.rs b/stream-composition/src/main.rs index 57fe578..2140ad7 100644 --- a/stream-composition/src/main.rs +++ b/stream-composition/src/main.rs @@ -1,22 +1,32 @@ +use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { - let mut messages = get_messages(); + let mut messages = + pin!(get_messages().timeout(Duration::from_millis(200))); - while let Some(message) = messages.next().await { - println!("{message}"); + while let Some(result) = messages.next().await { + match result { + Ok(message) => println!("{message}"), + Err(reason) => eprintln!("Problem: {reason:?}"), + } } - }); + }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); - let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; - for message in messages { - tx.send(format!("Message: '{message}'")).unwrap(); - } + trpl::spawn_task(async move { + let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; + for (index, message) in messages.into_iter().enumerate() { + let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; + trpl::sleep(Duration::from_millis(time_to_sleep)).await; + + tx.send(format!("Message: '{message}'")).unwrap(); + } + }); ReceiverStream::new(rx) }