Как мы видели в предыдущей статье этой серии, очередь в памяти может быть полезна для оптимизации рабочей нагрузки и повторных попыток восстановления после ошибок, но с ней возникают некоторые проблемы.

Если в течение некоторого времени производство сообщений превышает рабочую мощность, очередь может начать занимать слишком много памяти.

Кроме того, если процесс Node.js выйдет из строя, вы потеряете всю незавершенную работу, которую он выполнял. Это может создать или не создать проблему, в зависимости от приложения.

Извлекая рабочую очередь из памяти и помещая ее в постоянное хранилище, мы можем избежать этих проблем: сохраняя в памяти только текущую работу, мы можем компенсировать рабочие пики, поскольку они будут записаны на диск. Кроме того, при запуске процесса Node.js любая незавершенная работа возобновляется.

В качестве примера предположим, что вы строите домашнюю систему сигнализации и хотите передавать каждое событие (выключение сигнализации, включение сигнализации, открытие двери, оповещение и т. д.) в удаленную службу для хранения и использования в будущем. целей аудита. Иногда эта удаленная служба может быть недоступна или недоступна, но вы все равно хотите, чтобы события в конце концов — извините за каламбур — попадали туда.

Давайте затем создадим модуль, который создает и экспортирует постоянную очередь, в которой будут храниться события безопасности. Этот модуль также будет отвечать за ретрансляцию этих событий в удаленную службу.

event_relay.js:

var level = require('level');  
var db = level('./db');
var Jobs = require('level-jobs');
var maxConcurrency = 1;  
var queue = Jobs(db, worker, maxConcurrency);
module.exports = queue;
function worker(event, cb) {  
  sendEventToRemoteService(event, function(err) {
    if (err) console.error('Error processing event %s: %s', event.id, err.message);
    else console.log('event %s successfully relayed', event.id);
    cb(err);
  });
}

function sendEventToRemoteService(event, cb) {  
  setTimeout(function() {
    var err;
    if (Math.random() > 0.5) err = Error('something awful has happened');
    cb(err);
  }, 100);
}

Здесь мы начинаем с создания базы данных LevelDB, в которой все события будут постоянно храниться в папке с именем db внутри текущего каталога:

var level = require('level');  
var db = level('./db');

LevelDB — это универсальное хранилище ключей и значений, но здесь мы не будем использовать его напрямую; вместо этого мы передадим его пакету level-jobs, который реализует рабочую очередь поверх базы данных LevelDB:

var Jobs = require('level-jobs');
var maxConcurrency = 1;  
var queue = Jobs(db, worker, maxConcurrency);

Здесь мы создаем очередь заданий, предоставляя базу данных LevelDB, которую мы создали ранее, и рабочую функцию, а также определяем максимальный параллелизм, равный 1. Рабочая функция очень похожа на рабочую функцию async.queue: она принимает единицу работы как первый аргумент и функцию обратного вызова в качестве второго аргумента:

function worker(event, cb) {  
  sendEventToRemoteService(event, function(err) {
    if (err) console.error('Error processing event %s: %s', event.id, err.message);
    else console.log('event %s successfully relayed', event.id);
    cb(err);
  });
}

Рабочая функция просто пытается отправить рабочую единицу (в нашем случае событие) в удаленную службу хранения событий, используя эту функцию sendEventToRemoteService. Если отправка по какой-то причине не удалась, наш обратный вызов вызывается с ошибкой, которую мы распространяем на обратный вызов рабочего процесса. Когда обратный вызов работника вызывается с ошибкой, level-jobs повторяет попытки до тех пор, пока не увенчается успехом (до предварительно определенного максимального количества попыток с использованием внутреннего алгоритма отсрочки, как в предыдущем разделе).

Затем мы моделируем ошибки доставки событий с вероятностью 50% в поддельной реализации функции sendEventToRemoteService:

function sendEventToRemoteService(event, cb) {  
  setTimeout(function() {
    var err;
    if (Math.random() > 0.5) err = Error('something awful has happened');
    cb(err);
  }, 100);
}

Давайте затем создадим производителя событий, чтобы мы могли протестировать наш модуль ретрансляции событий:

event_producer.js:

var relay = require('./event_relay');
for(var i = 0 ; i < 10; i ++) {  
  relay.push({id: i, event: 'door opened', when: Date.now()});
}

и запустите его:

$ node event_producer
event 0 successfully relayed  
event 1 successfully relayed  
event 2 successfully relayed  
event 3 successfully relayed  
Error processing event 4: something awful has happened  
Error processing event 4: something awful has happened  
Error processing event 4: something awful has happened  
Error processing event 4: something awful has happened  
event 4 successfully relayed  
Error processing event 5: something awful has happened  
Error processing event 5: something awful has happened  
Error processing event 5: something awful has happened  
event 5 successfully relayed  
event 6 successfully relayed  
Error processing event 7: something awful has happened  
event 7 successfully relayed  
Error processing event 8: something awful has happened  
event 8 successfully relayed  
Error processing event 9: something awful has happened  
event 9 successfully relayed

По выходным данным мы видим, что, несмотря на то, что некоторые из них потерпели неудачу, все события в конечном итоге были переданы. Кроме того, установив для параллелизма значение 1, мы гарантируем, что в любой момент времени обрабатывается не более одной ожидающей рабочей единицы, независимо от скорости создания заданий, что позволяет экономить память и оптимизировать рабочую нагрузку.

Следующая статья

До сих пор мы видели полезность создания локальной очереди для оптимизации рабочей нагрузки и сохранения работы во время перезапуска процесса. Однако, в зависимости от типа работы, которую вы выполняете, у этого подхода все еще есть некоторые проблемы: если эта работа так или иначе интенсивно использует ЦП, вам может потребоваться передать работу набору внешних рабочих процессов.

Вы можете найти предыдущую публикацию о шаблонах очереди работ здесь:

В следующей статье этой серии мы рассмотрим, как мы можем использовать службу очередей для распределения нагрузки между несколькими процессами узла.

Эта статья взята из книги Work Queues, книги из серии Node Patterns.

Написано Педро Тейшейрой — опубликовано для YLD.

Вам также может понравиться: