Dart StreamController
last modified May 29, 2026
Introduction
A StreamController is the object that gives you manual control over
a Stream. While many streams come from built-in sources
(files, sockets, timers), a StreamController lets you build a
stream from scratch: you decide when data, errors, or the done signal are
delivered. It is the bridge between imperative code and the reactive stream
world.
Common use cases include adapting callback-based APIs
into streams, implementing event buses for communication
between app components, and creating custom asynchronous data
sources. The controller exposes a sink to push events
in, and a stream that listeners subscribe to. This separation
keeps the producer side neatly isolated from the consumer.
import 'dart:async';
void main() {
// Create a single-subscription controller.
final controller = StreamController<int>();
// Get the stream we want to listen to.
final stream = controller.stream;
// Subscribe before adding any events.
stream.listen(
(data) => print('Received: $data'),
onDone: () => print('Stream closed.'),
onError: (e) => print('Error: $e'),
);
// Add some data through the sink.
controller.sink.add(1);
controller.sink.add(2);
controller.sink.add(3);
// Close the controller → stream emits done.
controller.close();
// Output:
// Received: 1
// Received: 2
// Received: 3
// Stream closed.
}
The controller must be closed when no more events will be added. Forgetting
to close it keeps the stream open forever and may leak resources. The
sink is a convenience getter that returns the controller
itself (a StreamSink), allowing method chaining.
Single-subscription vs. broadcast streams
A single-subscription stream allows only one listener at a time. A broadcast stream can have many listeners, and events are delivered to all current subscribers. Which one you need depends on the use case.
| Feature | StreamController() | StreamController.broadcast() |
|---|---|---|
| Listeners | Only one | Many |
| Listening after events | Not allowed (only during stream lifetime) | Possible, but misses earlier events |
| onListen callback | Called once | Called for each new listener |
| Typical use | One-shot async operations, request/response | Event buses, UI events, multi-listener scenarios |
The table above highlights the fundamental difference. For most custom
streams that wrap a single data source, the plain
StreamController() is the correct choice.
import 'dart:async';
void main() {
final controller = StreamController<String>.broadcast();
// First subscriber
controller.stream.listen((data) => print('Sub1: $data'));
controller.sink.add('Hello');
// Second subscriber – misses "Hello"
controller.stream.listen((data) => print('Sub2: $data'));
controller.sink.add('World');
// Close after all listeners are done (usually not this soon in real apps)
controller.close();
// Output:
// Sub1: Hello
// Sub1: World
// Sub2: World
}
Notice that the second subscriber only receives events added after it subscribed. The “Hello” event was already delivered to the first listener and is not replayed. Broadcast controllers are inherently live; they do not cache old events.
Lifecycle callbacks and backpressure
A StreamController accepts optional callback arguments:
onListen, onPause, onResume, and
onCancel. These let the producer react to changes in the
listener’s state — for example, pausing an expensive data source when
the consumer is overwhelmed.
import 'dart:async';
void main() {
final controller = StreamController<int>(
onListen: () => print('Listener attached.'),
onPause: () => print('Stream paused.'),
onResume: () => print('Stream resumed.'),
onCancel: () => print('Listener removed.'),
);
final subscription = controller.stream.listen(
(data) {
print('Got: $data');
// Simulate slow processing → backpressure
if (data == 2) subscription.pause();
if (data == 4) subscription.resume();
},
onDone: () => print('Done.'),
);
for (int i = 1; i <= 5; i++) {
controller.sink.add(i);
}
controller.close();
}
When the listener calls subscription.pause() the controller’s
onPause callback fires, and it stops forwarding events until
subscription.resume() is called. The events themselves are
buffered by the controller (within reasonable limits), so no data is lost.
This mechanism is crucial for adapting push-based sources (like hardware
sensors) that might otherwise flood a slow consumer.
You can also create a controller with a custom onCancel that
cleans up external resources — for example, closing a file or cancelling a
timer. This makes the stream a self-contained lifecycle owner.
Adding errors
Streams can also carry errors, which are delivered to the listener’s
onError handler. Use controller.sink.addError(error,
[stackTrace]) to push an error. If no error handler is provided, the
error propagates to the zone’s uncaught error handler.
import 'dart:async';
void main() {
final controller = StreamController<int>();
controller.stream.listen(
(data) => print('Data: $data'),
onError: (e, s) => print('Caught: $e'),
onDone: () => print('Done.'),
);
controller.sink.add(10);
controller.sink.addError('Something went wrong');
controller.sink.add(20);
controller.close();
}
Errors do not close the stream by default. The stream continues to deliver subsequent data or errors unless the controller is explicitly closed. This behaviour mirrors that of other Dart streams: an error is a “stumble”, not a terminal event.
Practical example — simple timer stream
A timer that periodically emits a value is a perfect demonstration of a
controller adapting an imperative construct (Timer.periodic)
into a stream. The controller is created, the timer pushes ticks, and when
the listener cancels, the timer is stopped.
import 'dart:async';
/// Returns a stream that emits an incrementing integer every second.
Stream<int> counterStream([int maxCount = 5]) {
final controller = StreamController<int>(
onCancel: () => print('Timer cleaned up.'),
);
int count = 1;
final timer = Timer.periodic(const Duration(seconds: 1), (timer) {
controller.sink.add(count);
count++;
if (count > maxCount) {
timer.cancel();
controller.close();
}
});
// Ensure the timer is cancelled if the listener cancels early.
controller.onCancel = () => timer.cancel();
return controller.stream;
}
void main() {
final stream = counterStream(3);
final subscription = stream.listen(
(tick) => print('Tick $tick'),
onDone: () => print('Counter finished.'),
);
// Optional: cancel after 2 seconds to see early cleanup.
// Timer(Duration(seconds: 2), () => subscription.cancel());
}
The timer is created inside the function, but its lifetime is tied to the
stream: if the listener cancels early, the onCancel callback
stops the timer. The controller is closed from inside the timer when the
maximum count is reached, signalling completion to the listener.
Practical example — event bus
An event bus is a classic use for a broadcast controller. Different parts of an application can fire events and other parts can listen without knowing about each other, achieving loose coupling.
import 'dart:async';
class EventBus {
final _controller = StreamController<dynamic>.broadcast();
/// Listen to all events of any type.
Stream<dynamic> get events => _controller.stream;
/// Fire an event onto the bus.
void emit(dynamic event) => _controller.sink.add(event);
/// Filter the stream by event type.
Stream<T> onEvent<T>() => events.where((e) => e is T).cast<T>();
void dispose() => _controller.close();
}
void main() {
final bus = EventBus();
// Widget A listens for login events
bus.onEvent<LoginEvent>().listen((e) =>
print('Widget A: User ${e.username} logged in.'),
);
// Service B also listens for login events
bus.onEvent<LoginEvent>().listen((e) =>
print('Service B: Sending welcome email to ${e.username}.'),
);
// Somewhere in the app, a login occurs
bus.emit(LoginEvent('alice'));
bus.dispose();
}
class LoginEvent {
final String username;
LoginEvent(this.username);
}
The event bus is a thin wrapper around a broadcast
StreamController. The onEvent<T>() helper
filters the stream so subscribers only receive events of a specific type.
This pattern scales well and avoids tight coupling between producers and
consumers.
Limitations and common pitfalls
- Always close the controller. An unclosed controller
holds resources and may prevent the program from exiting. Use
onCancelto clean up external resources when the listener disconnects. - No events after close. Adding data or errors to a
closed controller throws a
StateError. Checkcontroller.isClosedbefore adding, or usecontroller.sink.add()which silently ignores the call if the sink is closed. - Broadcast listeners miss earlier events. A new
subscriber to a broadcast stream receives only future events. If
history is needed, consider using a
StreamControllerwith a customonListenthat replays stored data. - Pause/resume buffering. The default controller
buffers events when the subscription is paused. For unbounded sources,
this can cause memory exhaustion. Use a stream transformer like
stream.debounce()or set a customonPausethat tells the producer to stop. - Single-subscription with multiple listeners.
Attempting to listen twice on a single-subscription stream throws
a
StateError. Always use broadcast controllers for multiple listeners.
import 'dart:async';
void main() {
final controller = StreamController<int>();
controller.close();
// controller.sink.add(1); // throws StateError
// controller.sink.addError('oops'); // throws StateError
print('Controller closed: ${controller.isClosed}'); // true
}
The example above shows that once closed, any attempt to add data or errors
fails. The isClosed property is a quick check to avoid such
exceptions in code that may be called asynchronously after the controller
has been shut down.
Complete runnable example — multi-feature demo
The following program brings together all the concepts: a broadcast controller with lifecycle logging, backpressure handling, error injection, and proper cleanup. It simulates a data source that emits numbers, pauses and resumes on demand, and finally closes gracefully.
import 'dart:async';
import 'dart:io';
void main() {
final controller = StreamController<int>.broadcast(
onListen: () => stdout.writeln('🎧 Listener attached.'),
onPause: () => stdout.writeln('⏸️ Stream paused.'),
onResume: () => stdout.writeln('▶️ Stream resumed.'),
onCancel: () => stdout.writeln('🛑 Listener cancelled.'),
);
// First listener: processes slowly to trigger backpressure
late StreamSubscription sub1;
sub1 = controller.stream.listen(
(data) {
stdout.writeln('Sub1 received: $data');
// Pause when value reaches 5, resume after 1 second
if (data == 5) {
sub1.pause();
Timer(const Duration(seconds: 1), () => sub1.resume());
}
},
onError: (e) => stdout.writeln('Sub1 error: $e'),
onDone: () => stdout.writeln('Sub1 done.'),
);
// Second listener (broadcast allows many)
controller.stream.listen(
(data) => stdout.writeln('Sub2 also got: $data'),
onError: (e) => stdout.writeln('Sub2 error: $e'),
onDone: () => stdout.writeln('Sub2 done.'),
);
// Producer side: push events with a small delay
int count = 1;
Timer.periodic(const Duration(milliseconds: 300), (timer) {
if (controller.isClosed) {
timer.cancel();
return;
}
if (count == 8) {
controller.sink.addError('Artificial error at count $count');
count++;
} else if (count <= 10) {
controller.sink.add(count++);
} else {
controller.close();
timer.cancel();
}
});
// Keep the program alive until the stream is done
controller.done.then((_) => stdout.writeln('✅ Controller closed.'));
}
Run the programme and observe the terminal output. The two subscribers receive the same events simultaneously. When subscriber 1 pauses at value 5, the controller logs the pause; subsequent values are buffered until the resume callback fires. The error event at count 8 is delivered to both listeners without terminating the stream. The stream finishes naturally when the controller is closed after the tenth value.
Wrapping a callback-based API
One of the most common reasons to reach for a StreamController
is adapting a legacy callback-based API into a stream. The controller
receives the callbacks from the old API and forwards them into the stream
world, while keeping the calling code clean and idiomatic.
import 'dart:async';
// Simulates a legacy callback-based reader (could be FFI, native plugin, etc.).
void legacyRead(void Function(String line) onLine, void Function() onDone) {
const lines = ['alpha', 'beta', 'gamma', 'delta'];
var i = 0;
Timer.periodic(const Duration(milliseconds: 100), (timer) {
if (i < lines.length) {
onLine(lines[i++]);
} else {
timer.cancel();
onDone();
}
});
}
// Wraps the callback API in a clean Stream<String>.
Stream<String> linesStream() {
final controller = StreamController<String>();
legacyRead(
(line) => controller.add(line), // onLine → push data
() => controller.close(), // onDone → signal completion
);
return controller.stream;
}
void main() async {
await for (final line in linesStream()) {
print(line); // alpha, beta, gamma, delta
}
}
The wrapper function starts the legacy source, wires its callbacks into the
controller's sink, and returns the stream before any events have arrived.
Callers see a plain Stream<String> with no knowledge of
the underlying callback mechanism.
Piping streams with addStream
StreamController.addStream(source) forwards all events from
source into the controller and returns a Future
that completes when source is done. Awaiting it in a loop makes
it straightforward to concatenate multiple streams sequentially without
buffering them in memory.
import 'dart:async';
// Concatenate multiple streams sequentially into one.
Stream<int> concatenate(List<Stream<int>> sources) {
final controller = StreamController<int>();
// Pipe each source in turn; close the controller when all are done.
Future<void> pipeAll() async {
for (final source in sources) {
await controller.addStream(source);
}
controller.close();
}
pipeAll(); // start piping; do not await here so the stream can be returned
return controller.stream;
}
void main() async {
final all = concatenate([
Stream.fromIterable([1, 2, 3]),
Stream.fromIterable([4, 5]),
Stream.fromIterable([6, 7, 8, 9]),
]);
await for (final value in all) {
print(value); // 1 2 3 4 5 6 7 8 9 – always in order
}
}
Because addStream suspends the loop until each source
completes, the output order is deterministic. A single-subscription
controller is the right choice here; the concatenated stream has exactly
one consumer.
Merging concurrent streams
Concatenation is sequential, but sometimes you want to interleave events from multiple sources as they arrive — a fan-in merge. A broadcast or single-subscription controller can be used depending on whether the result needs one or many listeners. The controller closes only after every source has completed.
import 'dart:async';
// Merge multiple streams concurrently; events interleave as they arrive.
// The output stream closes when ALL source streams have completed.
Stream<T> merge<T>(List<Stream<T>> sources) {
final controller = StreamController<T>();
int pending = sources.length;
for (final source in sources) {
source.listen(
controller.add,
onError: controller.addError,
onDone: () {
if (--pending == 0) controller.close(); // last source finished
},
);
}
return controller.stream;
}
void main() async {
final slow = Stream.periodic(
const Duration(milliseconds: 400), (i) => 'slow-$i').take(3);
final fast = Stream.periodic(
const Duration(milliseconds: 150), (i) => 'fast-$i').take(5);
await for (final event in merge([slow, fast])) {
print(event);
// fast-0, fast-1, slow-0, fast-2, fast-3, slow-1, fast-4, slow-2
}
print('All sources complete.');
}
The generic merge<T> function works with any stream type.
Errors from any source are forwarded to the merged stream unchanged. Because
all subscriptions are started immediately, the sources run in parallel on
the event loop with no additional threads required.
Source
StreamController class documentation, Dart streams tutorial, Creating streams in Dart
In this tutorial we covered the StreamController class: how to
create single-subscription and broadcast controllers, add data and errors,
handle backpressure with lifecycle callbacks, build practical examples
like timer streams and event buses, wrap callback-based APIs, pipe streams
sequentially with addStream, and merge concurrent streams.
Author
List all Dart tutorials.