cki_tools.amqp_bridge

AMQP Bridge

Single service to act as a bridge between external AMQP 0.91, AMQP 1.0, and SQS queues and a single RabbitMQ exchange.

                      RabbitMQ
                     +----------------------------------------+
+-----------+        |  +----------+          +------------+  |
| AMQP091   +-----+---->+ Exchange +----+---->+ Consumer_1 |  |
+-----------+     |  |  +----------+    |     +------------+  |
+-----------+     |  |                  |     +------------+  |
| AMQP10    +-----+  |                  +---->+ Consumer_2 |  |
+-----------+     |  |                        +------------+  |
+-----------+     |  |                        :            :  |
| SQS       +-----+  |                        +------------+  |
+-----------+        |                        | Consumer_n |  |
                     +----------------------------------------+

A different process needs to be spawned for each server to mirror. Each one will subscribe to a given list of topics and forward them to the configured RabbitMQ exchange.

Usage

usage: python3 -m cki_tools.amqp_bridge [-h]
           [--from-datagrepper] [--start START] [--end END]

options:
  -h, --help          show this help message and exit

Retrieve messages from datagrepper:
  --from-datagrepper  Retrieve messages from datagrepper instead of listening to AMQP
  --start START       Start time for datagrepper messages in ISO format
  --end END           End time for datagrepper messages in ISO format

When called without any arguments, the service will start listening to the configured AMQP servers and forward messages to the RabbitMQ exchange.

When called with --from-datagrepper, the service will retrieve messages from datagrepper instead of listening to AMQP. The --start and --end options specify the time range for the retrieved messages.

Configuration

Environment variable Description
CKI_DEPLOYMENT_ENVIRONMENT Define the deployment environment (production/staging)
RABBITMQ_HOST AMQP host
RABBITMQ_PORT AMQP port, TLS is used for port 443
RABBITMQ_USER AMQP user
RABBITMQ_PASSWORD AMQP password
RABBITMQ_CAFILE AMQP CA file path
RABBITMQ_CERTFILE AMQP certificate + private key file path
RABBITMQ_PUBLISH_EXCHANGE AMQP exchange for the forwarded messages
RABBITMQ_KEEPALIVE_S Time to keep AMQP connection alive between messages
SENTRY_SDN Sentry SDN
AMQP_BRIDGE_CONFIG YAML object containing the AMQP configuration
AWS_ACCESS_KEY_ID AWS access key ID (for SQS authentication)
AWS_SECRET_ACCESS_KEY AWS secret access key (for SQS authentication)
AWS_DEFAULT_REGION AWS region for SQS (e.g., us-east-1)
AWS_ENDPOINT_URL AWS endpoint URL for SQS (optional, for LocalStack)

CKI_DEPLOYMENT_ENVIRONMENT

On staging developments (CKI_DEPLOYMENT_ENVIRONMENT != production), the behaviour changes the following way:

  • On AMQP 0.91, queue_name is not used and a volatile queue with an uuid name is created instead.
  • On AMQP 1.0, queues (queue://) in receiver_urls are translated into topics (topic://).
  • Messages are not forwarded to the destination but printed instead.

AMQP_BRIDGE_CONFIG

AMQP 1.0 Configuration

Field Description
name Name for the bridge. Will be on the forwarded message headers.
protocol amqp10 for AMQP 1.0
datagrepper_url URL of datagrepper to recover messages.
message_topics List of topics/queues to subscribe to.
receiver_urls List of server URLs to connect to.
cert_path Path to the user authentication certificate.
name: foobar
protocol: amqp10
message_topics:
  - queue://Consumer.consumer-name.foo.VirtualTopic.foo.>
  - queue://Consumer.consumer-name.foo.VirtualTopic.bar.>
receiver_urls:
  - amqps://messaging-broker01.foobar.com:5671
  - amqps://messaging-broker02.foobar.com:5671
cert_path: /path/to/cert.pem

AMQP 0.91 Configuration

Field Description
name Name for the bridge. Will be on the forwarded message headers.
protocol amqp091 for AMQP 0.91
routing_keys List of routing keys to subscribe to.
host URL of the AMQP 0.91 server. Can be a list of space-separated strings.
port Port of the AMQP 0.91 server.
cafile Path of the CA certificate of the AMQP 0.91 server.
cert Path of the client certificate of the AMQP 0.91 server.
virtual_host Virtual host of the AMQP 0.91 server.
exchange Name of the exchange to bind.
queue_name Name of the queue to use.
queue_type Override type of the queue to use.
mirror Do not modify message headers or routing keys.
name: foobar
protocol: amqp091
routing_keys:
  - some.routing.key
  - other.key.#
host: host.org
port: 5671
cafile: /path/to/ca.pem
certfile: /path/to/cert.pem
virtual_host: /virtual_host_name
exchange: exchange.name
queue_name: queue.name

SQS Configuration

Field Description
name Name for the bridge. Will be on the forwarded message headers.
protocol sqs for SQS
queue_url Full SQS queue URL
visibility_timeout Duration (in seconds) that received messages are hidden from subsequent retrieve requests (default: 10 * 60)
wait_time_seconds Long polling wait time (default: 20)

The SQS receiver operates in mirror mode, passing through original message attributes without adding bridge headers.

On message processing failures, messages will be re-queued after the visibility_timeout period. The default timeout of 10 minutes matches the CKI RabbitMQ delay queues.

name: foobar
protocol: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
visibility_timeout: 600
wait_time_seconds: 20

Resulting messages

The messages after being forwarded will have the same body as the original ones, but the headers and routing key will have the following format.

Routing key

Messages will be sent to the exchange keeping the original routing key, but prepending the bridge name: {name}.{routing_key}

For a bridge named foo, and a message received with the topic tests.finished, the resulting routing key will be foo.tests.finished.

Messages bridged from SQS will not have the bridge name prepended to the routing key.

Headers

AMQP 1.0 Headers

Field Value
message-type amqp-bridge
message-amqp-bridge-name Name of the bridge.
message-amqp-bridge-protocol amqp10
<original-headers> Properties (headers) of the original message.

AMQP 0.91 Headers

Field Value
message-type amqp-bridge
message-amqp-bridge-name Name of the bridge.
message-amqp-bridge-protocol amqp091
message-amqp091-topic Topic (routing key) of the original message.
<original-headers> Headers of the original message.