Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

asynchronous - Is there any way to create an async stream generator that yields the result of repeatedly calling a function?

I want to build a program that collects weather updates and represents them as a stream. I want to call get_weather() in an infinite loop, with a 60-second delay between the end of one call and the start of the next.

A simplified version would look like this:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}

Is there any way to do this easily?

Using tokio::timer::Interval will not work when get_weather() takes more than 60 seconds:

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|_| get_weather()) // then() passes each tick to the closure; it is ignored here
}

If that happens, the next call will start immediately. I want to keep exactly 60 seconds between the end of the previous get_weather() call and the start of the next one.
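
To make the difference between the two timing policies concrete, here is a small std-only model that computes when each call would start. It is a simplified sketch, not how tokio implements its timers: times are whole seconds, and the helper names `starts_with_delay` and `starts_with_interval` are hypothetical. It assumes an interval whose late call begins as soon as the previous call finishes, as described above.

```rust
// Simplified model of the two scheduling policies; times are in whole seconds.
// This illustrates the timing only and does not use or mirror tokio's internals.

/// Start times when each call begins `delay` seconds after the previous call finished.
fn starts_with_delay(durations: &[u64], delay: u64) -> Vec<u64> {
    let mut now: u64 = 0;
    let mut starts = Vec::new();
    for &d in durations {
        now += delay; // wait after the previous finish
        starts.push(now);
        now += d; // the call itself runs for `d` seconds
    }
    starts
}

/// Start times when calls are driven by ticks every `period` seconds and a
/// call that missed its tick starts as soon as the previous call finishes.
fn starts_with_interval(durations: &[u64], period: u64) -> Vec<u64> {
    let mut finished: u64 = 0;
    let mut starts = Vec::new();
    for (i, &d) in durations.iter().enumerate() {
        let tick = (i as u64 + 1) * period;
        let start = tick.max(finished); // cannot start before the previous call ends
        starts.push(start);
        finished = start + d;
    }
    starts
}

fn main() {
    // The second call is slow: it takes 90 seconds.
    let durations = [5, 90, 5];
    println!("delay:    {:?}", starts_with_delay(&durations, 60));
    println!("interval: {:?}", starts_with_interval(&durations, 60));
}
```

With a slow second call, the interval model starts the third call at second 210, the moment the second call ends, while the delay model still waits 60 seconds and starts it at second 275.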


1 Answer


Use stream::unfold to go from the "world of futures" to the "world of streams". We don't need any extra state, so we use the empty tuple:

use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| async {
        time::delay_for(BETWEEN).await;
        let weather = get_weather().await;
        Some((weather, ()))
    })
}

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| async {
            println!("Got the weather");
        })
        .await;
}

% time ./target/debug/example

Got the weather
Got the weather
Got the weather

real    3.085   3085495us
user    0.004   3928us
sys     0.003   3151us
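
The example above passes () as the state because nothing has to be remembered between iterations. When something does, the second element of the returned tuple carries it into the next call, and returning None ends the stream. The sketch below shows that state threading with a synchronous, std-only analog so it runs without any crates. `unfold_iter` and `numbered_readings` are hypothetical helpers written for this illustration; they are not part of futures or tokio, and the real stream::unfold takes a closure that returns a future.

```rust
// Synchronous, std-only analog of `futures::stream::unfold`, written only to
// show how the state is threaded. `unfold_iter` is a hypothetical helper.
fn unfold_iter<S, T>(
    init: S,
    mut step: impl FnMut(S) -> Option<(T, S)>,
) -> impl Iterator<Item = T> {
    let mut state = Some(init);
    std::iter::from_fn(move || {
        // Take the current state; once `step` has returned None it stays empty.
        let (item, next) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}

/// Hypothetical example: yields "reading 1", "reading 2", ... and ends after `limit` items.
fn numbered_readings(limit: u32) -> impl Iterator<Item = String> {
    unfold_iter(1u32, move |n| {
        if n > limit {
            None // returning None ends the sequence
        } else {
            // First element is the yielded item, second is the next state.
            Some((format!("reading {}", n), n + 1))
        }
    })
}

fn main() {
    for reading in numbered_readings(3) {
        println!("{}", reading);
    }
}
```

The same shape should carry over to the async version: replace () with the counter and return None from the async block when the stream should end.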

