This article is part of a series.

In this tutorial we will set up correct message routing in RabbitMQ for our cloud computing provider. If you think back to the first tutorial, you might remember the problem that list-vms commands were only sent to one server, just like all other messages. Moreover, what use is it if server A gets the create VM command, but server B gets the delete VM command for the same VM? Our users would get erroneous replies that the VM does not exist. As you can see, there is quite a mess to clean up.

To bring a bit of structure into the situation, let’s first think about which servers have to receive which messages:

  • create-vm: This message can go to any server. If a server does not have capacity, it will not process the message and will not acknowledge it, so that another server can pick it up.
  • start-vm, stop-vm, delete-vm: These messages all have to go to the one server where the VM with the given VM ID is currently allocated. If one of them goes to any other server, the VM will not be found.
  • list-vms: This message has to go to all servers.

Of course, there could be further constraints, for example dedicated customer groups, geographical regions or availability zones, but we will ignore all of these for now.
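The create-vm rule above could be sketched roughly as follows. This is only an illustration, not the handler used in this series: handle_create_vm and has_capacity are hypothetical names, and the sketch uses an explicit basic_nack with requeue=True so that the broker hands the message back to the shared queue right away (a message that is merely left unacknowledged is only redelivered once the consumer's channel or connection closes).

```python
def handle_create_vm(channel, method, properties, body, has_capacity):
    """Accept the message if this host has room, otherwise hand it back.

    `has_capacity` is a hypothetical callable returning True or False.
    """
    if not has_capacity():
        # Return the message to the queue so that another consumer
        # of the shared queue can receive it.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        return False

    # ... the VM would be created here ...
    channel.basic_ack(delivery_tag=method.delivery_tag)
    return True
```

One caveat of this approach: if no server has capacity, the message keeps circulating between the consumers, so a real system would probably need some limit or a delay.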

So let’s recap which standard exchanges we have with RabbitMQ:

  • Direct Exchange: sends a message to all queues whose binding key exactly matches the message’s routing key
  • Fanout Exchange: sends a message to all bound queues, which means that all registered consumers will receive it
  • Topic Exchange: allows queues to bind with a pattern that may contain wildcards (* and #)
  • Headers Exchange: similar to the topic exchange, but matches on message header fields instead of the routing key and does not support wildcards

Since we’re ignoring use cases like different regions and customers, a direct exchange with a better assignment of queues will be enough. Each worker will read from two queues: a dedicated queue with messages for this worker only, and a competing-consumers queue that is shared by all workers. The command name will serve as the binding key. This way the client does not have to implement any logic about which messages have to go to which queues; it just sets the command name as the routing key.
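To make the routing idea more tangible, here is a toy in-memory model of a direct exchange. It does not talk to RabbitMQ at all, and the class DirectExchangeModel as well as the queue names worker-a and worker-b are made up for illustration. It only shows which queues would receive a copy of a message for a given routing key.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy model of direct-exchange routing (no broker involved)."""

    def __init__(self):
        # routing key -> set of queue names bound with that key
        self.bindings = defaultdict(set)

    def bind(self, queue, routing_key):
        self.bindings[routing_key].add(queue)

    def route(self, routing_key):
        """Return the queues that would receive a copy of the message."""
        return sorted(self.bindings.get(routing_key, set()))


exchange = DirectExchangeModel()

# Two workers, each with its own dedicated queue (hypothetical names).
for dedicated_queue in ('worker-a', 'worker-b'):
    for command in ('list-vms', 'start-vm', 'stop-vm', 'delete-vm'):
        exchange.bind(dedicated_queue, command)

# One shared queue that both workers consume from.
exchange.bind('computing-competing', 'create-vm')

print(exchange.route('list-vms'))   # ['worker-a', 'worker-b']
print(exchange.route('create-vm'))  # ['computing-competing']
print(exchange.route('unknown'))    # []
```

Because every dedicated queue is bound with the same keys, list-vms is effectively broadcast, while create-vm ends up in a single queue from which only one of the competing workers takes each message.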

First of all, the client will no longer declare any queues. It does not care about the queues; it only wants to send its message to an exchange. If the queues do not exist yet, there is no server running and messages can be dropped. We use the command name as the routing_key.

If we want to bind queues to the exchange with a user-defined binding key, we will have to set up a new exchange. The default exchange does not allow custom bindings: every queue is bound to it implicitly under its own name. We will also let the server handle the setup of the exchange. If no server is running, the client cannot do much anyway.

Inside client.py we change the function send_msg:

EXCHANGE_NAME = 'computing'

self.channel.basic_publish(
    exchange=EXCHANGE_NAME,
    routing_key=data['command'],
    properties=pika.BasicProperties(
        reply_to=reply_to,
        content_type='application/json',
    ),
    body=json.dumps(data).encode('utf-8'))

The client also has to be changed so that it no longer stops reading after the first response has been received. There can now be many responses from many servers. Since we currently do not have a way to know how many hosts are participating and thus how many responses we can expect, we will just wait for a few seconds. To achieve this, we can delete self.channel.stop_consuming() from on_response and everything else remains the same.
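The waiting part of the client is not shown in this article. As a rough sketch of how a bounded wait could look, assuming the client uses a pika BlockingConnection and has already registered on_response with basic_consume, one could loop over process_data_events until a deadline has passed. The function name collect_responses and the default of three seconds are my own choices here.

```python
import time


def collect_responses(connection, timeout_seconds=3.0):
    """Let the connection dispatch callbacks until the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Processes pending I/O and invokes the consumer callback for
        # each delivered message; time_limit is an upper bound hint for
        # how long this call may block.
        connection.process_data_events(time_limit=remaining)
```

The callback keeps appending to the response list while this function runs, and the list is simply read after it returns.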

Next, we will change the server to set up the required queues. For the queues that are only read by one server we can use an exclusive queue with a name generated by RabbitMQ. An exclusive queue can only be used by the connection that declared it, and it is deleted as soon as that connection is closed. For a production application this might be a problem on host restarts, because we will lose messages then, but it’s much nicer for playing around.

EXCHANGE_NAME = 'computing'
COMPETING_QUEUE = 'computing-competing'
QUEUE_COMMANDS_MAP = {
    '': ['list-vms', 'start-vm', 'stop-vm', 'delete-vm'],
    COMPETING_QUEUE: ['create-vm'],
}


def run():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=config.RABBITMQ_HOST))
    channel = connection.channel()

    channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='direct')

    # let rabbitmq define a name for the exclusive queue
    result = channel.queue_declare(queue='', exclusive=True)
    exclusive_queue_name = result.method.queue
    # setup one queue that is shared by all consumers
    channel.queue_declare(queue=COMPETING_QUEUE)

    for queue, commands in QUEUE_COMMANDS_MAP.items():
        if queue == '':
            queue = exclusive_queue_name

        for command in commands:
            channel.queue_bind(
                exchange=EXCHANGE_NAME, queue=queue, routing_key=command)

    channel.basic_consume(
        queue=exclusive_queue_name, on_message_callback=callback)
    channel.basic_consume(
        queue=COMPETING_QUEUE, on_message_callback=callback)

    # ...
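Regarding the production concern about exclusive queues mentioned above: one possible alternative would be a named, durable queue per host, which is not deleted when the worker disconnects. The following is only a sketch under that assumption. The naming scheme computing-host-<hostname> and both function names are invented for this example, and channel is expected to be a pika channel.

```python
import socket


def dedicated_queue_name(hostname=None):
    """Build a stable per-host queue name (naming scheme is an assumption)."""
    hostname = hostname or socket.gethostname()
    return f'computing-host-{hostname}'


def declare_dedicated_queue(channel, hostname=None):
    """Declare a durable, named queue instead of an exclusive one."""
    queue_name = dedicated_queue_name(hostname)
    # durable=True keeps the queue definition across broker restarts,
    # and without exclusive=True the queue survives a worker disconnect.
    channel.queue_declare(queue=queue_name, durable=True)
    return queue_name
```

Note that a durable queue alone does not make the messages survive a broker restart; for that the client would also have to publish them as persistent messages.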

Since this is the first time I am running the software on multiple hosts, I’m starting to use environment variables for configuration. I introduced a new environment variable RABBITMQ_HOST to define the IP address or hostname of the RabbitMQ server. For this I created a new file config.py:

import os


RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', default='localhost')

In order to be able to connect to RabbitMQ from multiple hosts, we can either allow the guest user to connect from hosts other than localhost or create a new custom user. For production systems only the latter is recommended, but since we are only playing around we will allow guest connections from remote machines.

For this, we create a configuration file /etc/rabbitmq/rabbitmq.conf with the following content:

loopback_users = none
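For completeness, the custom-user alternative could look roughly like this on the Python side. This is a sketch, not code from the project: the environment variables RABBITMQ_USER and RABBITMQ_PASSWORD and the two function names are hypothetical, and the user itself would still have to be created on the broker.

```python
import os


def load_broker_settings(environ=os.environ):
    """Read broker settings from the environment.

    RABBITMQ_USER and RABBITMQ_PASSWORD are hypothetical variable names.
    """
    return {
        'host': environ.get('RABBITMQ_HOST', 'localhost'),
        'user': environ.get('RABBITMQ_USER', 'guest'),
        'password': environ.get('RABBITMQ_PASSWORD', 'guest'),
    }


def connect(settings):
    """Open a blocking connection with explicit credentials."""
    import pika  # imported here so the settings helper works without pika

    credentials = pika.PlainCredentials(
        settings['user'], settings['password'])
    return pika.BlockingConnection(
        pika.ConnectionParameters(
            host=settings['host'], credentials=credentials))
```

With such a user in place, the loopback_users setting could stay at its default and the guest account would remain restricted to localhost.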

With this all done, we can now run the server on two different computers and send a few requests. On one of my hosts I have the required base image available, on the other I do not. This allows us to easily see that requests are routed to different hosts. Each of the broadcast requests (list-vms, stop-vm, start-vm and delete-vm) will show up with two responses in the response list now.

$ aetherscale-cli create-vm --image ubuntu_base
[{'execution-info': {'status': 'success'}, 'response': {'status': 'starting', 'vm-id': 'sgeqxcfi'}}]

$ aetherscale-cli list-vms
[{'execution-info': {'status': 'success'}, 'response': ['vm-sgeqxcfi']},
{'execution-info': {'status': 'success'}, 'response': []}]

$ aetherscale-cli create-vm --image ubuntu_base
[{'execution-info': {'status': 'error', 'reason': 'Image "ubuntu_base" does not exist'}}]

$ aetherscale-cli stop-vm --vm-id sgeqxcfi
[{'execution-info': {'status': 'success'}, 'response': {'status': 'stopped', 'vm-id': 'sgeqxcfi'}},
{'execution-info': {'status': 'error', 'reason': 'VM does not exist'}}]

$ aetherscale-cli start-vm --vm-id sgeqxcfi
[{'execution-info': {'status': 'error', 'reason': 'VM does not exist'}},
{'execution-info': {'status': 'success'}, 'response': {'status': 'starting', 'vm-id': 'sgeqxcfi'}}]

$ aetherscale-cli delete-vm --vm-id sgeqxcfi
[{'execution-info': {'status': 'error', 'reason': 'VM does not exist'}},
{'execution-info': {'status': 'success'}, 'response': {'status': 'deleted', 'vm-id': 'sgeqxcfi'}}]
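As the output shows, for commands that target a single VM the client now receives one useful answer plus a "VM does not exist" error from every other host. A client could filter the list before showing it to the user. The helper below is a small sketch of that idea, with successful_responses being a name I made up; it relies only on the response structure visible in the output above.

```python
def successful_responses(responses):
    """Keep only the payloads of responses that report success."""
    return [
        r.get('response')
        for r in responses
        if r.get('execution-info', {}).get('status') == 'success'
    ]


stop_vm_responses = [
    {'execution-info': {'status': 'success'},
     'response': {'status': 'stopped', 'vm-id': 'sgeqxcfi'}},
    {'execution-info': {'status': 'error', 'reason': 'VM does not exist'}},
]

print(successful_responses(stop_vm_responses))
# [{'status': 'stopped', 'vm-id': 'sgeqxcfi'}]
```

If the filtered list is empty, the errors should of course still be reported, because then no host knew the VM.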