synchronize method

FutureOr<(DatumSyncResult<T>, List<DatumSyncEvent<T>>)> synchronize(
  String userId, {
  bool force = false,
  DatumSyncOptions<T>? options,
  DatumSyncScope? scope,
})

Implementation

/// Runs one synchronization cycle for [userId] and returns the outcome
/// together with every event generated during the cycle.
///
/// The cycle is skipped (a `skipped` result with an empty event list is
/// returned) when:
///  * [connectivityChecker] reports no connection and [force] is `false`;
///  * the current status snapshot is already [DatumSyncStatus.syncing];
///  * `options.forceFullSync` is not `true`, local and remote sync metadata
///    match (same data hash and entity counts), and the queue reports zero
///    pending operations for [userId].
///
/// Otherwise push and/or pull run in the order given by `options.direction`
/// (falling back to `config.defaultSyncDirection`), metadata is updated, and
/// a completed event is emitted. [scope] and [options] are forwarded to the
/// pull step.
///
/// If [statusSubject] is found closed after the operations finish, a
/// `cancelled` result is returned with the events generated so far.
///
/// On failure:
///  * if [eventController] is already closed, the original error is thrown
///    (unwrapped from [SyncExceptionWithEvents] when applicable);
///  * otherwise the status is set to failed (when the subject is still
///    open), an error event is recorded and observers are notified, and a
///    [SyncExceptionWithEvents] carrying the generated events is thrown.
FutureOr<(DatumSyncResult<T>, List<DatumSyncEvent<T>>)> synchronize(
  String userId, {
  bool force = false,
  DatumSyncOptions<T>? options,
  DatumSyncScope? scope,
}) async {
  final generatedEvents = <DatumSyncEvent<T>>[];
  final snapshot = statusSubject.value;
  if (!await connectivityChecker.isConnected && !force) {
    logger.warn('Sync skipped for user $userId: No internet connection.');
    return (
      // No health change, just skipped.
      DatumSyncResult<T>.skipped(userId, snapshot.pendingOperations),
      <DatumSyncEvent<T>>[],
    );
  }

  if (snapshot.status == DatumSyncStatus.syncing) {
    logger.info('Sync already in progress for user $userId. Skipping.');
    return (
      DatumSyncResult<T>.skipped(userId, snapshot.pendingOperations),
      <DatumSyncEvent<T>>[],
    );
  }

  await checkForUserSwitch(userId);

  // If forceFullSync is true, bypass metadata comparison and proceed with sync.
  if (options?.forceFullSync == true) {
    logger.info('Sync for user $userId forced: forceFullSync option is true.');
  } else {
    // Fetch local and remote metadata to determine if a sync is necessary.
    final localMetadata = await localAdapter.getSyncMetadata(userId);
    final remoteMetadata = await remoteAdapter.getSyncMetadata(userId);

    // Check if there are any pending local operations.
    final pendingLocalOperations = await queueManager.getPendingCount(userId);

    // Compare relevant metadata fields for skipping. Both sides must exist;
    // a missing side means we cannot prove the data sets are identical.
    final metadataMatches = localMetadata != null &&
        remoteMetadata != null &&
        localMetadata.dataHash == remoteMetadata.dataHash &&
        _deepCompareEntityCounts(
          localMetadata.entityCounts,
          remoteMetadata.entityCounts,
        );

    // If metadata matches and there are no pending local operations, skip the sync.
    if (metadataMatches && pendingLocalOperations == 0) {
      logger.info('Sync for user $userId skipped: No changes detected based on metadata and no pending local operations.');
      return (
        DatumSyncResult<T>.skipped(
          userId,
          snapshot.pendingOperations,
          reason: 'No changes detected based on metadata',
        ),
        <DatumSyncEvent<T>>[],
      );
    }
  }

  // Fetch the last sync result to get the previous total byte counts.
  final lastSyncResult = await localAdapter.getLastSyncResult(userId);

  int bytesPushedThisCycle = 0;
  int bytesPulledThisCycle = 0;

  final stopwatch = Stopwatch()..start();

  // Read the pending count once so the status snapshot and the start event
  // report the same, current value (the pre-sync snapshot may be stale or
  // belong to a previously active user).
  final pendingAtStart = (await queueManager.getPending(userId)).length;

  // Reset the snapshot for the new sync cycle, preserving only the user ID.
  statusSubject.add(
    DatumSyncStatusSnapshot.initial(userId).copyWith(
      status: DatumSyncStatus.syncing,
      health: const DatumHealth(status: DatumSyncHealth.syncing),
      pendingOperations: pendingAtStart,
    ),
  );
  final startEvent = DatumSyncStartedEvent<T>(
    userId: userId,
    pendingOperations: pendingAtStart,
  );
  generatedEvents.add(startEvent);
  _notifyObservers(startEvent);

  try {
    final direction = options?.direction ?? config.defaultSyncDirection;

    switch (direction) {
      case SyncDirection.pushThenPull:
        bytesPushedThisCycle += await _pushChanges(userId, generatedEvents);
        bytesPulledThisCycle += await _pullChanges(userId, options, scope, generatedEvents);
      case SyncDirection.pullThenPush:
        bytesPulledThisCycle += await _pullChanges(userId, options, scope, generatedEvents);
        bytesPushedThisCycle += await _pushChanges(userId, generatedEvents);
      case SyncDirection.pushOnly:
        bytesPushedThisCycle += await _pushChanges(userId, generatedEvents);
      case SyncDirection.pullOnly:
        bytesPulledThisCycle += await _pullChanges(userId, options, scope, generatedEvents);
    }

    // Update metadata after sync operations
    await _updateMetadata(userId);

    // After operations, check if the sync was cancelled by a dispose call.
    // The status subject would be closed in this case.
    if (statusSubject.isClosed) {
      logger.warn(
        'Sync for user $userId was cancelled mid-process due to manager disposal.',
      );
      return (
        DatumSyncResult<T>.cancelled(userId, statusSubject.value.syncedCount),
        generatedEvents,
      );
    }

    final finalPending = await queueManager.getPending(userId);
    final result = DatumSyncResult(
      userId: userId,
      duration: stopwatch.elapsed,
      syncedCount: statusSubject.value.syncedCount,
      failedCount: statusSubject.value.failedOperations,
      conflictsResolved: statusSubject.value.conflictsResolved,
      pendingOperations: finalPending,
      bytesPushedInCycle: bytesPushedThisCycle,
      bytesPulledInCycle: bytesPulledThisCycle,
      // Running totals build on the previously persisted result, if any.
      totalBytesPushed: (lastSyncResult?.totalBytesPushed ?? 0) + bytesPushedThisCycle,
      totalBytesPulled: (lastSyncResult?.totalBytesPulled ?? 0) + bytesPulledThisCycle,
    );

    // Check if controllers are closed before adding events, as the manager
    // might have been disposed during the sync operation.
    if (!statusSubject.isClosed) {
      statusSubject.add(
        // The final status should be idle, not completed.
        // 'completed' is a transient status for the event, not the final state.
        statusSubject.value.copyWith(
          status: DatumSyncStatus.idle, // The manager is now idle
          health: const DatumHealth(status: DatumSyncHealth.healthy),
        ),
      );
    }
    if (!eventController.isClosed) {
      final completedEvent = DatumSyncCompletedEvent<T>(
        userId: userId,
        result: result,
      );
      generatedEvents.add(completedEvent);
      _notifyObservers(completedEvent);
    }
    return (result, generatedEvents);
  } catch (e, stack) {
    logger.error('Synchronization failed for user $userId: $e', stack);

    // If the eventController is closed, it means the manager has been disposed
    // during the sync. In this case, we should re-throw the original error
    // directly, as there's no point in wrapping it with events that won't
    // be processed.
    if (eventController.isClosed) {
      if (e is SyncExceptionWithEvents<T>) {
        throw e.originalError;
      } else {
        rethrow;
      }
    }

    // If the eventController is still open, proceed with normal error handling:
    // Update status, add error event to generatedEvents, notify observers,
    // and wrap the exception in SyncExceptionWithEvents.
    //
    // The status subject is guarded separately, mirroring the success path:
    // adding to a closed subject would throw and mask the original sync error.
    if (!statusSubject.isClosed) {
      statusSubject.add(
        statusSubject.value.copyWith(
          status: DatumSyncStatus.failed, // The sync cycle failed
          health: const DatumHealth(status: DatumSyncHealth.error),
          errors: [e],
        ),
      );
    }
    final errorEvent = DatumSyncErrorEvent<T>(
      userId: userId,
      error: e is SyncExceptionWithEvents<T> ? e.originalError : e,
      stackTrace: stack,
    );
    generatedEvents.add(errorEvent);
    _notifyObservers(errorEvent);

    // Instead of a simple `rethrow`, we wrap the error in a custom
    // exception. This allows us to transport the `generatedEvents`
    // (which now includes the crucial error event) back up to the
    // DatumManager, which can process them before the user-facing Future
    // completes with an error.
    if (e is SyncExceptionWithEvents<T>) {
      rethrow; // Re-throw the existing SyncExceptionWithEvents
    } else {
      throw SyncExceptionWithEvents(e, stack, generatedEvents);
    }
  }
}