This crate provides a way to "link" futures into a single block, which stops executing once any of these futures completes.
Under the hood, it uses `FuturesUnordered` to execute multiple futures efficiently. In order to avoid boxing, a custom one-of type from the `one-of-futures` crate is generated for each `link_futures` block.
License: MIT
Add this to your `Cargo.toml`:
[dependencies]
linked-futures = "0.1"
use std::time::Duration;
use futures::{pin_mut, SinkExt, StreamExt};
use futures::channel::mpsc;
use futures::executor::block_on;
use tokio::time::{delay_for, interval, Instant};
use linked_futures::{link_futures, linked_block};
// Declares the linked block used by `main` below.
// `PeriodicStoppableSender` is the name passed to `link_futures!` as the block type, and
// `PeriodicStoppableSenderFutureIdentifier` is the identifier type that `main` matches on
// to learn which future completed.
// The names after the `;` are the labels that `link_futures!` binds to concrete futures
// (`Label => future`).
linked_block!(PeriodicStoppableSender, PeriodicStoppableSenderFutureIdentifier;
Forwarder,
Reader,
Generator,
Stop
);
/// Runs a generator -> forwarder -> reader pipeline over two `mpsc` channels and links it
/// with a `stop` future, so that the whole pipeline stops once `stop` completes.
///
/// # Panics
///
/// Panics if the linked block is terminated by any future other than `Stop`.
#[tokio::main]
async fn main() {
    let (mut tx1, mut rx1) = mpsc::channel::<Instant>(1);
    let (mut tx2, mut rx2) = mpsc::channel::<Instant>(1);

    let mut interval = interval(Duration::from_millis(100));

    // Pushes every tick of the 100 ms interval into the first channel.
    let generator = async {
        while let Some(instant) = interval.next().await {
            // Do not silently discard the send result: if the channel rejects the value,
            // stop generating so the failure surfaces as an unexpected `Generator` completion.
            if tx1.send(instant).await.is_err() {
                break;
            }
        }
    };

    // Moves every value from the first channel into the second one.
    let forwarder = async {
        while let Some(instant) = rx1.next().await {
            // Same reasoning as in `generator`: a failed send ends this future instead of
            // being ignored.
            if tx2.send(instant).await.is_err() {
                break;
            }
        }
    };

    // Prints every value that arrives on the second channel.
    let reader = async {
        while let Some(instant) = rx2.next().await {
            println!("instant: {:?}", instant);
        }
    };

    // Completes after one second; this is the only future expected to finish.
    let stop = async {
        delay_for(Duration::from_secs(1)).await;
    };

    let linked = link_futures!(
        PeriodicStoppableSender,
        PeriodicStoppableSenderFutureIdentifier;
        Generator => generator,
        Forwarder => forwarder,
        Reader => reader,
        Stop => stop
    );

    // NOTE(review): this drives `linked` with `futures::executor::block_on` while already
    // inside the runtime started by `#[tokio::main]`, which blocks the calling runtime thread.
    // Awaiting `linked` directly would avoid the nested executor. Confirm whether the nesting
    // is intentional before changing it.
    block_on(async {
        pin_mut!(linked);
        let (completed_future_identifier, _) = linked.await;
        match completed_future_identifier {
            PeriodicStoppableSenderFutureIdentifier::Stop => {
                println!("linked block stopped normally")
            }
            n => panic!("linked block unexpectedly terminated by future: {:?}", n),
        }
    });
}