According to the Flutter documentation, streams provide an essential feature for handling asynchronous sequences of data. It provides a way to handle and process a continuous flow of data, where each event can be asynchronously received and processed.
To use the streams to their full potential, we can use stream operators. These operators are the methods that allow us to transform, filter, process, and combine stream data efficiently.
In this article, we’ll take a tour of some operators/methods with examples that are available in the Stream class.
Positive thinking leads to positive outcomes. Try out Justly, build good habits, and start thinking positively today!
Stream<E> asyncMap<E>(FutureOr<E> Function(List<String>) convert)
The asyncMap
operator takes a source stream and creates a new stream by mapping each event of the source stream to a new event. It behaves similarly to a map, but with a mapping function convert
that can return a Future
and the stream waits for that Future to complete before emitting the result.
void main()async {
final dataStream= Stream.fromIterable([1,2,3,4,5]);
final stream= dataStream.asyncMap((data)async=>processData(data));
}
Future<int> processData(int data) async {
// Simulate an asynchronous operation
await Future.delayed(Duration(seconds: data));
print('ProcessedData: $data');
return data;
}
//prints data with 1 second interval
ProcessedData: 1
ProcessedData: 2
ProcessedData: 3
ProcessedData: 4
ProcessedData: 5
ayncMap
is particularly useful when need to perform some asynchronous API Calls on each event of a stream.
Stream<E> asyncExpand<E>(Stream<E>? Function(List<String>) convert)
The asyncExpand
operator applies an asynchronous mapping function convert to each event of the source stream, where the mapping function returns a new stream.
Stream<List<int>> getNestedStream() async* {
for (int i = 1; i <= 3; i++) {
await Future.delayed(Duration(seconds: 1));
yield List<int>.generate(i, (index) => index + 1);
}
}
void main() {
getNestedStream()
.asyncExpand((nestedList) => Stream.fromIterable(nestedList))
.listen((value) {
print(value); // Prints: 1, 1, 2, 1, 2, 3
});
}
asyncExpand
is useful when you want to transform each event into a stream and then combine those streams into a single output stream, flattening any nested structure. This is useful when you want to perform operations on each event, such as making asynchronous API calls, database queries, or performing computationally expensive calculations.
However, it’s important to note that the result stream will only move on to the next event in the source stream if the substream of the first event has closed.
Stream<S> transform<S>(StreamTransformer<List<String>, S> streamTransformer)
It simply transforms a stream. You might find yourself using the streamTransformer class multiple times. When you have a stream and want to apply specific transformations or operations on the emitted data, you can use StreamTansformer
.
To use StreamTransformer
, we have to provide the input and output type of the transformation and implement the transformation logic within the StreamTransformer’s bind
method. the bind
method is called when the stream.tansform
method is invoked on a stream within the StreamTransformer
.
void main() {
final sourceStream = Stream.fromIterable([1, 2, 3, 4]);
final transformer = StreamTransformer<int, String>.fromHandlers(
handleData: (data, sink) {
if (data % 2 == 0) {
sink.add('Even: $data');
} else {
sink.add('Odd: $data');
}
},
);
final transformedStream = sourceStream.transform(transformer);
transformedStream.listen((data) {
print(data);
});
}
//Prints:
Odd: 1
Even: 2
Odd: 3
Even: 4
StreamTransformer
provides a way to customize the behavior of the stream, apply a transformation, perform validation, and handle errors.
Future<dynamic> pipe(StreamConsumer<List<String>> streamConsumer)
The pipe()
method in streams is commonly used for efficiently transferring data from one stream to another.
import 'dart:async';
void main() {
final sourceStream = Stream.fromIterable(['How', 'are', 'you']);
final destinationStreamController = new StreamController<String>();
sourceStream.transform(_capitalizeTransformer()).pipe(destinationStreamController);
destinationStreamController.stream.listen((data) {
print(data); // Prints: HOW, ARE, YOU
});
}
StreamTransformer<String, String> _capitalizeTransformer() {
return StreamTransformer<String, String>.fromHandlers(
handleData: (data, sink) {
final capitalizedData = data.toUpperCase();
sink.add(capitalizedData);
},
)
Yes, we can do it by calling destinationStreamController.add()
, but the destinationStreamController
will still be open after the stream is done but sourceStream.pipe(destinationStreamCotroller)
will close()
the destinationStreamController
once the stream is finished.
pipe()
enables you to compose complex data processing pipelines by connecting multiple streams together. You can transform, filter, or manipulate the data in each stream using various stream operators and transformers, and then pass the transformed data to the next stream in the pipeline using pipe()
.
Future<List<String>> singleWhere(bool Function(List<String>) test, {List<String> Function()? orElse})
The singleWhere
method is used in streams to find a single element that matches a specified condition. It searches for the first element that satisfies the given condition and returns it, If no matching element is found or if multiple elements match the condition, it throws an error.
void main() async{
final stream = Stream.fromIterable([
{'name': 'Alice', 'age': 25},
{'name': 'Bob', 'age': 30},
{'name': 'Charlie', 'age': 35},
]);
try {
final person = await stream.singleWhere((element) => element['age'] == 30);
print(person); // Output: {name: Bob, age: 30}
} catch (e) {
print(e); // Output: Bad state: No element
}
It calls data asynchronously because singleWhere
needs to wait for the stream to emit the matching element, and if no match is found, it needs to wait until the stream is done emitting elements.
Stream<R> cast<R>()
This stream is wrapped as a Stream<R>
which checks at run-time that each data event emitted by this stream is also an instance of R
.
By using cast()
operator you can ensure that the events emitted by the stream are of the correct type to prevent type errors and allows you to work with the stream’s data in a type-safe manner.
At runtime, it checks each data event emitted by the stream. If an event cannot be cast to the desired type, it will be excluded from the resulting stream, or it may result in an error depending on how the stream processing is handled.
import 'dart:async';
void main() {
final numbersStream = Stream<dynamic>.fromIterable([1,2,3.5,4,5]);
final transformer = StreamTransformer<int, int>.fromHandlers(
handleData: (int, sink) {
sink.add(int*2);
},
);
final adaptedStream = numbersStream.cast<int>().transform(transformer);
adaptedStream.listen((event) {
print(event);//prints: 2 4 8 10
},onError: (e,s){
//if we add print here
print(e.toString); //prints TypeError: 3.5: type 'double' is not a subtype of type 'int'
});
}
Everything works and the type system is happy. .😊 😊 !!
The cast()
operator doesn’t modify the stream. Instead, it creates a new stream that emits events of the desired type. the original stream remains unchanged, and the new stream with the desired type is used for subsequent operations.
Stream<List<String>> distinct([bool Function(List<String>, List<String>)? equals])
The distinct()
method returns a stream with unique elements. It is useful when we need to remove duplicate elements from the stream. The distinct()
method has equals parameter function that checks if the previous and current value is the same.
void main() async {
Stream<Person> personStream = Stream.fromIterable([
Person( "Lokesh", "Gupta"),
Person( "Lokesh", "Gupta"),
Person( "Brian", "Clooney"),
Person( "Brian", "Clooney"),
Person( "Alex", "Kolen")
]);
personStream.distinct((previous, next)=>
previous.name==next.name && previous.surname==next.surname
).listen((person)=>print(person.name));//Prints: Lokesh Brian Alex
}
Stream<List<String>> skip(int count)
The skip(int count)
method returns a stream consisting of the remaining elements after the first count elements. If the stream contains fewer than count elements, we’ll get an empty stream.
void main() {
final numbersStream = Stream<int>.fromIterable([1,2,3,4,5]);
numbersStream.skip(2).listen((event) {
print(event)
},onError: (e,s){
print(e.toString());
});
}
//Prints: 3 4 5
Streams are very helpful when you want to listen for changes in your data and react according to them and stream operators add functionality to the stream.
We’re Grateful to have you with us on this journey!
Suggestions and feedback are more than welcome!
Please reach us at Canopas Twitter handle @canopas_eng with your content or feedback. Your input enriches our content and fuels our motivation to create more valuable and informative articles for you.
Keep Streaming..!! 👋 👋