
5. Instrumentation & Evidence


After this module you will be able to

  • Define and emit the specific metrics needed at each agent boundary to detect reasoning-basis failures
  • Design a chain-level integrity dashboard with actionable alerts
  • Implement retrieval completeness, context utilisation, and cross-agent consistency metrics
  • Build canary tests and chaos tests that catch Phantom Compliance-style failures
  • Establish an evidence pipeline that proves your controls are working

The metrics you don't have yet

Module 2 showed you what your current observability stack misses. This module gives you the specific metrics to close those gaps, the dashboards to visualise them, and the tests to prove they work.

Every metric in this module maps to a failure mode from Module 1:

| Metric | Failure mode it detects | Where it's emitted |
| --- | --- | --- |
| Retrieval completeness ratio | Truncated retrieval | After every retrieval step |
| Context utilisation percentage | Context overflow | Before every LLM inference |
| Context truncation flag | Context overflow | Before every LLM inference |
| Tool response freshness | Stale tool responses | After every tool call |
| Delegation depth | Delegation loops | At every delegation call |
| Cross-agent confidence consistency | All failure modes | At every inter-agent boundary |
| Drift score (rolling baseline) | Semantic drift | Computed daily over sliding window |

Metric 1: Retrieval completeness ratio

This is the single most important metric for detecting Phantom Compliance-style failures.

Definition: actual_results / expected_results for each retrieval step.

How to compute expected_results:

There are three approaches, from simplest to most robust:

  1. Static configuration: Manually set expected counts for each data source and query type. Simple but requires maintenance.

    EXPECTED_COUNTS = {
        "restricted_securities": 312,
        "approved_counterparties": 1847,
        "concentration_limits": 45,
    }
    
  2. Rolling baseline: Compute the median result count over the last N queries of the same type. Adapts automatically but can drift.

    async def get_rolling_baseline(source_id, query_type, window=100):
        recent_counts = await metrics_store.get_recent_counts(
            source_id=source_id,
            query_type=query_type,
            limit=window,
        )
        return statistics.median(recent_counts)
    
  3. Count query: Before (or in parallel with) the retrieval, run a count query against the source to get the exact current count. Most accurate but adds latency.

    async def counted_retrieval(query, source_id):
        # Run count and retrieval in parallel
        count_task = vector_store.count(query)
        results_task = vector_store.query(query, limit=MAX_RESULTS)
    
        expected, results = await asyncio.gather(
            count_task, results_task
        )
    
        ratio = len(results) / expected if expected > 0 else 0
        emit_metric("retrieval_completeness_ratio", ratio, tags={
            "source_id": source_id,
            "query": query,
            "expected": expected,
            "actual": len(results),
        })
    
        return results, ratio
    

Alert thresholds:

  • Below 0.9: Warning. Investigate whether the shortfall is expected (sparse data) or unexpected (truncation, source issue).
  • Below 0.5: Critical. The agent is seeing less than half the available data. Block or escalate.
  • Below 0.2: Emergency. PACE emergency path. This is the Phantom Compliance range.
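The three bands above can be encoded as a small classifier; the severity names here mirror the alert levels and are illustrative:

```python
def classify_completeness(ratio: float) -> str:
    """Map a retrieval completeness ratio to an alert severity."""
    if ratio < 0.2:
        return "emergency"   # Phantom Compliance range: PACE emergency path
    if ratio < 0.5:
        return "critical"    # agent sees less than half the data: block or escalate
    if ratio < 0.9:
        return "warning"     # investigate whether the shortfall is expected
    return "ok"
```

Wiring this into `counted_retrieval` gives you a single place where the ratio becomes an action.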

Grafana panel specification:

Panel: Retrieval Completeness Ratio
Type: Time series
Query: histogram_quantile(0.05, sum(rate(retrieval_completeness_ratio_bucket[5m])) by (le, source_id, agent_id))
Thresholds:
  - Green: >= 0.9
  - Yellow: 0.5 - 0.9
  - Red: < 0.5
Group by: source_id, agent_id

Metric 2: Context utilisation and truncation

Definition: tokens_used / tokens_available for each LLM inference step.

Context utilisation itself is not a problem; high utilisation is normal for agents processing large inputs. The danger signal is utilisation at or near 100%, combined with a truncation event.

Implementation:

def emit_context_metrics(
    agent_id: str,
    chain_id: str,
    prompt_tokens: int,
    max_context_tokens: int,
    truncation_occurred: bool,
    truncated_sections: list[str],
):
    utilisation = prompt_tokens / max_context_tokens

    emit_metric("context_utilisation", utilisation, tags={
        "agent_id": agent_id,
        "chain_id": chain_id,
    })

    if truncation_occurred:
        emit_metric("context_truncation", 1, tags={
            "agent_id": agent_id,
            "chain_id": chain_id,
            "truncated_sections": ",".join(truncated_sections),
        })

        # Log what was truncated for debugging
        log.warning(
            "Context truncation occurred",
            agent_id=agent_id,
            chain_id=chain_id,
            utilisation=utilisation,
            truncated_sections=truncated_sections,
        )

Alert thresholds:

  • Utilisation above 0.95 without truncation: Warning. Close to the limit; future inputs may trigger truncation.
  • Any truncation event: Alert. Investigate what was dropped and whether it was critical.
  • Truncation of system prompt or safety constraints: Critical. Safety instructions were dropped. Block the chain.
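These three rules can be expressed as a small policy check; the severity labels and critical section names are assumptions for illustration:

```python
def context_alert_level(
    utilisation: float,
    truncation_occurred: bool,
    truncated_sections: list[str],
) -> str:
    """Map context metrics to an alert severity."""
    critical_sections = {"system_prompt", "safety_constraints"}
    if truncation_occurred and critical_sections & set(truncated_sections):
        return "critical"   # safety instructions dropped: block the chain
    if truncation_occurred:
        return "alert"      # investigate what was dropped
    if utilisation > 0.95:
        return "warning"    # close to the limit; future inputs may truncate
    return "ok"
```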

Grafana panel specification:

Panel: Context Utilisation
Type: Gauge + Time series overlay
Query: avg(context_utilisation) by (agent_id)
Thresholds:
  - Green: < 0.85
  - Yellow: 0.85 - 0.95
  - Red: >= 0.95

Panel: Context Truncation Events
Type: Stat (count)
Query: sum(rate(context_truncation[5m])) by (agent_id)
Alert: > 0 events in 5 minutes

Metric 3: Tool response freshness

Definition: current_time - response_last_updated for each tool call.

Implementation:

from datetime import datetime

def check_tool_freshness(
    response: dict,
    freshness_config: dict,
) -> float:
    """Check tool response freshness. Returns staleness in seconds."""

    # Try common timestamp field names
    timestamp_fields = [
        "last_updated", "timestamp", "as_of",
        "data_timestamp", "generated_at",
    ]

    response_time = None
    for field_name in timestamp_fields:
        value = deep_get(response, field_name)
        if value:
            response_time = parse_timestamp(value)
            break

    if response_time is None:
        # No timestamp found -- emit unknown freshness
        emit_metric("tool_response_freshness_unknown", 1, tags={
            "tool_id": freshness_config["tool_id"],
        })
        return float("inf")

    # parse_timestamp is assumed to return a naive UTC datetime
    staleness = (datetime.utcnow() - response_time).total_seconds()

    emit_metric("tool_response_staleness_seconds", staleness, tags={
        "tool_id": freshness_config["tool_id"],
        "agent_id": freshness_config["agent_id"],
    })

    max_staleness = freshness_config.get("max_staleness_seconds", 300)
    if staleness > max_staleness:
        emit_metric("tool_response_stale", 1, tags={
            "tool_id": freshness_config["tool_id"],
            "staleness": staleness,
            "threshold": max_staleness,
        })

    return staleness

Alert thresholds (domain-specific):

  • Market data: Stale after 60 seconds
  • Compliance data: Stale after 3600 seconds (1 hour)
  • Reference data: Stale after 86400 seconds (24 hours)

Configure per tool based on the data's volatility and the decision's criticality.
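A per-tool configuration table for `check_tool_freshness`, using the domain thresholds above (the tool and agent IDs are illustrative):

```python
FRESHNESS_CONFIGS = {
    "market_data_feed": {
        "tool_id": "market_data_feed",
        "agent_id": "pricing_agent",
        "max_staleness_seconds": 60,       # market data: stale after 1 minute
    },
    "compliance_db": {
        "tool_id": "compliance_db",
        "agent_id": "compliance_agent",
        "max_staleness_seconds": 3600,     # compliance data: stale after 1 hour
    },
    "reference_data": {
        "tool_id": "reference_data",
        "agent_id": "enrichment_agent",
        "max_staleness_seconds": 86400,    # reference data: stale after 24 hours
    },
}
```

Each tool call then looks up its own entry, so a new tool cannot silently inherit a lax default.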


Metric 4: Cross-agent confidence consistency

This is the chain-level metric that ties everything together. It detects when a downstream agent produces high-confidence output from low-quality upstream inputs.

Definition: For each agent pair (upstream, downstream), compare:

  • Upstream data quality: the minimum completeness ratio and freshness from the upstream receipt
  • Downstream confidence: the confidence score in the downstream agent's output

If downstream confidence significantly exceeds upstream data quality, something is wrong.

Implementation:

def compute_consistency_score(
    upstream_receipt,
    downstream_receipt,
) -> float:
    """Compute cross-agent consistency.
    Returns 1.0 for perfect consistency, 0.0 for worst case."""

    upstream_quality = upstream_receipt.warranted_confidence
    downstream_confidence = downstream_receipt.stated_confidence

    if upstream_quality == 0:
        return 0.0 if downstream_confidence > 0.1 else 1.0

    # Consistency is high when downstream confidence
    # does not exceed upstream quality
    if downstream_confidence <= upstream_quality:
        return 1.0

    # Penalise proportionally to the gap
    gap = downstream_confidence - upstream_quality
    return max(0.0, 1.0 - gap)


def emit_consistency_metrics(chain_id, receipts):
    """Emit consistency metrics for a full chain."""
    for i in range(len(receipts) - 1):
        score = compute_consistency_score(receipts[i], receipts[i + 1])

        emit_metric("cross_agent_consistency", score, tags={
            "chain_id": chain_id,
            "upstream_agent": receipts[i].agent_id,
            "downstream_agent": receipts[i + 1].agent_id,
        })

        if score < 0.5:
            log.warning(
                "Low cross-agent consistency",
                chain_id=chain_id,
                upstream=receipts[i].agent_id,
                downstream=receipts[i + 1].agent_id,
                score=score,
                upstream_quality=receipts[i].warranted_confidence,
                downstream_confidence=receipts[i + 1].stated_confidence,
            )

Alert threshold: Consistency score below 0.5 indicates a significant gap between upstream data quality and downstream confidence. This is the direct signal for Phantom Compliance.
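As a worked check of that threshold: with the Phantom Compliance numbers (upstream quality 0.3, downstream confidence 0.95) the gap is 0.65, so the score is max(0, 1 − 0.65) = 0.35, comfortably below the 0.5 alert line. A self-contained restatement of the gap-penalty rule:

```python
def consistency(upstream_quality: float, downstream_confidence: float) -> float:
    """Gap-penalty rule: consistency is perfect when downstream
    confidence does not exceed upstream data quality, and decays
    linearly with the size of the gap."""
    if downstream_confidence <= upstream_quality:
        return 1.0
    return max(0.0, 1.0 - (downstream_confidence - upstream_quality))

score = consistency(0.3, 0.95)  # the Phantom Compliance pair
```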


The chain integrity dashboard

Combine the above metrics into a single dashboard:

[Dashboard layout: chain health score, six metric panels, and a list of recent integrity failures]

The Chain Health Score is a composite metric:

def chain_health_score(metrics: dict) -> float:
    """Weighted composite of chain integrity metrics."""
    weights = {
        "retrieval_completeness": 0.30,
        "context_utilisation_ok": 0.20,
        "tool_freshness_ok": 0.15,
        "cross_agent_consistency": 0.25,
        "pace_primary_ratio": 0.10,
    }

    score = sum(
        weights[k] * metrics.get(k, 0.0)
        for k in weights
    )
    return round(score, 2)
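To make the weighting concrete: with every component at 1.0 the composite is 1.00, while dropping retrieval completeness to 0.15 (the Phantom Compliance value) pulls it down to roughly 0.75. A runnable check, restating the same function:

```python
def chain_health_score(metrics: dict) -> float:
    """Weighted composite of chain integrity metrics (same weights as above)."""
    weights = {
        "retrieval_completeness": 0.30,
        "context_utilisation_ok": 0.20,
        "tool_freshness_ok": 0.15,
        "cross_agent_consistency": 0.25,
        "pace_primary_ratio": 0.10,
    }
    return round(sum(weights[k] * metrics.get(k, 0.0) for k in weights), 2)

healthy = chain_health_score({k: 1.0 for k in (
    "retrieval_completeness", "context_utilisation_ok",
    "tool_freshness_ok", "cross_agent_consistency", "pace_primary_ratio",
)})

degraded = chain_health_score({
    "retrieval_completeness": 0.15,   # Phantom Compliance value
    "context_utilisation_ok": 1.0,
    "tool_freshness_ok": 1.0,
    "cross_agent_consistency": 1.0,
    "pace_primary_ratio": 1.0,
})
```

Note that a single bad component only dents the composite, which is why the dashboard also surfaces each metric individually rather than relying on the score alone.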

Testing patterns

Metrics tell you what's happening now. Tests tell you whether your controls will catch the failures you've designed for. You need both.

Canary testing

Inject known-bad inputs on a schedule to verify your controls catch them:

CANARY_TESTS = [
    {
        "name": "truncated_retrieval",
        "description": "Simulate retrieval returning 15% of expected results",
        "setup": lambda: mock_retrieval(actual=47, expected=312),
        "expected_outcome": "chain_blocked",
        "expected_path": "alternate",
        "failure_mode": "truncated_retrieval",
    },
    {
        "name": "stale_tool_response",
        "description": "Simulate tool returning 30-minute-old data",
        "setup": lambda: mock_tool_response(
            staleness=timedelta(minutes=30)
        ),
        "expected_outcome": "chain_warning",
        "expected_path": "primary_with_flag",
        "failure_mode": "stale_tool_response",
    },
    {
        "name": "context_overflow",
        "description": "Simulate context truncation dropping safety constraints",
        "setup": lambda: mock_context(
            utilisation=1.0, truncated_sections=["system_prompt"]
        ),
        "expected_outcome": "chain_blocked",
        "expected_path": "emergency",
        "failure_mode": "context_overflow",
    },
    {
        "name": "confidence_inflation",
        "description": "Upstream quality 0.3, downstream confidence 0.95",
        "setup": lambda: mock_receipts(
            upstream_quality=0.3, downstream_confidence=0.95
        ),
        "expected_outcome": "chain_blocked",
        "expected_path": "contingency",
        "failure_mode": "confidence_inflation",
    },
]

async def run_canary_suite():
    results = []
    for test in CANARY_TESTS:
        with test["setup"]():
            # canary_input: a fixed, representative request defined per environment
            outcome = await execute_chain(canary_input)
            passed = (outcome.status == test["expected_outcome"])
            results.append({
                "test": test["name"],
                "passed": passed,
                "actual_outcome": outcome.status,
                "expected_outcome": test["expected_outcome"],
            })

            emit_metric("canary_test_result", 1 if passed else 0, tags={
                "test_name": test["name"],
                "failure_mode": test["failure_mode"],
            })

    return results

Run the canary suite:

  • On every deployment: Before traffic shifts to the new version
  • Daily: To catch configuration drift and model updates
  • After any data source change: New data sources, schema changes, index rebuilds

Chaos testing for agent chains

Chaos testing goes beyond canaries by introducing failures at random points in the chain to verify resilience:

import random

class AgentChainChaosMonkey:
    """Injects failures into agent chains to test resilience."""

    FAILURE_MODES = [
        "truncate_retrieval",       # Return random subset of results
        "add_latency",              # Delay tool responses
        "stale_cache",              # Serve old data
        "drop_receipt",             # Remove verification receipt
        "inflate_confidence",       # Set confidence to 0.99
        "truncate_context",         # Force context overflow
        "corrupt_boundary",         # Remove required fields
    ]

    def __init__(self, failure_rate=0.05):
        self.failure_rate = failure_rate

    def maybe_inject(self, agent_id, step_type):
        if random.random() > self.failure_rate:
            return None  # No injection

        mode = random.choice(self.FAILURE_MODES)
        log.info(
            "Chaos injection",
            agent_id=agent_id,
            step_type=step_type,
            failure_mode=mode,
        )
        return mode

Run chaos testing in a staging environment with production-like traffic. Track the results:

  • Catch rate: What percentage of injected failures did your controls detect?
  • False negative rate: What percentage of injected failures passed through undetected?
  • Recovery rate: When a failure was caught, did PACE correctly route to the alternate/contingency path?
  • Blast radius: When a failure was not caught, how far did it propagate?
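The four chaos statistics above can be computed from a flat list of injection records; the record shape here is an assumption:

```python
def chaos_stats(records: list[dict]) -> dict:
    """Summarise chaos-test outcomes.

    Each record: {"detected": bool, "recovered": bool, "blast_radius": int},
    where blast_radius counts how many downstream agents consumed the
    corrupted output before the chain ended.
    """
    total = len(records)
    detected = [r for r in records if r["detected"]]
    undetected = [r for r in records if not r["detected"]]
    return {
        "catch_rate": len(detected) / total if total else 0.0,
        "false_negative_rate": len(undetected) / total if total else 0.0,
        "recovery_rate": (
            sum(1 for r in detected if r["recovered"]) / len(detected)
            if detected else 0.0
        ),
        "max_blast_radius": max(
            (r["blast_radius"] for r in undetected), default=0
        ),
    }
```

Tracking these four numbers per run turns chaos testing from an exercise into evidence.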

The testing principle: Your controls are only as good as your last test. Canary tests verify that specific known failure modes are caught. Chaos tests verify that your system is resilient to unexpected combinations of failures. You need both, run regularly, with results tracked as evidence.


Chain integrity tests

The highest-value test is the end-to-end chain integrity test. This is the test that would have caught Phantom Compliance.

class ChainIntegrityTest:
    """End-to-end test that verifies chain-level integrity."""

    async def test_complete_data_correct_decision(self):
        """Baseline: complete data should produce correct output."""
        result = await execute_chain(COMPLETE_DATA_INPUT)
        assert result.status == "approved"
        assert result.receipt.integrity_pass is True
        assert result.receipt.warranted_confidence > 0.8

    async def test_incomplete_data_detected(self):
        """Phantom Compliance test: incomplete data must be caught."""
        with mock_retrieval(actual=47, expected=312):
            result = await execute_chain(TRADE_REVIEW_INPUT)

        # The chain must NOT approve
        assert result.status != "approved"
        # The integrity check must have fired
        assert result.receipt.integrity_pass is False
        # PACE must have activated
        assert result.path_used in ["alternate", "contingency", "emergency"]

    async def test_stale_data_detected(self):
        """Data that was valid yesterday must trigger a freshness alert."""
        with mock_tool_response(staleness=timedelta(hours=25)):
            result = await execute_chain(TRADE_REVIEW_INPUT)

        assert result.receipt.data_sources[0].freshness_ok is False

    async def test_confidence_inflation_detected(self):
        """High confidence from low-quality data must be caught."""
        with mock_low_quality_high_confidence():
            result = await execute_chain(TRADE_REVIEW_INPUT)

        assert result.status != "approved"
        consistency = compute_consistency_score(
            result.upstream_receipts[-1],
            result.receipt,
        )
        assert consistency < 0.5  # Inconsistency detected

Run these tests:

  • In CI/CD on every pull request
  • Nightly against production-like data
  • After any model, prompt, or data source change

The evidence pipeline

Everything above produces evidence. To demonstrate that your controls work (to auditors, to regulators, to your own management) you need to collect, store, and report on that evidence systematically.

What to store

| Evidence type | Retention | Storage |
| --- | --- | --- |
| Verification receipts (all chains) | 90 days | Structured data store (e.g., PostgreSQL, BigQuery) |
| Integrity metric time series | 1 year | Time series database (e.g., Prometheus, InfluxDB) |
| Canary test results | 1 year | CI/CD artefact store + metrics |
| Chaos test results | 1 year | Test results database |
| Chain integrity test results | 1 year | CI/CD artefact store |
| PACE path usage logs | 90 days | Structured data store |
| Alert history | 1 year | Alerting platform (PagerDuty, OpsGenie) |

The weekly evidence report

Automate a weekly report that answers four questions:

  1. Are the controls deployed? (Deployment evidence)

    • All agents emit verification receipts: YES/NO
    • All boundaries enforce contracts: YES/NO
    • PACE paths configured for all critical chains: YES/NO
  2. Are the controls running? (Activity evidence)

    • Receipts generated this week: N
    • Integrity checks executed: N
    • PACE activations: N (primary: X%, alternate: Y%, contingency: Z%)
  3. Are the controls catching things? (Effectiveness evidence)

    • Integrity failures detected: N
    • Chains blocked: N
    • Chains escalated: N
    • Canary test pass rate: X%
    • Chaos test catch rate: X%
  4. Do the controls cover the threat model? (Coverage evidence)

    • Failure modes tested by canaries: N of 5
    • Data sources with completeness baselines: N of M
    • Agents with context utilisation tracking: N of M
    • Tools with freshness checks: N of M
    • Chain integrity tests passing: N of N

Coverage evidence is the hardest to produce and the most valuable. It tells you not just that your controls work, but that they cover the specific failure modes you've identified.
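The "N of M" coverage figures are mechanical to produce once you maintain a control inventory; a sketch, where the inventory shape and dimension names are assumptions:

```python
def coverage_section(inventory: dict) -> dict:
    """Compute 'N of M' coverage figures from a control inventory.

    inventory maps each coverage dimension to a (covered, total) pair.
    """
    section = {}
    for dimension, (covered, total) in inventory.items():
        section[dimension] = {
            "covered": covered,
            "total": total,
            "ratio": round(covered / total, 2) if total else 0.0,
        }
    return section

report = coverage_section({
    "canary_failure_modes": (4, 5),
    "sources_with_baselines": (12, 15),
    "tools_with_freshness_checks": (8, 10),
})
```

The hard part is not the arithmetic but keeping the inventory honest: every new data source, tool, or agent must be registered with a total of M+1 before it ships.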


What Phantom Compliance looks like with instrumentation

To close the loop, here is what the Phantom Compliance scenario looks like with full instrumentation in place:

Before (Module 2): No alerts. Clean logs. 15 days to detection by manual audit.

After (this module):

  • T+0.3s: Agent B retrieval completeness metric emitted: 0.15. Alert fires immediately.
  • T+0.5s: Agent B receipt generated with integrity_pass: false. Boundary check blocks output.
  • T+0.5s: PACE alternate path activates. Forced full retrieval begins.
  • T+2.5s: Alternate path retrieval returns 312 results. Completeness ratio: 1.0.
  • T+3.5s: Agent B re-runs compliance check. Finds restricted security. Returns FLAGGED.
  • T+3.6s: Boundary check passes. Agent C receives FLAGGED result. Trade rejected.
  • T+3.6s: Dashboard shows: 1 PACE alternate activation, 1 integrity failure detected and resolved.

Total time from failure to resolution: 3.6 seconds. Total human intervention required: zero (for this case). Total trades approved with incomplete data: zero.


Reflection

Of the metrics in this module, which one would have the highest immediate impact if added to your production system tomorrow? Which one would be the hardest to implement? Start with the highest-impact, lowest-effort metric and build from there.

Consider

For most teams, retrieval completeness ratio is the highest-impact, lowest-effort metric. It requires: (1) a count query or baseline for each data source, (2) emitting the ratio after each retrieval, (3) an alert threshold. This can be implemented in a single sprint and immediately catches the most dangerous class of silent failures.


Next: Decision Exercise →