watch<D extends Dto<D>, F extends DtoFilter<D>, E extends DtoExpand<D>, S extends DtoSort<D>, F2 extends DtoFieldSelect<D>> method

Stream<WatchEvent<D>> watch<D extends Dto<D>, F extends DtoFilter<D>, E extends DtoExpand<D>, S extends DtoSort<D>, F2 extends DtoFieldSelect<D>>(
  DtoMeta<D, F, E, S, F2> meta, {
  String topic = '*',
  void expand(E)?,
  void filter(F)?,
  void fields(F2)?,
  Map<String, dynamic> query = const {},
  Map<String, String> headers = const {},
})

Implementation

/// Streams realtime events for the collection described by [meta].
///
/// The subscription is opened lazily: nothing is subscribed until the
/// returned stream gets a listener. It is torn down while the listener is
/// paused or after it cancels, and opened again on resume, so no
/// subscription is active while nobody is consuming events.
///
/// Each incoming event is wrapped in a [WatchEvent] carrying the event's
/// action and, when the event has a record, that record converted with
/// `meta.fromRecord` (otherwise `null`).
///
/// [topic] is forwarded to the subscribe call and defaults to `'*'`.
/// [query] and [headers] are forwarded unchanged.
///
/// Cancelling the stream subscription completes only after the underlying
/// unsubscribe call has finished, and reports its failure, if any, through
/// the future returned by `cancel()`.
Stream<WatchEvent<D>> watch<
  D extends Dto<D>,
  F extends DtoFilter<D>,
  E extends DtoExpand<D>,
  S extends DtoSort<D>,
  F2 extends DtoFieldSelect<D>
>(
  DtoMeta<D, F, E, S, F2> meta, {
  String topic = '*',
  void Function(E)? expand,
  void Function(F)? filter,
  void Function(F2)? fields,
  Map<String, dynamic> query = const {},
  Map<String, String> headers = const {},
}) {
  final collection = meta.collectionName;
  final converter = meta.fromRecord;
  late final StreamController<WatchEvent<D>> controller;

  // Handle of the currently active (or still connecting) subscription;
  // null while nothing is subscribed.
  Future<UnsubscribeFunc>? unsubscribeFuture;

  void startListening() {
    unsubscribeFuture = _pb
        .collection(collection)
        .subscribe(
          topic,
          (event) {
            // Copy to a local so the null check promotes without `!`.
            final record = event.record;
            controller.add(
              WatchEvent(
                event.action,
                record == null ? null : converter(record),
              ),
            );
          },
          // NOTE(review): `expand`, `filter` and `fields` are callbacks, so
          // `toString()` yields the closure's own description rather than a
          // built expression. They presumably need to be applied to builder
          // objects obtained from `meta`; that API is not visible here, so
          // the existing behaviour is preserved. TODO confirm and fix.
          expand: expand?.toString(),
          filter: filter?.toString(),
          fields: fields?.toString(),
          query: query,
          headers: headers,
        );
  }

  Future<void> stopListening() async {
    final pending = unsubscribeFuture;
    if (pending == null) return;

    // Release the handle *before* awaiting. If the stream is resumed while
    // the unsubscribe is still in flight, startListening() stores a fresh
    // handle; clearing the field afterwards would discard it and leak that
    // new subscription.
    unsubscribeFuture = null;

    final unsubscribe = await pending;
    await unsubscribe();
  }

  controller = StreamController<WatchEvent<D>>(
    onListen: startListening,
    onPause: stopListening,
    onResume: startListening,
    // Returning the future lets `StreamSubscription.cancel()` complete only
    // once the unsubscribe call is done.
    onCancel: stopListening,
  );

  return controller.stream;
}