Async Programming in Rust: Stream Trait and Its Design
Grace Collins
Solutions Engineer · Leapcell

The Stream trait is similar to the Future trait. While Future represents the state change of a single item, Stream, akin to the Iterator trait in the standard library, can yield multiple values before it finishes. Put simply, a Stream behaves like a series of Futures: we read each result in turn until the Stream completes.
Definition of Stream
Future is the most fundamental concept in asynchronous programming. If a Future represents a one-time asynchronous value, then a Stream represents a series of asynchronous values: a Future produces exactly one value, while a Stream produces zero, one, or many. The signature of Stream is as follows:
    pub trait Stream {
        type Item;

        fn poll_next(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
        ) -> Poll<Option<Self::Item>>;
    }
Stream is the asynchronous counterpart of the synchronous Iterator. Notice how similar even their signatures are:
    pub trait Iterator {
        type Item;

        fn next(&mut self) -> Option<Self::Item>;
    }
Stream is used to abstract continuous data sources, although a stream can also end (when poll_next returns Poll::Ready(None)).
A common example of a Stream is the Receiver, the consumer side of the message channel in the futures crate. Every time a message is sent from the Sender side, the receiver gets a Some(val) value. Once the Sender side is closed (dropped) and there are no more messages in the channel, it receives None.
    use futures::channel::mpsc;
    use futures::{executor::block_on, SinkExt, StreamExt};

    async fn send_recv() {
        const BUFFER_SIZE: usize = 10;
        let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

        println!("tx: Send 1, 2");
        tx.send(1).await.unwrap();
        tx.send(2).await.unwrap();
        drop(tx);

        // `StreamExt::next` is similar to `Iterator::next`, but instead of returning a value,
        // it returns a `Future<Output = Option<T>>`, so you need `.await` to get the actual value
        assert_eq!(Some(1), rx.next().await);
        assert_eq!(Some(2), rx.next().await);
        assert_eq!(None, rx.next().await);
    }

    fn main() {
        block_on(send_recv());
    }
Differences Between Iterator and Stream
Iterator allows repeatedly calling the next() method to get new values until it returns None. Iterator is blocking: each call to next() occupies the calling thread until a result is obtained. In contrast, the asynchronous Stream is non-blocking and yields the thread to other tasks while waiting.

Stream's poll_next() method is quite similar to Future's poll() method, and its role is akin to the next() method of Iterator. However, calling poll_next() directly is inconvenient because you need to handle the Poll state manually. That's why the futures crate provides StreamExt, an extension trait for Stream, which offers a next() method that returns a Future implemented by the Next struct. This way, you can fetch the next value directly with stream.next().await.
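To make the inconvenience concrete, here is a minimal sketch of polling a stream by hand. It assumes a locally declared Stream trait with the same signature as the one shown earlier (so the block needs no external crate), a hypothetical Stutter stream that is not ready on every other poll, and a hypothetical CountingWaker that only counts wake-ups. None of these names come from the futures crate.

```rust
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in with the same signature as the `Stream` trait shown earlier
// (an assumption made so the sketch compiles with the standard library only).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream: every odd-numbered poll reports "not ready",
// every even-numbered poll yields a value, and it ends after two values.
struct Stutter {
    polls: u32,
    emitted: u32,
}

impl Stream for Stutter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.emitted == 2 {
            return Poll::Ready(None);
        }
        self.polls += 1;
        if self.polls % 2 == 1 {
            // Ask to be polled again before reporting `Pending`.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.emitted += 1;
        Poll::Ready(Some(self.emitted))
    }
}

// Hypothetical waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Poll the stream directly, matching on every `Poll` state.
fn trace_polls() -> (Vec<String>, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut stream = Stutter { polls: 0, emitted: 0 };
    let mut trace = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Pending => trace.push("Pending".to_string()),
            Poll::Ready(Some(v)) => trace.push(format!("Ready(Some({v}))")),
            Poll::Ready(None) => {
                trace.push("Ready(None)".to_string());
                break;
            }
        }
    }
    (trace, counter.0.load(Ordering::SeqCst))
}
```

The loop here simply polls again after Pending; a real executor would instead suspend the task until the waker fires. All of this bookkeeping is what stream.next().await hides.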
Note: StreamExt stands for Stream Extension. In Rust, it's a common practice to keep the minimal trait definition (like Stream) in one file, and put additional APIs (like StreamExt) in a separate, related file.
Note: Unlike Future, the Stream trait is not yet part of Rust's standard library (neither std nor core). It is defined in the futures-core crate and re-exported by futures-util and futures, and StreamExt is likewise not part of the standard library. This means different libraries might provide conflicting imports. For example, Tokio provides its own StreamExt (in the tokio-stream crate), separate from the one in futures-util. If possible, prefer futures-util, as it is widely used across the async ecosystem.
Implementation of StreamExt's next() method and the Next struct:
    pub trait StreamExt: Stream {
        fn next(&mut self) -> Next<'_, Self>
        where
            Self: Unpin,
        {
            assert_future::<Option<Self::Item>, _>(Next::new(self))
        }
    }

    // `next` returns the `Next` struct
    pub struct Next<'a, St: ?Sized> {
        stream: &'a mut St,
    }

    // If Stream is Unpin, then Next is also Unpin
    impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {}

    impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> {
        pub(super) fn new(stream: &'a mut St) -> Self {
            Self { stream }
        }
    }

    // Next implements Future, each poll() is essentially polling from the stream via poll_next()
    impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
        type Output = Option<St::Item>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            self.stream.poll_next_unpin(cx)
        }
    }
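The excerpt above depends on crate internals, so it does not compile on its own. The following is a simplified, standard-library-only sketch of the same idea, not the library's actual code: the Stream trait is redeclared locally, and UpTo, the free function next, ThreadWaker, and block_on are hypothetical helpers written for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Local stand-in for the `Stream` trait (assumption: same signature as shown earlier).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Hypothetical stream yielding `next`, `next + 1`, ... up to and including `limit`.
struct UpTo {
    next: u32,
    limit: u32,
}

impl Stream for UpTo {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next > self.limit {
            return Poll::Ready(None);
        }
        self.next += 1;
        Poll::Ready(Some(self.next - 1))
    }
}

// Simplified counterpart of `Next`: it only borrows the stream.
struct Next<'a, St: ?Sized> {
    stream: &'a mut St,
}

impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> {
    type Output = Option<St::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Re-pin the borrowed stream (allowed because `St: Unpin`) and forward the poll.
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

// Free-function version of `StreamExt::next`, to avoid defining an extension trait here.
fn next<St: ?Sized + Stream + Unpin>(stream: &mut St) -> Next<'_, St> {
    Next { stream }
}

// Tiny single-future executor: park the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

async fn first_three() -> Vec<Option<u32>> {
    let mut s = UpTo { next: 1, limit: 2 };
    let a = next(&mut s).await;
    let b = next(&mut s).await;
    let c = next(&mut s).await;
    vec![a, b, c]
}
```

Running block_on(first_three()) yields Some(1), Some(2), and then None. The point of the design is that Next owns nothing: it is a short-lived future created for each call, which is why the stream itself has to be Unpin (or pinned beforehand).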
Creating Streams
The futures library provides several convenient functions to create basic Streams, such as:
- empty(): creates an empty Stream
- once(): creates a Stream that yields a single value (the output of a Future)
- pending(): creates a Stream that never yields a value and always returns Poll::Pending
- repeat(): creates a Stream that repeatedly yields the same value
- repeat_with(): creates a Stream that lazily yields values via a closure
- poll_fn(): creates a Stream from a closure that returns Poll
- unfold(): creates a Stream from an initial state and a closure that returns a Future
    use futures::prelude::*;

    #[tokio::main]
    async fn main() {
        let mut st = stream::iter(1..10)
            .filter(|x| future::ready(x % 2 == 0))
            .map(|x| x * x);

        // Iterate over the stream
        while let Some(x) = st.next().await {
            println!("Got item: {}", x);
        }
    }
In the code above, stream::iter turns an iterator into a Stream, which is then passed through the filter and map combinators. Finally, the stream is iterated, and the resulting data is printed.

When you're not concerned with async/await and only care about the stream behavior, stream::iter is quite handy for testing. Another interesting function is repeat_with, which lets you pass a closure to lazily generate values on demand, for example:
    use futures::stream::{self, StreamExt};

    // From the zeroth to the third power of two:
    async fn stream_repeat_with() {
        let mut curr = 1;
        let mut pow2 = stream::repeat_with(|| {
            let tmp = curr;
            curr *= 2;
            tmp
        });

        assert_eq!(Some(1), pow2.next().await);
        assert_eq!(Some(2), pow2.next().await);
        assert_eq!(Some(4), pow2.next().await);
        assert_eq!(Some(8), pow2.next().await);
    }
Implementing a Stream
Creating your own Stream involves two steps:
- First, define a struct to hold the stream's state
- Then, implement the Stream trait for that struct

Let's create a stream called Counter that counts from 1 to 5:
    use futures::stream::Stream;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    // First, the struct:

    /// A stream that counts from one to five
    struct Counter {
        count: usize,
    }

    // We want the counter to start from one, so let's add a `new()` method as a helper.
    // This isn't strictly necessary, but it's convenient.
    // Note that we start `count` from zero; the reason will be clear in the implementation of `poll_next()`.
    impl Counter {
        fn new() -> Counter {
            Counter { count: 0 }
        }
    }

    // Then, we implement `Stream` for `Counter`:
    impl Stream for Counter {
        // We'll use `usize` for counting
        type Item = usize;

        // `poll_next()` is the only required method.
        // The counter is always ready, so the context (and its waker) goes unused.
        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Increment the counter. That's why we started from zero.
            self.count += 1;

            // Check if we've finished counting.
            if self.count < 6 {
                Poll::Ready(Some(self.count))
            } else {
                Poll::Ready(None)
            }
        }
    }
Stream Traits
There are several traits related to streams in Rust, such as Stream, TryStream, and FusedStream.
- Stream is very similar to Iterator. However, when it returns None, it signifies that the stream is exhausted and should no longer be polled. What happens if you keep polling after None is left unspecified by the trait: depending on the implementation, the call may panic, block forever, or misbehave in other ways. (Because poll_next is a safe method, it still must not cause undefined behavior in the memory-safety sense.)
- TryStream is a specialized trait for streams that yield Result<value, error> items. TryStream provides functions that make it easy to match and transform the inner Results. You can think of it as an API designed for streams that produce Result items, making it more convenient to work with error-handling cases.
- FusedStream is similar to a regular stream but, through its is_terminated() method, adds the ability for users to know whether the stream is truly exhausted after returning None, or if it can be safely polled again. For example, if you're creating a stream backed by a circular buffer, the stream might return None on the first iteration, but with FusedStream, it would be safe to poll again later to resume a new round of iteration over the buffer.
Iteration and Concurrency
Just like the Iterator trait, Stream also comes with combinators. For example, you can use methods like map, filter, fold, for_each, skip, as well as their error-aware counterparts: map_ok, try_filter, try_fold, try_for_each, and so on.
Unlike Iterator, however, for loops can't be used directly to iterate over a Stream. Instead, imperative-style loops like while let or loop can be used, repeatedly calling next or try_next explicitly. For example, you can read from a stream in either of the following ways:
    // Iteration pattern 1
    while let Some(value) = s.next().await {}

    // Iteration pattern 2
    loop {
        match s.next().await {
            Some(value) => {}
            None => break,
        }
    }
An example of computing the sum of values in a stream:
    use futures_util::{pin_mut, Stream, StreamExt};

    async fn sum(stream: impl Stream<Item = usize>) -> usize {
        // Don't forget to pin the stream before iteration
        pin_mut!(stream);
        let mut sum: usize = 0;

        // Iterate over the stream
        while let Some(item) = stream.next().await {
            sum += item;
        }
        sum
    }
If you process one value at a time, you might miss out on the benefits of concurrency, which defeats the purpose of asynchronous programming. To process multiple values concurrently from a Stream, you can use for_each_concurrent and try_for_each_concurrent:
    use std::{pin::Pin, io};
    use futures_util::{Stream, TryStreamExt};

    async fn jump_around(
        stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
    ) -> Result<(), io::Error> {
        // Use `try_for_each_concurrent`
        stream
            .try_for_each_concurrent(100, |num| async move {
                jump_n_times(num).await?;
                report_n_jumps(num).await?;
                Ok(())
            })
            .await?;

        Ok(())
    }

    async fn jump_n_times(num: i32) -> Result<(), io::Error> {
        println!("jump_n_times :{}", num + 1);
        Ok(())
    }

    async fn report_n_jumps(num: i32) -> Result<(), io::Error> {
        println!("report_n_jumps : {}", num);
        Ok(())
    }
Summary
Stream is similar to Future, but while Future represents the state change of a single item, Stream behaves more like an Iterator that can yield multiple values before completion. Put simply, a Stream behaves like a series of Futures whose results we retrieve one by one until it finishes, which makes it an asynchronous iterator.
The poll_next function of a Stream can return one of three possible values:
- Poll::Pending: indicates that the next value is not ready yet and we still need to wait.
- Poll::Ready(Some(val)): indicates that a value is ready and has been successfully returned; you can call poll_next again to retrieve the next one.
- Poll::Ready(None): indicates that the stream has ended and poll_next should no longer be called.