ZetCode

Dart StreamController

last modified May 29, 2026

Introduction

A StreamController is the object that gives you manual control over a Stream. While many streams come from built-in sources (files, sockets, timers), a StreamController lets you build a stream from scratch: you decide when data, errors, or the done signal are delivered. It is the bridge between imperative code and the reactive stream world.

Common use cases include adapting callback-based APIs into streams, implementing event buses for communication between app components, and creating custom asynchronous data sources. The controller exposes a sink to push events in, and a stream that listeners subscribe to. This separation keeps the producer side neatly isolated from the consumer.

basic_controller.dart
import 'dart:async';

void main() {
  // Create a single-subscription controller.
  final controller = StreamController<int>();

  // Get the stream we want to listen to.
  final stream = controller.stream;

  // Subscribe before adding any events.
  stream.listen(
    (data) => print('Received: $data'),
    onDone: () => print('Stream closed.'),
    onError: (e) => print('Error: $e'),
  );

  // Add some data through the sink.
  controller.sink.add(1);
  controller.sink.add(2);
  controller.sink.add(3);

  // Close the controller → stream emits done.
  controller.close();
  // Output:
  // Received: 1
  // Received: 2
  // Received: 3
  // Stream closed.
}

The controller must be closed when no more events will be added. Forgetting to close it keeps the stream open forever and may leak resources. The sink is a convenience getter that returns the controller itself (a StreamSink), allowing method chaining.

Single-subscription vs. broadcast streams

A single-subscription stream allows only one listener at a time. A broadcast stream can have many listeners, and events are delivered to all current subscribers. Which one you need depends on the use case.

FeatureStreamController()StreamController.broadcast()
ListenersOnly oneMany
Listening after eventsNot allowed (only during stream lifetime)Possible, but misses earlier events
onListen callbackCalled onceCalled for each new listener
Typical useOne-shot async operations, request/responseEvent buses, UI events, multi-listener scenarios

The table above highlights the fundamental difference. For most custom streams that wrap a single data source, the plain StreamController() is the correct choice.

broadcast_controller.dart
import 'dart:async';

void main() {
  final controller = StreamController<String>.broadcast();

  // First subscriber
  controller.stream.listen((data) => print('Sub1: $data'));

  controller.sink.add('Hello');

  // Second subscriber – misses "Hello"
  controller.stream.listen((data) => print('Sub2: $data'));

  controller.sink.add('World');

  // Close after all listeners are done (usually not this soon in real apps)
  controller.close();

  // Output:
  // Sub1: Hello
  // Sub1: World
  // Sub2: World
}

Notice that the second subscriber only receives events added after it subscribed. The “Hello” event was already delivered to the first listener and is not replayed. Broadcast controllers are inherently live; they do not cache old events.

Lifecycle callbacks and backpressure

A StreamController accepts optional callback arguments: onListen, onPause, onResume, and onCancel. These let the producer react to changes in the listener’s state — for example, pausing an expensive data source when the consumer is overwhelmed.

lifecycle_callbacks.dart
import 'dart:async';

void main() {
  final controller = StreamController<int>(
    onListen: () => print('Listener attached.'),
    onPause:  () => print('Stream paused.'),
    onResume: () => print('Stream resumed.'),
    onCancel: () => print('Listener removed.'),
  );

  final subscription = controller.stream.listen(
    (data) {
      print('Got: $data');
      // Simulate slow processing → backpressure
      if (data == 2) subscription.pause();
      if (data == 4) subscription.resume();
    },
    onDone: () => print('Done.'),
  );

  for (int i = 1; i <= 5; i++) {
    controller.sink.add(i);
  }
  controller.close();
}

When the listener calls subscription.pause() the controller’s onPause callback fires, and it stops forwarding events until subscription.resume() is called. The events themselves are buffered by the controller (within reasonable limits), so no data is lost. This mechanism is crucial for adapting push-based sources (like hardware sensors) that might otherwise flood a slow consumer.

You can also create a controller with a custom onCancel that cleans up external resources — for example, closing a file or cancelling a timer. This makes the stream a self-contained lifecycle owner.

Adding errors

Streams can also carry errors, which are delivered to the listener’s onError handler. Use controller.sink.addError(error, [stackTrace]) to push an error. If no error handler is provided, the error propagates to the zone’s uncaught error handler.

error_handling.dart
import 'dart:async';

void main() {
  final controller = StreamController<int>();

  controller.stream.listen(
    (data) => print('Data: $data'),
    onError: (e, s) => print('Caught: $e'),
    onDone:  () => print('Done.'),
  );

  controller.sink.add(10);
  controller.sink.addError('Something went wrong');
  controller.sink.add(20);
  controller.close();
}

Errors do not close the stream by default. The stream continues to deliver subsequent data or errors unless the controller is explicitly closed. This behaviour mirrors that of other Dart streams: an error is a “stumble”, not a terminal event.

Practical example — simple timer stream

A timer that periodically emits a value is a perfect demonstration of a controller adapting an imperative construct (Timer.periodic) into a stream. The controller is created, the timer pushes ticks, and when the listener cancels, the timer is stopped.

timer_stream.dart
import 'dart:async';

/// Returns a stream that emits an incrementing integer every second.
Stream<int> counterStream([int maxCount = 5]) {
  final controller = StreamController<int>(
    onCancel: () => print('Timer cleaned up.'),
  );

  int count = 1;
  final timer = Timer.periodic(const Duration(seconds: 1), (timer) {
    controller.sink.add(count);
    count++;
    if (count > maxCount) {
      timer.cancel();
      controller.close();
    }
  });

  // Ensure the timer is cancelled if the listener cancels early.
  controller.onCancel = () => timer.cancel();

  return controller.stream;
}

void main() {
  final stream = counterStream(3);

  final subscription = stream.listen(
    (tick) => print('Tick $tick'),
    onDone: () => print('Counter finished.'),
  );

  // Optional: cancel after 2 seconds to see early cleanup.
  // Timer(Duration(seconds: 2), () => subscription.cancel());
}

The timer is created inside the function, but its lifetime is tied to the stream: if the listener cancels early, the onCancel callback stops the timer. The controller is closed from inside the timer when the maximum count is reached, signalling completion to the listener.

Practical example — event bus

An event bus is a classic use for a broadcast controller. Different parts of an application can fire events and other parts can listen without knowing about each other, achieving loose coupling.

event_bus.dart
import 'dart:async';

class EventBus {
  final _controller = StreamController<dynamic>.broadcast();

  /// Listen to all events of any type.
  Stream<dynamic> get events => _controller.stream;

  /// Fire an event onto the bus.
  void emit(dynamic event) => _controller.sink.add(event);

  /// Filter the stream by event type.
  Stream<T> onEvent<T>() => events.where((e) => e is T).cast<T>();

  void dispose() => _controller.close();
}

void main() {
  final bus = EventBus();

  // Widget A listens for login events
  bus.onEvent<LoginEvent>().listen((e) =>
    print('Widget A: User ${e.username} logged in.'),
  );

  // Service B also listens for login events
  bus.onEvent<LoginEvent>().listen((e) =>
    print('Service B: Sending welcome email to ${e.username}.'),
  );

  // Somewhere in the app, a login occurs
  bus.emit(LoginEvent('alice'));

  bus.dispose();
}

class LoginEvent {
  final String username;
  LoginEvent(this.username);
}

The event bus is a thin wrapper around a broadcast StreamController. The onEvent<T>() helper filters the stream so subscribers only receive events of a specific type. This pattern scales well and avoids tight coupling between producers and consumers.

Limitations and common pitfalls

close_guard.dart
import 'dart:async';

void main() {
  final controller = StreamController<int>();
  controller.close();

  // controller.sink.add(1);           // throws StateError
  // controller.sink.addError('oops'); // throws StateError
  print('Controller closed: ${controller.isClosed}'); // true
}

The example above shows that once closed, any attempt to add data or errors fails. The isClosed property is a quick check to avoid such exceptions in code that may be called asynchronously after the controller has been shut down.

Complete runnable example — multi-feature demo

The following program brings together all the concepts: a broadcast controller with lifecycle logging, backpressure handling, error injection, and proper cleanup. It simulates a data source that emits numbers, pauses and resumes on demand, and finally closes gracefully.

stream_controller_demo.dart
import 'dart:async';
import 'dart:io';

void main() {
  final controller = StreamController<int>.broadcast(
    onListen: () => stdout.writeln('🎧 Listener attached.'),
    onPause:  () => stdout.writeln('⏸️  Stream paused.'),
    onResume: () => stdout.writeln('▶️  Stream resumed.'),
    onCancel: () => stdout.writeln('🛑 Listener cancelled.'),
  );

  // First listener: processes slowly to trigger backpressure
  late StreamSubscription sub1;
  sub1 = controller.stream.listen(
    (data) {
      stdout.writeln('Sub1 received: $data');
      // Pause when value reaches 5, resume after 1 second
      if (data == 5) {
        sub1.pause();
        Timer(const Duration(seconds: 1), () => sub1.resume());
      }
    },
    onError: (e) => stdout.writeln('Sub1 error: $e'),
    onDone:  () => stdout.writeln('Sub1 done.'),
  );

  // Second listener (broadcast allows many)
  controller.stream.listen(
    (data) => stdout.writeln('Sub2 also got: $data'),
    onError: (e) => stdout.writeln('Sub2 error: $e'),
    onDone:  () => stdout.writeln('Sub2 done.'),
  );

  // Producer side: push events with a small delay
  int count = 1;
  Timer.periodic(const Duration(milliseconds: 300), (timer) {
    if (controller.isClosed) {
      timer.cancel();
      return;
    }
    if (count == 8) {
      controller.sink.addError('Artificial error at count $count');
      count++;
    } else if (count <= 10) {
      controller.sink.add(count++);
    } else {
      controller.close();
      timer.cancel();
    }
  });

  // Keep the program alive until the stream is done
  controller.done.then((_) => stdout.writeln('✅ Controller closed.'));
}

Run the programme and observe the terminal output. The two subscribers receive the same events simultaneously. When subscriber 1 pauses at value 5, the controller logs the pause; subsequent values are buffered until the resume callback fires. The error event at count 8 is delivered to both listeners without terminating the stream. The stream finishes naturally when the controller is closed after the tenth value.

Wrapping a callback-based API

One of the most common reasons to reach for a StreamController is adapting a legacy callback-based API into a stream. The controller receives the callbacks from the old API and forwards them into the stream world, while keeping the calling code clean and idiomatic.

callback_wrap.dart
import 'dart:async';

// Simulates a legacy callback-based reader (could be FFI, native plugin, etc.).
void legacyRead(void Function(String line) onLine, void Function() onDone) {
  const lines = ['alpha', 'beta', 'gamma', 'delta'];
  var i = 0;
  Timer.periodic(const Duration(milliseconds: 100), (timer) {
    if (i < lines.length) {
      onLine(lines[i++]);
    } else {
      timer.cancel();
      onDone();
    }
  });
}

// Wraps the callback API in a clean Stream<String>.
Stream<String> linesStream() {
  final controller = StreamController<String>();
  legacyRead(
    (line) => controller.add(line),  // onLine  → push data
    ()     => controller.close(),    // onDone  → signal completion
  );
  return controller.stream;
}

void main() async {
  await for (final line in linesStream()) {
    print(line); // alpha, beta, gamma, delta
  }
}

The wrapper function starts the legacy source, wires its callbacks into the controller's sink, and returns the stream before any events have arrived. Callers see a plain Stream<String> with no knowledge of the underlying callback mechanism.

Piping streams with addStream

StreamController.addStream(source) forwards all events from source into the controller and returns a Future that completes when source is done. Awaiting it in a loop makes it straightforward to concatenate multiple streams sequentially without buffering them in memory.

concatenate.dart
import 'dart:async';

// Concatenate multiple streams sequentially into one.
Stream<int> concatenate(List<Stream<int>> sources) {
  final controller = StreamController<int>();

  // Pipe each source in turn; close the controller when all are done.
  Future<void> pipeAll() async {
    for (final source in sources) {
      await controller.addStream(source);
    }
    controller.close();
  }
  pipeAll(); // start piping; do not await here so the stream can be returned

  return controller.stream;
}

void main() async {
  final all = concatenate([
    Stream.fromIterable([1, 2, 3]),
    Stream.fromIterable([4, 5]),
    Stream.fromIterable([6, 7, 8, 9]),
  ]);

  await for (final value in all) {
    print(value); // 1 2 3 4 5 6 7 8 9 – always in order
  }
}

Because addStream suspends the loop until each source completes, the output order is deterministic. A single-subscription controller is the right choice here; the concatenated stream has exactly one consumer.

Merging concurrent streams

Concatenation is sequential, but sometimes you want to interleave events from multiple sources as they arrive — a fan-in merge. A broadcast or single-subscription controller can be used depending on whether the result needs one or many listeners. The controller closes only after every source has completed.

merge.dart
import 'dart:async';

// Merge multiple streams concurrently; events interleave as they arrive.
// The output stream closes when ALL source streams have completed.
Stream<T> merge<T>(List<Stream<T>> sources) {
  final controller = StreamController<T>();
  int pending = sources.length;

  for (final source in sources) {
    source.listen(
      controller.add,
      onError: controller.addError,
      onDone: () {
        if (--pending == 0) controller.close(); // last source finished
      },
    );
  }
  return controller.stream;
}

void main() async {
  final slow = Stream.periodic(
      const Duration(milliseconds: 400), (i) => 'slow-$i').take(3);
  final fast = Stream.periodic(
      const Duration(milliseconds: 150), (i) => 'fast-$i').take(5);

  await for (final event in merge([slow, fast])) {
    print(event);
    // fast-0, fast-1, slow-0, fast-2, fast-3, slow-1, fast-4, slow-2
  }
  print('All sources complete.');
}

The generic merge<T> function works with any stream type. Errors from any source are forwarded to the merged stream unchanged. Because all subscriptions are started immediately, the sources run in parallel on the event loop with no additional threads required.

Source

StreamController class documentation, Dart streams tutorial, Creating streams in Dart

In this tutorial we covered the StreamController class: how to create single-subscription and broadcast controllers, add data and errors, handle backpressure with lifecycle callbacks, build practical examples like timer streams and event buses, wrap callback-based APIs, pipe streams sequentially with addStream, and merge concurrent streams.

Author

My name is Jan Bodnar, and I am a passionate programmer with extensive programming experience. I have been writing programming articles since 2007. To date, I have authored over 1,400 articles and 8 e-books. I possess more than ten years of experience in teaching programming.

List all Dart tutorials.