Как работать с очередями (RabbitMQ, Redis)?php-54

Основные понятия очередей

Очереди сообщений - это механизм асинхронной обработки задач, где:

  • Производители (Producers) отправляют сообщения
  • Очереди (Queues) хранят сообщения
  • Потребители (Consumers) обрабатывают сообщения

Преимущества:

  • Разделение компонентов системы
  • Гибкое масштабирование
  • Отказоустойчивость
  • Балансировка нагрузки

RabbitMQ

1. Установка и подключение

composer require php-amqplib/php-amqplib

2. Отправка сообщений

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Соединение
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// Объявление очереди (если не существует)
$channel->queue_declare('task_queue', false, true, false, false);

// Создание сообщения
$data = ['type' => 'email', 'to' => 'user@example.com'];
$msg = new AMQPMessage(
    json_encode($data),
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

// Отправка
$channel->basic_publish($msg, '', 'task_queue');

// Закрытие соединения
$channel->close();
$connection->close();

3. Получение сообщений

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$callback = function ($msg) {
    $data = json_decode($msg->body, true);
    echo " [x] Received ", $msg->body, "\n";

    // Обработка сообщения
    processMessage($data);

    // Подтверждение обработки
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// Настройка потребителя
$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

// Бесконечный цикл ожидания сообщений
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

Redis

1. Установка

composer require predis/predis

2. Отправка сообщений

<?php
require 'vendor/autoload.php';
$redis = new Predis\Client();

// Простая очередь
$redis->rpush('emails', json_encode([
    'to' => 'user@example.com',
    'subject' => 'Welcome'
]));

// Отложенные задачи
$redis->zadd('delayed_tasks', time() + 3600, 'task_data');

3. Получение сообщений

<?php
require 'vendor/autoload.php';
$redis = new Predis\Client();

// Блокирующее получение (BLPOP)
while (true) {
    $message = $redis->blpop(['emails'], 0);
    $data = json_decode($message[1], true);
    processEmail($data);
}

Сравнение RabbitMQ и Redis

Характеристика RabbitMQ Redis как очередь
Надежность Высокая (подтверждения, персистентность) Средняя (нет подтверждений)
Сложность Высокая Низкая
Паттерны Поддержка exchange, routing keys Только базовые очереди
Производительность Средняя Очень высокая
Отложенные сообщения Поддержка плагинами Через ZSET

Лучшие практики

  1. Обработка ошибок: Всегда ловите исключения при работе с очередями
  2. Подтверждение обработки: В RabbitMQ используйте manual ack
  3. Персистентность: Включайте durable очереди в RabbitMQ
  4. Мониторинг: Используйте инструменты вроде RabbitMQ Management UI
  5. Масштабирование: Запускайте несколько consumer'ов

Пример обработки воркером

#!/usr/bin/env php
<?php
require __DIR__.'/vendor/autoload.php';

$worker = new Worker();
$worker->start();

Резюмируем:

RabbitMQ - мощное решение для сложных сценариев, Redis - простой и быстрый вариант для базовых очередей. Выбор зависит от требований к надежности и сложности маршрутизации сообщений.