Message Queues

A message queue is a component used for interprocess communication (IPC), among other purposes. It provides an asynchronous communication protocol, so the sender and receiver of a message do not need to interact with the queue at the same time: messages placed onto the queue are stored until the recipient retrieves them.

BonFIRE uses a management message queue to alert components of events occurring in other components. The message queue was introduced into the architecture to support message passing between modules (Resource Manager, Experiment Manager, etc.). For instance, the accounting service uses events from the sites to determine how long a virtual machine ran.

Note

For deeper insight into the development, please check Source code location, Dependencies and Configuration.

Existing queues

The available queues are listed below, along with their details.

Hint

Access is restricted. For example, users are not allowed to access the management message queue (MMQ), as it contains events beyond their experiments. Users can, however, listen to events on other message queues that carry messages related to their resources or to themselves, such as the experiment (EMQ), user and group message queues. To support intra-experiment control messages, users can also write messages to the MMQ for their experiment.

MMQ

The management message queue. Almost every entity writes to it, so nearly every event can be retrieved from it.

Exchange

Topic. This eases the consumers' work, since each of them can filter exactly the events it wants.

Consumers

A list of the entities that read from here, along with their associated username:

Producers

A list of the entities that write here, along with their associated username:

EMQ

The experiment message queue.

Exchange

Direct. Given the characteristics of this queue, a more elaborate exchange type was not deemed necessary.

Consumers

A list of the entities that read from here:

Producers

Tip

To be completed with the service username.

A list of the entities that write here, along with their associated username:

Implementation details

Beyond the theoretical aspects, this section describes the dependencies of the message queue software, the location of the related source code and some configuration details.

Source code location

Refer to the Spreader’s Source code location section for more information on the Spreader, which produces messages for the EMQ.

Dependencies

The following UNIX packages are required for the message queues to work correctly:

  • RabbitMQ (rabbitmq-server) v2.2.0

Exchanges

There are two exchange types used to route and filter messages, each with different implications, so a queue may be configured with either one.

Topic

This allows message consumers to use the topic as a filtering mechanism. For example, in the figure below, the producer publishes messages with different topics (usa.news, usa.weather, europe.news and europe.weather). If you are only interested in news from the USA, you can simply create a consumer that binds to the usa.news topic. If you would like to be informed about all news messages, bind to *.news instead. All messages can be consumed by binding to the # topic. In a binding key, words are separated by periods, * matches exactly one word and # matches zero or more words.

Topic exchange sample

The topics are formatted as follows:

{source}.{objectType}.{eventType}

A brief explanation about each part:

  • source: who triggered the event/message (a testbed, the Resource Manager, etc.)
  • objectType: the type of resource the event refers to (experiment, compute, router, etc.)
  • eventType: can be one of the following: {create, delete, update, state}

Note

State is a special event type because it spans several topic words: state.{genericState}.{testbedSpecificState}. Example: state.active.running.
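As an illustration, a producer could assemble the topic from the event fields described under Events forwarded from Message Spreader. The sketch below is in Python, the language the Spreader is written in; build_topic is a hypothetical helper, and deriving the topic directly from the event's source, objectType and eventType fields is an assumption rather than something this document specifies.

```python
def build_topic(event: dict) -> str:
    """Build a {source}.{objectType}.{eventType} topic from an event.

    Assumption: the topic is derived from the event's own fields. A state
    eventType such as "state.active.running" simply contributes several words.
    """
    for field in ("source", "objectType", "eventType"):
        if not event.get(field):
            raise ValueError(f"missing field: {field}")
    return ".".join((event["source"], event["objectType"], event["eventType"]))


print(build_topic({"source": "fr-inria", "objectType": "compute",
                   "eventType": "state.active.running"}))
```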

Examples:

  • Consuming all experiment related events: bind to the *.experiment.# topic.
  • Consuming all create events: bind to #.create.#.
  • Consuming both reserved and deleted events: bind to both #.reserved.# and #.deleted.#.
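RabbitMQ performs this matching on the broker, so consumers never need to implement it themselves. Still, a small pure-Python re-implementation of the standard topic rules can help check a binding key against sample topics before using it. This is an illustrative sketch; topic_matches is a hypothetical helper, not a RabbitMQ or BonFIRE API.

```python
def topic_matches(binding_key: str, topic: str) -> bool:
    """Return True if topic matches binding_key under AMQP topic rules.

    Words are separated by '.'; '*' matches exactly one word and
    '#' matches zero or more words.
    """
    pattern = binding_key.split(".")
    words = topic.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: it is a match only if every word was consumed.
            return w == len(words)
        if pattern[p] == "#":
            # Either '#' matches no more words, or it swallows one word and stays.
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] in ("*", words[w]):
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


topics = ["res-mng.experiment.create", "fr-inria.compute.state.reserved",
          "uk-epcc.storage.delete"]
for binding in ("*.experiment.#", "#.create.#", "#.reserved.#"):
    print(binding, [t for t in topics if topic_matches(binding, t)])
```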

See also

Topic exchange.

Direct

A simple way to connect to a queue: the routing key is used as a unique identifier for the queue. This way, every user can only access the queue(s) bound to the routing key(s) they know.
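The toy model below, a hypothetical in-memory class rather than a RabbitMQ client, illustrates the exact-match rule of a direct exchange: a message only reaches the queues bound with exactly its routing key, with no wildcards. The queue name and routing key are made up for illustration.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory model of a direct exchange (not a RabbitMQ client)."""

    def __init__(self) -> None:
        self._bindings = defaultdict(set)  # routing key -> bound queue names
        self.queues = defaultdict(list)    # queue name -> delivered messages

    def bind(self, queue: str, routing_key: str) -> None:
        self._bindings[routing_key].add(queue)

    def publish(self, routing_key: str, message: str) -> int:
        """Deliver message to every queue bound with exactly routing_key."""
        targets = self._bindings.get(routing_key, set())
        for queue in targets:
            self.queues[queue].append(message)
        return len(targets)


exchange = DirectExchange()
exchange.bind("exp-257-monitor", "k3y257")      # hypothetical queue and key
exchange.publish("k3y257", "compute created")   # reaches one queue
exchange.publish("other-key", "not for us")     # reaches no queue
print(exchange.queues["exp-257-monitor"])
```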

See also

Direct exchange.

Topic vs direct

From the producer’s side, there is practically no difference.

With a direct exchange, the producer publishes a message using a routing key; with a topic exchange, it publishes using a topic.

From the consumer’s point of view, the main advantage of topics is easy access to different “queues”.

With a direct exchange, a consumer can only use the routing key to identify the queues it is interested in, whereas topics allow a more structured identifier. Because the topic itself holds more information (here the source, the objectType and the eventType), the consumer can filter out exactly the messages it is interested in. This makes processing the events on the consumer’s side less cumbersome.

With a direct exchange, the consumer would need to check every individual message placed on the subscribed queue (subscribed using the routing key) to see whether it needs it or can discard it. Topics also make it easy for multiple consumers to be interested in the same topics: each consumer receives the messages it subscribed to. Achieving the same with a direct exchange is less trivial because, when several consumers read from the same queue, a message picked up by one of them is, by default, no longer available to the others.
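To make the extra consumer-side work concrete, here is a sketch of the filtering a direct-exchange consumer would have to do itself, whereas a topic consumer could simply bind to a key such as *.compute.state.#. It assumes MMQ-style objectType/eventType fields (see Event schema); relevant_events is a hypothetical helper.

```python
import json


def _matches_prefix(event_type: str, prefix: str) -> bool:
    # "state" must match "state.active.running" but not e.g. "statex".
    return event_type == prefix or event_type.startswith(prefix + ".")


def relevant_events(raw_messages, object_type: str, event_prefix: str):
    """Yield only the decoded events the consumer actually cares about.

    Every message published with the queue's routing key arrives here, so
    each one has to be decoded and inspected before it is kept or discarded.
    """
    for raw in raw_messages:
        event = json.loads(raw)
        if (event.get("objectType") == object_type
                and _matches_prefix(event.get("eventType", ""), event_prefix)):
            yield event


messages = [
    json.dumps({"objectType": "compute", "eventType": "create"}),
    json.dumps({"objectType": "experiment", "eventType": "create"}),
    json.dumps({"objectType": "compute", "eventType": "state.active.running"}),
]
print([e["eventType"] for e in relevant_events(messages, "compute", "state")])
```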

Event schema

Almost every operation on resources (create, update, delete and state changes) triggers an event of the corresponding type.

The format of these events may vary depending on the queue. For instance, the MMQ carries detailed messages, while the EMQ also carries simpler ones, which are provided directly by the Resource Manager. Each format is explained in detail below, after an example.

Events from Resource Manager

Sample event

{
    "timestamp":1374243305,
    "type":"compute",
    "status":"created",
    "path":"/locations/uk-epcc/computes/1263"
}

Fields

The following fields are mandatory for every message:

timestamp

Same as timestamp.

type

Same as objectType.

status

Similar to eventType, although the values may differ slightly (e.g. created rather than create).

path

Same as objectId.
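A consumer handling both kinds of EMQ messages could map the Resource Manager fields onto their MMQ-style names, following the correspondences listed above. This is a minimal sketch; normalize_rm_event is a hypothetical helper, and status is deliberately kept under its own name because it is only similar to eventType.

```python
import json

# Resource Manager field -> MMQ-style field, following the list above.
RM_TO_MMQ = {"timestamp": "timestamp", "type": "objectType", "path": "objectId"}


def normalize_rm_event(raw: str) -> dict:
    event = json.loads(raw)
    missing = [f for f in (*RM_TO_MMQ, "status") if f not in event]
    if missing:
        raise ValueError(f"missing mandatory fields: {missing}")
    normalized = {mmq: event[rm] for rm, mmq in RM_TO_MMQ.items()}
    # Not renamed to eventType: the values may differ (e.g. "created").
    normalized["status"] = event["status"]
    return normalized


sample = ('{"timestamp": 1374243305, "type": "compute", '
          '"status": "created", "path": "/locations/uk-epcc/computes/1263"}')
print(normalize_rm_event(sample))
```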

Events forwarded from Message Spreader

Sample event

{
    "timestamp":1373881015,
    "eventType":"create",
    "objectType":"experiment",
    "objectId":"/experiments/257",
    "source":"res-mng",
    "groupId":"crohr",
    "userId":"crohr",
    "objectData":{
        "name":"test",
        "duration":7200
    }
}

Fields

The following fields are mandatory for every message, except for objectData, which applies only to create and update events.

timestamp

Epoch (Unix) timestamp, in seconds.

eventType

Indicates the operation that fired the event. Can be one of the following:

  • create: indicates that an object is created. The definition of that object MUST be available in the objectData (more information in objectData section)
  • delete: indicates that an object is removed from the BonFIRE runtime
  • update: indicates that a particular object changes. Again, the new definition of that object MUST be available in the objectData field
  • state.{generic_state}(.{testbed_specific_state}): indicates that an object changes its state. More on this in the State section

State

The “state” event defines a change in the object’s state. Naturally, the possible states depend on the objectType. Since BonFIRE does not want to intervene (much) in the lifecycle management of the resources on the different testbeds, each testbed may have its own representation of similar object states. This does not work well with the topic mechanism for easy event filtering (see Topic), which is why there should be a very small set of generic object states. Because we do not want to lose the detailed (and in most cases more fine-grained) state information of the individual testbeds, the eventType of a “state” event has the form state.{generic_state}(.{testbed_specific_state}) and consists of 3 parts:

  • state: denotes that it is a state event
  • generic_state: the options depend on the objectType. Every partner should feel free to add relevant states or change existing ones, and to expose these generic states
    • experiment
    • compute|network|storage:
      • pending: a resource’s description has reached the site, but it still needs to be deployed
      • reserved: a resource is reserved from the moment it takes resources away from other users (read: from the moment the experimenter should start paying for the resource)
      • active: a resource is active when all of its functionality (also BonFIRE specific functionality) can be used by the user
      • stopping
      • stopped: a resource is stopped when it stops using physical resources
      • deleting
      • deleted: a resource is deleted when all references to the resource are removed from the site.
      • failed
    • Federica router
      • pending: a router is in the pending state while the slice is not yet ready. This differs from the reserved state, since no resources are booked
      • active: a router resource is active when it is ready to be used by the user
      • deleted: the router is deleted from the experiment. The router must be in the PENDING state to perform this operation
      • failed: the router creation failed
    • Federica network
      • pending: a Federica network is in the pending state while the slice is not yet ready. This differs from the reserved state, since no resources are booked
      • active: a Federica network resource is active when it is ready to be used by the user
      • deleted: the Federica network is deleted from the experiment. The Federica network must be in the PENDING state to perform this operation
      • failed: the Federica network failed during its creation
    • Autobahn site_link
      • pending
      • active
      • deleted
      • failed
  • testbed_specific_state: a list of testbed-specific states, separated by "." (period). For example, disabling a Managed Network at the VirtualWall causes the following state event: state.active.down

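The following sketch splits a state eventType into its generic and testbed-specific parts, and checks the generic part against the compute, network and storage states listed above. parse_state and GENERIC_RESOURCE_STATES are hypothetical names used for illustration only.

```python
# Generic states listed above for compute, network and storage resources.
GENERIC_RESOURCE_STATES = {"pending", "reserved", "active", "stopping",
                           "stopped", "deleting", "deleted", "failed"}


def parse_state(event_type: str):
    """Split a state eventType into (generic_state, testbed_specific_states)."""
    parts = event_type.split(".")
    if len(parts) < 2 or parts[0] != "state" or not parts[1]:
        raise ValueError(f"not a state eventType: {event_type!r}")
    return parts[1], parts[2:]


generic, specific = parse_state("state.active.down")
print(generic, specific, generic in GENERIC_RESOURCE_STATES)
```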
objectType

The type of the object this event applies to. E.g. experiment, compute, network, router, network_link, reservation, etc.

objectId

Full object ID, e.g. /experiments/345 or /locations/uk-epcc/computes/453. For objects that have a location, the location is part of this object ID, so no separate location field is needed.
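Since the location is embedded in the objectId, a consumer can recover it by parsing the path. This sketch assumes only the two shapes shown in the examples above (/experiments/{id} and /locations/{location}/{collection}/{id}); parse_object_id is a hypothetical helper.

```python
import re

# Assumed shapes, taken from the examples in this document:
#   /experiments/345
#   /locations/uk-epcc/computes/453
_OBJECT_ID = re.compile(
    r"^(?:/locations/(?P<location>[^/]+))?/(?P<collection>[^/]+)/(?P<id>[^/]+)$")


def parse_object_id(object_id: str) -> dict:
    """Return location (None if absent), collection and id of an objectId."""
    match = _OBJECT_ID.match(object_id)
    if match is None:
        raise ValueError(f"unrecognised objectId: {object_id!r}")
    return match.groupdict()


print(parse_object_id("/locations/uk-epcc/computes/453"))
```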

source

The unique identifier of the entity that fired this event. This should be the username the producer uses to write to the MMQ.

userId

The user who caused this event (in the example above, the user who created the experiment).

groupId

The group on behalf of which the user triggered the event.

objectData

Note

Only mandatory for create and update events.

This field contains a nested sub-message. It should hold sufficient information to recreate the entire object. However, sufficient does not mean all the information one would get by doing a GET on the resource. For example, information identifying a session, such as sequence numbers and session IDs, should not be in the objectData.
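A consumer may want to check incoming events against the field rules above before processing them. The sketch below is a hypothetical validator (validate_event is not part of the BonFIRE code) that checks the mandatory fields, the allowed eventType forms and the presence of objectData for create and update events.

```python
MANDATORY_FIELDS = ("timestamp", "eventType", "objectType", "objectId",
                    "source", "userId", "groupId")


def validate_event(event: dict) -> list:
    """Return the problems found in a forwarded event (empty list if none)."""
    problems = [f"missing {field}" for field in MANDATORY_FIELDS
                if field not in event]
    event_type = event.get("eventType", "")
    if event_type and not (event_type in ("create", "delete", "update")
                           or event_type.startswith("state.")):
        problems.append(f"unknown eventType {event_type!r}")
    # objectData is only mandatory for create and update events.
    if event_type in ("create", "update") and "objectData" not in event:
        problems.append(f"objectData is required for {event_type} events")
    return problems


sample = {
    "timestamp": 1373881015, "eventType": "create", "objectType": "experiment",
    "objectId": "/experiments/257", "source": "res-mng", "groupId": "crohr",
    "userId": "crohr", "objectData": {"name": "test", "duration": 7200},
}
print(validate_event(sample))
```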

A sample for each kind of resource follows.

Reservation
"objectData": {
  "name": "myReservation",
  "duration": 120,
  "allocationUnits": 23
}

A brief explanation about some fields:

  • duration: in minutes
  • allocationUnits: total number of allocation units in the reservation. Consumers may multiply this by the duration (converted from minutes to hours) to get allocation unit hours.

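Because duration is given in minutes, it has to be converted to hours before computing allocation unit hours. A minimal sketch, where allocation_unit_hours is a hypothetical helper:

```python
def allocation_unit_hours(object_data: dict) -> float:
    """Allocation unit hours of a reservation; duration is in minutes."""
    return object_data["allocationUnits"] * object_data["duration"] / 60


reservation = {"name": "myReservation", "duration": 120, "allocationUnits": 23}
print(allocation_unit_hours(reservation))
```

For the sample reservation above, 23 units held for 2 hours give 46 allocation unit hours.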
OCCI
"objectData": {
  "relatedId": "/experiments/1234",
  "httpRequestLine": "GET /experiments/1234 HTTP/1.1",
  "headers": { "X-BONFIRE-ASSERTED-ID": "gvseghbr", ... },
  "occi": "...",
  "tool": "..."
}

A brief explanation about some fields:

  • relatedId: full reference to the object
  • occi: complete XML message
  • tool: first element of the USER-AGENT header

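A sketch of how the tool value could be derived from the header. It assumes that "first element" means the first whitespace-separated product token (as in "Mozilla/5.0 (X11; Linux x86_64)", whose first element is "Mozilla/5.0"); tool_from_user_agent is a hypothetical helper.

```python
from typing import Optional


def tool_from_user_agent(user_agent: Optional[str]) -> Optional[str]:
    """Return the first whitespace-separated element of a User-Agent header."""
    if not user_agent:
        return None
    tokens = user_agent.split()
    return tokens[0] if tokens else None


print(tool_from_user_agent("Mozilla/5.0 (X11; Linux x86_64)"))
```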
Experiment
"objectData": {
  "name": "My Experiment",
  "duration": 120,
  "reservationId" : "/reservations/345"
}

A brief explanation about some fields:

  • reservationId: optional; the ID of the associated reservation, if the experiment has one

Managed experiment

Any of the following:

"objectData": {
  "name": "My Experiment",
  "experimentId": "/experiments/657",
  "type": "JSON",
  "descriptor": {...}
}

"objectData": {
  "name": "My Experiment",
  "experimentId": "/experiments/657",
  "type": "OVF",
  "descriptor": "..."
}

A brief explanation about some fields:

  • descriptor: full JSON descriptor (first message) or full OVF descriptor in XML (second message)

Storage
"objectData": {
  "name": "storage-name",
  "type": "DATABLOCK",
  "persistent": false,
  "size": 1024,
  "fstype": "ext3",
  "experimentId": "/experiments/657"
}

Network
"objectData": {
  "name": "network-name",
  "address": "192.168.0.0",
  "size": "C",
  "type": "active",
  "lossrate": 0.1,
  "latency": 10,
  "bandwidth": 100,
  "protocol": "TCP",
  "packetsize": 50,
  "throughput": 30,
  "experimentId": "/experiments/657"
}

Compute
"objectData": {
  "name": "compute-name",
  "instanceType": "/locations/fr-inria/instance_type/medium",
  "cpu": 1,
  "vcpu": 4,
  "memory": 1024,
  "cluster": null,
  "host": "node-1.integration.bonfire.grid5000.fr",
  "osimage": "/locations/fr-inria/storages/571",
  "disks": [
             {"id":"0",
              "storage":"/locations/fr-inria/storages/571",
              "type":"FILE",
              "target":"sda"}, ...
           ],
  "networks": [
                {"network":"/locations/uk-epcc/networks/0",
                 "ip":"172.18.3.57",
                 "mac":"02:00:ac:12:03:39"}, ...
              ],
  "experimentId": "/experiments/657",
  "bestEffortAllocation" : true,
  "allocationUnits" : 4
}

A brief explanation about some fields:

  • bestEffortAllocation: optional (only present in res-mng events). Defaults to false. Indicates whether this is a best-effort allocation (rather than a reservation).
  • allocationUnits: optional (only present in res-mng events). Must be present if bestEffortAllocation is true.

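The two rules above can be checked with a few lines of code. This is a minimal sketch; allocation_fields_valid is a hypothetical helper, not part of the Resource Manager.

```python
def allocation_fields_valid(object_data: dict) -> bool:
    """Check the best-effort allocation rules of a compute objectData.

    bestEffortAllocation defaults to False when absent; when it is True,
    allocationUnits must be present.
    """
    best_effort = object_data.get("bestEffortAllocation", False)
    return not best_effort or "allocationUnits" in object_data


print(allocation_fields_valid({"bestEffortAllocation": True, "allocationUnits": 4}))
```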
Federica Resources
Router
"objectData": {
  "name": "RouterName",
  "host": "HostName",
  "interfaces" : [
    { "name" : "if_a1",
      "physicalInterface" : "ge-0/2/2",
      "ip" : "192.168.1.2",
      "netmask" : "255.255.255.0"
    },
    { "name" : "if_a2",
      "physicalInterface" : "ge-0/1/0",
      "ip" : "192.168.2.1",
      "netmask" : "255.255.255.0"
    },
    ...
  ],
  "experimentId": "/experiments/1",
  "config": "myRouterConfig"
}

Network
"objectData": {
  "name" : "myFedericaNetwork",
  "vlan": "network vlan id",
  "networkLinks" :  [
    { "src_router" : "RouterA",
      "src_interface" : "if_a1",
      "dst_router" : "RouterB",
      "dst_interface" : "if_b1"
    },
    { "src_router" : "RouterB",
      "src_interface" : "if_b2",
      "dst_router" : "RouterC",
      "dst_interface" : "if_c1"
    },
    ...
  ],
  "experimentId": "/experiments/1"
}

Autobahn resources
Amazon resources

BonFIRE only allows compute, network and storage resources to be created on Amazon. These resources use the same objectData schema presented above; the location is the only difference.

Virtual Wall Resources

The Virtual Wall does not have special resources in BonFIRE. Its only special behaviour (coming soon) is to emit create events for computes on behalf of the user when the experiment uses Managed Networks, so that the number of computes used by the experimenter is counted correctly.

Message Spreader

New in version R4.1.

A component that complements the Resource Manager’s writes to the EMQ by forwarding messages from the MMQ to the experiment, user and group queues (EMQ, UMQ and GMQ, respectively).

To do so, the Spreader listens to a chosen set of topics on the MMQ and forwards every received message to the experiment, user and group queues, tagging it with the corresponding routing key.

Message Spreader workflow

Note

For deeper insight into the development, please check Source code location, Dependencies, Configuration and also the specs.

Implementation details

Beyond the theoretical aspects, this section describes the dependencies of the Spreader module and the location of its source code, which acts as a proxy between the MMQ and the EMQ and thus produces messages on the EMQ.

Source code location

The code is located under spreader/.

Dependencies

The following UNIX packages are required for the Spreader to work correctly:

  • MySQL server (mysql-server)
  • Python (python)
  • Python PIKA (python-pika)

Configuration

The Spreader keeps its configuration file in its root folder (spreader/config), where both the queue and database properties are defined, as well as the different exchanges it may use.

A sample configuration file follows:

[queue]
host = mq.bonfire-project.eu
vhost = bonfire
exchange = <name_of_exchange>
user = <rabbitmq_enabled_consumer_for_exchange>
password = <password_for_previous_consumer>
port = 5672
topics = res-mng.experiment.#, fr-inria.#, be-ibbt.#, uk-epcc.#, de-hlrs.#

[database]
host = localhost
user = <mysql_user>
password = <mysql_password>
database = <mysql_database>


[exchange "experiments"]
user = <rabbitmq_enabled_producer_for_exchange>
password = <password_for_previous_producer>

[exchange "groups"]
user = <rabbitmq_enabled_producer_for_exchange>
password = <password_for_previous_producer>

[exchange "users"]
user = <rabbitmq_enabled_producer_for_exchange>
password = <password_for_previous_producer>

The list of fields and a brief explanation on the non-trivial ones:

  • queue: defines the configuration data for the queue. One per file
    • host
    • vhost
    • exchange: MMQ exchange defined in RabbitMQ
    • user: RabbitMQ user with consumer (reading) permissions for this exchange (MMQ)
    • password
    • port
    • topics: list of topics to listen/bind to in MMQ
  • database: defines the configuration data for the database. One per file
    • host
    • user: MySQL user with reading permissions for this database
    • password
    • database: current database to retrieve the routing keys from (i.e. RM database)
  • exchange sections: one or more per file
    • user: RabbitMQ user with producer (writing) permissions for this exchange
    • password
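The sketch below reads a configuration file of this shape with Python 3's standard configparser module, splitting the comma-separated topics entry and collecting every [exchange "..."] section. It is an illustration under assumptions: the actual Spreader may parse its configuration differently, and load_spreader_config is a hypothetical name.

```python
import configparser
import re

# Exchange sections are named like: [exchange "experiments"]
EXCHANGE_SECTION = re.compile(r'^exchange "(?P<name>[^"]+)"$')

SAMPLE_CONFIG = """
[queue]
host = mq.bonfire-project.eu
vhost = bonfire
exchange = <name_of_exchange>
user = <rabbitmq_enabled_consumer_for_exchange>
password = <password_for_previous_consumer>
port = 5672
topics = res-mng.experiment.#, fr-inria.#, be-ibbt.#, uk-epcc.#, de-hlrs.#

[database]
host = localhost
user = <mysql_user>
password = <mysql_password>
database = <mysql_database>

[exchange "experiments"]
user = <rabbitmq_enabled_producer_for_exchange>
password = <password_for_previous_producer>
"""


def load_spreader_config(text: str) -> dict:
    # interpolation=None so that a '%' in a password is taken literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    queue = parser["queue"]
    topics = [t.strip() for t in queue.get("topics", "").split(",") if t.strip()]
    exchanges = {}
    for section in parser.sections():
        match = EXCHANGE_SECTION.match(section)
        if match:
            exchanges[match.group("name")] = dict(parser[section])
    return {
        "queue": {**dict(queue), "port": queue.getint("port", 5672),
                  "topics": topics},
        "database": dict(parser["database"]),
        "exchanges": exchanges,
    }


print(load_spreader_config(SAMPLE_CONFIG)["queue"]["topics"])
```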

Specifications

The Spreader was initially designed to act not only as a message spreader to the experiment, user and group queues, but also as the central writing point for them (i.e. replacing the RM’s writes to the EMQ), as shown in the Message Spreader workflow.

Because too little time remained to fully test such a component, the Spreader remains a simple spreader (see Current): it sends new messages to the EMQ alongside those already sent by the RM.

Initial

One advantage of this design would have been a way to reconcile the different timestamps of what could look like the same event (for example, the RM generates a resource create event some time after the site generates its own create event for that resource). The previous spec tried to handle this and, to aid backwards compatibility, attempted to translate new MMQ events into old EMQ events.

This approach was considered too risky so late in the project, with various key components of BonFIRE relying on the EMQ. The decision was made to leave the old EMQ events in place, unchanged, for monitoring and other uses (e.g. the log consumer, exposed from the Portal under the monitoring tab). There is no reason to break backwards compatibility, and no time or effort available to fix anything that might break.

The initial specs assumed the following:

  • Message Spreader would send two types of formatted events to the EMQ:
    1. Events formatted following MMQ format
    2. Events formatted following the original EMQ format (lighter, used for monitoring). This sought backwards compatibility, so as not to break existing services (e.g. monitoring) that rely on the current EMQ format
  • Site and experiment events were to be filtered and adapted to the (lighter) EMQ format, depending on their source (site, experiment) and the type of event

Note

Nevertheless, the Spreader is ready to take over from this point and centralize the RM’s writes to the EMQ. If interested, take a look at the spreader ‣ messages ‣ message ‣ postprocess method and set the enable_parsing and enable_filering variables to True. You may also check the initial workflow.

Current

Summing up, Spreader works as follows:

  1. Connects to the MMQ and listens to a chosen set of topics, as defined by the topics entry of the queue section in the spreader ‣ config file
  2. Determines the kind of object the event refers to and obtains its routing key. See On routing keys
  3. Forwards the message, if possible, to the Experiment, User and Group queues with the corresponding routing key(s)
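Steps 2 and 3 above can be sketched as a pure function with the routing-key lookup and the publishing injected, so the sketch stays independent of pika and MySQL; step 1 (connecting and binding to the MMQ) is left out. spread, TARGET_EXCHANGES and the exchange labels are hypothetical names, not the Spreader's actual code.

```python
import json
from typing import Callable, Dict, Iterable

# Hypothetical labels for the three target exchanges, borrowed from the
# [exchange "..."] sections of the sample configuration.
TARGET_EXCHANGES = ("experiments", "users", "groups")


def spread(body: bytes,
           lookup_routing_keys: Callable[[dict], Dict[str, Iterable[str]]],
           publish: Callable[[str, str, bytes], None]) -> int:
    """Forward one MMQ message and return how many copies were published.

    lookup_routing_keys(event) returns {exchange: [routing keys]} (step 2);
    publish(exchange, routing_key, body) sends one copy (step 3).
    """
    event = json.loads(body)
    keys_by_exchange = lookup_routing_keys(event)
    published = 0
    for exchange in TARGET_EXCHANGES:
        for routing_key in keys_by_exchange.get(exchange, ()):
            publish(exchange, routing_key, body)
            published += 1
    # 0 means no routing key was found, so the message was not forwarded.
    return published


sent = []
body = json.dumps({"objectId": "/experiments/257", "userId": "crohr"}).encode()
count = spread(body,
               lambda event: {"experiments": ["k3y257"], "users": ["crohr"]},
               lambda exchange, key, payload: sent.append((exchange, key)))
print(count, sent)
```

With pika, publish could wrap channel.basic_publish(exchange=..., routing_key=..., body=...), and lookup_routing_keys could run queries like those in On routing keys.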

On routing keys

The routing key is an alphanumeric string that uniquely identifies a resource (for example, but not limited to, an experiment) and is kept in the Resource Manager database.

Example: getting the experiment routing key for a resource

  • Experiment:
-- @objectId holds the event's objectId, e.g. SET @objectId = '/experiments/257';
SELECT DISTINCT routing_key
FROM experiments
WHERE CONCAT('/experiments/', id) = @objectId;
  • Other resource (compute, link, network, storage):
-- e.g. SET @objectId = '/locations/uk-epcc/computes/1263';
SELECT DISTINCT e.routing_key
FROM resources AS r
LEFT JOIN locations AS l ON r.location_id = l.id
LEFT JOIN experiments AS e ON r.experiment_id = e.id
WHERE CONCAT('/locations/', l.name, '/', r.kind, 's/', r.remote_id) = @objectId;

Note

Not every message contains enough information to retrieve a routing key for it. In that case the message is not forwarded to any of the aforementioned queues.
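A sketch of how a consumer could choose between the two lookups from the shape of the objectId, returning None when no lookup applies (such messages are not forwarded). It assumes the objectId shapes used in this document and MySQLdb-style %s placeholders; routing_key_query and experiment_routing_key are hypothetical helpers.

```python
from typing import Optional, Tuple

EXPERIMENT_SQL = (
    "SELECT DISTINCT routing_key FROM experiments "
    "WHERE CONCAT('/experiments/', id) = %s"
)
RESOURCE_SQL = (
    "SELECT DISTINCT e.routing_key FROM resources AS r "
    "LEFT JOIN locations AS l ON r.location_id = l.id "
    "LEFT JOIN experiments AS e ON r.experiment_id = e.id "
    "WHERE CONCAT('/locations/', l.name, '/', r.kind, 's/', r.remote_id) = %s"
)
# Collections matching the resource kinds (compute, link, network, storage).
RESOURCE_COLLECTIONS = ("computes", "links", "networks", "storages")


def routing_key_query(object_id: str) -> Optional[Tuple[str, tuple]]:
    """Return (sql, params) for the routing-key lookup, or None if none applies."""
    parts = object_id.strip("/").split("/")
    if len(parts) == 2 and parts[0] == "experiments":
        return EXPERIMENT_SQL, (object_id,)
    if (len(parts) == 4 and parts[0] == "locations"
            and parts[2] in RESOURCE_COLLECTIONS):
        return RESOURCE_SQL, (object_id,)
    return None


def experiment_routing_key(cursor, object_id: str) -> Optional[str]:
    """Run the chosen query on a DB-API cursor (e.g. one from MySQLdb)."""
    query = routing_key_query(object_id)
    if query is None:
        return None
    cursor.execute(*query)
    row = cursor.fetchone()
    return row[0] if row else None
```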