rpc_dart_data 3.5.0 copy "rpc_dart_data: ^3.5.0" to clipboard
rpc_dart_data: ^3.5.0 copied to clipboard

Data service layer (CRUD + querying + change streams) built on rpc_dart.

example/example.dart

// SPDX-FileCopyrightText: 2025 Karim "nogipx" Mamatkazin <nogipx@gmail.com>
//
// SPDX-License-Identifier: MIT

import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:rpc_dart_data/rpc_dart_data.dart';

/// Minimal in-memory transport with a file-backed SQLite adapter.
///
/// The SQLite file lives on disk, while the RPC transport between client and
/// server is in-memory for simplicity. Replace the transport with your own
/// network layer to expose the same repository remotely.
Future<void> main() async {
  final dbFile = File('notes.sqlite');
  if (dbFile.existsSync()) {
    dbFile.deleteSync();
  }

  final connection = await openFileDb(
    options: SqliteConnectionOptions(nativeFileName: dbFile.path),
    // Add your own PRAGMAs or WAL config here.
    sqliteSetup: (db) => db.execute('PRAGMA journal_mode=WAL;'),
    // To enable SQLCipher, also pass sqlCipherKey.
    // sqlCipherKey: cipherKey,
  );

  final storage = SqliteDataStorageAdapter.connection(connection);
  await storage.ensureReady();

  // Enable schema validation and migrations (SQLite repo rebuilds indexes/FTS
  // after migrations by default).
  final schemaEngine = SchemaValidationEngine(
    registry: storage.schemaRegistry,
    config: const SchemaValidationConfig(
      defaultSchemaEnabled: true,
      defaultRequireValidation: true,
    ),
  );

  final repository = SqliteDataRepository(
    storage: storage,
    schemaValidation: schemaEngine,
  );
  final env = await DataServiceFactory.inMemory(repository: repository);
  final client = env.client;
  final serverRepo = env.server.repository as SqliteDataRepository;

  // Bootstrap schema and migrations declaratively (including initial schema).
  final notesMigrations = MigrationPlan.forCollection('notes')
      .initial(
        migrationId: 'notes_init_v1',
        toVersion: 1,
        schema: {
          // Supported schema keywords (subset of JSON Schema):
          // - type: object/array/string/integer/number/boolean/null
          // - enum, required, properties
          // - strings: minLength, maxLength, pattern
          // - numbers: minimum, maximum
          // - arrays: items, minItems, maxItems
          // Advanced keywords (oneOf/anyOf/allOf/$ref/additionalProperties/uniqueItems/etc.)
          // are not implemented and will be ignored.
          'type': 'object',
          'required': ['title'],
          'properties': {
            'title': {'type': 'string'},
            'done': {'type': 'boolean'},
            'tags': {
              'type': 'array',
              'items': {'type': 'string'},
              'minItems': 0,
            },
            'meta': {
              'type': 'object',
              'properties': {
                'priority': {'type': 'integer', 'minimum': 1, 'maximum': 5},
                'rating': {'type': 'number', 'minimum': 0, 'maximum': 1},
                'archived': {'type': 'boolean'},
              },
            },
          },
        },
      )
      .next(
        migrationId: 'notes_add_slug_v2',
        toVersion: 2,
        schema: {
          'type': 'object',
          'required': ['title', 'slug'],
          'properties': {
            'title': {'type': 'string'},
            'slug': {'type': 'string'},
            'done': {'type': 'boolean'},
            'tags': {
              'type': 'array',
              'items': {'type': 'string'},
            },
            'meta': {
              'type': 'object',
              'properties': {
                'priority': {'type': 'integer'},
                'rating': {'type': 'number'},
                'archived': {'type': 'boolean'},
              },
            },
          },
        },
        transformer: _addSlug,
      )
      .build();

  final helper = MigrationRunnerHelper(
    repository: serverRepo,
    migrations: notesMigrations,
  );
  await helper.applyPendingMigrations();
  final schema = await serverRepo.schemaValidationEngine.getSchema('notes');
  print('active schema version=${schema?.version}');

  final created = await client.create(
    collection: 'notes',
    payload: {
      'title': 'Hello',
      'slug': 'hello',
      'done': false,
      'tags': ['welcome', 'intro'],
      'meta': {'priority': 3, 'rating': 0.75, 'archived': false},
    },
  );
  print('created id=${created.id} v=${created.version}');

  final updated = await client.patch(
    collection: 'notes',
    id: created.id,
    expectedVersion: created.version,
    patch: const RecordPatch(set: {'done': true}),
  );
  print('patched id=${updated.id} v=${updated.version}');

  await _printNotes(client);

  await _inspectSchemas(client);
  await _validationFailure(client, updated);
  await _versionConflict(client, updated);
  await _exportSnapshot(client);
  await _demoStreamingImportResume(client);

  await env.dispose();
  await connection.close();
}

Future<void> _printNotes(DataServiceClient client) async {
  final listed = await client.list(
    collection: 'notes',
    options: const QueryOptions(limit: 10),
  );
  for (final note in listed.records) {
    print(
      'note ${note.id} title=${note.payload['title']} slug=${note.payload['slug']} done=${note.payload['done']}',
    );
  }
}

Future<void> _inspectSchemas(DataServiceClient client) async {
  final schemas = await client.listSchemas();
  print(
    'schemas: ${schemas.schemas.map((s) => '${s.collection}@${s.version}')}',
  );
  final fetchedSchema = await client.getSchema(collection: 'notes');
  final activeSchema = fetchedSchema.schema;
  print('active schema for notes v=${activeSchema?.version}');
  await client.setSchemaPolicy(
    collection: 'notes',
    enabled: true,
    requireValidation: true,
  );
}

Future<void> _validationFailure(
  DataServiceClient client,
  DataRecord current,
) async {
  try {
    await client.update(
      collection: 'notes',
      id: current.id,
      expectedVersion: current.version,
      payload: {'title': 'Missing slug', 'done': true},
    );
  } on RpcDataError catch (err) {
    print('validation blocked update: code=${err.code} details=${err.details}');
  }
}

Future<void> _versionConflict(
  DataServiceClient client,
  DataRecord current,
) async {
  try {
    await client.update(
      collection: 'notes',
      id: current.id,
      expectedVersion: current.version - 1,
      payload: {'title': 'Stale write', 'slug': 'stale-write', 'done': true},
    );
  } on RpcDataError catch (err) {
    print('version conflict: code=${err.code} details=${err.details}');
  }
}

Future<void> _exportSnapshot(DataServiceClient client) async {
  final export = client.exportDatabase();
  await for (final chunk in export) {
    print(utf8.decode(chunk));
  }
}

Future<void> _demoStreamingImportResume(DataServiceClient source) async {
  final backupEnv = await DataServiceFactory.inMemory(
    serverLabel: 'backup-server',
    clientLabel: 'backup-client',
  );
  final dump = await source.exportDatabase().toList();

  var resumeAfter = -1;
  try {
    await backupEnv.client.importDatabase(
      // Truncate the dump to emulate a transport drop.
      payload: Stream<Uint8List>.fromIterable(dump.take(3)),
      replaceExisting: true,
    );
  } on ImportResumeException catch (error) {
    resumeAfter = error.lastChunkIndex ?? -1;
    print('import interrupted at chunk $resumeAfter, resuming...');
  }

  final result = await backupEnv.client.importDatabase(
    payload: Stream<Uint8List>.fromIterable(dump),
    replaceExisting: true,
    resumeAfterChunk: resumeAfter,
  );
  print(
    'restored backup collections=${result.collectionCount} records=${result.recordCount} lastChunk=${result.lastChunkIndex}',
  );

  await backupEnv.dispose();
}

Map<String, dynamic> _addSlug(Map<String, dynamic> payload) => {
  ...payload,
  'slug': (payload['title'] as String).toLowerCase(),
};
0
likes
160
points
930
downloads

Publisher

verified publisherdart.nogipx.dev

Weekly Downloads

Data service layer (CRUD + querying + change streams) built on rpc_dart.

Homepage
Repository (GitHub)
View/report issues

Documentation

API reference

License

MIT (license)

Dependencies

async, collection, equatable, licensify, meta, path, postgres, rpc_dart, sqlite3

More

Packages that depend on rpc_dart_data