Communication fabric

How CKI components communicate via message queues.

The CKI infrastructure is composed of a wide variety of services running on different OpenShift clusters. As the information shared by these services is critical for the CKI setup to operate correctly, communication needs to be handled in a reliable way.

To achieve this, the CKI team uses an elaborate configuration of replicated RabbitMQ servers, queues and exchanges, creating a safe communication channel for all the applications, scripts and microservices running both inside and outside the Red Hat firewall.

Servers setup

Currently, 3 instances of RabbitMQ are running on AWS in a cluster configuration. With this setup, and following the high availability documentation, all queues are replicated across 2 nodes, i.e. each queue has a master node and a mirror node, which vary from queue to queue.

Connecting to the cluster

All nodes in the cluster have a public infra.cki-project.org subdomain attached and use TLS encryption. The hosts follow the arr-cki-prod-rabbitmq-{aws zone}{a|b|c}.infra.cki-project.org naming scheme. Currently only the us-east-1 zone is in use, so the hosts are the following:

  • arr-cki-prod-rabbitmq-us-east-1a.infra.cki-project.org
  • arr-cki-prod-rabbitmq-us-east-1b.infra.cki-project.org
  • arr-cki-prod-rabbitmq-us-east-1c.infra.cki-project.org

For correct connection fallback, it is necessary to specify all cluster nodes when connecting. pika supports this, and cki-lib’s messagequeue.MessageQueue helper provides a simple interface on top of it that accepts the list of nodes as a space-separated string. The helper can also pick up all the necessary configuration from the RABBITMQ_{HOST,PORT,USER,PASSWORD,CAFILE} environment variables.

As pika always connects to the first host in the list and only falls back to the others when that one fails, messagequeue.MessageQueue randomizes the order of the hosts to distribute the load evenly across the servers.

Note: Do not hardcode these values. In deployment-all configurations, the PUBLIC_RABBITMQ_HOSTS environment variable should be used instead.
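As an illustration, the following sketch shows roughly what messagequeue.MessageQueue does under the hood: it builds one set of connection parameters per host, shuffles them, and lets pika fall back to the next host when a connection fails. The environment variable names match the ones above; the TLS handling and the assumed default port are simplifications for the example, not the actual cki-lib implementation.

```python
"""Sketch of the host fallback logic; not the actual cki-lib implementation."""
import os
import random
import ssl

import pika

# RABBITMQ_HOST may contain several hosts as a space-separated list.
hosts = os.environ['RABBITMQ_HOST'].split()
port = int(os.environ.get('RABBITMQ_PORT', '5671'))  # assumed TLS default port
credentials = pika.PlainCredentials(os.environ['RABBITMQ_USER'],
                                    os.environ['RABBITMQ_PASSWORD'])
ssl_context = ssl.create_default_context(cafile=os.environ.get('RABBITMQ_CAFILE'))

# One ConnectionParameters object per cluster node; pika tries them in order.
parameters = [
    pika.ConnectionParameters(host=host, port=port, credentials=credentials,
                              ssl_options=pika.SSLOptions(ssl_context, host))
    for host in hosts
]

# pika always starts with the first entry, so shuffle to spread the load.
random.shuffle(parameters)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()
```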

Development environments

Messages on a queue can only be consumed once, i.e. when a client consumes a message it becomes unavailable for other clients using the same queue.

To be able to run clients in a development environment, messagequeue.MessageQueue uses the CKI_DEPLOYMENT_ENVIRONMENT environment variable (via the is_production_or_staging method) to determine whether to create temporary queues instead of the persistent production queues.

These queues are disposable and emulate the behaviour of the production/staging queues without “stealing” the messages from the real consumers.

Temporary queues use a UUID4 as their name and have the auto_delete flag enabled, i.e. they are automatically deleted after the last connection is closed. These temporary queues are bound to the same exchanges and start receiving messages as soon as they are declared. Messages already present on the production/staging queues will not be available.
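With plain pika, such a disposable queue can be declared roughly as follows; the exchange and routing key below are placeholders, and the real naming logic lives in messagequeue.MessageQueue.

```python
"""Sketch of a disposable development queue; details are assumptions."""
import uuid

import pika

# Assumes a local development broker; reuse the cluster connection in practice.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# auto_delete=True: the queue disappears once the last consumer disconnects,
# so no production messages are "stolen" and nothing is left behind.
queue_name = str(uuid.uuid4())
channel.queue_declare(queue=queue_name, auto_delete=True)

# Bind to the same exchange as the production queue; only messages published
# after this point are delivered to the temporary queue.
channel.queue_bind(queue=queue_name,
                   exchange='cki.exchange.webhooks',  # placeholder exchange
                   routing_key='gitlab.com.#')        # placeholder routing key filter
```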

Retry messages with DLX

It’s not unusual for something to go wrong while a service is processing a message. In that case, there are 3 important requirements:

  • The message must not be lost
  • The failure must not block the service
  • The message should be retried automatically

To accomplish this, a retry system was created using Dead Letter Exchanges (DLX). Combining a DLX with a Time-To-Live (TTL) allows a rejected message to be requeued after a certain period of time.

In this architecture, an extra “retry” queue is created for each production queue. Messages waiting for a retry sit there until it is time for them to be reprocessed. Having an extra queue per consumer provides per-application visibility and allows the reprocessing delay to be configured per application.

The message flow is the following:

  • A message arrives at an exchange (cki.exchange.{producer}), is optionally transmitted to a second exchange (cki.exchange.webhooks), and is delivered to a certain queue (cki.queue.webhooks.{queue}).
  • The consumer rejects the message by calling nack.
  • The message is routed to the cki.exchange.retry.incoming direct exchange via the DLX configuration of the queue, which includes the name of the original queue as the dead letter routing key.
  • The cki.exchange.retry.incoming direct exchange adds the message to the correct retry queue based on the routing key. The retry queue has a message TTL configured.
  • After the TTL expires, the message is routed to the cki.exchange.retry.outgoing direct exchange via the DLX configuration of the retry queue. The original queue name is kept as the routing key.
  • The cki.exchange.retry.outgoing direct exchange delivers the message to the original queue based on the routing key.

Message flow
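The topology behind this flow can be sketched with plain pika as follows. The exchange names come from the flow above; the queue names are hypothetical examples, and the real declarations are handled by messagequeue.MessageQueue and the server configuration.

```python
"""Sketch of the DLX retry topology for a single consumer queue."""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # dev broker
channel = connection.channel()

QUEUE = 'cki.queue.webhooks.example'     # hypothetical consumer queue
RETRY_QUEUE = 'cki.queue.retry.example'  # matching hypothetical retry queue

# Declared here only to keep the example self-contained.
channel.exchange_declare('cki.exchange.retry.incoming', exchange_type='direct', durable=True)
channel.exchange_declare('cki.exchange.retry.outgoing', exchange_type='direct', durable=True)

# Rejected (nacked) messages are dead-lettered to the incoming retry exchange,
# with the original queue name as the routing key.
channel.queue_declare(QUEUE, durable=True, arguments={
    'x-dead-letter-exchange': 'cki.exchange.retry.incoming',
    'x-dead-letter-routing-key': QUEUE,
})

# The retry queue holds messages until the TTL (set via a server policy, see
# below) expires, then dead-letters them to the outgoing retry exchange while
# keeping the current routing key, i.e. the original queue name.
channel.queue_declare(RETRY_QUEUE, durable=True, arguments={
    'x-dead-letter-exchange': 'cki.exchange.retry.outgoing',
})

# Route rejected messages into the retry queue, and expired messages back into
# the original queue, based on that routing key.
channel.queue_bind(RETRY_QUEUE, 'cki.exchange.retry.incoming', routing_key=QUEUE)
channel.queue_bind(QUEUE, 'cki.exchange.retry.outgoing', routing_key=QUEUE)
```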

Configure TTL

To simplify configuration, messagequeue.MessageQueue does not set x-message-ttl on the retry queues; instead, the TTL is configured as a policy on the RabbitMQ server.

In order to use the messagequeue.MessageQueue retry feature with DLX on a different server, a policy needs to be added as follows:

```yaml
name: retry queues
apply_to: queues
pattern: cki\.queue\.retry\..*
tags:
  # x-message-ttl: 10 minutes == 1000 ms/s * 60 s/min * 10 min
  message-ttl: 600000
```
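When the broker is managed by hand instead of through the usual deployment tooling, an equivalent policy can also be created through the RabbitMQ management HTTP API, for example along the following lines; the management URL, credentials and vhost are placeholders.

```python
"""Sketch: create the retry-queue TTL policy via the RabbitMQ management API."""
import requests

MANAGEMENT_URL = 'https://rabbitmq.example.com:15671'  # placeholder management endpoint
VHOST = '%2F'                                          # URL-encoded default vhost "/"

response = requests.put(
    f'{MANAGEMENT_URL}/api/policies/{VHOST}/retry%20queues',
    auth=('admin', 'secret'),                          # placeholder credentials
    json={
        'pattern': r'cki\.queue\.retry\..*',
        'apply-to': 'queues',
        # x-message-ttl: 10 minutes == 1000 ms/s * 60 s/min * 10 min
        'definition': {'message-ttl': 600000},
    },
    timeout=30,
)
response.raise_for_status()
```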

Use case: GitLab Webhooks

Producer-consumer overview

GitLab provides a webhook interface to plug external services into GitLab events. These events include push actions, merge request interactions, and many other activities that are of interest to other running services, such as bots and data collectors.

As webhooks do not provide a reliable way to handle incidents such as infrastructure issues and consumer downtime, a message queue is vital to ensure that no messages are lost.

To accomplish this, a simple webhook-receiver is deployed using AWS Lambda. This script has the core responsibility of translating the webhooks and publishing them as RabbitMQ messages.

As it is a single point of failure, this script needs to be as simple and robust as possible. After a webhook is received, its JSON body is used as the message payload and a routing key is generated from the domain name of the GitLab instance and the project’s path.
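A hedged sketch of that translation step is shown below; the payload fields follow the usual GitLab webhook format, and the way the project path is flattened into the routing key is an assumption for illustration, not the exact webhook-receiver code.

```python
"""Sketch: derive an AMQP routing key from a GitLab webhook (illustrative only)."""
import json
from urllib.parse import urlparse


def routing_key_for(payload: dict) -> str:
    """Build '{hostname}.{project path}.{object_kind}' from a webhook body."""
    web_url = urlparse(payload['project']['web_url'])
    path = web_url.path.strip('/').replace('/', '.')  # assumed path flattening
    return f"{web_url.hostname}.{path}.{payload['object_kind']}"


webhook = {  # minimal example payload
    'object_kind': 'merge_request',
    'project': {'web_url': 'https://gitlab.com/cki-project/cki-lib'},
}
body = json.dumps(webhook)          # the JSON body becomes the message payload
print(routing_key_for(webhook))     # gitlab.com.cki-project.cki-lib.merge_request
```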

kernel-workflow contains a set of consumers that react to GitLab webhooks for kernel-related projects. Applications subscribe to different messages using routing keys to filter the needed projects and events.

The kernel-workflow consumers are not the only ones reading these messages. Other services such as the slack-bot and the pipeline-herder also consume webhooks for their own purposes.
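As a rough illustration of such routing-key filtering, a consumer could bind its queue to the webhooks exchange with a topic pattern so that only merge request events for one hypothetical project family are delivered:

```python
"""Sketch: bind a consumer queue with a routing key filter (names are examples)."""
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # dev broker
channel = connection.channel()

channel.queue_declare('example.consumer.queue', durable=True)  # hypothetical queue
channel.queue_bind('example.consumer.queue', 'cki.exchange.webhooks',
                   # Only merge request events for projects under cki-project.
                   routing_key='gitlab.com.cki-project.*.merge_request')
# Messages matching the filter can now be consumed from 'example.consumer.queue'.
```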

Message types

RabbitMQ messages on the CKI message bus carry a message-type AMQP header whose value depends on the producer.

| posting exchange | listening exchange | routing key | message type | description |
|---|---|---|---|---|
| webhooks | webhooks | gitlab.host.com.* | gitlab | GitLab webhook payload |
| webhooks | webhooks | sentry.io.* | sentry | Sentry webhook payload |
| webhooks | webhooks | jira.host.com.* | jira | Jira webhook payload |
| webhooks | webhooks | umb.* | amqp-bridge | UMB messages |
| webhooks | webhooks | fedmsg.* | amqp-bridge | fedmsg messages |
| kwf.bugzilla | webhooks | cki.kwf.bugzilla.* | cki.kwf.umb-bz-event-bridge | bz events |
| chatbot | webhooks | chatbot.* | chatbot | chat notifications |
| herder | webhooks | herder.* | herder | pipeline-herder notifications |
| datawarehouse | webhooks | datawarehouse.* | datawarehouse | DataWarehouse notifications |
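A consumer handling several of these message types could dispatch on the message-type header, roughly as in the sketch below; the queue name and the handling logic are placeholders.

```python
"""Sketch: dispatch incoming messages on the message-type AMQP header."""
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # dev broker
channel = connection.channel()
channel.queue_declare('example.consumer.queue', durable=True)  # hypothetical queue


def on_message(channel, method, properties, body):
    message_type = (properties.headers or {}).get('message-type')
    payload = json.loads(body)
    if message_type == 'gitlab':
        print('GitLab event on', method.routing_key)
    elif message_type == 'datawarehouse':
        print('DataWarehouse object:', payload.get('object_type'), payload.get('status'))
    else:
        print('ignoring message of type', message_type)
    channel.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume('example.consumer.queue', on_message_callback=on_message)
channel.start_consuming()
```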

gitlab

  • Producer: webhook-receiver via webhook listener
  • Exchange: cki.exchange.webhooks
  • Routing keys: {web_url.hostname}.{web_url.path}.{object_kind}

| AMQP header | description |
|---|---|
| message-type | gitlab |
| message-gitlab-event | {http_headers.x-gitlab-event} |
| message-gitlab-event-uuid | {http_headers.x-gitlab-event-uuid} |
| message-gitlab-instance | {http_headers.x-gitlab-instance} |
| message-gitlab-webhook-uuid | {http_headers.x-gitlab-webhook-uuid} |
| message-date | {http_headers.date} |

sentry

  • Producer: webhook-receiver via webhook listener
  • Exchange: cki.exchange.webhooks
  • Routing keys: sentry.io.{project}.{resource}.{action}

| AMQP header | description |
|---|---|
| message-type | sentry |
| message-sentry-resource | {http_headers.sentry-hook-resource} |
| message-date | {http_headers.date} |

jira

  • Producer: webhook-receiver via webhook listener
  • Exchange: cki.exchange.webhooks
  • Routing keys: {hostname}.{project_key}.{event_type}

| AMQP header | description |
|---|---|
| message-type | jira |
| message-date | {http_headers.date} |

amqp-bridge (UMB)

  • Producer: amqp-bridge via UMB listener
  • Exchange: cki.exchange.webhooks
  • Routing keys: umb.VirtualTopic.{message.address}

| AMQP header | description |
|---|---|
| message-type | amqp-bridge |
| message-amqp-bridge-name | umb |
| message-amqp-bridge-protocol | amqp10 |
| message-amqp10-properties | {message.properties} |

amqp-bridge (fedmsg)

  • Producer: amqp-bridge via fedmsg listener
  • Exchange: cki.exchange.webhooks
  • Routing keys: fedmsg.{message.routing_key}

| AMQP header | description |
|---|---|
| message-type | amqp-bridge |
| message-amqp-bridge-name | fedmsg |
| message-amqp-bridge-protocol | amqp091 |
| message-amqp091-topic | {message.routing_key} |

cki.kwf.umb-bz-event

chatbot

  • Producer: cki-lib/chatbot and direct AMQP messages from various services

  • Exchange: cki.exchange.webhooks

  • Routing keys: chatbot.{object}

  • Body:

    message: "chat message contents"
    
| AMQP header | description |
|---|---|
| message-type | chatbot |
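A service could therefore emit a chat notification directly with pika along these lines; the connection setup and the routing key suffix are placeholders, while the exchange, header and body follow the description above.

```python
"""Sketch: publish a chatbot notification message (routing key suffix assumed)."""
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # dev broker
channel = connection.channel()

channel.basic_publish(
    exchange='cki.exchange.webhooks',
    routing_key='chatbot.pipeline',            # chatbot.{object}; suffix is a placeholder
    body=json.dumps({'message': 'chat message contents'}),
    properties=pika.BasicProperties(
        headers={'message-type': 'chatbot'},   # message type from the table above
        content_type='application/json',
        delivery_mode=2,                       # persistent message
    ),
)
```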

pipeline-herder notifications (retry messages)

  • Producer: pipeline-herder

  • Exchange: cki.exchange.webhooks

  • Routing keys: herder.retry

  • Body:

    web_url: "{job_web_url}"
    not_before: "{not_before.isoformat()}"
    result:
      name: "{match_context.name}"
      description: "{match_context.description}"
      action: "{match_context.action}"
      maximum_artifact_size: "{match_context.maximum_artifact_size}"
      retry_delays: "{match_context.retry_delays}"
      retry_limit: "{match_context.retry_limit}"
    
| AMQP header | description |
|---|---|
| message-type | herder |
| message-herder-type | retry |

pipeline-herder notifications (finished jobs)

  • Producer: pipeline-herder

  • Exchange: cki.exchange.webhooks

  • Routing keys: herder.build

  • Body:

    web_url: "{job_web_url}"
    
| AMQP header | description |
|---|---|
| message-type | herder |
| message-herder-type | build |

DataWarehouse notifications

  • Producer: DataWarehouse

  • Exchange: cki.exchange.webhooks

  • Routing keys: datawarehouse.{object_type}.{status}

  • Body:

    status: new|updated|needs_triage|build_setups_finished|tests_finished|ready_to_report
    object_type: checkout|build|test|testresult|issue|incident
    object: "{object}"
    misc: {issueregex_ids: [{issueregex_ids}...]}
    timestamp: "{timezone.now().isoformat()}"
    
| AMQP header | description |
|---|---|
| message-type | datawarehouse |

More details about the supported messages can be found in the DataWarehouse microservice docs.