pushToStream method

Resolvable<Option<Object>> pushToStream(
  Result<TData> data, {
  bool eagerError = false,
})

Implementation

/// Pushes [data] to the stream and then queues the push listeners.
///
/// All work is queued on a fresh [SafeSequencer] created for this call:
///
/// 1. If `_streamController` holds a value, [data] is added to it, and
///    `_initDataCompleter` is mapped so that its completer is resolved with
///    `Sync.value(data)`.
/// 2. One step is queued, in order, for every listener returned by
///    [provideOnPushToStreamListeners]; each step calls its listener with
///    [data] and then forwards the value it received from the preceding step.
///
/// When [eagerError] is `true`, a listener step that receives an error from
/// the preceding step forwards that error without calling its listener. When
/// it is `false` (the default), the listener is called regardless.
///
/// If `state.didDispose()` is true, this trips an assertion in debug builds;
/// with assertions disabled it returns the previous sequencer value without
/// queuing anything.
///
/// Returns the [Resolvable] produced by the outer `addSafe` call.
Resolvable<Option> pushToStream(
  Result<TData> data, {
  bool eagerError = false,
}) {
  UNSAFE:
  {
    final sequencer = SafeSequencer();
    return sequencer.addSafe((prev1) {
      // Fail loudly in debug builds, but degrade gracefully when assertions
      // are disabled.
      assert(!state.didDispose());
      if (state.didDispose()) {
        return Sync.value(prev1);
      }
      // Step 1: publish the data and resolve the initial-data completer.
      sequencer.addSafe((_) {
        return Resolvable(() {
          if (_streamController.isSome()) {
            _streamController.unwrap().add(data);
          }
          return _initDataCompleter.map(
            (e) => e.resolve(Sync.value(data)).value,
          );
        });
      }).end();
      // Step 2: queue every listener. A for-in loop is used instead of
      // `Iterable.map` because `map` is lazy: when its result is discarded the
      // callback never runs, so no listener would ever be queued.
      // NOTE(review): assumes provideOnPushToStreamListeners() returns an
      // Iterable of listener functions - confirm against its declaration.
      for (final listener in provideOnPushToStreamListeners()) {
        sequencer.addSafe((prev2) {
          if (prev2.isErr()) {
            // NOTE(review): this condition is always true inside this branch,
            // so the assertion can never fire - confirm whether a failing
            // assertion that surfaces the error was intended.
            assert(prev2.isErr(), prev2.err().unwrap());
            if (eagerError) {
              return Sync.value(prev2);
            }
          }
          // Run the listener, then pass the previous step's value along.
          return listener(data).map((e) => prev2).flatten2();
        }).end();
      }
      return Sync.value(prev1);
    });
  }
}