subscribe<TModel extends TRepositoryModel> method

Stream<List<TModel>> subscribe<TModel extends TRepositoryModel>({
  1. OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
  2. Query? query,
})
inherited

Listen for streaming changes when the sqliteProvider is invoked. For example, whenever new data is acquired from remote, or data is upserted locally, or data is deleted locally, the stream will be notified with a local fetch of query.

get is invoked on the memoryCacheProvider and sqliteProvider following an upsert invocation. For more, see notifySubscriptionsWithLocalData.

policy is only applied to the initial population of the stream. Only local data is supplied on subsequent events to notifySubscriptionsWithLocalData.

It is strongly recommended that this invocation be immediately .listened assigned with the assignment/subscription .cancel()'d as soon as the data is no longer needed. The stream will not close naturally.

Implementation

Stream<List<TModel>> subscribe<TModel extends TRepositoryModel>({
  OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.localOnly,
  Query? query,
}) {
  query ??= const Query();
  if (subscriptions[TModel]?[query] != null) {
    return subscriptions[TModel]![query]!.stream as Stream<List<TModel>>;
  }

  final controller = StreamController<List<TModel>>(
    onCancel: () async {
      await subscriptions[TModel]?[query]?.close();
      subscriptions[TModel]?.remove(query);
      if (subscriptions[TModel]?.isEmpty ?? false) {
        subscriptions.remove(TModel);
      }
    },
  );

  subscriptions[TModel] ??= {};
  subscriptions[TModel]?[query] = controller;

  // ignore: discarded_futures
  get<TModel>(query: query, policy: policy).then(
    (results) {
      if (!controller.isClosed) controller.add(results);
    },
    onError: (error) {
      if (!controller.isClosed) controller.addError(error);
    },
  );

  return controller.stream;
}