synchronize method
FutureOr<(DatumSyncResult<T>, List<DatumSyncEvent<T>>)> synchronize(
  String userId, {
  bool force = false,
  DatumSyncOptions<T>? options,
  DatumSyncScope? scope,
})
Implementation
/// Runs one synchronization cycle for [userId].
///
/// Returns a record containing the [DatumSyncResult] of the cycle and the
/// list of [DatumSyncEvent]s generated while it ran.
///
/// The cycle is skipped (a `skipped` result with an empty event list) when:
///
/// * the connectivity checker reports no connection and [force] is `false`;
/// * the status snapshot read on entry already reports
///   [DatumSyncStatus.syncing];
/// * `options.forceFullSync` is not `true`, local and remote sync metadata
///   both exist with equal data hashes and entity counts, and the queue
///   reports zero pending operations.
///
/// Otherwise the status subject is reset to a `syncing` snapshot, a
/// [DatumSyncStartedEvent] is emitted, and push/pull steps run in the order
/// selected by `options.direction` (falling back to
/// `config.defaultSyncDirection`). [scope] is forwarded to the pull step.
///
/// If the status subject is found closed after the operations finish, a
/// `cancelled` result is returned instead of a completed one.
///
/// Throws [SyncExceptionWithEvents] carrying the generated events (including
/// the error event) when the cycle fails while the event controller is still
/// open. If the event controller is already closed, the underlying error is
/// thrown without wrapping.
FutureOr<(DatumSyncResult<T>, List<DatumSyncEvent<T>>)> synchronize(
  String userId, {
  bool force = false,
  DatumSyncOptions<T>? options,
  DatumSyncScope? scope,
}) async {
  final generatedEvents = <DatumSyncEvent<T>>[];
  // NOTE(review): this snapshot is read before the first `await`, so the
  // "already syncing" check below may act on stale state if another cycle
  // starts in between — confirm whether callers serialize invocations.
  final snapshot = statusSubject.value;

  if (!await connectivityChecker.isConnected && !force) {
    logger.warn('Sync skipped for user $userId: No internet connection.');
    return (
      // No health change, just skipped.
      DatumSyncResult<T>.skipped(userId, snapshot.pendingOperations),
      <DatumSyncEvent<T>>[],
    );
  }

  if (snapshot.status == DatumSyncStatus.syncing) {
    logger.info('Sync already in progress for user $userId. Skipping.');
    return (
      DatumSyncResult<T>.skipped(userId, snapshot.pendingOperations),
      <DatumSyncEvent<T>>[],
    );
  }

  await checkForUserSwitch(userId);

  // If forceFullSync is true, bypass metadata comparison and proceed with sync.
  if (options?.forceFullSync == true) {
    logger.info('Sync for user $userId forced: forceFullSync option is true.');
  } else {
    // Fetch local and remote metadata to determine if a sync is necessary.
    final localMetadata = await localAdapter.getSyncMetadata(userId);
    final remoteMetadata = await remoteAdapter.getSyncMetadata(userId);

    // Check if there are any pending local operations.
    final pendingLocalOperations = await queueManager.getPendingCount(userId);

    // Metadata only counts as matching when both sides exist and agree on
    // the data hash and the per-entity counts.
    final metadataMatches = localMetadata != null &&
        remoteMetadata != null &&
        localMetadata.dataHash == remoteMetadata.dataHash &&
        _deepCompareEntityCounts(
          localMetadata.entityCounts,
          remoteMetadata.entityCounts,
        );

    // If metadata matches and there are no pending local operations, skip the sync.
    if (metadataMatches && pendingLocalOperations == 0) {
      logger.info('Sync for user $userId skipped: No changes detected based on metadata and no pending local operations.');
      return (
        DatumSyncResult<T>.skipped(
          userId,
          snapshot.pendingOperations,
          reason: 'No changes detected based on metadata',
        ),
        <DatumSyncEvent<T>>[],
      );
    }
  }

  // Fetch the last sync result to get the previous total byte counts.
  final lastSyncResult = await localAdapter.getLastSyncResult(userId);
  int bytesPushedThisCycle = 0;
  int bytesPulledThisCycle = 0;

  final stopwatch = Stopwatch()..start();

  // Reset the snapshot for the new sync cycle, preserving only the user ID.
  statusSubject.add(
    DatumSyncStatusSnapshot.initial(userId).copyWith(
      status: DatumSyncStatus.syncing,
      health: const DatumHealth(status: DatumSyncHealth.syncing),
      // Seed the new snapshot with the current size of the pending queue.
      pendingOperations: (await queueManager.getPending(userId)).length,
    ),
  );

  // NOTE(review): the start event reports the pending count from the snapshot
  // captured on entry, not the freshly queried count stored just above —
  // confirm this is intended.
  final startEvent = DatumSyncStartedEvent<T>(
    userId: userId,
    pendingOperations: snapshot.pendingOperations,
  );
  generatedEvents.add(startEvent);
  _notifyObservers(startEvent);

  try {
    final direction = options?.direction ?? config.defaultSyncDirection;
    switch (direction) {
      case SyncDirection.pushThenPull:
        bytesPushedThisCycle += await _pushChanges(userId, generatedEvents);
        bytesPulledThisCycle += await _pullChanges(userId, options, scope, generatedEvents);
      case SyncDirection.pullThenPush:
        bytesPulledThisCycle += await _pullChanges(userId, options, scope, generatedEvents);
        bytesPushedThisCycle += await _pushChanges(userId, generatedEvents);
      case SyncDirection.pushOnly:
        bytesPushedThisCycle += await _pushChanges(userId, generatedEvents);
      case SyncDirection.pullOnly:
        bytesPulledThisCycle += await _pullChanges(userId, options, scope, generatedEvents);
    }

    // Update metadata after sync operations
    await _updateMetadata(userId);

    // After operations, check if the sync was cancelled by a dispose call.
    // The status subject would be closed in this case.
    if (statusSubject.isClosed) {
      logger.warn(
        'Sync for user $userId was cancelled mid-process due to manager disposal.',
      );
      return (
        DatumSyncResult<T>.cancelled(userId, statusSubject.value.syncedCount),
        generatedEvents,
      );
    }

    final finalPending = await queueManager.getPending(userId);
    final result = DatumSyncResult(
      userId: userId,
      duration: stopwatch.elapsed,
      syncedCount: statusSubject.value.syncedCount,
      failedCount: statusSubject.value.failedOperations,
      conflictsResolved: statusSubject.value.conflictsResolved,
      pendingOperations: finalPending,
      bytesPushedInCycle: bytesPushedThisCycle,
      bytesPulledInCycle: bytesPulledThisCycle,
      // Running totals build on the previous cycle's totals, if any.
      totalBytesPushed: (lastSyncResult?.totalBytesPushed ?? 0) + bytesPushedThisCycle,
      totalBytesPulled: (lastSyncResult?.totalBytesPulled ?? 0) + bytesPulledThisCycle,
    );

    // Check if controllers are closed before adding events, as the manager
    // might have been disposed during the sync operation (the `await` above
    // gives disposal another chance to happen).
    if (!statusSubject.isClosed) {
      statusSubject.add(
        // The final status should be idle, not completed.
        // 'completed' is a transient status for the event, not the final state.
        statusSubject.value.copyWith(
          status: DatumSyncStatus.idle, // The manager is now idle
          health: const DatumHealth(status: DatumSyncHealth.healthy),
        ),
      );
    }

    if (!eventController.isClosed) {
      final completedEvent = DatumSyncCompletedEvent<T>(
        userId: userId,
        result: result,
      );
      generatedEvents.add(completedEvent);
      _notifyObservers(completedEvent);
    }

    return (result, generatedEvents);
  } catch (e, stack) {
    logger.error('Synchronization failed for user $userId: $e', stack);

    // If the eventController is closed, it means the manager has been disposed
    // during the sync. In this case, we should re-throw the original error
    // directly, as there's no point in wrapping it with events that won't
    // be processed.
    if (eventController.isClosed) {
      if (e is SyncExceptionWithEvents<T>) {
        // Unwrap, but keep the caught stack trace instead of starting a new
        // one at this throw site.
        Error.throwWithStackTrace(e.originalError, stack);
      }
      rethrow;
    }

    // If the eventController is still open, proceed with normal error handling:
    // Update status, add error event to generatedEvents, notify observers,
    // and wrap the exception in SyncExceptionWithEvents.
    //
    // The status subject is guarded separately, mirroring the success path:
    // adding to a closed subject would raise its own error and mask the
    // original failure.
    if (!statusSubject.isClosed) {
      statusSubject.add(
        statusSubject.value.copyWith(
          status: DatumSyncStatus.failed, // The sync cycle failed
          health: const DatumHealth(status: DatumSyncHealth.error),
          errors: [e],
        ),
      );
    }

    final errorEvent = DatumSyncErrorEvent<T>(
      userId: userId,
      error: e is SyncExceptionWithEvents<T> ? e.originalError : e,
      stackTrace: stack,
    );
    generatedEvents.add(errorEvent);
    _notifyObservers(errorEvent);

    // Instead of a simple `rethrow`, we wrap the error in a custom
    // exception. This allows us to transport the `generatedEvents`
    // (which now includes the crucial error event) back up to the
    // DatumManager, which can process them before the user-facing Future
    // completes with an error.
    if (e is SyncExceptionWithEvents<T>) {
      rethrow; // Re-throw the existing SyncExceptionWithEvents
    } else {
      throw SyncExceptionWithEvents(e, stack, generatedEvents);
    }
  }
}