5. Instrumentation & Evidence¶
After this module you will be able to¶
- Define and emit the specific metrics needed at each agent boundary to detect reasoning-basis failures
- Design a chain-level integrity dashboard with actionable alerts
- Implement retrieval completeness, context utilisation, and cross-agent consistency metrics
- Build canary tests and chaos tests that catch Phantom Compliance-style failures
- Establish an evidence pipeline that proves your controls are working
The metrics you don't have yet¶
Module 2 showed you what your current observability stack misses. This module gives you the specific metrics to close those gaps, the dashboards to visualise them, and the tests to prove they work.
Every metric in this module maps to a failure mode from Module 1:
| Metric | Failure mode it detects | Where it's emitted |
|---|---|---|
| Retrieval completeness ratio | Truncated retrieval | After every retrieval step |
| Context utilisation percentage | Context overflow | Before every LLM inference |
| Context truncation flag | Context overflow | Before every LLM inference |
| Tool response freshness | Stale tool responses | After every tool call |
| Delegation depth | Delegation loops | At every delegation call |
| Cross-agent confidence consistency | All failure modes | At every inter-agent boundary |
| Drift score (rolling baseline) | Semantic drift | Computed daily over sliding window |
Metric 1: Retrieval completeness ratio¶
This is the single most important metric for detecting Phantom Compliance-style failures.
Definition: actual_results / expected_results for each retrieval step.
How to compute expected_results:
There are three approaches, from simplest to most robust:

1. Static configuration: Manually set expected counts for each data source and query type. Simple but requires maintenance.

EXPECTED_COUNTS = {
    "restricted_securities": 312,
    "approved_counterparties": 1847,
    "concentration_limits": 45,
}

2. Rolling baseline: Compute the median result count over the last N queries of the same type. Adapts automatically but can drift.

async def get_rolling_baseline(source_id, query_type, window=100):
    recent_counts = await metrics_store.get_recent_counts(
        source_id=source_id,
        query_type=query_type,
        limit=window,
    )
    return statistics.median(recent_counts)

3. Count query: Before (or in parallel with) the retrieval, run a count query against the source to get the exact current count. Most accurate but adds latency.

async def counted_retrieval(query, source_id):
    # Run count and retrieval in parallel
    count_task = vector_store.count(query)
    results_task = vector_store.query(query, limit=MAX_RESULTS)
    expected, results = await asyncio.gather(count_task, results_task)
    ratio = len(results) / expected if expected > 0 else 0
    emit_metric("retrieval_completeness_ratio", ratio, tags={
        "source_id": source_id,
        "query": query,
        "expected": expected,
        "actual": len(results),
    })
    return results, ratio
Alert thresholds:
- Below 0.9: Warning. Investigate whether the shortfall is expected (sparse data) or unexpected (truncation, source issue).
- Below 0.5: Critical. The agent is seeing less than half the available data. Block or escalate.
- Below 0.2: Emergency. PACE emergency path. This is the Phantom Compliance range.
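To make these thresholds actionable at the retrieval boundary, a small classifier can translate the ratio into an alert severity. The sketch below is illustrative: the severity labels are assumptions, and how each severity routes into PACE is up to your chain configuration.

def classify_completeness(ratio: float) -> str:
    """Map a retrieval completeness ratio to an alert severity.

    Thresholds mirror the guidance above; severity labels are illustrative.
    """
    if ratio < 0.2:
        return "emergency"   # Phantom Compliance range: PACE emergency path
    if ratio < 0.5:
        return "critical"    # agent sees less than half the data: block or escalate
    if ratio < 0.9:
        return "warning"     # investigate whether the shortfall is expected
    return "ok"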
Grafana panel specification:
Panel: Retrieval Completeness Ratio
Type: Time series
Query: histogram_quantile(0.05, retrieval_completeness_ratio)
Thresholds:
- Green: >= 0.9
- Yellow: 0.5 - 0.9
- Red: < 0.5
Group by: source_id, agent_id
Metric 2: Context utilisation and truncation¶
Definition: tokens_used / tokens_available for each LLM inference step.
Context utilisation itself is not a problem; high utilisation is normal for agents processing large inputs. The danger signal is utilisation at or near 100%, combined with a truncation event.
Implementation:
def emit_context_metrics(
    prompt_tokens: int,
    max_context_tokens: int,
    truncation_occurred: bool,
    truncated_sections: list[str],
    agent_id: str,
    chain_id: str,
):
    utilisation = prompt_tokens / max_context_tokens
    emit_metric("context_utilisation", utilisation, tags={
        "agent_id": agent_id,
        "chain_id": chain_id,
    })
    if truncation_occurred:
        emit_metric("context_truncation", 1, tags={
            "agent_id": agent_id,
            "chain_id": chain_id,
            "truncated_sections": ",".join(truncated_sections),
        })
        # Log what was truncated for debugging
        log.warning(
            "Context truncation occurred",
            agent_id=agent_id,
            chain_id=chain_id,
            utilisation=utilisation,
            truncated_sections=truncated_sections,
        )
Alert thresholds:
- Utilisation above 0.95 without truncation: Warning. Close to the limit; future inputs may trigger truncation.
- Any truncation event: Alert. Investigate what was dropped and whether it was critical.
- Truncation of system prompt or safety constraints: Critical. Safety instructions were dropped. Block the chain.
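A minimal sketch of how these thresholds might be enforced before inference follows; the CRITICAL_SECTIONS set and the returned action names are assumptions, not part of any specific framework.

# Sections whose loss should block the chain (illustrative names)
CRITICAL_SECTIONS = {"system_prompt", "safety_constraints"}

def classify_truncation(utilisation: float, truncated_sections: list[str]) -> str:
    """Map context metrics to an action, following the thresholds above."""
    if any(section in CRITICAL_SECTIONS for section in truncated_sections):
        return "block"   # safety instructions were dropped
    if truncated_sections:
        return "alert"   # something was dropped; investigate what and why
    if utilisation > 0.95:
        return "warn"    # close to the limit; future inputs may truncate
    return "ok"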
Grafana panel specification:
Panel: Context Utilisation
Type: Gauge + Time series overlay
Query: avg(context_utilisation) by (agent_id)
Thresholds:
- Green: < 0.85
- Yellow: 0.85 - 0.95
- Red: >= 0.95
Panel: Context Truncation Events
Type: Stat (count)
Query: sum(rate(context_truncation[5m])) by (agent_id)
Alert: > 0 events in 5 minutes
Metric 3: Tool response freshness¶
Definition: current_time - response_last_updated for each tool call.
Implementation:
from datetime import datetime, timedelta
def check_tool_freshness(
response: dict,
freshness_config: dict,
) -> float:
"""Check tool response freshness. Returns staleness in seconds."""
# Try common timestamp field names
timestamp_fields = [
"last_updated", "timestamp", "as_of",
"data_timestamp", "generated_at",
]
response_time = None
for field_name in timestamp_fields:
value = deep_get(response, field_name)
if value:
response_time = parse_timestamp(value)
break
if response_time is None:
# No timestamp found -- emit unknown freshness
emit_metric("tool_response_freshness_unknown", 1, tags={
"tool_id": freshness_config["tool_id"],
})
return float("inf")
staleness = (datetime.utcnow() - response_time).total_seconds()
emit_metric("tool_response_staleness_seconds", staleness, tags={
"tool_id": freshness_config["tool_id"],
"agent_id": freshness_config["agent_id"],
})
max_staleness = freshness_config.get("max_staleness_seconds", 300)
if staleness > max_staleness:
emit_metric("tool_response_stale", 1, tags={
"tool_id": freshness_config["tool_id"],
"staleness": staleness,
"threshold": max_staleness,
})
return staleness
Alert thresholds (domain-specific):
- Market data: Stale after 60 seconds
- Compliance data: Stale after 3600 seconds (1 hour)
- Reference data: Stale after 86400 seconds (24 hours)
Configure per tool based on the data's volatility and the decision's criticality.
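As a sketch, the per-tool limits can live in a configuration map shaped the way check_tool_freshness expects; the tool and agent identifiers below are illustrative assumptions.

# Illustrative per-tool freshness configuration; tool_id/agent_id values are assumptions
FRESHNESS_CONFIGS = {
    "market_data_feed": {
        "tool_id": "market_data_feed",
        "agent_id": "pricing_agent",
        "max_staleness_seconds": 60,       # market data
    },
    "compliance_lookup": {
        "tool_id": "compliance_lookup",
        "agent_id": "compliance_agent",
        "max_staleness_seconds": 3600,     # compliance data
    },
    "security_reference": {
        "tool_id": "security_reference",
        "agent_id": "enrichment_agent",
        "max_staleness_seconds": 86400,    # reference data
    },
}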
Metric 4: Cross-agent confidence consistency¶
This is the chain-level metric that ties everything together. It detects when a downstream agent produces high-confidence output from low-quality upstream inputs.
Definition: For each agent pair (upstream, downstream), compare:
- Upstream data quality: the minimum completeness ratio and freshness from the upstream receipt
- Downstream confidence: the confidence score in the downstream agent's output
If downstream confidence significantly exceeds upstream data quality, something is wrong.
Implementation:
def compute_consistency_score(
upstream_receipt,
downstream_receipt,
) -> float:
"""Compute cross-agent consistency.
Returns 1.0 for perfect consistency, 0.0 for worst case."""
upstream_quality = upstream_receipt.warranted_confidence
downstream_confidence = downstream_receipt.stated_confidence
if upstream_quality == 0:
return 0.0 if downstream_confidence > 0.1 else 1.0
# Consistency is high when downstream confidence
# does not exceed upstream quality
if downstream_confidence <= upstream_quality:
return 1.0
# Penalise proportionally to the gap
gap = downstream_confidence - upstream_quality
return max(0.0, 1.0 - gap)
def emit_consistency_metrics(chain_id, receipts):
"""Emit consistency metrics for a full chain."""
for i in range(len(receipts) - 1):
score = compute_consistency_score(receipts[i], receipts[i + 1])
emit_metric("cross_agent_consistency", score, tags={
"chain_id": chain_id,
"upstream_agent": receipts[i].agent_id,
"downstream_agent": receipts[i + 1].agent_id,
})
if score < 0.5:
log.warning(
"Low cross-agent consistency",
chain_id=chain_id,
upstream=receipts[i].agent_id,
downstream=receipts[i + 1].agent_id,
score=score,
upstream_quality=receipts[i].warranted_confidence,
downstream_confidence=receipts[i + 1].stated_confidence,
)
Alert threshold: Consistency score below 0.5 indicates a significant gap between upstream data quality and downstream confidence. This is the direct signal for Phantom Compliance.
The chain integrity dashboard¶
Combine the metrics above into a single chain integrity dashboard. Its headline number, the Chain Health Score, is a weighted composite of the individual integrity metrics:
def chain_health_score(metrics: dict) -> float:
"""Weighted composite of chain integrity metrics."""
weights = {
"retrieval_completeness": 0.30,
"context_utilisation_ok": 0.20,
"tool_freshness_ok": 0.15,
"cross_agent_consistency": 0.25,
"pace_primary_ratio": 0.10,
}
score = sum(
weights[k] * metrics.get(k, 0.0)
for k in weights
)
return round(score, 2)
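For example, a chain with strong retrieval completeness but a weaker consistency score would be scored roughly as follows (the input values are illustrative):

example_metrics = {
    "retrieval_completeness": 0.90,
    "context_utilisation_ok": 1.0,
    "tool_freshness_ok": 1.0,
    "cross_agent_consistency": 0.80,
    "pace_primary_ratio": 0.90,
}
chain_health_score(example_metrics)  # ≈ 0.91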
Testing patterns¶
Metrics tell you what's happening now. Tests tell you whether your controls will catch the failures you've designed for. You need both.
Canary testing¶
Inject known-bad inputs on a schedule to verify your controls catch them:
CANARY_TESTS = [
{
"name": "truncated_retrieval",
"description": "Simulate retrieval returning 15% of expected results",
"setup": lambda: mock_retrieval(actual=47, expected=312),
"expected_outcome": "chain_blocked",
"expected_path": "alternate",
"failure_mode": "truncated_retrieval",
},
{
"name": "stale_tool_response",
"description": "Simulate tool returning 30-minute-old data",
"setup": lambda: mock_tool_response(
staleness=timedelta(minutes=30)
),
"expected_outcome": "chain_warning",
"expected_path": "primary_with_flag",
"failure_mode": "stale_tool_response",
},
{
"name": "context_overflow",
"description": "Simulate context truncation dropping safety constraints",
"setup": lambda: mock_context(
utilisation=1.0, truncated_sections=["system_prompt"]
),
"expected_outcome": "chain_blocked",
"expected_path": "emergency",
"failure_mode": "context_overflow",
},
{
"name": "confidence_inflation",
"description": "Upstream quality 0.3, downstream confidence 0.95",
"setup": lambda: mock_receipts(
upstream_quality=0.3, downstream_confidence=0.95
),
"expected_outcome": "chain_blocked",
"expected_path": "contingency",
"failure_mode": "confidence_inflation",
},
]
async def run_canary_suite():
results = []
for test in CANARY_TESTS:
with test["setup"]():
outcome = await execute_chain(canary_input)
passed = (outcome.status == test["expected_outcome"])
results.append({
"test": test["name"],
"passed": passed,
"actual_outcome": outcome.status,
"expected_outcome": test["expected_outcome"],
})
emit_metric("canary_test_result", 1 if passed else 0, tags={
"test_name": test["name"],
"failure_mode": test["failure_mode"],
})
return results
Run the canary suite:
- On every deployment: Before traffic shifts to the new version
- Daily: To catch configuration drift and model updates
- After any data source change: New data sources, schema changes, index rebuilds
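For the scheduled runs, a sketch of a daily runner that also rolls the suite results up into a single pass-rate gauge is shown below; the loop-based scheduling and the metric name are assumptions, so substitute your own scheduler and naming.

import asyncio

async def scheduled_canary_runs(interval_seconds: int = 86400):
    """Run the canary suite on a fixed interval and emit a pass-rate gauge."""
    while True:
        results = await run_canary_suite()
        pass_rate = sum(1 for r in results if r["passed"]) / len(results)
        emit_metric("canary_suite_pass_rate", pass_rate, tags={"suite": "canary"})
        await asyncio.sleep(interval_seconds)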
Chaos testing for agent chains¶
Chaos testing goes beyond canaries by introducing failures at random points in the chain to verify resilience:
import random

class AgentChainChaosMonkey:
    """Injects failures into agent chains to test resilience."""
FAILURE_MODES = [
"truncate_retrieval", # Return random subset of results
"add_latency", # Delay tool responses
"stale_cache", # Serve old data
"drop_receipt", # Remove verification receipt
"inflate_confidence", # Set confidence to 0.99
"truncate_context", # Force context overflow
"corrupt_boundary", # Remove required fields
]
def __init__(self, failure_rate=0.05):
self.failure_rate = failure_rate
def maybe_inject(self, agent_id, step_type):
if random.random() > self.failure_rate:
return None # No injection
mode = random.choice(self.FAILURE_MODES)
log.info(
"Chaos injection",
agent_id=agent_id,
step_type=step_type,
failure_mode=mode,
)
return mode
Run chaos testing in a staging environment with production-like traffic. Track the results:
- Catch rate: What percentage of injected failures did your controls detect?
- False negative rate: What percentage of injected failures passed through undetected?
- Recovery rate: When a failure was caught, did PACE correctly route to the alternate/contingency path?
- Blast radius: When a failure was not caught, how far did it propagate?
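A sketch of how a chaos run might be rolled up into those four numbers follows; the per-injection record fields (detected, recovered, blast_radius) are an assumed schema, not a standard one.

def chaos_run_summary(injections: list[dict]) -> dict:
    """Summarise a chaos run into catch, false-negative, and recovery rates."""
    total = len(injections)
    caught = [i for i in injections if i["detected"]]
    missed = [i for i in injections if not i["detected"]]
    return {
        "catch_rate": len(caught) / total if total else 0.0,
        "false_negative_rate": len(missed) / total if total else 0.0,
        "recovery_rate": (
            sum(1 for i in caught if i["recovered"]) / len(caught) if caught else 0.0
        ),
        "max_blast_radius": max((i["blast_radius"] for i in missed), default=0),
    }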
The testing principle: Your controls are only as good as your last test. Canary tests verify that specific known failure modes are caught. Chaos tests verify that your system is resilient to unexpected combinations of failures. You need both, run regularly, with results tracked as evidence.
Chain integrity tests¶
The highest-value test is the end-to-end chain integrity test. This is the test that would have caught Phantom Compliance.
class ChainIntegrityTest:
"""End-to-end test that verifies chain-level integrity."""
async def test_complete_data_correct_decision(self):
"""Baseline: complete data should produce correct output."""
result = await execute_chain(COMPLETE_DATA_INPUT)
assert result.status == "approved"
assert result.receipt.integrity_pass is True
assert result.receipt.warranted_confidence > 0.8
async def test_incomplete_data_detected(self):
"""Phantom Compliance test: incomplete data must be caught."""
with mock_retrieval(actual=47, expected=312):
result = await execute_chain(TRADE_REVIEW_INPUT)
# The chain must NOT approve
assert result.status != "approved"
# The integrity check must have fired
assert result.receipt.integrity_pass is False
# PACE must have activated
assert result.path_used in ["alternate", "contingency", "emergency"]
async def test_stale_data_detected(self):
"""Data that was valid yesterday must trigger a freshness alert."""
with mock_tool_response(staleness=timedelta(hours=25)):
result = await execute_chain(TRADE_REVIEW_INPUT)
assert result.receipt.data_sources[0].freshness_ok is False
async def test_confidence_inflation_detected(self):
"""High confidence from low-quality data must be caught."""
with mock_low_quality_high_confidence():
result = await execute_chain(TRADE_REVIEW_INPUT)
assert result.status != "approved"
consistency = compute_consistency_score(
result.upstream_receipts[-1],
result.receipt,
)
assert consistency < 0.5 # Inconsistency detected
Run these tests:
- In CI/CD on every pull request
- Nightly against production-like data
- After any model, prompt, or data source change
The evidence pipeline¶
Everything above produces evidence. To demonstrate that your controls work (to auditors, to regulators, to your own management) you need to collect, store, and report on that evidence systematically.
What to store¶
| Evidence type | Retention | Storage |
|---|---|---|
| Verification receipts (all chains) | 90 days | Structured data store (e.g., PostgreSQL, BigQuery) |
| Integrity metric time series | 1 year | Time series database (e.g., Prometheus, InfluxDB) |
| Canary test results | 1 year | CI/CD artefact store + metrics |
| Chaos test results | 1 year | Test results database |
| Chain integrity test results | 1 year | CI/CD artefact store |
| PACE path usage logs | 90 days | Structured data store |
| Alert history | 1 year | Alerting platform (PagerDuty, OpsGenie) |
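One way to keep the retention policy next to the code that writes the evidence is a small configuration map mirroring the table above; the store names and field layout are assumptions, not a specific platform's schema.

EVIDENCE_RETENTION = {
    "verification_receipts": {"retention_days": 90,  "store": "postgres"},
    "integrity_metrics":     {"retention_days": 365, "store": "prometheus"},
    "canary_results":        {"retention_days": 365, "store": "ci_artifacts"},
    "chaos_results":         {"retention_days": 365, "store": "test_results_db"},
    "chain_integrity_tests": {"retention_days": 365, "store": "ci_artifacts"},
    "pace_path_logs":        {"retention_days": 90,  "store": "postgres"},
    "alert_history":         {"retention_days": 365, "store": "alerting_platform"},
}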
The weekly evidence report¶
Automate a weekly report that answers four questions:
1. Are the controls deployed? (Deployment evidence)
   - All agents emit verification receipts: YES/NO
   - All boundaries enforce contracts: YES/NO
   - PACE paths configured for all critical chains: YES/NO
2. Are the controls running? (Activity evidence)
   - Receipts generated this week: N
   - Integrity checks executed: N
   - PACE activations: N (primary: X%, alternate: Y%, contingency: Z%)
3. Are the controls catching things? (Effectiveness evidence)
   - Integrity failures detected: N
   - Chains blocked: N
   - Chains escalated: N
   - Canary test pass rate: X%
   - Chaos test catch rate: X%
4. Do the controls cover the threat model? (Coverage evidence)
   - Failure modes tested by canaries: N of 5
   - Data sources with completeness baselines: N of M
   - Agents with context utilisation tracking: N of M
   - Tools with freshness checks: N of M
   - Chain integrity tests passing: N of N
Coverage evidence is the hardest to produce and the most valuable. It tells you not just that your controls work, but that they cover the specific failure modes you've identified.
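A sketch of how the report could be assembled is below; every query helper it calls (count_metric, avg_metric, and the coverage checks) is an assumed stand-in for whatever your metrics and deployment stores actually expose.

async def weekly_evidence_report(week_start) -> dict:
    """Assemble the four evidence categories for one reporting week."""
    return {
        "deployment": {
            "all_agents_emit_receipts": await check_receipt_coverage(),
            "pace_configured_for_critical_chains": await check_pace_coverage(),
        },
        "activity": {
            "receipts_generated": await count_metric("receipts", since=week_start),
            "pace_activations": await count_metric("pace_activations", since=week_start),
        },
        "effectiveness": {
            "integrity_failures_detected": await count_metric("integrity_failures", since=week_start),
            "canary_pass_rate": await avg_metric("canary_test_result", since=week_start),
            "chaos_catch_rate": await avg_metric("chaos_catch", since=week_start),
        },
        "coverage": {
            "sources_with_baselines": await count_sources_with_baselines(),
            "tools_with_freshness_checks": await count_tools_with_freshness(),
        },
    }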
What Phantom Compliance looks like with instrumentation¶
To close the loop, here is what the Phantom Compliance scenario looks like with full instrumentation in place:
Before (Module 2): No alerts. Clean logs. 15 days to detection by manual audit.
After (this module):
- T+0.3s: Agent B retrieval completeness metric emitted: 0.15. Alert fires immediately.
- T+0.5s: Agent B receipt generated with integrity_pass: false. Boundary check blocks output.
- T+0.5s: PACE alternate path activates. Forced full retrieval begins.
- T+2.5s: Alternate path retrieval returns 312 results. Completeness ratio: 1.0.
- T+3.5s: Agent B re-runs compliance check. Finds restricted security. Returns FLAGGED.
- T+3.6s: Boundary check passes. Agent C receives FLAGGED result. Trade rejected.
- T+3.6s: Dashboard shows: 1 PACE alternate activation, 1 integrity failure detected and resolved.
Total time from failure to resolution: 3.6 seconds. Total human intervention required: zero (for this case). Total trades approved with incomplete data: zero.
Reflection
Of the metrics in this module, which one would have the highest immediate impact if added to your production system tomorrow? Which one would be the hardest to implement? Start with the highest-impact, lowest-effort metric and build from there.
Consider
For most teams, retrieval completeness ratio is the highest-impact, lowest-effort metric. It requires: (1) a count query or baseline for each data source, (2) emitting the ratio after each retrieval, (3) an alert threshold. This can be implemented in a single sprint and immediately catches the most dangerous class of silent failures.