When you already have systems tracking their own state in MongoDB, you can turn that into a real-time stream of structured events without rewriting application logic. This approach captures every meaningful change from Mongo, tags it with relevant metadata, and makes it instantly queryable in ClickHouse — all through a generic, reusable pattern.
The idea:
- One fixed event envelope for all sources
- Dynamic tags/attributes defined in config files
- No code changes when onboarding new collections
1. The Fixed Event Envelope
Every CDC message has the same top-level structure, no matter what source system or collection it came from:
{
  "ts": "2025-08-09T14:00:07Z",
  "source": "mongo",
  "ns": { "db": "<db>", "coll": "<coll>" },
  "event_type": "insert|update|delete",
  "idempotency_key": "mongo:<_id>:<op>:<ts>",
  "tags": { /* dynamic */ },
  "attrs": { /* dynamic */ },
  "raw": { /* optional original doc/delta */ }
}
The core fields never change; only tags and attrs adapt to the source.
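As a rough illustration (not the post's actual implementation), a normalizer might assemble the envelope and its idempotency key from a raw change-stream document like this. The field names follow MongoDB's change-event shape (operationType, ns, documentKey, fullDocument), and wallTime is assumed to arrive as an ISO-8601 string; depending on the connector's serialization it may need decoding first.

from datetime import datetime, timezone

def build_envelope(change: dict) -> dict:
    """Map one MongoDB change-stream event onto the fixed envelope."""
    ns = change["ns"]                             # {"db": ..., "coll": ...}
    op = change["operationType"]                  # insert | update | delete
    doc_id = str(change["documentKey"]["_id"])
    # wallTime is assumed to be an ISO timestamp; fall back to ingest time.
    ts = change.get("wallTime") or datetime.now(timezone.utc).isoformat()
    return {
        "ts": ts,
        "source": "mongo",
        "ns": {"db": ns["db"], "coll": ns["coll"]},
        "event_type": op,
        # _id + operation + timestamp keeps replays of the same change idempotent.
        "idempotency_key": f"mongo:{doc_id}:{op}:{ts}",
        "tags": {},
        "attrs": {},
        "raw": change.get("fullDocument"),        # optional, absent for deletes
    }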
2. Config-Driven Mapping
A YAML file declares the tagging and attribute logic for each collection.
defaults:
  hash_pii: true
  max_label_cardinality: 100000

mappings:
  - match: { ns.db: commodity, ns.coll: supplier_responses }
    event_type_override:
      when: "$exists(fullDocument.response_at)"
      value: "response_received"
      else: "request_sent"
    tags:
      - supplier_id
      - consumer_id
      - status
    attrs:
      - latency_sec: "$secondsDiff(fullDocument.response_at, fullDocument.sent_at)"
      - record_count
      - file_name
    drops:
      - payload_blob
    pii:
      - consumer_email

  - match: { ns.db: ingest, ns.coll: files }
    event_type_override:
      when: "$eq(fullDocument.status, 'arrived')"
      value: "file_arrived"
    tags:
      - consumer_id
      - file_type
    attrs:
      - size_bytes
      - expected_count
      - checksum
    computed:
      - is_large: "$gt(fullDocument.size_bytes, 1048576)"
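A minimal sketch of how the normalizer might resolve and apply a mapping block, assuming PyYAML and treating the $-expression helpers ($exists, $secondsDiff, ...) as out of scope. The function names are illustrative; the real service would also evaluate event_type_override, drops, pii, and computed expressions.

import yaml  # assumes PyYAML is available

def load_config(path: str = "mapping.yaml") -> dict:
    """Parse the mapping file described above."""
    with open(path) as f:
        return yaml.safe_load(f)

def find_mapping(config: dict, db: str, coll: str) -> dict | None:
    """Return the first mapping block whose match clause covers this namespace."""
    for m in config.get("mappings", []):
        match = m.get("match", {})
        if match.get("ns.db") == db and match.get("ns.coll") == coll:
            return m
    return None

def apply_mapping(mapping: dict, change: dict) -> tuple[dict, dict]:
    """Copy the configured plain-field tags and attrs out of fullDocument."""
    doc = change.get("fullDocument") or {}
    tags = {t: doc[t] for t in mapping.get("tags", []) if isinstance(t, str) and t in doc}
    attrs = {a: doc[a] for a in mapping.get("attrs", []) if isinstance(a, str) and a in doc}
    return tags, attrs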
3. The Normalizer Service
A small containerized service:
- Consumes raw MongoDB Atlas change stream topics from Kafka
- Looks up the mapping for ns.db + ns.coll
- Applies tag/attr extraction rules
- Emits the normalized event to a common Kafka topic
Onboarding a new source means adding an entry to mapping.yaml; no redeploy is required.
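Tying the pieces together, the consume-map-emit loop could look roughly like the following. This assumes the kafka-python client; the broker address and topic names are placeholders, and the helpers come from the sketches in the earlier sections.

import json
from kafka import KafkaConsumer, KafkaProducer  # assumes kafka-python

# Broker address and topic names are illustrative placeholders.
consumer = KafkaConsumer(
    "mongo.changes.raw",
    bootstrap_servers="kafka:9092",
    value_deserializer=lambda v: json.loads(v),
)
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode(),
)

config = load_config("mapping.yaml")          # parsed YAML from section 2's sketch

for msg in consumer:
    change = msg.value
    db, coll = change["ns"]["db"], change["ns"]["coll"]
    event = build_envelope(change)            # fixed envelope from section 1's sketch
    mapping = find_mapping(config, db, coll)
    if mapping is None:
        event["tags"]["unmapped"] = "true"    # fallback mode for unmapped collections
    else:
        event["tags"], event["attrs"] = apply_mapping(mapping, change)
    producer.send("events.normalized", value=event)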
4. ClickHouse as the Sink
A single table handles all normalized events:
CREATE TABLE record_raw
(
    ts              DateTime CODEC(Delta, ZSTD),
    event_type      LowCardinality(String),
    source          LowCardinality(String),
    ns_db           LowCardinality(String),
    ns_coll         LowCardinality(String),
    idempotency_key String,
    tags            JSON,
    attrs           JSON,
    _ingested_at    DateTime DEFAULT now()
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (ts, event_type, ns_db, ns_coll);
JSON columns keep the schema flexible for dynamic tags/attrs.
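One simple way to land normalized events, sketched here with ClickHouse's HTTP interface and the JSONEachRow format; the host, port, and batching strategy are placeholders, and a Kafka table engine or a client library would work just as well.

import json
import requests  # assumes the ClickHouse HTTP interface is reachable on port 8123

def insert_events(events: list[dict], url: str = "http://clickhouse:8123/") -> None:
    """Write a batch of normalized envelopes into record_raw."""
    rows = []
    for e in events:
        rows.append(json.dumps({
            "ts": e["ts"],
            "event_type": e["event_type"],
            "source": e["source"],
            "ns_db": e["ns"]["db"],
            "ns_coll": e["ns"]["coll"],
            "idempotency_key": e["idempotency_key"],
            "tags": e["tags"],
            "attrs": e["attrs"],
        }))
    resp = requests.post(
        url,
        params={
            "query": "INSERT INTO record_raw FORMAT JSONEachRow",
            # best_effort lets ClickHouse parse ISO-8601 timestamps with a trailing Z.
            "date_time_input_format": "best_effort",
        },
        data="\n".join(rows).encode(),
    )
    resp.raise_for_status()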
5. Guardrails
Generic pipelines need protection against runaway complexity (the first two guardrails are sketched after the list):
- Cardinality caps on tags
- PII hashing/dropping from config
- Large field drops to keep payloads lean
- Fallback mode for unmapped collections
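A minimal sketch of the first two guardrails, assuming an in-process cardinality tracker and a SHA-256 digest for PII values; where enforcement actually lives (and how state is persisted) is an implementation detail the pattern leaves open.

import hashlib

MAX_LABEL_CARDINALITY = 100_000   # mirrors defaults.max_label_cardinality in mapping.yaml
seen_values: dict[str, set] = {}  # in-memory tracker; a real service might use a shared store

def hash_pii(value: str) -> str:
    """Replace a PII value with a stable, non-reversible digest."""
    return hashlib.sha256(value.encode()).hexdigest()[:16]

def apply_guardrails(fields: dict, mapping: dict) -> dict:
    out = {}
    for key, value in fields.items():
        if key in mapping.get("pii", []):
            value = hash_pii(str(value))
        # Cap per-tag cardinality: once a tag has too many distinct values,
        # collapse new ones so downstream aggregations stay bounded.
        values = seen_values.setdefault(key, set())
        if value not in values and len(values) >= MAX_LABEL_CARDINALITY:
            value = "__overflow__"
        else:
            values.add(value)
        out[key] = value
    return out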
6. Why This Works
This pattern is reusable because:
- The envelope is fixed
- The metadata is externalized to config
- Any team can plug in their collection by writing a mapping block
- ClickHouse queries work for any source with zero schema changes
It’s a classic Adage/Karma move — decoupling what is happening from how you use it later.
7. Example Queries
Recent top event types:
SELECT event_type, count()
FROM record_raw
WHERE ts >= now() - INTERVAL 1 HOUR
GROUP BY event_type
ORDER BY count() DESC;
Latency by supplier:
SELECT JSON_VALUE(tags,'$.supplier_id') AS supplier,
avg(toFloat64OrZero(JSON_VALUE(attrs,'$.latency_sec'))) AS avg_lat
FROM record_raw
WHERE ts >= now() - INTERVAL 24 HOUR
AND supplier != ''  -- JSON_VALUE returns an empty string, not NULL, when the key is missing
GROUP BY supplier
ORDER BY avg_lat DESC;
End result: You can tap any MongoDB-backed system, stream its state changes to Kafka, and land them in ClickHouse — giving you a live, queryable history of everything that happens, with zero disruption to existing workflows.