Cleaning up Message Routing of our Cloud Computing Project
In this tutorial we will set up correct message passing in RabbitMQ for
our cloud computing provider. If you think back to the first tutorial,
you might remember that we had the problem that list-vms commands were
only sent to one server - just like all other messages. Moreover, what use is
it if server A gets the create VM command, but server B gets the delete VM
command for the same VM? Our users would get erroneous replies that the VM
does not exist. As you can see, there is quite a mess to clean up.
To get a bit of structure into the situation, let's first think about which servers have to be informed about which messages:
- create-vm: This message can go to any server. If a server does not have capacity, it will not acknowledge the message, so another server can pick it up.
- start-vm, stop-vm, delete-vm: These messages all have to go to the one server where the VM with the given VM ID is currently allocated. If one of them goes to any other server, the VM will not be found.
- list-vms: This message has to go to all servers.
Of course, there could be more limitations like for example dedicated customer groups or geographical regions and availability zones, but we will forget about all these for now.
So let’s recap which standard exchanges we have with RabbitMQ:
- Direct Exchange: sends a message to queues with the matching routing key
- Fanout Exchange: sends a message to all registered queues, which means that all registered consumers will receive it
- Topic Exchange: allows queues to register with a topic that might also contain wildcards
- Headers Exchange: similar to topic exchange, but uses header fields without wildcards for registration
Since we’re ignoring use cases like different regions and customers, a direct exchange with a better assignment of queues will be enough. Each worker will read from two queues: first from a dedicated queue with messages for this worker, and second from a competing-consumers queue. The command name will serve as the binding key. This way the client does not have to implement any logic about which messages go to which queues; it just sets the command name as the routing key.
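The resulting routing plan can be sketched as a plain mapping (the queue-type labels here are just illustrative names for this post, not RabbitMQ concepts):

```python
# Illustrative sketch: which kind of queue should receive each command.
# 'competing' = the shared work queue (one server takes the message),
# 'exclusive' = every server's own queue (all servers see the message).
ROUTING_PLAN = {
    'create-vm': 'competing',
    'start-vm': 'exclusive',
    'stop-vm': 'exclusive',
    'delete-vm': 'exclusive',
    'list-vms': 'exclusive',
}

print(ROUTING_PLAN['create-vm'])  # competing
```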
First of all, the client will not declare any queues anymore. It does not
care about the queues, it only wants to send its message to an exchange.
If the queues do not exist yet, there is no server running and messages can
be dropped. We use the command name as the routing_key.
If we want to bind queues to the exchange with a user-defined binding key, we will have to set up a new exchange. Binding keys cannot be specified with the default exchange. We will also let the server handle the setup of the exchange. If no server is running, the client cannot do much anyway.
Inside client.py we change the function send_msg:
EXCHANGE_NAME = 'computing'

self.channel.basic_publish(
    exchange=EXCHANGE_NAME,
    routing_key=data['command'],
    properties=pika.BasicProperties(
        reply_to=reply_to,
        content_type='application/json',
    ),
    body=json.dumps(data).encode('utf-8'))
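To make this concrete, here is a hypothetical payload as the client might build it (the exact field names come from the earlier tutorials, so treat them as an assumption); the routing key is simply the command name taken from the payload:

```python
import json

# Hypothetical payload as the client might build it
data = {'command': 'create-vm', 'options': {'image': 'ubuntu_base'}}

routing_key = data['command']            # used as the routing key
body = json.dumps(data).encode('utf-8')  # sent as the message body

print(routing_key)  # create-vm
```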
The client also has to be changed so that it does not stop reading responses
after the first response was received, because there can now be responses
from many servers. Since we currently do not have a way to know how many
hosts are participating, and thus how many responses to expect, we will just
wait for a few seconds. To achieve this, we can delete
self.channel.stop_consuming() from on_response; everything else remains the same.
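The waiting logic can be sketched as follows, assuming the client uses pika's BlockingConnection (the helper name wait_for_responses is made up for this post): process_data_events keeps dispatching incoming deliveries to our consumer callbacks until the deadline passes.

```python
import time

def wait_for_responses(connection, timeout=5.0):
    """Pump the connection's I/O loop until the timeout elapses.

    Responses are still collected by on_response; we simply no longer
    stop after the first one.
    """
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # BlockingConnection.process_data_events handles I/O and invokes
        # consumer callbacks for at most time_limit seconds.
        connection.process_data_events(time_limit=remaining)
```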
Next, we will change the server to set up the required queues. For the
queues that are only read by one server we can use an exclusive queue
with a name generated by RabbitMQ. An exclusive queue will only be used by
one consumer, and as soon as the connection to this consumer is closed it
will be deleted. For a production application this might be a problem on
host restarts, because we would lose messages then, but it's much nicer
for playing around.
EXCHANGE_NAME = 'computing'
COMPETING_QUEUE = 'computing-competing'

QUEUE_COMMANDS_MAP = {
    '': ['list-vms', 'start-vm', 'stop-vm', 'delete-vm'],
    COMPETING_QUEUE: ['create-vm'],
}

def run():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=config.RABBITMQ_HOST))
    channel = connection.channel()

    channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='direct')

    # let rabbitmq define a name for the exclusive queue
    result = channel.queue_declare(queue='', exclusive=True)
    exclusive_queue_name = result.method.queue
    # setup one queue that is shared by all consumers
    channel.queue_declare(queue=COMPETING_QUEUE)

    for queue, commands in QUEUE_COMMANDS_MAP.items():
        if queue == '':
            queue = exclusive_queue_name

        for command in commands:
            channel.queue_bind(
                exchange=EXCHANGE_NAME, queue=queue, routing_key=command)

    channel.basic_consume(
        queue=exclusive_queue_name, on_message_callback=callback)
    channel.basic_consume(
        queue=COMPETING_QUEUE, on_message_callback=callback)

    # ...
Since this is the first time I'm running the software on multiple hosts,
I'm starting to use environment variables for configuration. I introduced
a new environment variable RABBITMQ_HOST to define the IP address or
hostname of the RabbitMQ server. For this I created a new file config.py:
import os
RABBITMQ_HOST = os.getenv('RABBITMQ_HOST', default='localhost')
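On each host we can then point the software at the broker via the environment; the address below is of course just a placeholder:

```shell
# Placeholder broker address; without it, config.py falls back to localhost
export RABBITMQ_HOST=192.168.1.20

# Any Python process started from this shell now sees the value
python -c 'import os; print(os.getenv("RABBITMQ_HOST", "localhost"))'
```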
In order to be able to connect to RabbitMQ from multiple hosts, we can either allow the user guest to connect from hosts other than localhost or we can create a new custom user. For production systems only the latter is recommended, but since we are only playing around we will allow guest connections from remote machines.
For this, we create a configuration file /etc/rabbitmq/rabbitmq.conf with
the following content:
loopback_users = none
RabbitMQ has to be restarted for this change to take effect.
With this all done, we can now run the server on two different computers and
send a few requests. On one of my hosts I have the required base image
available, on the other I do not. This allows us to easily see that requests
are routed to different hosts. Each of the requests that are delivered to
all servers (list-vms, stop-vm, start-vm and delete-vm) will now show up
with two responses in the response list.
$ aetherscale-cli create-vm --image ubuntu_base
[{'execution-info': {'status': 'success'}, 'response': {'status': 'starting', 'vm-id': 'sgeqxcfi'}}]
$ aetherscale-cli list-vms
[{'execution-info': {'status': 'success'}, 'response': ['vm-sgeqxcfi']},
{'execution-info': {'status': 'success'}, 'response': []}]
$ aetherscale-cli create-vm --image ubuntu_base
[{'execution-info': {'status': 'error', 'reason': 'Image "ubuntu_base" does not exist'}}]
$ aetherscale-cli stop-vm --vm-id sgeqxcfi
[{'execution-info': {'status': 'success'}, 'response': {'status': 'stopped', 'vm-id': 'sgeqxcfi'}},
{'execution-info': {'status': 'error', 'reason': 'VM does not exist'}}]
$ aetherscale-cli start-vm --vm-id sgeqxcfi
[{'execution-info': {'status': 'error', 'reason': 'VM does not exist'}},
{'execution-info': {'status': 'success'}, 'response': {'status': 'starting', 'vm-id': 'sgeqxcfi'}}]
$ aetherscale-cli delete-vm --vm-id sgeqxcfi
[{'execution-info': {'status': 'error', 'reason': 'VM does not exist'}},
{'execution-info': {'status': 'success'}, 'response': {'status': 'deleted', 'vm-id': 'sgeqxcfi'}}]