Many systems already know a lot about themselves — you just have to listen.
MongoDB change streams (CDC) emit a continuous feed of inserts, updates, and deletes. With a little routing into a fast analytical database like ClickHouse, you can let the system “discover itself”: jobs, runs, schedules, dependencies, and even the fingerprints of human intervention.


1. Capture the Raw Feed

First, set up a connector:

MongoDB → Kafka → ClickHouse

In ClickHouse, land the JSON envelopes losslessly:

CREATE TABLE raw_cdc
(
  ts       DateTime64(3, 'UTC'),
  ns_db    LowCardinality(String),
  ns_coll  LowCardinality(String),
  op       LowCardinality(String),
  doc_id   String,
  full     String   -- full JSON document as string
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(ts)
ORDER BY (ns_db, ns_coll, ts);

Now you’ve got an immutable log of everything Mongo emits.
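Before the SQL, it helps to see what lands in each row. Here is a minimal Python sketch of flattening one change-stream event into the `raw_cdc` columns — the event fields follow MongoDB's change-event shape, but the `flatten` helper itself is illustrative, not part of any connector:

```python
import json
from datetime import datetime, timezone

def flatten(event: dict) -> tuple:
    """Map a MongoDB change-stream event onto the raw_cdc columns."""
    # wallTime is present on modern change events; fall back to "now"
    ts = event.get("wallTime") or datetime.now(timezone.utc)
    ns = event.get("ns", {})
    return (
        ts,
        ns.get("db", ""),
        ns.get("coll", ""),
        event.get("operationType", ""),
        str(event.get("documentKey", {}).get("_id", "")),
        json.dumps(event, default=str),  # keep the full envelope losslessly
    )

evt = {
    "operationType": "insert",
    "ns": {"db": "jobs", "coll": "events"},
    "documentKey": {"_id": "abc123"},
    "fullDocument": {"job_id": "nightly_etl", "event_type": "START"},
    "wallTime": datetime(2024, 1, 1, 1, 0, tzinfo=timezone.utc),
}
row = flatten(evt)
```

The last column keeps the whole envelope as a string, so nothing is lost even when a collection's schema drifts.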


2. Normalize Events

Next, create a view to extract candidates for analysis. Different systems put identifiers in different places, so keep it flexible:

CREATE VIEW evt AS
SELECT
  ts,
  ns_db,
  ns_coll,
  op,
  -- JSON_VALUE returns '' for a missing path, so wrap it in nullIf
  -- to let coalesce fall through
  coalesce(
    nullIf(JSON_VALUE(full, '$.fullDocument.job_id'), ''),
    nullIf(JSON_VALUE(full, '$.job_id'), ''),
    concat(ns_db, '.', ns_coll)
  ) AS job_id,
  coalesce(
    nullIf(JSON_VALUE(full, '$.fullDocument.event_type'), ''),
    nullIf(JSON_VALUE(full, '$.event'), ''),
    op
  ) AS event_type,
  nullIf(JSON_VALUE(full, '$.fullDocument.corr_id'), '') AS corr_id
FROM raw_cdc;
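The same fallback chain, sketched in Python (the field names mirror the view above; the helper is illustrative, not part of any driver):

```python
import json

def extract_job_id(full: str, ns_db: str, ns_coll: str) -> str:
    """Mirror the view's coalesce chain: fullDocument.job_id first,
    then a top-level job_id, then the namespace as a last resort."""
    doc = json.loads(full)
    return (
        (doc.get("fullDocument") or {}).get("job_id")
        or doc.get("job_id")
        or f"{ns_db}.{ns_coll}"
    )
```

The namespace fallback means every collection yields at least one "job", even if the documents carry no identifiers at all.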

At this point you can ask: what kinds of events exist?

SELECT event_type, count() AS n
FROM evt
GROUP BY event_type
ORDER BY n DESC;

3. Segment Runs

A run is one execution of a job. There are two main ways to segment:

By correlation ID

If your events have a corr_id:

SELECT job_id, corr_id,
       minIf(ts, event_type ILIKE 'START%') AS t_start,
       maxIf(ts, event_type ILIKE 'SUCCESS%') AS t_success,
       maxIf(ts, event_type ILIKE 'FAIL%') AS t_fail
FROM evt
GROUP BY job_id, corr_id;

By time gaps

If not, use idle gaps to split runs:

WITH gaps AS (
  SELECT job_id, ts,
         dateDiff('minute',
           lagInFrame(ts) OVER (PARTITION BY job_id ORDER BY ts), ts
         ) AS gap_min
  FROM evt
),
seq AS (
  SELECT job_id, ts,
         -- lagInFrame yields the epoch for a partition's first row,
         -- so its huge gap also starts a new run
         sum(if(gap_min > 60, 1, 0))
           OVER (PARTITION BY job_id ORDER BY ts) AS run_seq
  FROM gaps
)
SELECT job_id, run_seq, min(ts) AS t_start, max(ts) AS t_end
FROM seq
GROUP BY job_id, run_seq;
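The gap rule is easier to verify in miniature. A Python sketch of the same segmentation, using the 60-minute threshold from the query above:

```python
from datetime import datetime, timedelta

def segment_runs(timestamps, gap=timedelta(minutes=60)):
    """Split event times into runs wherever the idle gap between
    consecutive events exceeds `gap`."""
    runs = []
    for ts in sorted(timestamps):
        if runs and ts - runs[-1][-1] <= gap:
            runs[-1].append(ts)            # continues the current run
        else:
            runs.append([ts])              # first event or big gap: new run
    return [(r[0], r[-1]) for r in runs]   # (t_start, t_end) per run

base = datetime(2024, 1, 1, 1, 0)
events = [base, base + timedelta(minutes=5),           # run 1
          base + timedelta(hours=24),                  # run 2
          base + timedelta(hours=24, minutes=10)]
runs = segment_runs(events)
```

The threshold is a tuning knob: too small and one run splinters into many, too large and back-to-back runs merge.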

4. Learn Schedules and SLAs

With a few weeks of runs, you can compute typical due times and windows:

WITH runs AS (
  SELECT job_id, toStartOfDay(ts) AS d, max(ts) AS t_done
  FROM evt
  WHERE event_type ILIKE 'SUCCESS%'
  GROUP BY job_id, d
)
SELECT
  job_id,
  toDayOfWeek(t_done) AS dow,
  quantileExact(0.5)(toHour(t_done)*60 + toMinute(t_done)) AS p50_min_of_day,
  greatest(15,
    quantileExact(0.9)(toHour(t_done)*60 + toMinute(t_done)) -
    quantileExact(0.1)(toHour(t_done)*60 + toMinute(t_done))
  ) AS window_min,
  count() AS n
FROM runs
GROUP BY job_id, dow
HAVING n >= 20
ORDER BY job_id, dow;

Result:

  • “Job X usually completes around 01:05 on Mondays.”
  • “The window of variation is ~20 minutes.”
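The profile arithmetic itself is small. A pure-Python sketch (the index-based percentile here approximates the `quantileExact` calls above):

```python
from datetime import datetime, timedelta
from statistics import median

def profile(done_times):
    """Median completion minute-of-day and a p10-p90 spread,
    floored at 15 minutes like the SQL."""
    mins = sorted(t.hour * 60 + t.minute for t in done_times)
    def p(q):
        return mins[min(len(mins) - 1, int(q * len(mins)))]
    return {
        "p50_min_of_day": median(mins),
        "window_min": max(15, p(0.9) - p(0.1)),
    }

# ten Mondays of completions clustered around 01:05-01:07
times = [datetime(2024, 1, 1, 1, 5 + k % 3) + timedelta(days=7 * k)
         for k in range(10)]
prof = profile(times)
```

The 15-minute floor keeps a very consistent job from generating a window so tight that ordinary jitter looks like an incident.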

5. Infer Dependencies

Jobs often follow one another. Two approaches:

With correlation IDs

WITH s AS (
  SELECT job_id, corr_id, min(ts) AS t
  FROM evt
  WHERE event_type ILIKE 'SUCCESS%' AND corr_id IS NOT NULL
  GROUP BY job_id, corr_id
)
SELECT a.job_id AS from_job, b.job_id AS to_job,
       count() AS co,
       quantileExact(0.5)(dateDiff('minute', a.t, b.t)) AS lag_min
FROM s a
JOIN s b ON a.corr_id = b.corr_id
WHERE a.job_id != b.job_id AND b.t >= a.t
GROUP BY from_job, to_job
HAVING co >= 20
ORDER BY co DESC;

Without correlation IDs

WITH daily AS (
  SELECT job_id, toStartOfDay(ts) AS d,
         min(ts) AS first_ts, max(ts) AS last_ts
  FROM evt
  WHERE event_type ILIKE 'SUCCESS%'
  GROUP BY job_id, d
)
SELECT a.job_id AS from_job, b.job_id AS to_job,
       count() AS days,
       quantileExact(0.5)(dateDiff('minute', a.last_ts, b.first_ts)) AS lag_min
FROM daily a
JOIN daily b ON a.d = b.d
WHERE a.job_id != b.job_id AND b.first_ts >= a.last_ts
GROUP BY from_job, to_job
HAVING days >= 10
ORDER BY days DESC;

This produces a DAG: “Job A usually precedes Job B by ~15 minutes.”
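A toy version of the co-occurrence count — a `Counter` over ordered (from, to) pairs per day, with the threshold lowered so the example stays small:

```python
from collections import Counter

def infer_edges(daily_success, min_days=2):
    """daily_success: {day: [(job_id, finish_minute), ...]}.
    Count, per day, every ordered pair where job b finishes at or
    after job a; pairs seen often enough become candidate edges."""
    edges = Counter()
    for jobs in daily_success.values():
        for a, ta in jobs:
            for b, tb in jobs:
                if a != b and tb >= ta:
                    edges[(a, b)] += 1
    return {e: n for e, n in edges.items() if n >= min_days}

# three days where "extract" always finishes before "load"
days = {d: [("extract", 60), ("load", 75)] for d in range(1, 4)}
edges = infer_edges(days)
```

Co-occurrence is correlation, not causation — the thresholds (`co >= 20`, `days >= 10` in the SQL) are what keep coincidental orderings out of the DAG.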


6. Detect Late or Missing Runs

Generate daily expectations automatically:

CREATE TABLE runs_expected
ENGINE = MergeTree
ORDER BY (job_id, due_at) AS
SELECT
  job_id,
  -- job_profile is the materialized result of step 4; adding minutes
  -- to midnight avoids string-formatting and zero-padding bugs
  toDateTime(addDays(today(), 1), 'America/Chicago')
    + toIntervalMinute(p50_min_of_day) AS due_at,
  due_at + toIntervalMinute(window_min) AS wait_until
FROM job_profile
WHERE dow = toDayOfWeek(addDays(today(), 1));

Then compare actual events vs. expected windows to mark each run as MET, LATE, or MISSING.
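The comparison is a small piece of logic. A Python sketch, with status names matching the text (the extra PENDING state, my addition, covers runs whose grace window has not yet closed):

```python
from datetime import datetime, timedelta

def classify(done_at, due_at, wait_until, now):
    """MET if done by due_at, LATE if done after; MISSING only once
    the grace window (wait_until) has passed with no completion."""
    if done_at is None:
        return "MISSING" if now > wait_until else "PENDING"
    return "MET" if done_at <= due_at else "LATE"

due = datetime(2024, 1, 1, 1, 5)
grace = due + timedelta(minutes=20)
```

Passing `now` explicitly keeps the function pure, which makes the classification easy to backtest against historical runs.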


7. Fingerprints of Human Intervention

Even without ticket logs, some patterns are clear:

  • Restart: fail → quick success outside normal cadence.
  • Backfill: a cluster of old dates appearing at once.
  • Override: downstream runs without upstream MET.

You can store these as synthetic “actions”:

WITH w AS (
  SELECT job_id, ts, event_type,
         -- earliest success among the next 10 events for the same job;
         -- the window must see SUCCESS rows, so filter FAILs afterwards
         minIf(ts, event_type ILIKE 'SUCCESS%') OVER
           (PARTITION BY job_id ORDER BY ts
            ROWS BETWEEN 1 FOLLOWING AND 10 FOLLOWING) AS next_success_ts
  FROM evt
)
SELECT job_id, ts AS fail_ts, next_success_ts,
       dateDiff('minute', ts, next_success_ts) AS gap_min
FROM w
WHERE event_type ILIKE 'FAIL%'
  AND gap_min BETWEEN 0 AND 15;

8. What You Get

After a few weeks of data flowing in:

  • A catalog of jobs discovered from events.
  • Profiles of their schedules and variability.
  • A DAG of dependencies with typical lags.
  • A status table of daily runs (MET/LATE/MISSING).
  • Inferred action logs showing how problems were resolved.

All of it without hand-entered crontabs or manual DAG definitions.


Why This Matters

  • Self-Discovery: The system tells you what’s normal.
  • Reliability: You can alert on late/missing runs without configuring each one by hand.
  • Learning: Over time, you can predict lateness earlier, suggest interventions, or even automate the simplest remediations.

Mongo already knows a lot from its history — streaming CDC into ClickHouse just makes it visible, queryable, and actionable.