watchQuery method

Stream<List<T>>? watchQuery(
  DatumQuery query, {
  String? userId,
})

Watches a subset of items matching a query. Returns null if the adapter does not support reactive queries.

Implementation

/// Watches the subset of items matching [query], optionally scoped to
/// [userId].
///
/// Returns `null` when the local adapter yields no stream for the query
/// (i.e. it does not support reactive queries).
///
/// Every list emitted by the adapter is passed, entity by entity, through
/// the post-fetch transforms. The stream is intentionally best-effort:
///
/// * an entity whose transform throws is logged and emitted untransformed;
/// * if processing the list as a whole throws, the adapter's original list
///   is emitted instead;
/// * errors travelling down the stream are logged and not forwarded to
///   listeners.
Stream<List<T>>? watchQuery(DatumQuery query, {String? userId}) {
  _ensureInitialized();

  final source = localAdapter.watchQuery(query, userId: userId);
  if (source == null) {
    return null;
  }

  final transformedStream = source.asyncMap((snapshot) async {
    try {
      final results = <T>[];
      // Transforms run one at a time, so the output keeps the input order.
      for (final entity in snapshot) {
        try {
          results.add(await _applyPostFetchTransforms(entity));
        } catch (e, stack) {
          _logger.error('Failed to apply post-fetch transforms to entity ${entity.id}: $e', stack);
          // One bad entity must not take down the whole emission: keep the
          // untransformed entity and carry on with the rest.
          results.add(entity);
        }
      }
      return results;
    } catch (e, stack) {
      _logger.error('Failed to transform entity list in watchQuery: $e', stack);
      // Last-resort fallback: hand back exactly what the adapter emitted.
      return snapshot;
    }
  });

  // Log stream errors without rethrowing so the stream keeps going.
  return transformedStream.handleError((Object error, StackTrace stack) {
    _logger.error('Error in watchQuery stream for $T: $error', stack);
  });
}