Friday 25 November 2016

Setting up highly available message queues with RabbitMQ and CentOS 7

Since RabbitMQ runs on Erlang, we will need to install Erlang from the EPEL repo (as well as a few other dependencies):

yum install epel-release
yum install erlang-R16B socat python-pip

Download and install RabbitMQ:

cd /tmp
wget https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm
rpm -i rabbitmq-server-3.6.6-1.el7.noarch.rpm

Ensure that the service starts on boot and is running:

chkconfig rabbitmq-server on
sudo service rabbitmq-server start

Rinse and repeat on the second server.

Before creating the cluster, ensure that rabbitmq-server is in a stopped state on both servers:

sudo service rabbitmq-server stop

The Erlang cookie file must be identical across all nodes (master, slaves etc.):

cat /var/lib/rabbitmq/.erlang.cookie

Copy this file from the master to all other nodes in the cluster.
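
A mismatched cookie is an easy mistake to make at this step, so it can be worth comparing the copies before starting the service again. The following is a minimal Python sketch, assuming the cookie contents from each node have already been collected somehow. The helper names (read_cookie, fingerprint, mismatched_nodes) and the node names are hypothetical. It only compares SHA-256 digests and does not copy anything.

```python
import hashlib

COOKIE_PATH = "/var/lib/rabbitmq/.erlang.cookie"  # path used in this post


def read_cookie(path=COOKIE_PATH):
    """Return the raw bytes of an Erlang cookie file."""
    with open(path, "rb") as handle:
        return handle.read()


def fingerprint(cookie):
    """Return a SHA-256 hex digest that identifies a cookie without revealing it."""
    return hashlib.sha256(cookie).hexdigest()


def mismatched_nodes(cookies, reference):
    """Return the sorted names of nodes whose cookie differs from `reference`'s.

    `cookies` maps a node name to the raw bytes of its cookie file.
    """
    expected = fingerprint(cookies[reference])
    return sorted(
        node for node, cookie in cookies.items()
        if fingerprint(cookie) != expected
    )
```

An empty list from mismatched_nodes(cookies, "master") suggests the copies agree; any name it returns is a node whose cookie should be copied again.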

Then start the rabbitmq-server service on all nodes:

sudo service rabbitmq-server start

Now reset the app on ALL slave nodes:

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

and the following on the master node:

rabbitmqctl stop_app
rabbitmqctl reset

and create the cluster from the master node:

rabbitmqctl join_cluster rabbit@slave

* Substitute 'slave' with the hostname of the slave(s).

NOTE: Make sure that you use a hostname / FQDN (and that each node can resolve the others' hostnames), otherwise you might encounter problems when connecting the nodes.
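
One way to check name resolution on each node before joining the cluster is sketched below in Python. This is an illustration rather than part of the original procedure: the function name unresolvable is hypothetical, and the resolver is passed in as a parameter (defaulting to socket.gethostbyname) so the logic can be exercised without touching DNS.

```python
import socket


def unresolvable(hostnames, resolver=socket.gethostbyname):
    """Return the hostnames that `resolver` cannot turn into an address."""
    failed = []
    for name in hostnames:
        try:
            resolver(name)
        except socket.error:
            # Lookup failures (socket.gaierror) are a subclass of socket.error.
            failed.append(name)
    return failed
```

Running unresolvable(['rabbitmaster.domain.internal', 'slave']) on every node, with your own hostnames substituted, should return an empty list before you attempt join_cluster.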

Once the cluster has been created we can verify the status with:

rabbitmqctl cluster_status

We can then define a policy to provide HA for our queues:

rabbitmqctl start_app
rabbitmqctl set_policy ha-all "" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
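
In the command above, the empty pattern "" matches every queue name and "ha-mode":"all" mirrors each matching queue to all nodes in the cluster. If you script this step, building the JSON in code avoids shell quoting mistakes. The Python sketch below is one way to do that; the helper names are hypothetical. The "exactly" mode with "ha-params" is a mirroring option that this post does not otherwise use, and is included here only as an assumption about what you might want.

```python
import json


def ha_policy(mode="all", count=None, sync="automatic"):
    """Return the JSON definition for a mirrored-queue policy."""
    definition = {"ha-mode": mode, "ha-sync-mode": sync}
    if mode == "exactly":
        # "exactly" mirrors each queue to a fixed number of nodes,
        # given in "ha-params".
        if count is None:
            raise ValueError("mode 'exactly' needs a node count")
        definition["ha-params"] = count
    return json.dumps(definition, sort_keys=True)


def set_policy_command(name, pattern, definition):
    """Return the rabbitmqctl argument list for applying a policy."""
    return ["rabbitmqctl", "set_policy", name, pattern, definition]
```

The list returned by set_policy_command('ha-all', '', ha_policy()) can be handed to subprocess.check_call, which passes each argument through without involving a shell.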

Ensure you have the pika (Python) library installed (this provides a way of interacting with AMQP):

pip install pika

Let's enable the HTTP management interface so we can easily view our exchanges, queues, users and so on:

rabbitmq-plugins enable rabbitmq_management

and restart the server with:

sudo service rabbitmq-server restart

and navigate to:

http://<rabbitmqhost>:15672

By default the guest user (guest/guest) can only log in from localhost. If the server is CLI-only you will need to access the web interface remotely, and as a result will need to enable remote access for the guest account temporarily:

echo '[{rabbit, [{loopback_users, []}]}].' > /etc/rabbitmq/rabbitmq.config

Once logged in, proceed to the 'Admin' tab >> 'Add a User' and ensure the new user can access the relevant virtual hosts.

We can revoke guest access from interfaces other than localhost by removing the line with:

sed -i '/loopback_users/d' /etc/rabbitmq/rabbitmq.config

NOTE: You might need to delete the

and once again restart the server:

sudo service rabbitmq-server restart

We can now test the mirrored queue with a simple Python publisher / consumer setup.

The publisher (node that instigates the messages) code should look something like:

import pika

credentials = pika.PlainCredentials('guest', 'guest')

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='rabbitmaster.domain.internal',port=5672,credentials=credentials))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

We should now see a new queue (that we have declared) in the 'Queues' tab within the web interface.

We can also check the queue status across each node with:

sudo rabbitmqctl list_queues

If all goes to plan we should see that the queue length is consistent across all of our nodes.
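
Since the management plugin is enabled, the same check can also be made over its HTTP API, which exposes the queues at /api/queues on port 15672. The following Python 3 sketch is an illustration under assumptions: the function names are hypothetical, and the guest/guest credentials and hostname are the ones used elsewhere in this post.

```python
import base64
import json
from urllib.request import Request, urlopen


def queues_request(host, user, password, port=15672):
    """Build a GET request for the management API's /api/queues endpoint."""
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8"))
    request = Request("http://{}:{}/api/queues".format(host, port))
    request.add_header("Authorization", "Basic " + token.decode("ascii"))
    return request


def queue_lengths(payload):
    """Map (vhost, queue name) to message count for a decoded response."""
    # "messages" can be missing for a queue that has only just been declared.
    return {(q["vhost"], q["name"]): q.get("messages", 0) for q in payload}


def fetch_queue_lengths(host, user, password):
    """Query one node and return its view of the queue lengths."""
    with urlopen(queues_request(host, user, password), timeout=5) as response:
        return queue_lengths(json.loads(response.read().decode("utf-8")))
```

Calling fetch_queue_lengths('rabbitmaster.domain.internal', 'guest', 'guest') against each node in turn and comparing the results gives the same picture as running list_queues on every server.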

Now for the consumer we can create something like the following:

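import pika

credentials = pika.PlainCredentials('guest', 'guest')

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='rabbitmaster.domain.internal', port=5672, credentials=credentials))
channel = connection.channel()

# Re-declaring the queue with the same arguments is harmless, so the
# consumer can safely start before the publisher has run.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    # Called once for every message delivered from the queue.
    print(" [x] Received %r" % (body,))

# pika 0.x signature; pika 1.0 and later expects
# basic_consume(queue='hello', on_message_callback=callback, auto_ack=True).
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()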

The last piece of the puzzle is to ensure that there is some kind of HA at the network level, e.g. haproxy, nginx and so on, so that clients are not tied to a single node.
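
Until a load balancer is in place, a client can approximate this by trying each node in turn. The sketch below is a rough client-side fallback, not a replacement for haproxy or nginx. The function name and the 'rabbitslave.domain.internal' hostname in the usage note are hypothetical, and the connect callable is passed in so that the logic does not depend on a particular pika version.

```python
def connect_first_available(hosts, connect):
    """Try each host in order; return (host, connection) for the first success.

    `connect` is any callable that takes a hostname and either returns a
    connection or raises an exception.
    """
    last_error = None
    for host in hosts:
        try:
            return host, connect(host)
        except Exception as error:
            # Broad on purpose: the exception type depends on the client
            # library behind `connect`. Remember it and try the next node.
            last_error = error
    raise ConnectionError("no RabbitMQ node reachable: {!r}".format(last_error))
```

With pika, connect could be lambda h: pika.BlockingConnection(pika.ConnectionParameters(host=h, port=5672, credentials=credentials)), and hosts a list such as ['rabbitmaster.domain.internal', 'rabbitslave.domain.internal'].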

* Partly based on / adapted from the article here: http://blog.flux7.com/blogs/tutorials/how-to-creating-highly-available-message-queues-using-rabbitmq
