I'm trying to write a web crawler in Rust using the tokio asynchronous runtime. I want to fetch and process multiple pages asynchronously, but I also want the crawler to stop when it reaches the end (in other words, when there is nothing left to crawl). So far I have used futures::future::try_join_all to get a collective result from the async functions that I provide as futures, but this obviously requires the program to know the total number of pages to crawl beforehand. For example:
async fn fetch(_url: String) -> Result<String, ()> {
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    Ok(String::from("foo"))
}

#[tokio::main]
async fn main() {
    let search_url = "https://example.com/?page={page_num}";
    let futures = (1..=3)
        .map(|page_num| search_url.replace("{page_num}", &page_num.to_string()))
        .map(|url| fetch(url));
    let _ = futures::future::try_join_all(futures).await.unwrap();
}
In this simple example I have to know the total number of pages to go through (1..=3) before actually fetching them. What I want instead is to not provide any range at all and to have a condition that stops the whole process (e.g. when the HTML result contains "not found").
I looked into futures::executor::block_on, but I'm not sure whether it is something I can utilize for this task.
Here's roughly how to do this using Stream and .buffered().
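Below is a minimal sketch of what that pipeline might look like; fetch_page is a stand-in for a real HTTP request (it just sleeps and pretends pages past 25 don't exist), and the () error type is a placeholder for whatever your "not found" check produces:

use futures::{future, stream, StreamExt};
use std::time::Duration;

// Stand-in for a real HTTP fetch: pages past 25 "don't exist".
async fn fetch_page(page: usize) -> Result<String, ()> {
    println!("fetching page {}", page);
    tokio::time::sleep(Duration::from_millis(100)).await;
    if page <= 25 {
        Ok(format!("body of page {}", page))
    } else {
        Err(()) // e.g. the HTML contained "not found"
    }
}

#[tokio::main]
async fn main() {
    let pages: Vec<String> = stream::iter(1..)
        .map(fetch_page)
        .buffered(10)
        .take_while(|page| future::ready(page.is_ok()))
        .map(|page| page.unwrap())
        .collect()
        .await;

    println!("fetched {} pages", pages.len());
}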
I'll go over the steps in main() in detail:

- stream::iter(1..) creates an unbounded Stream of integers representing each page number.
- map(fetch_page) of course will call fetch_page for each page number.
- buffered(10) allows up to 10 fetch_page calls to occur concurrently while preserving the original order.
- take_while(|page| future::ready(page.is_ok())) keeps the stream going until a fetch_page returns an error; it uses futures::future::ready since the function passed to take_while must return a future.
- map(|page| page.unwrap()) pulls out the successful pages; it won't panic because we know the stream will stop when any error occurs.
- collect() does essentially the same thing as it does for an iterator, except you have to .await it.

Running the above code shows that it tries up to 10 pages at a time, but only the pages up to the first failure make it into the final collection.
This glosses over some nice-to-haves like handling non-missing-page errors or retrying, but I hope this gives you a good foundation. In those cases you might reach for the methods on TryStreamExt, which specifically handle streams of Results.
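For instance, here's a rough sketch of that direction; the CrawlError type and the hard-coded failure on page 7 are made up purely for illustration. take_while still gives a clean stop when we run off the end of the results, while try_collect from TryStreamExt short-circuits on any other error instead of silently dropping it:

use futures::{future, stream, StreamExt, TryStreamExt};
use std::time::Duration;

// Hypothetical error type: NotFound means we ran off the end of the
// results, anything else is a real failure that should abort the crawl.
#[derive(Debug)]
enum CrawlError {
    NotFound,
    Other(String),
}

// Stand-in fetcher: pages past 25 are "not found", page 7 fails outright.
async fn fetch_page(page: usize) -> Result<String, CrawlError> {
    tokio::time::sleep(Duration::from_millis(100)).await;
    match page {
        7 => Err(CrawlError::Other("server returned 500".into())),
        p if p > 25 => Err(CrawlError::NotFound),
        p => Ok(format!("body of page {}", p)),
    }
}

#[tokio::main]
async fn main() {
    let result: Result<Vec<String>, CrawlError> = stream::iter(1..)
        .map(fetch_page)
        .buffered(10)
        // End the stream cleanly at the first NotFound...
        .take_while(|page| {
            future::ready(!matches!(page, Err(CrawlError::NotFound)))
        })
        // ...but let any other error abort the crawl and surface to the caller.
        .try_collect()
        .await;

    match result {
        Ok(pages) => println!("crawled {} pages", pages.len()),
        Err(err) => eprintln!("crawl failed: {:?}", err),
    }
}

try_collect works here because the stream's items are already Results, so it implements TryStream; the first Err it sees ends the crawl and is handed back to the caller instead of being unwrapped away.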