diff --git a/stream-composition/src/main.rs b/stream-composition/src/main.rs index cc3c3aa..ef1512d 100644 --- a/stream-composition/src/main.rs +++ b/stream-composition/src/main.rs @@ -25,11 +25,15 @@ fn get_messages() -> impl Stream<Item = String> { trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; + for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; - tx.send(format!("Message: '{message}'")).unwrap(); + if let Err(send_error) = tx.send(format!("Message: '{message}'")) { + eprintln!("Cannot send message '{message}': {send_error}"); + break; + } } }); @@ -44,9 +48,13 @@ fn get_intervals() -> impl Stream<Item = u32> { loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; - tx.send(count).unwrap(); + + if let Err(send_error) = tx.send(count) { + eprintln!("Could not send interval {count}: {send_error}"); + break; + }; } }); ReceiverStream::new(rx) -} +} \ No newline at end of file