pub package Dart License Build Status Buy Me A Coffee

Stem

Stem is a Dart-native background job platform. It gives you Celery-style task execution with a Dart-first API, Redis Streams integration, retries, scheduling, observability, and security tooling—all without leaving the Dart ecosystem.

Install

dart pub add stem           # core runtime APIs
dart pub add stem_redis     # Redis broker + result backend
dart pub add stem_postgres  # (optional) Postgres broker + backend
dart pub global activate stem_cli

Add the pub-cache bin directory to your PATH so the stem_cli tool is available:

export PATH="$HOME/.pub-cache/bin:$PATH"
stem --help

Quick Start

import 'dart:async';
import 'package:stem/stem.dart';
import 'package:stem_redis/stem_redis.dart';

class HelloTask implements TaskHandler<void> {
  @override
  String get name => 'demo.hello';

  @override
  TaskOptions get options => const TaskOptions(
        queue: 'default',
        maxRetries: 3,
        rateLimit: '10/s',
        visibilityTimeout: Duration(seconds: 60),
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final who = args['name'] as String? ?? 'world';
    print('Hello $who (attempt ${context.attempt})');
  }
}

Future<void> main() async {
  final registry = SimpleTaskRegistry()..register(HelloTask());
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');
  final backend = await RedisResultBackend.connect('redis://localhost:6379/1');

  final stem = Stem(broker: broker, registry: registry, backend: backend);
  final worker = Worker(broker: broker, registry: registry, backend: backend);

  unawaited(worker.start());
  await stem.enqueue('demo.hello', args: {'name': 'Stem'});
  await Future<void>.delayed(const Duration(seconds: 1));
  await worker.shutdown();
  await broker.close();
  await backend.close();
}

Features

  • Task pipeline – enqueue with delays, priorities, idempotency helpers, and retries.
  • Workers – isolate pools with soft/hard time limits, autoscaling, and remote control (stem worker ping|revoke|shutdown).
  • Scheduling – Beat-style scheduler with interval/cron/solar/clocked entries and drift tracking.
  • Observability – Dartastic OpenTelemetry metrics/traces, heartbeats, CLI inspection (stem observe, stem dlq).
  • Security – Payload signing (HMAC or Ed25519), TLS automation scripts, revocation persistence.
  • Adapters – In-memory drivers included here; Redis Streams and Postgres adapters ship via the stem_redis and stem_postgres packages.
  • Specs & tooling – OpenSpec change workflow, quality gates (tool/quality/run_quality_checks.sh), chaos/regression suites.

Documentation & Examples

Running Tests Locally

Start the dockerised dependencies and export the integration variables before invoking the test suite:

source packages/stem_cli/_init_test_env
dart test

The helper script launches packages/stem_cli/docker/testing/docker-compose.yml (Redis + Postgres) and populates STEM_TEST_* environment variables needed by the integration suites.

Adapter Contract Tests

Stem ships a reusable adapter contract suite in packages/stem_adapter_tests. Adapter packages (Redis broker/postgres backend, SQLite adapters, and any future integrations) add it as a dev_dependency and invoke runBrokerContractTests / runResultBackendContractTests from their integration tests. The harness exercises core behaviours—enqueue/ack/nack, dead-letter replay, lease extension, result persistence, group aggregation, and heartbeat storage—so all adapters stay aligned with the broker and result backend contracts. See test/integration/brokers/postgres_broker_integration_test.dart and test/integration/backends/postgres_backend_integration_test.dart for reference usage.

Libraries

stem