processQueuesIsolate method

Future processQueuesIsolate()

Implementation

Future processQueuesIsolate() async {
  print("preparing queues(${allQueueProcessors.allClassNames.length}).");
  amqp.ConnectionSettings settings = amqp.ConnectionSettings(host: config.getRequired<String>('amqp.host'), port: config.getRequired<int>('amqp.port'));
  amqpClient = amqp.Client(settings: settings);
  amqp.Channel channel = await amqpClient!.channel();
  //TODO: configurable concurrent processors
  channel = await channel.qos(0, 6);
  int serviceId = config.getRequired<int>('service_id');
  amqpConsumers = [];
  for (var processorName in allQueueProcessors.allClassNames) {
    var processor = createQueueProcessor(processorName);
    amqp.Queue amqpQueue = await channel.queue(processor.queue.queueName);
    amqp.Consumer consumer = await amqpQueue.consume(noAck: false);
    amqpConsumers.add(consumer);
    consumer.listen((amqp.AmqpMessage message) async {
      var processor = createQueueProcessor(processorName);
      int start = DateTime.now().millisecondsSinceEpoch;
      processor.stats = Stats(config, serviceId, 'queue', processor.className);
      try {
        var decodedMessage = json.decode(message.payloadAsString);
        await processor.processMessage(decodedMessage);
      } catch (error, stacktrace) {
        await processor.logger.handleError('queue.${processor.queue.className}', error, stacktrace, requestBody: message.payloadAsString);
      }
      await processor.db.query('INSERT INTO run_queues SET app_id = ?, queue = ? ON DUPLICATE KEY UPDATE process_count=process_count+1, last_process=NOW()', [
        serviceId,
        processor.queue.className,
      ]);
      int timeMs = DateTime.now().millisecondsSinceEpoch - start;
      await processor.db.disconnect();
      processor.stats?.saveStats(processor.db.counter, timeMs);
      message.ack();
    });
  }
}