
cross_channel #

Fast & flexible channels for Dart/Flutter.
Rust-style concurrency primitives: MPSC, MPMC, SPSC, OneShot, plus drop policies, latest-only, select, stream/isolate/web adapters.

Bench-tested · ~1.7–1.9 Mops/s for classic queues · ~130–140 Mops/s for LatestOnly

✨ Features #

  • Drop policies: block, oldest, newest
  • LatestOnly buffers (coalesce to last)
  • Select over Futures, Streams, Receivers & Timers
  • Backpressure with bounded queues; rendezvous (capacity=0)
  • Notify primitive (lightweight wakeups: notifyOne, notifyAll, notified)
  • Stream adapters: toBroadcastStream, redirectToSender
  • Isolate/Web adapters: request/reply, ReceivePort / MessagePort bridges
  • Battle-tested: unit + stress tests, micro-benchmarks included

📦 Install #

dart pub add cross_channel

🧭 API #

High-Level (XChannel) #

final (tx, rx) = XChannel.mpsc<T>(capacity: 1024, dropPolicy: DropPolicy.block);
final (tx, rx) = XChannel.mpmc<T>(capacity: 1024, dropPolicy: DropPolicy.oldest);
final (tx, rx) = XChannel.mpscLatest<T>(); // MPSC latest-only
final (tx, rx) = XChannel.mpmcLatest<T>(); // MPMC latest-only (competitive)
final (tx, rx) = XChannel.spsc<int>(capacity: 1024); // pow2 rounded internally
final (tx, rx) = XChannel.oneshot<T>(consumeOnce: false);

Low-Level #

Low-level flavors are available in mpsc.dart, mpmc.dart, spsc.dart, oneshot.dart.

import 'package:cross_channel/mpsc.dart';
final (tx, rx) = Mpsc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpsc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpsc.bounded<T>(1024);
final (tx, rx) = Mpsc.channel<T>(capacity: 1024, dropPolicy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpsc.latest<T>();

import 'package:cross_channel/mpmc.dart';
final (tx, rx) = Mpmc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpmc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpmc.bounded<T>(1024);
final (tx, rx) = Mpmc.channel<T>(capacity: 1024, dropPolicy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpmc.latest<T>();

import 'package:cross_channel/spsc.dart';
final (tx, rx) = Spsc.channel<T>(1024); // pow2 rounded internally

import 'package:cross_channel/oneshot.dart';
final (tx, rx) = OneShot.channel<T>(consumeOnce: false);

Note #

  • Unbounded channels use a chunked buffer by default (a hot ring plus chunked overflow)
    • Defaults: hot=8192, chunk=4096, rebalanceBatch=64, threshold=cap/16, gate=chunk/2.
  • Streams are single-subscription. Clone receivers for parallel consumers on MPMC.
  • Interop available in isolate_extension.dart, stream_extension.dart, web_extension.dart.
  • Core traits & buffers available in src/*.

🚦 Choosing a channel #

| Channel | Producers | Consumers | Use case | Notes |
| --- | --- | --- | --- | --- |
| MPSC | multi | single | Task queue, async pipeline | Backpressure & drop policies |
| MPMC | multi | multi | Work-sharing worker pool | Consumers compete (no broadcast) |
| SPSC | single | single | Ultra-low-latency hot path | Lock-free ring (pow2 capacity) |
| OneShot | 1 | 1 (or many) | Request/response, once-only signal | consumeOnce option |
| LatestOnly | multi | single (MPSC) / competitive (MPMC) | Progress, sensors, UI signals | Always coalesces to last value |

When to use Notify vs channels #

  • Use Notify for control-plane wakeups without payloads: config-changed, flush, shutdown, “poke a waiter”, etc. (notifyOne / notifyAll; the waiter calls notified() and awaits.)
  • Use channels for data-plane messages with payloads and ordering: tasks, jobs, progress values, events to be processed, etc. A sketch combining both follows.
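The sketch below uses only the Notify and channel calls documented in this README (the tryRecvAll return value is treated opaquely):

import 'package:cross_channel/cross_channel.dart';

void main() async {
  // Control plane: payload-free wakeup (flush / shutdown style).
  final flush = Notify();
  // Data plane: ordered, backpressured payloads.
  final (tx, rx) = XChannel.mpsc<String>(capacity: 64);

  // Producer: hand off real work on the channel...
  await tx.send('job-1');
  // ...and poke the consumer without sending data (stores a permit if nobody waits).
  flush.notifyOne();

  // Consumer: wait for the wakeup, then drain whatever is queued.
  final (woken, _) = flush.notified();
  await woken;
  final drained = rx.tryRecvAll(max: 128); // non-blocking burst drain
  print('drained: $drained');
  tx.close();
}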

💡 Quick examples #

MPSC with backpressure #

import 'package:cross_channel/cross_channel.dart';

Future<void> producer(MpscSender<int> tx) async {
  for (var i = 0; i < 100; i++) {
    await tx.send(i); // waits if queue is full
  }
  tx.close();
}

Future<void> consumer(MpscReceiver<int> rx) async {
  await for (final v in rx.stream()) {
    // handle v
  }
}

void main() async {
  final (tx, rx) = XChannel.mpsc<int>(capacity: 8);
  await Future.wait([producer(tx), consumer(rx)]);
}

MPMC worker pool (competitive consumption) #

import 'package:cross_channel/cross_channel.dart';

Future<void> worker(int id, MpmcReceiver<String> rx) async {
  await for (final task in rx.stream()) {
    // process task
  }
}

void main() async {
  final (tx, rx0) = XChannel.mpmc<String>(capacity: 16);
  final rx1 = rx0.clone();
  final rx2 = rx0.clone();

  final w0 = worker(0, rx0);
  final w1 = worker(1, rx1);
  final w2 = worker(2, rx2);

  for (var i = 0; i < 20; i++) {
    await tx.send('task $i');
  }
  tx.close();

  await Future.wait([w0, w1, w2]);
}

LatestOnly signal (coalesced progress) #

import 'package:cross_channel/cross_channel.dart';

Future<void> ui(MpscReceiver<double> rx) async {
  await for (final p in rx.stream()) {
    // update progress bar with p in [0..1]
  }
}

void main() async {
  final (tx, rx) = XChannel.mpscLatest<double>();

  final done = ui(rx);
  for (var i = 0; i <= 100; i++) {
    tx.trySend(i / 100); // overwrites previous value
    await Future.delayed(const Duration(milliseconds: 10));
  }
  tx.close();
  await done;
}

Sliding queues (drop policies) #

final (tx, rx) = XChannel.mpsc<int>(
  capacity: 1024,
  dropPolicy: DropPolicy.oldest, // or DropPolicy.newest
  onDrop: (d) => print('dropped $d'),
);

  • oldest: evicts the oldest queued item to make room (keeps the newest data flowing)
  • newest: drops the incoming item (send appears to succeed, but the value is discarded)
  • block: the default (the producer waits while the queue is full)

OneShot (single vs multi observe) #

// consumeOnce = true: first receiver consumes, then disconnects
final (stx, srx) = XChannel.oneshot<String>(consumeOnce: true);

// consumeOnce = false: every receiver sees the same value (until higher-level teardown)
final (btx, brx) = XChannel.oneshot<String>(consumeOnce: false);
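
A short end-to-end sketch using the consumeOnce: true pair from above. It assumes the oneshot endpoints expose send()/recv() like the other channels, with recv() resolving to a RecvResult as described under “Results & helpers”:

// Producer side: complete the oneshot exactly once.
await stx.send('done');

// Consumer side: await the single value (recv() assumed to resolve to a RecvResult).
final res = await srx.recv();
if (res is RecvOk<String>) {
  print('got ${res.value}');
}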

🔔 Notify (lightweight wakeups) #

A tiny synchronization primitive to signal tasks without passing data.

  • notified() → returns a (Future<void>, cancel) pair.
    • If a permit is available, it completes immediately and consumes one permit.
    • Otherwise it registers a waiter until notified or canceled.
  • notifyOne() → wakes one waiter or stores one permit if none is waiting.
  • notifyAll() → wakes all current waiters (does not store permits).
  • close() → wakes every waiter with a disconnected result.
  • Integrates with XSelect via onFuture.

import 'package:cross_channel/cross_channel.dart';

final n = Notify();

// Waiter
final (f, cancel) = n.notified();
// ... later: cancel();  // optional

// Notifiers
n.notifyOne(); // or
n.notifyAll();

// With XSelect
final (fu, _) = n.notified();
await XSelect.run<void>((s) => s
  ..onFuture<void>(fu, (_) => null, tag: 'notify')
  ..onTick(Ticker.every(const Duration(seconds: 1)), () => null, tag: 'tick')
);

🧰 Select (futures/streams/receivers/timers) #

XSelect lets you race multiple asynchronous branches and cancel the losers.

import 'package:cross_channel/cross_channel.dart';

Future<void> main() async {
  final (tx, rx) = XChannel.mpsc<int>(capacity: 8);

  // Example: first event wins among a stream, a channel, a timer, and a future.
  final result = await XSelect.run<String>((s) => s
    ..onStream<int>(
      Stream.periodic(const Duration(milliseconds: 50), (i) => i),
      (i) => 'stream:$i',
      tag: 'S',
    )
    // simple
    ..onRecvValue<int>(
      rx,
      (v) => 'recv:$v',
      onDisconnected: () => 'disconnected',
      tag: 'R',
    )
    // full control over RecvResult
    ..onReceiver<int>(
      rx,
      (res) {
        if (res is RecvOk<int>) return 'recv:${res.value}';
        if (res is RecvErrorDisconnected) return 'disconnected';
        return 'unexpected:$res';
      },
      tag: 'R',
    )
    ..onTick(
      Ticker.every(const Duration(milliseconds: 50)),
      () => 'timer',
      tag: 'T',
    )
    ..onFuture<String>(
      Future<String>.delayed(const Duration(milliseconds: 80), () => 'future'),
      (s) => s,
      tag: 'F',
    )
  );

  print('winner -> $result');
}

Notes:

  • XSelect.run resolves with the first branch to complete and cancels the remaining ones.
  • ordered: true evaluates branches in declaration order; otherwise a fairness rotation is applied.
  • syncRun considers only arms that are immediately ready, so it never blocks.

🔗 Interop #

Stream #

import 'package:cross_channel/stream_extension.dart';

// Receiver → broadcast Stream (pause/resume when no listeners)
final broadcast = rx.toBroadcastStream(
  waitForListeners: true,
  stopWhenNoListeners: true,
  closeReceiverOnDone: false,
);

// Stream → Sender (optional drop on full; auto-close sender on done)
await someStream.redirectToSender(tx, dropWhenFull: true);
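
The result of toBroadcastStream is a regular broadcast Stream, so multiple listeners can attach; a short sketch:

final a = broadcast.listen((v) => print('A: $v'));
final b = broadcast.listen((v) => print('B: $v'));
// With stopWhenNoListeners: true, the bridge pauses once both subscriptions cancel.
await a.cancel();
await b.cancel();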

Isolates #

import 'dart:isolate';
import 'package:cross_channel/isolate_extension.dart';

// Typed request/reply
final reply = await someSendPort.request<Map<String, Object?>>(
  'get_user',
  data: {'id': 42},
  timeout: const Duration(seconds: 3),
);

// Port → channel bridge
final rp = ReceivePort();
final (tx, rx) = rp.toMpsc<MyEvent>(capacity: 512, strict: true);

Web #

import 'package:web/web.dart';
import 'package:cross_channel/web_extension.dart';

final channel = MessageChannel();

// Typed request/reply
final res = await channel.port1.request<String>('ping');

// Port → channel bridge
final (tx, rx) = channel.port2.toMpmc<JsEvent>(capacity: 512, strict: true);

🧩 Results & helpers #

// SendResult: SendOk | SendErrorFull | SendErrorDisconnected
// RecvResult: RecvOk(value) | RecvErrorEmpty | RecvErrorDisconnected

// Extensions:

// SendResult
r.hasSend;
r.isFull;
r.isDisconnected;
r.isTimeout;
r.isFailed;
r.hasError;

// RecvResult
rr.hasValue;
rr.isEmpty;
rr.isDisconnected;
rr.isTimeout;
rr.isFailed;
rr.hasError;
rr.valueOrNull;

// Timeouts/batching/draining:
await tx.sendTimeout(v, const Duration(milliseconds: 10));
await tx.sendAll(iterable);     // waits on full
tx.trySendAll(iterable);        // best-effort

rx.tryRecvAll(max: 128);        // burst non-blocking drain
await rx.recvAll(idle: Duration(milliseconds: 1), max: 1024);

// Cancelable receive (KeepAliveReceiver only):
final (fut, cancel) = rx.recvCancelable();
cancel(); // attempts to abort the wait if still pending
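
For example, a non-blocking produce/consume step built from these helpers (tryRecv() is assumed here as the receive-side counterpart of trySend()):

// Non-blocking send: inspect the SendResult instead of awaiting.
final s = tx.trySend(42);
if (s.isFull) {
  // queue saturated: drop, buffer elsewhere, or fall back to `await tx.send(42)`
} else if (s.isDisconnected) {
  // all receivers are gone: stop producing
}

// Non-blocking receive: valueOrNull collapses RecvOk / empty / disconnected.
final v = rx.tryRecv().valueOrNull; // tryRecv() assumed
if (v != null) {
  // handle v
}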

📊 Benchmarks (Dart VM, i7-8550U, High priority, CPU affinity set) #

  • Benches are single-isolate micro-benchmarks.
  • Pinning affinity/priority helps stabilize latencies.

MPSC #

| case | Mops/s (median) | ns/op (median) | max us (median) |
| --- | --- | --- | --- |
| ping-pong cap=1 (1P/1C) | 1.87 | 535.7 | 177.0 |
| pipeline cap=1024 (1P/1C) | 1.83 | 547.1 | 264.0 |
| pipeline unbounded (1P/1C) | 1.83 | 546.5 | 269.0 |
| multi-producers x4 cap=1024 | 1.06 | 946.5 | 13907.0 |
| pipeline rendezvous cap=0 (1P/1C) | 1.68 | 593.8 | 150.0 |
| sliding=oldest cap=1024 (1P/1C) | 24.19 | 41.3 | 41133.0 |
| sliding=newest cap=1024 (1P/1C) | 55.25 | 18.1 | 17895.0 |
| latestOnly (coalesce) (1P/1C) | 135.89 | 7.4 | 7353.0 |

MPMC #

| case | Mops/s (median) | ns/op (median) | max us (median) |
| --- | --- | --- | --- |
| ping-pong cap=1 (1P/1C) | 1.87 | 536.1 | 129.0 |
| pipeline cap=1024 (1P/1C) | 1.90 | 525.0 | 121.0 |
| pipeline unbounded (1P/1C) | 1.87 | 534.1 | 233.0 |
| producers x4 cap=1024 (1C) | 1.07 | 937.8 | 12581.0 |
| producers x4 / consumers x4 cap=1024 | 1.78 | 561.3 | 95.0 |
| pipeline rendezvous cap=0 (1P/1C) | 1.72 | 580.1 | 126.0 |
| sliding=oldest cap=1024 (1P/1C) | 24.24 | 41.3 | 41048.0 |
| sliding=newest cap=1024 (1P/1C) | 54.14 | 18.5 | 18270.0 |
| latestOnly (coalesce) (1P/1C) | 139.90 | 7.1 | 7143.0 |
| latestOnly (coalesce) (1P/4C competitive) | 125.00 | 8.0 | 7994.0 |

SPSC #

| case | Mops/s (median) | ns/op (median) | max us (median) |
| --- | --- | --- | --- |
| spsc ping-pong pow2=1024 (1P/1C) | 1.76 | 569.8 | 108.0 |
| spsc pipeline pow2=1024 (1P/1C) | 1.78 | 562.8 | 99.0 |
| spsc pipeline pow2=4096 (1P/1C) | 1.80 | 555.3 | 92.0 |

ONESHOT #

| case | Mops/s (median) | ns/op (median) | max us (median) |
| --- | --- | --- | --- |
| oneshot create+send+recv | 2.62 | 381.7 | 146.0 |
| oneshot reuse sender (new rx) | 2.56 | 390.5 | 44.0 |

INTER-ISOLATE #

| case | Mops/s (median) | ns/op (median) | max us (median) |
| --- | --- | --- | --- |
| SendPort roundtrip Uint8List (16.0KB) | 0.05 | 20665.0 | 199.0 |
| raw SendPort ping-pong (int) | 0.20 | 5078.9 | 1334.0 |
| raw event drain (int) | 2.42 | 413.9 | 0.0 |

ISOLATE #

| case | Mops/s (median) | ns/op (median) | max us (median) |
| --- | --- | --- | --- |
| isolate event drain | 0.60 | 1675.8 | 0.0 |
| isolate request echo | 0.04 | 24088.7 | 1498.0 |

How to bench #

This repo ships with two PowerShell scripts to run the micro-benchmarks. They produce consistent, copy-pastable output and (optionally) a CSV for later analysis.

Windows or PowerShell Core (pwsh) on macOS/Linux is fine.
For non-PowerShell usage, see the manual commands at the end.

Lite — fast dev loop

Compiles and runs once per target. No CSV, no CPU pinning, no priority tweaks.

# All suites, 1e6 iterations per case
.\bench_lite.ps1 -Target all -Count 1000000

# Single suite
.\bench_lite.ps1 -Target mpsc -Count 1000000

Full — reproducible runs + CSV

Lets you set CPU affinity, process priority, repeat counts, and append results to a CSV.

# Compile & Run, MPMC only, 5 repeats, High priority, CPU0,
# append CSV lines to bench\out.csv
.\bench_full.ps1 `
  -Target mpmc `
  -Action cr `
  -Count 1000000 `
  -Repeat 5 `
  -Priority High `
  -Affinity 0x1 `
  -Csv `
  -OutCsv "bench\out.csv" `
  -AppendCsv

Parameters (full mode):

  • Target: spsc, mpsc, mpmc, oneshot, isolate, inter_isolate, all
  • Action: compile (just build), run (run existing exes), cr (compile+run)
  • Count: iterations per case (e.g., 1000000)
  • Repeat: run the suite multiple times (e.g., -Repeat 5)
  • Priority: Idle · BelowNormal · Normal · AboveNormal · High · RealTime
  • Affinity: CPU bitmask (e.g., 0x1 = CPU0, 0x3 = CPU0–1)
  • Csv: write results to CSV
  • OutCsv: CSV path (default bench\out.csv)
  • AppendCsv: append instead of overwriting the header

Stability tips: pin CPU with -Affinity, raise -Priority High, do a warm-up repeat, and keep the machine idle.

CSV format

Each benchmark prints lines like:

suite,case,mops,ns_per_op,max_latency_us,notes
MPMC,"pipeline cap=1024 (1P/1C)",1.91,524.5,51.0,

  • suite: group (e.g., MPSC, MPMC, SPSC, ONESHOT, ISOLATE, INTER-ISOLATE)
  • case: scenario description (e.g., pipeline cap=1024 (1P/1C))
  • mops: million ops/sec (higher = better)
  • ns_per_op: nanoseconds per op (lower = better)
  • max_latency_us: max observed recv latency in microseconds (lower = better)
  • notes: free text (often empty)

You can aggregate across repeats (median/mean/p95) in your own tooling.
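
For example, a small standalone Dart sketch (not part of the package) that computes the median Mops/s per case; it assumes the default bench/out.csv path and the quoted-case row format shown above:

import 'dart:io';

void main() {
  final lines = File('bench/out.csv')
      .readAsLinesSync()
      .where((l) => l.isNotEmpty && !l.startsWith('suite,'));

  // Group mops samples by "SUITE / case"; the case field is double-quoted.
  final samples = <String, List<double>>{};
  for (final line in lines) {
    final m = RegExp(r'^([^,]+),"([^"]*)",([^,]*),').firstMatch(line);
    if (m == null) continue;
    final key = '${m.group(1)} / ${m.group(2)}';
    samples.putIfAbsent(key, () => []).add(double.parse(m.group(3)!));
  }

  samples.forEach((key, xs) {
    xs.sort();
    final mid = xs.length ~/ 2;
    final median = xs.length.isOdd ? xs[mid] : (xs[mid - 1] + xs[mid]) / 2;
    print('$key: median ${median.toStringAsFixed(2)} Mops/s over ${xs.length} run(s)');
  });
}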

🧪 Testing #

This repo ships with comprehensive unit, stress and integration tests (isolate/web/stream).

dart test

License #

MIT
