# rpc_dart_data
rpc_dart_data is the high-level data layer (CRUD + queries + change streams) that sits on top of rpc_dart.
It gives you a transport-agnostic contract, ready-to-use storage adapters, and utilities for building resilient backends and clients.
## Feature highlights
- Unified `DataService` contract with helpers for create/get/list/update/patch/delete/deleteCollection.
- Bulk workflows via `bulkUpsert`, `bulkUpsertStream`, and `bulkDelete` to move large batches atomically (a hedged sketch follows this list).
- Search delegated to the storage adapter, including backend pagination.
- Streaming snapshots: export/import the full database as NDJSON, stream it through `payloadStream`, or set `includePayloadString: false` to skip building large strings.
- Change streams: `watchChanges` with cursors.
- Schema validation & migrations: per-collection schema registry (nested objects/arrays), validation on all write/import RPCs. Server-side migrations with checkpoints/logs and automatic index/FTS rebuilds after success; migrations are bound to a collection and run via repository helpers.
- Pass a `SqlCipherKey` to enable encryption; the runtime fails fast if cipher pragmas are missing.
- Ready-made environments (`DataServiceFactory.inMemory`) for tests, demos, and local prototyping.
- Collection discovery: the `listCollections()` RPC queries the storage adapter for the current list of collection names so clients can introspect available datasets without enumerating all records.
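The bulk helpers are not exercised elsewhere in this README, so here is a minimal sketch using the in-memory environment from the quick start below; the `records` parameter name and the use of plain payload maps are assumptions, not the published signature.

// Hedged sketch of a bulk write; `records` and plain payload maps are assumptions.
final env = await DataServiceFactory.inMemory();
final ctx = RpcContext.withHeaders({'authorization': 'Bearer dev'});
final payloads = List.generate(100, (i) => {'title': 'note #$i', 'done': false});
await env.client.bulkUpsert(
  collection: 'notes',
  records: payloads,
  context: ctx,
);
await env.dispose();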
## Adapters
- SQLite adapter with SQLCipher support out-of-the-box via SQLite3MultipleCiphers (`hooks.user_defines.sqlite3.source: sqlite3mc`); no system `libsqlcipher` needed; web uses `sqlite3mc.wasm`. `SqliteSetupHook` lets you register custom pragmas before the database is exposed to your code.
- PostgreSQL adapter (alpha): one collection per JSONB table, optimistic concurrency; filters/sorting/pagination/FTS execute inside Postgres (GIN over `to_tsvector`).
## Architecture in six layers
1. Transport (WebSocket / HTTP/2 / isolates / TURN / in-memory) provided by `rpc_dart` implementations (you can plug in any `IRpcTransport`; this package now relies on the transports shipped by `rpc_dart` instead of bundling `rpc_dart_transports` directly).
2. Endpoint (`RpcCallerEndpoint` / `RpcResponderEndpoint`).
3. Contract + codecs (`IDataServiceContract` and `RpcCodec<...>`).
4. Low-level plumbing (`DataServiceCaller` / `DataServiceResponder`).
5. Repository + storage adapter (`BaseDataRepository`, the change journal, and the concrete adapter: in-memory or SQLite).
6. Facade (`DataServiceClient`, `DataServiceServer`, `DataServiceFactory`, `InMemoryDataServiceEnvironment`).

Consumers usually interact only with layer 6 (the facade); everything below stays private.
## Quick start
import 'package:rpc_dart/rpc_dart.dart';
import 'package:rpc_dart_data/rpc_dart_data.dart';
Future<void> main() async {
final env = await DataServiceFactory.inMemory();
final client = env.client;
final ctx = RpcContext.withHeaders({'authorization': 'Bearer dev'});
final created = await client.create(
collection: 'notes',
payload: {'title': 'Hello', 'done': false},
context: ctx,
);
final sub = client
.watchChanges(collection: 'notes', context: ctx)
.listen((event) => print('change=${event.type} id=${event.id} v=${event.version}'));
await client.patch(
collection: 'notes',
id: created.id,
expectedVersion: created.version,
patch: const RecordPatch(set: {'done': true}),
context: ctx,
);
await Future<void>.delayed(const Duration(milliseconds: 50));
await sub.cancel();
await env.dispose();
}
See `example/extended_demo.dart` for a larger walkthrough.
## Streaming export and import
`DataRepository.exportDatabase` writes every snapshot as newline-delimited JSON (NDJSON).
Each line contains a `header`, `collection`, `record`, `collectionEnd`, or `footer` entry. You can either:
- Work with the whole payload as a string (default), or
- Set `ExportDatabaseRequest(includePayloadString: false)` and consume the `payloadStream` to keep memory flat while sending it over the wire.
Imports accept the same format (a hedged import sketch follows the export example below). When `replaceExisting` is true, missing collections are removed and re-created so the target repository matches the snapshot exactly. Legacy JSON (format version 1.0.0) is still accepted for backward compatibility, but it requires loading the entire blob into memory first.
`ImportDatabaseRequest` always validates the snapshot stream before mutating any collections, and the importer processes records in `databaseImportBatchSize` chunks, so very large dumps no longer spike RAM usage.
### Example: piping exported data
final export = await repository.exportDatabase(
const ExportDatabaseRequest(includePayloadString: false),
);
await for (final chunk in export.payloadStream!) {
sink.add(chunk); // write to file/socket/etc
}
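Imports can be driven the same way. The sketch below is hedged: only `ImportDatabaseRequest` and `replaceExisting` appear in this README, so the `importDatabase` method and the `payload` field name are assumptions that mirror the export call above.

// Hypothetical restore from an NDJSON snapshot (needs dart:io for File);
// `importDatabase` and the `payload` field name are assumptions.
final ndjson = await File('backup.ndjson').readAsString();
await repository.importDatabase(
  ImportDatabaseRequest(
    payload: ndjson,         // assumed field for the NDJSON string
    replaceExisting: true,   // drop collections missing from the snapshot
  ),
);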
## Schema validation and migrations
- Per-collection schemas (nested objects/arrays) live in a registry; validation runs on create/update/patch/bulk/import over RPC.
- Default policy is controlled via `SchemaValidationConfig` (`defaultSchemaEnabled`, `defaultRequireValidation`); per-collection overrides use `setSchemaPolicy` (a hedged sketch follows this list).
- Migrations are server-side only: register declarative steps with `MigrationDefinition` (bound to a `collection`) and run them via `MigrationRunnerHelper` or `SqliteDataRepository.runMigration`. Migration success activates the new schema and rebuilds indexes/FTS; any errors keep the checkpoint and block activation.
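A rough sketch of those policy knobs, under stated assumptions: the `SchemaValidationConfig` constructor takes the two defaults as named parameters, the repository accepts the config at construction via a `validationConfig` parameter, and `setSchemaPolicy` lives on the repository; none of these signatures are confirmed here.

// Hypothetical validation-policy wiring; parameter names are assumptions.
final storage = await SqliteDataStorageAdapter.memory();
final repo = SqliteDataRepository(
  storage: storage,
  validationConfig: SchemaValidationConfig(
    defaultSchemaEnabled: true,
    defaultRequireValidation: false,
  ),
);
// Opt a single collection into strict validation (assumed signature).
await repo.setSchemaPolicy('notes', requireValidation: true);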
Minimal server-side wiring:
final storage = await SqliteDataStorageAdapter.memory();
final repo = SqliteDataRepository(storage: storage);
final migrations = MigrationPlan.forCollection('notes')
.initial(
migrationId: 'notes_init_v1',
toVersion: 1,
schema: {
'type': 'object',
'required': ['title'],
'properties': {
'title': {'type': 'string'},
},
},
)
.next(
migrationId: 'notes_add_slug_v2',
toVersion: 2,
schema: {
'type': 'object',
'required': ['title', 'slug'],
'properties': {
'title': {'type': 'string'},
'slug': {'type': 'string'},
},
},
transformer: (payload) => {...payload, 'slug': (payload['title'] as String).toLowerCase()},
)
.build();
final helper = MigrationRunnerHelper(
repository: repo,
migrations: migrations,
);
await helper.applyPendingMigrations(); // idempotent across restarts; checkpoints are persisted.
## Change streams and optimistic concurrency
- `watchChanges` accepts an optional `cursor`, so clients can resume after a reconnect.
- SQLite keeps a persistent `s_change_journal` table, allowing cursors to survive restarts.
- `update` and `patch` require an `expectedVersion`. A mismatch triggers `RpcDataError.conflict(...)` (or a transport `RpcException`); a hedged retry sketch follows this list.
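A small retry sketch for the conflict path. It assumes `client.get(collection:, id:, context:)` returns the current record with its `version` and that the conflict surfaces as a catchable `RpcDataError`; adapt it to how errors actually reach your client.

// Retry a patch after a concurrent writer bumped the record version.
Future<void> markDone(DataServiceClient client, RpcContext ctx, String id) async {
  final current = await client.get(collection: 'notes', id: id, context: ctx);
  try {
    await client.patch(
      collection: 'notes',
      id: id,
      expectedVersion: current.version,
      patch: const RecordPatch(set: {'done': true}),
      context: ctx,
    );
  } on RpcDataError {
    // Someone else wrote first: re-read and let the caller decide how to merge.
    final latest = await client.get(collection: 'notes', id: id, context: ctx);
    print('conflict: server is now at v=${latest.version}');
  }
}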
## Listing collections
Call `DataServiceClient.listCollections()` to retrieve the current catalog of collection names without enumerating records.
final collections = await client.listCollections(context: ctx);
print('collections: ${collections.join(', ')}');
The RPC call simply forwards to `DataStorageAdapter.listCollections()`, so the result reflects whatever tables your adapter created.
## Extending the storage layer
Implement `DataStorageAdapter` to plug in any backend (PostgreSQL, Elastic, Firestore, ...):
class PostgresAdapter implements DataStorageAdapter {
  @override
  Future<DataRecord?> readRecord(String collection, String id) async {
    // Query your store and return a DataRecord when a row exists, or null otherwise.
    throw UnimplementedError();
  }

  @override
  Future<ListRecordsResponse> queryCollection(ListRecordsRequest request) async {
    // Apply filters + pagination server-side and map the rows into the response.
    throw UnimplementedError();
  }

  // Implement searchCollection, writeRecords, deleteCollection, etc.
}
Adapters participate in streaming export/import by overriding `readCollectionChunks`, and they can expose custom indices via `CollectionIndexStorageAdapter` if needed.
## SQLite profile
`SqliteDataStorageAdapter` ships as the default single-node implementation:
- Creates per-collection tables on demand plus indexes on `version`, `created_at`, and `updated_at`.
- Delegates filtering, sorting, and pagination to SQL for predictable O(log N) plans.
- Uses batched UPSERT statements to keep `writeRecords` fast even when thousands of rows are updated.
- SQLCipher (a hedged setup sketch follows this list):
  - Pass a `SqlCipherKey` to enable encryption; the runtime will auto-load `libsqlcipher` on macOS/Linux (checks `SQLITE3_LIB_DIR`/`SQLITE3_LIB_NAME` or common Homebrew paths) before opening the database and will fail fast if cipher pragmas are missing.
  - On the web, the adapter uses the `sqlite3mc.wasm` build (SQLite3MultipleCiphers) by default; set `webSqliteWasmUri` to a custom location if you host your own WASM.
  - You can still register extra PRAGMA via `SqliteSetupHook` for fine-tuning.
- Stores `watch()` events in a durable journal, so cursor-based clients recover after restarts.
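A setup sketch under loud assumptions: only `SqlCipherKey`, `SqliteSetupHook`, and the `.memory()` factory are named in this README, so the file-backed factory, the `SqlCipherKey` constructor, and the hook callback shape below are all hypothetical.

// Hypothetical encrypted, file-backed setup; factory and constructor shapes are assumptions.
final adapter = await SqliteDataStorageAdapter.file(
  'app_data.db',
  cipherKey: SqlCipherKey.passphrase('correct horse battery staple'),
  setupHook: SqliteSetupHook((db) {
    // Extra pragmas run before the database is handed to your code.
    db.execute('PRAGMA journal_mode = WAL;');
  }),
);
final repository = SqliteDataRepository(storage: adapter);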