Dartには標準ライブラリのasyncパッケージにStream
クラスが含まれています。
これはこれでとても便利なのですが、Androidで長く使っていたせいで非同期処理の操作がRx脳になっているので、時々こういうのはRxではできるのになーって思うことが多々あります。
Rxで個人的に一番好きなクラスはBehaviorSubject
でStreamの最後の値をキャッシュしておいて、Subscribe(Listen)と同時に最後の値をemitしてくれるものです。
オペレータだとdistinctUntilChanged
かscan
かwithLatestFron(combine)
です。
もちろんRxDart
も存在していてそちらを使うことも可能です。
ただ、DartのStreamはRxにあるBehaviorSubject
のように、最後の値をキャッシュしてSubscribeと同時に値をemitするような関数は用意されていません。
BehaviorSubjectのためにわざわざ導入するのも微妙なので、今回はRxを使わずにDartのasyncパッケージにあるStreamだけでBehavior Subjectを作ってみます。
Stream
まずは通常のStreamの使い方を見てみましょう。
StreamControllerを生成して、読み込み側はlisten()
で値の読み取りをします。
送信側はsink.add()
で値を送ります。
Rxを使ったことがあるなら特に違和感なく使えると思います。
使い終わったStreamControllerはclose()
するようにしましょう。
final StreamController<int> controller = StreamController();
controller.stream.listen((data) {
print('Receive: $data');
});
controller.sink.add(0);
controller.sink.add(1);
controller.close();
// Receive: 0
// Receive: 1
Stream自体に値のキャッシュ機能はないので、Listenより前にStreamに値がemitされても受け取ることはできません。
final StreamController<int> controller = StreamController();
controller.sink.add(0);
controller.stream.listen((data) {
print('Receive: $data');
});
controller.close();
複数のObserverがいる場合は StreamController.broadcast()
にする必要があります。
broadcastを使わないと、Stream has already been listened
でエラーになります。
つまり一度作成したStreamControllerを複数回Listenはできません。
既にListenされているかは StreamController.hasListener()
で調べることができます。
final StreamController<int> controller = StreamController.broadcast();
controller.stream.listen((data) {
print('Receive a: $data');
});
controller.stream.listen((data) {
print('Receive b: $data');
});
controller.sink.add(0);
controller.sink.add(1);
controller.close();
// Receive a: 0
// Receive b: 0
// Receive a: 1
// Receive b: 1
Stream.multi
今回のポイントは Streamクラスに用意されている Stream.multi constructor
になります。
これはDart2.9から導入されました。
Stream.multiとは、作成されたStreamがListenされるたびに、onListenコールバックが新しいMulti Stream Controllerで呼び出され、そのListenの呼び出しによって返されたStreamSubscription
にイベントが転送されます。
Stream.multiの挙動はこんな感じです。
factoryコンストラクタのコールバックとしてMultiStreamControllerが返ってくるので、このStreamControllerに対して値をemitしてあげます。
final Stream<int> stream = Stream.multi((MultiStreamController controller) {
controller.onCancel = () {
controller.close();
};
controller.add(0);
});
stream.listen((data) {
print('Receive a: $data');
});
stream.listen((data) {
print('Receive b: $data');
});
// Receive a: 0
// Receive b: 0
完成形
Stream.multのコールバックにあるcontrollerをうまく使えば、最後の値をキャッシュしてListenと同時に値をemitしてあげることができそうです。
最終的にはこんな感じになります。
class BehaviorStreamNotifier<T> {
BehaviorStreamNotifier({
T? initialValue,
}) : _latestValue = initialValue;
final Set<MultiStreamController<T>> _controllers = {};
T? _latestValue;
Stream<T> get stream {
return Stream.multi((controller) {
controller.onCancel = () {
controller.close();
_controllers.remove(controller);
};
final value = _latestValue;
if (value != null) {
controller.add(value);
}
_controllers.add(controller);
});
}
void add(T value) {
_latestValue = value;
for (final controller in _controllers) {
controller.add(value);
}
}
Future<void> close() async {
await Future.wait<void>(_controllers.map((c) {
c.onCancel = null;
return c.close();
}));
_controllers.clear();
}
}
使い方
使い方はStreamとほぼ同じです。
Dart標準のStreamの場合はListen前にemitされた値が受け取れていませんでしたが、Stream.multiを使って作った自作BehaviorStreamNotifier
の場合は受け取れています。
final behaviorStreamNotifier = BehaviorStreamNotifier<int>();
behaviorStreamNotifier.stream.listen((data) {
print('Receive a: $data');
});
behaviorStreamNotifier.add(100);
behaviorStreamNotifier.stream.listen((data) {
print('Receive b: $data');
});
// Receive b: 100
// Receive a: 100
また、initialValueも取れるようになっているので、初期値を与えることも可能です。
final behaviorStreamNotifier = BehaviorStreamNotifier<int>(initialValue: 0);
behaviorStreamNotifier.stream.listen((data) {
print('Receive a: $data');
});
behaviorStreamNotifier.add(100);
behaviorStreamNotifier.stream.listen((data) {
print('Receive b: $data');
});
// Receive a: 0
// Receive b: 100
// Receive a: 100
DartのStreamをBehaviorSubjectみたいな使い方したいんだけどなーという方の参考になれば幸いです。