cki_tools.message_trigger

Trigger GitLab pipelines from webhook messages

Architecture

Messages flow through two stages:

flowchart LR
    MQ[RabbitMQ\ningress queue] -->|webhook payload| Ingress[Ingress consumer\nHandler.callback]
    Ingress -->|per-matched-config| PQ1[Per-project queue\nproject A]
    Ingress -->|per-matched-config| PQ2[Per-project queue\nproject B]
    PQ1 -->|prefetch=1 + ack delay| PA[Pipeline consumer A\ntrigger → ack]
    PQ2 -->|prefetch=1 + ack delay| PB[Pipeline consumer B\ntrigger → ack]
    PA --> GL[GitLab API]
    PB --> GL

The ingress consumer matches each incoming message against all configs and republishes one message per match to a dedicated per-project RabbitMQ queue.

listen() starts one ingress consumer on the main thread and one pipeline consumer daemon thread per configured project_url.

Only the pipeline threads call the GitLab API to trigger the pipelines, while the ingress thread fans out matched messages to the per-project queues.

flowchart TD
    subgraph ingress ["Main thread: ingress consumer"]
        MQ1[RabbitMQ ingress queue] --> CB[Handler.callback]
        CB --> H1[handle with no message-type header]
        H1 --> MATCH[pipeline_configs matches webhook]
        MATCH --> FAN[send to corresponding project queue]
    end

    subgraph pipeline ["Pipeline thread: one per project_url"]
        MQ2[Per-project RabbitMQ queue] --> PC[pipeline_callback]
        PC --> H2["handle with message-type == 'message-trigger'"]
        H2 --> TR[Handler.trigger]
        TR --> GL[GitLab pipelines.create]
        GL --> ACK[ack_fn]
        ACK --> SLEEP[sleep trigger_interval]
    end

    FAN -->|"routing key message-trigger.pipeline.*"| MQ2

Usage

usage: python3 -m cki_tools.message_trigger [-h] [--message MESSAGE] [--validate]

options:
  -h, --help         show this help message and exit
  --message MESSAGE  message data
  --validate         only validate configuration and exit

Without --message, listens to the AMQP message bus for messages and triggers GitLab pipelines matching the configuration.

Configuration

This service determines its configuration according to the first MESSAGE_TRIGGER_CONFIG* environment variable that is set:

  • from files in the MESSAGE_TRIGGER_CONFIG_DIR directory
  • from the contents of MESSAGE_TRIGGER_CONFIG
  • from the file in MESSAGE_TRIGGER_CONFIG_PATH

Example configuration:

.schema: cki_tools.message_trigger/schema.yml
service:
  project_url: https://gitlab.com/cki-project/cki-tools
  ref: main
  conditions:
    field: {jmespath: msg.field, regex: "[0-9]+"}
    path: {jmespath: msg.some.path, regex: "(?P<captured_value>.+)"}
  variables:
    EXTRACTED_VAR: "{captured_value}"
    TRIGGER_VAR: variable-value

This will trigger a pipeline for the main branch of https://gitlab.com/cki-project/cki-tools for each message that has a message field corresponding to the some.path JMESPath and a number at field. The pipeline will have two trigger variables: EXTRACTED_VAR is set to the value at some.path, while TRIGGER_VAR is set to the static value variable-value.

Field Description
project_url GitLab project URL that contains the deployment pipelines
ref Branch or ref for the triggered pipelines, defaults to main
conditions Message filters, the key is ignored
conditions.jmespath Limit to messages that match the path
conditions.regex If specified, limit to messages where the value matches the regex via re.fullmatch()
variables Dictionary of trigger variable name-value pairs, with named subgroups from conditions.regex provided via str.format_map()
trigger_interval Override seconds between pipeline creates for this project_url. Defaults to MESSAGE_TRIGGER_TRIGGER_INTERVAL.

The jmespath expressions are matched against a data structure that looks like this:

topic: rabbitmq.topic
headers:
  key1: value1
  ...
msg:
  key1: value1
  nested:
    key2: value2
  ...

Complex (non-string) fields captured via JMESPath expressions are serialized as JSON.

The regular expressions are configured with re.DOTALL by default to make it easy to cope with multi-line fields. This can be disabled for part of the expression by enclosing it in (?-s:…).

Boolean values can be matched by regular expressions like (?P<enabled>true).

Environment variables

Environment variable Secret Required Description
MESSAGE_TRIGGER_CONFIG_DIR no no Path to the configuration YAML files, falls back to MESSAGE_TRIGGER_CONFIG
MESSAGE_TRIGGER_CONFIG no yes Configuration in YAML, falls back to MESSAGE_TRIGGER_CONFIG_PATH
MESSAGE_TRIGGER_CONFIG_PATH no no Path to a single configuration YAML file
MESSAGE_TRIGGER_TRIGGER_INTERVAL no no Default seconds between pipeline creates per project (no delay by default). Per-config trigger_interval takes precedence.
MESSAGE_TRIGGER_PIPELINE_QUEUE_PREFIX no no Prefix for per-project pipeline queue names (default: cki.queue.message-trigger.pipeline)
GITLAB_TOKENS no yes URL/environment variable pairs of GitLab instances and private tokens
GITLAB_TOKEN yes yes GitLab private tokens as configured in GITLAB_TOKENS above
RABBITMQ_HOST no yes RabbitMQ host
RABBITMQ_PORT no no RabbitMQ port, TLS is used for port 443
RABBITMQ_USER no yes RabbitMQ user
RABBITMQ_PASSWORD yes yes RabbitMQ password
RABBITMQ_CAFILE no no RabbitMQ CA file path
RABBITMQ_CERTFILE no no RabbitMQ certificate + private key file path
CKI_DEPLOYMENT_ENVIRONMENT no no Define the deployment environment (production/staging)
CKI_LOGGING_LEVEL no no logging level for CKI modules, defaults to WARN
SENTRY_SDN yes no Sentry SDN

CKI_DEPLOYMENT_ENVIRONMENT

On non-production deployments (CKI_DEPLOYMENT_ENVIRONMENT != production), no pipelines are triggered.

Deployment requirements

Because listen() starts per-project pipeline consumer threads at startup, the RabbitMQ per-project queues and their bindings must exist before the service starts. For each project_url in the configuration, bind the exchange to a queue named <MESSAGE_TRIGGER_PIPELINE_QUEUE_PREFIX>.<project-slug> with the routing key message-trigger.pipeline.<project-slug>.

For a hard guarantee on per-project trigger_interval, run a single replica (or use RabbitMQ x-single-active-consumer on each per-project queue so only one replica consumes a given project at a time).

With multiple replicas sharing the same per-project queues, each message is still delivered to exactly one consumer (no duplicate pipeline triggers), but replicas can process messages in parallel and briefly exceed the intended pace. As a best-effort workaround, scale trigger_interval by the replica count (e.g. 3.0 s with 2 replicas → 6.0 s).