How to Transform Dart Streams: A Basic Guide to Stream Operators

Explores Some of the Basic Stream Operators to Transform the Stream.
Jul 25 2023 · 5 min read

Background

According to the Flutter documentation, streams provide an essential feature for handling asynchronous sequences of data. They offer a way to handle and process a continuous flow of data, where each event can be received and processed asynchronously.

To use streams to their full potential, we can use stream operators. These operators are methods that allow us to transform, filter, process, and combine stream data efficiently.

In this article, we’ll take a tour of some operators/methods with examples that are available in the Stream class.

Stream Operators

asyncMap

Stream<E> asyncMap<E>(FutureOr<E> Function(T event) convert)

The asyncMap operator takes a source stream and creates a new stream by mapping each event of the source stream to a new event. It behaves similarly to map, but its mapping function convert can return a Future, and the stream waits for that Future to complete before emitting the result and moving on to the next event.

void main() async {
  final dataStream = Stream.fromIterable([1, 2, 3, 4, 5]);
  final stream = dataStream.asyncMap((data) => processData(data));
  // A stream does no work until something listens to it.
  await stream.drain();
}

Future<int> processData(int data) async {
  // Simulate an asynchronous operation
  await Future.delayed(Duration(seconds: 1));
  print('ProcessedData: $data');
  return data;
}

// Prints one line per second:
ProcessedData: 1
ProcessedData: 2
ProcessedData: 3
ProcessedData: 4
ProcessedData: 5

asyncMap is particularly useful when you need to perform an asynchronous operation, such as an API call, on each event of a stream.
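To illustrate the waiting behavior described above, here is a minimal sketch in which later events finish faster than earlier ones. The helper names slowDouble and doubledInOrder are hypothetical and exist only for this illustration. Because asyncMap handles one event at a time, the results should still arrive in source order.

```dart
import 'dart:async';

// Hypothetical helper: doubles n, taking less time for larger n.
Future<int> slowDouble(int n) async {
  await Future.delayed(Duration(milliseconds: 40 - n * 10));
  return n * 2;
}

// Collects the mapped events into a list.
Future<List<int>> doubledInOrder() {
  return Stream.fromIterable([1, 2, 3]).asyncMap(slowDouble).toList();
}

Future<void> main() async {
  print(await doubledInOrder()); // [2, 4, 6]
}
```

Even though the Future for 3 would complete first if all three ran concurrently, the output keeps the order of the source events.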

asyncExpand

Stream<E> asyncExpand<E>(Stream<E>? Function(T event) convert)

The asyncExpand operator applies an asynchronous mapping function convert to each event of the source stream, where the mapping function returns a new stream.

Stream<List<int>> getNestedStream() async* {
  for (int i = 1; i <= 3; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield List<int>.generate(i, (index) => index + 1);
  }
}

void main() {
  getNestedStream()
      .asyncExpand((nestedList) => Stream.fromIterable(nestedList))
      .listen((value) {
    print(value); // Prints: 1, 1, 2, 1, 2, 3
  });
}

asyncExpand is useful when you want to transform each event into a stream and then combine those streams into a single output stream, flattening any nested structure. Typical cases are events that each trigger asynchronous API calls, database queries, or computationally expensive calculations.

However, it’s important to note that the result stream only moves on to the next event in the source stream once the sub-stream created for the current event has closed.
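The ordering rule above can be seen in a small sketch. The helper subStream and the function collectExpanded are hypothetical names chosen for this example. Each sub-stream takes some time to finish, yet the output is never interleaved, because the next sub-stream only starts after the previous one has closed.

```dart
import 'dart:async';

// Hypothetical helper: emits two labelled values for [id], 100 ms apart.
Stream<String> subStream(int id) async* {
  for (final part in ['a', 'b']) {
    await Future.delayed(const Duration(milliseconds: 100));
    yield '$id$part';
  }
}

// Flattens the three sub-streams and collects everything they emit.
Future<List<String>> collectExpanded() {
  return Stream.fromIterable([1, 2, 3]).asyncExpand(subStream).toList();
}

Future<void> main() async {
  print(await collectExpanded()); // [1a, 1b, 2a, 2b, 3a, 3b]
}
```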

StreamTransformer

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer)

It simply transforms a stream. You might find yourself using the StreamTransformer class multiple times. When you have a stream and want to apply specific transformations or operations to the emitted data, you can use StreamTransformer.

To use StreamTransformer, we have to provide the input and output types of the transformation and implement the transformation logic within the StreamTransformer’s bind method. The bind method is called when stream.transform is invoked with the transformer. The StreamTransformer.fromHandlers constructor used below implements bind for us, so we only supply a handleData callback.

import 'dart:async';

void main() {
  final sourceStream = Stream.fromIterable([1, 2, 3, 4]);
  final transformer = StreamTransformer<int, String>.fromHandlers(
    handleData: (data, sink) {
      if (data % 2 == 0) {
        sink.add('Even: $data');
      } else {
        sink.add('Odd: $data');
      }
    },
  );

  final transformedStream = sourceStream.transform(transformer);

  transformedStream.listen((data) {
    print(data);
  });
}

//Prints:
Odd: 1
Even: 2
Odd: 3
Even: 4


StreamTransformer provides a way to customize the behavior of the stream, apply a transformation, perform validation, and handle errors.
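The example above relies on fromHandlers, which supplies bind behind the scenes. As a rough sketch of what writing bind by hand can look like, the class below extends StreamTransformerBase from dart:async. The names LabelTransformer and labelAll are hypothetical and not part of any library.

```dart
import 'dart:async';

// Hypothetical transformer: turns each int into a labelled String.
class LabelTransformer extends StreamTransformerBase<int, String> {
  const LabelTransformer();

  @override
  Stream<String> bind(Stream<int> stream) {
    // bind receives the source stream and returns the transformed stream.
    return stream.map((value) => value.isEven ? 'Even: $value' : 'Odd: $value');
  }
}

// Applies the transformer and collects the results.
Future<List<String>> labelAll(List<int> values) {
  return Stream.fromIterable(values)
      .transform(const LabelTransformer())
      .toList();
}

Future<void> main() async {
  print(await labelAll([1, 2, 3, 4])); // [Odd: 1, Even: 2, Odd: 3, Even: 4]
}
```

A hand-written bind is mostly worthwhile when the transformation needs state or combines several existing operators. For simple per-event logic, fromHandlers is usually shorter.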

pipe

Future<dynamic> pipe(StreamConsumer<T> streamConsumer)

The pipe() method in streams is commonly used for efficiently transferring data from one stream to another.

import 'dart:async';

void main() {
  final sourceStream = Stream.fromIterable(['How', 'are', 'you']);
  final destinationStreamController = StreamController<String>();

  sourceStream
      .transform(_capitalizeTransformer())
      .pipe(destinationStreamController);

  destinationStreamController.stream.listen((data) {
    print(data); // Prints: HOW, ARE, YOU
  });
}

StreamTransformer<String, String> _capitalizeTransformer() {
  return StreamTransformer<String, String>.fromHandlers(
    handleData: (data, sink) {
      // Upper-case each incoming string before passing it on
      final capitalizedData = data.toUpperCase();
      sink.add(capitalizedData);
    },
  );
}

We could achieve the same result by calling destinationStreamController.add() for each event, but then the destinationStreamController would still be open after the source stream is done. In contrast, sourceStream.pipe(destinationStreamController) calls close() on the destinationStreamController once the source stream has finished.

pipe() enables you to compose complex data processing pipelines by connecting multiple streams together. You can transform, filter, or manipulate the data in each stream using various stream operators and transformers, and then pass the transformed data to the next stream in the pipeline using pipe().
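The closing behavior mentioned earlier in this section can be checked with a short sketch. The function name closedAfterPipe is hypothetical. It pipes a small stream into a controller and then reads the controller's isClosed property.

```dart
import 'dart:async';

// Pipes a small stream into a controller and reports whether the
// controller was closed afterwards.
Future<bool> closedAfterPipe() async {
  final controller = StreamController<String>();
  // Start listening first so the piped events have somewhere to go.
  final received = controller.stream.toList();
  await Stream.fromIterable(['How', 'are', 'you']).pipe(controller);
  // toList only completes because pipe closed the controller.
  await received;
  return controller.isClosed;
}

Future<void> main() async {
  print(await closedAfterPipe()); // true
}
```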

singleWhere

Future<T> singleWhere(bool Function(T element) test, {T Function()? orElse})

The singleWhere method finds the single element of a stream that matches a specified condition. If exactly one element satisfies the condition, that element is returned. If no element matches, the result of orElse is returned when orElse is provided; otherwise an error is thrown. If more than one element matches, it always throws an error.

void main() async {
  final stream = Stream.fromIterable([
    {'name': 'Alice', 'age': 25},
    {'name': 'Bob', 'age': 30},
    {'name': 'Charlie', 'age': 35},
  ]);
  try {
    final person = await stream.singleWhere((element) => element['age'] == 30);
    print(person); // Output: {name: Bob, age: 30}
  } catch (e) {
    print(e); // If nothing matched, prints: Bad state: No element
  }
}

singleWhere returns a Future because it has to wait for the stream. Even after a matching element arrives, it keeps listening until the stream is done to be sure no second element matches, and if no match is found it likewise waits until the stream is done emitting elements.
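The possible outcomes (one match, no match, several matches) can be sketched as follows. The function name singleWhereOutcomes and the fallback value -1 are hypothetical choices for this illustration.

```dart
import 'dart:async';

// Runs singleWhere against three small streams and records each outcome.
Future<List<String>> singleWhereOutcomes() async {
  final results = <String>[];

  // Exactly one match: the element is returned.
  final one = await Stream.fromIterable([1, 2, 3]).singleWhere((n) => n == 2);
  results.add('one match: $one');

  // No match, but orElse supplies a fallback value.
  final none = await Stream.fromIterable([1, 2, 3])
      .singleWhere((n) => n > 10, orElse: () => -1);
  results.add('no match: $none');

  // More than one match: the future completes with a StateError.
  try {
    await Stream.fromIterable([1, 2, 3]).singleWhere((n) => n > 1);
  } on StateError {
    results.add('several matches: StateError');
  }
  return results;
}

Future<void> main() async {
  // Prints: one match: 2, no match: -1, several matches: StateError
  (await singleWhereOutcomes()).forEach(print);
}
```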

cast

Stream<R> cast<R>()

This stream is wrapped as a Stream<R> which checks at run-time that each data event emitted by this stream is also an instance of R.

By using the cast() operator, you can ensure that the events emitted by the stream are of the expected type, which lets you work with the stream’s data in a type-safe manner.

At runtime, it checks each data event emitted by the stream. If an event cannot be cast to the desired type, it is not delivered as data. Instead, a TypeError is reported as an error event, and the stream continues with the next event.

import 'dart:async';

void main() {
  final numbersStream = Stream<dynamic>.fromIterable([1, 2, 3.5, 4, 5]);

  final transformer = StreamTransformer<int, int>.fromHandlers(
    handleData: (value, sink) {
      sink.add(value * 2);
    },
  );

  final adaptedStream = numbersStream.cast<int>().transform(transformer);

  adaptedStream.listen((event) {
    print(event); // Prints: 2 4 8 10
  }, onError: (e, s) {
    // The failed cast of 3.5 arrives here as a TypeError, e.g.
    // type 'double' is not a subtype of type 'int'
    print(e);
  });
}

Everything works and the type system is happy. 😊 😊

The cast() operator doesn’t modify the stream. Instead, it creates a new stream that emits events of the desired type. The original stream remains unchanged, and the new stream with the desired type is used for subsequent operations.

distinct

Stream<T> distinct([bool Function(T previous, T next)? equals])

The distinct() method returns a stream that skips data events equal to the previous data event, so it removes consecutive duplicates rather than every repeated value. It accepts an optional equals function that decides whether the previous and current values are the same. If equals is omitted, the == operator is used.

class Person {
  final String name;
  final String surname;

  Person(this.name, this.surname);
}

void main() {
  final personStream = Stream.fromIterable([
    Person('Lokesh', 'Gupta'),
    Person('Lokesh', 'Gupta'),
    Person('Brian', 'Clooney'),
    Person('Brian', 'Clooney'),
    Person('Alex', 'Kolen'),
  ]);

  personStream
      .distinct((previous, next) =>
          previous.name == next.name && previous.surname == next.surname)
      .listen((person) => print(person.name)); // Prints: Lokesh Brian Alex
}
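One detail worth seeing in isolation: distinct() compares each event only with the event directly before it. The sketch below uses hypothetical function names (consecutiveDistinct, fullyUnique) and contrasts distinct() with one possible way to drop every repeated value, here by filtering through a Set. The Set approach is an assumption of this sketch, not something the Stream class provides.

```dart
import 'dart:async';

// distinct() only drops a value when it equals the previous one.
Future<List<int>> consecutiveDistinct() {
  return Stream.fromIterable([1, 1, 2, 2, 1, 3, 3]).distinct().toList();
}

// One way to drop all repeats: Set.add returns false for values seen before.
Future<List<int>> fullyUnique() {
  final seen = <int>{};
  return Stream.fromIterable([1, 1, 2, 2, 1, 3, 3]).where(seen.add).toList();
}

Future<void> main() async {
  print(await consecutiveDistinct()); // [1, 2, 1, 3]
  print(await fullyUnique()); // [1, 2, 3]
}
```

Note that the Set keeps every value it has seen, so on a long-running stream its memory use grows with the number of distinct values.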

skip

Stream<T> skip(int count)

The skip(int count) method returns a stream consisting of the remaining elements after the first count elements. If the stream contains fewer than count elements, we’ll get an empty stream.

void main() {
  final numbersStream = Stream<int>.fromIterable([1, 2, 3, 4, 5]);

  numbersStream.skip(2).listen((event) {
    print(event);
  }, onError: (e, s) {
    print(e.toString());
  });
}
// Prints: 3 4 5
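The empty-stream case is easy to confirm with a short sketch. The helper name skipped is hypothetical.

```dart
import 'dart:async';

// Skips the first [count] events of [values] and collects the rest.
Future<List<int>> skipped(List<int> values, int count) {
  return Stream.fromIterable(values).skip(count).toList();
}

Future<void> main() async {
  print(await skipped([1, 2, 3, 4, 5], 2)); // [3, 4, 5]
  print(await skipped([1, 2], 5)); // []
}
```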

Conclusion

Streams are very helpful when you want to listen for changes in your data and react to them, and stream operators add functionality on top of the stream. In this article, we’ve explored a few of these operators.

So, this is all for this article. If you have any questions or feedback, feel free to leave them in the comments section.

Let’s experiment with these operators, explore their different use cases, and adapt them to fit your specific needs.

Keep Streaming..!! 👋 👋


Sneha Sanghani
Flutter developer | Writing a Blog on Flutter development

2024 Canopas Software LLP. All rights reserved.