Dart Streams
last modified May 29, 2026
Introduction to Dart Streams
A Stream is a sequence of asynchronous events. While a
Future represents a single value that will be available at
some point in the future, a Stream can deliver many
values over time. Think of a Future as a pizza delivery that arrives
once; a Stream is more like a conveyor belt that drops a new item every few
seconds.
Streams are everywhere in Dart: user interface events, file I/O, WebSocket messages, and even asynchronous generators are all built on top of streams. They allow you to react to data as it arrives without blocking the main thread.
| Feature | Future | Stream |
|---|---|---|
| Number of values | Single value (or error) | Zero or more values |
| Completion | Always completes (with value or error) | May or may not complete (some are infinite) |
| Consumer | then(), await | listen(), await for |
| Error handling | catchError(), try-catch | onError callback, handleError(), try-catch in await for |
| Example | HTTP request result | Mouse clicks, file system watcher events |
Dart has two fundamental types of streams: single‑subscription and broadcast. A single‑subscription stream can be listened to only once; it is ideal for representing a series of events that belong to a single consumer, such as reading a file from start to finish. Broadcast streams allow multiple listeners and are suited for event buses, mouse clicks, or any scenario where several parts of the application need the same events.
import 'dart:async';
void main() {
  // Single-subscription stream (only one listener allowed)
  final singleController = StreamController<int>();
  final singleStream = singleController.stream;
  singleStream.listen((val) => print('Single: $val'));
  // singleStream.listen(...) // a second listen would throw a StateError
  singleController..add(1)..add(2)..add(3)..close();
  // Broadcast stream (many listeners allowed)
  final broadcastController = StreamController<int>.broadcast();
  final broadcastStream = broadcastController.stream;
  broadcastStream.listen((val) => print('Listener 1: $val'));
  broadcastStream.listen((val) => print('Listener 2: $val'));
  broadcastController.sink.add(42);
  broadcastController.close();
  // Both listeners receive 42.
}
The example above shows the critical difference: the broadcast stream gracefully handles multiple listeners, while the single‑subscription stream does not.
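Two consequences of this difference are easy to miss, so here is a minimal sketch of both. The helper names secondListenThrows and lateListenerEvents are made up for this illustration. The first confirms that a second listen() on a controller's single-subscription stream throws a StateError; the second shows that a broadcast stream does not buffer, so an event added before anyone listens is lost.

```dart
import 'dart:async';

/// Returns true if a second listen() on a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  } finally {
    controller.close();
  }
}

/// Adds one event before anyone listens and one after, then reports
/// what the late listener received.
Future<List<int>> lateListenerEvents() {
  final controller = StreamController<int>.broadcast();
  controller.add(1); // no listeners yet, so this event is dropped
  final received = controller.stream.toList(); // subscribes immediately
  controller.add(2); // delivered to the listener
  controller.close();
  return received;
}

Future<void> main() async {
  print(secondListenThrows()); // true
  print(await lateListenerEvents()); // [2]
}
```

A single-subscription controller, by contrast, buffers events until its one listener arrives.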
Creating and Consuming Streams
Creating streams with async*
The easiest way to create a stream is with an asynchronous generator
function marked async*. Inside, yield emits a new
value into the stream. When the function body ends, the stream closes
automatically.
import 'dart:async';
Stream<int> countDown(int from) async* {
  for (int i = from; i > 0; i--) {
    yield i;
    await Future.delayed(const Duration(milliseconds: 500));
  }
}
void main() {
  final stream = countDown(3);
  stream.listen((val) => print(val));
}
The countDown function returns a lazy stream: nothing executes
until a listener subscribes. At each yield the generator delivers
the value to the listener, waits if the subscription is paused, and stops
if it has been cancelled. This suits sequences with built-in delays or
data that naturally unfolds over time.
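The laziness of an async* function can be observed directly. The following sketch records the order of events in a list; the names log and numbers are illustrative only. Calling the generator function returns a stream immediately, and its body starts running only after a listener subscribes.

```dart
import 'dart:async';

final log = <String>[];

Stream<int> numbers() async* {
  log.add('generator started'); // runs only after someone listens
  yield 1;
  yield 2;
}

Future<void> main() async {
  final stream = numbers(); // the generator body has not run yet
  log.add('stream created');
  await for (final n in stream) {
    log.add('got $n');
  }
  print(log);
  // [stream created, generator started, got 1, got 2]
}
```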
Creating streams with StreamController
When you need manual control — perhaps to adapt a callback‑based API —
StreamController is the right tool. You push events into the
controller’s sink, and listeners receive them on the
stream.
import 'dart:async';
void main() {
  final controller = StreamController<String>();
  controller.stream.listen((msg) => print('Received: $msg'));
  controller.sink.add('First');
  controller.sink.add('Second');
  controller.close(); // signals that no more events will come
}
Remember to always call close() when you are done; otherwise the
stream stays open forever and may leak resources.
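As a sketch of adapting a callback-based API, the hypothetical ticks function below wraps Timer.periodic in a stream. It relies on the controller's onListen and onCancel callbacks so that the timer runs only while someone is listening. This version does not react to pause(), so events are buffered while the subscription is paused.

```dart
import 'dart:async';

/// Hypothetical adapter: exposes Timer.periodic callbacks as a stream.
Stream<int> ticks(Duration interval) {
  Timer? timer;
  var count = 0;
  late final StreamController<int> controller;
  controller = StreamController<int>(
    // Start the underlying callback source when a listener subscribes.
    onListen: () {
      timer = Timer.periodic(interval, (_) => controller.add(count++));
    },
    // Release it when the listener cancels.
    onCancel: () {
      timer?.cancel();
    },
  );
  return controller.stream;
}

Future<void> main() async {
  final firstThree =
      await ticks(const Duration(milliseconds: 10)).take(3).toList();
  print(firstThree); // [0, 1, 2]
}
```

Because take(3) cancels its subscription after the third event, onCancel stops the timer and the program can exit.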
Consuming streams: listen()
The most common way to consume a stream is the listen() method.
It accepts callbacks for data, errors, and completion. The returned
StreamSubscription lets you pause, resume, or cancel the stream.
import 'dart:async';
void main() {
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (count) => count + 1,
  ).take(3);
  final subscription = stream.listen(
    (data) => print('Tick: $data'),
    onError: (e) => print('Error: $e'),
    onDone: () => print('Stream finished.'),
  );
  // Cancel after 2.5 seconds, before the third tick arrives.
  // A cancelled subscription never calls onDone.
  Timer(const Duration(milliseconds: 2500), () => subscription.cancel());
}
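Pausing works through the same StreamSubscription object. The sketch below uses a made-up helper, collectWithPause, which pauses its own subscription after the second event and passes a future to pause() so that the subscription resumes by itself when that future completes. No events are lost while paused; they are delivered after the resume.

```dart
import 'dart:async';

/// Collects four events, pausing for 50 ms after the second one.
Future<List<int>> collectWithPause() async {
  final received = <int>[];
  final done = Completer<void>();
  late final StreamSubscription<int> sub;
  sub = Stream<int>.fromIterable([1, 2, 3, 4]).listen(
    (value) {
      received.add(value);
      if (value == 2) {
        // Pause now; resume automatically when the future completes.
        sub.pause(Future<void>.delayed(const Duration(milliseconds: 50)));
      }
    },
    onDone: done.complete,
  );
  await done.future;
  return received;
}

Future<void> main() async {
  print(await collectWithPause()); // [1, 2, 3, 4]
}
```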
Consuming streams: await for
Inside an async function, you can use an await for loop. It
reads events one‑by‑one and pauses execution until the next event or until
the stream closes. This is the most readable way to consume a stream when
you are in an asynchronous context.
import 'dart:async';
import 'dart:io'; // for stdout
Future<void> main() async {
  final stream = Stream<String>.fromIterable(['D', 'A', 'R', 'T']);
  await for (final letter in stream) {
    stdout.write('$letter ');
  }
  print(''); // newline
}
An await for loop automatically halts when the stream closes.
If the stream never closes, the loop runs forever — use it carefully with
infinite streams.
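One way to use await for safely with an infinite stream is to leave the loop early: a return or break inside the loop cancels the underlying subscription. The sketch below assumes two illustrative helpers, naturals (an endless generator) and firstAbove.

```dart
import 'dart:async';

/// An infinite stream: 0, 1, 2, ...
Stream<int> naturals() async* {
  var n = 0;
  while (true) {
    yield n++;
  }
}

/// Returns the first value greater than [limit].
Future<int> firstAbove(Stream<int> source, int limit) async {
  await for (final value in source) {
    if (value > limit) {
      return value; // leaving the loop cancels the subscription
    }
  }
  throw StateError('Stream closed before any value exceeded $limit');
}

Future<void> main() async {
  print(await firstAbove(naturals(), 3)); // 4
}
```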
Stream Error Handling
Emitting errors into a stream
import 'dart:async';
void main() {
  final controller = StreamController<int>();
  controller.stream.listen(
    (data) => print('Data: $data'),
    onError: (e) => print('Stream error: $e'),
  );
  controller.sink.add(1);
  controller.sink.addError('Oops! Something broke.');
  controller.sink.add(2); // still delivered after the error
  controller.close();
  // Output: Data: 1 → Stream error: Oops! Something broke. → Data: 2
}
Handling errors with the onError callback
import 'dart:async';
void main() {
  final stream = Stream<int>.error(Exception('Simulated failure'));
  stream.listen(
    (data) => print('Data: $data'),
    onError: (e) => print('Caught: $e'),
    onDone: () => print('Done.'),
  );
}
Replacing errors with a stream transformer
import 'dart:async';
void main() {
  // A stream that emits an error between two values.
  final source = Stream<int>.fromFutures([
    Future.value(1),
    Future.error('Invalid data'),
    Future.value(3),
  ]);
  // handleError() can only intercept an error. To substitute a value,
  // add one to the sink from a StreamTransformer.
  final safeStream = source.transform(
    StreamTransformer<int, int>.fromHandlers(
      handleError: (error, stackTrace, sink) {
        print('Replacing error "$error" with 0');
        sink.add(0); // replace the error with a fallback value
      },
    ),
  );
  safeStream.listen(print);
  // Typical output: 1 → Replacing error "Invalid data" with 0 → 0 → 3
}
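When errors only need to be intercepted rather than replaced, Stream.handleError is enough, and its optional test parameter restricts which errors it handles. The sketch below uses a hypothetical readings source that emits two kinds of error: FormatException events are recorded and skipped, while the StateError passes through and ends the await for loop.

```dart
import 'dart:async';

/// Hypothetical source that emits two different kinds of error.
Stream<int> readings() async* {
  yield 1;
  yield* Stream<int>.error(FormatException('bad digit'));
  yield 2;
  yield* Stream<int>.error(StateError('device gone'));
  yield 3; // not reached once the consumer stops listening
}

Future<String> consume() async {
  final skipped = <Object>[];
  final values = <int>[];
  final tolerant = readings().handleError(
    (Object error) => skipped.add(error),
    test: (error) => error is FormatException, // handle only these
  );
  try {
    await for (final value in tolerant) {
      values.add(value);
    }
  } on StateError catch (e) {
    return 'values=$values skipped=${skipped.length} fatal=${e.message}';
  }
  return 'values=$values skipped=${skipped.length} fatal=none';
}

Future<void> main() async {
  print(await consume());
  // values=[1, 2] skipped=1 fatal=device gone
}
```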
Using try-catch inside an await for loop
import 'dart:async';
Future<void> main() async {
  final controller = StreamController<int>();
  // Add some data and an error
  controller.sink.add(10);
  controller.sink.add(20);
  controller.sink.addError('Boom at 30');
  controller.sink.add(40); // this will not be received because the loop breaks
  controller.close();
  try {
    await for (final value in controller.stream) {
      print('Processing $value');
    }
  } catch (e) {
    print('Caught in await for: $e');
  }
  // Output:
  // Processing 10
  // Processing 20
  // Caught in await for: Boom at 30
}
Error-resilient stream processor
import 'dart:async';
/// Simulated sensor data. Every third reading is invalid.
Stream<double> sensorReadings() async* {
  for (int i = 1; i <= 9; i++) {
    await Future.delayed(const Duration(milliseconds: 200));
    if (i % 3 == 0) {
      // Emit an error event without ending the generator. A plain
      // throw would close the stream after the first bad reading.
      yield* Stream<double>.error(
          FormatException('Corrupted reading at index $i'));
      continue;
    }
    yield i * 1.5;
  }
}
Future<void> main() async {
  // Replace each error event with a sentinel value.
  final safeStream = sensorReadings().transform(
    StreamTransformer<double, double>.fromHandlers(
      handleError: (error, stackTrace, sink) {
        print('⚠️ Replacing error: $error');
        sink.add(-1.0); // sentinel for invalid data
      },
    ),
  );
  int readingCount = 0;
  try {
    await for (final value in safeStream) {
      readingCount++;
      if (value == -1.0) {
        print('Reading $readingCount: INVALID (skipped)');
      } else {
        print('Reading $readingCount: ${value.toStringAsFixed(1)} °C');
      }
    }
  } catch (e) {
    print('Fatal stream error: $e');
  }
  print('Processed $readingCount readings in total.');
}
Delayed stream
Stream.periodic emits one event per interval; the factory
callback receives the zero-based event index and can return any computed
value. An async* generator can also introduce delays between
yield statements with await Future.delayed,
giving fine-grained control over timing.
import 'dart:async';
// async* generator that yields Fibonacci numbers with a pause between each.
Stream<int> fibStream() async* {
  int a = 0, b = 1;
  while (true) {
    yield a;
    await Future.delayed(const Duration(milliseconds: 200));
    final next = a + b;
    a = b;
    b = next;
  }
}
void main() async {
  // Stream.periodic: one event per interval, value computed from index.
  print('-- Periodic --');
  await for (final t in Stream.periodic(
    const Duration(milliseconds: 300),
    (i) => 'tick ${i + 1}',
  ).take(4)) {
    print(t); // tick 1, tick 2, tick 3, tick 4
  }
  // async* generator: take(8) caps the infinite stream safely.
  print('-- Fibonacci --');
  await for (final n in fibStream().take(8)) {
    print(n); // 0 1 1 2 3 5 8 13
  }
}
take(n) automatically cancels the subscription after n
events, making it safe to drive infinite streams without a manual cancel.
Transformation chain
Stream operators such as map, where, and
asyncMap each return a new stream and compose into a pipeline.
asyncMap is particularly useful when each element requires an
asynchronous step before the next transformation can proceed. The pipeline
is lazy: processing begins only when a terminal operation is attached.
import 'dart:async';
// Simulates an async lookup (e.g., a database or cache read).
Future<String> fetchLabel(int id) async {
  await Future.delayed(const Duration(milliseconds: 50));
  return 'item-$id';
}
void main() async {
  final result = await Stream.fromIterable(List.generate(10, (i) => i + 1))
      .where((n) => n.isOdd) // 1 3 5 7 9
      .map((n) => n * n) // 1 9 25 49 81
      .asyncMap(fetchLabel) // await a lookup for each squared value
      .toList();
  print(result);
  // [item-1, item-9, item-25, item-49, item-81]
}
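asyncMap processes one event at a time and waits for each future before taking the next event, so results keep the order of the source even when some steps take longer than others. A small sketch, using an illustrative helper named inOrder:

```dart
import 'dart:async';

/// Each element is the number of milliseconds its async step takes.
Future<List<String>> inOrder() {
  return Stream<int>.fromIterable([30, 10, 20]).asyncMap((ms) async {
    await Future<void>.delayed(Duration(milliseconds: ms));
    return 'waited $ms';
  }).toList();
}

Future<void> main() async {
  print(await inOrder());
  // [waited 30, waited 10, waited 20]
}
```

The trade-off is throughput: the steps run sequentially, so the total time is the sum of the individual delays.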
Merging streams
The dart:async library does not include a built-in merge operator
(package:async offers StreamGroup.merge), but one can
be assembled with a StreamController. Events from all sources
interleave in real-time arrival order. The merged stream closes once every
source stream has completed.
import 'dart:async';
// Merge multiple streams concurrently – events interleave as they arrive.
Stream<T> merge<T>(List<Stream<T>> sources) {
  final controller = StreamController<T>();
  int pending = sources.length;
  for (final s in sources) {
    s.listen(
      controller.add,
      onError: controller.addError,
      onDone: () {
        if (--pending == 0) controller.close();
      },
    );
  }
  return controller.stream;
}
void main() async {
  final fast = Stream.periodic(
      const Duration(milliseconds: 200), (i) => 'fast-$i').take(4);
  final slow = Stream.periodic(
      const Duration(milliseconds: 500), (i) => 'slow-$i').take(2);
  await for (final event in merge([fast, slow])) {
    print(event);
    // fast-0, fast-1, slow-0, fast-2, fast-3, slow-1
  }
  print('All sources complete.');
}
Because all subscriptions are started immediately, both sources produce events concurrently on the same event loop without any additional threads.
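A merge built this way keeps its source subscriptions alive even if the consumer stops listening, and it never closes when given an empty list. The following variant is a sketch of one way to address both points; the name mergeLazy is made up here. It subscribes to the sources only when the merged stream gets a listener, forwards pause and resume, and cancels every source when the consumer cancels.

```dart
import 'dart:async';

/// Sketch of a merge that subscribes lazily and cancels its sources.
Stream<T> mergeLazy<T>(List<Stream<T>> sources) {
  final subscriptions = <StreamSubscription<T>>[];
  late final StreamController<T> controller;
  controller = StreamController<T>(
    onListen: () {
      var pending = sources.length;
      if (pending == 0) {
        controller.close(); // nothing to merge: close immediately
        return;
      }
      for (final source in sources) {
        subscriptions.add(source.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            if (--pending == 0) controller.close();
          },
        ));
      }
    },
    onPause: () {
      for (final s in subscriptions) {
        s.pause();
      }
    },
    onResume: () {
      for (final s in subscriptions) {
        s.resume();
      }
    },
    // Propagate cancellation so infinite sources do not keep running.
    onCancel: () => Future.wait(subscriptions.map((s) => s.cancel())),
  );
  return controller.stream;
}

Future<void> main() async {
  // Both sources are infinite; take(3) cancels them after three events.
  final a = Stream.periodic(const Duration(milliseconds: 100), (i) => 'a$i');
  final b = Stream.periodic(const Duration(milliseconds: 250), (i) => 'b$i');
  final firstThree = await mergeLazy([a, b]).take(3).toList();
  print(firstThree.length); // 3
}
```

With the intervals chosen here the three events would normally be a0, a1 and b0, although the exact interleaving depends on timer scheduling.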
Source
Dart streams tutorial, Stream class documentation, Creating streams in Dart
In this tutorial we covered Dart Streams: what they are, single-subscription
vs broadcast models, creating streams with async* and
StreamController, consuming them with listen() and
await for, transforming data through operator pipelines, and
handling errors. The final examples demonstrated delayed and periodic streams,
a multi-step transformation chain, and merging concurrent streams.