Add a merged stream
This commit is contained in:
parent
0706bae9f7
commit
b12bbc45c8
1 changed files with 22 additions and 3 deletions
|
@ -3,10 +3,14 @@ use trpl::{ReceiverStream, Stream, StreamExt};
|
|||
|
||||
fn main() {
|
||||
trpl::run(async {
|
||||
let mut messages =
|
||||
pin!(get_messages().timeout(Duration::from_millis(200)));
|
||||
let messages = get_messages().timeout(Duration::from_millis(200));
|
||||
let intervals = get_intervals()
|
||||
.map(|count| format!("Interval: {count}"))
|
||||
.timeout(Duration::from_secs(10));
|
||||
let merged = messages.merge(intervals);
|
||||
let mut stream = pin!(merged);
|
||||
|
||||
while let Some(result) = messages.next().await {
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(message) => println!("{message}"),
|
||||
Err(reason) => eprintln!("Problem: {reason:?}"),
|
||||
|
@ -30,3 +34,18 @@ fn get_messages() -> impl Stream<Item = String> {
|
|||
|
||||
ReceiverStream::new(rx)
|
||||
}
|
||||
|
||||
fn get_intervals() -> impl Stream<Item = u32> {
|
||||
let (tx, rx) = trpl::channel();
|
||||
|
||||
trpl::spawn_task(async move {
|
||||
let mut count = 0;
|
||||
loop {
|
||||
trpl::sleep(Duration::from_millis(1)).await;
|
||||
count += 1;
|
||||
tx.send(count).unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
ReceiverStream::new(rx)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue