watch<D extends Dto<D>, F extends DtoFilter<D>, E extends DtoExpand<D>, S extends DtoSort<D>, F2 extends DtoFieldSelect<D>> method
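Stream<WatchEvent<D>>
watch<D extends Dto<D>, F extends DtoFilter<D>, E extends DtoExpand<D>, S extends DtoSort<D>, F2 extends DtoFieldSelect<D>>(DtoMeta<D, F, E, S, F2> meta, {String topic = '*', void Function(E)? expand, void Function(F)? filter, void Function(F2)? fields, Map<String, dynamic> query = const {}, Map<String, String> headers = const {}})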
Implementation
/// Returns a stream of [WatchEvent]s for the collection described by [meta].
///
/// The underlying subscription on [topic] is created lazily when the
/// returned stream gets its listener, is torn down when that listener pauses
/// or cancels, and is re-created when it resumes. Each incoming event is
/// emitted as a [WatchEvent] carrying the event's action and its record
/// converted with `meta.fromRecord` (or `null` when the event has no record).
///
/// [query] and [headers] are forwarded unchanged to the subscribe call.
///
/// The returned stream is single-subscription and is never closed by this
/// method; cancel the subscription to release the underlying subscription.
/// The future returned by `cancel()` completes once the unsubscribe call
/// has finished.
Stream<WatchEvent<D>> watch<
  D extends Dto<D>,
  F extends DtoFilter<D>,
  E extends DtoExpand<D>,
  S extends DtoSort<D>,
  F2 extends DtoFieldSelect<D>
>(
  DtoMeta<D, F, E, S, F2> meta, {
  String topic = '*',
  void Function(E)? expand,
  void Function(F)? filter,
  void Function(F2)? fields,
  Map<String, dynamic> query = const {},
  Map<String, String> headers = const {},
}) {
  final collection = meta.collectionName;
  final converter = meta.fromRecord;
  late final StreamController<WatchEvent<D>> controller;

  // Handle for the currently active (or still connecting) subscription;
  // `null` while not subscribed.
  Future<UnsubscribeFunc>? unsubscribeFuture;

  void startListening() {
    unsubscribeFuture = _pb
        .collection(collection)
        .subscribe(
          topic,
          (event) {
            // Copy to a local so the null check promotes without `!`.
            final record = event.record;
            controller.add(
              WatchEvent(
                event.action,
                record == null ? null : converter(record),
              ),
            );
          },
          // NOTE(review): `expand`, `filter` and `fields` are callbacks, so
          // `toString()` here yields the closure's description rather than a
          // query string. Presumably they should be applied to builder
          // objects obtained from `meta` and the result serialized; confirm
          // against the DtoMeta API before changing.
          expand: expand?.toString(),
          filter: filter?.toString(),
          fields: fields?.toString(),
          query: query,
          headers: headers,
        );
  }

  Future<void> stopListening() async {
    final pending = unsubscribeFuture;
    if (pending == null) return;
    // Clear the handle *before* awaiting: if the listener resumes while the
    // teardown is still in flight, startListening() stores a fresh handle
    // that must not be overwritten with null afterwards.
    unsubscribeFuture = null;
    final unsubscribe = await pending;
    await unsubscribe();
  }

  controller = StreamController<WatchEvent<D>>(
    onListen: startListening,
    // onPause cannot wait for the teardown; it is started and left running.
    onPause: stopListening,
    onResume: startListening,
    // Returning the future lets `cancel()` complete only after the
    // unsubscribe call has finished (and report its failure, if any).
    onCancel: stopListening,
  );
  return controller.stream;
}