worker_pool 0.0.1
worker_pool: ^0.0.1 copied to clipboard
A Dart/Flutter package for managing a pool of isolates to execute tasks concurrently, with support for resource management, constant sharing, and proper cleanup.
Worker Pool #
A Dart/Flutter package for managing a pool of isolates to execute tasks concurrently, with support for resource management, constant sharing, and proper cleanup.
中文版 README
Features #
- Isolate Pool Management: Efficiently manages a pool of Dart isolates for concurrent task execution
- Resource Management: Initialize and clean up resources in isolates with custom finalizers
- Constant Sharing: Share constants between the main thread and isolates
- Task Queue: Automatically queues tasks when all isolates are busy
- Timeout Handling: Configurable timeouts for task execution
- Health Monitoring: Automatic worker restart based on task count or inactivity
- Function Registration: Register functions that can be executed in isolates
- Statistics: Get pool statistics for monitoring performance
Getting Started #
Prerequisites #
- Dart SDK ^3.8.1
- Flutter >=1.17.0 (for Flutter projects)
Installation #
Add worker_pool to your pubspec.yaml:
dependencies:
worker_pool:
path: /path/to/worker_pool
Or if published to pub.flutter-io.cn:
dependencies:
worker_pool: ^0.0.1
Usage #
Basic Setup #
import 'package:worker_pool/worker_pool.dart';
// Initialize the worker pool
final config = WorkerPoolConfig(
poolSize: 4, // Number of isolates to maintain
isolateTimeout: Duration(seconds: 30), // Timeout for tasks
);
await WorkerPool.initialize(config);
// Get the instance
final pool = WorkerPool.instance;
Registering Functions #
To execute functions in isolates, you need to register them during initialization:
// Define a function that can be executed in an isolate
Future<int> computeSquare(int number) async {
// Simulate some computation
await Future.delayed(Duration(milliseconds: 100));
return number * number;
}
// Register the function during initialization
final config = WorkerPoolConfig(
poolSize: 4,
predefinedFunctions: {
'computeSquare': computeSquare,
},
);
await WorkerPool.initialize(config);
Executing Tasks #
// Submit a task to the pool
try {
final result = await pool.submit<int, int>('computeSquare', 5);
print('Result: $result'); // Output: Result: 25
} catch (e) {
print('Task failed: $e');
}
Using Constants #
Share constants between the main thread and isolates:
// Define a constant provider
Future<Map<String, dynamic>> provideConstants() async {
return {
'apiUrl': 'https://api.example.com',
'timeout': 5000,
};
}
// Register the constant provider
final config = WorkerPoolConfig(
poolSize: 4,
constantProviders: [provideConstants],
predefinedFunctions: {
'computeWithConstants': computeWithConstants,
},
);
await WorkerPool.initialize(config);
In the isolate function:
Future<String> computeWithConstants(String input) async {
final apiUrl = IsolateConstantManager.getConstant<String>('apiUrl');
final timeout = IsolateConstantManager.getConstant<int>('timeout');
// Use the constants in your computation
return '$input processed with API: $apiUrl and timeout: $timeout';
}
Resource Management #
Initialize and clean up resources in isolates:
// Define a resource initializer
Future<Map<String, dynamic>?> initializeDatabase() async {
// Initialize database connection
final db = await Database.connect('database_url');
return {
'database': db,
};
}
// Define a resource finalizer
Future<void> finalizeDatabase(dynamic resource) async {
if (resource is Database) {
await resource.close();
}
}
// Register resource management
final config = WorkerPoolConfig(
poolSize: 4,
resourceInitializers: [initializeDatabase],
resourceFinalizers: {
'database': finalizeDatabase,
},
);
await WorkerPool.initialize(config);
Getting Statistics #
Monitor the pool performance:
final stats = pool.getStatistics();
print('Total workers: ${stats['totalWorkers']}');
print('Available workers: ${stats['availableWorkers']}');
print('Busy workers: ${stats['busyWorkers']}');
print('Queued tasks: ${stats['queuedTasks']}');
Cleanup #
Dispose of the pool when done:
await pool.dispose();
Configuration Options #
The WorkerPoolConfig class provides several configuration options:
| Option | Description | Default |
|---|---|---|
poolSize |
Number of isolates to maintain in the pool | 4 |
resourceInitializers |
List of functions to initialize resources in isolates | [] |
resourceFinalizers |
Map of finalizer functions for resource cleanup | {} |
constantProviders |
List of functions to provide constants to isolates | [] |
healthCheckInterval |
Interval for health checks | 5 minutes |
isolateTimeout |
Timeout for task execution | 30 seconds |
maxTasksPerIsolate |
Maximum tasks per isolate before restart | 1000 |
predefinedFunctions |
Map of functions that can be executed in isolates | {} |
Additional Information #
Error Handling #
The worker pool handles errors gracefully:
- Task execution errors are propagated to the caller
- Isolate crashes are automatically recovered by restarting the worker
- Timeouts are handled with
TimeoutException
Performance Considerations #
- Tasks should be CPU-intensive to benefit from isolate-based concurrency
- I/O operations should use async/await and may not benefit from isolates
- Consider the overhead of data serialization when passing arguments to isolates
Testing #
The project includes comprehensive unit tests to ensure functionality and reliability:
flutter test
Contributing #
Contributions are welcome! Please feel free to submit issues and pull requests.
License #
This project is licensed under the MIT License - see the LICENSE file for details.