isolated_stream 2.0.0  isolated_stream: ^2.0.0 copied to clipboard
isolated_stream: ^2.0.0 copied to clipboard
A Dart package for running CPU-intensive stream transformations in separate isolates to prevent blocking the main thread.
isolated_stream #
Make isolates as simple as streams. Transform data in separate isolates with the same ease as regular stream operations - no complex setup.
Features 🎯 #
- 🔄 Non-blocking processing - Transform streams in separate isolates
- ⚡ Configurable concurrency - Process multiple elements while preserving order
- 🎱 Isolate pooling - Distribute work across multiple isolates
- 🎮 Simple API - Easy-to-use extension method on streams
When to use 🤔 #
This package is perfect for:
- CPU-intensive computations on stream data (image processing, mathematical calculations, parsing)
- Preventing UI freezes in Flutter apps during heavy processing
- Maintaining responsiveness while processing large datasets
Quick start 🚀 #
- 
Install this package. dependencies: isolated_stream: ^2.0.0
- 
Import it in your Dart file. import 'package:isolated_stream/isolated_stream.dart';
- 
Create a handler and transform your stream. // Simple handler that doubles numbers class DoubleHandler extends IsolatedHandler<int, int> { @override int compute(int value) => value * 2; // Runs in separate isolate! } final stream = Stream.fromIterable([1, 2, 3, 4, 5]); final transformed = stream.isolatedMap(DoubleHandler()); // Results: [2, 4, 6, 8, 10] - all calculated in separate isolate
- 
Enjoy non-blocking stream processing! 😎 
Less time waiting for frames, more time enjoying fluent animations! 🚀
Features #
🔄 Basic transformation #
Transform stream elements in separate isolates without blocking your main thread:
// Define your computation logic
class MultiplyHandler extends IsolatedHandler<int, int> {
  final int factor;
  
  MultiplyHandler(this.factor) : super(debugName: 'Multiply by $factor');
  
  @override
  int compute(int value) => value * factor; // Runs in isolate!
}
// Use it like any other stream transformation
final stream = Stream.fromIterable([1, 2, 3, 4, 5]);
final transformed = stream.isolatedMap(MultiplyHandler(10)); // Sequential by default
await for (final value in transformed) {
  print(value); // Output: 10, 20, 30, 40, 50 (processed one at a time)
}
⚡ Processing strategies #
Choose the right strategy for your use case:
final stream = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]);
// Sequential: process one at a time (default)
stream.isolatedMap(handler); // or .sequential()
// Concurrent: process multiple simultaneously, preserving order
stream.isolatedMap(handler, 
  strategy: IsolatedProcessingStrategy.concurrent(concurrency: 3));
// Droppable: drop new events when at max concurrency
stream.isolatedMap(handler,
  strategy: IsolatedProcessingStrategy.droppable(concurrency: 2));
// Restartable: cancel previous work when new events arrive
stream.isolatedMap(handler,
  strategy: IsolatedProcessingStrategy.restartable());
🎱 Isolate pooling #
Distribute work across multiple isolates for maximum performance:
final bigStream = Stream.fromIterable(List.generate(1000, (i) => i));
// Use multiple isolates for high-throughput processing
final processed = bigStream.isolatedMap(
  HeavyComputationHandler(),
  strategy: IsolatedProcessingStrategy.concurrent(
    concurrency: 8,  // 8 total concurrent operations
    isolates: 4,     // Work distributed among 4 isolates 💪
  ),
);
// Scales beautifully with available CPU cores!
print('Processed ${await processed.length} items lightning fast! ⚡');
🌐 Async operations #
Async operations work seamlessly:
class HeavyApiDataParser extends IsolatedHandler<String, Map<String, dynamic>> {
  @override
  Future<Map<String, dynamic>> compute(String endpoint) async {
    final response = await http.get(Uri.parse(endpoint));
    return json.decode(response.body);
  }
}
// Fetch and parse data from multiple APIs concurrently
final apis = Stream.fromIterable([
  'https://example.com/data-1',
  'https://example.com/data-2', 
  'https://example.com/data-3'
]);
final responses = apis.isolatedMap(
  HeavyApiDataParser(),
  strategy: IsolatedProcessingStrategy.concurrent(concurrency: 3),
);
⚠️ Important notes #
📦 Serialization requirements #
Data must be sendable through Dart's SendPort mechanism for inter-isolate communication. Since isolates created with Isolate.spawn share the same code, most objects are sendable:
✅ Always sendable:
- Primitives: null,true,false,int,double,String
- Collections: List,Map,LinkedHashMap,Set,LinkedHashSet(with sendable contents)
- Typed data: TransferableTypedDataand related types
- Special types: Capability,SendPort,Typeinstances
- Custom objects: Any class instance (with exceptions below)
❌ Cannot be sent:
- Objects with native resources: Socket,File,HttpClient, etc.
- Isolate infrastructure: ReceivePort,DynamicLibrary
- Finalizers: Finalizable,Finalizer,NativeFinalizer
- VM internals: UserTag,MirrorReference
- Classes marked with @pragma('vm:isolate-unsendable')
⚠️ Closures limitation: Functions and closures may capture more state than needed due to VM implementation, potentially causing larger object graphs than expected (https://github.com/dart-lang/sdk/issues/36983).
ℹ️ Performance tips #
- Use for CPU-heavy work: There's communication overhead, so only use for intensive computations
📚 Overview (API reference) #
Creating handlers #
class MyHandler extends IsolatedHandler<InputType, OutputType> {
  @override
  OutputType compute(InputType input) {
    // Your transformation logic here
    return transformedOutput;
  }
}
Using isolatedMap #
stream.isolatedMap(
  handler,           // Your IsolatedHandler instance
  strategy: IsolatedProcessingStrategy.concurrent(
    concurrency: 4,  // How many operations to run in parallel
    isolates: 2,     // How many isolates to distribute work across
  ),
)
Strategies #
- sequential(): Process one at a time (default)
- concurrent(concurrency, isolates): Process multiple simultaneously
- droppable(concurrency, isolates): Drop events when busy
- restartable(concurrency, isolates): Cancel previous work for new events
Returns: Stream<E> with transformed elements in original order
📄 License #
MIT License - see the LICENSE file for details.