cki_tools.message_trigger
Architecture
Messages flow through two stages:
flowchart LR
MQ[RabbitMQ\ningress queue] -->|webhook payload| Ingress[Ingress consumer\nHandler.callback]
Ingress -->|per-matched-config| PQ1[Per-project queue\nproject A]
Ingress -->|per-matched-config| PQ2[Per-project queue\nproject B]
PQ1 -->|prefetch=1 + ack delay| PA[Pipeline consumer A\ntrigger → ack]
PQ2 -->|prefetch=1 + ack delay| PB[Pipeline consumer B\ntrigger → ack]
PA --> GL[GitLab API]
PB --> GL
The ingress consumer matches each incoming message against all configs and republishes one message per match to a dedicated per-project RabbitMQ queue.
listen() starts one ingress consumer on the main thread and one
pipeline consumer daemon thread per configured project_url.
Only the pipeline threads call the GitLab API to trigger the pipelines, while the ingress thread fans out matched messages to the per-project queues.
flowchart TD
subgraph ingress ["Main thread: ingress consumer"]
MQ1[RabbitMQ ingress queue] --> CB[Handler.callback]
CB --> H1[handle with no message-type header]
H1 --> MATCH[pipeline_configs matches webhook]
MATCH --> FAN[send to corresponding project queue]
end
subgraph pipeline ["Pipeline thread: one per project_url"]
MQ2[Per-project RabbitMQ queue] --> PC[pipeline_callback]
PC --> H2["handle with message-type == 'message-trigger'"]
H2 --> TR[Handler.trigger]
TR --> GL[GitLab pipelines.create]
GL --> ACK[ack_fn]
ACK --> SLEEP[sleep trigger_interval]
end
FAN -->|"routing key message-trigger.pipeline.*"| MQ2
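The two stages can be sketched in memory with plain Python queues and threads. This is only an illustration of the flow in the diagrams, not the service's code: `fan_out`, `pipeline_consumer`, `run_demo`, the `matches` predicate, and the `None` stop sentinel are all hypothetical names, and `queue.Queue` stands in for the per-project RabbitMQ queues.

```python
import queue
import threading
import time


def fan_out(message, configs, project_queues):
    """Ingress stage: put one copy of the message on each matching project's queue."""
    for config in configs:
        if config["matches"](message):  # hypothetical predicate standing in for config matching
            project_queues[config["project_url"]].put(message)


def pipeline_consumer(project_queue, trigger, ack, trigger_interval):
    """Pipeline stage: trigger, ack, then sleep before taking the next message."""
    while True:
        message = project_queue.get()
        if message is None:  # sentinel used only by this sketch to stop the thread
            return
        trigger(message)  # stands in for the GitLab pipeline creation
        ack(message)  # acknowledge only after the trigger call returned
        time.sleep(trigger_interval)


def run_demo(messages, configs, trigger_interval=0.0):
    """Wire both stages together in memory and return what was triggered."""
    project_queues = {config["project_url"]: queue.Queue() for config in configs}
    triggered, acked, threads = [], [], []
    for url, project_queue in project_queues.items():
        def trigger(message, url=url):
            triggered.append((url, message["id"]))
        thread = threading.Thread(
            target=pipeline_consumer,
            args=(project_queue, trigger, acked.append, trigger_interval),
            daemon=True,
        )
        thread.start()
        threads.append(thread)
    for message in messages:
        fan_out(message, configs, project_queues)
    for project_queue in project_queues.values():
        project_queue.put(None)  # ask every consumer thread to stop
    for thread in threads:
        thread.join()
    return sorted(triggered)
```

Because each project has its own queue and its own consumer thread, a long `trigger_interval` for one project does not delay triggers for another.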
Usage
usage: python3 -m cki_tools.message_trigger [-h] [--message MESSAGE] [--validate]

options:
  -h, --help         show this help message and exit
  --message MESSAGE  message data
  --validate         only validate configuration and exit
Without --message, listens to the AMQP message bus for messages and triggers
GitLab pipelines matching the configuration.
Configuration
This service determines its configuration according to the first
MESSAGE_TRIGGER_CONFIG* environment variable that is set:
- from files in the MESSAGE_TRIGGER_CONFIG_DIR directory
- from the contents of MESSAGE_TRIGGER_CONFIG
- from the file in MESSAGE_TRIGGER_CONFIG_PATH
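The lookup order can be sketched as follows. This is a minimal illustration, not the service's loader: `read_raw_config` is a hypothetical helper, it returns the raw YAML text without parsing it, and it assumes that an empty variable counts as unset and that every regular file in the directory is read in sorted order.

```python
import os
from pathlib import Path


def read_raw_config(env=os.environ):
    """Return raw YAML texts from the first MESSAGE_TRIGGER_CONFIG* variable that is set."""
    config_dir = env.get("MESSAGE_TRIGGER_CONFIG_DIR")
    if config_dir:
        # Assumption: all regular files in the directory, in sorted order.
        return [p.read_text() for p in sorted(Path(config_dir).iterdir()) if p.is_file()]
    if env.get("MESSAGE_TRIGGER_CONFIG"):
        return [env["MESSAGE_TRIGGER_CONFIG"]]
    if env.get("MESSAGE_TRIGGER_CONFIG_PATH"):
        return [Path(env["MESSAGE_TRIGGER_CONFIG_PATH"]).read_text()]
    return []
```

Passing a plain dictionary instead of `os.environ` makes the precedence easy to check without touching the real environment.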
Example configuration:
.schema: cki_tools.message_trigger/schema.yml
service:
  project_url: https://gitlab.com/cki-project/cki-tools
  ref: main
  conditions:
    field: {jmespath: msg.field, regex: "[0-9]+"}
    path: {jmespath: msg.some.path, regex: "(?P<captured_value>.+)"}
  variables:
    EXTRACTED_VAR: "{captured_value}"
    TRIGGER_VAR: variable-value
This triggers a pipeline for the main branch of
https://gitlab.com/cki-project/cki-tools for each message that has a value at
the msg.some.path JMESPath and a number at msg.field. The pipeline has two
trigger variables: EXTRACTED_VAR is set to the value at msg.some.path, while
TRIGGER_VAR is set to the static value variable-value.
| Field | Description |
|---|---|
| project_url | GitLab project URL that contains the deployment pipelines |
| ref | Branch or ref for the triggered pipelines, defaults to main |
| conditions | Message filters, the key is ignored |
| conditions.jmespath | Limit to messages that match the path |
| conditions.regex | If specified, limit to messages where the value matches the regex via re.fullmatch() |
| variables | Dictionary of trigger variable name-value pairs, with named subgroups from conditions.regex provided via str.format_map() |
| trigger_interval | Override seconds between pipeline creates for this project_url. Defaults to MESSAGE_TRIGGER_TRIGGER_INTERVAL. |
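A rough sketch of how the conditions and variables fields could combine, using only the standard library. `match_config` and `lookup` are hypothetical names. `lookup` resolves plain dotted paths and is a stand-in for a real JMESPath search. The sketch assumes that a missing path means the message does not match, and it passes re.DOTALL and serializes non-string values as JSON, as this document describes elsewhere.

```python
import json
import re


def lookup(data, path):
    """Stand-in for JMESPath: resolve a plain dotted path, None if it is missing."""
    current = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def match_config(config, data):
    """Return the trigger variables if every condition matches, else None."""
    captured = {}
    for condition in config.get("conditions", {}).values():  # the condition key is ignored
        value = lookup(data, condition["jmespath"])
        if value is None:  # assumption: a missing path means no match
            return None
        text = value if isinstance(value, str) else json.dumps(value)
        regex = condition.get("regex")
        if regex is not None:
            match = re.fullmatch(regex, text, re.DOTALL)
            if match is None:
                return None
            captured.update(match.groupdict())  # named subgroups feed the variables
    return {
        name: template.format_map(captured)
        for name, template in config.get("variables", {}).items()
    }
```

With the example configuration above, a message whose msg.field is "42" and whose msg.some.path is "hello" yields EXTRACTED_VAR set to hello and TRIGGER_VAR set to variable-value.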
The jmespath expressions are matched against a data structure that looks like
this:
topic: rabbitmq.topic
headers:
  key1: value1
  ...
msg:
  key1: value1
  nested:
    key2: value2
  ...
Complex (non-string) fields captured via JMESPath expressions are serialized as JSON.
The regular expressions are configured with re.DOTALL by default to make it easy to cope with multi-line fields. This can be disabled for part of the expression by enclosing it in (?-s:…).
Boolean values can be matched by regular expressions like (?P<enabled>true).
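The three notes above can be tried out with the standard re and json modules. `fullmatch_groups` is a hypothetical helper written for this illustration; it only mirrors the behavior described here (JSON serialization of non-string values, re.fullmatch with re.DOTALL) and is not taken from the service.

```python
import json
import re


def fullmatch_groups(regex, value):
    """Match a captured field as described above and return its named groups."""
    # Non-string values are serialized as JSON first, so True becomes "true".
    text = value if isinstance(value, str) else json.dumps(value)
    match = re.fullmatch(regex, text, re.DOTALL)
    return None if match is None else match.groupdict()


# A boolean field matches the literal JSON text "true".
enabled = fullmatch_groups(r"(?P<enabled>true)", True)

# With re.DOTALL, ".+" spans newlines and greedily captures the first two lines.
two_lines = fullmatch_groups(r"(?P<first_line>.+)\n.*", "line one\nline two\nline three")

# Wrapping the group in (?-s:...) stops "." at the first newline.
one_line = fullmatch_groups(r"(?P<first_line>(?-s:.+))\n.*", "line one\nline two\nline three")
```

The inline `(?-s:…)` form requires Python 3.6 or later.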
Environment variables
| Environment variable | Secret | Required | Description |
|---|---|---|---|
| MESSAGE_TRIGGER_CONFIG_DIR | no | no | Path to the configuration YAML files, falls back to MESSAGE_TRIGGER_CONFIG |
| MESSAGE_TRIGGER_CONFIG | no | yes | Configuration in YAML, falls back to MESSAGE_TRIGGER_CONFIG_PATH |
| MESSAGE_TRIGGER_CONFIG_PATH | no | no | Path to a single configuration YAML file |
| MESSAGE_TRIGGER_TRIGGER_INTERVAL | no | no | Default seconds between pipeline creates per project (no delay by default). Per-config trigger_interval takes precedence. |
| MESSAGE_TRIGGER_PIPELINE_QUEUE_PREFIX | no | no | Prefix for per-project pipeline queue names (default: cki.queue.message-trigger.pipeline) |
| GITLAB_TOKENS | no | yes | URL/environment variable pairs of GitLab instances and private tokens |
| GITLAB_TOKEN | yes | yes | GitLab private tokens as configured in GITLAB_TOKENS above |
| RABBITMQ_HOST | no | yes | RabbitMQ host |
| RABBITMQ_PORT | no | no | RabbitMQ port, TLS is used for port 443 |
| RABBITMQ_USER | no | yes | RabbitMQ user |
| RABBITMQ_PASSWORD | yes | yes | RabbitMQ password |
| RABBITMQ_CAFILE | no | no | RabbitMQ CA file path |
| RABBITMQ_CERTFILE | no | no | RabbitMQ certificate + private key file path |
| CKI_DEPLOYMENT_ENVIRONMENT | no | no | Define the deployment environment (production/staging) |
| CKI_LOGGING_LEVEL | no | no | Logging level for CKI modules, defaults to WARN |
| SENTRY_DSN | yes | no | Sentry DSN |
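The precedence between MESSAGE_TRIGGER_TRIGGER_INTERVAL and a per-config trigger_interval might look like the sketch below. `effective_trigger_interval` is a hypothetical helper; treating an unset or empty variable as 0 seconds is an assumption drawn from the "no delay by default" note in the table.

```python
import os


def effective_trigger_interval(config, env=os.environ):
    """Per-config trigger_interval wins; otherwise use the environment default."""
    # Assumption: an unset or empty variable means no delay (0 seconds).
    default = float(env.get("MESSAGE_TRIGGER_TRIGGER_INTERVAL") or 0)
    interval = config.get("trigger_interval")
    return default if interval is None else float(interval)
```

Checking for `None` rather than truthiness keeps an explicit per-config value of 0 meaningful, so a single project can opt out of a global delay.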
CKI_DEPLOYMENT_ENVIRONMENT
On non-production deployments (CKI_DEPLOYMENT_ENVIRONMENT != production), no
pipelines are triggered.
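As a sketch of that guard: `trigger_if_production` and the `create_pipeline` callable are hypothetical names, and the comparison simply mirrors the condition stated above, so an unset variable also counts as non-production.

```python
import os


def trigger_if_production(create_pipeline, variables, env=os.environ):
    """Call create_pipeline only when CKI_DEPLOYMENT_ENVIRONMENT is production."""
    if env.get("CKI_DEPLOYMENT_ENVIRONMENT") != "production":
        return None  # non-production deployment: nothing is triggered
    return create_pipeline(variables)
```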
Deployment requirements
Because listen() starts per-project pipeline consumer threads at startup, the
RabbitMQ per-project queues and their bindings must exist before the service
starts. For each project_url in the configuration, bind the exchange to a
queue named <MESSAGE_TRIGGER_PIPELINE_QUEUE_PREFIX>.<project-slug> with the
routing key message-trigger.pipeline.<project-slug>.
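The naming scheme can be written down as a small helper, for example to generate the bindings to provision. `queue_binding` is a hypothetical name, and the project slug is passed in as-is because this document does not describe how it is derived from project_url.

```python
import os

DEFAULT_PREFIX = "cki.queue.message-trigger.pipeline"


def queue_binding(project_slug, env=os.environ):
    """Return the queue name and routing key expected for one project."""
    prefix = env.get("MESSAGE_TRIGGER_PIPELINE_QUEUE_PREFIX") or DEFAULT_PREFIX
    return {
        "queue": f"{prefix}.{project_slug}",
        "routing_key": f"message-trigger.pipeline.{project_slug}",
    }
```

Note that only the queue name depends on MESSAGE_TRIGGER_PIPELINE_QUEUE_PREFIX; the routing key always starts with message-trigger.pipeline.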
For a hard guarantee on per-project trigger_interval, run a single replica
(or use RabbitMQ x-single-active-consumer on each per-project queue so only
one replica consumes a given project at a time).
With multiple replicas sharing the same per-project queues, each message is
still delivered to exactly one consumer (no duplicate pipeline triggers), but
replicas can process messages in parallel and briefly exceed the intended pace.
As a best-effort workaround, scale trigger_interval by the replica count
(e.g. 3.0 s with 2 replicas → 6.0 s).
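The reasoning behind the scaling: if each of N replicas waits T seconds between triggers, the project can see up to N triggers every T seconds, so keeping the aggregate pace at one trigger per target interval means configuring T as N times the target. A tiny helper, with the hypothetical name `scaled_trigger_interval`, makes the arithmetic explicit.

```python
def scaled_trigger_interval(target_interval, replicas):
    """Per-replica interval that keeps the aggregate pace near target_interval."""
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    return target_interval * replicas
```

This remains best effort: replicas do not coordinate, so two of them can still trigger close together.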