asStream method
Stream implementation
Implementation
/// Streams the locally stored rows matched by [localQuery], paired with the
/// most recently fetched remote row count.
///
/// Each event is a [TetherClientReturn] whose `data` holds the models decoded
/// from the local watch query and whose `count` is the remote count, or
/// `null` while the remote fetch has not completed yet. The count is only
/// attached to the next local emission; it does not trigger an event itself.
///
/// Unless [isLocalOnly] is set, a remote fetch runs in the background and its
/// rows are passed to [upsertSupabaseData]. A failure there is logged and
/// reported as a stream error.
///
/// No work starts until the returned stream is listened to. Pausing,
/// resuming and cancelling the returned stream are forwarded to the local
/// watch subscription, so an unused stream holds no database resources.
///
/// This method does not throw. Setup failures (non-SELECT operation, missing
/// [localQuery], or an error while building the query) produce a stream that
/// emits a single [TetherClientReturn] with empty data and an error message.
@override
Stream<TetherClientReturn<TModel>> asStream() {
  try {
    if (type != SqlOperationType.select) {
      throw UnsupportedError(
        'Streaming is only supported for SELECT operations.',
      );
    }
    // Copy to a local so the null check promotes and no `!` is needed.
    final query = localQuery;
    if (query == null) {
      throw ArgumentError('localQuery must be provided for streaming.');
    }

    // Build the query once to get both the SQL text and its arguments. The
    // selected expression is aliased so every row exposes it as 'jsobjects'.
    final aliasedQuery = query.copyWith(
      selectColumns: '${query.selectColumns} AS jsobjects',
    );
    final FinalSqlStatement builtQuery = aliasedQuery.build();

    // Created here (not in onListen) so a failure while setting up the watch
    // is still handled by the surrounding try/catch.
    final localStream = localDb
        .watch(builtQuery.sql, parameters: builtQuery.arguments)
        .map((rows) {
          // Skip rows whose 'jsobjects' column is null before decoding.
          return rows
              .where((row) => row['jsobjects'] != null)
              .map(
                (row) => fromSqliteFactory(
                  createRowFromMap(jsonDecode(row['jsobjects'] as String)),
                ),
              )
              .toList();
        });

    final controller = StreamController<TetherClientReturn<TModel>>();
    StreamSubscription<Object?>? localSubscription;
    int? currentRemoteCount;

    // Fetches the remote rows and count, then hands the rows to
    // upsertSupabaseData. Never throws: errors are piped into the stream.
    Future<void> syncRemoteData() async {
      if (isLocalOnly) return;
      try {
        // Reset Prefer first, then call count to avoid duplicates.
        final remoteDataResponse =
            await _resetPreferForCount().count(CountOption.exact);
        currentRemoteCount = remoteDataResponse.count;
        await upsertSupabaseData(remoteDataResponse.data);
      } catch (error, stackTrace) {
        // Always log so the failure is not lost when the stream has ended.
        log('Error fetching remote data for stream: $error');
        if (!controller.isClosed) {
          controller.addError(error, stackTrace);
        }
      }
    }

    controller
      // Start work lazily: a stream that is never listened to must not keep
      // a database watch (and an unbounded event buffer) alive.
      ..onListen = () {
        try {
          // Fire-and-forget: syncRemoteData handles its own errors.
          syncRemoteData();
          localSubscription = localStream.listen(
            (listData) {
              // currentRemoteCount is still null if the remote fetch has
              // not completed by the time the local query emits.
              if (!controller.isClosed) {
                controller.add(
                  TetherClientReturn<TModel>(
                    data: listData,
                    count: currentRemoteCount,
                  ),
                );
              }
            },
            onError: (error, stackTrace) {
              if (!controller.isClosed) {
                controller.addError(error, stackTrace);
              }
            },
            onDone: () {
              if (!controller.isClosed) {
                controller.close();
              }
            },
          );
        } catch (e) {
          // Keep the "never throws" contract for failures that now surface
          // at listen time instead of at call time.
          if (!controller.isClosed) {
            controller
              ..add(
                TetherClientReturn<TModel>(
                  data: [],
                  error: 'Error in asStream: $e',
                ),
              )
              ..close();
          }
        }
      }
      // Propagate backpressure to the local watch subscription.
      ..onPause = (() => localSubscription?.pause())
      ..onResume = (() => localSubscription?.resume())
      // Return the cancel future so the consumer's cancel() awaits teardown.
      ..onCancel = (() => localSubscription?.cancel());

    return controller.stream;
  } catch (e) {
    return Stream.value(
      TetherClientReturn<TModel>(data: [], error: 'Error in asStream: $e'),
    );
  }
}