From b12bbc45c8ccd579b72102c85aa109760b8fb9fd Mon Sep 17 00:00:00 2001 From: Manuel Thalmann <m@nuth.ch> Date: Thu, 10 Apr 2025 08:22:40 +0200 Subject: [PATCH] Add a merged stream --- stream-composition/src/main.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/stream-composition/src/main.rs b/stream-composition/src/main.rs index 2140ad7..6b26149 100644 --- a/stream-composition/src/main.rs +++ b/stream-composition/src/main.rs @@ -3,10 +3,14 @@ use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { - let mut messages = - pin!(get_messages().timeout(Duration::from_millis(200))); + let messages = get_messages().timeout(Duration::from_millis(200)); + let intervals = get_intervals() + .map(|count| format!("Interval: {count}")) + .timeout(Duration::from_secs(10)); + let merged = messages.merge(intervals); + let mut stream = pin!(merged); - while let Some(result) = messages.next().await { + while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), @@ -30,3 +34,18 @@ fn get_messages() -> impl Stream<Item = String> { ReceiverStream::new(rx) } + +fn get_intervals() -> impl Stream<Item = u32> { + let (tx, rx) = trpl::channel(); + + trpl::spawn_task(async move { + let mut count = 0; + loop { + trpl::sleep(Duration::from_millis(1)).await; + count += 1; + tx.send(count).unwrap(); + } + }); + + ReceiverStream::new(rx) +}