cki_tools.amqp_bridge
Single service to act as a bridge between external AMQP 0.91, AMQP 1.0, and SQS queues and a single RabbitMQ exchange.
RabbitMQ
+----------------------------------------+
+-----------+ | +----------+ +------------+ |
| AMQP091 +-----+---->+ Exchange +----+---->+ Consumer_1 | |
+-----------+ | | +----------+ | +------------+ |
+-----------+ | | | +------------+ |
| AMQP10 +-----+ | +---->+ Consumer_2 | |
+-----------+ | | +------------+ |
+-----------+ | | : : |
| SQS +-----+ | +------------+ |
+-----------+ | | Consumer_n | |
+----------------------------------------+
A different process needs to be spawned for each server to mirror. Each one will subscribe to a given list of topics and forward them to the configured RabbitMQ exchange.
Usage
usage: python3 -m cki_tools.amqp_bridge [-h]
[--from-datagrepper] [--start START] [--end END]
options:
-h, --help show this help message and exit
Retrieve messages from datagrepper:
--from-datagrepper Retrieve messages from datagrepper instead of listening to AMQP
--start START Start time for datagrepper messages in ISO format
--end END End time for datagrepper messages in ISO format
When called without any arguments, the service will start listening to the configured AMQP servers and forward messages to the RabbitMQ exchange.
When called with --from-datagrepper
, the service will retrieve
messages from datagrepper instead of listening to AMQP. The --start
and --end
options specify the time range for the retrieved messages.
Configuration
Environment variable | Description |
---|---|
CKI_DEPLOYMENT_ENVIRONMENT |
Define the deployment environment (production/staging) |
RABBITMQ_HOST |
AMQP host |
RABBITMQ_PORT |
AMQP port, TLS is used for port 443 |
RABBITMQ_USER |
AMQP user |
RABBITMQ_PASSWORD |
AMQP password |
RABBITMQ_CAFILE |
AMQP CA file path |
RABBITMQ_CERTFILE |
AMQP certificate + private key file path |
RABBITMQ_PUBLISH_EXCHANGE |
AMQP exchange for the forwarded messages |
RABBITMQ_KEEPALIVE_S |
Time to keep AMQP connection alive between messages |
SENTRY_SDN |
Sentry SDN |
AMQP_BRIDGE_CONFIG |
YAML object containing the AMQP configuration |
AWS_ACCESS_KEY_ID |
AWS access key ID (for SQS authentication) |
AWS_SECRET_ACCESS_KEY |
AWS secret access key (for SQS authentication) |
AWS_DEFAULT_REGION |
AWS region for SQS (e.g., us-east-1) |
AWS_ENDPOINT_URL |
AWS endpoint URL for SQS (optional, for LocalStack) |
CKI_DEPLOYMENT_ENVIRONMENT
On staging developments (CKI_DEPLOYMENT_ENVIRONMENT != production
), the
behaviour changes the following way:
- On AMQP 0.91,
queue_name
is not used and a volatile queue with an uuid name is created instead. - On AMQP 1.0, queues (
queue://
) inreceiver_urls
are translated into topics (topic://
). - Messages are not forwarded to the destination but printed instead.
AMQP_BRIDGE_CONFIG
AMQP 1.0 Configuration
Field | Description |
---|---|
name |
Name for the bridge. Will be on the forwarded message headers. |
protocol |
amqp10 for AMQP 1.0 |
datagrepper_url |
URL of datagrepper to recover messages. |
message_topics |
List of topics/queues to subscribe to. |
receiver_urls |
List of server URLs to connect to. |
cert_path |
Path to the user authentication certificate. |
name: foobar
protocol: amqp10
message_topics:
- queue://Consumer.consumer-name.foo.VirtualTopic.foo.>
- queue://Consumer.consumer-name.foo.VirtualTopic.bar.>
receiver_urls:
- amqps://messaging-broker01.foobar.com:5671
- amqps://messaging-broker02.foobar.com:5671
cert_path: /path/to/cert.pem
AMQP 0.91 Configuration
Field | Description |
---|---|
name |
Name for the bridge. Will be on the forwarded message headers. |
protocol |
amqp091 for AMQP 0.91 |
routing_keys |
List of routing keys to subscribe to. |
host |
URL of the AMQP 0.91 server. Can be a list of space-separated strings. |
port |
Port of the AMQP 0.91 server. |
cafile |
Path of the CA certificate of the AMQP 0.91 server. |
cert |
Path of the client certificate of the AMQP 0.91 server. |
virtual_host |
Virtual host of the AMQP 0.91 server. |
exchange |
Name of the exchange to bind. |
queue_name |
Name of the queue to use. |
queue_type |
Override type of the queue to use. |
mirror |
Do not modify message headers or routing keys. |
name: foobar
protocol: amqp091
routing_keys:
- some.routing.key
- other.key.#
host: host.org
port: 5671
cafile: /path/to/ca.pem
certfile: /path/to/cert.pem
virtual_host: /virtual_host_name
exchange: exchange.name
queue_name: queue.name
SQS Configuration
Field | Description |
---|---|
name |
Name for the bridge. Will be on the forwarded message headers. |
protocol |
sqs for SQS |
queue_url |
Full SQS queue URL |
visibility_timeout |
Duration (in seconds) that received messages are hidden from subsequent retrieve requests (default: 10 * 60) |
wait_time_seconds |
Long polling wait time (default: 20) |
The SQS receiver operates in mirror mode, passing through original message attributes without adding bridge headers.
On message processing failures, messages will be re-queued after the
visibility_timeout
period. The default timeout of 10 minutes matches the CKI
RabbitMQ delay queues.
name: foobar
protocol: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/my-queue
visibility_timeout: 600
wait_time_seconds: 20
Resulting messages
The messages after being forwarded will have the same body as the original ones, but the headers and routing key will have the following format.
Routing key
Messages will be sent to the exchange keeping the original routing key, but
prepending the bridge name: {name}.{routing_key}
For a bridge named foo
, and a message received with the topic
tests.finished
, the resulting routing key will be foo.tests.finished
.
Messages bridged from SQS will not have the bridge name prepended to the routing key.
Headers
AMQP 1.0 Headers
Field | Value |
---|---|
message-type |
amqp-bridge |
message-amqp-bridge-name |
Name of the bridge. |
message-amqp-bridge-protocol |
amqp10 |
<original-headers> |
Properties (headers) of the original message. |
AMQP 0.91 Headers
Field | Value |
---|---|
message-type |
amqp-bridge |
message-amqp-bridge-name |
Name of the bridge. |
message-amqp-bridge-protocol |
amqp091 |
message-amqp091-topic |
Topic (routing key) of the original message. |
<original-headers> |
Headers of the original message. |