Async Programming in Rust: Stream Trait and Its Design
Ethan Miller
Product Engineer · Leapcell

The Stream trait is similar to the Future trait. While Future represents the state change of a single item, Stream, akin to the Iterator trait in the standard library, can yield multiple values before it finishes. Or simply put, a Stream is made up of a series of Futures, from which we can read each Future’s result until the Stream completes.
Definition of Stream
The Future is the most fundamental concept in asynchronous programming. If a Future represents a one-time asynchronous value, then a Stream represents a series of asynchronous values. Future is 1, while Stream is 0, 1, or N. The signature of Stream is as follows:
pub trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
The concept of Stream corresponds to the Iterator in synchronous primitives. Recall how similar even their signatures are!
pub trait Iterator { type Item; fn next(&mut self) -> Option<Self::Item>; }
Stream is used to abstract continuous data sources, although it can also end (when poll returns None)
A common example of a Stream is the consumer Receiver in the futures crate's message channel. Every time a message is sent from the Send side, the receiver gets a Some(val) value. Once the Send side is closed (dropped) and there are no more messages in the channel, it receives a None.
use futures::channel::mpsc; use futures::{executor::block_on, SinkExt, StreamExt}; async fn send_recv() { const BUFFER_SIZE: usize = 10; let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE); println!("tx: Send 1, 2"); tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); drop(tx); // `StreamExt::next` is similar to `Iterator::next`, but instead of returning a value, // it returns a `Future<Output = Option<T>>`, so you need `.await` to get the actual value assert_eq!(Some(1), rx.next().await); assert_eq!(Some(2), rx.next().await); assert_eq!(None, rx.next().await); } fn main() { block_on(send_recv()); }
Differences Between Iterator and Stream
- Iteratorallows repeatedly calling the- next()method to get new values until it returns- None.- Iteratoris blocking: each call to- next()occupies the CPU until a result is obtained. In contrast, the asynchronous- Streamis non-blocking and yields the CPU while waiting.
- Stream's- poll_next()method is quite similar to- Future's- poll()method, and its function is akin to the- next()method of- Iterator. However, calling- poll_next()directly is inconvenient because you need to manually handle the- Pollstate, which isn’t very ergonomic. That’s why Rust provides- StreamExt, an extension trait for- Stream, which offers a- next()method that returns a- Futureimplemented by the- Nextstruct. This way, you can directly iterate over a value with- stream.next().await.
Note:
StreamExtstands for Stream Extension. In Rust, it's a common practice to keep the minimal trait definition (likeStream) in one file, and put additional APIs (likeStreamExt) in a separate, related file.
Note: Unlike
Future, theStreamtrait is not yet in Rust’s core library (std::core). It resides in thefutures-utilcrate, andStreamExtensionsis also not part of the standard library. This means different libraries might provide conflicting imports. For example, Tokio provides its ownStreamExt, separate fromfutures-util. If possible, prefer usingfutures-util, as it's the most commonly used crate for async/await.
Implementation of StreamExt's next() method and the Next struct:
pub trait StreamExt: Stream { fn next(&mut self) -> Next<'_, Self> where Self: Unpin { assert_future::<Option<Self::Item>, _>(Next::new(self)) } } // `next` returns the `Next` struct pub struct Next<'a, St: ?Sized> { stream: &'a mut St, } // If Stream is Unpin, then Next is also Unpin impl<St: ?Sized + Unpin> Unpin for Next<'_, St> {} impl<'a, St: ?Sized + Stream + Unpin> Next<'a, St> { pub(super) fn new(stream: &'a mut St) -> Self { Self { stream } } } // Next implements Future, each poll() is essentially polling from the stream via poll_next() impl<St: ?Sized + Stream + Unpin> Future for Next<'_, St> { type Output = Option<St::Item>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { self.stream.poll_next_unpin(cx) } }
Creating Streams
The futures library provides several convenient methods to create basic Streams, such as:
- empty(): creates an empty- Stream
- once(): creates a- Streamcontaining a single value
- pending(): creates a- Streamthat never yields a value and always returns- Poll::Pending
- repeat(): creates a- Streamthat repeatedly yields the same value
- repeat_with(): creates a- Streamthat lazily yields values via a closure
- poll_fn(): creates a- Streamfrom a closure that returns- Poll
- unfold(): creates a- Streamfrom an initial state and a closure that returns a- Future
use futures::prelude::*; #[tokio::main] async fn main() { let mut st = stream::iter(1..10) .filter(|x| future::ready(x % 2 == 0)) .map(|x| x * x); // Iterate over the stream while let Some(x) = st.next().await { println!("Got item: {}", x); } }
In the code above, stream::iter generates a Stream, which is then passed through filter and map operations. Finally, the stream is iterated, and the resulting data is printed.
When you’re not concerned with async/await and only care about the stream behavior, Stream::iter is quite handy for testing. Another interesting method is repeat_with, which lets you pass a closure to lazily generate values on demand, for example:
use futures::stream::{self, StreamExt}; // From the zeroth to the third power of two: async fn stream_repeat_with() { let mut curr = 1; let mut pow2 = futures::stream::repeat_with(|| { let tmp = curr; curr *= 2; tmp }); assert_eq!(Some(1), pow2.next().await); assert_eq!(Some(2), pow2.next().await); assert_eq!(Some(4), pow2.next().await); assert_eq!(Some(8), pow2.next().await); }
Implementing a Stream
Creating your own Stream involves two steps:
- First, define a structto hold the stream’s state
- Then, implement the Streamtrait for thatstruct
Let’s create a stream called Counter that counts from 1 to 5:
#![feature(async_stream)] // First, the struct: /// A stream that counts from one to five struct Counter { count: usize, } // We want the counter to start from one, so let’s add a `new()` method as a helper. // This isn’t strictly necessary, but it’s convenient. // Note that we start `count` from zero — the reason will be clear in the implementation of `poll_next()`. impl Counter { fn new() -> Counter { Counter { count: 0 } } } // Then, we implement `Stream` for `Counter`: impl Stream for Counter { // We’ll use `usize` for counting type Item = usize; // `poll_next()` is the only required method fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // Increment the counter. That’s why we started from zero. self.count += 1; // Check if we've finished counting. if self.count < 6 { Poll::Ready(Some(self.count)) } else { Poll::Ready(None) } } }
Stream Traits
There are several traits related to streams in Rust, such as Stream, TryStream, and FusedStream.
- 
Streamis very similar toIterator. However, when it returnsNone, it signifies that the stream is exhausted and should no longer be polled. Continuing to poll a stream after it returnsNoneleads to undefined behavior and may cause unpredictable results.
- 
TryStreamis a specialized trait for streams that yieldResult<value, error>items.TryStreamprovides functions that make it easy to match and transform the innerResults. You can think of it as an API designed for streams that produceResultitems, making it more convenient to work with error-handling cases.
- 
FusedStreamis similar to a regular stream but adds the ability for users to know whether the stream is truly exhausted after returningNone, or if it can be safely polled again. For example, if you’re creating a stream backed by a circular buffer, the stream might returnNoneon the first iteration, but withFusedStream, it would be safe to poll again later to resume a new round of iteration over the buffer.
Iteration and Concurrency
Just like the Iterator trait, Stream also supports iteration. For example, you can use methods like map, filter, fold, for_each, skip, as well as their error-aware counterparts: try_map, try_filter, try_fold, try_for_each, and so on.
Unlike Iterator, however, for loops can’t be used directly to iterate over a Stream. Instead, imperative-style loops like while let or loop can be used, repeatedly calling next or try_next explicitly. For example, you can read from a stream in either of the following ways:
// Iteration pattern 1 while let Some(value) = s.next().await {} // Iteration pattern 2 loop { match s.next().await { Some(value) => {} None => break; } }
An example of computing the sum of values in a stream:
use futures_util::{pin_mut, Stream, stream, StreamExt}; async fn sum(stream: impl Stream<Item=usize>) -> usize { // Don’t forget to pin the stream before iteration pin_mut!(stream); let mut sum: usize = 0; // Iterate over the stream while let Some(item) = stream.next().await { sum = sum + item; } sum }
If you process one value at a time, you might miss out on the benefits of concurrency, which defeats the purpose of asynchronous programming. To process multiple values concurrently from a Stream, you can use for_each_concurrent and try_for_each_concurrent:
use std::{pin::Pin, io}; use futures_util::{Stream, TryStreamExt}; async fn jump_around(stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>) -> Result<(), io::Error> { // Use `try_for_each_concurrent` stream.try_for_each_concurrent(100, |num| async move { jump_n_times(num).await?; report_n_jumps(num).await?; Ok(()) }).await?; Ok(()) } async fn jump_n_times(num: i32) -> Result<(), io::Error> { println!("jump_n_times :{}", num + 1); Ok(()) } async fn report_n_jumps(num: i32) -> Result<(), io::Error> { println!("report_n_jumps : {}", num); Ok(()) }
Summary
Stream is similar to Future, but while Future represents the state change of a single item, Stream behaves more like an Iterator that can yield multiple values before completion. Or put simply: a Stream consists of a series of Futures, and we can retrieve the result of each Future from the Stream until it finishes—making it an asynchronous iterator.
The poll_next function of a Stream can return one of three possible values:
- Poll::Pending: indicates that the next value is not ready yet and we still need to wait.
- Poll::Ready(Some(val)): indicates that a value is ready and has been successfully returned; you can call- poll_nextagain to retrieve the next one.
- Poll::Ready(None): indicates that the stream has ended and- poll_nextshould no longer be called.
We are Leapcell, your top choice for hosting Rust projects.
Leapcell is the Next-Gen Serverless Platform for Web Hosting, Async Tasks, and Redis:
Multi-Language Support
- Develop with Node.js, Python, Go, or Rust.
Deploy unlimited projects for free
- pay only for usage — no requests, no charges.
Unbeatable Cost Efficiency
- Pay-as-you-go with no idle charges.
- Example: $25 supports 6.94M requests at a 60ms average response time.
Streamlined Developer Experience
- Intuitive UI for effortless setup.
- Fully automated CI/CD pipelines and GitOps integration.
- Real-time metrics and logging for actionable insights.
Effortless Scalability and High Performance
- Auto-scaling to handle high concurrency with ease.
- Zero operational overhead — just focus on building.
Explore more in the Documentation!
Follow us on X: @LeapcellHQ



