ZetCode

Dart Streams

last modified May 29, 2026

Introduction to Dart Streams

A Stream is a sequence of asynchronous events. While a Future represents a single value that will be available at some point in the future, a Stream can deliver many values over time. Think of a Future as a pizza delivery that arrives once; a Stream is more like a conveyor belt that drops a new item every few seconds.

Streams are everywhere in Dart: user interface events, file I/O, WebSocket messages, and even asynchronous generators are all built on top of streams. They allow you to react to data as it arrives without blocking the main thread.

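| Feature          | Future                                   | Stream                                                  |
|------------------|------------------------------------------|---------------------------------------------------------|
| Number of values | Single value (or error)                  | Zero or more values                                     |
| Completion       | Completes once (with a value or an error) | May or may not complete (some are infinite)            |
| Consumer         | then(), await                            | listen(), await for                                     |
| Error handling   | catchError(), try-catch                  | onError callback, handleError(), try-catch in await for |
| Example          | HTTP request result                      | Mouse clicks, file system watcher events                |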

Dart has two fundamental types of streams: single‑subscription and broadcast. A single‑subscription stream can be listened to only once; it is ideal for representing a series of events that belong to a single consumer, such as reading a file from start to finish. Broadcast streams allow multiple listeners and are suited for event buses, mouse clicks, or any scenario where several parts of the application need the same events.

stream_types.dart
import 'dart:async';

void main() {
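  // Single-subscription stream (only one listener allowed).
  // The default stream of a StreamController is single-subscription.
  final singleController = StreamController<int>();
  final singleStream = singleController.stream;
  singleStream.listen((val) => print('Single: $val'));
  // singleStream.listen(...) // a second listen would throw a StateError
  singleController..add(1)..add(2)..add(3)..close();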

  // Broadcast stream (many listeners allowed)
  final broadcastController = StreamController<int>.broadcast();
  final broadcastStream = broadcastController.stream;

  broadcastStream.listen((val) => print('Listener 1: $val'));
  broadcastStream.listen((val) => print('Listener 2: $val'));

  broadcastController.sink.add(42);
  broadcastController.close();
  // Both listeners receive 42.
}

The example above shows the critical difference: the broadcast stream delivers each event to every listener, while a second listen() call on the single‑subscription stream throws a StateError.
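As a small check of that difference, the sketch below attempts a second listen() on each kind of stream and reports whether it throws. The helper secondListenThrows is a hypothetical name introduced only for this illustration.

```dart
import 'dart:async';

/// Returns true if a second listen() call on [stream] throws a StateError.
/// (Hypothetical helper, not part of dart:async.)
bool secondListenThrows(Stream<int> stream) {
  final first = stream.listen((_) {});
  try {
    stream.listen((_) {}).cancel();
    return false;
  } on StateError {
    return true;
  } finally {
    first.cancel();
  }
}

void main() {
  final single = StreamController<int>();
  print(single.stream.isBroadcast); // false
  print(secondListenThrows(single.stream)); // true

  final shared = StreamController<int>.broadcast();
  print(shared.stream.isBroadcast); // true
  print(secondListenThrows(shared.stream)); // false
}
```

The isBroadcast getter is a convenient way to find out which kind of stream an API has handed you before attaching more than one listener.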

Creating and Consuming Streams

Creating streams with async*

The easiest way to create a stream is with an asynchronous generator function marked async*. Inside, yield emits a new value into the stream. When the function body ends, the stream closes automatically.

async_generator.dart
import 'dart:async';

Stream<int> countDown(int from) async* {
  for (int i = from; i > 0; i--) {
    yield i;
    await Future.delayed(const Duration(milliseconds: 500));
  }
}

void main() {
  final stream = countDown(3);
  stream.listen((val) => print(val));
}

The countDown function returns a lazy stream. Nothing executes until a listener subscribes. At each yield the generator hands the value to the listener; if the subscription is paused the generator waits there, and if it is cancelled the generator stops. This is perfect for producing sequences with built‑in delays or data that naturally unfolds over time.
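The laziness described above can be observed with a short trace. This is a minimal sketch; the names numbers and traceLazyStart are made up for the illustration.

```dart
import 'dart:async';

/// A generator that records when its body actually starts running.
Stream<int> numbers(List<String> log) async* {
  log.add('generator started');
  yield 1;
  yield 2;
}

/// Builds the stream first, listens afterwards, and returns the trace.
Future<List<String>> traceLazyStart() async {
  final log = <String>[];
  final stream = numbers(log); // the body has not run yet
  log.add('stream created');
  await for (final n in stream) {
    log.add('got $n');
  }
  return log;
}

Future<void> main() async {
  print(await traceLazyStart());
  // [stream created, generator started, got 1, got 2]
}
```

The 'stream created' entry comes first because calling numbers() only builds the stream; the body starts when await for subscribes.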

Creating streams with StreamController

When you need manual control — perhaps to adapt a callback‑based API — StreamController is the right tool. You push events into the controller’s sink, and listeners receive them on the stream.

controller_creation.dart
import 'dart:async';

void main() {
  final controller = StreamController<String>();

  controller.stream.listen((msg) => print('Received: $msg'));

  controller.sink.add('First');
  controller.sink.add('Second');
  controller.close(); // signals that no more events will come
}

Remember to always call close() when you are done; otherwise the stream stays open forever and may leak resources.
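To illustrate the callback-adapting use case, here is a sketch that wraps Timer.periodic, a callback-based API, in a stream. The function name ticks is hypothetical. The onListen and onCancel callbacks start and stop the timer, so no work is done without a listener. Pausing is not handled in this sketch.

```dart
import 'dart:async';

/// Hypothetical adapter: exposes Timer.periodic callbacks as a stream.
Stream<int> ticks(Duration interval) {
  Timer? timer;
  late final StreamController<int> controller;
  controller = StreamController<int>(
    // Start the timer only when someone listens.
    onListen: () {
      var count = 0;
      timer = Timer.periodic(interval, (_) => controller.add(count++));
    },
    // Stop the timer when the listener cancels.
    onCancel: () => timer?.cancel(),
  );
  return controller.stream;
}

Future<void> main() async {
  final values =
      await ticks(const Duration(milliseconds: 10)).take(3).toList();
  print(values); // [0, 1, 2]
}
```

Because take(3) cancels its subscription after the third event, onCancel runs and the timer is released, which lets the program exit.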

Consuming streams: listen()

The most common way to consume a stream is the listen() method. It accepts callbacks for data, errors, and completion. The returned StreamSubscription lets you pause, resume, or cancel the stream.

listen_method.dart
import 'dart:async';

void main() {
  final stream = Stream<int>.periodic(
    const Duration(seconds: 1),
    (count) => count + 1,
  ).take(3);

  final subscription = stream.listen(
    (data) => print('Tick: $data'),
    onError: (e) => print('Error: $e'),
    onDone:  () => print('Stream finished.'),
  );

  // Cancel after 2 seconds. The stream needs 3 seconds to finish,
  // so the third tick and the onDone callback never run.
  Timer(const Duration(seconds: 2), () => subscription.cancel());
}
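The subscription returned by listen() can also pause and resume delivery. The following sketch, using a hypothetical helper named pauseThenResume, pauses before any event arrives, takes a snapshot, and then resumes.

```dart
import 'dart:async';

/// Pauses a subscription, snapshots what arrived meanwhile, then resumes.
Future<Map<String, List<int>>> pauseThenResume() async {
  final received = <int>[];
  final done = Completer<void>();

  final subscription = Stream<int>.fromIterable([1, 2, 3, 4]).listen(
    received.add,
    onDone: () => done.complete(),
  );

  // Pause before any event has been delivered; events are held back.
  subscription.pause();
  await Future<void>.delayed(const Duration(milliseconds: 50));
  final whilePaused = List<int>.of(received);

  // Resuming lets the held-back events flow again.
  subscription.resume();
  await done.future;
  return {'whilePaused': whilePaused, 'total': received};
}

Future<void> main() async {
  print(await pauseThenResume());
  // {whilePaused: [], total: [1, 2, 3, 4]}
}
```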

Consuming streams: await for

Inside an async function, you can use an await for loop. It reads events one‑by‑one and pauses execution until the next event or until the stream closes. This is the most readable way to consume a stream when you are in an asynchronous context.

await_for.dart
import 'dart:async';
import 'dart:io'; // provides stdout

Future<void> main() async {
  final stream = Stream<String>.fromIterable(['D', 'A', 'R', 'T']);

  await for (final letter in stream) {
    stdout.write('$letter ');
  }
  print(''); // newline
}

An await for loop automatically halts when the stream closes. If the stream never closes, the loop runs forever — use it carefully with infinite streams.
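One way to use await for with an infinite stream is to leave the loop once a condition is met; breaking out cancels the subscription. The sketch below assumes a hypothetical helper named firstWhileBelow.

```dart
import 'dart:async';

/// Collects values from [source] until one reaches [limit].
Future<List<int>> firstWhileBelow(Stream<int> source, int limit) async {
  final seen = <int>[];
  await for (final value in source) {
    if (value >= limit) break; // leaving the loop cancels the subscription
    seen.add(value);
  }
  return seen;
}

Future<void> main() async {
  // An endless stream: 0, 1, 2, ... every 10 ms.
  final endless =
      Stream<int>.periodic(const Duration(milliseconds: 10), (i) => i);
  print(await firstWhileBelow(endless, 3)); // [0, 1, 2]
}
```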

Stream Error Handling

Emitting errors into a stream

add_error.dart
import 'dart:async';

void main() {
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => print('Data: $data'),
    onError: (e) => print('Stream error: $e'),
  );

  controller.sink.add(1);
  controller.sink.addError('Oops! Something broke.');
  controller.sink.add(2); // still delivered after the error
  controller.close();
  // Output: Data: 1 → Stream error: Oops! Something broke. → Data: 2
}

Handling errors with the onError callback

onerror_callback.dart
import 'dart:async';

void main() {
  final stream = Stream<int>.error(Exception('Simulated failure'));

  stream.listen(
    (data) => print('Data: $data'),
    onError: (e) => print('Caught: $e'),
    onDone: () => print('Done.'),
  );
}

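Replacing errors with a StreamTransformer

Unlike Future, Stream has no catchError() method. handleError() can intercept errors but cannot substitute a value for them; to turn an error into a fallback event, use StreamTransformer.fromHandlers.

error_transformer.dart
import 'dart:async';

void main() {
  // A stream that delivers an error between two values.
  final source = Stream<int>.fromFutures([
    Future.value(1),
    Future.error('Invalid data'),
    Future.value(3),
  ]);

  // The handleError handler receives the error and the output sink,
  // so it can emit a data event in place of the error.
  final safeStream = source.transform(
    StreamTransformer<int, int>.fromHandlers(
      handleError: (error, stackTrace, sink) {
        print('Replacing error "$error" with 0');
        sink.add(0); // replace the error with a fallback value
      },
    ),
  );

  safeStream.listen(print);
  // Output: 1 → Replacing error "Invalid data" with 0 → 0 → 3
}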

Using try-catch inside an await for loop

await_for_trycatch.dart
import 'dart:async';

Future<void> main() async {
  final controller = StreamController<int>();

  // Add some data and an error
  controller.sink.add(10);
  controller.sink.add(20);
  controller.sink.addError('Boom at 30');
  controller.sink.add(40); // this will not be received because the loop breaks
  controller.close();

  try {
    await for (final value in controller.stream) {
      print('Processing $value');
    }
  } catch (e) {
    print('Caught in await for: $e');
  }

  // Output:
  // Processing 10
  // Processing 20
  // Caught in await for: Boom at 30
}

Error-resilient stream processor

sensor_stream.dart
import 'dart:async';

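/// Simulated sensor data. Every third reading is invalid.
Stream<double> sensorReadings() async* {
  for (int i = 1; i <= 9; i++) {
    await Future.delayed(const Duration(milliseconds: 200));
    if (i % 3 == 0) {
      // A plain throw would end the stream after the first error.
      // yield* forwards the error event and lets the generator continue.
      yield* Stream<double>.error(
          FormatException('Corrupted reading at index $i'));
      continue;
    }
    yield i * 1.5;
  }
}

Future<void> main() async {
  // Stream has no catchError(); a transformer replaces each error
  // event with a sentinel value instead.
  final safeStream = sensorReadings().transform(
    StreamTransformer<double, double>.fromHandlers(
      handleError: (error, stackTrace, sink) {
        print('⚠️  Replacing error: $error');
        sink.add(-1.0); // sentinel for invalid data
      },
    ),
  );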

  int readingCount = 0;
  try {
    await for (final value in safeStream) {
      readingCount++;
      if (value == -1.0) {
        print('Reading $readingCount: INVALID (skipped)');
      } else {
        print('Reading $readingCount: ${value.toStringAsFixed(1)} °C');
      }
    }
  } catch (e) {
    print('Fatal stream error: $e');
  }

  print('Processed $readingCount readings in total.');
}

Delayed stream

Stream.periodic emits one event per interval; the factory callback receives the zero-based event index and can return any computed value. An async* generator can also introduce delays between yield statements with await Future.delayed, giving fine-grained control over timing.

delayed_stream.dart
import 'dart:async';

// async* generator that yields Fibonacci numbers with a pause between each.
Stream<int> fibStream() async* {
  int a = 0, b = 1;
  while (true) {
    yield a;
    await Future.delayed(const Duration(milliseconds: 200));
    final next = a + b;
    a = b;
    b = next;
  }
}

void main() async {
  // Stream.periodic: one event per interval, value computed from index.
  print('-- Periodic --');
  await for (final t in Stream.periodic(
      const Duration(milliseconds: 300),
      (i) => 'tick ${i + 1}').take(4)) {
    print(t); // tick 1, tick 2, tick 3, tick 4
  }

  // async* generator: take(8) caps the infinite stream safely.
  print('-- Fibonacci --');
  await for (final n in fibStream().take(8)) {
    print(n); // 0 1 1 2 3 5 8 13
  }
}

take(n) automatically cancels the subscription after n events, making it safe to drive infinite streams without a manual cancel.
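The cancellation triggered by take(n) reaches all the way into an async* generator, which can react to it in a finally block. The following is a sketch under that assumption; naturals is a hypothetical generator, and the short delay in main only gives the cancelled generator time to finish.

```dart
import 'dart:async';

/// Infinite counter that records when its subscription is cancelled.
Stream<int> naturals(List<String> log) async* {
  try {
    var n = 0;
    while (true) {
      await Future<void>.delayed(const Duration(milliseconds: 10));
      yield n++;
    }
  } finally {
    // Runs when the listener cancels, for example through take(n).
    log.add('generator stopped');
  }
}

Future<void> main() async {
  final log = <String>[];
  final firstThree = await naturals(log).take(3).toList();
  // Give the cancelled generator a moment to run its finally block.
  await Future<void>.delayed(const Duration(milliseconds: 50));
  print(firstThree); // [0, 1, 2]
  print(log); // [generator stopped]
}
```

A finally block like this is a reasonable place to release resources the generator opened, such as files or sockets.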

Transformation chain

Stream operators such as map, where, and asyncMap each return a new stream and compose into a pipeline. asyncMap is particularly useful when each element requires an asynchronous step before the next transformation can proceed. The pipeline is lazy: processing begins only when the resulting stream is listened to, for example by toList() or an await for loop.

transform_chain.dart
import 'dart:async';

// Simulates an async lookup (e.g., a database or cache read).
Future<String> fetchLabel(int id) async {
  await Future.delayed(const Duration(milliseconds: 50));
  return 'item-$id';
}

void main() async {
  final result = await Stream.fromIterable(List.generate(10, (i) => i + 1))
      .where((n) => n.isOdd)      // 1 3 5 7 9
      .map((n) => n * n)           // 1 9 25 49 81
      .asyncMap(fetchLabel)        // await a lookup for each squared value
      .toList();

  print(result);
  // [item-1, item-9, item-25, item-49, item-81]
}
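asyncMap handles one event at a time and waits for its future before taking the next event, so results keep the source order even when later lookups finish faster. The sketch below uses a hypothetical lookup function whose latency shrinks as the id grows.

```dart
import 'dart:async';

/// Hypothetical lookup: larger ids finish faster than smaller ones.
Future<String> lookup(int id) async {
  await Future<void>.delayed(Duration(milliseconds: 40 - 10 * id));
  return 'item-$id';
}

Future<void> main() async {
  // Despite the different latencies, the output follows the input order.
  final labels =
      await Stream<int>.fromIterable([1, 2, 3]).asyncMap(lookup).toList();
  print(labels); // [item-1, item-2, item-3]
}
```

The trade-off is throughput: because the lookups run one after another, the total time is the sum of the individual delays.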

Merging streams

Dart's core libraries do not include a merge operator (package:async provides StreamGroup.merge), but one can be assembled with a StreamController. Events from all sources interleave in real-time arrival order. The merged stream closes once every source stream has completed.

merge_streams.dart
import 'dart:async';

// Merge multiple streams concurrently – events interleave as they arrive.
Stream<T> merge<T>(List<Stream<T>> sources) {
  final controller = StreamController<T>();
  int pending = sources.length;
  for (final s in sources) {
    s.listen(
      controller.add,
      onError: controller.addError,
      onDone: () { if (--pending == 0) controller.close(); },
    );
  }
  return controller.stream;
}

void main() async {
  final fast = Stream.periodic(
      const Duration(milliseconds: 200), (i) => 'fast-$i').take(4);
  final slow = Stream.periodic(
      const Duration(milliseconds: 500), (i) => 'slow-$i').take(2);

  await for (final event in merge([fast, slow])) {
    print(event);
    // fast-0, fast-1, slow-0, fast-2, fast-3, slow-1
  }
  print('All sources complete.');
}

Because all subscriptions are started immediately, both sources produce events concurrently on the same event loop without any additional threads.
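The merge function above never cancels its sources, which matters when the sources are infinite. As a possible refinement, the sketch below subscribes only when the merged stream gets a listener and cancels every source when that listener cancels. The name mergeCancellable is hypothetical, and pausing is not propagated in this sketch.

```dart
import 'dart:async';

/// Sketch of a merge that starts lazily and cancels its sources.
Stream<T> mergeCancellable<T>(List<Stream<T>> sources) {
  final subscriptions = <StreamSubscription<T>>[];
  late final StreamController<T> controller;
  controller = StreamController<T>(
    onListen: () {
      if (sources.isEmpty) {
        controller.close(); // nothing to wait for
        return;
      }
      var pending = sources.length;
      for (final source in sources) {
        subscriptions.add(source.listen(
          controller.add,
          onError: controller.addError,
          onDone: () {
            if (--pending == 0) controller.close();
          },
        ));
      }
    },
    // Cancelling the merged stream cancels every source subscription.
    onCancel: () => Future.wait(subscriptions.map((s) => s.cancel())),
  );
  return controller.stream;
}

Future<void> main() async {
  final a = Stream<String>.periodic(
      const Duration(milliseconds: 100), (i) => 'a$i');
  final b = Stream<String>.periodic(
      const Duration(milliseconds: 250), (i) => 'b$i');

  // Both sources are infinite; take(3) cancels the merged stream,
  // which cancels both sources so the program can exit.
  print(await mergeCancellable([a, b]).take(3).toList()); // [a0, a1, b0]
}
```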

In this tutorial we covered Dart Streams: what they are, single-subscription vs broadcast models, creating streams with async* and StreamController, consuming them with listen() and await for, transforming data through operator pipelines, and handling errors. The final examples demonstrated delayed and periodic streams, a multi-step transformation chain, and merging concurrent streams.

Author

My name is Jan Bodnar, and I am a passionate programmer with extensive programming experience. I have been writing programming articles since 2007. To date, I have authored over 1,400 articles and 8 e-books. I possess more than ten years of experience in teaching programming.
