asStream method

  1. @override
Stream<TetherClientReturn<TModel>> asStream()
override

Stream implementation

Implementation

@override
Stream<TetherClientReturn<TModel>> asStream() {
  try {
    if (type != SqlOperationType.select) {
      throw UnsupportedError(
        'Streaming is only supported for SELECT operations.',
      );
    }

    if (localQuery == null) {
      throw ArgumentError('localQuery must be provided for streaming.');
    }

    // FIX 1: Build the query once to get both SQL and arguments.
    final aliasedQuery = localQuery!.copyWith(
      selectColumns: '${localQuery!.selectColumns} AS jsobjects',
    );
    final FinalSqlStatement builtQuery = aliasedQuery.build();

    // Stream data from the local database, now with parameters.
    final localStream = localDb
        .watch(builtQuery.sql, parameters: builtQuery.arguments)
        .map((rows) {
      // Filter out rows where 'jsobjects' is null before mapping.
      return rows
          .where((row) => row['jsobjects'] != null)
          .map((row) => fromSqliteFactory(createRowFromMap(jsonDecode(row['jsobjects'] as String))))
          .toList();
    });

    final controller = StreamController<TetherClientReturn<TModel>>();
    int? currentRemoteCount;

    // This function will run in the background.
    Future<void> syncRemoteData() async {
      if (isLocalOnly) return;
      try {
        // Reset Prefer first, then call count to avoid duplicates
        final remoteDataResponse =
            await _resetPreferForCount().count(CountOption.exact);

        currentRemoteCount = remoteDataResponse.count;
        await upsertSupabaseData(remoteDataResponse.data);
      } catch (error, stackTrace) {
        // FIX 2: Pipe the error into the stream instead of throwing.
        if (!controller.isClosed) {
          log('Error fetching remote data for stream: $error');
          controller.addError(error, stackTrace);
        }
      }
    }

    // Start the background synchronization
    syncRemoteData();

    final localSubscription = localStream.listen(
      (listData) {
        // When localDataStream emits, package it with the currentRemoteCount
        // If supabase fetch hasn't completed, currentRemoteCount will be null
        if (!controller.isClosed) {
          controller.add(
            TetherClientReturn<TModel>(
              data: listData,
              count: currentRemoteCount,
            ),
          );
        }
      },
      onError: (error, stackTrace) {
        if (!controller.isClosed) {
          controller.addError(error, stackTrace);
        }
      },
      onDone: () {
        if (!controller.isClosed) {
          controller.close();
        }
      },
    );
    controller.onCancel = () {
      localSubscription.cancel();
    };

    return controller.stream;
  } catch (e) {
    return Stream.value(
      TetherClientReturn<TModel>(data: [], error: 'Error in asStream: $e'),
    );
  }
}