RabbitMQ. Работа с очередями



Перевод второго из шести уроков c официального сайта RabitMQ.
В первом уроке мы писали программу для отправки и получения сообщений из именованной очереди.

В этом уроке мы создадим рабочую очередь, которая будет использоваться для распределения трудоемких задач между несколькими работниками.
Пример работы с брокером сообщений будет разобран на языке PHP и уже предполагает, что RabbitMQ установлен и работает на localhost на стандартном порту (5672). Если вы используете другой хост, порт или учетные данные, настройки соединений потребуют корректировки. Если RabbitMQ сервер не установлен, то вы можете перейти по ссылке и посмотреть как установить RabbitMQ.

Основная идея Work Queues - рабочих очередей (так-же известные как: Task Queues)-избежать немедленного выполнения ресурсоемкой задачи и ждать ее завершения. Вместо этого мы планируем выполнить ресурсоемкую задачу позже. Мы инкапсулируем задачу в виде сообщения и отправляем его в очередь. Рабочий процесс, запущенный в фоновом режиме, будет запускать и выполнить поставленные задачи. И когда вы запустите множество воркеров, то задачи будут распределены и разделены между ними.

Эта концепция особенно полезна в web-приложениях, где невозможно обработать сложную задачу во время короткого срока HTTP запроса.

Подготовка


В предыдущей части этого урока мы отправили сообщение, содержащее " Hello World!". Теперь мы будем отправлять строки, которые будут имитировать сложные задачи. К сожалению у нас нет реально сложной задачи, такой как работа с изображениями или формирование крупного PDF файла, поэтому мы выполним имитацию сложной задачи с помощью функции sleep(). Мы будет определять сложность задачи по количеству точек. Каждая точка будет составлять одну секунду "работы". Например: задача, которая отображает слово Hello... - займет три секунды.

Мы немного изменим код файла send.php из нашего предыдущего примера, чтобы реализовать отправку произвольных сообщений из командной строки.
Эта программа будет планировать задачи в нашей рабочей очереди. Дадим ей наименование: new_task.php.
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo ' [x] Sent ', $data, "\n";

Наш старый receive.php скрипт также требует некоторых изменений. В новый файл receive.php необходимо добавить функцию sleep() для имитации выполнения сложной задачи. Новый receive.php аналогично будет выводить сообщения из очереди и выполнять задачу. Дадим ему наименование: worker.php и добавим в него код:
$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

Обратите внимание, что данная поддельная задача имитирует время выполнения.

Запустить воркера-получателя:
php worker.php

Запустить задачу отправителя:
php new_task.php // Сложная задача, выполнение которой занимает пару секунд 

Round-robin диспетчеризация


Одним из преимуществ использования очереди задач является возможность легко распараллеливать работу.
Если мы создаем задержку в работе, то мы можем просто добавить больше воркеров и таким образом легко масштабироваться.
Давайте попробуем запустить 2 воркера worker.php. Для этого в 2х разных консолях выполните:
php worker.php

В третьей консоли у вас должен быть запущен отправитель new_task.php, где вы можете запустить на публикацию несколько сообщений:
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

После запуска публикаций сообщений откройте воркеров и посмотрите на них.
Вы должны увидеть приблизительно следующие:
  • Воркер номер 1 -
    php worker.php
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'First message.'
    # => [x] Received 'Third message...'
    # => [x] Received 'Fifth message.....'

  • Воркер номер 2 -
    php worker.php
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'Second message..'
    # => [x] Received 'Fourth message....'


По умолчанию RabbitMQ будет отправлять каждое сообщение к следующему получателю, в определенной последовательности. В среднем каждый получатель получит одинаковое количество сообщений. Такой способ распространения сообщений называется циклическим. Попробуйте выполните тоже самое, только с 3 или более отправителями.

Подтверждение сообщений


Выполнение задачи может занять несколько секунд. Вы можете задаться вопросом, что происходит в том случае, если один из получателей начинает выполнять долгую или сложную задачу и данная задача не выполняется или выполняется только частично? В текущем примере, как только RabbitMQ доставляет сообщение получателю, он сразу же помечает его для удаления. В таком случае, если вы убьете воркера, то потеряете сообщение, которое он только что обрабатывал. Так-же будут потеряны все сообщения, которые были отправлены этому конкретному воркеру, но еще не были обработаны.

Но я думаю, что мы бы не хотели потерять ни одного сообщения из очереди. И если воркер завершил свою работу, то мы бы хотели передать задачу другому воркеру.

Для того, чтобы убедиться, что сообщение никогда не будет потеряно. В RabbitMQ есть поддержка подтверждения сообщений.
Подтверждение (nowledgement) - отправляется получателем обратно, для того, чтобы сообщить RabbitMQ, что конкретное сообщение было получено, обработано и что RabbitMQ может удалить его.

Если получатель умирает (его канал закрыт, соединение закрыто или TCP-соединение потеряно) без отправки подтверждения, RabbitMQ поймет, что сообщение не было обработано полностью, и снова поставит его в очередь. Если есть доступ к другим получателям, то RabbitMQ быстро передаст задачу другому получателю. Таким образом, вы можете быть уверены, что сообщение не будет потеряно даже в том случае, если все получатели или их часть будет не доступна.

У сообщений отсутствуют тайм-ауты. И если получатель умрет, то RabbitMQ повторно добавит сообщение в очередь. Это нормально даже в том случае, если обработка сообщений занимает очень много времени.

По умолчанию подтверждения сообщений отключены. Пришло время включить их, установив четвертый параметр basic_consume в false (true означает отсутствие подтверждения) и отправить надлежащее подтверждение от воркера, как только мы закончим с задачей.
$callback = function ($msg) {
  echo ' [x] Received ', $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

Используя этот код, мы можем быть уверены, что даже если вы отключите работника с помощью CTRL+C во время обработки сообщения, ничего не будет потеряно. Вскоре после смерти работника все неподтвержденные сообщения будут переданы повторно в очередь.

Подтверждение должно быть отправлено по тому же каналу, который получил доставку. Любая попытка подтвердить использование другого канала приведет к исключению протокола канального уровня. Для того, чтобы узнать больше см.руководство.

Потерянное подтверждение


Пропуск подтверждения, является распространенной ошибкой. Получить эту ошибку очень просто, но последствия могут быть очень серьезными. Сообщения будут повторно доставлены, когда ваш клиент завершит работу (это может выглядеть как случайная повторная поставка), но RabbitMQ будет использовать все больше и больше памяти, так как он не сможет выполнить обработку и доставку сообщений

Для отладки такого рода ошибок можно использовать rabbitmqctl для отображения информации messages_unacknowledged:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Тоже самое в Windows, Только без sudo
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Долговечность сообщения


Только что мы узнали как защитить задачу от потери, после того как получатель умирает. Но наши задачи все равно будут потеряны после остановки или перезагрузки сервера RabbitMQ.

Когда RabbitMQ завершает работу или происходит сбой, то он забывает очереди и сообщения. Для того, чтобы никогда не потерять сообщения и очереди, нужно отметить очередь как долговечную.
Для этого нужно методу queue_declare третьим параметром передать значение true:
$channel->queue_declare('hello', false, true, false, false);

Хотя эта команда верна сама по себе,но она не будет работать при текущих настройках. Все этого потому, что очередь с именем hello уже определена как не долговечная. RabbitMQ не позволяет переопределить существующую очередь с разными параметрами и вернет ошибку любой программе, которая попытается это сделать. Но есть простой и быстрый способ обойти эту проблему. Для этого нужно переименовать текущую очередь. Давайте назовем нашу новую очередь как: task_queue.
$channel->queue_declare('task_queue', false, true, false, false);

Данный флаг, установленный в true, должен быть применен как к коду отправителя, так и к коду получателя.

На данный момент мы уверены, что очередь task_queue не будет потеряна, даже если RabbitMQ перезапустится. Теперь нам нужно отметить наши сообщения как постоянные, установив свойство delivery_mode = 2, которое AMQPMessage принимает как часть массива свойств.
$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);

Обратите внимание на стойкость сообщений


Пометка сообщений как постоянных не полностью гарантирует, то что сообщение не будет потеряно. Хотя он говорит RabbitMQ сохранить сообщение на диск, есть еще короткое окно времени, когда RabbitMQ принял сообщение и еще не сохранил его. Кроме того, RabbitMQ не делает fsync(2) для каждого сообщения. Сообщение может просто сохранено в кэше и не записано на диск.
Гарантия стойкости не большая, но этого более чем достаточно для нашей простой очереди задач, приведенной в примере. Если вам нужно больше гарантий, то вы можете почитать publisher confirms

Справедливая отправка


Возможно, вы заметили, что диспетчеризация все еще работает не совсем так, как мы хотим. Например, в ситуации с двумя воркерами, один воркер будет постоянно занят, а другой почти не будет работать. Но RabbitMQ ничего об этом не знает и все равно будет отправлять сообщения равномерно в том же порядке.

Это происходит потому, что когда RabbitMQ отправляет сообщение в очередь, то не смотрит на количество неподтвержденных сообщений для получателя. Он просто слепо отправляет каждое n-е сообщение n-му потребителю.

Для того, чтобы избавиться от этой проблемы, мы должны воспользоваться методом basic_qos и передать ему параметр prefetch_count = 1. Это говорит RabbitMQ не давать более одного сообщения работнику за раз. Или, другими словам: "Не отправляйте новое сообщение работнику, пока он не обработает и не подтвердит предыдущее". Вместо этого он отправит его следующему работнику, который еще не занят.
$channel->basic_qos(null, 1, null);

Примечание о размере очереди


Если все работники заняты, ваша очередь может заполниться. В этом случае, возможно придется добавить еще несколько воркеров или разработать какую то другую стратегию.

Собрать все воедино.


Окончательный код нашего new_task.php файла:

И наш worker.php

С помощью подтверждений сообщений и предварительной выборки можно настроить рабочую очередь. Параметры долговечности позволяют задачам выжить, даже в том случае, если RabbitMQ перезапущен.
Информация
Посетители, находящиеся в группе Гости, не могут оставлять комментарии к данной публикации.