April 30, 2021

DartのStreamでRxのBehavior Subjectを再現する

Dartには標準ライブラリのasyncパッケージにStream クラスが含まれています。
これはこれでとても便利なのですが、Androidで長く使っていたせいで非同期処理の操作がRx脳になっているので、時々こういうのはRxではできるのになーって思うことが多々あります。 Rxで個人的に一番好きなクラスはBehaviorSubjectでStreamの最後の値をキャッシュしておいて、Subscribe(Listen)と同時に最後の値をemitしてくれるものです。
オペレータだとdistinctUntilChangedscanwithLatestFron(combine)です。

もちろんRxDart も存在していてそちらを使うことも可能です。
ただ、DartのStreamはRxにあるBehaviorSubjectのように、最後の値をキャッシュしてSubscribeと同時に値をemitするような関数は用意されていません。
BehaviorSubjectのためにわざわざ導入するのも微妙なので、今回はRxを使わずにDartのasyncパッケージにあるStreamだけでBehavior Subjectを作ってみます。

Stream

まずは通常のStreamの使い方を見てみましょう。
StreamControllerを生成して、読み込み側はlisten()で値の読み取りをします。
送信側はsink.add()で値を送ります。
Rxを使ったことがあるなら特に違和感なく使えると思います。
使い終わったStreamControllerはclose()するようにしましょう。

final StreamController<int> controller = StreamController();

controller.stream.listen((data) {
  print('Receive: $data');
});

controller.sink.add(0);
controller.sink.add(1);

controller.close();

// Receive: 0 
// Receive: 1 

Stream自体に値のキャッシュ機能はないので、Listenより前にStreamに値がemitされても受け取ることはできません。

final StreamController<int> controller = StreamController();

controller.sink.add(0);

controller.stream.listen((data) {
  print('Receive: $data');
});

controller.close();

複数のObserverがいる場合は StreamController.broadcast()にする必要があります。
broadcastを使わないと、Stream has already been listened でエラーになります。 つまり一度作成したStreamControllerを複数回Listenはできません。
既にListenされているかは StreamController.hasListener() で調べることができます。

final StreamController<int> controller = StreamController.broadcast();

controller.stream.listen((data) {
  print('Receive a: $data');
});
controller.stream.listen((data) {
  print('Receive b: $data');
});

controller.sink.add(0);
controller.sink.add(1);

controller.close();

// Receive a: 0 
// Receive b: 0 
// Receive a: 1 
// Receive b: 1 

Stream.multi

今回のポイントは Streamクラスに用意されている Stream.multi constructor になります。 これはDart2.9から導入されました。
Stream.multiとは、作成されたStreamがListenされるたびに、onListenコールバックが新しいMulti Stream Controllerで呼び出され、そのListenの呼び出しによって返されたStreamSubscriptionにイベントが転送されます。

Stream.multiの挙動はこんな感じです。
factoryコンストラクタのコールバックとしてMultiStreamControllerが返ってくるので、このStreamControllerに対して値をemitしてあげます。

final Stream<int> stream = Stream.multi((MultiStreamController controller) {
  controller.onCancel = () {
    controller.close();
  };
  controller.add(0);
});

stream.listen((data) {
  print('Receive a: $data');
});
stream.listen((data) {
  print('Receive b: $data');
});

// Receive a: 0 
// Receive b: 0 

完成形

Stream.multのコールバックにあるcontrollerをうまく使えば、最後の値をキャッシュしてListenと同時に値をemitしてあげることができそうです。
最終的にはこんな感じになります。

class BehaviorStreamNotifier<T> {
  BehaviorStreamNotifier({
    T? initialValue,
  }) : _latestValue = initialValue;

  final Set<MultiStreamController<T>> _controllers = {};

  T? _latestValue;

  Stream<T> get stream {
    return Stream.multi((controller) {
      controller.onCancel = () {
        controller.close();
        _controllers.remove(controller);
      };
      final value = _latestValue;
      if (value != null) {
        controller.add(value);
      }
      _controllers.add(controller);
    });
  }

  void add(T value) {
    _latestValue = value;
    for (final controller in _controllers) {
      controller.add(value);
    }
  }

  Future<void> close() async {
    await Future.wait<void>(_controllers.map((c) {
      c.onCancel = null;
      return c.close();
    }));
    _controllers.clear();
  }
}

使い方

使い方はStreamとほぼ同じです。

Dart標準のStreamの場合はListen前にemitされた値が受け取れていませんでしたが、Stream.multiを使って作った自作BehaviorStreamNotifierの場合は受け取れています。

final behaviorStreamNotifier = BehaviorStreamNotifier<int>();

behaviorStreamNotifier.stream.listen((data) {
  print('Receive a: $data');
});
behaviorStreamNotifier.add(100);
behaviorStreamNotifier.stream.listen((data) {
  print('Receive b: $data');
});


// Receive b: 100
// Receive a: 100

また、initialValueも取れるようになっているので、初期値を与えることも可能です。

final behaviorStreamNotifier = BehaviorStreamNotifier<int>(initialValue: 0);

behaviorStreamNotifier.stream.listen((data) {
  print('Receive a: $data');
});
behaviorStreamNotifier.add(100);
behaviorStreamNotifier.stream.listen((data) {
  print('Receive b: $data');
});


// Receive a: 0
// Receive b: 100
// Receive a: 100

DartのStreamをBehaviorSubjectみたいな使い方したいんだけどなーという方の参考になれば幸いです。

© AAkira 2023