processQueuesIsolate method
Implementation
/// Connects to the AMQP broker and starts one consumer per queue processor.
///
/// Reads `amqp.host`, `amqp.port` and `service_id` from [config], stores the
/// created client in [amqpClient] and resets [amqpConsumers] to the consumers
/// registered here (one for every name in `allQueueProcessors.allClassNames`).
///
/// The returned future completes once all consumers are registered; messages
/// are then handled asynchronously in the listener callbacks. For every
/// message a fresh processor is created, the JSON payload is decoded and
/// passed to `processMessage`, and a failure there is reported through the
/// processor's logger rather than rethrown. Afterwards the `run_queues`
/// counter row is upserted, the processor's database connection is closed,
/// stats are saved and the message is acknowledged.
///
/// The database disconnect and the acknowledgement run in `finally` blocks,
/// so a failure while logging or updating `run_queues` no longer leaks the
/// connection or leaves the message unacknowledged; the error itself still
/// propagates out of the listener callback.
Future processQueuesIsolate() async {
  print("preparing queues(${allQueueProcessors.allClassNames.length}).");
  final settings = amqp.ConnectionSettings(
    host: config.getRequired<String>('amqp.host'),
    port: config.getRequired<int>('amqp.port'),
  );
  // Keep a non-nullable local so the nullable `amqpClient` needs no `!`.
  final client = amqp.Client(settings: settings);
  amqpClient = client;
  amqp.Channel channel = await client.channel();
  //TODO: configurable concurrent processors
  // NOTE(review): presumably (prefetchSize: 0, prefetchCount: 6), i.e. at
  // most 6 unacknowledged messages in flight - confirm against dart_amqp.
  channel = await channel.qos(0, 6);
  final int serviceId = config.getRequired<int>('service_id');
  amqpConsumers = [];
  for (var processorName in allQueueProcessors.allClassNames) {
    // This instance is only used to look up the queue name; each message
    // gets its own processor inside the listener below.
    final queueName = createQueueProcessor(processorName).queue.queueName;
    final amqp.Queue amqpQueue = await channel.queue(queueName);
    final amqp.Consumer consumer = await amqpQueue.consume(noAck: false);
    amqpConsumers.add(consumer);
    consumer.listen((amqp.AmqpMessage message) async {
      final processor = createQueueProcessor(processorName);
      // Monotonic timer: unaffected by wall-clock adjustments.
      final stopwatch = Stopwatch()..start();
      processor.stats = Stats(config, serviceId, 'queue', processor.className);
      try {
        try {
          var decodedMessage = json.decode(message.payloadAsString);
          await processor.processMessage(decodedMessage);
        } catch (error, stacktrace) {
          // Processing failures are logged, not rethrown; the message is
          // still counted and acknowledged below.
          await processor.logger.handleError('queue.${processor.queue.className}', error, stacktrace, requestBody: message.payloadAsString);
        }
        await processor.db.query('INSERT INTO run_queues SET app_id = ?, queue = ? ON DUPLICATE KEY UPDATE process_count=process_count+1, last_process=NOW()', [
          serviceId,
          processor.queue.className,
        ]);
      } finally {
        final int timeMs = stopwatch.elapsedMilliseconds;
        try {
          await processor.db.disconnect();
          processor.stats?.saveStats(processor.db.counter, timeMs);
        } finally {
          // Always acknowledge: an unacked message would keep occupying one
          // of the channel's limited in-flight slots.
          message.ack();
        }
      }
    });
  }
}