diff --git a/stream-composition/src/main.rs b/stream-composition/src/main.rs index 2140ad7..6b26149 100644 --- a/stream-composition/src/main.rs +++ b/stream-composition/src/main.rs @@ -3,10 +3,14 @@ use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { - let mut messages = - pin!(get_messages().timeout(Duration::from_millis(200))); + let messages = get_messages().timeout(Duration::from_millis(200)); + let intervals = get_intervals() + .map(|count| format!("Interval: {count}")) + .timeout(Duration::from_secs(10)); + let merged = messages.merge(intervals); + let mut stream = pin!(merged); - while let Some(result) = messages.next().await { + while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), @@ -30,3 +34,18 @@ fn get_messages() -> impl Stream<Item = String> { ReceiverStream::new(rx) } + +fn get_intervals() -> impl Stream<Item = u32> { + let (tx, rx) = trpl::channel(); + + trpl::spawn_task(async move { + let mut count = 0; + loop { + trpl::sleep(Duration::from_millis(1)).await; + count += 1; + tx.send(count).unwrap(); + } + }); + + ReceiverStream::new(rx) +}