Well well well. It’s time for Christmas!
And as a gift I would like to explain few things about AMQP brokers and how we use RabbitMQ on our Magento 2 CE websites.
How does AMQP work?
First of all, AMQP stands for Advanced Message Queuing Protocol. It is a protocol, brought to you by JP Morgan Bank in the first place.
The idea is simple: send a Message from an application to another.
You need to understand few things before we continue.
To send a Message you use an application: the Publisher or Producer. To receive and process a Message you use another application: the Worker or Consumer. In this article I’ll use the words Publisher and Worker.
An idea grew up in my mind that a Worker is the processus which runs the Consumer (the script). But it’s the “same”.
Of course you can have as many Publishers and Workers as you want. And sometimes the Worker is also a Publisher. Wait… what? You’ll see.
To be sure that a Worker will receive the Messages it understands, it has to listen to a specific place: the Queue. Its own Queue in fact. Because most of the time the Worker creates the Queue it will listen to.
So, we can say that the Publisher has to send the Messages to the Queues then the Workers will consume the Messages from the Queues. But how your Publisher decides in wish Queue to send a Message to be sure that the correct Worker will get it?
That’s where the Exchange takes place.
The role of the Exchange is to route a Message to the right Queue (or to many Queues… depends on the Exchange’s type). According to that, when a Queue is created, we bind the Queue to the Exchange. When done, your Exchange will be aware of the existence of this Queue (better than an orphan Queue). Are you still following?
So. We do it again: the Publisher sends a Message to the Exchange, which routes the Message to a Queue (which is binded to the Exchange itself). The Worker which is listening to the Queue will get the Message and consume it.
The Exchange can do a lot of things, like send a Message to more than one Queue, or delay the Message. You can ask the Exchange to wait 10 seconds before sending the Message to the right Queue.
You can also ask the Exchange to send the Message to all the Queues, etc.
All this magic between the applications is done by the Broker. For us the Broker is RabbitMQ. But you can choose any AMQP broker, like ZeroMQ, ActiveMQ or even Qpid.
When should I use RabbitMQ?
An AMQP broker is used to perform asynchronous operations. So if you think you can process something at a different time, like sending an email, then you can use an AMQP broker, for us: RabbitMQ!
A lot of examples can be taken to explain when you can, or should use RabbitMQ:
- Write logs (it is still better to use rsyslog for that!).
- Request some APIs.
- Make more than 1 API call at the same time.
- Generate files like PDF or Excel reports…
- Delayed complex calculations.
- …
Asynchronous can be useful in different situations.
Today we are going to perform a simple discussion with some delay between messages without blocking our script:
Bob: Hi!
Bob: I'm Bob. I'm from Paris.
Alice: Hello Bob! Nice to meet you.
Alice: I'm Alice. What are you doing here?
Bob: I'm learning RabbitMQ. Voilà !
Alice: Sounds great!
How can I use RabbitMQ with Magento 2?
Get a running instance of RabbitMQ
First of all you’ll need to install RabbitMQ on your platform.
If you need an easy setup you can use the Docker Image monsieurbiz/rabbitmq:3.7.0-management
.
docker run --rm -p "15672:15672" -p "5672:5672" -v rabbitmq_data:/var/lib/rabbitmq:rw monsieurbiz/rabbitmq:3.7.0-management
We use the management
interface enabled version. It is really nice to have an UI in the project when you play with RabbitMQ. And our image provides RabbitMQ with the plugin rabbitmq_delayed_message_exchange
out of the box.
You can still use the official Docker image.
The port 5672
is used by the AMQP protocol. And the 15672
is our web UI: http://localhost:15672. The credentials are guest
for both user and password by default.
You can find all the Docker images on Docker Hub, and the installation instruction on the RabbitMQ’s website directly.
Install the AMQP Module
I won’t say that you should use your own AMQP module to use RabbitMQ as we already have published our own module at Monsieur Biz to manage RabbitMQ.
Of course you can install it using composer: composer require monsieurbiz/amqp
Create your first exchange
This step is really easy. We just need a x-delayed-type
type for our exchange because we want to perform a discussion with time between messages.
magento monsieurbiz:amqp:exchange:create --delayed consume-me
We use the consume-me
Exchange name because we will use the sample worker given with the module.
Create your Worker
I won’t explain how to create a worker. Take a look to the Worker given as example in the module source code.
I’ll just give you one tip: Your Worker will be disconnected to the database if it is asleep, doing nothing, just waiting for the next message for hours now…
It means that if your Worker is there to perform some database operations, it will exit with a Connection Lost error when the next message is ready to be consumed.
To avoid that we created a simple method: _tryAndReconnect()
. This method tries to connect to the database. If it success then it’s fine. But if it doesn’t then we reconnect the Worker to the database.
You just have to perform the call each time your Worker does its job.
Create a command to perform the art
To create your command you can use magerun! It is really easy:
$ magerun dev:console
>>> make:module Firegento_Demo
Module: Firegento_Demo >>> make:command discuss
Now you have your firegento:demo:discuss
command available.
<?php
namespace Firegento\Demo\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use MonsieurBiz\Amqp\Helper\Amqp;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class DiscussCommand extends Command
{
/**
* @var Amqp
*/
private $amqp;
/**
* @param Amqp $amqp Amqp Helper
* @param string|null $name
*/
public function __construct(Amqp $amqp, string $name = null)
{
parent::__construct($name);
$this->amqp = $amqp;
}
public function configure()
{
$this->setName('firegento:demo:discuss');
$this->setDescription('firegento:demo:discuss');
}
/**
* @param InputInterface $input An InputInterface instance
* @param OutputInterface $output An OutputInterface instance
* @return null|int null or 0 if everything went fine, or an error code
*/
public function execute(\Symfony\Component\Console\Input\InputInterface $input, \Symfony\Component\Console\Output\OutputInterface $output)
{
$messages = [
4000 => "Alice: Hello Bob! Nice to meet you.",
1000 => "Bob: I'm Bob. I'm from Paris.",
8000 => "Bob: I'm learning RabbitMQ. Voilà !",
0 => "Bob: Hi!",
10000 => "Alice: Sounds great!",
6000 => "Alice: I'm Alice. What are you doing here?",
];
foreach ($messages as $delay => $message) {
try {
$this->amqp->sendMessage('consume-me', new AMQPMessage($message, [
'application_headers' => new AMQPTable([
'x-delay' => $delay
])
]), [], Amqp::MODE_BATCH);
} catch (\Exception $e) {
$output->writeln("<error>{$e->getMessage()}</error>");
}
}
$this->amqp->sendBatch();
return 0;
}
}
This class is pretty simple.
First we need the helper from the module to send our messages, using the constructor (and the Magento dynamic DI).
Then we need to fill the execute
method with our code.
We define the messages, and we send them. You can notice that the messages are in a total disorder.
I used the sendMessage
method with the Amqp::MODE_BATCH
to avoid calling 6 times the network to send our messages. I do it just once with the $this->amqp->sendBatch();
call.
Let’s talk!
You have to open 2 terminals.
On the first one you run the Worker/Consumer:
magento monsieurbiz:amqp:consumer:sample
On the second one you run the discussion:
magento firegento:demo:discuss
The command above should not take more than a second to execute. On the first terminal you should see a discussion happening for the next 10 seconds.
You did it!
As you can see, it is really simple.
If you run more than one Worker you can see some magic there. The Exchange is doing a great job!
I heard about RPC, what’s that!?
Hum… RPC is like synchronous messaging using asynchronous messaging. Yes yes.
The first Application sends a Message and wait for the Response.
But the Response can be given by any Worker listening (via a Queue) to the correct Exchange.
Let’s say we want to make 3 API calls. But we want to perform this magic in a 1 call time. See?
3 calls in 1 call time. It is like reducing the call time by 3.
For that we need 3 workers at least. Then we send 3 messages at the same time, and we wait for the Responses:
/* @var $rpc \MonsieurBiz\Amqp\Helper\Amqp\Rpc */
$r1 = $rpc->request('call-me-maybe', ['Rouven is awesome.']);
$r2 = $rpc->request('call-me-maybe', ['He loves to dance.']);
$r3 = $rpc->request('call-me-maybe', ['And Christmas of course.']);
$responses = $rpc->getResponses();
var_dump($responses[$r1]); // string(20) "["Rouven is awesome."]"
var_dump($responses[$r2]); // string(21) "["He loves to dance."]"
var_dump($responses[$r3]); // string(21) "["And Christmas of course."]"
That’s it!
I wish you a Happy Christmas to all of you!
And like my friend Maxime says it often these past days: « Be Creole! The Earth turns, the sun rises, life is beautiful! »